RabbitMQ 入门教程之四

大纲

前言

学习资源

版本说明

本文所有案例代码使用的各软件版本如下表所示:

组件版本说明
RabbitMQ Server3.8.26
RabbitMQ Client5.10.0
Erlang24.2
Java11

RabbitMQ 发布确认

发布确认的概念

当生产者将信道设置为 Confirm 模式后,在该信道上发布的每条消息都会被分配一个从 1 开始递增的唯一 ID。一旦消息成功投递到所有匹配的队列,Broker 便会发布确认消息给生产者,其中包含该消息的唯一 ID,从而让生产者确认消息已正确到达目标队列。如果消息和队列是持久化的,Broker 会在将消息写入磁盘后再执行发布确认(如下图所示)。确认消息的 delivery-tag 字段表示被确认的消息序列号。此外,Broker 还可以在 basic.ack() 中设置 multiple 标志,表示该序列号及其之前的所有消息均已确认(即批量确认)。Confirm 模式的最大优势在于其异步特性。生产者发布消息后,无需等待确认即可继续发送下一条消息,而信道会在消息最终被确认时,通过回调方法通知生产者应用进行处理。如果 RabbitMQ 由于内部错误导致消息丢失,它会发送一条 nack(否定确认)消息,生产者应用同样可以在回调方法中处理该 nack,以便采取相应措施(比如重新发布消息)。

为什么需要发布确认机制

在使用 RabbitMQ 的时候,即使同时将队列和消息都标记为持久化,仍然无法完全保证消息不丢失。虽然 RabbitMQ 会将持久化消息写入磁盘,但在消息正式落盘之前,仍有丢失的可能。例如,当消息刚进入缓存,尚未完全写入磁盘时,如果 RabbitMQ 服务器突然宕机,该消息可能无法恢复导致丢失。因为 RabbitMQ 的持久化机制并不能完全保证发送的消息不丢失,所以才需要有发布确认机制。

发布确认的开启

RabbitMQ 的发布确认机制默认是没有开启的,如果要开启需要调用方法 channel.confirmSelect(),每当要想使用发布确认机制,都需要在 Channel 上调用该方法。

1
2
3
4
5
// 创建信道
Channel channel = connection.createChannel()

// 启用发布确认
channel.confirmSelect();

在消费者发送消息后,可以使用以下 API 让消费者等待 Broker 的发布确认响应:

方法返回值超时时间失败时行为适用场景
waitForConfirms()boolean有未确认消息时返回 false,不会关闭 Channel 需要手动处理失败的情况
waitForConfirms(long timeout)boolean超时或有未确认消息时返回 false,不会关闭 Channel 限制等待时间,避免长时间阻塞
waitForConfirmsOrDie()void有未确认消息时抛出 IOException 并关闭 Channel 适用于希望在失败时终止 Channel 的情况
waitForConfirmsOrDie(long timeout)void超时或有未确认消息时抛出 IOException 并关闭 Channel 需要严格保证消息成功且限制等待时间

发布确认的策略

RabbitMQ 提供了三种消息发布确认策略,如下:

  • 单个确认发布(同步单个确认)

    • 原理:发送一条消息后,阻塞等待服务器返回确认消息。
    • 优点:
      • 实现简单。
      • 消息可靠性高。
    • 缺点:
      • 吞吐量低,每次发送消息都需要等待确认。
      • 延迟较高,不适用于高并发场景。
  • 批量确认发布(同步批量确认)

    • 原理:一次性发送多条消息,然后等待服务器返回批量确认。
    • 优点:
      • 比单个确认发布性能更高,减少了等待时间,提高了吞吐量。
    • 缺点:
      • 本质上还是同步确认操作,不适用于高并发场景。
      • 如果出现问题,无法确定具体是哪条消息发布失败,可能导致部分消息丢失或重复发送。
  • 异步确认发布(异步回调确认)

    • 原理:消息发送后不阻塞,RabbitMQ 通过回调机制(ConfirmCallback)异步通知哪些消息被确认或丢失。
    • 优点:
      • 异常处理,拥有最高的性能,支持高并发和高吞吐量。
      • 可以在回调中维护消息状态,准确跟踪失败消息。
      • 资源占用更低。
    • 缺点:
      • 实现较复杂,需要自己管理未确认的消息队列(如使用 ConcurrentSkipListMap)。
      • 代码逻辑相对较难调试。

单个确认发布

这种发布方式是一种同步确认机制,即每发布一条消息,必须等待其确认后才能继续发布下一条消息。方法 waitForConfirmsOrDie(long) 仅在消息被确认时才会返回,如果在指定时间内未收到确认,则会抛出异常。这种确认方式有一个最大的缺点就是:发布速度特别的慢。由于未确认的消息会阻塞后续消息的发布,其吞吐量通常仅能达到每秒数百条消息。尽管如此,对于某些应用场景而言,这样的性能可能已经足够。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class MQProducer1 {

/**
* 单个确认发布
*/
public static void publishMessageIndividually() throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.2.127");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");

