Kafka 开发随笔

Kafka 生产者调优

提示

本节将介绍 Kafka 生产者调优的最佳实践,若需要具体的配置和代码,请参考 这里

生产者核心参数

生产者的生产流程

生产者的核心参数

生产者提高吞吐量

为了让生产者提高吞吐量(发送消息的效率),可以优化以下几个参数:

  • (1) 在生产者端设置 buffer.memory:RecordAccumulator 缓冲区的总大小,默认是 32m,修改为 64m
  • (2) 在生产者端设置 batch.size:批次大小,默认 16k
  • (3) 在生产者端设置 linger.ms:等待时间,默认 0ms,修改为 5 ~ 100ms
  • (4) 在生产者端设置 compression.type:压缩方式,默认是 none,修改为 snappy
参数名称参数描述
buffer.memoryRecordAccumulator 缓冲区的总大小,默认值为 32m
batch.size缓冲区中一批数据的最大大小,默认值为 16k。适当增加该值,可以提高吞吐量;但是,如果该值设置得太大,会导致数据传输延迟增加。
linger.ms如果数据量迟迟未达到 batch.size,Sender 线程等待 linger.ms 之后就会发送数据。单位是 ms,默认值为 0ms,表示没有延迟。生产环境建议该值大小为 5 ~ 100ms 之间。
compression.type生产者发送的所有数据的压缩方式。默认值为 none,也就是不压缩。支持压缩类型:nonegzipsnappylz4zstd

特别注意

  • (1) 上述的四个参数值并不是设置得越大就越好,设置得过大会导致 Kafka 中的消息被延迟消费。
  • (2) 当 Topic 的分区数量比较多的时候,可以适当增加 RecordAccumulator(缓冲区) 的大小。

生产者避免消息丢失

生产者发送消息后,由于网络故障或网络延迟,可能会导致消息在传输过程中丢失。Kafka 只要至少配置以下 4 个参数,就可以保证生产者发送的消息不会丢失:

  • (1) 给 Topic 设置 replication.factor 参数:这个值必须大于等于 2,即要求每个 Partition 必须至少有 2 个副本
  • (2) 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于等于 2,即要求一个 Leader 至少要有 1 个 Follower 还跟自己保持着同步,这样才能确保 Leader 宕机了,还有一个 Follower 可以使用
  • (3) 在生产者端设置 acks=all:这是要求每条消息,必须是写入到所有 Replica(副本)之后,才能认为是发送成功
  • (4) 在生产者端设置 retries=MAX(可以是一个很大很大的值,表示无限次重试的意思):这个是要求一旦消息发送失败,就无限重试

生产者避免单分区乱序

Kafka 生产者解决单分区内的乱序问题,主要是依赖以下两个参数:

参数名称参数描述
enable.idempotence是否开启幂等性,默认值为 true, 表示默认开启幂等性。
max.in.flight.requests.per.connection允许最多没有返回 ACK 应答的次数,默认为 5,开启幂等性后必须保证该参数值在 1 ~ 5 范围内。

注意事项

  • 只开启 enable.idempotence 可以防止失败重试时的出现消息重复,但可能不能完全解决单分区内的乱序问题(这取决于 max.in.flight.requests.per.connection 参数的设置)。
  • 要彻底解决 Kafka 单分区内的乱序问题,enable.idempotencemax.in.flight.requests.per.connection 这两个参数需要配合使用,这样才可以确保消息的顺序性和幂等性,但是会牺牲一定的生产吞吐量。
  • 特别注意,这两个配置参数只能保证 Kafka 单分区内的数据顺序,多分区之间的数据顺序 Kafka 无法保证。

