UP | HOME

线程池

Table of Contents

任务和执行策略的隐形耦合

某些 任务的特质要求或者排除 某些特定的 执行策略

  • 相互依赖 的任务在 同一个线程池 会要求 线程池无限大
  • 依赖于 线程封闭 的任务 只能用 单线程执行 的线程池
  • 快速响应 的任务 不适合 单线程执行 的线程池
  • 使用 ThreadLocal 的任务 不适合 线程池

线程死锁

相互依赖 的任务在 一个线程池 中执行,就可能出现 饥饿死锁

     比如在一个单线程执行的线程池中
     线程B需要等待线程A执行完毕才能进入线程池执行
     而线程A又要等待线程B执行完毕才能完毕。这样就造成了死锁

     当线程池容量更大一点的情况下,依然可能所有存在于线程池中的线程都在等待“未能进入线程池”的线程执行完毕
public class ThreadDeadlock {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    private class RenderPageTask implements Callable<String> {
        public String call() throws Exception {
            Future<String> header, footer;
            header = exec.submit(new LoadFileTask("header.html")); 
            footer = exec.submit(new LoadFileTask("footer.html"));
            String page = renderBody();
            // Will deadlock -- task waiting for result of subtask
            return header.get() + page + footer.get();
        }
    }
}

长时间运行任务

线程池包含 许多运行长时间 的任务会导致 所有任务 哪怕本来响应很快 的响应变慢

     使用timeout机制,标记那些超时的任务为失败,在未来串行执行

线程池大小

  • 对于 计算密集型 task, 合适的size大约为 CPU数量 + 1
  • 对于 I/O占较大比例 的task, 合适的size大约为: size = CPU数量 * CPU利用率 * (1 + I/O时间比例)
    实际size还受到内存,  文件句柄,  socket, 数据库连接数 等稀缺资源的约束

    将总的稀缺资源除以每一个task使用的资源数,  能得到线程数的上限

线程池配置

线程的创建和销毁

ThreadPoolExecutor 构造函数中与线程的 创建销毁 有关的参数:

  • corePoolSize :线程池中持有的 核心线程数
    • 除非task队列已满 , ThreadPoolExecutor 不会创建 超过核心线程数 的线程
      • corePoolSize为0 :一种特殊情况, 此时就算task队列没有饱和, 向线程池第一次提交task时仍然会创建新的线程
    • 核心线程 一旦 创建不会销毁 ,除非:
      • 设置了 allowCoreThreadTimeOut(true)
      • 关闭 线程池
  • maximumPoolSize :线程池中持有的 最大线程数
  • keepAliveTime :对于 超过核心线程数 的线程, 如果在 指定的超时时间内 没有使用 到, 就会被 销毁

    public static ExecutorService newCachedThreadPool() {  
        // 核心线程数为0, 最大线程数为Integer.MAX_VALUE, 超时时间为60s  
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());  
    }  
    
    public static ExecutorService newFixedThreadPool(int nThreads) {  
        // 核心线程数和最大线程数都为调用方指定的值nThreads, 超时时间为0  
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,  
                          new LinkedBlockingQueue<Runnable>());  
    }  
    
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  
        // 核心线程数由调用方指定, 最大线程数为Integer.MAX_VALUE, 超时时间为0  
        return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue());  
    }   
    

任务队列

线程池内部持有一个 任务队列 , 当 任务的提交速度 超过 任务的执行速度 时, 任务将被 缓存任务队列 中等待有线程可用时再执行。ThreadPoolExecutor在创建时可以为其指定任务队列。一般有三种选择:

  1. 有界 队列
  2. 无界 队列:newFixedThreadPool, newScheduledThreadPool
  3. 同步 队列:newCashedThreadPool

线程数不多 的线程池:

  • 指定一个 容量大的队列 (或者无界队列):
    • 有助于 减少线程间切换 , CPU等方面的消耗
    • 可能会造成吞吐量下降
  • 如果使用的是 有界队列 , 队列 可能会被填满 :将根据 指定的饱和策略 进行处理
     同步队列其实不能算是一种队列,因为同步队列没有缓存的作用

     使用同步队列时, task被提交时, 直接由线程池中的线程接手
     如果此时线程池中没有可用的线程, 线程池将创建新的线程接手
     如果线程池无法创建新的线程(比如线程数已到达maximumPoolSize), 则根据指定的饱和策略进行处理 

线程数很大 的线程池, 可以使用 同步队列

饱和策略

  • 有界队列队列满后继续提交 task时, 饱和策略会被触发.
  • 同步队列 : 线程池 无法创建新的线程 接手task时, 饱和策略会被触发.
  • 线程池被关闭 后,:向其提交task时, 饱和策略也会被触发.

