农场主的黑科技.

Java并发编程实战-第7章-取消与关闭

字数统计: 6.4k阅读时长: 40 min
2018/11/05 Share

本章将给出各种实现取消和中断的机制,以及如何编写任务和服务,使它们能对取消请求做出响应。

任务取消

可取消的操作 : 外部代码能在某个操作正常完成之前将其置入“完成”状态

取消某个操作的原因:

  • 点击某个桌面应用中的取消按钮时;
  • 某个操作超过了一定的执行时间限制需要中止时;
  • 多个线程做相同的事情,只要一个线程成功其它线程都可以取消时;
  • 一组线程中的一个或多个出现错误导致整组都无法继续时;
  • 当一个应用或服务需要停止时。

Java中没有一种安全的抢占式方法来停止线程,只有一些协作式的机制,代码都会遵循的一种协议。

比如:

“已请求取消”标志

任务将定期地查看这个标志。如果设置了这个标志,那么任务将提前结束。

😊PrimeGenerator持续地枚举素数(Prime),直到cancelled被设置。为了确保可靠性cancelled被设置为volatile。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//        7-1      使用Volatile类型的域来保存取消状态  
public class PrimeGenerator implements Runnable{
private final List<BigInteger> primes=
new ArrayList<BigInteger>();
private volatile boolean cancelled;//volatile变量能确保可见性 禁止指令重排序

public void run(){
BigInteger p=BigInteger.ONE;//常量
while(!cancelled){ //任务将定期地查看这个标志。如果设置了这个标志,那么任务将提前结束。(即如果cancelled不为true则一直运行)
p=p.nextProbablePrime();//返回一个比当前大的
synchronized (this) { //确保不会被添加多次
primes.add(p);
}
}
}

public void cancel(){
cancelled=true; //将“已请求取消标志”取为true
}

public synchronized List<BigInteger> get(){
return new ArrayList<BigInteger>(primes);
}
}

上面代码的客户端程序,一个仅运行一秒的素数生成器

1
2
3
4
5
6
7
8
9
10
11
12
//            7-2      一个仅运行一秒的素数生成器
static List<BigInteger> aSecondOfPrimes() throws InterruptedException{
PrimeGenerator generator=new PrimeGenerator();
new Thread(generator).start();
try{
SECONDS.sleep(1); //延迟一秒才执行cancel
}finally{
generator.cancel();
}
return generator.get();

}

cancel方法由finally块调用,确保即使在调用sleep时被中断也能取消素数生成器的执行。

中断

PrimeGenerator的问题:果使用这种方法的任务调用了一个阻塞方法,例如BlockingQueue.put,那么任务可能永远不会检查取消标志,因此永远不会结束。

😧生产者线程生成素数,并将它们放入一个阻塞队列。如果生成者的速度超过了消费者的处理速度, 队列将被填满,put方法也会阻塞。因此,生产者却永远不能检查这个标志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//             7-3    不可靠的取消操作吧生成者置于阻塞的操作中(不要这么做)
public class BrokenPrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled=false;

public BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue=queue;
}

public void run(){
try{
BigInteger p=BigInteger.ONE;
while(!cancelled){
queue.put(p=p.nextProbablePrime());
}
}catch (InterruptedException consumed) {

}
}
}

void consumePrimes() throws InterruptedException{
BlockingQueue<BigInteger> primes=new ArrayBlockingQueue<>(MAX_PRIORITY);
BrokenPrimeProducer producer=new BrokenPrimeProducer(primes);
producer.start();
try{
while(needMorePrimes()) //如果需要继续消费
consume(primes.take());
}finally{
//它可以调用cancel方法来设置cancelled标志,但此时生产者却永远不能检查这个标志,因为它无法从阻塞的put方法中恢复过来(因为消费者此时已经停止从队列中取出素数,所以put方法将一直保持阻塞状态。)
producer.cancel();
}
}

Thread的中断
阻塞库方法定期检查Thread的中断,并在适当时候停止当前工作

Thread中的中断方法 : 每个线程都有一个boolean类型的中断状态。当中断线程时,这个线程的中断状态将被设置为true。

