Kafka 入门教程之六

大纲

前言

学习资源

Kafka 集成 Flume

Flume 简介

Flume 是一个分布式、可靠且高可用的日志收集和传输系统,主要用于大数据环境中的海量数据采集,功能类似于 ELK 中的 LogStash。它最初由 Cloudera 提出,是 Apache 软件基金会的顶级项目之一,主要用于将日志数据从不同的来源(如服务器日志文件、事件流等)采集、聚合并传输到大数据处理系统(如 HDFS 或 HBase)。请注意,Flume 项目已于 2024 年 10 月 10 日被标记为休眠状态,停止维护,官方建议迁移到其他替代方案。

其他日志采集技术

在大数据和日志收集领域,还有许多类似的技术,比如 Kafka、Logstash、Filebeat、Fluentd、Graylog、Splunk Forwarder、Apache NiFi、Heka 等。

Flume 的优缺点

  • 优点:
    • 易于集成 Hadoop 生态系统。
    • 配置简单,使用灵活。
    • 支持多种数据源和目标。
    • 高可用和可靠性强。
  • 缺点:
    • 对实时性要求特别高的场景可能不适用。
    • 配置灵活但复杂度高,可能需要较多调优。

Flume 的核心特点

  • 分布式架构:
    • 支持分布式部署,可以跨多个节点扩展采集和传输能力。
  • 高可靠性:
    • 提供可靠的事件传输机制,确保数据不会丢失。
  • 灵活性:
    • 通过配置文件,可以灵活地定义数据来源、数据处理逻辑和数据目标。
  • 可扩展性:
    • 提供插件机制,可以轻松扩展以支持自定义的数据来源和目标。
  • 数据传输效率高:
    • 支持批量传输和数据压缩,提升传输效率。

Flume 的核心组件

  • Source(数据源):

    • 数据输入端,负责从外部数据源中获取数据。例如,可以从日志文件、网络端口或自定义数据源读取数据。
    • 常见的 Source 类型:
      • Avro Source
      • Spooling Directory Source
      • Syslog Source
      • HTTP Source
  • Channel(通道):

    • 数据的中转站,临时存储数据以确保数据的可靠性,同时可以解耦 Source 和 Sink 的处理速度。
    • 常见的 Channel 类型:
      • Memory Channel:速度快,但数据可能丢失(断电或程序崩溃),适用于非关键性数据。
      • File Channel:速度较慢,但数据安全性高,适合需要保证数据可靠性的场景。
      • Kafka Channel: 将 Kafka 作为 Channel,适合分布式和高可靠性场景。
      • JDBC Channel:,将数据库作为 Channel,速度较慢,但数据可靠性高,适合要求高可靠性、需要持久化存储的场景。
  • Sink(数据目的地):

    • 数据的输出端,负责将数据发送到目标存储或处理系统。
    • 常见的 Sink 类型:
      • HDFS Sink
      • HBase Sink
      • Kafka Sink
  • Agent(代理):

    • Flume 的运行实例,一个 Agent 包括一个或多个 Source、Channel 和 Sink。
  • Interceptors(拦截器):

    • 用于在数据流动过程中,对数据进行过滤、修改或增强。

Flume 的工作原理

  • 数据采集:
    • Source 从指定数据源获取数据,并将数据封装成 Event(事件)。
  • 数据传输:
    • Event 被传递到 Channel,并存储在 Channel 中。
  • 数据写入:
    • Sink 从 Channel 中读取 Event,并将其写入指定的目标存储或处理系统。

Flume 的适用场景

  • 日志采集:
    • 将分布式系统中的应用日志采集并传输到 HDFS 或 Kafka 进行分析。
  • 实时数据流处理:
    • 配合 Kafka 等流处理系统,传输实时事件流数据。
  • 海量数据传输:
    • 在分布式环境下高效、稳定地传输大规模数据。
  • 数据备份:
    • 将日志或事件数据备份到分布式存储系统中。

为什么要同时使用 Flume 和 Kafka,而不是单独使用 Kafka 采集日志信息?

  • 数据采集能力:Flume 支持多种数据源(如文件、HTTP、Syslog),采集灵活,而 Kafka 更适合作为高性能消息队列。
  • 缓存与流控:Flume 的 Channel 提供可靠的缓冲和容错能力,可应对下游系统压力或故障。
  • 数据预处理:Flume 支持通过拦截器进行数据过滤、格式化和增强,简化了日志预处理。
  • 与 HDFS/HBase 的无缝集成:Flume 内置对 Hadoop 生态的支持,可直接将数据写入 HDFS 或 HBase。
  • 兼容现有系统:许多项目中 Flume 已经存在,引入 Kafka 可以重用配置,降低改造成本。
  • 灵活架构:Flume 负责采集日志,Kafka 负责分发数据,实现采集与分发的解耦。

