前言
Redisson 简介
Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格(In-Memory Data Grid)。充分地利用了 Redis 键值数据库提供的一系列优势,基于 Java 实用工具包中的常用接口,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。Redisson 的宗旨是促进使用者对 Redis 的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。值得一提的是,Redisson 底层采用的是 Netty 框架。支持 Redis 2.8 以上版本,支持 Java 1.6+ 以上版本。
Redisson 对象
Redis 命令和 Redisson 对象匹配列表请阅读 这里。
Redisson 基础使用
Spring 整合 Redisson
1 2 3 4 5
| <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.19.0</version> </dependency>
|
- 配置 Redisson 客户端的连接信息,包括 Reids 服务器的地址、密码等内容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Configuration public class RedisssonConfig { @Bean(destroyMethod = "shutdown") public RedissonClient redissonClient() throws IOException { Config config = new Config(); config.useSingleServer() .setAddress("redis://127.0.0.1:6379") .setPassword("123456"); return Redisson.create(config); } }
|
提示
- 上述的配置方式同样适用于 SpringBoot 项目。
- 配置完 Redisson 客户端后,在 Java 业务代码里就可以直接注入
RedissonClient
实例对象来使用 Redisson 提供的各种分布式锁了。
可重入锁 (Reentrant Lock)
基于 Redis 的 Redisson 分布式可重入锁 RLock
实现了 java.util.concurrent.locks.Lock
接口,同时还提供了异步(Async)、反射式(Reactive)和 RxJava2 标准的接口。众所周知,如果负责储存这个分布式锁的 Redisson 节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson 内部提供了一个监控锁的看门狗,它的作用是在 Redisson 实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是 30
秒钟,也可以通过修改 Config.lockWatchdogTimeout
来另行指定。另外 Redisson 还为加锁的方法提供了 leaseTime
参数来指定加锁的时间,超过这个时间后锁便会自动解开。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @SpringBootTest public class RedissonTest { @Autowired private RedissonClient redissonClient;
@Test public void rLock() throws InterruptedException { RLock lock = redissonClient.getLock("rLock"); lock.lock(); try { System.out.println("==> success to get locker"); Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }
}
|
提示
RLock.lock()
方法加锁后,默认加的锁的有效期是 30 秒。RLock.lock()
方法加锁后,如果业务耗时超长,Redisson 在业务执行期间会周期性地自动给锁续上新的 30 秒有效期(看门狗机制),不用担心业务执行时间过长,锁自动过期被删掉的问题。RLock.lock()
方法加锁后,只要加锁的业务运行完成,Redisson 就不会再给当前锁续期,即使不手动解锁,锁默认会在 30 秒内自动删除。
- 直接获取锁,阻塞等待直至获取到锁,且上锁以后 10 秒自动解锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @SpringBootTest public class RedissonTest { @Autowired private RedissonClient redissonClient; @Test public void rLock() throws InterruptedException { RLock lock = redissonClient.getLock("rLock"); lock.lock(10, TimeUnit.SECONDS); try { System.out.println("==> success to get locker"); Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }
}
|
特别注意
- 调用
RLock.lock(10, TimeUnit.SECONDS)
方法加锁时,设置自动解锁的时间必须大于业务的执行时间。 - 调用
RLock.lock(10, TimeUnit.SECONDS)
方法加锁时,在锁时间到了以后,即使业务未执行完成,Redisson 也不会给锁续期,也就是看门狗机制此时不会生效。
- 尝试获取锁,阻塞等待,但不能超过指定的最大等待时间,且上锁以后 10 秒自动解锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @SpringBootTest public class RedissonTest { @Autowired private RedissonClient redissonClient; @Test public void rLock() throws InterruptedException { RLock lock = redissonClient.getLock("rLock");
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { System.out.println("==> success to get locker"); Thread.sleep(5000); } finally { lock.unlock(); } } }
}
|
特别注意
- 调用
RLock.tryLock(100, 10, TimeUnit.SECONDS)
方法加锁时,设置自动解锁的时间必须大于业务的执行时间。 - 调用
RLock.tryLock(100, 10, TimeUnit.SECONDS)
方法加锁时,在锁时间到了以后,即使业务未执行完成,Redisson 也不会给锁续期,也就是看门狗机制此时不会生效。
读写锁 (ReadWriteLock)
基于 Redis 的 Redisson 分布式可重入读写锁 RReadWriteLock
实现了 java.util.concurrent.locks.ReadWriteLock
接口,其中读锁和写锁都继承了 RLock
接口。分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
读写锁的特性
读 + 读
:相当于无锁,支持并发读写 + 读
:读操作需要等待写操作完成读 + 写
:写操作需要等待读操作完成写 + 写
:互斥,需要等待对方的锁释放- 简而言之,只要有写锁存在,则其他操作都必须阻塞等待
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| @SpringBootTest public class RedissonTest { @Autowired private RedissonClient redissonClient;
public static final Map<String, String> CACHES = new HashMap<>();
private String writeValue() { RReadWriteLock rwLock = redissonClient.getReadWriteLock("rw-lock"); RLock writeLock = rwLock.writeLock(); String uuid = UUID.randomUUID().toString(); try { writeLock.lock(); Thread.sleep(8000); CACHES.put("uuid", uuid); System.out.println("==> write uuid : " + uuid); } catch (Exception e) { e.printStackTrace(); } finally { writeLock.unlock(); } return uuid; }
private String readValue() { RReadWriteLock rwLock = redissonClient.getReadWriteLock("rw-lock"); RLock readLock = rwLock.readLock(); String uuid = null; try { readLock.lock(); uuid = CACHES.get("uuid"); System.out.println("==> read uuid : " + uuid); } catch (Exception e) { e.printStackTrace(); } finally { readLock.unlock(); } return uuid; } @Test public void readWriteLock() throws Exception { new Thread(this::writeValue).start(); Thread.sleep(500); new Thread(this::readValue).start(); System.in.read(); } }
|
1 2
| ==> write uuid : 7d611f3a-2437-413d-b1aa-4041decc344e ==> read uuid : 7d611f3a-2437-413d-b1aa-4041decc344e
|
提示
- 读锁是一个共享锁,支持并发地执行读操作。
- 写锁是一个排他锁(互斥锁 / 独占锁),可防止并发地执行写操作。
- 使用读写锁,可以保证读到的数据永远是最新的;只要写锁没有释放掉,那么拥有读锁的操作就会一直阻塞等待,直至写锁被释放。
闭锁 (CountDownLatch)
基于 Redis 的 Redisson 分布式闭锁 RCountDownLatch
采用了与 java.util.concurrent.CountDownLatch
相似的接口和用法。闭锁适用于等待一个多线程的操作,也就是等待 N 个线程把所有业务执行完毕后,再处理一个业务。关于闭锁的使用场景,可以想象一下公司的门卫如何等所有员工下班后再关门。公司一共有五名员工,门卫需要等这五名员工下班后,才能关闭大门。
闭锁的使用场景
- 闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才继续执行:
- (a) 确保某个计算在其需要的所有资源都被初始化之后才继续执行
- (b) 确保某个服务在其他依赖的所有其他服务都已经启动之后才启动
- (c) 等待直到某个操作所有参与者都准备就绪再继续执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| @SpringBootTest public class RedissonTest { @Autowired private RedissonClient redissonClient;
public void lockDoor() { RCountDownLatch countDownLatch = redissonClient.getCountDownLatch("countDownLatch"); countDownLatch.trySetCount(5); try { countDownLatch.await(); System.out.println("==> 门卫关门成功"); } catch (InterruptedException e) { e.printStackTrace(); } }
public void offWork(long num) { RCountDownLatch countDownLatch = redissonClient.getCountDownLatch("countDownLatch"); countDownLatch.countDown(); System.out.println("==> " + num + " 号员工下班"); } @Test public void countDownLatch() throws Exception { new Thread(this::lockDoor).start(); Thread.sleep(1000); for (int i = 0; i < 5; i++) { new Thread(() -> { offWork(Thread.currentThread().getId()); }).start(); } System.in.read(); } }
|
1 2 3 4 5 6
| ==> 113 号员工下班 ==> 112 号员工下班 ==> 114 号员工下班 ==> 115 号员工下班 ==> 116 号员工下班 ==> 门卫关门成功
|
信号量 (Semaphore)
基于 Redis 的 Redisson 的分布式信号量 RSemaphore
采用了与 java.util.concurrent.Semaphore
相似的接口和用法,同时还提供了异步(Async)、反射式(Reactive)和 RxJava2 标准的接口。关于信号量的使用场景,可以想象一下平时停车场如何停车。一共有十辆车准备停车,停车位有五个,当五个停车位满了后,其他车只能等有车位空出来才能停车。可以把停车位比作信号,现在有五个信号,停一次车,用掉一个信号,车离开就是释放一个信号。值得一提的是,RSemaphore 可用于实现分布式限流。RSemaphore 的原理图如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @SpringBootTest public class RedissonTest { @Autowired private RedissonClient redissonClient; @Test public void semaphore() throws IOException { RSemaphore semaphore = redissonClient.getSemaphore("semaphore"); semaphore.trySetPermits(5); ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { executorService.submit(() -> { try { semaphore.acquire(); Thread.sleep(1000); System.out.println("==> 车辆 " + Thread.currentThread().getId() + " 进入停车场"); } catch (Exception e) { e.printStackTrace(); } finally { semaphore.release(); System.out.println("==> 车辆 " + Thread.currentThread().getId() + " 离开停车场"); } }); } System.in.read(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| ==> 车辆 113 进入停车场 ==> 车辆 110 进入停车场 ==> 车辆 109 进入停车场 ==> 车辆 111 进入停车场 ==> 车辆 108 进入停车场 ==> 车辆 108 离开停车场 ==> 车辆 109 离开停车场 ==> 车辆 110 离开停车场 ==> 车辆 111 离开停车场 ==> 车辆 113 离开停车场 ==> 车辆 112 进入停车场 ==> 车辆 117 进入停车场 ==> 车辆 114 进入停车场 ==> 车辆 116 进入停车场 ==> 车辆 116 离开停车场 ==> 车辆 114 离开停车场 ==> 车辆 112 离开停车场 ==> 车辆 117 离开停车场 ==> 车辆 115 进入停车场 ==> 车辆 115 离开停车场
|
可过期性信号量 (PermitExpirableSemaphore)
基于 Redis 的 Redisson 可过期性信号量 RPermitExpirableSemaphore
是在 RSemaphore
对象的基础上,为每个信号增加了一个过期时间。每个信号可以通过独立的 ID 来辨识,释放时只能通过提交这个 ID 才能释放。它提供了异步(Async)、反射式(Reactive)和 RxJava2 标准的接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @SpringBootTest public class RedissonTest { @Autowired private RedissonClient redissonClient; @Test public void expirableSemaphore() throws IOException { RPermitExpirableSemaphore semaphore = redissonClient.getPermitExpirableSemaphore("expirable-semaphore"); semaphore.trySetPermits(5); for (int i = 0; i < 10; i++) { new Thread(() -> { String permitId = null; try { permitId = semaphore.acquire(5, TimeUnit.SECONDS); Thread.sleep(1000); System.out.println("==> 车辆 " + Thread.currentThread().getId() + " 进入停车场"); } catch (Exception e) { e.printStackTrace(); } finally { semaphore.release(permitId); System.out.println("==> 车辆 " + Thread.currentThread().getId() + " 离开停车场"); } }).start(); } System.in.read(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| ==> 车辆 115 进入停车场 ==> 车辆 109 进入停车场 ==> 车辆 112 进入停车场 ==> 车辆 111 进入停车场 ==> 车辆 113 进入停车场 ==> 车辆 113 离开停车场 ==> 车辆 115 离开停车场 ==> 车辆 111 离开停车场 ==> 车辆 109 离开停车场 ==> 车辆 112 离开停车场 ==> 车辆 110 进入停车场 ==> 车辆 114 进入停车场 ==> 车辆 108 进入停车场 ==> 车辆 106 进入停车场 ==> 车辆 114 离开停车场 ==> 车辆 108 离开停车场 ==> 车辆 106 离开停车场 ==> 车辆 110 离开停车场 ==> 车辆 107 进入停车场 ==> 车辆 107 离开停车场
|
SpringBoot 整合 Redisson
引入 Maven 依赖
1 2 3 4 5
| <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.19.0</version> </dependency>
|
添加 YML 配置信息
- 配置 Redis 的连接信息,包括主机地址、端口、密码等信息。
1 2 3 4 5 6 7
| spring: redis: host: 127.0.0.1 port: 6379 password: 123456 database: 0 timeout: 5000
|
创建 Redission 配置类
- 创建 Redission 配置类,用于定义 Redission 的客户端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.data.redis.RedisProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RedisssonConfig { @Autowired private RedisProperties redisProperties; @Bean(destroyMethod = "shutdown") public RedissonClient redissonClient() { String password = redisProperties.getPassword(); String url = String.format("redis://%s:%s", redisProperties.getHost() + "", redisProperties.getPort() + ""); Config config = new Config(); config.useSingleServer().setAddress(url).setPassword(password); return Redisson.create(config); } }
|
单元测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @SpringBootTest public class RedissonTest { @Autowired private RedissonClient redissonClient;
@Test public void rLock() throws InterruptedException { RLock lock = redissonClient.getLock("rLock"); lock.lock(); try { System.out.println("==> success to get locker"); Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }
}
|
参考博客