Kafka 开发随笔

Kafka 核心配置

提示

本节将介绍 Kafka 核心配置的最佳实践,若需要具体的配置案例代码,请参考 这里

生产者提高吞吐量

为了让生产者提高吞吐量(发送消息的效率),可以优化以下几个参数:

  • (1) 在生产者端设置 batch.size:批次大小,默认 16k
  • (2) 在生产者端设置 linger.ms:等待时间,默认 0ms,修改为 5-100ms
  • (3) 在生产者端设置 compression.type:压缩方式,默认是 none,修改过为 snappy
  • (4) 在生产者端设置 RecordAccumulator:缓冲区(双端队列)大小,默认是 32m,修改为 64m

特别注意

  • (1) 上述的四个参数值并不是设置得越大就越好,设置得过大会导致 Kafka 中的消息被延迟消费。
  • (2) 当 Topic 的分区数量比较多的时候,可以适当增加 RecordAccumulator(缓冲区) 的大小。

生产者避免消息丢失

生产者发送消息后,由于网络故障或网络延迟,可能会导致消息在传输过程中丢失。Kafka 只要至少配置以下 4 个参数,就可以保证生产者发送的消息不会丢失:

  • (1) 给 Topic 设置 replication.factor 参数:这个值必须大于等于 2,即要求每个 Partition 必须至少有 2 个副本
  • (2) 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于等于 2,即要求一个 Leader 至少要有 1 个 Follower 还跟自己保持着同步,这样才能确保 Leader 宕机了,还有一个 Follower 可以使用
  • (3) 在生产者端设置 acks=all:这是要求每条消息,必须是写入到所有 Replica(副本)之后,才能认为是发送成功
  • (4) 在生产者端设置 retries=MAX(可以是一个很大很大的值,表示无限次重试的意思):这个是要求一旦消息发送失败,就无限重试

生产者避免消息重复发送

生产者开启幂等性

什么是 Kafka 的幂等性

  • Kafka 的幂等性是指 Producer (生产者) 无论向 Broker 发送多少条重复消息,Broker 端都只会持久化一条消息,保证了消息不重复。
  • 精确一次 (Exactly Once) = 幂等性 + 至少一次 (acks = -1 + 分区副本数 >= 2 + ISR 里应答的最小副本数量 >= 2)。
  • Kafka 官方文档中的幂等性详细介绍可以看 这里
  • 在 Kafka 中开启幂等性的配置参数是 enable.idempotence,默认值为 true,设置 false 会关闭幂等性。如果没有设置冲突的配置,默认情况下会启用幂等性。如果设置了冲突的配置,并且未显式启用幂等性,则会禁用幂等性。如果显式启用了幂等性,并且设置了冲突的配置,则会抛出 ConfigException 异常。

  • 当在生产者端设置 enable.idempotencetrue 时,生产者将确保数据流中只写入每条消息的一个副本,即可以确保生产者不会写入重复消息。如果设置为 false,由于代理失败等原因,生产者重试发送消息,可能会在数据流中写入重试消息的副本,即生产者可能会写入重复消息。

  • 特别注意,Kafka 启用幂等性要至少满足以下 4 个条件:

    • (1) 在生产者端设置 enable.idempotence 参数:这个值必须为 true,即开启幂等性
    • (2) 在生产者端设置 max.in.flight.requests.per.connection 参数:这个值必须小于或等于 5
    • (3) 在生产者端设置 acks=all:这是要求每条消息,必须是写入到所有 Replica(副本)之后,才能认为是发送成功
    • (4) 在生产者端设置 retries:这个值必须大于 0,即必须要有重试机制

生产者使用事务机制

