searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

体验NiFi数据处理流程

2023-07-31 12:58:10
86
0

使用NiFi页面,创建一个mysql到mysql的数据同步流程

  1. 创建处理器,从Processor拖拽一个处理器叫QueryDatabaseTable到画布上,并右键点击Configure进行相关配置

    可以看到这里主要有5个配置Tab,主要的作用如下:

    • SETTINGS:主要配置处理器的名称,惩罚时长,睡眠时长;

    • SCHEDULING:主要配置处理器的调度策略,这里主要有三种调度策略:(1)定时调度(2)cron调度(3)事件调度;一般的处理器都具备前两种调度策略,事件调度策略只有个别处理器才有;此外还支持配置并发任务数,在数据比较多时可以适当上调此值进行提升处理效率,调度周期,执行类型,指定在哪些节点上运行;

    • PROPERTIES:处理器的属性配置,处理器运行所需要必备的配置;

    • RELATIONSHIPS:连接关系,配置处理器运行情况后,怎么跟下一个处理器连接,比如成功时结束,或者传递给其他的处理器等;

    • COMMENTS:对处理器做一些注释说明;

  2. 处理器的属性配置

    • QueryDatabaseTable的配置(查询来源数据库表数据)

      一些关键属性的配置说明:

      • Database Connection Pooling Service:配置一个来源数据库连接池服务,用来存放数据库连接,需要配置jdbc url,驱动包路径,用户名,密码等信息

      • Database Type:数据库类型,支持所有关系型数据库,可以根据实际类型进行选择,示例这里选择MySQL

      • Table Name:查询查询的表名,填写实际要查询的表名

      • Columns to Return:指定要返回的列,不填时默认查询所有列

      • Additional WHERE clause:额外的查询条件,默认是没有任何条件,这里指定条件时,比如:xxx='xxxx'

      • Custom Query:自定义查询sql,当此配置有值时,优先使用此sql进行查询

      • Maximum-value Columns:指定增量查询的列,如配置,每次查询都会将此值最大值进行持久化到本地,下次查询时使用配置列的此值作为增量条件,一般配置主键id,实现数据的增量查询

      • Fetch Size:配置游标查询的数据量大小,建议配置,避免大数据查询时的OOM问题

      • Max Rows Per Flow File:配置每个流文件的最大数据行数,当该属性配置不为0时,每个流文件都以该值作为流文件的最大数据条数,不配置默认为0,则全部数据都输出到一个流文件里,建议此值根据实际情况进行调整,因为数据是分流到流文件中进行流转处理,因此适当的拆分数据,能够更利于提升处理速率

      • Output Batch Size:输出批次大小,数据在内存进行分批输出,当此值配置不为0时,流文件达到这个数值时,进行输出到连接队列,比如配置10时,处理器会每10个流文件就输出一次,当配置为0时,处理器会将所有的流文件全部一起输出,大数据查询时容易OOM,所以次属性也是建议配置

      整体属性配置如下:

    • JoltTransformRecord的配置(表字段映射转换)

      一些关系配置属性的说明:

      • Record Reader:数据记录读取器,选择指定数据类型的读取器,因为QueryDatabaseTable处理器默认输出的数据文件格式是avro,因此这里需要配置avro格式的读取器,也就是读写格式是要保持一致的

      • Record Writer:数据记录写入器,这里是指要将数据以什么格式进行输出流文件,示例这里选择avro

      • Jolt Specification:jolt转换表达式,主要是指定了来源表与目标表之间字段的映射,比如来源表table_a的列col_a,映射到目标表table_b的col_b,具体写法

        [
          {
               "operation": "shift",
               "spec": {
                  # 左边是来源表字段名,右边是目标表字段名
                   "id": "id",
                   "name": "name",
                   "identity": "identity",
                   "age": "age",
                   "birth_date": "birth_date"
              }
          }
        ]

        整体属性配置如下:

    • PutDatabaseRecord的配置(插入目标数据库表数据)

      一些关系配置属性的说明:

      • Record Reader:数据记录读取器,选择指定数据类型的读取器,这里配置的格式取决于上游节点传输的格式,示例流程上游节点JoltTransformRecord输出的数据文件格式是avro,因此这里需要配置avro格式的读取器,也就是读写格式是要保持一致的

      • Database Type:数据库类型,支持所有关系型数据库,可以根据实际类型进行选择,示例这里选择MySQL

      • Statement Type:sql执行类型,根据不同的数据库类型有不同的值,如:INSERT,UPSERT,UPDATE,INSERT_IGNORE,DELETE等,示例流程选择INSERT即插入

      • Database Connection Pooling Service:配置一个目标数据库连接池服务,用来存放数据库连接,需要配置jdbc url,驱动包路径,用户名,密码等信息

      • Catalog Name:数据库的Catalog

      • Schema Name:目标数据库的Schema

      • Table Name:目标表名

        整体属性配置如下:

  3. 处理器之间的连接配置

    这样就完成了对处理器之间的连接,连接完成默认每个处理器都是停止状态,我们现在可以尝试对头节点处理器进行运行。

    运行说明:

    • Start:将整个流程启动,并按照头节点设置的调度周期定时执行;

    • Run Once:只对头节点运行一次;

      示例为了展示流文件在连接队列里流转效果,选择Run Once演示,点击Run Once

      点运行后,该图标表示线程正在执行头节点处理器的执行

      头节点执行后,我们可以看到队列中有两个流文件,我们可以连接队列邮件List Queue看看

      我们可以看到队列流文件列表

      我们可以点击这个小眼睛图标,查看每个流文件的内容

      可以看到,我们流转过程中流文件的格式是avro的,我们可以选择formatted格式格式化一下

      这样就是我们查询表的数据封装到流文件里的内容了

      我们对第二个处理器执行Start,让他把连接队列里面的流文件全都进行处理

      我们可以发现,第二个处理完成后,也是将流文件输出到连接队列里,接着由下一个处理器进行处理

      我们接着对尾结点处理器执行Start,让运行去处理连接队列里的所有流文件

      我们可以看到尾结点处理完后,整个流程就执行结束了

      然后对比一下目标表前后数据变化,我们可以看到,数据已经成功从来源表同步到目标表了

    总结:

    以上演示了一种简单场景的数据流程处理场景,当然NiFi能够实现的数据处理场景时非常多样化的,具体可以结合实际的应用场景,选择不同的处理器组合跟配置进行实现,我们从示例的使用过程中,可以体会到,我们无需关系数据在处理过程中是如何转换和消费的,我们仅需通过一些基本的配置信息,即可实现数据的同步。

