农场主的黑科技.

Java并发编程实战-第6章-任务执行

字数统计: 5.7k阅读时长: 35 min
2018/10/30 Share

什么是任务?任务就是工作单元,我们把应用中的工作分成多个任务,从而进行事务操作和并行工作。

在线程中执行任务

首先任务之间必须是相互独立的:任务并不依赖于其他任务的状态,结果或边界效应。

串行地执行任务

有多种调度任务的策略,而其中一些策略能够更好地利用潜在并发性。

策略1 : 在单个线程中串行地执行各项任务,也就是按顺序一个个执行。

😕 串行Web服务器

1
2
3
4
5
6
7
8
9
10
//         6-1  串行的Web服务器(并不好)
public class SingleThreadWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) { //处理多个请求
Socket connection = socket.accept();//通过80端口接收到HTTP请求
handleRequest(connection); //处理请求
}
}
}

上面的服务器程序中,一次只能处理一个请求,性能非常差。当等待时间过长时,客户端会认为服务器没有响应,而且服务器cpu资源利用非常低。对于服务器而言这种串行执行的方式无疑是不合适的。

显示地为任务创建线程

策略2 : 通过为每个请求创建一个新的线程来提供服务,从而实现更高的响应性。

😧 为每个请求启动一个新的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//     6-2   在Web服务器中为每个请求启动一个新的线程(不要这么做)
public class ThreadPerTaskWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) { //多个请求
final Socket connection = socket.accept();
Runnable task = new Runnable() { //为每个请求启动一个新的线程
public void run() {
handleRequest(connection);
}
};
new Thread(task).start();//线程启动
}
}
}

对于每个连接,主循环都创建一个新线程来处理请求,而不是在主循环中处理。这么一来,服务器吞吐率会明显提高,需要注意的是任务处理代码必须是线程安全的。

无限创建线程的不足

那么策略2的缺点是什么?

  • 线程生命周期的开销非常高。

  • 资源消耗。
    活跃的线程会消耗系统资源,尤其是内存。

  • 稳定性
    在可创建线程的数量上存在一个限制。甚至可能抛出OutOfMemoryError异常。

策略2最大的问题是,他没有创建线程的上限。当一个用户而已发送多个请求时服务器会崩溃。

Executor框架

事实上我们需要一个线程池来管理服务器的任务执行。和上一章中用有界队列来管理资源的方式相似java.util.concurrent提供了一种灵活的线程池实现作为Executor框架的一部分。在任务执行时,Executor比Thread更常用。

1
2
3
4
//                   Excutor接口
public interface Executor{
void execute(Runnable command);
}

Executor接口为灵活且强大的异步执行任务框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程执行过程解耦开来,并用Runnable来表示任务。Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。

Executor基于生产者-消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。

示例 : 基于Executor的Web服务器

策略3 : 使用Executor构造线程池。

😀 基于Executor的Web服务器。使用了标准的Executor实现,即一个固定长度的线程池,可以容纳100个线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//      6-4    基于线程池的Web服务器
public class TaskExecutionWebServer {
private static final int NTHREADS=100;
private static final Executor exec=
Executors.newFixedThreadPool(NTHREADS); //创建了一个固定长度的线程池,可以容纳100个线程

public static void main(String[] args)throws IOException{
ServerSocket socket=new ServerSocket(80);//创建一个Server Socket绑定到80端口自
while(true){
final Socket connection=socket.accept();
Runnable task=new Runnable(){
public void run(){
handleRequest(connection);
}
};
exec.execute(task);//将任务提交到工作队列中,在某个时刻被工作线程取出并执行
}
}
private static void handleRequest(Socket connection) {
//处理请求
}
}

那是否能用Executor实现策略2和策略1的做法?答案是可以,只需要实现Executor接口。

用Executor实现策略2的为每个请求启动一个新线程方式:

1
2
3
4
5
6
//     6-5   为每个请求启动一个新线程的Executor
public class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
};
}

用Executor实现策略1的串行执行请求的方式:

1
2
3
4
5
6
//        6-6   在调用线程中以同步方式执行所有任务的Executor
public class WithinThreadExecutor implements Executor {
public void execute(Runnable r) {
r.run();
};
}

执行策略

像Executor这种把任务的提交和执行的好处是什么?能灵活的制定和修改执行策略。

