我的设置就像,我有一个生产者服务作为minikube集群的一部分,它试图将消息发布到主机上运行的kafka实例。
我写了一个kafka服务和endpointyaml如下:
kind: Service
apiVersion: v1
metadata:
name: kafka
spec:
ports:
- name: "broker"
protocol: "TCP"
port: 9092
targetPort: 9092
nodePort: 0
---
kind: Endpoints
apiVersion: v1
metadata:
name: kafka
namespace: default
subsets:
- addresses:
- ip: 10.0.2.2
ports:
- name: "broker"
port: 9092
endpoint中提到的minikube集群内部主机的ip地址是从以下命令获取的:
minikube ssh "route -n | grep ^0.0.0.0 | awk '{ print \$2 }'"
我面临的问题是,当生产者第一次尝试发布消息但没有消息写入该主题时,主题正在创建。
深入到pod日志中,我发现生产者正在尝试连接到localhost或其他kafka实例(不太确定)。
2020-05-17T19:09:43.021Z [warn] org.apache.kafka.clients.NetworkClient [] -
[Producer clientId=/system/sharding/kafkaProducer-greetings/singleton/singleton/producer]
Connection to node 0 (omkara/127.0.1.1:9092) could not be established. Broker may not be available.
接下来,我怀疑我可能需要修改server.properties
进行以下更改:
listeners=PLAINTEXT://localhost:9092
然而,这导致日志中的ip地址发生了变化:
2020-05-17T19:09:43.021Z [warn] org.apache.kafka.clients.NetworkClient [] -
[Producer clientId=/system/sharding/kafkaProducer-greetings/singleton/singleton/producer]
Connection to node 0 (omkara/127.0.0.1:9092) could not be established. Broker may not be available.
我不确定这里必须提到什么IP地址?或者什么是替代解决方案?如果有可能从kubernetes集群内部连接到安装在kubernetes集群外部的kafka实例。
由于生产者kafka客户端与代理在同一个网络上,我们需要像这样配置一个额外的侦听器:
listeners=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
advertised.listeners=INTERNAL://localhost:9093,EXTERNAL://10.0.2.2:9092
inter.broker.listener.name=INTERNAL
我们可以像这样验证主题中的消息:
kafka-console-consumer.sh --bootstrap-server INTERNAL://0.0.0.0:9093 --topic greetings --from-beginning
{"name":"Alice","message":"Namastey"}
你可以在理解上找到详细的解释