Executor
并发API引入了ExecutorService
作为一个在程序中直接使用Thread的高层次的替换方案。Executos支持运行异步任务,通常管理一个线程池,这样一来我们就不需要手动去创建新的线程。在不断地处理任务的过程中,线程池内部线程将会得到复用,因此,在我们可以使用一个executor service来运行和我们想在我们整个程序中执行的一样多的并发任务。
下面是使用executors的第一个代码示例:
1 | ExecutorService executor = Executors.newSingleThreadExecutor(); |
Executors
类提供了便利的工厂方法来创建不同类型的 executor services。在这个示例中我们使用了一个单线程线程池的 executor。
代码运行的结果类似于上一个示例,但是当运行代码时,你会注意到一个很大的差别:Java进程从没有停止!Executors必须显式的停止-否则它们将持续监听新的任务。
ExecutorService
提供了两个方法来达到这个目的——shutdwon()
会等待正在执行的任务执行完而shutdownNow()
会终止所有正在执行的任务并立即关闭execuotr。
这是我喜欢的通常关闭executors的方式:
1 | try { |
executor通过等待指定的时间让当前执行的任务终止来“温柔的”关闭executor。在等待最长5秒的时间后,execuote最终会通过中断所有的正在执行的任务关闭。
Callable 和 Future
除了Runnable
,executor还支持另一种类型的任务——Callable
。Callables也是类似于runnables的函数接口,不同之处在于,Callable返回一个值。
下面的lambda表达式定义了一个callable:在休眠一分钟后返回一个整数。
1 | Callable<Integer> task = () -> { |
Callbale也可以像runnbales一样提交给 executor services。但是callables的结果怎么办?因为submit()
不会等待任务完成,executor service不能直接返回callable的结果。不过,executor 可以返回一个Future
类型的结果,它可以用来在稍后某个时间取出实际的结果。
1 | ExecutorService executor = Executors.newFixedThreadPool(1); |
在将callable提交给exector之后,我们先通过调用isDone()
来检查这个future是否已经完成执行。我十分确定这会发生什么,因为在返回那个整数之前callable会休眠一分钟。
你可能注意到我们这次创建executor的方式与上一个例子稍有不同。我们使用newFixedThreadPool(1)
来创建一个单线程线程池的 execuot service。
这等同于使用newSingleThreadExecutor
不过使用第二种方式我们可以稍后通过简单的传入一个比1大的值来增加线程池的大小。稍后我还会介绍其他创建线程池的方法。
在调用get()
方法时,当前线程会阻塞等待,直到callable在返回实际的结果123之前执行完成。现在future执行完毕,我们可以在控制台看到如下的结果:
1 | future done? false |
Future与底层的executor service紧密的结合在一起。记住,如果你关闭executor,所有的未中止的future都会抛出异常。
1 | executor.shutdownNow(); |
任何future.get()
调用都会阻塞,然后等待直到callable中止。在最糟糕的情况下,一个callable持续运行——因此使你的程序将没有响应。我们可以简单的传入一个时长来避免这种情况。
1 | ExecutorService executor = Executors.newFixedThreadPool(1); |
运行上面的代码将会产生一个TimeoutException
:
1 | Exception in thread "main" java.util.concurrent.TimeoutException |
你可能已经猜到为什么会抛出这个异常。我们指定的最长等待时间为1分钟,而这个callable在返回结果之前实际需要两分钟。
invokeAll
Executors支持通过invokeAll()
一次批量提交多个callable。这个方法结果一个callable的集合,然后返回一个future的列表。
1 | ExecutorService executor = Executors.newWorkStealingPool(); |
在这个例子中,我们利用Java8中的函数流(stream)来处理invokeAll()
调用返回的所有future。我们首先将每一个future映射到它的返回值,然后将每个值打印到控制台。
invokeAny
批量提交callable的另一种方式就是invokeAny()
,它的工作方式与invokeAll()
稍有不同。在等待future对象的过程中,这个方法将会阻塞直到第一个callable中止然后返回这一个callable的结果。
为了测试这种行为,我们利用这个帮助方法来模拟不同执行时间的callable。这个方法返回一个callable,这个callable休眠指定 的时间直到返回给定的结果。
1 | Callable<String> callable(String result, long sleepSeconds) { |
我们利用这个方法创建一组callable,这些callable拥有不同的执行时间,从1秒到3秒。通过invokeAny()
将这些callable提交给一个executor,返回最快的callable的字符串结果-在这个例子中为任务2:
1 | ExecutorService executor = Executors.newWorkStealingPool(); |
上面这个例子又使用了另一种方式来创建executor——调用newWorkStealingPool()
。这个工厂方法是Java8引入的,返回一个ForkJoinPool
类型的 executor,它的工作方法与其他常见的execuotr稍有不同。与使用一个固定大小的线程池不同,ForkJoinPools
使用一个并行因子数来创建,默认值为主机CPU的可用核心数。
ScheduledExecutor
我们已经学习了如何在一个 executor 中提交和运行一次任务。为了持续的多次执行常见的任务,我们可以利用调度线程池。
ScheduledExecutorService
支持任务调度,持续执行或者延迟一段时间后执行。
下面的实例,调度一个任务在延迟3秒钟后执行:
1 | ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); |
调度一个任务将会产生一个专门的future类型——ScheduleFuture
,它除了提供了Future的所有方法之外,他还提供了getDelay()
方法来获得剩余的延迟。在延迟消逝后,任务将会并发执行。
为了调度任务持续的执行,executors 提供了两个方法scheduleAtFixedRate()
和scheduleWithFixedDelay()
。第一个方法用来以固定频率来执行一个任务,比如,下面这个示例中,每秒钟一次:
1 | ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); |
另外,这个方法还接收一个初始化延迟,用来指定这个任务首次被执行等待的时长。
请记住:scheduleAtFixedRate()
并不考虑任务的实际用时。所以,如果你指定了一个period为1分钟而任务需要执行2分钟,那么线程池为了性能会更快的执行。也就是说他不会延迟等待1分钟,在2分钟执行过后直接进入下一个2分钟。
在这种情况下,你应该考虑使用scheduleWithFixedDelay()
。这个方法的工作方式与上我们上面描述的类似。不同之处在于等待时间 period 的应用是在一次任务的结束和下一个任务的开始之间。例如:
1 | ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); |
这个例子调度了一个任务,并在一次执行的结束和下一次执行的开始之间设置了一个1秒钟的固定延迟。初始化延迟为0,任务执行时间为0。所以我们分别在0s,3s,6s,9s等间隔处结束一次执行。如你所见,scheduleWithFixedDelay()
在你不能预测调度任务的执行时长时是很有用的。
线程池
我们在介绍Executor的过程中用到了几个线程池,下面会详细总结这些线程池。
Java8通过Executors提供七种线程池,分别为:
- newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
- newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
- newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
- newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
- newSingleThreadScheduledExcutor:创建一个单例线程池,定期或延时执行任务。
- newWorkStealingPool 创建持有足够线程的线程池来支持给定的并行级别,并通过使用多个队列,减少竞争,它需要穿一个并行级别的参数,如果不传,则被设定为默认的CPU数量。
- ForkJoinPool 支持大任务分解成小任务的线程池,这是Java8新增线程池,通常配合ForkJoinTask接口的子类RecursiveAction或RecursiveTask使用。
newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码如下:
1 | ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); |
线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:
1 | ExecutorService cachedThreadPool = Executors.newFixedThreadPool(3); |
定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()
newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:
1 | ScheduledExecutorService executor = Executors.newScheduledThreadPool(3); |
每过一秒执行一次。
newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:
1 | ExecutorService executor = Executors.newSingleThreadExecutor(); |
结果依次输出,相当于顺序执行各个任务。
newWorkStealingPool和ForkJoinPool
JDK1.8中新引入的这两个线程池,可以参考下面两个链接