The data Flink writes into Iceberg arrives as RowData, e.g. row("+I", 1, "aaa"). The RowKind values involved are listed below (a construction sketch follows the list):
● Insert: +I
● Update before: -U, handled as a delete
● Update after: +U, handled as an insert
● Delete: -D
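For reference, such a RowData record can be built in Flink as in the following minimal sketch (using Flink's GenericRowData and StringData; the values mirror the row("+I", 1, "aaa") example above):
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

// (1, "aaa") arriving as an insert (+I)
RowData insert = GenericRowData.ofKind(RowKind.INSERT, 1, StringData.fromString("aaa"));
// The same row arriving as a delete (-D)
RowData delete = GenericRowData.ofKind(RowKind.DELETE, 1, StringData.fromString("aaa"));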
Flink's writes into Iceberg fall into two modes, MOR and COW (configurable through table properties, as sketched after this list):
● MOR (Merge on Read)
○ On write, updates and deletes do not modify existing data files. Instead, the changes are recorded separately: rows to be deleted are written to delete files, newly written rows go to data files, and the metadata is updated to reference the new files.
○ On read, Iceberg loads the delete-file records into memory and merges them against the data-file records to produce the final, correct result.
■ Because writes never modify existing data files, this mode supports highly concurrent writes.
● COW (Copy on Write)
○ Every write produces new data files that replace the old ones.
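Which mode applies to deletes, updates, and merges can be set per table through the write.delete.mode / write.update.mode / write.merge.mode properties. A minimal sketch, assuming an already-loaded Table instance named table:
// Valid values are "merge-on-read" and "copy-on-write".
table.updateProperties()
    .set("write.delete.mode", "merge-on-read")
    .set("write.update.mode", "merge-on-read")
    .set("write.merge.mode", "copy-on-write")
    .commit();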
In addition, when Flink upserts data into Iceberg, each upsert consists of a delete plus an insert. The deletes produce two kinds of delete files, position-delete and eq-delete, whose rows have the following shape (see the illustration after this list):
● position-delete: [dataFilePath, pos]
● eq-delete: [eq-field: value]
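As a concrete (hypothetical) illustration: with table schema [id, data], eq-field [data], and the row (1, "aaa") stored at position 0 of a data file named data-00001.parquet, the position-delete row would be ["data-00001.parquet", 0] and the eq-delete row would be [data: "aaa"]. In code, a position-delete entry corresponds to Iceberg's PositionDelete holder (which also appears in flushDeletes() further below); a minimal sketch:
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.deletes.PositionDelete;

// [dataFilePath, pos]; the deleted row payload is optional and left null here.
PositionDelete<RowData> posDelete = PositionDelete.create();
posDelete.set("data-00001.parquet", 0L, null);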
In Iceberg, the entry point of the write path is FlinkSink.createStreamWriter(), which builds a TaskWriterFactory and an IcebergStreamWriter (a usage sketch of the sink follows this list):
● TaskWriterFactory: creates TaskWriter instances. A TaskWriter is the class responsible for writing records into the table's files; it splits the data by target data-file size and other parameters and produces the data files. The factory implementation depends on the file format (Parquet, ORC, Avro, etc.) and the table configuration.
● IcebergStreamWriter: receives Flink RowData, initializes one TaskWriterFactory per subtask, uses it to create a TaskWriter, and dispatches records to the TaskWriter instances for writing.
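For context, this code path is normally reached through the FlinkSink builder. A minimal sketch, assuming a DataStream<RowData> named input and a TableLoader named tableLoader (both hypothetical):
import java.util.Arrays;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .equalityFieldColumns(Arrays.asList("data")) // eq-fields, matching the running example
    .upsert(true)                                // upsert mode: delete by key, then insert
    .append();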
class FlinkSink
Input: RowData; output: WriteResult.
static IcebergStreamWriter<RowData> createStreamWriter(
SerializableSupplier<Table> tableSupplier,
FlinkWriteConf flinkWriteConf,
RowType flinkRowType,
List<Integer> equalityFieldIds) {
Preconditions.checkArgument(tableSupplier != null, "Iceberg table supplier shouldn't be null");
Table initTable = tableSupplier.get();
FileFormat format = flinkWriteConf.dataFileFormat();
//Build the TaskWriterFactory; depending on the operation type (insert/upsert, partitioned or unpartitioned), it instantiates the corresponding writer
TaskWriterFactory<RowData> taskWriterFactory =
new RowDataTaskWriterFactory(
tableSupplier,
flinkRowType,
flinkWriteConf.targetDataFileSize(),
format,
writeProperties(initTable, format, flinkWriteConf),
equalityFieldIds,
flinkWriteConf.upsertMode());
//The IcebergStreamWriter performs the actual writes, producing data files and delete files
return new IcebergStreamWriter<>(initTable.name(), taskWriterFactory);
}
class IcebergStreamWriter
@Override
public void open() {
this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
this.attemptId = getRuntimeContext().getAttemptNumber();
// Initialize the task writer factory.
this.taskWriterFactory.initialize(subTaskId, attemptId);
// Initialize the task writer.
this.writer = taskWriterFactory.create();
}
@Override
public void processElement(StreamRecord<T> element) throws Exception {
// The TaskWriter performs the actual data write
writer.write(element.getValue());
}
Upsert
First, in this mode, when the FlinkAppenderFactory instance is built, the eq-delete schema it receives is obtained by filtering the table schema down to the eq-fields:
this.appenderFactory =
new FlinkAppenderFactory(
table,
schema,
flinkSchema,
writeProperties,
spec,
ArrayUtil.toIntArray(equalityFieldIds),
TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), //schema of the eq-fields involved in deletes
null);
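To illustrate that last argument: TypeUtil.select() projects the table schema down to the given field ids. A minimal sketch, assuming the running example's [id, data] schema with "data" (field id 2) as the eq-field:
import java.util.Collections;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

// Table schema [id, data]; field id 2 is the "data" column used as the eq-field.
Schema schema = new Schema(
    Types.NestedField.required(1, "id", Types.IntegerType.get()),
    Types.NestedField.required(2, "data", Types.StringType.get()));
// Keep only field id 2 -> the eq-delete schema contains just the "data" column.
Schema eqDeleteSchema = TypeUtil.select(schema, Collections.singleton(2));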
Then, when TaskWriter.write() runs, an upsert first calls writer.deleteKey() and then continues with the regular append path writer.write(), as shown below:
class BaseDeltaTaskWriter
public void write(RowData row) throws IOException {
//One writer per partition; route the row to its partition's writer
RowDataDeltaWriter writer = route(row);
switch (row.getRowKind()) {
case INSERT:
case UPDATE_AFTER:
if (upsert) {
writer.deleteKey(keyProjection.wrap(row));
}
writer.write(row);
break;
Assume the incoming RowData is (1, "aaa"), the schema is [id, data], and the eq-field is [data]. Then
writer.deleteKey() receives ("aaa"), and
writer.write() receives (1, "aaa").
● writer.deleteKey()
This method writes the old row being replaced into a position-delete or eq-delete file.
Its logic is as follows:
First it checks whether a position-delete can be applied; if not, it falls back to writing an eq-delete.
class BaseTaskWriter
//key is the eq-field projection of the row, e.g. ("aaa")
public void deleteKey(T key) throws IOException {
//If no position-delete applies, fall back to an eq-delete write
if (!internalPosDelete(asStructLikeKey(key))) {
//Write the key (the row projected onto the eq-field schema) directly into the eq-delete file
eqDeleteWriter.write(key);
}
}
Let's first look at the eq-delete write path: the key is written through the underlying (Parquet/ORC) writer into the current file, and a new file is opened per checkpoint & task or once enough records have accumulated.
public void write(T record) throws IOException {
write(currentWriter, record);
this.currentRows++;
//Check whether the amount of written data has reached the configured target
if (shouldRollToNewFile()) {
//This also happens per checkpoint for each subtask
closeCurrent();
openCurrent();
}
}
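The roll-over threshold checked by shouldRollToNewFile() comes from the write configuration (flinkWriteConf.targetDataFileSize() in createStreamWriter() above), which can be tuned through the table property write.target-file-size-bytes. A minimal sketch, again assuming a loaded Table instance named table:
// Roll to a new file at roughly 128 MB instead of the default target size.
table.updateProperties()
    .set("write.target-file-size-bytes", String.valueOf(128L * 1024 * 1024))
    .commit();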
Next, the position-delete path.
In internalPosDelete(), the writer first checks whether this key has already been inserted during the current checkpoint and recorded in the in-memory map. If so, the data file and position associated with the key are written as a position-delete; if not (the old row lives in a data file from an earlier checkpoint), the method returns false and the eq-delete path above is taken.
The map is cleared per checkpoint and subtask.
class BaseTaskWriter
Map<StructLike, PathOffset> insertedRowMap
private boolean internalPosDelete(StructLike key) {
//If a row with this key was already inserted in the current checkpoint, the map holds (key -> [dataFile, pos]); remove and return that entry (the duplicate) here
PathOffset previous = insertedRowMap.remove(key);
if (previous != null) {
// Position-delete the previously written occurrence recorded in memory
posDeleteWriter.delete(previous.path, previous.rowOffset, null);
return true;
}
return false;
}
Next, the logic of posDeleteWriter.delete():
For each dataFilePath it looks up the buffered list of position rows and either appends the new (pos, row) entry to the existing list or starts a new one.
Once enough entries have accumulated, they are flushed into a delete file.
class SortedPosDeleteWriter
public void delete(CharSequence path, long pos, T row) {
//Look up the buffered pos-delete entries (pos, rowData) for this data file
List<PosRow<T>> posRows = posDeletes.get(wrapper.set(path));
//Append to the existing list
if (posRows != null) {
posRows.add(PosRow.of(pos, row));
} else {
//First entry for this data file
posDeletes.put(CharSequenceWrapper.wrap(path), Lists.newArrayList(PosRow.of(pos, row)));
}
records += 1;
//Once enough entries have accumulated, flush them into a position-delete file
if (records >= recordsNumThreshold) {
flushDeletes();
}
}
flushDeletes() performs the actual write of the buffered position deletes (the data is only written to a file once enough has accumulated). Each written row has the form [path, posOfRow, row (null)], and the entries are sorted by path:
private void flushDeletes() {
if (posDeletes.isEmpty()) {
return;
}
// Create a new output file (the path of the pos-delete file)
EncryptedOutputFile outputFile;
if (partition == null) {
outputFile = fileFactory.newOutputFile();
} else {
outputFile = fileFactory.newOutputFile(partition);
}
//Build the PositionDeleteWriter
PositionDeleteWriter<T> writer =
appenderFactory.newPosDeleteWriter(outputFile, format, partition);
PositionDelete<T> posDelete = PositionDelete.create();
try (PositionDeleteWriter<T> closeableWriter = writer) {
// Sort all the paths.
List<CharSequence> paths = Lists.newArrayListWithCapacity(posDeletes.keySet().size());
for (CharSequenceWrapper charSequenceWrapper : posDeletes.keySet()) {
paths.add(charSequenceWrapper.get());
}
paths.sort(Comparators.charSequences());
// Write all the sorted <path, pos, row> triples.
for (CharSequence path : paths) {
List<PosRow<T>> positions = posDeletes.get(wrapper.set(path));
positions.sort(Comparator.comparingLong(PosRow::pos));
positions.forEach(
posRow -> closeableWriter.write(posDelete.set(path, posRow.pos(), posRow.row())));
}
} catch (IOException e) {
setFailure(e);
throw new UncheckedIOException(
"Failed to write the sorted path/pos pairs to pos-delete file: "
+ outputFile.encryptingOutputFile().location(),
e);
}
// Clear the buffered pos-deletions.
posDeletes.clear();
records = 0;
// Add the referenced data files.
referencedDataFiles.addAll(writer.referencedDataFiles());
// Add the completed delete files.
completedFiles.add(writer.toDeleteFile());
}
● writer.write()
This method writes the row into a data file. If the same key is written again, the previously written row is converted into a position-delete.
Its logic is as follows:
● First, capture the data file the dataWriter is currently writing and its current row count,
● then project the row onto the eq-delete fields,
● then store (eq-delete value -> [dataFile path, pos]) in the in-memory map; if an entry for that eq-delete value already exists, perform a position-delete: write the previous [dataFile, pos] associated with that value to the position-delete writer.
Finally, the Parquet/Avro writer appends the row to the data file.
class BaseTaskWriter
public void write(T row) throws IOException {
//Capture the file currently being written and its current row count
PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());
// Project the row onto the deleteSchema, e.g. (1, "aaa") -> ("aaa")
StructLike copiedKey = StructCopy.copy(structProjection.wrap(asStructLike(row)));
// Record (eq-delete key -> path/offset); a non-null previous value means this key was already written in the current checkpoint
PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);
if (previous != null) {
// TODO attach the previous row if has a positional-delete row schema in appender factory.
posDeleteWriter.delete(previous.path, previous.rowOffset, null);
}
//Delegate the actual write to the Parquet/Avro writer
dataWriter.write(row);
}
The data files produced by this method are named partitionId-taskId-operationId-fileSequence.
The fields of the deleteSchema act as the primary key: when several rows with the same key are written within one checkpoint & task, the earlier duplicates are turned into position-deletes. For example, with eq-field [data], writing (1, "aaa") and then (2, "aaa") in the same checkpoint emits the first row's [dataFile, pos] as a position-delete, so only (2, "aaa") remains visible on read; an old row with the same key from an earlier checkpoint would instead be removed through an eq-delete.