取消和关闭
Table of Contents
任务取消
启动任务之后, 大多数时候会等待运行完成,但是有时希望可以提前终止任务的运行: 1. 用户申请取消时,比如用户点击了取消按钮. 2. 时间限制的任务,有些任务具有时间限制, 如果在一定的时间内仍然没有得到想要的结果, 可能希望终止该任务的运行. 3. 发生特定的事件时,比如多个任务同时在不同的位置搜索某一文件, 当其中一个搜索到了想要的文件, 应该终止其他仍在运行的任务 4. 发生错误时,比如发生了磁盘已满的错误, 需要向磁盘写入数据的任务应该提前终止 5. 应用或者服务被关闭时
设置自定义flag结束线程
public class PrimeGenerator implements Runnable { @GuardedBy("this") private final List<BigInteger> primes = new ArrayList<BigInteger>(); //自定义的flag, 为保证线程可见性, 将其声明为volatile private volatile boolean cancelled; @Override public void run() { BigInteger p = BigInteger.ONE; // 每次循环之前检查cancelled标记的值, 如果cancelled为true, 循环终止, 线程也就运行结束了 while (!cancelled) { p = p.nextProbablePrime(); synchronized (this) { primes.add(p); } } } public void cancel() { cancelled = true; } public synchronized List<BigInteger> get() { return new ArrayList<>(primes); } }
测试代码
public static void main(String[] args) { PrimeGenerator generator = new PrimeGenerator(); Thread t = new Thread(generator); t.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { //除非明确知道主线程应该终止,不然通常情况下应该重新抛出InterruptedException或者恢复被中断的线程 } // 通过调用cancel方法, 将自定义的cancelled标记设置为true, 从而使得线程t运行终止 generator.cancel(); System.out.println(generator.get().size()); }
假如循环中执行了阻塞操作, 那么即使cancelled标记被设置为true, run方法却没有机会去检查cancelled标记的值, 线程将迟迟无法结束!
class BrokenPrimeProducer extends Thread { private final BlockingQueue<BigInteger> queue; private volatile boolean cancelled = false; BrokenPrimeProducer(BlockingQueue<BigInteger> queue) { this.queue = queue; } public void run() { try { BigInteger p = BigInteger.ONE; while (!cancelled) // 当队列已满时, put方法将会阻塞. 一旦put方法阻塞 //且没有其他线程从队列中取数据时, 阻塞将一直持续下去 queue.put(p = p.nextProbablePrime()); } catch (InterruptedException consumed) { } } public void cancel() { cancelled = true; } }
测试代码
public static void main(String[] args) { // 设置队列的最大容量为10 BlockingQueue<BigInteger> primes = new LinkedBlockingQueue<BigInteger>(10); BrokenPrimeProducer producer = new BrokenPrimeProducer(primes); producer.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { } producer.cancel(); }
中断
java没有直接规定如何安全的提前终止线程的运行, 而是提供了不具约束力的协商式机制: 线程A可以请求线程B中断, 但是是否响应, 何时响应, 如何响应中断请求, 由线程B自己决定
每个 线程 对象都有一个 boolean 型的 中断标记 :
- 其他线程 请求 目标线程 中断 时, 会将 目标线程 的 中断标记 设置为true
- 由 目标线程 自己决定 如何处理
所以中断线程时, 需要明确知道目标线程的中断机制 如果不知道目标线程会怎样处理中断请求, 不要贸然请求其中断
Thread类中与中断标记相关的方法有:
public class Thread { // 请求线程中断, 该方法会将线程的中断标记设置为true. 如何处理中断由目标线程决定 public void interrupt() { ... } // 返回中断标记的值 public boolean isInterrupted() { ... } // 静态方法用于重置当前进程的中断标记(将其设置为false), 并返回重置之前的值 public static boolean interrupted() { ... } ... }
可中断的阻塞方法
java API中的 大多数 阻塞 方法 都是 可中断的 :如 Thread.sleep , Object.wait , BlockingQueue.put 等
- 可中断的阻塞方法有一个共同的特点: 声明 抛出 InterruptedException 异常
- 在 阻塞期间 会 周期性 检查当前线程的 中断标记 , 如果发现当前线程的 中断标记 为 true :
- 重置中断标记
- 提前 从阻塞状态返回
- 抛出 InterruptedException 异常
使用中断终止PrimeProducer
class PrimeProducer extends Thread { private final BlockingQueue<BigInteger> queue; PrimeProducer(BlockingQueue<BigInteger> queue) { this.queue = queue; } public void run() { try { BigInteger p = BigInteger.ONE; // 每次循环前检查当前线程的中断标记, 如果中断标记为设定为true, 则循环结束 // 就算当前线程阻塞在put方法上, 在阻塞期间也会周期性检查中断标记, //一旦发现中断标记为true, 就会从阻塞状态中返回, 并抛出InterruptedException异常 while (!Thread.currentThread().isInterrupted()) { queue.put(p = p.nextProbablePrime()); } } catch (InterruptedException consumed) { System.out.println("InterruptedException happened"); } } public void cancel() { // interrupt方法会将当前线程的中断标记设置为true interrupt(); } }
测试代码
public static void main(String[] args) { // 设置队列的最大容量为10 BlockingQueue<BigInteger> primes = new LinkedBlockingQueue<>(10); PrimeProducer producer = new PrimeProducer(primes); producer.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { } // producer.cancel(); }
不可中断的阻塞方法
有些方法阻塞的时候不会检查中断标记
- 同步socket I/O : 基于 InputStream 的读写方法不会响应中断, 关闭socket 可以使阻塞方法退出
- 同步nio : 关闭 InterruptibleChannel 可以扔出 AsynchronousCloseException
- 异步Selector :*关闭* selector
- 内置锁 : 等待获得内置锁 不会响应中断,使用新的 Lock 对象替代内置锁
public class ReaderThread extends Thread { private final Socket socket; private final InputStream in; private final static int BUFSZ = 8192; public ReaderThread(Socket socket) throws IOException { this.socket = socket; this.in = socket.getInputStream(); } @Override public void interrupt() { try { // 如果发生中断时, 线程阻塞在read方法上, socket的关闭会导致read方法抛出SocketException,然后run方法运行完毕 socket.close(); } catch (IOException ignored) { } finally { //如果阻塞在一个可响应阻塞的方法,继续传递一个中断 super.interrupt(); } } @Override public void run() { try { byte[] buf = new byte[BUFSZ]; while (true) { int count = in.read(buf); if (count < 0) { break; } else if (count > 0) { processBuffer(buf, count); } } } catch (IOException e) { /* Allow thread to exit */ } } private void processBuffer(byte[] buf, int count) { // do something ... } }
覆写 Thread的 interrupt 方法:
- 如果阻塞在不可响应中断的read方法,则 关闭socket 退出read方法
- 反之则 继续传递一个中断
处理InterruptedException
直接向上层抛出
也可以 做一些清理工作 之后 重抛该异常
// 直接向上层抛出InterruptedException, dosomething方法也是一个可中断的阻塞方法 private void dosomething() throws InterruptedException { Thread.sleep(1000); }
这样的处理使得你的方法也成为一个可中断的阻塞方法
设置当前线程的中断标记为true
表明 当前线程发生了中断 , 以便 调用栈上层 进行处理
一般用于当前进程无法直接向上抛出InterruptedException异常
- 主线程 启动 InterruptedExceptionHandler线程 1s后, 设置 InterruptedExceptionHandler线程 的 中断标记 为 true
- InterruptedExceptionHandler线程 应该 阻塞 在 wait 方法上
- 由于wait方法是 可中断的阻塞方法 , 所以其检查到 中断标记 为 true 时
- 重置 当前线程的 中断标记 后
- 抛出InterruptedException
- dosomething 方法catch住 InterruptedException 异常后, 再次 将 当前线程的中断标记 设置为 true
run 方法检查到 中断标记 为 true , 循环不再继续
public class InterruptedExceptionHandler implements Runnable { private Object lock = new Object(); @Override public void run() { while (!Thread.currentThread().isInterrupted()) { dosomething(); } } private void dosomething() { try { // Object.wait是一个可中断的阻塞方法 // 如果在其阻塞期间检查到当前线程的中断标记为true, 会重置中断标记后从阻塞状态返回, 并抛出InterruptedException异常 synchronized (lock) { lock.wait(); // 会重置中断标记后从阻塞状态返回, 并抛出InterruptedException异常 } } catch (InterruptedException e) { System.out.println("InterruptedException happened"); // catch住InterruptedException后设置当前线程的中断标记为true, 以供调用栈上层进行相应的处理 // 在此例中, dosomething方法的调用栈上层是run方法. Thread.currentThread().interrupt(); } } public static void main(String[] args) throws InterruptedException { Thread t = new Thread(new InterruptedExceptionHandler()); t.start(); Thread.sleep(1000); // 启动线程1s后设置其中断标记为true t.interrupt(); } }
假如dosomething方法catch住InterruptedException异常后没有设置中断标记 其调用栈上层的run方法就无法得知线程曾经发生过中断, 循环也就无法终止
仍然继续循环执行某阻塞方法
将 中断状态 保存 下来, 当 循环完成 后再根据 保存下来的中断状态 执行相应的操作
- 等待doSomething的 循环执行完毕
- 恢复 中断状态 为 true
run 中 处理中断
public class InterruptedExceptionContinueHandler implements Runnable { private BlockingQueue<Integer> queue; public InterruptedExceptionContinueHandler(BlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { while (!Thread.currentThread().isInterrupted()) { dosomething(); } System.out.println(queue.size()); } private void dosomething() { // cancelled变量用于表明线程是否发生过中断 boolean cancelled = false; for (int i = 0; i < 10000; i++) { try { queue.put(i); } catch (InterruptedException e) { // 就算发生了InterruptedException, 循环也希望继续运行下去, 此时将cancelled设置为true, 以表明遍历过程中发生了中断 System.out.println("InterruptedException happened when i = " + i); cancelled = true; } } if (cancelled) { // 如果当前线程曾经发生过中断, 就将其中断标记设置为true, 以通知dosomething方法的上层调用栈 Thread.currentThread().interrupt(); } } public static void main(String[] args) throws InterruptedException { Thread t = new Thread(new InterruptedExceptionContinueHandler(new LinkedBlockingQueue<Integer>())); t.start(); // 启动线程2ms后设置其中断标记为true Thread.sleep(2); t.interrupt(); } }
“直接设置当前进程的中断标志为true“还是”保存中断状态“直到循环结束? 取决于业务场景是否需要”立刻结束循环“,还是”循环结束“再处理!
忽略
同时满足下面两个条件:
- 被 捕获 在 调用栈的最上层 : run 方法或者 main 方法中
后续代码 不检查 中断状态
其他任何情况下都应该对InterruptedException作处理
限时运行
private static final ScheduledExecutorService cancelExec = ...; public static void timedRun(Runnable r, long timeout, TimeUnit unit) { final Thread taskThread = Thread.currentThread(); cancelExec.schedule(new Runnable() { public void run() { taskThread.interrupt(); } }, timeout, unit); r.run(); }
timeRun方法可以在任何一个线程中调用,然而 timeRun 方法无法知道运行线程处理中断的策略,不应该贸然向对应的线程发出中断请求 1. 在 r.run 运行完成后,调用线程捕获中断异常是危险的 2. 如果调用线程忽略了中断异常,那只有r.run运行完毕timeRun才能结束,这会超出所要求的运行时间
private static final ScheduledExecutorService cancelExec = Executors.newSingleThreadScheduledExecutor(); public static void timedRun(final Runnable r, long timeout, TimeUnit unit) throws InterruptedException { class ReThrowableTask implements Runnable { //在当前线程和taskThread线程共享异常 private volatile Throwable t; public void run() { try { r.run(); } catch (Throwable t) { this.t = t; } } void rethrow() { if (t != null) throw launderThrowable(t); } } ReThrowableTask task = new ReThrowableTask(); final Thread taskThread = new Thread(task); taskThread.start(); cancelExec.schedule(new Runnable() { public void run() { taskThread.interrupt(); } }, timeout, unit); // 停止当前进程,让taskThread运行限时时间 // 如果超过限时,则让cancelExec线程池的线程对taskThread发起中断请求 taskThread.join(unit.toMillis(timeout)); //如果taskThread线程内捕获异常,重新抛出 task.rethrow(); }
- 创建 taskThread 线程,把 运行任务 包装到 taskThread
- 通过 join 方法让 taskThread 跑 限时 时间
- 超过限时时间,向 taskThread 发送中断 请求
- 在 taskThread中发现异常 ,则 重新抛出 供主线程处理
取消Future
将task提交给线程池运行, 由于不知道task会由线程池中的哪一个线程运行, 也不知道线程池中的线程会怎样处理中断, 所以无法直接调用 Thread 对象的 interrupt 方法提前终止线程的运行
ExecutorService 类的 submit 等方法会返回表示 task未决结果 的 Future 对象, 调用Future对象的 cancel 方法, 可以 取消task的运行
取消Future的方法
/** * 尝试取消task的执行 * 如果task已经完成, 或已取消, 或由于某些原因无法取消, 则尝试失败, 返回false * 如果task尚未启动, 则成功调用其Future对象的cancel方法将导致其永不启动 * mayInterruptIfRunning如果为true, 且此时task正在某个线程中运行, 那么该线程的中断标记将被设置为true * 当mayInterruptIfRunning为false时, 如果task没有启动则不再启动, 如果task已经启动, 则尝试失败 * 如果task没有处理中断, mayInterruptIfRunning应该为false * cancel方法返回后, isDone方法将始终返回true, 如果cancel返回true, 对isCancelled方法的后续调用将始终返回true */ boolean cancel(boolean mayInterruptIfRunning) /** * 如果task正常完成前被取消, 该方法返回true. */ boolean isCancelled(); /** * 如果task已经完成, 该方法返回true. 完成的情况包括正常完成, task被取消, 异常终止等 */ boolean isDone();
如果不知道线程会怎样处理中断, 就不应该调用该线程的interrupt方法, 那么调用Future的cancel方法, 并将mayInterruptIfRunning参数设置为true是否合适?
线程池中用于 执行task的线程 会将 中断的处理 委托给task , 所以这样做是合适的
当然还有个前提是task中正确处理了中断
通过取消Future实现timeRun
public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException { final ExecutorService taskExec = Executors.newCachedThreadPool(); Future<?> task = taskExec.submit(r); try { //如果线程池中的线程执行任务过程中该线程发生了中断, 那么调用task的get方法将会抛出InterruptedException异 // 对于InterruptedException, 按照之前总结的方法处理即可. 此例将其抛给上层 task.get(timeout, unit); } catch (TimeoutException e) { // 如果发生TimeoutException异常, 表明执行时间超时, 此时取消该任务即可 } catch (ExecutionException e) { // 发生其他异常时, 不仅要取消任务的执行, 也应该重抛该异常 throw launderThrowable(e.getCause()); } finally { // Harmless if task already completed task.cancel(true); // interrupt if running } }
客户化取消Future
有时候需要覆写Future.cancel方法,比如: 对于不可响应中断的阻塞方法需要在其中加入诸如关闭socket的操作等
定义 CacellableTask 接口
public interface CancellableTask<T> extends Callable<T> { void cancel(); RunnableFuture<T> newTask(); }
实现 SocketUsingTask<T> 对象 :覆盖其中的 cancel 方法来关闭 socket 连接
public class SocketUsingTask<T> implements CancellableTask<T> { @GuardedBy("this") private Socket socket; protected synchronized void setSocket(Socket s) { socket = s; } @Override public synchronized void cancel() { try { if (socket != null) { socket.close(); } } catch (IOException ignored) { } } @Override public RunnableFuture<T> newTask() { return new FutureTask<T>(this) { @Override // 定义FutureTask的匿名内部类, 并覆盖cancel方法, 向其中加入关闭socket的操作 public boolean cancel(boolean mayInterruptIfRunning) { try { SocketUsingTask.this.cancel(); } finally { return super.cancel(mayInterruptIfRunning); } } }; } @Override public T call() throws Exception { ... } }
继承 ThreadPoolExecutor 类并覆盖 newTaskFor 方法, 返回 自定义的CancelTask对象
@ThreadSafe public class CancellingExecutor extends ThreadPoolExecutor { public CancellingExecutor() { super(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } @Override protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { if (callable instanceof CancellableTask) { return ((CancellableTask<T>) callable).newTask(); } else { return super.newTaskFor(callable); } } }
测试代码
public class CancellingExecutorTest { public static void main(String[] args) throws IOException { CancellingExecutor executor = new CancellingExecutor(); SocketUsingTask<String> task = new SocketUsingTask<>(); task.setSocket(new Socket("www.baidu.com", 80)); Future<String> future = executor.submit(task); try { future.get(1000L, TimeUnit.MILLISECONDS); } catch (TimeoutException | InterruptedException ex) { } catch (ExecutionException ex) { throw launderThrowable(ex.getCause()); } finally { future.cancel(true); // interrupt if running executor.shutdown(); } } }
取消线程
如果一个线程在 创建结束 之后 依然运行 ,那就必须为这个线程提供诸如 取消 , 关闭 等生命周期方法
比如: ExecutorService提供了shutDown和shutDownNow方法关闭Service 而Service负责停止其拥有的线程
基于生产消费者模型的LoggerWriter
public class LogWriter { private final BlockingQueue<String> queue; private final LoggerThread logger; public LogWriter(Writer writer) { this.queue = new LinkedBlockingQueue<String>(CAPACITY); this.logger = new LoggerThread(writer); } public void start() { logger.start(); } /** * 需要打印数据的线程调用该方法, 将待打印数据加入阻塞队列 */ public void log(String msg) throws InterruptedException { queue.put(msg); } /** * 负责从阻塞队列中取出数据输出的线程 */ private class LoggerThread extends Thread { private final PrintWriter writer; // ... public void run() { try { while (true) writer.println(queue.take()); } catch (InterruptedException ignored) { } finally { writer.close(); } } } }
LogWriter 内部封装有 LoggerThread 线程, 所以是一个基于线程构建的Service 需要在LogWriter中提供停止LoggerThread线程的方法
在LogWriter中添加shutDown方法
/** * 该方法用于停止LoggerThread线程 */ public void shutDown() { logger.interrupt(); }
- 当 LogWriter.shutDown 方法被调用时
- LoggerThread线程的中断标记 被设置为 true
- LoggerThread线程 执行 queue.take() 方法时会抛出 InterruptedException 异常
- 使得 LoggerThread线程 结束
中断线程的问题
- 丢弃 了队列中 尚未来得及输出 的数据
- 假如 线程A 对 LogWriter.log方法的调用 因为 队列已满 而 阻塞 , 此时 停止LoggerThread线程 将导致 线程A永远阻塞在queue.put方法上
当停止LogService以后,设置状态不在接受新的任务,并处理完所有已经存在的数据
这种处理方式会导致 竞争条件 ,所以必须 手动同步
public class LogService { private final BlockingQueue<String> queue; private final LoggerThread loggerThread; /** * 表示是否关闭Service */ private boolean isShutdown; /** * 队列中待处理数据的数量 */ private int reservations; public LogService(PrintWriter writer) { this.queue = new LinkedBlockingQueue<>(100); this.loggerThread = new LoggerThread(writer); } public void start() { loggerThread.start(); } public void shutDown() { synchronized (this) { isShutdown = true; } loggerThread.interrupt(); } public void log(String msg) throws InterruptedException { synchronized (this) { // service已关闭后调用log方法直接抛出异常 if (isShutdown) { throw new IllegalStateException("Service has been shut down"); } ++reservations; } // BlockingQueue本身就是线程安全的, put方法的调用不在同步代码块中 // 我们只需要保证isShutdown和reservations是线程安全的即可 queue.put(msg); } private class LoggerThread extends Thread { private final PrintWriter writer; private LoggerThread(PrintWriter writer) { this.writer = writer; } public void run() { try { while (true) { try { synchronized (this) { // 当service已关闭且处理完队列中的所有数据时才跳出while循环 if (isShutdown && reservations == 0) { break; } } String msg = queue.take(); synchronized (this) { --reservations; } writer.println(msg); } catch (InterruptedException e) { // 发生InterruptedException异常时不应该立刻跳出while循环 // 而应该继续输出log, 直到处理完队列中的所有数据 } } } finally { writer.close(); } } } }
可以 使用 ExecutorService 简化 :
- 关闭ExecutorService后再调用其 awaitTermination 将导致 当前线程阻塞 , 直到 所有已提交的任务执行完毕 , 或者发生 超时
线程池关闭 后再 调用其execute方法 将抛出 RejectedExecutionException 异常
public class LogService { private final ExecutorService exec = Executors.newSingleThreadExecutor(); private final PrintWriter writer; public LogService(PrintWriter writer) { this.writer = writer; } public void shutdown() throws InterruptedException { final long TIMEOUT = 10L; try { // 关闭ExecutorService后再调用其awaitTermination将导致当前线程阻塞, 直到所有已提交的任务执行完毕, 或者发生超时 exec.shutdown(); exec.awaitTermination(TIMEOUT, TimeUnit.SECONDS); } finally { writer.close(); } } public void log(String msg) { try { // 线程池关闭后再调用其execute方法将抛出RejectedExecutionException异常 exec.execute(new WriteTask(msg)); } catch (RejectedExecutionException ignored) { } } private final class WriteTask implements Runnable { private final String msg; public WriteTask(String msg) { this.msg = msg; } @Override public void run() { writer.println(msg); } } }
shutdownNow的局限性
无法知道那些正在运行任务的最终结束状态
必须 手动记录 那些 被中断的任务
public abstract class TrackingExecutor extends AbstractExecutorService { private final ExecutorService exec = Executors.newCachedThreadPool(); //已经启动还没有结束被cancel的线程存放在这里 private final Set<Runnable> tasksCancelledAtShutdown = Collections.synchronizedSet(new HashSet<>()); public List<Runnable> getCancelledTasks() { if (!exec.isTerminated()) { throw new IllegalStateException("illegal task status"); } return new ArrayList<>(tasksCancelledAtShutdown); } @Override public void execute(final Runnable runnable) { exec.execute(() -> { try { runnable.run(); } finally { if (isShutdown() //执行的任务必须保证正确地把中断状态传递回来 && Thread.currentThread().isInterrupted()) { tasksCancelledAtShutdown.add(runnable); } } }); } }
极小的概率可能发生在关闭线程池的那一刻某个任务实际上已经执行完最后一条指令,但还没来得及记录任务运行结束 这种情况下就会造成将来重复执行任务
异常退出
线程异常退出 的主要原因是 RunTimeException
这种情况下往往不会通知主线程
在某些情况下需要 捕获 这些RunTimeException
public void run() { Throwable thrown = null; try { while (!isInterrupted()) runTask(getTaskFromWorkQueue()); } catch (Throwable e) { //保存所有异常 thrown = e; } finally { //通知主线程保存的异常 threadExited(this, thrown); } }
未捕获异常处理器
长时间运行的应用,对于未捕获的异常至少要在 日志记录 下来
public class UEHLogger implements Thread.UncaughtExceptionHandler { public void uncaughtException(Thread t, Throwable e) { Logger logger = Logger.getAnonymousLogger(); logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e); } }
通过线程池构造器的ThreadFactory参数可以使用自定义UncaughtExceptionHandler
关闭JVM
正常关闭
- 运行完 主线程最后一条命令
- 调用 System.exit
- 发送 SIGINT 信号(CTRL+C)
jvm不会主动关闭或者中断任何其他线程
正常关闭的时候:
- 开始运行 注册 在 Runtime.addShutdownHook 的 钩子线程
- 所有的钩子线程运行完毕,如果 runFinalizersOnExit 被设置 的话,jvm开始运行 finializer
- 停止jvm
关闭钩子
- 钩子线程必须是 同步
- 钩子线程 不能死锁 ,不然jvm无法关闭
- 所有的钩子线程会 同时运行
public void start() { Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { try { LogService.this.stop(); } catch (InterruptedException ignored) {} } }); }
尽量使用一个钩子关闭所有的服务!
守护线程
守护线程 在被关闭的时候不会运行 finializer , 回收函数栈
当jvm停止的那一刻,守护线程只是被抛弃 慎用守护线程!
强行关闭
- 调用 RunTime.halt
发送 SIGKILL 信号(kill -9)