在Thread中包含了中断线程以及查询线程中断状态的方法,如下:

  • interrupt方法能中断目标线程
  • isInterrupt方法能返回目标线程的中断状态
  • 静态的interrupted方法将清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法。

阻塞库方法,如Thread.sleep和Object.wait,在发现中断时提前返回。它们在响应中断时会:清除中断状态,抛出InterruptedException

调用Interrupt并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。

上面的BrokenPrimeProducer的解决方式:进行阻塞的BlockingQueue可以检测Thread的interrupt,所以用interrupt代替自定义的中断方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//       7-5 通过中断来取消
public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;

PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}

public void run() {
try {
BigInteger p = BigInteger.ONE;
//在每次迭代循环中,有两个位置可以检测出中断:在阻塞的put方法中调用,以及在循环开始处查询中断状态。
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* 允许线程退出 */
}
}

public void cancel() {
interrupt();
}
}

中断策略

中断策略:在发现中断请求时应该做哪些工作

最合理的终端策略:尽快退出,并把中断信息传递给调用者,从而使调用栈中的上层代码可以采取进一步的操作。

这就是为什么大多数可阻塞的库函数都只是抛出InterruptedException作为中断响应。它们永远不会在某个由自己拥有的线程中运行,因此它们为任务或库代码实现了这个最合理的取消策略。但它可以推迟处理中断请求,并直到某个更合适的时刻,在抛出InterruptedException。

响应中断

有两种实用策略可用于处理InterruptedException:

  • 传递异常(可能在某个特定于任务额清除操作之后),从而使你的方法也称为可中断的阻塞方法。
  • 恢复中断状态,从而使调用栈上的上层代码能够对其进行处理。也就是catch InterruptedException

比如,当一个由ThreadPoolExecutor拥有的工作者线程检测到中断时,它会检测线程池是否正在关闭。如果是,它会在结束之前执行一些线程池清理工作,否则它可能创建一个新线程将线程池恢复到合理的规模

示例:计时运行

😧在指定时间内运行一个Runnable,指定时间后中断,该异常会被timeRun1的调用者捕获

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//        7-8    在外部线程中安排中断(不要这么做)
public class TimedRun1 {
private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(1);
//r是任务
public static void timedRun(Runnable r,
long timeout, TimeUnit unit) {
//返回一个并发执行的线程的引用
final Thread taskThread = Thread.currentThread();
//它在调用线程中运行任务,并安排了一个取消任务,在运行指定的时间间隔后中断它。
cancelExec.schedule(new Runnable() {
public void run() {
taskThread.interrupt();
}
}, timeout, unit);
//运行任务
r.run();
}
}

上面的程序有哪些问题,主要是由于timedRun不了解每个参数Runnable的中断策略:

  • 可能这个Runnable在超时前就完成了,那么这个中断异常将会在调用者之后的程序中才抛出。
  • 可能任务不响应中断,timedRun会在任务结束时才返回,没有起到时间限制的作用。

😕 上面问题的解决方案:执行任务的线程拥有自己的执行策略,即使任务不响应中断,限时运行的方法仍然能返回到它的调用者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//           7-9  在专门的线程中中断任务     
public class timeRun2 {
private static final ScheduledExecutorService cancelExec = newScheduledThreadPool(1);

public static void timedRun(final Runnable r,
long timeout, TimeUnit unit)
throws InterruptedException {
class RethrowableTask implements Runnable {
private volatile Throwable t; //所有error和exception的超类

public void run() {
try {
r.run();
} catch (Throwable t) {
this.t = t;
}
}

void rethrow() {
if (t != null)
throw LaunderThrowable.launderThrowable(t);
}
}
//执行任务的线程拥有自己的执行策略,即使任务不响应中断,限时运行的方法仍然能返回到它的调用者。
RethrowableTask task = new RethrowableTask();
final Thread taskThread = new Thread(task);
taskThread.start();
cancelExec.schedule(new Runnable() {
public void run() {
taskThread.interrupt();
}
}, timeout, unit);
taskThread.join(unit.toMillis(timeout));//等待任务线程的消亡,即等它运行完毕
//在join返回后,它将检查任务中是否有异常抛出,如果有的话,则会在调用timedRun2的线程中再次抛出应该异常。
task.rethrow();
}
}