参数说明

  • max.in.flight.requests.per.connection 是 Kafka Producer 的一个配置参数,用于控制在同一个 TCP 连接上未被 Broker 确认(ACK)的请求的最大数量。默认值是 5,意味着在同一个连接上,最多可以有 5 个未确认的请求。
  • 如果启用了消息重试(retries > 0,默认启用重试机制),并且设置的 max.in.flight.requests.per.connection > 1,在某些情况下可能会导致单分区内的消息乱序。例如:一个较早的请求失败并被重试时,后续的请求可能已经被成功发送并确认。
  • enable.idempotence=true(启用幂等性),max.in.flight.requests.per.connection 的值不能超过 5,否则 Kafka 会拒绝启动 Producer。在启用幂等性后默认配置 max.in.flight.requests.per.connection=5,这是经过优化的平衡设置。

版本区别

  • Kafka 在 1.x 版本之前可以保证数据单分区的有序性,条件如下:

    • max.in.flight.requests.per.connection = 1,不需要考虑是否开启幂等性。
  • Kafka 在 1.x 及以后版本可以保证数据单分区的有序性,条件如下:

    • 未开启幂等性
      • max.in.flight.requests.per.connection 需要必须设置为 1。
    • 开启幂等性
      • max.in.flight.requests.per.connection 的值必须设置在 1 ~ 5 之间。
      • 原因说明:因为在 Kafka 1.x 版本以后,启用幂等性后,Kafka 服务端会缓存 Producer 发来的最近 5 个 Request 的元数据,因此无论如何,都可以保证最近 5 个 Request 的数据都是有序的(如下图所示)。

情景分析

  • max.in.flight.requests.per.connection 的值为 1(最严格的保证)

    • 每次只有一个请求正在处理,前一个请求完成后,才会发送下一个请求。
    • 在这种配置下,绝对可以保证单分区的有序性,但会牺牲生产者的吞吐量。
  • max.in.flight.requests.per.connection 的值在 1 ~ 5 之间

    • 配置的前提条件:必须设置 enable.idempotence=trueacks=all
    • 当满足上述前提条件时,Kafka 的生产者重试机制会确保消息的顺序,即使发生失败重试,也能按顺序处理请求。
    • Kafka 1.1 及以后版本,启用幂等性后,生产者可以保证单分区的顺序性,只要 max.in.flight.requests.per.connection 的值在 1 ~ 5 之间即可。
    • 配置风险:
      • 如果 enable.idempotence=false,同时 max.in.flight.requests.per.connection > 1,则单分区内可能会出现消息乱序。这是因为并行发送的请求,在发送失败后会重新发送,后发送的请求可能先完成,从而打破顺序。
      • 如果 enable.idempotence=false,为了保证单分区的顺序性,max.in.flight.requests.per.connection 的值必须设置为 1。

总结

  • enable.idempotence=trueacks=all 时,max.in.flight.requests.per.connection 的值在 1 ~ 5 之间,Kafka 仍然可以保证单分区的有序性。
  • 设置为 max.in.flight.requests.per.connection=1 时,这是最简单、最安全的配置,但会影响生产者的吞吐量。
  • 设置为 max.in.flight.requests.per.connection > 1 时,能保证单分区的有序性,同时可以提高吞吐量,但需要依赖 enable.idempotence=true(启用幂等性)。
  • 如果 enable.idempotence=false,即没有启用幂等性,那么 max.in.flight.requests.per.connection 的值必须设置为 1,这样才能保证单分区的有序性。

生产者避免消息重复发送

生产者避免消息重复发送有两种实现方案,包括开启幂等性和使用事务机制。

生产者开启幂等性

什么是 Kafka 的幂等性

  • Kafka 的幂等性是指 Producer (生产者) 无论向 Broker 发送多少条重复消息,Broker 端都只会持久化一条消息,保证了消息不重复。
  • 精确一次 (Exactly Once) = 开启幂等性 + 至少一次 (acks = -1 + 分区副本数 >= 2 + ISR 里应答的最小副本数量 >= 2)。
  • Kafka 官方文档中的幂等性详细介绍可以看 这里
