java8线程池源码解析(vip解析源码)

FutureTask

Runnable 的 void run() 方法是没有返回值的,所以如果我们需要返回值的话,会在 submit 中指定第二个参数作为返回值:

<T> Future<T> submit(Runnable task, T result);

其实这两个参数会被包装成 Callable。它和 Runnable 的区别在于 run() 没有返回值,而 Callable 的 call() 方法有返回值,同时,如果运行出现异常call() 方法会抛出异常。

public interface Callable<V> { V call() throws Exception; }

FutureTask 通过 RunnableFuture 间接实现了 Runnable 接口,所以每个 Runnable 通常都先包装成 FutureTask,然后调用 executor.execute(Runnable command) 将其提交给线程池 。引入此类是为了将提交的每个任务包装为Runable,ThreadPoolExecutor下的内部类Worker才是线程。

Executor

/** * 只有提交任务的方法 */ void execute(Runnable command);

ExecutorService

Executor的基础上提供了一系列的方法,关闭线程池,提交有返回值的任务,批量执行任务等

public interface ExecutorService extends Executor { // 关闭线程池,已提交的任务继续执行,不接受继续提交新任务 void shutdown(); // 关闭线程池,尝试停止正在执行的所有任务,不接受继续提交新任务 List<Runnable> shutdownNow(); // 线程池是否已关闭 boolean isShutdown(); // 如果调用了 shutdown() 或 shutdownNow() 方法后,所有任务结束了,那么返回true boolean isTerminated(); // 等待所有任务完成,并设置超时时间 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 提交一个 Callable 任务 <T> Future<T> submit(Callable<T> task); // 提交一个 Runnable 任务,第二个参数将会放到 Future 中,作为返回值, // 因为 Runnable 的 run 方法本身并不返回任何东西 <T> Future<T> submit(Runnable task, T result); // 提交一个 Runnable 任务 Future<?> submit(Runnable task); // 执行所有任务,返回 Future 类型的一个 list <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; // 也是执行所有任务,但是这里设置了超时时间 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; // 只有其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; // 同上一个方法,只有其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果, // 不过这个带超时,超过指定的时间,抛出 TimeoutException 异常 <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }

AbstractExecutorService

public abstract class AbstractExecutorService implements ExecutorService { // RunnableFuture 是用于获取执行结果的,我们常用它的子类 FutureTask // 下面两个 newTaskFor 方法用于将我们的任务包装成 FutureTask 提交到线程池中执行 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } // 提交任务 public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); // 1. 将任务包装成 FutureTask RunnableFuture<Void> ftask = newTaskFor(task, null); // 2. 交给执行器执行,execute 方法由具体的子类来实现 execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } // 此方法目的:将 tasks 集合中的任务提交到线程池执行,任意一个线程执行完后就可以结束了 // 第二个参数 timed 代表是否设置超时机制,超时时间为第三个参数, // 如果 timed 为 true,同时超时了还没有一个线程返回结果,那么抛出 TimeoutException 异常 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { if (tasks == null) throw new NullPointerException(); // 任务数 int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); List<Future<T>> futures= new ArrayList<Future<T>>(ntasks); ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this); try { // 用于保存异常信息,此方法如果没有得到任何有效的结果,那么我们可以抛出最后得到的一个异常 ExecutionException ee = null; long lastTime = timed ? System.nanoTime() : 0; Iterator<? extends Callable<T>> it = tasks.iterator(); // 首先先提交一个任务,后面的任务到下面的 for 循环一个个提交 futures.add(ecs.submit(it.next())); // 提交了一个任务,所以任务数量减 1 --ntasks; // 正在执行的任务数(提交的时候 +1,任务结束的时候 -1) int active = 1; for (;;) { // BlockingQueue 的 poll 方法不阻塞,返回 null 代表队列为空 Future<T> f = ecs.poll(); // 为 null,说明刚刚提交的第一个线程还没有执行完成 if (f == null) { if (ntasks > 0) { --ntasks; futures.add(ecs.submit(it.next())); ++active; } // 任务全部执行失败 else if (active == 0) break; //没有任务了,但是设置了超时时间,这里检测是否超时 else if (timed) { // 带等待的 poll 方法 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); // 如果已经超时,抛出 TimeoutException 异常,这整个方法就结束了 if (f == null) throw new TimeoutException(); long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; } // 这里说明没有任务需要提交,但是池中的任务没有完成,还没有超时(如果设置了超时) // take() 方法会阻塞,直到有元素返回,说明有任务结束了 else f = ecs.take(); } // 有任务结束了 if (f != null) { --active; try { // 返回执行结果,如果有异常,都包装成 ExecutionException return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } } if (ee == null) ee = new ExecutionException(); throw ee; } finally { // 方法退出之前,取消其他的任务 for (Future<T> f : futures) f.cancel(true); } } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { try { return doInvokeAny(tasks, false, 0); } catch (TimeoutException cannotHappen) { assert false; return null; } } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return doInvokeAny(tasks, true, unit.toNanos(timeout)); } // 执行所有的任务,返回任务结果。 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { for (Callable<T> t : tasks) { // 包装成 FutureTask RunnableFuture<T> f = newTaskFor(t); futures.add(f); // 提交任务 execute(f); } for (Future<T> f : futures) { if (!f.isDone()) { try { // 这是一个阻塞方法,直到获取到值,或抛出了异常 f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true; return futures; } finally { // 处理异常情况 if (!done) for (Future<T> f : futures) f.cancel(true); } } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null || unit == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { for (Callable<T> t : tasks) futures.add(newTaskFor(t)); long lastTime = System.nanoTime(); Iterator<Future<T>> it = futures.iterator(); // 每提交一个任务,检测一次是否超时 while (it.hasNext()) { execute((Runnable)(it.next())); long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; // 超时 if (nanos <= 0) return futures; } for (Future<T> f : futures) { if (!f.isDone()) { if (nanos <= 0) return futures; try { // 调用带超时的 get 方法,这里的参数 nanos 是剩余的时间, f.get(nanos, TimeUnit.NANOSECONDS); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } catch (TimeoutException toe) { return futures; } long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; } } done = true; return futures; } finally { if (!done) for (Future<T> f : futures) f.cancel(true); } } } 

ThreadPoolExecutor

构造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } //corePoolSize:核心线程数 //maximumPoolSize:最大线程数 //keepAliveTime:空闲时间 //unit:时间单位 //workQueue:任务队列 //threadFactory:任务工厂 //handler:拒绝策略 //如果线程数达到corePoolSize,任务会提交到等待队列中,等待线程池中的线程来取任务并执行。 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } 

主要属性:

  • corePoolSize

核心线程数

  • maximumPoolSize

最大线程数,线程池允许创建的最大线程数

  • workQueue

任务队列,BlockingQueue 接口的某个实现

  • keepAliveTime

空闲线程的存活时间,如果某线程的空闲时间超过这个值都没有任务给它做,那么可以被关闭了。注意这个值并不会对所有线程起作用,如果线程池中的线程数少于等于核心线程数 corePoolSize,那么这些线程不会因为空闲太长时间而被关闭,当然,也可以通过调用 allowCoreThreadTimeOut(true)使核心线程数内的线程也可以被回收。

  • threadFactory

用于生成线程,一般我们可以用默认的就可以了。通常,我们可以通过它将我们的线程的名字设置得比较可读一些,如 XXX-Thread-1, XXX-Thread-2 类似这样。

  • handler:

当线程池已经满了,但是又有新的任务提交的时候,该采取什么策略由这个来指定。

Doug Lea 采用一个 32 位的整数来存放线程池的状态和当前池中的线程数,其中高 3 位用于存放线程池状态,低 29 位表示线程数。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // 000 11111111111111111111111111111 // 这里得到的是 29 个 1,也就是说线程池的最大线程数是 2^29-1=536870911 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 运算结果为 111跟29个0 => 111 00000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; // 000 00000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 001 00000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; // 010 00000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; // 011 00000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; // 将整数 c 的低 29 位修改为 0,就得到了线程池的状态 private static int runStateOf(int c) { return c & ~CAPACITY; } // 将整数 c 的高 3 为修改为 0,就得到了线程池中的线程数 private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; } private static boolean isRunning(int c) { return c < SHUTDOWN; }

线程池中的各个状态和状态变化的转换过程:

  • RUNNING:接受新的任务,处理等待队列中的任务
  • SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务
  • STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程
  • TIDYING:所有的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
  • TERMINATED:terminated() 方法结束后,线程池的状态就会变成这个

RUNNING 定义为 -1,SHUTDOWN 定义为 0,其他的都比 0 大,所以等于 0 的时候不能提交任务,大于 0 的话,连正在执行的任务也需要中断。

各个状态的转换过程有以下几种:

  • RUNNING -> SHUTDOWN:当调用了 shutdown() 后,会发生这个状态转换,这也是最重要的
  • (RUNNING or SHUTDOWN) -> STOP:当调用 shutdownNow() 后,会发生这个状态转换
  • SHUTDOWN -> TIDYING:当任务队列和线程池都清空后,会由 SHUTDOWN 转换为 TIDYING
  • STOP -> TIDYING:当任务队列清空后,发生这个转换
  • TIDYING -> TERMINATED:这个前面说了,当 terminated() 方法结束后

上面的几个记住核心的就可以了,尤其第一个和第二个。

提交任务方法:

public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }

submit 方法中参数是有Runnable 类型,也有Callable 类型,这个参数不是用于 new Thread(runnable).start() 中的,这个参数不是用于启动线程的,这里指的是任务,任务要做的事情是 run() 方法里面定义的或 Callable 中的 call() 方法里面定义的。

execute 方法:

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 表示线程池状态和线程数的整数 int c = ctl.get(); // 如果当前线程数少于核心线程数,那么直接添加一个 worker 来执行任务, // 创建一个新的线程,并把当前任务 command 作为这个线程的第一个任务(firstTask) if (workerCountOf(c) < corePoolSize) { // 添加任务成功,那么就结束了。提交任务嘛,线程池已经接受了这个任务,这个方法也就可以返回了 // 至于执行的结果,到时候会包装到 FutureTask 中。 // 返回 false 代表线程池不允许提交任务 if (addWorker(command, true)) return; c = ctl.get(); } // 上面返回false,说明要么线程池被关闭了,要么说明线程池中的线程数超过了核心线程数 // 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了 // 如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { /* 这里面说的是,如果任务进入了 workQueue,我们是否需要开启新的线程 * 因为线程数在 [0, corePoolSize) 是无条件开启新的线程 * 如果线程数已经大于等于 corePoolSize,那么将任务添加到队列中,然后进到这里 */ int recheck = ctl.get(); // 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); // 如果线程池还是 RUNNING 的,并且线程数为 0,那么开启新的线程 // 这块代码的真正意图是:担心任务提交到队列中了,但是线程都关闭了 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果 workQueue 队列满了,那么进入到这个分支 // 以 maximumPoolSize 为界创建新的 worker, // 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略 else if (!addWorker(command, false)) reject(command); } 

分析 addWorker(Runnable firstTask, boolean core) 方法之前看下Worker主要的字段和方法。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; // 这个是真正的线程 final Thread thread; // 如果当前线程池中的线程数小于核心线程数则firstTask不为空,值为提交的任务,如果超过核心线程数则从队列中拉去任务 Runnable firstTask; // 用于存放此线程完成的任务数,volatile,保证可见性 volatile long completedTasks; // Worker 只有这一个构造方法,传入 firstTask,也可以传 null Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } // 这里调用了外部类的 runWorker 方法 public void run() { runWorker(this); } } 

addWorker(Runnable firstTask, boolean core) 方法:

