searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Flink-Iceberg Upsert流程

2024-07-18 09:48:13
112
0
 

Flink输入Iceberg的数据格式为RowData,如 row("+I", 1, "aaa"),其涉及的RowKind 所有类型如下:

● Insert: +I

● Update before -U : delete

● Update after +U :Insert

● Delete -D

 

其中,Flink 将数据写入Iceberg的操作类型分为:mor和cow

● mor(Merge on Read)

○ 当进行写操作时,更新和删除不直接修改历史数据,而是单独记录数据变更,即要删除的数据被写入到delete file中,写入的数据,保存到data File中,同时更新元数据文件以包含对新数据文件的引用。

○ 当读取数据时,Iceberg 会将Delete file的记录加载到内存中,与Data file中的记录比较,获取最终的准确结果

■ 写入操作不需要修改现有的数据文件,因此可以支持高并发写入

● cow(Copy On Write)

○ 每次写操作都会生成新的数据文件,用来替换旧的数据文件。

此外,Flink upsert 数据到Iceberg,upsert操作包括delete和insert两种操作,会产生的delete文件分为 position-delete和eq-delete,其对应的delete file内容如下:

● position-delete:[dataFilePath,pos]

● eq-delete:[eq-field: value]

在Iceberg中,数据写入逻辑的入口在FlinkSink.createStreamWriter(),在其中构建TaskWriterFactory和IcebergStreamWriter对象:

● TaskWriterFactory:创建TaskWriter实例,TaskWriter 是负责将数据写入Iceberg表对应文件的类,它会根据数据文件的大小和其他参数来分割数据,并生成数据文件。TaskWriterFactory 的实现会依赖于数据文件的格式(如 Parquet、ORC、Avro 等)和表的配置

● IcebergStreamWriter:接收Flink RowData数据;为每个subTask初始化一个taskWriterFactory;然后使用其创建TaskWriter,将数据分发到不同的 TaskWriter 实例中进行写入

class FlinkSink
 输入为RowData,输出为WriteResult
 
  static IcebergStreamWriter<RowData> createStreamWriter(
      SerializableSupplier<Table> tableSupplier,
      FlinkWriteConf flinkWriteConf,
      RowType flinkRowType,
      List<Integer> equalityFieldIds) {
    Preconditions.checkArgument(tableSupplier != null, "Iceberg table supplier shouldn't be null");

    Table initTable = tableSupplier.get();
    FileFormat format = flinkWriteConf.dataFileFormat();
    //构建TaskWriterFactory,根据不同的操作类型(Insert/upsert/partition),实例化对应的Writer
    TaskWriterFactory<RowData> taskWriterFactory =
        new RowDataTaskWriterFactory(
            tableSupplier,
            flinkRowType,
            flinkWriteConf.targetDataFileSize(),
            format,
            writeProperties(initTable, format, flinkWriteConf),
            equalityFieldIds,
            flinkWriteConf.upsertMode());
    //执行Writer的实际写入,构建dataFile和deleteFile
    return new IcebergStreamWriter<>(initTable.name(), taskWriterFactory);
  }
  
  
  class IcebergStreamWriter

  @Override
  public void open() {
    this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
    this.attemptId = getRuntimeContext().getAttemptNumber();
    // Initialize the task writer factory.
    this.taskWriterFactory.initialize(subTaskId, attemptId);

    // Initialize the task writer.
    this.writer = taskWriterFactory.create();
  }

  @Override
  public void processElement(StreamRecord<T> element) throws Exception {
  // TaskWriter执行数据写入操作
    writer.write(element.getValue());

 

 

upsert

首先,该模式在构建FlinkAppenderFactory实例时,使用的eq-delete schema为从table schema中根据eq-field过滤出的eq-delete schema

 this.appenderFactory =
          new FlinkAppenderFactory(
              table,
              schema,
              flinkSchema,
              writeProperties,
              spec,
              ArrayUtil.toIntArray(equalityFieldIds),
              TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), //删除涉及的eq-field的schema
              null); 

 

然后,在执行TaskWriter.write()时,upsert操作先执行writer.deleteKey(),然后继续执行append操作的行writer.write(),如下所示:

 

