用户可将自己开发的程序提交到MRS中,执行程序并获取结果。本章节指导用户在MRS集群页面如何提交一个新的Flink作业。Flink作业用于提交jar程序处理流式数据。
前提条件
用户已经将运行作业所需的程序包和数据文件上传至OBS系统或HDFS中。
通过界面提交作业
1.登录MRS管理控制台。
2.选择“集群列表 > 现有集群”,选中一个运行中的集群并单击集群名称,进入集群信息页面。
3.若集群开启Kerberos认证时执行该步骤,若集群未开启Kerberos认证,请无需执行该步骤。
在“概览”页签的基本信息区域,单击“IAM用户同步”右侧的“同步”进行IAM用户同步。
说明
当IAM用户的用户组的所属策略从MRS ReadOnlyAccess向MRS CommonOperations、MRS FullAccess、MRS Administrator变化时,由于集群节点的SSSD(System Security Services Daemon)缓存刷新需要时间,因此同步完成后,请等待5分钟,等待新修改策略生效之后,再进行提交作业。否则,会出现提交作业失败的情况。
当IAM用户的用户组的所属策略从MRS CommonOperations、MRS FullAccess、MRS Administrator向MRS ReadOnlyAccess变化时,由于集群节点的SSSD缓存刷新需要时间,因此同步完成后,请等待5分钟,新修改策略才能生效。
4.单击“作业管理”,进入“作业管理”页签。
5.单击“添加”,进入“添加作业”页面。
6.“作业类型”选择“Flink”,参考下表配置Flink作业信息。
作业配置信息
参数 | 参数说明 |
---|---|
作业名称 | 作业名称,只能由字母、数字、中划线和下划线组成,并且长度为1~64个字符。 说明 建议不同的作业设置不同的名称。 |
执行程序路径 | 待执行程序包地址,需要满足如下要求: 最多为1023字符,不能包含;l&>,<'$特殊字符,且不可为空或全空格。 执行程序路径可存储于HDFS或者OBS中,不同的文件系统对应的路径存在差异。 - OBS:以“obs://”开头。示例: obs://wordcount/program/xxx.jar。 - HDFS:以“/user”开头。数据导入HDFS请参考导入导出数据章节中的“导入数据”。 |
运行程序参数 | 可选参数,为本次执行的作业配置相关优化参数(例如线程、内存、CPU核数等),用于优化资源使用效率,提升作业的执行性能。 常用运行程序参数如下表“运行程序参数”。 |
执行程序参数 | 可选参数,程序执行的关键参数,该参数由用户程序内的函数指定,MRS只负责参数的传入。多个参数间使用空格隔开。 最多为150000字符,不能包含不能包含;;&><'$特殊字符,可为空。 注意 若输入带有敏感信息(如登录密码)的参数可能在作业详情展示和日志打印中存在暴露的风险,请谨慎操作。 |
服务配置参数 | 可选参数,用于为本次执行的作业修改服务配置参数。该参数的修改仅适用于本次执行的作业,如需对集群永久生效,请参考配置服务参数页面进行修改。 如需添加多个参数,请单击右侧增加,如需删除参数,请单击右侧“删除”。 常用服务配置参数如下表“服务配置参数”。 |
命令参考 | 用于展示提交作业时提交到后台执行的命令。 |
运行程序参数
参数 | 参数说明 | 取值样例 |
---|---|---|
-ytm | 设置每个TaskManager容器的内存(单位可选,默认单位:MB)。 | 1024 |
-yjm | 设置JobManager容器内存(单位可选,默认单位:MB)。 | 1024 |
-yn | 设置分配给应用程序的Yarn容器的数量,该值与TaskManager数量相同。 | 2 |
-ys | 设置TaskManager的核数。 | 2 |
-ynm | 自定义Yarn上应用程序名称。 | test |
-c | 设置程序入口点的类(如“main”或“getPlan()”方法)。该参数仅在JAR文件未指定其清单的类时需要。 | com.bigdata.mrs.test |
说明针对MRS 3.x及之后版本,运行程序参数不支持“-yn”。
服务配置参数
参数 | 参数说明 | 取值样例 |
---|---|---|
fs.obs.access.key | 访问OBS的密钥ID。 | - |
fs.obs.secret.key | 访问OBS与密钥ID对应的密钥。 | - |
7.确认作业配置信息,单击“确定”,完成作业的新增。
作业新增完成后,可对作业进行管理。
通过后台提交作业
MRS 3.x及之后版本客户端默认安装路径为“/opt/Bigdata/client”,MRS 3.x之前版本为“/opt/client”。具体以实际为准。
1.登录MRS客户端。
2.执行如下命令初始化环境变量。
source /opt/Bigdata/client/bigdata_env
3.若集群开启Kerberos认证,需要执行以下步骤,若集群未开启Kerberos认证请跳过该步骤。
a.准备一个提交Flink作业的用户。
b.使用新创建的用户登录Manager页面。
- MRS 3.x之前版本,登录集群的Manager界面,选择“系统设置 > 用户管理”,在已增加用户所在行的“操作”列,选择“更多 > 下载认证凭据”。
- MRS 3.x及之后版本,登录集群的Manager界面,选择“系统 > 权限 > 用户”,在已增加用户所在行的“操作”列,选择“更多 > 下载认证凭据”。
c.将下载的认证凭据压缩包解压缩,并将得到的user.keytab文件拷贝到客户端节点中,例如客户端节点的“/opt/Bigdata/client/Flink/flink/conf”目录下。如果是在集群外节点安装的客户端,需要将得到的krb5.conf文件拷贝到该节点的“/etc/”目录下。
d.MRS 3.x及之后版本,安全模式下需要将客户端安装节点的业务IP以及Manager的浮动ip追加到“/opt/Bigdata/client/Flink/flink/conf/flink-conf.yaml”文件中的“jobmanager.web.allow-access-address”配置项中,ip之间使用英文逗号分隔。
e.配置安全认证,在“/opt/Bigdata/client/Flink/flink/conf/flink-conf.yaml”配置文件中的对应配置添加keytab路径以及用户名。
security.kerberos.login.keytab: <user.keytab 文件路径 >
security.kerberos.login.principal: < 用户名 >
例如:
security.kerberos.login.keytab: /opt/Bigdata/client/Flink/flink/conf/user.keytab
security.kerberos.login.principal: test
f.在Flink的客户端bin目录下,执行如下命令进行安全加固,password请重新设置为一个用于提交作业的密码。
sh generate_keystore.sh < password >
该脚本会自动替换“/opt/Bigdata/client/Flink/flink/conf/flink-conf.yaml”中关于SSL的值,针对MRS 3.x之前版本,安全集群默认没有开启外部SSL,用户如果需要启用外部SSL,进行配置后再次运行该脚本即可,配置参数在MRS的Flink默认配置中不存在,用户如果开启外部连接SSL,则需要添加下表中参数。
参数描述
参数 | 参数值示例 | 描述 |
---|---|---|
security.ssl.rest.enabled | true | 打开外部SSL开关。 |
security.ssl.rest.keystore | ${path}/flink.keystore | keystore的存放路径。 |
security.ssl.rest.keystore-password | 123456 | keystore的password,“123456”表示需要用户输入自定义设置的密码值。 |
security.ssl.rest.key-password | 123456 | ssl key的password,“123456”表示需要用户输入自定义设置的密码值。 |
security.ssl.rest.truststore | ${path}/flink.truststore | truststore存放路径。 |
security.ssl.rest.truststore-password | 123456 | truststore的password,“123456”表示需要用户输入自定义设置的密码值。 |
说明
针对MRS 3.x之前版本,generate_keystore.sh脚本无需手动生成。
会将生成的flink.keystore、flink.truststore、security.cookie自动填充到“flink-conf.yaml”对应配置项中。
针对MRS 3.x及之后版本,“security.ssl.key-password”、“security.ssl.keystore-password”和“security.ssl.truststore-password”的值需要使用Manager明文加密API进行获取:
curl -k -i -u < user name >:< password > -X POST -HContent-type:application/json -d '{"plainText":"< password >"}' 'https:// x.x.x.x :28443/web/api/v2/tools/encrypt' ;其中< password >要与签发证书时使用的密码一致,x.x.x.x为集群Manager的浮动IP。
g.客户端访问flink.keystore和flink.truststore文件的路径配置。
- 绝对路径:执行该脚本后,在flink-conf.yaml文件中将flink.keystore和flink.truststore文件路径自动配置为绝对路径“/opt/Bigdata/client/Flink/flink/conf/”,此时需要将conf目录中的flink.keystore和flink.truststore文件分别放置在Flink Client以及Yarn各个节点的该绝对路径上。
- 相对路径:请执行如下步骤配置flink.keystore和flink.truststore文件路径为相对路径,并确保Flink Client执行命令的目录可以直接访问该相对路径。
i.在“/opt/Bigdata/client/Flink/flink/conf/”目录下新建目录,例如ssl。
ii. 移动flink.keystore和flink.truststore文件到“/opt/Bigdata/client/Flink/flink/conf/ssl/”中。
iii. 针对MRS 3.x及之后版本,修改flink-conf.yaml文件中如下两个参数为相对路径。
security.ssl.keystore: ssl/flink.keystore
security.ssl.truststore: ssl/flink.truststore
iv. 针对MRS 3.x之前版本,修改flink-conf.yaml文件中如下两个参数为相对路径。
security.ssl.internal.keystore: ssl/flink.keystore
security.ssl.internal.truststore: ssl/flink.truststore
h.如果客户端安装在集群外节点,请在配置文件(如:“ /opt/Bigdata/client/Flink/fink/conf/flink-conf.yaml ”) 中增加如下配置值,其中xx.xx.xxx.xxx请替换为客户端所在节点的IP。
web.access-control-allow-origin: xx.xx.xxx.xxx
jobmanager.web.allow-access-address: xx.xx.xxx.xxx
4.运行wordcount作业。
- 普通集群(未开启Kerberos认证)
−执行如下命令启动session,并在session中提交作业。
yarn-session.sh -nm "session-name"
flink run /opt/Bigdata/client/Flink/flink/examples/streaming/WordCount.jar
−执行如下命令在Yarn上提交单个作业。
flink run -m yarn-cluster /opt/Bigdata/client/Flink/flink/examples/streaming/WordCount.jar
- 安全集群(开启Kerberos认证)
−flink.keystore和flink.truststore文件路径为绝对路径时:
执行如下命令启动session,并在session中提交作业。
yarn-session.sh -nm "session-name"
flink run /opt/Bigdata/client/Flink/flink/examples/streaming/WordCount.jar
执行如下命令在Yarn上提交单个作业。
flink run -m yarn-cluster /opt/Bigdata/client/Flink/flink/examples/streaming/WordCount.jar
−flink.keystore和flink.truststore文件路径为相对路径时:
在“ssl”的同级目录下执行如下命令启动session,并在session中提交作业,其中“ssl”是相对路径,如“ssl”所在目录是“opt/Bigdata/client/Flink/flink/conf/”,则在“opt/Bigdata/client/Flink/flink/conf/”目录下执行命令。
yarn-session.sh -t ssl/ -nm "session-name"
flink run /opt/Bigdata/client/Flink/flink/examples/streaming/WordCount.jar
执行如下命令在Yarn上提交单个作业。
flink run -m yarn-cluster -yt ssl/ /opt/Bigdata/client/Flink/flink/examples/streaming/WordCount.jar