// 第一个参数是准备提交给这个线程执行的任务,可以为 null // 第二个参数为 true 代表使用核心线程数 corePoolSize 作为创建线程的界限,也就说创建这个线程的时候, // 如果线程池中的线程总数已经达到 corePoolSize,那么不能响应这次创建线程的请求 // 如果是 false,代表使用最大线程数 maximumPoolSize 作为界限 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果线程池已关闭,并满足以下条件之一,那么不创建新的 worker: // 1. 线程池状态大于 SHUTDOWN,其实也就是 STOP, TIDYING, 或 TERMINATED // 2. firstTask != null // 3. workQueue.isEmpty() // 也就是说当线程池处于 SHUTDOWN 的时候,不允许提交任务,但是已有的任务继续执行 // 当状态大于 SHUTDOWN 时,不允许提交任务,且中断正在执行的任务 // 如果线程池处于 SHUTDOWN,但是 firstTask 为 null,且 workQueue 非空,那么是允许创建 worker 的 // 这是因为 SHUTDOWN不允许提交新的任务,但是要把已经进入到 workQueue 的任务执行完,所以是允许创建新Worker的 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 如果成功,那么就是所有创建线程前的条件校验都满足了,准备创建线程执行任务了 // 这里失败的话,说明有其他线程也在尝试往线程池中创建线程 if (compareAndIncrementWorkerCount(c)) break retry; // 由于有并发,重新再读取一下 ctl c = ctl.get(); // 正常如果是 CAS 失败的话,进到下一个里层的for循环就可以了 // 可是如果是因为其他线程的操作,导致线程池的状态发生了变更,如有其他线程关闭了这个线程池 // 那么需要回到外层的for循环 if (runStateOf(c) != rs) continue retry; } } /* * 到这里就可以开始创建线程来执行任务了 */ // worker 是否已经启动 boolean workerStarted = false; // 是否已将这个 worker 添加到 workers 这个 HashSet 中 boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; // 把 firstTask 传给 worker 的构造方法 w = new Worker(firstTask); // 取 worker 中的线程对象,之前说了,Worker的构造方法会调用 ThreadFactory 来创建一个新的线程 final Thread t = w.thread; if (t != null) { // 这个是整个线程池的全局锁,持有这个锁才能让进行下面的操作, // 因为关闭一个线程池也需要这个锁,所以在持有锁的期间,线程池不会被关闭 mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); // 小于 SHUTTDOWN 那就是 RUNNING // 如果等于 SHUTDOWN,前面说了,不接受新的任务,但是会继续执行等待队列中的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // worker 里面的 thread 可不能是已经启动的 if (t.isAlive()) throw new IllegalThreadStateException(); // 加到 workers 这个 HashSet 中 workers.add(w); int s = workers.size(); // largestPoolSize 用于记录 workers 中的个数的最大值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 添加成功的话,启动这个线程 if (workerAdded) { // 启动线程 t.start(); workerStarted = true; } } } finally { // 如果线程没有启动,需要做一些清理工作,如前面 workCount 加了 1,将其减掉 if (! workerStarted) addWorkerFailed(w); } // 返回线程是否启动成功 return workerStarted; } 

addWorkFailed:

// workers 中删除掉相应的 worker // workCount 减 1 private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); // rechecks for termination, in case the existence of this worker was holding up termination tryTerminate(); } finally { mainLock.unlock(); } }

worker 中的线程 start 后,其 run 方法会调用 runWorker 方法:

// Worker 类的 run() 方法 public void run() { runWorker(this); }

runWorker 方法:

// 此方法由 worker 线程启动后调用,这里用一个 while 循环来不断地从等待队列中获取任务并执行 // 前面说了,worker 在初始化的时候,可以指定 firstTask,那么第一个任务也就可以不需要从队列中获取 final void runWorker(Worker w) { // Thread wt = Thread.currentThread(); // 该线程的第一个任务(如果有的话) Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 循环调用 getTask 获取任务 while (task != null || (task = getTask()) != null) { w.lock(); // 如果线程池状态大于等于 STOP,那么意味着该线程也要中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 钩子方法,留给需要的子类实现 beforeExecute(wt, task); Throwable thrown = null; try { // 到这里终于可以执行任务了 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { // 不清楚为啥不允许跑出Throwable异常 thrown = x; throw new Error(x); } finally { // 钩子方法,将 task 和异常作为参数,留给需要的子类实现 afterExecute(task, thrown); } } finally { // 置空 task,准备 getTask 获取下一个任务 task = null; // 累加完成的任务数 w.completedTasks++; // 释放掉 worker 的独占锁 w.unlock(); } } completedAbruptly = false; } finally { // 如果到这里,需要执行线程关闭: // 1. 说明 getTask 返回 null,也就是说,队列中已经没有任务需要执行了,执行关闭 // 2. 任务执行过程中发生了异常 // 第一种情况,已经在代码处理了将 workCount 减 1,这个在 getTask 方法分析中会说 // 第二种情况,workCount 没有进行处理,所以需要在 processWorkerExit 中处理 processWorkerExit(w, completedAbruptly); } }

getTask()获取任务:

// 此方法有三种可能: // 1. 阻塞直到获取到任务返回。我们知道,默认 corePoolSize 之内的线程是不会被回收的, // 它们会一直等待任务 // 2. 超时退出。keepAliveTime 起作用的时候,也就是如果这么多时间内都没有任务,那么应该执行关闭 // 3. 如果发生了以下条件,此方法必须返回 null: // - 池中有大于 maximumPoolSize 个 workers 存在(通过调用 setMaximumPoolSize 进行设置) // - 线程池处于 SHUTDOWN,而且 workQueue 是空的,前面说了,这种不再接受新的任务 // - 线程池处于 STOP,不仅不接受新的线程,连 workQueue 中的线程也不再执行 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 两种可能 // 1. rs == SHUTDOWN && workQueue.isEmpty() // 2. rs >= STOP if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // CAS 操作,减少工作线程数 decrementWorkerCount(); return null; } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); // 允许核心线程数内的线程回收,或当前线程数超过了核心线程数,那么有可能发生超时关闭 timed = allowCoreThreadTimeOut || wc > corePoolSize; // 这里 break,是为了不往下执行后一个 if (compareAndDecrementWorkerCount(c)) // 两个 if 一起看:如果当前线程数 wc > maximumPoolSize,或者超时,都返回 null // 那这里的问题来了,wc > maximumPoolSize 的情况,为什么要返回 null? // setMaximumPoolSize() 将线程池的 maximumPoolSize 调小了,那么多余的 Worker 就需要被关闭 if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl // compareAndDecrementWorkerCount(c) 失败,线程池中的线程数发生了改变 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } // wc <= maximumPoolSize 同时没有超时 try { // 到 workQueue 中获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { // 如果开发者将 maximumPoolSize 调小了,导致其小于当前的 workers 数量, // 那么意味着超出的部分线程要被关闭。 timedOut = false; } } } 

processWorkerExit(Worker w, boolean completedAbruptly)方法:

private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } } 

execute(Runnable command) 方法:

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //表示线程池状态和线程数的整数 int c = ctl.get(); // 如果当前线程数少于核心线程数,那么直接添加一个 worker 来执行任务, // 创建一个新的线程,并把当前任务 command 作为这个线程的第一个任务(firstTask) if (workerCountOf(c) < corePoolSize) { // 添加任务成功,那么就结束了。线程池已经接受了这个任务,这个方法也就可以返回了 // 至于执行的结果,到时候会包装到 FutureTask 中。 // 返回 false 代表线程池不允许提交任务 if (addWorker(command, true)) return; c = ctl.get(); } // 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了 // 如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { /* 这里面说的是,如果任务进入了 workQueue,我们是否需要开启新的线程 * 因为线程数在 [0, corePoolSize) 是无条件开启新的线程 * 如果线程数已经大于等于 corePoolSize,那么将任务添加到队列中,然后进到这里 */ int recheck = ctl.get(); // 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); // 如果线程池还是 RUNNING 的,并且线程数为 0,那么开启新的线程 // 这块代码的真正意图是:担心任务提交到队列中了,但是线程都关闭了 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果 workQueue 队列满了,那么进入到这个分支 // 以 maximumPoolSize 为界创建新的 worker, // 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略 else if (!addWorker(command, false)) reject(command); }