class BaseDeltaTaskWriter
public void write(RowData row) throws IOException {
    //每个分区表,一个Writer
    RowDataDeltaWriter writer = route(row);

    switch (row.getRowKind()) {
      case INSERT:
      case UPDATE_AFTER:
        if (upsert) {
          writer.deleteKey(keyProjection.wrap(row));
        }
        writer.write(row);
        break;



 
 

假设输入的RowData为 (1,aaa),schema 为[id,data],eq-field 为[data],

则writer.deleteKey()的输入为(aaa),

writer.write()的输入为(1,aaa)

● writer.deleteKey()

该方法,将更新前的表中的old数据,写入position-delete或者eq-delete的data file

执行逻辑如下:

首先判断 是否执行position-delete,如果没有,就执行eq-delete的写入操作

 

 class BaseTaskWriter
 //keyFunc.apply("aaa")
 public void deleteKey(T key) throws IOException {
      //不是position-delete执行eqdelete write
      if (!internalPosDelete(asStructLikeKey(key))) {
      //将数据按照eq-field schema过滤直接写入eq-delete file
        eqDeleteWriter.write(key);
      }
    }
 

 

先讨论eq-delete的写入操作,将数据通过(parquet/orc)的writer写入dataFile中,在per ck & task 或者数据积攒批量,打开一个新的dataFile.

 
 

 public void write(T record) throws IOException {
      write(currentWriter, record);
      this.currentRows++;
      //判断数据量是否达到预设值
      if (shouldRollToNewFile()) {
       //每个subTask的per ck 也会执行
        closeCurrent();
        openCurrent();
      }
    }
 

 

然后,分析position-delete操作逻辑

internalPosDelete()中,首先判断,该key是否在insert操作时,已经写入内存Map中,如果已经写入,将key关联的dataFile和pos 写入position-delete

其中Map的信息会在每个ck&subTask被清除

 

 class BaseTaskWriter
Map<StructLike, PathOffset> insertedRowMap
private boolean internalPosDelete(StructLike key) {
      //对该字段的rowData多次insert,会将字段值(key,[dataFile,pos])保存在内存map中,这里获取(重复数据)
      PathOffset previous = insertedRowMap.remove(key);

      if (previous != null) {
        // 删除保存内存中的历史文件信息
        posDeleteWriter.delete(previous.path, previous.rowOffset, null);
        return true;
      }

      return false;
    } 
 

 

接下来分析posDeleteWriter.delete()的具体逻辑

获取每个dataFilePath上的posRow,判断将posRow追加到path中还是新增posRow

当数据积攒一定量,将数据写入dataFile

 

  class SortedPosDeleteWriter
  public void delete(CharSequence path, long pos, T row) {
   //获取dataFile关联的系列pos-delete数据(pos,rowData)
    List<PosRow<T>> posRows = posDeletes.get(wrapper.set(path));
    /补充数据
    if (posRows != null) {
      posRows.add(PosRow.of(pos, row));
    } else {
    //新增数据
      posDeletes.put(CharSequenceWrapper.wrap(path), Lists.newArrayList(PosRow.of(pos, row)));
    }

    records += 1;

    //积攒一定数量,再刷新执行position-delete写入dataFile

    if (records >= recordsNumThreshold) {
      flushDeletes();
    }
  } 
 

 

positionDelete的刷新操作,执行数据的真正写入(数据积攒到一定程度,才会写入文件中) 写入文件的内容:[path,posOfRow,row(null)]

其中将数据按照path排序,

 

private void flushDeletes() {
    if (posDeletes.isEmpty()) {
      return;
    }

    // Create a new output file(dataFle路径)
    EncryptedOutputFile outputFile;
    if (partition == null) {
      outputFile = fileFactory.newOutputFile();
    } else {
      outputFile = fileFactory.newOutputFile(partition);
    }
    //构建PositionDeleteWriter
    PositionDeleteWriter<T> writer =
        appenderFactory.newPosDeleteWriter(outputFile, format, partition);
    PositionDelete<T> posDelete = PositionDelete.create();
    try (PositionDeleteWriter<T> closeableWriter = writer) {
      // 排序 all the paths.
      List<CharSequence> paths = Lists.newArrayListWithCapacity(posDeletes.keySet().size());
      for (CharSequenceWrapper charSequenceWrapper : posDeletes.keySet()) {
        paths.add(charSequenceWrapper.get());
      }
      paths.sort(Comparators.charSequences());

      // Write all the sorted <path, pos, row> triples.
      for (CharSequence path : paths) {
        List<PosRow<T>> positions = posDeletes.get(wrapper.set(path));
        positions.sort(Comparator.comparingLong(PosRow::pos));

        positions.forEach(
            posRow -> closeableWriter.write(posDelete.set(path, posRow.pos(), posRow.row())));
      }
    } catch (IOException e) {
      setFailure(e);
      throw new UncheckedIOException(
          "Failed to write the sorted path/pos pairs to pos-delete file: "
              + outputFile.encryptingOutputFile().location(),
          e);
    }

    // Clear the buffered pos-deletions.
    posDeletes.clear();
    records = 0;

    // Add the referenced data files.
    referencedDataFiles.addAll(writer.referencedDataFiles());

    // Add the completed delete files.
    completedFiles.add(writer.toDeleteFile());
  }
 

 

 

● writer.write()

该方法,将数据写入到文件中,如果存在重复数据的写入,会将旧的数据,转为position-delete

该方法的逻辑如下:

● 首先获取该dataWriter目前写入的最新dataFile和rowNum,

● 然后获取eq-delete field对应的row中的数据,

● 之后将eq-delete对应field的值和dataFile path,pos暂存到内存map中,如果该eq-delete的value已经存在,执行position-delete操作:将eq-delete value所属的[dataFile,pos] 写入position-delete

最后调用parquet/avro Writer执行将row写入dataFile中

 
 

 class BaseTaskWriter
public void write(T row) throws IOException {
      //获取最新的Writer写入的最新文件和rowNum
      PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());

      // 获取deleteSchema对应的row中的数据(1,aaa)->(aaa)
      StructLike copiedKey = StructCopy.copy(structProjection.wrap(asStructLike(row)));

      // 将eq-delete对应field的字段和path保存
      PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);
      if (previous != null) {
        // TODO attach the previous row if has a positional-delete row schema in appender factory.
        posDeleteWriter.delete(previous.path, previous.rowOffset, null);
      }
      //调用parquet/avro Writer执行写入
      dataWriter.write(row);
    }
 

 

 

