1 概览
flink的命令参数不可谓不多,常用的应该是run,run-application
用来提交job,从命令梳理了一下运行方式和部署模式。flink可以以local或cluster方式运行job,一般来说在本地开发调试时就以local在idea中运行,完成后就提交到cluster.根据资源管理器不同又可以分为standalone,yarn,k8s等,从命令参数也可以看出,flink对yarn和k8s的支持是最好的。
2 本地运行
本地运行就是运行在本地的一个jvm进程中,这种方式构建的执行环境是LocalExecutionEnvironment
,会启动多个线程去运行不同的角色。可以使用ExecutionEnvironment.createLocalEnvironment
和ExecutionEnvironment.getExecutionEnvironment()
两种方法去创建,推荐使用后者 ,它会根据配置信息去判断是否是本地方式然后返回不同的执行环境,这样在本地开发调试完后不用改代码就可以发布到集群。
3 集群运行
3.1 提交方式
flink有两种方式提交程序到集群执行
- 命令行方式:通过命令行,编写好脚本,把打好的jar包提交执行
- Remote Environment:构建执行环境
RemoteEnvironment
,与远程集群的JobManager通讯,直接提交job到远程集群执行3.2 部署模式
一般说flink的部署模式都是指在基于集群的部署模式。根据集群的生命周期和资源隔离情况不同以及编写的flink程序main()
方法是执行在client端还是集群端,可以分为三种模式:
- Application 模式
- Session 模式
- Per-Job 模式
3.2.1 Application 模式
除了这种模式,flink程序main()
方法都在client端执行的,即ExecutionEviroment
对象在client创建,这个环境初始化的过程会下载程序依赖包到本地,构建JobGraph
,然后将依赖包和生成的任务分发到集群中。所以非application模式情况下,client都是要消耗很大的带宽和cpu资源,当不同的用户都在同一个client下提交多个job,客户端服务器很容易挂掉的。还有在某些环境中,集群节点自成一个局域网,而客户端在局域网外,客户端和集群节点需要双向通讯。这两个问题都曾经在使用spark on yarn client模式遇到过,而且还不止一次,不过有个好处就是运行日志固定在客户端节点。
在application模式中,flink程序main()
方法运行在JobManager中,与per-job模式相比,它提供的是application级别的资源隔离(一个application只有一个job不就是job级别的了,没毛病),一个application可以有多个job。可以通过execute()
阻塞有序地把job启动,或者通过executeAsync()
非阻塞无序地启动。
3.2.2 session模式
多个job使用相同的session运行在同一个已启动的集群中,所以job之间会竞争集群资源,好处是免了每次提交job都要启动一个集群的开销。如果由于其中某个job异常行为导致TaskManager挂掉,会影响到所有的job,说白了就是没资源隔离,没高可用。此外job运行得多,对JobManager也是一种压力,所以session模式适合运行一些生命周期小、负载不大的job.
3.2.3 per-job模式
per-job只是支持基于yarn的集群模式,在1.15中彻底弃用。上面也提到这个模式比较方便的一个地方就是看日志,但是job少的话完全可以在ResourceManager上看(使用yarn的话),如果job多,那就更不可能用这个模式了,比较鸡肋。
3.3 yarn-application时加载第三方依赖和外部配置文件
提交job时,可以通过 -Dyarn.ship-files=/test.jar,/test.properties
这样把第三方文件同步到所有集群节点,这些文件会被下载到TaskManager节点本地目录,类似:/path/nm-local-dir/usercache/appcache/application_1661411501820_0073/container_e01_1661411501820_0073_01_000001,只有jar包会被TaskManager默认加载到classpath中,配置文件则需要以读取本地文件方式去加载,绝对路径为TaskManager进程的user.dir
.