我正在尝试使用kstream解决一个问题。我目前在进行聚合时遇到了这个错误。
Exception in thread "main" java.lang.NoClassDefFoundError: org/rocksdb/RocksDBException
at org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier.get(RocksDbWindowBytesStoreSupplier.java:50)
at org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier.get(RocksDbWindowBytesStoreSupplier.java:24)
at org.apache.kafka.streams.state.internals.WindowStoreBuilder.build(WindowStoreBuilder.java:40)
at org.apache.kafka.streams.state.internals.WindowStoreBuilder.build(WindowStoreBuilder.java:26)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder$StateStoreFactory.build(InternalTopologyBuilder.java:141)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.buildProcessorNode(InternalTopologyBuilder.java:966)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.build(InternalTopologyBuilder.java:869)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.build(InternalTopologyBuilder.java:822)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.build(InternalTopologyBuilder.java:805)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:667)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:624)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:534)
我的代码实际上是这样的:
KStream<String, InputData> input = builder.stream(topicname);
KTable<Windowed<String>, CustomAgg> grouped =
input.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(60000)))
.aggregate(
CustomAgg::new,
(k, v, agg) -> agg.add(v),
Materialized.<String, CustomAgg, WindowStore<Bytes, byte[]>>as("aggs").withValueSerde(new CustomAggSerde()));
grouped.toStream().print(Printed.toSysOut());
kafka流版本2.1.0
我似乎找不到任何资源在网上如何设置rocksDBkafka流-任何建议将不胜感激。(我有它与brew安装,但我不确定我需要如何指向它,任何设置,它是否需要在我pom. xml文件等)。工作在MacOS目前的发展。
谢谢!
您不需要为Kafka Streams安装RocksDB。RocksDB是Kafka Streams的依赖项。如果您在构建自动化工具(例如maven或gradle)中将Kafka Streams作为依赖项,则应在构建期间自动下载RocksDB JAR并将其放在您的类路径上。
如果没有构建自动化工具,您可能需要手动将RocksDB JAR放在类路径上。适用于Kafka Streams 2.1.0的RocksDB的正确版本应该是5.14.2。
你得到的错误似乎是一个类路径问题,所以可能与上面有关。
尝试在pom. xml中插入下面的依赖项:
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>4.9.0</version>
</dependency>
此链接可能对您有帮助:https://technology.amis.nl/software-development/java/getting-started-with-kafka-streams-building-a-streaming-analytics-java-application-against-a-kafka-topic/