Java 之线程池使用详解

大纲

线程池的介绍

线程池的作用

  • 限定线程的个数,避免由于线程过多导致系统运行缓慢或崩溃。
  • 线程池不需要每次都去创建或销毁线程,节约了资源。
  • 线程池不需要每次都去创建线程,响应时间更快。

线程池的组成

线程池是一种线程管理的机制,用于提高多线程任务处理的效率和性能。它由线程池管理器、工作队列和一组工作线程组成。

  • 线程池管理器:负责创建、管理和调度线程池中的线程。它根据需要动态地创建或销毁线程,并分配任务给空闲的线程。
  • 工作队列:用于存储待执行的任务。线程池中的线程从工作队列中取出任务进行处理。当工作队列已满时,新提交的任务可能会被拒绝或者等待一段时间。
  • 工作线程:线程池中的实际执行单元。它们循环地从工作队列中取出任务执行,并在任务执行完毕后返回线程池等待下一次任务。

线程池的优缺点

线程池的优点

  • 降低资源消耗:减少了线程的创建和销毁频率,降低了系统开销。
  • 提高响应速度:线程池中的线程通常是预先创建好的,可以立即处理任务,避免了线程创建的延迟。
  • 提高系统稳定性:通过控制线程的数量,避免了系统资源被耗尽的风险,防止系统因过度并发而崩溃。
  • 提高可管理性:通过线程池管理器,可以方便地监控、调整线程池的大小和任务执行情况。

线程池广泛应用于各种多线程任务处理的场景,如网络服务器、数据库连接池、图像处理等。

线程池的缺点

  • 资源占用:线程池在运行过程中会占用一定的系统资源,包括内存和 CPU 资源。如果线程池的大小设置过大,可能会消耗过多的系统资源,影响其他程序的正常运行。
  • 调优难度:确定线程池的大小和配置参数需要一定的经验和调试,不同的应用场景可能需要不同的配置,而这种调优过程可能比较繁琐。
  • 任务拥堵:如果任务提交速度过快,超过了线程池的处理能力,会导致任务在工作队列中排队等待执行,可能造成任务响应时间延长或者任务被拒绝执行。
  • 任务依赖性:线程池中的线程都是独立的执行单元,无法直接控制线程间的依赖关系。如果有一些任务之间存在依赖关系,可能需要额外的同步机制来处理。
  • 线程泄漏:如果线程池中的线程没有适时地释放,可能会导致线程资源的泄漏,进而导致系统资源的浪费或者系统稳定性的下降。

综上所述,虽然线程池可以提高多线程任务处理的效率和性能,但在使用时仍需注意合理配置线程池大小和参数,并且需要注意任务提交的速度,以免出现资源浪费或性能下降的情况。

线程池的核心组件

线程池主要由以下几个核心组件组成:

  • 线程池管理器(ThreadPoolExecutor):用于创建和管理线程池,包括创建线程、销毁线程和添加新任务。
  • 线程工厂(ThreadFactory):用于创建新线程,用户可以通过自定义 ThreadFactory 来定义线程的属性(如是否是守护线程、线程的优先级等)。
  • 任务队列(BlockingQueue):用于存放等待执行的任务,可以是有界或者无界的队列。
  • 任务拒绝策略(RejectedExecutionHandler):当任务太多而无法处理时,线程池采取的处理策略。

线程池的核心参数

线程池一共有 7 个核心参数,分别是:

  • corePoolSize:核心线程数,线程池中的常驻核心线程数
    • 在线程池创建后,池中的线程数通常等于 corePoolSize,当有请求任务过来,就会安排线程池中的线程去执行任务。
    • 当线程池中的活动线程数量达到 corePoolSize 后,就会把到达的请求任务放到工作队列中等待。
    • 在没有任务执行时,核心线程会一直存活在线程池中,即使是处于空闲状态,核心线程也不会被销毁。如果使用了无界队列,即使没有任务执行,核心线程也不会超时退出。
  • maximumPoolSize:最大线程数,线程池允许能够同时执行的最大线程数
    • 相当于扩容后的线程数,即这个线程池能容纳的最大线程数,此值必须大于等于 1。
    • 当线程池中的活动线程数量达到 corePoolSize,且工作队列已满时,会为新任务创建新的线程,直到达到最大线程数,超过最大线程数的任务将会根据预先设定的拒绝策略来处理。
  • keepAliveTime:空闲线程的存活时间
    • 当线程池中的线程数量超过 corePoolSize 时,多余的空闲线程在被回收前等待新任务到来的最长时间。
    • 当空闲时间超过 keepAliveTime 时,多余的空闲线程会被销毁,直到只剩下 corePoolSize 个线程为止。
    • 在默认情况下,只有当线程池中的线程数量大于 corePoolSize 时,keepAliveTime 才会起作用。
  • unit:线程存活时间的单位
    • 指定 keepAliveTime 参数的时间单位,可以是秒、毫秒、分钟等。
  • workQueue:工作队列,存放被提交但未被执行的任务(待执行任务)
    • 线程池通过该队列来管理待执行的任务。
    • 常见的实现类包括 ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue 等。
  • threadFactory:用于创建新线程的工厂
    • 生成线程池中工作线程的线程工厂,一般用默认的即可。
    • 可以通过该参数自定义线程的创建方式,如设置线程的名称、优先级等。
  • handler:拒绝策略
    • 当任务无法被线程池执行,通常是由于线程池已经关闭,或者超过了最大线程数(maximumPoolSize),并且队列已满时,如何来拒绝执行任务的策略。

线程池的拒绝策略

Java 线程池提供了 4 大拒绝策略,用于处理无法接受新任务的情况,四大拒绝策略都实现了 RejectedExecutionHandler 接口。

  • AbortPolicy
    • 这是默认拒绝策略,直接抛出 RejectedExcutionException 异常,以此通知调用者线程池无法接受新任务。
  • DiscardPolicy
    • 直接丢弃任务,不予任何处理也不抛出异常,如果允许任务丢失,这是一种较好的方案。
  • DiscardOldestPolicy
    • 丢弃工作队列中等待时间最长的任务(即最旧的任务),然后尝试再次提交这个新任务,将其加入到工作队列中。
  • CallerRunsPolicy
    • 既不会丢弃任务,也不会抛出异常,而是调用任务提交者的线程来执行这个新任务(即谁提交由谁来执行)。这样一来,提交任务的线程就会尝试去执行该任务,从而避免任务的丢失,并降低新任务的流量。

