java线程池原理之ThreadPoolExecutor源码解析

起因

一直听说java的线程池能减少线程创建的开销,是多线程编程中的一把利器。但是,具体线程池是个啥东东,为啥这么牛逼,又是怎么做到的减少开销,一直都是处于知其然不知其所以然的状态。最近正好面试复习这些知识,干脆打算彻底搞懂它。

总结

哈哈,上来先总结一句,装装逼。ThreadPoolExecutor是java线程池的核心类,对其进行分析还是挺长的,先总结一句,能有个整体的把握。一句话来概述,线程池就是用一堆包装住Thread的Wroker类的集合,在里面有条件的进行着死循环,从而可以不断接受任务来进行。

问题

先抛出几个问题,带着问题进行分析会更有目的性。

  • 问题一:线程池的池子是啥,里边放了什么
  • 问题二:线程池能减少开销,它是怎么做到的
  • 问题三:线程池会自动往池子里创建线程,它会自动把线程释放掉吗?如果能,是怎么做到的;如果不能,它不怕溢出吗
  • 问题四:线程池怎么使用,java给除了怎样的工具集。

解析

线程

首先,java为多线程提供了Thread和Runnable(还有个能有返回值的Callable),Runnable和Callable的真正执行还是Thread。Thread不外就是两个方法start和run。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* Causes this thread to begin execution; the Java Virtual Machine
* calls the <code>run</code> method of this thread.
* <p>
* The result is that two threads are running concurrently: the
* current thread (which returns from the call to the
* <code>start</code> method) and the other thread (which executes its
* <code>run</code> method).
* <p>
* It is never legal to start a thread more than once.
* In particular, a thread may not be restarted once it has completed
* execution.
*
* @exception IllegalThreadStateException if the thread was already
* started.
* @see #run()
* @see #stop()
*/
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented.
*/
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
private native void start0();

从这个方法解释上看,start()这个方法,最终会交给VM 去执行run()方法,所以一般情况下,我们在随便一个线程上执行start(),里面的run()操作都会交给VM 去执行。

第一部分:ThreadPoolExecutor的继承结构

根据源码可以知道,ThreadPoolExecutor是继承的AbstractExecutorService(抽象类)。再来看一下AbstractExecutorService的结构可以发现,AbstractExecutorService实现了ExecutorService,并且ExecutorService继承Executor接口。

如下是Executor和ExecutorService接口中一些方法:

1
2
3
public interface Executor {
void execute(Runnable command);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

可以简单总结一下:Executor这个接口只有一个方法execute(Runnable command)(以Command Pattern(命令模式)的设计模式实现);在ExecutorService接口中一部分是和执行器生命周期相关的方法,而另一部分则是以各种方式提交要执行的任务的方法。像submit()就是提交任务的一个方法,在实现中做了适配的工作,无论参数是Runnable还是Callable,执行器都会正确执行。ExecutorService中,和生命周期相关的,声明了5个方法:

  • awaitTermination() 阻塞等待shutdown请求后所有线程终止,会有时间参数,超时和中断也会令方法调用结束
  • isShutdown() 通过ctl属性判断当前的状态是否不是RUNNING状态
  • isTerminated() 通过ctl属性判断当前的状态是否为TERMINATED状态
  • shutdown() 关闭Executor,不再接受提交任务
  • shutdownNow() 关闭Executor,不再接受提交任务,并且不再执行入队列中的任务

那么再来看一下AbstractExecutorService,这个类是ExecutorService的一个抽象实现。其中,提交任务的各类方法已经给出了十分完整的实现。之所以抽象,是因为和执行器本身生命周期相关的方法在此类中并未给出任何实现,需要子类扩展完善(模板方法设计模式)拿一个submit方法出来分析一下:

1
2
3
4
5
6
7
8
9
10
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

从代码可以看出实际上用到的是RunnableFuture的实现类FutureTask。但最终还是调用了execute()方法,在子类中实现。
在正式进入ThreadPoolExecutor源码分析之前还需要补充一点的是:Executors(工厂方法设计模式)
java.util.concurrent.Executors是个工具类,提供了很多静态的工具方法。其中很多对于执行器来说就是初始化构建用的工厂方法。

