使用NiFi页面,创建一个mysql到mysql的数据同步流程
-
创建处理器,从Processor拖拽一个处理器叫QueryDatabaseTable到画布上,并右键点击Configure进行相关配置
可以看到这里主要有5个配置Tab,主要的作用如下:
-
SETTINGS:主要配置处理器的名称,惩罚时长,睡眠时长;
-
SCHEDULING:主要配置处理器的调度策略,这里主要有三种调度策略:(1)定时调度(2)cron调度(3)事件调度;一般的处理器都具备前两种调度策略,事件调度策略只有个别处理器才有;此外还支持配置并发任务数,在数据比较多时可以适当上调此值进行提升处理效率,调度周期,执行类型,指定在哪些节点上运行;
-
PROPERTIES:处理器的属性配置,处理器运行所需要必备的配置;
-
RELATIONSHIPS:连接关系,配置处理器运行情况后,怎么跟下一个处理器连接,比如成功时结束,或者传递给其他的处理器等;
-
COMMENTS:对处理器做一些注释说明;
-
-
处理器的属性配置
-
QueryDatabaseTable的配置(查询来源数据库表数据)
一些关键属性的配置说明:
-
Database Connection Pooling Service:配置一个来源数据库连接池服务,用来存放数据库连接,需要配置jdbc url,驱动包路径,用户名,密码等信息
-
Database Type:数据库类型,支持所有关系型数据库,可以根据实际类型进行选择,示例这里选择MySQL
-
Table Name:查询查询的表名,填写实际要查询的表名
-
Columns to Return:指定要返回的列,不填时默认查询所有列
-
Additional WHERE clause:额外的查询条件,默认是没有任何条件,这里指定条件时,比如:xxx='xxxx'
-
Custom Query:自定义查询sql,当此配置有值时,优先使用此sql进行查询
-
Maximum-value Columns:指定增量查询的列,如配置,每次查询都会将此值最大值进行持久化到本地,下次查询时使用配置列的此值作为增量条件,一般配置主键id,实现数据的增量查询
-
Fetch Size:配置游标查询的数据量大小,建议配置,避免大数据查询时的OOM问题
-
Max Rows Per Flow File:配置每个流文件的最大数据行数,当该属性配置不为0时,每个流文件都以该值作为流文件的最大数据条数,不配置默认为0,则全部数据都输出到一个流文件里,建议此值根据实际情况进行调整,因为数据是分流到流文件中进行流转处理,因此适当的拆分数据,能够更利于提升处理速率
-
Output Batch Size:输出批次大小,数据在内存进行分批输出,当此值配置不为0时,流文件达到这个数值时,进行输出到连接队列,比如配置10时,处理器会每10个流文件就输出一次,当配置为0时,处理器会将所有的流文件全部一起输出,大数据查询时容易OOM,所以次属性也是建议配置
整体属性配置如下:
-
-
JoltTransformRecord的配置(表字段映射转换)
一些关系配置属性的说明:
-
Record Reader:数据记录读取器,选择指定数据类型的读取器,因为QueryDatabaseTable处理器默认输出的数据文件格式是avro,因此这里需要配置avro格式的读取器,也就是读写格式是要保持一致的
-
Record Writer:数据记录写入器,这里是指要将数据以什么格式进行输出流文件,示例这里选择avro
-
Jolt Specification:jolt转换表达式,主要是指定了来源表与目标表之间字段的映射,比如来源表table_a的列col_a,映射到目标表table_b的col_b,具体写法
[
{
"operation": "shift",
"spec": {
# 左边是来源表字段名,右边是目标表字段名
"id": "id",
"name": "name",
"identity": "identity",
"age": "age",
"birth_date": "birth_date"
}
}
]整体属性配置如下:
-
-
PutDatabaseRecord的配置(插入目标数据库表数据)
一些关系配置属性的说明:
-
Record Reader:数据记录读取器,选择指定数据类型的读取器,这里配置的格式取决于上游节点传输的格式,示例流程上游节点JoltTransformRecord输出的数据文件格式是avro,因此这里需要配置avro格式的读取器,也就是读写格式是要保持一致的
-
Database Type:数据库类型,支持所有关系型数据库,可以根据实际类型进行选择,示例这里选择MySQL
-
Statement Type:sql执行类型,根据不同的数据库类型有不同的值,如:INSERT,UPSERT,UPDATE,INSERT_IGNORE,DELETE等,示例流程选择INSERT即插入
-
Database Connection Pooling Service:配置一个目标数据库连接池服务,用来存放数据库连接,需要配置jdbc url,驱动包路径,用户名,密码等信息
-
Catalog Name:数据库的Catalog
-
Schema Name:目标数据库的Schema
-
Table Name:目标表名
整体属性配置如下:
-
-
-
处理器之间的连接配置
这样就完成了对处理器之间的连接,连接完成默认每个处理器都是停止状态,我们现在可以尝试对头节点处理器进行运行。
运行说明:
-
Start:将整个流程启动,并按照头节点设置的调度周期定时执行;
-
Run Once:只对头节点运行一次;
示例为了展示流文件在连接队列里流转效果,选择Run Once演示,点击Run Once
点运行后,该图标表示线程正在执行头节点处理器的执行
头节点执行后,我们可以看到队列中有两个流文件,我们可以连接队列邮件List Queue看看
我们可以看到队列流文件列表
我们可以点击这个小眼睛图标,查看每个流文件的内容
可以看到,我们流转过程中流文件的格式是avro的,我们可以选择formatted格式格式化一下
这样就是我们查询表的数据封装到流文件里的内容了
我们对第二个处理器执行Start,让他把连接队列里面的流文件全都进行处理
我们可以发现,第二个处理完成后,也是将流文件输出到连接队列里,接着由下一个处理器进行处理
我们接着对尾结点处理器执行Start,让运行去处理连接队列里的所有流文件
我们可以看到尾结点处理完后,整个流程就执行结束了
然后对比一下目标表前后数据变化,我们可以看到,数据已经成功从来源表同步到目标表了
总结:
以上演示了一种简单场景的数据流程处理场景,当然NiFi能够实现的数据处理场景时非常多样化的,具体可以结合实际的应用场景,选择不同的处理器组合跟配置进行实现,我们从示例的使用过程中,可以体会到,我们无需关系数据在处理过程中是如何转换和消费的,我们仅需通过一些基本的配置信息,即可实现数据的同步。
-