参数名称参数描述
enable.idempotence是否开启幂等性,默认值为 true, 表示默认开启幂等性。
max.in.flight.requests.per.connection允许最多没有返回 ACK 应答的次数,默认为 5,开启幂等性要保证该参数值在 1 ~ 5 范围内。
  • 在 Kafka 中开启幂等性的配置参数是 enable.idempotence,默认值为 true,设置 false 会关闭幂等性。如果没有设置冲突的配置,默认情况下会启用幂等性。如果设置了冲突的配置,并且未显式启用幂等性,则会禁用幂等性。如果显式启用了幂等性,并且设置了冲突的配置,则会抛出 ConfigException 异常。

  • 当在生产者端设置 enable.idempotencetrue 时,生产者将确保数据流中只写入每条消息的一个副本,即可以确保生产者不会写入重复消息。如果设置为 false,由于代理失败等原因,生产者重试发送消息,可能会在数据流中写入重试消息的副本,即生产者可能会写入重复消息。

Kafka 启用幂等性要至少满足以下 4 个条件

  • (1) 在生产者端设置 enable.idempotence 参数:这个值必须为 true,即开启幂等性
  • (2) 在生产者端设置 max.in.flight.requests.per.connection 参数:这个值必须小于或等于 5
  • (3) 在生产者端设置 acks = -1:这是要求每条消息,必须是写入到所有 Replica(副本)之后,才能认为是发送成功
  • (4) 在生产者端设置 retries:这个值必须大于 0,即必须要有重试机制

生产者使用事务机制

Kafa 除了可以使用幂等性来解决生产者重复发送消息之外,还可以使用事务机制来解决,关于 Kafka 事务机制的详细介绍请看 这里。Kafka 的事务一共有以下 5 个 API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 1.初始化事务
void initTransactions();

// 2.开启事务
void beginTransaction() throws ProducerFencedException;

// 3.在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;

// 4.提交事务
void commitTransaction() throws ProducerFencedException;

// 5.放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

Kafka 消费者调优

消费者的核心参数

消费者组的消费流程

消费者的核心参数

消费者提高吞吐量

Kafka 如何提高消费者的消费速度呢?

  • (1) 如果是 Kafka 消费能力不足,则可以考虑增加 Topic 的分区数量,并且同时增加消费组的消费者数量,这两个条件缺一不可,即 消费组的消费者数量 = Topic 的分区数量

  • (2) 如果是下游的数据处理不及时(有较大延迟),则可以提高消费者每批次拉取消息的数量(默认每批次拉取 500 条消息)。每批次拉取数据过少(拉取的数据量 / 数据处理时间 < 生产速度),会使消费者处理数据的速度小于生产者生产数据的速度,从而可能导致消息积压。

  • (3) 消费者提高吞吐量的相关配置参数
参数名称参数描述
fetch.max.bytes消费者获取服务器端一批消息的最大字节数,默认值为 50M。如果服务器端一批次的消息大于该值,仍然可以将这批消息拉取回来,所以这不是一个绝对最大值。消费者拉取一批次消息的大小受 message.max.bytes(Broker 配置)或者 max.message.bytes(Topic 配置)影响。
max.poll.records消费者每次调用 poll() 方法时,最多能拉取的消息数量,默认值为 500

消费者分区再平衡的优化

Kafka 消费者的分区再平衡(Partition Rebalancing)是指当消费者组中的成员发生变更时(如消费者加入、退出或故障),Kafka 自动调整分区与消费者之间的映射关系,以确保所有分区都会被消费者组中的成员消费。

消费者组的初始化流程

分区再平衡相关的配置参数

分区再平衡的优缺点

  • 优点:
    • 动态调整分区分配,支持弹性扩展。
    • 确保分区始终被有效消费。
  • 缺点:
    • 短暂中断:再平衡期间会暂停消费,导致短时间内消息未被处理。
    • 分区切换开销:消费者需要重新建立分区的连接和状态。

分区再平衡的优化措施

  • 使用静态成员 ID(group.instance.id),减少分区再平衡的频率。
  • 控制心跳和会话超时(heartbeat.interval.mssession.timeout.ms),避免误触发分区再平衡。
  • 通过配置参数 partition.assignment.strategy 来调整分区分配策略,以适应具体场景需求。

