空指针异常
业务库使用的是postgresql, 使用flink cdc同步几个表到hudi时,发现其中有一个表在同步过程中一直报空指针异常
Caused by: java.lang.NullPointerException
at org.postgresql.jdbc.FieldMetadata.getSize(FieldMetadata.java:79)
at org.postgresql.util.LruCache.put(LruCache.java:128)
at org.postgresql.util.LruCache.putAll(LruCache.java:154)
at org.postgresql.jdbc.PgResultSetMetaData.fetchFieldMetaData(PgResultSetMetaData.java:267)
at org.postgresql.jdbc.PgResultSetMetaData.isAutoIncrement(PgResultSetMetaData.java:57)
异常分析
在org.postgresql.jdbc.PgResultSetMetaData.fetchFieldMetaData
打断点调试,其实也就通过sql去查表的元数据而已,但是观察查询结果ResultSet有两个列元数据的schema值为null,这才导致了上述的空指针出现:
结果集中有57条记录,但是我的表只有55列,一开始以为这个表有什么虚拟列之类的,但是观察两条多出来的数据不太像属于schema的内容,首先tableOid就不对了,tableName也感觉是随机生成的。最终发现多出的两条记录是表的数据记录,tableOid为表的主键值,多出的两条是相邻的主键(1033629,1033630),schema对应的是第5列也恰恰为null.
1、查询表的列元数据时,为什么会有表的数据记录混在里面?sql单独拎出来查询是可以得到正常结果55列的。
2、表的数据很多,为什么每次这里都固定多出相同的两条?
异常解决
从表面看,像是使用jdbc的过程中由于并发而引发的数据脏乱问题。这种问题很难调试,也没有太多时间去发追踪,所以把结果集中schema为null的数据过滤作为解决方法。
if(null!=schemaName)
md.put(key, fieldMetadata);
数据丢失
flink cdc job无异常运行一段时间后,产生了另一个问题,sink到hudi的表少了2条数据。通过二分查询的方式将source/sink表做比较,所幸表的数据并不多,最终也确定了这两条数据就是上面的1033629,1033630
Exported {} of {} records for table
从程序日志看到,cdc snapshot阶段读取出来后数据就少了,而不是写入hudi后丢失的。
分析与解决
io.debezium.relational.RelationalSnapshotChangeEventSource#createDataEventsForTable
方法就是全量把表数据查询到ResultSet中,然后循环将每行Row数据转为Object[]发往下游的flink算子,调试发现问题是出现在下面的代码:
final Object[] row = jdbcConnection.rowToArray(table, schema(), rs, columnArray);
正好,rowToArray
会垂直向下调用到开头的空指针异常的堆栈,即在将行转为数组过程中需要获取该表的元数据,PostgresConnection#getColumnValue
final ResultSetMetaData metaData = rs.getMetaData();
final String columnTypeName = metaData.getColumnTypeName(columnIndex);// 主要方法,获取列的类型名称,int8->bigserial类型映射
final PostgresType type = ((PostgresSchema) schema).getTypeRegistry().get(columnTypeName); //类型名称对应的PgType
在获取列的类型名称时,会进一步去获取列的元数据,对于自动递增的列会做 int8->bigserial类型映射,bigserial对应的PostgresType是int8,没必要做多一次无用的转化,直接改了以上的代码避免进入org.postgresql.jdbc.PgResultSetMetaData.fetchFieldMetaData
final ResultSetMetaData metaData = rs.getMetaData();
final String columnTypeName =( (PgResultSetMetaData) metaData)._getPGType(columnIndex-1); //直接从ResultSetMetaData获取类型
final PostgresType type = ((PostgresSchema) schema).getTypeRegistry().get(columnTypeName);
getPGType
为protected,需要放大访问权限为public,为了安全增加了_getPGType
总结
目前这样的方式解决了空指针和数据丢失的问题,这应该算并发引起的两种不同现象。不清楚产生并发的根是在哪里,所以这种方法解决了当前场景的异常,但是不确定是否还会发生在其它地方。