执行策略包括:

  • 在什么线程中执行任务
  • 任务按照什么顺序执行(FIFO,LIFO,优先级)
  • 有多少个任务可并发执行
  • 在队列中有多少个任务在等待执行
  • 如果系统由于过程而需要拒绝一个任务,应该选择哪一个任务?另外,如何通知应用程序有任务被拒绝?
  • 在执行一个任务之前或之后,应该进行哪些动作

线程池

可以通过调用Executor中的静态工厂方法之一来创建一个线程池:

  • newFixedThreadPool: 创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化(如果某个先后才能由于发生了未预期的Exception而结束,那么线程池会补充一个新的线程)
  • newCachedThreadPool :创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
  • newSingleThreadExecutor : 一个单线程的Executor,他创建单个工作者线程来执行任务,也就是串行执行。如果这个线程异常结束,会创建另一个线程代替。

  • newScheduledThreadPool : 创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似Timer。

Executor的生命周期

Executor是需要手动去关闭的,不然程序不会结束。这个关闭过程可以是“温柔的”也可以是“暴力的”,他们会将关闭操作中受影响的任务的状态回馈给应用。

ExecutorService接口继承了Executor接口,给出了管理生命周期的方法(同时还有一些用于任务提交的便利方法),如下:

1
2
3
4
5
6
7
8
9
10
//             6-7  ExecutorService中的生命周期管理方法
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown(); //关闭
boolean isTerminated(); //已终结
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// ... 其他用于任务提交的便利方法
}

ExecutorService的生命周期有三种状态:运行,关闭和已终止。

关于关闭

  • shutdown方法将执行“温柔”关闭:不在接受新的任务,同时等待已经提交的任务执行完成——包括那么还没有开始执行的任务。
  • shutdownnow方法将执行“暴力”关闭:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。

在ExecutorService关闭提交的任务将由“拒绝执行处理器(Rejected Execution Handler)”来处理。

关于终止

等所有任务都完成后将转入终止状态。

  • awaitTermination : 等待ExecutorService达到终止状态。
    通常在调用awaitTermiation之后会立即调用shutdown,从而产生同步地关闭ExecutorService的效果。
  • isTerminated : 用于轮询ExecutorService是否已经终止。

以下程序通过增加生命周期来扩展Web服务器的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//    6-8   支持关闭操作的Web服务器
public class LifecycleWebServer {
private final ExecutorService exec = Executors.newCachedThreadPool(); //新建可缓存的线程池

public void start() throws IOException {
ServerSocket socket = new ServerSocket(80);
while (!exec.isShutdown()) { //只要未被关闭,exec.isShutdown当关闭返回true
try {
final Socket conn = socket.accept();
exec.execute(new Runnable() {
public void run() {
handleRequest(conn);
}
});
} catch (RejectedExecutionException e) { //在关闭后提交的任务,会抛出异常
if (!exec.isShutdown()) //如果还没关闭
log("task submission rejected", e);
}
}
}

public void stop() {
exec.shutdown(); //不再接受新的任务,同时等待已经提交的任务执行完成——包括还未开始执行的任务
}

void handleRequest(Socket connection) {
Request req = readRequest(connection);
if (isShutdownRequest(req))
stop(); //不再接受新的任务,同时等待已经提交的任务执行完成——包括还未开始执行的任务
else
dispatchRequest(req);
}
}

延迟任务与周期任务

ScheduledThreadPoolExecutor来管理延迟任务(在“100ms后执行该任务”)和周期任务(“每10ms执行一次该任务”)。 可以通过ScheduledThreadPoolExecutor的构造函数或newScheduledThreadPool工厂方法来创建该类的对象。

事实上ScheduledThreadPoolExecutorTimer的代替方案,Timer中存在许多缺陷,如下:

  • Timer在执行所有定时任务时只会创建一个线程。
    如果某个任务的执行时间过长,那么将破坏其他TimerTask的定时准确性。假如某个周期的TimerTask需要每10ms执行一次,另一个TimerTask需要执行40ms,那么这个10ms周期任务在40ms任务执行完成后快速连续地调用4次,或者彻底“丢失”4次调用(取决于它是基于固定速率来调度还是基于固定延时来调度)。
  • 如果TimerTask抛出了一个未检查的异常,那么Timer将表现出糟糕的行为。
    Timer线程不会捕获异常,当发生异常终止时,会错误地认为整个Timer都被取消了。因此已经被调度但尚未执行的TimeTask将不会再执行,新的任务也不能被调用(线程泄漏(Thread Leakage))。