提示

  • 更多关于 Kafka 消费者的分区再平衡介绍请看 这里

Kafka Broker 调优

提示

  • 更多关于 Kafka Broker 调优的教程请看 这里

Broker 的核心参数

Broker 的工作流程

Broker 的核心参数

Broker 增加分区数量

  • 通过 kafka-topics.sh 脚本增加 Topic 的分区数量
1
$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter --topic first --partitions 3

特别注意

Kafka 的分区数量只能增加,不能减少。值得一提的是,Kafka 同样不支持减少已创建的主题的副本数量,但是可以通过 kafka-reassign-partitions.sh 脚本重新分配副本的方式来实现。

Broker 自动创建主题

  • 如果将 Broker 端的配置参数 auto.create.topics.enable 设置为 true(默认值是 true),那么当生产者向一个未创建的主题发送消息时,Broker 会自动创建一个分区数为 num.partitions(默认值为 1)、副本因子为 default.replication.factor(默认值为 1)的主题。

  • 除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会自动创建一个相应的主题。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。在生产环境下,强烈建议将 auto.create.topics.enable 设置为 false

Kafka 整体性能调优

提高整体吞吐量

为了提高 Kafka 整体的吞吐量,可以从以下四个角度进行优化。

(1) 增加 Topic 的分区数量

  • 通过 kafka-topics.sh 脚本增加 Topic 的分区数量(注意:分区数量只能增加,不能减少)
1
$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter --topic first --partitions 3

(2) 提高生产吞吐量

参数描述
buffer.memory发送消息的缓冲区大小,默认值是 32m,生产环境可以增加到 64m
batch.size缓冲区中一批数据的最大大小,默认值是 16k。如果 Batch 设置太小,会导致频繁发送网络请求,吞吐量下降;如果 Batch 设置太大,会导致一条消息需要等待很久才能够被发送出去,增加网络延迟。
linger.ms如果数据迟迟未达到 batch.size,那么 Sender 等待 linger.ms 之后就会发送消息。默认值是 0,表示立刻发送消息。生产环境建议设置 5 ~ 100 毫秒之间。如果 linger.ms 设置太小,会导致频繁发送网络请求,吞吐量下降;如果 linger.ms 设置太大,会导致一条消息需要等待很久才能被发送出去,增加网络延迟。
compression.type默认值是 none,表示不压缩直接发送消息。支持的压缩类型:nonegzipsnappylz4zstd。生产环境也可以使用 gzip 压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大 Producer 端的 CPU 开销。

(3) 提高消费吞吐量

参数名称参数描述
fetch.max.bytes消费者获取服务器端一批消息的最大字节数,默认值为 50m。如果服务器端一批次的消息大于该值,仍然可以将这批消息拉取回来,所以这不是一个绝对最大值。消费者拉取一批次消息的大小受 message.max.bytes(Broker 配置)或者 max.message.bytes(Topic 配置)影响。
max.poll.records消费者每次调用 poll() 方法时,最多能拉取的消息数量,默认值为 500

(4) 增加消费者的数量

当增加了 Topic 的分区数量后,还需要同时增加消费组的消费者数量,这两个条件缺一不可,即 消费组的消费者数量 = Topic 的分区数量,这样才能提高消费吞吐量。

数据精确一次

在 Kafka 中,为了保证消息被精确一次发送和消费,必须满足以下全部条件,缺一不可。

(1) 生产者角度

  • acks 参数设置为 -1
  • 开启幂等性 enable.idempotence = true,默认开启。
  • 在生产端使用事务机制发送消息。

(2) Broker 角度

  • 分区的副本数量大于等于 2(--replication-factor 2)。
  • ISR 里应答的最小副本数量大于等于 2 (min.insync.replicas = 2)。

(3) 消费者角度

  • 事务机制 + 手动提交 Offset(enable.auto.commit = false)。
  • 消费者输出的目标存储系统必须支持事务(比如 MySQL、Kafka)。