0条评论
0 / 1000
h****n
4文章数
1粉丝数
h****n
4 文章 | 1 粉丝
h****n
4文章数
1粉丝数
h****n
4 文章 | 1 粉丝
原创

体验NiFi数据处理流程

2023-07-31 12:58:10
86
0

使用NiFi页面,创建一个mysql到mysql的数据同步流程

  1. 创建处理器,从Processor拖拽一个处理器叫QueryDatabaseTable到画布上,并右键点击Configure进行相关配置

    可以看到这里主要有5个配置Tab,主要的作用如下:

    • SETTINGS:主要配置处理器的名称,惩罚时长,睡眠时长;

    • SCHEDULING:主要配置处理器的调度策略,这里主要有三种调度策略:(1)定时调度(2)cron调度(3)事件调度;一般的处理器都具备前两种调度策略,事件调度策略只有个别处理器才有;此外还支持配置并发任务数,在数据比较多时可以适当上调此值进行提升处理效率,调度周期,执行类型,指定在哪些节点上运行;

    • PROPERTIES:处理器的属性配置,处理器运行所需要必备的配置;

    • RELATIONSHIPS:连接关系,配置处理器运行情况后,怎么跟下一个处理器连接,比如成功时结束,或者传递给其他的处理器等;

    • COMMENTS:对处理器做一些注释说明;

  2. 处理器的属性配置

    • QueryDatabaseTable的配置(查询来源数据库表数据)

      一些关键属性的配置说明:

      • Database Connection Pooling Service:配置一个来源数据库连接池服务,用来存放数据库连接,需要配置jdbc url,驱动包路径,用户名,密码等信息

      • Database Type:数据库类型,支持所有关系型数据库,可以根据实际类型进行选择,示例这里选择MySQL

      • Table Name:查询查询的表名,填写实际要查询的表名

      • Columns to Return:指定要返回的列,不填时默认查询所有列

      • Additional WHERE clause:额外的查询条件,默认是没有任何条件,这里指定条件时,比如:xxx='xxxx'

      • Custom Query:自定义查询sql,当此配置有值时,优先使用此sql进行查询

      • Maximum-value Columns:指定增量查询的列,如配置,每次查询都会将此值最大值进行持久化到本地,下次查询时使用配置列的此值作为增量条件,一般配置主键id,实现数据的增量查询

      • Fetch Size:配置游标查询的数据量大小,建议配置,避免大数据查询时的OOM问题

      • Max Rows Per Flow File:配置每个流文件的最大数据行数,当该属性配置不为0时,每个流文件都以该值作为流文件的最大数据条数,不配置默认为0,则全部数据都输出到一个流文件里,建议此值根据实际情况进行调整,因为数据是分流到流文件中进行流转处理,因此适当的拆分数据,能够更利于提升处理速率

      • Output Batch Size:输出批次大小,数据在内存进行分批输出,当此值配置不为0时,流文件达到这个数值时,进行输出到连接队列,比如配置10时,处理器会每10个流文件就输出一次,当配置为0时,处理器会将所有的流文件全部一起输出,大数据查询时容易OOM,所以次属性也是建议配置

      整体属性配置如下:

    • JoltTransformRecord的配置(表字段映射转换)

      一些关系配置属性的说明:

      • Record Reader:数据记录读取器,选择指定数据类型的读取器,因为QueryDatabaseTable处理器默认输出的数据文件格式是avro,因此这里需要配置avro格式的读取器,也就是读写格式是要保持一致的

      • Record Writer:数据记录写入器,这里是指要将数据以什么格式进行输出流文件,示例这里选择avro

      • Jolt Specification:jolt转换表达式,主要是指定了来源表与目标表之间字段的映射,比如来源表table_a的列col_a,映射到目标表table_b的col_b,具体写法

        [
          {
               "operation": "shift",
               "spec": {
                  # 左边是来源表字段名,右边是目标表字段名
                   "id": "id",
                   "name": "name",
                   "identity": "identity",
                   "age": "age",
                   "birth_date": "birth_date"
              }
          }
        ]

        整体属性配置如下:

    • PutDatabaseRecord的配置(插入目标数据库表数据)

      一些关系配置属性的说明:

      • Record Reader:数据记录读取器,选择指定数据类型的读取器,这里配置的格式取决于上游节点传输的格式,示例流程上游节点JoltTransformRecord输出的数据文件格式是avro,因此这里需要配置avro格式的读取器,也就是读写格式是要保持一致的

      • Database Type:数据库类型,支持所有关系型数据库,可以根据实际类型进行选择,示例这里选择MySQL

      • Statement Type:sql执行类型,根据不同的数据库类型有不同的值,如:INSERT,UPSERT,UPDATE,INSERT_IGNORE,DELETE等,示例流程选择INSERT即插入

      • Database Connection Pooling Service:配置一个目标数据库连接池服务,用来存放数据库连接,需要配置jdbc url,驱动包路径,用户名,密码等信息

      • Catalog Name:数据库的Catalog

      • Schema Name:目标数据库的Schema

      • Table Name:目标表名

        整体属性配置如下:

  3. 处理器之间的连接配置

    这样就完成了对处理器之间的连接,连接完成默认每个处理器都是停止状态,我们现在可以尝试对头节点处理器进行运行。

    运行说明:

    • Start:将整个流程启动,并按照头节点设置的调度周期定时执行;

    • Run Once:只对头节点运行一次;

      示例为了展示流文件在连接队列里流转效果,选择Run Once演示,点击Run Once

      点运行后,该图标表示线程正在执行头节点处理器的执行

      头节点执行后,我们可以看到队列中有两个流文件,我们可以连接队列邮件List Queue看看

      我们可以看到队列流文件列表

      我们可以点击这个小眼睛图标,查看每个流文件的内容

      可以看到,我们流转过程中流文件的格式是avro的,我们可以选择formatted格式格式化一下

      这样就是我们查询表的数据封装到流文件里的内容了

      我们对第二个处理器执行Start,让他把连接队列里面的流文件全都进行处理

      我们可以发现,第二个处理完成后,也是将流文件输出到连接队列里,接着由下一个处理器进行处理

      我们接着对尾结点处理器执行Start,让运行去处理连接队列里的所有流文件

      我们可以看到尾结点处理完后,整个流程就执行结束了

      然后对比一下目标表前后数据变化,我们可以看到,数据已经成功从来源表同步到目标表了

    总结:

    以上演示了一种简单场景的数据流程处理场景,当然NiFi能够实现的数据处理场景时非常多样化的,具体可以结合实际的应用场景,选择不同的处理器组合跟配置进行实现,我们从示例的使用过程中,可以体会到,我们无需关系数据在处理过程中是如何转换和消费的,我们仅需通过一些基本的配置信息,即可实现数据的同步。

文章来自个人专栏
大数据技术专栏
4 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0