使用场景
有个hudi mor的事实表,需要与外部的维表进行关联以生成明细宽表,方案是基于flink sql对事实表进行增量查询,同时与外部的维表进行lookup join,除了无需编码之外,通过jdbc的方式连接外部表还能利用flink jdbc-connector的缓存功能。
这种join方式要求事实表必须有一个处理时间(process time)属性,所以在定义事实表时增加了一个计算列
ods_test_hudi (
`id` int,
`test` string,
`name` string,
`age` int,
`dt` string,
`create_date` timestamp(3),
`process_time` AS PROCTIME(), //增加计算列
PRIMARY KEY(`id`) NOT ENFORCED
)
WITH (
'connector'='hudi',
'table.type'='MERGE_ON_READ'
)
从flink sql的角度来看,这并没什么毛病,但是直接查这个表的数据时就会报错
tEnv.sqlQuery("ods_test_hudi /*+ OPTIONS('read.start-commit'='earliest')*/").execute().print();
from ods_test_hudi /*+ OPTIONS('read.start-commit'='earliest')*/").execute().print();
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.lambda$genPartColumnarRowReader$0(ParquetSplitReaderUtil.java:118)
at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
at java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032)
at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:120)
这里报了一个数组越界,还是在读取parquet文件时候发生的,这是因为在构建parquet reader的时候需要定位每个查询schema中的列对应数据文件中的位置(用selectIndexs表示)
例如有上图parquet文件,包含三列数据col1,col2,col3,定义的表除了这三列还有额外定义的计算列process_time,查询列的数量或位置改变时,selectIndexs也不同
selet col1,col3,col2 | select col2,col3 | select col1,col2,col3,process_time | |
---|---|---|---|
selectIndexs | [0,2,1] | [1,2] | [0,1,2,-1] |
解决方法
- 方法一
既然数据文件中不存在列process_time,那么就不应该把它下推到读取parquet文件,而是把其它列从数据文件中解析出来后,再在flink系统中生成该计算列的值。所以在构建动态表的时候把计算列过滤掉:
org.apache.hudi.table.HoodieTableFactory#createDynamicTableSource:
ResolvedSchema schema = ResolvedSchema.of(context.getCatalogTable().getResolvedSchema().getColumns().stream().filter(c-> !(c instanceof Column.ComputedColumn)).collect(Collectors.toList()));
方法二:
在定义表的时候不要加上process_time列,而是在查询的时候额外补上
slct *,PROCTIME() AS process_time from ods_test_hudi
最终两种方法都能正常查询出数据
+----+-------------+--------------------------------+--------------------------------+-------------+--------------------------------+-------------------------+-------------------------+
| op | id | test | name | age | dt | create_date | process_time |
+----+-------------+--------------------------------+--------------------------------+-------------+--------------------------------+-------------------------+-------------------------+
| +I | 1 | test1 | test | 1 | dt1 | 2022-11-06 12:56:51.000 | 2023-02-18 22:46:59.253 |