Redisson 分布式锁使用教程

前言

Redisson 简介

Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格(In-Memory Data Grid)。充分地利用了 Redis 键值数据库提供的一系列优势,基于 Java 实用工具包中的常用接口,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。Redisson 的宗旨是促进使用者对 Redis 的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。值得一提的是,Redisson 底层采用的是 Netty 框架。支持 Redis 2.8 以上版本,支持 Java 1.6+ 以上版本。

Redisson 对象

Redis 命令和 Redisson 对象匹配列表请阅读 这里

Redisson 基础使用

Spring 整合 Redisson

  • 引入 Maven 依赖
1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.19.0</version>
</dependency>
  • 配置 Redisson 客户端的连接信息,包括 Reids 服务器的地址、密码等内容。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
public class RedisssonConfig {

@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() throws IOException {
Config config = new Config();
config.useSingleServer()
// 地址
.setAddress("redis://127.0.0.1:6379")
// 密码
.setPassword("123456");
return Redisson.create(config);
}

}

提示

  • 上述的配置方式同样适用于 SpringBoot 项目。
  • 配置完 Redisson 客户端后,在 Java 业务代码里就可以直接注入 RedissonClient 实例对象来使用 Redisson 提供的各种分布式锁了。

可重入锁 (Reentrant Lock)

基于 Redis 的 Redisson 分布式可重入锁 RLock 实现了 java.util.concurrent.locks.Lock 接口,同时还提供了异步(Async)、反射式(Reactive)和 RxJava2 标准的接口。众所周知,如果负责储存这个分布式锁的 Redisson 节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson 内部提供了一个监控锁的看门狗,它的作用是在 Redisson 实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是 30 秒钟,也可以通过修改 Config.lockWatchdogTimeout 来另行指定。另外 Redisson 还为加锁的方法提供了 leaseTime 参数来指定加锁的时间,超过这个时间后锁便会自动解开。

  • 直接获取锁,阻塞等待直至获取到锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@SpringBootTest
public class RedissonTest {

@Autowired
private RedissonClient redissonClient;

@Test
public void rLock() throws InterruptedException {
// 获取可重入锁
RLock lock = redissonClient.getLock("rLock");

// 阻塞等待,直至获取到锁
lock.lock();
try {
System.out.println("==> success to get locker");
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 解锁
lock.unlock();
}
}

}

提示

  • RLock.lock() 方法加锁后,默认加的锁的有效期是 30 秒。
  • RLock.lock() 方法加锁后,如果业务耗时超长,Redisson 在业务执行期间会周期性地自动给锁续上新的 30 秒有效期(看门狗机制),不用担心业务执行时间过长,锁自动过期被删掉的问题。
  • RLock.lock() 方法加锁后,只要加锁的业务运行完成,Redisson 就不会再给当前锁续期,即使不手动解锁,锁默认会在 30 秒内自动删除。
  • 直接获取锁,阻塞等待直至获取到锁,且上锁以后 10 秒自动解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@SpringBootTest
public class RedissonTest {

@Autowired
private RedissonClient redissonClient;

@Test
public void rLock() throws InterruptedException {
// 获取可重入锁
RLock lock = redissonClient.getLock("rLock");

// 阻塞等待,直至获取到锁,且上锁以后10秒自动解锁
lock.lock(10, TimeUnit.SECONDS);
try {
System.out.println("==> success to get locker");
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 解锁
lock.unlock();
}
}

}

特别注意

  • 调用 RLock.lock(10, TimeUnit.SECONDS) 方法加锁时,设置自动解锁的时间必须大于业务的执行时间。
  • 调用 RLock.lock(10, TimeUnit.SECONDS) 方法加锁时,在锁时间到了以后,即使业务未执行完成,Redisson 也不会给锁续期,也就是看门狗机制此时不会生效。
  • 尝试获取锁,阻塞等待,但不能超过指定的最大等待时间,且上锁以后 10 秒自动解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@SpringBootTest
public class RedissonTest {

@Autowired
private RedissonClient redissonClient;

@Test
public void rLock() throws InterruptedException {
// 获取可重入锁
RLock lock = redissonClient.getLock("rLock");

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
System.out.println("==> success to get locker");
Thread.sleep(5000);
} finally {
// 解锁
lock.unlock();
}
}
}

}

特别注意

  • 调用 RLock.tryLock(100, 10, TimeUnit.SECONDS) 方法加锁时,设置自动解锁的时间必须大于业务的执行时间。
  • 调用 RLock.tryLock(100, 10, TimeUnit.SECONDS) 方法加锁时,在锁时间到了以后,即使业务未执行完成,Redisson 也不会给锁续期,也就是看门狗机制此时不会生效。

读写锁 (ReadWriteLock)

基于 Redis 的 Redisson 分布式可重入读写锁 RReadWriteLock 实现了 java.util.concurrent.locks.ReadWriteLock 接口,其中读锁和写锁都继承了 RLock 接口。分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。

