Spring 之 @Async 与线程池的使用

前言

本文将介绍 @Async 注解和线程池的使用、注意事项和实现原理。

版本说明

组件版本
Spring Boot3.0.5

@Async 的概述

  • @Async 注解通常标注在方法上,用于实现方法的异步执行,即方法调用者调用方法后立即返回,待调用的方法会提交给 Spring 的线程池去异步执行。
  • @Async 也可以标注在类上,等价于在类中的所有方法上添加该注解。特别注意,@Async 注解只对 Spring IOC 容器管理的对象生效。

@Async 的使用

简单使用案例

添加配置参数类,用于定义线程池的运行参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Data
@Configuration
@ConfigurationProperties(prefix = "thread.pool")
public class ThreadPoolProperties {

/**
* 核心线程数
*/
private int corePoolSize;

/**
* 最大线程数
*/
private int maxPoolSize;

/**
* 工作队列的最大长度
*/
private int queueCapacity;

/**
* 空闲线程的存活时间
*/
private int keepAliveSeconds;

}

创建 application.yml 配置文件,添加以下配置内容来指定线程池的运行参数。

1
2
3
4
5
6
thread:
pool:
corePoolSize: 10
maxPoolSize: 20
queueCapacity: 50
keepAliveSeconds: 60

添加配置类,并且实现 AsyncConfigurer 接口,同时使用 @EnableAsync 注解开启异步功能。特别注意,Spring 默认使用的异步任务执行器是 SimpleAsyncTaskExecutor,这个执行器没有线程池的概念,每处理一个任务都会单独创建一个线程来异步执行,因此不会重复使用线程。为了实现线程的复用,需要重写 AsyncConfigurer 接口的 getAsyncExecutor() 方法,然后手动创建一个 ThreadPoolTaskExecutor 实例来作为默认的异步任务执行器。这里使用的线程池异步任务执行器是 Spring 提供的 ThreadPoolTaskExecutor,而不是 JUC 里面的 ThreadPoolExecutor。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {

@Autowired
private ThreadPoolProperties poolProperties;

/**
* 配置默认的线程池
*
* <p> Spring 默认使用的异步任务执行器是 SimpleAsyncTaskExecutor,它每处理一个任务都会单独创建一个线程来异步执行
* <p> 特别注意,SimpleAsyncTaskExecutor 没有线程池的概念,不会重复使用线程
*/
@Override
public Executor getAsyncExecutor() {
// 这里使用的是 Spring 提供的 ThreadPoolTaskExecutor,而不是 JUC 的 ThreadPoolExecutor
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(poolProperties.getCorePoolSize());
taskExecutor.setMaxPoolSize(poolProperties.getMaxPoolSize());
taskExecutor.setQueueCapacity(poolProperties.getQueueCapacity());
taskExecutor.setKeepAliveSeconds(poolProperties.getKeepAliveSeconds());
// 线程池对拒绝任务(无线程可用)的处理策略
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 任务都完成再关闭线程池(平滑关闭)
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setThreadNamePrefix("默认线程池 - Thread ");
// 必须初始化
taskExecutor.initialize();
return taskExecutor;
}

}

在需要异步执行的方法上添加 @Async 注解,即可实现方法的异步执行。值得一提的是,@Async 注解只对 Spring IOC 容器管理的对象生效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Service
@Slf4j
public class UserService {

@Async
public void common() {
// 模拟业务耗时
try {
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
}
log.info("===> 异步处理普通任务");
}

}

指定专用线程池案例

在一般情况下,一个微服务应用只有一个线程池,千万不要在每个方法内都创建一个线程池。但是,每个高并发的接口都可以有一个专用线程池来保证性能,即高并发接口可以使用单独的线程池来进行隔离处理,比如积分模块可以使用单独的线程池来专门处理请求。在使用 @Async 注解时,可以通过其 value 属性指定线程池的 Bean 名称,还可以使用 SpEL 表达式,以此指定专用的线程池。@Async 注解的底层源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Reflective
public @interface Async {

/**
* A qualifier value for the specified asynchronous operation(s).
* <p>May be used to determine the target executor to be used when executing
* the asynchronous operation(s), matching the qualifier value (or the bean
* name) of a specific {@link java.util.concurrent.Executor Executor} or
* {@link org.springframework.core.task.TaskExecutor TaskExecutor}
* bean definition.
* <p>When specified in a class-level {@code @Async} annotation, indicates that the
* given executor should be used for all methods within the class. Method-level use
* of {@code Async#value} always overrides any qualifier value configured at
* the class level.
* <p>The qualifier value will be resolved dynamically if supplied as a SpEL
* expression (for example, {@code "#{environment['myExecutor']}"}) or a
* property placeholder (for example, {@code "${my.app.myExecutor}"}).
* @since 3.1.2
*/
String value() default "";

}

