searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

K8s Application模式下的flink任务执行精要

2023-05-30 07:08:32
92
0

构键k8s集群

  1. 在这里,我们需要搭建一个K8S环境用于提供flink任务的运行时环境。在这里推荐使用kubeadm或者一些脚本工具搭建,可参考本自动k8s脚本工具。具体过程在这里省略,可以参考上述链接中的文档进行操作。

  2. 需要注意的是,我们需要在相应用户的目录下提供一个kubeconfig文件,如下图所示,通过该文件,StreamPark才能顺利地调用K8S客户端提交任务,该config的内容为与K8S的ApiServer进行连接时需要使用的信息。

提供flink运行任务的环境

  1. 将kubeconfig提供出来,供flink客户端调用

  2. 在这里主要提供一个供flink使用命名空间、和sa

    # 创建namespace
    kubectl create ns flink-dev
    # 创建serviceaccount
    kubectl create serviceaccount flink-service-account -n flink-dev
    # 用户授权
    kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink-dev:flink-service-account

下载flink客户端

flink客户端是控制flink的核心,需要下载并部署

wget https://archive.apache.org/dist/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz
tar -xf flink-1.14.3-bin-scala_2.12.tgz

任务编程

任务jar生成过程

在这里,主要提供一个flink任务案例供flink k8s application进行调用

  1. 开发java代码,供使用,本示例项目较为简单,仅为将数据输出至mysql中,调用mysql-connector进行实现

    package cn.ctyun.demo;

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SinkToMySQL {
       public static void main(String[] args) throws Exception {
    //       从启动参数中获取连接信息
           ParameterTool parameterTool = ParameterTool.fromArgs(new String[]{"url", "passwd", "user"});
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.setParallelism(1);


           DataStreamSource<Event> stream = env.fromElements(
                   new Event("Mary", "./home", 1000L),
                   new Event("Bob", "./cart", 2000L),
                   new Event("Alice", "./prod?id=100", 3000L),
                   new Event("Alice", "./prod?id=200", 3500L),
                   new Event("Bob", "./prod?id=2", 2500L),
                   new Event("Alice", "./prod?id=300", 3600L),
                   new Event("Bob", "./home", 3000L),
                   new Event("Bob", "./prod?id=1", 2300L),
                   new Event("Bob", "./prod?id=3", 3300L));

           stream.addSink(
                   JdbcSink.sink(
                           "INSERT INTO clicks (user, url) VALUES (?, ?)",
                          (statement, r) -> {
                               statement.setString(1, r.user);
                               statement.setString(2, r.url);
                          },
                           new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                  .withUrl(parameterTool.get("url"))
                                  .withDriverName("com.mysql.jdbc.Driver")
                                  .withUsername(parameterTool.get("user"))
                                  .withPassword(parameterTool.get("passwd"))
                                  .build()
                  )
          );
           env.execute();
      }
    }
  2. 项目打包

    防止一些依赖缺失,这里使用fatjar的方式进行打包,注意,这里使用了jar-with-dependencies方法进行打包,即将依赖全部打入到相应的jar包中,这样可以防止平台上的flink因为以来缺失问题导致无法使用flink程序。maven相关的设置如下所示:

    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.0.0</version>
    <configuration>
    <descriptorRefs>
    <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    </configuration>
    <executions>
    <execution>
    <id>make-assembly</id>
    <phase>package</phase>
    <goals>
    <goal>single</goal>
    </goals>
    </execution>
    </executions>
    </plugin>

    之后通过命令mvn package进行打包,注意将打包后带有with-dependencies.jar后缀的jar包留下。以供使用

  3. 制作镜像,在这里通过官方镜像作为基础镜像进行构建,

    使用docker进行镜像生成,使用命令为docker build -t /flink-demo-jar-job:1.0-SNAPSHOT .

    FROM apache/flink:1.14.3-scala_2.12
    RUN mkdir -p $FLINK_HOME/usrlib
    COPY lib $FLINK_HOME/lib/
    COPY flink-demo-jar-job-1.0-SNAPSHOT-jar-with-dependencies.jar $FLINK_HOME/usrlib/flink-demo-jar-job-1.0-SNAPSHOT-jar-with-dependencies.jar
  4. 推送镜像

     

    这里推送镜像一般会推送到默认的dockerhub相应的仓库。如果需要push到自己的镜像仓,则需要修改相应的镜像前缀${docker_repository}为自己的镜像仓位置
    docker push ${docker_repository}/flink-demo-jar-job:1.0-SNAPSHOT

     