该方法最终生成的dataFile的名称:partitionId-taskId-opeartionId-fileSequence

deleteSchema中的field作为pk,当write的rowData存在多个相同pk,将一个ck&per task中间重复的数据转为position-delete

0条评论
0 / 1000
张****领
4文章数
0粉丝数
张****领
4 文章 | 0 粉丝
张****领
4文章数
0粉丝数
张****领
4 文章 | 0 粉丝
原创

Flink-Iceberg Upsert流程

2024-07-18 09:48:13
112
0
 

Flink输入Iceberg的数据格式为RowData,如 row("+I", 1, "aaa"),其涉及的RowKind 所有类型如下:

● Insert: +I

● Update before -U : delete

● Update after +U :Insert

● Delete -D

 

其中,Flink 将数据写入Iceberg的操作类型分为:mor和cow

● mor(Merge on Read)

○ 当进行写操作时,更新和删除不直接修改历史数据,而是单独记录数据变更,即要删除的数据被写入到delete file中,写入的数据,保存到data File中,同时更新元数据文件以包含对新数据文件的引用。

○ 当读取数据时,Iceberg 会将Delete file的记录加载到内存中,与Data file中的记录比较,获取最终的准确结果

■ 写入操作不需要修改现有的数据文件,因此可以支持高并发写入

● cow(Copy On Write)

○ 每次写操作都会生成新的数据文件,用来替换旧的数据文件。

此外,Flink upsert 数据到Iceberg,upsert操作包括delete和insert两种操作,会产生的delete文件分为 position-delete和eq-delete,其对应的delete file内容如下:

● position-delete:[dataFilePath,pos]

● eq-delete:[eq-field: value]

在Iceberg中,数据写入逻辑的入口在FlinkSink.createStreamWriter(),在其中构建TaskWriterFactory和IcebergStreamWriter对象:

● TaskWriterFactory:创建TaskWriter实例,TaskWriter 是负责将数据写入Iceberg表对应文件的类,它会根据数据文件的大小和其他参数来分割数据,并生成数据文件。TaskWriterFactory 的实现会依赖于数据文件的格式(如 Parquet、ORC、Avro 等)和表的配置

● IcebergStreamWriter:接收Flink RowData数据;为每个subTask初始化一个taskWriterFactory;然后使用其创建TaskWriter,将数据分发到不同的 TaskWriter 实例中进行写入