Flume 集成

在大数据和日志收集领域,Flume 可以作为 Kafka 的生产者,也可以作为 Kafka 的消费者,如下图所示:

Flume 的安装与配置

  • 官网 下载 Flume 的二进制压缩包并解压
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 下载文件
$ wget http://archive.apache.org/dist/flume/1.11.0/apache-flume-1.11.0-bin.tar.gz

# 解压文件
$ tar -xvf apache-flume-1.11.0-bin.tar.gz

# 重命名目录
$ mv apache-flume-1.11.0-bin flume

# 移动目录
$ sudo mv flume /opt/flume

# 文件授权
$ sudo chmod -R 777 /opt/flume

# 脚本授权执行
$ sudo chmod +x /opt/flume/bin/flume-ng
  • 更改 Flume 的日志目录路径
1
2
3
4
5
# 进入 Flume 的配置目录
$ cd /opt/flume/conf

# 编辑日志配置文件,更改以下内容(建议使用绝对路径)
$ vi conf/log4j2.xml
1
2
3
<Properties>
<Property name="LOG_DIR">/opt/flume/logs</Property>
</Properties>
  • 调整 Flume 的堆内存大小(在生产环境中,建议设置堆内存为 4G 或以上)
1
2
3
4
5
6
7
8
# 进入 Flume 的配置目录
$ cd /opt/flume/conf

# 拷贝运行环境脚本文件
$ cp flume-env.sh.template flume-env.sh

# 编辑运行环境脚本文件(更改以下内容)
$ vi flume-env.sh
1
export JAVA_OPTS="-Xms4096m -Xmx4096m -Dcom.sun.management.jmxremote"
  • 添加环境变量
1
2
3
4
5
6
7
# 更改系统配置文件,添加环境变量
$ sudo vi /etc/profile
export KE_HOME=/opt/flume
export PATH=$PATH:$KE_HOME/bin

# 使系统配置文件的更改生效
$ sudo source /etc/profile

Flume 作为 Kafka 的生产者

本节将演示 Flume 通过 Source 组件读取磁盘里的日志文件,并将日志信息写入到 Channel(基于内存)组件里面,然后通过 Sink 组件从 Channel 中读取日志信息并写入 Kafka,最后由 Kafka 消费者消费日志信息(如下图所示)。

集成 Kafka 的配置
  • 创建用于测试的日志文件
1
2
3
4
5
6
7
8
# 创建日志目录
$ sudo mkdir /opt/applog

# 创建日志文件
$ sudo touch /opt/applog/app.log

# 日志文件授权
$ sudo chmod 666 /opt/applog/app.log
  • 创建 Position 文件
1
2
3
4
5
# 创建 Position 目录
$ mkdir /opt/flume/positions

# 创建 Position 文件
$ touch /opt/flume/positions/taildir_position.json
  • 创建 Job 文件
1
2
3
4
5
6
7
8
# 创建 Job 目录
$ mkdir /opt/flume/jobs

# 创建 Job 文件
$ touch /opt/flume/jobs/file_to_kafka.conf

# 编辑 Job 文件(添加以下内容)
$ vi /opt/flume/jobs/file_to_kafka.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置 source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/applog/app.*
a1.sources.r1.positionFile = /opt/flume/positions/taildir_position.json

# 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

重要参数说明

  • a1.sources.r1.filegroups.f1:需要传输日志数据到 Kafka 的日志文件
  • a1.channels.c1.capacity:Channel 的总容量
  • a1.channels.c1.transactionCapacity:Channel 在每次事务中处理的最大事件数量
  • a1.sinks.k1.kafka.bootstrap.servers:Kafka 集群的节点列表
  • a1.sinks.k1.kafka.topic:Kafka 的 Topic
  • a1.sinks.k1.kafka.flumeBatchSize:控制 Sink 每次从 Channel 中批量读取的最大事件数量
  • a1.sinks.k1.kafka.producer.linger.ms:底层 Kafka 客户端的参数,用于控制 Producer 在每次批量发送数据前的等待时间
集成 Kafka 的测试
  • 启动 Kafka 控制台消费者
1
$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first
  • 启动 Flume
1
2
3
4
5
# 进入 Flume 的安装目录
$ cd /opt/flume/

# 后台启动 Flume(如果希望前台启动,可以去掉尾部的 & 符号)
$ bin/flume-ng agent -c conf/ -n a1 -f jobs/file_to_kafka.conf &

查看 Flume 是否启动成功

  • 查看进程信息:ps -aux|grep flume
  • 查看启动日志:vi /opt/flume/logs/flume.log
  • 往日志文件追加数据,查看 Kafka 控制台消费者的消费情况
