源码解读---线程池源码探秘

惟其如此,失望和孤单的时候,我才可以不掉眼泪,不起波动,微笑告诉自己,不是你对我不好,而是爱情本来就是虚妄的,它曾经有多热烈,也就有多寂寞

Posted by yishuifengxiao on 2021-02-06

一 基本概念

线程池解决两个不同的问题:由于每个任务的调用开销减少,它们通常在执行大量异步任务时提供改进的性能,并且它们提供了一种限制和管理资源(包括执行一个任务。 每个ThreadPoolExecutor还维护一些基本统计信息,例如已完成任务的数量。

为了在广泛的上下文中有用,此类提供了许多可调参数和可扩展性钩子。 然而,程序员被敦促使用更方便的Executors工厂方法Executors.newCachedThreadPool() (无限线程池,具有自动线程回收), Executors.newFixedThreadPool(int) (固定大小的线程池)和Executors.newSingleThreadExecutor() (单个后台线程),可以预先配置最常用的使用场景设置。 否则,手动配置和调优此类时,请使用以下指南:

1.1 Core and maximum pool sizes

ThreadPoolExecutor将根据corePoolSize和maximumPoolSize设置的边界自动调整池大小。 当在方法execute(Runnable)中提交新任务且运行的线程数少于corePoolSize线程时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理请求。 如果运行的线程数大于corePoolSize但小于maximumPoolSize,则仅在队列已满时才创建新线程。 通过将corePoolSize和maximumPoolSize设置为相同,可以创建固定大小的线程池。 通过将maximumPoolSize设置为一个本质上不受限制的值(例如Integer.MAX_VALUE),可以允许池容纳任意数量的并发任务。 通常,核心和最大池大小仅在构造时设置,但也可以使用setCorePoolSize和setMaximumPoolSize动态更改。

corePoolSize : 除非设置allowCoreThreadTimeOut,否则核心池大小是保持活动状态(且不允许超时等)的最小工作线程数,在这种情况下,最小值为零。
maximumPoolSize : 最大池大小。 请注意,实际最大值在内部受“容量”限制。

1.2 On-demand construction(按需施工)

默认情况下,甚至只有在有新任务到达时才开始甚至启动核心线程,但是可以使用prestartCoreThread或prestartAllCoreThreads方法动态地覆盖它。 如果使用非空队列构造池,则可能要预启动线程

1.3 Creating new threads (创建新线程)

使用ThreadFactory创建新线程。 如果没有另外指定,则使用Executors.defaultThreadFactory,该线程创建的线程全部位于同一ThreadGroup中,并且具有相同的NORM_PRIORITY优先级和非守护程序状态。 通过提供不同的ThreadFactory,可以更改线程的名称,线程组,优先级,守护程序状态等。如果ThreadFactory在通过从newThread返回null返回要求时未能创建线程,执行器将继续执行,但可能无法执行 执行任何任务。 线程应具有“ modifyThread” RuntimePermission。 如果使用该池的工作线程或其他线程不具有此许可权,则服务可能会降级:配置更改可能不会及时生效,并且关闭池可能会处于可能终止但未完成的状态

1.4 Keep-alive times

如果当前池中的线程数超过corePoolSize,则多余的线程将在空闲时间超过keepAliveTime时终止(请参阅getKeepAliveTime(TimeUnit))。 当不积极使用池时,这提供了减少资源消耗的方法。 如果池稍后变得更加活跃,则将构建新线程。 也可以使用setKeepAliveTime(long,TimeUnit)方法动态更改此参数。 使用Long.MAX_VALUE TimeUnit.NANOSECONDS值可以有效地使空闲线程永远不会在关闭之前终止。 默认情况下,仅当存在多个corePoolSize线程时,保持活动策略才适用。 但是,只要keepAliveTime值不为零,方法allowCoreThreadTimeOut(boolean)还可用于将此超时策略应用于核心线程。

1.5 Queuing (排队)

任何BlockingQueue均可用于传输和保留提交的任务。 此队列的使用与池大小交互:

  • 如果运行的线程数少于corePoolSize,则执行程序总是喜欢添加新线程,而不是排队。
  • 如果正在运行corePoolSize或更多线程,则执行程序总是更喜欢对请求进行排队,而不是添加新线程。
  • 如果无法将请求放入队列中,则将创建一个新线程,除非该线程超过了maximumPoolSize,在这种情况下,该任务将被拒绝。

有三种一般的排队策略:

  • 直接交接。工作队列的一个很好的默认选择是SynchronousQueue,它可以将任务移交给线程,而不必另外保留它们。在这里,如果没有立即可用的线程来运行任务,则尝试将任务排队将失败,因此将构造一个新线程。在处理可能具有内部依赖项的请求集时,此策略避免了锁定。直接切换通常需要无限制的maximumPoolSizes以避免拒绝新提交的任务。反过来,当平均而言,命令继续以比其处理速度更快的速度到达时,就可以实现无限线程增长的可能性。
  • 无限队列。使用无界队列(例如,没有预定义容量的LinkedBlockingQueue)将在所有corePoolSize线程繁忙时使新任务在队列中等待。因此,将仅创建corePoolSize线程。 (因此,maximumPoolSize的值没有任何作用。)当每个任务完全独立于其他任务时,这可能是适当的,因此任务不会影响彼此的执行。例如,在网页服务器中。尽管这种排队方式对于消除短暂的请求突发很有用,但它承认当命令平均继续以比处理速度更快的速度到达时,工作队列会无限增长。
  • 有界队列。与有限的maximumPoolSizes一起使用时,有界队列(例如ArrayBlockingQueue)有助于防止资源耗尽,但调优和控制起来会更加困难。队列大小和最大池大小可能会相互折衷:使用大队列和小池可以最大程度地减少CPU使用率,操作系统资源和上下文切换开销,但会导致人为地降低吞吐量。如果任务频繁阻塞(例如,如果它们受I / O约束),则系统可能可以为您安排的线程调度的时间超出您的允许范围。使用小队列通常需要更大的池大小,这会使CPU繁忙,但可能会遇到不可接受的调度开销,这也会降低吞吐量。

1.6 Rejected tasks

当执行器关闭时,并且在执行器对最大线程和工作队列容量使用有限范围时,执行器将关闭在方法execute(Runnable)中提交的新任务。 无论哪种情况,execute方法都会调用其RejectedExecutionHandler的RejectedExecutionHandler.rejectedExecution(Runnable,ThreadPoolExecutor)方法。 提供了四个预定义的处理程序策略:

  • 在默认的ThreadPoolExecutor.AbortPolicy中,处理程序在拒绝时会抛出运行时RejectedExecutionException。
  • 在ThreadPoolExecutor.CallerRunsPolicy中,调用执行自己的线程运行任务。 这提供了一种简单的反馈控制机制,它将降低新任务的提交速度。
  • 在ThreadPoolExecutor.DiscardPolicy中,简单地删除了无法执行的任务。
  • 在ThreadPoolExecutor.DiscardOldestPolicy中,如果未关闭执行程序,则将丢弃工作队列开头的任- 务,然后重试执行(该操作可能再次失败,导致重复执行此操作)。

可以定义和使用其他种类的RejectedExecutionHandler类。 这样做需要格外小心,尤其是在设计策略仅在特定容量或排队策略下工作时。

1.7 Hook methods

此类提供受保护的可重写的beforeExecute(Thread,Runnable)和afterExecute(Runnable,Throwable)方法,这些方法在执行每个任务之前和之后调用。 这些可以用来操纵执行环境。 例如,重新初始化ThreadLocals,收集统计信息或添加日志条目。 此外,一旦执行程序完全终止,可以终止方法终止以执行需要执行的任何特殊处理。
如果钩子或回调方法引发异常,内部工作线程可能进而失败并突然终止。

1.8 Queue maintenance

方法getQueue()允许访问工作队列,以进行监视和调试。 强烈建议不要将此方法用于任何其他目的。 当取消大量排队的任务时,可以使用提供的两种方法remove(Runnable)和purge来帮助回收存储。

1.9 Finalization

程序中不再引用且没有剩余线程的池将自动关闭。 如果即使在用户忘记调用shutdown的情况下也要确保收回未引用的池,则必须使用零核心线程的下限和/或设置allowCoreThreadTimeOut来设置适当的保活时间,以安排未使用的线程最终死掉 (布尔值)。

ThreadPoolExecutor

二 源码相关

2.1 核心属性

主池控制状态ctl是一个打包两个概念字段workerCount的原子整数,指示线程的有效数量runState,指示是否运行,关闭等。为了将它们打包为一个int,我们将workerCount限制为(2 ^ 29 )-1(约5亿)个线程,而不是(2 ^ 31)-1(20亿)可以表示的线程。如果将来有问题,可以将该变量更改为AtomicLong,并在下面调整shift / mask常数。但是在需要之前,使用int可以使此代码更快,更简单。 workerCount是已被允许启动但不允许停止的工人数。该值可能与活动线程的实际数量暂时不同,例如,当ThreadFactory在被询问时未能创建线程,并且退出线程仍在终止之前执行簿记操作时,该值会有所不同。用户可见的池大小报告为工作集的当前大小。 runState提供主要的生命周期控制,并具有以下值:RUNNING:接受新任务并处理排队的任务SHUTDOWN:不接受新任务,但处理排队的任务STOP:不接受新任务,不处理排队的任务,并中断进行中的任务TIDYING:所有任务都已终止,workerCount为零,线程转换到TIDYING状态将运行Terminated()挂钩方法TERMINATED:terminald()已完成这些值之间的数值顺序很重要,可以进行有序比较。 runState随时间单调增加,但不必达到每个状态。转换为:RUNNING-> SHUTDOWN在调用shutdown()时,可能隐式在finalize()中(RUNNING或SHUTDOWN)-> STOP在调用shutdownNow()时SHUTDOWN-> TIDYING当队列和池都为空时STOP-> TIDYING当池为空时TIDYING-> TERMINATED当Terminated()挂钩方法完成时,状态达到TERMINATED时,在awaitTermination()中等待的线程将返回。检测从SHUTDOWN到TIDYING的转换并不像您想要的那样简单,因为在SHUTDOWN状态期间,队列在非空之后可能会变空,反之亦然,但是只有在看到它为空之后,我们看到workerCount为0(有时需要重新检查-参见下文)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
private final ReentrantLock mainLock = new ReentrantLock();

/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();

/**
* Wait condition to support awaitTermination
*/
private final Condition termination = mainLock.newCondition();

/**
* Tracks largest attained pool size. Accessed only under
* mainLock.
*/
private int largestPoolSize;

/**
* Counter for completed tasks. Updated only on termination of
* worker threads. Accessed only under mainLock.
*/
private long completedTaskCount;

/*
* All user control parameters are declared as volatiles so that
* ongoing actions are based on freshest values, but without need
* for locking, since no internal invariants depend on them
* changing synchronously with respect to other actions.
*/

/**
* Factory for new threads. All threads are created using this
* factory (via method addWorker). All callers must be prepared
* for addWorker to fail, which may reflect a system or user's
* policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in
* the queue.
*
* We go further and preserve pool invariants even in the face of
* errors such as OutOfMemoryError, that might be thrown while
* trying to create threads. Such errors are rather common due to
* the need to allocate a native stack in Thread.start, and users
* will want to perform clean pool shutdown to clean up. There
* will likely be enough memory available for the cleanup code to
* complete without encountering yet another OutOfMemoryError.
*/
private volatile ThreadFactory threadFactory;

/**
* Handler called when saturated or shutdown in execute.
*/
private volatile RejectedExecutionHandler handler;

/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
private volatile long keepAliveTime;

/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;

/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
private volatile int corePoolSize;

/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
*/
private volatile int maximumPoolSize;

2.2 构造函数

image-20210205112340109

默认情况下,被拒绝的任务的处理程序为 AbortPolicy ,即 被拒绝的任务的处理程序,抛出一个 RejectedExecutionException

在使用构造函数时,可能会在以下情况时产生运行时异常

IllegalArgumentException

corePoolSize < 0 keepAliveTime < 0 maximumPoolSize <= 0 maximumPoolSize < corePoolSize

NullPointerException

workQueue is null

在构造函数中使用到的队列的情况如下

image-20210205160609234

队列还支持以下操作:在检索元素时等待队列变为非空,并在存储元素时等待队列中的空间变为可用。
BlockingQueue方法有四种形式,它们以不同的方式处理操作,这些方法无法立即满足,但将来可能会满足:一种抛出异常,第二种返回特殊值(null或false,具体取决于 操作),第三个块将无限期地阻塞当前线程,直到操作成功为止;第四个块仅在给定的最大时间限制内放弃。 下表总结了这些方法:

image-20210205160810150

BlockingQueue不接受空元素。在尝试添加,放置或提供null时,实现会引发NullPointerException。空值用作标记值,以指示轮询操作失败。
BlockingQueue可能受容量限制。在任何给定时间,它可能具有剩余容量,超过该容量就不能放置其他元素而不会阻塞。没有任何内部容量约束的BlockingQueue始终报告Integer.MAX_VALUE的剩余容量。
BlockingQueue实现被设计为主要用于生产者-消费者队列,但另外还支持Collection接口。因此,例如,可以使用remove(x)从队列中删除任意元素。但是,这样的操作通常不能非常有效地执行,并且仅用于偶尔的使用,例如在取消排队的消息时。
BlockingQueue实现是线程安全的。所有排队方法都是使用内部锁或其他形式的并发控制来原子地实现其效果的。但是,除非在实现中另行指定,否则批量Collection操作addAll,containsAll,retainAll和removeAll不一定是原子执行的。因此,例如,仅在c中添加一些元素之后,addAll(c)可能会失败(引发异常)。
BlockingQueue本质上不支持任何类型的“关闭”或“关闭”操作,以指示将不再添加任何项目。这些功能的需求和使用往往取决于实现。例如,一种常见的策略是让生产者插入特殊的流尾对象或有毒对象,当消费者采取这种方法时会对其进行相应的解释。
使用示例,基于典型的生产者-消费者方案。请注意,BlockingQueue可以安全地与多个生产者和多个消费者一起使用。

2.3 核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* Returns a {@code RunnableFuture} for the given runnable and default
* value.
*
* @param runnable the runnable task being wrapped
* @param value the default value for the returned future
* @param <T> the type of the given value
* @return a {@code RunnableFuture} which, when run, will run the
* underlying runnable and which, as a {@code Future}, will yield
* the given value as its result and provide for cancellation of
* the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

/**
* Returns a {@code RunnableFuture} for the given callable task.
*
* @param callable the callable task being wrapped
* @param <T> the type of the callable's result
* @return a {@code RunnableFuture} which, when run, will call the
* underlying callable and which, as a {@code Future}, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

由此可见最终执行的都是 execute 方法,该方法的定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

添加一个任务主要分为以下三步

  • 1.如果正在运行的线程少于corePoolSize线程,请尝试执行以下操作:使用给定的命令作为第一个启动新线程任务。 对addWorker的调用自动检查runState和workerCount,这样可以防止假警报增加通过返回false返回不应该执行的线程。
  • 2.如果任务可以成功排队,那么我们仍然需要仔细检查我们是否应该添加线程(因为现有的自上次检查后死亡)或自进入此方法以来,该池已关闭。 所以我们重新检查状态,并在必要时回退排队停止,如果没有,则启动一个新线程。
  • 3.如果我们无法将任务排队,那么我们尝试添加一个新的线。 如果失败,我们知道我们已经关闭或饱和,因此拒绝任务。

在将来的某个时间执行给定的任务。 该任务可以在新线程或现有池线程中执行。 如果由于该执行器已关闭或已达到其能力而无法提交执行任务,则该任务由当前的RejectedExecutionHandler处理。

实际上添加任务的方法的定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
*
* @param firstTask 新线程应首先运行的任务(如果没有,则为null)。
* 当线程数少于corePoolSize线程(在这种情况下,我们始终启动一个线程)
* 或队列已满(在这种情况下,我们必须绕过队列)时,
* 使用初始的第一个任务(在execute()方法中)创建工作程序以绕过队列。
* 最初空闲线程通常是通过prestartCoreThread创建的,或者用来替代其他垂死的工作线程。
*
* @param core 如果为true,请使用corePoolSize作为绑定,
* 否则使用maximumPoolSize。
* (此处使用布尔值指示符而不是值,以确保在检查其他池状态后读取新值)。
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);

//状态判断
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;

for (; ; ) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
ThreadPoolExecutor.Worker w = null;
try {


w = new ThreadPoolExecutor.Worker(firstTask);


final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

该部分代码主要目的是检查是否可以针对当前池状态和给定的边界(核心或最大值)添加新的工作程序。 如果是这样,则会相应地调整工作程序计数,并且如果可能,会创建并启动一个新的工作程序,并运行firstTask作为其第一个任务。 如果池已停止或有资格关闭,则此方法返回false。 如果在询问时线程工厂无法创建线程,则它还会返回false。 如果线程创建失败(由于线程工厂返回null或由于异常(通常是Thread.start()中的OutOfMemoryError)),我们将进行干净的回滚。

在添加失败时会触发拒绝策略,改部分代码如下

1
2
3
4
5
6
7
/**
* Invokes the rejected execution handler for the given command.
* Package-protected for use by ScheduledThreadPoolExecutor.
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}

三 实践

3.1 任务时间小于Keep-alive times

3.1.1 超过最大线程数

本例子中,核心线程数为1,最大线程是2,队列的最大容量为10,尝试添加5个线程

1
2
3
4
5
6
7
8
public class ThreadDemo {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 100, TimeUnit.HOURS, new ArrayBlockingQueue(10));
for (int i = 0; i < 5; i++) {
executor.submit(() -> System.out.println(System.currentTimeMillis()));
}
}
}

运行上述程序,程序正常,得到以下的结果

1
2
3
4
5
6
7
8
9
10
1612506856883
1612506856883
1612506856883
1612506856883
1612506856883
1612506856883
1612506856883
1612506856883
1612506856883
1612506856883

【注意】代码运行完成之后,此进程未关闭

3.1.2 超过队列容量

本例子中,核心线程数为1,最大线程是2,队列的最大容量为5,尝试添加10个线程

1
2
3
4
5
6
7
8
public class ThreadDemo {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 100, TimeUnit.HOURS, new ArrayBlockingQueue(5));
for (int i = 0; i < 10; i++) {
executor.submit(() -> System.out.println(System.currentTimeMillis()));
}
}
}

运行程序,得到以下结果

1
2
3
4
5
6
7
8
9
10
11
12
13
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@335eadca rejected from java.util.concurrent.ThreadPoolExecutor@6ddf90b0[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0]
1612507255424
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
1612507255424
1612507255424 at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at com.yishuifengxiao.common.logcenter.ThreadDemo.main(ThreadDemo.java:16)

1612507255424
1612507255424
1612507255424
1612507255424

再次运行程序

1
2
3
4
5
6
7
8
9
10
11
12
13
Exception in thread "main" 1612507356405
1612507356405
1612507356405
1612507356405
1612507356405
1612507356405
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@335eadca rejected from java.util.concurrent.ThreadPoolExecutor@6ddf90b0[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at com.yishuifengxiao.common.logcenter.ThreadDemo.main(ThreadDemo.java:16)
1612507356406

可以看得出,两次运行的结果报错的位置不一样,但是两次运行均失败,在运行过程中发生了异常情况

3.1.3 结论

通过上述实例可知,在任务时间小于Keep-alive时,或者说Keep-alive时间为一个相对较大的值的时候,任务队列决定着最大可接收的任务的数量

3. 2任务时间大于Keep-alive times

3.2.1 超过最大线程数

本例子中,核心线程数为1,最大线程是2,队列的最大容量为10,尝试添加5个线程

其中每个线程里的任务完成时间需要4秒钟,Keep-alive times为3秒钟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ThreadDemo {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue(10));
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
System.out.println(System.currentTimeMillis());
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}

多次运行上述程序,程序运行正常

3.2.2 超过队列容量

本例子中,核心线程数为1,最大线程是2,队列的最大容量为5,尝试添加10个线程

其中每个线程里的任务完成时间需要4秒钟,Keep-alive times为3秒钟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ThreadDemo {

private static AtomicInteger num = new AtomicInteger(0);

public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue(5));
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
System.out.println(System.currentTimeMillis() + " = " + num.incrementAndGet());
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}

多次运行程序,其中数次的运行结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@335eadca rejected from java.util.concurrent.ThreadPoolExecutor@6ddf90b0[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0]
1612509295081 = 1
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at com.yishuifengxiao.common.logcenter.ThreadDemo.main(ThreadDemo.java:20)
1612509295081 = 2
1612509299081 = 3
1612509299082 = 4
1612509303082 = 5
1612509303083 = 6
1612509307082 = 7

另一次为

1
2
3
4
5
6
7
8
9
10
11
12
13
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@335eadca rejected from java.util.concurrent.ThreadPoolExecutor@6ddf90b0[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at com.yishuifengxiao.common.logcenter.ThreadDemo.main(ThreadDemo.java:20)
1612509412270 = 1
1612509412270 = 2
1612509416271 = 3
1612509416272 = 4
1612509420272 = 5
1612509420273 = 6
1612509424272 = 7

可以看到,两次执行时均有异常发生,并且任务未全部执行完成。