class FlinkSink
 输入为RowData,输出为WriteResult
 
  static IcebergStreamWriter<RowData> createStreamWriter(
      SerializableSupplier<Table> tableSupplier,
      FlinkWriteConf flinkWriteConf,
      RowType flinkRowType,
      List<Integer> equalityFieldIds) {
    Preconditions.checkArgument(tableSupplier != null, "Iceberg table supplier shouldn't be null");

    Table initTable = tableSupplier.get();
    FileFormat format = flinkWriteConf.dataFileFormat();
    //构建TaskWriterFactory,根据不同的操作类型(Insert/upsert/partition),实例化对应的Writer
    TaskWriterFactory<RowData> taskWriterFactory =
        new RowDataTaskWriterFactory(
            tableSupplier,
            flinkRowType,
            flinkWriteConf.targetDataFileSize(),
            format,
            writeProperties(initTable, format, flinkWriteConf),
            equalityFieldIds,
            flinkWriteConf.upsertMode());
    //执行Writer的实际写入,构建dataFile和deleteFile
    return new IcebergStreamWriter<>(initTable.name(), taskWriterFactory);
  }
  
  
  class IcebergStreamWriter

  @Override
  public void open() {
    this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
    this.attemptId = getRuntimeContext().getAttemptNumber();
    // Initialize the task writer factory.
    this.taskWriterFactory.initialize(subTaskId, attemptId);

    // Initialize the task writer.
    this.writer = taskWriterFactory.create();
  }

  @Override
  public void processElement(StreamRecord<T> element) throws Exception {
  // TaskWriter执行数据写入操作
    writer.write(element.getValue());

 

 

upsert

首先,该模式在构建FlinkAppenderFactory实例时,使用的eq-delete schema为从table schema中根据eq-field过滤出的eq-delete schema

 this.appenderFactory =
          new FlinkAppenderFactory(
              table,
              schema,
              flinkSchema,
              writeProperties,
              spec,
              ArrayUtil.toIntArray(equalityFieldIds),
              TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), //删除涉及的eq-field的schema
              null); 

 

然后,在执行TaskWriter.write()时,upsert操作先执行writer.deleteKey(),然后继续执行append操作的行writer.write(),如下所示:

 

