首页 独家 > 正文

【世界新要闻】ThreadPoolExecutor源码学习

2023-04-11 18:30:43 来源:博客园


(资料图片仅供参考)

线程池ThreadPoolExecutor

ThreadPoolExecutor 继承结构

继承结构如图所示:ThreadPoolExecutor <- AbstractExecutorService <- ExecutorService <- Executor

public class ThreadPoolExecutor extends AbstractExecutorService {    //...}/** * 实现了部分 ExecutorService 方法 * 1. submit 方法 * 2. invokeAny 方法 * 3. invokeAll 方法 */public abstract class AbstractExecutorService implements ExecutorService {    /**     * Callable -> FutureTask     * FutureTask implements RunnableFuture     * RunnableFuture extends Future, Runnable     *     * FutureTask Status:     *      NEW(0): 初始状态, 任务刚被创建或者正在计算中     *      COMPLETING(1): 中间状态, 任务计算完成正在对结果进行赋值,或者正在处理异常     *      NORMAL(2): 终止状态, 任务计算完成, 结果已经完成赋值     *      EXCEPTIONAL(3): 终止状态, 任务计算过程发生异常无法处理,线程中断     *      CANCELLED(4): 终止状态, 任务计算过程被取消     *      INTERRUPTING(5): 中间状态, 任务计算过程已开始并被中断,正在修改状态     *      INTERRUPTED(6): 终止状态,任务计算过程已开始并被中断,且已经完全停止     */    protected  RunnableFuture newTaskFor(Callable callable) {        return new FutureTask(callable);    }    protected  RunnableFuture newTaskFor(Runnable runnable, T value) {        return new FutureTask(runnable, value);    }        // 提交 callable 任务    public  Future submit(Callable task) {        if (task == null) throw new NullPointerException();        RunnableFuture ftask = newTaskFor(task);        execute(ftask);        return ftask;    }        // 提交 runnable 任务,返回 null    public Future submit(Runnable task) {        if (task == null) throw new NullPointerException();        RunnableFuture ftask = newTaskFor(task, null);        execute(ftask);        return ftask;    }        // 提交 runnable 任务,返回 result    public  Future submit(Runnable task, T result) {        if (task == null) throw new NullPointerException();        RunnableFuture ftask = newTaskFor(task, result);        execute(ftask);        return ftask;    }            // invokeAll    // 为每一个任务创建对应的FutureTask, 并调用 execute 方法执行    // execute() 方法在 ThreadPoolExecutor 被实现    public  List> invokeAll(Collection> tasks)        throws InterruptedException {        if (tasks == null)            throw new NullPointerException();        ArrayList> futures = new ArrayList>(tasks.size());        boolean done = false;        try {            for (Callable t : tasks) {                RunnableFuture f = newTaskFor(t);                futures.add(f);                execute(f);            }            for (int i = 0, size = futures.size(); i < size; i++) {                Future f = futures.get(i);                                // 如何任务此时还未执行完成,则阻塞获取对应的值                if (!f.isDone()) {                    try {                        f.get();                    } catch (CancellationException ignore) {                    } catch (ExecutionException ignore) {                    }                }            }            done = true;            return futures;        } finally {            // 执行过程抛出无法处理的异常            if (!done)                for (int i = 0, size = futures.size(); i < size; i++)                    // 取消任务的执行,如果任务已经执行完成,则不受影响                    futures.get(i).cancel(true);        }    }        // InvokeAny 方法逻辑待后续更新}/** * 在 Executor 的基础上定义了一系列任务执行和线程池管理方法 *  * 1. submit: 提供方法执行带有返回值的任务 * 2. invokeAll: 提供方法执行指定的任务集合中的所有任务, 返回 List> * 3. invokeAny: 提供方法执行指定的任务集合中的所有任务, 将第一个执行完成的任务的结果作为返回值, 并终止其他线程的执行 * 4. isShutDown/isTerminated: 判断线程池状态方法 * 5. shutdown: 不再接受新的任务, 待所有任务执行完毕后关闭线程池  * 6. shutdownNow: 不再接受新的任务,直接关闭线程池 */public interface ExecutorService extends Executor {    // ...}/** * 只定义了一个 execute 方法, 执行 Runnable 任务 */public interface Executor {    void execute(Runnable command);}

ThreadPoolExecutor 关键参数及核心方法

关键参数

线程池状态参数

public class ThreadPoolExecutor extends AbstractExecutorService {    // 线程池状态,由两部分构造 runState | workerCount    // runState: 占2bit(29~30位)    // workerCount: 占29bit(0~28位)    // 符号位: 占1bit(最高位)    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));         // workerCount 最大容量: 2^29 - 1    private static final int COUNT_BITS = Integer.SIZE - 3;    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;        /**     * 线程池状态     *     RUNNING: 运行状态,接受新任务,处理阻塞队列中的任务     *     SHUTDOWN: 关闭状态,拒绝新任务,处理阻塞队列中的任务     *     STOP: 停止状态,拒绝新任务,并中断当前正在执行的任务,不处理阻塞队列中的任务直接关闭     *     TIDYING: 过度状态,当前线程池中的活动线程数降为0时的状态     *     TERMINATED: 销毁状态,线程池彻底终止     */    private static final int RUNNING    = -1 << COUNT_BITS;    private static final int SHUTDOWN   =  0 << COUNT_BITS;    private static final int STOP       =  1 << COUNT_BITS;    private static final int TIDYING    =  2 << COUNT_BITS;    private static final int TERMINATED =  3 << COUNT_BITS;}

线程池状态转移图如下所示

