提问者:小点点

如何为Kafka流设置rocksDB


我正在尝试使用kstream解决一个问题。我目前在进行聚合时遇到了这个错误。

Exception in thread "main" java.lang.NoClassDefFoundError: org/rocksdb/RocksDBException
    at org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier.get(RocksDbWindowBytesStoreSupplier.java:50)
    at org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier.get(RocksDbWindowBytesStoreSupplier.java:24)
    at org.apache.kafka.streams.state.internals.WindowStoreBuilder.build(WindowStoreBuilder.java:40)
    at org.apache.kafka.streams.state.internals.WindowStoreBuilder.build(WindowStoreBuilder.java:26)
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder$StateStoreFactory.build(InternalTopologyBuilder.java:141)
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.buildProcessorNode(InternalTopologyBuilder.java:966)
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.build(InternalTopologyBuilder.java:869)
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.build(InternalTopologyBuilder.java:822)
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.build(InternalTopologyBuilder.java:805)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:667)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:624)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:534)

我的代码实际上是这样的:

KStream<String, InputData> input = builder.stream(topicname);

KTable<Windowed<String>, CustomAgg> grouped =
                input.groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMillis(60000)))
                .aggregate(
                        CustomAgg::new,
                        (k, v, agg) -> agg.add(v),
                        Materialized.<String, CustomAgg, WindowStore<Bytes, byte[]>>as("aggs").withValueSerde(new CustomAggSerde()));
        grouped.toStream().print(Printed.toSysOut());

kafka流版本2.1.0

我似乎找不到任何资源在网上如何设置rocksDBkafka流-任何建议将不胜感激。(我有它与brew安装,但我不确定我需要如何指向它,任何设置,它是否需要在我pom. xml文件等)。工作在MacOS目前的发展。

谢谢!


共2个答案

匿名用户

您不需要为Kafka Streams安装RocksDB。RocksDB是Kafka Streams的依赖项。如果您在构建自动化工具(例如maven或gradle)中将Kafka Streams作为依赖项,则应在构建期间自动下载RocksDB JAR并将其放在您的类路径上。

如果没有构建自动化工具,您可能需要手动将RocksDB JAR放在类路径上。适用于Kafka Streams 2.1.0的RocksDB的正确版本应该是5.14.2。

你得到的错误似乎是一个类路径问题,所以可能与上面有关。

匿名用户

尝试在pom. xml中插入下面的依赖项:

<dependency>
    <groupId>org.rocksdb</groupId>
    <artifactId>rocksdbjni</artifactId>
    <version>4.9.0</version>
</dependency>

此链接可能对您有帮助:https://technology.amis.nl/software-development/java/getting-started-with-kafka-streams-building-a-streaming-analytics-java-application-against-a-kafka-topic/