CallerRunsPolicy 策略详解

  • 上述提到的 CallerRunsPolicy 拒绝策略,如果线程池中的线程数达到最大线程数,工作队列也已经满了,并且任务提交者的线程也很忙,没时间去执行被拒绝的任务,那么线程池会怎么处理呢?
  • 如果采用 CallerRunsPolicy 拒绝策略,此时线程池中的线程数达到最大线程数,且工作队列也已经满了,这意味着没有空闲的线程来执行任务,并且工作队列也无法继续接收新的任务。在这种情况下,线程池会尝试调用任务提交者的线程来执行这个被拒绝的任务。然而,如果提交该任务的线程也忙于执行其他任务,没有空闲的时间去执行被拒绝的任务,那么线程池就会继续阻塞任务提交者,直到工作队列有空间或者有空闲线程可用来执行任务为止。

线程池的使用

线程池的核心类

JUC 包中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,ExecutorService、ThreadPoolExecutor、Executors(工具类)这几个核心类。

线程池的创建方式

可以通过 Executors 提供的 5 个静态方法来创建不同的线程池,如下:

  • newFixedThreadPool:创建一个拥有固定线程数的线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • newCachedThreadPool:创建一个可扩容的线程池,如果线程池大小超过处理需要,可灵活回收空闲线程,若线程无可回收,则创建线程。
  • newSingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO、LIFO、优先级)执行。
  • newScheduledThreadPool:创建一个拥有固定线程数的线程池,支持定时及周期性任务的执行。
  • newWorkStealingPool:创建持有足够的线程的线程池来支持给定的并行级别,是 Java 8 新增的 API。

重点面试题

线程池常用的创建方式有三种:固定数量的、单一线程的、可扩容的,那么在实际开发中,应该使用哪种方式?

在生产环境中,三种方式都不使用,而是使用自己自定义的。那为什么不用 JDK 中 Executors 提供的呢?这里总结了一些不推荐使用默认 Executors 创建线程池的原因:

  • 无界队列:默认情况下,Executors 工厂方法创建的线程池使用的是无界队列。这意味着如果任务提交的速度超过了线程池处理任务的速度,那么队列会不断增长,最终可能导致内存耗尽或者 OutOfMemoryError。因此,在某些情况下,使用有界队列更安全,当任务队列已满时,可以根据需要采取适当的拒绝策略。
  • 线程生命周期管理:使用默认的 Executors 创建的线程池,线程的生命周期(如线程的创建、销毁等)可能由线程池自动管理,而这种自动管理可能不适合所有的应用场景。例如,在某些情况下,可能需要对线程的创建和销毁进行更精细的控制,以避免资源泄露或者其他问题。
  • 线程池的大小:默认的 Executors 创建的线程池大小可能不符合实际需求。例如,如果任务量非常大,但是线程池的大小较小,那么可能会导致任务排队等待执行,从而影响系统的性能。因此,在实际应用中,通常需要根据实际情况来调整线程池的大小。

线程池的使用案例

使用案例一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ThreadPoolDemo1 {

public static void main(String[] args) {
// 创建一个拥有固定线程数的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(5);

try {
for (int i = 1; i <= 10; i++) {
final int index = i;
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " execute job " + index);
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}

}

程序运行输出的结果:

1
2
3
4
5
6
7
8
9
10
pool-1-thread-1 execute job 1
pool-1-thread-2 execute job 2
pool-1-thread-3 execute job 3
pool-1-thread-4 execute job 4
pool-1-thread-5 execute job 5
pool-1-thread-4 execute job 9
pool-1-thread-3 execute job 8
pool-1-thread-2 execute job 7
pool-1-thread-1 execute job 6
pool-1-thread-5 execute job 10

使用案例二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ThreadPoolDemo2 {

public static void main(String[] args) {
// 创建一个只有 1 个线程的单线程池
ExecutorService threadPool = Executors.newSingleThreadExecutor();

try {
for (int i = 1; i <= 10; i++) {
final int index = i;
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " execute job " + index);
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}

}

程序运行输出的结果:

1
2
3
4
5
6
7
8
9
10
pool-1-thread-1 execute job 1
pool-1-thread-1 execute job 2
pool-1-thread-1 execute job 3
pool-1-thread-1 execute job 4
pool-1-thread-1 execute job 5
pool-1-thread-1 execute job 6
pool-1-thread-1 execute job 7
pool-1-thread-1 execute job 8
pool-1-thread-1 execute job 9
pool-1-thread-1 execute job 10

使用案例三

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ThreadPoolDemo3 {

public static void main(String[] args) {
// 创建一个可扩容的线程池
ExecutorService threadPool = Executors.newCachedThreadPool();

try {
Random random = new Random();
for (int i = 1; i <= 10; i++) {
final int index = i;
threadPool.execute(() -> {
// 模拟业务延迟
try {
TimeUnit.MILLISECONDS.sleep(random.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " execute job " + index);
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}

}

程序运行输出的结果:

1
2
3
4
5
6
7
8
9
10
pool-1-thread-7 execute job 7
pool-1-thread-6 execute job 6
pool-1-thread-3 execute job 3
pool-1-thread-9 execute job 9
pool-1-thread-8 execute job 8
pool-1-thread-1 execute job 1
pool-1-thread-2 execute job 2
pool-1-thread-4 execute job 4
pool-1-thread-10 execute job 10
pool-1-thread-5 execute job 5

线程池如何正确关闭

线程池关闭的 API

线程池关闭有两个 API:

  • threadPool.shutdown()
  • threadPool.shutdownNow()

执行 shutdown() 或者 shutdownNow() 方法之后,都将会影响任务的执行状态,比如:

  • (1) 未提交的任务,此时任务可以被提交到线程池。
  • (2) 已提交未执行的任务,此时任务已在线程池的工作队列中,等待着执行。
  • (3) 执行中的任务,此时任务正在执行。
  • (4) 任务执行完毕
shutdown () 方法

在 JDK 中,线程池的 shutdown() 方法的源码注释如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate
* threads that the caller is not permitted to modify
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")},
* or the security manager's {@code checkAccess} method
* denies access.
*/
void shutdown();

大概的意思是,调用 shutdown() 方法后,会将线程池状态置为 SHUTDOWN,并且会启动一个有序的关闭过程。在这个关闭过程中,线程池里等候待执行的任务和正在执行的任务都会继续执行,但不会再接受新的任务。如果系统已经关闭了,那么再次调用该方法将不会产生额外的效果。结合现实例子,比如银行下午 5 点关门,那么在下午 5 点 之前进去的顾客(Task),已经受理的可以办完业务才离开;下午 5 点之后,不再接受新顾客进来,只能明日请早。

  • shutdown() 方法的特性总结
    • 在方法调用后,不会再接受新的任务。
    • 在方法调用后,等候待执行的任务和正在执行的任务都会继续执行。
    • 如果线程池已关闭,再次调用方法不会产生任何影响。

验证 shutdown() 方法的使用,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class ThreadPoolDemo {

public static void main(String[] args) {
shutdownTest();
}

/**
* 第 6 个任务开始及之后的任务都被拒绝了,1 ~ 5 号任务正常执行。
* 所以 shutdown() 方法会将线程池状态设置为 SHUTDOWN,但是线程池并不会立即停止,要等正在执行和队列里等待的任务执行完才会停止。
*/
private static void shutdownTest() {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
// 提交 10 个任务,在第 5 个任务提交完,准备提交第 6 个的时候执行 shutdown
for (int i = 1; i <= 10; i++) {
System.out.println("第 " + i + " 次提交");
threadPool.execute(new Task(i));
// i 等于 5 的时候 shutdown,意味着从第 6 次开始就不能提交新任务
if (i == 5) {
threadPool.shutdown();
}
}
}

private static class Task implements Runnable {

@Getter
String name = "";

public Task(int i) {
name = "task-" + i;
}

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("sleep completed, " + getName());
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("interrupted, " + getName());
return;
}
System.out.println(getName() + " finished");
System.out.println();
}
}

}

程序执行的输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
第 1 次提交
第 2 次提交
第 3 次提交
第 4 次提交
第 5 次提交
第 6 次提交

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.java.interview.pool.ThreadPoolDemo$Task@5e5792a0 rejected from java.util.concurrent.ThreadPoolExecutor@26653222[Shutting down, pool size = 1, active threads = 1, queued tasks = 4, completed tasks = 0]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
at java.base/java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:687)
at com.java.interview.pool.ThreadPoolDemo.shutdownTest(ThreadPoolDemo.java:23)
at com.java.interview.pool.ThreadPoolDemo.main(ThreadPoolDemo.java:15)

sleep completed, task-1
task-1 finished

sleep completed, task-2
task-2 finished

sleep completed, task-3
task-3 finished

sleep completed, task-4
task-4 finished

sleep completed, task-5
task-5 finished

从上面的输出结果可以看到,当调用 shutdown() 方法后,线程池里等候待执行的任务和正在执行的任务都会继续执行,但不会再接受新的任务。

shutdownNow () 方法

在 JDK 中,线程池的 shutdownNow() 方法的源码注释如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. For example, typical
* implementations will cancel via {@link Thread#interrupt}, so any
* task that fails to respond to interrupts may never terminate.
*
* @return list of tasks that never commenced execution
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate
* threads that the caller is not permitted to modify
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")},
* or the security manager's {@code checkAccess} method
* denies access.
*/
List<Runnable> shutdownNow();

大概的意思是,调用 shutdownNow() 方法后,线程池会尝试停止所有正在执行的任务(仅仅是做尝试,成功与否取决于任务是否响应 InterruptedException 异常,以及对其做出的反应),同时会中止处理等待执行的任务,并返回等待执行的任务列表。shutdownNow() 方法会将线程池状态置为 STOP,并试图让线程池立刻关闭,但不一定能保证立刻关闭。因为线程池是通过 interrupt() 方法去尝试停止正在执行的任务,所以无法响应 Interrupt 中断的任务可能不会被停止,要等所有正在执行的任务(不能被 Interrupt 中断的任务)执行完才能关闭线程池,也就是说该方法是无法保证一定能够停止正在执行的任务。

  • shutdownNow() 方法的特性总结
    • 在方法调用后,不会再接受新的任务。
    • 在方法调用后,会尝试停止所有正在执行的任务。
    • 在方法调用后,等待执行的任务会被取消,并返回等待任务的列表。
    • 在方法返回时,等待执行的任务将从工作队列中移除。

验证 shutdownNow() 方法的使用,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class ThreadPoolDemo {

public static void main(String[] args) {
shutdownNowTest();
}

/**
* 调用 shutdownNow() 方法后,在第一个任务正在睡眠的时候,触发了 Interrupt 中断。
* 之前等待执行的任务 2 ~ 5 将从队列中移除并返回,之后的任务 6 ~ 10 被拒绝执行。
* shutdounNow() 方法会将线程池状态置为 STOP,试图让线程池立刻关闭,但不一定能保证立即关闭,要等所有正在执行的任务(不能被 Interrupt 中断的任务)执行完才能关闭。
*/
private static void shutdownNowTest() {
ExecutorService threadPool = Executors.newSingleThreadExecutor();

// 提交 10 个任务,在第 5 个任务提交完,准备提交第 6 个的时候执行 shutdown
for (int i = 1; i <= 10; i++) {
try {
System.out.println("第 " + i + " 次提交");
threadPool.execute(new Task(i));
} catch (Exception e) {
System.out.println("rejected, task = " + i);
}
// i 等于 5 的时候 shutdown,意味着从第 6 次开始就不能提交新任务
if (i == 5) {
List<Runnable> tasks = threadPool.shutdownNow();
for (Runnable task : tasks) {
if (task instanceof Task) {
System.out.println("waiting task: " + ((Task) task).getName());
}
}
}
}
}

private static class Task implements Runnable {

@Getter
String name = "";

public Task(int i) {
name = "task-" + i;
}

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("sleep completed, " + getName());
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("interrupted, " + getName());
return;
}
System.out.println(getName() + " finished");
System.out.println();
}
}

}

程序执行的输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
第 1 次提交
第 2 次提交
第 3 次提交
第 4 次提交
第 5 次提交

java.lang.InterruptedException: sleep interrupted
at java.base/java.lang.Thread.sleep(Native Method)
at java.base/java.lang.Thread.sleep(Thread.java:339)
at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446)
at com.java.interview.pool.ThreadPoolDemo$Task.run(ThreadPoolDemo.java:73)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

waiting task: task-2
interrupted, task-1
waiting task: task-3
waiting task: task-4
waiting task: task-5
第 6 次提交
rejected, task = 6
第 7 次提交
rejected, task = 7
第 8 次提交
rejected, task = 8
第 9 次提交
rejected, task = 9
第 10 次提交
rejected, task = 10

从上面的输出结果可以看到,当调用 shutdownNow() 方法后,线程池会尝试停止所有正在执行的任务,同时会中止处理等待执行的任务,并返回等待执行的任务列表。

awaitTermination () 方法

在 JDK 中,线程池的 awaitTermination() 方法的源码注释如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Blocks until all tasks have completed execution after a shutdown
* request, or the timeout occurs, or the current thread is
* interrupted, whichever happens first.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return {@code true} if this executor terminated and
* {@code false} if the timeout elapsed before termination
* @throws InterruptedException if interrupted while waiting
*/
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

大概的意思是,在发出线程池关闭请求之后,会阻塞当前线程直到所有任务都执行完、或者发生超时、或者当前线程被中断,并且以这三者中先发生的情况为准。在关闭线程池时,如果需要等待任务执行完,请使用 awaitTermination() 方法。因为该方法带有超时参数,如果等待超时后,任务仍然未执行完毕,那么线程池就不会再等待。毕竟应用总归要停机重启的,不可能无限期地等待下去,因此超时机制是提供给用户的最后一道底线。

  • awaitTermination() 的特性总结
    • 在调用方法后,会阻塞当前线程,直到等待执行和正在执行的任务都执行完了,才解除当前线程的阻塞。
    • 当等待超过设置的时间,会检查线程池是否已经关闭。
    • 如果所有任务都执行完了,方法会返回 true
    • 如果在所有任务执行完之前等待超时了,方法会返回 false,并解除当前线程的阻塞。

验证 shutdown () + awaitTermination () 方法的使用,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class ThreadPoolDemo {

public static void main(String[] args) {
shutdownAndAwaitTermination();
}

private static void shutdownAndAwaitTermination() {
ExecutorService threadPool = Executors.newSingleThreadExecutor();

// 提交 10 个任务,在第 5 个任务提交完,准备提交第 6 个的时候执行 shutdown
for (int i = 1; i <= 10; i++) {
try {
System.out.println("第 " + i + " 次提交");
threadPool.execute(new Task(i));
} catch (Exception e) {
System.out.println("rejected, task = " + i);
}
// i 等于 5 的时候 shutdown,意味着从第 6 次开始就不能提交新任务
if (i == 5) {
threadPool.shutdown();
System.out.println();
}
}

try {
/**
* 调用 awaitTermination() 方法后,当线程池里的任务没执行完成,且还没到设定的超时时间,则会阻塞当前线程,也就是不会执行最下面的两行打印代码。
* 现在把等待时间设置为 4 秒,当达到设定的超时时间后,就不会再阻塞当前线程,直接打印最新下面的两行代码,并且返回了 false,表示线程池没有关闭。
* 有时候需要主线程等所有子线程执行完成后再运行,在所有任务提交后,调用 shutdown() 方法触发 awaitTermination() 时,会阻塞主线程;当所有子线程执行完成后,才会解除阻塞。
*/
boolean isStop = threadPool.awaitTermination(4, TimeUnit.SECONDS);
System.out.println("is pool stoped: " + isStop);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("-----------------");
System.out.println(Thread.currentThread().getName() + "\t" + "all tests finished");
System.out.println();
}

private static class Task implements Runnable {

@Getter
String name = "";

public Task(int i) {
name = "task-" + i;
}

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("sleep completed, " + getName());
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("interrupted, " + getName());
return;
}
System.out.println(getName() + " finished");
System.out.println();
}
}

}

程序执行的输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
第 1 次提交
第 2 次提交
第 3 次提交
第 4 次提交
第 5 次提交

第 6 次提交
rejected, task = 6
第 7 次提交
rejected, task = 7
第 8 次提交
rejected, task = 8
第 9 次提交
rejected, task = 9
第 10 次提交
rejected, task = 10
sleep completed, task-1
task-1 finished

is pool stoped: false
-----------------
main all tests finished

sleep completed, task-2
task-2 finished

sleep completed, task-3
task-3 finished

sleep completed, task-4
task-4 finished

sleep completed, task-5
task-5 finished

从上面的输出结果可以看到,当调用 shutdown() 方法后,线程池里等候待执行的任务和正在执行的任务都会继续执行,但不会再接受新的任务。当调用 awaitTermination() 方法后,由于在所有任务执行完之前等待超时了,所以方法会返回 false,并解除当前线程的阻塞。

验证 shutdownNow () + awaitTermination () 方法的使用,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class ThreadPoolDemo {

public static void main(String[] args) {
shutdownNow_awaitTermination_Test();
}

private static void shutdownNow_awaitTermination_Test() {
ExecutorService threadPool = Executors.newSingleThreadExecutor();

// 提交 10 个任务,在第 5 个任务提交完,准备提交第 6 个的时候执行 shutdown
for (int i = 1; i <= 10; i++) {
try {
threadPool.execute(new Task(i));
} catch (Exception e) {
System.out.println("rejected, task-" + i);
}

// i 等于 5 的时候 shutdown,意味着从第 6 次开始就不能提交新任务
if (i == 5) {
List<Runnable> tasks = threadPool.shutdownNow();
for (Runnable task : tasks) {
if (task instanceof Task) {
System.out.println("waiting task: " + ((Task) task).getName());
}
}
}
}

try {
boolean isStop = threadPool.awaitTermination(4, TimeUnit.SECONDS);
System.out.println("is pool stoped: " + isStop);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("-----------------");
System.out.println(Thread.currentThread().getName() + "\t" + "all tests finished");
}

private static class Task implements Runnable {

@Getter
String name = "";

public Task(int i) {
name = "task-" + i;
}

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("sleep completed, " + getName());
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("interrupted, " + getName());
return;
}
System.out.println(getName() + " finished");
System.out.println();
}
}

}

