UP | HOME

任务执行

Table of Contents

线程中执行任务

  • 串行执行:
    • 吞吐量低
    • 响应慢 :IO操作中浪费cpu
  • 每个请求单独线程:
    • 线程的 创建和销毁占用一定的资源 :如果请求频繁而对请求的处理是轻量级的(大多的web请求符合该情形),创建一个线程处理请求后将其销毁的方式是不划算的
    • 过多的线程导致 线程切换频繁 :用于 处理请求的CPU时间 反而会 减少 . 如果当前的线程数已经让CPU处于忙碌状态, 那么增加更多的线程不会改善应用的性能
    • 过多的线程会导致 系统稳定性下降

Executor框架

创建好的线程 组织成 线程池

  • 请求来临 时从池中 取出线程 处理请求
  • 处理完毕 后将线程 归还线程池 , 而不是销毁
    通过限制线程池中的线程数, 以克服 线程过多时“性能和稳定性”下降的缺陷 

Executor 框架包含多个线程池的实现, 所有线程池都派生自 Executor 接口,这个接口只定义了一个方法: execute(Runnable task)

public interface Executor {
    void execute(Runnable command);
}

Executor接口解耦了 任务提交任务执行 , 提交任务的线程为 生产者 , 执行任务的线程为 消费者

class TaskExecutionWebServer {   
    private static final int NTHREADS = 100;   
    // 创建线程池  
    private static final Executor exec   
    = Executors.newFixedThreadPool(NTHREADS);   

    public static void main(String[] args) throws IOException {   
        ServerSocket socket = new ServerSocket(80);   
        while (true) {   
            final Socket connection = socket.accept();   
            Runnable task = new Runnable() {   
                    public void run() {   
                        handleRequest(connection);   
                    }   
                };   
            // 将任务提交给线程池执行  
            exec.execute(task);   
        }   
    }   
}  

使用Executor实现 每个请求 一个线程

public class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();
    };
}

线程执行策略

  • 任务在 哪个线程 中执行
  • 任务以 何种顺序FIFO , LIFO , 优先级 )执行
  • 同时有多少 线程同步执行任务
  • 多少任务可以 等待 执行
  • 负荷过载 时候,那些任务可以 被牺牲 ,如何 通知 应用这些任务
  • 任务 执行前执行后 需要哪些操作

线程池

ExecutorService 接口继承自 Executor

  • newFixedThreadPool : 最大线程数固定 的线程池
  • newCachedThreadPool : 可伸缩式线程池
    • 当线程池中 线程的数量 超过 程序所需 时, 会 自动销毁 多余的线程
    • 当线程池中的线程 少于需要 时再 创建 新的线程执行提交的任务
    • 该线程池 没有最大线程数的限定
  • newSingleThreadExecutor : 仅包含 一个线程的线程池 , 提交给该线程池执行的任务, 都将在这一单个线程中完成处理
  • newScheduledThreadPool最大线程数固定 且 支持 延迟和周期性重复执行 任务的线程池
     而预定义的线程池类大多实现了ExecutorService接口 

Executor生命周期

运行关闭中终止

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
// ... additional convenience methods for task submission
}
  • shutdown : 在 关闭前 允许 执行已经提交 的任务, 包括那些 已提交但尚未开始执行 的任务
  • shutdownNow : 阻止 尚未开始执行 的任务启动并 试图停止 当前正在执行 的任务, 返回 从未开始执行的任务的列表
  • isShutdown :判断 线程池是否已关闭
    • 线程池关闭后将 拒绝接受新任务 , 抛出 RejectedExecutionException
  • awaitTermination : 将使得 主线程阻塞 , 直到 线程池 转变为 终止状态 ,
    • 通常在调用 shutdown 方法后 紧接着调用 awaitTermination 方法
  • isTerminated : 检测 线程池是否处于终止状态
    • 当线程池 已关闭 , 并且 所有提交给线程池的任务 都已完成 时, 线程池转变为 终止状态
class LifecycleWebServer {
    private final ExecutorService exec = ...;
    public void start() throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (!exec.isShutdown()) {
            try {
                final Socket conn = socket.accept();
                exec.execute(new Runnable() {
                        public void run() { handleRequest(conn); }
                    });
            } catch (RejectedExecutionException e) {
                if (!exec.isShutdown())
                    log("task submission rejected", e);
            }
        }
    }
    public void stop() { exec.shutdown(); }
    void handleRequest(Socket connection) {
        Request req = readRequest(connection);
        if (isShutdownRequest(req))
            stop();
        else
            dispatchRequest(req);
    }
}

ScheduledThreadPool

Timer和ScheduledThreadPool两者都可以用于延时或周期性重复执行某个任务

但是Timer存在一些缺陷:
1. Timer基于绝对时间来安排任务的调度, 因此系统时钟的改变会对其产生影响
   ScheduledThreadPoolExecutor基于相对时间进行任务的调度

2. Timer创建单一的线程执行定时任务
   假如Timer对象以10ms的间隔重复执行某个任务, 但是其中的一次执行花去了40ms, 这就意味着少执行了至少4次重复任务
   ScheduledThreadPoolExecutor可以使用多个线程执行定时任务

3. 如果在执行任务的过程中抛出运行时异常,  Timer的线程会被终止且没有恢复机制

几乎没有理由继续使用Timer调度定时任务了

返回结果的任务 

Executor的使用 Runnable 作为基本的任务表达形式

但是Runnable无法返回结果或抛出异常

Callable 作为Runnable的扩展,可以 返回结果抛出异常

public interface Callable<V> {
    V call() throws Exception;
}

任务生命周期

Executor 提交的任务 有4个生命周期: 创建提交开始完成

  • 已经提交尚未开始 的任务 可以取消
  • 已经开始执行尚未完成 的任务只有它们 能响应中断 才能取消
  • 取消 一个已经完成 的任务 没有任何影响

Future对象

Future 表示一个 任务的生命周期 ,并提供相应的方法

  • 判断任务 是否已经 开始完成取消
  • 获取任务结果
  • 取消 任务等

    public interface Future<V> {
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        V get() throws InterruptedException, ExecutionException,
            CancellationException;
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException,
            CancellationException, TimeoutException;
    }
    
获得任务结果

get 方法:行为取决与 任务的状态

  • 已经完成立即返回结果 或者 抛出异常ExecutionException , 可以通过 getClause 获得被封装的初始异常
  • 没有完成阻塞主线程 直到任务结束
  • 取消 :将抛出 CancellationException
创建 Future
  1. ExecutorService 中所有 submit 方法都会 返回一个Future
  2. RunnableCallable 提交Executor 得到Future
  3. 显式 地为 Runnable 或者 Callable 创建 一个 FutureTask 提交Executor
  4. ExecutorService 也可以使用 newTaskFor 方法为 Callable 获得Future

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> task) {
        return new FutureTask<T>(task);
    }
    

实例

使用Future渲染图片

public class FutureRenderer implements Render {
    private static final int NTHREADS = 100;
    private final ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);

    @Override
    public void renderPage(CharSequence source) {
        final List<ImageInfo> imageInfos = scanForImageInfo(source);
        Callable<List<ImageData>> task;
        task = () -> {
            List<ImageData> result
            = new ArrayList<>();
            imageInfos.forEach((imageInfo) -> {
                    result.add(imageInfo.downloadImage());
                });
            return result;
        };
        Future<List<ImageData>> future = executor.submit(task);
        // 渲染文本  
        renderText(source);
        try {
            // get方法将阻塞, 直到task完成下载  
            List<ImageData> imageData = future.get();
            imageData.forEach((data) -> {
                    // 渲染图片  
                    renderImage(data);
                });
        } catch (InterruptedException e) {
            // Re-assert the thread’s interrupted status
            Thread.currentThread().interrupt();
            // We don’t need the result, so cancel the task too
            future.cancel(true);
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }
}
     下载图片任务 比 渲染文本任务慢的多,可以把下载图片拆分成多个任务

CompletionService

CompletionServiceExecutorBlockingQueue 融合在一起

  • Callable 任务 提交CompletionService 执行
  • 使用类似 队列 操作的 takepoll 方法来获得已知的结果
  • 这些结果会在 全部结束 时候 封装Future
ExecutorCompletionService实现了CompletionService

ExecutorCompletionService的构造函数中创建一个 BlockingQueue 来保存计算完成的结果

当计算完成时候,调用FutureTask的done方法

当提交某个任务的时候,该任务将包装成为QueueingFuture, 这是FutureTask的一个子类
然后改写子类的done方法,将结果放入BlockingQueue中

take和poll方法委托给BlockingQueue, 这些方法在得出结果前会“阻塞”
  public class ExecutorCompletionService<V> implements CompletionService<V> {
...
      private final BlockingQueue<Future<V>> completionQueue;

      public ExecutorCompletionService(Executor executor) {
...
          this.completionQueue = new LinkedBlockingQueue<Future<V>>();
      }

      public Future<V> submit(Callable<V> task) {  
          if (task == null) throw new NullPointerException();  
          RunnableFuture<V> f = newTaskFor(task);  
          // 将任务包装成 QueueingFuture 对象后委托给executor执行  
          executor.execute(new QueueingFuture(f));  
          return f;  
      }

      private class QueueingFuture<V> extends FutureTask<V> {
          QueueingFuture(Callable<V> c) { super(c); }
          QueueingFuture(Runnable t, V r) { super(t, r); }
          protected void done() {
              completionQueue.add(this);
          }
      }

      public Future<V> take() throws InterruptedException {  
          return completionQueue.take();  
      }  

      public Future<V> poll() {  
          return completionQueue.poll();  
      }
      ...
  }

使用 CompletionService 渲染

public class CompletionRenderer implements Render {
    private final ExecutorService executor = Executors.newCachedThreadPool();

    public void renderPage(CharSequence source) {
        List<ImageInfo> info = scanForImageInfo(source);
        // 将图片下载拆分为多个任务  
        CompletionService<ImageData> completionService
            = new ExecutorCompletionService<>(executor);
        info.forEach((imageInfo) -> {
                completionService.submit(() -> imageInfo.downloadImage());
            });
        renderText(source);
        try {
            for (int t = 0, n = info.size(); t < n; t++) {
                // take方法可能阻塞: 当已完成队列中为空时  
                Future<ImageData> f = completionService.take();
                // get方法不会阻塞, 因为从take方法返回的Future对象肯定是已完成的  
                ImageData imageData = f.get();
                renderImage(imageData);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }
}

Next:任务取消

Previous:基础模块

Up:目录