RabbitMQ 开发随笔

消息处理

使用自定义消息转换器

业务之间大多数数据都是以 JSON 的数据格式进行传输的,即生产者服务将 JSON 类型的数据发送到对应的队列,而消费端从队列中接收到的数据类型也是 JSON 类型。为了方便将 POJO 对象转为 JSON 类型的数据来传输,可以使用 Spring 内置的 Jackson2JsonMessageConverter 消息转换器,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

/**
* RabbitMQ的管理对象
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}

/**
* RabbitMq的消息转换器
*
* @return
*/
@Bean
public MessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
return messageConverter;
}

/**
* RabbitMq的模版
*
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 设置发送消息时所用的消息转换器
template.setMessageConverter(messageConverter);
return template;
}

/**
* RabbitMq的监听容器工厂
*
* @return
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 设置线程数
factory.setConcurrentConsumers(3);
// 最大线程数
factory.setMaxConcurrentConsumers(10);
// 设置接收消息时所用的消息转换器
factory.setMessageConverter(messageConverter);
return factory;
}

}

消息队列任务的平滑关闭

客户端连接

客户端 SSL 连接

RabbitMQ 客户端使用 SSL 证书连接 RabbitMQ 服务器的 Java 示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.concurrent.TimeoutException;

public class SslConnectionDemo {

public static void main(String[] args) throws TimeoutException {
// 证书路径
String classpath = SslReceiver.class.getResource("/").getPath();
// 证书密码
char[] sslPwd1 = "password1".toCharArray();
char[] sslPwd2 = "password2".toCharArray();
// 读取client密钥和rabbitStore证书
try (InputStream sslCardStream = new FileInputStream(classpath + "ssl/keycert.p12");
InputStream rabbitStoreStream = new FileInputStream(classpath + "ssl/rabbitStore")) {

// 加载秘钥
KeyStore ks = KeyStore.getInstance("PKCS12");
ks.load(sslCardStream, sslPwd1);
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
keyManagerFactory.init(ks, sslPwd1);

// 读取授权证书,只含有服务端的公钥
KeyStore jks = KeyStore.getInstance("JKS");
jks.load(rabbitStoreStream, sslPwd2);
TrustManagerFactory keyStoreManager = TrustManagerFactory.getInstance("SunX509");
keyStoreManager.init(jks);
SSLContext context = SSLContext.getInstance("TLSv1.2");
context.init(keyManagerFactory.getKeyManagers(), keyStoreManager.getTrustManagers(), null);
ConnectionFactory factory = new ConnectionFactory();
// RabbitMQ的登录账号与密码
factory.setUsername("yourUserName");
factory.setPassword("yourPassword");
// RabbitMQ的主机地址
factory.setHost("127.0.0.1");
// RabbitMQ的SSL端口
factory.setPort(5671);
factory.setAutomaticRecoveryEnabled(true);

// 设置sslContext
factory.useSslProtocol(context);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明RabbitMQ的队列
channel.queueDeclare("rabbitmq-queue", false, true, true, null);
channel.basicPublish("", "rabbitmq-queue", null, "Test,Test".getBytes());
GetResponse chResponse = channel.basicGet("rabbitmq-queue", false);
if (chResponse == null) {
System.out.println("No message retrieved");
} else {
byte[] body = chResponse.getBody();
System.out.println("Recieved: " + new String(body));
}
channel.close();
connection.close();
} catch (KeyStoreException | UnrecoverableKeyException | KeyManagementException | CertificateException | NoSuchAlgorithmException | IOException e) {
e.printStackTrace();
}
}

}

生产落地实践

如何保证消息的顺序性

关键点

  • 不同 MQ 对消息顺序性的支持

    • 在 Kafka 和 RabbitMQ 中,无法保证消息的顺序性,需要开发者自己实现。
    • 在 RocketMQ 中,有完整的针对性设计(原生支持消息的顺序性),可以保证消息的顺序性。
  • 全局有序和局部有序

    • MQ 通常只需要保证消息在局部有序,而不需要保证消息在全局有序。
    • 比如,在局部层面上,某个订单的多个消息必须有序的,但在全局层面上,不同订单的消息是不需要保证有序的。这类似微信的聊天窗口,在单个聊天窗口内,多个消息必须是有序的,但在多个聊天窗口中消息不一定是要有序的。
  • 生产实践案例分析

    • 大数据团队需要开发一个 MySQL Binlog 同步系统,要求同步一个 MySQL 库的数据过来,对公司业务系统的数据做各种复杂的分析。那么在 MySQL 里增删改一条数据,对应出来了增删改 3 条 Binlog,接着这三条 Binlog 发送到 MQ 里面。当消费者将消息读取出来依次执行时,这就要保证消息是有序,不然本来依次是增加、修改、删除;搞错顺序后,给执行成删除、修改、增加,这就全乱套了。因为本来这条数据同步过来,最后这条数据应该被删除了;结果搞错了这个顺序,最后这条数据保留下来了,这就造成数据同步出错了。

解决方案

  • 要保证一个生产者只对应一个交换机(Exchange),一个交换机只对应一个队列,并且一个队列只对应一个消费者。

如何避免消息丢失的问题

这问题相当于 “如何保证消息的可靠性” 或者 “如何保证消息可靠传输”。消息丢失的问题,可能会出现在生产者、消息队列服务器、消费者中的任一环节。

生产者发送消息丢失

生产者发送消息后,由于网络故障或网络延迟,可能会导致消息在传输过程中丢失。

  • 解决方案

    • (1) 使用 RabbitMQ 提供的事务机制

      • channel.txSelect() 开启事务
      • channel.txCommit() 提交事务
      • channel.txRollback() 回滚事务
      • 生产者在发送数据之前开启事务,然后再发送消息;如果消息没有成功被 RabbitMQ 接收到,那么生产者会出现异常报错,此时就可以回滚事务,然后重试发送消息;如果消息成功被 RabbitMQ 接收到,那么就可以提交事务。
      • 特别注意,事务机制是同步的,生产者发送消息后会同步阻塞,也就是会一直等待直到 RabbitMQ 返回响应,这往往会导致 MQ 的吞吐量大大下降。
    • (2) 使用 RabbitMQ 提供的生产者确认机制(Confirm)

      • 在生产者那里设置开启 Confirm 机制之后,每次写消息的时候都会分配一个唯一的 ID,然后将消息写入了 RabbitMQ 后,RabbitMQ 会回调你提供的一个 ACK 回调通知函数,告诉你这个消息成功被接收到了。如果 RabbitMQ 没能处理这个消息,会回调你提供的一个 NACK 回调通知函数,告诉你这个消息接收失败,此时你可以重试发送消息。另外,你还可以结合这个确认机制(Confirm),自己在内存里维护每个消息 ID 的状态,如果超过一定时间还没接收到这个消息的回调通知,那么你就可以重新发送消息。
      • RabbitMQ 的事务机制和 Cnofirm 机制最大的不同在于,事务机制是同步的,生产者提交一个事务之后就会阻塞在那里;但是 Confirm 机制是异步的,生产者发送个消息之后就可以继续发送下一个消息,然后那个消息被 RabbitMQ 成功接收到之后,会异步回调你提供的一个回调通知函数来通知你这个消息被成功接收到了。所以,如果希望在生产者这边避免数据丢失的话,一般都是用 Confirm 机制,因为它是异步非阻塞的。

MQ 服务器消息同步丢失

在 MQ 集群中,多个节点之间同步消息时,可能会发生消息丢失。

  • 问题发生
    • 在普通集群中,消息是分散存储在不同的节点上,节点之间不会主动进行消息同步,当存储消息的节点宕机了,这可能会丢失消息
  • 解决方案
    • 使用镜像集群,因为镜像集群会在节点之间主动进行消息同步,这样消息的可靠性可以得到提高

MQ 服务器消息存盘丢失

MQ 服务器将内存中的数据持久化到硬盘时,可能会发生消息丢失(比如 MQ 在持久化之前意外宕机)。

  • 解决方案
    • (1) 使用持久化队列
      • 当使用 RabbitMQ 的持久化队列后,消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 宕机了,恢复之后会自动读取之前存储的数据,这样数据一般就不会丢失。除非极其罕见的是,RabbitMQ 还没来得及持久化,自己就宕机了,这可能导致少量数据会丢失,但是这个概率比较小。
      • 使用持久化队列有两个设置步骤,第一个是在创建队列时将其设置为持久化的,这样就可以保证 RabbitMQ 会持久化队列的元数据,但是这并不会持久化队列里的数据;第二个是发送消息的时候将消息的 deliveryMode 设置为 2,表示将消息设置为持久化,这样 RabbitMQ 就会将消息持久化到磁盘。特别注意,必须执行这两个持久化步骤才行,缺一不可,这样哪怕 RabbitMQ 宕机了,再次重启后,也可以从磁盘上恢复队列里的数据。
      • 持久化队列可以跟生产者那边的确认机制(Confirm)配合起来使用,只有消息被持久化到磁盘之后,RabbitMQ 才会给生产者回传 ACK 消息。所以,哪怕是在消息持久化到磁盘之前,RabbitMQ 宕机了,数据丢失了,生产者接收不到消息被成功接收的 ACK 消息,生产者还可以重新发送消息。
    • (2) 使用 Quorum 队列(仲裁队列)
      • Quorum 队列采用 Raft 一致性协议进行消息持久化,从 3.8.0 版本开始才支持。

消费者消费消息丢失

消费者拉取消息后,需要处理业务,这期间可能会发生消息丢失。

  • 问题发生

    • 消费者之所以会丢失消息,主要是开启了 RabbitMQ 的自动 ACK 功能。一旦使用了自动 ACK 功能,在消费者消费的时候,刚拉取消息,还没来得及处理完业务逻辑,消费者就自动发送 ACK 消息,此时应用进程突然宕机了(比如重启),那么 RabbitMQ 就会认为消息已经被消费,这样消息就丢失了。
  • 解决方案

    • 关闭自动 ACK,使用手动 ACK
      • 关闭 RabbitMQ 自动 ACK 功能,调用一个 API 就可以关闭;然后每次你确保执行完业务处理后,手动执行 ACK 操作。这样的话,如果应用还没来得及处理完业务就宕机了,自然就不会再发送 ACK 消息;那么 RabbitMQ 就会认为你还没处理完业务,这个时候 RabbitMQ 会把这个消息重新分配给别的消费者去处理,消息是不会丢的。

如何保证消息队列的高可用性

RabbitMQ 是基于主从实现高可用性的,它有三种模式:单机模式、普通集群模式、镜像集群模式。

单机模式

  • 概述
    • 单机模式是 RabbitMQ 最简单的部署方式,即在单台机器上运行一个 RabbitMQ 实例。所有的队列和消息都存放在这台机器上,适合开发、测试等应用场景。单机模式没有集群的冗余和容错特性,因此在生产环境中不推荐使用。
  • 特点
    • 配置简单,部署快速。
    • 没有容错能力,服务器宕机会导致消息服务不可用。
  • 适用场景
    • 适合开发、测试环境,或对消息队列要求不高的小型项目。

普通集群模式

  • 概述
    • 普通集群模式是由多个 RabbitMQ 实例组成的一个集群。集群中的每个节点都会维护队列的元数据,但队列中的消息只存储在一个节点上,其他节点只存储该队列的元数据。这意味着,如果队列所在的节点故障,那么该队列的消息将会丢失,无法自动恢复。
    • 大概意思就是在多台机器上部署多个 RabbitMQ 实例,你创建的队列只会放在一个 RabbitMQ 实例上,不过每个实例都会同步队列的元数据(元数据可以认为是队列的一些配置信息,通过元数据可以找到队列所在的实例),但队列中消息本身不复制到其他节点。当客户端消费的时候,如果队列就在自己所连接的 RabbitMQ 实例中,那么就可以直接消费消息;如果队列不在自己所连接的 RabbitMQ 实例中,那么就要从另一个 RabbitMQ 实例的队列中拉取消息过来再消费。
    • 这种集群模式没做到真正的分布式,就是一个普通的集群。因为这导致消费者要么每次随机连接一个 RabbitMQ 实例,然后可能需要从另一个 RabbitMQ 实例拉取数据;要么固定连接那个队列所在的 RabbitMQ 实例,然后直接消费数据;前者有数据拉取的开销,后者有单实例性能瓶颈。
    • 另外,如果那个存放队列的 RabbitMQ 实例宕机了,会导致其他实例无法从那个宕机实例拉取数据。如果 RabbitMQ 开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,但是得等那个宕机实例恢复了,然后才可以继续从那个实例的队列中拉取数据。
    • 简而言之,RabbitMQ 普通集群模式没有什么所谓的高可用性可言,这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个队列的读写操作。
  • 特点
    • 集群中的节点共享队列元数据,但消息本身不复制到其他节点。
    • 扩展性较好,能够增加集群节点来支持更大的负载。
    • 存在单点故障风险,当某个队列所在节点发生故障后,队列中的消息将不可用。
  • 适用场景
    • 适合需要一定扩展能力,可以接受一定数据丢失(不强求高可用)的场景。

镜像集群模式

  • 概述
    • 镜像集群模式是在普通集群模式的基础上增加了消息冗余的能力。通过设置队列为镜像队列(Mirrored Queue),可以将队列的数据同步到多个节点上。在 RabbitMQ 的镜像集群模式中,每个镜像队列都有一个主节点和多个备份节点,当主节点不可用时,其他备份节点可以自动接管服务,从而提高了高可用性。
    • 这种集群模式,才是所谓的 RabbitMQ 高可用模式,跟普通集群模式不一样的是,你创建的队列,无论是队列的元数据还是队列里的消息都会存在于多个 RabbitMQ 实例上。当你每次写消息到队列的时候,RabbitMQ 都会自动将消息同步到多个实例的队列里。
    • 这样的好处在于任何一个 RabbitMQ 实例宕机了,其他的实例都可以继续使用。坏处在于,第一,这会导致性能开销很大,消息需要同步到其他机器,导致网络带宽压力和消耗很重;第二,这样就没有扩展性可言了,如果某个队列负载很重,你想增加 RabbitMQ 实例,但新增的实例也包含了这个队列的所有数据,也就是没有办法线性扩展这个队列。思考一下,如果这个队列的数据量很大,大到在这台机器上无法容纳了,此时该怎么办呢?
    • 那么怎么开启镜像集群模式呢?其实很简单,RabbitMQ 有很好用的管理控制台,就是在控制台新增一个策略,这个策略是镜像集群模式的策略,新增的时候可以要求将消息同步到所有节点,也可以要求将消息同步到指定数量的节点。最后,在你再次创建的队列时候,应用这个策略就可以自动将队列里的消息同步到其他的节点上了。
  • 特点
    • 队列数据会复制到多个节点,具备高可用性,适合生产环境。
    • 节点故障时可以自动切换,确保消息的持久性和可用性。
    • 消耗更多的资源,增加了系统的 I/O 负担,因此性能会受到一定影响。
  • 适用场景
    • 适合对高可用性要求高、数据不允许丢失、需要容灾的场景。

如何解决消息大量积压的问题

消息积压的概述

消息积压指的是消息在消息队列中堆积而未能及时处理的情况。消息的积压主要来自于两方面:要么消息生产变快了,要么消息消费变慢了。

  • 监控发现,生产和消费消息的速度没什么变化,出现消息积压的情况,检查是有消费失败反复消费的情况。
  • 监控发现,消费消息的速度变慢,检查消费实例,日志中是否有大量消费错误、消费线程是否死锁、是否卡在某些资源上。
  • 单位时间内发送的消息增多,比如赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升消费性能,但可以通过扩容消费端的实例数来提升总体的消费能力
  • 如果短时间内没有服务器资源扩容,可以将系统降级,通过关闭某些不重要的业务,减少消息发送的数据量,最低限度让系统还能正常运转,保证核心业务的可用性
  • 严重影响 MQ 甚至整个系统时,可以考虑临时启用多个消费者,并发接收消息,同时持久化消息(比如写入数据库),过段时间再将持久化的消息重新写回 MQ 中进行消费,或者极端情况下直接丢弃消息

消息积压的解决方案

可以使用扩容来解决消息积压的问题,比如利用临时消费者,消费原来积压在队列中的消息。该消费者不做任何耗时的操作,将消息均匀写入新创建的队列里,最后将更多 Consumer 部署到更多的机器上消费新创建队列上的消息。等待积压的消息被消费,恢复到正常状态后,撤掉扩容服务器。具体步骤和思路如下:

  • (1) 先修复 Consumer 的问题,确保其恢复正常的消费速度,然后将现有的 Consumer 都停掉
  • (2) 临时建立好原先 10 倍或者 20 倍的 Queue 数量
  • (3) 写一个临时的分发消息的 Consumer 程序,将这个程序部署上去消费积压的消息,消费之后不做任何耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 Queue
  • (4) 接着临时征用 10 倍机器来部署 Consumer 实例,每一个(或者一批) Consumer 消费一个临时 Queue 的数据
  • (5) 等积压的数据都被消费完之后,恢复原先的部署架构,重新用原先的 Consumer 机器来消费消息

这种做法相当于临时将 Queue 资源和 Consumer 资源扩大了 10 倍,即以正常的 10 倍速度消费积压的消息,如下图所示:

如何解决消息队列的过期失效问题

假设用的是 RabbitMQ,由于 RabbitMQ 是可以设置过期时间的(TTL),如果消息在 Queue 中积压超过一定的时间,就会被 RabbitMQ 清理掉。这个时候就不会有消息被大量积压的问题,而是会有大量的消息丢失了。这种情况下,就不是说要增加 Consumer 消费积压的消息了,因为实际上消息是没有积压的,而是丢了大量的消息。

可以采取的一个解决方案就是 “批量重导”。当大量的消息积压的时候,由于设置了过期时间,RabbitMQ 会直接丢弃数据,然后等业务高峰期过了之后,例如在晚上 12 点以后,写个临时程序将丢失的那批数据查询出来,然后重新将消息写入 RabbitMQ 里,即把白天丢的消息全部补回来。假设 10000 个订单积压在 RabbitMQ 里面,没有来得及处理掉,其中 2000 个订单都丢了,那么只能手动写个临时程序把那 2000 个订单查询出来,然后手动发送消息到 RabbitMQ 中重新进行消费。

消息队列的磁盘满了应该怎么处理

消息积压在 MQ 里,那么如果很长时间都没有处理掉,此时导致 MQ 都快将磁盘写满了,那应该怎么办?这个时候可以写一个临时程序,启用多个消费者,并发消费消息,同时将消息持久化(比如写入数据库),即快速消费掉 MQ 中积压的消息。到凌晨的时候,将持久化的消息重新写回 MQ 中进行消费;如果希望加快已持久化消息的消费速度,可以引入上述的消息积压扩容解决方案