提问者:小点点

如果某些消息错过了输入,是否可以将重新摄取(接收器连接器)数据放入db


目前,我设置了2个运行JDBC Sink Connector的独立连接器来摄取从生产者生成的主题以读取到数据库中。有时,我在日志中看到错误,这导致生成的消息无法存储到数据库中。我经常看到的错误是

Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id:11
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject 'topic-io..models.avro.Topic' not found; error code404

这是真的,因为TopicRecordName不应该指向这个主题,而是我指向的另一个主题,它只应该指向model. avro.Topic

我想知道如果这种情况经常发生,是否有一种方法可以在消息产生后将这些产生的记录/消息重新摄取到数据库中。例如,如果消息在12am-1am期间产生,并且日志中出现了某种错误并且在该时间范围内未能使用这些消息,则配置或偏移可以通过将其重新摄取到数据库来恢复它。错误是由于模式注册表链接无法读取/链接到正确的模式链接。它失败了,因为它读取了不正确的工作文件,因为我的一个工作文件有一个value.converter.value.subject.name. policy=io.confluent.kafka.seralizers.sub.TopicRecordNameStrategy而另一个连接器没有读取该subjectName。

目前,我设置consumer.auto. offset.reset=最早开始读取消息。有没有办法像文件一样取回这些数据,我可以恢复这些数据,因为我正在部署到生产环境中,并且必须始终将数据消耗到数据库中而不会出现任何错误。


共1个答案

匿名用户

您可以使用死信队列配置将错误记录发送到新主题,而不是搞乱消费者组偏移,这最终会导致正确处理的数据再次被消费和复制,您需要在主题保留完全丢弃事件之前对其进行监控和消费

https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/

我的一个worker文件有一个[不同的配置]

这就是为什么配置管理软件很重要。不要在没有更新所有服务器的进程的情况下修改分布式系统中的一台服务器。如果您没有在库伯内特斯运行连接器,Anable/Terraform是最常见的