提问者:小点点

阿帕奇Kafka


我开始接触Apache Kafka(Confluent),并对模式的使用有一些疑问。首先,我对模式用于验证数据的一般理解是否正确?我对模式的理解是,当数据“产生”时,它会检查键和值是否符合预定义的概念,并相应地拆分它们。

我目前的技术设置如下:

Python:

from confluent_kafka import Producer
from config import conf
import json

# create producer
producer = Producer(conf)

producer.produce("datagen-topic", json.dumps({"product":"table","brand":"abc"}))
producer.flush()

在Confluent中,我为我的主题设置了一个json键模式:

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "properties": {
    "brand": {
      "type": "string"
    },
    "product": {
      "type": "string"
    }
  },
  "required": [
    "product",
    "brand"
  ],
  "type": "object"
}

现在,当我生成数据时,Confluent中的消息仅包含“值”中的内容。Key和Header为空:

{
  "product": "table",
  "brand": "abc"
}

基本上,我是否设置了这个模式并没有什么区别,所以我想它只是在我设置它时不起作用。你能帮助我我的思维方式是错误的,或者我的代码缺乏输入吗?


共1个答案

匿名用户

ConfluentPython库Producer类不以任何方式与注册表交互,因此您的消息不会被验证。

您将需要使用SerializingProducer,如示例中所示-https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/json_producer.py

如果您想要非空键和标头,您需要将它们传递给send方法