【Cloud Native | Kubernetes Series】— Flink on Kubernetes: Session Cluster Deployment
🥭 1: Overview
At its core, Flink is a streaming dataflow execution engine: on top of a single Flink runtime it supports both stream-processing and batch-processing applications.
For distributed computation over data streams, it provides data distribution, data communication, and fault-tolerance mechanisms.
🍎 2: Flink Deployment Modes
Flink on YARN has three deployment modes:
yarn-session (Session Mode)
yarn-cluster (Per-Job Mode)
Application Mode
【Note】Per-Job Mode is deprecated: it is only supported by YARN, was deprecated in Flink 1.15, and is slated for removal in FLINK-26000.
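For reference, the three modes map to different launch commands. A minimal sketch (exact flags depend on the Flink version; this follows recent releases, and the jar path is illustrative):

```bash
# Session mode: start a long-running YARN session, then submit jobs to it
./bin/yarn-session.sh -d
./bin/flink run ./examples/streaming/WordCount.jar

# Per-Job mode (deprecated since Flink 1.15): one dedicated cluster per job
./bin/flink run -t yarn-per-job ./examples/streaming/WordCount.jar

# Application mode: the job's main() runs on the cluster side
./bin/flink run-application -t yarn-application ./examples/streaming/WordCount.jar
```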
🍏 3: Hands-On: Flink on k8s
Session mode
A Flink Session cluster runs as a long-lived Kubernetes Deployment. You can run multiple Flink jobs on a single Session cluster; each job is submitted to the cluster after the cluster has been deployed. A Flink Session cluster deployment on Kubernetes has at least three components:
- a Deployment that runs the JobManager
- a Deployment for the pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports
Creating and starting the Session cluster
1: Create the Flink configuration ConfigMap
flink-configuration-configmap.yaml
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
  log4j-cli.properties: |+
    rootLogger.level = INFO
    rootLogger.appenderRef.file.ref = FileAppender

    # Log all infos in the given file
    appender.file.name = FileAppender
    appender.file.type = FILE
    appender.file.append = false
    appender.file.fileName = ${sys:log.file}
    appender.file.layout.type = PatternLayout
    appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log output from org.apache.flink.yarn to the console. This is used by the
    # CliFrontend class when using a per-job YARN cluster.
    logger.yarn.name = org.apache.flink.yarn
    logger.yarn.level = INFO
    logger.yarn.appenderRef.console.ref = ConsoleAppender
    logger.yarncli.name = org.apache.flink.yarn.cli.FlinkYarnSessionCli
    logger.yarncli.level = INFO
    logger.yarncli.appenderRef.console.ref = ConsoleAppender
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.hadoop.appenderRef.console.ref = ConsoleAppender

    # Log output from org.apache.flink.kubernetes to the console.
    logger.kubernetes.name = org.apache.flink.kubernetes
    logger.kubernetes.level = INFO
    logger.kubernetes.appenderRef.console.ref = ConsoleAppender

    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # suppress the warning that hadoop native libraries are not loaded (irrelevant for the client)
    logger.hadoopnative.name = org.apache.hadoop.util.NativeCodeLoader
    logger.hadoopnative.level = OFF

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
  log4j-session.properties: |+
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender

    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = WARN
    logger.curator.name = org.apache.flink.shaded.org.apache.curator.framework
    logger.curator.level = WARN
    logger.runtimeutils.name = org.apache.flink.runtime.util.ZooKeeperUtils
    logger.runtimeutils.level = WARN
    logger.runtimeleader.name = org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
    logger.runtimeleader.level = WARN
```
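Once this manifest is applied (the batch `kubectl apply` comes in step 5 below), a quick sanity check confirms that all four data keys are present:

```bash
# Create the ConfigMap, then inspect its data keys
kubectl apply -f flink-configuration-configmap.yaml
kubectl describe configmap flink-config
```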
2: Create the JobManager Service
jobmanager-service.yaml
```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
```
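This ClusterIP Service is what makes `jobmanager.rpc.address: flink-jobmanager` in the ConfigMap resolvable: TaskManagers look up the JobManager by the Service name through cluster DNS. A quick sanity check from a throwaway pod (assuming a busybox image is pullable in your cluster):

```bash
# Resolve the JobManager Service name from inside the cluster
kubectl run dns-test --rm -it --image=busybox --restart=Never -- nslookup flink-jobmanager
```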
3: Create the JobManager and TaskManager Deployments
jobmanager-session-deployment.yaml
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.11.1-scala_2.11
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```
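Once this Deployment has been applied (step 5 below does the batch apply), you can wait for the JobManager to become ready and inspect its startup logs before submitting anything, for example:

```bash
# Block until the JobManager Deployment finishes rolling out, then tail its logs
kubectl rollout status deployment/flink-jobmanager
kubectl logs -f deployment/flink-jobmanager
```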
taskmanager-session-deployment.yaml
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.11.1-scala_2.11
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```
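With `taskmanager.numberOfTaskSlots: 2` in the ConfigMap and `replicas: 2` here, the session cluster starts with 2 × 2 = 4 task slots. Since this is a plain Deployment (no reactive scaling), adding capacity is a manual scale:

```bash
# Double the TaskManager pool: 4 replicas x 2 slots = 8 task slots
kubectl scale deployment flink-taskmanager --replicas=4
```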
4: Create a REST Service to expose the Flink Web UI port
jobmanager-rest-service.yaml
```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30086
  selector:
    app: flink
    component: jobmanager
```
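NodePort 30086 exposes the UI on every cluster node. If NodePort is not an option in your environment, a `kubectl port-forward` against the ClusterIP Service from step 2 works as a local alternative:

```bash
# Tunnel the Web UI to localhost:8081 without a NodePort
kubectl port-forward service/flink-jobmanager 8081:8081
```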
5: Final files
```bash
[root session-mode]# ll
total 28
-rw-rw-r-- 1 dsyadmin dsyadmin 5252 Jun  1 09:12 flink-configuration-configmap.yaml
-rw-rw-r-- 1 dsyadmin dsyadmin  224 Jan 13  2021 jobmanager-rest-service.yaml
-rw-rw-r-- 1 dsyadmin dsyadmin  246 Jan 13  2021 jobmanager-service.yaml
-rw-rw-r-- 1 dsyadmin dsyadmin 1326 Jun  1 09:21 jobmanager-session-deployment.yaml
-rw-rw-r-- 1 dsyadmin dsyadmin  240 Jun  1 09:22 taskmanager-query-state-service.yaml
-rw-rw-r-- 1 dsyadmin dsyadmin 1280 Jun  1 09:22 taskmanager-session-deployment.yaml
```
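The listing also contains taskmanager-query-state-service.yaml, which is not shown above. It exposes the queryable-state proxy port (6125); based on the Flink documentation and the `flink-taskmanager-query-state` Service with NodePort 30025 visible in the output below, it would look roughly like this:

```yaml
# Assumed from the Flink docs; matches the kubectl get svc output below
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
```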
Then apply all of the manifests and check the result:
```bash
kubectl apply -f ./
```
```bash
[root@021rjsh102045s session-mode]# kubectl get po
NAME                                 READY   STATUS    RESTARTS   AGE
flink-jobmanager-65784cc466-7xvfk    2/2     Running   0          7d1h
flink-taskmanager-7f547b7b94-9rlrf   2/2     Running   0          7d1h
flink-taskmanager-7f547b7b94-xzgbz   2/2     Running   0          7d1h

[root@021rjsh102045s session-mode]# kubectl get svc
NAME                            TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
flink-jobmanager                ClusterIP   10.104.83.126   <none>        6123/TCP,6124/TCP,8081/TCP   7d1h
flink-jobmanager-rest           NodePort    10.101.113.59   <none>        8081:30086/TCP               7d1h
flink-taskmanager-query-state   NodePort    10.106.48.198   <none>        6125:30025/TCP               7d1h
```
Submitting a job to the Flink Session cluster
```bash
# Get the address and port via kubectl get svc
[root@node01 flink-training] kubectl get svc flink-jobmanager-rest
NAME                    TYPE       CLUSTER-IP     EXTERNAL-IP   PORT(S)          AGE
flink-jobmanager-rest   NodePort   10.96.174.29   <none>        8081:30086/TCP   13h

# Submit the job against the host node address and the NodePort
./bin/flink run -m node01:30086 ./examples/streaming/WordCount.jar
```
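The same `-m <host>:<port>` target works for managing jobs on the session cluster; `<jobId>` here is a placeholder for the ID printed when the job was submitted:

```bash
# List running jobs on the session cluster, then cancel one by its job ID
./bin/flink list -m node01:30086
./bin/flink cancel -m node01:30086 <jobId>
```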
Access the dashboard
Open the Flink Web UI at http://<node-ip>:30086 (the NodePort defined above). Here is the example job I ran.
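Besides the browser, the same NodePort serves Flink's monitoring REST API, so you can confirm the cluster state and the job from the command line:

```bash
# Query cluster overview and job list through the REST API behind the NodePort
curl http://node01:30086/overview
curl http://node01:30086/jobs
```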
Deleting the created components and services
```bash
kubectl delete -f ./
```