读写锁的特性

  • 读 + 读:相当于无锁,支持并发读
  • 写 + 读:读操作需要等待写操作完成
  • 读 + 写:写操作需要等待读操作完成
  • 写 + 写:互斥,需要等待对方的锁释放
  • 简而言之,只要有写锁存在,则其他操作都必须阻塞等待
  • 单元测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@SpringBootTest
public class RedissonTest {

@Autowired
private RedissonClient redissonClient;

/**
* 缓存(非线程安全)
*/
public static final Map<String, String> CACHES = new HashMap<>();

/**
* 写入数据
*/
private String writeValue() {
// 获取写锁
RReadWriteLock rwLock = redissonClient.getReadWriteLock("rw-lock");
RLock writeLock = rwLock.writeLock();

String uuid = UUID.randomUUID().toString();
try {
// 加写锁
writeLock.lock();
Thread.sleep(8000);
CACHES.put("uuid", uuid);
System.out.println("==> write uuid : " + uuid);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 解写锁
writeLock.unlock();
}
return uuid;
}

/**
* 读取数据
*/
private String readValue() {
// 获取读锁
RReadWriteLock rwLock = redissonClient.getReadWriteLock("rw-lock");
RLock readLock = rwLock.readLock();

String uuid = null;
try {
// 加读锁
readLock.lock();
uuid = CACHES.get("uuid");
System.out.println("==> read uuid : " + uuid);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 解读锁
readLock.unlock();
}
return uuid;
}

@Test
public void readWriteLock() throws Exception {
// 写操作
new Thread(this::writeValue).start();

Thread.sleep(500);

// 读操作(会阻塞等待写操作完成才执行)
new Thread(this::readValue).start();

System.in.read();
}

}
  • 单元测试结果
1
2
==> write uuid : 7d611f3a-2437-413d-b1aa-4041decc344e
==> read uuid : 7d611f3a-2437-413d-b1aa-4041decc344e

提示

  • 读锁是一个共享锁,支持并发地执行读操作。
  • 写锁是一个排他锁(互斥锁 / 独占锁),可防止并发地执行写操作。
  • 使用读写锁,可以保证读到的数据永远是最新的;只要写锁没有释放掉,那么拥有读锁的操作就会一直阻塞等待,直至写锁被释放。

闭锁 (CountDownLatch)

基于 Redis 的 Redisson 分布式闭锁 RCountDownLatch 采用了与 java.util.concurrent.CountDownLatch 相似的接口和用法。闭锁适用于等待一个多线程的操作,也就是等待 N 个线程把所有业务执行完毕后,再处理一个业务。关于闭锁的使用场景,可以想象一下公司的门卫如何等所有员工下班后再关门。公司一共有五名员工,门卫需要等这五名员工下班后,才能关闭大门。

闭锁的使用场景

  • 闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才继续执行:
  • (a) 确保某个计算在其需要的所有资源都被初始化之后才继续执行
  • (b) 确保某个服务在其他依赖的所有其他服务都已经启动之后才启动
  • (c) 等待直到某个操作所有参与者都准备就绪再继续执行
  • 单元测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@SpringBootTest
public class RedissonTest {

@Autowired
private RedissonClient redissonClient;

/**
* 门卫关门
*/
public void lockDoor() {
RCountDownLatch countDownLatch = redissonClient.getCountDownLatch("countDownLatch");
// 设置总共有5个员工
countDownLatch.trySetCount(5);

try {
// 门卫等待所有员工下班
countDownLatch.await();
System.out.println("==> 门卫关门成功");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/**
* 员工下班
*/
public void offWork(long num) {
RCountDownLatch countDownLatch = redissonClient.getCountDownLatch("countDownLatch");
// 未下班的员工计数减一
countDownLatch.countDown();
System.out.println("==> " + num + " 号员工下班");
}

@Test
public void countDownLatch() throws Exception {
// 模拟门卫关门
new Thread(this::lockDoor).start();

Thread.sleep(1000);

// 模拟5个员工下班
for (int i = 0; i < 5; i++) {
new Thread(() -> {
offWork(Thread.currentThread().getId());
}).start();
}

System.in.read();
}

}
  • 单元测试结果
1
2
3
4
5
6
==> 113 号员工下班
==> 112 号员工下班
==> 114 号员工下班
==> 115 号员工下班
==> 116 号员工下班
==> 门卫关门成功

信号量 (Semaphore)

基于 Redis 的 Redisson 的分布式信号量 RSemaphore 采用了与 java.util.concurrent.Semaphore 相似的接口和用法,同时还提供了异步(Async)、反射式(Reactive)和 RxJava2 标准的接口。关于信号量的使用场景,可以想象一下平时停车场如何停车。一共有十辆车准备停车,停车位有五个,当五个停车位满了后,其他车只能等有车位空出来才能停车。可以把停车位比作信号,现在有五个信号,停一次车,用掉一个信号,车离开就是释放一个信号。值得一提的是,RSemaphore 可用于实现分布式限流。RSemaphore 的原理图如下。

  • 单元测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@SpringBootTest
public class RedissonTest {

@Autowired
private RedissonClient redissonClient;

@Test
public void semaphore() throws IOException {
// 获取信号量
RSemaphore semaphore = redissonClient.getSemaphore("semaphore");

// 设置许可数量,模拟五个停车位
semaphore.trySetPermits(5);

// 创建10个线程,模拟10辆车过来停车
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
try {
// 占用信号(停车位)
semaphore.acquire();
Thread.sleep(1000);
System.out.println("==> 车辆 " + Thread.currentThread().getId() + " 进入停车场");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放信号(停车位)
semaphore.release();
System.out.println("==> 车辆 " + Thread.currentThread().getId() + " 离开停车场");
}
});
}
System.in.read();
}

}
  • 单元测试结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
