提问者:小点点

kafka监听器在conflu的kubernetes设置中是错误的


我无法从外部连接到我的kafka集群。侦听器和广告侦听器似乎有问题。

有什么建议吗?

当我尝试从外部连接端口30092时,我总是得到一个引用返回kafka-svc: 9092

  • 集群名称:dev-docker-x02
  • 如何测试:windows的默认kafka:.\bin\windows\kafka-topic. bat--list--bootstrap-server dev-docker-x02:30092
  • 要求:使用confluentinc/cp-kafka: 5.4.0-1-ubi8

我的设置:

我的经纪人配置(问题似乎在(广告)侦听器中。

kind: Deployment
metadata:
  name: kafka-deploy
spec:
  replicas: 1
  selector:
    matchLabels:
        app: kafka-pod
  template:
    metadata:
      labels:
        app: kafka-pod
    spec:
      containers:
      - name: kafka-ctr         # Container name
        image: confluentinc/cp-kafka:5.4.0-1-ubi8
        ports:
        - containerPort: 9092   # Port exposed by the container
        env:
        - name: KAFKA_BROKER_ID
          value: "0"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-svc:2181
        - name: KAFKA_LISTENERS
          value: "LISTENER_INTERNAL://:9092,LISTENER_EXTERNAL://:30092"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "LISTENER_INTERNAL://kafka-svc:9092,LISTENER_EXTERNAL://dev-kube-x02:30092"
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: "LISTENER_INTERNAL:PLAINTEXT,LISTENER_EXTERNAL:PLAINTEXT"
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: "LISTENER_EXTERNAL"
        - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
          value: "false"
        - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "1"
        - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
          value: "100"

共2个答案

匿名用户

Kafka有很多组件,如Headless Services、Statefulset,每个组件都有一个独特的角色。出于这个原因,我也建议使用Kafka Conflu和Helm Chart。

本指南基于helm图表,因为您在评论中提到您将使用它,但此处的概念可以扩展到任何使用无头服务并需要外部访问的应用程序。

对于您提供的内容,我相信您面临一些困难,因为您在外部引用无头服务,这将无法工作,因为无头服务没有内部操作IP。

Headless Service与StatefulSet一起创建。创建的服务不会被赋予grousterIP,而是简单地包含一个endpoint列表。然后,这些endpoint用于生成特定于实例的DNS记录,形式如下:

它为每个pod创建一个DNS名称,例如:

[ root@curl:/ ]$ nslookup my-confluent-cp-kafka-headless
Server:    10.0.0.10
Address 1: 10.0.0.10 kube-dns.kube-system.svc.cluster.local

Name:      my-confluent-cp-kafka-headless
Address 1: 10.8.0.23 my-confluent-cp-kafka-1.my-confluent-cp-kafka-headless.default.svc.cluster.local
Address 2: 10.8.1.21 my-confluent-cp-kafka-0.my-confluent-cp-kafka-headless.default.svc.cluster.local
Address 3: 10.8.3.7 my-confluent-cp-kafka-2.my-confluent-cp-kafka-headless.default.svc.cluster.local

>

  • 这就是使这些服务在集群内相互连接的原因。

    因此,您不能公开cp-kafka: 9092,这是无头服务,也仅在内部使用,正如我上面解释的那样。

    复制:

    • git clonehttps://github.com/confluentinc/cp-helm-charts.git
    • 编辑文件cp-helm图表/cp-kafka/value. yaml节点false更改为true并根据需要更改端口:
    nodeport:
      enabled: true
      servicePort: 19092
      firstListenerPort: 31090
    
    • 部署图表:
    $ helm install demo cp-helm-charts
    $ kubectl get pods
    NAME                                       READY   STATUS    RESTARTS   AGE
    demo-cp-control-center-6d79ddd776-ktggw    1/1     Running   3          113s
    demo-cp-kafka-0                            2/2     Running   1          113s
    demo-cp-kafka-1                            2/2     Running   0          94s
    demo-cp-kafka-2                            2/2     Running   0          84s
    demo-cp-kafka-connect-79689c5c6c-947c4     2/2     Running   2          113s
    demo-cp-kafka-rest-56dfdd8d94-79kpx        2/2     Running   1          113s
    demo-cp-ksql-server-c498c9755-jc6bt        2/2     Running   2          113s
    demo-cp-schema-registry-5f45c498c4-dh965   2/2     Running   3          113s
    demo-cp-zookeeper-0                        2/2     Running   0          112s
    demo-cp-zookeeper-1                        2/2     Running   0          93s
    demo-cp-zookeeper-2                        2/2     Running   0          74s
    
    $ kubectl get svc
    NAME                         TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)             AGE
    demo-cp-control-center       ClusterIP   10.0.13.134   <none>        9021/TCP            50m
    demo-cp-kafka                ClusterIP   10.0.15.71    <none>        9092/TCP            50m
    demo-cp-kafka-0-nodeport     NodePort    10.0.7.101    <none>        19092:31090/TCP     50m
    demo-cp-kafka-1-nodeport     NodePort    10.0.4.234    <none>        19092:31091/TCP     50m
    demo-cp-kafka-2-nodeport     NodePort    10.0.3.194    <none>        19092:31092/TCP     50m
    demo-cp-kafka-connect        ClusterIP   10.0.3.217    <none>        8083/TCP            50m
    demo-cp-kafka-headless       ClusterIP   None          <none>        9092/TCP            50m
    demo-cp-kafka-rest           ClusterIP   10.0.14.27    <none>        8082/TCP            50m
    demo-cp-ksql-server          ClusterIP   10.0.7.150    <none>        8088/TCP            50m
    demo-cp-schema-registry      ClusterIP   10.0.7.84     <none>        8081/TCP            50m
    demo-cp-zookeeper            ClusterIP   10.0.9.119    <none>        2181/TCP            50m
    demo-cp-zookeeper-headless   ClusterIP   None          <none>        2888/TCP,3888/TCP   50m
    
    • 我的Node在IP35.226.189.123上,我将连接到demo-cp-kafka-0-nodeportnodeport服务,它位于端口31090上,现在让我们尝试从集群外部连接。为此,我将连接到另一个VM,我有一个minikube,所以我可以使用kafka-clientpod来测试:
    user@minikube:~$ kubectl get pods
    NAME           READY   STATUS    RESTARTS   AGE
    kafka-client   1/1     Running   0          17h
    
    user@minikube:~$ kubectl exec kafka-client -it -- bin/bash
    
    root@kafka-client:/# kafka-console-consumer --bootstrap-server 35.226.189.123:31090 --topic demo-topic --from-beginning --timeout-ms 8000 --max-messages 1
    Wed Apr 15 18:19:48 UTC 2020
    Processed a total of 1 messages
    root@kafka-client:/# 
    

    如你所见,我可以从外面进入kafka。

    • 使用此方法,helm图表将为您定义的每个副本创建1个外部服务。
    • 如果您需要外部访问Zoogger,它不会像kafka代理那样自动配置,但我会为您留下一个服务模型:

    zoogger-外部-0. yam l

    apiVersion: v1
    kind: Service
    metadata:
      labels:
        app: cp-zookeeper
        pod: demo-cp-zookeeper-0
      name: demo-cp-zookeeper-0-nodeport
      namespace: default
    spec:
      externalTrafficPolicy: Cluster
      ports:
      - name: external-broker
        nodePort: 31181
        port: 12181
        protocol: TCP
        targetPort: 31181
      selector:
        app: cp-zookeeper
        statefulset.kubernetes.io/pod-name: demo-cp-zookeeper-0
      sessionAffinity: None
      type: NodePort
    
    • 它将为它创建一个服务:
    NAME                           TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)             AGE
    demo-cp-zookeeper-0-nodeport   NodePort    10.0.5.67     <none>        12181:31181/TCP     2s
    
    • 使用外部IP进行测试:
    pod/zookeeper-client created
    user@minikube:~$ kubectl exec -it zookeeper-client -- /bin/bash
    root@zookeeper-client:/# zookeeper-shell 35.226.189.123:31181
    Connecting to 35.226.189.123:31181
    Welcome to ZooKeeper!
    JLine support is disabled
    

    如果你有任何疑问,请在评论中告诉我!

  • 匿名用户

    如果您使用了Confluent Helm Charts并通读了那里的文档,那么您可以为远程侦听器配置不同的功能选项。

    此外,我建议使用运算符而不是简单的部署https://operatorhub.io/?keyword=kafka

    或者,如果您只是在单台机器上,请使用Docker Comment