程序执行的输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
第 1 次提交
第 2 次提交
第 3 次提交
第 4 次提交
第 5 次提交
waiting task: task-2
interrupted, task-1
waiting task: task-3
waiting task: task-4
waiting task: task-5
第 6 次提交
rejected, task-6
第 7 次提交
rejected, task-7
第 8 次提交
rejected, task-8
第 9 次提交
rejected, task-9

java.lang.InterruptedException: sleep interrupted
at java.base/java.lang.Thread.sleep(Native Method)
at java.base/java.lang.Thread.sleep(Thread.java:339)
at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446)
at com.java.interview.pool.ThreadPoolDemo$Task.run(ThreadPoolDemo.java:146)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

第 10 次提交
rejected, task-10
is pool stoped: true
-----------------
main all tests finished

从上面的输出结果可以看到,当调用 shutdownNow() 方法后,线程池会尝试停止所有正在执行的任务,同时会中止处理等待执行的任务,并返回等待执行的任务列表。当调用 awaitTermination() 方法后,由于所有任务都执行完了(包括任务被中断而停止的情况),所以方法会返回 true,并解除当前线程的阻塞。

线程池关闭的最佳实践

JDK 官方文档

JDK 官方文档 中,给出了线程池正确关闭的示例代码(如下所示)。以下方法分两个阶段关闭线程池,首先通过调用 shutdown() 方法,让线程池拒绝接受新的任务,然后在必要时调用 shutdownNow() 方法,以此停止所有正在执行的任务。

