本章将给出各种实现取消和中断的机制,以及如何编写任务和服务,使它们能对取消请求做出响应。
任务取消 可取消的操作 : 外部代码能在某个操作正常完成之前将其置入“完成”状态
取消某个操作的原因:
点击某个桌面应用中的取消按钮时;
某个操作超过了一定的执行时间限制需要中止时;
多个线程做相同的事情,只要一个线程成功其它线程都可以取消时;
一组线程中的一个或多个出现错误导致整组都无法继续时;
当一个应用或服务需要停止时。
Java中没有一种安全的抢占式方法来停止线程,只有一些协作式的机制,代码都会遵循的一种协议。
比如:
“已请求取消”标志
任务将定期地查看这个标志。如果设置了这个标志,那么任务将提前结束。
😊PrimeGenerator持续地枚举素数(Prime),直到cancelled被设置。为了确保可靠性cancelled被设置为volatile。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class PrimeGenerator implements Runnable { private final List<BigInteger> primes= new ArrayList<BigInteger>(); private volatile boolean cancelled; public void run () { BigInteger p=BigInteger.ONE; while (!cancelled){ p=p.nextProbablePrime(); synchronized (this ) { primes.add(p); } } } public void cancel () { cancelled=true ; } public synchronized List<BigInteger> get () { return new ArrayList<BigInteger>(primes); } }
上面代码的客户端程序,一个仅运行一秒的素数生成器
1 2 3 4 5 6 7 8 9 10 11 12 static List<BigInteger> aSecondOfPrimes () throws InterruptedException { PrimeGenerator generator=new PrimeGenerator(); new Thread(generator).start(); try { SECONDS.sleep(1 ); }finally { generator.cancel(); } return generator.get(); }
cancel方法由finally块调用,确保即使在调用sleep时被中断也能取消素数生成器的执行。
中断 PrimeGenerator的问题:果使用这种方法的任务调用了一个阻塞方法,例如BlockingQueue.put,那么任务可能永远不会检查取消标志,因此永远不会结束。
😧生产者线程生成素数,并将它们放入一个阻塞队列。如果生成者的速度超过了消费者的处理速度, 队列将被填满,put方法也会阻塞。因此,生产者却永远不能检查这个标志。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class BrokenPrimeProducer extends Thread { private final BlockingQueue<BigInteger> queue; private volatile boolean cancelled=false ; public BrokenPrimeProducer (BlockingQueue<BigInteger> queue) { this .queue=queue; } public void run () { try { BigInteger p=BigInteger.ONE; while (!cancelled){ queue.put(p=p.nextProbablePrime()); } }catch (InterruptedException consumed) { } } } void consumePrimes () throws InterruptedException { BlockingQueue<BigInteger> primes=new ArrayBlockingQueue<>(MAX_PRIORITY); BrokenPrimeProducer producer=new BrokenPrimeProducer(primes); producer.start(); try { while (needMorePrimes()) consume(primes.take()); }finally { producer.cancel(); } }
Thread的中断 阻塞库方法定期检查Thread的中断,并在适当时候停止当前工作
Thread中的中断方法 : 每个线程都有一个boolean类型的中断状态。当中断线程时,这个线程的中断状态将被设置为true。
在Thread中包含了中断线程以及查询线程中断状态的方法,如下:
interrupt方法能中断目标线程
isInterrupt方法能返回目标线程的中断状态
静态的interrupted方法将清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法。
阻塞库方法,如Thread.sleep和Object.wait,在发现中断时提前返回。它们在响应中断时会:清除中断状态,抛出InterruptedException
调用Interrupt并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。
上面的BrokenPrimeProducer的解决方式:进行阻塞的BlockingQueue可以检测Thread的interrupt,所以用interrupt代替自定义的中断方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class PrimeProducer extends Thread { private final BlockingQueue<BigInteger> queue; PrimeProducer(BlockingQueue<BigInteger> queue) { this .queue = queue; } public void run () { try { BigInteger p = BigInteger.ONE; while (!Thread.currentThread().isInterrupted()) queue.put(p = p.nextProbablePrime()); } catch (InterruptedException consumed) { } } public void cancel () { interrupt(); } }
中断策略 中断策略:在发现中断请求时应该做哪些工作
最合理的终端策略:尽快退出,并把中断信息传递给调用者,从而使调用栈中的上层代码可以采取进一步的操作。
这就是为什么大多数可阻塞的库函数都只是抛出InterruptedException作为中断响应。它们永远不会在某个由自己拥有的线程中运行,因此它们为任务或库代码实现了这个最合理的取消策略。但它可以推迟处理中断请求,并直到某个更合适的时刻,在抛出InterruptedException。
响应中断 有两种实用策略可用于处理InterruptedException:
传递异常 (可能在某个特定于任务额清除操作之后),从而使你的方法也称为可中断的阻塞方法。
恢复中断状态 ,从而使调用栈上的上层代码能够对其进行处理。也就是catch InterruptedException
比如,当一个由ThreadPoolExecutor拥有的工作者线程检测到中断时,它会检测线程池是否正在关闭。如果是,它会在结束之前执行一些线程池清理工作,否则它可能创建一个新线程将线程池恢复到合理的规模
示例:计时运行 😧在指定时间内运行一个Runnable,指定时间后中断,该异常会被timeRun1的调用者捕获
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class TimedRun1 { private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(1 ); public static void timedRun (Runnable r, long timeout, TimeUnit unit) { final Thread taskThread = Thread.currentThread(); cancelExec.schedule(new Runnable() { public void run () { taskThread.interrupt(); } }, timeout, unit); r.run(); } }
上面的程序有哪些问题,主要是由于timedRun不了解每个参数Runnable的中断策略:
可能这个Runnable在超时前就完成了,那么这个中断异常将会在调用者之后的程序中才抛出。
可能任务不响应中断,timedRun会在任务结束时才返回,没有起到时间限制的作用。
😕 上面问题的解决方案:执行任务的线程拥有自己的执行策略,即使任务不响应中断,限时运行的方法仍然能返回到它的调用者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class timeRun2 { private static final ScheduledExecutorService cancelExec = newScheduledThreadPool(1 ); public static void timedRun (final Runnable r, long timeout, TimeUnit unit) throws InterruptedException { class RethrowableTask implements Runnable { private volatile Throwable t; public void run () { try { r.run(); } catch (Throwable t) { this .t = t; } } void rethrow () { if (t != null ) throw LaunderThrowable.launderThrowable(t); } } RethrowableTask task = new RethrowableTask(); final Thread taskThread = new Thread(task); taskThread.start(); cancelExec.schedule(new Runnable() { public void run () { taskThread.interrupt(); } }, timeout, unit); taskThread.join(unit.toMillis(timeout)); task.rethrow(); } }
这个程序的问题是,它依赖于一个限时的join join的不足:无法知道执行控制是因为线程正常退出还是因为join超时而返回。
通过Future来实现取消 先说一下Future带有的cancel方法,该方法带有一个boolean类型的参数mayInterruptIfRunning,表示取消操作是否成功。
mayInterruptIfRunning参数:true表示线程能被中断,false则表示不能。
什么时候可以把cancel的参数指定为true?如果是在标准的Executor中创建,并且你打算用Future来取消,则可以。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class TimedRun { private static final ExecutorService taskExec = Executors.newCachedThreadPool(); public static void timedRun (Runnable r, long timeout, TimeUnit unit) throws InterruptedException { Future<?> task = taskExec.submit(r); try { task.get(timeout, unit); } catch (TimeoutException e) { } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } finally { task.cancel(true ); } } }
处理不可中断的阻塞 不是所有Java中的阻塞机制都会直接响应InterruptException,如一个线程由于执行同步的Socket I/O或者等待获得内置锁而阻塞。但他们也有类似InterruptException的机制:
Java.io包中的同步Socket I/O 在服务器应用程序中,最常见的阻塞I/O形式就是对套接字进行读取和写入。虽然InputStream和OutputStream中的read和write等方法不会响应中断,但通过关闭底层的socket(套接字),可以使得由于执行read和write等方法而被阻塞的线程抛出一个SocketException。
Java.io包中的同步I/O 当中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptException并关闭链路(channel)(这会使得其他在这条链路上阻塞的线程同样抛出ClosedByInterruptException)。当关闭一个InterruptibleChannel时,将导致所有在链路操作上阻塞的线程都抛出AsynchronousCloseException。大多数标准的Channel都实现(implements)了InterruptibleChannel。
Selector的异步I/O 如果一个线程在调用Selector.select方法(在java.nio.channels中)时阻塞了,那么调用close和wakeup方法会使线程抛出ClosedByInterruptException并提前返回。
获取某个锁 如果一个线程由于等待某个内置锁而阻塞,那么将无法响应中断,因为线程认为它肯定会得到锁,所有将不会理会中断请求。但是,在Lock类提供了lockInterruptibly方法,该方法允许在等待一个锁的同时仍能响应中断。
让一个Socket既能处理标准的中断,也能关闭底层的套接字
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class ReaderThread extends Thread { private static final int BUFSZ = 512 ; private final Socket socket; private final InputStream in; public ReaderThread (Socket socket) throws IOException { this .socket=socket; this .in=socket.getInputStream(); } public void interrupt () { try { socket.close(); }catch (IOException ignored) { }finally { super .interrupt(); } } public void run () { try { byte [] buf=new byte [BUFSZ]; while (true ){ int count=in.read(buf); if (count<0 ) break ; else if (count>0 ) processBuffer(buf,count); } }catch (IOException e) { } } }
无论ReaderThread线程是在read方法中阻塞还是在某个可中断的阻塞方法中阻塞,都可以被中断并停止执行当前的工作。
采用newTaskFor来封装非标准的取消 newTaskFor:ThreadPoolExecutor中的一个工厂方法,它将创建Future来代表任务。newTaskFor还能返回一个RunnableFuture接口,该接口继承(扩展)了Future和Runnable(并由FutureTask实现)。
改写Future.cancel也可以实现和上面代码相同的功能
1 2 3 4 5 6 public interface CancellableTask <T > extends Callable <T > { void cancel () ; RunnableFuture<T> newTask () ; }
1 2 3 4 5 6 7 8 9 10 11 12 @ThreadSafe class CancellingExecutor extends ThreadPoolExecutor { ... protected <T> RunnableFuture<T> newTaskFor (Callable<T> callable) { if (callable instanceof CancellableTask) return ((CancellableTask<T>) callable).newTask(); else return super .newTaskFor(callable); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public abstract class SocketUsingTask <T > implements CancellableTask <T > { private Socket socket; protected synchronized void setSocket (Socket s) { socket = s; } public synchronized void cancel () { try { if (socket != null ) socket.close(); } catch (IOException ignored) { } } public RunnableFuture<T> newTask () { return new FutureTask<T>(this ) { @SuppressWarnings ("finally" ) public boolean cancel (boolean mayInterruptIfRunning) { try { SocketUsingTask.this .cancel(); } finally { return super .cancel(mayInterruptIfRunning); } } }; } }
停止基于线程的服务 服务应该提供生命周期方法来关闭它自己以及它所拥有的线程,比如ExecutorServi提供了shutdown和shutdownNow等方法。
示例:日志服务 😕 日志操作在单独的日志线程中执行。产生日志消息的线程并不会将消息直接写入输出流,而是由LogWriter通过BlockingQueue将消息提交给日志线程,由日志线程写入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class LogWriter { private final BlockingQueue<String> queue; private final LoggerThread logger; private static final int CAPACITY = 1000 ; public LogWriter (Writer writer) { this .queue=new LinkedBlockingQueue<>(CAPACITY); this .logger=new LoggerThread(writer); } public void start () { logger.start(); } public void log (String msg) throws InterruptedException { queue.put(msg); } private class LoggerThread extends Thread { private final PrintWriter writer; public LoggerThread (Writer writer) { this .writer = new PrintWriter(writer, true ); } public void run () { try { while (true ) writer.println(queue.take()); }catch (InterruptedException ignored) { }finally { writer.close(); } } } }
如果将日志线程修改为当捕获到InterruptedException时退出,那么只需中断日志线程就能停止服务。但是,1.会丢失那些正在等待被写入到日志的信息,2.其他线程将在调用log时被阻塞,因为日志消息队列是满的(日志线程停止了take)
😕 另一种关闭方法:设置某个“已请求关闭”标志(与前面的已请求取消标志类似),避免进一步提交日志信息。
1 2 3 4 5 6 7 8 public void log (String msg) throws InterruptedException { if (!shutdownRequested) queue.put(msg); else throw new IllegalStateException("logger is shut down" ); }
😊 通过原子方式来检查关闭请求,并且有条件地递增一个计数器来“保持”提交信息的权利。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class LogService { private final BlockingQueue<String> queue; private final LoggerThread loggerThread; private final PrintWriter writer; private boolean isShutdown; private int reservations; public LogService (Writer writer) { this .queue = new LinkedBlockingQueue<String>(); this .loggerThread = new LoggerThread(); this .writer = new PrintWriter(writer); } public void start () { loggerThread.start(); } public void stop () { synchronized (this ) { isShutdown=true ; } loggerThread.interrupt(); } public void log (String msg) throws InterruptedException { synchronized (this ) { if (isShutdown) throw new IllegalStateException(); ++reservations; } queue.put(msg); } private class LoggerThread extends Thread { public void run () { try { while (true ){ try { synchronized (LogService.this ) { if (isShutdown&&reservations==0 ) break ; } String msg=queue.take(); synchronized (LogService.this ) { --reservations; } writer.println(msg); }catch (InterruptedException e) { } } }finally { writer.close(); } } } }
关闭ExecutorService ExecutorService提供了两种关闭方法:1.shutdown正常关闭。2.shutdownNow强行关闭,首先关闭当前正在执行的任务,然后返回所有尚未启动的任务清单。
使用ExecutorService的日志服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class LogService { private final ExecutorService exec=newSingleThreadExecutor(); private final PrintWriter writer; public void start () { } public void stop () throws InterruptedException { try { exec.shutdown(); exec.awaitTermination(TIMEOUT, UNIT); }finally { writer.close(); } } public void log (String msg) { try { exec.execute(new WriteTask(msg)); }catch (RejectedExecutionException ignored) { } } }
“毒丸”对象 “毒丸”:“当得到这个对象时,立即停止”。 在FIFO队列中使用
一个生产者和一个消费者的桌面搜索示例,通过“毒丸”对象来关闭服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class IndexingService { private static final int CAPACITY = 1000 ; private static final File POSION=new File("" ); private final CrawlerThread producer=new CrawlerThread(); private final IndexerThread consumer=new IndexerThread(); private final BlockingQueue<File> queue; private final FileFilter fileFilter; private final File root; public IndexingService (File root, final FileFilter fileFilter) { this .root = root; this .queue = new LinkedBlockingQueue<File>(CAPACITY); this .fileFilter = new FileFilter() { public boolean accept (File f) { return f.isDirectory() || fileFilter.accept(f); } }; } private boolean alreadyIndexed (File f) { return false ; } public void start () { producer.start(); consumer.start(); } public void stop () { producer.interrupt(); } public void awaitTermination () throws InterruptedException { consumer.join(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class CrawlerThread extends Thread { public void run () { try { crawl(root); }catch (InterruptedException e) { }finally { while (true ){ try { queue.put(POSION); break ; }catch (InterruptedException e) { } } } } private void crawl (File root) throws InterruptedException { File[] entries = root.listFiles(fileFilter); if (entries != null ) { for (File entry : entries) { if (entry.isDirectory()) crawl(entry); else if (!alreadyIndexed(entry)) queue.put(entry); } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class IndexerThread extends Thread { public void run () { try { while (true ){ File file=queue.take(); if (file==POSION) break ; else indexFile(file); } }catch (InterruptedException comsumed) { } } public void indexFile (File file) { }; }
示例:只执行一次的服务 checkMail方法能在多台主机上并行地检查新邮件。它创建一个私有的Executor,并向每台主机提交一个任务。然后,当所有邮件检查任务都执行完毕后,关闭Executor并等待结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class CheckForMail { public boolean checkMail (Set<String> hosts,long timeout,TimeUnit unit) throws InterruptedException { ExecutorService exec=Executors.newCachedThreadPool(); final AtomicBoolean hasNewMail=new AtomicBoolean(false ); try { for (final String host:hosts) exec.execute(new Runnable(){ public void run () { if (checkMail(host)) hasNewMail.set(true ); } }); }finally { exec.shutdown(); exec.awaitTermination(timeout, unit); } return hasNewMail.get(); } private boolean checkMail (String host) { return false ; } }
shutdownNow的局限性 使用shutdownNow时,虽然他会返回已提交但尚未开始的任务,但我们无法在关闭中知道正在执行的任务的状态
记录哪些任务是在关闭后取消的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class TrackingExecutor extends AbstractExecutorService { private final ExecutorService exec; private final Set<Runnable> tasksCancelledAtShutdown= Collections.synchronizedSet(new HashSet<Runnable>()); public List<Runnable> getCancelledTasks () { if (!exec.isShutdown()) throw new IllegalStateException(); return new ArrayList<Runnable>(tasksCancelledAtShutdown); } public void execute (final Runnable runnable) { exec.execute(new Runnable(){ public void run () { try { runnable.run(); }finally { if (isShutdown()&&Thread.currentThread().isInterrupted()) tasksCancelledAtShutdown.add(runnable); } } }); } }
TrackingExecutor的客户端程序。
当爬虫程序必须关闭时,我们通常希望保存它的状态,以便稍后重新启动。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 public abstract class WebCrawler { private volatile TrackingExecutor exec; @GuardedBy ("this" ) private final Set<URL> urlsToCrawl = new HashSet<URL>(); private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>(); private static final long TIMEOUT = 500 ; private static final TimeUnit UNIT = MILLISECONDS; public WebCrawler (URL startUrl) { urlsToCrawl.add(startUrl); } public synchronized void start () { exec = new TrackingExecutor(Executors.newCachedThreadPool()); for (URL url : urlsToCrawl) submitCrawlTask(url); urlsToCrawl.clear(); } public synchronized void stop () throws InterruptedException { try { saveUncrawled(exec.shutdownNow()); if (exec.awaitTermination(TIMEOUT, UNIT)) saveUncrawled(exec.getCancelledTasks()); } finally { exec = null ; } } protected abstract List<URL> processPage (URL url) ; private void saveUncrawled (List<Runnable> uncrawled) { for (Runnable task : uncrawled) urlsToCrawl.add(((CrawlTask) task).getPage()); } private void submitCrawlTask (URL u) { exec.execute(new CrawlTask(u)); } private class CrawlTask implements Runnable { private final URL url; CrawlTask(URL url) { this .url = url; } private int count = 1 ; boolean alreadyCrawled () { return seen.putIfAbsent(url, true ) != null ; } void markUncrawled () { seen.remove(url); System.out.printf("marking %s uncrawled%n" , url); } public void run () { for (URL link : processPage(url)) { if (Thread.currentThread().isInterrupted()) return ; submitCrawlTask(link); } } public URL getPage () { return url; } } }
处理非正常的线程终止 最主要原因就是RuntimeException。
下面代码中,如果任务抛出了一个运行时异常,那么它将使线程终结。ThreadPoolExecutor也使用了这种技术。
1 2 3 4 5 6 7 8 9 10 11 12 public void run () { Throwable thrown = null ; try { while (!isInterrupted()) runTask(getTaskFromWorkQueue()); } catch (Throwable e) { thrown = e; } finally { threadExited(this , thrown); } }
未捕获异常的处理 除了上一种种主动方法来解决运行时异常,在Thread API 中同样提供了UncaughtExceptionHandler,它能检测出某个线程由于未捕获的异常而终结的情况。这两种方法是互补的,通过将两者结合在一起,就能有效地防止线程泄漏 问题。
当线程由于未捕获异常而退出时,JVM会吧这个事件报告给应用程序提的UncaughtExceptionHandler异常处理器。如果没有提供任何异常处理器,那么默认的行为是将栈追踪信息输出到System.err。
或者像下面程序这样,写入日志。
1 2 3 4 5 6 7 public class UEHLogger implements Thread .UncaughtExceptionHandler { public void uncaughtException (Thread t, Throwable e) { Logger logger = Logger.getAnonymousLogger(); logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e); } }
Reference Java并发编程实战
源代码
Java并发编程实战(学习笔记六 第七章 取消与关闭 上)
Java并发编程实战(学习笔记六 第七章 取消与关闭 下)