k8s Application运行

Application模式架构

在k8s application模式下,用户只需要通过 Flink Client/CLI 启动作业。首先通过 K8s 启动 JobManager(deployment)的同时启动作业,然后通过 JobManager 内部的 K8sResourceManager 模块向 K8s 直接申请 TaskManager 的资源并启动,最后当 TM 注册到 JM 后作业就提交到 TM。用户在整个过程无需指定 TaskManager 资源的数量,而是由 JobManager 向 K8s 按需申请的。

启动命令

这里我们可以指定一定的运行参数,相关的参数设定方案请参考官方文档https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/deployment/config/#kubernetes

./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-cluster \
# 指定容器启动的镜像(与之前提交的保持一致)
-Dkubernetes.container.image=****/flink-demo-jar-job:1.0-SNAPSHOT \
-Dkubernetes.jobmanager.replicas=1 \
# 指定容器运行的命名空间
-Dkubernetes.namespace=flink-dev \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dkubernetes.taskmanager.cpu=1 \
-Dtaskmanager.memory.process.size=4096mb \
-Dkubernetes.jobmanager.cpu=1 \
-Djobmanager.memory.process.size=4096mb \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dclassloader.resolve-order=parent-first \
# yaml 模板,为解决hosts映射,后续可以通过编排此yaml文件,实现动态替换启动jar包、配置文件和持久化一些数据
# -Dkubernetes.pod-template-file=/opt/flink-1.14.2/flink-templeta.yaml \
# Main方法
-c cn.ctyun.demo.SinkToMySQL \
# 启动Jar包和启动配置文件的绝对路径(容器内部,不是宿主机)
local:///usr/local/flink/lib/flink-realtime-1.0-SNAPSHOT.jar \
# 如下将提供mysql的连接信息,通过参数的方式传递给jar包
--passwd ****** \
--user ******\
--url ******

PodTemplate

PodTemplate主要是通过指定pod的启动样例,在podtemplate中可以指定域名、挂载路径、配置文件、初始化容器等信息,如下给出一个提供一个持久化保存日志的PodTemplate。

apiVersion: v1
kind: Pod
metadata:
name: jobmanager-pod-template
spec:
initContainers:
  - name: artifacts-fetcher
    image: artifacts-fetcher:latest
     # Use wget or other tools to get user jars from remote storage
    command: [ 'wget', 'https://path/of/StateMachineExample.jar', '-O', '/flink-artifact/myjob.jar' ]
    volumeMounts:
      - mountPath: /flink-artifact
        name: flink-artifact
containers:
   # Do not change the main container name
  - name: flink-main-container
    resources:
      requests:
        ephemeral-storage: 2048Mi
      limits:
        ephemeral-storage: 2048Mi
    volumeMounts:
      - mountPath: /opt/flink/volumes/hostpath
        name: flink-volume-hostpath
      - mountPath: /opt/flink/artifacts
        name: flink-artifact
      - mountPath: /opt/flink/log
        name: flink-logs
     # Use sidecar container to push logs to remote storage or do some other debugging things
  - name: sidecar-log-collector
    image: sidecar-log-collector:latest
    command: [ 'command-to-upload', '/remote/path/of/flink-logs/' ]
    volumeMounts:
      - mountPath: /flink-logs
        name: flink-logs
volumes:
  - name: flink-volume-hostpath
    hostPath:
      path: /tmp
      type: Directory
  - name: flink-artifact
    emptyDir: { }
  - name: flink-logs
    emptyDir: { }

可知,通过如上的配置文件,启动taskmanager、JobManager后将能够提供挂载功能,能够将主容器中存储日志的目录进行挂载,供另一个容器artifacts-fetcher获取并通过其内置脚本command-to-upload实时将日志进行上传。该功能是flink官方提供的一种通过podtemplate方法解决flink中日志持久化问题的一个案例,具体podTemplate的使用需要结合实际需求场景进行调整。

 

0条评论
0 / 1000
l****n
17文章数
0粉丝数
l****n
17 文章 | 0 粉丝
原创

