1. Flume是什么
Flume是一个高可用,高可靠,分布式的海量日志采集、聚合和传输的系统,能够有效的收集、聚合、移动大量的日志数据。通俗来说就是一个靠谱、方便的日志采集工具。同时,他也是目前大数据领域数据采集最常用的一个框架。
2. Flume三大组件
从图中可以看到Flume从Web网站获取信息并上传到HDFS,其组件主要包含Source、Channel和Sink。
- Source:负责从数据源读取信息,根据读取的数据源类型不同,分为很多类,例如从文件读取数据的Source、从网站日志读取数据的Source等。常用的Source组件有:
- Exec Source:实现文件监控,可以实时监控文件中的新增内容,类似于Linux中的
tail -f
效果; - NetCat TCP/UDP Source:采集指定端口(TCP、UDP)的数据,可以读取流经端口的每一行数据;
- Spooling Directory Source:采集文件夹里新增的文件;
- Kafka Source:从Kafka消息队列中采集数据。
- Channel:负责临时存储数据,Source会将读取到的信息临时存储在这里,根据存储方式的不同,有很多类Channel,例如基于内存的Channel、基于文件的Channel等。常用的Channel组件有:
- Memory Channel:使用内存作为数据的存储。优点是效率高,因为就不涉及磁盘IO;缺点有两个:可能会丢数据,如果Flume的agent挂了,那么channel中的数据就丢失了;内存是有限的,会存在内存不够用的情况。
- File Channel:使用文件来作为数据的存储。优点是数据不会丢失;缺点是效率相对内存来说会有点慢,但是这个慢并没有我们想象中的那么慢,所以这个也是比较常用的一种Channel。
- Spillable Memory Channel:使用内存和文件作为数据存储,即先把数据存到内存中,如果内存中数据达到阈值再flush到文件中。优点:解决了内存不够用的问题;缺点:还是存在数据丢失的风险。
- Sink:负责将数据从Channel读取出来并写到目的地,根据写入的地方不同有很多种Sink,例如写入文件的Sink、写入HDFS的Sink等。Channel中的数据直到进入目的地才会被删除,当Sink写入目的地失败后,可以自动重写,不会造成数据丢失,这是有事务保证的。常用的Sink组件有:
- Logger Sink:将数据作为日志处理,可以选择打印到控制台或者写到文件中,这个主要在测试的时候使用;
- HDFS Sink:将数据传输到HDFS中,这个是比较常见的,主要针对离线计算的场景
- Kafka Sink:将数据发送到kafka消息队列中,这个也是比较常见的,主要针对实时计算场景,数据不落盘,实时传输,最后使用实时计算框架直接处理。
3. Flume高级应用场景
3.1 多路复用
图中共有两个Agent,表示我们启动了2个Flume的代理,或者可以理解为了启动了2个Flume的进程。首先看左边这个Agent,给他起个名字叫 foo,有一个Source,Source后面接了3个Channel,表示Source读取到的数据会重复发送给每个Channel,每个Channel中的数据都是一样的,针对每个Channel都接了一个Sink,这三个Sink负责读取对应Channel中的数据,并且把数据输出到不同的目的地,Sink1负责把数据写到HDFS中,Sink2负责把数据写到一个Java消息服务数据队列中,Sink3负责把数据写给另一个Agent。
Sink3把数据输出到了Agent bar中,在Agent bar中同样有三个组件,Source组件获取Sink3发送过来的数据,然后把数据临时存储到自己的Channel4中,最终再通过Sink组件把数据写到其他地方,把采集到的一份数据重复输出到不同的目的地中。
3.2 整合
图中共启动了四个Agent,左边的三个Agent都是负责采集对应web服务器中的日志数据,数据采集过来之后统一发送给Agent4,最后Agent4进行统一汇总,最终写入HDFS。
这种架构的好处是后期如果要修改最终数据的输出目的地,只需要修改Agent4中的Sink即可,不需要修改Agent1、2、3。但是这种架构也有弊端:如果有很多个Agent同时向Agent4写数据,那么Agent4会出现性能瓶颈,导致数据处理过慢;这种架构还存在单点故障问题,如果Agent4挂了,那么所有的数据都断了。不过这些问题可以通过Flume中的负载均衡和故障转移机制解决,会在后续的博客中详细分析。
4. 示例实践
需求为配置Flume,使其可以接收通过TCP协议传输的信息并打印到控制台。
4.1 配置
Flume的运行不需要编写代码,但需要提前写好配置,由于每种组件的参数较多,因此一般直接查阅官网文档就可以。下面这段配置,应该写到Flume的conf目录下,我命名为example.conf。
这段代码的含义是:配置一个名为a1
的Agent,将Source组件命名为r1
,Sink组件命名为k1
,Channel组件命名为c1
。使用NetCat TCP Source组件,监听localhost的44444端口;使用Logger Sink组件;使用Memory Channel组件,Channel中最多存储1000个event,一次事务中写入和读取的event最大数为100;最后设置Source连接的Channel以及Sink连接的Channel。
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4.2 运行
运行代码为:
bin/flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console
其中--conf
指定配置目录,--conf-file
指定配置文件,--name
指定Agent名称,-D
指定特殊参数,如flume.root.logger=INFO,console
表示输出结果到控制台,这样便于测试时查看结果。
4.2.1 运行结果输出
使用telnet
命令向localhost的44444端口发送信息,启动Flume服务后,复制一个当前虚拟机的命令窗口,输入:telnet localhost 44444
,然后键入要发送的信息即可,如下:
- 发送信息"hello world"(字符串"OK"是系统打印的)。
- Flume接收信息"hello world"并打印在控制台上。
4.2.2 设置后台运行
由于默认Flume服务会在控制台运行且输入"Ctrl+C"后会终止服务(不像Hadoop的MapReduce会偷偷在后台继续运行),因此如果要设置后台运行的话需要在运行命令后面加上Linux的&
命令,并且在前面使用nohup
命令保证关闭Shell窗口后命令仍然可以运行。完整命令为:
nohup bin/flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console &