前言 本文将介绍 @Async
注解和线程池的使用、注意事项和实现原理。
版本说明 @Async 的概述 @Async
注解通常标注在方法上,用于实现方法的异步执行,即方法调用者调用方法后立即返回,待调用的方法会提交给 Spring 的线程池去异步执行。@Async
也可以标注在类上,等价于在类中的所有方法上添加该注解。特别注意,@Async
注解只对 Spring IOC 容器管理的对象生效。@Async 的使用 简单使用案例 添加配置参数类,用于定义线程池的运行参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Data @Configuration @ConfigurationProperties(prefix = "thread.pool") public class ThreadPoolProperties { private int corePoolSize; private int maxPoolSize; private int queueCapacity; private int keepAliveSeconds; }
创建 application.yml
配置文件,添加以下配置内容来指定线程池的运行参数。
1 2 3 4 5 6 thread: pool: corePoolSize: 10 maxPoolSize: 20 queueCapacity: 50 keepAliveSeconds: 60
添加配置类,并且实现 AsyncConfigurer
接口,同时使用 @EnableAsync
注解开启异步功能。特别注意,Spring 默认使用的异步任务执行器是 SimpleAsyncTaskExecutor,这个执行器没有线程池的概念,每处理一个任务都会单独创建一个线程来异步执行,因此不会重复使用线程。为了实现线程的复用,需要重写 AsyncConfigurer
接口的 getAsyncExecutor()
方法,然后手动创建一个 ThreadPoolTaskExecutor 实例来作为默认的异步任务执行器。这里使用的线程池异步任务执行器是 Spring 提供的 ThreadPoolTaskExecutor,而不是 JUC 里面的 ThreadPoolExecutor。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Configuration @EnableAsync public class AsyncConfiguration implements AsyncConfigurer { @Autowired private ThreadPoolProperties poolProperties; @Override public Executor getAsyncExecutor () { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(poolProperties.getCorePoolSize()); taskExecutor.setMaxPoolSize(poolProperties.getMaxPoolSize()); taskExecutor.setQueueCapacity(poolProperties.getQueueCapacity()); taskExecutor.setKeepAliveSeconds(poolProperties.getKeepAliveSeconds()); taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.setWaitForTasksToCompleteOnShutdown(true ); taskExecutor.setThreadNamePrefix("默认线程池 - Thread " ); taskExecutor.initialize(); return taskExecutor; } }
在需要异步执行的方法上添加 @Async
注解,即可实现方法的异步执行。值得一提的是,@Async
注解只对 Spring IOC 容器管理的对象生效。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Service @Slf4j public class UserService { @Async public void common () { try { TimeUnit.SECONDS.sleep(5 ); } catch (Exception e) { e.printStackTrace(); } log.info("===> 异步处理普通任务" ); } }
指定专用线程池案例 在一般情况下,一个微服务应用只有一个线程池,千万不要在每个方法内都创建一个线程池。但是,每个高并发的接口都可以有一个专用线程池来保证性能,即高并发接口可以使用单独的线程池来进行隔离处理,比如积分模块可以使用单独的线程池来专门处理请求。在使用 @Async
注解时,可以通过其 value
属性指定线程池的 Bean 名称,还可以使用 SpEL 表达式,以此指定专用的线程池。@Async
注解的底层源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Target({ElementType.TYPE, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented @Reflective public @interface Async { String value () default "" ; }
首先手动定义一个专用的线程池实例,可以在上面的 AsyncConfiguration
配置类中定义。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 @Configuration @EnableAsync public class AsyncConfiguration implements AsyncConfigurer { @Autowired private ThreadPoolProperties poolProperties; @Override public Executor getAsyncExecutor () { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(poolProperties.getCorePoolSize()); taskExecutor.setMaxPoolSize(poolProperties.getMaxPoolSize()); taskExecutor.setQueueCapacity(poolProperties.getQueueCapacity()); taskExecutor.setKeepAliveSeconds(poolProperties.getKeepAliveSeconds()); taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.setWaitForTasksToCompleteOnShutdown(true ); taskExecutor.setThreadNamePrefix("默认线程池 - Thread " ); taskExecutor.initialize(); return taskExecutor; } @Bean("scoreThreadPollTaskExecutor") public Executor getScoreThreadPollTaskExecutor () { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(15 ); taskExecutor.setMaxPoolSize(30 ); taskExecutor.setQueueCapacity(100 ); taskExecutor.setKeepAliveSeconds(60 ); taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.setWaitForTasksToCompleteOnShutdown(true ); taskExecutor.setThreadNamePrefix("积分专用线程池 - Thread " ); taskExecutor.initialize(); return taskExecutor; } }
然后在使用 @Async
注解时,通过其 value
属性指定线程池的 Bean 名称。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Service @Slf4j public class UserService { @Async public void common () { try { TimeUnit.SECONDS.sleep(5 ); } catch (Exception e) { e.printStackTrace(); } log.info("===> 异步处理普通请求" ); } @Async("scoreThreadPollTaskExecutor") public void score () { try { TimeUnit.SECONDS.sleep(5 ); } catch (Exception e) { e.printStackTrace(); } log.info("===> 异步处理积分请求" ); } }
此时,当调用 score()
方法时,Spring 会将该方法交给 scoreThreadPollTaskExecutor
线程池对象去执行。
获取方法返回值案例 如果异步方法有返回值,可以使用 Future
或 CompletableFuture
包装返回值,以便在异步操作完成后获取结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Service @Slf4j public class UserService { @Async public Future<Integer> future () { log.info("===> 返回异步执行的返回值" ); try { TimeUnit.SECONDS.sleep(2 ); } catch (Exception e) { e.printStackTrace(); } int result = 100 ; return CompletableFuture.completedFuture(result); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Slf4j @RestController @RequestMapping("/user") public class UserController { @GetMapping("/future") public Integer future () { Integer result = null ; Future<Integer> future = this .userService.future(); try { result = future.get(); } catch (Exception e) { log.error("Future error : " , e); } return result; } }
处理异步任务中未捕获的异常 首先通过实现 Spring 的 AsyncUncaughtExceptionHandler 接口来自定义异常处理器,该接口可用于处理异步任务中未捕获的异常。
1 2 3 4 5 6 7 8 9 10 @Slf4j public class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException (Throwable ex, Method method, Object... params) { log.error("Async exception : " , ex); } }
添加配置类,实现 AsyncConfigurer 接口,并返回自定义的 AsyncUncaughtExceptionHandler 实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 @Configuration @EnableAsync public class AsyncConfiguration implements AsyncConfigurer { @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler () { return new MyAsyncUncaughtExceptionHandler(); } }
@Async 的默认线程池说明 (1) 注意,在早期版本的 Spring Boot 环境中,如果用户没有自定义配置异步执行器(Async Executor),并且没有实现 AsyncConfigurer 接口来提供一个自定义的执行器,那么 Spring Boot 会使用一个默认的异步执行器,而在某些早期版本或特定配置下,这个默认执行器可能是 SimpleAsyncTaskExecutor,这是一个不重用线程、无界并发的执行器,每个提交的任务都会创建一个新的线程来执行。这意味着每次调用都会创建新的线程,而不是从固定大小的线程池中获取可用线程。 (2) 在后期版本的 Spring Boot 中,如果没有 Executor 的实例,Spring Boot 将会使用其默认配置的线程池(Bean 名称为 taskExecutor
)来执行被 @Async
注解修饰的异步方法。 (3) 在 Spring Boot 中,如果不存在 Excutor Bean,则会通过 TaskExecutionAutoConfiguration 自动配置一个基于 ThreadPoolTaskExecutor 的默认线程池,取名为 applicationTaskExecutor
和 taskExecutor
进行自动配置。如果已经自定义了 Executor Bean,那么 applicationTaskExecutor
将不会自动配置。 (4) 这个默认线程池的相关配置通常基于 Spring Boot 的默认属性,这些属性可以根据应用的具体需求,在 application.properties
或 application.yml
文件中进行调整。例如:spring.task.execution.pool.core-size
:核心线程数,默认值可能依赖于具体版本,一般较小。spring.task.execution.pool.max-size
:最大线程数,默认值也可能因版本不同而变化。spring.task.execution.pool.queue-capacity
:线程池的工作队列容量。spring.task.execution.pool.keep-alive
:空闲线程的存活时间。 @Async 的失效情况分析 没有启用异步支持
在 Spring 中要开启 @Async
注解的异步功能,需要在项目的启动类或者配置类上,使用 @EnableAsync
注解。 内部方法调用
Spring 通过 @Async
注解实现异步的功能,底层其实是通过 Spring AOP 机制实现的,也就是说它需要通过 JDK 动态代理(基于接口的代理)或者 CGLIB 代理(基于类的代理)生成代理对象。 @Async
方法如果在同一个类的其他方法中调用(内部方法调用),即使其他方法本身标记为 @Async
,Spring 的 AOP 机制也不会生效,因为内部方法的调用不会经过代理对象,所以不会异步执行。方法不是 public
@Async
注解的方法必须是 public
的,因为 Spring 使用 AOP 机制来实现异步调用,而代理机制无法拦截对 protected
或者 private
方法的调用。方法用 static 修饰
使用 @Async
注解的方法,必须是能够被重写的。很显然,static
修饰的方法是类的静态方法,是不允许被重写的。 方法用 final 修饰
使用 final
修饰的类,是无法被继承的。使用 final
修饰的方法,是无法被子类重写的。 @Async
注解是基于 Spring AOP 实现的,当 Spring AOP 使用的是 CGLIB 代理,由于它会利用字节码操作技术通过继承生成目标类的子类,如果目标类的方法使用 final
修饰的,会导致目标方法无法被代理,最终导致 @Async
注解的异步功能会失效。方法返回值错误
如果想要使用 @Async
注解的异步功能,相关方法的返回值必须是 void
、Future
或者 CompletableFuture
,否则 @Async
注解的异步功能会失效。 未正确配置 Executor
如果没有正确配置异步任务的 Executor,或者使用的 Executor 已关闭或不可用,@Async
注解可能会失效。 目标类没有被 Spring 管理
只有被 Spring 管理的 Bean(即通过 Spring 容器创建和管理的类)才能使用 @Async
注解。如果目标类没有被 Spring 管理,@Async
注解将不会生效。 @Async 的底层原理浅析 @Async 原理 本质上 @Async
注解是依赖于 Spring 的 AOP 机制实现的,代理过程中将任务提交给线程池。具体处理流程如下:
Spring 的 AOP 机制支持 JDK 动态代理(基于接口的代理)和 CGLIB 代理(基于类的代理)。首先 Spring 会为使用了 @Async
注解的 Bean 对象生成一个动态代理对象,当使用了 @Async
注解的方法被调用时,会进入代理流程,获取线程池、封装调用逻辑并提交给线程池执行,最后返回执行结果。由于异步执行的本质是基于代理实现,所以同一个类中的方法互相调用会导致被调用方的异步作用失效,该场景与 Spring 的 @Transactional
事务注解失效原因相同。
特别注意
未被 @Async
注解标注的方法不会执行上述流程,被 static
关键字修饰的方法同样也不会。
@EnableSync 原理 @EnableAsync
注解的核心作用是向 Spring 容器中注册 AsyncAnnotationBeanPostProcessor 对象,底层源码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { Class<? extends Annotation> annotation() default Annotation.class; boolean proxyTargetClass () default false ; AdviceMode mode () default AdviceMode.PROXY ; int order () default Ordered.LOWEST_PRECEDENCE ; }
属性说明
mode
:
默认值为 AdviceMode.PROXY
。 指定如何应用异步处理建议,目前支持两种模式:PROXY
:使用代理模式来支持异步方法调用。ASPECTJ
:通过织入的方式来支持异步方法调用。使用这个模式需要配置额外的依赖和设置。 order
:
默认值为 Ordered.LOWEST_PRECEDENCE
。 指定执行异步方法拦截器的顺序。值越低,优先级越高。这个属性主要在应用程序中存在多个切面或拦截器时使用,以控制它们的执行顺序。 annotation
:
默认值为 Async.class
。 指定用来标注异步方法的注解类型。默认情况下,@Async
注解用于标识异步方法,可以自定义注解来替代默认的 @Async
注解。 proxyTargetClass
:
默认值为 false
。 确定是否使用 CGLIB 代理(基于类的代理)而不是 JDK 动态代理(基于接口的代理)。如果值为 true
,Spring 将使用 CGLIB 创建代理类;如果值为 false
,Spring 将优先选择 JDK 动态代理(如果目标类实现了接口)。这个属性的选择可能会影响性能和使用的代理技术。 注意事项
在使用 @EnableAsync
时,需要确保 Spring 上下文中存在合适的 Executor
Bean,尤其是在高并发环境下,建议使用配置好的线程池来避免资源耗尽。比如使用 ThreadPoolTaskExecutor,而不是默认的 SimpleAsyncTaskExecutor。 使用 CGLIB 代理时,需要注意目标类及其方法是否是 final
,因为 CGLIB 不能代理 final
的类或方法。 选择 mode
为 ASPECTJ
需要编译时和运行时的支持,通常涉及 AspectJ 的相关配置和依赖。 CompletableFuture 的使用 由于上面获取异步方法的返回值时使用到了 CompletableFuture,因此这里简单介绍一下 CompletableFuture 的使用。CompletableFuture 是 Java 8 引入的一个类,用于表示异步计算的结果。它提供了一种非阻塞的方式来处理异步任务的执行结果,可以组合多个异步操作,并提供了一些便捷的方法来处理结果和异常。
简单使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class CompletableFutureTest { public static void main (String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } return "任务完成" ; }); CompletableFuture<String> result = future.thenApply(s -> "异步任务执行的结果 : " + s); try { String value = result.get(); System.out.println(value); } catch (Exception e) { e.printStackTrace(); } } }
程序的运行结果:
异常处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class CompletableFutureTest { public static void main (String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (true ) { throw new RuntimeException("异常发生!" ); } return "任务完成" ; }); CompletableFuture<String> exceptionHandled = future.exceptionally(ex -> "异常结果" ); try { System.out.println(exceptionHandled.get()); } catch (Exception e) { e.printStackTrace(); } } }
程序的运行结果:
合并多个执行结果 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class CompletableFutureTest { public static void main (String[] args) { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10 ); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20 ); CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (x, y) -> x + y); try { System.out.println("合并后的结果: " + combinedFuture.get()); } catch (Exception e) { e.printStackTrace(); } } }
程序的运行结果:
运行多个任务后执行 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class CompletableFutureTest { public static void main (String[] args) { CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { try { Thread.sleep(1000 ); System.out.println("任务1完成" ); } catch (InterruptedException e) { e.printStackTrace(); } }); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { try { Thread.sleep(2000 ); System.out.println("任务2完成" ); } catch (InterruptedException e) { e.printStackTrace(); } }); CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> { try { Thread.sleep(3000 ); System.out.println("任务3完成" ); } catch (InterruptedException e) { e.printStackTrace(); } }); CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3); try { allOf.get(); System.out.println("所有已任务完成" ); } catch (Exception e) { e.printStackTrace(); } } }
程序的运行结果:
1 2 3 4 任务1完成 任务2完成 任务3完成 所有已任务完成
参考资料