Resilience4j 入门教程 - 基础篇之二

大纲

前言

技术资源

版本说明

在本文的所有案例中,各个组件统一使用以下版本:

组件版本说明
Spring Boot3.2.0
Spring Cloud2023.0.0
Consul1.15.4 作为注册中心

隔离的使用

隔离的介绍

Resilience4j Bulkhead(舱壁,也叫隔离)可以用于依赖隔离和负载保护,也就是用来限制针对于下游服务的最大并发访问数量(即限制并发)。Resilience4j 提供了以下两种隔离的实现方式,可以限制并发执行的数量,更多介绍请参考 官方文档

  • SemaphoreBulkhead:使用了信号量
  • FixedThreadPoolBulkhead:使用了有界队列和固定大小线程池

SemaphoreBulkhead 可以在各种线程和 I/O 模型上正常工作,但与 Hystrix 不同,它不提供基于 shadow 的线程池配置选项,由客户端来确保正确的线程池大小与隔离配置的一致性。

什么是舱壁

Bulkhead 翻译过来是指船的舱壁,或者是指飞机的隔板。术语舱壁来自造船行业,船仓内部一般会分成很多个小隔舱(如下图所示),一旦一个隔舱漏水,因为舱壁(隔板)的存在,而不至于影响到其它隔舱,从而保证整艘船的安全。

信号量隔离的使用

信号量隔离的原理

Resilience4j SemaphoreBulkhead(信号量隔离)基本上采用了与 JUC 包中 Semaphore 信号量类同样的设计思想,其底层源码如下:

Resilience4j SemaphoreBulkhead(信号量隔离)的底层原理如下:

  • 当信号量有空闲时,进入系统的请求会直接获取信号量,并执行相应的业务处理
  • 当信号量全部被占用时,接下来的请求将会进入阻塞状态,SemaphoreBulkhead 提供了一个阻塞计时器
    • 如果处于阻塞状态的请求在阻塞计时内无法获取到信号量,则系统会拒绝处理这些请求
    • 如果处于阻塞状态的请求在阻塞计时内获取到了信号量,那将直接获取信号量,并执行相应的业务处理

信号量隔离的配置

Resilience4j SemaphoreBulkhead(信号量隔离)的详细配置参数可以查阅 官方文档,其中的核心配置项如下:

YML 配置示例如下,定义了一段简单的 SemaphoreBulkhead 配置,指定默认配置为 maxConcurrentCalls:5maxWaitDuration:20ms,并且指定了两个实例,其中 backendA 使用了默认配置,而 backendB 使用了自定义配置。

信号量隔离的使用案例

本节将演示如何使用 OpenFeign + Resilience4j SemaphoreBulkhead 实现基于信号量的隔离,也就是限制针对于下游服务的最大并发访问数量。

创建父级 Pom 工程

在父工程里面配置好工程需要的父级依赖,目的是为了更方便管理与简化配置,具体 Maven 配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hutool.version>5.8.22</hutool.version>
<lombok.version>1.18.26</lombok.version>
<spring.boot.version>3.2.0</spring.boot.version>
<spring.boot.test.version>3.1.5</spring.boot.test.version>
<spring.cloud.version>2023.0.0</spring.cloud.version>
</properties>

<dependencyManagement>
<dependencies>
<!--SpringBoot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--SpringCloud-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--HuTool-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
<!--Lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<optional>true</optional>
</dependency>
<!--SpringBoot Test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring.boot.test.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
创建 Provider 工程

为了测试 Resilience4j SemaphoreBulkhead(信号量隔离)的使用效果,先创建一个服务提供者。这里创建 Provider 的 Maven 工程,由于需要将服务注册到 Consul,工程下的 pom.xml 文件需要引入 spring-cloud-starter-consul-discovery

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

创建 Provider 的启动主类,添加注解 @EnableDiscoveryClient,将服务注册到 Consul

1
2
3
4
5
6
7
8
9
@SpringBootApplication
@EnableDiscoveryClient
public class ProviderApplication {

public static void main(String[] args) {
SpringApplication.run(ProviderApplication.class, args);
}

}

