提问者:小点点

使用包含3个代理的集群创建新主题时,kafka代理连接失败


我正在尝试在Docker上建立一个有3个代理的kafka集群。

问题是:当我做一个操作(即创建/列出/删除主题)时,总是有一个代理无法连接并重启Docker容器。这个问题不会发生在2个或单个代理的集群上。

我的复制步骤是:

  • 运行docker-composup
  • 打开1个kafka容器的shell并创建一个主题kafka-topes--bootstrap-server": 9092"--create--topic-name--分区3--复制因子3
  • 在此之后,1个随机代理被断开连接并从集群中删除。有时上述执行的反应是错误说复制因子不能大于2(因为1个代理已从集群中删除)

我是Kafka的新手。我想我只是犯了一些愚蠢的错误,但我不知道是什么。我搜索了文档,但还没有找到。

这是我的docker撰写文件:

version: "3.9"

networks:
  kafka-cluster:
    driver: bridge

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      # ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      # ZOOKEEPER_TICK_TIME: 2000
      # ZOOKEEPER_SERVERS: "zookeeper:22888:23888"
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"
    ports:
      - 2181:2181
    restart: unless-stopped
    networks:
      - kafka-cluster

  kafka1:
    image: confluentinc/cp-kafka:latest
    container_name: kafka1
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: CLIENT://:9092,EXTERNAL://:9093
      KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka1:9092,EXTERNAL://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    restart: unless-stopped
    networks:
      - kafka-cluster

  kafka2:
    image: confluentinc/cp-kafka:latest
    container_name: kafka2
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: CLIENT://:9092,EXTERNAL://:9094
      KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka2:9092,EXTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    restart: unless-stopped
    networks:
      - kafka-cluster

  kafka3:
    image: confluentinc/cp-kafka:latest
    container_name: kafka3
    depends_on:
      - zookeeper
    ports:
      - "9095:9095"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: CLIENT://:9092,EXTERNAL://:9095
      KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka3:9092,EXTERNAL://localhost:9095
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    restart: unless-stopped
    networks:
      - kafka-cluster

  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    ports:
      - "9000:9000"
    environment:
      - KAFKA_BROKERCONNECT=kafka1:9092,kafka2:9092,kafka3:9092
      - JVM_OPTS="-Xms32M -Xmx64M"
      - SERVER_SERVLET_CONTEXTPATH="/"
    depends_on:
      - kafka1
    networks:
      - kafka-cluster

以下是其他 2 个代理的错误日志:

[2022-01-17 04:32:40,078] WARN [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1002, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={test-topic-3-1=PartitionData(fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty), test-topic-2-1=PartitionData(fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=28449961, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to kafka1:9092 (id: 1001 rack: null) failed.
    at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:104)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:218)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:321)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:136)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2022-01-17 04:32:42,088] WARN [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Connection to node 1001 (kafka1/192.168.48.3:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-01-17 04:32:42,088] INFO [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Error sending fetch request (sessionId=28449961, epoch=INITIAL) to node 1001: (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Connection to kafka1:9092 (id: 1001 rack: null) failed.
    at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:104)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:218)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:321)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:136)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

共1个答案

匿名用户

假设您不需要主机连接(因为您直接在容器中运行 Kafka CLI 命令),您可以大大简化您的撰写文件

  1. 删除主机端口
  2. 删除非CLIENT侦听器,并坚持默认值。
  3. 删除组合网络(用于调试),因为会自动创建一个

总而言之,你最终会得到这样的结果

x-kafka-setup: &kafka-setup
  KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
  ALLOW_PLAINTEXT_LISTENER: 'yes'

version: "3.8"
services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.7
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka1:
    image: &broker-image docker.io/bitnami/kafka:3
    environment:
      KAFKA_BROKER_ID: 1
      <<: *kafka-setup
    depends_on:
      - zookeeper
  kafka2:
    image: *broker-image
    environment:
      KAFKA_BROKER_ID: 2
      <<: *kafka-setup
    depends_on:
      - zookeeper
  kafka3:
    image: *broker-image
    environment:
      KAFKA_BROKER_ID: 3
      <<: *kafka-setup
    depends_on:
      - zookeeper

  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: kafka1:9092,kafka2:9092,kafka3:9092
      JVM_OPTS: "-Xms32M -Xmx64M"
      SERVER_SERVLET_CONTEXTPATH: /
    depends_on:
      - kafka1
      - kafka2
      - kafka3