这个程序的问题是,它依赖于一个限时的join
join的不足:无法知道执行控制是因为线程正常退出还是因为join超时而返回。

通过Future来实现取消

先说一下Future带有的cancel方法,该方法带有一个boolean类型的参数mayInterruptIfRunning,表示取消操作是否成功。

mayInterruptIfRunning参数:true表示线程能被中断,false则表示不能。

什么时候可以把cancel的参数指定为true?如果是在标准的Executor中创建,并且你打算用Future来取消,则可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//          7-10     通过Future来取消任务
public class TimedRun {
private static final ExecutorService taskExec = Executors.newCachedThreadPool();

public static void timedRun(Runnable r,
long timeout, TimeUnit unit)
throws InterruptedException {
Future<?> task = taskExec.submit(r);
try {
task.get(timeout, unit);//在限时内取得结果,如果要等待则等待
} catch (TimeoutException e) {
// 接下来任务会被取消
} catch (ExecutionException e) {
// 如果在任务中抛出了异常,那么重新抛出该异常
throw launderThrowable(e.getCause());
} finally {
// 如果任务已经结束,那么执行取消操作也不会有任何影响
task.cancel(true); // 如果任务正在进行,那么将被中断
}
}
}

处理不可中断的阻塞

不是所有Java中的阻塞机制都会直接响应InterruptException,如一个线程由于执行同步的Socket I/O或者等待获得内置锁而阻塞。但他们也有类似InterruptException的机制:

  • Java.io包中的同步Socket I/O
    在服务器应用程序中,最常见的阻塞I/O形式就是对套接字进行读取和写入。虽然InputStream和OutputStream中的read和write等方法不会响应中断,但通过关闭底层的socket(套接字),可以使得由于执行read和write等方法而被阻塞的线程抛出一个SocketException。
  • Java.io包中的同步I/O
    当中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptException并关闭链路(channel)(这会使得其他在这条链路上阻塞的线程同样抛出ClosedByInterruptException)。当关闭一个InterruptibleChannel时,将导致所有在链路操作上阻塞的线程都抛出AsynchronousCloseException。大多数标准的Channel都实现(implements)了InterruptibleChannel。
  • Selector的异步I/O
    如果一个线程在调用Selector.select方法(在java.nio.channels中)时阻塞了,那么调用close和wakeup方法会使线程抛出ClosedByInterruptException并提前返回。
  • 获取某个锁
    如果一个线程由于等待某个内置锁而阻塞,那么将无法响应中断,因为线程认为它肯定会得到锁,所有将不会理会中断请求。但是,在Lock类提供了lockInterruptibly方法,该方法允许在等待一个锁的同时仍能响应中断。

让一个Socket既能处理标准的中断,也能关闭底层的套接字

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//               7-11 通过改写interrupt方法将非标准的取消操作封装在Thread中
public class ReaderThread extends Thread{
private static final int BUFSZ = 512;
private final Socket socket;
private final InputStream in;

public ReaderThread(Socket socket) throws IOException{
this.socket=socket;
this.in=socket.getInputStream();
}
//改写了interrupt方法,使其既能处理标准的中断,也能关闭底层的套接字
public void interrupt(){
try{
socket.close();
}catch (IOException ignored) {
}finally{
//调用继承的Thread中的方法处理标准的中断
super.interrupt();
}
}

public void run(){
try{
byte[] buf=new byte[BUFSZ];
while(true){ //用同步的方式从该套接字中读取数据
int count=in.read(buf);
if(count<0)
break;
else if(count>0)
processBuffer(buf,count); //并将接收到的数据传递给processBuffer
}
}catch (IOException e) {
// 允许线程退出
}
}
}

无论ReaderThread线程是在read方法中阻塞还是在某个可中断的阻塞方法中阻塞,都可以被中断并停止执行当前的工作。

采用newTaskFor来封装非标准的取消

