I have a Kafka cluster running on an integration Kubernetes cluster, with 3 Kafka brokers and 3 Zookeepers, each component running in its own StatefulSet.
The Helm chart I use to deploy the cluster is a custom one, since I needed external access to the cluster through a BigIP F5 and did not find an existing chart that supported this.
The Kafka image is confluentinc/cp-kafka:5.4.0 and the Zookeeper image is confluentinc/cp-zookeeper:5.4.0.
For Zookeeper, /var/lib/zookeeper/data and /var/lib/zookeeper/log are mapped to persistent volumes; the same goes for /var/lib/kafka on the Kafka brokers.
I use hlebalbau/kafka-manager:stable to watch the state of the cluster.
I set up three partitions per topic, with a replication factor of three.
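For reference, creating a topic with this layout looks like the following (the topic name is just an example; the kafka-topics tool ships with the cp-kafka image):

```bash
# Example only: create a topic with 3 partitions, each replicated on all 3 brokers
kubectl exec -it kafka-controller-1-0 -- bash -c \
  "unset JMX_PORT && kafka-topics --bootstrap-server localhost:9092 \
   --create --topic test-topic --partitions 3 --replication-factor 3"
```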
Recently I realized that if I restarted all three Kafka brokers at the same time (with kubectl delete pod; the exact commands are shown after the list below), all the topics' contents were lost:
- the log size drops to zero for each topic
- the list of topics stays the same
- the consumers list stays the same, but each consumer's current offset drops to a negative value (if a consumer was at offset 10000 on a topic with 10000 messages, the topic size drops to zero and the consumer offset to -10000)
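Concretely, this is how I delete all three broker pods at once and check the offsets afterwards (the consumer group name is a placeholder):

```bash
# Delete all three broker pods simultaneously; the StatefulSets recreate them
kubectl delete pod kafka-controller-1-0 kafka-controller-2-0 kafka-controller-3-0

# Once the brokers are back, inspect the group offsets (group name is an example)
kubectl exec -it kafka-controller-1-0 -- bash -c \
  "unset JMX_PORT && kafka-consumer-groups --bootstrap-server localhost:9092 \
   --describe --group my-consumer-group"
```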
I have never encountered any issue when restarting one Kafka broker at a time and waiting for it to be fully started before restarting the next one.
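By contrast, the rolling restart that has always worked for me amounts to something like this:

```bash
# Rolling restart: one broker StatefulSet at a time, waiting for readiness in between
for sts in kafka-controller-1 kafka-controller-2 kafka-controller-3; do
  kubectl rollout restart "statefulset/${sts}"
  kubectl rollout status "statefulset/${sts}" --timeout=300s
done
```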
I know that a Kafka cluster is not meant to be stopped or restarted like this, but I did not expect this kind of behaviour.
Is this expected behaviour, or did I miss something obvious?
Here is my YAML template for a Kafka broker. As indicated by its name, the wait-zookeeper.sh script "just" waits for the Zookeepers to be started (an illustrative sketch of it follows the template).
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-controller-1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-1
  serviceName: kafka1-headless
  template:
    metadata:
      labels:
        app: kafka-1
        cluster: kafka
    spec:
      initContainers:
      - name: init-wait-zookeeper
        image: bash:latest
        command: ["/usr/local/bin/bash","-c","cp /wait-zookeeper-configmap/wait-zookeeper.sh /wait-zookeeper-emptydir/ && chmod 755 /wait-zookeeper-emptydir/wait-zookeeper.sh && /wait-zookeeper-emptydir/wait-zookeeper.sh"]
        volumeMounts:
        - name: wait-zookeeper-configmap
          mountPath: /wait-zookeeper-configmap
        - name: wait-zookeeper-emptydir
          mountPath: /wait-zookeeper-emptydir
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: app
                  operator: In
                  values:
                  - kafka-2
                  - kafka-3
{{- if gt .Values.workerNodesNumber 5.0 }}
                  - zookeeper-1
                  - zookeeper-2
                  - zookeeper-3
{{- end }}
              topologyKey: "kubernetes.io/hostname"
      containers:
      - name: kafka1
        image: confluentinc/cp-kafka:5.4.0
        resources:
          requests:
            memory: "512Mi"
          limits:
            memory: "{{.Values.jvmMaxHeapSizeGb}}Gi"
        ports:
        - containerPort: 9092
        env:
        - name: HOST_IP
          valueFrom:
            fieldRef:
              fieldPath: status.hostIP
        - name: KAFKA_LISTENERS
          value: "PLAINTEXT://0.0.0.0:9092"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "PLAINTEXT://$(HOST_IP):{{ add .Values.startingNodePort 0 }}"
        - name: KAFKA_BROKER_ID
          value: "10"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: "zookeeper-controller-1-0.zoo1-headless:2181,zookeeper-controller-2-0.zoo2-headless:2181,zookeeper-controller-3-0.zoo3-headless:2181"
        - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_DELETE_TOPIC_ENABLE
          value: "true"
        - name: KAFKA_DEFAULT_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_NUM_PARTITIONS
          value: "{{.Values.defaultPartitionsNumber}}"
        - name: KAFKA_LOG_RETENTION_HOURS
          value: "{{.Values.retentionTimeHours}}"
        - name: KAFKA_OFFSETS_RETENTION_MINUTES
          value: "{{.Values.retentionTimeHours | mul 60 }}"
        - name: JMX_PORT
          value: "{{ add .Values.startingNodePort 3 }}"
        - name: KAFKA_JMX_HOSTNAME
          value: "kafka-service-1"
        - name: KAFKA_HEAP_OPTS
          value: "-Xms512m -Xmx{{.Values.jvmMaxHeapSizeGb}}G"
        livenessProbe:
          exec:
            command:
            - /bin/bash
            - -c
            - "unset JMX_PORT && kafka-broker-api-versions --bootstrap-server=localhost:9092"
          initialDelaySeconds: 60
          periodSeconds: 20
        volumeMounts:
        - name: "kafka-logs"
          mountPath: "/var/lib/kafka"
      volumes:
      - name: "wait-zookeeper-configmap"
        configMap:
          name: "kafka-initializer"
          items:
          - key: "wait-zookeeper.sh"
            path: "wait-zookeeper.sh"
      - name: "wait-zookeeper-emptydir"
        emptyDir: {}
  volumeClaimTemplates:
  - metadata:
      name: kafka-logs
    spec:
      storageClassName: {{.Values.storageClassName}}
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: {{.Values.storageSizeGb}}Gi
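I am not reproducing wait-zookeeper.sh verbatim, but it is roughly equivalent to this sketch (the /dev/tcp probe illustrates the idea; it is not necessarily the exact check the script performs):

```bash
#!/usr/local/bin/bash
# Illustrative sketch, not the exact script: block until every Zookeeper
# node accepts TCP connections on its client port.
for host in zookeeper-controller-1-0.zoo1-headless \
            zookeeper-controller-2-0.zoo2-headless \
            zookeeper-controller-3-0.zoo3-headless; do
  until (exec 3<>"/dev/tcp/${host}/2181") 2>/dev/null; do
    echo "waiting for ${host}:2181..."
    sleep 5
  done
done
echo "all Zookeeper nodes are reachable"
```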