你可能会认为程序会运行6秒后退出,但实际情况是运行1秒就结束了,并抛出了一个异常信息“Timer already cancelled”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//            6-9     错误的Timer行为
public class OutOfTime {
public static void main(String[] args) throws Exception {
Timer timer = new Timer(); //新建一个Timer
timer.schedule(new ThrowTask(), 1); //排定任务在延迟后执行
SECONDS.sleep(1);//SECONDS为秒单位,sleep休眠1秒
timer.schedule(new ThrowTask(), 1);
SECONDS.sleep(5);
}

static class ThrowTask extends TimerTask {
public void run() {
throw new RuntimeException();
}
}
}

如果要构建自己的调度服务,那么可以使用DelayQueue,它实现了BlockingQueue,并为ScheduledThreadPoolExecutor提供调度功能。

找出可利用的并行性

下面我们将开发html的页面渲染功能。我们将开发不同版本的组件,并且每个版本都实现了不同程度的并发性。

示例:串行的页面渲染器

串行处理。当遇到文本标签时,将其绘制到图像缓存中。当遇到图像引用时,想通过网络获取它,然后再绘制到图像缓存中。

这很容易实现,程序只需将输入中的每个元素处理一次(甚至不需要缓存文档),但这种方法可能会令用户感到烦恼,他们必须等待很长时间,直到显示所有的文本。

所以我们在这里实现的是这个的优化版,它先绘制文本元素,同时为图像预留出矩形的占位空间。在处理完第一遍文本后,程序开始下载图像,并将它们绘制到相应的占位空间中。

😕

1
2
3
4
5
6
7
8
9
10
11
//         6-10    串行地渲染页面元素(不好)
public abstract class SingleThreadRenderer {
void renderPage(CharSequence source){//CharSequence是char类型的值的一个可读序列
renderText(source); //渲染文本
List<ImageData> imageData=new ArrayList<ImageData>();
for(ImageInfo imageInfo:scanForImageInfo(source))
imageData.add(imageInfo.downloadImage()); //下载图片
for(ImageData data:imageData)
renderImage(data); //渲染图片
}
}

携带结果的任务Callable和Future

在看下面一个示例之前,先说下Callable和Future。

Callable和Runnable一样都是描述的都是抽象的计算任务。

  • Runnable : 不能返回一个值或抛出一个受检查的异常。
  • Callable : 有返回值,也可以抛出一个异常。
    更适用于,从数据库查询,从网络上获取资源等获取某个结果的场景。
1
2
3
4
//       6-11  Callable接口
public interface Callable<V> {
V call() throws Exception;
}

那么Future是什么?Executor执行的任务有4个生命周期阶段:创建,提交,开始和完成。Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。当某个任务完成后,它就永远停留在“完成”状态上。

Future的get方法的行为取决与任务的状态(尚未开始,正在运行,已完成)

  • 如果任务已完成,那么get会立即返回或抛出一个Exception。
  • 如果任务没有完成,那么get将阻塞直到任务完成。
  • 如果任务抛出了异常,那么get将异常封装为ExecutionException并重新抛出。
    此时可以通过getCause来获得被封装的初始异常。
  • 如果任务被取消,那么get将抛出CancellationException
1
2
3
4
5
6
7
8
9
10
11
//       6-11 Future接口
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException,
CancellationException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
CancellationException, TimeoutException;
}

如何创建Future?。当使用ExecutorService中的所有submit方法提交Runnable或Callable时就会返回Future。

示例 : 使用Future实现页面渲染器

下个例子中,我们将渲染过程分解为两个任务,一个是渲染所有的文本,一个是下载所有的图像(因为其中一个任务是CPU密集型,而另一个任务是I/O密集型,因此这种方法即使在单CPU系统上也能提升性能)。

Callable和Future有组与表示这些协同任务之间的交互。

