Flink SQL作业的消费能力如何,即一天可以处理多大的数据量?
Flink SQL作业的消费能力与源端的数据发送、队列大小、作业参数配置均有关系,每秒10M峰值。
实际处理数据量,与您使用时长相关。
Flink SQL中的temp流中数据是否需要定期清理,如何清理?
不需要定期清理。
Flink SQL中的temp流类似于子查询,只是逻辑意义上的流,用于简化SQL逻辑,不会产生数据存储,因而不存在清理问题。
创建FlinkSQL作业时选择OBS桶,提示未授权
问题描述
用户创建Flink SQL作业,配置参数时,选择自己创建的OBS桶,提示“该OBS桶未授权。立即授权”,单击“立即授权”后提示“服务器内部出错了,请联系客服或者稍后重试”,无法授权。
解决方案
在报错页面,通过F12查看错误详细信息:
{"error_code":"DLI.10001","error_msg":"服务内部出错了。{0} 请联系客服或者稍后重试","error_json_opt":{"error":" Unexpected exception[NoSuchElementException: None.get]"}}
查看用户是否创建DLI委托,发现用户没有创建委托权限,在“全局配置”>“服务授权”页面勾选“Tenant Administrator(全局服务)”权限后,重试可以给OBS桶授权。
Flink SQL作业将OBS表映射为DLI的分区表
场景概述
用户使用Flink SQL作业时,需要创建OBS分区表,用于后续进行批处理。
操作步骤
该示例将car_info数据,以day字段为分区字段,parquet为编码格式(目前仅支持parquet格式),转储数据到OBS。
create sink stream car_infos (
carId string,
carOwner string,
average_speed double,
day string
) partitioned by (day)
with (
type = "filesystem",
file.path = "obs://obs-sink/car_infos",
encode = "parquet",
ak = "{{myAk}}",
sk = "{{mySk}}"
);
数据最终在OBS中的存储目录结构为:obs://obs-sink/car_infos/day=xx/part-x-x。
数据生成后,可通过如下SQL语句建立OBS分区表,用于后续批处理:
1.创建OBS分区表。
create table car_infos (
carId string,
carOwner string,
average_speed double
)
partitioned by (day string)
stored as parquet
location 'obs://obs-sink/car-infos';
2.从关联OBS路径中恢复分区信息。
alter table car_infos recover partitions;
OBS表如何映射为DLI的分区表?
该示例将car_info数据,以day字段为分区字段,parquet为编码格式(目前仅支持parquet格式),转储数据到OBS。
create sink stream car_infos (
carId string,
carOwner string,
average_speed double,
day string
) partitioned by (day)
with (
type = "filesystem",
file.path = "obs://obs-sink/car_infos",
encode = "parquet",
ak = "{{myAk}}",
sk = "{{mySk}}"
);
数据最终在OBS中的存储目录结构为:obs://obs-sink/car_infos/day=xx/part-x-x。
数据生成后,可通过如下SQL语句建立OBS分区表,用于后续批处理:
1.创建OBS分区表。
create table car_infos (
carId string,
carOwner string,
average_speed double
)
partitioned by (day string)
stored as parquet
location 'obs://obs-sink/car-infos';
2.从关联OBS路径中恢复分区信息。
alter table car_infos recover partitions;
在Flink SQL作业中创建表使用EL表达式,作业运行报DLI.0005错误
问题现象
Flink SQL作业创建表时,表名使用EL表达式,运行作业时报如下错误:
DLI.0005: AnalysisException: t_user_message_input_#{date_format(date_sub(current_date(), 1), 'yyyymmddhhmmss')} is not a valid name for tables/databases. Valid names only contain alphabet characters, numbers and _.
解决方案
需要将SQL中表名的“#”字符改成“”即可。DLI中使用EL表达式的格式为:{ expr } 。
Flink作业输出流写入数据到OBS,通过该OBS文件路径创建的DLI表查询无数据
问题现象
使用Flink作业输出流写入数据到了OBS中,通过该OBS文件路径创建的DLI表进行数据查询时,无法查询到数据。
例如,使用如下Flink结果表将数据写入到OBS的“obs://obs-sink/car_infos”路径下。
create sink stream car_infos_sink (
carId string,
carOwner string,
average_speed double,
buyday string
) partitioned by (buyday)
with (
type = "filesystem",
file.path = "obs://obs-sink/car_infos",
encode = "parquet",
ak = "{{myAk}}",
sk = "{{mySk}}"
);
通过该OBS文件路径创建DLI分区表,在DLI查询car_infos表数据时没有查询到数据。
create table car_infos (
carId string,
carOwner string,
average_speed double
)
partitioned by (buyday string)
stored as parquet
location 'obs://obs-sink/car_infos';
解决方案
1.在DLI创建Flink结果表到OBS的作业时,如上述举例中的car_infos_sink表,是否开启了Checkpoint。如果未开启则需要开启Checkpoint参数,重新运行作业生成OBS数据文件。
开启Checkpoint步骤如下。
a.到DLI管理控制台,左侧导航栏选择“作业管理 > Flink作业”,在对应的Flink作业所在行,操作列下单击“编辑”。
b.在“运行参数”下,查看“开启Checkpoint”参数是否开启。
2.确认Flink结果表的表结构和DLI分区表的表结构是否保持一致。如问题描述中car_infos_sink和car_infos表的字段是否一致。
3.通过OBS文件创建DLI分区表后,是否执行以下命令从OBS路径中恢复分区信息。如下,在创建完DLI分区表后,需要恢复DLI分区表car_infos分区信息。
alter table car_infos recover partitions;
Flink SQL作业运行失败,日志中有connect to DIS failed java.lang.IllegalArgumentException: Access key cannot be null错误
问题现象
在DLI上提交Flink SQL作业,作业运行失败,在作业日志中有如下报错信息:
connect to DIS failed java.lang.IllegalArgumentException: Access key cannot be null
问题根因
该Flink SQL作业在配置作业运行参数时,有选择保存作业日志或开启Checkpoint,配置了OBS桶保存作业日志和Checkpoint。但是运行该Flink SQL作业的IAM用户没有OBS写入权限导致该问题。
解决方案
1.登录IAM控制台页面,单击“用户”,在搜索框中选择“用户名”,输入运行作业的IAM用户名。
2.单击查询到用户名,查看该用户对应的用户组。
3.单击“用户组”,输入查询到的用户组查询,单击用户组名称,在“授权记录”中查看当前用户的权限。
4.确认当前用户所属用户组下的权限是否包含OBS写入的权限,比如“OBS OperateAccess”。如果没有OBS写入权限,则给对应的用户组进行授权。
5.授权完成后,等待5到10分钟等待权限生效。再次运行失败的Flink SQL作业,查看作业运行状态。
Flink SQL作业读取DIS数据报Not authorized错误
问题现象
Flink SQL作业读取DIS数据,运行该作业时,语义校验失败。具体作业失败提示信息如下:
Get dis channel xxx info failed. error info: Not authorized, please click the overview page to do the authorize action
问题原因
运行Flink作业前,没有对运行的用户账号授权获取DIS数据的权限。
解决方案
1.登录到DLI管理控制台,左侧导航栏选择“全局配置 > 服务授权”。
2.在服务授权管理界面,勾选“DIS Administrator”权限,单击“更新委托授权”完成对当前用户的DIS权限授权。
3.在“作业管理 > Flink作业”,单击对应的Flink SQL作业,重新启动和运行该作业。
Flink SQL作业消费Kafka后sink到es集群,作业执行成功,但未写入数据
问题现象
客户创建Flink SQL作业,消费Kafka后sink到es集群,作业执行成功,但无数据。
原因分析
查看客户作业脚本内容,排查无问题,作业执行成功,出现该问题可能的原因如下:
- 数据不准确。
- 数据处理有问题。
处理步骤
1.在Flink UI查看task日志,发现报错中提到json体,基本确定原因为数据格式问题。
2.排查客户实际数据,发现客户Kafka数据存在多层嵌套的复杂json体。不支持解析。
3.有两种方式解决此问题:
- 通过udf成jar包的形式
- 修改配置
4.修改源数据格式,再次执行作业,无问题。