Dylan
2023-12-21

[Cloud Native | Kubernetes Series] Flink on Kubernetes Session Cluster Deployment

Copyright: this is an original article from 云篆录 (Yunzhuan Lu). You may repost it without contacting me, but please credit 云篆录: https://www.yunzhuan.site

🥭 1: Overview

At its core, Flink is a streaming dataflow execution engine; on top of the same runtime it supports both stream-processing and batch-processing applications. For distributed computation over data streams it provides data distribution, data communication, and fault-tolerance mechanisms.

Flink on YARN has three execution modes (launch commands for each are sketched below):

  • yarn-session mode (Session Mode)
  • yarn-cluster mode (Per-Job Mode)
  • Application mode (Application Mode)
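
For reference, each mode maps to a different client invocation. A minimal sketch against a Flink 1.11-era distribution (matching the image used below; the example jar and paths are illustrative):

# yarn-session mode: start a long-running session first, then submit jobs into it
./bin/yarn-session.sh -d
./bin/flink run ./examples/streaming/WordCount.jar

# yarn-cluster (per-job) mode: spin up a dedicated cluster for this one job
./bin/flink run -m yarn-cluster ./examples/streaming/WordCount.jar

# application mode: the job's main() runs on the cluster side (JobManager)
./bin/flink run-application -t yarn-application ./examples/streaming/WordCount.jar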


【Note】Per-Job mode is deprecated: it is only supported by YARN, was deprecated in Flink 1.15, and is slated for removal under FLINK-26000.


Session mode

A Flink Session cluster runs as a long-running Kubernetes Deployment. You can run multiple Flink jobs on one Session cluster; each job is submitted to the cluster after the cluster has been deployed. A Flink Session cluster deployment on Kubernetes needs at least three components (a quick way to list them is sketched after this list):

  • a Deployment that runs the JobManager
  • a Deployment for a pool of TaskManagers
  • a Service exposing the JobManager's REST and UI ports
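
Once the manifests below are applied, the pods behind these components all carry the app=flink label from their pod templates, so a single label query shows them at a glance (assuming the default namespace):

# list the JobManager and TaskManager pods created by the two Deployments
kubectl get pods -l app=flink -o wide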

Creating and starting the Session cluster

1: Create the ConfigMap

flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

  log4j-cli.properties: |+
    rootLogger.level = INFO
    rootLogger.appenderRef.file.ref = FileAppender
    # Log all infos in the given file
    appender.file.name = FileAppender
    appender.file.type = FILE
    appender.file.append = false
    appender.file.fileName = ${sys:log.file}
    appender.file.layout.type = PatternLayout
    appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log output from org.apache.flink.yarn to the console. This is used by the
    # CliFrontend class when using a per-job YARN cluster.
    logger.yarn.name = org.apache.flink.yarn
    logger.yarn.level = INFO
    logger.yarn.appenderRef.console.ref = ConsoleAppender
    logger.yarncli.name = org.apache.flink.yarn.cli.FlinkYarnSessionCli
    logger.yarncli.level = INFO
    logger.yarncli.appenderRef.console.ref = ConsoleAppender
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.hadoop.appenderRef.console.ref = ConsoleAppender
    # Log output from org.apache.flink.kubernetes to the console.
    logger.kubernetes.name = org.apache.flink.kubernetes
    logger.kubernetes.level = INFO
    logger.kubernetes.appenderRef.console.ref = ConsoleAppender
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # suppress the warning that hadoop native libraries are not loaded (irrelevant for the client)
    logger.hadoopnative.name = org.apache.hadoop.util.NativeCodeLoader
    logger.hadoopnative.level = OFF
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF


  log4j-session.properties: |+
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = WARN
    logger.curator.name = org.apache.flink.shaded.org.apache.curator.framework
    logger.curator.level = WARN
    logger.runtimeutils.name= org.apache.flink.runtime.util.ZooKeeperUtils
    logger.runtimeutils.level = WARN
    logger.runtimeleader.name = org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
    logger.runtimeleader.level = WARN
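
Apply the ConfigMap and sanity-check that its data entries were picked up (assumes kubectl already points at the target cluster and namespace):

kubectl apply -f flink-configuration-configmap.yaml
# show the ConfigMap's data entries (flink-conf.yaml, log4j-*.properties)
kubectl describe configmap flink-config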

2: Create the JobManager Service

jobmanager-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
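
This ClusterIP Service is what jobmanager.rpc.address: flink-jobmanager in flink-conf.yaml resolves to through cluster DNS. Before the NodePort Service from step 4 exists, the web UI can still be reached with a port-forward (local port 8081 is an arbitrary choice):

kubectl apply -f jobmanager-service.yaml
# forward local port 8081 to the Service's webui port, then open http://localhost:8081
kubectl port-forward service/flink-jobmanager 8081:8081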

3: Create the JobManager and TaskManager Deployments

jobmanager-session-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.11.1-scala_2.11
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
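
After applying this Deployment, the JobManager's startup log is the quickest health signal (kubectl logs against a Deployment picks one of its pods):

kubectl apply -f jobmanager-session-deployment.yaml
# follow the JobManager startup log
kubectl logs -f deployment/flink-jobmanager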

taskmanager-session-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.11.1-scala_2.11
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
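
With taskmanager.numberOfTaskSlots: 2 in the ConfigMap and replicas: 2 here, the session starts with 4 task slots. Capacity can be grown later without editing the manifest (the target of 4 replicas is just an example):

kubectl apply -f taskmanager-session-deployment.yaml
# each extra TaskManager pod contributes 2 more slots
kubectl scale deployment/flink-taskmanager --replicas=4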

4: Create the JobManager REST Service

jobmanager-rest-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30086
  selector:
    app: flink
    component: jobmanager
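
The file listing in step 5 also contains taskmanager-query-state-service.yaml, which this post never shows. Judging by the matching example in the official Flink standalone-Kubernetes docs and the 6125:30025 mapping in the kubectl get svc output below, it is presumably close to this sketch:

apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  # exposes the queryable-state proxy port set in flink-conf.yaml (queryable-state.proxy.ports: 6125)
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager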

5: Final set of files

[root session-mode]# ll
total 28
-rw-rw-r-- 1 dsyadmin dsyadmin 5252 Jun  1 09:12 flink-configuration-configmap.yaml
-rw-rw-r-- 1 dsyadmin dsyadmin  224 Jan 13  2021 jobmanager-rest-service.yaml
-rw-rw-r-- 1 dsyadmin dsyadmin  246 Jan 13  2021 jobmanager-service.yaml
-rw-rw-r-- 1 dsyadmin dsyadmin 1326 Jun  1 09:21 jobmanager-session-deployment.yaml
-rw-rw-r-- 1 dsyadmin dsyadmin  240 Jun  1 09:22 taskmanager-query-state-service.yaml
-rw-rw-r-- 1 dsyadmin dsyadmin 1280 Jun  1 09:22 taskmanager-session-deployment.yaml

Then apply all the manifests and check the result:

kubectl apply -f ./
[root@021rjsh102045s session-mode]# kubectl   get   po 
NAME                                      READY   STATUS    RESTARTS      AGE
flink-jobmanager-65784cc466-7xvfk         2/2     Running   0             7d1h
flink-taskmanager-7f547b7b94-9rlrf        2/2     Running   0             7d1h
flink-taskmanager-7f547b7b94-xzgbz        2/2     Running   0             7d1h
[root@021rjsh102045s session-mode]# kubectl   get   svc 
NAME                            TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                                                       AGE
flink-jobmanager                ClusterIP   10.104.83.126    <none>        6123/TCP,6124/TCP,8081/TCP                                    7d1h
flink-jobmanager-rest           NodePort    10.101.113.59    <none>        8081:30086/TCP                                                7d1h
flink-taskmanager-query-state   NodePort    10.106.48.198    <none>        6125:30025/TCP                                                7d1h
# get the port and address information via kubectl get svc
[root@node01 flink-training] kubectl get svc flink-jobmanager-rest
NAME                    TYPE       CLUSTER-IP     EXTERNAL-IP   PORT(S)          AGE
flink-jobmanager-rest   NodePort   10.96.174.29   <none>        8081:30086/TCP   13h
# submit a job by specifying a host node address and the NodePort
./bin/flink run -m node01:30086 ./examples/streaming/WordCount.jar
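
Once submitted, the same -m address works for inspecting or cancelling jobs from the CLI (<jobId> below is a placeholder for the id printed at submission):

# list running and scheduled jobs on the session cluster
./bin/flink list -m node01:30086
# cancel one of them by id
./bin/flink cancel -m node01:30086 <jobId>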

Access the dashboard

http://<node-ip>:30086/#/overview
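
The same overview data is exposed by the JobManager's Monitoring REST API on the identical NodePort, which is handy for scripting:

# cluster overview: slot counts, running jobs, Flink version
curl http://<node-ip>:30086/overview
# per-job status summary
curl http://<node-ip>:30086/jobs/overview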

I ran an example job here (screenshot: img_2.png).

Deleting the created components and services

kubectl delete -f ./
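
This removes everything the manifests created. A quick follow-up check that nothing is left behind (pods may take a few seconds to terminate):

kubectl get pods -l app=flink
kubectl get deployments,services,configmaps | grep flink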