1
2
# 追加写入日志文件
$ echo "hello kafka" >> /opt/applog/app.log
集成 Kafka 的优化

为了进一步优化 Flume 采集日志的效率,还可以采用 Kafka Channel(相当于 Kafka 作为 Channel),将日志信息直接写入 Kafka,从而省去了 Sink 的处理,提高整体的处理效率。Flume 完整的 Job 配置示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 组件定义
a1.sources = r1
a1.channels = c1

# 配置 source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/applog/app.*
a1.sources.r1.positionFile = /opt/flume/positions/taildir_position.json

# 配置 channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
a1.channels.c1.kafka.topic = first
a1.channels.c1.parseAsFlumeEvent = false

# 拼接组件
a1.sources.r1.channels = c1

重要参数说明

  • a1.sources.r1.filegroups.f1:需要传输日志数据到 Kafka 的日志文件
  • a1.channels.c1.kafka.bootstrap.servers:Kafka 集群的节点列表
  • a1.channels.c1.kafka.topic:Kafka 的 Topic

Flume 作为 Kafka 的消费者

本节将演示 Flume 通过 Source 组件消费 Kafka 中的消息,并将消息写入到 Channel(基于内存)组件里面,然后通过 Sink 组件将消息打印到控制台和记录到日志文件中(如下所示)。

集成 Kafka 的配置
  • 创建 Job 文件
1
2
3
4
5
6
7
8
# 创建 Job 目录
$ mkdir /opt/flume/jobs

# 创建 Job 文件
$ touch /opt/flume/jobs/kafka_to_log.conf

# 编辑 Job 文件(添加以下内容)
$ vi /opt/flume/jobs/kafka_to_log.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = flume_group

# 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置 sink
a1.sinks.k1.type = logger

# 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

重要参数说明

  • a1.sources.r1.batchSize:Source 每次批量写入 Channel 的最大事件数量
  • a1.sources.r1.batchDurationMillis:Souce 将事件批量写入到 Channel 的最大时间间隔
  • a1.sources.r1.kafka.bootstrap.servers:Kafka 集群的节点列表
  • a1.sources.r1.kafka.topics:Kafka 的 Topic
  • a1.sources.r1.kafka.consumer.group.id:Kafka 的消费者组 ID
集成 Kafka 的测试
  • 启动 Flume
1
2
3
4
5
# 进入 Flume 的安装目录
$ cd /opt/flume/

# 后台启动 Flume(如果希望前台启动,可以去掉尾部的 & 符号)
$ bin/flume-ng agent -c conf/ -n a1 -f jobs/kafka_to_log.conf -Dflume.root.logger=INFO,console &

查看 Flume 是否启动成功

  • 查看进程信息:ps -aux|grep flume
  • 查看启动日志:vi /opt/flume/logs/flume.log
  • 启动 Kafka 控制台生产者,并手动发送消息
1
$ kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic first
  • 查看 Flume 的日志文件,观察是否可以记录 Kafka 控制台生产者发送的消息
1
$ tail -f /opt/flume/logs/flume.log
1
2
3
INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.LoggerSink.process:95)  - Event: { headers:{topic=first, partition=1, offset=6045, timestamp=1733839351854} body: 68 65 6C 6C 6F      hello }
INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.LoggerSink.process:95) - Event: { headers:{topic=first, partition=0, offset=5656, timestamp=1733839231348} body: 6B 61 66 6B 61 kafka }
INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.LoggerSink.process:95) - Event: { headers:{topic=first, partition=2, offset=4103, timestamp=1733839428791} body: 66 6C 75 6D 65 flume }

Flink 是一个分布式数据处理框架和计算引擎,它最初由德国柏林工业大学开发,后来开源并成为 Apache 软件基金会的顶级项目。Flink 被广泛用于处理大规模数据流,特别是在实时数据处理和批处理场景中表现出色。

  • 低延迟和高吞吐:通过流优先的设计,保证了实时处理性能。
  • 灵活性和扩展性:支持多种数据源和目标,支持无缝扩展。
  • 强大的生态系统:支持 HDFS、Kafka、Elasticsearch、Cassandra 等多种流行的大数据组件。