K8s Application模式下的flink任务执行精要

2023-05-30 07:08:32
92
0

构键k8s集群

  1. 在这里,我们需要搭建一个K8S环境用于提供flink任务的运行时环境。在这里推荐使用kubeadm或者一些脚本工具搭建,可参考本自动k8s脚本工具。具体过程在这里省略,可以参考上述链接中的文档进行操作。

  2. 需要注意的是,我们需要在相应用户的目录下提供一个kubeconfig文件,如下图所示,通过该文件,StreamPark才能顺利地调用K8S客户端提交任务,该config的内容为与K8S的ApiServer进行连接时需要使用的信息。

提供flink运行任务的环境

  1. 将kubeconfig提供出来,供flink客户端调用

  2. 在这里主要提供一个供flink使用命名空间、和sa

    # 创建namespace
    kubectl create ns flink-dev
    # 创建serviceaccount
    kubectl create serviceaccount flink-service-account -n flink-dev
    # 用户授权
    kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink-dev:flink-service-account

下载flink客户端

flink客户端是控制flink的核心,需要下载并部署

wget https://archive.apache.org/dist/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz
tar -xf flink-1.14.3-bin-scala_2.12.tgz

任务编程

任务jar生成过程

在这里,主要提供一个flink任务案例供flink k8s application进行调用

  1. 开发java代码,供使用,本示例项目较为简单,仅为将数据输出至mysql中,调用mysql-connector进行实现

    package cn.ctyun.demo;

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SinkToMySQL {
       public static void main(String[] args) throws Exception {
    //       从启动参数中获取连接信息
           ParameterTool parameterTool = ParameterTool.fromArgs(new String[]{"url", "passwd", "user"});
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.setParallelism(1);


           DataStreamSource<Event> stream = env.fromElements(
                   new Event("Mary", "./home", 1000L),
                   new Event("Bob", "./cart", 2000L),
                   new Event("Alice", "./prod?id=100", 3000L),
                   new Event("Alice", "./prod?id=200", 3500L),
                   new Event("Bob", "./prod?id=2", 2500L),
                   new Event("Alice", "./prod?id=300", 3600L),
                   new Event("Bob", "./home", 3000L),
                   new Event("Bob", "./prod?id=1", 2300L),
                   new Event("Bob", "./prod?id=3", 3300L));

           stream.addSink(
                   JdbcSink.sink(
                           "INSERT INTO clicks (user, url) VALUES (?, ?)",
                          (statement, r) -> {
                               statement.setString(1, r.user);
                               statement.setString(2, r.url);
                          },
                           new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                  .withUrl(parameterTool.get("url"))
                                  .withDriverName("com.mysql.jdbc.Driver")
                                  .withUsername(parameterTool.get("user"))
                                  .withPassword(parameterTool.get("passwd"))
                                  .build()
                  )
          );
           env.execute();
      }
    }
  2. 项目打包

    防止一些依赖缺失,这里使用fatjar的方式进行打包,注意,这里使用了jar-with-dependencies方法进行打包,即将依赖全部打入到相应的jar包中,这样可以防止平台上的flink因为以来缺失问题导致无法使用flink程序。maven相关的设置如下所示:

    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.0.0</version>
    <configuration>
    <descriptorRefs>
    <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    </configuration>
    <executions>
    <execution>
    <id>make-assembly</id>
    <phase>package</phase>
    <goals>
    <goal>single</goal>
    </goals>
    </execution>
    </executions>
    </plugin>

    之后通过命令mvn package进行打包,注意将打包后带有with-dependencies.jar后缀的jar包留下。以供使用

  3. 制作镜像,在这里通过官方镜像作为基础镜像进行构建,

    使用docker进行镜像生成,使用命令为docker build -t /flink-demo-jar-job:1.0-SNAPSHOT .

    FROM apache/flink:1.14.3-scala_2.12
    RUN mkdir -p $FLINK_HOME/usrlib
    COPY lib $FLINK_HOME/lib/
    COPY flink-demo-jar-job-1.0-SNAPSHOT-jar-with-dependencies.jar $FLINK_HOME/usrlib/flink-demo-jar-job-1.0-SNAPSHOT-jar-with-dependencies.jar
  4. 推送镜像

     

    这里推送镜像一般会推送到默认的dockerhub相应的仓库。如果需要push到自己的镜像仓,则需要修改相应的镜像前缀${docker_repository}为自己的镜像仓位置
    docker push ${docker_repository}/flink-demo-jar-job:1.0-SNAPSHOT

     

