Asked by: 小点点

Kafka Streams KTable-KTable join produces no results in the output topic


I am trying a very simple KTable-to-KTable join in Scala. Both topics have a single partition, but I still don't see any records arriving on the "output-topic".

import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder}
import org.apache.kafka.streams.kstream.KTable

object SimpleMerge extends App {
  val properties = KafkaProperties.get()

  val streamBuilder = new StreamsBuilder()

  val objectMetadataTable: KTable[Long, String] = streamBuilder.table("metadata-topic")

  val objectClassificationTable: KTable[Long, String] = streamBuilder.table("classification-topic")

  val objectClassificationWithMetadata: KTable[Long, String] = objectMetadataTable
    .join(objectClassificationTable, (metadata: String, classification: String) => metadata + classification)

  objectClassificationWithMetadata.toStream().to("output-topic")

  val kafkaStreams = new KafkaStreams(streamBuilder.build(), properties)
  kafkaStreams.cleanUp()
  kafkaStreams.start()

  Runtime.getRuntime.addShutdownHook(new Thread(() => {
    kafkaStreams.close()
  }))
}

I believe this is pretty basic, but I can't find any clue to what's wrong. Note: I can see that both input topics produce records with the same correct keys, so a null-key problem has been ruled out.

Thanks!

Exception:

02:26:57.936 [MetadataClassificationMerger_1-757746d5-32e5-412b-934d-d29c1fc8c7a9-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - stream-thread [MetadataClassificationMerger_1-757746d5-32e5-412b-934d-d29c1fc8c7a9-StreamThread-1] task [0_0] Failed to flush state store metadata-topic-STATE-STORE-0000000000: 
org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: [B, and value: org.apache.kafka.streams.kstream.internals.Change.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:122) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:227) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:333) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:276) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:177) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:582) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:536) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:524) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:529) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:959) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:813) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-streams-2.4.0.jar:?]
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
    at com.pan.pcs.dlp.kafkamerger.MergeObjectMetadataClassification$$anon$1.apply(MergeObjectMetadataClassification.scala:19) ~[classes/:?]
    at org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin$KTableKTableJoinProcessor.process(KTableKTableInnerJoin.java:110) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin$KTableKTableJoinProcessor.process(KTableKTableInnerJoin.java:67) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-2.4.0.jar:?]
    ... 24 more

1 Answer

Anonymous user

By default, ByteArraySerdes are used, and the error message indicates that your program is still using those, instead of a LongSerde for the key and a StringSerde for the value.

If you use the official Scala API and import the corresponding implicits, the correct Serdes should be picked up automatically. Otherwise, you need to set different default Serdes via StreamsConfig, or override them by passing Consumed.with(...) into the table(...) operator, as sketched below.
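For illustration, a minimal sketch of the Scala-API route, assuming the Kafka Streams 2.4 artifacts seen in the stack trace (in later versions the Serdes object moved to org.apache.kafka.streams.scala.serialization) and the topic names from the question. With the implicits in scope, each operator derives its Serdes from the declared key and value types:

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._ // implicit Serde[Long] and Serde[String]
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KTable

val builder = new StreamsBuilder()

// table() picks up Consumed[Long, String] from the implicit Serdes,
// so nothing falls back to the byte-array defaults.
val metadata: KTable[Long, String] = builder.table[Long, String]("metadata-topic")
val classification: KTable[Long, String] = builder.table[Long, String]("classification-topic")

val joined: KTable[Long, String] = metadata.join(classification)((m, c) => m + c)

// to() likewise resolves an implicit Produced[Long, String].
joined.toStream.to("output-topic")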

See the documentation for details: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#kstreams-dsl-for-scala
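Alternatively, staying on the Java API that the question's code appears to use, the Serdes can be overridden per operator. A sketch under that assumption (note that the keys become java.lang.Long, matching what Serdes.Long() produces):

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.{Consumed, KTable, Produced}

val builder = new StreamsBuilder()

// Consumed.with(...) overrides the default byte-array Serdes for each input topic.
val metadata: KTable[java.lang.Long, String] =
  builder.table("metadata-topic", Consumed.`with`(Serdes.Long(), Serdes.String()))
val classification: KTable[java.lang.Long, String] =
  builder.table("classification-topic", Consumed.`with`(Serdes.Long(), Serdes.String()))

val joined: KTable[java.lang.Long, String] =
  metadata.join(classification, (m: String, c: String) => m + c)

// The sink needs matching Serdes too, or writing falls back to the defaults.
joined.toStream.to("output-topic", Produced.`with`(Serdes.Long(), Serdes.String()))

Setting default.key.serde and default.value.serde in the StreamsConfig properties achieves the same effect globally instead of per operator.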