本文旨在介绍FlowFile属性和内容、模板和简单介绍一个应用示例。其中模板将是后续文章的主要使用内容。
本分前提是nifi环境正常使用。
本分分为三个部分,即FlowFile生成器示例、模板以及FlowFile的内容与属性。
一、FlowFile生成器示例
FlowFile生成器,GenerateFlowFile和ReplaceText处理器,用于生成数据。
1、GenerateFlowFile解析
1)、描述
该处理器使用随机数据或自定义内容创建流文件。GenerateFlowFile用于负载测试、配置和仿真。
2)、属性配置
在下面的列表中,列出属性默认值(如果有默认值),以及属性是否支持表达式语言。
3)、应用场景
该处理器多用于测试,配置生成设计人员所需要的特定数据,模拟数据来源或者压力测试、负载测试。
2、ReplaceText解析
1)、描述
使用其他值替换匹配正则表达式的流文件部分内容,从而更新流文件的内容。
2)、属性配置
在下面的列表中,列出属性默认值(如果有默认值),以及属性是否支持表达式语言。
3)、应用场景
使用正则表达式,来逐行或者全文本替换文件流内容,往往用于业务逻辑处理。
3、示例
1)、创建GenerateFlowFile并设置大小
2)、配置FlowFile
仅主节点运行,每10s运行一次,每个文件1B,每个批次5个文件
3)、创建ReplaceText并连接
4)、启动GenerateFile
启动GenerateFile处理器,看是否自动生成了数据文件
5)、配置ReplaceText
所有的内容都替换成 hello world
6)、创建PutFile
停止GenerateFlowFile,然后创建PutFile
7)、查看结果
查看主节点目录下的文件内容
[root@server1 nifioutputtest]# ll
总用量 20
-rw-r--r-- 1 root root 11 2月 7 08:09 2536169b-33eb-4d7e-a22c-0d4ecc9c2496
-rw-r--r-- 1 root root 11 2月 7 08:09 28c09c45-d33b-4381-ae81-87a04e6138c5
-rw-r--r-- 1 root root 11 2月 7 08:09 81303b0e-30f3-4aac-b569-082015a283be
-rw-r--r-- 1 root root 11 2月 7 08:09 852748a5-c6ba-4a73-bf18-8c15425edba0
-rw-r--r-- 1 root root 11 2月 7 08:09 c46d56bf-69d3-455b-9539-07b1a2b5d7ad
[root@server1 nifioutputtest]# cat c46d56bf-69d3-455b-9539-07b1a2b5d7ad
hello world
[root@server1 nifioutputtest]#
验证正常。
二、Nifi模板
1、导入模板
1)、导入
导入本模板(图文不匹配,本模板是上述例子的模板)
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.2">
<description>创建模板试一试</description>
<groupId>29e21e65-0186-1000-01d5-e5cdf061c513</groupId>
<name>test_template</name>
<snippet>
<connections>
<id>71591b1b-3df4-310b-0000-000000000000</id>
<parentGroupId>6cb373b2-097c-3113-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>6cb373b2-097c-3113-0000-000000000000</groupId>
<id>47f06192-31ad-3186-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name>gen_rep</name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>6cb373b2-097c-3113-0000-000000000000</groupId>
<id>49cfc51b-650d-3596-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>daf85806-06eb-347c-0000-000000000000</id>
<parentGroupId>6cb373b2-097c-3113-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>6cb373b2-097c-3113-0000-000000000000</groupId>
<id>f4c648ea-d396-3680-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name>rep_put</name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>6cb373b2-097c-3113-0000-000000000000</groupId>
<id>47f06192-31ad-3186-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<processors>
<id>47f06192-31ad-3186-0000-000000000000</id>
<parentGroupId>6cb373b2-097c-3113-0000-000000000000</parentGroupId>
<position>
<x>551.0</x>
<y>4.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Regular Expression</key>
<value>
<name>Regular Expression</name>
</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>
<name>Replacement Value</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>
<name>Maximum Buffer Size</name>
</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>
<name>Replacement Strategy</name>
</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>
<name>Evaluation Mode</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Regular Expression</key>
<value>(?s)(^.*$)</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>hello world</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>1 MB</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>Always Replace</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>Entire text</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>ReplaceText_Demo</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.standard.ReplaceText</type>
</processors>
<processors>
<id>49cfc51b-650d-3596-0000-000000000000</id>
<parentGroupId>6cb373b2-097c-3113-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>PRIMARY</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>1B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>5</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>20 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>GenerateFlowFile_Demo</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>f4c648ea-d396-3680-0000-000000000000</id>
<parentGroupId>6cb373b2-097c-3113-0000-000000000000</parentGroupId>
<position>
<x>1101.0</x>
<y>2.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Directory</key>
<value>
<name>Directory</name>
</value>
</entry>
<entry>
<key>Conflict Resolution Strategy</key>
<value>
<name>Conflict Resolution Strategy</name>
</value>
</entry>
<entry>
<key>Create Missing Directories</key>
<value>
<name>Create Missing Directories</name>
</value>
</entry>
<entry>
<key>Maximum File Count</key>
<value>
<name>Maximum File Count</name>
</value>
</entry>
<entry>
<key>Last Modified Time</key>
<value>
<name>Last Modified Time</name>
</value>
</entry>
<entry>
<key>Permissions</key>
<value>
<name>Permissions</name>
</value>
</entry>
<entry>
<key>Owner</key>
<value>
<name>Owner</name>
</value>
</entry>
<entry>
<key>Group</key>
<value>
<name>Group</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Directory</key>
<value>/usr/local/bigdata/testdata/nifioutputtest</value>
</entry>
<entry>
<key>Conflict Resolution Strategy</key>
<value>fail</value>
</entry>
<entry>
<key>Create Missing Directories</key>
<value>true</value>
</entry>
<entry>
<key>Maximum File Count</key>
</entry>
<entry>
<key>Last Modified Time</key>
</entry>
<entry>
<key>Permissions</key>
</entry>
<entry>
<key>Owner</key>
</entry>
<entry>
<key>Group</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>PutFile_demo</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.standard.PutFile</type>
</processors>
</snippet>
<timestamp>02/07/2023 08:15:48 GMT</timestamp>
</template>
2)、查看模板列表
3)、创建模板引用
2、组、导出模板
1)、创建组
2)、移动处理器到组
3)、进入与退出组
4)、创建模板
创建模板就是针对当前画布创建的模板,也即该模板包含当前画布的所有操作。
5)、下载模板
6)、嵌套组
三、FlowFile内容和属性
1、FlowFile属性及操作
1)、FlowFile包含两部分
- 属性
FlowFile的元数据.保存了关于内容的信息,比如:什么时候创建的,从哪里来,代表什么等等 - 内容
FlowFile的实际内容。比如:使用getfile读取数据时,读取到的实际内容文本。
2)、处理器对FlowFile的操作
更新,添加,或者删除FlowFile属性。改变FlowFile内容。
2、处理器
1)、ExtractText
- 描述
该处理器使用正则表达式,匹配流文件中的内容,并将匹配成功的内容输出到属性中;如果正则匹配到多个结果,默认只取第一个结果;匹配成功则流文件路由matched,没有匹配则到unmatched; - 属性配置
在下面的列表中,列出属性默认值(如果有默认值),以及属性是否支持表达式语言。 - 应用场景
提取content中的内容,输出到流属性当中
2)、UpdateAttribute
- 描述
该处理器使用属性表达式语言更新流文件的属性,并且/或则基于正则表达式删除属性 - 属性配置
在下面的列表中,列出属性默认值(如果有默认值),以及属性是否支持表达式语言 - 应用场景
该处理器基本用法最为常用,增加,修改或删除流属性;
此处理器使用用户添加的属性或规则更新FlowFile的属性。有三种方法可以使用此处理器添加或修改属性。
1、基本用法: 默认更改通过处理器的每个FlowFile的匹配的属性
2、高级用法:可以进行条件属性更改,只有在满足特定条件时才会影响FlowFile。可以在同一处理器中同时使用这两种方法
3、删除属性表达式: 允许提供正则表达式,并且将删除匹配的任何属性
“删除属性表达式”将取代发生的任何更新。如果现有属性与“删除属性表达式”匹配,则无论是否更新,都将删除该属性。“删除属性表达式”仅适用于输入FlowFile中存在的属性,如果属性是由此处理器添加的,则“删除属性表达式”将不会匹配到它。
3、示例
1)、改为单点造数据
2)、负载均衡消费数据
3)、查看队列数据
4)、通过ExtractText将数据写入属性
查看输出数据属性
以上完成了FlowFile的示例及模板的示例。更多信息请关注该专栏的其他文章。