😕

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//        6-13    使用Future等待图片下载
public abstract class FutureRenderer {
private final ExecutorService executor=Executors.newCachedThreadPool();//新建线程池

void renderPage(CharSequence source){
final List<ImageInfo> imageInfos=scanForImageInfo(source);//页面信息
// 新建了一个Callable来下载所有的图像。
Callable<List<ImageData>> task=
new Callable<List<ImageData>>(){
public List<ImageData> call(){
//图片
List<ImageData> result=
new ArrayList<ImageData>();
for(ImageInfo imageInfo :imageInfos)
result.add(imageInfo.downloadImage());//下载图片
return result; //Callable会返回结果
}
};
//提交一个带返回值的任务,并且返回一个代表即将得到任务的结果的Future
Future<List<ImageData>> future=executor.submit(task);
renderText(source); //渲染文本

try{
//当主任务需要图像时,它会等到Future.get调用结果,如果幸运的话,当开始请求时所有图像就已经下载完成,即使没有,至少图像的下载任务也已经提前开始了。
List<ImageData> imageData=future.get(); //得到图片
for(ImageData data:imageData)
renderImage(data); //渲染图片
}catch (InterruptedException e) {
//重新设置线程的中断状态
Thread.currentThread().interrupt();
//由于不需要结果,因此取消任务。
future.cancel(true);
}catch (ExecutionException e) {
throw LaunderThrowable.launderThrowable(e.getCause());
}
}
}

上面的程序中,当主任务需要图像时,它会等到Future.get调用结果,如果幸运的话,当开始请求时所有图像就已经下载完成,即使没有,至少图像的下载任务也已经提前开始了。

在异构任务并行化存在的局限

上一个示例中存在的问题:使用两个任务,其中一个复杂渲染文本,另一个负责下载图像。如果渲染文本的速度远远高于下载图像速度(可能性很大),那么程序的最终性能与串行执行相比性能差别不大,而代码却复杂了。

只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的工作负载分配到多个任务中带来的真正性能提升。

CompletionService: Executor 与 BlockingQueue

不希望被Future的get的阻塞又能及时获取到结果时,我们会怎么做。我们可以将get的timeout参数设为0,然后通过轮询来判断任务是否结束。然而,还有一种更好的方法:完成服务(CompletionService)

CompletionServiceExecutorBlockingQueue的功能融合在一起。你可以将Callable任务提交给它执行,然后使用类似与队列操作的take和poll等方法来获得已完成的结果。而这些结果会在完成时被封装为Future。ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor

示例 : 使用CompletionService实现页面渲染器

下个例子中,我们为每一幅图像的下载都创建一个独立任务,并从线程池中执行它们,从而将串行的下载过程转化为并行的过程:减少下载的总时间。

😊

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//         6-15 使用CompletionService,使页面在下载完成后立即显示出来
public abstract class Renderer {
//CompletionService将Executor与BlockingQueue的功能融合在一起
private final ExecutorService executor;

Renderer(ExecutorService executor){
this.executor=executor;
}

void renderPage(CharSequence source){
List<ImageInfo> info=scanForImageInfo(source);
//ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。
CompletionService<ImageData> completionService=
new ExecutorCompletionService<ImageData>(executor);
for(final ImageInfo imageInfo:info)
//提交一个带返回值的任务,并且返回一个代表即将得到任务的结果的Future
completionService.submit(new Callable<ImageData>(){
public ImageData call(){
return imageInfo.downloadImage(); //CompletionService中有类似队列的操作
}
});
renderText(source);
try{
//通过CompletionService中获取结果以及使每张图片在下载完成后立刻显示出来
for(int t=0,n=info.size();t<n;t++){
//取得并移除已完成任务,如果没有则等待
Future<ImageData> f=completionService.take();
ImageData imageData=f.get();
renderImage(imageData);
}
}catch (InterruptedException e) {
Thread.currentThread().interrupt();
}catch (ExecutionException e) {
throw LaunderThrowable.launderThrowable(e.getCause());
}
}
}

通过CompletionService中获取结果以及使每张图片在下载完成后立刻显示出来,使用户获得一个更加动态和更高响应性的用户界面。

为任务设置时限

有时候,如果某个任务无法在指定时间内完成,那么将不再需要它的结果,此时可以放弃这个任务。 在支持时间限制的Future.get中支持这种需求:当结果可用时,它立即返回,如果在指定时限内没有计算出结果,那么将抛出TimeoutException

