searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

hudi系列-不支持flink计算列

2023-12-19 03:01:08
37
0

使用场景

有个hudi mor的事实表,需要与外部的维表进行关联以生成明细宽表,方案是基于flink sql对事实表进行增量查询,同时与外部的维表进行lookup join,除了无需编码之外,通过jdbc的方式连接外部表还能利用flink jdbc-connector的缓存功能。

这种join方式要求事实表必须有一个处理时间(process time)属性,所以在定义事实表时增加了一个计算列

ods_test_hudi (
    `id` int,
    `test` string,
    `name` string,
    `age` int,
    `dt` string,
    `create_date` timestamp(3),
    `process_time` AS PROCTIME(),  //增加计算列
    PRIMARY KEY(`id`) NOT ENFORCED
)
WITH (
    'connector'='hudi',
    'table.type'='MERGE_ON_READ'
)

从flink sql的角度来看,这并没什么毛病,但是直接查这个表的数据时就会报错

 tEnv.sqlQuery("ods_test_hudi /*+ OPTIONS('read.start-commit'='earliest')*/").execute().print();
 from ods_test_hudi /*+ OPTIONS('read.start-commit'='earliest')*/").execute().print();

 Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
 at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.lambda$genPartColumnarRowReader$0(ParquetSplitReaderUtil.java:118)
 at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
 at java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032)
 at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
 at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
 at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
 at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
 at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
 at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:120)

这里报了一个数组越界,还是在读取parquet文件时候发生的,这是因为在构建parquet reader的时候需要定位每个查询schema中的列对应数据文件中的位置(用selectIndexs表示)

image

例如有上图parquet文件,包含三列数据col1,col2,col3,定义的表除了这三列还有额外定义的计算列process_time,查询列的数量或位置改变时,selectIndexs也不同

  selet col1,col3,col2 select col2,col3 select col1,col2,col3,process_time
selectIndexs [0,2,1] [1,2] [0,1,2,-1]

解决方法

  • 方法一

既然数据文件中不存在列process_time,那么就不应该把它下推到读取parquet文件,而是把其它列从数据文件中解析出来后,再在flink系统中生成该计算列的值。所以在构建动态表的时候把计算列过滤掉:

org.apache.hudi.table.HoodieTableFactory#createDynamicTableSource:

ResolvedSchema schema = ResolvedSchema.of(context.getCatalogTable().getResolvedSchema().getColumns().stream().filter(c-> !(c instanceof Column.ComputedColumn)).collect(Collectors.toList()));

方法二:

在定义表的时候不要加上process_time列,而是在查询的时候额外补上

slct *,PROCTIME() AS process_time from ods_test_hudi

最终两种方法都能正常查询出数据

+----+-------------+--------------------------------+--------------------------------+-------------+--------------------------------+-------------------------+-------------------------+
| op |          id |                           test |                           name |         age |                             dt |             create_date |                process_time |
+----+-------------+--------------------------------+--------------------------------+-------------+--------------------------------+-------------------------+-------------------------+
| +I |           1 |                          test1 |               test |           1 |                            dt1 | 2022-11-06 12:56:51.000 | 2023-02-18 22:46:59.253 |
0条评论
0 / 1000