application.yml 文件中指定服务名称(provider-service)、注册中心地址与端口号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
server:
port: 8001

spring:
application:
name: provider-service
cloud:
# Consul
consul:
host: 127.0.0.1
port: 8500
# 注册中心
discovery:
service-name: ${spring.application.name}
heartbeat:
enabled: true

创建用于测试的 Controller 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@RestController
public class ProviderController {

/**
* 该接口用于测试 Resilience4j 的隔离(舱壁)
*/
@GetMapping(value = "/provider/bulkhead/{id}")
public String bulkhead(@PathVariable("id") Integer id) {
// 模拟业务处理出错
if (id == -4) {
throw new RuntimeException("Bulkhead id 不能为负数");
}

// 模拟业务长时间处理
if (id == 9999) {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return "Hello, bulkhead! inputId : " + id + " \t " + UUID.fastUUID();
}

}
创建 Consumer 工程

若要使用 Resilience4j SemaphoreBulkhead(信号量隔离),则需要在 pom.xml 文件中引入依赖 resilience4j-bulkhead,由于基于注解的方式配置隔离是依赖 AOP 实现的,所以必须引入 spring-boot-starter-aop。另外,由于需要从 Consul 获取服务列表,即作为 Consul 的客户端,还需要引入 spring-cloud-starter-consul-discovery

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-circuitbreaker-resilience4j</artifactId>
</dependency>

<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>

创建服务接口类,通过 OpenFeign 调用 Provider 提供的服务

1
2
3
4
5
6
7
8
9
10
@FeignClient("provider-service")
public interface ProviderFeignApi {

/**
* 该接口用于测试服务调用方(消费者)的隔离(舱壁)
*/
@GetMapping(value = "/provider/bulkhead/{id}")
String bulkhead(@PathVariable("id") Integer id);

}

创建启动主类,添加 @EnableDiscoveryClient@EnableFeignClients 注解

1
2
3
4
5
6
7
8
9
10
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class ConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}

}

