
Kafka logs get reset when all brokers restart at the same time

I have a Kafka cluster running on an integration Kubernetes cluster, with 3 Kafka brokers and 3 ZooKeeper nodes, each component running in a StatefulSet. The Helm chart I use to deploy the cluster is a custom one, as I needed external access to the cluster through a BigIP F5 and could not find an existing chart that supported this.

The Kafka image is confluentinc/cp-kafka:5.4.0 and the ZooKeeper image is confluentinc/cp-zookeeper:5.4.0.

For ZooKeeper, /var/lib/zookeeper/data and /var/lib/zookeeper/log are mapped to persistent volumes; the same goes for /var/lib/kafka on the Kafka brokers.

I use hlebalbau/kafka-manager:stable to watch the state of the cluster.

I set up three partitions per topic, and the replication factor is also three.
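For reference, creating a topic with that layout would look roughly like this (the topic name is illustrative, and the bootstrap address assumes the in-cluster listener from the manifest below):

```shell
# Hypothetical topic creation matching the described setup:
# three partitions, replication factor three.
kafka-topics --bootstrap-server localhost:9092 \
  --create --topic my-topic \
  --partitions 3 --replication-factor 3

# Describe the topic to check partition leaders and in-sync replicas.
kafka-topics --bootstrap-server localhost:9092 \
  --describe --topic my-topic
```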

Recently I realized that if I restart all three Kafka brokers at the same time (with kubectl delete pod), all topic contents are lost:

  • the log size drops to zero for each topic
  • the list of topics stays the same
  • the consumer list stays the same, but each consumer's current offset drops to a negative value (if a consumer was at offset 10000 on a 10000-message topic, the topic size drops to zero and the consumer offset to -10000)

I never encountered any issue when restarting one Kafka broker at a time and waiting for it to come back up before restarting the next. I know that a Kafka cluster is not meant to be stopped or restarted like this, but I did not expect this kind of behaviour.
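For completeness, the one-at-a-time restart that works fine is roughly the following (pod names are assumptions derived from the StatefulSet naming in the manifest below):

```shell
# Sketch of the rolling restart that never lost data.
# Pod names are assumed from the kafka-controller-N StatefulSets.
for i in 1 2 3; do
  kubectl delete pod "kafka-controller-${i}-0"
  # Wait for the replacement pod to be Ready before touching the next broker.
  kubectl wait --for=condition=ready pod "kafka-controller-${i}-0" --timeout=300s
done
```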

Is this expected behaviour, or did I miss something obvious?

Here is my YAML template for a Kafka broker. As its name indicates, the wait-zookeeper.sh script "just" waits for the ZooKeeper nodes to be started.
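The script itself is not reproduced here; a minimal sketch of what it might do is shown below (hostnames are taken from KAFKA_ZOOKEEPER_CONNECT in the manifest; using the "ruok" four-letter-word health check is an assumption):

```shell
#!/usr/bin/env bash
# Hypothetical reconstruction of wait-zookeeper.sh: block until every
# ZooKeeper node answers the "ruok" health check with "imok".
ZK_HOSTS="zookeeper-controller-1-0.zoo1-headless \
          zookeeper-controller-2-0.zoo2-headless \
          zookeeper-controller-3-0.zoo3-headless"

for host in $ZK_HOSTS; do
  until [ "$(echo ruok | nc -w 2 "$host" 2181)" = "imok" ]; do
    echo "waiting for $host:2181 ..."
    sleep 5
  done
done
echo "all ZooKeeper nodes are up"
```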

---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-controller-1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-1
  serviceName: kafka1-headless
  template:
    metadata:
      labels:
        app: kafka-1
        cluster: kafka
    spec:
      initContainers:
        - name: init-wait-zookeeper
          image: bash:latest
          command: ["/usr/local/bin/bash","-c","cp /wait-zookeeper-configmap/wait-zookeeper.sh /wait-zookeeper-emptydir/ && chmod 755 /wait-zookeeper-emptydir/wait-zookeeper.sh && /wait-zookeeper-emptydir/wait-zookeeper.sh"]
          volumeMounts:
            - name: wait-zookeeper-configmap
              mountPath: /wait-zookeeper-configmap
            - name: wait-zookeeper-emptydir
              mountPath: /wait-zookeeper-emptydir
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: app
                  operator: In
                  values:
                  - kafka-2
                  - kafka-3
                  {{- if gt .Values.workerNodesNumber 5.0 }}
                  - zookeeper-1
                  - zookeeper-2
                  - zookeeper-3
                  {{- end }}
              topologyKey: "kubernetes.io/hostname"
      containers:
        - name: kafka1
          image: confluentinc/cp-kafka:5.4.0
          resources:
            requests:
              memory: "512Mi"
            limits:
              memory: "{{.Values.jvmMaxHeapSizeGb}}Gi"
          ports:
          - containerPort: 9092
          env:
          - name: HOST_IP
            valueFrom:
              fieldRef:
                fieldPath: status.hostIP
          - name: KAFKA_LISTENERS
            value: "PLAINTEXT://0.0.0.0:9092"
          - name: KAFKA_ADVERTISED_LISTENERS
            value: "PLAINTEXT://$(HOST_IP):{{ add .Values.startingNodePort 0 }}"
          - name: KAFKA_BROKER_ID
            value: "10"
          - name: KAFKA_ZOOKEEPER_CONNECT
            value: "zookeeper-controller-1-0.zoo1-headless:2181,zookeeper-controller-2-0.zoo2-headless:2181,zookeeper-controller-3-0.zoo3-headless:2181"
          - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
            value: "3"
          - name: KAFKA_DELETE_TOPIC_ENABLE
            value: "true"
          - name: KAFKA_DEFAULT_REPLICATION_FACTOR
            value: "3"
          - name: KAFKA_NUM_PARTITIONS
            value: "{{.Values.defaultPartitionsNumber}}"
          - name: KAFKA_LOG_RETENTION_HOURS
            value: "{{.Values.retentionTimeHours}}"
          - name: KAFKA_OFFSETS_RETENTION_MINUTES
            value: "{{.Values.retentionTimeHours | mul 60 }}"
          - name: JMX_PORT
            value: "{{ add .Values.startingNodePort 3 }}"
          - name: KAFKA_JMX_HOSTNAME
            value: "kafka-service-1"
          - name: KAFKA_HEAP_OPTS
            value: "-Xms512m -Xmx{{.Values.jvmMaxHeapSizeGb}}G"
          livenessProbe:
            exec:
              command:
              - /bin/bash
              - -c
              - "unset JMX_PORT && kafka-broker-api-versions --bootstrap-server=localhost:9092"
            initialDelaySeconds: 60
            periodSeconds: 20
          volumeMounts:
            - name: "kafka-logs"
              mountPath: "/var/lib/kafka"
      volumes:
        - name: "wait-zookeeper-configmap"
          configMap:
            name: "kafka-initializer"
            items:
              - key: "wait-zookeeper.sh"
                path: "wait-zookeeper.sh"
        - name: "wait-zookeeper-emptydir"
          emptyDir: {}
  volumeClaimTemplates:
  - metadata:
      name: kafka-logs
    spec:
      storageClassName: {{.Values.storageClassName}}
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: {{.Values.storageSizeGb}}Gi
Start here. It sets up the volumes for you - https://strimzi.io