合理设置分区数量

如何为某个 Topic 合理设置分区数量呢?以下是估算分区数量的方法:

  • (1) 创建一个只有 1 个分区的 Topic。
  • (2) 测试这个 Topic 的 Producer 吞吐量和 Consumer 吞吐量。
  • (3) 假设它们吞吐量的值分别是 Tp 和 Tc,单位是 MB/s。
  • (4) 然后,假设总的目标吞吐量是 Tt,那么 分区数 = Tt /min (Tp, Tc)
  • (5) 比如:Producer 吞吐量是 20m/s,Consumer 吞吐量是 50m/s,期望吞吐量 100m/s,那么 分区数 = 100 / min (20, 50) = 5 个分区

提示

  • Kafka 的分区数一般设置为:3 ~ 10 个。
  • Kafka 的分区数并不是越多越好,也不是越少越好,需要搭建完 Kafka 集群,然后对集群进行压测,再根据压测结果调整分区的数量。

单条消息大于 1M

当 Kafka 单条消息的大小大于 1M,那么可以通过以下参数进行优化:

Broker 节点宕机

在 Kafka 生产环境(集群部署)中,如果某个 Broker 节点挂掉,正常的处理方法如下:

  • (1) 先尝试重新启动 Broker 节点,如果能够正常启动,那问题直接解决。
  • (2) 如果无法正常重启,考虑增加内存、增加 CPU、增加网络带宽。
  • (3) 如果是将整个 Broker 节点误删除掉
    • 如果分区的副本数大于等于 2,那么可以按照 Kafka 服役新节点的方式重新服役一个新节点,并执行负载均衡,将部分数据迁移到新节点。
    • 如果分区的副本只有一个,那么只能 “批量重导”,也就是写个临时程序将丢失的那批数据查询出来,然后重新将消息写入 Kafka 里面。

生产环境最佳实践

如何保证消息的顺序性

关键点

  • 不同 MQ 对消息顺序性的支持

    • 在 Kafka 和 RabbitMQ 中,无法保证消息的顺序性,需要开发者自己实现。
    • 在 RocketMQ 中,有完整的针对性设计(原生支持消息的顺序性),可以保证消息的顺序性。
  • 全局有序和局部有序

    • MQ 通常只需要保证消息在局部有序,而不需要保证消息在全局有序。
    • 比如,在局部层面上,某个订单的多个消息必须有序的,但在全局层面上,不同订单的消息是不需要保证有序的。这类似微信的聊天窗口,在单个聊天窗口内,多个消息必须是有序的,但在多个聊天窗口中消息不一定是要有序的。
  • 生产实践案例分析

    • 大数据团队需要开发一个 MySQL Binlog 同步系统,要求同步一个 MySQL 库的数据过来,对公司业务系统的数据做各种复杂的分析。那么在 MySQL 里增删改一条数据,对应出来了增删改 3 条 Binlog,接着这三条 Binlog 发送到 MQ 里面。当消费者将消息读取出来依次执行时,这就要保证消息是有序,不然本来依次是增加、修改、删除;搞错顺序后,给执行成删除、修改、增加,这就全乱套了。因为本来这条数据同步过来,最后这条数据应该被删除了;结果搞错了这个顺序,最后这条数据保留下来了,这就造成数据同步出错了。