最佳实践理论

线程池正确关闭的关键点在于使用 shutdown() + awaitTermination() 方法,或者使用 shutdownNow() + awaitTermination() 方法。

最佳实践代码

最终参考 JDK 官网的示例代码,正确(优雅)关闭线程池的写法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void shutdownAndAwaitTermination(ExecutorService threadPool) {
if (threadPool != null && !threadPool.isShutdown()) {
// 平滑关闭线程池
threadPool.shutdown();
try {
// 阻塞当前线程 60 秒,等候待执行的任务和正在执行的任务执行完成
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
// 等待超时后,立刻关闭线程池
threadPool.shutdownNow();

// 再次阻塞当前线程 60 秒,然后检查线程池是否已经关闭
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
System.out.println("Thread pool did not terminate");
}
}
} catch (Exception e) {
// 捕获到异常后,立刻关闭线程池
threadPool.shutdownNow();
// 捕获到异常后,中断当前线程
Thread.currentThread().interrupt();
}
}
}

线程池如何处理异常

重点面试题

线程池中的线程抛出了异常,如何处理?

任务提交的三种方式

往线程池提交任务时,有三种提交方式(如下图所示),这也导致了对异常的处理方式不一样。

第一种任务提交方式

当调用 Executor 的 execute() 方法提交任务时,如果任务在执行期间出错,那么默认会抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ThreadPoolDemo {

public static void main(String[] args) {
execute();
}

/**
* 调用 execute() 方法提交任务,默认会抛出异常
*/
private static void execute() {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
try {
// 调用 execute() 方法提交任务,会抛出异常
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "进入 execute() 方法 ---start");
for (int i = 1; i <= 4; i++) {
if (i == 3) {
int age = 10 / 0;
}
System.out.println("come in execute: " + i);
}
System.out.println(Thread.currentThread().getName() + "\t" + "进入 execute() 方法 ---end");
});
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}

}

程序执行的输出结果:

1
2
3
4
5
6
7
8
pool-1-thread-1	进入 execute() 方法 ---start
come in execute: 1
come in execute: 2
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at com.java.interview.pool.ThreadPoolDemo.lambda$execute$0(ThreadPoolDemo.java:29)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
第二种任务提交方式

当调用 Executor 的 submit() 方法提交任务时,如果任务在执行期间出错,那么默认吞掉异常(即不会抛出异常)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ThreadPoolDemo {

public static void main(String[] args) {
submit();
}

/**
* 调用 submit() 方法提交任务,默认会吞掉异常
*/
private static void submit() {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
try {
// 调用 submit() 方法提交任务,默认会吞掉异常
threadPool.submit(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "进入 submit() 方法 ---start");
for (int i = 1; i <= 4; i++) {
if (i == 3) {
int age = 10 / 0;
}
System.out.println("come in execute: " + i);
}
System.out.println(Thread.currentThread().getName() + "\t" + "进入 submit() 方法 ---end");
});
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}

}

程序执行的输出结果:

1
2
3
pool-1-thread-1	进入 submit() 方法 ---start
come in execute: 1
come in execute: 2
第三种任务提交方式

当调用 Executor 的 submit() 方法提交任务后,如果任务在执行期间出错了,并且通过调用 Future.get() 方法来获取任务的执行结果,则会抛出异常。特别注意,如果不调用 Future.get() 方法,异常默认会被吞掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ThreadPoolDemo {

public static void main(String[] args) {
submitAndGet();
}

/**
* 调用 submit() 方法提交任务后,如果通过调用 Future.get() 方法来获取任务的执行结果,那么会抛出异常;如果不调用 `Future.get()` 方法,异常默认会被吞掉
*/
private static void submitAndGet() {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
try {
// 执行 submit() 方法后,如果通过调用 Future.get() 方法来获取任务的执行结果,那么会抛出异常,否则异常会被吞掉
Future<?> result = threadPool.submit(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "进入 submit() 方法 ---start");
int age = 20 / 0;
System.out.println(Thread.currentThread().getName() + "\t" + "进入 submit() 方法 ---end");
});
// 如果没有这一行代码,异常会被吞掉
result.get();
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}

}

程序执行的输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
pool-1-thread-1	进入 submit() 方法 ---start
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at com.java.interview.pool.ThreadPoolDemo.submitAndGet(ThreadPoolDemo.java:55)
at com.java.interview.pool.ThreadPoolDemo.main(ThreadPoolDemo.java:14)
Caused by: java.lang.ArithmeticException: / by zero
at com.java.interview.pool.ThreadPoolDemo.lambda$submitAndGet$1(ThreadPoolDemo.java:51)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

异常处理的最佳实践

最佳实践理论