Kafa 除了可以使用幂等性来解决生产者重复发送消息之外,还可以使用事务机制来解决,详细介绍请看 [这里](/posts/60ddcede.html# 事务使用)。

生产最佳落地实践

如何保证消息的顺序性

关键点

  • 不同 MQ 对消息顺序性的支持

    • 在 Kafka 和 RabbitMQ 中,无法保证消息的顺序性,需要开发者自己实现。
    • 在 RocketMQ 中,有完整的针对性设计(原生支持消息的顺序性),可以保证消息的顺序性。
  • 全局有序和局部有序

    • MQ 通常只需要保证消息在局部有序,而不需要保证消息在全局有序。
    • 比如,在局部层面上,某个订单的多个消息必须有序的,但在全局层面上,不同订单的消息是不需要保证有序的。这类似微信的聊天窗口,在单个聊天窗口内,多个消息必须是有序的,但在多个聊天窗口中消息不一定是要有序的。
  • 生产实践案例分析

    • 大数据团队需要开发一个 MySQL Binlog 同步系统,要求同步一个 MySQL 库的数据过来,对公司业务系统的数据做各种复杂的分析。那么在 MySQL 里增删改一条数据,对应出来了增删改 3 条 Binlog,接着这三条 Binlog 发送到 MQ 里面。当消费者将消息读取出来依次执行时,这就要保证消息是有序,不然本来依次是增加、修改、删除;搞错顺序后,给执行成删除、修改、增加,这就全乱套了。因为本来这条数据同步过来,最后这条数据应该被删除了;结果搞错了这个顺序,最后这条数据保留下来了,这就造成数据同步出错了。

解决方案

在 MQ 中,保证消息的顺序性,最关键的是严格保证生产者、MQ 队列、消费者这三者是一对一的关系

  • 要保证一个生产者只对应一个 Topic,一个 Topic 只对应一个 Partition,并且一个 Partition 只对应一个消费者。

  • 比如,生产者在发送消息的时候,可以指定一个 Key,比如指定某个订单 的 ID 作为 Key,那么这个订单相关的所有消息,一定都会被分发到同一个 Partition 中去,而且这个 Partition 中的数据一定是有顺序的。当消费者从 Partition 中读取消息的时候,也一定是有顺序的。另外,如果消费者是单线程进行消费处理,而处理比较耗时的话,假设处理一条消息耗时几十毫秒,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。建议在消费者内部使用单线程进行消费时,将消息写入 N 个内存队列,并且通过哈希算法将拥有相同 Key 的消息都写入到同一个内存队列里面;最后启动 N 个线程,每个线程分别消费一个内存队列即可,这就能保证消息的顺序性,也能大大提高消费消息的吞吐量。

如何避免消息丢失的问题

这问题相当于 “如何保证消息的可靠性” 或者 “如何保证消息可靠传输”。消息丢失的问题,可能会出现在生产者、消息队列服务器、消费者中的任一环节。

生产者发送消息丢失

生产者发送消息后,由于网络故障或网络延迟,可能会导致消息在传输过程中丢失。

  • 解决方案
    • Kafka 只要至少配置以下 4 个参数,就可以保证生产者发送的消息不会丢失
      • (1) 给 Topic 设置 replication.factor 参数:这个值必须大于等于 2,即要求每个 Partition 必须至少有 2 个副本
      • (2) 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于等于 2,即要求一个 Leader 至少要有 1 个 Follower 还跟自己保持着同步,这样才能确保 Leader 宕机了,还有一个 Follower 可以使用
      • (3) 在生产者端设置 acks=all:这是要求每条消息,必须是写入到所有 Replica(副本)之后,才能认为是发送成功
      • (4) 在生产者端设置 retries=MAX(可以是一个很大很大的值,表示无限次重试的意思):这个是要求一旦消息发送失败,就无限重试

MQ 服务器消息同步丢失

在 MQ 集群中,多个节点之间同步消息时,可能会发生消息丢失。

  • 问题发生
    • Leader 与 Follower 之间同步消息时,可能会发生消息丢失。比如,Broker 上面有某个 Replica 的 Leader,刚好其他的 Follower 还有一些数据没有同步,此时 Leader 突然宕机了,然后 Kafa 选举某个 Follower 成为新的 Leader 之后,这就会丢失一些消息
  • 解决方案
    • 要求至少配置以下 4 个参数,这样至少可以保证 Leader 所在 Broker 发生故障,进行 Leader 切换时,Kafka 不会丢失数据
      • (1) 给 Topic 设置 replication.factor 参数:这个值必须大于 1,即要求每个 Partition 必须至少有 2 个副本
      • (2) 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 0,即要求一个 Leader 至少要有 1 个 Follower 还跟自己保持着同步,这样才能确保 Leader 宕机了,还有一个 Follower 可以使用
      • (3) 在生产者端设置 acks 参数为 all:这个是要求每条消息,必须是写入到所有 Replica(副本)之后,才能认为是发送成功
      • (4) 在生产者端设置 retries=MAX(可以是一个很大很大的值,表示无限次重试的意思):这个是要求一旦消息发送失败,就无限重试

MQ 服务器消息存盘丢失

MQ 服务器将内存中的数据持久化到硬盘时,可能会发生消息丢失(比如 MQ 在持久化之前意外宕机)。

  • 解决方案
    • 适当配置日志保留时间(retention.ms)和日志段大小(segment.bytes),这样可以避免日志被过早删除,特别是在高负载系统中,这可以保证数据在宕机恢复后仍然可用
    • Kafka 默认使用内存缓冲来写入磁盘数据,可以通过调整日志刷盘策略(以下两个参数)来控制数据的刷盘频率,避免数据在缓冲区中丢失
      • log.flush.interval.messages:达到一定数量的消息后强制刷盘
      • log.flush.interval.ms:在规定的时间内强制刷盘

消费者消费消息丢失

消费者拉取消息后,需要处理业务,这期间可能会发生消息丢失。

  • 解决方案
    • 关闭 Offset 自动提交,使用 Offset 手动提交

如何保证消息队列的高可用性

  • Kafka 的概述

    • Kafka 一个最基本的架构认识:由多个 Broker 组成,每个 Broker 就是一个节点;当你创建一个 Topic 时,这个 Topic 可以划分为多个 Partition(分区),每个 Partition 可以存在于不同的 Broker 上,每个 Partition 只存放这个 Topic 的一部分数据(类似数据分片)。
    • Kafka 是天生的分布式消息队列,就是说一个 Topic 的数据,是分散存放在多台机器上的,每台机器只存放一部分数据。实际上像 RabbitMQ 之类的消息队列,并不是分布式消息队列,它们本质上还是传统的消息队列,只不过提供了一些集群、HA 的机制而已。因为无论怎么使用,RabbitMQ 每一个队列的数据都是存放在一个实例(节点)里的,即使在镜像集群下,也只是多个实例(节点)都存放某个队列的完整数据。
  • Kafka 0.8 版本以前

    • 在 Kafka 的 0.8 版本以前,是没有 HA 机制的,也就是任何一个 Broker 宕机了,那个 Broker 上的 Partition 就废掉了,没法写也没法读,没有什么高可用性可言。
    • 比如说,假设创建了一个 Topic,指定其 Partition 数量是 3 个,分布在三台机器上。但是,如果第二台机器宕机了,会导致这个 Topic 的 1/3 的数据丢失,因此这个是做不到高可用的。

  • Kafka 0.8 版本以后

    • 在 Kafka 的 0.8 版本以后,提供了 HA 机制,也就是 Replica(副本)机制。每个 Partition(分区)的数据都会同步到其他机器上,形成自己的多个 Replica 副本。然后,所有 Replica 会选举一个 Leader 出来,其他 Replica 就是 Follower。
    • 生产者和消费者都只跟 Leader 打交道。在写消息的时候,Leader 会负责将数据同步到所有 Follower 上去;在读消息的时候,就直接从 Leader 读取数据即可。为什么只能读写 Leader 呢?很简单,要是可以随意读写每一个 Follower,那么就要关心数据一致性的问题,这样系统的复杂度就太高了,很容易出现问题。Kafka 默认会均匀地将一个 Partition 的所有 Replica 分布在不同的机器上,这样才可以提高容错性。
    • 这样一来就有所谓的高可用性了,因为如果某个 Broker 宕机了,那个 Broker 上面的 Partition 在其他机器上都是有副本的。如果在那个 Broker 上面刚好有某个 Partition 的 Leader,那么此时会重新选举一个新的 Leader 出来,然后继续读写那个新的 Leader 即可。
    • 在写消息的时候,生产者只会写给 Leader,然后 Leader 将数据写入到本地磁盘,接着其他 Follower 自己主动从 Leader 拉取最新的数据。一旦所有 Follower 同步完数据,就会发送 ACK 给 Leader,当 Leader 收到所有 Follower 的 ACK 之后,就会返回写成功的消息给生产者(当然,这只是其中一种模式,还可以适当调整这个行为)。在读消息的时候,消费者只会从 Leader 读取消息,但是只有当一个消息已经被所有 Follower 都同步成功并返回 ACK 给 Leader 的时候,这个消息才会被消费者读取到。

如何解决消息大量积压的问题

消息积压的概述

消息积压指的是消息在消息队列中堆积而未能及时处理的情况。消息的积压主要来自于两方面:要么消息生产变快了,要么消息消费变慢了。

  • 监控发现,生产和消费消息的速度没什么变化,出现消息积压的情况,检查是有消费失败反复消费的情况。
  • 监控发现,消费消息的速度变慢,检查消费实例,日志中是否有大量消费错误、消费线程是否死锁、是否卡在某些资源上。
  • 单位时间内发送的消息增多,比如赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升消费性能,但可以通过扩容消费端的实例数来提升总体的消费能力
  • 如果短时间内没有服务器资源扩容,可以将系统降级,通过关闭某些不重要的业务,减少消息发送的数据量,最低限度让系统还能正常运转,保证核心业务的可用性
  • 严重影响 MQ 甚至整个系统时,可以考虑临时启用多个消费者,并发接收消息,同时持久化消息(比如写入数据库),过段时间再将持久化的消息重新写回 MQ 中进行消费,或者极端情况下直接丢弃消息

消息积压的解决方案

可以使用扩容来解决消息积压的问题,比如利用临时消费者,消费原来积压在队列中的消息。该消费者不做任何耗时的操作,将消息均匀写入新创建的队列里,最后将更多 Consumer 部署到更多的机器上消费新创建队列上的消息。等待积压的消息被消费,恢复到正常状态后,撤掉扩容服务器。具体步骤和思路如下:

  • (1) 先修复 Consumer 的问题,确保其恢复正常的消费速度,然后将现有的 Consumer 都停掉
  • (2) 临时建立好原先 10 倍或者 20 倍的 Queue 数量
  • (3) 写一个临时的分发消息的 Consumer 程序,将这个程序部署上去消费积压的消息,消费之后不做任何耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 Queue
  • (4) 接着临时征用 10 倍机器来部署 Consumer 实例,每一个(或者一批) Consumer 消费一个临时 Queue 的数据
  • (5) 等积压的数据都被消费完之后,恢复原先的部署架构,重新用原先的 Consumer 机器来消费消息

这种做法相当于临时将 Queue 资源和 Consumer 资源扩大了 10 倍,即以正常的 10 倍速度消费积压的消息,如下图所示:

如何解决消息队列的过期失效问题

假设用的是 RabbitMQ,由于 RabbitMQ 是可以设置过期时间的(TTL),如果消息在 Queue 中积压超过一定的时间,就会被 RabbitMQ 清理掉。这个时候就不会有消息被大量积压的问题,而是会有大量的消息丢失了。这种情况下,就不是说要增加 Consumer 消费积压的消息了,因为实际上消息是没有积压的,而是丢了大量的消息。

可以采取的一个解决方案就是 “批量重导”。当大量的消息积压的时候,由于设置了过期时间,RabbitMQ 会直接丢弃数据,然后等业务高峰期过了之后,例如在晚上 12 点以后,写个临时程序将丢失的那批数据查询出来,然后重新将消息写入 RabbitMQ 里,即把白天丢的消息全部补回来。假设 10000 个订单积压在 RabbitMQ 里面,没有来得及处理掉,其中 2000 个订单都丢了,那么只能手动写个临时程序把那 2000 个订单查询出来,然后手动发送消息到 RabbitMQ 中重新进行消费。

消息队列的磁盘满了应该怎么处理

消息积压在 MQ 里,那么如果很长时间都没有处理掉,此时导致 MQ 都快将磁盘写满了,那应该怎么办?这个时候可以写一个临时程序,启用多个消费者,并发消费消息,同时将消息持久化(比如写入数据库),即快速消费掉 MQ 中积压的消息。到凌晨的时候,将持久化的消息重新写回 MQ 中进行消费;如果希望加快已持久化消息的消费速度,可以引入上述的消息积压扩容解决方案