什么是任务?任务就是工作单元,我们把应用中的工作分成多个任务,从而进行事务操作和并行工作。
在线程中执行任务
首先任务之间必须是相互独立的:任务并不依赖于其他任务的状态,结果或边界效应。
串行地执行任务
有多种调度任务的策略,而其中一些策略能够更好地利用潜在并发性。
策略1 : 在单个线程中串行地执行各项任务,也就是按顺序一个个执行。
😕 串行Web服务器
1 2 3 4 5 6 7 8 9 10
| public class SingleThreadWebServer { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while (true) { Socket connection = socket.accept(); handleRequest(connection); } } }
|
上面的服务器程序中,一次只能处理一个请求,性能非常差。当等待时间过长时,客户端会认为服务器没有响应,而且服务器cpu资源利用非常低。对于服务器而言这种串行执行的方式无疑是不合适的。
显示地为任务创建线程
策略2 : 通过为每个请求创建一个新的线程来提供服务,从而实现更高的响应性。
😧 为每个请求启动一个新的线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class ThreadPerTaskWebServer { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while (true) { final Socket connection = socket.accept(); Runnable task = new Runnable() { public void run() { handleRequest(connection); } }; new Thread(task).start(); } } }
|
对于每个连接,主循环都创建一个新线程来处理请求,而不是在主循环中处理。这么一来,服务器吞吐率会明显提高,需要注意的是任务处理代码必须是线程安全的。
无限创建线程的不足
那么策略2的缺点是什么?
策略2最大的问题是,他没有创建线程的上限。当一个用户而已发送多个请求时服务器会崩溃。
Executor框架
事实上我们需要一个线程池来管理服务器的任务执行。和上一章中用有界队列来管理资源的方式相似java.util.concurrent
提供了一种灵活的线程池实现作为Executor框架的一部分。在任务执行时,Executor比Thread更常用。
1 2 3 4
| public interface Executor{ void execute(Runnable command); }
|
Executor接口为灵活且强大的异步执行任务框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。
Executor基于生产者-消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。
示例 : 基于Executor的Web服务器
策略3 : 使用Executor构造线程池。
😀 基于Executor的Web服务器。使用了标准的Executor实现,即一个固定长度的线程池,可以容纳100个线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class TaskExecutionWebServer { private static final int NTHREADS=100; private static final Executor exec= Executors.newFixedThreadPool(NTHREADS);
public static void main(String[] args)throws IOException{ ServerSocket socket=new ServerSocket(80); while(true){ final Socket connection=socket.accept(); Runnable task=new Runnable(){ public void run(){ handleRequest(connection); } }; exec.execute(task); } } private static void handleRequest(Socket connection) { } }
|
那是否能用Executor实现策略2和策略1的做法?答案是可以,只需要实现Executor接口。
用Executor实现策略2的为每个请求启动一个新线程方式:
1 2 3 4 5 6
| public class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); }; }
|
用Executor实现策略1的串行执行请求的方式:
1 2 3 4 5 6
| public class WithinThreadExecutor implements Executor { public void execute(Runnable r) { r.run(); }; }
|
执行策略
像Executor这种把任务的提交和执行的好处是什么?能灵活的制定和修改执行策略。
执行策略包括:
- 在什么线程中执行任务
- 任务按照什么顺序执行(FIFO,LIFO,优先级)
- 有多少个任务可并发执行
- 在队列中有多少个任务在等待执行
- 如果系统由于过程而需要拒绝一个任务,应该选择哪一个任务?另外,如何通知应用程序有任务被拒绝?
- 在执行一个任务之前或之后,应该进行哪些动作
线程池
可以通过调用Executor中的静态工厂方法之一来创建一个线程池:
Executor的生命周期
Executor是需要手动去关闭的,不然程序不会结束。这个关闭过程可以是“温柔的”也可以是“暴力的”,他们会将关闭操作中受影响的任务的状态回馈给应用。
ExecutorService
接口继承了Executor
接口,给出了管理生命周期的方法(同时还有一些用于任务提交的便利方法),如下:
1 2 3 4 5 6 7 8 9 10
| public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; }
|
ExecutorService的生命周期有三种状态:运行,关闭和已终止。
关于关闭
shutdown
方法将执行“温柔”关闭:不在接受新的任务,同时等待已经提交的任务执行完成——包括那么还没有开始执行的任务。
shutdownnow
方法将执行“暴力”关闭:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。
在ExecutorService关闭提交的任务将由“拒绝执行处理器(Rejected Execution Handler)”来处理。
关于终止
等所有任务都完成后将转入终止状态。
awaitTermination
: 等待ExecutorService达到终止状态。
通常在调用awaitTermiation
之后会立即调用shutdown
,从而产生同步地关闭ExecutorService的效果。
isTerminated
: 用于轮询ExecutorService是否已经终止。
以下程序通过增加生命周期来扩展Web服务器的功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class LifecycleWebServer { private final ExecutorService exec = Executors.newCachedThreadPool();
public void start() throws IOException { ServerSocket socket = new ServerSocket(80); while (!exec.isShutdown()) { try { final Socket conn = socket.accept(); exec.execute(new Runnable() { public void run() { handleRequest(conn); } }); } catch (RejectedExecutionException e) { if (!exec.isShutdown()) log("task submission rejected", e); } } }
public void stop() { exec.shutdown(); }
void handleRequest(Socket connection) { Request req = readRequest(connection); if (isShutdownRequest(req)) stop(); else dispatchRequest(req); } }
|
延迟任务与周期任务
用ScheduledThreadPoolExecutor
来管理延迟任务(在“100ms后执行该任务”)和周期任务(“每10ms执行一次该任务”)。 可以通过ScheduledThreadPoolExecutor
的构造函数或newScheduledThreadPool
工厂方法来创建该类的对象。
事实上ScheduledThreadPoolExecutor
是Timer
的代替方案,Timer
中存在许多缺陷,如下:
Timer
在执行所有定时任务时只会创建一个线程。
如果某个任务的执行时间过长,那么将破坏其他TimerTask的定时准确性。假如某个周期的TimerTask需要每10ms执行一次,另一个TimerTask需要执行40ms,那么这个10ms周期任务在40ms任务执行完成后快速连续地调用4次,或者彻底“丢失”4次调用(取决于它是基于固定速率来调度还是基于固定延时来调度)。
- 如果TimerTask抛出了一个未检查的异常,那么
Timer
将表现出糟糕的行为。
Timer
线程不会捕获异常,当发生异常终止时,会错误地认为整个Timer都被取消了。因此已经被调度但尚未执行的TimeTask将不会再执行,新的任务也不能被调用(线程泄漏(Thread Leakage)
)。
你可能会认为程序会运行6秒后退出,但实际情况是运行1秒就结束了,并抛出了一个异常信息“Timer already cancelled”。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class OutOfTime { public static void main(String[] args) throws Exception { Timer timer = new Timer(); timer.schedule(new ThrowTask(), 1); SECONDS.sleep(1); timer.schedule(new ThrowTask(), 1); SECONDS.sleep(5); }
static class ThrowTask extends TimerTask { public void run() { throw new RuntimeException(); } } }
|
如果要构建自己的调度服务,那么可以使用DelayQueue
,它实现了BlockingQueue
,并为ScheduledThreadPoolExecutor
提供调度功能。
找出可利用的并行性
下面我们将开发html的页面渲染功能。我们将开发不同版本的组件,并且每个版本都实现了不同程度的并发性。
示例:串行的页面渲染器
串行处理。当遇到文本标签时,将其绘制到图像缓存中。当遇到图像引用时,想通过网络获取它,然后再绘制到图像缓存中。
这很容易实现,程序只需将输入中的每个元素处理一次(甚至不需要缓存文档),但这种方法可能会令用户感到烦恼,他们必须等待很长时间,直到显示所有的文本。
所以我们在这里实现的是这个的优化版,它先绘制文本元素,同时为图像预留出矩形的占位空间。在处理完第一遍文本后,程序开始下载图像,并将它们绘制到相应的占位空间中。
😕
1 2 3 4 5 6 7 8 9 10 11
| public abstract class SingleThreadRenderer { void renderPage(CharSequence source){ renderText(source); List<ImageData> imageData=new ArrayList<ImageData>(); for(ImageInfo imageInfo:scanForImageInfo(source)) imageData.add(imageInfo.downloadImage()); for(ImageData data:imageData) renderImage(data); } }
|
携带结果的任务Callable和Future
在看下面一个示例之前,先说下Callable和Future。
Callable和Runnable一样都是描述的都是抽象的计算任务。
- Runnable : 不能返回一个值或抛出一个受检查的异常。
- Callable : 有返回值,也可以抛出一个异常。
更适用于,从数据库查询,从网络上获取资源等获取某个结果的场景。
1 2 3 4
| public interface Callable<V> { V call() throws Exception; }
|
那么Future是什么?Executor执行的任务有4个生命周期阶段:创建,提交,开始和完成。Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。当某个任务完成后,它就永远停留在“完成”状态上。
Future的get
方法的行为取决与任务的状态(尚未开始,正在运行,已完成)。
- 如果任务已完成,那么get会立即返回或抛出一个Exception。
- 如果任务没有完成,那么get将阻塞直到任务完成。
- 如果任务抛出了异常,那么get将异常封装为
ExecutionException
并重新抛出。
此时可以通过getCause
来获得被封装的初始异常。
- 如果任务被取消,那么get将抛出
CancellationException
。
1 2 3 4 5 6 7 8 9 10 11
| public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException, CancellationException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException; }
|
如何创建Future?。当使用ExecutorService中的所有submit
方法提交Runnable或Callable时就会返回Future。
示例 : 使用Future实现页面渲染器
下个例子中,我们将渲染过程分解为两个任务,一个是渲染所有的文本,一个是下载所有的图像(因为其中一个任务是CPU密集型,而另一个任务是I/O密集型,因此这种方法即使在单CPU系统上也能提升性能)。
Callable和Future有组与表示这些协同任务之间的交互。
😕
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public abstract class FutureRenderer { private final ExecutorService executor=Executors.newCachedThreadPool();
void renderPage(CharSequence source){ final List<ImageInfo> imageInfos=scanForImageInfo(source); Callable<List<ImageData>> task= new Callable<List<ImageData>>(){ public List<ImageData> call(){ List<ImageData> result= new ArrayList<ImageData>(); for(ImageInfo imageInfo :imageInfos) result.add(imageInfo.downloadImage()); return result; } }; Future<List<ImageData>> future=executor.submit(task); renderText(source);
try{ List<ImageData> imageData=future.get(); for(ImageData data:imageData) renderImage(data); }catch (InterruptedException e) { Thread.currentThread().interrupt(); future.cancel(true); }catch (ExecutionException e) { throw LaunderThrowable.launderThrowable(e.getCause()); } } }
|
上面的程序中,当主任务需要图像时,它会等到Future.get调用结果,如果幸运的话,当开始请求时所有图像就已经下载完成,即使没有,至少图像的下载任务也已经提前开始了。
在异构任务并行化存在的局限
上一个示例中存在的问题:使用两个任务,其中一个复杂渲染文本,另一个负责下载图像。如果渲染文本的速度远远高于下载图像速度(可能性很大),那么程序的最终性能与串行执行相比性能差别不大,而代码却复杂了。
只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的工作负载分配到多个任务中带来的真正性能提升。
CompletionService: Executor 与 BlockingQueue
不希望被Future的get
的阻塞又能及时获取到结果时,我们会怎么做。我们可以将get的timeout参数设为0,然后通过轮询来判断任务是否结束。然而,还有一种更好的方法:完成服务(CompletionService)
CompletionService
将Executor
与BlockingQueue
的功能融合在一起。你可以将Callable任务提交给它执行,然后使用类似与队列操作的take和poll等方法来获得已完成的结果。而这些结果会在完成时被封装为Future。ExecutorCompletionService
实现了CompletionService
,并将计算部分委托给一个Executor
。
示例 : 使用CompletionService实现页面渲染器
下个例子中,我们为每一幅图像的下载都创建一个独立任务,并从线程池中执行它们,从而将串行的下载过程转化为并行的过程:减少下载的总时间。
😊
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public abstract class Renderer { private final ExecutorService executor;
Renderer(ExecutorService executor){ this.executor=executor; }
void renderPage(CharSequence source){ List<ImageInfo> info=scanForImageInfo(source); CompletionService<ImageData> completionService= new ExecutorCompletionService<ImageData>(executor); for(final ImageInfo imageInfo:info) completionService.submit(new Callable<ImageData>(){ public ImageData call(){ return imageInfo.downloadImage(); } }); renderText(source); try{ for(int t=0,n=info.size();t<n;t++){ Future<ImageData> f=completionService.take(); ImageData imageData=f.get(); renderImage(imageData); } }catch (InterruptedException e) { Thread.currentThread().interrupt(); }catch (ExecutionException e) { throw LaunderThrowable.launderThrowable(e.getCause()); } } }
|
通过CompletionService中获取结果以及使每张图片在下载完成后立刻显示出来,使用户获得一个更加动态和更高响应性的用户界面。
为任务设置时限
有时候,如果某个任务无法在指定时间内完成,那么将不再需要它的结果,此时可以放弃这个任务。 在支持时间限制的Future.get中支持这种需求:当结果可用时,它立即返回,如果在指定时限内没有计算出结果,那么将抛出TimeoutException
。
下面是在指定时间内获取广告信息的例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class RenderWithTimeBudget { private static final Ad DEFAULT_AD = new Ad(); private static final long TIME_BUDGET = 1000; private static final ExecutorService exec = Executors.newCachedThreadPool(); Page renderPageWithAd() throws InterruptedException{ long endNanos=System.nanoTime()+TIME_BUDGET; Future<Ad> f=exec.submit(new FetchAdTask()); Page page=renderPageBody(); Ad ad; try{ long timeLeft=endNanos-System.nanoTime(); ad = f.get(timeLeft, NANOSECONDS); }catch (ExecutionException e) { ad=DEFAULT_AD; }catch (TimeoutException e) { ad=DEFAULT_AD; f.cancel(true); } page.setAd(ad); return page; } }
|
示例:旅行预定客户网站
接下来考虑一个旅行预定门户网站。从一个公司获取报价的过程与其他公司获得报价的过程无关,因此可以将获取报价的过程当成一个任务,从而使获得报价的过程能并发执行。创建n个任务,将其提交到线程池,保留n个Futrue,并使用限时的get方法通过Future串行地获取每一个结果,这一切都很简单,但我们还可以使用一个更简单的方法——invokeAll
下面是用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
private class QuoteTask implements Callable<TravelQuote>{ private final TravelCompany company; private final TravelInfo travelInfo;
public TravelQuote call()throw Exception{ return company.solicitQuote(travelInfo); } }
public List<TravelQuote> getRankedTravelQuotes( TravelInfo travelInfo, Set<TravelCompany> companies, Comparator<TravelQuote> ranking(long time, TimeUnit unit) throws InterruptedException { List<QuoteTask> tasks = new ArrayList<QuoteTask>(); for (TravelCompany company : companies) tasks.add(new QuoteTask(company, travelInfo)); List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);
List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size()); Iterator<QuoteTask> taskIter = tasks.iterator(); for (Future<TravelQuote> f : futures) { QuoteTask task = taskIter.next(); try { quotes.add(f.get()); } catch (ExecutionException e) { quotes.add(task.getFailureQuote(e.getCause())); } catch (CancellationException e) { quotes.add(task.getTimeoutQuote(e)); } } Collections.sort(quotes, ranking); return quotes; }
|
Reference
Java并发编程实战
源代码
Java并发编程实战(学习笔记五 第六章 任务执行)