我试图设置mongodb源连接器[https://www.confluent.io/hub/mongodb/kafka-connect-mongodb]合流kafka平台。当我使用MongoDBAtlas的连接URI时,我能够成功地建立mongodb和kafka之间的流。当我使用mongodb运行在我的azure kubernetes集群上时,问题出现了。我创建了一个mongodb statefulset,有3个副本,我通过负载均衡器将mongodb服务公开到互联网。我能够通过使用robo 3T连接到公共IP上公开的mongodb,并进行CRUD操作。现在,当我使用在kubernetes中运行的mongodb的连接URI时,它看起来像“mongoDB://load-bal-ip: 27017/test?ssl=false
INFO无法恢复更改流:$changeStream阶段仅在副本集40573(com. mongob.kafka.connect.source.MongoSourceTask:253)上受支持
mongo db有状态集yml看起来像这样
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: eic-mongo-mongodb
spec:
serviceName: eic-mongo-mongodb
replicas: 3
selector:
matchLabels:
app: eic-mongo-mongodb
template:
metadata:
labels:
app: eic-mongo-mongodb
selector: eic-mongo-mongodb
spec:
affinity:
# Try to put each ES data node on a different node in the K8s cluster
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: app
operator: In
values:
- eic-mongo-mongodb
topologyKey: kubernetes.io/hostname
containers:
- name: eic-mongo-mongodb
image: mongo:4.0.8
resources:
limits:
cpu: 500m
memory: "1Gi"
requests:
cpu: 500m
memory: "1Gi"
volumeMounts:
- name: mongo-volume
mountPath: /data/db
volumeClaimTemplates:
- metadata:
name: mongo-volume
spec:
accessModes:
- ReadWriteOnce
volumeMode: Filesystem
resources:
requests:
storage: 3Gi
mongoDB服务中心是这样的
apiVersion: v1
kind: Service
metadata:
name: eic-mongo-mongodb
ports:
- name: "27017"
nodePort: 31683
port: 27017
protocol: TCP
targetPort: 27017
selector:
app: eic-mongo-mongodb
type: LoadBalancer
有人能告诉我哪里出错了吗?
我认为您需要以正确的方式设置mongoDB集群。请按照此操作使用运算符在kuberntes上部署mongoDB副本集。如果您想使用statefulset进行设置,请按照此操作
我试图设置mongo数据库源连接器和接收器连接器使用MongoDB图集URI和合流kafka平台连接器。
接收器连接器能够成功建立。
但是当我尝试使源连接器时,出现了问题。
你能找到我的问题吗?
这是http://localhost:8083/connectors的POST json数据
{"name": "mongo-source-cashRecord",
"config": {
"tasks.max":"1",
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"topic.prefix":"mymongo_joy_",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":false,
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":false,
"connection.uri":"mongodb+srv://userid:password@clusterjoy.iz9ag.mongodb.net/testdb?retryWrites=true&w=majority",
"database":"testdb",
"collection":"cashRecord",
"pipeline": "[{\"$match\":{\"operationType\":{\"$in\":[\"insert\",\"update\",\"replace\"]}}}]"
}}