==> 车辆 113 进入停车场
==> 车辆 110 进入停车场
==> 车辆 109 进入停车场
==> 车辆 111 进入停车场
==> 车辆 108 进入停车场
==> 车辆 108 离开停车场
==> 车辆 109 离开停车场
==> 车辆 110 离开停车场
==> 车辆 111 离开停车场
==> 车辆 113 离开停车场
==> 车辆 112 进入停车场
==> 车辆 117 进入停车场
==> 车辆 114 进入停车场
==> 车辆 116 进入停车场
==> 车辆 116 离开停车场
==> 车辆 114 离开停车场
==> 车辆 112 离开停车场
==> 车辆 117 离开停车场
==> 车辆 115 进入停车场
==> 车辆 115 离开停车场

可过期性信号量 (PermitExpirableSemaphore)

基于 Redis 的 Redisson 可过期性信号量 RPermitExpirableSemaphore 是在 RSemaphore 对象的基础上,为每个信号增加了一个过期时间。每个信号可以通过独立的 ID 来辨识,释放时只能通过提交这个 ID 才能释放。它提供了异步(Async)、反射式(Reactive)和 RxJava2 标准的接口。

  • 单元测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@SpringBootTest
public class RedissonTest {

@Autowired
private RedissonClient redissonClient;

@Test
public void expirableSemaphore() throws IOException {
// 获取可过期性信号量
RPermitExpirableSemaphore semaphore = redissonClient.getPermitExpirableSemaphore("expirable-semaphore");

// 设置许可数量,模拟五个停车位
semaphore.trySetPermits(5);

// 创建10个线程,模拟10辆车过来停车
for (int i = 0; i < 10; i++) {
new Thread(() -> {
// 信号的 ID 标识
String permitId = null;
try {
// 占用信号量(停车位),有效期只有5秒
permitId = semaphore.acquire(5, TimeUnit.SECONDS);
Thread.sleep(1000);
System.out.println("==> 车辆 " + Thread.currentThread().getId() + " 进入停车场");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放信号量(停车位)
semaphore.release(permitId);
System.out.println("==> 车辆 " + Thread.currentThread().getId() + " 离开停车场");
}
}).start();
}
System.in.read();
}

}
  • 单元测试结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
==> 车辆 115 进入停车场
==> 车辆 109 进入停车场
==> 车辆 112 进入停车场
==> 车辆 111 进入停车场
==> 车辆 113 进入停车场
==> 车辆 113 离开停车场
==> 车辆 115 离开停车场
==> 车辆 111 离开停车场
==> 车辆 109 离开停车场
==> 车辆 112 离开停车场
==> 车辆 110 进入停车场
==> 车辆 114 进入停车场
==> 车辆 108 进入停车场
==> 车辆 106 进入停车场
==> 车辆 114 离开停车场
==> 车辆 108 离开停车场
==> 车辆 106 离开停车场
==> 车辆 110 离开停车场
==> 车辆 107 进入停车场
==> 车辆 107 离开停车场

SpringBoot 整合 Redisson

引入 Maven 依赖

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.19.0</version>
</dependency>

添加 YML 配置信息

  • 配置 Redis 的连接信息,包括主机地址、端口、密码等信息。
1
2
3
4
5
6
7
spring:
redis:
host: 127.0.0.1
port: 6379
password: 123456
database: 0
timeout: 5000

创建 Redission 配置类

  • 创建 Redission 配置类,用于定义 Redission 的客户端。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedisssonConfig {

@Autowired
private RedisProperties redisProperties;

@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() {
String password = redisProperties.getPassword();
String url = String.format("redis://%s:%s", redisProperties.getHost() + "", redisProperties.getPort() + "");

Config config = new Config();
config.useSingleServer().setAddress(url).setPassword(password);
return Redisson.create(config);
}

}

单元测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@SpringBootTest
public class RedissonTest {

@Autowired
private RedissonClient redissonClient;

@Test
public void rLock() throws InterruptedException {
// 获取可重入锁
RLock lock = redissonClient.getLock("rLock");

// 阻塞等待,直至获取到锁
lock.lock();
try {
System.out.println("==> success to get locker");
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 解锁
lock.unlock();
}
}

}

参考博客