  • RUNNING: 线程池创建后进入的状态
  • SHUTDOWN: 调用 shutdown方法进入该状态,该方法主要包含如下操作
    • 更新线程池状态为 SHUTDOWN
    • 中断空闲线程 interruptIdleWorkers()
    • 所以已经存在任务队列中的任务还是能被正常执行完成
    • 执行完所有任务后,先清除所有的worker,然后调用 tryTerminate(),进入 TIDYING状态
  • STOP: 调用 shutdownNow()方法进入该状态,该方法主要包含如下操作
    • 更新线程池状态为 STOP
    • 中断所有线程 interruptWorkers()
    • 清空任务队列 drainQueue()
    • 立即调用 tryTerminate()进入 TIDYING状态
  • TIDYING: 调用 terminated()方法
  • TERMINATED: 执行完 terminated()方法进入该状态
    • ctl.set(ctlOf(TERMINATED, 0))

线程池管理参数

public class ThreadPoolExecutor extends AbstractExecutorService {    // 任务队列    private final BlockingQueue workQueue;        // 工作线程集合    private final HashSet workers = new HashSet();        // 线程池到达过的最大线程数量    private int largestPoolSize;        // 已完成任务数    private long completedTaskCount;        // 线程工厂,用于创建线程    private volatile ThreadFactory threadFactory;        // 拒绝策略处理类    private volatile RejectedExecutionHandler handler;        // 线程池中线程数量 > corePoolSize 情况下,空闲线程的最大存活时间    private volatile long keepAliveTime;        // true: 线程数量 <= corePoolSize 情况下,空闲线程的最大存活时间也设置为 keepAliveTime    // false(default): 线程数量 <= corePoolSize 情况下,空闲线程可以一直存活    private volatile boolean allowCoreThreadTimeOut;        // 设置线程池 —— 核心线程数    private volatile int corePoolSize;        // 设置线程池 —— 最大线程数    private volatile int maximumPoolSize;        // 默认任务拒绝策略: 抛出异常    private static final RejectedExecutionHandler defaultHandler =        new AbortPolicy();}

核心方法

构造函数

// corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue 必须手动设置// threadFactory, handler 可以使用默认设置public ThreadPoolExecutor(int corePoolSize,                          int maximumPoolSize,                          long keepAliveTime,                          TimeUnit unit,                          BlockingQueue workQueue,                          ThreadFactory threadFactory,                          RejectedExecutionHandler handler) {    if (corePoolSize < 0 ||        maximumPoolSize <= 0 ||        maximumPoolSize < corePoolSize ||        keepAliveTime < 0)        throw new IllegalArgumentException();    if (workQueue == null || threadFactory == null || handler == null)        throw new NullPointerException();    this.acc = System.getSecurityManager() == null ?            null :            AccessController.getContext();    this.corePoolSize = corePoolSize;    this.maximumPoolSize = maximumPoolSize;    this.workQueue = workQueue;    this.keepAliveTime = unit.toNanos(keepAliveTime);    this.threadFactory = threadFactory;    this.handler = handler;}

execute() 方法

public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();        int c = ctl.get();        // workerCount < corePoolSize,则直接添加一个 worker 执行该任务    if (workerCountOf(c) < corePoolSize) {        if (addWorker(command, true))            return;        c = ctl.get();    }        // workerCount >= corePoolSize, 则先尝试将任务添加到 workQueue    if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();                // 任务添加到 workQueue 后,执行recheck        // 如果线程池未处于 Running 状态,则将刚刚添加的任务从阻塞队列中删除        if (!isRunning(recheck) && remove(command))            reject(command);        // 如果线程池处于 Running 状态,则判断是否需要添加一个新的 worker        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }        // workerCount >= corePoolSize, 并且任务队列已满,添加失败    // 则尝试增加一个新的 worker 执行该任务    // 如果添加失败,则调用拒绝策略处理类    else if (!addWorker(command, false))        reject(command);}

execute提交新任务的处理策略总结如下:

  1. workerCount < corePoolSize: 直接添加一个新的 worker 执行任务
  2. workerCount >= corePoolSize: 尝试添加到任务队列
    • 添加成功则执行recheck
    • 添加失败则尝试创建一个新的 worker 来执行该任务,创建worker失败则调用拒绝策略处理

addWorker() 方法

该方法用于添加一个新的 Worker 到线程池中,包括两个参数:

  • firstTask(Runnable): 创建完成后第一个执行的任务
  • core(boolean):
    • true: 使用 corePoolSize 为最大线程数量
    • false: 使用 maxPoolSize 为最大线程数量
private boolean addWorker(Runnable firstTask, boolean core) {    // 循环标签,方便跳出    retry:    for (;;) {        int c = ctl.get();        int rs = runStateOf(c);        /**         * 判断线程池状态:以下状态才能添加 worker         *    1. 线程池处于 RUNNING 状态         *    2. 线程池处于 SHUTDOWN 状态 且 firstTask 为 null 且 workQueue 不为空         */        if (rs >= SHUTDOWN &&            ! (rs == SHUTDOWN &&               firstTask == null &&               ! workQueue.isEmpty()))            return false;        for (;;) {            int wc = workerCountOf(c);                        // 判断当前 worker 数量是否还能继续添加            if (wc >= CAPACITY ||                wc >= (core ? corePoolSize : maximumPoolSize))                return false;                        // CAS 更新 workerCount            if (compareAndIncrementWorkerCount(c))                break retry;                        // CAS 更新失败则自旋重试            c = ctl.get();            if (runStateOf(c) != rs)                continue retry;        }    }    // worker 启动标识    boolean workerStarted = false;    // worker 加入 HashSet 集合标识    boolean workerAdded = false;        Worker w = null;    try {        // Worker构造方法调用 threadFactory 创建新的线程        w = new Worker(firstTask);                final Thread t = w.thread;        if (t != null) {            final ReentrantLock mainLock = this.mainLock;                        // 加锁,保证多个线程同时添加 worker 到集合中的安全性            mainLock.lock();            try {                int rs = runStateOf(ctl.get());                //                 if (rs < SHUTDOWN ||                    (rs == SHUTDOWN && firstTask == null)) {                    if (t.isAlive()) // 判断该线程是否已经启动                        throw new IllegalThreadStateException();                    workers.add(w);                    int s = workers.size();                    if (s > largestPoolSize)                        largestPoolSize = s;                    workerAdded = true;                }            } finally {                mainLock.unlock();            }            if (workerAdded) {                t.start();    // 启动线程                workerStarted = true;            }        }    } finally {        if (! workerStarted)            // worker 启动失败,则做一些回退处理            // 从 workers 集合中删除 worker            // workCount 减少1            addWorkerFailed(w);    }    return workerStarted;}

Worker

  • Worker类实现了Runnable接口,所以在创建线程中可以传入自己作为任务,然后线程启动时调用自己的run()方法

  • Worker类继承自AQS,所以其本身也是一把锁(不可重入锁),在执行任务时通过lock()锁住自己,保证worker正在执行时不会去获取其他任务来执行

private final class Worker        extends AbstractQueuedSynchronizer        implements Runnable {        Worker(Runnable firstTask) {        setState(-1); // inhibit interrupts until runWorker        this.firstTask = firstTask;                // 传入自己作为 Runnable 实例        // 线程启动时执行 Worker.run() 方法        this.thread = getThreadFactory().newThread(this);    }        // run() 则调用外部 ThreadPoolExecutor 的 runWorker 方法    public void run() {        runWorker(this);    }}

runWorker() 方法

final void runWorker(Worker w) {    Thread wt = Thread.currentThread();        // 初始任务    Runnable task = w.firstTask;    // firstTask 执行过一次后被置为 null    w.firstTask = null;        w.unlock(); // allow interrupts    boolean completedAbruptly = true;    try {        // 循环获取任务执行,复用已有线程        // getTask() 从任务队列获取task        while (task != null || (task = getTask()) != null) {            w.lock();                        // 若线程池处于 STOP 状态,但线程没有中断执行,则调用 interrupt() 方法完成中断            if ((runStateAtLeast(ctl.get(), STOP) ||                 (Thread.interrupted() &&                  runStateAtLeast(ctl.get(), STOP))) &&                !wt.isInterrupted())                wt.interrupt();            try {                // 钩子方法,任务执行前逻辑                // 默认实现为空,可自定义线程池扩展该功能                beforeExecute(wt, task);                Throwable thrown = null;                try {                    // 执行任务                    task.run();                } catch (RuntimeException x) {                    thrown = x; throw x;                } catch (Error x) {                    thrown = x; throw x;                } catch (Throwable x) {                    thrown = x; throw new Error(x);                } finally {                    // 钩子方法,任务执行后逻辑                // 默认实现为空,可自定义线程池扩展该功能                    afterExecute(task, thrown);                }            } finally {                task = null;                w.completedTasks++;                w.unlock();            }        }        completedAbruptly = false;    } finally {        // 删除 worker,线程执行完毕        processWorkerExit(w, completedAbruptly);    }}

getTask() 方法

workQueue中获取任务,返回 Runnable 任务或者 null

  • return Runnable: worker正常执行
  • return null: 获取不到任务,进入 processWorkerExit 结束当前 worker
private Runnable getTask() {    boolean timedOut = false; // Did the last poll() time out?    for (;;) {        int c = ctl.get();        int rs = runStateOf(c);        /**         * 判断是否回收当前线程:          *    情况1. 线程池状态为 SHUTDOWN && workQueue 为空         *    情况2. 线程池状态为 STOP || TERMINATED         */        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {            decrementWorkerCount();            return null;        }        int wc = workerCountOf(c);        // true: poll()获取任务,阻塞获取,设置超时时间        // false: take()获取任务,阻塞获取        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;        /**         * 判断是否回收当前线程:          *    条件1. workerCount > maxPoolSize 或 当前线程获取任务超时         *    条件2. workerCount > 1 或 workQueue 为空          *         * 同时满足条件1和条件2,则CAS减少workerCount,并返回null         */        if ((wc > maximumPoolSize || (timed && timedOut))            && (wc > 1 || workQueue.isEmpty())) {            if (compareAndDecrementWorkerCount(c))                return null;            continue;        }        // 不满足回收当前线程的条件,则执行后续获取任务的逻辑        try {            Runnable r = timed ?                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                workQueue.take();            if (r != null)                return r;            timedOut = true;        } catch (InterruptedException retry) {            timedOut = false;        }    }}

processWorkerExit() 方法

从 workers 工作线程集合中删除当前 worker,回收线程。

private void processWorkerExit(Worker w, boolean completedAbruptly) {    // 如果是异常退出,则需要手动完成 workerCount 的更新    if (completedAbruptly)         decrementWorkerCount();    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {        completedTaskCount += w.completedTasks;        workers.remove(w);    } finally {        mainLock.unlock();    }    // 尝试终止线程池    tryTerminate();    int c = ctl.get();    if (runStateLessThan(c, STOP)) {        if (!completedAbruptly) {            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;            if (min == 0 && ! workQueue.isEmpty())                min = 1;            if (workerCountOf(c) >= min)                return; // replacement not needed        }                // 1.如果是异常退出则直接添加一个新的 worker        // 2.如果 workerCount < 最小线程数要求,则添加一个新的 worker        addWorker(null, false);    }}

总结

创建线程池提交任务,整体执行流程如下图所示:

  • execute(): 提交 Runnable Task
  • submit(): 提交 Callable Task
  • wc: workerCount, 线程数量
  • rs: runState, 线程池运行状态
  • reject: 执行任务拒绝策略

关键词:

责任编辑:宋璟

返回首页
相关新闻
返回顶部