下面是在指定时间内获取广告信息的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//          6-16   在指定时间内获取广告信息
public class RenderWithTimeBudget {
private static final Ad DEFAULT_AD = new Ad();
private static final long TIME_BUDGET = 1000;
private static final ExecutorService exec = Executors.newCachedThreadPool();
Page renderPageWithAd() throws InterruptedException{
long endNanos=System.nanoTime()+TIME_BUDGET; //返回纳秒级的时间,再加上时限
Future<Ad> f=exec.submit(new FetchAdTask());
//在等待广告的同时显示页面
Page page=renderPageBody();
Ad ad;
try{
//只等待指定的时间长度
long timeLeft=endNanos-System.nanoTime();
ad = f.get(timeLeft, NANOSECONDS);//在指定时限内获取,NANOSECONDS为时间单位
}catch (ExecutionException e) {
ad=DEFAULT_AD;
}catch (TimeoutException e) {
//如果超时了,广告转为默认广告,并取消获取任务
ad=DEFAULT_AD;
f.cancel(true);
}
page.setAd(ad); //为页面设置广告
return page;
}
}

示例:旅行预定客户网站

接下来考虑一个旅行预定门户网站。从一个公司获取报价的过程与其他公司获得报价的过程无关,因此可以将获取报价的过程当成一个任务,从而使获得报价的过程能并发执行。创建n个任务,将其提交到线程池,保留n个Futrue,并使用限时的get方法通过Future串行地获取每一个结果,这一切都很简单,但我们还可以使用一个更简单的方法——invokeAll

下面是用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//           6-17     在预定时间内请求旅游报价
// 请求旅游报价的方法
private class QuoteTask implements Callable<TravelQuote>{
private final TravelCompany company;
private final TravelInfo travelInfo;

public TravelQuote call()throw Exception{
return company.solicitQuote(travelInfo);
}
}

public List<TravelQuote> getRankedTravelQuotes(
TravelInfo travelInfo, Set<TravelCompany> companies,
Comparator<TravelQuote> ranking(long time, TimeUnit unit)
throws InterruptedException {
List<QuoteTask> tasks = new ArrayList<QuoteTask>();
//为每家公司添加报价任务
for (TravelCompany company : companies)
tasks.add(new QuoteTask(company, travelInfo));
//InvokeAll方法的参数为一组任务,并返回一组Future ,用时限来限制时间
List<Future<TravelQuote>> futures =
exec.invokeAll(tasks, time, unit);

List<TravelQuote> quotes =
new ArrayList<TravelQuote>(tasks.size());
Iterator<QuoteTask> taskIter = tasks.iterator();
for (Future<TravelQuote> f : futures) {
QuoteTask task = taskIter.next();
try {
//invokeAll按照任务集合中迭代器额顺序肩所有的Future添加到返回的集合中
quotes.add(f.get());
} catch (ExecutionException e) {
quotes.add(task.getFailureQuote(e.getCause()));
} catch (CancellationException e) {
quotes.add(task.getTimeoutQuote(e));
}
}
Collections.sort(quotes, ranking);
return quotes;
}

Reference

Java并发编程实战

源代码

Java并发编程实战(学习笔记五 第六章 任务执行)

CATALOG
  1. 1. 在线程中执行任务
    1. 1.1. 串行地执行任务
    2. 1.2. 显示地为任务创建线程
    3. 1.3. 无限创建线程的不足
  2. 2. Executor框架
    1. 2.1. 示例 : 基于Executor的Web服务器
    2. 2.2. 执行策略
    3. 2.3. 线程池
    4. 2.4. Executor的生命周期
    5. 2.5. 延迟任务与周期任务
  3. 3. 找出可利用的并行性
    1. 3.1. 示例:串行的页面渲染器
    2. 3.2. 携带结果的任务Callable和Future
    3. 3.3. 示例 : 使用Future实现页面渲染器
    4. 3.4. 在异构任务并行化存在的局限
    5. 3.5. CompletionService: Executor 与 BlockingQueue
    6. 3.6. 示例 : 使用CompletionService实现页面渲染器
    7. 3.7. 为任务设置时限
    8. 3.8. 示例:旅行预定客户网站
  4. 4. Reference