class BaseDeltaTaskWriter
public void write(RowData row) throws IOException {
    //每个分区表,一个Writer
    RowDataDeltaWriter writer = route(row);

    switch (row.getRowKind()) {
      case INSERT:
      case UPDATE_AFTER:
        if (upsert) {
          writer.deleteKey(keyProjection.wrap(row));
        }
        writer.write(row);
        break;



 
 

假设输入的RowData为 (1,aaa),schema 为[id,data],eq-field 为[data],

则writer.deleteKey()的输入为(aaa),

writer.write()的输入为(1,aaa)

● writer.deleteKey()

该方法,将更新前的表中的old数据,写入position-delete或者eq-delete的data file

执行逻辑如下:

首先判断 是否执行position-delete,如果没有,就执行eq-delete的写入操作

 

 class BaseTaskWriter
 //keyFunc.apply("aaa")
 public void deleteKey(T key) throws IOException {
      //不是position-delete执行eqdelete write
      if (!internalPosDelete(asStructLikeKey(key))) {
      //将数据按照eq-field schema过滤直接写入eq-delete file
        eqDeleteWriter.write(key);
      }
    }
 

 

先讨论eq-delete的写入操作,将数据通过(parquet/orc)的writer写入dataFile中,在per ck & task 或者数据积攒批量,打开一个新的dataFile.

 
 

 public void write(T record) throws IOException {
      write(currentWriter, record);
      this.currentRows++;
      //判断数据量是否达到预设值
      if (shouldRollToNewFile()) {
       //每个subTask的per ck 也会执行
        closeCurrent();
        openCurrent();
      }
    }
 

 

然后,分析position-delete操作逻辑

internalPosDelete()中,首先判断,该key是否在insert操作时,已经写入内存Map中,如果已经写入,将key关联的dataFile和pos 写入position-delete

其中Map的信息会在每个ck&subTask被清除

 

 class BaseTaskWriter
Map<StructLike, PathOffset> insertedRowMap
private boolean internalPosDelete(StructLike key) {
      //对该字段的rowData多次insert,会将字段值(key,[dataFile,pos])保存在内存map中,这里获取(重复数据)
      PathOffset previous = insertedRowMap.remove(key);

      if (previous != null) {
        // 删除保存内存中的历史文件信息
        posDeleteWriter.delete(previous.path, previous.rowOffset, null);
        return true;
      }

      return false;
    } 
 

 

接下来分析posDeleteWriter.delete()的具体逻辑

获取每个dataFilePath上的posRow,判断将posRow追加到path中还是新增posRow

当数据积攒一定量,将数据写入dataFile

 

  class SortedPosDeleteWriter
  public void delete(CharSequence path, long pos, T row) {
   //获取dataFile关联的系列pos-delete数据(pos,rowData)
    List<PosRow<T>> posRows = posDeletes.get(wrapper.set(path));
    /补充数据
    if (posRows != null) {
      posRows.add(PosRow.of(pos, row));
    } else {
    //新增数据
      posDeletes.put(CharSequenceWrapper.wrap(path), Lists.newArrayList(PosRow.of(pos, row)));
    }

    records += 1;

    //积攒一定数量,再刷新执行position-delete写入dataFile

    if (records >= recordsNumThreshold) {
      flushDeletes();
    }
  } 
 

 

positionDelete的刷新操作,执行数据的真正写入(数据积攒到一定程度,才会写入文件中) 写入文件的内容:[path,posOfRow,row(null)]

其中将数据按照path排序,

 

private void flushDeletes() {
    if (posDeletes.isEmpty()) {
      return;
    }

    // Create a new output file(dataFle路径)
    EncryptedOutputFile outputFile;
    if (partition == null) {
      outputFile = fileFactory.newOutputFile();
    } else {
      outputFile = fileFactory.newOutputFile(partition);
    }
    //构建PositionDeleteWriter
    PositionDeleteWriter<T> writer =
        appenderFactory.newPosDeleteWriter(outputFile, format, partition);
    PositionDelete<T> posDelete = PositionDelete.create();
    try (PositionDeleteWriter<T> closeableWriter = writer) {
      // 排序 all the paths.
      List<CharSequence> paths = Lists.newArrayListWithCapacity(posDeletes.keySet().size());
      for (CharSequenceWrapper charSequenceWrapper : posDeletes.keySet()) {
        paths.add(charSequenceWrapper.get());
      }
      paths.sort(Comparators.charSequences());

      // Write all the sorted <path, pos, row> triples.
      for (CharSequence path : paths) {
        List<PosRow<T>> positions = posDeletes.get(wrapper.set(path));
        positions.sort(Comparator.comparingLong(PosRow::pos));

        positions.forEach(
            posRow -> closeableWriter.write(posDelete.set(path, posRow.pos(), posRow.row())));
      }
    } catch (IOException e) {
      setFailure(e);
      throw new UncheckedIOException(
          "Failed to write the sorted path/pos pairs to pos-delete file: "
              + outputFile.encryptingOutputFile().location(),
          e);
    }

    // Clear the buffered pos-deletions.
    posDeletes.clear();
    records = 0;

    // Add the referenced data files.
    referencedDataFiles.addAll(writer.referencedDataFiles());

    // Add the completed delete files.
    completedFiles.add(writer.toDeleteFile());
  }
 

 

 

● writer.write()

该方法,将数据写入到文件中,如果存在重复数据的写入,会将旧的数据,转为position-delete

该方法的逻辑如下:

● 首先获取该dataWriter目前写入的最新dataFile和rowNum,

● 然后获取eq-delete field对应的row中的数据,

● 之后将eq-delete对应field的值和dataFile path,pos暂存到内存map中,如果该eq-delete的value已经存在,执行position-delete操作:将eq-delete value所属的[dataFile,pos] 写入position-delete

最后调用parquet/avro Writer执行将row写入dataFile中

 
 

 class BaseTaskWriter
public void write(T row) throws IOException {
      //获取最新的Writer写入的最新文件和rowNum
      PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());

      // 获取deleteSchema对应的row中的数据(1,aaa)->(aaa)
      StructLike copiedKey = StructCopy.copy(structProjection.wrap(asStructLike(row)));

      // 将eq-delete对应field的字段和path保存
      PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);
      if (previous != null) {
        // TODO attach the previous row if has a positional-delete row schema in appender factory.
        posDeleteWriter.delete(previous.path, previous.rowOffset, null);
      }
      //调用parquet/avro Writer执行写入
      dataWriter.write(row);
    }
 

 

 

该方法最终生成的dataFile的名称:partitionId-taskId-opeartionId-fileSequence

deleteSchema中的field作为pk,当write的rowData存在多个相同pk,将一个ck&per task中间重复的数据转为position-delete

文章来自个人专栏
Flink的学习
4 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0