初探 RabbitMQ 使用

学习资源

RabbitMQ 的基础概念

AMQP(Advanced Message Queuing Protocol)高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布 / 订阅)、可靠性、安全。RabbitMQ 是一个开源的 AMQP 标准实现,服务器端用 Erlang 语言编写,支持多种客户端,如:Python、Ruby、C#、Java、PHP、GO、JavaScript 等。RabbitMQ 用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件系统等方面表现不俗。

RabbitMQ 服务管理

RabbitMQ 常用管理命令

1
2
3
4
5
6
7
8
9
10
11
# 列出所有队列
# rabbitmqctl list_queues

# 列出指定队列的信息
# rabbitmqctl list_queues queue_name messages_ready messages_unacknowledged

# 列出所有交换机
# rabbitmqctl list_exchanges

# 列出所有绑定
# rabbitmqctl list_bindings

RabbitMQ 的用户角色分类

  • 超级管理员 (administrator),可登陆管理控制台,可查看所有的信息,并且可以对用户,策略 (policy) 进行操作
  • 监控者 (monitoring),可登陆管理控制台,同时可以查看 rabbitmq 节点的相关信息 (进程数,内存使用情况,磁盘使用情况等)
  • 策略制定者 (policymaker),可登陆管理控制台,同时可以对 policy 进行管理。但无法查看节点的相关信息,与 administrator 的对比,administrator 能看到节点信息
  • 普通管理者 (management),仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理
  • 其他,无法登陆管理控制台,通常就是普通的生产者和消费者

RabbitMQ 用户与角色管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 创建超级管理员用户
# rabbitmqctl add_user user_admin your_password

# 赋予administrator角色给超级管理员用户
# rabbitmqctl set_user_tags user_admin administrator

# 创建监控用户
# rabbitmqctl add_user user_monitoring your_password

# 赋予monitoring角色给监控用户
# rabbitmqctl set_user_tags user_monitoring monitoring

# 创建某个项目的专用用户,限制只能访问自己项目的virtual hosts
# rabbitmqctl add_user user_proj your_password

# 赋予management给某个项目的专用用户
# rabbitmqctl set_user_tags user_proj management

# 查看用户与角色列表
# rabbitmqctl list_users

# 删除用户
# rabbitmqctl delete_user user_admin

# 修改用户密码
# rabbitmqctl change_password user_admin your_password

RabbitMQ 虚拟主机管理

对 RabbitMQ 的用户角色权限进行管理时,可以将 RabbitMQ 理解为普通的数据库,其中 VHostPath 可以类比为数据库名,用户角色权限则是对指定数据库的限制访问。

1
2
3
4
5
6
7
8
# 创建虚拟主机
# rabbitmqctl add_vhost vhostpath

# 删除虚拟主机
# rabbitmqctl delete_vhost vhostpath

# 列出所有虚拟主机
# rabbitmqctl list_vhosts

RabbitMQ 用户权限管理

用户权限指的是用户对其所能访问的 VHostPath 的 exchange,queue 的操作权限,包括配置权限,读写权限。配置权限会影响到 exchange,queue 的声明和删除;读写权限影响到从 queue 里取消息,向 exchange 发送消息以及 queue 和 exchange 的绑定 (bind) 操作。例如: 将 queue 绑定到某 exchange 上,需要具有 queue 的可写权限,以及 exchange 的可读权限;向 exchange 发送消息需要具有 exchange 的可写权限;从 queue 里取数据需要具有 queue 的可读权限。详细请参考官方文档中 “How permissions work” 部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 赋予用户权限
# rabbitmqctl set_permissions -p VHostPath user_admin ConfP WriteP ReadP

# 赋予用户所有权限
# rabbitmqctl set_permissions -p VHostPath user_admin '.*' '.*' '.*'

# 查看VHostPath下所有用户的权限
# rabbitmqctl list_permissions -p VHostPath

# 查看指定用户的权限
# rabbitmqctl list_user_permissions user_admin

# 清除指定用户在指定VHostPath下的权限
# rabbitmqctl clear_permissions -p VHostPath user_admin

# 清除指定用户的所有权限
# rabbitmqctl clear_permissions user_admin

RabbitMQ 的工作模式

RabbitMQ Simple Queue 模式(简单队列)

简单队列模式下,每条消息只会被一个消费者所接收,不存在多个消费者接收到同一条消息的情况,而且不管有多少个消费者,默认情况下服务端都会以轮询分发(round-robin)的方式确保每个消费者接收到的消息数量是一样的。

