操作场景
Flume支持将采集的日志信息导入到Kafka。
前提条件
- 已创建启用Kerberos认证的流集群。
- 已在日志生成节点安装Flume客户端,例如安装目录为“/opt/Flumeclient”,客户端安装请参见“组件操作指南 > 使用Flume > 安装Flume客户端”。以下操作的客户端目录只是举例,请根据实际安装目录修改。
- 已配置网络,使日志生成节点与流集群互通。
使用Flume客户端(MRS 3.x之前版本)
说明普通集群不需要执行步骤1-步骤5。
- 将Master1节点上的认证服务器配置文件,复制到安装Flume客户端的节点,保存到Flume客户端中“Flume客户端安装目录/fusioninsight-flume-Flume 组件版本号 /conf”目录下。
文件完整路径为${BIGDATA_HOME}/MRS_Current/1_ X _KerberosClient/etc/kdc.conf。
其中“X”为随机生成的数字,请根据实际情况修改。同时文件需要以Flume客户端安装用户身份保存,例如root用户。
- 查看任一部署Flume角色节点的“业务IP”。
登录集群详情页面,选择“集群>组件管理 >Flume > 实例”,查看任一部署Flume角色节点的“业务IP”。
说明若集群详情页面没有“组件管理”页签,请先完成IAM用户同步(在集群详情页的“概览”页签,单击“IAM用户同步”右侧的“同步”进行IAM用户同步)。
- 将此节点上的用户认证文件,复制到安装Flume客户端的节点,保存到Flume客户端中“Flume客户端安装目录/fusioninsight-flume-Flume 组件版本号 /conf”目录下。
文件完整路径为${BIGDATA_HOME}/MRS_ XXX /install/FusionInsight-Flume-Flume 组件版本号 /flume/conf/flume.keytab。
其中“XXX”为产品版本号,请根据实际情况修改。同时文件需要以Flume客户端安装用户身份保存,例如root用户。
- 将此节点上的配置文件“jaas.conf”,复制到安装Flume客户端的节点,保存到Flume客户端中“conf”目录。
文件完整路径为${BIGDATA_HOME}/MRS_Current/1_ X _Flume/etc/jaas.conf。
其中“X”为随机生成的数字,请根据实际情况修改。同时文件需要以Flume客户端安装用户身份保存,例如root用户。
- 登录安装Flume客户端节点,切换到客户端安装目录,执行以下命令修改文件:
vi conf/jaas.conf
修改参数“keyTab”定义的用户认证文件完整路径即步骤3中保存用户认证文件的目录:“Flume客户端安装目录/fusioninsight-flume-Flume 组件版本号 /conf”,然后保存并退出。
- 执行以下命令,修改Flume客户端配置文件“flume-env.sh”:
vi Flume 客户端安装目录 / fusioninsight-flume- Flume 组件版本号 /conf/flume-env.sh
在“-XX:+UseCMSCompactAtFullCollection”后面,增加以下内容:
-Djava.security.krb5.conf= Flume 客户端安装目录 /fusioninsight-flume-1.9.0/conf/kdc.conf
-Djava.security.auth.login.config=Flume 客户端安装目录 /fusioninsight-flume-1.9.0/conf/jaas.conf
-Dzookeeper.request.timeout=120000
例如: "-XX:+UseCMSCompactAtFullCollection
-Djava.security.krb5.conf=/opt/FlumeClient / fusioninsight-flume- Flume 组件版本号 /conf/kdc.conf
-Djava.security.auth.login.config=/opt/FlumeClient / fusioninsight-flume- Flume 组件版本号 /conf/jaas.conf
-Dzookeeper.request.timeout=120000"
请根据实际情况,修改“Flume客户端安装目录”,然后保存并退出。
- 执行以下命令,重启Flume客户端:
cd Flume 客户端安装目录 /fusioninsight-flume- Flume 组件版本号 /bin
./flume-manage.shrestart
例如:
cd /opt/FlumeClient/fusioninsight-flume- Flume 组件版本号 /bin./flume-manage.shrestart
- 执行以下命令,根据实际业务需求,在Flume客户端配置文件“properties.properties”中配置并保存作业。
vi Flume 客户端安装目录 /fusioninsight-flume- Flume 组件版本号 /conf/properties.properties
以配置SpoolDir Source+File Channel+Kafka Sink为例:
#########################################################################################
client.sources = static_log_source
client.channels = static_log_channel
client.sinks = kafka_sink
#########################################################################################
#LOG_TO_HDFS_ONLINE_1
client.sources.static_log_source.type = spooldir
client.sources.static_log_source.spoolDir = 监控目录
client.sources.static_log_source.fileSuffix = .COMPLETED
client.sources.static_log_source.ignorePattern = ^$
client.sources.static_log_source.trackerDir = 传输过程中元数据存储路径
client.sources.static_log_source.maxBlobLength = 16384
client.sources.static_log_source.batchSize = 51200
client.sources.static_log_source.inputCharset = UTF-8
client.sources.static_log_source.deserializer = LINE
client.sources.static_log_source.selector.type = replicating
client.sources.static_log_source.fileHeaderKey = file
client.sources.static_log_source.fileHeader = false
client.sources.static_log_source.basenameHeader = true
client.sources.static_log_source.basenameHeaderKey = basename
client.sources.static_log_source.deletePolicy = never
client.channels.static_log_channel.type = file
client.channels.static_log_channel.dataDirs = 数据缓存路径,设置多个路径可提升性能,中间用逗号分开
client.channels.static_log_channel.checkpointDir = 检查点存放路径
client.channels.static_log_channel.maxFileSize = 2146435071
client.channels.static_log_channel.capacity = 1000000
client.channels.static_log_channel.transactionCapacity = 612000
client.channels.static_log_channel.minimumRequiredSpace = 524288000
client.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
client.sinks.kafka_sink.kafka.topic = 数据写入的topic ,如flume_test
client.sinks.kafka_sink.kafka.bootstrap.servers = XXX . XXX . XXX . XXX :kafka 端口号 , XXX . XXX . XXX . XXX :kafka 端口号 , XXX . XXX . XXX . XXX :kafka端口号
client.sinks.kafka_sink.flumeBatchSize = 1000
client.sinks.kafka_sink.kafka.producer.type = sync
client.sinks.kafka_sink.kafka.security.protocol = SASL_PLAINTEXT
client.sinks.kafka_sink.kafka.kerberos.domain.name = Kafka Domain名称,安全集群必填,如hadoop.xxx.1com
client.sinks.kafka_sink.requiredAcks = 0
client.sources.static_log_source.channels = static_log_channel
client.sinks.kafka_sink.channel = static_log_channel
说明
client.sinks.kafka_sink.kafka.topic:数据写入的topic。若kafka中该topic不存在,默认情况下会自动创建该topic。
client.sinks.kafka_sink.kafka.bootstrap.servers:Kafkabrokers列表,多个用英文逗号分隔。默认情况下,安全集群端口21007,普通集群对应端口9092。
client.sinks.kafka_sink.kafka.security.protocol:安全集群为SASL_PLAINTEXT,普通集群为PLAINTEXT。
client.sinks.kafka_sink.kafka.kerberos.domain.name:
普通集群无需配置此参数。安全集群对应此参数的值为Kafka集群中“kerberos.domain.name”对应的值。
具体可到Broker实例所在节点上查看${BIGDATA_HOME}/MRS_Current/1_ X _Broker/etc/server.properties。
其中X为随机生成的数字,请根据实际情况修改。同时文件需要以Flume客户端安装用户身份保存,例如root用户。
具体可到Broker实例所在节点上查看“${BIGDATA_HOME}/FusionInsight_Current/1_X_Broker/etc/server.properties”。
- 参数配置并保存后,Flume客户端将自动加载“properties.properties”中配置的内容。当spoolDir生成新的日志文件,文件内容将发送到Kafka生产者,并支持Kafka消费者消费。
使用Flume客户端(MRS 3.x及之后版本)
说明普通集群不需要执行1-5。
- 将Master1节点上的认证服务器配置文件,复制到安装Flume客户端的节点,保存到Flume客户端中Flume 客户端安装目录 /fusioninsight-flume-Flume 组件版本号 /conf目录下。
文件完整路径为“${BIGDATA_HOME}/FusionInsight_BASE_ XXX /1_ X _KerberosClient/etc/kdc.conf”。其中“XXX”为产品版本号,“X”为随机生成的数字,请根据实际情况修改。同时文件需要以Flume客户端安装用户身份保存,例如root用户。
- 查看任一部署Flume角色节点的“业务IP”。
登录FusionInsight Manager页面,具体请参见访问FusionInsight Manager(MRS 3.x及之后版本),选择“集群 > 服务 > Flume > 实例”。查看任一部署Flume角色节点的“业务IP”。
说明若集群详情页面没有“组件管理”页签,请先完成IAM用户同步(在集群详情页的“概览”页签,单击“IAM用户同步”右侧的“同步”进行IAM用户同步)。
- 将此节点上的用户认证文件,复制到安装Flume客户端的节点,保存到Flume客户端中“Flume客户端安装目录/fusioninsight-flume-Flume 组件版本号 /conf”目录下。
文件完整路径为${BIGDATA_HOME}/FusionInsight_Porter_ XXX /install/FusionInsight-Flume-Flume 组件版本号 /flume/conf/flume.keytab。
其中“XXX”为产品版本号,请根据实际情况修改。同时文件需要以Flume客户端安装用户身份保存,例如root用户。
- 将此节点上的配置文件“jaas.conf”,复制到安装Flume客户端的节点,保存到Flume客户端中“conf”目录。
文件完整路径为${BIGDATA_HOME}/FusionInsight_Current/1_ X _Flume/etc/jaas.conf。
其中“X”为随机生成的数字,请根据实际情况修改。同时文件需要以Flume客户端安装用户身份保存,例如root用户。
- 登录安装Flume客户端节点,切换到客户端安装目录,执行以下命令修改文件:
vi conf/jaas.conf
修改参数“keyTab”定义的用户认证文件完整路径即步骤3中保存用户认证文件的目录:“Flume客户端安装目录/fusioninsight-flume-Flume 组件版本号 /conf”,然后保存并退出。
- 执行以下命令,修改Flume客户端配置文件“flume-env.sh”:
vi Flume 客户端安装目录 / fusioninsight-flume- Flume 组件版本号 /conf/flume-env.sh
在“-XX:+UseCMSCompactAtFullCollection”后面,增加以下内容:
-Djava.security.krb5.conf=Flume 客户端安装目录 /fusioninsight-flume-1.9.0/conf/kdc.conf
-Djava.security.auth.login.config=Flume 客户端安装目录 /fusioninsight-flume-1.9.0/conf/jaas.conf -Dzookeeper.request.timeout=120000
例如: "-XX:+UseCMSCompactAtFullCollection
-Djava.security.krb5.conf=/opt/FlumeClient / fusioninsight-flume- Flume 组件版本号 /conf/kdc.conf -Djava.security.auth.login.config=/opt/FlumeClient / fusioninsight-flume- Flume 组件版本号 /conf/jaas.conf
-Dzookeeper.request.timeout=120000"
请根据实际情况,修改“Flume客户端安装目录”,然后保存并退出。
- 执行以下命令,重启Flume客户端:
cd Flume 客户端安装目录 /fusioninsight-flume- Flume 组件版本号 /bin
./flume-manage.shrestart
例如:
cd /opt/FlumeClient/fusioninsight-flume- Flume 组件版本号 /bin
./flume-manage.sh restart
- 根据实际业务场景配置作业。
- MRS 3.x及之后版本部分参数可直接在Manager界面配置。
- 在“properties.properties”文件中配置,以配置SpoolDir Source+File Channel+Kafka Sink为例。
在安装Flume客户端的节点执行以下命令,根据实际业务需求,在Flume客户端配置文件“properties.properties”中配置并保存作业。
vi Flume 客户端安装目录 /fusioninsight-flume- Flume 组件版本号 /conf/properties.properties
#########################################################################################
client.sources = static_log_source
client.channels = static_log_channel
client.sinks = kafka_sink
#########################################################################################
#LOG_TO_HDFS_ONLINE_1
client.sources.static_log_source.type = spooldir
client.sources.static_log_source.spoolDir = 监控目录
client.sources.static_log_source.fileSuffix = .COMPLETED
client.sources.static_log_source.ignorePattern = ^$
client.sources.static_log_source.trackerDir = 传输过程中元数据存储路径
client.sources.static_log_source.maxBlobLength = 16384
client.sources.static_log_source.batchSize = 51200
client.sources.static_log_source.inputCharset = UTF-8
client.sources.static_log_source.deserializer = LINE
client.sources.static_log_source.selector.type = replicating
client.sources.static_log_source.fileHeaderKey = file
client.sources.static_log_source.fileHeader = false
client.sources.static_log_source.basenameHeader = true
client.sources.static_log_source.basenameHeaderKey = basename
client.sources.static_log_source.deletePolicy = never
client.channels.static_log_channel.type = file
client.channels.static_log_channel.dataDirs = 数据缓存路径,设置多个路径可提升性能,中间用逗号分开
client.channels.static_log_channel.checkpointDir = 检查点存放路径
client.channels.static_log_channel.maxFileSize = 2146435071
client.channels.static_log_channel.capacity = 1000000
client.channels.static_log_channel.transactionCapacity = 612000
client.channels.static_log_channel.minimumRequiredSpace = 524288000
client.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
client.sinks.kafka_sink.kafka.topic = 数据写入的topic ,如flume_test
client.sinks.kafka_sink.kafka.bootstrap.servers = XXX . XXX . XXX . XXX :kafka 端口号 , XXX . XXX . XXX . XXX :kafka 端口号 , XXX . XXX . XXX . XXX :kafka端口号
client.sinks.kafka_sink.flumeBatchSize = 1000
client.sinks.kafka_sink.kafka.producer.type = sync
client.sinks.kafka_sink.kafka.security.protocol = SASL_PLAINTEXT
client.sinks.kafka_sink.kafka.kerberos.domain.name = Kafka Domain名称,安全集群必填,如hadoop.xxx.1com
client.sinks.kafka_sink.requiredAcks = 0
client.sources.static_log_source.channels = static_log_channel
client.sinks.kafka_sink.channel = static_log_channel
![](file:///C:/Users/yangj/AppData/Local/Temp/msohtmlclip1/01/clip_image005.gif)
说明
client.sinks.kafka_sink.kafka.topic:数据写入的topic。若kafka中该topic不存在,默认情况下会自动创建该topic。
client.sinks.kafka_sink.kafka.bootstrap.servers:Kafkabrokers列表,多个用英文逗号分隔。默认情况下,安全集群端口21007,普通集群对应端口9092。
client.sinks.kafka_sink.kafka.security.protocol:安全集群为SASL_PLAINTEXT,普通集群为PLAINTEXT。
client.sinks.kafka_sink.kafka.kerberos.domain.name:
普通集群无需配置此参数。安全集群对应此参数的值为Kafka集群中“kerberos.domain.name”对应的值。
具体可到Broker实例所在节点上查看${BIGDATA_HOME}/MRS_Current/1_ X _Broker/etc/server.properties。
其中X为随机生成的数字,请根据实际情况修改。同时文件需要以Flume客户端安装用户身份保存,例如root用户。
具体可到Broker实例所在节点上查看“${BIGDATA_HOME}/FusionInsight_Current/1_X_Broker/etc/server.properties”。
- 参数配置并保存后,Flume客户端将自动加载“properties.properties”中配置的内容。当spoolDir生成新的日志文件,文件内容将发送到Kafka生产者,并支持Kafka消费者消费。