首先手动定义一个专用的线程池实例,可以在上面的 AsyncConfiguration 配置类中定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {

@Autowired
private ThreadPoolProperties poolProperties;

/**
* 配置默认的线程池
*
* <p> Spring 默认使用的异步任务执行器是 SimpleAsyncTaskExecutor,它每处理一个任务都会单独创建一个线程来异步执行
* <p> 特别注意,SimpleAsyncTaskExecutor 没有线程池的概念,不会重复使用线程
*/
@Override
public Executor getAsyncExecutor() {
// 这里使用的是 Spring 提供的 ThreadPoolTaskExecutor,而不是 JUC 的 ThreadPoolExecutor
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(poolProperties.getCorePoolSize());
taskExecutor.setMaxPoolSize(poolProperties.getMaxPoolSize());
taskExecutor.setQueueCapacity(poolProperties.getQueueCapacity());
taskExecutor.setKeepAliveSeconds(poolProperties.getKeepAliveSeconds());
// 线程池对拒绝任务(无线程可用)的处理策略
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 任务都完成再关闭线程池(平滑关闭)
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setThreadNamePrefix("默认线程池 - Thread ");
// 必须初始化
taskExecutor.initialize();
return taskExecutor;
}

/**
* 配置专用的线程池
*
* <p> 每个高并发的接口都可以有一个专用线程池落地保证,高并发接口使用单独线程池进行隔离处理
* <p> 比如:积分模块可以使用单独的线程池来专门处理请求
*/
@Bean("scoreThreadPollTaskExecutor")
public Executor getScoreThreadPollTaskExecutor() {
// 这里使用的是 Spring 提供的 ThreadPoolTaskExecutor,而不是 JUC 的 ThreadPoolExecutor
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(15);
taskExecutor.setMaxPoolSize(30);
taskExecutor.setQueueCapacity(100);
taskExecutor.setKeepAliveSeconds(60);
// 线程池对拒绝任务(无线程可用)的处理策略
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 任务都完成再关闭线程池(平滑关闭)
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setThreadNamePrefix("积分专用线程池 - Thread ");
// 必须初始化
taskExecutor.initialize();
return taskExecutor;
}

}

然后在使用 @Async 注解时,通过其 value 属性指定线程池的 Bean 名称。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Service
@Slf4j
public class UserService {

/**
* 使用默认的线程池来处理请求
*/
@Async
public void common() {
// 模拟业务耗时
try {
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
}
log.info("===> 异步处理普通请求");
}

/**
* 指定专用的线程池来处理请求
*/
@Async("scoreThreadPollTaskExecutor")
public void score() {
// 模拟业务耗时
try {
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
}
log.info("===> 异步处理积分请求");
}

}

此时,当调用 score() 方法时,Spring 会将该方法交给 scoreThreadPollTaskExecutor 线程池对象去执行。

获取方法返回值案例

如果异步方法有返回值,可以使用 FutureCompletableFuture 包装返回值,以便在异步操作完成后获取结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Service
@Slf4j
public class UserService {

@Async
public Future<Integer> future() {
log.info("===> 返回异步执行的返回值");
// 模拟业务耗时
try {
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
e.printStackTrace();
}
int result = 100;
return CompletableFuture.completedFuture(result);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j
@RestController
@RequestMapping("/user")
public class UserController {

@GetMapping("/future")
public Integer future() {
Integer result = null;
Future<Integer> future = this.userService.future();
try {
// 阻塞等待异步方法返回的结果
result = future.get();
} catch (Exception e) {
log.error("Future error : ", e);
}
return result;
}

}

处理异步任务中未捕获的异常

首先通过实现 Spring 的 AsyncUncaughtExceptionHandler 接口来自定义异常处理器,该接口可用于处理异步任务中未捕获的异常。

1
2
3
4
5
6
7
8
9
10
@Slf4j
public class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {

@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
// 处理异常的逻辑
log.error("Async exception : ", ex);
}

}

添加配置类,实现 AsyncConfigurer 接口,并返回自定义的 AsyncUncaughtExceptionHandler 实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {

/**
* 配置异常处理器
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new MyAsyncUncaughtExceptionHandler();
}

}

@Async 的默认线程池说明

  • (1) 注意,在早期版本的 Spring Boot 环境中,如果用户没有自定义配置异步执行器(Async Executor),并且没有实现 AsyncConfigurer 接口来提供一个自定义的执行器,那么 Spring Boot 会使用一个默认的异步执行器,而在某些早期版本或特定配置下,这个默认执行器可能是 SimpleAsyncTaskExecutor,这是一个不重用线程、无界并发的执行器,每个提交的任务都会创建一个新的线程来执行。这意味着每次调用都会创建新的线程,而不是从固定大小的线程池中获取可用线程。
  • (2) 在后期版本的 Spring Boot 中,如果没有 Executor 的实例,Spring Boot 将会使用其默认配置的线程池(Bean 名称为 taskExecutor)来执行被 @Async 注解修饰的异步方法。
  • (3) 在 Spring Boot 中,如果不存在 Excutor Bean,则会通过 TaskExecutionAutoConfiguration 自动配置一个基于 ThreadPoolTaskExecutor 的默认线程池,取名为 applicationTaskExecutortaskExecutor 进行自动配置。如果已经自定义了 Executor Bean,那么 applicationTaskExecutor 将不会自动配置。
  • (4) 这个默认线程池的相关配置通常基于 Spring Boot 的默认属性,这些属性可以根据应用的具体需求,在 application.propertiesapplication.yml 文件中进行调整。例如:
    • spring.task.execution.pool.core-size:核心线程数,默认值可能依赖于具体版本,一般较小。
    • spring.task.execution.pool.max-size:最大线程数,默认值也可能因版本不同而变化。
    • spring.task.execution.pool.queue-capacity:线程池的工作队列容量。
    • spring.task.execution.pool.keep-alive:空闲线程的存活时间。

@Async 的失效情况分析

  • 没有启用异步支持

    • 在 Spring 中要开启 @Async 注解的异步功能,需要在项目的启动类或者配置类上,使用 @EnableAsync 注解。
  • 内部方法调用

    • Spring 通过 @Async 注解实现异步的功能,底层其实是通过 Spring AOP 机制实现的,也就是说它需要通过 JDK 动态代理(基于接口的代理)或者 CGLIB 代理(基于类的代理)生成代理对象。
    • @Async 方法如果在同一个类的其他方法中调用(内部方法调用),即使其他方法本身标记为 @Async,Spring 的 AOP 机制也不会生效,因为内部方法的调用不会经过代理对象,所以不会异步执行。
  • 方法不是 public

    • @Async 注解的方法必须是 public 的,因为 Spring 使用 AOP 机制来实现异步调用,而代理机制无法拦截对 protected 或者 private 方法的调用。
  • 方法用 static 修饰

    • 使用 @Async 注解的方法,必须是能够被重写的。很显然,static 修饰的方法是类的静态方法,是不允许被重写的。
  • 方法用 final 修饰

    • 使用 final 修饰的类,是无法被继承的。使用 final 修饰的方法,是无法被子类重写的。
    • @Async 注解是基于 Spring AOP 实现的,当 Spring AOP 使用的是 CGLIB 代理,由于它会利用字节码操作技术通过继承生成目标类的子类,如果目标类的方法使用 final 修饰的,会导致目标方法无法被代理,最终导致 @Async 注解的异步功能会失效。
  • 方法返回值错误

    • 如果想要使用 @Async 注解的异步功能,相关方法的返回值必须是 voidFuture 或者 CompletableFuture,否则 @Async 注解的异步功能会失效。
  • 未正确配置 Executor

    • 如果没有正确配置异步任务的 Executor,或者使用的 Executor 已关闭或不可用,@Async 注解可能会失效。
  • 目标类没有被 Spring 管理

    • 只有被 Spring 管理的 Bean(即通过 Spring 容器创建和管理的类)才能使用 @Async 注解。如果目标类没有被 Spring 管理,@Async 注解将不会生效。

@Async 的底层原理浅析

@Async 原理

本质上 @Async 注解是依赖于 Spring 的 AOP 机制实现的,代理过程中将任务提交给线程池。具体处理流程如下:

Spring 的 AOP 机制支持 JDK 动态代理(基于接口的代理)和 CGLIB 代理(基于类的代理)。首先 Spring 会为使用了 @Async 注解的 Bean 对象生成一个动态代理对象,当使用了 @Async 注解的方法被调用时,会进入代理流程,获取线程池、封装调用逻辑并提交给线程池执行,最后返回执行结果。由于异步执行的本质是基于代理实现,所以同一个类中的方法互相调用会导致被调用方的异步作用失效,该场景与 Spring 的 @Transactional 事务注解失效原因相同。

特别注意

未被 @Async 注解标注的方法不会执行上述流程,被 static 关键字修饰的方法同样也不会。

@EnableSync 原理

@EnableAsync 注解的核心作用是向 Spring 容器中注册 AsyncAnnotationBeanPostProcessor 对象,底层源码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

Class<? extends Annotation> annotation() default Annotation.class;

boolean proxyTargetClass() default false;

AdviceMode mode() default AdviceMode.PROXY;

int order() default Ordered.LOWEST_PRECEDENCE;

}

属性说明

  • mode

    • 默认值为 AdviceMode.PROXY
    • 指定如何应用异步处理建议,目前支持两种模式:
      • PROXY:使用代理模式来支持异步方法调用。
      • ASPECTJ:通过织入的方式来支持异步方法调用。使用这个模式需要配置额外的依赖和设置。
  • order

    • 默认值为 Ordered.LOWEST_PRECEDENCE
    • 指定执行异步方法拦截器的顺序。值越低,优先级越高。这个属性主要在应用程序中存在多个切面或拦截器时使用,以控制它们的执行顺序。
  • annotation:

    • 默认值为 Async.class
    • 指定用来标注异步方法的注解类型。默认情况下,@Async 注解用于标识异步方法,可以自定义注解来替代默认的 @Async 注解。
  • proxyTargetClass

    • 默认值为 false
    • 确定是否使用 CGLIB 代理(基于类的代理)而不是 JDK 动态代理(基于接口的代理)。如果值为 true,Spring 将使用 CGLIB 创建代理类;如果值为 false,Spring 将优先选择 JDK 动态代理(如果目标类实现了接口)。这个属性的选择可能会影响性能和使用的代理技术。

注意事项

  • 在使用 @EnableAsync 时,需要确保 Spring 上下文中存在合适的 Executor Bean,尤其是在高并发环境下,建议使用配置好的线程池来避免资源耗尽。比如使用 ThreadPoolTaskExecutor,而不是默认的 SimpleAsyncTaskExecutor。
  • 使用 CGLIB 代理时,需要注意目标类及其方法是否是 final,因为 CGLIB 不能代理 final 的类或方法。
  • 选择 modeASPECTJ 需要编译时和运行时的支持,通常涉及 AspectJ 的相关配置和依赖。

CompletableFuture 的使用

由于上面获取异步方法的返回值时使用到了 CompletableFuture,因此这里简单介绍一下 CompletableFuture 的使用。CompletableFuture 是 Java 8 引入的一个类,用于表示异步计算的结果。它提供了一种非阻塞的方式来处理异步任务的执行结果,可以组合多个异步操作,并提供了一些便捷的方法来处理结果和异常。

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class CompletableFutureTest {

public static void main(String[] args) {
// 使用 supplyAsync 方法创建一个异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务完成";
});

// 使用 thenApply 方法处理执行结果
CompletableFuture<String> result = future.thenApply(s -> "异步任务执行的结果 : " + s);

try {
// 阻塞等待获取执行结果
String value = result.get();
System.out.println(value);
} catch (Exception e) {
e.printStackTrace();
}
}

}

程序的运行结果:

1
异步任务执行的结果 : 任务完成

异常处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class CompletableFutureTest {

public static void main(String[] args) {
// 使用 supplyAsync 方法创建一个异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("异常发生!");
}
return "任务完成";
});

// 处理异常
CompletableFuture<String> exceptionHandled = future.exceptionally(ex -> "异常结果");

try {
// 阻塞等待获取结果
System.out.println(exceptionHandled.get());
} catch (Exception e) {
e.printStackTrace();
}
}

}

程序的运行结果:

1
异常结果

合并多个执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class CompletableFutureTest {

public static void main(String[] args) {
// 使用 supplyAsync 方法创建两个异步任务
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);

// 合并两个异步任务的结果
CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (x, y) -> x + y);

try {
System.out.println("合并后的结果: " + combinedFuture.get());
} catch (Exception e) {
e.printStackTrace();
}
}

}

程序的运行结果:

1
合并后的结果: 30

运行多个任务后执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class CompletableFutureTest {

public static void main(String[] args) {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
System.out.println("任务1完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
});

CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(2000);
System.out.println("任务2完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
});

CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
System.out.println("任务3完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
});

// 等待所有任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);

try {
// 阻塞等待所有任务完成
allOf.get();
System.out.println("所有已任务完成");
} catch (Exception e) {
e.printStackTrace();
}
}

}

程序的运行结果:

1
2
3
4
任务1完成
任务2完成
任务3完成
所有已任务完成

参考资料