searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享

Flink Operator控制流

2023-11-10 01:16:48
31
0

Flink Operator控制流

本文的目标是深入介绍Flink Operator逻辑并提供足够的控制流设计细节,以便新的开发人员可以快速开始使用。

本文的读者需具备良好的Flink Kubernates水平,并具备不同集群和工作类型的通用运维经验;对Flink Kubernates Operator本身也要有深入的用户级别理解。

本文主要关注的是Operator机制的why/how;而用户面对what已在其他文档中已进行过描述

核心Operator控制流(在FlinkDeploymentController和FlinkSessionJobController中实现的 包含了以下逻辑阶段:

  1. 观察当前部署资源的状态
  2. 校验新资源的规格
  3. 按照新规格和观察到的状态来协调任何所需的变化
  4. 按顺序重复以上操作

需要注意的是,所有这些步骤每次都会执行。无论验证的输出如何,始终执行观察和调解,但验证可能会影响调解目标我们将在下一节中看到这一点。

观察阶段

在观察阶段,观察者模块负责观察任何部署的Flink资源(已提交的集群、作业)的当前(时间点)状态,并在此基础上更新CR(自定义资源)状态字段。

观察者将始终使用以前部署的配置来工作,以确保它可以通过rest api访问Flink集群。用户配置可能会影响其余rest客户端(端口配置等),因此我们必须始终使用在集群上运行的配置。这就是FlinkConfigManager和Operator在整个实现过程中区分观察和部署配置的主要原因。

观察者永远不应该更改或提交新的资源,我们稍后将会看到这些行为是协调模块的责任。这种分离的主要原因是,所需的操作不仅取决于当前集群状态,还取决于用户在此期间可能提交的任何新规格更改。

观察者类的层次结构

观察者步骤

  1. 如果没有部署任何资源,则跳过观察
  2. 如果处于UPGRADING状态,将检查升级是否已完成。通常情况下,协调者会在成功提交后将状态更新为DEPLOYED,但短暂的错误可能会阻止此状态更新。因此,我们使用特定于资源的逻辑来检查集群上的资源是否已经匹配目标升级规格(注释、确定性作业ID等)。
  3. 对于FlinkDeployments,我们接下来观察Kubernetes集群资源、JobManagerDeployment 和Pods的状态。我们在升级后或无法访问Job 的Rest端点的任何时候都会执行此操作。此状态记录在jobManagerDeploymentStatus字段中。

这里需要注意的是,试图通过Flink rest API观察Jobs或任何东西只有在JobManager的Kubernetes资源看起来健康的情况下才有意义。JobManager部署中的任何错误都会自动转换为错误状态,并且应该清除之前观察到的任何正在运行的JobStatus。没有健康的JobManager,作业就不能处于运行状态。

  1. 如果集群看起来是健康的,我们继续观察(应用程序和会话作业资源的)JobStatus。在这个阶段,我们使用JobManager的rest api查询有关当前作业状态和挂起的保存点进度的信息。

作业的当前状态决定了可以观察到的内容。对于终点作业状态(FAILEDFINISHED),我们还记录最后一次状态变更期间要使用的最后一个可用保存点信息。我们只能在终点状态下这样做,因为否则无法保证在升级时不会创建新的检查点。

如果无法访问作业,我们必须再次检查集群的状态(请参见步骤3),或者如果我们在集群上找不到作业,下一步将取决于资源类型和配置,并将触发错误,或者我们只需要等待。当无法确定作业的状态时,我们使用RECONCILING状态。

  1. 观察流程的最后一步是根据观察到的状态进行一些管理操作。如果一切正常并运行,我们将清除以前记录的资源状态错误。如果作业不再运行,我们将清除以前的保存点触发信息,以便稍后在协调阶段重新触发它。
  2. 在观察者阶段结束时,Operator将更新后的资源状态发送给Kubernetes。这是避免在后续阶段丢失关键信息的一个非常重要的步骤。这种状态丢失的一个例子是:您有一个失败/完成的作业,观察者会记录最新的保存点信息。协调者可能会决定删除此群集进行升级,但如果此时Operator出现故障,则观察者将无法再次记录上一个保存点,因为在已删除的群上无法再观察到它。在执行任何集群操作之前记录状态是逻辑的关键部分。

观察保存点信息

保存点信息是JobStatus的一部分,它根据配置跟踪挂起(手动或定期保存点触发信息)和保存点历史记录。如观察步骤中所示,保存点信息仅针对运行中和终止作业进行更新。如果作业失败、重启等,则意味着保存点失败,并需要重新触发。

我们使用triggerId观察挂起的保存点,如果它们完成了,我们将它们记录到历史记录中。如果历史记录达到配置的大小限制,我们将通过运行作业的rest-api处理保存点,这样我们就可以在没有任何用户存储凭据的情况下处理保存点。如果作业未运行或不正常,我们会清除挂起的保存点触发器信息,从Operator的角度来看,这基本上会中止保存点。

验证阶段

在成功观察到资源并更新状态后,我们接下来验证传入的资源规格字段。

如果新的规格验证失败,我们会向用户触发一个错误事件,并重置资源中最后一个成功提交的规格(我们不会在Kubernetes中更新它,只是在本地进行协调)。

这一步骤对于确保即使用户提交了不正确的规格也能运行协调非常重要,因此,如果在此期间部署的资源出现任何错误,则允许Operator稳定之前所需的状态。

协调阶段

最后一步是协调阶段,它将执行任何所需的集群操作,使资源达到最后一个所需的(有效的)规格。在某些情况下,在单个协调循环中就能达到所需的规格;在其他情况下我们可能需要多次循环,如下面的过程所示

理解观察者阶段将集群和资源的时间点视图记录到状态中是非常重要的。在大多数情况下,这种情况可能在未来的任何时候发生变化(运行中的作业可能在任何时候失败),在极少数情况下,它是稳定的(最终失败或完成的作业将保持状态)。因此,协调器逻辑必须始终考虑集群状态已经偏离状态的可能性(大多数复杂情况都是由这种情况引起的)。

基本协调步骤

AbstractFlinkResourceReconciler封装了所有Flink资源类型的核心协调流。在讨论会话、应用程序和会话作业资源的细节之前,让我们先看一下高层级流程。

  1. 检查资源是否已准备好进行协调,或者是否有任何不应中断的挂起操作(例如手动保存点)
  2. 如果这是资源的第一次部署尝试,只需部署它。这里需要注意的是,这是使用规格中提供的initialSavepointPath参数的唯一部署操作。
  3. 接下来,确定所需的规格是否发生了变化,以及变化的类型:IGNORESCALEUPGRADE。只有对于规模和升级类型的更改,才需要执行进一步的协调逻辑。
  4. 如果有升级/规模规格更改,将执行特定于资源类型的升级逻辑
  5. 如果没有收到任何规格更改,仍然必须确保当前部署的资源完全协调:
    1. 检查以前部署的资源是否失败/未稳定,如果是,则我们必须执行回滚操作。
    2. 针对单个资源应用任何进一步的协调步骤(触发保存点、恢复部署、重新启动不正常的集群等)

关于部署操作的说明

我们必须特别小心在启动集群和作业时的部署操作,这些集群和作业可能会立即开始生成数据和检查点。因此,能够识别部署何时成功/失败至关重要。同时,Operator操作可能在任何时间点失败,因此很难始终记录正确的状态。

为了确保我们能够始终恢复部署状态以及集群上正在运行的内容,在尝试部署之前,将要部署的规格始终写入具有UPGRADING状态的状态变量。此外,在已部署的Kubernetes Deployment资源中添加了一个注解,以便能够区分确切的部署尝试(我们使用自定义资源生成)。对于会话作业,由于我们无法添加注解,我们使用一种特殊的方式来生成包含相同信息的作业ID。

基本的作业协调器

AbstractJobReconciler负责执行Flink资源的共享逻辑,Flink资源还管理作业(Application和SessionJob集群)。这里,逻辑的核心部分以安全的方式管理作业状态和执行有状态作业更新。

根据规格更改SCALE/UPGRADE的类型,作业协调器的代码路径略有不同。对于扩缩容操作,如果启用了独立模式和反应式缩放,我们只需要重新缩放任务管理器。未来,我们还可能在这里为其他集群类型添加更有效的重新缩放(例如在上游Flink中实现后使用重新缩放API)

如果在规格中检测到升级类型更改,我们将执行作业升级流程:

  1. 如果作业当前正在运行,请根据需要挂起(可用的升级模式),稍后将对此进行详细说明
  2. 将状态变量标记为升级状态,并触发一个新的协调循环(这允许我们在升级过程中获取新的规格更改,因为暂停可能需要一段时间)
  3. 根据新规格从状态中记录的HA元数据或保存点信息或空状态(取决于升级模式设置)恢复作业

升级模式和挂起/取消行为

在进行有状态升级时,Operator必须始终遵守升级模式设置,以避免数据丢失。然而,该机制具有一定的灵活性,可以解决不健康的作业,并在版本升级期间提供额外的保障。getAvailableUpgradeMode方法是升级逻辑中的一个重要基石,它用于决定在用户请求和当前集群状态的情况下应该使用什么实际升级模式。

在正常的健康情况下,可用的升级模式将与用户在规格中的模式相同。但是,在某些情况下,我们必须在保存点和上次状态升级模式之间进行更改。只有当作业正常且正在运行时,才能使用保存点升级模式。对于失败、重新启动或其他不正常的部署,只要HA元数据可用(并且没有明确配置),我们就可以使用最后状态升级模式。这使我们即使在作业失败的情况下也能拥有健壮的升级流程,同时保持状态一致性。

Last-state升级模式是指使用存储在HA元数据中的检查点信息进行升级。在Flink次要版本之间升级时,HA元数据格式可能不兼容,因此版本更改必须强制执行保存点升级模式,并且需要健康运行的作业。

需要注意的是,我们从不将Last-state/savepoint升级模式更改为stateless,因为这会影响升级逻辑并导致状态丢失。

应用协调器

与我们在部署/升级/取消操作期间需要处理的会话作业相比,应用程序集群有更多额外的配置步骤。以下是Operator为确保健壮行为所做的一些重要事项:

setRandomJobResultStorePath为了避免在JobManager故障转移时重新启动已终止的应用程序,我们必须禁用作业结果清理。这迫使我们为每个应用程序部署创建一个随机作业结果存储路径。用户稍后需要手工清理作业结果存储jobresultstore)。

setJobIdIfNecessaryFlink默认情况下基于clusterId(在我们的例子中是自定义资源名称)生成确定性作业ID。如果作业从空状态重新启动(无状态升级),则会导致检查点路径冲突。因此,我们生成一个随机的jobid来避免这种情况。

cleanupTerminalJmAfterTtlFlink 1.15及以上版本在关闭/失败后,我们不会自动终止jobmanager进程,以获得稳健的观察者行为。然而,一旦观察到终止状态,我们就可以清理jobmanager进程/部署。

自定义Flink资源状态更新机制

JOSDK提供了用于更新协调器实现的资源规格和状态的内置方式,但是Flink Operator没有使用它,并且由于以下原因具有自定义状态更新机制。

使用JOSDK时,自定义资源的状态只能在协调方法结束时更新。在这里的案例中,我们通常需要在协调流的中间触发状态更新,以提供最大的鲁棒性。这种情况的一个示例是在执行部署操作之前将部署信息记录在状态中。

另一种方法是,Flink Operator将资源状态用作许多操作的预写日志(WAL),以确保在操作员出现故障时的鲁棒性。

状态更新机制在StatusRecorder类中实现,该类同时用作最新状态和更新器逻辑的缓存。我们需要在控制器流开始时始终从缓存更新自定义资源状态,因为我们绕过了JOSDK更新机制/缓存,这可能会导致返回旧的状态实例。对于实际的状态更新,我们使用了一种修改后的乐观锁定机制,只有在状态没有被外部修改的情况下,它才会更新状态。

在正常情况下,这一假设成立,因为Operator是状态的唯一所有者/更新者。这里的异常可能表明,受状态影响的用户或另一个Operator实例可能在管理相同资源的同时运行,这可能会导致严重问题。

0条评论
0 / 1000
YT20
20文章数
1粉丝数
YT20
20 文章 | 1 粉丝

Flink Operator控制流

2023-11-10 01:16:48
31
0

Flink Operator控制流

本文的目标是深入介绍Flink Operator逻辑并提供足够的控制流设计细节,以便新的开发人员可以快速开始使用。

本文的读者需具备良好的Flink Kubernates水平,并具备不同集群和工作类型的通用运维经验;对Flink Kubernates Operator本身也要有深入的用户级别理解。

本文主要关注的是Operator机制的why/how;而用户面对what已在其他文档中已进行过描述

核心Operator控制流(在FlinkDeploymentController和FlinkSessionJobController中实现的 包含了以下逻辑阶段:

  1. 观察当前部署资源的状态
  2. 校验新资源的规格
  3. 按照新规格和观察到的状态来协调任何所需的变化
  4. 按顺序重复以上操作

需要注意的是,所有这些步骤每次都会执行。无论验证的输出如何,始终执行观察和调解,但验证可能会影响调解目标我们将在下一节中看到这一点。

观察阶段

在观察阶段,观察者模块负责观察任何部署的Flink资源(已提交的集群、作业)的当前(时间点)状态,并在此基础上更新CR(自定义资源)状态字段。

观察者将始终使用以前部署的配置来工作,以确保它可以通过rest api访问Flink集群。用户配置可能会影响其余rest客户端(端口配置等),因此我们必须始终使用在集群上运行的配置。这就是FlinkConfigManager和Operator在整个实现过程中区分观察和部署配置的主要原因。

观察者永远不应该更改或提交新的资源,我们稍后将会看到这些行为是协调模块的责任。这种分离的主要原因是,所需的操作不仅取决于当前集群状态,还取决于用户在此期间可能提交的任何新规格更改。

观察者类的层次结构

观察者步骤

  1. 如果没有部署任何资源,则跳过观察
  2. 如果处于UPGRADING状态,将检查升级是否已完成。通常情况下,协调者会在成功提交后将状态更新为DEPLOYED,但短暂的错误可能会阻止此状态更新。因此,我们使用特定于资源的逻辑来检查集群上的资源是否已经匹配目标升级规格(注释、确定性作业ID等)。
  3. 对于FlinkDeployments,我们接下来观察Kubernetes集群资源、JobManagerDeployment 和Pods的状态。我们在升级后或无法访问Job 的Rest端点的任何时候都会执行此操作。此状态记录在jobManagerDeploymentStatus字段中。

这里需要注意的是,试图通过Flink rest API观察Jobs或任何东西只有在JobManager的Kubernetes资源看起来健康的情况下才有意义。JobManager部署中的任何错误都会自动转换为错误状态,并且应该清除之前观察到的任何正在运行的JobStatus。没有健康的JobManager,作业就不能处于运行状态。

  1. 如果集群看起来是健康的,我们继续观察(应用程序和会话作业资源的)JobStatus。在这个阶段,我们使用JobManager的rest api查询有关当前作业状态和挂起的保存点进度的信息。

作业的当前状态决定了可以观察到的内容。对于终点作业状态(FAILEDFINISHED),我们还记录最后一次状态变更期间要使用的最后一个可用保存点信息。我们只能在终点状态下这样做,因为否则无法保证在升级时不会创建新的检查点。

如果无法访问作业,我们必须再次检查集群的状态(请参见步骤3),或者如果我们在集群上找不到作业,下一步将取决于资源类型和配置,并将触发错误,或者我们只需要等待。当无法确定作业的状态时,我们使用RECONCILING状态。

  1. 观察流程的最后一步是根据观察到的状态进行一些管理操作。如果一切正常并运行,我们将清除以前记录的资源状态错误。如果作业不再运行,我们将清除以前的保存点触发信息,以便稍后在协调阶段重新触发它。
  2. 在观察者阶段结束时,Operator将更新后的资源状态发送给Kubernetes。这是避免在后续阶段丢失关键信息的一个非常重要的步骤。这种状态丢失的一个例子是:您有一个失败/完成的作业,观察者会记录最新的保存点信息。协调者可能会决定删除此群集进行升级,但如果此时Operator出现故障,则观察者将无法再次记录上一个保存点,因为在已删除的群上无法再观察到它。在执行任何集群操作之前记录状态是逻辑的关键部分。

观察保存点信息

保存点信息是JobStatus的一部分,它根据配置跟踪挂起(手动或定期保存点触发信息)和保存点历史记录。如观察步骤中所示,保存点信息仅针对运行中和终止作业进行更新。如果作业失败、重启等,则意味着保存点失败,并需要重新触发。

我们使用triggerId观察挂起的保存点,如果它们完成了,我们将它们记录到历史记录中。如果历史记录达到配置的大小限制,我们将通过运行作业的rest-api处理保存点,这样我们就可以在没有任何用户存储凭据的情况下处理保存点。如果作业未运行或不正常,我们会清除挂起的保存点触发器信息,从Operator的角度来看,这基本上会中止保存点。

验证阶段

在成功观察到资源并更新状态后,我们接下来验证传入的资源规格字段。

如果新的规格验证失败,我们会向用户触发一个错误事件,并重置资源中最后一个成功提交的规格(我们不会在Kubernetes中更新它,只是在本地进行协调)。

这一步骤对于确保即使用户提交了不正确的规格也能运行协调非常重要,因此,如果在此期间部署的资源出现任何错误,则允许Operator稳定之前所需的状态。

协调阶段

最后一步是协调阶段,它将执行任何所需的集群操作,使资源达到最后一个所需的(有效的)规格。在某些情况下,在单个协调循环中就能达到所需的规格;在其他情况下我们可能需要多次循环,如下面的过程所示

理解观察者阶段将集群和资源的时间点视图记录到状态中是非常重要的。在大多数情况下,这种情况可能在未来的任何时候发生变化(运行中的作业可能在任何时候失败),在极少数情况下,它是稳定的(最终失败或完成的作业将保持状态)。因此,协调器逻辑必须始终考虑集群状态已经偏离状态的可能性(大多数复杂情况都是由这种情况引起的)。

基本协调步骤

AbstractFlinkResourceReconciler封装了所有Flink资源类型的核心协调流。在讨论会话、应用程序和会话作业资源的细节之前,让我们先看一下高层级流程。

  1. 检查资源是否已准备好进行协调,或者是否有任何不应中断的挂起操作(例如手动保存点)
  2. 如果这是资源的第一次部署尝试,只需部署它。这里需要注意的是,这是使用规格中提供的initialSavepointPath参数的唯一部署操作。
  3. 接下来,确定所需的规格是否发生了变化,以及变化的类型:IGNORESCALEUPGRADE。只有对于规模和升级类型的更改,才需要执行进一步的协调逻辑。
  4. 如果有升级/规模规格更改,将执行特定于资源类型的升级逻辑
  5. 如果没有收到任何规格更改,仍然必须确保当前部署的资源完全协调:
    1. 检查以前部署的资源是否失败/未稳定,如果是,则我们必须执行回滚操作。
    2. 针对单个资源应用任何进一步的协调步骤(触发保存点、恢复部署、重新启动不正常的集群等)

关于部署操作的说明

我们必须特别小心在启动集群和作业时的部署操作,这些集群和作业可能会立即开始生成数据和检查点。因此,能够识别部署何时成功/失败至关重要。同时,Operator操作可能在任何时间点失败,因此很难始终记录正确的状态。

为了确保我们能够始终恢复部署状态以及集群上正在运行的内容,在尝试部署之前,将要部署的规格始终写入具有UPGRADING状态的状态变量。此外,在已部署的Kubernetes Deployment资源中添加了一个注解,以便能够区分确切的部署尝试(我们使用自定义资源生成)。对于会话作业,由于我们无法添加注解,我们使用一种特殊的方式来生成包含相同信息的作业ID。

基本的作业协调器

AbstractJobReconciler负责执行Flink资源的共享逻辑,Flink资源还管理作业(Application和SessionJob集群)。这里,逻辑的核心部分以安全的方式管理作业状态和执行有状态作业更新。

根据规格更改SCALE/UPGRADE的类型,作业协调器的代码路径略有不同。对于扩缩容操作,如果启用了独立模式和反应式缩放,我们只需要重新缩放任务管理器。未来,我们还可能在这里为其他集群类型添加更有效的重新缩放(例如在上游Flink中实现后使用重新缩放API)

如果在规格中检测到升级类型更改,我们将执行作业升级流程:

  1. 如果作业当前正在运行,请根据需要挂起(可用的升级模式),稍后将对此进行详细说明
  2. 将状态变量标记为升级状态,并触发一个新的协调循环(这允许我们在升级过程中获取新的规格更改,因为暂停可能需要一段时间)
  3. 根据新规格从状态中记录的HA元数据或保存点信息或空状态(取决于升级模式设置)恢复作业

升级模式和挂起/取消行为

在进行有状态升级时,Operator必须始终遵守升级模式设置,以避免数据丢失。然而,该机制具有一定的灵活性,可以解决不健康的作业,并在版本升级期间提供额外的保障。getAvailableUpgradeMode方法是升级逻辑中的一个重要基石,它用于决定在用户请求和当前集群状态的情况下应该使用什么实际升级模式。

在正常的健康情况下,可用的升级模式将与用户在规格中的模式相同。但是,在某些情况下,我们必须在保存点和上次状态升级模式之间进行更改。只有当作业正常且正在运行时,才能使用保存点升级模式。对于失败、重新启动或其他不正常的部署,只要HA元数据可用(并且没有明确配置),我们就可以使用最后状态升级模式。这使我们即使在作业失败的情况下也能拥有健壮的升级流程,同时保持状态一致性。

Last-state升级模式是指使用存储在HA元数据中的检查点信息进行升级。在Flink次要版本之间升级时,HA元数据格式可能不兼容,因此版本更改必须强制执行保存点升级模式,并且需要健康运行的作业。

需要注意的是,我们从不将Last-state/savepoint升级模式更改为stateless,因为这会影响升级逻辑并导致状态丢失。

应用协调器

与我们在部署/升级/取消操作期间需要处理的会话作业相比,应用程序集群有更多额外的配置步骤。以下是Operator为确保健壮行为所做的一些重要事项:

setRandomJobResultStorePath为了避免在JobManager故障转移时重新启动已终止的应用程序,我们必须禁用作业结果清理。这迫使我们为每个应用程序部署创建一个随机作业结果存储路径。用户稍后需要手工清理作业结果存储jobresultstore)。

setJobIdIfNecessaryFlink默认情况下基于clusterId(在我们的例子中是自定义资源名称)生成确定性作业ID。如果作业从空状态重新启动(无状态升级),则会导致检查点路径冲突。因此,我们生成一个随机的jobid来避免这种情况。

cleanupTerminalJmAfterTtlFlink 1.15及以上版本在关闭/失败后,我们不会自动终止jobmanager进程,以获得稳健的观察者行为。然而,一旦观察到终止状态,我们就可以清理jobmanager进程/部署。

自定义Flink资源状态更新机制

JOSDK提供了用于更新协调器实现的资源规格和状态的内置方式,但是Flink Operator没有使用它,并且由于以下原因具有自定义状态更新机制。

使用JOSDK时,自定义资源的状态只能在协调方法结束时更新。在这里的案例中,我们通常需要在协调流的中间触发状态更新,以提供最大的鲁棒性。这种情况的一个示例是在执行部署操作之前将部署信息记录在状态中。

另一种方法是,Flink Operator将资源状态用作许多操作的预写日志(WAL),以确保在操作员出现故障时的鲁棒性。

状态更新机制在StatusRecorder类中实现,该类同时用作最新状态和更新器逻辑的缓存。我们需要在控制器流开始时始终从缓存更新自定义资源状态,因为我们绕过了JOSDK更新机制/缓存,这可能会导致返回旧的状态实例。对于实际的状态更新,我们使用了一种修改后的乐观锁定机制,只有在状态没有被外部修改的情况下,它才会更新状态。

在正常情况下,这一假设成立,因为Operator是状态的唯一所有者/更新者。这里的异常可能表明,受状态影响的用户或另一个Operator实例可能在管理相同资源的同时运行,这可能会导致严重问题。

文章来自个人专栏
云存储
20 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0