
NiFi State Management: StateManager

2023-05-26 05:50:06

Prerequisites

This article assumes you are familiar with NiFi's core concepts and basic usage. The official NiFi documentation is at https://nifi.apache.org/docs.html

Version

  • 1.19.1

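Source Code Walkthrough

Let's start with how state is read. The entry point is the ProcessSession#getState interface method, and the caller must specify a Scope: the local node or the whole cluster (Scope.LOCAL or Scope.CLUSTER). The concrete implementation is StandardProcessSession#getState: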

    @Override
    public StateMap getState(final Scope scope) throws IOException {
        if (scope == Scope.LOCAL) {
            if (localState != null) {
                return localState;
            }

            if (checkpoint != null && checkpoint.localState != null) {
                return checkpoint.localState;
            }

            // localState has not been initialized, so fetch it from the StateManager
            return context.getStateManager().getState(scope);
        }

        if (clusterState != null) {
            return clusterState;
        }

        if (checkpoint != null && checkpoint.clusterState != null) {
            return checkpoint.clusterState;
        }

        // clusterState has not been initialized, so fetch it from the StateManager
        return context.getStateManager().getState(scope);
    }
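
The lookup order in this method can be illustrated with a small, self-contained sketch. It is a simplified model rather than NiFi code: the class StateLookupSketch, its fields, and the Supplier standing in for StateManager#getState are all hypothetical names chosen for illustration.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

/** Simplified, hypothetical model of the lookup order; not NiFi's actual classes. */
final class StateLookupSketch {
    Map<String, String> sessionState;     // state already held by the session, if any
    Map<String, String> checkpointState;  // state held by a checkpoint, if any
    private final Supplier<Map<String, String>> stateManager; // stands in for StateManager#getState

    StateLookupSketch(Supplier<Map<String, String>> stateManager) {
        this.stateManager = stateManager;
    }

    /** Session state wins, then checkpoint state, then the state manager. */
    Map<String, String> getState() {
        if (sessionState != null) {
            return sessionState;
        }
        if (checkpointState != null) {
            return checkpointState;
        }
        return stateManager.get();
    }

    public static void main(String[] args) {
        StateLookupSketch sketch =
                new StateLookupSketch(() -> Collections.singletonMap("offset", "10"));
        System.out.println(sketch.getState()); // falls through to the state manager

        sketch.checkpointState = Collections.singletonMap("offset", "20");
        System.out.println(sketch.getState()); // checkpoint shadows the state manager

        sketch.sessionState = Collections.singletonMap("offset", "30");
        System.out.println(sketch.getState()); // session state shadows both
    }
}
```

The point of the ordering is that the state manager is only consulted when neither the session nor its checkpoint already holds a value for the requested scope.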

 

The StateManager itself is supplied by the state manager provider; see StandardStateManagerProvider#getStateManager:

    /**
     * Returns the StateManager for the given component id
     */
    @Override
    public synchronized StateManager getStateManager(final String componentId) {
        // Look in the cache first
        StateManager stateManager = stateManagers.get(componentId);
        if (stateManager != null) {
            return stateManager;
        }
        // Not cached yet: create the StateManager and put it in the cache
        stateManager = new StandardStateManager(localStateProvider, clusterStateProvider, componentId);
        stateManagers.put(componentId, stateManager);
        return stateManager;
    }
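
The practical consequence of this cache is that every lookup with the same component id yields the same manager instance. The sketch below mirrors the pattern with plain Java collections; ManagerCacheSketch and its nested Manager class are hypothetical stand-ins, not NiFi types.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the provider's per-component cache. */
final class ManagerCacheSketch {
    /** Placeholder for a state manager: it only remembers its component id. */
    static final class Manager {
        final String componentId;

        Manager(String componentId) {
            this.componentId = componentId;
        }
    }

    private final Map<String, Manager> managers = new HashMap<>();

    // synchronized, like the method above, so two threads cannot create
    // two managers for the same component id.
    synchronized Manager getManager(String componentId) {
        Manager manager = managers.get(componentId);
        if (manager == null) {
            manager = new Manager(componentId);
            managers.put(componentId, manager);
        }
        return manager;
    }

    public static void main(String[] args) {
        ManagerCacheSketch cache = new ManagerCacheSketch();
        // Same id: the cached instance is reused.
        System.out.println(cache.getManager("proc-1") == cache.getManager("proc-1"));
        // Different ids: each component gets its own manager.
        System.out.println(cache.getManager("proc-1") == cache.getManager("proc-2"));
    }
}
```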

 

Where does the StandardStateManagerProvider itself come from? It is created from NiFi's configuration; see StandardStateManagerProvider#create:

    public static synchronized StateManagerProvider create(final NiFiProperties properties, final VariableRegistry variableRegistry, final ExtensionManager extensionManager,
                                                           final ParameterLookup parameterLookup) throws ConfigParseException, IOException {
        nifiProperties = properties;

        if (provider != null) {
            return provider;
        }

        // The local state provider is always created
        final StateProvider localProvider = createLocalStateProvider(properties,variableRegistry, extensionManager, parameterLookup);

        final StateProvider clusterProvider;
        // Check nifi.cluster.is.node in nifi.properties; if it is true, also create the cluster state provider
        if (properties.isNode()) {
            clusterProvider = createClusteredStateProvider(properties,variableRegistry, extensionManager, parameterLookup);
        } else {
            clusterProvider = null;
        }

        provider = new StandardStateManagerProvider(localProvider, clusterProvider);
        return provider;
    }

 

Where do localProvider and clusterProvider come from? Take a look at the startup configuration file nifi.properties, which has a State Management section:

####################
# State Management #
####################
nifi.state.management.configuration.file=./conf/state-management.xml
# The ID of the local state provider
nifi.state.management.provider.local=local-provider
# The ID of the cluster-wide state provider. This will be ignored if NiFi is not clustered but must be populated if running in a cluster.
nifi.state.management.provider.cluster=zk-provider
# Specifies whether or not this instance of NiFi should run an embedded ZooKeeper server
nifi.state.management.embedded.zookeeper.start=true
# Properties file that provides the ZooKeeper properties to use if <nifi.state.management.embedded.zookeeper.start> is set to true
nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties
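
To make the relationship between these keys and the create method concrete, here is a minimal sketch that reads them with java.util.Properties. It is illustrative only: NiFi uses its own NiFiProperties class, and the class and method names below (StatePropertiesSketch, localProviderId, clusterProviderId) are made up for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Illustrative only: plain java.util.Properties instead of NiFiProperties. */
final class StatePropertiesSketch {
    static final String LOCAL_KEY = "nifi.state.management.provider.local";
    static final String CLUSTER_KEY = "nifi.state.management.provider.cluster";
    static final String IS_NODE_KEY = "nifi.cluster.is.node";

    /** Parses properties-file text. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** The local provider id is always used. */
    static String localProviderId(Properties props) {
        return props.getProperty(LOCAL_KEY);
    }

    /** The cluster provider id only matters on a cluster node; otherwise returns null. */
    static String clusterProviderId(Properties props) {
        boolean isNode = Boolean.parseBoolean(props.getProperty(IS_NODE_KEY, "false"));
        return isNode ? props.getProperty(CLUSTER_KEY) : null;
    }

    public static void main(String[] args) {
        Properties props = parse(
                "nifi.cluster.is.node=false\n"
              + "nifi.state.management.provider.local=local-provider\n"
              + "nifi.state.management.provider.cluster=zk-provider\n");
        System.out.println(localProviderId(props));
        System.out.println(clusterProviderId(props)); // not a node, so no cluster provider
    }
}
```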

 

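All state providers are defined in state-management.xml. By default it defines one local-provider and one cluster-provider (zk-provider). Redis is also supported as a cluster state provider; see the commented-out redis-provider entry: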

<stateManagement> 
   <local-provider>
        <id>local-provider</id>
        <class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>
        <property name="Directory">./state/local</property>
        <property name="Always Sync">false</property>
        <property name="Partitions">16</property>
        <property name="Checkpoint Interval">2 mins</property>
    </local-provider>
    <cluster-provider>
        <id>zk-provider</id>
        <class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
        <property name="Connect String"></property>
        <property name="Root Node">/nifi</property>
        <property name="Session Timeout">10 seconds</property>
        <property name="Access Control">Open</property>
    </cluster-provider>

    <!--
        <cluster-provider>
            <id>redis-provider</id>
            <class>org.apache.nifi.redis.state.RedisStateProvider</class>
            <property name="Redis Mode">Standalone</property>
            <property name="Connection String">localhost:6379</property>
        </cluster-provider>
    -->
</stateManagement>
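
The ids configured in nifi.properties are matched against the id elements in this file. As a rough illustration of that mapping, the sketch below parses such a document with the JDK's DOM parser and collects each provider id together with its class name. This is not how NiFi loads the file internally; StateManagementXmlSketch and providerClasses are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/** Illustrative parser for a state-management.xml style document. */
final class StateManagementXmlSketch {
    /** Maps each provider id to its class name, local providers first. */
    static Map<String, String> providerClasses(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Map<String, String> result = new LinkedHashMap<>();
            for (String tag : new String[] {"local-provider", "cluster-provider"}) {
                NodeList providers = doc.getElementsByTagName(tag);
                for (int i = 0; i < providers.getLength(); i++) {
                    Element provider = (Element) providers.item(i);
                    String id = provider.getElementsByTagName("id").item(0).getTextContent().trim();
                    String cls = provider.getElementsByTagName("class").item(0).getTextContent().trim();
                    result.put(id, cls);
                }
            }
            return result;
        } catch (Exception e) {
            throw new IllegalStateException("Could not parse state management XML", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<stateManagement>"
                + "<local-provider><id>local-provider</id>"
                + "<class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>"
                + "</local-provider>"
                + "<!-- <cluster-provider><id>redis-provider</id><class>x</class></cluster-provider> -->"
                + "<cluster-provider><id>zk-provider</id>"
                + "<class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>"
                + "</cluster-provider>"
                + "</stateManagement>";
        // Providers inside XML comments are not part of the DOM, so they are skipped.
        providerClasses(xml).forEach((id, cls) -> System.out.println(id + " -> " + cls));
    }
}
```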

 

State Persistence

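In standalone mode, state is stored in the state/local directory under the NiFi root directory (on my machine the root is \work file\nifi-jdk8\nifi-standalone), which matches the Directory property of local-provider in state-management.xml. Deleting the local folder clears the state.

In cluster mode with the embedded ZooKeeper, state is stored under the state/zookeeper directory. Deleting the version-2 folder clears all state. Do not delete the myid file.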
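
Deleting the folder by hand is enough, but the same cleanup can be scripted. The following is a minimal sketch using java.nio.file; the class name ClearLocalStateSketch is hypothetical, the path must be passed explicitly, and it assumes NiFi has been stopped first (that precaution is my assumption, not something the steps above state).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical helper that removes a state directory such as <nifi-root>/state/local. */
final class ClearLocalStateSketch {
    /** Recursively deletes dir if it exists; returns true if something was deleted. */
    static boolean deleteRecursively(Path dir) {
        if (!Files.exists(dir)) {
            return false;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order so children are deleted before their parent directory.
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.out.println("usage: ClearLocalStateSketch <path-to-state/local>");
            return;
        }
        // Assumption: NiFi is stopped before its state directory is removed.
        System.out.println(deleteRecursively(Paths.get(args[0])) ? "deleted" : "nothing to delete");
    }
}
```

For the embedded ZooKeeper case the same helper could be pointed at the version-2 folder instead, which leaves the myid file next to it untouched.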

 

 

 

 
