Prerequisites
Before reading this article you should already understand NiFi's core concepts and basic usage. NiFi documentation: https://nifi.apache.org/docs.html
Version information
- NiFi 1.19.1
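Source code walkthrough
We start with how state is read. Our entry point is ProcessSession#getState. The caller must specify a Scope, which says whether the state is standalone or cluster-wide (Scope.LOCAL or Scope.CLUSTER). The concrete implementation is StandardProcessSession#getState. It first returns state already held by the session or by its checkpoint, and only falls back to the StateManager when neither is set: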
@Override
public StateMap getState(final Scope scope) throws IOException {
    if (scope == Scope.LOCAL) {
        if (localState != null) {
            return localState;
        }
        if (checkpoint != null && checkpoint.localState != null) {
            return checkpoint.localState;
        }
        // localState is not initialized, so fetch it from the StateManager
        return context.getStateManager().getState(scope);
    }
    if (clusterState != null) {
        return clusterState;
    }
    if (checkpoint != null && checkpoint.clusterState != null) {
        return checkpoint.clusterState;
    }
    // clusterState is not initialized, so fetch it from the StateManager
    return context.getStateManager().getState(scope);
}
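The lookup order above can be illustrated without NiFi on the classpath. The following is a minimal sketch under stated assumptions. `Scope` mirrors the two values of NiFi's enum. `StateSource`, `Checkpoint`, `SessionStateSketch`, and the `Map`-based state are hypothetical simplifications, not NiFi API. The session returns its own copy first, then the checkpointed copy, and only then asks the state manager.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of StandardProcessSession#getState; not NiFi code
class SessionStateSketch {
    Map<String, String> localState;   // state written in this session, not yet committed
    Map<String, String> clusterState;
    Checkpoint checkpoint;            // state carried by the session checkpoint
    private final StateSource stateManager;

    SessionStateSketch(StateSource stateManager) {
        this.stateManager = stateManager;
    }

    Map<String, String> getState(Scope scope) {
        Map<String, String> inSession = (scope == Scope.LOCAL) ? localState : clusterState;
        if (inSession != null) {
            return inSession;
        }
        if (checkpoint != null) {
            Map<String, String> checkpointed =
                    (scope == Scope.LOCAL) ? checkpoint.localState : checkpoint.clusterState;
            if (checkpointed != null) {
                return checkpointed;
            }
        }
        // Nothing cached for this scope: ask the state manager
        return stateManager.getState(scope);
    }

    public static void main(String[] args) {
        StateSource manager = scope -> Map.of("source", "manager-" + scope);
        SessionStateSketch session = new SessionStateSketch(manager);
        System.out.println(session.getState(Scope.LOCAL).get("source"));   // manager-LOCAL

        session.checkpoint = new Checkpoint();
        session.checkpoint.clusterState = Map.of("source", "checkpoint");
        System.out.println(session.getState(Scope.CLUSTER).get("source")); // checkpoint

        session.clusterState = new HashMap<>(Map.of("source", "session"));
        System.out.println(session.getState(Scope.CLUSTER).get("source")); // session
    }
}

// Mirrors the two values of NiFi's Scope enum
enum Scope { LOCAL, CLUSTER }

// Hypothetical stand-in for StateManager#getState
interface StateSource {
    Map<String, String> getState(Scope scope);
}

// Hypothetical stand-in for the session's checkpoint object
class Checkpoint {
    Map<String, String> localState;
    Map<String, String> clusterState;
}
```

The two scopes are checked independently. A checkpointed cluster state therefore never hides the local state manager, and the reverse also holds.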
The StateManager itself comes from the state manager provider, which keeps one StateManager per component; see StandardStateManagerProvider#getStateManager:
/**
 * Get the state manager for the given component id
 */
@Override
public synchronized StateManager getStateManager(final String componentId) {
    // Check the cache first
    StateManager stateManager = stateManagers.get(componentId);
    if (stateManager != null) {
        return stateManager;
    }
    // Not cached yet: create a state manager and put it in the cache
    stateManager = new StandardStateManager(localStateProvider, clusterStateProvider, componentId);
    stateManagers.put(componentId, stateManager);
    return stateManager;
}
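The caching pattern above is a lazily filled, lock-protected map keyed by component id. The sketch below is a simplified, hypothetical model: `ComponentStateManager` stands in for StandardStateManager and carries only the component id. It collapses the explicit get/put into `computeIfAbsent`. Because the method is `synchronized`, a plain `HashMap` is safe here, just as in the original.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of StandardStateManagerProvider#getStateManager; not NiFi code
class StateManagerCacheSketch {
    // Stand-in for StandardStateManager: bound to exactly one component
    static final class ComponentStateManager {
        final String componentId;

        ComponentStateManager(String componentId) {
            this.componentId = componentId;
        }
    }

    private final Map<String, ComponentStateManager> stateManagers = new HashMap<>();

    // One manager per component id, created on first request and reused afterwards
    synchronized ComponentStateManager getStateManager(String componentId) {
        return stateManagers.computeIfAbsent(componentId, ComponentStateManager::new);
    }

    public static void main(String[] args) {
        StateManagerCacheSketch provider = new StateManagerCacheSketch();
        ComponentStateManager a1 = provider.getStateManager("processor-a");
        ComponentStateManager a2 = provider.getStateManager("processor-a");
        ComponentStateManager b = provider.getStateManager("processor-b");
        System.out.println(a1 == a2); // true
        System.out.println(a1 == b);  // false
    }
}
```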
Where does StandardStateManagerProvider come from? It is built from NiFi's configuration; see StandardStateManagerProvider#create:
public static synchronized StateManagerProvider create(final NiFiProperties properties, final VariableRegistry variableRegistry, final ExtensionManager extensionManager,
                                                       final ParameterLookup parameterLookup) throws ConfigParseException, IOException {
    nifiProperties = properties;
    if (provider != null) {
        return provider;
    }
    // The local state provider is always created
    final StateProvider localProvider = createLocalStateProvider(properties, variableRegistry, extensionManager, parameterLookup);
    final StateProvider clusterProvider;
    // Check nifi.cluster.is.node in nifi.properties; if true, also create the cluster state provider
    if (properties.isNode()) {
        clusterProvider = createClusteredStateProvider(properties, variableRegistry, extensionManager, parameterLookup);
    } else {
        clusterProvider = null;
    }
    provider = new StandardStateManagerProvider(localProvider, clusterProvider);
    return provider;
}
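Two behaviors of `create` are easy to miss. The result is cached in a static field, so only the first call's properties matter. A cluster provider exists only when `nifi.cluster.is.node` is true. The sketch below models both with `java.util.Properties`. `ProviderFactorySketch`, `Providers`, and `reset()` are hypothetical, and the sketch records provider ids rather than instantiating real StateProvider classes.

```java
import java.util.Properties;

// Hypothetical model of StandardStateManagerProvider#create; not NiFi code
class ProviderFactorySketch {
    // Simplified "provider": just the ids of the local and (optional) cluster providers
    static final class Providers {
        final String localProviderId;
        final String clusterProviderId; // null when this instance is not a cluster node

        Providers(String localProviderId, String clusterProviderId) {
            this.localProviderId = localProviderId;
            this.clusterProviderId = clusterProviderId;
        }
    }

    private static Providers provider; // one instance per JVM, like the static field in create()

    static synchronized Providers create(Properties properties) {
        if (provider != null) {
            return provider; // later calls get the first result, whatever properties they pass
        }
        // The local provider is always created
        String localId = properties.getProperty("nifi.state.management.provider.local");
        // Stand-in for NiFiProperties#isNode(): nifi.cluster.is.node=true
        boolean isNode = Boolean.parseBoolean(properties.getProperty("nifi.cluster.is.node", "false"));
        String clusterId = isNode ? properties.getProperty("nifi.state.management.provider.cluster") : null;
        provider = new Providers(localId, clusterId);
        return provider;
    }

    // Hypothetical helper so the example can be rerun; the original has no such method
    static synchronized void reset() {
        provider = null;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("nifi.state.management.provider.local", "local-provider");
        props.setProperty("nifi.state.management.provider.cluster", "zk-provider");
        System.out.println(create(props).clusterProviderId); // null: not a cluster node

        reset();
        props.setProperty("nifi.cluster.is.node", "true");
        System.out.println(create(props).clusterProviderId); // zk-provider
    }
}
```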
So where do localProvider and clusterProvider come from? Look at the startup configuration file nifi.properties, which contains the State Management settings:
####################
# State Management #
####################
nifi.state.management.configuration.file=./conf/state-management.xml
# The ID of the local state provider
nifi.state.management.provider.local=local-provider
# The ID of the cluster-wide state provider. This will be ignored if NiFi is not clustered but must be populated if running in a cluster.
nifi.state.management.provider.cluster=zk-provider
# Specifies whether or not this instance of NiFi should run an embedded ZooKeeper server
nifi.state.management.embedded.zookeeper.start=true
# Properties file that provides the ZooKeeper properties to use if <nifi.state.management.embedded.zookeeper.start> is set to true
nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties
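All state providers are defined in state-management.xml, and the ids in nifi.properties above select which entry is used. By default the file defines one local-provider and one cluster-provider (zk-provider, backed by ZooKeeper). Redis is also supported as a cluster state provider, as the commented-out redis-provider entry shows: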
<stateManagement>
    <local-provider>
        <id>local-provider</id>
        <class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>
        <property name="Directory">./state/local</property>
        <property name="Always Sync">false</property>
        <property name="Partitions">16</property>
        <property name="Checkpoint Interval">2 mins</property>
    </local-provider>
    <cluster-provider>
        <id>zk-provider</id>
        <class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
        <property name="Connect String"></property>
        <property name="Root Node">/nifi</property>
        <property name="Session Timeout">10 seconds</property>
        <property name="Access Control">Open</property>
    </cluster-provider>
    <!--
    <cluster-provider>
        <id>redis-provider</id>
        <class>org.apache.nifi.redis.state.RedisStateProvider</class>
        <property name="Redis Mode">Standalone</property>
        <property name="Connection String">localhost:6379</property>
    </cluster-provider>
    -->
</stateManagement>
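To see how an id from nifi.properties maps to an implementation class, the sketch below looks up an entry by `<id>` with the JDK's DOM parser. This is a hypothetical stand-in, not NiFi's own configuration parser. `StateConfigSketch` and `findProviderClass` are illustrative names. The sample XML is trimmed to the `<id>` and `<class>` elements. The commented-out redis-provider is an XML comment rather than an element, so it is never found.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical lookup of a provider entry by id; not NiFi's own parser
class StateConfigSketch {
    // tagName is "local-provider" or "cluster-provider"; returns null if no entry has that id
    static String findProviderClass(String xml, String tagName, String providerId) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        NodeList entries = doc.getElementsByTagName(tagName);
        for (int i = 0; i < entries.getLength(); i++) {
            Element entry = (Element) entries.item(i);
            String id = entry.getElementsByTagName("id").item(0).getTextContent().trim();
            if (id.equals(providerId)) {
                return entry.getElementsByTagName("class").item(0).getTextContent().trim();
            }
        }
        return null; // commented-out entries are XML comments, not elements
    }

    public static void main(String[] args) throws Exception {
        String xml = "<stateManagement>"
                + "<local-provider><id>local-provider</id>"
                + "<class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>"
                + "</local-provider>"
                + "<cluster-provider><id>zk-provider</id>"
                + "<class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>"
                + "</cluster-provider>"
                + "<!-- <cluster-provider><id>redis-provider</id></cluster-provider> -->"
                + "</stateManagement>";
        // Ids as configured by nifi.state.management.provider.local / .cluster
        System.out.println(findProviderClass(xml, "local-provider", "local-provider"));
        System.out.println(findProviderClass(xml, "cluster-provider", "zk-provider"));
        System.out.println(findProviderClass(xml, "cluster-provider", "redis-provider")); // null
    }
}
```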
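State persistence
In standalone mode, state is stored in the state/local directory under the NiFi root directory (mine is \work file\nifi-jdk8\nifi-standalone). Delete the local folder to clear the state.
In a cluster, cluster-wide state is stored by the embedded ZooKeeper under state/zookeeper; local state is still kept in state/local on each node. Delete the version-2 folder to clear all cluster state, but do not delete the myid file, which identifies this ZooKeeper server.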
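The manual cleanup above can be scripted. The sketch below assumes the directory layout described in the text (state/local and state/zookeeper/version-2 under the NiFi root) and the embedded ZooKeeper. As a precaution, it also assumes that NiFi and its embedded ZooKeeper are stopped before it runs. `ClearStateSketch` is a hypothetical helper, not part of NiFi. It deletes each tree deepest-first and leaves the myid file alone.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper; run it only while NiFi (and its embedded ZooKeeper) is stopped
class ClearStateSketch {
    // Deletes a directory tree, deepest entries first; a missing directory is ignored
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    static void clearState(Path nifiHome, boolean clearEmbeddedZooKeeper) throws IOException {
        Path state = nifiHome.resolve("state");
        deleteRecursively(state.resolve("local"));
        if (clearEmbeddedZooKeeper) {
            // Only version-2 is removed; the myid file next to it stays in place
            deleteRecursively(state.resolve("zookeeper").resolve("version-2"));
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.err.println("usage: ClearStateSketch <nifi-home> [--zookeeper]");
            return;
        }
        clearState(Paths.get(args[0]), args.length > 1 && args[1].equals("--zookeeper"));
    }
}
```

For example, `java ClearStateSketch.java /opt/nifi --zookeeper` would clear both locations for a NiFi installed at the (illustrative) path /opt/nifi.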