大纲 前言 学习资源 版本说明 本文所有案例代码使用的各软件版本如下表所示:
组件 版本 说明 RabbitMQ Server 3.8.26
RabbitMQ Client 5.10.0
Erlang 24.2
Java 11
RabbitMQ 发布确认 发布确认的概念 当生产者将信道设置为 Confirm 模式后,在该信道上发布的每条消息都会被分配一个从 1 开始递增的唯一 ID。 一旦消息成功投递到所有匹配的队列,Broker 便会发布确认消息给生产者,其中包含该消息的唯一 ID,从而让生产者确认消息已正确到达目标队列。如果消息和队列是持久化的,Broker 会在将消息写入磁盘后再执行发布确认(如下图所示)。 确认消息的 delivery-tag
字段表示被确认的消息序列号。此外,Broker 还可以在 basic.ack()
中设置 multiple
标志,表示该序列号及其之前的所有消息均已确认(即批量确认)。Confirm 模式的最大优势在于其异步特性。生产者发布消息后,无需等待确认即可继续发送下一条消息,而信道会在消息最终被确认时,通过回调方法通知生产者应用进行处理。如果 RabbitMQ 由于内部错误导致消息丢失,它会发送一条 nack
(否定确认)消息,生产者应用同样可以在回调方法中处理该 nack
,以便采取相应措施(比如重新发布消息)。
为什么需要发布确认机制
在使用 RabbitMQ 的时候,即使同时将队列和消息都标记为持久化,仍然无法完全保证消息不丢失。虽然 RabbitMQ 会将持久化消息写入磁盘,但在消息正式落盘之前,仍有丢失的可能。例如,当消息刚进入缓存,尚未完全写入磁盘时,如果 RabbitMQ 服务器突然宕机,该消息可能无法恢复导致丢失。因为 RabbitMQ 的持久化机制并不能完全保证发送的消息不丢失,所以才需要有发布确认机制。
发布确认的开启 RabbitMQ 的发布确认机制默认是没有开启的,如果要开启需要调用方法 channel.confirmSelect()
,每当要想使用发布确认机制,都需要在 Channel 上调用该方法。
1 2 3 4 5 Channel channel = connection.createChannel() channel.confirmSelect();
在消费者发送消息后,可以使用以下 API 让消费者等待 Broker 的发布确认响应:
方法 返回值 超时时间 失败时行为 适用场景 waitForConfirms()
boolean
无 有未确认消息时返回 false
,不会关闭 Channel 需要手动处理失败的情况 waitForConfirms(long timeout)
boolean
有 超时或有未确认消息时返回 false
,不会关闭 Channel 限制等待时间,避免长时间阻塞 waitForConfirmsOrDie()
void
无 有未确认消息时抛出 IOException
并关闭 Channel 适用于希望在失败时终止 Channel 的情况 waitForConfirmsOrDie(long timeout)
void
有 超时或有未确认消息时抛出 IOException
并关闭 Channel 需要严格保证消息成功且限制等待时间
发布确认的策略 RabbitMQ 提供了三种消息发布确认策略,如下:
单个确认发布(同步单个确认)
原理:发送一条消息后,阻塞等待服务器返回确认消息。 优点: 缺点:吞吐量低,每次发送消息都需要等待确认。 延迟较高,不适用于高并发场景。 批量确认发布(同步批量确认)
原理:一次性发送多条消息,然后等待服务器返回批量确认。 优点:比单个确认发布性能更高,减少了等待时间,提高了吞吐量。 缺点:本质上还是同步确认操作,不适用于高并发场景。 如果出现问题,无法确定具体是哪条消息发布失败,可能导致部分消息丢失或重复发送。 异步确认发布(异步回调确认)
原理:消息发送后不阻塞,RabbitMQ 通过回调机制(ConfirmCallback
)异步通知哪些消息被确认或丢失。 优点:异常处理,拥有最高的性能,支持高并发和高吞吐量。 可以在回调中维护消息状态,准确跟踪失败消息。 资源占用更低。 缺点:实现较复杂,需要自己管理未确认的消息队列(如使用 ConcurrentSkipListMap
)。 代码逻辑相对较难调试。 单个确认发布 这种发布方式是一种同步确认机制,即每发布一条消息,必须等待其确认后才能继续发布下一条消息。 方法 waitForConfirmsOrDie(long)
仅在消息被确认时才会返回,如果在指定时间内未收到确认,则会抛出异常。这种确认方式有一个最大的缺点就是:发布速度特别的慢。由于未确认的消息会阻塞后续消息的发布,其吞吐量通常仅能达到每秒数百条消息。尽管如此,对于某些应用场景而言,这样的性能可能已经足够。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;import java.nio.charset.StandardCharsets;import java.util.UUID;public class MQProducer1 { public static void publishMessageIndividually () throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.2.127" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("admin" ); try ( Connection connection = factory.newConnection(); Channel channel = connection.createChannel() ) { String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true , false , false , null ); channel.confirmSelect(); int total = 1000 ; long begin = System.currentTimeMillis(); for (int i = 0 ; i < total; i++) { String message = "消息" + i; channel.basicPublish("" , queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("消息发送成功" ); } else { System.out.println("消息发送失败" ); } } long end = System.currentTimeMillis(); System.out.println("发布" + total + "个单独确认消息,耗时" + (end - begin) + "ms" ); } } }
批量确认发布 相较于逐条发送并确认消息的机制,批量发布消息后统一确认的方式能够显著提升系统吞吐量。 然而这种批量处理策略存在两点主要限制:首先在消息发布异常的场景中,由于采用批量确认机制,系统无法快速定位具体故障消息,需要将整个消息批次持久化存储,以便后续故障诊断和批量重发;其次,虽然采用了批量处理技术,但消息发布过程本质上仍属于同步操作模式,发布线程在等待确认期间仍会处于阻塞状态。 这种设计在提升吞吐量的同时,需要权衡额外的内存资源消耗和异常处理复杂度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;import java.nio.charset.StandardCharsets;import java.util.UUID;public class MQProducer2 { public static void publishMessageBatch () throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.2.127" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("admin" ); try ( Connection connection = factory.newConnection(); Channel channel = connection.createChannel() ) { String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true , false , false , null ); channel.confirmSelect(); int batchSize = 100 ; int outstandingMessageCount = 0 ; int total = 1000 ; long begin = System.currentTimeMillis(); for (int i = 0 ; i < total; i++) { String message = "消息" + i; channel.basicPublish("" , queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); outstandingMessageCount++; if (outstandingMessageCount == batchSize) { channel.waitForConfirms(); outstandingMessageCount = 0 ; } } if (outstandingMessageCount > 0 ) { channel.waitForConfirms(); } long end = System.currentTimeMillis(); System.out.println("发布" + total + "个批量确认消息,耗时" + (end - begin) + "ms" ); } } }
异步确认发布 异步确认虽然在编程逻辑上比前两种方式更复杂,但在可靠性和效率方面表现最佳,性价比最高。它通过回调函数实现消息的可靠传递,而消息中间件也利用回调机制来确认消息是否成功投递。异步确认发布的工作流程如下图所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConfirmCallback;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;import java.nio.charset.StandardCharsets;import java.util.UUID;import java.util.concurrent.ConcurrentNavigableMap;import java.util.concurrent.ConcurrentSkipListMap;public class MQProducer3 { public static void publishMessageAsync () throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.2.127" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("admin" ); try ( Connection connection = factory.newConnection(); Channel channel = connection.createChannel() ) { String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true , false , false , null ); channel.confirmSelect(); ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); ConfirmCallback ackCallback = (deliveryTag, multiple) -> { if (multiple) { ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true ); confirmed.clear(); } else { outstandingConfirms.remove(deliveryTag); } }; ConfirmCallback nackCallback = (deliveryTag, multiple) -> { String message = outstandingConfirms.get(deliveryTag); System.out.println("发布的消息" + message + "未被确认,消息序列号" + deliveryTag); }; channel.addConfirmListener(ackCallback, nackCallback); int total = 1000 ; long begin = System.currentTimeMillis(); for (int i = 0 ; i < total; i++) { String message = "消息" + i; outstandingConfirms.put(channel.getNextPublishSeqNo(), message); channel.basicPublish("" , queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); } long end = System.currentTimeMillis(); System.out.println("发布" + total + "个异步确认消息,耗时" + (end - begin) + "ms" ); } } }
如何处理异步未确认的消息
最好的解决方案就是把未确认的消息存储到一个基于内存的能被消息发布线程访问的队列,比如使用 ConcurrentLinkedQueue
队列,这个队列在 ConfirmCallback(负责专门处理 NACK 的情况)与消息发布线程之间进行消息的传递,以此实现未确认消息的重新发送。值得一提的是,重新发送消息后,为了避免消费者在消费消息时,可能出现重复消费的问题,消费者需要实现幂等消费。
性能压测结果 RabbitMQ 发布确认策略的压测结果如下表所示:
发布确认策略 消息发送数量 消息发送耗时 单个确认发布 1000 50278 ms 批量确认发布 1000 635 ms 异步确认发布 1000 92 ms
预览: