大纲 前言 学习资源 版本说明 本文所有案例代码使用的各软件版本如下表所示:
组件 版本 说明 RabbitMQ Server 3.8.26
RabbitMQ Client 5.10.0
Erlang 24.2
Java 11
RabbitMQ 消息持久化 概念介绍 RabbitMQ 的手动确认机制可以保证消费者在处理消息时不会出现丢失消息的情况,但是如何保障当 RabbitMQ 服务器宕机,之前消息生产者发送过来的消息不丢失呢? 默认情况下 RabbitMQ 退出或由于某种原因宕机时,它会忽视队列和消息,除非告知它不要这样做。 RabbitMQ 确保消息不会丢失需要做两件事,分别是需要将队列和消息都标记为持久化,后者只需要由生产者来完成即可。 特别注意:即使同时将队列和消息都标记为持久化,RabbitMQ 仍然无法完全保证消息不丢失,需要配合其他策略才能实现。 队列实现持久化 当创建的队列是非持久化的,如果 RabbitMQ 服务器重启,该队列就会被删除掉。如果要队列实现持久化,需要在消息生产者 / 消费者声明队列的时候把 durable
参数设置为 true
。
1 2 3 4 5 6 7 8 9 10 11 boolean durable = true ;channel.queueDeclare(QUEUE_NAME, durable, false , false , null );
特别注意
如果之前声明的队列不是持久化的,则必须将原来的队列先删除,或者创建一个新的持久化队列,不然 RabbitMQ 的客户端会出现错误,如图所示 。
下图是在 RabbitMQ 控制台中非持久化队列与持久化队列的 UI 显示区别,持久化队列的 Feature
属性为 D
,这个时候即使重启 RabbitMQ 服务器,队列也依然存在。
消息实现持久化 若希望让 RabbitMQ 消息实现持久化,除了需要声明持久化队列之外,还需要将消息标记为持久化,也就是需要在消息生产者发送消息时,添加 MessageProperties.PERSISTENT_TEXT_PLAIN
这个属性。
1 2 3 4 5 6 7 8 9 10 AMQP.BasicProperties msgProperties = MessageProperties.PERSISTENT_TEXT_PLAIN; channel.basicPublish("" , QUEUE_NAME, msgProperties, "hello" .getBytes(StandardCharsets.UTF_8));
特别注意
在使用 RabbitMQ 的时候,即使同时将队列和消息都标记为持久化,仍然无法完全保证消息不丢失。虽然 RabbitMQ 会将持久化消息写入磁盘,但在消息正式落盘之前,仍有丢失的可能。例如,当消息刚进入缓存,尚未完全写入磁盘时,如果 RabbitMQ 服务器突然宕机,该消息可能无法恢复导致丢失。因此,RabbitMQ 的持久化机制并不能提供绝对可靠的保障,但对于一些简单的任务队列应用而言,已足够满足需求。如果需要更高等级的持久化方案,可以参考后续的文章内容。
案例代码 本节将演示如何将队列和消息标记为持久化,以此避免 RabbitMQ 服务器重启后导致消息丢失的问题。
代码下载
本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-06
。
1 2 3 4 5 <dependency > <groupId > com.rabbitmq</groupId > <artifactId > amqp-client</artifactId > <version > 5.10.0</version > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;import java.nio.charset.StandardCharsets;public class MQProducer { public static final String QUEUE_NAME = "test" ; public static void main (String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.2.127" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("admin" ); try ( Connection connection = factory.newConnection(); Channel channel = connection.createChannel() ) { boolean durable = true ; channel.queueDeclare(QUEUE_NAME, durable, false , false , null ); AMQP.BasicProperties msgProperties = MessageProperties.PERSISTENT_TEXT_PLAIN; channel.basicPublish("" , QUEUE_NAME, msgProperties, "hello" .getBytes(StandardCharsets.UTF_8)); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;import java.util.Scanner;public class MQConsumer { public static final String QUEUE_NAME = "test" ; public static void main (String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.2.127" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("admin" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); boolean durable = true ; channel.queueDeclare(QUEUE_NAME, durable, false , false , null ); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); try { Thread.sleep(60000 ); System.out.println("Successed to consume message : " + msg); } catch (Exception e) { e.printStackTrace(); } channel.basicAck(message.getEnvelope().getDeliveryTag(), false ); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Failed to consume message : " + consumerTag); }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback); System.out.println("按回车键退出程序:" ); new Scanner(System.in).nextLine(); channel.close(); connection.close(); } }
代码测试
(3) 重启 RabbitMQ 服务器,等重启完成后,如果在 RabbitMQ 的控制台,仍然可以看到有未被消费(Ready)或者正在消费(Unacked)的消息,则说明队列和消息的持久化都生效了,如下图所示:
(4) 最终消息会被消费者消费(注意:这里可能会出现重复消费的现象),消费者的控制台输出结果如下: 1 2 按回车键退出程序: Successed to consume message : hello
RabbitMQ 不公平分发 概念介绍 RabbitMQ 默认采用轮询机制(Round-Robin)进行消息分发 —— 轮询分发,即在多个消费者之间按固定顺序逐一分配消息。这种机制在某些业务场景中可能产生效率问题,例如:当存在两个处理能力差异显著的消费者时(假设消费者 A 的处理效率是消费者 B 的三倍),轮询分发会导致系统资源利用率严重失衡。具体表现为:消费者 A 每完成三次任务仅需消耗消费者 B 处理单个任务的时间,但受限于均等分配规则,消费者 A 在完成既定任务后被迫进入等待状态,而消费者 B 却持续积压未处理消息。这种 “能者等待,弱者过载” 的分配模式本质上源于 RabbitMQ 的设计机制 —— 消息代理系统无法主动感知消费者节点的处理能力差异,仅会机械执行预设的公平分发策略。
1 2 3 4 5 6 int prefetchCount = 1 ;channel.basicQos(prefetchCount);
启动消费者应用后,在 RabbitMQ 控制台中可以看到以下内容:
此时,多个消费者工作的流程图如下所示。大概的意思就是:如果消费者一还没有处理完消息,或者消费者一还没有确认消息,那么 RabbitMQ 先别将新消息分配给消费者一;消费者一目前只能处理一个消息,然后 RabbitMQ 就会将新消息分配给没有那么忙的其他空闲消费者。当然,如果所有的消费者都还没有处理完手上的消息,且队列还在不停地添加新消息,那么队列就有可能会出现被撑满的情况,这个时候就只能添加新的 Worker(消费者)或者改变消息的存储策略。
案例代码 本节将演示如何使用手动确认机制(ACK)+ QoS 预取计数值来实现 RabbitMQ 消息的不公平分发,并且使用的是工作队列模式。值得一提的是,RabbitMQ 的不公平分发通常是配合手动确认机制(ACK)一起使用。
代码下载
本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-04
。
1 2 3 4 5 <dependency > <groupId > com.rabbitmq</groupId > <artifactId > amqp-client</artifactId > <version > 5.10.0</version > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class RabbitMQUtils { public static ConnectionFactory connectionFactory; static { connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.2.127" ); connectionFactory.setPort(5672 ); connectionFactory.setUsername("admin" ); connectionFactory.setPassword("admin" ); } public static Channel createChannel () throws Exception { Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import com.clay.rabbitmq.utils.RabbitMQUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.MessageProperties;import java.nio.charset.StandardCharsets;import java.util.Scanner;public class MQProducer { public static final String QUEUE_NAME = "test" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel(); channel.queueDeclare(QUEUE_NAME, true , false , false , null ); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.nextLine(); System.out.println("发送消息:" + message); channel.basicPublish("" , QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); } channel.close(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 import com.clay.rabbitmq.utils.RabbitMQUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;import java.util.Scanner;public class MQConsumer01 { public static final String QUEUE_NAME = "test" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel(); channel.queueDeclare(QUEUE_NAME, true , false , false , null ); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); try { Thread.sleep(3000 ); System.out.println("Successed to consume message : " + msg); } catch (Exception e) { e.printStackTrace(); } channel.basicAck(message.getEnvelope().getDeliveryTag(), false ); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Failed to consume message : " + consumerTag); }; System.out.println("消费者一等待接收消息,处理消息较快..." ); int prefetchCount = 1 ; channel.basicQos(prefetchCount); boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback); System.out.println("按回车键退出程序:" ); new Scanner(System.in).nextLine(); channel.close(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 import com.clay.rabbitmq.utils.RabbitMQUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;import java.util.Scanner;public class MQConsumer02 { public static final String QUEUE_NAME = "test" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel(); channel.queueDeclare(QUEUE_NAME, true , false , false , null ); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); try { Thread.sleep(8000 ); System.out.println("Successed to consume message : " + msg); } catch (Exception e) { e.printStackTrace(); } channel.basicAck(message.getEnvelope().getDeliveryTag(), false ); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Failed to consume message : " + consumerTag); }; System.out.println("消费者二等待接收消息,处理消息较慢..." ); int prefetchCount = 1 ; channel.basicQos(prefetchCount); boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback); System.out.println("按回车键退出程序:" ); new Scanner(System.in).nextLine(); channel.close(); } }
代码测试 (1) 分别启动生产者和两个消费者应用
(2) 在生产者的控制台中,依次输入以下内容:
1 2 3 4 5 6 7 8 9 10 11 12 AA 发送消息:AA BB 发送消息:BB CC 发送消息:CC DD 发送消息:DD EE 发送消息:EE FF 发送消息:FF
(3) 由于消费者一的消息处理速度较快,在消费者一的控制台中,会输出以下内容 1 2 3 4 5 6 消费者一等待接收消息,处理消息较快... 按回车键退出程序: Successed to consume message : BB Successed to consume message : CC Successed to consume message : DD Successed to consume message : FF
(4) 由于消费者二的消息处理速度较慢,在消费者二的控制台中,会输出以下内容 1 2 3 4 消费者二等待接收消息,处理消息较慢... 按回车键退出程序: Successed to consume message : AA Successed to consume message : EE
(5) 观察消费者一和消费者二的控制台输出结果,可以发现符合 “能者多劳” 的特点,也就是在此案例中 RabbitMQ 使用了不公平分发。 RabbitMQ 预取计数值 概念介绍 预取计数值的概念
在上面介绍的 RabbitMQ 不公平分发中,主要是通过 basicQos(int prefetchCount)
方法来实现,而 prefetchCount
就是 “预取计数值”,也被称为 “预取值”。
当 prefetchCount = 0
,表示 RabbitMQ 服务器将不会限制往消费者投递的最大消息数量(未确认消息数量)。 当 prefetchCount = 1
,表示 RabbitMQ 服务器将会限制往消费者投递的最大消息数量(未确认消息数量)为 1,其最终效果就是不公平分发(能者多劳)。 当 prefetchCount = x
,表示 RabbitMQ 服务器将会限制往消费者投递的最大消息数量(未确认消息数量)为 x
,其中 x
的值大于 1,即消费者在任意时刻最多可以同时获取到 x
条消息,这有区别于不公平分发实现的效果(能者多劳)。
预取计数值的作用
消息的发送本质上是异步的,因此在任何时刻,Channel 上可能存在多个未处理的消息。同时,消费者的手动确认机制(ACK)也是异步进行的,这就导致存在一个未确认消息的缓冲区。为了避免该缓冲区无限增长,开发人员应当限制其大小 ,否则会导致资源占用过多,影响系统稳定性。可以通过 basicQos()
方法设置 “预取计数值” 来控制缓冲区的大小,该参数定义了 Channel 上允许的最大未确认消息数量。 当未确认的消息数量达到该上限时,RabbitMQ 将暂停向该 Channel 继续投递新消息,直到至少有一条未确认的消息被确认(ACK)。例如,假设 Channel 上当前有 5、6、7、8 四条未确认消息,而预取计数值设置为 4,那么 RabbitMQ 不会再发送更多消息,除非其中至少有一条消息被确认(ACK)。若 tag = 6
的消息被确认,RabbitMQ 便会感知到并立即投递一条新消息到 Channel 上。
消费确认机制与 QoS 预取计数值对吞吐量的影响
通常,适当增加预取计数值可以提高 RabbitMQ 向消费者投递消息的速度。虽然自动确认机制能够实现最快的消息处理速度,但此时 RabbitMQ 可能会向消费者无限制地发送消息,在这种情况下已投递但尚未处理的消息会不断积压,从而导致消费者 RAM(物理内存)占用激增。因此,在使用自动确认机制或手动确认机制时,应避免设置无限预取,以免出现内存耗尽的风险。 当消费者需要处理大量消息时,若未及时确认(ACK),会导致消费者连接节点的内存消耗变大。因此,选择合适的预取计数值是一个需要反复试验的过程,不同的业务负载对应不同的最佳取值。一般而言,预取计数值设在 100 ~ 300 之间可以在吞吐量和资源占用之间取得较好的平衡,不会给消费者带来太大的风险 。若预取计数值为 1,虽然是最安全的,但吞吐量会显著降低,尤其是在消费者连接存在较大延迟的情况下。对于大多数应用而言,稍微提高预取计数值可以获得更高的性能。
案例代码 本节将演示如何使用手动确认机制(ACK)+ QoS 预取计数值,并且使用的是工作队列模式。
代码下载
本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-05
。
1 2 3 4 5 <dependency > <groupId > com.rabbitmq</groupId > <artifactId > amqp-client</artifactId > <version > 5.10.0</version > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class RabbitMQUtils { public static ConnectionFactory connectionFactory; static { connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.2.127" ); connectionFactory.setPort(5672 ); connectionFactory.setUsername("admin" ); connectionFactory.setPassword("admin" ); } public static Channel createChannel () throws Exception { Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import com.clay.rabbitmq.utils.RabbitMQUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.MessageProperties;import java.nio.charset.StandardCharsets;import java.util.Scanner;public class MQProducer { public static final String QUEUE_NAME = "test" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel(); channel.queueDeclare(QUEUE_NAME, true , false , false , null ); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.nextLine(); System.out.println("发送消息:" + message); channel.basicPublish("" , QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); } channel.close(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 import com.clay.rabbitmq.utils.RabbitMQUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;import java.util.Scanner;public class MQConsumer01 { public static final String QUEUE_NAME = "test" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel(); channel.queueDeclare(QUEUE_NAME, true , false , false , null ); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); try { Thread.sleep(5000 ); System.out.println("Successed to consume message : " + msg); } catch (Exception e) { e.printStackTrace(); } channel.basicAck(message.getEnvelope().getDeliveryTag(), false ); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Failed to consume message : " + consumerTag); }; System.out.println("消费者一等待接收消息,处理消息较快..." ); int prefetchCount = 2 ; channel.basicQos(prefetchCount); boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback); System.out.println("按回车键退出程序:" ); new Scanner(System.in).nextLine(); channel.close(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 import com.clay.rabbitmq.utils.RabbitMQUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;import java.util.Scanner;public class MQConsumer02 { public static final String QUEUE_NAME = "test" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel(); channel.queueDeclare(QUEUE_NAME, true , false , false , null ); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); try { Thread.sleep(10000 ); System.out.println("Successed to consume message : " + msg); } catch (Exception e) { e.printStackTrace(); } channel.basicAck(message.getEnvelope().getDeliveryTag(), false ); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Failed to consume message : " + consumerTag); }; System.out.println("消费者二等待接收消息,处理消息较慢..." ); int prefetchCount = 4 ; channel.basicQos(prefetchCount); boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback); System.out.println("按回车键退出程序:" ); new Scanner(System.in).nextLine(); channel.close(); } }
代码测试 (1) 分别启动生产者和两个消费者应用
(2) 在生产者的控制台中,依次输入以下内容:
1 2 3 4 5 6 7 8 9 10 11 12 AA 发送消息:AA BB 发送消息:BB CC 发送消息:CC DD 发送消息:DD EE 发送消息:EE FF 发送消息:FF
(3) 由于消费者一的消息处理速度较快,并且预取计数值设置为 2
,在消费者一的控制台中,会输出以下内容 1 2 3 4 消费者一等待接收消息,处理消息较快... 按回车键退出程序: Successed to consume message : AA Successed to consume message : CC
(4) 由于消费者二的消息处理速度较慢,并且预取计数值设置为 4
,在消费者二的控制台中,会输出以下内容 1 2 3 4 5 6 消费者二等待接收消息,处理消息较慢... 按回车键退出程序: Successed to consume message : BB Successed to consume message : DD Successed to consume message : EE Successed to consume message : FF
(5) 观察消费者一和消费者二的控制台输出结果,可以发现每个消费者都会获取到与预取计数值对应的消息数量,也就是不符合 “能者多劳” 的特点,这有区别于不公平分发实现的效果。
预览: