令牌桶和漏桶限流算法的 Java 实现

漏桶限流算法

漏桶限流算法的介绍

  • 漏桶限流算法
    • 原理:漏桶算法维护了一个固定容量的漏桶,请求以固定的速率流入漏桶。当请求到达时,如果漏桶未满,则允许请求通过,如果漏桶已满,则拒绝请求。漏桶以恒定的速率漏水,即使系统没有请求,漏桶也会持续漏水。
    • 使用场景:适用于需要固定的请求达到速率的场景,比如对网络流量进行限制,确保不会出现突发流量导致系统瘫痪。
    • 实现方式:在代码中维护一个固定容量的漏桶,请求以固定的速率流入漏桶,同时定期漏水,并在每次请求到达时检查漏桶的剩余容量是否足够。

漏桶限流算法的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class LeakyBucketDemo {

private final long capacity; // 漏桶的容量
private final long rate; // 漏水的速率,单位:请求数/秒
private final AtomicLong waterLevel; // 漏桶的当前水位
private final ScheduledExecutorService scheduler; // 任务调度器

public LeakyBucketDemo(long capacity, long rate) {
this.capacity = capacity;
this.rate = rate;
this.waterLevel = new AtomicLong(0);
this.scheduler = Executors.newScheduledThreadPool(1);
initScheduler();
}

/**
* 初始化任务调度器
*/
private void initScheduler() {
scheduler.scheduleAtFixedRate(() -> {
long currWaterLevel = waterLevel.get();
// 控制漏桶以恒定的速率漏水,漏水的量被限制在当前水位和漏水速率之间的较小值
long leakAmount = Math.min(currWaterLevel, rate);
waterLevel.set(currWaterLevel - leakAmount);
}, 0, 1, TimeUnit.SECONDS); // 每秒执行一次
}

/**
* 消费请求
*/
public boolean tryConsume() {
long currWaterLevel = waterLevel.get();
// 判断漏桶是否已经满了
if (currWaterLevel < capacity) {
// 将请求流入漏桶
waterLevel.incrementAndGet();
return true;
} else {
return false;
}
}

public static void main(String[] args) throws InterruptedException {
// 初始化漏桶,容量为10,漏水速率为每秒处理5个请求
LeakyBucketDemo leakyBucket = new LeakyBucketDemo(10, 5);

// 模拟多个并发请求
for (int i = 0; i < 20; i++) {
new Thread(() -> {
// 消费请求
if (leakyBucket.tryConsume()) {
System.out.println(Thread.currentThread().getName() + " 请求被接受处理");
} else {
System.out.println(Thread.currentThread().getName() + " 请求被拒绝处理");
}
}, "T-" + i).start();
}
}

}

程序执行的输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
T-15 请求被拒绝处理
T-7 请求被接受处理
T-2 请求被接受处理
T-9 请求被接受处理
T-1 请求被接受处理
T-8 请求被接受处理
T-11 请求被拒绝处理
T-12 请求被拒绝处理
T-0 请求被接受处理
T-6 请求被接受处理
T-13 请求被拒绝处理
T-17 请求被拒绝处理
T-5 请求被接受处理
T-3 请求被接受处理
T-18 请求被拒绝处理
T-14 请求被拒绝处理
T-10 请求被拒绝处理
T-4 请求被接受处理
T-19 请求被拒绝处理
T-16 请求被拒绝处理

令牌桶限流算法

令牌桶限流算法的介绍

  • 令牌桶限流算法
    • 原理:令牌桶算法维护一个固定容量的桶,其中以固定的速率产生令牌,每个令牌代表一个允许通过的请求。当请求到来时,需要从桶中获取一个令牌,如果桶中没有足够的令牌,则拒绝请求。
    • 使用场景:适用于需要平滑限流的场景,比如对服务的请求进行限制,但允许短时间内突发请求。
    • 实现方式:在代码中维护一个令牌桶,定期向其中添加令牌,并在每次请求到达时检查是否有足够的令牌可用。