解决方案

  • 在 Kafka 单分区内,数据是有序(需要满足一定的条件)。
    • 在单个分区内,Kafka 保证消息是按生产者写入的顺序进行存储的,并且消费者在读取时也是按这个顺序读取的。因此,只要满足一定的条件,单分区内的数据是有序的。
    • 为了保证单分区的有序性,需要满足以下任意一个条件:
      • 第一种情况: max.in.flight.requests.per.connection=1,不需要开启幂等性,但会影响生产者的吞吐量。
      • 第二种情况:开启幂等性 enable.idempotence=true,并且设置 max.in.flight.requests.per.connection 的值在 1 ~ 5 之间。
  • 在 Kafka 多分区内,分区与分区之间的数据是无序。
    • 在多分区的情况下,Kafka 会将消息分布到不同的分区中。由于分区之间是并行处理的,Kafka 不会保证分区之间的消息顺序。因此,从全局视角来看,多分区的数据是无序的。
  • 在 MQ(比如 Kafka)中,保证消息的顺序性,最关键的是严格保证生产者、MQ 队列、消费者这三者是一对一的关系。
    • 要保证一个生产者只对应一个 Topic,一个 Topic 只对应一个 Partition,并且一个 Partition 只对应一个消费者。
    • 比如,生产者在发送消息的时候,可以指定一个 Key,比如指定某个订单 的 ID 作为 Key,那么这个订单相关的所有消息,一定都会被分发到同一个 Partition 中去,而且这个 Partition 中的数据一定是有顺序的。当消费者从 Partition 中读取消息的时候,也一定是有顺序的。另外,如果消费者是单线程进行消费处理,而处理比较耗时的话,假设处理一条消息耗时几十毫秒,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。建议在消费者内部使用单线程进行消费时,将消息写入 N 个内存队列,并且通过哈希算法将拥有相同 Key 的消息都写入到同一个内存队列里面;最后启动 N 个线程,每个线程分别消费一个内存队列即可,这就能保证消息的顺序性,也能大大提高消费消息的吞吐量。整个处理流程如下图所示:

如何避免消息丢失的问题

这问题相当于 “如何保证消息的可靠性” 或者 “如何保证消息可靠传输”。消息丢失的问题,可能会出现在生产者、消息队列服务器、消费者中的任一环节。

生产者发送消息丢失

生产者发送消息后,由于网络故障或网络延迟,可能会导致消息在传输过程中丢失。

  • 解决方案
    • Kafka 只要至少配置以下 4 个参数,就可以保证生产者发送的消息不会丢失
      • (1) 给 Topic 设置 replication.factor 参数:这个值必须大于等于 2,即要求每个 Partition 必须至少有 2 个副本
      • (2) 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于等于 2,即要求一个 Leader 至少要有 1 个 Follower 还跟自己保持着同步,这样才能确保 Leader 宕机了,还有一个 Follower 可以使用
      • (3) 在生产者端设置 acks=all:这是要求每条消息,必须是写入到所有 Replica(副本)之后,才能认为是发送成功
      • (4) 在生产者端设置 retries=MAX(可以是一个很大很大的值,表示无限次重试的意思):这个是要求一旦消息发送失败,就无限重试

MQ 服务器消息同步丢失

在 MQ 集群中,多个节点之间同步消息时,可能会发生消息丢失。

  • 问题发生
    • Leader 与 Follower 之间同步消息时,可能会发生消息丢失。比如,Broker 上面有某个 Replica 的 Leader,刚好其他的 Follower 还有一些数据没有同步,此时 Leader 突然宕机了,然后 Kafa 选举某个 Follower 成为新的 Leader 之后,这就会丢失一些消息
  • 解决方案
    • 要求至少配置以下 4 个参数,这样至少可以保证 Leader 所在 Broker 发生故障,进行 Leader 切换时,Kafka 不会丢失数据
      • (1) 给 Topic 设置 replication.factor 参数:这个值必须大于 1,即要求每个 Partition 必须至少有 2 个副本
      • (2) 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 0,即要求一个 Leader 至少要有 1 个 Follower 还跟自己保持着同步,这样才能确保 Leader 宕机了,还有一个 Follower 可以使用
      • (3) 在生产者端设置 acks 参数为 all:这个是要求每条消息,必须是写入到所有 Replica(副本)之后,才能认为是发送成功
      • (4) 在生产者端设置 retries=MAX(可以是一个很大很大的值,表示无限次重试的意思):这个是要求一旦消息发送失败,就无限重试

MQ 服务器消息存盘丢失