k8s Application运行

Application模式架构

在k8s application模式下,用户只需要通过 Flink Client/CLI 启动作业。首先通过 K8s 启动 JobManager(deployment)的同时启动作业,然后通过 JobManager 内部的 K8sResourceManager 模块向 K8s 直接申请 TaskManager 的资源并启动,最后当 TM 注册到 JM 后作业就提交到 TM。用户在整个过程无需指定 TaskManager 资源的数量,而是由 JobManager 向 K8s 按需申请的。

启动命令

这里我们可以指定一定的运行参数,相关的参数设定方案请参考官方文档https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/deployment/config/#kubernetes

./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-cluster \
# 指定容器启动的镜像(与之前提交的保持一致)
-Dkubernetes.container.image=****/flink-demo-jar-job:1.0-SNAPSHOT \
-Dkubernetes.jobmanager.replicas=1 \
# 指定容器运行的命名空间
-Dkubernetes.namespace=flink-dev \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dkubernetes.taskmanager.cpu=1 \
-Dtaskmanager.memory.process.size=4096mb \
-Dkubernetes.jobmanager.cpu=1 \
-Djobmanager.memory.process.size=4096mb \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dclassloader.resolve-order=parent-first \
# yaml 模板,为解决hosts映射,后续可以通过编排此yaml文件,实现动态替换启动jar包、配置文件和持久化一些数据
# -Dkubernetes.pod-template-file=/opt/flink-1.14.2/flink-templeta.yaml \
# Main方法
-c cn.ctyun.demo.SinkToMySQL \
# 启动Jar包和启动配置文件的绝对路径(容器内部,不是宿主机)
local:///usr/local/flink/lib/flink-realtime-1.0-SNAPSHOT.jar \
# 如下将提供mysql的连接信息,通过参数的方式传递给jar包
--passwd ****** \
--user ******\
--url ******

PodTemplate

PodTemplate主要是通过指定pod的启动样例,在podtemplate中可以指定域名、挂载路径、配置文件、初始化容器等信息,如下给出一个提供一个持久化保存日志的PodTemplate。

apiVersion: v1
kind: Pod
metadata:
name: jobmanager-pod-template
spec:
initContainers:
  - name: artifacts-fetcher
    image: artifacts-fetcher:latest
     # Use wget or other tools to get user jars from remote storage
    command: [ 'wget', 'https://path/of/StateMachineExample.jar', '-O', '/flink-artifact/myjob.jar' ]
    volumeMounts:
      - mountPath: /flink-artifact
        name: flink-artifact
containers:
   # Do not change the main container name
  - name: flink-main-container
    resources:
      requests:
        ephemeral-storage: 2048Mi
      limits:
        ephemeral-storage: 2048Mi
    volumeMounts:
      - mountPath: /opt/flink/volumes/hostpath
        name: flink-volume-hostpath
      - mountPath: /opt/flink/artifacts
        name: flink-artifact
      - mountPath: /opt/flink/log
        name: flink-logs
     # Use sidecar container to push logs to remote storage or do some other debugging things
  - name: sidecar-log-collector
    image: sidecar-log-collector:latest
    command: [ 'command-to-upload', '/remote/path/of/flink-logs/' ]
    volumeMounts:
      - mountPath: /flink-logs
        name: flink-logs
volumes:
  - name: flink-volume-hostpath
    hostPath:
      path: /tmp
      type: Directory
  - name: flink-artifact
    emptyDir: { }
  - name: flink-logs
    emptyDir: { }

可知,通过如上的配置文件,启动taskmanager、JobManager后将能够提供挂载功能,能够将主容器中存储日志的目录进行挂载,供另一个容器artifacts-fetcher获取并通过其内置脚本command-to-upload实时将日志进行上传。该功能是flink官方提供的一种通过podtemplate方法解决flink中日志持久化问题的一个案例,具体podTemplate的使用需要结合实际需求场景进行调整。

 

文章来自个人专栏
flink使用和知识整理
6 文章 | 2 订阅
0条评论
0 / 1000
请输入你的评论
0
0