大纲
消息队列整合
消息队列介绍
使用场景
异步
- 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们
解耦
- 允许独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
削峰
- 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源并随时待命,这无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃
缓冲
- 有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况
Kafka 使用介绍
基本概念
总体架构
Producer
:消息生产者,就是向 Kafka Broker 发消息的客户端。Consumer
:消息消费者,就是向 Kafka Broker 取消息的客户端。Consumer Group (CG)
:消费者组,由多个 consumer
组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。Broker
:一台 Kafka 服务器就是一个 broker
。一个 Kafka 集群由多个 broker
组成。一个 broker
可以容纳多个 topic
。Topic
:主题,可以理解为一个队列,生产者和消费者面向的都是一个 topic
。Partition
:分区,为了实现扩展性,一个非常大的 topic
可以分布到多个 broker
(即 Kafka 服务器)上,一个 topic
可以分为多个 partition
,每个 partition
是一个有序的队列。Replica
:副本,为保证集群中的某个节点发生故障时,该节点上的 partition
数据不丢失,且让 Kafka 仍然能够继续工作,Kafka 为此提供了副本机制。一个 topic
的每个分区都有若干个副本,包括一个 leader
和若干个 follower
。Leader
:每个分区多个副本的 主
,生产者发送数据的对象,以及消费者消费数据的对象都是 leader
。Follower
:每个分区多个副本的 从
,实时从 leader
中同步数据,保持和 leader
数据的同步。leader
发生故障时,某个 follower
会成为新的 leader
。
工作原理
Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程 — main
线程和 Sender
线程。在 main
线程中,会创建一个双端队列 RecordAccumulator
。值得一提的是,main
线程将消息发送给 RecordAccumulator
时,Sender
线程会不断从 RecordAccumulator
中拉取消息并发送到 Kafka Broker。
消息模式
点对点模式
点对点模式
就是一对一,消费者主动拉取数据,消息收到后消息会被清除。消息生产者将消息发送到 Queue 中,然后消息消费者从 Queue 中取出并消费消息。消息被消费以后,Queue 中不再存储它,所以消息消费者不可能消费到已经被消费的消息。Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
发布 / 订阅模式
发布/订阅模式
就是一对多,消息产生后主动推送给订阅者,消费者消费消息之后不会清除消息。消息生产者(发布)将消息发布到 topic
主题(如浏览、点赞、收藏、评论等)中,同时有多个消息消费者(订阅)消费该消息。这和点对点模式不同,每个消费者互相独立,发布到 topic
的消息会被所有订阅者消费。
Kafka 整合案例
本章节所需的案例代码,可以直接从 GitHub 下载对应章节 spring-boot3-15
。更多关于 Spring 整合 Kafka 的使用说明,请阅读 官方文档。
准备工作
首先在 Kafka 中创建 Topic (主题),建议使用 Kafka-UI 这样的 GUI 工具进行操作,如下所示:
引入依赖
1 2 3 4
| <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
|
添加配置
1 2
| spring.kafka.bootstrap-servers=127.0.0.1:9092
|
消息发送
发送普通消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @SpringBootTest public class KafkaSendStringMessageTest {
@Autowired private KafkaTemplate kafkaTemplate;
@Test public void sendSimpleMessage() { CompletableFuture[] futures = new CompletableFuture[5]; StopWatch stopWatch = new StopWatch(); stopWatch.start();
for (int i = 0; i < 5; i++) { CompletableFuture future = kafkaTemplate.send("news", "hello", "world"); futures[i] = future; }
CompletableFuture.allOf(futures).join(); stopWatch.stop();
System.out.println("take " + stopWatch.getTotalTimeMillis() + " millis to send message"); }
}
|
发送对象消息
- Kafka 客户端默认是以字符串的形式发送消息,如果发送的是 POJO 对象,则需要指定 Value 的序列化器(如使用 JSON 序列化器)
1 2 3 4 5 6 7 8
| spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
|
1 2 3 4 5 6 7 8 9 10
| @Data @NoArgsConstructor @AllArgsConstructor public class Person {
private Long id; private String name; private Integer age;
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @SpringBootTest public class KafkaSendObjectMessageTest {
@Autowired private KafkaTemplate kafkaTemplate;
@Test public void sendObjectMessage() { Person person = new Person(1L, "Jim", 18); CompletableFuture future = kafkaTemplate.send("news", "person", person); future.join(); System.out.println("success to send message"); }
}
|
- 消息内容最终会以 JSON 字符串的形式存储在 Kafka 中
消息订阅
订阅最新消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Slf4j @Component public class SportTopicListener {
@KafkaListener(topics = "sport", groupId = "sport-group-1") public void subscribeNewest(ConsumerRecord record) { Object key = record.key(); Object value = record.value(); String topic = record.topic(); log.info("receive message from topic {}, key: {}, value: {}", topic, key, value); }
}
|
订阅所有消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Slf4j @Component public class SportTopicListener {
@KafkaListener(groupId = "sport-group-2", topicPartitions = {@TopicPartition(topic = "sport", partitionOffsets = {@PartitionOffset(partition = "0", initialOffset = "0")})}) public void subscribeAll(ConsumerRecord record) { Object key = record.key(); Object value = record.value(); String topic = record.topic(); log.info("receive message from topic [{}], key: {}, value: {}", topic, key, value); }
}
|
创建主题
自动创建主题
若希望 SpringBoot 应用在启动的时候,自动创建 Topic (主题),则可以参考以下代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Configuration public class KafkaConfiguration {
@Bean public NewTopic sportTopic() { return TopicBuilder.name("sport") .partitions(1) .compact() .build(); }
}
|
- 在主启动类上添加
@EnableKafka
注解,开启 Kafka 的注解驱动功能
1 2 3 4 5 6 7 8 9
| @EnableKafka @SpringBootApplication public class MainApplication {
public static void main(String[] args) { SpringApplication.run(MainApplication.class, args); }
}
|
自动配置原理
- kafka 的自动配置由
KafkaAutoConfiguration
实现- 往容器中放注入了
KafkaTemplate
,可以进行消息的接收和发送 - 往容器中放了
KafkaAdmin
,可以进行 Kafka 的管理,比如创建 Topic 等 - Kafka 的配置内容都在
KafkaProperties
中 @EnableKafka
注解可以开启 Kafka 基于注解的模式