newTaskFor:ThreadPoolExecutor中的一个工厂方法,它将创建Future来代表任务。newTaskFor还能返回一个RunnableFuture接口,该接口继承(扩展)了Future和Runnable(并由FutureTask实现)。

改写Future.cancel也可以实现和上面代码相同的功能

1
2
3
4
5
6
//   7-12     通过newTaskFor将非标准的取消操作封装在一个任务中
//CancellableTask定义了一个CancellableTask接口,该接口扩展了Callable,并增加了一个cancel方法和newTask方法来构造RunnableFuture。
public interface CancellableTask<T> extends Callable<T> {
void cancel();
RunnableFuture<T> newTask();
}
1
2
3
4
5
6
7
8
9
10
11
12
//   7-12     通过newTaskFor将非标准的取消操作封装在一个任务中
//CancellingExecutor继承了THReadPoolExecutor,并通过改写newTaskFor使得CancellableTask可以创建自己的Future
@ThreadSafe
class CancellingExecutor extends ThreadPoolExecutor {
...
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if (callable instanceof CancellableTask)
return ((CancellableTask<T>) callable).newTask();
else
return super.newTaskFor(callable);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//            7-12 通过newTaskFor将非标准的取消操作封装在一个任务中
// SocketUsingTask实现了CancellableTask
public abstract class SocketUsingTask <T> implements CancellableTask<T> {
private Socket socket;

protected synchronized void setSocket(Socket s) {
socket = s;
}
//定义了Future.cancel来关闭套接字
public synchronized void cancel() {
try {
if (socket != null)
socket.close();
} catch (IOException ignored) {
}
}
//如果SocketUsingTask通过其自己的Future来取消,那么底层的套接字将被关闭并且线程将被中断
public RunnableFuture<T> newTask() {
return new FutureTask<T>(this) { //Creates a FutureTask that will, upon running(正在运行), execute the given Callable.
@SuppressWarnings("finally")
public boolean cancel(boolean mayInterruptIfRunning) {
try {
SocketUsingTask.this.cancel();
} finally { //调用super.cancel
return super.cancel(mayInterruptIfRunning);
}
}
};
}
}

停止基于线程的服务

服务应该提供生命周期方法来关闭它自己以及它所拥有的线程,比如ExecutorServi提供了shutdown和shutdownNow等方法。

示例:日志服务

😕 日志操作在单独的日志线程中执行。产生日志消息的线程并不会将消息直接写入输出流,而是由LogWriter通过BlockingQueue将消息提交给日志线程,由日志线程写入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//         7-13 不支持关闭的生产者-消费者日志服务(不好)
public class LogWriter {
private final BlockingQueue<String> queue;
private final LoggerThread logger; //logger(记录器)
private static final int CAPACITY = 1000;

public LogWriter(Writer writer){
this.queue=new LinkedBlockingQueue<>(CAPACITY);
this.logger=new LoggerThread(writer);
}

public void start(){ //启动线程
logger.start();
}
//充当生产者
public void log(String msg)throws InterruptedException{
queue.put(msg);
}
//产生日志消息的线程并不会将消息直接写入输出流,而是由LogWriter通过BlockingQueue将消息提交给日志线程,由日志线程写入。
private class LoggerThread extends Thread{ //日志线程
private final PrintWriter writer; //PrintWriter是线程安全的

public LoggerThread(Writer writer) {
this.writer = new PrintWriter(writer, true); // true参数代表了autoflush
}

public void run(){
try{
while(true)
writer.println(queue.take());
}catch (InterruptedException ignored) { //可以做些什么..但是..
}finally{
writer.close();
}
}
}
}

如果将日志线程修改为当捕获到InterruptedException时退出,那么只需中断日志线程就能停止服务。但是,1.会丢失那些正在等待被写入到日志的信息,2.其他线程将在调用log时被阻塞,因为日志消息队列是满的(日志线程停止了take)

😕 另一种关闭方法:设置某个“已请求关闭”标志(与前面的已请求取消标志类似),避免进一步提交日志信息。

1
2
3
4
5
6
7
8
//          7-14    通过一种不可靠的方式为日志增加关闭支持
public void log(String msg) throws InterruptedException {
if (!shutdownRequested) //存在竞态条件,因为它是“先判断再运行”
//检查的时候还未停止,而put的时候停止了,这个时候将会阻塞
queue.put(msg);
else //否则抛出异常
throw new IllegalStateException("logger is shut down");
}

😊 通过原子方式来检查关闭请求,并且有条件地递增一个计数器来“保持”提交信息的权利。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//           7-15  向LogWriter添加可靠的取消操作
public class LogService {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private final PrintWriter writer;
private boolean isShutdown;
private int reservations; //保留,设置一个计数器来记录有多少个任务“保持”提交信息的权利

public LogService(Writer writer) {
this.queue = new LinkedBlockingQueue<String>();
this.loggerThread = new LoggerThread();
this.writer = new PrintWriter(writer);
}

public void start(){
loggerThread.start();
}

public void stop(){
synchronized (this) { //避免在未设置的时候有任务开始,在设置后任务因为停止了阻塞
isShutdown=true;
}
loggerThread.interrupt(); //日志线程中断
}

public void log(String msg)throws InterruptedException{
//避免检查的时候还未停止,而put的时候停止了,这个时候将会阻塞
synchronized (this) { //通过原子操作来检查关闭请求
if(isShutdown)
throw new IllegalStateException(/*...*/);
++reservations; //递增一个计数器来“保持”提交信息的权利
}
queue.put(msg);
}

private class LoggerThread extends Thread{
public void run(){
try{
while(true){
try{ //用的都是LogService的内置锁
synchronized (LogService.this) { //通过原子操作来检查关闭请求和拥有提交信息权利的任务的数量
if(isShutdown&&reservations==0)
break;
}
String msg=queue.take();
synchronized (LogService.this) {//这是一个复合操作,需要上锁
--reservations;
}
writer.println(msg);
}catch (InterruptedException e) {
// 重试
}
}
}finally{
writer.close();
}
}
}
}

关闭ExecutorService

ExecutorService提供了两种关闭方法:1.shutdown正常关闭。2.shutdownNow强行关闭,首先关闭当前正在执行的任务,然后返回所有尚未启动的任务清单。

使用ExecutorService的日志服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//        7-16  使用ExecutorService的日志服务
public class LogService {
//将管理线程的工作委托给一个ExecutorService,而不是自行管理
private final ExecutorService exec=newSingleThreadExecutor();
private final PrintWriter writer;
//...
public void start(){

}

public void stop()throws InterruptedException{
try{
exec.shutdown();
//再接受到shutdown请求后会等待任务完成或超时, 或者并发线程被中断
exec.awaitTermination(TIMEOUT, UNIT);
}finally {
writer.close();
}
}

public void log(String msg){
try{
exec.execute(new WriteTask(msg));
}catch (RejectedExecutionException ignored) {
// TODO: handle exception
}
}
}

“毒丸”对象

“毒丸”:“当得到这个对象时,立即停止”。 在FIFO队列中使用

一个生产者和一个消费者的桌面搜索示例,通过“毒丸”对象来关闭服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//          7-17    通过毒丸对象来关闭服务
public class IndexingService {
private static final int CAPACITY = 1000; //capacitry容量
private static final File POSION=new File("");
private final CrawlerThread producer=new CrawlerThread();
private final IndexerThread consumer=new IndexerThread();
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root;

public IndexingService(File root, final FileFilter fileFilter) {
this.root = root;
this.queue = new LinkedBlockingQueue<File>(CAPACITY);
this.fileFilter = new FileFilter() {
public boolean accept(File f) { //测试该路径名称该不该被加入到路径名称列表中
return f.isDirectory() || fileFilter.accept(f);
}
};
}

private boolean alreadyIndexed(File f) {
return false;
}

public void start(){
producer.start();
consumer.start();
}

public void stop() { //停止生产
producer.interrupt();
}

public void awaitTermination() throws InterruptedException {
consumer.join(); //等待前面执行的线程消亡后再执行
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//           7-18  IndexingService的生产者线程
public class CrawlerThread extends Thread{
public void run(){
try{
crawl(root);
}catch (InterruptedException e) {
// 发生异常
}finally{ //提交毒丸对象后,将不再提交任何工作
while(true){
try{
queue.put(POSION);
break;
}catch (InterruptedException e) {
//重新尝试
}
}
}
}
private void crawl(File root) throws InterruptedException {
File[] entries = root.listFiles(fileFilter); //返回符合过滤器的文件的绝对路径
if (entries != null) {
for (File entry : entries) {
if (entry.isDirectory()) //如果是文件夹则递归
crawl(entry);
else if (!alreadyIndexed(entry)) //如果还没给文件编索引完成,则放入阻塞队列中
queue.put(entry);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//           7-19  IndexingService的消费者线程
public class IndexerThread extends Thread{
public void run(){ //在毒丸对象提交前的所有工作都被被处理
try{
while(true){
File file=queue.take();
if(file==POSION)
break;
else
indexFile(file);
}
}catch (InterruptedException comsumed) {
}
}
public void indexFile(File file) {
/*...*/
};
}

示例:只执行一次的服务

checkMail方法能在多台主机上并行地检查新邮件。它创建一个私有的Executor,并向每台主机提交一个任务。然后,当所有邮件检查任务都执行完毕后,关闭Executor并等待结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CheckForMail {
public boolean checkMail(Set<String> hosts,long timeout,TimeUnit unit)throws InterruptedException{
ExecutorService exec=Executors.newCachedThreadPool();
//之所以用AtomicBoolean代替volatile,是因为能从内部的Runnable中访问hasNewMail标志,因此它必须是final类型以免修改
final AtomicBoolean hasNewMail=new AtomicBoolean(false);//初始值为false
try{
for(final String host:hosts)
exec.execute(new Runnable(){ //Executes the given command at some time in the future
public void run(){
if(checkMail(host))
hasNewMail.set(true);
}
});
}finally {
exec.shutdown();
exec.awaitTermination(timeout, unit);
}
return hasNewMail.get();
}
private boolean checkMail(String host) {
//检查邮件
return false;
}
}

shutdownNow的局限性

使用shutdownNow时,虽然他会返回已提交但尚未开始的任务,但我们无法在关闭中知道正在执行的任务的状态

记录哪些任务是在关闭后取消的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//          7-21   在ExecutorService中跟踪在关闭之后取消的任务
public class TrackingExecutor extends AbstractExecutorService{
private final ExecutorService exec;
private final Set<Runnable> tasksCancelledAtShutdown=
Collections.synchronizedSet(new HashSet<Runnable>()); //创建一个线程安全的Set来记录那些任务是关闭后取消的

public List<Runnable> getCancelledTasks(){ //客户端可以通过这个方法知道,哪些任务是在关闭后取消的
if(!exec.isShutdown())
throw new IllegalStateException();
return new ArrayList<Runnable>(tasksCancelledAtShutdown);
}

public void execute(final Runnable runnable){
exec.execute(new Runnable(){
public void run(){
try{
runnable.run();
}finally{ //如果已经ExecutorService关闭了并且任务被中断(取消),添加到Set中
if(isShutdown()&&Thread.currentThread().isInterrupted())
tasksCancelledAtShutdown.add(runnable);
}
}
});
}
// 将ExecutorService的其他方法委托给exec
}

TrackingExecutor的客户端程序。

当爬虫程序必须关闭时,我们通常希望保存它的状态,以便稍后重新启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
//       7-22    使用TrackingExecutorService来保存未完成的任务已备后续执行
public abstract class WebCrawler {
private volatile TrackingExecutor exec;
@GuardedBy("this") private final Set<URL> urlsToCrawl = new HashSet<URL>(); //存放

private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>();
private static final long TIMEOUT = 500;
private static final TimeUnit UNIT = MILLISECONDS;

public WebCrawler(URL startUrl) {
urlsToCrawl.add(startUrl);
}

public synchronized void start() {
exec = new TrackingExecutor(Executors.newCachedThreadPool()); //创建可获得在关闭后被取消的任务的TrackingExecutor
for (URL url : urlsToCrawl) submitCrawlTask(url); //将urlsToCrawl中的url加入执行队列
urlsToCrawl.clear(); //清空
}

public synchronized void stop() throws InterruptedException {
try {
saveUncrawled(exec.shutdownNow()); //shutdownNow首先关闭当前正在执行的任务,然后返回所有尚未启动的任务清单。
if (exec.awaitTermination(TIMEOUT, UNIT))
saveUncrawled(exec.getCancelledTasks()); //返回在关闭后被取消的任务并保存
} finally {
exec = null;
}
}

protected abstract List<URL> processPage(URL url);

private void saveUncrawled(List<Runnable> uncrawled) { //保存已在任务清单中但尚未执行的任务
for (Runnable task : uncrawled)
urlsToCrawl.add(((CrawlTask) task).getPage());
}

private void submitCrawlTask(URL u) { //加入执行队列
exec.execute(new CrawlTask(u));
}

private class CrawlTask implements Runnable {
private final URL url;

CrawlTask(URL url) {
this.url = url;
}

private int count = 1;

boolean alreadyCrawled() { //表示该url已经被爬过,putIfAbsent包含了判断的,是原子操作
return seen.putIfAbsent(url, true) != null;
}

void markUncrawled() { //移除
seen.remove(url);
System.out.printf("marking %s uncrawled%n", url);
}

public void run() {
for (URL link : processPage(url)) {
if (Thread.currentThread().isInterrupted()) //如果已经被中断就不做操作
return;
submitCrawlTask(link); //如果未中断则递归,一直往exec中添加任务
}
}

public URL getPage() { //返回url
return url;
}
}
}

处理非正常的线程终止

最主要原因就是RuntimeException。

下面代码中,如果任务抛出了一个运行时异常,那么它将使线程终结。ThreadPoolExecutor也使用了这种技术。

1
2
3
4
5
6
7
8
9
10
11
12
//       典型的线程池工作者线程结构
public void run() {
Throwable thrown = null;
try {
while (!isInterrupted()) //未中断
runTask(getTaskFromWorkQueue());
} catch (Throwable e) { //检查到异常
thrown = e;
} finally { //线程终结
threadExited(this, thrown);
}
}

未捕获异常的处理

除了上一种种主动方法来解决运行时异常,在Thread API 中同样提供了UncaughtExceptionHandler,它能检测出某个线程由于未捕获的异常而终结的情况。这两种方法是互补的,通过将两者结合在一起,就能有效地防止线程泄漏问题。

当线程由于未捕获异常而退出时,JVM会吧这个事件报告给应用程序提的UncaughtExceptionHandler异常处理器。如果没有提供任何异常处理器,那么默认的行为是将栈追踪信息输出到System.err。

或者像下面程序这样,写入日志。

1
2
3
4
5
6
7
//      7-25  将异常写入日志的UncaughtExceptionHandler
public class UEHLogger implements Thread.UncaughtExceptionHandler {
public void uncaughtException(Thread t, Throwable e) {
Logger logger = Logger.getAnonymousLogger(); //创建一个匿名的日志
logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);
}
}

Reference

Java并发编程实战

源代码

Java并发编程实战(学习笔记六 第七章 取消与关闭 上)

Java并发编程实战(学习笔记六 第七章 取消与关闭 下)

CATALOG
  1. 1. 任务取消
    1. 1.1. 中断
    2. 1.2. 中断策略
    3. 1.3. 响应中断
    4. 1.4. 示例:计时运行
    5. 1.5. 通过Future来实现取消
    6. 1.6. 处理不可中断的阻塞
    7. 1.7. 采用newTaskFor来封装非标准的取消
  2. 2. 停止基于线程的服务
    1. 2.1. 示例:日志服务
    2. 2.2. 关闭ExecutorService
    3. 2.3. “毒丸”对象
    4. 2.4. 示例:只执行一次的服务
    5. 2.5. shutdownNow的局限性
  3. 3. 处理非正常的线程终止
    1. 3.1. 未捕获异常的处理
  4. 4. Reference