任务执行
Table of Contents
线程中执行任务
- 串行执行:
- 吞吐量低
- 响应慢 :IO操作中浪费cpu
- 每个请求单独线程:
- 线程的 创建和销毁 会 占用一定的资源 :如果请求频繁而对请求的处理是轻量级的(大多的web请求符合该情形),创建一个线程处理请求后将其销毁的方式是不划算的
- 过多的线程导致 线程切换频繁 :用于 处理请求的CPU时间 反而会 减少 . 如果当前的线程数已经让CPU处于忙碌状态, 那么增加更多的线程不会改善应用的性能
- 过多的线程会导致 系统稳定性下降
Executor框架
将 创建好的线程 组织成 线程池 :
- 当 请求来临 时从池中 取出线程 处理请求
- 处理完毕 后将线程 归还 给 线程池 , 而不是销毁
通过限制线程池中的线程数, 以克服 线程过多时“性能和稳定性”下降的缺陷
Executor 框架包含多个线程池的实现, 所有线程池都派生自 Executor 接口,这个接口只定义了一个方法: execute(Runnable task)
public interface Executor { void execute(Runnable command); }
Executor接口解耦了 任务提交 和 任务执行 , 提交任务的线程为 生产者 , 执行任务的线程为 消费者
class TaskExecutionWebServer { private static final int NTHREADS = 100; // 创建线程池 private static final Executor exec = Executors.newFixedThreadPool(NTHREADS); public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while (true) { final Socket connection = socket.accept(); Runnable task = new Runnable() { public void run() { handleRequest(connection); } }; // 将任务提交给线程池执行 exec.execute(task); } } }
使用Executor实现 每个请求 一个线程
public class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); }; }
线程执行策略
- 任务在 哪个线程 中执行
- 任务以 何种顺序 ( FIFO , LIFO , 优先级 )执行
- 同时有多少 线程同步执行任务
- 多少任务可以 等待 执行
- 当 负荷过载 时候,那些任务可以 被牺牲 ,如何 通知 应用这些任务
- 任务 执行前 和 执行后 需要哪些操作
线程池
ExecutorService 接口继承自 Executor :
- newFixedThreadPool : 最大线程数固定 的线程池
- newCachedThreadPool : 可伸缩式线程池
- 当线程池中 线程的数量 超过 程序所需 时, 会 自动销毁 多余的线程
- 当线程池中的线程 少于需要 时再 创建 新的线程执行提交的任务
- 该线程池 没有最大线程数的限定
- newSingleThreadExecutor : 仅包含 一个线程的线程池 , 提交给该线程池执行的任务, 都将在这一单个线程中完成处理
- newScheduledThreadPool : 最大线程数固定 且 支持 延迟和周期性重复执行 任务的线程池
而预定义的线程池类大多实现了ExecutorService接口
Executor生命周期
运行 , 关闭中 , 终止
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // ... additional convenience methods for task submission }
- shutdown : 在 关闭前 允许 执行已经提交 的任务, 包括那些 已提交但尚未开始执行 的任务
- shutdownNow : 阻止 尚未开始执行 的任务启动并 试图停止 当前正在执行 的任务, 返回 从未开始执行的任务的列表
- isShutdown :判断 线程池是否已关闭
- 线程池关闭后将 拒绝接受新任务 , 抛出 RejectedExecutionException
- awaitTermination : 将使得 主线程阻塞 , 直到 线程池 转变为 终止状态 ,
- 通常在调用 shutdown 方法后 紧接着调用 awaitTermination 方法
- isTerminated : 检测 线程池是否处于终止状态
- 当线程池 已关闭 , 并且 所有提交给线程池的任务 都已完成 时, 线程池转变为 终止状态
class LifecycleWebServer { private final ExecutorService exec = ...; public void start() throws IOException { ServerSocket socket = new ServerSocket(80); while (!exec.isShutdown()) { try { final Socket conn = socket.accept(); exec.execute(new Runnable() { public void run() { handleRequest(conn); } }); } catch (RejectedExecutionException e) { if (!exec.isShutdown()) log("task submission rejected", e); } } } public void stop() { exec.shutdown(); } void handleRequest(Socket connection) { Request req = readRequest(connection); if (isShutdownRequest(req)) stop(); else dispatchRequest(req); } }
ScheduledThreadPool
Timer和ScheduledThreadPool两者都可以用于延时或周期性重复执行某个任务 但是Timer存在一些缺陷: 1. Timer基于绝对时间来安排任务的调度, 因此系统时钟的改变会对其产生影响 ScheduledThreadPoolExecutor基于相对时间进行任务的调度 2. Timer创建单一的线程执行定时任务 假如Timer对象以10ms的间隔重复执行某个任务, 但是其中的一次执行花去了40ms, 这就意味着少执行了至少4次重复任务 ScheduledThreadPoolExecutor可以使用多个线程执行定时任务 3. 如果在执行任务的过程中抛出运行时异常, Timer的线程会被终止且没有恢复机制
几乎没有理由继续使用Timer调度定时任务了
返回结果的任务
Executor的使用 Runnable 作为基本的任务表达形式 但是Runnable无法返回结果或抛出异常
Callable 作为Runnable的扩展,可以 返回结果 或 抛出异常
public interface Callable<V> { V call() throws Exception; }
任务生命周期
Executor 提交的任务 有4个生命周期: 创建 , 提交 , 开始 , 完成
- 已经提交 但 尚未开始 的任务 可以取消
- 已经开始执行 但 尚未完成 的任务只有它们 能响应中断 才能取消
- 取消 一个已经完成 的任务 没有任何影响
Future对象
Future 表示一个 任务的生命周期 ,并提供相应的方法
- 判断任务 是否已经 开始 , 完成 , 取消
- 获取任务结果
取消 任务等
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException, CancellationException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException; }
获得任务结果
get 方法:行为取决与 任务的状态
- 已经完成 : 立即返回结果 或者 抛出异常ExecutionException , 可以通过 getClause 获得被封装的初始异常
- 没有完成 : 阻塞主线程 直到任务结束
- 取消 :将抛出 CancellationException
创建 Future
- ExecutorService 中所有 submit 方法都会 返回一个Future
- 将 Runnable 或 Callable 提交 给 Executor 得到Future
- 显式 地为 Runnable 或者 Callable 创建 一个 FutureTask 提交 给 Executor
ExecutorService 也可以使用 newTaskFor 方法为 Callable 获得Future
protected <T> RunnableFuture<T> newTaskFor(Callable<T> task) { return new FutureTask<T>(task); }
实例
使用Future渲染图片
public class FutureRenderer implements Render { private static final int NTHREADS = 100; private final ExecutorService executor = Executors.newFixedThreadPool(NTHREADS); @Override public void renderPage(CharSequence source) { final List<ImageInfo> imageInfos = scanForImageInfo(source); Callable<List<ImageData>> task; task = () -> { List<ImageData> result = new ArrayList<>(); imageInfos.forEach((imageInfo) -> { result.add(imageInfo.downloadImage()); }); return result; }; Future<List<ImageData>> future = executor.submit(task); // 渲染文本 renderText(source); try { // get方法将阻塞, 直到task完成下载 List<ImageData> imageData = future.get(); imageData.forEach((data) -> { // 渲染图片 renderImage(data); }); } catch (InterruptedException e) { // Re-assert the thread’s interrupted status Thread.currentThread().interrupt(); // We don’t need the result, so cancel the task too future.cancel(true); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } } }
下载图片任务 比 渲染文本任务慢的多,可以把下载图片拆分成多个任务
CompletionService
CompletionService : Executor 和 BlockingQueue 融合在一起
- 将 Callable 任务 提交 给 CompletionService 执行
- 使用类似 队列 操作的 take 和 poll 方法来获得已知的结果
- 这些结果会在 全部结束 时候 封装 为 Future
ExecutorCompletionService实现了CompletionService ExecutorCompletionService的构造函数中创建一个 BlockingQueue 来保存计算完成的结果 当计算完成时候,调用FutureTask的done方法 当提交某个任务的时候,该任务将包装成为QueueingFuture, 这是FutureTask的一个子类 然后改写子类的done方法,将结果放入BlockingQueue中 take和poll方法委托给BlockingQueue, 这些方法在得出结果前会“阻塞”
public class ExecutorCompletionService<V> implements CompletionService<V> { ... private final BlockingQueue<Future<V>> completionQueue; public ExecutorCompletionService(Executor executor) { ... this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); // 将任务包装成 QueueingFuture 对象后委托给executor执行 executor.execute(new QueueingFuture(f)); return f; } private class QueueingFuture<V> extends FutureTask<V> { QueueingFuture(Callable<V> c) { super(c); } QueueingFuture(Runnable t, V r) { super(t, r); } protected void done() { completionQueue.add(this); } } public Future<V> take() throws InterruptedException { return completionQueue.take(); } public Future<V> poll() { return completionQueue.poll(); } ... }
使用 CompletionService 渲染
public class CompletionRenderer implements Render { private final ExecutorService executor = Executors.newCachedThreadPool(); public void renderPage(CharSequence source) { List<ImageInfo> info = scanForImageInfo(source); // 将图片下载拆分为多个任务 CompletionService<ImageData> completionService = new ExecutorCompletionService<>(executor); info.forEach((imageInfo) -> { completionService.submit(() -> imageInfo.downloadImage()); }); renderText(source); try { for (int t = 0, n = info.size(); t < n; t++) { // take方法可能阻塞: 当已完成队列中为空时 Future<ImageData> f = completionService.take(); // get方法不会阻塞, 因为从take方法返回的Future对象肯定是已完成的 ImageData imageData = f.get(); renderImage(imageData); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } } }