1、Parallelism(并行度)的概念
parallelism 在 Flink 中表示每个算子的并行度。
举两个例子
(1)比如 kafka 某个 topic 数据量太大,设置了10个分区,但 source 端的算子并行度却为1,只有一个 subTask 去同时消费10个分区,明显很慢。此时需要适当的调大并行度。
(2)比如 某个算子执行了比较复杂的操作,导致该算子执行特别慢,那么可以考虑给该算子增加并行度。
如图所示,当前数据流中有 source、map、window、sink 四个算子,除最后 sink,其他算子的并行度都为 2。整个程序包含了 7 个子任务,至少需要 2 个分区来并行执行。我们可以说,这段流处理程序的并行度就是 2。
2、Slot(任务槽)的概念
slot 是 TaskManager 资源的最小单元。比如 TaskManager 有 5 个 slot,那么每个 slot 分配 25% 的内存,所有 slot 共享 TaskManager 的 cpu。
在一个 slot 中可以运行一个或者多个线程。(一个 slot 可以跑同一个job里面,不同算子的不同子任务。)
假如一个 TaskManager 有三个 slot,那么它会将管理的内存平均分成三份,每个 slot 独自占据一份。这样一来,我们在 slot 上执行一个子任务时,相当于划定了一块内存“专款专用”,就不需要跟来自其他作业的任务去竞争内存资源了。所以现在我们只要 2 个 TaskManager,就可以并行处理分配好的 5 个任务了,如图所示。
3、任务槽和并行度的关系
Slot 和并行度确实都跟程序的并行执行有关,但两者是完全不同的概念。简单来说,task slot 是 静 态 的 概 念 , 是 指 TaskManager 具 有 的 并 发 执 行 能 力 , 可 以 通 过 参 数taskmanager.numberOfTaskSlots 进行配置;而并行度(parallelism)是动态概念,也就是TaskManager 运行程序时实际使用的并发能力,可以通过参数 parallelism.default 进行配置。换句话说,并行度如果小于等于集群中可用 slot 的总数,程序是可以正常执行的,因为 slot 不一定要全部占用,有十分力气可以只用八分;而如果并行度大于可用 slot 总数,导致超出了并行能力上限,那么心有余力不足,程序就只好等待资源管理器分配更多的资源了。(当slot为2,设置Parallel为3,运行Flink任务报错 NoResourceAvailableException: Could not acquire the minimum required resources. 任务会处于restart状态,等待资源释放)
下面我们再举一个具体的例子。假设一共有 3 个 TaskManager,每一个 TaskManager 中的slot 数量设置为 3 个,那么一共有 9 个 task slot,如图 4-16 所示,表示集群最多能并行执行 9个任务。
而我们定义 WordCount 程序的处理操作是四个转换算子:
source→ flatMap→ reduce→ sink
当所有算子并行度相同时,容易看出 source 和 flatMap 可以合并算子链,于是最终有三个任务节点。
如果我们没有任何并行度设置,而配置文件中默认 parallelism.default=1,那么程序运行的默认并行度为 1,总共有 3 个任务。由于不同算子的任务可以共享任务槽,所以最终占用的 slot只有 1 个。9 个 slot 只用了 1 个,有 8 个空闲,如图 中的 示例1 所示
如果我们更改默认参数,或者提交作业时设置并行度为 2,那么总共有 6 个任务,共享任务槽之后会占用 2 个 slot,如图中 示例 2 所示。同样,就有 7 个 slot 空闲,计算资源没有充分利用。所以可以看到,设置合适的并行度才能提高效率。