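I. Implementation Process
1. Templates
1) Template 1
This template may throw exceptions (see the notes in the verification section); results may differ between environments.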
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.2">
<description>Imports data from MySQL into HDFS using LZO compression.
The imported data contains duplicates.</description>
<groupId>2f7d3766-0186-1000-0000-00006e07b64a</groupId>
<name>MysqlToHDFSByLzo</name>
<snippet>
<connections>
<id>8bacaebe-bce0-31e8-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>4cb1eb1d-ca3a-34e0-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>COMPRESS_ATTRIBUTES_AND_CONTENT</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_INACTIVE</loadBalanceStatus>
<loadBalanceStrategy>ROUND_ROBIN</loadBalanceStrategy>
<name>Q_C</name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>c16280cc-6d1d-355c-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>ce7dcdb2-bcd9-38a8-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>26c8401a-8807-3771-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>COMPRESS_ATTRIBUTES_AND_CONTENT</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_INACTIVE</loadBalanceStatus>
<loadBalanceStrategy>ROUND_ROBIN</loadBalanceStrategy>
<name>S_P</name>
<selectedRelationships>split</selectedRelationships>
<source>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>20f76bcb-e978-3263-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f5322759-8583-3753-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>20f76bcb-e978-3263-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name>C_S</name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>4cb1eb1d-ca3a-34e0-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<controllerServices>
<id>55bee1a0-0b0c-3a63-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-dbcp-service-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Database Connection URL</key>
<value>
<name>Database Connection URL</name>
</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>
<name>Database Driver Class Name</name>
</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>
<name>database-driver-locations</name>
</value>
</entry>
<entry>
<key>kerberos-credentials-service</key>
<value>
<identifiesControllerService>org.apache.nifi.kerberos.KerberosCredentialsService</identifiesControllerService>
<name>kerberos-credentials-service</name>
</value>
</entry>
<entry>
<key>Database User</key>
<value>
<name>Database User</name>
</value>
</entry>
<entry>
<key>Password</key>
<value>
<name>Password</name>
</value>
</entry>
<entry>
<key>Max Wait Time</key>
<value>
<name>Max Wait Time</name>
</value>
</entry>
<entry>
<key>Max Total Connections</key>
<value>
<name>Max Total Connections</name>
</value>
</entry>
<entry>
<key>Validation-query</key>
<value>
<name>Validation-query</name>
</value>
</entry>
<entry>
<key>dbcp-min-idle-conns</key>
<value>
<name>dbcp-min-idle-conns</name>
</value>
</entry>
<entry>
<key>dbcp-max-idle-conns</key>
<value>
<name>dbcp-max-idle-conns</name>
</value>
</entry>
<entry>
<key>dbcp-max-conn-lifetime</key>
<value>
<name>dbcp-max-conn-lifetime</name>
</value>
</entry>
<entry>
<key>dbcp-time-between-eviction-runs</key>
<value>
<name>dbcp-time-between-eviction-runs</name>
</value>
</entry>
<entry>
<key>dbcp-min-evictable-idle-time</key>
<value>
<name>dbcp-min-evictable-idle-time</name>
</value>
</entry>
<entry>
<key>dbcp-soft-min-evictable-idle-time</key>
<value>
<name>dbcp-soft-min-evictable-idle-time</name>
</value>
</entry>
</descriptors>
<name>MySQL_ConnectionPool</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Database Connection URL</key>
<value>jdbc:mysql://192.168.10.44:3306/test?characterEncoding=UTF-8&amp;useSSL=false&amp;allowPublicKeyRetrieval=true</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>com.mysql.jdbc.Driver</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>/usr/local/bigdata/testdata/mysql-connector-java-5.1.44.jar</value>
</entry>
<entry>
<key>kerberos-credentials-service</key>
</entry>
<entry>
<key>Database User</key>
<value>root</value>
</entry>
<entry>
<key>Password</key>
</entry>
<entry>
<key>Max Wait Time</key>
</entry>
<entry>
<key>Max Total Connections</key>
</entry>
<entry>
<key>Validation-query</key>
</entry>
<entry>
<key>dbcp-min-idle-conns</key>
</entry>
<entry>
<key>dbcp-max-idle-conns</key>
</entry>
<entry>
<key>dbcp-max-conn-lifetime</key>
</entry>
<entry>
<key>dbcp-time-between-eviction-runs</key>
</entry>
<entry>
<key>dbcp-min-evictable-idle-time</key>
</entry>
<entry>
<key>dbcp-soft-min-evictable-idle-time</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.dbcp.DBCPConnectionPool</type>
</controllerServices>
<processors>
<id>20f76bcb-e978-3263-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<position>
<x>4.0</x>
<y>413.5</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>JsonPath Expression</key>
<value>
<name>JsonPath Expression</name>
</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>
<name>Null Value Representation</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>JsonPath Expression</key>
<value>$.*</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>empty string</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>SplitJson_Demo</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>original</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>split</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.standard.SplitJson</type>
</processors>
<processors>
<id>26c8401a-8807-3771-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<position>
<x>3.0</x>
<y>624.5</y>
</position>
<bundle>
<artifact>nifi-hadoop-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Hadoop Configuration Resources</key>
<value>
<name>Hadoop Configuration Resources</name>
</value>
</entry>
<entry>
<key>kerberos-credentials-service</key>
<value>
<identifiesControllerService>org.apache.nifi.kerberos.KerberosCredentialsService</identifiesControllerService>
<name>kerberos-credentials-service</name>
</value>
</entry>
<entry>
<key>Kerberos Principal</key>
<value>
<name>Kerberos Principal</name>
</value>
</entry>
<entry>
<key>Kerberos Keytab</key>
<value>
<name>Kerberos Keytab</name>
</value>
</entry>
<entry>
<key>Kerberos Relogin Period</key>
<value>
<name>Kerberos Relogin Period</name>
</value>
</entry>
<entry>
<key>Additional Classpath Resources</key>
<value>
<name>Additional Classpath Resources</name>
</value>
</entry>
<entry>
<key>Directory</key>
<value>
<name>Directory</name>
</value>
</entry>
<entry>
<key>Conflict Resolution Strategy</key>
<value>
<name>Conflict Resolution Strategy</name>
</value>
</entry>
<entry>
<key>Block Size</key>
<value>
<name>Block Size</name>
</value>
</entry>
<entry>
<key>IO Buffer Size</key>
<value>
<name>IO Buffer Size</name>
</value>
</entry>
<entry>
<key>Replication</key>
<value>
<name>Replication</name>
</value>
</entry>
<entry>
<key>Permissions umask</key>
<value>
<name>Permissions umask</name>
</value>
</entry>
<entry>
<key>Remote Owner</key>
<value>
<name>Remote Owner</name>
</value>
</entry>
<entry>
<key>Remote Group</key>
<value>
<name>Remote Group</name>
</value>
</entry>
<entry>
<key>Compression codec</key>
<value>
<name>Compression codec</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Hadoop Configuration Resources</key>
<value>/usr/local/bigdata/hadoop-3.1.4/etc/hadoop/hdfs-site.xml,/usr/local/bigdata/hadoop-3.1.4/etc/hadoop/core-site.xml</value>
</entry>
<entry>
<key>kerberos-credentials-service</key>
</entry>
<entry>
<key>Kerberos Principal</key>
</entry>
<entry>
<key>Kerberos Keytab</key>
</entry>
<entry>
<key>Kerberos Relogin Period</key>
<value>4 hours</value>
</entry>
<entry>
<key>Additional Classpath Resources</key>
<value>/usr/local/bigdata/testdata/hadoop-lzo-0.4.21-SNAPSHOT.jar</value>
</entry>
<entry>
<key>Directory</key>
<value>/user/hive/warehouse/test.db/user</value>
</entry>
<entry>
<key>Conflict Resolution Strategy</key>
<value>append</value>
</entry>
<entry>
<key>Block Size</key>
</entry>
<entry>
<key>IO Buffer Size</key>
</entry>
<entry>
<key>Replication</key>
</entry>
<entry>
<key>Permissions umask</key>
</entry>
<entry>
<key>Remote Owner</key>
</entry>
<entry>
<key>Remote Group</key>
</entry>
<entry>
<key>Compression codec</key>
<value>LZO</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>PutHDFS_Demo</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.hadoop.PutHDFS</type>
</processors>
<processors>
<id>4cb1eb1d-ca3a-34e0-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>206.5</y>
</position>
<bundle>
<artifact>nifi-avro-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>JSON container options</key>
<value>
<name>JSON container options</name>
</value>
</entry>
<entry>
<key>Wrap Single Record</key>
<value>
<name>Wrap Single Record</name>
</value>
</entry>
<entry>
<key>Avro schema</key>
<value>
<name>Avro schema</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>JSON container options</key>
<value>array</value>
</entry>
<entry>
<key>Wrap Single Record</key>
<value>true</value>
</entry>
<entry>
<key>Avro schema</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>ConvertAvroToJSON_Demo</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.avro.ConvertAvroToJSON</type>
</processors>
<processors>
<id>c16280cc-6d1d-355c-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<position>
<x>9.0</x>
<y>0.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Database Connection Pooling Service</key>
<value>
<identifiesControllerService>org.apache.nifi.dbcp.DBCPService</identifiesControllerService>
<name>Database Connection Pooling Service</name>
</value>
</entry>
<entry>
<key>db-fetch-db-type</key>
<value>
<name>db-fetch-db-type</name>
</value>
</entry>
<entry>
<key>Table Name</key>
<value>
<name>Table Name</name>
</value>
</entry>
<entry>
<key>Columns to Return</key>
<value>
<name>Columns to Return</name>
</value>
</entry>
<entry>
<key>db-fetch-where-clause</key>
<value>
<name>db-fetch-where-clause</name>
</value>
</entry>
<entry>
<key>db-fetch-sql-query</key>
<value>
<name>db-fetch-sql-query</name>
</value>
</entry>
<entry>
<key>Maximum-value Columns</key>
<value>
<name>Maximum-value Columns</name>
</value>
</entry>
<entry>
<key>Max Wait Time</key>
<value>
<name>Max Wait Time</name>
</value>
</entry>
<entry>
<key>Fetch Size</key>
<value>
<name>Fetch Size</name>
</value>
</entry>
<entry>
<key>qdbt-max-rows</key>
<value>
<name>qdbt-max-rows</name>
</value>
</entry>
<entry>
<key>qdbt-output-batch-size</key>
<value>
<name>qdbt-output-batch-size</name>
</value>
</entry>
<entry>
<key>qdbt-max-frags</key>
<value>
<name>qdbt-max-frags</name>
</value>
</entry>
<entry>
<key>dbf-normalize</key>
<value>
<name>dbf-normalize</name>
</value>
</entry>
<entry>
<key>transaction-isolation-level</key>
<value>
<name>transaction-isolation-level</name>
</value>
</entry>
<entry>
<key>dbf-user-logical-types</key>
<value>
<name>dbf-user-logical-types</name>
</value>
</entry>
<entry>
<key>dbf-default-precision</key>
<value>
<name>dbf-default-precision</name>
</value>
</entry>
<entry>
<key>dbf-default-scale</key>
<value>
<name>dbf-default-scale</name>
</value>
</entry>
</descriptors>
<executionNode>PRIMARY</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Database Connection Pooling Service</key>
<value>55bee1a0-0b0c-3a63-0000-000000000000</value>
</entry>
<entry>
<key>db-fetch-db-type</key>
<value>MySQL</value>
</entry>
<entry>
<key>Table Name</key>
<value>user</value>
</entry>
<entry>
<key>Columns to Return</key>
</entry>
<entry>
<key>db-fetch-where-clause</key>
</entry>
<entry>
<key>db-fetch-sql-query</key>
<value>select * from user</value>
</entry>
<entry>
<key>Maximum-value Columns</key>
</entry>
<entry>
<key>Max Wait Time</key>
<value>0 seconds</value>
</entry>
<entry>
<key>Fetch Size</key>
<value>0</value>
</entry>
<entry>
<key>qdbt-max-rows</key>
<value>0</value>
</entry>
<entry>
<key>qdbt-output-batch-size</key>
<value>0</value>
</entry>
<entry>
<key>qdbt-max-frags</key>
<value>0</value>
</entry>
<entry>
<key>dbf-normalize</key>
<value>false</value>
</entry>
<entry>
<key>transaction-isolation-level</key>
</entry>
<entry>
<key>dbf-user-logical-types</key>
<value>false</value>
</entry>
<entry>
<key>dbf-default-precision</key>
<value>10</value>
</entry>
<entry>
<key>dbf-default-scale</key>
<value>0</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>true</executionNodeRestricted>
<name>QueryDatabaseTable_demo</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.standard.QueryDatabaseTable</type>
</processors>
</snippet>
<timestamp>02/08/2023 08:45:41 GMT</timestamp>
</template>
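Because the template is long, it can help to extract only the processor chain from it. The following is a minimal Python sketch, not part of the original flow: it assumes the exported template is well-formed XML (so any `&` in the JDBC URL is written as `&amp;`) and that the flow is linear. `SAMPLE` is a trimmed excerpt that keeps only the ids, names and relationships from Template 1; the helper names `list_edges` and `ordered_flow` are hypothetical.

```python
import xml.etree.ElementTree as ET

# Trimmed excerpt of Template 1: only the fields this sketch reads are kept.
SAMPLE = """<template encoding-version="1.2">
<name>MysqlToHDFSByLzo</name>
<snippet>
<connections>
<destination><id>4cb1eb1d-ca3a-34e0-0000-000000000000</id></destination>
<name>Q_C</name>
<selectedRelationships>success</selectedRelationships>
<source><id>c16280cc-6d1d-355c-0000-000000000000</id></source>
</connections>
<connections>
<destination><id>26c8401a-8807-3771-0000-000000000000</id></destination>
<name>S_P</name>
<selectedRelationships>split</selectedRelationships>
<source><id>20f76bcb-e978-3263-0000-000000000000</id></source>
</connections>
<connections>
<destination><id>20f76bcb-e978-3263-0000-000000000000</id></destination>
<name>C_S</name>
<selectedRelationships>success</selectedRelationships>
<source><id>4cb1eb1d-ca3a-34e0-0000-000000000000</id></source>
</connections>
<processors><id>20f76bcb-e978-3263-0000-000000000000</id><name>SplitJson_Demo</name></processors>
<processors><id>26c8401a-8807-3771-0000-000000000000</id><name>PutHDFS_Demo</name></processors>
<processors><id>4cb1eb1d-ca3a-34e0-0000-000000000000</id><name>ConvertAvroToJSON_Demo</name></processors>
<processors><id>c16280cc-6d1d-355c-0000-000000000000</id><name>QueryDatabaseTable_demo</name></processors>
</snippet>
</template>"""


def list_edges(template_xml):
    """Return (source name, relationship, destination name) for each connection."""
    snippet = ET.fromstring(template_xml).find("snippet")
    # Map processor id -> processor name so connections can be shown by name.
    names = {p.findtext("id"): p.findtext("name") for p in snippet.findall("processors")}
    edges = []
    for conn in snippet.findall("connections"):
        src = names[conn.findtext("source/id")]
        dst = names[conn.findtext("destination/id")]
        edges.append((src, conn.findtext("selectedRelationships"), dst))
    return edges


def ordered_flow(edges):
    """Order processor names from first to last (assumes a single linear chain)."""
    nxt = {src: dst for src, _, dst in edges}
    # The start is the only processor that never appears as a destination.
    start = (set(nxt) - set(nxt.values())).pop()
    path = [start]
    while path[-1] in nxt:
        path.append(nxt[path[-1]])
    return path


if __name__ == "__main__":
    print(" -> ".join(ordered_flow(list_edges(SAMPLE))))
```

For the excerpt above, the chain is QueryDatabaseTable_demo, ConvertAvroToJSON_Demo, SplitJson_Demo, PutHDFS_Demo, which matches the connection names Q_C, C_S and S_P. A flow with branches (for example several failure relationships routed to one processor) would need a graph traversal instead of the simple chain walk used here.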
2) Template 2
This version adds a ControlRate processor and a logging processor (LogAttribute); no exceptions were observed in testing.
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.2">
<description></description>
<groupId>2f7d3766-0186-1000-0000-00006e07b64a</groupId>
<name>MysqlToHDFSByLzo2</name>
<snippet>
<connections>
<id>25c778c6-63df-3672-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>203e8481-e4c7-3340-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>4cb1eb1d-ca3a-34e0-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>59e154ce-8ca9-329f-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>26c8401a-8807-3771-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>COMPRESS_ATTRIBUTES_AND_CONTENT</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_INACTIVE</loadBalanceStatus>
<loadBalanceStrategy>ROUND_ROBIN</loadBalanceStrategy>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>1b9fd194-4cdb-369f-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>60539d1e-e7f5-396c-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>203e8481-e4c7-3340-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>26c8401a-8807-3771-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>6e3859ca-2a0d-3560-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>1b9fd194-4cdb-369f-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>COMPRESS_ATTRIBUTES_AND_CONTENT</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_INACTIVE</loadBalanceStatus>
<loadBalanceStrategy>ROUND_ROBIN</loadBalanceStrategy>
<name>S_C</name>
<selectedRelationships>split</selectedRelationships>
<source>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>20f76bcb-e978-3263-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>7b343e88-ab1a-30ee-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>203e8481-e4c7-3340-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>c16280cc-6d1d-355c-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>8bacaebe-bce0-31e8-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>4cb1eb1d-ca3a-34e0-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>COMPRESS_ATTRIBUTES_AND_CONTENT</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_INACTIVE</loadBalanceStatus>
<loadBalanceStrategy>ROUND_ROBIN</loadBalanceStrategy>
<name>Q_C</name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>c16280cc-6d1d-355c-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>ee0fcd22-6c7c-3edc-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>203e8481-e4c7-3340-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>COMPRESS_ATTRIBUTES_AND_CONTENT</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_INACTIVE</loadBalanceStatus>
<loadBalanceStrategy>ROUND_ROBIN</loadBalanceStrategy>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>1b9fd194-4cdb-369f-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f4577d45-be28-3c83-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>203e8481-e4c7-3340-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>20f76bcb-e978-3263-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f5322759-8583-3753-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>20f76bcb-e978-3263-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>COMPRESS_ATTRIBUTES_AND_CONTENT</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_INACTIVE</loadBalanceStatus>
<loadBalanceStrategy>ROUND_ROBIN</loadBalanceStrategy>
<name>C_S</name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>60d38136-211b-3d16-0000-000000000000</groupId>
<id>4cb1eb1d-ca3a-34e0-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<controllerServices>
<id>55bee1a0-0b0c-3a63-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-dbcp-service-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Database Connection URL</key>
<value>
<name>Database Connection URL</name>
</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>
<name>Database Driver Class Name</name>
</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>
<name>database-driver-locations</name>
</value>
</entry>
<entry>
<key>kerberos-credentials-service</key>
<value>
<identifiesControllerService>org.apache.nifi.kerberos.KerberosCredentialsService</identifiesControllerService>
<name>kerberos-credentials-service</name>
</value>
</entry>
<entry>
<key>Database User</key>
<value>
<name>Database User</name>
</value>
</entry>
<entry>
<key>Password</key>
<value>
<name>Password</name>
</value>
</entry>
<entry>
<key>Max Wait Time</key>
<value>
<name>Max Wait Time</name>
</value>
</entry>
<entry>
<key>Max Total Connections</key>
<value>
<name>Max Total Connections</name>
</value>
</entry>
<entry>
<key>Validation-query</key>
<value>
<name>Validation-query</name>
</value>
</entry>
<entry>
<key>dbcp-min-idle-conns</key>
<value>
<name>dbcp-min-idle-conns</name>
</value>
</entry>
<entry>
<key>dbcp-max-idle-conns</key>
<value>
<name>dbcp-max-idle-conns</name>
</value>
</entry>
<entry>
<key>dbcp-max-conn-lifetime</key>
<value>
<name>dbcp-max-conn-lifetime</name>
</value>
</entry>
<entry>
<key>dbcp-time-between-eviction-runs</key>
<value>
<name>dbcp-time-between-eviction-runs</name>
</value>
</entry>
<entry>
<key>dbcp-min-evictable-idle-time</key>
<value>
<name>dbcp-min-evictable-idle-time</name>
</value>
</entry>
<entry>
<key>dbcp-soft-min-evictable-idle-time</key>
<value>
<name>dbcp-soft-min-evictable-idle-time</name>
</value>
</entry>
</descriptors>
<name>MySQL_ConnectionPool</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Database Connection URL</key>
<value>jdbc:mysql://192.168.10.44:3306/test?characterEncoding=UTF-8&amp;useSSL=false&amp;allowPublicKeyRetrieval=true</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>com.mysql.jdbc.Driver</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>/usr/local/bigdata/testdata/mysql-connector-java-5.1.44.jar</value>
</entry>
<entry>
<key>kerberos-credentials-service</key>
</entry>
<entry>
<key>Database User</key>
<value>root</value>
</entry>
<entry>
<key>Password</key>
</entry>
<entry>
<key>Max Wait Time</key>
<value>500 millis</value>
</entry>
<entry>
<key>Max Total Connections</key>
<value>8</value>
</entry>
<entry>
<key>Validation-query</key>
</entry>
<entry>
<key>dbcp-min-idle-conns</key>
<value>0</value>
</entry>
<entry>
<key>dbcp-max-idle-conns</key>
<value>8</value>
</entry>
<entry>
<key>dbcp-max-conn-lifetime</key>
<value>-1</value>
</entry>
<entry>
<key>dbcp-time-between-eviction-runs</key>
<value>-1</value>
</entry>
<entry>
<key>dbcp-min-evictable-idle-time</key>
<value>30 mins</value>
</entry>
<entry>
<key>dbcp-soft-min-evictable-idle-time</key>
<value>-1</value>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.dbcp.DBCPConnectionPool</type>
</controllerServices>
<processors>
<id>1b9fd194-4cdb-369f-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<position>
<x>2.974225266934127</x>
<y>627.7810694387299</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Rate Control Criteria</key>
<value>
<name>Rate Control Criteria</name>
</value>
</entry>
<entry>
<key>Maximum Rate</key>
<value>
<name>Maximum Rate</name>
</value>
</entry>
<entry>
<key>Rate Controlled Attribute</key>
<value>
<name>Rate Controlled Attribute</name>
</value>
</entry>
<entry>
<key>Time Duration</key>
<value>
<name>Time Duration</name>
</value>
</entry>
<entry>
<key>Grouping Attribute</key>
<value>
<name>Grouping Attribute</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Rate Control Criteria</key>
<value>flowfile count</value>
</entry>
<entry>
<key>Maximum Rate</key>
<value>100000</value>
</entry>
<entry>
<key>Rate Controlled Attribute</key>
</entry>
<entry>
<key>Time Duration</key>
<value>1 min</value>
</entry>
<entry>
<key>Grouping Attribute</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>ControlRate_demo</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.standard.ControlRate</type>
</processors>
<processors>
<id>203e8481-e4c7-3340-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<position>
<x>712.1617915050342</x>
<y>435.16275513999926</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Log Level</key>
<value>
<name>Log Level</name>
</value>
</entry>
<entry>
<key>Log Payload</key>
<value>
<name>Log Payload</name>
</value>
</entry>
<entry>
<key>Attributes to Log</key>
<value>
<name>Attributes to Log</name>
</value>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>
<name>attributes-to-log-regex</name>
</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
<value>
<name>Attributes to Ignore</name>
</value>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
<value>
<name>attributes-to-ignore-regex</name>
</value>
</entry>
<entry>
<key>Log prefix</key>
<value>
<name>Log prefix</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Log Level</key>
<value>info</value>
</entry>
<entry>
<key>Log Payload</key>
<value>false</value>
</entry>
<entry>
<key>Attributes to Log</key>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>.*</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
</entry>
<entry>
<key>Log prefix</key>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>LogAttribute——demo</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
<processors>
<id>20f76bcb-e978-3263-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<position>
<x>1.783660888671875</x>
<y>408.520751953125</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>3</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>JsonPath Expression</key>
<value>
<name>JsonPath Expression</name>
</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>
<name>Null Value Representation</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>JsonPath Expression</key>
<value>$.*</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>empty string</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>SplitJson_Demo</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>original</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>split</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.standard.SplitJson</type>
</processors>
<processors>
<id>26c8401a-8807-3771-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>825.9684448242188</y>
</position>
<bundle>
<artifact>nifi-hadoop-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>3</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Hadoop Configuration Resources</key>
<value>
<name>Hadoop Configuration Resources</name>
</value>
</entry>
<entry>
<key>kerberos-credentials-service</key>
<value>
<identifiesControllerService>org.apache.nifi.kerberos.KerberosCredentialsService</identifiesControllerService>
<name>kerberos-credentials-service</name>
</value>
</entry>
<entry>
<key>Kerberos Principal</key>
<value>
<name>Kerberos Principal</name>
</value>
</entry>
<entry>
<key>Kerberos Keytab</key>
<value>
<name>Kerberos Keytab</name>
</value>
</entry>
<entry>
<key>Kerberos Relogin Period</key>
<value>
<name>Kerberos Relogin Period</name>
</value>
</entry>
<entry>
<key>Additional Classpath Resources</key>
<value>
<name>Additional Classpath Resources</name>
</value>
</entry>
<entry>
<key>Directory</key>
<value>
<name>Directory</name>
</value>
</entry>
<entry>
<key>Conflict Resolution Strategy</key>
<value>
<name>Conflict Resolution Strategy</name>
</value>
</entry>
<entry>
<key>Block Size</key>
<value>
<name>Block Size</name>
</value>
</entry>
<entry>
<key>IO Buffer Size</key>
<value>
<name>IO Buffer Size</name>
</value>
</entry>
<entry>
<key>Replication</key>
<value>
<name>Replication</name>
</value>
</entry>
<entry>
<key>Permissions umask</key>
<value>
<name>Permissions umask</name>
</value>
</entry>
<entry>
<key>Remote Owner</key>
<value>
<name>Remote Owner</name>
</value>
</entry>
<entry>
<key>Remote Group</key>
<value>
<name>Remote Group</name>
</value>
</entry>
<entry>
<key>Compression codec</key>
<value>
<name>Compression codec</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Hadoop Configuration Resources</key>
<value>/usr/local/bigdata/hadoop-3.1.4/etc/hadoop/hdfs-site.xml,/usr/local/bigdata/hadoop-3.1.4/etc/hadoop/core-site.xml</value>
</entry>
<entry>
<key>kerberos-credentials-service</key>
</entry>
<entry>
<key>Kerberos Principal</key>
</entry>
<entry>
<key>Kerberos Keytab</key>
</entry>
<entry>
<key>Kerberos Relogin Period</key>
<value>4 hours</value>
</entry>
<entry>
<key>Additional Classpath Resources</key>
<value>/usr/local/bigdata/testdata/hadoop-lzo-0.4.21-SNAPSHOT.jar</value>
</entry>
<entry>
<key>Directory</key>
<value>/user/hive/warehouse/test.db/testuser</value>
</entry>
<entry>
<key>Conflict Resolution Strategy</key>
<value>append</value>
</entry>
<entry>
<key>Block Size</key>
</entry>
<entry>
<key>IO Buffer Size</key>
</entry>
<entry>
<key>Replication</key>
</entry>
<entry>
<key>Permissions umask</key>
</entry>
<entry>
<key>Remote Owner</key>
</entry>
<entry>
<key>Remote Group</key>
</entry>
<entry>
<key>Compression codec</key>
<value>LZO</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>PutHDFS_Demo</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.hadoop.PutHDFS</type>
</processors>
<processors>
<id>4cb1eb1d-ca3a-34e0-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<position>
<x>5.04095458984375</x>
<y>203.5</y>
</position>
<bundle>
<artifact>nifi-avro-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>JSON container options</key>
<value>
<name>JSON container options</name>
</value>
</entry>
<entry>
<key>Wrap Single Record</key>
<value>
<name>Wrap Single Record</name>
</value>
</entry>
<entry>
<key>Avro schema</key>
<value>
<name>Avro schema</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>JSON container options</key>
<value>array</value>
</entry>
<entry>
<key>Wrap Single Record</key>
<value>true</value>
</entry>
<entry>
<key>Avro schema</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>ConvertAvroToJSON_Demo</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.avro.ConvertAvroToJSON</type>
</processors>
<processors>
<id>c16280cc-6d1d-355c-0000-000000000000</id>
<parentGroupId>60d38136-211b-3d16-0000-000000000000</parentGroupId>
<position>
<x>4.04095458984375</x>
<y>0.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Database Connection Pooling Service</key>
<value>
<identifiesControllerService>org.apache.nifi.dbcp.DBCPService</identifiesControllerService>
<name>Database Connection Pooling Service</name>
</value>
</entry>
<entry>
<key>db-fetch-db-type</key>
<value>
<name>db-fetch-db-type</name>
</value>
</entry>
<entry>
<key>Table Name</key>
<value>
<name>Table Name</name>
</value>
</entry>
<entry>
<key>Columns to Return</key>
<value>
<name>Columns to Return</name>
</value>
</entry>
<entry>
<key>db-fetch-where-clause</key>
<value>
<name>db-fetch-where-clause</name>
</value>
</entry>
<entry>
<key>db-fetch-sql-query</key>
<value>
<name>db-fetch-sql-query</name>
</value>
</entry>
<entry>
<key>Maximum-value Columns</key>
<value>
<name>Maximum-value Columns</name>
</value>
</entry>
<entry>
<key>Max Wait Time</key>
<value>
<name>Max Wait Time</name>
</value>
</entry>
<entry>
<key>Fetch Size</key>
<value>
<name>Fetch Size</name>
</value>
</entry>
<entry>
<key>qdbt-max-rows</key>
<value>
<name>qdbt-max-rows</name>
</value>
</entry>
<entry>
<key>qdbt-output-batch-size</key>
<value>
<name>qdbt-output-batch-size</name>
</value>
</entry>
<entry>
<key>qdbt-max-frags</key>
<value>
<name>qdbt-max-frags</name>
</value>
</entry>
<entry>
<key>dbf-normalize</key>
<value>
<name>dbf-normalize</name>
</value>
</entry>
<entry>
<key>transaction-isolation-level</key>
<value>
<name>transaction-isolation-level</name>
</value>
</entry>
<entry>
<key>dbf-user-logical-types</key>
<value>
<name>dbf-user-logical-types</name>
</value>
</entry>
<entry>
<key>dbf-default-precision</key>
<value>
<name>dbf-default-precision</name>
</value>
</entry>
<entry>
<key>dbf-default-scale</key>
<value>
<name>dbf-default-scale</name>
</value>
</entry>
</descriptors>
<executionNode>PRIMARY</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Database Connection Pooling Service</key>
<value>55bee1a0-0b0c-3a63-0000-000000000000</value>
</entry>
<entry>
<key>db-fetch-db-type</key>
<value>MySQL</value>
</entry>
<entry>
<key>Table Name</key>
<value>dx_user</value>
</entry>
<entry>
<key>Columns to Return</key>
</entry>
<entry>
<key>db-fetch-where-clause</key>
</entry>
<entry>
<key>db-fetch-sql-query</key>
<value>select * from dx_user </value>
</entry>
<entry>
<key>Maximum-value Columns</key>
</entry>
<entry>
<key>Max Wait Time</key>
<value>0 seconds</value>
</entry>
<entry>
<key>Fetch Size</key>
<value>0</value>
</entry>
<entry>
<key>qdbt-max-rows</key>
<value>0</value>
</entry>
<entry>
<key>qdbt-output-batch-size</key>
<value>0</value>
</entry>
<entry>
<key>qdbt-max-frags</key>
<value>0</value>
</entry>
<entry>
<key>dbf-normalize</key>
<value>false</value>
</entry>
<entry>
<key>transaction-isolation-level</key>
</entry>
<entry>
<key>dbf-user-logical-types</key>
<value>false</value>
</entry>
<entry>
<key>dbf-default-precision</key>
<value>10</value>
</entry>
<entry>
<key>dbf-default-scale</key>
<value>0</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>86400 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>true</executionNodeRestricted>
<name>QueryDatabaseTable_demo</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.standard.QueryDatabaseTable</type>
</processors>
</snippet>
<timestamp>02/09/2023 05:48:36 GMT</timestamp>
</template>
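2. Processor flows
1) Template 1 flow
QueryDatabaseTable -> ConvertAvroToJSON -> SplitJson -> PutHDFS
- QueryDatabaseTable reads the data from MySQL.
- ConvertAvroToJSON converts the data into human-readable JSON.
- SplitJson splits the result into individual objects.
- PutHDFS writes all objects to HDFS.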
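2) Template 2 flow
QueryDatabaseTable -> ConvertAvroToJSON -> SplitJson -> ControlRate -> PutHDFS
- QueryDatabaseTable reads the data from MySQL.
- ConvertAvroToJSON converts the data into human-readable JSON.
- SplitJson splits the result into individual objects.
- ControlRate limits the rate at which flow files are passed on to PutHDFS.
- PutHDFS writes all objects to HDFS.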
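II. Processor descriptions
This section describes the processors used in this example.
1. QueryDatabaseTable
1) Description
Generates a SQL select query, or uses a provided statement, and executes it to fetch all rows whose values in the specified Maximum-value Columns are larger than the previously seen maximum. The query result is converted to Avro format. Several properties support Expression Language, but incoming connections are not allowed. The Variable Registry can be used to supply values for any property that contains Expression Language. If flow file attributes are needed to run these queries, the GenerateTableFetch and/or ExecuteSQL processors can be used for that purpose. Streaming is used, so arbitrarily large result sets are supported. The processor can be scheduled to run on a timer or a cron expression using the standard scheduling methods. It is intended to run on the Primary Node only.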
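The "maximum value" behaviour described above can be sketched in a few lines of Python. This is only an illustration of the idea, not NiFi's code: it uses an in-memory SQLite database instead of MySQL, the `dx_user` columns (`id`, `name`) are assumed for the example, and the `state` dictionary is a hypothetical stand-in for the processor's stored state. Note that the template leaves Maximum-value Columns empty, in which case every run considers all rows of the table.

```python
import sqlite3


def fetch_new_rows(conn, table, max_col, state):
    """Return rows whose max_col is greater than the largest value seen so far.

    `state` is a plain dict standing in for the processor's stored state
    (an assumption made for this sketch). Table and column names are
    trusted here and interpolated directly into the SQL text.
    """
    last_seen = state.get(max_col)
    if last_seen is None:
        # First run: nothing has been seen yet, so fetch every row.
        cursor = conn.execute(f"SELECT * FROM {table} ORDER BY {max_col}")
    else:
        cursor = conn.execute(
            f"SELECT * FROM {table} WHERE {max_col} > ? ORDER BY {max_col}",
            (last_seen,),
        )
    columns = [d[0] for d in cursor.description]
    rows = [dict(zip(columns, r)) for r in cursor.fetchall()]
    if rows:
        # Rows are ordered by max_col, so the last row holds the new maximum.
        state[max_col] = rows[-1][max_col]
    return rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dx_user (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO dx_user VALUES (?, ?)", [(1, "a"), (2, "b")])

state = {}
first_run = fetch_new_rows(conn, "dx_user", "id", state)   # both rows
second_run = fetch_new_rows(conn, "dx_user", "id", state)  # nothing new
conn.execute("INSERT INTO dx_user VALUES (3, 'c')")
third_run = fetch_new_rows(conn, "dx_user", "id", state)   # only the new row
```

Tracking a maximum-value column in this way is what lets repeated runs skip rows that were already fetched.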
2) Property configuration
The list below gives every default value and whether the property supports the NiFi Expression Language.
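2. ConvertAvroToJSON
1) Description
Converts binary Avro records into JSON objects. The processor maps Avro fields directly to JSON fields, so the resulting JSON has the same hierarchical structure as the Avro document. Note that the Avro schema information is lost, because this is not a conversion from binary Avro to Avro's JSON encoding. The output JSON is encoded in UTF-8. If the incoming FlowFile contains a stream of multiple Avro records, the resulting FlowFile contains either a JSON array holding all of the records or a sequence of JSON objects. If the incoming FlowFile contains no records, the output is an empty JSON object. Empty or single-record Avro input can optionally be wrapped in a container, as controlled by the Wrap Single Record property.
2) Property configuration
The list below gives the properties and their default values.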
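3. SplitJson
1) Description
This processor uses a JsonPath expression to select the desired array elements and splits a JSON array into multiple separate flow files. Each generated flow file consists of one element of the specified array and is transferred to the relationship "split"; the original file is transferred to the relationship "original". If the specified JsonPath is not found, or does not evaluate to an array element, the original file is routed to "failure" and no files are generated.
Users of this processor need to be familiar with the JsonPath expression language.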
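As a rough illustration of the splitting step, the Python sketch below mimics what the expression $.* does to a top-level JSON array: each element becomes its own document. It is a simplification and not SplitJson itself: the function name `split_json_array` is hypothetical, no JsonPath library is involved, and any input that is not a JSON array is simply treated as "failure".

```python
import json


def split_json_array(content):
    """Split a top-level JSON array into one JSON document per element.

    Returns (splits, route). `route` reuses the relationship names from the
    description: "split" when pieces were produced, "failure" otherwise.
    Treating every non-array input as a failure is a simplification.
    """
    try:
        parsed = json.loads(content)
    except json.JSONDecodeError:
        return [], "failure"
    if not isinstance(parsed, list):
        return [], "failure"
    # Serialize each array element back to its own JSON string.
    return [json.dumps(e, ensure_ascii=False) for e in parsed], "split"


records = '[{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]'
splits, route = split_json_array(records)
for piece in splits:
    print(piece)
```

Each printed line corresponds to what would travel downstream as one flow file.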
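2) Property configuration
The list below gives each property's default value (if it has one) and whether the property supports Expression Language.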
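4. PutHDFS
1) Description
Writes FlowFile data to the Hadoop Distributed File System (HDFS).
2) Property configuration
The list below gives all properties with their default values and whether each property supports the NiFi Expression Language.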
III. Operations
1. Create a process group
2. Create and configure QueryDatabaseTable
3. Create and configure the MySQL connection pool
1) Create
2) Configure
Database Connection URL = jdbc:mysql://192.168.10.44:3306/test?characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true
Database Driver Class Name = com.mysql.jdbc.Driver
# The jar referenced here must be uploaded to the NiFi server beforehand
Database Driver Location(s) = /usr/local/bigdata/imply-3.0.4/dist/druid/extensions/mysql-metadata-storage/mysql-connector-java-5.1.44.jar
Database User = root
Password = 8888888
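3) Enable the connection pool
The connection pool can still be enabled even when its parameters are misconfigured; the reason is unknown.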
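Because a misconfigured pool may still enable without complaint, it can help to sanity-check the connection URL before pasting it into NiFi. The following Python sketch only parses the URL with the standard library; the function name `parse_mysql_jdbc_url` is hypothetical, and the check does not contact the database or validate credentials.

```python
from urllib.parse import urlsplit, parse_qs


def parse_mysql_jdbc_url(jdbc_url):
    """Break a jdbc:mysql:// URL into host, port, database and parameters."""
    prefix = "jdbc:"
    if not jdbc_url.startswith(prefix + "mysql://"):
        raise ValueError("expected a URL starting with jdbc:mysql://")
    # Drop the "jdbc:" prefix so the rest parses like an ordinary URL.
    parts = urlsplit(jdbc_url[len(prefix):])
    if not parts.hostname or not parts.path.strip("/"):
        raise ValueError("host and database name are required")
    return {
        "host": parts.hostname,
        "port": parts.port or 3306,  # 3306 is MySQL's default port
        "database": parts.path.strip("/"),
        "params": {k: v[0] for k, v in parse_qs(parts.query).items()},
    }


info = parse_mysql_jdbc_url(
    "jdbc:mysql://192.168.10.44:3306/test"
    "?characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true"
)
print(info)
```

A typo in the scheme, host, or database name surfaces here as a `ValueError` rather than later as a failing query inside the flow.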
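4. Create and configure ConvertAvroToJSON
QueryDatabaseTable (like ExecuteSQL) emits data in Avro format, which must first be converted to JSON.
1) Create and configure ConvertAvroToJSON
2) Connect
3) Consume the data with load balancing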
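5. Create and configure SplitJson
The output of the previous step is a single unit made up of many records; it must be split into independent single records.
Drag a SplitJson processor onto the canvas, then draw a connection from ConvertAvroToJSON to SplitJson with the relationship success.
Configure SplitJson: on the Properties tab, set JsonPath Expression to $.*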
6. Create and configure PutHDFS
Hadoop Configuration Resources = /export/download/config/hdfs-site.xml,/export/download/config/core-site.xml
Directory = /user/hive/warehouse/nifi_test.db/user_info_nifi
Conflict Resolution Strategy = append
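Set the scheduling options of the QueryDatabaseTable processor as needed. The default run schedule is 0 seconds, which means the SQL statement is executed continuously and large amounts of duplicate data are read from MySQL. If you only need to import the result of a single SQL query into HDFS, set this value to something large and stop the processor manually once the run has finished; if periodic execution is needed, set an appropriate interval.
NiFi cannot determine by itself when each processor has finished its work; this has to be managed manually.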
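IV. Verification
1. Start QueryDatabaseTable and inspect the data in the queue
2. Start ConvertAvroToJSON and inspect the data in the queue
3. Start SplitJson and inspect the data in the queue
4. Start PutHDFS and inspect the data the processor receives and outputs
- If the configured compression codec does not match the one Hadoop uses, make the two consistent; if NiFi itself lacks the required jar, use Additional Classpath Resources to point to the jar's location.
- Check whether the user that runs NiFi is the same as the Hadoop HDFS user. If not, the two need to be aligned; in general this may mean changing the ownership or permissions of the HDFS files.
With the configuration above, the following exception may occur: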
Caused by: org.apache.hadoop.ipc.RemoteException:
Failed to APPEND_FILE /user/hive/warehouse/test.db/testuser/06b034cf-f4a0-49f1-9742-7b6d74ce024b.lzo_deflate for DFSClient_NONMAPREDUCE_2099184430_144 on 192.168.10.41
because this file lease is currently owned by DFSClient_NONMAPREDUCE_-1635697973_57 on 192.168.10.42
According to related material, the fix is to add a ControlRate processor and set a maximum rate. See template 2 for details.
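To show what limiting by flow file count means, here is a minimal Python sketch of a fixed-window limiter. It is not how NiFi implements ControlRate; the class `FlowFileRateLimiter` and its method are hypothetical, and the small limit (3 per 60 seconds) is chosen only to keep the demonstration short. The ControlRate_demo processor in the template uses a Maximum Rate of 100000 flow files with a Time Duration of 1 min.

```python
class FlowFileRateLimiter:
    """Allow at most `max_count` items per window of `window_seconds`."""

    def __init__(self, max_count, window_seconds):
        self.max_count = max_count
        self.window_seconds = window_seconds
        self.window_start = None  # start time of the current window
        self.count = 0            # items admitted in the current window

    def try_acquire(self, now):
        """Return True if an item arriving at time `now` may pass."""
        expired = (
            self.window_start is None
            or now - self.window_start >= self.window_seconds
        )
        if expired:
            # Begin a new window and reset the counter.
            self.window_start = now
            self.count = 0
        if self.count < self.max_count:
            self.count += 1
            return True
        # Over the limit: the item has to wait for a later window.
        return False


limiter = FlowFileRateLimiter(max_count=3, window_seconds=60.0)
arrival_times = [0, 1, 2, 3, 59, 60]
results = [limiter.try_acquire(t) for t in arrival_times]
print(results)
```

Items that are refused are not dropped in a real flow; they stay queued upstream until a later window has room.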
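5. Inspect the data in HDFS
Viewing the table through Hue requires that the table has already been created in Hive. Verifying this step assumes the data has been synchronized to Hive and that the Hue environment is working; otherwise, the file contents can be inspected directly with Hadoop commands.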