令牌桶限流算法的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TokenBucketDemo {

private final long capacity; // 令牌桶的容量
private final long rate; // 令牌的生成速率,单位:令牌数/秒
private final AtomicLong tokens; // 当前的令牌数量
private final ScheduledExecutorService scheduler; // 任务调度器

public TokenBucketDemo(long capacity, long rate) {
this.capacity = capacity;
this.rate = rate;
this.tokens = new AtomicLong(0);
this.scheduler = Executors.newScheduledThreadPool(1);
initScheduler();
}

/**
* 初始化任务调度器
*/
private void initScheduler() {
scheduler.scheduleAtFixedRate(() -> {
long currTokens = tokens.get();
// 维护一个固定容量的桶,并以固定的速率产生令牌
long newTokens = Math.min(currTokens + rate, capacity);
tokens.set(newTokens);
}, 0, 1, TimeUnit.SECONDS); // 每秒执行一次
}

/**
* 消费令牌
*/
public boolean tryConsume() {
long currTokens = tokens.get();
if (currTokens > 0) {
// CAS 操作
return tokens.compareAndSet(currTokens, currTokens - 1);
} else {
return false;
}
}

public static void main(String[] args) throws InterruptedException {
// 初始化令牌桶,固定容量为10,生成速率为每秒生成5个令牌
TokenBucketDemo tokenBucket = new TokenBucketDemo(10, 5);

// 模拟多个并发请求
for (int i = 0; i < 10; i++) {
new Thread(() -> {
// 获取令牌
if (tokenBucket.tryConsume()) {
System.out.println(Thread.currentThread().getName() + " 请求被接受处理");
} else {
System.out.println(Thread.currentThread().getName() + " 请求被拒绝处理");
}
}, "T-" + i).start();
}
}

}

程序执行的输出结果:

1
2
3
4
5
6
7
8
9
10
T-7 请求被拒绝处理
T-1 请求被接受处理
T-4 请求被接受处理
T-9 请求被拒绝处理
T-8 请求被拒绝处理
T-2 请求被接受处理
T-5 请求被拒绝处理
T-6 请求被拒绝处理
T-3 请求被接受处理
T-0 请求被接受处理

其他常用的限流算法

滚动时间窗口算法

滚动时间窗口算法(Tumbling Time Window,又叫固定时间窗口算法)的核心思想:允许固定数量的请求进入,超过数量就拒绝或者排队,等下一个时间段再进入。由于是在一个时间间隔内进行限制,如果用户在上个时间间隔结束前请求(但没有超过限制),同时在当前时间间隔刚开始请求(同样没超过限制),在各自的时间间隔内,这些请求都是正常的。但是,间隔临界的一段时间内的请求就会超过系统限制,可能会导致系统被压垮。

由于滚动时间窗口算法存在时间临界点缺陷,因此在时间临界点左右的极短时间段内系统容易遭到攻击。比如设定 1 分钟最多可以请求 100 次某个接口,如 12:00:00-12:00:59 时间段内没有数据请求,但在 12:00:59-12:01:00 时间段内突然并发 100 次请求,紧接着瞬间跨入下一个计数周期,并且计数器清零;在 12:01:00-12:01:01 内又有 100 次请求。那么也就是说在时间临界点左右可能同时有 2 倍的峰值进行请求,从而造成后台处理请求加倍过载的问题,甚至导致系统崩溃。

滚动时间窗的缺点

  • 限流不够平滑。例如:限流是每秒 3 个,在第一毫秒发送了 3 个请求,触发限流,窗口剩余时间内的请求都将会被拒绝,体验不够好。
  • 无法处理时间窗口边界问题。因为是在某个时间窗口内进行流量控制,所以可能会出现窗口边界效应,即在时间窗口的边界处可能会有大量的请求被允许通过,从而导致突发流量。

滑动时间窗口算法

滑动时间窗口算法(Sliding Time Window)的核心思想:将固定时间进行划分,并且随着时间移动,移动方式为开始时间点变为时间列表中的第二时间点,结束时间点增加一个时间点,不断重复该过程,通过这种方式可以巧妙的避开滚动时间窗口算法的临界点问题。滑动时间窗口算法仍然存在时间片段的概念,同时滑动窗口算法计数的运算相对于滚动时间窗口算法来说比较耗时。

限流算法的总结

上面介绍了四种常用的限流算法:漏桶算法、令牌桶算法、滚动时间窗口算法、滑动时间窗口算法,每种算法都有其特点和适用场景。

  • 令牌桶算法:既能平滑流量,又能处理突发流量,适用于需要处理突发流量的场景。
  • 漏桶算法: 优点是流量处理更平滑,缺点是无法应对突发流量,适用于需要平滑流量的场景。
  • 滚动时间窗口算法:实现简单,但是限流不够平滑,存在时间窗口边界问题,适用于需要简单实现限流的场景。
  • 滑动时间窗口算法:解决了时间窗口边界问题,但是还是存在限流不够平滑的问题,适用于需要控制平均请求速率的场景。

提示

令牌桶算法和漏桶算法也属于基于时间窗口的限流算法。

第三方 Java 限流库

在企业的项目开发中,考虑到代码的稳定性、健壮性、可用性,一般推荐直接使用第三方限流库来实现限流策略。

Bucket4J

  • Bucket4J 是一款基于令牌桶算法的 Java 限流库。

Resilience4j

  • Resilience4j 是一款轻量级容错框架,设计灵感来源于 Netflix 的 Hystrix 框架,专为函数式编程所设计。

参考资料