与其他大数据技术的比较

  • 与 Spark 相比:Flink 更专注于实时流处理,而 Spark 在批处理领域更强。
  • 与 Kafka Streams 相比:Flink 功能更全面,支持复杂的流处理逻辑和状态管理。
  • 实时流处理(Stream Processing):

    • Flink 是流处理优先的框架,支持以事件为单位处理数据流。
    • 可以实现低延迟、高吞吐量的数据处理,适用于实时监控、在线分析等场景。
  • 批处理能力(Batch Processing):

    • 虽然主要面向流处理,但 Flink 也支持批处理,利用流处理 API 实现批数据的高效处理。
  • 事件时间支持(Event Time Support):

    • Flink 通过支持事件时间和水位线(Watermark),可以处理乱序数据流,适用于复杂的实时分析场景。
  • 状态管理(State Management):

    • Flink 提供强大的状态管理功能,可以在流处理中存储和管理状态。
    • 支持状态的容错(通过检查点 Checkpoint 和保存点 Savepoint)。
  • 分布式架构:

    • Flink 的分布式架构设计支持横向扩展,能够高效处理 PB 级别的数据。
  • 容错机制(Fault Tolerance):

    • Flink 提供了基于一致性检查点的强大容错机制,确保数据处理不会因为系统故障而丢失或重复。
  • JobManager 和 TaskManager:

    • JobManager 负责任务的协调和调度。
    • TaskManager 是工作节点,执行实际的计算任务。
  • DataStream API:

    • 用于流处理任务,支持事件时间、窗口操作、状态管理等。
  • DataSet API:

    • 用于批处理任务,支持丰富的转换操作。
  • Table API 和 SQL:

    • 提供 SQL 风格的查询接口,支持更高层次的抽象。
  • 实时数据分析:

    • 监控系统的日志分析、实时流量监控、用户行为分析等。
  • 数据处理和转换:

    • 数据清洗、数据聚合、实时数据增强。
  • 复杂事件处理(CEP):

    • 检测金融欺诈、网络安全威胁、物联网(IoT)中的事件模式。
  • 机器学习:

    • 实时预测、模型更新。
  • ETL(Extract,Transform,Load):

    • 从数据源提取数据,进行转换处理,然后加载到目标数据仓库或系统。

Flink 可以作为 Kafka 的生产者,也可以作为 Kafka 的消费者(如下图所示)。

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-18

引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<properties>
<kafka.version>3.6.0</kafka.version>
<flink.version>1.17.0</flink.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>

案例目标

本节将集成 Kafka 与 Flink,并使用 Flink 作为 Kafka 生产者,实现向 Kafka 发送消息。

  • 第一种写法:基于 KafkaSink 类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class FlinkKafkaProducerTest1 {

public static void main(String[] args) throws Exception {
// 初始化 Flink 执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 模拟源数据
List<String> list = new ArrayList<>();
list.add("hello");
list.add("flink");
DataStream<String> dataStream = env.fromCollection(list);

// 配置 KafkaSink(相当于 Kafka 生产者)
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("127.0.0.1:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("first")
.setKeySerializationSchema(new SimpleStringSchema())
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();

// Flink 流关联 KafkaSink
dataStream.sinkTo(kafkaSink);

// 执行
env.execute();
}

}
  • 第二种写法:基于 FlinkKafkaProducer 类(官方已标记为过时)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class FlinkKafkaProducerTest2 {

public static void main(String[] args) throws Exception {
// 初始化 Flink 执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 模拟源数据
List<String> worldsList = new ArrayList<>();
worldsList.add("hello");
worldsList.add("flink");
DataStreamSource<String> stream = env.fromCollection(worldsList);

// Kafka 生产者的配置信息
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

// 创建 Kafka 生产者
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer("first", new SimpleStringSchema(), properties);

// Flink 流关联 Kafka 生产者
stream.addSink(kafkaProducer);

// 执行
env.execute();
}

}

案例目标

本节将集成 Kafka 与 Flink,并使用 Flink 作为 Kafka 消费者,实现从 Kafka 消费消息。

  • 第一种写法:基于 KafkaSource 类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class FlinkKafkaConsumerTest1 {

public static void main(String[] args) throws Exception {
// 初始化 Flink 执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 配置 KafkaSource(相当于 Kafka 消费者)
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("127.0.0.1:9092")
.setTopics("first")
.setGroupId("flink1")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();

// Flink 流关联 KafkaSource
DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");
stream.print();

// 执行
env.execute();
}

}
  • 第二种写法:基于 FlinkKafkaConsumer 类(官方已标记为过时)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class FlinkKafkaConsumerTest2 {

public static void main(String[] args) throws Exception {
// 初始化 Flink 执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka 消费者的配置信息
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "flink2");

// 创建 Kafka 消费者
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer("first", new SimpleStringSchema(), properties);
kafkaConsumer.setStartFromLatest();

// Flink 流关联 Kafka 消费者
DataStream<String> dataStream = env.addSource(kafkaConsumer);
dataStream.print();

// 执行
env.execute();
}

}

Kafka 集成 SpringBoot

Kafka 集成 SpringCloud