我有一个在本地机器上运行的Kafka集群,默认设置在我的minikube设置之外。我在我的一个Web服务中创建了一个生产者并将其部署在minikube上。
对于生产者连接到Kafka我使用10.0.2.2
IP我也用它来连接Cassandra和DGraph在minikube之外,它工作得很好。
然而,Kafka生产者不工作,甚至在发送数据时没有抛出错误说Broker可能不可用
或任何其他错误。但是我在消费者方面没有收到任何东西。
当我在库伯内特斯之外运行这个网络服务时,一切都正常。
如果你们知道这里可能出了什么问题,请。
下面是我正在使用的库伯内特斯yaml
文件。
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: servicename
labels:
app: servicename
metrics: kamon
spec:
replicas: 1
template:
metadata:
labels:
app: servicename
metrics: kamon
spec:
containers:
- image: "image:app"
imagePullPolicy: IfNotPresent
name: servicename
env:
- name: CIRCUIT_BREAKER_MAX_FAILURES
value: "10"
- name: CIRCUIT_BREAKER_RESET_TIMEOUT
value: 30s
- name: CIRCUIT_BREAKER_CALL_TIMEOUT
value: 30s
- name: CONTACT_POINT_ONE
value: "10.0.2.2"
- name: DGRAPH_HOSTS
value: "10.0.2.2"
- name: DGRAPH_PORT
value: "9080"
- name: KAFKA_BOOTSTRAP_SERVERS
value: "10.0.2.2:9092"
- name: KAFKA_PRODUCER_NOTIFICATION_CLIENT_ID
value: "notificationProducer"
- name: KAFKA_NOTIFICATION_TOPIC
value: "notification"
- name: LAGOM_PERSISTENCE_READ_SIDE_OFFSET_TIMEOUT
value: 5s
- name: LAGOM_PERSISTENCE_READ_SIDE_FAILURE_EXPONENTIAL_BACKOFF_MIN
value: 3s
- name: LAGOM_PERSISTENCE_READ_SIDE_FAILURE_EXPONENTIAL_BACKOFF_MAX
value: 30s
- name: LAGOM_PERSISTENCE_READ_SIDE_FAILURE_EXPONENTIAL_BACKOFF_RANDOM_FACTOR
value: "0.2"
- name: LAGOM_PERSISTENCE_READ_SIDE_GLOBAL_PREPARE_TIMEOUT
value: 30s
- name: LAGOM_PERSISTENCE_READ_SIDE_RUN_ON_ROLE
value: ""
- name: LAGOM_PERSISTENCE_READ_SIDE_USE_DISPATCHER
value: lagom.persistence.dispatcher
- name: AKKA_TIMEOUT
value: 30s
- name: NUMBER_OF_DGRAPH_REPOSITORY_ACTORS
value: "2"
- name: DGRAPH_ACTOR_TIMEOUT_MILLIS
value: "20000"
- name: AKKA_ACTOR_PROVIDER
value: "cluster"
- name: AKKA_CLUSTER_SHUTDOWN_AFTER_UNSUCCESSFUL_JOIN_SEED_NODES
value: 40s
- name: AKKA_DISCOVERY_METHOD
value: "kubernetes-api"
- name: AKKA_IO_DNS_RESOLVER
value: "async-dns"
- name: AKKA_IO_DNS_ASYNC_DNS_PROVIDER_OBJECT
value: "com.lightbend.rp.asyncdns.AsyncDnsProvider"
- name: AKKA_IO_DNS_ASYNC_DNS_RESOLVE_SRV
value: "true"
- name: AKKA_IO_DNS_ASYNC_DNS_RESOLV_CONF
value: "on"
- name: AKKA_MANAGEMENT_HTTP_PORT
value: "10002"
- name: AKKA_MANAGEMENT_HTTP_BIND_HOSTNAME
value: "0.0.0.0"
- name: AKKA_MANAGEMENT_HTTP_BIND_PORT
value: "10002"
- name: AKKA_MANAGEMENT_CLUSTER_BOOTSTRAP_CONTACT_POINT_DISCOVERY_REQUIRED_CONTACT_POINT_NR
value: "1"
- name: AKKA_REMOTE_NETTY_TCP_PORT
value: "10001"
- name: AKKA_REMOTE_NETTY_TCP_BIND_HOSTNAME
value: "0.0.0.0"
- name: AKKA_REMOTE_NETTY_TCP_BIND_HOSTNAME
value: "0.0.0.0"
- name: AKKA_REMOTE_NETTY_TCP_BIND_PORT
value: "10001"
- name: LAGOM_CLUSTER_EXIT_JVM_WHEN_SYSTEM_TERMINATED
value: "on"
- name: PLAY_SERVER_HTTP_ADDRESS
value: "0.0.0.0"
- name: PLAY_SERVER_HTTP_PORT
value: "9000"
ports:
- containerPort: 9000
- containerPort: 9095
- containerPort: 10001
- containerPort: 9092
name: "akka-remote"
- containerPort: 10002
name: "akka-mgmt-http"
---
apiVersion: v1
kind: Service
metadata:
name: servicename
labels:
app: servicename
spec:
ports:
- name: "http"
port: 9000
nodePort: 31001
targetPort: 9000
- name: "akka-remote"
port: 10001
protocol: TCP
targetPort: 10001
- name: "akka-mgmt-http"
port: 10002
protocol: TCP
targetPort: 10002
selector:
app: servicename
type: NodePort
我已经连接到与Kafka在同一台机器上运行的Cassandra和Dgraph
好吧,这些服务不会通过动物园管理员宣传他们的网络地址。
我的Kafka集群在K8之外。但是,生产者在K8中。
为了让k8s之外的服务知道Kafka的位置,广告. listeners
需要设置为外部IP或DNS地址,k8s环境中的所有生产者/消费者服务都可以识别,这就是您的服务将连接到的地址。例如PLAINTEXT://10.0.2.2:9092
换句话说,如果您没有设置侦听器,它只是侦听localhost,仅仅因为Kafka端口是外部公开的,这意味着虽然您可能能够访问一个代理,但您作为协议一部分返回的地址不能保证与客户端的配置相同,这就是广告侦听器地址发挥作用的地方。