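This article builds on part 6 of this series, "NiFi Comprehensive Application Scenarios: Offline Synchronization of MySQL Data to HDFS". The only change is that the JSON data produced in that article is converted to a text (txt) file. Only the parts added for this example are listed here; everything else is unchanged.
Prerequisites: working MySQL, Hadoop, NiFi, Hive, and Hue environments. If Hive or Hue is not available, you can verify the result by inspecting the file data in Hadoop directly.
This article has four parts: implementation flow, processor descriptions, operations, and verification.
I. Implementation flow
1. Template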
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.2">
<description></description>
<groupId>34bb2ec3-0186-1000-0000-00006aa1300a</groupId>
<name>Mysql_Hdfs_replace</name>
<snippet>
<connections>
<id>00e9c3d4-1d6c-34c5-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>ec9cd04c-b93f-3c21-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_INACTIVE</loadBalanceStatus>
<loadBalanceStrategy>ROUND_ROBIN</loadBalanceStrategy>
<name>Q_C</name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>df6b3226-a4c1-300f-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>04c2abc1-92ba-3fb9-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>d554635b-4984-31af-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>ec9cd04c-b93f-3c21-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>0a66705b-f2c8-35b1-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>07400794-0659-3efe-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name>R_P</name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>4c839042-f10c-313e-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>155f51cd-a0a5-3fd4-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>caf57854-0eff-3d22-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_INACTIVE</loadBalanceStatus>
<loadBalanceStrategy>ROUND_ROBIN</loadBalanceStrategy>
<name>C_S</name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>ec9cd04c-b93f-3c21-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>3fda3197-9ec4-3d2f-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>d554635b-4984-31af-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>caf57854-0eff-3d22-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>43cda33e-7b67-34ea-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>4c839042-f10c-313e-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name>E_R</name>
<selectedRelationships>matched</selectedRelationships>
<source>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>3ee738a1-bea1-3a92-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>4f9b64c4-e136-3188-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>d554635b-4984-31af-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>4c839042-f10c-313e-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>beb7e986-0351-3abc-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>d554635b-4984-31af-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>07400794-0659-3efe-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>c3b234bd-9990-3eb2-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>d554635b-4984-31af-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>df6b3226-a4c1-300f-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>d09dac25-4157-3743-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>d554635b-4984-31af-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name>E_L</name>
<selectedRelationships>failure</selectedRelationships>
<selectedRelationships>unmatched</selectedRelationships>
<source>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>3ee738a1-bea1-3a92-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>ef9dd0e8-996a-3142-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>3ee738a1-bea1-3a92-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name>S_E</name>
<selectedRelationships>original</selectedRelationships>
<selectedRelationships>split</selectedRelationships>
<source>
<groupId>01a24222-073e-3a50-0000-000000000000</groupId>
<id>caf57854-0eff-3d22-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<controllerServices>
<id>92d8aa93-71a2-3d31-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-dbcp-service-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Database Connection URL</key>
<value>
<name>Database Connection URL</name>
</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>
<name>Database Driver Class Name</name>
</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>
<name>database-driver-locations</name>
</value>
</entry>
<entry>
<key>kerberos-credentials-service</key>
<value>
<identifiesControllerService>org.apache.nifi.kerberos.KerberosCredentialsService</identifiesControllerService>
<name>kerberos-credentials-service</name>
</value>
</entry>
<entry>
<key>Database User</key>
<value>
<name>Database User</name>
</value>
</entry>
<entry>
<key>Password</key>
<value>
<name>Password</name>
</value>
</entry>
<entry>
<key>Max Wait Time</key>
<value>
<name>Max Wait Time</name>
</value>
</entry>
<entry>
<key>Max Total Connections</key>
<value>
<name>Max Total Connections</name>
</value>
</entry>
<entry>
<key>Validation-query</key>
<value>
<name>Validation-query</name>
</value>
</entry>
<entry>
<key>dbcp-min-idle-conns</key>
<value>
<name>dbcp-min-idle-conns</name>
</value>
</entry>
<entry>
<key>dbcp-max-idle-conns</key>
<value>
<name>dbcp-max-idle-conns</name>
</value>
</entry>
<entry>
<key>dbcp-max-conn-lifetime</key>
<value>
<name>dbcp-max-conn-lifetime</name>
</value>
</entry>
<entry>
<key>dbcp-time-between-eviction-runs</key>
<value>
<name>dbcp-time-between-eviction-runs</name>
</value>
</entry>
<entry>
<key>dbcp-min-evictable-idle-time</key>
<value>
<name>dbcp-min-evictable-idle-time</name>
</value>
</entry>
<entry>
<key>dbcp-soft-min-evictable-idle-time</key>
<value>
<name>dbcp-soft-min-evictable-idle-time</name>
</value>
</entry>
</descriptors>
<name>MySQL_ConnectionPool</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Database Connection URL</key>
<value>jdbc:mysql://192.168.10.44:3306/test?characterEncoding=UTF-8&amp;useSSL=false&amp;allowPublicKeyRetrieval=true</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>com.mysql.jdbc.Driver</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>/usr/local/bigdata/testdata/mysql-connector-java-5.1.44.jar</value>
</entry>
<entry>
<key>kerberos-credentials-service</key>
</entry>
<entry>
<key>Database User</key>
<value>root</value>
</entry>
<entry>
<key>Password</key>
</entry>
<entry>
<key>Max Wait Time</key>
<value>500 millis</value>
</entry>
<entry>
<key>Max Total Connections</key>
<value>8</value>
</entry>
<entry>
<key>Validation-query</key>
</entry>
<entry>
<key>dbcp-min-idle-conns</key>
<value>0</value>
</entry>
<entry>
<key>dbcp-max-idle-conns</key>
<value>8</value>
</entry>
<entry>
<key>dbcp-max-conn-lifetime</key>
<value>-1</value>
</entry>
<entry>
<key>dbcp-time-between-eviction-runs</key>
<value>-1</value>
</entry>
<entry>
<key>dbcp-min-evictable-idle-time</key>
<value>30 mins</value>
</entry>
<entry>
<key>dbcp-soft-min-evictable-idle-time</key>
<value>-1</value>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.dbcp.DBCPConnectionPool</type>
</controllerServices>
<processors>
<id>07400794-0659-3efe-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>1044.726806640625</y>
</position>
<bundle>
<artifact>nifi-hadoop-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Hadoop Configuration Resources</key>
<value>
<name>Hadoop Configuration Resources</name>
</value>
</entry>
<entry>
<key>kerberos-credentials-service</key>
<value>
<identifiesControllerService>org.apache.nifi.kerberos.KerberosCredentialsService</identifiesControllerService>
<name>kerberos-credentials-service</name>
</value>
</entry>
<entry>
<key>Kerberos Principal</key>
<value>
<name>Kerberos Principal</name>
</value>
</entry>
<entry>
<key>Kerberos Keytab</key>
<value>
<name>Kerberos Keytab</name>
</value>
</entry>
<entry>
<key>Kerberos Relogin Period</key>
<value>
<name>Kerberos Relogin Period</name>
</value>
</entry>
<entry>
<key>Additional Classpath Resources</key>
<value>
<name>Additional Classpath Resources</name>
</value>
</entry>
<entry>
<key>Directory</key>
<value>
<name>Directory</name>
</value>
</entry>
<entry>
<key>Conflict Resolution Strategy</key>
<value>
<name>Conflict Resolution Strategy</name>
</value>
</entry>
<entry>
<key>Block Size</key>
<value>
<name>Block Size</name>
</value>
</entry>
<entry>
<key>IO Buffer Size</key>
<value>
<name>IO Buffer Size</name>
</value>
</entry>
<entry>
<key>Replication</key>
<value>
<name>Replication</name>
</value>
</entry>
<entry>
<key>Permissions umask</key>
<value>
<name>Permissions umask</name>
</value>
</entry>
<entry>
<key>Remote Owner</key>
<value>
<name>Remote Owner</name>
</value>
</entry>
<entry>
<key>Remote Group</key>
<value>
<name>Remote Group</name>
</value>
</entry>
<entry>
<key>Compression codec</key>
<value>
<name>Compression codec</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Hadoop Configuration Resources</key>
<value>/usr/local/bigdata/hadoop-3.1.4/etc/hadoop/hdfs-site.xml,/usr/local/bigdata/hadoop-3.1.4/etc/hadoop/core-site.xml</value>
</entry>
<entry>
<key>kerberos-credentials-service</key>
</entry>
<entry>
<key>Kerberos Principal</key>
</entry>
<entry>
<key>Kerberos Keytab</key>
</entry>
<entry>
<key>Kerberos Relogin Period</key>
<value>4 hours</value>
</entry>
<entry>
<key>Additional Classpath Resources</key>
<value>/usr/local/bigdata/testdata/hadoop-lzo-0.4.21-SNAPSHOT.jar</value>
</entry>
<entry>
<key>Directory</key>
<value>/user/hive/warehouse/test.db/testuser</value>
</entry>
<entry>
<key>Conflict Resolution Strategy</key>
<value>append</value>
</entry>
<entry>
<key>Block Size</key>
</entry>
<entry>
<key>IO Buffer Size</key>
</entry>
<entry>
<key>Replication</key>
</entry>
<entry>
<key>Permissions umask</key>
</entry>
<entry>
<key>Remote Owner</key>
</entry>
<entry>
<key>Remote Group</key>
</entry>
<entry>
<key>Compression codec</key>
<value>LZO</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>PutHDFS_Demo</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.hadoop.PutHDFS</type>
</processors>
<processors>
<id>3ee738a1-bea1-3a92-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<position>
<x>4.637380919756993</x>
<y>625.7774307668278</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Destination</key>
<value>
<name>Destination</name>
</value>
</entry>
<entry>
<key>Return Type</key>
<value>
<name>Return Type</name>
</value>
</entry>
<entry>
<key>Path Not Found Behavior</key>
<value>
<name>Path Not Found Behavior</name>
</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>
<name>Null Value Representation</name>
</value>
</entry>
<entry>
<key>amount</key>
<value>
<name>amount</name>
</value>
</entry>
<entry>
<key>channelid</key>
<value>
<name>channelid</name>
</value>
</entry>
<entry>
<key>channelname</key>
<value>
<name>channelname</name>
</value>
</entry>
<entry>
<key>date1</key>
<value>
<name>date1</name>
</value>
</entry>
<entry>
<key>id</key>
<value>
<name>id</name>
</value>
</entry>
<entry>
<key>price</key>
<value>
<name>price</name>
</value>
</entry>
<entry>
<key>productid</key>
<value>
<name>productid</name>
</value>
</entry>
<entry>
<key>productname</key>
<value>
<name>productname</name>
</value>
</entry>
<entry>
<key>regionid</key>
<value>
<name>regionid</name>
</value>
</entry>
<entry>
<key>regionname</key>
<value>
<name>regionname</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Destination</key>
<value>flowfile-attribute</value>
</entry>
<entry>
<key>Return Type</key>
<value>auto-detect</value>
</entry>
<entry>
<key>Path Not Found Behavior</key>
<value>ignore</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>empty string</value>
</entry>
<entry>
<key>amount</key>
<value>$.amount</value>
</entry>
<entry>
<key>channelid</key>
<value>$.channelid</value>
</entry>
<entry>
<key>channelname</key>
<value>$.channelname</value>
</entry>
<entry>
<key>date1</key>
<value>$.date1</value>
</entry>
<entry>
<key>id</key>
<value>$.id</value>
</entry>
<entry>
<key>price</key>
<value>$.price</value>
</entry>
<entry>
<key>productid</key>
<value>$.productid</value>
</entry>
<entry>
<key>productname</key>
<value>$.productname</value>
</entry>
<entry>
<key>regionid</key>
<value>$.regionid</value>
</entry>
<entry>
<key>regionname</key>
<value>$.regionname</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>EvaluateJsonPath</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>matched</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>unmatched</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.standard.EvaluateJsonPath</type>
</processors>
<processors>
<id>4c839042-f10c-313e-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<position>
<x>1.5301540524440043</x>
<y>833.9638161676247</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Regular Expression</key>
<value>
<name>Regular Expression</name>
</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>
<name>Replacement Value</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>
<name>Maximum Buffer Size</name>
</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>
<name>Replacement Strategy</name>
</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>
<name>Evaluation Mode</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Regular Expression</key>
<value>(?s)(^.*$)</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>${id},${date1},${channelid},${productid},${regionid},${amount},${price},${channelname},${productname},${regionname}</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>1 MB</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>Regex Replace</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>Line-by-Line</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>ReplaceText</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.standard.ReplaceText</type>
</processors>
<processors>
<id>caf57854-0eff-3d22-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<position>
<x>8.582122802734375</x>
<y>408.520751953125</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>3</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>JsonPath Expression</key>
<value>
<name>JsonPath Expression</name>
</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>
<name>Null Value Representation</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>JsonPath Expression</key>
<value>$.*</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>empty string</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>SplitJson_Demo</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>original</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>split</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.standard.SplitJson</type>
</processors>
<processors>
<id>d554635b-4984-31af-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<position>
<x>767.6406733409717</x>
<y>649.5637561165618</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Log Level</key>
<value>
<name>Log Level</name>
</value>
</entry>
<entry>
<key>Log Payload</key>
<value>
<name>Log Payload</name>
</value>
</entry>
<entry>
<key>Attributes to Log</key>
<value>
<name>Attributes to Log</name>
</value>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>
<name>attributes-to-log-regex</name>
</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
<value>
<name>Attributes to Ignore</name>
</value>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
<value>
<name>attributes-to-ignore-regex</name>
</value>
</entry>
<entry>
<key>Log prefix</key>
<value>
<name>Log prefix</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Log Level</key>
<value>info</value>
</entry>
<entry>
<key>Log Payload</key>
<value>false</value>
</entry>
<entry>
<key>Attributes to Log</key>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>.*</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
</entry>
<entry>
<key>Log prefix</key>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>LogAttribute——demo</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
<processors>
<id>df6b3226-a4c1-300f-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<position>
<x>10.83941650390625</x>
<y>0.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Database Connection Pooling Service</key>
<value>
<identifiesControllerService>org.apache.nifi.dbcp.DBCPService</identifiesControllerService>
<name>Database Connection Pooling Service</name>
</value>
</entry>
<entry>
<key>db-fetch-db-type</key>
<value>
<name>db-fetch-db-type</name>
</value>
</entry>
<entry>
<key>Table Name</key>
<value>
<name>Table Name</name>
</value>
</entry>
<entry>
<key>Columns to Return</key>
<value>
<name>Columns to Return</name>
</value>
</entry>
<entry>
<key>db-fetch-where-clause</key>
<value>
<name>db-fetch-where-clause</name>
</value>
</entry>
<entry>
<key>db-fetch-sql-query</key>
<value>
<name>db-fetch-sql-query</name>
</value>
</entry>
<entry>
<key>Maximum-value Columns</key>
<value>
<name>Maximum-value Columns</name>
</value>
</entry>
<entry>
<key>Max Wait Time</key>
<value>
<name>Max Wait Time</name>
</value>
</entry>
<entry>
<key>Fetch Size</key>
<value>
<name>Fetch Size</name>
</value>
</entry>
<entry>
<key>qdbt-max-rows</key>
<value>
<name>qdbt-max-rows</name>
</value>
</entry>
<entry>
<key>qdbt-output-batch-size</key>
<value>
<name>qdbt-output-batch-size</name>
</value>
</entry>
<entry>
<key>qdbt-max-frags</key>
<value>
<name>qdbt-max-frags</name>
</value>
</entry>
<entry>
<key>dbf-normalize</key>
<value>
<name>dbf-normalize</name>
</value>
</entry>
<entry>
<key>transaction-isolation-level</key>
<value>
<name>transaction-isolation-level</name>
</value>
</entry>
<entry>
<key>dbf-user-logical-types</key>
<value>
<name>dbf-user-logical-types</name>
</value>
</entry>
<entry>
<key>dbf-default-precision</key>
<value>
<name>dbf-default-precision</name>
</value>
</entry>
<entry>
<key>dbf-default-scale</key>
<value>
<name>dbf-default-scale</name>
</value>
</entry>
</descriptors>
<executionNode>PRIMARY</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Database Connection Pooling Service</key>
<value>92d8aa93-71a2-3d31-0000-000000000000</value>
</entry>
<entry>
<key>db-fetch-db-type</key>
<value>MySQL</value>
</entry>
<entry>
<key>Table Name</key>
<value>dm_sales</value>
</entry>
<entry>
<key>Columns to Return</key>
</entry>
<entry>
<key>db-fetch-where-clause</key>
</entry>
<entry>
<key>db-fetch-sql-query</key>
<value>select * from dm_sales</value>
</entry>
<entry>
<key>Maximum-value Columns</key>
</entry>
<entry>
<key>Max Wait Time</key>
<value>0 seconds</value>
</entry>
<entry>
<key>Fetch Size</key>
<value>0</value>
</entry>
<entry>
<key>qdbt-max-rows</key>
<value>0</value>
</entry>
<entry>
<key>qdbt-output-batch-size</key>
<value>0</value>
</entry>
<entry>
<key>qdbt-max-frags</key>
<value>0</value>
</entry>
<entry>
<key>dbf-normalize</key>
<value>false</value>
</entry>
<entry>
<key>transaction-isolation-level</key>
</entry>
<entry>
<key>dbf-user-logical-types</key>
<value>false</value>
</entry>
<entry>
<key>dbf-default-precision</key>
<value>10</value>
</entry>
<entry>
<key>dbf-default-scale</key>
<value>0</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>86400 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>true</executionNodeRestricted>
<name>QueryDatabaseTable_demo</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.standard.QueryDatabaseTable</type>
</processors>
<processors>
<id>ec9cd04c-b93f-3c21-0000-000000000000</id>
<parentGroupId>01a24222-073e-3a50-0000-000000000000</parentGroupId>
<position>
<x>11.83941650390625</x>
<y>203.5</y>
</position>
<bundle>
<artifact>nifi-avro-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.2</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>JSON container options</key>
<value>
<name>JSON container options</name>
</value>
</entry>
<entry>
<key>Wrap Single Record</key>
<value>
<name>Wrap Single Record</name>
</value>
</entry>
<entry>
<key>Avro schema</key>
<value>
<name>Avro schema</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>JSON container options</key>
<value>array</value>
</entry>
<entry>
<key>Wrap Single Record</key>
<value>true</value>
</entry>
<entry>
<key>Avro schema</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>ConvertAvroToJSON_Demo</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.avro.ConvertAvroToJSON</type>
</processors>
</snippet>
<timestamp>02/13/2023 02:25:32 GMT</timestamp>
</template>
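2. Processing flow
QueryDatabaseTable -> ConvertAvroToJSON -> SplitJson -> EvaluateJsonPath -> ReplaceText -> PutHDFS
- Two processors are added: EvaluateJsonPath, which extracts fields from the JSON, and ReplaceText, which replaces the content of the FlowFile.
- The field values are extracted from the JSON data.
- The values are converted to delimited text, with \t separating fields and \n separating rows. Note that the Replacement Value in the template above joins the fields with commas rather than \t.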
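The three middle steps of the flow (SplitJson, EvaluateJsonPath, ReplaceText) can be sketched outside NiFi in a few lines of Python. This is only an illustration of the data transformation, not how NiFi implements it. The field order is copied from the Replacement Value in the template; the function names are hypothetical.

```python
import json

# Column order copied from the ReplaceText "Replacement Value" in the template.
FIELDS = ["id", "date1", "channelid", "productid", "regionid",
          "amount", "price", "channelname", "productname", "regionname"]


def to_text(value):
    """Render a JSON scalar as text; null becomes an empty string."""
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    return json.dumps(value)


def split_json(array_text):
    """SplitJson with `$.*`: one JSON document per array element."""
    return [json.dumps(rec, ensure_ascii=False) for rec in json.loads(array_text)]


def evaluate_json_path(record_text):
    """EvaluateJsonPath (Destination = flowfile-attribute): `$.<field>` -> attribute."""
    record = json.loads(record_text)
    return {name: to_text(record.get(name)) for name in FIELDS}


def replace_text(attrs, sep=","):
    """ReplaceText: the whole content becomes the delimited attribute values."""
    return sep.join(attrs[name] for name in FIELDS)


def to_delimited(array_text, sep=","):
    """Run the three steps over a JSON array and return one line per record."""
    lines = [replace_text(evaluate_json_path(r), sep) for r in split_json(array_text)]
    return "\n".join(lines)
```

Calling `to_delimited(text, sep="\t")` produces the tab-separated variant described in the list above, while the default matches the comma-separated Replacement Value in the template.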
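II. Processor descriptions
1. EvaluateJsonPath
1) Description
This processor evaluates one or more JsonPath expressions against the content of a FlowFile. The results are written either to FlowFile attributes or to the FlowFile content itself, depending on how the processor is configured. JsonPaths are entered by adding user-defined properties; the name of each added property maps to the attribute name in the outgoing FlowFile (if Destination is flowfile-attribute; otherwise the property name is ignored).
The value of each property must be a valid JsonPath expression.
A Return Type of "auto-detect" is resolved from the configured Destination.
When Destination is set to "flowfile-attribute", a Return Type of "scalar" is used.
When Destination is set to "flowfile-content", a Return Type of "JSON" is used.
If a JsonPath evaluates to a JSON array or JSON object and the Return Type is "scalar", the FlowFile is left unmodified and routed to failure.
A Return Type of "JSON" can still return a scalar value if the provided JsonPath evaluates to one.
If Destination is "flowfile-content" and the JsonPath does not evaluate to a defined path, the FlowFile is routed to "unmatched" with its content unmodified.
If Destination is "flowfile-attribute" and the expression matches nothing, the attribute is created with an empty string as its value, and the FlowFile is always routed to "matched".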
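The routing rules above, for the flowfile-attribute destination, can be sketched as a small Python function. It is a simplified model, not NiFi's implementation: it handles only the `$.field` form of JsonPath used in this article, and routing unparseable content to failure is an assumption of the sketch. The function name is hypothetical.

```python
import json


def evaluate_to_attributes(content, paths):
    """Return (relationship, attributes) for a FlowFile's JSON content.

    `paths` maps attribute name -> JsonPath. Only the simple `$.field`
    form is supported here (a limitation of this sketch).
    """
    try:
        doc = json.loads(content)
    except ValueError:
        # Assumption: content that is not valid JSON goes to failure.
        return "failure", {}

    attrs = {}
    for name, path in paths.items():
        if not path.startswith("$."):
            raise ValueError("sketch supports only $.field paths: " + path)
        key = path[2:]
        if not isinstance(doc, dict) or key not in doc:
            # Path not found: the attribute is created with an empty string.
            attrs[name] = ""
            continue
        value = doc[key]
        if isinstance(value, (dict, list)):
            # An array or object cannot be returned as a scalar.
            return "failure", {}
        if value is None:
            attrs[name] = ""  # Null Value Representation = empty string
        elif isinstance(value, str):
            attrs[name] = value
        else:
            attrs[name] = json.dumps(value)
    return "matched", attrs
```

The content itself is never touched in this mode, which is why the FlowFile still holds the original JSON after this processor runs.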
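2) Property configuration
The property list gives each property's default value (if any) and whether it supports Expression Language. In this example (see the template above), Destination is flowfile-attribute, Return Type is auto-detect, Path Not Found Behavior is ignore, and Null Value Representation is empty string.
3) Dynamic properties
The processor lets the user specify property names and values. In this example each dynamic property is named after a JSON field and holds the matching JsonPath, for example id = $.id and price = $.price.
4) Use cases
Use this processor when you need to extract some data from a FlowFile's JSON content into FlowFile attributes, or when you need to extract part of the JSON content to become the content of the next FlowFile.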
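2. ReplaceText
1) Description
Updates the content of a FlowFile by replacing the parts of the content that match a regular expression with another value.
2) Property configuration
The property list gives each property's default value (if any) and whether it supports Expression Language. In this example (see the template above), Regular Expression is (?s)(^.*$), Replacement Strategy is Regex Replace, Evaluation Mode is Line-by-Line, Character Set is UTF-8, Maximum Buffer Size is 1 MB, and Replacement Value is the list of attribute references ${id},${date1},... through ${regionname}.
3) Use cases
Use a regular expression to replace FlowFile content either line by line or across the entire text; this is typically used for business-logic processing.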
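As a rough illustration of what the ReplaceText configuration in the template does, the Python sketch below expands the `${name}` references from FlowFile attributes and then applies the regular expression with `re.sub`. Several simplifications are assumptions of the sketch: it applies the regex to the whole content and ignores Evaluation Mode, it supports only plain `${name}` references rather than the full NiFi Expression Language, and it treats a missing attribute as an empty string. The function names are hypothetical.

```python
import re

# Values copied from the ReplaceText properties in the template.
REGEX = r"(?s)(^.*$)"
REPLACEMENT = ("${id},${date1},${channelid},${productid},${regionid},"
               "${amount},${price},${channelname},${productname},${regionname}")


def expand(template, attributes):
    """Substitute plain ${name} references with attribute values."""
    return re.sub(r"\$\{(\w+)\}",
                  lambda m: attributes.get(m.group(1), ""),
                  template)


def replace_text(content, attributes, regex=REGEX, replacement=REPLACEMENT):
    """Replace whatever `regex` matches in `content` with the expanded value."""
    new_value = expand(replacement, attributes)
    # A function replacement keeps backslashes in attribute values literal.
    return re.sub(regex, lambda _m: new_value, content, count=1)
```

Because `(?s)(^.*$)` matches the entire content, the original JSON is discarded and only the delimited attribute values remain.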
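III. Operations
1. Create and configure EvaluateJsonPath
Setting Destination to flowfile-attribute stores the extracted values in FlowFile attributes;
the added (dynamic) properties are the JSON fields to read.
Also clear the Invalid warning on the processor; in the template above, its failure and unmatched relationships are connected to LogAttribute.
2. Verify EvaluateJsonPath
Although the individual field values have now been extracted from the JSON, the FlowFile content is still JSON.
3. Create and configure ReplaceText
4. Verify ReplaceText
IV. Verification
This section lists only the necessary steps and the final result.
This completes the example.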