rabbitmq-sample

RabbitMQ Work Queue 模式(工作队列)

工作队列模式下,每条消息只会被一个消费者所接收,不存在多个消费者接收到同一条消息的情况;同时工作队列模式可以使用公平分发(fair dispatch)的方式来发送消息,特点是处理能力强的消费者可以接收到更多的消息(能者多劳),这也是与简单队列模式相比较不同的地方。当使用公平分发时,消费者可调用 basicQos()方法,同时需要手动确认消息(ACK 机制)。

rabbitmq-work-queue

RabbitMQ Fanout 模式(发布 / 订阅)

单个生产者可以对应多个消费者,每个消费者都有自己的队列,同一条消息可以被多个消费者接收。所有发送到 Fanout Exchange 的消息都会被转发到与该 Exchange 绑定 (Binding) 的所有 Queue 上。Fanout Exchange 不需要额外处理 RouteKey,只需要简单地将队列绑定到 Exchange 上,这样发送到 Exchange 的消息都会被转发到与该交换机绑定的所有队列上,作用类似子网广播,每台子网内的主机都获得了一份复制的消息,因此 Fanout Exchange 转发消息是最快的。

rabbitmq-fanout-exchang-1
rabbitmq-fanout-exchang-2

RabbitMQ Direct 模式(路由)

单个生产者可以对应多个消费者,每个消费者都有自己的队列,同一条消息可以被多个消费者接收。所有发送到 Direct Exchange 的消息会被转发到 RouteKey 中指定的 Queue 上。消息传递时,RouteKey 必须完全匹配,才会被队列接收,否则该消息会被抛弃。

rabbitmq-direct-exchang-1
rabbitmq-direct-exchange-2

RabbitMQ Topic 模式(通配符)

单个生产者可以对应多个消费者,每个消费者都有自己的队列,同一条消息可以被多个消费者接收。所有发送到 Topic Exchange 的消息会被转发到指定 Topic 的 Queue 上,Exchange 会将 RouteKey 和某个 Topic 进行模糊匹配,此时队列需要绑定一个 Topic。RouteKey 可以使用通配符进行模糊匹配,符号 # 表示匹配一个或多个词,符号 * 表示匹配不多不少一个词。因此 log.# 能够匹配到 log.info.oa,但是 log.* 只会匹配到 log.error,所以 Topic Exchange 的使用非常灵活。

rabbitmq-topic-exchange-1
rabbitmq-topic-exchange-2

RabbitMQ 的底层原理

RabbitMQ 的非阻塞 I/O

NIO 通常也称非阻塞 I/O,包含三大核心部分:Selector(选择器)、Channel(信道)和 Buffer(缓冲区)。NIO 是基于 Channel 和 Buffer 进行操作的,数据总是从信道读取数据到缓冲区中,或者从缓冲区写入到信道中,而 Selector 则用于监听多个信道的事件(比如连接打开,数据到达等)。因此,单线程可以监听多个数据的信道。由于 RabbitMQ 采用类似 NIO(Non-blocking I/O)的做法,选择 TCP 连接复用,不仅可以减少性能开销,同时也便于管理。每个线程把持一个信道,所以信道复用了 Connection 的 TCP 连接。同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的 Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源。但是信道本身的流量很大时,这时候多个信道复用一个 Connection 就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个 Connection,将这些信道均摊到这些 Connection 中,至于这些相关的调优策略需要根据业务自身的实际情况进行调节。

RabbitMQ 的 ConnectionFactory、Connection、Channel

ConnectionFactory、Connection、Channel 都是 RabbitMQ 对外提供的 API 中最基本的对象。Connection 是 RabbitMQ 的 Socket 连接,它封装了 Socket 协议相关部分逻辑。ConnectionFactory 是客户端与 Broker 的 TCP 连接工厂,负责根据 URI 创建 Connection。Channel 是与 RabbitMQ 打交道的最重要的一个接口,大部分的业务操作是在 Channel 这个接口中完成的,包括定义 Queue、定义 Exchange、绑定 Queue、绑定 Exchange、发布消息等。如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 Connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 Thread 创建单独的 Channel 进行通讯,AMQP Method 包含了 Channel ID 帮助客户端和 Message Broker 识别 Channel,所以 Channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP Connection 的开销。