searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

【flink】算子ID生成

2023-04-13 09:13:40
256
0

一般来说在使用Streaming Api编程时都建议给算子自定义uid,特别有些转换涉及到状态,因为算子ID是算子和状态之间的纽带,一直都认为指定的uid就是最终的算子ID。但是在基于flink sql层次编程时,很多时候并不清楚整个job最由多少个算子组成,也不知道每个算子的ID是怎么生成的,以及如果进行个修改会不会不能从状态中恢复。

1. 唯一标识

在用streaming api编写flink程序时,就是在DataStream上直接生成转换(org.apache.flink.api.dag.Transformation)的过程,Transformation有两个重要的属性:

  • id,转换的唯一标识,通过程序静态计数器从1开始递增,存在并发问题,相同的程序可能得到不同的结果。
  • uid,用户为该转换自定义的id值,通过uid()进行设置,是一个固定的、可读性强的唯一标记。如下示例,map1/2调换顺序,不影响逻辑,id改变了,但uid依然不变。

一系列的Transformation会被遍历封装成org.apache.flink.streaming.api.graph.StreamNode对象,生成StreamGraph,表示整个流程序的拓扑结构,StreamNode维护着前后关联信息,也保留了原Transformation标识

  • id,原转换id
  • transformationUID,原转换uid
  • userHash,用于自定义生成operator id的散列值,用户可使用setUidHash()方法自定义

StreamGraph会进一步经过优化后变成JobGraph,JobVertex是JobGraph的核心对象 ,优化过程主要是设置了算子链,使得多个步骤可以在同一个task中运行,所以多个StreamNode逻辑上对应一个JobVertex。在StreamGraph->JobGraph这个过程中生成的org.apache.flink.runtime.jobgraph.OperatorID才是我们平时据说的算子ID,状态的保存和恢复都以它为依据,需要了解算子ID生成的逻辑,这样在修改作业代码时也能更好兼容旧程序。

2. 算子ID生成

2.1 遍历所有节点

断点调试看堆栈可以看到,构建OperatorID时的字节数组是从StreamingJobGraphGenerator.OperatorChainInfo#hashes来的,在StreamGraphHasherV2#traverseStreamGraphAndGenerateHashes中初始化

    public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
        final HashFunction hashFunction = Hashing.murmur3_128(0);
        final Map<Integer, byte[]> hashes = new HashMap<>();

        Set<Integer> visited = new HashSet<>();  //已遍历的节点
        Queue<StreamNode> remaining = new ArrayDeque<>();  //已遍历的节点进入队列,按队列顺序给节点生成哈希值

        List<Integer> sources = new ArrayList<>();
        for (Integer sourceNodeId : streamGraph.getSourceIDs()) {  // 获取整个graph的source节点并进行排序,因为原来是一个Set,所以多次提交相同的程序可能会得到不一样的顺序
            sources.add(sourceNodeId);
        }
        Collections.sort(sources);
        for (Integer sourceNodeId : sources) {  //以广度优先方式遍历整个图,以source为起始
            remaining.add(streamGraph.getStreamNode(sourceNodeId));
            visited.add(sourceNodeId);
        }
        StreamNode currentNode;
        while ((currentNode = remaining.poll()) != null) {  //从队列取出当前节点,并为该节点生成hash值
            if (generateNodeHash(currentNode, hashFunction,hashes,streamGraph.isChainingEnabled(),streamGraph)) {
                for (StreamEdge outEdge : currentNode.getOutEdges()) { //如果当前节点生成了hash值,马上把子节点入列,所以接下来就是为子节点生成hash
                    StreamNode child = streamGraph.getTargetVertex(outEdge);

                    if (!visited.contains(child.getId())) {
                        remaining.add(child);
                        visited.add(child.getId());
                    }
                }
            } else {
                visited.remove(currentNode.getId());  //当前节点生成hash不成功,移除,后续将继续生成,由于这是图,每个节点不止一条路径可达,可能其它路径访问到它时才满足条件
            }
        }
        return hashes;
    }

整个方法就是一个图的遍历过程,从source节点开始的广度优先,这里有一点需要关注,节点在某一条路径被访问到时,本次可能并不满足生成hash的条件,那么将会再次被遍历。

  • 只有一个source的情况

  • 有多个source节点时,即多个独立流,会先把一个流完整生成完成,再到下一个流

2.2 为每个节点生成哈希值

如果在程序中有手动设置uid,那么就从指定的uid生成hash,所以始终都能得到固定的算子ID,否则通过方法generateDeterministicHash根据节点的本地属性、输入、输出情况生成确定的hash

private boolean generateNodeHash(StreamNode node,HashFunction hashFunction,Map<Integer, byte[]> hashes,boolean isChainingEnabled,StreamGraph streamGraph) {

    String userSpecifiedHash = node.getTransformationUID();
    if (userSpecifiedHash == null) {
        for (StreamEdge inEdge : node.getInEdges()) {
            if (!hashes.containsKey(inEdge.getSourceId())) {  //如果当前节点的前置节点还没生成hash,则当前节点生成失败
                return false;
            }
        }
        Hasher hasher = hashFunction.newHasher();
        byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled, streamGraph);
        return true;
    } else {  //指定uid的算子
        Hasher hasher = hashFunction.newHasher();
        byte[] hash = generateUserSpecifiedHash(node, hasher);
        return true;
    }
}
private byte[] generateDeterministicHash(StreamNode node,Hasher hasher,Map<Integer, byte[]> hashes,boolean isChainingEnabled,StreamGraph streamGraph) {
    generateNodeLocalHash(hasher, hashes.size());   //hases.size()表示当前节点的位置
    for (StreamEdge outEdge : node.getOutEdges()) {
        if (isChainable(outEdge, isChainingEnabled, streamGraph)) {
            generateNodeLocalHash(hasher, hashes.size());
        }
    }
    byte[] hash = hasher.hash().asBytes();
    for (StreamEdge inEdge : node.getInEdges()) {
        byte[] otherHash = hashes.get(inEdge.getSourceId());
        for (int j = 0; j < hash.length; j++) {
            hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
        }
    }
    return hash;
}

当算子ID默认生成时,有3个影响因素:

  1. StreamNode所在StreamGraph中的位置,因为每次都是按广度优化遍历,所以如果整个图结构不变,那么节点的位置是固定的。这里不用StreamNode#id,是因为上面提过,它可能会变化
  2. 输出节点数量
  3. 所有输入节点的hash值,如果输入节点多了少了,或者输入节点有变

2.2 两个算子ID

在上面StreamGraphHasherV2中无论是否指定uid,最终生成的hash都是基于哈希算法,如果flink版本升级导致算法有所改变,那么hash值肯定会改变;所以还有另一种直接干预hash值的方式,即在编程时setUidHash()直接设置散列值,设置的值在StreamGraphUserHashHasher中被处理,两种方式都可以生成OperatorID分别叫generatedOperatorID、userDefinedOperatorID,被封装为OperatorIDPair对象,当恢复状态时可以看到两个id都取来做映射Checkpoints#loadAndValidateCheckpoint

for (ExecutionJobVertex task : tasks.values()) {
    for (OperatorIDPair operatorIDPair : task.getOperatorIDs()) {
        operatorToJobVertexMapping.put(operatorIDPair.getGeneratedOperatorID(), task);
        operatorIDPair
                .getUserDefinedOperatorID()
                .ifPresent(id -> operatorToJobVertexMapping.put(id, task));
    }
}
0条评论
0 / 1000