reject(command) 拒绝策略:

final void reject(Runnable command) { // 执行拒绝策略 handler.rejectedExecution(command, this); }

此处的 handler 我们需要在构造线程池的时候就传入这个参数,它是 RejectedExecutionHandler 的实例。

// 只要线程池没有被关闭,那么由提交任务的线程自己来执行这个任务。 public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } // 不管怎样,直接抛出 RejectedExecutionException 异常 // 这个是默认的策略,如果我们构造线程池的时候不传相应的 handler 的话,那就会指定使用这个 public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } // 不做任何处理,直接忽略掉这个任务 public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } // 这个相对霸道一点,如果线程池没有被关闭的话, // 把队列队头的任务(也就是等待了最长时间的)直接扔掉,然后提交这个任务到等待队列中 public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }

shutdown:

public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 校验线程池中的线程是否可访问 checkShutdownAccess(); // 将线程池状态改为SHUTDOWN advanceRunState(SHUTDOWN); // 中段线程池中的所有线程(因为线程执行任务的时候需要获取锁,所以对于正在跑的线程不会被中断掉) interruptIdleWorkers(); // hooks,需要子类手动实现 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }

shutdownNow:

public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 校验线程池中的线程是否可访问 checkShutdownAccess(); // 将线程池状态改为STOP advanceRunState(STOP); // 中段线程池中的所有线程(与上面的区别是不需要获取锁,所以线程池中的所以线程都会被中断掉) interruptWorkers(); // 任务队列中的任务 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }

总结

线程池关键属性

corePoolSize,maximumPoolSize,workQueue,keepAliveTime,rejectedExecutionHandler

corePoolSize 到 maximumPoolSize 之间的线程会被回收,当corePoolSize 的线程也可以通过设置而得到回收(allowCoreThreadTimeOut(true))。

workQueue 用于存放任务,添加任务的时候,如果当前线程数超过了 corePoolSize,那么往该队列中插入任务,线程池中的线程会负责到队列中拉取任务。

keepAliveTime 用于设置空闲时间,如果线程数超出了 corePoolSize,并且有些线程的空闲时间超过了这个值,会执行关闭这些线程的操作

rejectedExecutionHandler 用于处理当线程池不能执行此任务时的情况,默认有抛出 RejectedExecutionException 异常、忽略任务、使用提交任务的线程来执行此任务和将队列中等待最久的任务删除,然后提交此任务这四种策略,默认为抛出异常。

线程池中的线程创建时机

如果当前线程数少于 corePoolSize,那么提交任务的时候创建一个新的线程,并由这个线程执行这个任务;

如果队列已满,那么创建新的线程来执行任务,需要保证池中的线程数不会超过 maximumPoolSize,如果此时线程数超过了 maximumPoolSize,那么执行拒绝策略。

什么时候会执行拒绝策略

workers 的数量达到了 corePoolSize(任务此时需要进入任务队列),任务入队成功,与此同时线程池被关闭了,而且关闭线程池并没有将这个任务出队,那么执行拒绝策略。

workers 的数量大于等于 corePoolSize,将任务加入到任务队列,可是队列满了,任务入队失败,那么准备开启新的线程,可是线程数已经达到 maximumPoolSize,那么执行拒绝策略。

任务执行过程中发生异常

如果某个任务执行出现异常,那么执行任务的线程会被关闭,而不是继续接收其他任务,然后会启动一个新的线程来代替它

发表回复

您的邮箱地址不会被公开。必填项已用 * 标注