  • 重载实现的newFixedThreadPool()
  • 重载实现的newSingleThreadExecutor()
  • 重载实现的newCachedThreadPool()
  • 重载实现的newSingleThreadScheduledExecutor()
  • 重载实现的newScheduledThreadPool()

这些方法返回的ExecutorService对象最终都是由ThreadPoolExecutor实现的,根据不同的需求以不同的参数配置,或经过其它类包装。其中,Executors中的一些内部类就是用来做包装用的。Executors类中还有静态的defaultThreadFactory()方法,当然也可以自己实现自定义的ThreadFactory。

第二部分:ThreadPoolExecutor源码分析

下面正式进入ThreadPoolExecutor:(按照程序运行顺序分析)

1、ThreadPoolExecutor的全参数构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

根据注释:

  • corePoolSize 是线程池的核心线程数,通常线程池会维持这个线程数
  • maximumPoolSize 是线程池所能维持的最大线程数
  • keepAliveTime 和 unit 则分别是超额(空闲)线程的空闲存活时间数和时间单位
  • workQueue 是提交任务到线程池的入队列
  • threadFactory 是线程池创建新线程的线程构造器
  • handler 是当线程池不能接受提交任务的时候的处理策略

2、execute方法提交任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

通过注释:提交新任务的时候,如果没达到核心线程数corePoolSize,则开辟新线程执行。如果达到核心线程数corePoolSize, 而队列未满,则放入队列,否则开新线程处理任务,直到maximumPoolSize,超出则丢弃处理。同时判断目前线程的状态是不是RUNNING其他线程有可能调用了shutdown()或shutdownNow()方法,关闭线程池,导致目前线程的状态不是RUNNING。在上面提交任务的时候,会出现开辟新的线程来执行,这会调用addWorker()方法。

3、addWorker方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

第一部分:第一段从第3行到第26行,是双层无限循环,尝试增加线程数到ctl变量,并且做一些比较判断,如果超出线程数限定或者ThreadPoolExecutor的状态不符合要求,则直接返回false,增加worker失败。第二部分:从第28行开始到结尾,把firstTask这个Runnable对象传给Worker构造方法,赋值给Worker对象的task属性。Worker对象把自身(也是一个Runnable)封装成一个Thread对象赋予Worker对象的thread属性。锁住整个线程池并实际增加worker到workers的HashSet对象当中。成功增加后开始执行t.start(),就是worker的thread属性开始运行,实际上就是运行Worker对象的run方法。Worker的run()方法实际上调用了ThreadPoolExecutor的runWorker()方法。在看runWorker()之前先看一下Worker对象。

4、Worker对象

Worker是真正的任务,是由任务执行线程完成,它是ThreadPoolExecutor的核心。每个线程池中,有为数不等的Worker对象,每个Worker对象中,包含一个需要立即执行的新任务和已经执行完成的任务数量,Worker本身,是一个Runnable对象,不是Thread对象它内部封装一个Thread对象,用此对象执行本身的run方法,而这个Thread对象则由ThreadPoolExecutor提供的ThreadFactory对象创建新的线程。(将Worker和Thread分离的好处是,如果我们的业务代码,需要对于线程池中的线程,赋予优先级、线程名称、线程执行策略等其他控制时,可以实现自己的ThreadFactory进行扩展,无需继承或改写ThreadPoolExecutor。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

5、runWorker()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

根据代码顺序看下来,其实很简单。

  • 线程开始执行前,需要对worker加锁,完成一个任务后执行unlock()
  • 在任务执行前后,执行beforeExecute()和afterExecute()方法
  • 记录任务执行中的异常后,继续抛出
  • 每个任务完成后,会记录当前线程完成的任务数
  • 当worker执行完一个任务的时候,包括初始任务firstTask,会调用getTask()继续获取任务,这个方法调用是可以阻塞的
  • 线程退出,执行processWorkerExit(w, completedAbruptly)处理

接下来看一下getTask()是怎样实现空闲线程复用的

6、getTask()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

getTask()实际上是从工作队列(workQueue)中取提交进来的任务。这个workQueue是一个BlockingQueue,通常当队列中没有新任务的时候,则getTask()会阻塞。另外,还有定时阻塞这样一段逻辑:如果从队列中取任务是计时的,则用poll()方法,并设置等待时间为keepAlive,否则调用阻塞方法take()。当poll()超时,则获取到的任务为null,timeOut设置为 true。这段代码也是放在一个for(;;)循环中,前面有判断超时的语句,如果超时,则return null。这意味着runWorker()方法的while循环结束,线程将退出,执行processWorkerExit()方法。

其中:

1
2
3
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

即判断当前线程池的线程数是否超出corePoolSize,如果超出这个值并且空闲时间多于keepAlive则当前线程退出。另外一种情况就是allowCoreThreadTimeOut为true,就是允许核心在空闲超时的情况下停掉。最后再来看一下线程池线程数的维护和线程的退出处理。

7、processWorkerExit()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

这个方法最主要就是从workers的Set中remove掉一个多余的线程。这个方法的第二个参数是判断是否在runWorker()中正常退出了循环向下执行。当前如果不是,说明在执行任务的过程中出现了异常,completedAbruptly为true,线程直接退出,需要直接对活动线程数减1 ;如果是正常退出则加锁统计完成的任务数,并从workers这个集合中移除当前worker。执行tryTerminate(),这个方法后面会介绍,主要就是尝试将线程池推向TERMINATED状态。最后比较当前线程数是不是已经低于应有的线程数,如果这个情况发生,则添加无任务的空Worker到线程池中待命。以上,增加新的线程和剔除多余的线程的过程大概就是如此,这样线程池能保持额定的线程数,并弹性伸缩,保证系统的资源不至于过度消耗。以上,增加新的线程和剔除多余的线程的过程大概就是如此,这样线程池能保持额定的线程数,并弹性伸缩,保证系统的资源不至于过度消耗。

8、tryTerminate()方法

tryTerminate()的意义就在于尝试进入终止状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

当ctl中worker数字为0时执行terminated()方法,否则等锁中断一个空闲的Worker,其中interruptIdleWorkers()就是来中断线程的。空闲的worker主要是通过worker的tryLock()来确认的,因为执行任务的worker互斥地锁定对象。中断worker导致线程退出,最终还会循环尝试终止其它的空闲线程,直到整个ThreadPoolExecutor最后终结。到此为止,从线程池的新建,提交任务,到结束,基本结束。

第三部分:ThreadPoolExecutor生命周期

下面来补充一下ThreadPoolExecutor生命周期中的一些重要方法的介绍:

1、ShutDown()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

尝试将状态切换到SHUTDOWN,这样就不会再接收新的任务提交。对空闲线程进行中断调用。最后检查线程池线程是否为0,并尝试切换到TERMINATED状态。

2、ShutDownNow()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

主要所做的事情就是切换ThreadPoolExecutor到STOP状态,中断所有worker,并将任务队列中的任务取出来,不再执行。最后尝试修改状态到TERMINATED。

shutdown()和shutdownNow()的区别:
shutdown()新的任务不会再被提交到线程池,但之前的都会依旧执行,通过中断方式停止空闲的(根据没有获取锁来确定)线程。
shutdownNow()则向所有正在执行的线程发出中断信号以尝试终止线程,并将工作队列中的任务以列表方式的结果返回。

  • 一个要将线程池推到SHUTDOWN状态,一个将推到STOP状态
  • 并且对运行的线程处理方式不同,shutdown()只中断空闲线程,而shutdownNow()会尝试中断所有活动线程
  • 还有就是对队列中的任务处理,shutdown()队列中已有任务会继续执行,而shutdownNow()会直接取出不被执行
  • 相同的是都在最后尝试将线程池推到TERMINATED状态。

3、awaitTermination()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}

阻塞等待shutdown请求后所有线程终止,会有时间参数,超时和中断也会令方法调用结束。实际所做的就是Condition的定时await调用。用于状态依赖的线程阻塞。

4、ThreadPoolExecutor生命周期的扩展点

在生命周期上,ThreadPoolExecutor为扩展的类提供了一些扩展点,这是很好的设计,对扩展开放。其中声明了如下protected的方法:

  • beforeExecute() 在每个任务执行前做的处理
  • afterExecute() 在每个任务执行后做的处理
  • terminated() 在ThreadPoolExecutor到达TERMINATED状态前所做的处理
  • finalize() 有默认实现,直接调用shutdown(),以保证线程池对象回收
  • onShutdown() 在shutdown()方法执行到最后时调用,在ScheduledThreadPoolExecutor类实现中用到了这个
  • 扩展点,做一些任务队列的清理操作。

第四部分:ThreadPoolExecutor的丢弃–RejectedExecutionHandler

当ThreadPoolExecutor执行任务的时候,如果线程池的线程已经饱和,并且任务队列也已满。那么就会做丢弃处理,这也是execute()方法实现中的操作,源码如下:

1
2
3
4
5
6
7
8
9
10
11
else if (!addWorker(command, false))
reject(command);
//方法调用
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
//RejectedExecutionHandler接口
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

RejectedExecutionHandler 其中只有rejectedExecution()一个方法。返回为void,而参数一个是具体的Runnable任务,另一个则是被提交任务的ThreadPoolExecutor。ThreadPoolExecutor给出了4种基本策略的实现。分别是:

  • CallerRunsPolicy
  • AbortPolicy
  • DiscardPolicy
  • DiscardOldestPolicy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

根据源码可知:

  1. 调用者执行策略(CallerRunsPolicy)
    在这个策略实现中,任务还是会被执行,但线程池中不会开辟新线程,而是提交任务的线程来负责维护任务。会先判断ThreadPoolExecutor对象的状态,之后执行任务。这样处理的一个好处,是让caller线程运行任务,以推迟该线程进一步提交新任务有效的缓解了线程池对象饱和的情况。

  2. 废弃终止(AbortPolicy)
    不处理,而是抛出java.util.concurrent.RejectedExecutionException异常。
    注意,处理这个异常的线程是执行execute()的调用者线程。

  3. 直接丢弃(DiscardPolicy)
    这个也是实现最简单的类,其中的rejectedExecution()方法是空实现,即什么也不做,那么提交的任务将会被丢弃,而不做任何处理

  4. 丢弃最老(DiscardPolicy)
    会丢弃掉一个任务,但是是队列中最早的。注意,会先判断ThreadPoolExecutor对象是否已经进入SHUTDOWN以后的状态。之后取出队列头的任务并不做任何处理,即丢弃,再重新调用execute()方法提交新任务。

回答问题

此时,我们就可以回答之前提出的问题了。

  • 问题一:线程池的池子是啥,里边放了什么
    池子其实就是HashSet,每个Worker包装了一个Thread和一个Runnable任务,由thread来执行runnable任务。
  • 问题二:线程池能减少开销,它是怎么做到的
    线程池减少开销的原理其实就是减少线程创建和销毁的开销。在线程池中有常驻线程,这些线程会去阻塞队列中不断请求新任务。当我们创建一个新任务的时候,可以直接丢到阻塞队列中,等线程有时间的时候就会来执行这个任务,而不用每次都创建新的线程。所以线程池其实比较适合需要经常执行不同临时任务的场景,比如网络请求,用户不是每时每刻都在与服务器交互,但是用户量非常大,请求的特点就是短暂而大量。
  • 问题三:线程池会自动往池子里创建线程,它会自动把线程释放掉吗?如果能,是怎么做到的;如果不能,它不怕溢出吗
    这个看线程池的配置,如果是配置了超时时间,那么一个线程在阻塞队列等待超过一定时间之后就会自己退出,这部分逻辑在runWorker()的getTask()部分。如果线程是计时的,在队列中取任务是用的poll(),这个会在超时之后返回null,然后把timedout设置为true,这样就会在返回到runWorker()方法的时候退出循环,执行processWorkerExit方法来结束当前线程。
    另外,线程池会配置一个corePoolSize,代表线程池中的核心线程个数。一般来说,核心线程一般不是可以计时的,可以常驻。核心线程在阻塞队列中等待任务的时候使用的方法是take()。
  • 问题四:线程池怎么使用,java给除了怎样的工具集。
    给一个demo

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    public class Test {
    public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<Runnable>(5));
    for(int i=0;i<15;i++){
    MyTask myTask = new MyTask(i);
    executor.execute(myTask);
    System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
    executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
    }
    executor.shutdown();
    }
    }
    class MyTask implements Runnable {
    private int taskNum;
    public MyTask(int num) {
    this.taskNum = num;
    }
    @Override
    public void run() {
    System.out.println("正在执行task "+taskNum);
    try {
    Thread.currentThread().sleep(4000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("task "+taskNum+"执行完毕");
    }
    }

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 4
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 10
线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 11
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 12
线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 13
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 14
task 3执行完毕
task 0执行完毕
task 2执行完毕
task 1执行完毕
正在执行task 8
正在执行task 7
正在执行task 6
正在执行task 5
task 4执行完毕
task 10执行完毕
task 11执行完毕
task 13执行完毕
task 12执行完毕
正在执行task 9
task 14执行完毕
task 8执行完毕
task 5执行完毕
task 7执行完毕
task 6执行完毕
task 9执行完毕