ThreadPoolExecutor.setRejectedExecutionHandler 方法用于 设定饱和策略

预定义 RejectedExecutionHandler 的实现类

  1. AbortPolicy : 默认的 饱和策略, 抛出 RejectedExecutionException 异常

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
        throw new RejectedExecutionException();  
    }   
    
  2. CallerRunsPolicy : 在 提交task的线程中执行 task, 而不是由线程池中的线程执行

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
        if (!e.isShutdown()) {  
            r.run();  
        }  
    }  
    
  3. DiscardPolicy : 将最新提交的任务 丢弃

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
        // 丢弃, 不做任何处理  
    }   
    
  4. DiscardOldestPolicy : 将 队列头部的任务丢弃 ,然后尝试重新提交新的任务

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
        if (!e.isShutdown()) {  
            e.getQueue().poll();  
            e.execute(r);  
        }  
    }   
    

CallerRunsPolicy

class LifecycleWebServer {  
    // MAX_THREAD_COUNT和MAX_QUEUE_COUNT的值根据系统的实际情况确定  
    private static final int MAX_THREAD_COUNT = 100;  
    private static final int MAX_QUEUE_COUNT = 1000;  

    // 使用有界队列作为task队列, 当有界队列满时, 将触发饱和策略  
    private final ThreadPoolExecutor exec = new ThreadPoolExecutor(0, MAX_THREAD_COUNT, 60L, TimeUnit.SECONDS,  
                                       new ArrayBlockingQueue<Runnable>(MAX_QUEUE_COUNT));  

    public void start() throws IOException {  
        // 设置饱和策略为CallerRunsPolicy  
        exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  
        ServerSocket socket = new ServerSocket(80);  
        while (!exec.isShutdown()) {  
            try {  
                final Socket conn = socket.accept();  
                exec.execute(new Runnable() {  
                        public void run() {  
                            handleRequest(conn);  
                        }  
                    });  
            } catch (RejectedExecutionException e) {  
                if (!exec.isShutdown())  
                    log("task submission rejected", e);  
            }  
        }  
    }  

    public void stop() {  
        exec.shutdown();  
    }  

    void handleRequest(Socket connection) {  
        Request req = readRequest(connection);  
        if (isShutdownRequest(req))  
            stop();  
        else  
            dispatchRequest(req);  
    }  

    public static void main(String[] args) {  
        LifecycleWebServer server = new LifecycleWebServer();  
        try {  
            // 在main线程中启动server  
            server.start();  
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  
}   
  • 如果线程池饱和时主线程仍然向线程池提交任务, 那么任务将在主线程中执行
  • 主线程执行任务是需要一定时间的,这样就给了线程池喘息的机会
    • 主线程在执行任务的时间内 无法接受socket连接 , 因此socket连接请求将 缓存在tcp层
    • 如果 server过载持续的时间较长 , 使得 tcp层的缓存不够 , 那么 tcp缓存 将根据其策略 丢弃部分请求
      整个系统的过载压力逐步向外扩散: 线程池 - 线程池中的队列 - main线程 - tcp层 - client

      不会因为过多的请求而导致系统资源耗尽, 也不会一发生过载时就拒绝服务, 只有发生长时间系统过载时才会出现客户端无法连接的情况!!!

BoundExecutor

没有预定的饱和策略 来限定线程池中执行任务线程的数量,可以结合 Semaphore 来实现

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        // 设定信号量permit的上限
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command) throws InterruptedException {
        // 提交task前先申请permit, 如果无法申请到permit, 调用submitTask的线程将被阻塞, 直到有permit可用
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                    public void run() {
                        try {
                            command.run();
                        } finally {
                            // 提交成功了, 运行task后释放permit
                            semaphore.release();
                        }
                    }
                });
        } catch (RejectedExecutionException e) {
            // 如果没有提交成功, 也需要释放permit
            semaphore.release();
        }
    }
}

ThreadFactory

创建ThreadPoolExecutor时还可以为其 指定 ThreadFactory :当线程池需要 创建新的线程 时会调用 ThreadFactorynewThread 方法

  • 默认的 ThreadFactory创建的线程是 nonDaemon
    • 线程优先级: 普通 的线程
    • 指定了 可识别的线程名称
public Thread newThread(Runnable r) {  
    Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);  
    if (t.isDaemon())  
        t.setDaemon(false);  
    if (t.getPriority() != Thread.NORM_PRIORITY)  
        t.setPriority(Thread.NORM_PRIORITY);  
    return t;  
}  

客户化的MyAppThread

