# [Cloud Native | Kubernetes Series] Deploying a Kafka Cluster on K8S
Copyright: this is an original 云篆录 article. You may repost it without contacting me, but please credit 云篆录: https://www.yunzhuan.site
🍇 1: Background
Kafka and ZooKeeper are two typical stateful clustered services. First, both need persistent storage to hold their state; second, every instance needs its own instance ID (broker.id for Kafka, myid for ZooKeeper) that identifies it as a member of the cluster, and these IDs are used when cluster nodes communicate with each other.
Deploying this kind of service raises two big problems: preserving state, and managing the cluster (multiple service instances). The StatefulSet in Kubernetes makes deploying and managing stateful clustered services much easier. In general, three mechanisms are used to deploy a stateful clustered service:
- Use an Init Container for cluster initialization.
- Use a Headless Service to maintain stable membership identities within the cluster.
- Use PersistentVolumes and PersistentVolumeClaims to provide network storage for persisting data.
Therefore, stateful services such as Kafka and ZooKeeper must not be deployed with a Deployment in a K8S cluster; they must be deployed with a StatefulSet. "Stateful" simply means the service needs to persist data, such as logs, database files, or service state.
StatefulSet use cases:
- Stable persistent storage: after a Pod is rescheduled it can still access the same persisted data; implemented with PVCs.
- Stable network identity: after a Pod is rescheduled it keeps the same PodName and HostName; implemented with a Headless Service (a Service without a Cluster IP).
- Ordered deployment and scaling: Pods are ordinal-numbered, and deployment or scale-up proceeds in the defined order from 0 to N-1; before the next Pod starts, all preceding Pods must be Running and Ready.
- Ordered scale-down and deletion (from N-1 down to 0).
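The ordering guarantees above can be sketched in a few lines. This is a toy illustration of the ordinal sequences the StatefulSet controller walks through, not Kubernetes API code; the `zok-` prefix matches the ZooKeeper StatefulSet deployed in this article:

```python
def scale_up_order(replicas):
    # Pods are created one at a time, ordinal 0 .. N-1;
    # pod i+1 only starts after pod i is Running and Ready.
    return [f"zok-{i}" for i in range(replicas)]

def scale_down_order(replicas):
    # Termination walks the ordinals in reverse, N-1 .. 0.
    return [f"zok-{i}" for i in reversed(range(replicas))]

print(scale_up_order(3))    # ['zok-0', 'zok-1', 'zok-2']
print(scale_down_order(3))  # ['zok-2', 'zok-1', 'zok-0']
```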
A StatefulSet is made up of:
- a Headless Service that defines the network identity (DNS domain)
- volumeClaimTemplates used to create PersistentVolumes
- the StatefulSet definition of the application itself
The DNS name of each Pod in a StatefulSet has the form:
statefulSetName-{0..N-1}.serviceName.namespace.svc.cluster.local, where:
- statefulSetName is the name of the StatefulSet
- 0..N-1 is the Pod's ordinal, from 0 to N-1
- serviceName is the name of the Headless Service
- namespace is the namespace the service runs in; the Headless Service and the StatefulSet must be in the same namespace
- svc.cluster.local is the K8S Cluster Domain (cluster root domain)
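For example, the naming rule can be checked with a few lines of Python (`pod_dns` is a hypothetical helper written for illustration, not part of any Kubernetes client library):

```python
def pod_dns(statefulset, ordinal, service, namespace,
            cluster_domain="cluster.local"):
    """Build the stable DNS name of pod <ordinal> of a StatefulSet."""
    return f"{statefulset}-{ordinal}.{service}.{namespace}.svc.{cluster_domain}"

# The three ZooKeeper pods in this article resolve as:
for i in range(3):
    print(pod_dns("zok", i, "zk-hs", "zk"))
# zok-0.zk-hs.zk.svc.cluster.local
# zok-1.zk-hs.zk.svc.cluster.local
# zok-2.zk-hs.zk.svc.cluster.local
```

These are exactly the names used later in zookeeper.connect for the Kafka brokers.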
🍋 Preparation
1: Prepare storage. Ceph is used here; bring your own storage backend.
2: Download the package: kafka_2.12-2.8.2.tgz
🥭 Deployment
Create the ZooKeeper cluster
apiVersion: v1
kind: Service
metadata:
  namespace: zk
  name: zk-hs
  labels:
    app: zk
spec:
  ports:
  - port: 2888
    name: server
  - port: 3888
    name: leader-election
  clusterIP: None
  selector:
    app: zk
---
apiVersion: v1
kind: Service
metadata:
  namespace: zk
  name: zk-cs
  labels:
    app: zk
spec:
  type: NodePort
  ports:
  - port: 2181
    targetPort: 2181
    name: client
    nodePort: 32181
  selector:
    app: zk
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  namespace: zk
  name: zk-pdb
spec:
  selector:
    matchLabels:
      app: zk
  maxUnavailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: zk
  name: zok
spec:
  serviceName: zk-hs
  replicas: 3
  selector:
    matchLabels:
      app: zk
  template:
    metadata:
      labels:
        app: zk
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: "app"
                operator: In
                values:
                - zk
            topologyKey: "kubernetes.io/hostname"
      containers:
      - name: kubernetes-zookeeper
        imagePullPolicy: Always
        # image: leolee32/kubernetes-library:kubernetes-zookeeper1.0-3.4.10
        image: bitnami/zookeeper:3.8.1
        resources:
          requests:
            memory: "1024Mi"
            cpu: "500m"
        ports:
        - containerPort: 2181
          name: client
        - containerPort: 2888
          name: server
        - containerPort: 3888
          name: leader-election
        command:
        - sh
        - -c
        - "start-zookeeper \
          --servers=3 \
          --data_dir=/var/lib/zookeeper/data \
          --data_log_dir=/var/lib/zookeeper/data/log \
          --conf_dir=/opt/zookeeper/conf \
          --client_port=2181 \
          --election_port=3888 \
          --server_port=2888 \
          --tick_time=2000 \
          --init_limit=10 \
          --sync_limit=5 \
          --heap=512M \
          --max_client_cnxns=60 \
          --snap_retain_count=3 \
          --purge_interval=12 \
          --max_session_timeout=40000 \
          --min_session_timeout=4000 \
          --log_level=INFO"
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - "zookeeper-ready 2181"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        livenessProbe:
          exec:
            command:
            - sh
            - -c
            - "zookeeper-ready 2181"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/zookeeper
  volumeClaimTemplates:
  - metadata:
      name: datadir
      annotations:
        volume.beta.kubernetes.io/storage-class: "nfs-storage"
    spec:
      accessModes:
      - ReadWriteMany
      resources:
        requests:
          storage: 10Gi
Create and verify:
[root zk]# kubectl apply -f zk.yaml
service/zk-hs created
service/zk-cs created
poddisruptionbudget.policy/zk-pdb created
statefulset.apps/zok created
[root ~]# kubectl get po -n zk
NAME READY STATUS RESTARTS AGE
zok-0 1/1 Running 0 41d
zok-1 1/1 Running 0 41d
zok-2 1/1 Running 0 41d
[root ~]# kubectl get svc -n zk
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
zk-cs NodePort 10.98.87.163 <none> 2181:32181/TCP 41d
zk-hs ClusterIP None <none> 2888/TCP,3888/TCP 41d
Check the ZooKeeper leader/follower roles:
[root ~]# kubectl exec -ti zok-0 -n zk -- zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: follower
[root ~]# kubectl exec -ti zok-1 -n zk -- zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: follower
[root ~]# kubectl exec -ti zok-2 -n zk -- zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: leader
Create the Kafka cluster
The Kafka cluster must be reachable from outside the K8S cluster. That requires either three external addresses, or one external address plus three ports, each proxying to one of the three Kafka Pod instances. External clients connect through the advertised.listeners listener:
- advertised.listeners is the service endpoint exposed to the outside; Kafka components talk to each other over listeners.
- The advertised.listeners entries are registered in ZooKeeper.
- When a client connects to "<public IP>:port", the broker uses the listeners registered in ZooKeeper to find the matching public listener (PLAINTEXT by default, or a custom one), and from listeners resolves the corresponding internal "communication IP and port".
1: Build the Docker image for kafka_2.12-2.8.2 (Dockerfile)
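The scheme used below (one NodePort per broker, 32092 to 32094) can be sketched as follows. This is an illustration only; `broker_listeners` is a made-up helper, and the `listeners` value shown is Kafka's default in-pod binding, which the manifests below do not override:

```python
def broker_listeners(broker_id, node_ip):
    """Map a broker ordinal to its internal and externally advertised listeners."""
    node_port = 32092 + broker_id  # matches NodePorts of kafka-cs1/2/3 below
    return {
        # what the broker binds inside the pod (Kafka's default listener)
        "listeners": "PLAINTEXT://0.0.0.0:9092",
        # what is registered in ZooKeeper and handed back to external clients
        "advertised.listeners": f"PLAINTEXT://{node_ip}:{node_port}",
    }

print(broker_listeners(0, "192.0.2.10")["advertised.listeners"])
# PLAINTEXT://192.0.2.10:32092
```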
FROM openjdk:8
RUN rm -f /etc/localtime \
&& ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
&& echo "Asia/Shanghai" > /etc/timezone
ENV LANG en_US.UTF-8
ENV KAFKA_DATA_DIR /var/lib/kafka/data
ENV JAVA_HOME /usr/local/openjdk-8
ENV KAFKA_HOME /opt/kafka
ENV PATH /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/kafka/bin
COPY kafka_2.12-2.8.2.tgz /opt/kafka_2.12-2.8.2.tgz
WORKDIR /opt/
RUN tar -zxvf kafka_2.12-2.8.2.tgz && rm -rf kafka_2.12-2.8.2.tgz && mv kafka_2.12-2.8.2 kafka && mkdir -p /var/lib/kafka/data
CMD ["/bin/bash"]
2: Build the image and push it to Harbor
docker build -t 127.0.0.1/ops/kafka:v2.12-2.8.2 .
docker push 127.0.0.1/ops/kafka:v2.12-2.8.2
3: Create the Kafka cluster. kafka1.yaml:
apiVersion: v1
kind: Service
metadata:
  namespace: kafka
  name: kafka-hs1
  labels:
    app: kafka1
spec:
  ports:
  - port: 1099
    name: jmx1
  clusterIP: None
  selector:
    app: kafka1
---
apiVersion: v1
kind: Service
metadata:
  namespace: kafka
  name: kafka-cs1
  labels:
    app: kafka1
spec:
  type: NodePort
  ports:
  - port: 9092
    targetPort: 9092
    name: client1
    nodePort: 32092
  selector:
    app: kafka1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: kafka
  name: kafoka1
spec:
  serviceName: kafka-hs1
  replicas: 1
  selector:
    matchLabels:
      app: kafka1
  template:
    metadata:
      labels:
        app: kafka1
    spec:
      containers:
      - name: k8skafka
        imagePullPolicy: Always
        image: 127.0.0.1/ops/kafka:v2.12-2.8.2
        ports:
        - containerPort: 9092
          name: client1
        - containerPort: 1099
          name: jmx1
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=0 \
          --override advertised.listeners=PLAINTEXT://<node-ip>:32092 \
          --override zookeeper.connect=zok-0.zk-hs.zk.svc.cluster.local:2181,zok-1.zk-hs.zk.svc.cluster.local:2181,zok-2.zk-hs.zk.svc.cluster.local:2181 \
          --override log.dirs=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=true \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=24 \
          --override log.roll.hours=24 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value: "-Xmx8g -Xms8g"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=ERROR"
        volumeMounts:
        - name: kafkadatadir
          mountPath: /var/lib/kafka
        lifecycle:
          postStart:
            exec:
              command: ["/bin/sh","-c","touch /tmp/health"]
        livenessProbe:
          exec:
            command: ["test","-e","/tmp/health"]
          initialDelaySeconds: 5
          timeoutSeconds: 5
          periodSeconds: 10
        readinessProbe:
          tcpSocket:
            port: client1
          initialDelaySeconds: 15
          timeoutSeconds: 5
          periodSeconds: 20
  volumeClaimTemplates:
  - metadata:
      name: kafkadatadir
      annotations:
        volume.beta.kubernetes.io/storage-class: "nfs-storage"
    spec:
      accessModes:
      - ReadWriteMany
      resources:
        requests:
          storage: 100Gi
kafka2.yaml:
apiVersion: v1
kind: Service
metadata:
  namespace: kafka
  name: kafka-hs2
  labels:
    app: kafka2
spec:
  ports:
  - port: 1099
    name: jmx2
  clusterIP: None
  selector:
    app: kafka2
---
apiVersion: v1
kind: Service
metadata:
  namespace: kafka
  name: kafka-cs2
  labels:
    app: kafka2
spec:
  type: NodePort
  ports:
  - port: 9092
    targetPort: 9092
    name: client2
    nodePort: 32093
  selector:
    app: kafka2
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: kafka
  name: kafoka2
spec:
  serviceName: kafka-hs2
  replicas: 1
  selector:
    matchLabels:
      app: kafka2
  template:
    metadata:
      labels:
        app: kafka2
    spec:
      containers:
      - name: k8skafka
        imagePullPolicy: Always
        image: 127.0.0.1/ops/kafka:v2.12-2.8.2
        ports:
        - containerPort: 9092
          name: client2
        - containerPort: 1099
          name: jmx2
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=1 \
          --override advertised.listeners=PLAINTEXT://<node-ip>:32093 \
          --override zookeeper.connect=zok-0.zk-hs.zk.svc.cluster.local:2181,zok-1.zk-hs.zk.svc.cluster.local:2181,zok-2.zk-hs.zk.svc.cluster.local:2181 \
          --override log.dirs=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=true \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=24 \
          --override log.roll.hours=24 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value: "-Xmx8g -Xms8g"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=ERROR"
        volumeMounts:
        - name: kafkadatadir
          mountPath: /var/lib/kafka
        lifecycle:
          postStart:
            exec:
              command: ["/bin/sh","-c","touch /tmp/health"]
        livenessProbe:
          exec:
            command: ["test","-e","/tmp/health"]
          initialDelaySeconds: 5
          timeoutSeconds: 5
          periodSeconds: 10
        readinessProbe:
          tcpSocket:
            port: client2
          initialDelaySeconds: 15
          timeoutSeconds: 5
          periodSeconds: 20
  volumeClaimTemplates:
  - metadata:
      name: kafkadatadir
      annotations:
        volume.beta.kubernetes.io/storage-class: "nfs-storage"
    spec:
      accessModes:
      - ReadWriteMany
      resources:
        requests:
          storage: 100Gi
kafka3.yaml:
apiVersion: v1
kind: Service
metadata:
  namespace: kafka
  name: kafka-hs3
  labels:
    app: kafka3
spec:
  ports:
  - port: 1099
    name: jmx3
  clusterIP: None
  selector:
    app: kafka3
---
apiVersion: v1
kind: Service
metadata:
  namespace: kafka
  name: kafka-cs3
  labels:
    app: kafka3
spec:
  type: NodePort
  ports:
  - port: 9092
    targetPort: 9092
    name: client3
    nodePort: 32094
  selector:
    app: kafka3
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: kafka
  name: kafoka3
spec:
  serviceName: kafka-hs3
  replicas: 1
  selector:
    matchLabels:
      app: kafka3
  template:
    metadata:
      labels:
        app: kafka3
    spec:
      containers:
      - name: k8skafka
        imagePullPolicy: Always
        image: 127.0.0.1/ops/kafka:v2.12-2.8.2
        ports:
        - containerPort: 9092
          name: client3
        - containerPort: 1099
          name: jmx3
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=2 \
          --override advertised.listeners=PLAINTEXT://<node-ip>:32094 \
          --override zookeeper.connect=zok-0.zk-hs.zk.svc.cluster.local:2181,zok-1.zk-hs.zk.svc.cluster.local:2181,zok-2.zk-hs.zk.svc.cluster.local:2181 \
          --override log.dirs=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=true \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=24 \
          --override log.roll.hours=24 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value: "-Xmx8g -Xms8g"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=ERROR"
        volumeMounts:
        - name: kafkadatadir
          mountPath: /var/lib/kafka
        lifecycle:
          postStart:
            exec:
              command: ["/bin/sh","-c","touch /tmp/health"]
        livenessProbe:
          exec:
            command: ["test","-e","/tmp/health"]
          initialDelaySeconds: 5
          timeoutSeconds: 5
          periodSeconds: 10
        readinessProbe:
          tcpSocket:
            port: client3
          initialDelaySeconds: 15
          timeoutSeconds: 5
          periodSeconds: 20
  volumeClaimTemplates:
  - metadata:
      name: kafkadatadir
      annotations:
        volume.beta.kubernetes.io/storage-class: "nfs-storage"
    spec:
      accessModes:
      - ReadWriteMany
      resources:
        requests:
          storage: 100Gi
Create and verify:
kubectl apply -f kafka1.yaml
kubectl apply -f kafka2.yaml
kubectl apply -f kafka3.yaml
[root kafka]# kubectl get all -n kafka
NAME READY STATUS RESTARTS AGE
pod/kafoka1-0 1/1 Running 0 23d
pod/kafoka2-0 1/1 Running 0 23d
pod/kafoka3-0 1/1 Running 0 23d
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kafka-cs1 NodePort 10.100.49.159 <none> 9092:32092/TCP 23d
service/kafka-cs2 NodePort 10.103.67.47 <none> 9092:32093/TCP 23d
service/kafka-cs3 NodePort 10.108.55.28 <none> 9092:32094/TCP 23d
service/kafka-hs1 ClusterIP None <none> 1099/TCP 23d
service/kafka-hs2 ClusterIP None <none> 1099/TCP 23d
service/kafka-hs3 ClusterIP None <none> 1099/TCP 23d
NAME READY AGE
statefulset.apps/kafoka1 1/1 23d
statefulset.apps/kafoka2 1/1 23d
statefulset.apps/kafoka3 1/1 23d
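Once the pods are Running, a quick way to confirm that each advertised NodePort actually accepts connections is a plain TCP check. This is a generic sketch using only the Python standard library; the node IP shown is a placeholder you must replace:

```python
import socket

def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

node_ip = "192.0.2.10"  # placeholder: substitute a real K8S node IP
for port in (32092, 32093, 32094):
    print(port, "open" if port_open(node_ip, port) else "closed")
```

If a port reports closed, check the corresponding kafka-cs Service and the advertised.listeners override in that broker's StatefulSet.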
Deploy the Kafka dashboard: kafka-eagle
1: Review the config files
[root kafka-eagle]# ll
total 12
-rw-rw-r-- 1 dsyadmin dsyadmin 137 Mar 28 13:57 kafka_client_jaas.conf
-rw-rw-r-- 1 dsyadmin dsyadmin 1993 Mar 28 14:03 kafka-eagle.yaml
-rw-rw-r-- 1 dsyadmin dsyadmin 3067 Mar 28 13:57 system-config.properties
[root kafka-eagle]# cat kafka_client_jaas.conf
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret";
};
[root kafka-eagle]# cat kafka-eagle.yaml
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-eagle
  namespace: kafka
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      workload.user.cattle.io/workloadselector: deployment-kafka-kafka-eagle
  strategy:
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
    type: RollingUpdate
  template:
    metadata:
      labels:
        workload.user.cattle.io/workloadselector: deployment-kafka-kafka-eagle
    spec:
      containers:
      - image: buzhiyun/kafka-eagle:latest
        imagePullPolicy: Always
        name: kafka-eagle
        lifecycle:
          postStart:
            exec:
              command: [ "/bin/sh","-c","touch /usr/local/ke.db" ]
        ports:
        - containerPort: 8048
          name: 8048tcp01
          protocol: TCP
        resources: {}
        securityContext:
          allowPrivilegeEscalation: false
          privileged: false
          procMount: Default
          readOnlyRootFilesystem: false
          runAsNonRoot: false
        stdin: true
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        tty: true
        volumeMounts:
        - mountPath: /opt/kafka-eagle/conf
          name: conf
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30
      volumes:
      - configMap:
          defaultMode: 256
          name: kafka-eagle-config
          optional: false
        name: conf
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-eagle-client
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 8048
    targetPort: 8048
    nodePort: 30048
  selector:
    workload.user.cattle.io/workloadselector: deployment-kafka-kafka-eagle
[root kafka-eagle]# cat system-config.properties
######################################
# multi zookeeper&kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=zok-0.zk-hs.zk.svc.cluster.local:2181,zok-1.zk-hs.zk.svc.cluster.local:2181,zok-2.zk-hs.zk.svc.cluster.local:2181
######################################
# zk client thread limit
######################################
kafka.zk.limit.size=25
######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048
######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka
######################################
# enable kafka metrics
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=true
######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000
######################################
# alarm email configure
######################################
kafka.eagle.mail.enable=false
kafka.eagle.mail.sa=alert_sa@163.com
kafka.eagle.mail.username=alert_sa@163.com
kafka.eagle.mail.password=mqslimczkdqabbbh
kafka.eagle.mail.server.host=smtp.163.com
kafka.eagle.mail.server.port=25
######################################
# alarm im configure
######################################
#kafka.eagle.im.dingding.enable=true
#kafka.eagle.im.dingding.url=https://oapi.dingtalk.com/robot/send?access_token=
#kafka.eagle.im.wechat.enable=true
#kafka.eagle.im.wechat.token=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=xxx&corpsecret=xxx
#kafka.eagle.im.wechat.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=
#kafka.eagle.im.wechat.touser=
#kafka.eagle.im.wechat.toparty=
#kafka.eagle.im.wechat.totag=
#kafka.eagle.im.wechat.agentid=
######################################
# delete kafka topic token
######################################
kafka.eagle.topic.token=keadmin
######################################
# kafka sasl authenticate
######################################
cluster1.kafka.eagle.sasl.enable=false
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=PLAIN
cluster1.kafka.eagle.sasl.jaas.config=kafka_client_jaas.conf
######################################
# kafka jdbc driver address
######################################
#kafka.eagle.driver=org.sqlite.JDBC
#kafka.eagle.url=jdbc:sqlite:/usr/local/ke.db
#kafka.eagle.username=root
#kafka.eagle.password=www.kafka-eagle.org
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/usr/local/ke.db
kafka.eagle.username=root
kafka.eagle.password=smartloli
2: Deploy
kubectl create configmap kafka-eagle-config -n kafka --from-file=kafka_client_jaas.conf --from-file=system-config.properties
kubectl apply -f kafka-eagle.yaml
3: Verify
[root kafka-eagle]# kubectl get all -n kafka
NAME READY STATUS RESTARTS AGE
pod/kafka-eagle-6b4f6b6bd7-z4qtb 1/1 Running 0 41d
pod/kafoka1-0 1/1 Running 0 23d
pod/kafoka2-0 1/1 Running 0 23d
pod/kafoka3-0 1/1 Running 0 23d
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kafka-cs1 NodePort 10.100.49.159 <none> 9092:32092/TCP 23d
service/kafka-cs2 NodePort 10.103.67.47 <none> 9092:32093/TCP 23d
service/kafka-cs3 NodePort 10.108.55.28 <none> 9092:32094/TCP 23d
service/kafka-eagle-client NodePort 10.109.195.106 <none> 8048:30048/TCP 41d
service/kafka-hs1 ClusterIP None <none> 1099/TCP 23d
service/kafka-hs2 ClusterIP None <none> 1099/TCP 23d
service/kafka-hs3 ClusterIP None <none> 1099/TCP 23d
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/kafka-eagle 1/1 1 1 41d
NAME DESIRED CURRENT READY AGE
replicaset.apps/kafka-eagle-6b4f6b6bd7 1 1 1 41d
NAME READY AGE
statefulset.apps/kafoka1 1/1 23d
statefulset.apps/kafoka2 1/1 23d
statefulset.apps/kafoka3 1/1 23d
Access the dashboard
URL: http://<node-ip>:30048/ke/account/signin?/ke/  Default credentials: admin/123456