MQ 服务器将内存中的数据持久化到硬盘时,可能会发生消息丢失(比如 MQ 在持久化之前意外宕机)。

  • 解决方案
    • 适当配置日志保留时间(retention.ms)和日志段大小(segment.bytes),这样可以避免日志被过早删除,特别是在高负载系统中,这可以保证数据在宕机恢复后仍然可用
    • Kafka 默认使用内存缓冲来写入磁盘数据,可以通过调整日志刷盘策略(以下两个参数)来控制数据的刷盘频率,避免数据在缓冲区中丢失
      • log.flush.interval.messages:达到一定数量的消息后强制刷盘
      • log.flush.interval.ms:在规定的时间内强制刷盘

消费者消费消息丢失

消费者拉取消息后,需要处理业务,这期间可能会发生消息丢失。

  • 解决方案
    • 关闭 Offset 自动提交,使用 Offset 手动提交

如何保证消息队列的高可用性

  • Kafka 的概述

    • Kafka 一个最基本的架构认识:由多个 Broker 组成,每个 Broker 就是一个节点;当你创建一个 Topic 时,这个 Topic 可以划分为多个 Partition(分区),每个 Partition 可以存在于不同的 Broker 上,每个 Partition 只存放这个 Topic 的一部分数据(类似数据分片)。
    • Kafka 是天生的分布式消息队列,就是说一个 Topic 的数据,是分散存放在多台机器上的,每台机器只存放一部分数据。实际上像 RabbitMQ 之类的消息队列,并不是分布式消息队列,它们本质上还是传统的消息队列,只不过提供了一些集群、HA 的机制而已。因为无论怎么使用,RabbitMQ 每一个队列的数据都是存放在一个实例(节点)里的,即使在镜像集群下,也只是多个实例(节点)都存放某个队列的完整数据。
  • Kafka 0.8 版本以前

    • 在 Kafka 的 0.8 版本以前,是没有 HA 机制的,也就是任何一个 Broker 宕机了,那个 Broker 上的 Partition 就废掉了,没法写也没法读,没有什么高可用性可言。
    • 比如说,假设创建了一个 Topic,指定其 Partition 数量是 3 个,分布在三台机器上。但是,如果第二台机器宕机了,会导致这个 Topic 的 1/3 的数据丢失,因此这个是做不到高可用的。

  • Kafka 0.8 版本以后

    • 在 Kafka 的 0.8 版本以后,提供了 HA 机制,也就是 Replica(副本)机制。每个 Partition(分区)的数据都会同步到其他机器上,形成自己的多个 Replica 副本。然后,所有 Replica 会选举一个 Leader 出来,其他 Replica 就是 Follower。
    • 生产者和消费者都只跟 Leader 打交道。在写消息的时候,Leader 会负责将数据同步到所有 Follower 上去;在读消息的时候,就直接从 Leader 读取数据即可。为什么只能读写 Leader 呢?很简单,要是可以随意读写每一个 Follower,那么就要关心数据一致性的问题,这样系统的复杂度就太高了,很容易出现问题。Kafka 默认会均匀地将一个 Partition 的所有 Replica 分布在不同的机器上,这样才可以提高容错性。
    • 这样一来就有所谓的高可用性了,因为如果某个 Broker 宕机了,那个 Broker 上面的 Partition 在其他机器上都是有副本的。如果在那个 Broker 上面刚好有某个 Partition 的 Leader,那么此时会重新选举一个新的 Leader 出来,然后继续读写那个新的 Leader 即可。
    • 在写消息的时候,生产者只会写给 Leader,然后 Leader 将数据写入到本地磁盘,接着其他 Follower 自己主动从 Leader 拉取最新的数据。一旦所有 Follower 同步完数据,就会发送 ACK 给 Leader,当 Leader 收到所有 Follower 的 ACK 之后,就会返回写成功的消息给生产者(当然,这只是其中一种模式,还可以适当调整这个行为)。在读消息的时候,消费者只会从 Leader 读取消息,但是只有当一个消息已经被所有 Follower 都同步成功并返回 ACK 给 Leader 的时候,这个消息才会被消费者读取到。

