农场主的黑科技.

并发操作合集-10.Executor和线程池

字数统计: 3.4k阅读时长: 21 min
2018/11/04 Share

🍤 并发操作合集系列 目录

🍕 并发操作合集系列 源代码

Executor

并发API引入了ExecutorService作为一个在程序中直接使用Thread的高层次的替换方案。Executos支持运行异步任务,通常管理一个线程池,这样一来我们就不需要手动去创建新的线程。在不断地处理任务的过程中,线程池内部线程将会得到复用,因此,在我们可以使用一个executor service来运行和我们想在我们整个程序中执行的一样多的并发任务。

下面是使用executors的第一个代码示例:

1
2
3
4
5
6
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
});
// => Hello pool-1-thread-1

Executors类提供了便利的工厂方法来创建不同类型的 executor services。在这个示例中我们使用了一个单线程线程池的 executor。

代码运行的结果类似于上一个示例,但是当运行代码时,你会注意到一个很大的差别:Java进程从没有停止!Executors必须显式的停止-否则它们将持续监听新的任务。

ExecutorService提供了两个方法来达到这个目的——shutdwon()会等待正在执行的任务执行完而shutdownNow()会终止所有正在执行的任务并立即关闭execuotr。

这是我喜欢的通常关闭executors的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
try {
System.out.println("尝试终止executor");
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);// 5秒内尝试终止,期间会阻塞当前线程

} catch (InterruptedException e) {
System.err.println("终止行为被中断");
} finally {
if (!executor.isTerminated()) {
System.err.println("强制终止未结束的任务");
}
executor.shutdownNow();
System.out.println("终止完毕");
}

executor通过等待指定的时间让当前执行的任务终止来“温柔的”关闭executor。在等待最长5秒的时间后,execuote最终会通过中断所有的正在执行的任务关闭。

Callable 和 Future

除了Runnable,executor还支持另一种类型的任务——Callable。Callables也是类似于runnables的函数接口,不同之处在于,Callable返回一个值。

下面的lambda表达式定义了一个callable:在休眠一分钟后返回一个整数。

1
2
3
4
5
6
7
8
9
Callable<Integer> task = () -> {
try {
TimeUnit.MINUTES.sleep(1);
return 123;
}
catch (InterruptedException e) {
throw new IllegalStateException("task interrupted", e);
}
};

Callbale也可以像runnbales一样提交给 executor services。但是callables的结果怎么办?因为submit()不会等待任务完成,executor service不能直接返回callable的结果。不过,executor 可以返回一个Future类型的结果,它可以用来在稍后某个时间取出实际的结果。

1
2
3
4
5
6
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);
System.out.println("future done? " + future.isDone());
Integer result = future.get();
System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);

在将callable提交给exector之后,我们先通过调用isDone()来检查这个future是否已经完成执行。我十分确定这会发生什么,因为在返回那个整数之前callable会休眠一分钟。

你可能注意到我们这次创建executor的方式与上一个例子稍有不同。我们使用newFixedThreadPool(1)来创建一个单线程线程池的 execuot service。
这等同于使用newSingleThreadExecutor不过使用第二种方式我们可以稍后通过简单的传入一个比1大的值来增加线程池的大小。稍后我还会介绍其他创建线程池的方法。

在调用get()方法时,当前线程会阻塞等待,直到callable在返回实际的结果123之前执行完成。现在future执行完毕,我们可以在控制台看到如下的结果:

1
2
3
future done? false
future done? true
result: 123

Future与底层的executor service紧密的结合在一起。记住,如果你关闭executor,所有的未中止的future都会抛出异常。

1
2
executor.shutdownNow();
future.get();

任何future.get()调用都会阻塞,然后等待直到callable中止。在最糟糕的情况下,一个callable持续运行——因此使你的程序将没有响应。我们可以简单的传入一个时长来避免这种情况。

1
2
3
4
5
6
7
8
9
10
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(() -> {
try {
TimeUnit.MINUTES.sleep(2);
return 123;
} catch (InterruptedException e) {
throw new IllegalStateException("task interrupted", e);
}
});
future.get(1, TimeUnit.MINUTES);

运行上面的代码将会产生一个TimeoutException

1
2
Exception in thread "main" java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)

你可能已经猜到为什么会抛出这个异常。我们指定的最长等待时间为1分钟,而这个callable在返回结果之前实际需要两分钟。

invokeAll

Executors支持通过invokeAll()一次批量提交多个callable。这个方法结果一个callable的集合,然后返回一个future的列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
() -> "task1",
() -> "task2",
() -> "task3");
executor.invokeAll(callables)
.stream()
.map(future -> {
try {
return future.get();
}
catch (Exception e) {
throw new IllegalStateException(e);
}
})
.forEach(System.out::println);

在这个例子中,我们利用Java8中的函数流(stream)来处理invokeAll()调用返回的所有future。我们首先将每一个future映射到它的返回值,然后将每个值打印到控制台。

invokeAny

批量提交callable的另一种方式就是invokeAny(),它的工作方式与invokeAll()稍有不同。在等待future对象的过程中,这个方法将会阻塞直到第一个callable中止然后返回这一个callable的结果。

为了测试这种行为,我们利用这个帮助方法来模拟不同执行时间的callable。这个方法返回一个callable,这个callable休眠指定 的时间直到返回给定的结果。

1
2
3
4
5
6
Callable<String> callable(String result, long sleepSeconds) {
return () -> {
TimeUnit.SECONDS.sleep(sleepSeconds);
return result;
};
}

我们利用这个方法创建一组callable,这些callable拥有不同的执行时间,从1秒到3秒。通过invokeAny()将这些callable提交给一个executor,返回最快的callable的字符串结果-在这个例子中为任务2:

1
2
3
4
5
6
7
8
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
callable("task1", 2),
callable("task2", 1),
callable("task3", 3));
String result = executor.invokeAny(callables);
System.out.println(result);
// => task2

上面这个例子又使用了另一种方式来创建executor——调用newWorkStealingPool()。这个工厂方法是Java8引入的,返回一个ForkJoinPool类型的 executor,它的工作方法与其他常见的execuotr稍有不同。与使用一个固定大小的线程池不同,ForkJoinPools使用一个并行因子数来创建,默认值为主机CPU的可用核心数。

ScheduledExecutor

我们已经学习了如何在一个 executor 中提交和运行一次任务。为了持续的多次执行常见的任务,我们可以利用调度线程池。

ScheduledExecutorService支持任务调度,持续执行或者延迟一段时间后执行。

下面的实例,调度一个任务在延迟3秒钟后执行:

1
2
3
4
5
6
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);
TimeUnit.MILLISECONDS.sleep(1337);
long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);

调度一个任务将会产生一个专门的future类型——ScheduleFuture,它除了提供了Future的所有方法之外,他还提供了getDelay()方法来获得剩余的延迟。在延迟消逝后,任务将会并发执行。

为了调度任务持续的执行,executors 提供了两个方法scheduleAtFixedRate()scheduleWithFixedDelay()。第一个方法用来以固定频率来执行一个任务,比如,下面这个示例中,每秒钟一次:

1
2
3
4
5
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

另外,这个方法还接收一个初始化延迟,用来指定这个任务首次被执行等待的时长。

请记住:scheduleAtFixedRate()并不考虑任务的实际用时。所以,如果你指定了一个period为1分钟而任务需要执行2分钟,那么线程池为了性能会更快的执行。也就是说他不会延迟等待1分钟,在2分钟执行过后直接进入下一个2分钟。

在这种情况下,你应该考虑使用scheduleWithFixedDelay()。这个方法的工作方式与上我们上面描述的类似。不同之处在于等待时间 period 的应用是在一次任务的结束和下一个任务的开始之间。例如:

1
2
3
4
5
6
7
8
9
10
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("Scheduling: " + System.nanoTime());
} catch (InterruptedException e) {
System.err.println("task interrupted");
}
};
executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);

这个例子调度了一个任务,并在一次执行的结束和下一次执行的开始之间设置了一个1秒钟的固定延迟。初始化延迟为0,任务执行时间为0。所以我们分别在0s,3s,6s,9s等间隔处结束一次执行。如你所见,scheduleWithFixedDelay()在你不能预测调度任务的执行时长时是很有用的。

线程池

我们在介绍Executor的过程中用到了几个线程池,下面会详细总结这些线程池。

Java8通过Executors提供七种线程池,分别为:

  • newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  • newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
  • newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
  • newSingleThreadScheduledExcutor:创建一个单例线程池,定期或延时执行任务。
  • newWorkStealingPool 创建持有足够线程的线程池来支持给定的并行级别,并通过使用多个队列,减少竞争,它需要穿一个并行级别的参数,如果不传,则被设定为默认的CPU数量。
  • ForkJoinPool 支持大任务分解成小任务的线程池,这是Java8新增线程池,通常配合ForkJoinTask接口的子类RecursiveAction或RecursiveTask使用。

newCachedThreadPool

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 6; i++) {
final int index = i;
try {
Thread.sleep(index * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(() -> System.out.println(Thread.currentThread().getName()+" : " + index));
}
cachedThreadPool.shutdown();
-------------------------------
输出
pool-1-thread-1 : 0
pool-1-thread-1 : 1
pool-1-thread-1 : 2
pool-1-thread-1 : 3
pool-1-thread-1 : 4
pool-1-thread-1 : 5

线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

newFixedThreadPool

创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ExecutorService cachedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 6; i++) {
final int index = i;
cachedThreadPool.execute(() -> System.out.println(Thread.currentThread().getName()+" : " + index));
}
cachedThreadPool.shutdown();
--------------------------------
输出
pool-1-thread-1 : 0
pool-1-thread-3 : 2
pool-1-thread-2 : 1
pool-1-thread-3 : 4
pool-1-thread-1 : 3
pool-1-thread-2 : 5

定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()

newScheduledThreadPool

创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
Runnable task = () -> System.out.println(Thread.currentThread().getName());
int initialDelay= 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

TimeUnit.SECONDS.sleep(5);
executor.shutdown();
--------------------------------
输出
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2

每过一秒执行一次。

newSingleThreadExecutor

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 6; i++) {
final int index = i;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.execute(() -> System.out.println(Thread.currentThread().getName()+" : " + index));
}
executor.shutdown();
-------------------------------
输出
pool-1-thread-1 : 0
pool-1-thread-1 : 1
pool-1-thread-1 : 2
pool-1-thread-1 : 3
pool-1-thread-1 : 4
pool-1-thread-1 : 5

结果依次输出,相当于顺序执行各个任务。

newWorkStealingPool和ForkJoinPool

JDK1.8中新引入的这两个线程池,可以参考下面两个链接

Reference

Java 8 并发教程:Threads和Executors

Java四种线程池的使用

Java 中7种线程池详解+示例代码

CATALOG
  1. 1. Executor
    1. 1.1. Callable 和 Future
    2. 1.2. invokeAll
    3. 1.3. invokeAny
    4. 1.4. ScheduledExecutor
  2. 2. 线程池
    1. 2.1. newCachedThreadPool
    2. 2.2. newFixedThreadPool
    3. 2.3. newScheduledThreadPool
    4. 2.4. newSingleThreadExecutor
    5. 2.5. newWorkStealingPool和ForkJoinPool
  3. 3. Reference