try (
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel()
) {
String queueName = UUID.randomUUID().toString();

// 声明队列(支持持久化)
channel.queueDeclare(queueName, true, false, false, null);

// 开启发布确认
channel.confirmSelect();

int total = 1000;
long begin = System.currentTimeMillis();
for (int i = 0; i < total; i++) {
// 发布消息(支持持久化)
String message = "消息" + i;
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

// 单个确认发布,当服务端返回 false 或超时时间内未返回,生产者可以重发消息
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
}
}

long end = System.currentTimeMillis();
System.out.println("发布" + total + "个单独确认消息,耗时" + (end - begin) + "ms");
}
}

}

批量确认发布

相较于逐条发送并确认消息的机制,批量发布消息后统一确认的方式能够显著提升系统吞吐量。然而这种批量处理策略存在两点主要限制:首先在消息发布异常的场景中,由于采用批量确认机制,系统无法快速定位具体故障消息,需要将整个消息批次持久化存储,以便后续故障诊断和批量重发;其次,虽然采用了批量处理技术,但消息发布过程本质上仍属于同步操作模式,发布线程在等待确认期间仍会处于阻塞状态。这种设计在提升吞吐量的同时,需要权衡额外的内存资源消耗和异常处理复杂度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class MQProducer2 {

/**
* 批量确认发布
*/
public static void publishMessageBatch() throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.2.127");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");

try (
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel()
) {
String queueName = UUID.randomUUID().toString();

// 声明队列(支持持久化)
channel.queueDeclare(queueName, true, false, false, null);

// 开启发布确认
channel.confirmSelect();

// 批量确认消息的数量
int batchSize = 100;

// 未确认消息的数量
int outstandingMessageCount = 0;

int total = 1000;
long begin = System.currentTimeMillis();
for (int i = 0; i < total; i++) {
// 发布消息(支持持久化)
String message = "消息" + i;
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

// 批量确认发布(比如每发送 100 条消息就批量确认一次)
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}

// 为了确保没有剩余的未确认消息,再次确认发布
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}

long end = System.currentTimeMillis();
System.out.println("发布" + total + "个批量确认消息,耗时" + (end - begin) + "ms");
}
}

}

异步确认发布

异步确认虽然在编程逻辑上比前两种方式更复杂,但在可靠性和效率方面表现最佳,性价比最高。它通过回调函数实现消息的可靠传递,而消息中间件也利用回调机制来确认消息是否成功投递。异步确认发布的工作流程如下图所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class MQProducer3 {

/**
* 异步确认发布
*/
public static void publishMessageAsync() throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.2.127");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");

try (
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel()
) {
String queueName = UUID.randomUUID().toString();

// 声明队列(支持持久化)
channel.queueDeclare(queueName, true, false, false, null);

// 开启发布确认
channel.confirmSelect();

/**
* 线程安全的一个有序哈希表,适用于高并发场景
* 1. 轻松地将消息序列号与消息进行关联
* 2. 轻松地批量删除条目,只要传入消息序列号
* 3. 支持并发访问
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

/**
* 收到确认消息的回调
* 1. deliveryTag 表示消息序列号
* 2. multiple = true,表示可以确认小于等于当前消息序列号的消息
* 3. multiple = false,表示仅确认当前消息序列号的消息
*/
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
if (multiple) {
// 返回的是小于等于当前消息序列号的未确认消息,是一个 Map
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
// 清除该部分未确认的消息
confirmed.clear();
} else {
// 只清除当前消息序列号的消息
outstandingConfirms.remove(deliveryTag);
}
};

/**
* 未收到确认消息的回调
* 1. deliveryTag 表示消息序列号
* 2. multiple = true,表示可以确认小于等于当前消息序列号的消息
* 3. multiple = false,表示仅确认当前消息序列号的消息
*/
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
// 异步处理未确认的消息,比如重新发送消息
String message = outstandingConfirms.get(deliveryTag);
System.out.println("发布的消息" + message + "未被确认,消息序列号" + deliveryTag);
};

/**
* 添加一个异步确认的监听器
* 1. 监听哪些消息发送成功
* 2. 监听哪些消息发送失败
*/
channel.addConfirmListener(ackCallback, nackCallback);

int total = 1000;
long begin = System.currentTimeMillis();
for (int i = 0; i < total; i++) {
// 发布消息(支持持久化)
String message = "消息" + i;
// channel.getNextPublishSeqNo() 获取下一个消息的消息序列号
// 使用 Map 存储全部未确认的消息体,通过消息序列号与消息体进行关联
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
}

long end = System.currentTimeMillis();
System.out.println("发布" + total + "个异步确认消息,耗时" + (end - begin) + "ms");
}
}

}

如何处理异步未确认的消息

最好的解决方案就是把未确认的消息存储到一个基于内存的能被消息发布线程访问的队列,比如使用 ConcurrentLinkedQueue 队列,这个队列在 ConfirmCallback(负责专门处理 NACK 的情况)与消息发布线程之间进行消息的传递,以此实现未确认消息的重新发送。值得一提的是,重新发送消息后,为了避免消费者在消费消息时,可能出现重复消费的问题,消费者需要实现幂等消费。

性能压测结果

RabbitMQ 发布确认策略的压测结果如下表所示:

发布确认策略消息发送数量消息发送耗时
单个确认发布 100050278 ms
批量确认发布 1000635 ms
异步确认发布 100092 ms