提问者:小点点

汇合和卡桑德拉:获取数据异常:无法将数据反序列化到Avro,未知的神奇字节


我遵循了http://www.confluent.io/blog/kafka-connect-cassandra-sink-the-perfect-match/我可以从avro控制台向cassandra插入数据。现在我正在尝试将其扩展到使用水槽,我的机器中设置了水槽,它将拾取日志文件并将其推送到kafka,尝试将我的数据插入cassandra数据库。在文本文件中,我正在放置数据

{“id”: 1, “created”: “2016-05-06 13:53:00”, “product”: “OP-DAX-P-20150201-95.7”, “price”: 94.2}

{“id”: 2, “created”: “2016-05-06 13:54:00”, “product”: “OP-DAX-C-20150201-100”, “price”: 99.5}

{“id”: 3, “created”: “2016-05-06 13:55:00”, “product”: “FU-DATAMOUNTAINEER-20150201-100”, “price”: 10000}

{“id”: 4, “创建”: “2016-05-06 13:56:00”, “产品”: “FU-KOSPI-C-20150201-100”, “价格”: 150}

Flume正在收集数据并将其推送给kafka。

在cassandra水槽,我面临着一个错误,

错误任务cassandra-sink-orders-0引发了一个未捕获且不可恢复的异常(org . Apache . Kafka . connect . runtime . worker Task:142)org . Apache . Kafka . connect . errors . data异常:无法将数据反序列化到Avro:at io . confluent . connect . Avro . Avro . converter . to connect data(Avro converter . Java:109)at org . Apache . Kafka . connect . runtime . workersinktask . convert messages(workersinktask .[2016-09-28 15:47:00,951]错误任务正在被终止,在手动重启之前不会恢复(org . Apache . Kafka . connect . runtime . worker Task:143)[2016-09-28 15:47:00,951]信息停止Cassandra sink。(com . data mountaineer . stream reactor . connect . cassandra . sink . cassandras ink task:79)[2016-09-28 15:47:00,952]关闭Cassandra驱动程序会话和集群的信息。(com . data mountaineer . stream reactor . connect . Cassandra . sink . cassandrajsonwriter:165)

我正在使用的模式

 ./confluent/bin/kafka-avro-console-producer \--broker-list localhost:9092 \--topic orders-topic \--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"}, {"name":"created", "type": "string"}, {"name":"product", "type": "string"}, {"name":"price", "type": "double"}]}'

flume的配置:Flume-kafka.conf.properties

agent.sources = spoolDirSrc
agent.channels = memoryChannel
agent.sinks = kafkaSink


agent.sources.spoolDirSrc.type = spooldir
agent.sources.spoolDirSrc.spoolDir = eventlogs
agent.sources.spoolDirSrc.inputCharset = UTF-8
agent.sources.spoolDirSrc.deserializer.maxLineLength = 1048576

agent.sources.spoolDirSrc.channels = memoryChannel
agent.sinks.kafkaSink.channel = memoryChannel
agent.channels.memoryChannel.type = memory

agent.channels.memoryChannel.capacity = 1000

 agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
 agent.sinks.kafkaSink.topic = orders-topic
 agent.sinks.kafkaSink.brokerList = localhost:9092
 agent.sinks.kafkaSink.channel = memoryChannel
 agent.sinks.kafkaSink.batchSize = 20

有没有人可以帮我,如何修复这个错误?


共1个答案

匿名用户

一般来说,如果您有一个未知的魔术字节,这意味着您的Kafka客户端和服务器版本不兼容。检查以确保您的Cassandra接收器版本已使用Kafka客户端库构建,其版本小于或等于您的代理。