由于线程池提交任务有多种方式,这也导致了对异常的处理方式不一样。为了统一处理线程池的异常,不能使用 Excutors 类来创建线程池,而是使用 ThreadPoolExecutor 类手动创建线程池,同时需要重写 ThreadPoolExecutor 类的 afterExecute() 方法来统一处理异常。在 JDK 中,afterExecute() 方法的源码注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class ThreadPoolExecutor extends AbstractExecutorService {

/**
* Method invoked upon completion of execution of the given Runnable.
* This method is invoked by the thread that executed the task. If
* non-null, the Throwable is the uncaught {@code RuntimeException}
* or {@code Error} that caused execution to terminate abruptly.
*
* <p>This implementation does nothing, but may be customized in
* subclasses. Note: To properly nest multiple overridings, subclasses
* should generally invoke {@code super.afterExecute} at the
* beginning of this method.
*
* <p><b>Note:</b> When actions are enclosed in tasks (such as
* {@link FutureTask}) either explicitly or via methods such as
* {@code submit}, these task objects catch and maintain
* computational exceptions, and so they do not cause abrupt
* termination, and the internal exceptions are <em>not</em>
* passed to this method. If you would like to trap both kinds of
* failures in this method, you can further probe for such cases,
* as in this sample subclass that prints either the direct cause
* or the underlying exception if a task has been aborted:
*
* <pre> {@code
* class ExtendedExecutor extends ThreadPoolExecutor {
* // ...
* protected void afterExecute(Runnable r, Throwable t) {
* super.afterExecute(r, t);
* if (t == null
* && r instanceof Future<?>
* && ((Future<?>)r).isDone()) {
* try {
* Object result = ((Future<?>) r).get();
* } catch (CancellationException ce) {
* t = ce;
* } catch (ExecutionException ee) {
* t = ee.getCause();
* } catch (InterruptedException ie) {
* // ignore/reset
* Thread.currentThread().interrupt();
* }
* }
* if (t != null)
* System.out.println(t);
* }
* }}</pre>
*
* @param r the runnable that has completed
* @param t the exception that caused termination, or null if
* execution completed normally
*/
protected void afterExecute(Runnable r, Throwable t) { }

......
}
最佳实践代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Slf4j
public class ThreadPoolExceptionDemo {

/**
* 手动创建线程池,并统一处理异常的写法
*/
public ExecutorService createThreadPool() {
// 手动创建线程池
ExecutorService threadPool = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
30L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(100)) {

/**
* 重写 afterExecute() 方法,实现统一异常处理
*/
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
// 判断是否调用 execute() 方法提交任务
if (throwable != null) {
log.error(throwable.getMessage(), throwable);
}
// 判断是否调用 submit() 方法提交任务
if (throwable == null && runnable instanceof Future<?>) {
try {
Future<?> future = (Future<?>) runnable;
if (future.isDone()) {
future.get();
}
} catch (CancellationException ce) {
throwable = ce;
log.error(ce.getMessage(), ce);
} catch (ExecutionException ee) {
throwable = ee.getCause();
log.error(ee.getMessage(), ee);
} catch (InterruptedException ie) {
log.error(ie.getMessage(), ie);
Thread.currentThread().interrupt();
}
}
}
};
return threadPool;
}

}

代码使用案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ThreadPoolDemo {

public static void main(String[] args) {
// 手动创建线程池
ExecutorService threadPool = createThreadPool();
try {
// 调用 submit() 方法提交任务,默认会吞掉异常,改写后可以抛出异常了
threadPool.submit(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "进入 submit() 方法 ---start");
for (int i = 1; i <= 4; i++) {
if (i == 3) {
int age = 10 / 0;
}
System.out.println("come in execute: " + i);
}
System.out.println(Thread.currentThread().getName() + "\t" + "进入 submit() 方法 ---end");
});
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}

......

}

程序执行的输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
pool-1-thread-1	进入 submit() 方法 ---start
come in execute: 1
come in execute: 2
16:11:12.103 [pool-1-thread-1] ERROR com.java.interview.pool.ThreadPoolDemo - java.lang.ArithmeticException: / by zero
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at com.java.interview.pool.ThreadPoolDemo$1.afterExecute(ThreadPoolDemo.java:113)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1129)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ArithmeticException: / by zero
at com.java.interview.pool.ThreadPoolDemo.lambda$test02$1(ThreadPoolDemo.java:55)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
... 2 common frames omitted

线程池如何合理配置参数

重点面试题

生产环境中如何配置线程池的 corePoolSize 和 maximumPoolSize 参数?

首先需要弄清楚一点,业务系统是属于 CPU 密集型还是 I/O 密集型的。因为线程池的配置,是需要根据具体不同的业务来配置。

  • CPU 密集型

    • CPU 密集的意思是,该任务需要大量的运算,而且没有阻塞,CPU 一直全速运行
    • CPU 密集任务只有在真正的多核 CPU 上才可能得到加速执行(使用多线程),而在单核 CPU 上,无论启动几个模拟的多线程,该任务都不可能得到加速执行
    • CPU 密集型的任务,应该尽可能少的配置线程数量
    • 一般配置公式:CPU 核心数 + 1 个线程数
  • I/O 密集型

    • IO 密集型的意思是,该任务需要大量的 I/O 操作(网络、磁盘等),大部分线程都会被阻塞
    • 由于 I/O 密集型的任务线程并不是一直在执行任务,因此需要尽可能多配置线程数,如 CPU 核心数 * 2
    • 在单线程上执行 I/O 密集型的任务,会浪费大量的 CPU 运算能力,即花费大量时间在阻塞等待上;所以在 I/O 密集型任务中,使用多线程可以大大的加速程序的运行,这种加速主要就是利用了被浪费掉的阻塞时间
    • 参考配置公式:CPU 核心数 / (1 - 阻塞系数),阻塞系数在 0.8 ~ 0.9 左右

如果希望更精准地估算 Java 线程池的大小与队列长度,可以参考 这里 的教程。

Java 获取 CPU 核心数

在 Java 中,可以通过调用 Runtime.getRuntime().availableProcessors() 方法来获取 CPU 的核心数。

线程池的原理

线程池的工作原理

线程池底层是通过阻塞队列 + 线程实现的,当往线程池添加新任务时:

  • (1) 如果当前线程数小于核心线程数,会创建新线程来处理被添加的任务,即使线程池中的线程都处于空闲状态。
  • (2) 如果当前线程数大于等于核心线程数,并且工作队列未满,那么新任务会被放入工作队列中。
  • (3) 如果当前线程数大于等于核心线程数,工作队列已满,并且当前线程数小于最大线程数,那么会创建新线程来处理被添加的任务。
  • (4) 如果当前线程数大于等于核心线程数,工作队列已满,并且当前线程数等于最大线程数,那么会通过设定的任务拒绝策略来处理被添加的任务。
  • (5) 当前线程数大于核心线程数时,如果某个线程的空闲时间超过设定的线程存活时间(keepAliveTime),那么该线程将会被终止并从线程池中移除。