public class MyAppThread extends Thread {
    public static final String DEFAULT_NAME = "MyAppThread";
    private static final AtomicInteger created = new AtomicInteger();
    private static final AtomicInteger alive = new AtomicInteger();
    private static final Logger log = Logger.getAnonymousLogger();
    private static volatile boolean debugLifecycle = false;

    public MyAppThread(Runnable r) {
        this(r, DEFAULT_NAME);
    }

    public MyAppThread(Runnable runnable, String name) {
        // 为自定义的Thread类指定线程名称
        super(runnable, name + "-" + created.incrementAndGet());
        // 设置UncaughtExceptionHandler. UncaughtExceptionHandler的uncaughtException方法将在线程运行中抛出未捕获异常时由系统调用
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                public void uncaughtException(Thread t, Throwable e) {
                    log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
                }
            });
    }

    public static int getThreadsCreated() {
        return created.get();
    }

    public static int getThreadsAlive() {
        return alive.get();
    }

    public static boolean getDebug() {
        return debugLifecycle;
    }

    public static void setDebug(boolean b) {
        debugLifecycle = b;
    }

    @Override
    public void run() {
        // Copy debug flag to ensure consistent value throughout.
        boolean debug = debugLifecycle;
        if (debug)
            log.log(Level.FINE, "Created " + getName());
        try {
            alive.incrementAndGet();
            super.run();
        } finally {
            alive.decrementAndGet();
            if (debug)
                log.log(Level.FINE, "Exiting " + getName());
        }
    }
}

客户化ThreadFactory返回MyAppThread

public class MyThreadFactory implements ThreadFactory {  
    private final String poolName;  

    public MyThreadFactory(String poolName) {  
        this.poolName = poolName;  
    }  

    public Thread newThread(Runnable runnable) {  
        return new MyAppThread(runnable, poolName);  
    }  
}  

扩展线程池

ThreadPoolExecutor 类提供了多个 钩子 方法,以 供其子类实现

  1. beforeExecute : 任务执行前
  2. afterExecute : 任务执行后
  3. terminated : 线程池被关闭后 (释放线程池申请的资源)
private void runTask(Runnable task) {  
    final ReentrantLock runLock = this.runLock;  
    runLock.lock();  
    try {  
        if (runState < STOP && Thread.interrupted() && runState >= STOP)  
            thread.interrupt();  
        boolean ran = false;  
        beforeExecute(thread, task);  
        try {  
            task.run();  
            ran = true;  
            afterExecute(task, null);  
            ++completedTasks;  
        } catch (RuntimeException ex) {  
            if (!ran)  
                afterExecute(task, ex);  
            throw ex;  
        }  
    } finally {  
        runLock.unlock();  
    }  
}   

扩展线程池,在日志中 记录每个任务执行时间

public class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
    private final Logger log = Logger.getLogger("TimingThreadPool");
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();

    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        log.fine(String.format("Thread %s: start %s", t, r));
        startTime.set(System.nanoTime());
    }

    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            log.fine(String.format("Thread %s: end %s, time=%dns",
                           t, r, taskTime));
        } finally {
            super.afterExecute(r, t);
        }
    }

    protected void terminated() {
        try {
            log.info(String.format("Terminated: avg time=%dns",
                           totalTime.get() / numTasks.get()));
        } finally {
            super.terminated();
        }
    }
}

并行执行递归算法

如果 循环体所进行的操作相互独立 的, 这样的循环可以并发的运行

void processSequentially(List<Element> elements) {
    for (Element e : elements)
        process(e);
}

// 将相互独立的循环操作转变为并发操作
void processInParallel(Executor exec, List<Element> elements) {
    for (final Element e : elements)
        exec.execute(new Runnable() {
                public void run() { process(e); }
            });
    exec.shutdown();   
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); 
}

有循环的递归操作 也可以并发进行计算

public<T> void sequentialRecursive(List<Node<T>> nodes,
                   Collection<T> results) {
    for (Node<T> n : nodes) {
        results.add(n.compute());
        sequentialRecursive(n.getChildren(), results);
    }
}

public<T> void parallelRecursive(final Executor exec,
                 List<Node<T>> nodes,
                 final Collection<T> results) {
    for (final Node<T> n : nodes) {
        //并发进行计算
        exec.execute(new Runnable() {
                public void run() {
                    results.add(n.compute());
                }
            });
        //遍历依旧是递归
        parallelRecursive(exec, n.getChildren(), results);
    }
}

//获取最终结果
public<T> Collection<T> getParallelResults(List<Node<T>> nodes)
    throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
    parallelRecursive(exec, nodes, resultQueue);
    exec.shutdown();
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    return resultQueue;
}
Java7 之后可以使用 ForkJoinPool

Java 8 之后可以使用 parallel stream 

Next:显式锁

Previous:任务取消

Up:目录