如何解决消息大量积压的问题

消息积压的概述

消息积压指的是消息在消息队列中堆积而未能及时处理的情况。消息的积压主要来自于两方面:要么消息生产变快了,要么消息消费变慢了。

  • 监控发现,生产和消费消息的速度没什么变化,出现消息积压的情况,检查是有消费失败反复消费的情况。
  • 监控发现,消费消息的速度变慢,检查消费实例,日志中是否有大量消费错误、消费线程是否死锁、是否卡在某些资源上。
  • 单位时间内发送的消息增多,比如赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升消费性能,但可以通过扩容消费端的实例数来提升总体的消费能力
  • 如果短时间内没有服务器资源扩容,可以将系统降级,通过关闭某些不重要的业务,减少消息发送的数据量,最低限度让系统还能正常运转,保证核心业务的可用性
  • 严重影响 MQ 甚至整个系统时,可以考虑临时启用多个消费者,并发接收消息,同时持久化消息(比如写入数据库),过段时间再将持久化的消息重新写回 MQ 中进行消费,或者极端情况下直接丢弃消息

消息积压的解决方案

可以使用扩容来解决消息积压的问题,比如利用临时消费者,消费原来积压在队列中的消息。该消费者不做任何耗时的操作,将消息均匀写入新创建的队列里,最后将更多 Consumer 部署到更多的机器上消费新创建队列上的消息。等待积压的消息被消费,恢复到正常状态后,撤掉扩容服务器。具体步骤和思路如下:

  • (1) 先修复 Consumer 的问题,确保其恢复正常的消费速度,然后将现有的 Consumer 都停掉
  • (2) 临时建立好原先 10 倍或者 20 倍的 Queue 数量
  • (3) 写一个临时的分发消息的 Consumer 程序,将这个程序部署上去消费积压的消息,消费之后不做任何耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 Queue
  • (4) 接着临时征用 10 倍机器来部署 Consumer 实例,每一个(或者一批) Consumer 消费一个临时 Queue 的数据
  • (5) 等积压的数据都被消费完之后,恢复原先的部署架构,重新用原先的 Consumer 机器来消费消息

这种做法相当于临时将 Queue 资源和 Consumer 资源扩大了 10 倍,即以正常的 10 倍速度消费积压的消息,如下图所示:

如何解决消息队列的过期失效问题

假设用的是 RabbitMQ,由于 RabbitMQ 是可以设置过期时间的(TTL),如果消息在 Queue 中积压超过一定的时间,就会被 RabbitMQ 清理掉。这个时候就不会有消息被大量积压的问题,而是会有大量的消息丢失了。这种情况下,就不是说要增加 Consumer 消费积压的消息了,因为实际上消息是没有积压的,而是丢了大量的消息。

可以采取的一个解决方案就是 “批量重导”。当大量的消息积压的时候,由于设置了过期时间,RabbitMQ 会直接丢弃数据,然后等业务高峰期过了之后,例如在晚上 12 点以后,写个临时程序将丢失的那批数据查询出来,然后重新将消息写入 RabbitMQ 里,即把白天丢的消息全部补回来。假设 10000 个订单积压在 RabbitMQ 里面,没有来得及处理掉,其中 2000 个订单都丢了,那么只能手动写个临时程序把那 2000 个订单查询出来,然后手动发送消息到 RabbitMQ 中重新进行消费。

消息队列的磁盘满了应该怎么处理

消息积压在 MQ 里,那么如果很长时间都没有处理掉,此时导致 MQ 都快将磁盘写满了,那应该怎么办?这个时候可以写一个临时程序,启用多个消费者,并发消费消息,同时将消息持久化(比如写入数据库),即快速消费掉 MQ 中积压的消息。到凌晨的时候,将持久化的消息重新写回 MQ 中进行消费;如果希望加快已持久化消息的消费速度,可以引入上述的消息积压扩容解决方案