线程池
Table of Contents
任务和执行策略的隐形耦合
某些 任务的特质 会 要求或者排除 某些特定的 执行策略
- 有 相互依赖 的任务在 同一个线程池 会要求 线程池无限大
- 依赖于 线程封闭 的任务 只能用 单线程执行 的线程池
- 快速响应 的任务 不适合 单线程执行 的线程池
- 使用 ThreadLocal 的任务 不适合 线程池
线程死锁
相互依赖 的任务在 一个线程池 中执行,就可能出现 饥饿死锁
比如在一个单线程执行的线程池中 线程B需要等待线程A执行完毕才能进入线程池执行 而线程A又要等待线程B执行完毕才能完毕。这样就造成了死锁 当线程池容量更大一点的情况下,依然可能所有存在于线程池中的线程都在等待“未能进入线程池”的线程执行完毕
public class ThreadDeadlock { ExecutorService exec = Executors.newSingleThreadExecutor(); private class RenderPageTask implements Callable<String> { public String call() throws Exception { Future<String> header, footer; header = exec.submit(new LoadFileTask("header.html")); footer = exec.submit(new LoadFileTask("footer.html")); String page = renderBody(); // Will deadlock -- task waiting for result of subtask return header.get() + page + footer.get(); } } }
长时间运行任务
线程池包含 许多运行长时间 的任务会导致 所有任务 哪怕本来响应很快 的响应变慢
使用timeout机制,标记那些超时的任务为失败,在未来串行执行
线程池大小
- 对于 计算密集型 task, 合适的size大约为 CPU数量 + 1
- 对于 I/O占较大比例 的task, 合适的size大约为: size = CPU数量 * CPU利用率 * (1 + I/O时间比例)
实际size还受到内存, 文件句柄, socket, 数据库连接数 等稀缺资源的约束 将总的稀缺资源除以每一个task使用的资源数, 能得到线程数的上限
线程池配置
线程的创建和销毁
ThreadPoolExecutor 构造函数中与线程的 创建 和 销毁 有关的参数:
- corePoolSize :线程池中持有的 核心线程数
- 除非task队列已满 , ThreadPoolExecutor 不会创建 超过核心线程数 的线程
- corePoolSize为0 :一种特殊情况, 此时就算task队列没有饱和, 向线程池第一次提交task时仍然会创建新的线程
- 核心线程 一旦 创建 就 不会销毁 ,除非:
- 设置了 allowCoreThreadTimeOut(true)
- 关闭 线程池
- 除非task队列已满 , ThreadPoolExecutor 不会创建 超过核心线程数 的线程
- maximumPoolSize :线程池中持有的 最大线程数
keepAliveTime :对于 超过核心线程数 的线程, 如果在 指定的超时时间内 没有使用 到, 就会被 销毁
public static ExecutorService newCachedThreadPool() { // 核心线程数为0, 最大线程数为Integer.MAX_VALUE, 超时时间为60s return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads) { // 核心线程数和最大线程数都为调用方指定的值nThreads, 超时时间为0 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { // 核心线程数由调用方指定, 最大线程数为Integer.MAX_VALUE, 超时时间为0 return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); }
任务队列
线程池内部持有一个 任务队列 , 当 任务的提交速度 超过 任务的执行速度 时, 任务将被 缓存 在 任务队列 中等待有线程可用时再执行。ThreadPoolExecutor在创建时可以为其指定任务队列。一般有三种选择:
- 有界 队列
- 无界 队列:newFixedThreadPool, newScheduledThreadPool
- 同步 队列:newCashedThreadPool
线程数不多 的线程池:
- 指定一个 容量大的队列 (或者无界队列):
- 有助于 减少线程间切换 , CPU等方面的消耗
- 可能会造成吞吐量下降
- 如果使用的是 有界队列 , 队列 可能会被填满 :将根据 指定的饱和策略 进行处理
同步队列其实不能算是一种队列,因为同步队列没有缓存的作用 使用同步队列时, task被提交时, 直接由线程池中的线程接手 如果此时线程池中没有可用的线程, 线程池将创建新的线程接手 如果线程池无法创建新的线程(比如线程数已到达maximumPoolSize), 则根据指定的饱和策略进行处理
线程数很大 的线程池, 可以使用 同步队列
饱和策略
- 有界队列 : 队列满后继续提交 task时, 饱和策略会被触发.
- 同步队列 : 线程池 无法创建新的线程 接手task时, 饱和策略会被触发.
- 线程池被关闭 后,:向其提交task时, 饱和策略也会被触发.
ThreadPoolExecutor.setRejectedExecutionHandler 方法用于 设定饱和策略
预定义 RejectedExecutionHandler 的实现类
AbortPolicy : 默认的 饱和策略, 抛出 RejectedExecutionException 异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException(); }
CallerRunsPolicy : 在 提交task的线程中执行 task, 而不是由线程池中的线程执行
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
DiscardPolicy : 将最新提交的任务 丢弃
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 丢弃, 不做任何处理 }
DiscardOldestPolicy : 将 队列头部的任务丢弃 ,然后尝试重新提交新的任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
CallerRunsPolicy
class LifecycleWebServer { // MAX_THREAD_COUNT和MAX_QUEUE_COUNT的值根据系统的实际情况确定 private static final int MAX_THREAD_COUNT = 100; private static final int MAX_QUEUE_COUNT = 1000; // 使用有界队列作为task队列, 当有界队列满时, 将触发饱和策略 private final ThreadPoolExecutor exec = new ThreadPoolExecutor(0, MAX_THREAD_COUNT, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(MAX_QUEUE_COUNT)); public void start() throws IOException { // 设置饱和策略为CallerRunsPolicy exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); ServerSocket socket = new ServerSocket(80); while (!exec.isShutdown()) { try { final Socket conn = socket.accept(); exec.execute(new Runnable() { public void run() { handleRequest(conn); } }); } catch (RejectedExecutionException e) { if (!exec.isShutdown()) log("task submission rejected", e); } } } public void stop() { exec.shutdown(); } void handleRequest(Socket connection) { Request req = readRequest(connection); if (isShutdownRequest(req)) stop(); else dispatchRequest(req); } public static void main(String[] args) { LifecycleWebServer server = new LifecycleWebServer(); try { // 在main线程中启动server server.start(); } catch (IOException e) { e.printStackTrace(); } } }
- 如果线程池饱和时主线程仍然向线程池提交任务, 那么任务将在主线程中执行
- 主线程执行任务是需要一定时间的,这样就给了线程池喘息的机会
- 主线程在执行任务的时间内 无法接受socket连接 , 因此socket连接请求将 缓存在tcp层
- 如果 server过载持续的时间较长 , 使得 tcp层的缓存不够 , 那么 tcp缓存 将根据其策略 丢弃部分请求
整个系统的过载压力逐步向外扩散: 线程池 - 线程池中的队列 - main线程 - tcp层 - client 不会因为过多的请求而导致系统资源耗尽, 也不会一发生过载时就拒绝服务, 只有发生长时间系统过载时才会出现客户端无法连接的情况!!!
BoundExecutor
没有预定的饱和策略 来限定线程池中执行任务线程的数量,可以结合 Semaphore 来实现
public class BoundedExecutor { private final Executor exec; private final Semaphore semaphore; public BoundedExecutor(Executor exec, int bound) { this.exec = exec; // 设定信号量permit的上限 this.semaphore = new Semaphore(bound); } public void submitTask(final Runnable command) throws InterruptedException { // 提交task前先申请permit, 如果无法申请到permit, 调用submitTask的线程将被阻塞, 直到有permit可用 semaphore.acquire(); try { exec.execute(new Runnable() { public void run() { try { command.run(); } finally { // 提交成功了, 运行task后释放permit semaphore.release(); } } }); } catch (RejectedExecutionException e) { // 如果没有提交成功, 也需要释放permit semaphore.release(); } } }
ThreadFactory
创建ThreadPoolExecutor时还可以为其 指定 ThreadFactory :当线程池需要 创建新的线程 时会调用 ThreadFactory 的 newThread 方法
- 默认的 ThreadFactory创建的线程是 nonDaemon :
- 线程优先级: 普通 的线程
- 指定了 可识别的线程名称
public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }
客户化的MyAppThread
public class MyAppThread extends Thread { public static final String DEFAULT_NAME = "MyAppThread"; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); private static volatile boolean debugLifecycle = false; public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); } public MyAppThread(Runnable runnable, String name) { // 为自定义的Thread类指定线程名称 super(runnable, name + "-" + created.incrementAndGet()); // 设置UncaughtExceptionHandler. UncaughtExceptionHandler的uncaughtException方法将在线程运行中抛出未捕获异常时由系统调用 setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e); } }); } public static int getThreadsCreated() { return created.get(); } public static int getThreadsAlive() { return alive.get(); } public static boolean getDebug() { return debugLifecycle; } public static void setDebug(boolean b) { debugLifecycle = b; } @Override public void run() { // Copy debug flag to ensure consistent value throughout. boolean debug = debugLifecycle; if (debug) log.log(Level.FINE, "Created " + getName()); try { alive.incrementAndGet(); super.run(); } finally { alive.decrementAndGet(); if (debug) log.log(Level.FINE, "Exiting " + getName()); } } }
客户化ThreadFactory返回MyAppThread
public class MyThreadFactory implements ThreadFactory { private final String poolName; public MyThreadFactory(String poolName) { this.poolName = poolName; } public Thread newThread(Runnable runnable) { return new MyAppThread(runnable, poolName); } }
扩展线程池
ThreadPoolExecutor 类提供了多个 钩子 方法,以 供其子类实现
- beforeExecute : 任务执行前
- afterExecute : 任务执行后
- terminated : 线程池被关闭后 (释放线程池申请的资源)
private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { if (runState < STOP && Thread.interrupted() && runState >= STOP) thread.interrupt(); boolean ran = false; beforeExecute(thread, task); try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } }
扩展线程池,在日志中 记录每个任务执行时间
public class TimingThreadPool extends ThreadPoolExecutor { private final ThreadLocal<Long> startTime = new ThreadLocal<Long>(); private final Logger log = Logger.getLogger("TimingThreadPool"); private final AtomicLong numTasks = new AtomicLong(); private final AtomicLong totalTime = new AtomicLong(); protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); log.fine(String.format("Thread %s: start %s", t, r)); startTime.set(System.nanoTime()); } protected void afterExecute(Runnable r, Throwable t) { try { long endTime = System.nanoTime(); long taskTime = endTime - startTime.get(); numTasks.incrementAndGet(); totalTime.addAndGet(taskTime); log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime)); } finally { super.afterExecute(r, t); } } protected void terminated() { try { log.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTasks.get())); } finally { super.terminated(); } } }
并行执行递归算法
如果 循环体所进行的操作 是 相互独立 的, 这样的循环可以并发的运行
void processSequentially(List<Element> elements) { for (Element e : elements) process(e); } // 将相互独立的循环操作转变为并发操作 void processInParallel(Executor exec, List<Element> elements) { for (final Element e : elements) exec.execute(new Runnable() { public void run() { process(e); } }); exec.shutdown(); exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); }
有循环的递归操作 也可以并发进行计算
public<T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) { for (Node<T> n : nodes) { results.add(n.compute()); sequentialRecursive(n.getChildren(), results); } } public<T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) { for (final Node<T> n : nodes) { //并发进行计算 exec.execute(new Runnable() { public void run() { results.add(n.compute()); } }); //遍历依旧是递归 parallelRecursive(exec, n.getChildren(), results); } } //获取最终结果 public<T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); Queue<T> resultQueue = new ConcurrentLinkedQueue<T>(); parallelRecursive(exec, nodes, resultQueue); exec.shutdown(); exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); return resultQueue; }
Java7 之后可以使用 ForkJoinPool Java 8 之后可以使用 parallel stream