使用flink-1.16和flinkcdc-3.0进行数据接入,采用standalone模式。运行一段时间后checkpoint开始失败,但日志中没有报错信息。
因savepoint和checkpoint机制一致,使用手动打savepoint断点的方式,尝试查明失败原因。执行指令:
/usr/local/flink/bin/flink savepoint ${job_id} /usr/local/flink-cdc/savepoint/
执行后报错,发现如下报错信息。
[1]:Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=5325329, maxSize=5242880. Consider using a different checkpoint storage, like the FileSystemCheckpointStorage.
经分析,是作业的state,默认使用内存存储。随着作业的运行,state超出允许的最大内存,所以无法存储,导致checkpoint被cancel进而失败。
直接原因是作业的state,默认使用内存存储。随着作业的运行,state超出允许的最大内存,所以无法存储,导致checkpoint被cancel进而失败。
尝试在flink-conf.yaml中添加如下参数,加大内存限制。发现checkpoint仍会超出内存限制失败。排查源码发现,flink在1.16已去掉该参数。官方建议standalone模式仅作为测试,因此不提供加大内存限制的方式。
state.backend.memory.max-state-size: 10000000
正确解决办法为,使用hdfs存储checkpoint。