如何处理刚提交的新任务

当提交新任务时,线程池会按照以下流程进行处理:

  1. 任务接收: 当有新的任务提交给线程池时,线程池会接收这个任务。
  2. 核心线程处理: 首先,线程池会检查当前活动线程数量是否已达到核心线程数。如果未达到核心线程数,线程池会创建新的核心线程来处理这个任务。
  3. 任务队列: 如果当前活动线程数量已经达到设定的核心线程数,但任务队列未满,线程池会将任务放入任务队列中,等待核心线程去处理。任务队列通常是一个阻塞队列,可以保证线程池安全地添加和获取任务。
  4. 最大线程数判断: 如果任务队列已满,并且线程池中的线程数量未达到最大线程数,则线程池会创建新的非核心线程来处理任务。
  5. 拒绝策略: 如果线程池中的任务队列已经满了,并且线程数量也已经达到最大线程数,线程池会根据预先设定的拒绝策略来处理新提交的任务。常见的拒绝策略包括抛出异常、丢弃任务或者调用任务提交者的线程执行任务。
  6. 执行任务: 选中的线程(核心线程或新创建的线程)会执行任务。执行完成后,线程会返回线程池,可以被其他任务复用。
  7. 线程池状态管理: 线程池会根据任务的执行情况来管理自身的状态,例如线程池的活动线程数量、已完成任务数量、空闲线程的销毁等。

线程池的企业实战

大数据批处理任务工具类设计

企业真实业务场景

在双十一购物节来临之前,基于线程池一次性下发 100 万优惠卷 / 短信 / 邮件给用户,并且需要兼顾线程池参数可配置。

  • 高并发多线程大数据批处理任务工具类的设计思路
    • (1) 多个派发任务,使用单线程还是多线程?
    • (2) 多线程企业级使用,100% 会引入线程池,那问题来了,线程池应该怎么使用和配置?
    • (3) 如何保证不丢数据?怎么保证全部优惠卷都下发完成了?如何实现下发统计或者重试下发?
    • (4) 这次是下发优惠卷,下次是短信 / 邮件 / 验证码等,如何做到通用?

代码说明

这里将给出基于线程池实现批量发送优惠券的 SpringBoot 核心代码,完整的案例代码请从 GitHub 获取。

  • 线程池的配置参数,写在 application.properties 配置文件中
1
2
3
4
thread.pool.corePoolSize=16
thread.pool.maxPoolSize=32
thread.pool.queueCapacity=50
thread.pool.keepAliveSeconds=2
  • 线程池的参数类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Data
@Configuration
@ConfigurationProperties(prefix = "thread.pool")
public class ThreadPoolProperties {

/**
* 核心线程数
*/
private int corePoolSize;

/**
* 最大可创建的线程数
*/
private int maxPoolSize;

/**
* 工作队列的最大长度
*/
private int queueCapacity;

/**
* 线程池维护线程所允许的空闲时间
*/
private int keepAliveSeconds;

}
  • 线程池的配置类,这里推荐使用 Spring 的 ThreadPoolTaskExecutor,而不是 JUC 的 ThreadPoolExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Configuration
public class ThreadPoolConfiguration {

@Resource
private ThreadPoolProperties threadPoolProperties;

@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
// 这里使用 Spring 的 ThreadPoolTaskExecutor,而不是 JUC 的 ThreadPoolExecutor
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();

// 核心线程数
threadPool.setCorePoolSize(threadPoolProperties.getCorePoolSize());
// 最大可创建的线程数
threadPool.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
// 工作队列的最大长度
threadPool.setQueueCapacity(threadPoolProperties.getQueueCapacity());
// 线程池维护线程所允许的空闲时间
threadPool.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
// 异步方法内部线程的名称前缀
threadPool.setThreadNamePrefix("Spring 自定义线程池 - ");
// 线程池对拒绝任务的处理策略
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 任务都执行完成再关闭线程池
threadPool.setWaitForTasksToCompleteOnShutdown(true);
// 任务初始化
threadPool.initialize();

return threadPool;
}

}
  • 业务接口类
1
2
3
4
5
public interface CouponServcie {

public void batchTaskAction();

}
  • 业务实现类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Slf4j
@Service
public class CouponServiceImpl implements CouponServcie {

// 下发优惠卷数量
public static final Integer COUPON_NUMBER = 50;

@Resource
private ThreadPoolTaskExecutor threadPool;

@Override
public void batchTaskAction() {
// 模拟要下发的优惠卷
List<String> coupons = getCoupons();

long startTime = System.currentTimeMillis();

// 执行批量发送任务
try {
TaskBatchSendUtils.send(coupons, threadPool, TaskBatchSendUtils::couponTask);
} catch (InterruptedException e) {
log.error(e.getMessage());
}

long endTime = System.currentTimeMillis();
System.out.println("任务处理完毕,总耗时: " + (endTime - startTime) + " 毫秒");
}

/**
* 模拟要下发的优惠卷
*/
private List<String> getCoupons() {
List<String> coupons = new ArrayList<>(COUPON_NUMBER);
for (int i = 1; i <= COUPON_NUMBER; i++) {
coupons.add("优惠卷-" + i);
}
return coupons;
}

}
  • 批量处理任务的工具类,使用 Consumer 类来保证工具类的通用性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Slf4j
public class TaskBatchSendUtils {

public static <T> void send(List<T> taskList, Executor threadPool, Consumer<? super T> consumer)
throws InterruptedException {
if (taskList == null || taskList.size() == 0) {
return;
}

if (Objects.isNull(consumer)) {
return;
}

CountDownLatch countDownLatch = new CountDownLatch(taskList.size());

for (T task : taskList) {
threadPool.execute(() ->
{
try {
consumer.accept(task);
} catch (Exception e) {
log.error(e.getMessage());
} finally {
countDownLatch.countDown();
}
});
}

countDownLatch.await();
}

public static void emailTask(String task) {
System.out.println(String.format("【%s】下发邮件成功", task));
}

public static void textMessageTask(String task) {
System.out.println(String.format("【%s】下发短信成功", task));
}

public static void couponTask(String task) {
System.out.println(String.format("【%s】下发优惠卷成功", task));
}

}
  • 测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootTest
@RunWith(SpringRunner.class)
public class BatchTaskTest {

@Resource
private CouponServcie couponServcie;

@Test
public void sendCoupons2() {
couponServcie.batchTaskAction();
}

}

程序运行输出的结果如下:

1
2
3
4
5
6
7
【优惠卷-1】下发优惠卷成功
【优惠卷-4】下发优惠卷成功
【优惠卷-2】下发优惠卷成功

......

任务处理完毕,总耗时: 10 毫秒

异步编排和并行优化的使用

企业真实业务场景

查询某个总接口,比如用户信息接口(ConsumerInfo),该接口又需要调用多个其他远程接口(比如关注、粉丝、订单信息、浏览记录),也就是说该接口聚合了多个其他远程接口的返回结果。如何才能让用户信息接口的请求响应时间最短呢?

  • 异步编排和并行优化的设计思路
    • (1) 在用户信息接口内,串行调用多个其他远程接口,显然这种串行调用接口的方式性能很低,调用用户信息接口的总耗时为所有其他远程接口调用耗时之和。
    • (2) 在用户信息接口内,使用 CompletableFuture + 线程池来并发调用多个其他远程接口,这样调用用户信息接口的总耗时为耗时最长的那个远程接口调用。
      • 除了可以使用 CompletableFuture + 线程池来实现并发调用多个其他远程接口,还可以使用 Callable + FurtureTask + 线程池来实现,具体可以参考 这里 的案例代码。

代码说明

这里将给出基于 CompletableFuture + 线程池实现并发调用多个远程接口的 SpringBoot 核心代码,完整的案例代码请从 GitHub 获取。

  • 线程池的配置参数,写在 application.properties 配置文件中
1
2
3
4
thread.pool.corePoolSize=16
thread.pool.maxPoolSize=32
thread.pool.queueCapacity=50
thread.pool.keepAliveSeconds=2
  • 线程池的参数类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Data
@Configuration
@ConfigurationProperties(prefix = "thread.pool")
public class ThreadPoolProperties {

/**
* 核心线程数
*/
private int corePoolSize;

/**
* 最大可创建的线程数
*/
private int maxPoolSize;

/**
* 工作队列的最大长度
*/
private int queueCapacity;

/**
* 线程池维护线程所允许的空闲时间
*/
private int keepAliveSeconds;

}
  • 线程池的配置类,这里推荐使用 Spring 的 ThreadPoolTaskExecutor,而不是 JUC 的 ThreadPoolExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Configuration
public class ThreadPoolConfiguration {

@Resource
private ThreadPoolProperties threadPoolProperties;

@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
// 这里使用 Spring 的 ThreadPoolTaskExecutor,而不是 JUC 的 ThreadPoolExecutor
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();

// 核心线程数
threadPool.setCorePoolSize(threadPoolProperties.getCorePoolSize());
// 最大可创建的线程数
threadPool.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
// 工作队列的最大长度
threadPool.setQueueCapacity(threadPoolProperties.getQueueCapacity());
// 线程池维护线程所允许的空闲时间
threadPool.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
// 异步方法内部线程的名称前缀
threadPool.setThreadNamePrefix("Spring 自定义线程池 - ");
// 线程池对拒绝任务的处理策略
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 任务都执行完成再关闭线程池
threadPool.setWaitForTasksToCompleteOnShutdown(true);
// 任务初始化
threadPool.initialize();

return threadPool;
}

}
  • DTO 类
1
2
3
4
5
6
7
8
9
@Data
public class CustomerMixInfo {

private Long id;
private String name;
private Long score;
private String orderInfo;

}
  • 业务接口类
1
2
3
4
5
public interface CustomerService {

CustomerMixInfo findCustomer();

}
  • 业务实现类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@Slf4j
@Service
public class CustomerServiceImpl implements CustomerService {

@Resource
private ThreadPoolTaskExecutor threadPool;

@Override
public CustomerMixInfo findCustomer() {
CustomerMixInfo customerMixInfo = new CustomerMixInfo();
customerMixInfo.setId(1L);

long startTime = System.currentTimeMillis();

// 异步调用用户远程接口
CompletableFuture<Void> customerNameFuture = CompletableFuture.runAsync(() -> {
customerMixInfo.setName(this.getCustomerName());
}, threadPool);

// 异步调用积分远程接口
CompletableFuture<Void> scoreFuture = CompletableFuture.runAsync(() -> {
customerMixInfo.setScore(this.getScore());
}, threadPool);

// 异步调用订单远程接口
CompletableFuture<Void> orderInfoFuture = CompletableFuture.runAsync(() -> {
customerMixInfo.setOrderInfo(this.getOrderInfo());
}, threadPool);

// 阻塞等待所有任务完成,allOf() 方法的应用之一是在继续执行程序之前等待完成一组独立的 CompletableFuture
CompletableFuture.allOf(customerNameFuture, scoreFuture, orderInfoFuture).join();

long endTime = System.currentTimeMillis();
System.out.println("总耗时:" + (endTime - startTime) + " 毫秒");

return customerMixInfo;
}

/**
* 模拟调用用户远程接口
*/
public String getCustomerName() {
try {
log.info("==> 调用用户远程接口");
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "张三";
}

/**
* 模拟调用积分远程接口
*/
public Long getScore() {
try {
log.info("==> 调用积分远程接口");
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100L;
}

/**
* 模拟调用订单远程接口
*/
public String getOrderInfo() {
try {
log.info("==> 调用订单远程接口");
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "001 - 华为P70手机";
}

}
  • 测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class ThreadPoolTest {

@Resource
private CustomerService customerService;

@Test
public void invoke() {
CustomerMixInfo customerMixInfo = customerService.findCustomer();
log.info(JSON.toJSONString(customerMixInfo));
}

}

程序运行输出的结果如下:

1
2
3
4
5
[Spring 自定义线程池 - 1] INFO  c.j.j.s.impl.CustomerServiceImpl - ==> 调用用户远程接口
[Spring 自定义线程池 - 3] INFO c.j.j.s.impl.CustomerServiceImpl - ==> 调用订单远程接口
[Spring 自定义线程池 - 2] INFO c.j.j.s.impl.CustomerServiceImpl - ==> 调用积分远程接口
总耗时:310 毫秒
[main] INFO com.java.juc.ThreadPoolTest - {"id":1,"name":"张三","orderInfo":"001 - 华为P70手机","score":100}

@Async 与线程池的使用