Flink Jar作业配置checkpoint保存到OBS
Flink Jar作业配置checkpoint保存到OBS步骤如下:
1.在Flink Jar作业的Jar包代码中加入如下代码:
//StreamExecutionEnvironment 依赖的pom文件配置参考后续说明
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointInterval(40000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(new FsStateBackend("obs://${bucket}/jobs/checkpoint/my_jar"), false);
rocksDbBackend.setOptions(new OptionsFactory() {
@Override
public DBOptions createDBOptions(DBOptions currentOptions) {
return currentOptions
.setMaxLogFileSize(64 * 1024 * 1024)
.setKeepLogFileNum(3);
}
@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
return currentOptions;
}
});
env.setStateBackend(rocksDbBackend);
说明上述代码含义是以EXACTLY_ONCE模式,每隔40s保存checkpoint到OBS的${bucket}桶中的jobs/checkpoint/my_jar路径。
其中,最重要的是保存checkpoint路径。一般是将checkpoint存入OBS桶中,路径格式如下:
- 路径格式:obs://${bucket}/xxx/xxx/xxx
- StreamExecutionEnvironment依赖的包需要在pom文件中添加如下配置。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
2.在DLI Flink Jar作业中配置“ 优化参数 ”和“从checkpoint恢复”功能。
−配置优化参数
i.在DLI控制台,选择“作业管理>Flink作业”。
ii.在对应Flink Jar作业操作列单击“编辑”,进入Flink Jar作业编辑页面。
iii.在优化参数中,添加如下两个参数:
fs.obs.access.key=xxx
fs.obs.secret.key=xxx
优化参数说明
配置项 | 默认值 | 是否必填 | 说明 |
---|---|---|---|
fs.obs.access.key | 无 | 是 | AK(Access Key Id),需要具备访问OBS对应桶的权限。 |
fs.obs.secret.key | 无 | 是 | SK(Secret Access Key),需要具备访问OBS对应桶的权限。 |
−配置从checkpoint恢复
i.勾选“异常自动重启”。
ii.勾选“从checkpoint恢复”,填写“Checkpoint路径”。
Checkpoint路径与用户在Jar包中设置的checkpoint路径相对应,格式如下:
“Checkpoint路径”格式为:${bucket}/xxx/xxx/xxx
示例:
如果Jar包中代码配置为:obs://mybucket/jobs/checkpoint/jar-3
那么“Checkpoint路径”填写为: mybucket/jobs/checkpoint/jar-3
说明l 每个Flink Jar作业配置的Checkpoint路径要保持不同,否则无法从准确的checkpoint路径恢复。
l checkpoint路径中的OBS桶需要给DLI授权,DLI服务才能访问此桶下的文件。
3.查看作业是否从checkpoint恢复。
Flink Jar作业是否支持上传配置文件,要如何操作?
自定义(JAR)作业支持上传配置文件。
1.将配置文件通过程序包管理上传到DLI;
2.在Flink jar作业的其他依赖文件参数中,选择创建的DLI程序包;
3.在代码中通过ClassName.class.getClassLoader().getResource("userData/fileName")加载该文件(其中,“fileName”为需要访问的文件名,“ClassName”为需要访问该文件的类名)。
Flink Jar 包冲突,导致提交失败
问题描述
用户Flink程序的依赖包与DLI Flink平台的内置依赖包冲突,导致提交失败。
解决方案
查看是否已包含DLI Flink运行平台中已经存在的包,如果存在,则需要将自己的Jar包删除。
Flink Jar作业访问DWS启动异常,提示客户端连接数太多错误
问题描述
提交Flink Jar作业访问DWS数据仓库服务时,提示启动失败,作业日志报如下错误信息。
FATAL: Already too many clients, active/non-active/reserved: 5/508/3
原因分析
当前访问的DWS数据库连接已经超过了最大连接数。错误信息中,non-active的个数表示空闲连接数,例如,non-active为508,说明当前有大量的空闲连接。
解决方案
出现该问题时建议通过以下操作步骤解决。
1.登录DWS命令执行窗口,执行以下SQL命令,临时将所有non-active的连接释放掉。
SELECT PG_TERMINATE_BACKEND(pid) from pg_stat_activity WHERE state='idle';
2.检查应用程序是否未主动释放连接,导致连接残留。建议优化代码,合理释放连接。
3.在GaussDB(DWS) 控制台设置会话闲置超时时长session_timeout,在闲置会话超过所设定的时间后服务端将主动关闭连接。
session_timeout默认值为600秒,设置为0表示关闭超时限制,一般不建议设置为0。
session_timeout设置方法如下:
a.登录GaussDB(DWS) 管理控制台。
b.在左侧导航栏中,单击“集群管理”。
c.在集群列表中找到所需要的集群,单击集群名称,进入集群“基本信息”页面。
d.单击“参数修改”页签,修改参数“session_timeout”,然后单击“保存”。
e.在“修改预览”窗口,确认修改无误后,单击“保存”。
Flink Jar作业运行报错,报错信息为Authentication failed
问题现象
Flink Jar作业运行异常,作业日志中有如下报错信息:
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
问题原因
因为帐号没有在全局配置中配置服务授权,导致该帐号在创建跨源连接访问外部数据时因为权限不足而导致跨源访问失败。
解决方案
1.登录DLI管理控制台,选择“全局配置 > 服务授权”。
2.在服务授权界面,全选委托权限。
3.单击“更新委托授权”。界面会提示“委托权限更新成功”,表示修改成功。
4.委托授权完成后,重新创建跨源连接和运行作业。
Flink Jar作业设置backend为OBS,报错不支持OBS文件系统
问题现象
客户执行Flink Jar作业,通过设置checkpoint存储在OBS桶中,作业一直提交失败,并伴有报错提交日志,提示OBS桶名不合法。
原因分析
1.确认OBS桶名是否正确。
2.确认所用AKSK是否有权限。
3.设置依赖关系provided防止Jar包冲突。
4.确认客户esdk-obs-java-3.1.3.jar的版本。
5.确认是集群存在问题。
处理步骤
1.设置依赖关系provided。
2.重启clusteragent应用集群升级后的配置。
3.去掉OBS依赖,否则checkpoint会写不进OBS。
Hadoop jar包冲突,导致Flink提交失败
问题现象
Flink 提交失败,异常为:
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.obs.metrics.OBSAMetricsProvider not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2664)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
... 31 common frames omitted
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.obs.metrics.OBSAMetricsProvider not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2568)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2662)
... 32 common frames omitted
原因分析
Flink jar包冲突。用户提交的flink jar 与 DLI 集群中的hdfs jar包存在冲突。
处理步骤
1.将用户pom文件中的的hadoop-hdfs设置为:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<scope> provided </scope>
</dependency>
或使用exclusions标签将其排除关联。
2.若使用到hdfs的配置文件,则需要将core-site.xml、hdfs-site.xml、yarn-site.xml 修改为mrs-core-site.xml、mrs-hdfs-site.xml、mrs-hbase-site.xml。
conf.addResource(HBaseUtil.class.getClassLoader().getResourceAsStream("mrs-core-site.xml"), false);
conf.addResource(HBaseUtil.class.getClassLoader().getResourceAsStream("mrs-hdfs-site.xml"), false);
conf.addResource(HBaseUtil.class.getClassLoader().getResourceAsStream("mrs-hbase-site.xml"), false);
Flink jar 如何连接SASL_SSL?
使用Flink Jar连接开启SASL_SSL认证的Kafka。