searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

hudi系列-流式增量查询

2023-02-16 01:23:36
90
0

1. 简介

hudi的两大特点:流式查询和支持upsert/delete,hudi的数据变更是基于timeline的,所以时间点(Instant)就成为了实现增量查询的依据。在与flink集成中,当开启了流式读,其实就是一个持续的增量查询的过程,可以通过配置参数read.start-commit和read.end-commit来指定一个无状态的flink job的初始查询范围。

  • 2. 代码示例

tEnv.executeSql("CREATE xxx WITH (\n" +
        "\t'connector' = 'hudi',\n" +
        "\t'table.type' = 'MERGE_ON_READ',\n" +
        "\t'path' = 'file:///D:/data/hadoop3.2.1/warehouse/tb_person_hudi',\n" +
        "\t'read.start-commit' = '20220722103000',\n" +
        "\t'read.task' = '1',\n" +
        "\t'read.streaming.enabled' = 'true',\n" +
        "\t'read.streaming.check-interval' = '30' \n" +
        ")");
Table table = tEnv.from("tb_person_hudi ");
tEnv.toChangelogStream(table).print().setParallelism(1);
env.execute("incremently query");

3. 流程分析

3.1 hudi源入口(HoodieTableSource)

HoodieTableSource实现ScanTableSource,SupportsPartitionPushDown,SupportsProjectionPushDown,SupportsLimitPushDown,SupportsFilterPushDown接口,后4个接口主要是支持对查询计划的优化。ScanTableSource则提供了读取hudi表的具体实现,核心方法为org.apache.hudi.table.HoodieTableSource#getScanRuntimeProvider

if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {  //开启了流式读(read.streaming.enabled)
  StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
      conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());
  InputFormat<RowData, ?> inputFormat = getInputFormat(true);
  OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
  SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
      .setParallelism(1)
      .transform("split_reader", typeInfo, factory)
      .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); 
  return new DataStreamSource<>(source);
}

上面代码在流环境中创建了一个SourceFunction(StreamReadMonitoringFunction)和一个自定义的转换(StreamReadOperator)

  • StreamReadMonitoringFunction: 监控hudi表元数据目录(.hoodie)获取需要被读取的文件分片(MergeOnReadInputSplit,一个base parquet文件和一组log文件),然后把分片递给下游的转换算子StreamReadOperator进行文件读取;固定一个线程去监控,名称为split_monitorxxxxx.
  • StreamReadOperator:将按timeline升序收到的MergeOnReadInputSplit一个一个地读取分片数据;算子名称为split_reader->xxxxx,可以通过设置read.tasks进行设置并行度
    在这里插入图片描述

    3.2 定时监控元数据获得增量分片(StreamReadMonitoringFunction)

    StreamReadMonitoringFunction负责定时(read.streaming.check-interval)扫描hudi表的元数据目录.hoodie,如果发现在active timeline上有新增的instant[action=commit,deltacommit,compaction,replace && active=completed],从这些instant信息中可以知道数据变更写到了哪些文件(parquet,log),然后构建成分片对象(MergeOnReadInputSplit)。
  • 核心属性:issuedInstant,这个是增量查询的依据,记录着当前已经消费的数据的最新instant,类似于kafka的offset,但是hudi是基于timeline.该值是有状态的,维护在ListState中,所以flink job重启依然可以做到增量。
  • 核心方法:StreamReadMonitoringFunction#monitorDirAndForwardSplits,很简单,主要就做了两件事,调用IncrementalInputSplits#inputSplits获取到增量分片(已进行了排序),然后传递给下游的算子(StreamReadOperator)
public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> context) {
  HoodieTableMetaClient metaClient = getOrCreateMetaClient();
  IncrementalInputSplits.Result result =incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant);
  for (MergeOnReadInputSplit split : result.getInputSplits()) {
    context.collect(split);
  }
}

3.3 获取增量分片(IncrementalInputSplits)

主要逻辑在方法IncrementalInputSplits#inputSplits(metaClient, hadoopConf, issuedInstant),需要先了解hudi关于timeline和instant的一些基本概念,详细的流程如下图所示:
在这里插入图片描述

如果flink job首次运行指定了read.start-commit和read.end-commit,但是该范围是比较久以前,instant已经被归档,那么流作业将永远不能消费到数据
https://github.com/apache/hudi/issues/6167

3.4 读取数据文件(StreamReadOperator)

StreamReadOperator算子接收分片后会缓存在队列Queue splits,然后不停从队列中poll分片放到线程池中执行

private void processSplits() throws IOException {
  format.open(split);  
  consumeAsMiniBatch(split);   
  enqueueProcessSplits();
}

主要有三个步骤:

  1. 从队列中peek分片,调用MergeOnReadInputFormat.open构建迭代器,迭代器是用来进行文件的数据读取,一个迭代器对应一个分片(多个物理文件,base+log),对应不同读取的场景,实现了几种迭代器:BaseFileOnlyFilteringIterator,BaseFileOnlyIterator,LogFileOnlyIterator,MergeIterator,SkipMergeIterator
  2. 微批量消费,每批只读2048记录,将把记录传递给下游的算子消费同时标记消费的总数,如果该分片读到了尾,则将该分片从队列中弹出,并关闭MergeOnReadInputFormat
  3. 继续处理队列中的分片,回到步骤1,如果上一次的分片没消费完,那么本次循环将继续消费,只不过是由另一个线程处理。
0条评论
0 / 1000
矛始
28文章数
0粉丝数
矛始
28 文章 | 0 粉丝
原创

