我开始接触Apache Kafka(Confluent),并对模式的使用有一些疑问。首先,我对模式用于验证数据的一般理解是否正确?我对模式的理解是,当数据“产生”时,它会检查键和值是否符合预定义的概念,并相应地拆分它们。
我目前的技术设置如下:
Python:
from confluent_kafka import Producer
from config import conf
import json
# create producer
producer = Producer(conf)
producer.produce("datagen-topic", json.dumps({"product":"table","brand":"abc"}))
producer.flush()
在Confluent中,我为我的主题设置了一个json键模式:
{
"$schema": "http://json-schema.org/draft-04/schema#",
"properties": {
"brand": {
"type": "string"
},
"product": {
"type": "string"
}
},
"required": [
"product",
"brand"
],
"type": "object"
}
现在,当我生成数据时,Confluent中的消息仅包含“值”中的内容。Key和Header为空:
{
"product": "table",
"brand": "abc"
}
基本上,我是否设置了这个模式并没有什么区别,所以我想它只是在我设置它时不起作用。你能帮助我我的思维方式是错误的,或者我的代码缺乏输入吗?
ConfluentPython库Producer
类不以任何方式与注册表交互,因此您的消息不会被验证。
您将需要使用SerializingProducer
,如示例中所示-https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/json_producer.py
如果您想要非空键和标头,您需要将它们传递给send方法