application.yml 文件中配置端口号、注册中心地址、隔离(舱壁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
server:
port: 8003

spring:
application:
name: consumer-service
cloud:
# Consul
consul:
host: 127.0.0.1
port: 8500
# 注册中心
discovery:
service-name: ${spring.application.name}
heartbeat:
enabled: true
# OpenFeign
openfeign:
# 开启断路器和分组激活
circuitbreaker:
enabled: true
group:
enabled: true # 没有开启分组则永远不用分组的配置。精确优先、分组次之(开了分组)、默认最后

# Resilience4j
resilience4j:
# 超时处理
timelimiter:
configs:
default:
timeout-duration: 20s # 神坑的位置,timelimiter 默认限制请求处理耗时为 1s,超过 1s 就会认为请求超时,如果配置了降级,就会走降级逻辑
# 隔离(舱壁)
bulkhead:
configs:
# 隔离(舱壁)的默认配置
default:
maxConcurrentCalls: 2 # 隔离允许并发执行的最大线程数
maxWaitDuration: 1s # 当达到最大并发执行数量时,新的线程执行时将会被阻塞,这个属性表示线程最长的阻塞等待时间;如果等待超时,直接走 Fallback 兜底处理
instances:
# 指定特定的服务实例或者方法使用哪个隔离(舱壁)配置,还可以在每个实例下进行自定义配置
provider-service:
baseConfig: default

特别注意

  • 第一点:这里必须配置 OpenFeign 开启断路器和分组激活,否则 Resilience4j 的隔离功能不会生效。
  • 第二点:这里必须配置 Resilience4j TimeLimiter 的 timeout-duration 的参数,因为 TimeLimiter 默认限制请求处理耗时为 1 秒,超过 1 秒 就会认为请求超时(在默认情况下不会打印超时的异常信息,而是直接走 Fallback 处理逻辑,导致很难发现超时问题)。为了避免影响后面的代码测试,需要将 timeout-duration 的参数值设置大一点。

创建用于测试的 Controller 类,并使用 @Bulkhead 注解来实现隔离(舱壁)的功能,同时通过 type 属性来指定使用基于信号量的隔离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@RestController
public class ConsumerController {

@Autowired
private ProviderFeignApi providerFeignApi;

/**
* 该接口用于测试 Resilience4j 的隔离(舱壁)
* <p> @Bulkhead 注解写在需要限制并发访问的一侧
*/
@GetMapping(value = "/consumer/bulkhead/{id}")
@Bulkhead(name = "provider-service", fallbackMethod = "bulkheadFallback", type = Bulkhead.Type.SEMAPHORE)
public String bulkhead(@PathVariable("id") Integer id) {
return providerFeignApi.bulkhead(id);
}

/**
* 服务降级后的兜底处理方法
*/
public String bulkheadFallback(Integer id, Throwable t) {
// 这里是容错处理逻辑,返回备用结果
return "BulkheadFallback,系统繁忙,请稍后再试 /(ㄒoㄒ)/~~";
}

}

特别注意

  • 这里 @Bulkhead 注解的 name 属性必须与 application.yml 配置文件中的 instances 匹配对应。
  • 换言之,name 指定的名称必须和 instances 中的某个键匹配,这样才能让指定的服务实例或者方法使用相应的 Bulkhead 配置。
测试案例代码
  • (1) 浏览器打开 2 个窗口,分别访问 http://localhost:8003/consumer/bulkhead/9999
  • (2) 由于每个请求调用需要耗时 5 秒,因此 2 个请求就瞬间达到配置过的最大并发访问数量 2
  • (3) 浏览器打开第 3 窗口,访问 http://localhost:8003/consumer/bulkhead/22,可以发现请求直接被舱壁限制隔离了,阻塞等待 1 秒后,最终得到 Fallback 处理结果(降级)
  • (4) 等上面的第 1 个和第 2 个窗口请求完成后,再去访问 http://localhost:8003/consumer/bulkhead/22,因为并发访问数量小于 2,所以可以正常调用接口
下载案例代码
  • 完整的案例代码可以从 这里 下载得到。

固定线程池隔离的使用

固定线程池隔离的原理

Resilience4j FixedThreadPoolBulkhead(固定线程池隔离)基本上采用了与 JUC 包中 ThreadPoolExecutor 线程池类同样的设计思想,其底层源码如下:

Resilience4j FixedThreadPoolBulkhead(固定线程池隔离)的底层原理如下:

  • FixedThreadPoolBulkhead 使用一个固定线程池和一个有界等待队列来实现隔离。
  • 当线程池中存在空闲时,则进入系统的请求将直接进入线程池开启新线程,或者使用空闲线程来处理请求。
  • 当线程池中无空闲时,后面进来的请求将被放入等待队列中
    • 如果等待队列仍然无剩余空间时,后面进来的请求将直接被拒绝。
    • 在队列中的请求会等待线程池出现空闲,然后进入线程池执行业务处理。
  • 与 SemaphoreBulkhead(信号量隔离)一样,也是用于限制并发执行的数量,但是二者的实现原理存在差别,而且表现效果也存在细微的差别。

特别注意

FixedThreadPoolBulkhead 只对 CompletableFuture 方法有效,所以开发者必须创建返回值为 CompletableFuture 类型的方法。FixedThreadPoolBulkhead 的底层源码请看 这里

固定线程池隔离的配置

Resilience4j FixedThreadPoolBulkhead(固定线程池隔离)的详细配置参数可以查阅 官方文档,其中的核心配置项如下:

YML 配置示例如下,定义了一段简单的 FixedThreadPoolBulkhead 配置,指定默认配置为 maxThreadPoo1Size:4coreThreadPoolSize:2queueCapacity:2,并且指定了两个实例,其中 backendA 使用了默认配置,而 backendB 使用了自定义配置。

固定线程池隔离的使用案例

本节将演示如何使用 OpenFeign + Resilience4j FixedThreadPoolBulkhead 实现基于线程池的隔离,也就是限制针对于下游服务的最大并发访问数量。

创建父级 Pom 工程

在父工程里面配置好工程需要的父级依赖,目的是为了更方便管理与简化配置,具体 Maven 配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hutool.version>5.8.22</hutool.version>
<lombok.version>1.18.26</lombok.version>
<spring.boot.version>3.2.0</spring.boot.version>
<spring.boot.test.version>3.1.5</spring.boot.test.version>
<spring.cloud.version>2023.0.0</spring.cloud.version>
</properties>

<dependencyManagement>
<dependencies>
<!--SpringBoot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--SpringCloud-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--HuTool-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
<!--Lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<optional>true</optional>
</dependency>
<!--SpringBoot Test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring.boot.test.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
创建 Provider 工程

为了测试 Resilience4j FixedThreadPoolBulkhead(固定线程池隔离)的使用效果,先创建一个服务提供者。这里创建 Provider 的 Maven 工程,由于需要将服务注册到 Consul,工程下的 pom.xml 文件需要引入 spring-cloud-starter-consul-discovery

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

创建 Provider 的启动主类,添加注解 @EnableDiscoveryClient,将服务注册到 Consul

1
2
3
4
5
6
7
8
9
@SpringBootApplication
@EnableDiscoveryClient
public class ProviderApplication {

public static void main(String[] args) {
SpringApplication.run(ProviderApplication.class, args);
}

}

application.yml 文件中指定服务名称(provider-service)、注册中心地址与端口号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
server:
port: 8001

spring:
application:
name: provider-service
cloud:
# Consul
consul:
host: 127.0.0.1
port: 8500
# 注册中心
discovery:
service-name: ${spring.application.name}
heartbeat:
enabled: true

创建用于测试的 Controller 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@RestController
public class ProviderController {

/**
* 该接口用于测试 Resilience4j 的隔离(舱壁)
*/
@GetMapping(value = "/provider/bulkhead/{id}")
public String bulkhead(@PathVariable("id") Integer id) {
// 模拟业务处理出错
if (id == -4) {
throw new RuntimeException("Bulkhead id 不能为负数");
}
// 模拟业务长时间处理
if (id == 9999) {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return "Hello, bulkhead! inputId : " + id + " \t " + UUID.fastUUID();
}

}
创建 Consumer 工程

若要使用 Resilience4j FixedThreadPoolBulkhead(固定线程池隔离),则需要在 pom.xml 文件中引入依赖 resilience4j-bulkhead,由于基于注解的方式配置隔离是依赖 AOP 实现的,所以必须引入 spring-boot-starter-aop。另外,由于需要从 Consul 获取服务列表,即作为 Consul 的客户端,还需要引入 spring-cloud-starter-consul-discovery

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-circuitbreaker-resilience4j</artifactId>
</dependency>

<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>

创建服务接口类,通过 OpenFeign 调用 Provider 提供的服务

1
2
3
4
5
6
7
8
9
10
@FeignClient("provider-service")
public interface ProviderFeignApi {

/**
* 该接口用于测试服务调用方(消费者)的隔离(舱壁)
*/
@GetMapping(value = "/provider/bulkhead/{id}")
String bulkhead(@PathVariable("id") Integer id);

}

创建启动主类,添加 @EnableDiscoveryClient@EnableFeignClients 注解

1
2
3
4
5
6
7
8
9
10
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class ConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}

}