hudi系列-流式增量查询

2023-02-16 01:23:36
90
0

1. 简介

hudi的两大特点:流式查询和支持upsert/delete,hudi的数据变更是基于timeline的,所以时间点(Instant)就成为了实现增量查询的依据。在与flink集成中,当开启了流式读,其实就是一个持续的增量查询的过程,可以通过配置参数read.start-commit和read.end-commit来指定一个无状态的flink job的初始查询范围。

  • 2. 代码示例

tEnv.executeSql("CREATE xxx WITH (\n" +
        "\t'connector' = 'hudi',\n" +
        "\t'table.type' = 'MERGE_ON_READ',\n" +
        "\t'path' = 'file:///D:/data/hadoop3.2.1/warehouse/tb_person_hudi',\n" +
        "\t'read.start-commit' = '20220722103000',\n" +
        "\t'read.task' = '1',\n" +
        "\t'read.streaming.enabled' = 'true',\n" +
        "\t'read.streaming.check-interval' = '30' \n" +
        ")");
Table table = tEnv.from("tb_person_hudi ");
tEnv.toChangelogStream(table).print().setParallelism(1);
env.execute("incremently query");

3. 流程分析

3.1 hudi源入口(HoodieTableSource)

HoodieTableSource实现ScanTableSource,SupportsPartitionPushDown,SupportsProjectionPushDown,SupportsLimitPushDown,SupportsFilterPushDown接口,后4个接口主要是支持对查询计划的优化。ScanTableSource则提供了读取hudi表的具体实现,核心方法为org.apache.hudi.table.HoodieTableSource#getScanRuntimeProvider

if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {  //开启了流式读(read.streaming.enabled)
  StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
      conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());
  InputFormat<RowData, ?> inputFormat = getInputFormat(true);
  OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
  SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
      .setParallelism(1)
      .transform("split_reader", typeInfo, factory)
      .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); 
  return new DataStreamSource<>(source);
}

上面代码在流环境中创建了一个SourceFunction(StreamReadMonitoringFunction)和一个自定义的转换(StreamReadOperator)

  • StreamReadMonitoringFunction: 监控hudi表元数据目录(.hoodie)获取需要被读取的文件分片(MergeOnReadInputSplit,一个base parquet文件和一组log文件),然后把分片递给下游的转换算子StreamReadOperator进行文件读取;固定一个线程去监控,名称为split_monitorxxxxx.
  • StreamReadOperator:将按timeline升序收到的MergeOnReadInputSplit一个一个地读取分片数据;算子名称为split_reader->xxxxx,可以通过设置read.tasks进行设置并行度
    在这里插入图片描述

    3.2 定时监控元数据获得增量分片(StreamReadMonitoringFunction)

    StreamReadMonitoringFunction负责定时(read.streaming.check-interval)扫描hudi表的元数据目录.hoodie,如果发现在active timeline上有新增的instant[action=commit,deltacommit,compaction,replace && active=completed],从这些instant信息中可以知道数据变更写到了哪些文件(parquet,log),然后构建成分片对象(MergeOnReadInputSplit)。
  • 核心属性:issuedInstant,这个是增量查询的依据,记录着当前已经消费的数据的最新instant,类似于kafka的offset,但是hudi是基于timeline.该值是有状态的,维护在ListState中,所以flink job重启依然可以做到增量。
  • 核心方法:StreamReadMonitoringFunction#monitorDirAndForwardSplits,很简单,主要就做了两件事,调用IncrementalInputSplits#inputSplits获取到增量分片(已进行了排序),然后传递给下游的算子(StreamReadOperator)
public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> context) {
  HoodieTableMetaClient metaClient = getOrCreateMetaClient();
  IncrementalInputSplits.Result result =incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant);
  for (MergeOnReadInputSplit split : result.getInputSplits()) {
    context.collect(split);
  }
}

3.3 获取增量分片(IncrementalInputSplits)

主要逻辑在方法IncrementalInputSplits#inputSplits(metaClient, hadoopConf, issuedInstant),需要先了解hudi关于timeline和instant的一些基本概念,详细的流程如下图所示:
在这里插入图片描述

如果flink job首次运行指定了read.start-commit和read.end-commit,但是该范围是比较久以前,instant已经被归档,那么流作业将永远不能消费到数据
https://github.com/apache/hudi/issues/6167

3.4 读取数据文件(StreamReadOperator)

StreamReadOperator算子接收分片后会缓存在队列Queue splits,然后不停从队列中poll分片放到线程池中执行

private void processSplits() throws IOException {
  format.open(split);  
  consumeAsMiniBatch(split);   
  enqueueProcessSplits();
}

主要有三个步骤:

  1. 从队列中peek分片,调用MergeOnReadInputFormat.open构建迭代器,迭代器是用来进行文件的数据读取,一个迭代器对应一个分片(多个物理文件,base+log),对应不同读取的场景,实现了几种迭代器:BaseFileOnlyFilteringIterator,BaseFileOnlyIterator,LogFileOnlyIterator,MergeIterator,SkipMergeIterator
  2. 微批量消费,每批只读2048记录,将把记录传递给下游的算子消费同时标记消费的总数,如果该分片读到了尾,则将该分片从队列中弹出,并关闭MergeOnReadInputFormat
  3. 继续处理队列中的分片,回到步骤1,如果上一次的分片没消费完,那么本次循环将继续消费,只不过是由另一个线程处理。
文章来自个人专栏
hudi
17 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0