application.yml 文件中配置端口号、注册中心地址、隔离(舱壁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
server:
port: 8003

spring:
application:
name: consumer-service
cloud:
# Consul
consul:
host: 127.0.0.1
port: 8500
# 注册中心
discovery:
service-name: ${spring.application.name}
heartbeat:
enabled: true
# OpenFeign
openfeign:
# 开启断路器和分组激活
circuitbreaker:
enabled: true
group:
enabled: false # 设置为 false,这样新启动的线程和原来的主线程才是脱离的

# Resilience4j
resilience4j:
# 超时处理
timelimiter:
configs:
default:
timeout-duration: 20s # 神坑的位置,timelimiter 默认限制请求处理耗时为 1s,超过 1s 就会认为请求超时,如果配置了降级,就会走降级逻辑
# 固定线程池隔离
thread-pool-bulkhead:
configs:
# 隔离(舱壁)的默认配置
default:
core-thread-pool-size: 1
max-thread-pool-size: 1
queue-capacity: 1
instances:
# 指定特定的服务实例或者方法使用哪个隔离(舱壁)配置,还可以在每个实例下进行自定义配置
provider-service:
baseConfig: default

特别注意

  • 第一点:这里必须配置 OpenFeign 开启断路器,否则 Resilience4j 的隔离功能不会生效,且不要激活分组(这样新启动的线程和原来的主线程才是脱离的)。
  • 第二点:当线程池参数设置为 coreThreadPoolSize=1maxThreadPoolSize=1queueCapacity=1 的时候,线程池最多能处理 2 个任务,也就是 maxThreadPoolSize + queueCapacity,当有第 3 个任务提交就会遭到线程池拒绝。
  • 第三点:这里必须配置 Resilience4j TimeLimiter 的 timeout-duration 的参数,因为 TimeLimiter 默认限制请求处理耗时为 1 秒,超过 1 秒 就会认为请求超时(在默认情况下不会打印超时的异常信息,而是直接走 Fallback 处理逻辑,导致很难发现超时问题)。为了避免影响后面的代码测试,需要将 timeout-duration 的参数值设置大一点。

创建用于测试的 Controller 类,并使用 @Bulkhead 注解来实现隔离(舱壁)的功能,同时通过 type 属性来指定使用基于信号量的隔离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@RestController
public class ConsumerController {

@Autowired
private ProviderFeignApi providerFeignApi;

/**
* 该接口用于测试 Resilience4j 的隔离(舱壁)
* <p> @Bulkhead 注解写在需要限制并发访问的一侧
*/
@GetMapping(value = "/consumer/bulkhead/{id}")
@Bulkhead(name = "provider-service", fallbackMethod = "bulkheadFallback", type = Bulkhead.Type.THREADPOOL)
public CompletableFuture<String> bulkhead(@PathVariable("id") Integer id) {
return CompletableFuture.supplyAsync(() -> providerFeignApi.bulkhead(id));
}

/**
* 服务降级后的兜底处理方法
*/
public CompletableFuture<String> bulkheadFallback(Integer id, Throwable t) {
// 这里是容错处理逻辑,返回备用结果
return CompletableFuture.supplyAsync(() -> "ThreadPoolBulkhead Fallback,系统繁忙,请稍后再试 /(ㄒoㄒ)/~~");
}

}

特别注意

  • 这里 @Bulkhead 注解的 name 属性必须与 application.yml 配置文件中的 instances 匹配对应。
  • 换言之,name 指定的名称必须和 instances 中的某个键匹配,这样才能让指定的服务实例或者方法使用相应的 Bulkhead 配置。
  • FixedThreadPoolBulkhead 只对 CompletableFuture 方法有效,所以开发者必须创建返回值为 CompletableFuture 类型的方法。
测试案例代码
  • (1) 浏览器打开 2 个窗口,分别访问 http://localhost:8003/consumer/bulkhead/9999
  • (2) 由于每个请求调用需要耗时 5 秒,因此 2 个请求就瞬间达到配置过的最大并发访问数量 2
  • (3) 浏览器打开第 3 窗口,访问 http://localhost:8003/consumer/bulkhead/22,可以发现请求直接被舱壁限制隔离了,得到的是 Fallback 处理结果(降级)
  • (4) 等上面的第 1 个和第 2 个窗口请求完成后,再去访问 http://localhost:8003/consumer/bulkhead/22,因为并发访问数量小于 2,所以可以正常调用接口
下载案例代码
  • 完整的案例代码可以从 这里 下载得到。

限流使用

限流的介绍

所谓限流(RateLimiter),就是通过对并发访问 / 请求进行限速,或者对一个时间窗口内的请求进行限速,以保护应用系统,一旦达到限制速率则可以拒绝服务、排队等待、降级处理等。比如电商秒杀业务,瞬时大量请求涌入,服务器忙不过来就只好排队限流了,这和去景点排队买票和去医院办理业务排队等号的道理一样。更多介绍可参考 官方文档

限流的算法

业内常用的限流算法有以下几种:

  • 漏桶算法(Leaky Bucket)
  • 令牌桶算法(Token Bucket)
  • 滑动时间窗口算法(Sliding Time Window)
  • 滚动时间窗口算法(Tumbling Time Window),又叫固定时间窗口算法

提示

SpringCloud 默认使用的限流算法是令牌桶算法。值得一提的是,漏桶算法和令牌桶算法的 Java 实现可参考 这里

漏桶算法

漏桶算法(Leaky Bucket)的核心思想:一个固定容量的漏桶,请求以固定的速率流入漏桶。当请求到达时,如果漏桶未满,则允许请求通过,如果漏桶已满,则拒绝请求。漏桶以恒定的速率漏水,类似医院打吊针,不管源头流量多大,漏桶都按设定匀速流出。值得一提的是,即使系统没有请求,漏桶也会持续漏水。

这里有两个变量,一个是桶的大小,支持流量突发增多时可以存多少的水(Burst),另一个是水桶漏洞的大小(Rate)。因为漏桶的漏出速率是固定的参数,所以即使网络中不存在资源冲突(没有发生拥塞),漏桶算法也不能加快突发流量(Burst)到达出口的速率。因此,漏桶算法对于存在突发特性的流量来说缺乏效率。

漏桶算法的总结

  • 致命缺点:不能应付突发的流量。
  • 使用场景:适用于需要固定的请求达到速率的场景,比如对网络流量进行限制,确保不会出现突发流量导致系统瘫痪。
  • 实现方式:在代码中维护一个固定容量的漏桶,请求以固定的速率流入漏桶,同时定期漏水,并在每次请求到达时检查漏桶的剩余容量是否足够。

令牌桶算法

令牌桶算法(Token Bucket)的核心思想:一个固定容量的桶,其中以固定的速率产生令牌,每个令牌代表一个允许通过的请求。当请求到来时,需要从桶中获取一个令牌,如果桶中没有足够的令牌,则拒绝请求。

令牌算法的总结

  • 使用场景:适用于需要平滑限流的场景,比如对服务的请求进行限制,但允许短时间内突发请求。
  • 实现方式:在代码中维护一个令牌桶,定期向其中添加令牌,并在每次请求到达时检查是否有足够的令牌可用。

滚动时间窗口算法

滚动时间窗口算法(Tumbling Time Window,又叫固定时间窗口算法)的核心思想:允许固定数量的请求进入,超过数量就拒绝或者排队,等下一个时间段再进入。由于是在一个时间间隔内进行限制,如果用户在上个时间间隔结束前请求(但没有超过限制),同时在当前时间间隔刚开始请求(同样没超过限制),在各自的时间间隔内,这些请求都是正常的。但是,间隔临界的一段时间内的请求就会超过系统限制,可能会导致系统被压垮。

由于滚动时间窗口算法存在时间临界点缺陷,因此在时间临界点左右的极短时间段内系统容易遭到攻击。比如设定 1 分钟最多可以请求 100 次某个接口,如 12:00:00-12:00:59 时间段内没有数据请求,但在 12:00:59-12:01:00 时间段内突然并发 100 次请求,紧接着瞬间跨入下一个计数周期,并且计数器清零;在 12:01:00-12:01:01 内又有 100 次请求。那么也就是说在时间临界点左右可能同时有 2 倍的峰值进行请求,从而造成后台处理请求加倍过载的问题,甚至导致系统崩溃。

滚动时间窗的缺点

  • 限流不够平滑。例如:限流是每秒 3 个,在第一毫秒发送了 3 个请求,触发限流,窗口剩余时间内的请求都将会被拒绝,体验不够好。
  • 无法处理时间窗口边界问题。因为是在某个时间窗口内进行流量控制,所以可能会出现窗口边界效应,即在时间窗口的边界处可能会有大量的请求被允许通过,从而导致突发流量。

滑动时间窗口算法

滑动时间窗口算法(Sliding Time Window)的核心思想:将固定时间进行划分,并且随着时间移动,移动方式为开始时间点变为时间列表中的第二时间点,结束时间点增加一个时间点,不断重复该过程,通过这种方式可以巧妙的避开滚动时间窗口算法的临界点问题。滑动时间窗口算法仍然存在时间片段的概念,同时滑动窗口算法计数的运算相对于滚动时间窗口算法来说比较耗时。

限流算法总结

上面介绍了四种常用的限流算法:漏桶算法、令牌桶算法、滚动时间窗口算法、滑动时间窗口算法,每种算法都有其特点和适用场景。

  • 令牌桶算法:既能平滑流量,又能处理突发流量,适用于需要处理突发流量的场景。
  • 漏桶算法: 优点是流量处理更平滑,缺点是无法应对突发流量,适用于需要平滑流量的场景。
  • 滚动时间窗口算法:实现简单,但是限流不够平滑,存在时间窗口边界问题,适用于需要简单实现限流的场景。
  • 滑动时间窗口算法:解决了时间窗口边界问题,但是还是存在限流不够平滑的问题,适用于需要控制平均请求速率的场景。

限流的原理

Resilience4j 提供了一个限流器,它将从 epoch 开始的所有纳秒划分为多个周期,每个周期的持续时间为 RateLimiterConfig.limitRefreshPeriod。在每个周期开始时,限流器会将活动权限数设置为 RateLimiterConfig.limitForPeriod。期间,对于限流器的调用者来说,它看起来确实是这样的,但是对于 AtomicRateLimiter 的底层实现来说,如果 RateLimiter 未被经常使用,则会在后台进行一些优化,这些优化将跳过此刷新操作。


Resilience4j 限流器的默认实现是 AtomicRateLimiter,它通过原子引用来管理其状态的。这个 AtomicRateLimiter 状态完全不可变,并且具有以下属性:

  • activeCycle:上次调用的周期号。
  • activePermissions:在上次调用结束后,可用的活跃权限数。如果保留了某些权限,则可以为负。
  • nanosToWait:最后一次调用要等待的纳秒数。

另外,还有一个使用信号量的 SemaphoreBasedRateLimiter 限流器和一个调度程序,它将在每个 RateLimiterConfig.limitRefreshPeriod 之后刷新活动权限数。

限流的配置

Resilience4j RateLimiter(限流)的详细配置参数可以查阅 官方文档,其中的核心配置项如下:

限流的使用案例

本节将演示如何使用 OpenFeign + Resilience4j RateLimiter 实现限流,也就是控制一段时间内可以处理请求的最大数量。

创建父级 Pom 工程

在父工程里面配置好工程需要的父级依赖,目的是为了更方便管理与简化配置,具体 Maven 配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hutool.version>5.8.22</hutool.version>
<lombok.version>1.18.26</lombok.version>
<spring.boot.version>3.2.0</spring.boot.version>
<spring.boot.test.version>3.1.5</spring.boot.test.version>
<spring.cloud.version>2023.0.0</spring.cloud.version>
</properties>

<dependencyManagement>
<dependencies>
<!--SpringBoot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--SpringCloud-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--HuTool-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
<!--Lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<optional>true</optional>
</dependency>
<!--SpringBoot Test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring.boot.test.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
创建 Provider 工程

为了测试 Resilience4j RateLimiter(限流)的使用效果,先创建一个服务提供者。这里创建 Provider 的 Maven 工程,由于需要将服务注册到 Consul,工程下的 pom.xml 文件需要引入 spring-cloud-starter-consul-discovery

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

创建 Provider 的启动主类,添加注解 @EnableDiscoveryClient,将服务注册到 Consul

1
2
3
4
5
6
7
8
9
@SpringBootApplication
@EnableDiscoveryClient
public class ProviderApplication {

public static void main(String[] args) {
SpringApplication.run(ProviderApplication.class, args);
}

}

application.yml 文件中指定服务名称(provider-service)、注册中心地址与端口号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
server:
port: 8001

spring:
application:
name: provider-service
cloud:
# Consul
consul:
host: 127.0.0.1
port: 8500
# 注册中心
discovery:
service-name: ${spring.application.name}
heartbeat:
enabled: true

创建用于测试的 Controller 类

1
2
3
4
5
6
7
8
9
10
11
12
@RestController
public class ProviderController {

/**
* 该接口用于测试 Resilience4j 的限流
*/
@GetMapping(value = "/provider/ratelimit/{id}")
public String rateLimit(@PathVariable("id") Integer id) {
return "Hello, ratelimit! inputId : " + id + " \t " + UUID.fastUUID();
}

}
创建 Consumer 工程

若要使用 Resilience4j RateLimiter(限流),则需要在 pom.xml 文件中引入依赖 resilience4j-ratelimiter,由于基于注解的方式配置限流是依赖 AOP 实现的,所以必须引入 spring-boot-starter-aop。另外,由于需要从 Consul 获取服务列表,即作为 Consul 的客户端,还需要引入 spring-cloud-starter-consul-discovery

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-circuitbreaker-resilience4j</artifactId>
</dependency>

<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-ratelimiter</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>

创建服务接口类,通过 OpenFeign 调用 Provider 提供的服务

1
2
3
4
5
6
7
8
9
10
@FeignClient("provider-service")
public interface ProviderFeignApi {

/**
* 该接口用于测试 Resilience4j 的限流
*/
@GetMapping(value = "/provider/ratelimit/{id}")
String rateLimit(@PathVariable("id") Integer id);

}

创建启动主类,添加 @EnableDiscoveryClient@EnableFeignClients 注解

1
2
3
4
5
6
7
8
9
10
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class ConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}

}

application.yml 文件中配置端口号、注册中心地址、限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
server:
port: 8003

spring:
application:
name: consumer-service
cloud:
# Consul
consul:
host: 127.0.0.1
port: 8500
# 注册中心
discovery:
service-name: ${spring.application.name}
heartbeat:
enabled: true

# Resilience4j
resilience4j:
## 限流
ratelimiter:
configs:
# 限流的默认配置
default:
limitForPeriod: 2 # 在一次刷新周期内,允许执行的最大请求数
limitRefreshPeriod: 1s # 限流器每隔 limitRefreshPeriod 刷新一次,并将允许处理的最大请求数量重置为 limitForPeriod
timeout-duration: 1 # 线程等待权限的默认等待时间
instances:
# 指定特定的服务实例或者方法使用哪个限流配置,还可以在每个实例下进行自定义配置
provider-service:
baseConfig: default

提示

上面的限流配置,其效果是每 1 秒只允许最多处理 2 个请求,数量超过就后会走 Fallback 处理逻辑(降级)。

创建用于测试的 Controller 类,并使用 @RateLimiter 注解来实现限流的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@RestController
public class ConsumerController {

@Autowired
private ProviderFeignApi providerFeignApi;

/**
* 该接口用于测试 Resilience4j 的限流
* <p> @RateLimiter 注解写在需要限流的一侧
*/
@GetMapping(value = "/consumer/ratelimit/{id}")
@RateLimiter(name = "provider-service", fallbackMethod = "rateLimitFallback")
public String rateLimit(@PathVariable("id") Integer id) {
return providerFeignApi.rateLimit(id);
}

/**
* 服务降级后的兜底处理方法
*/
public String rateLimitFallback(Integer id, Throwable t) {
// 这里是容错处理逻辑,返回备用结果
return "RateLimiterFallback,系统繁忙,请稍后再试 /(ㄒoㄒ)/~~";
}

}

特别注意

  • 这里 @Bulkhead 注解的 name 属性必须与 application.yml 配置文件中的 instances 匹配对应。
  • 换言之,name 指定的名称必须和 instances 中的某个键匹配,这样才能让指定的服务实例或者方法使用相应的 RateLimiter 配置。
测试案例代码
  • (1) 浏览器频繁调用 http://localhost:8003/consumer/ratelimit/33 接口
  • (2) 当每秒的请求数超过 2 个后,服务端会返回 RateLimiterFallback,系统繁忙,请稍后再试 /(ㄒoㄒ)/~~ 服务降级提示。
下载案例代码
  • 完整的案例代码可以从 这里 下载得到。