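I am new to Kafka and Kafka Streams. I am creating a local store to hold all the updates from a particular topic, components.
I don't know what I am doing wrong here. Is there another way to create a store from a stream?
Do I need to create a topic named comp-store in Kafka?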
public class MyStream {
    final static CountDownLatch latch = new CountDownLatch(1);
    private static final String APP_ID = "MyTestApp";

    public static void main(String[] args) throws InterruptedException {
        final Properties streamsConfiguration = getStreamsConfiguration();
        final StreamsBuilder builder = new StreamsBuilder();
        //
        final KStream<String, Component> componentStream = builder.stream("components");
        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
        KeyValueMapper<String, Component, Iterable<KeyValue<String, Component>>> mapper =
                new KeyValueMapper<String, Component, Iterable<KeyValue<String, Component>>>() {
            @Override
            public Iterable<KeyValue<String, Component>> apply(String list, Component comp) {
                ArrayList<KeyValue<String, Component>> result = new ArrayList<>();
                result.add(KeyValue.pair(comp.getCompId() + ":" + comp.getListId(), comp));
                return result;
            }
        };
        KStream<String, Component> componentsStram = componentStream.flatMap(mapper);
        KGroupedStream<String, Component> componentsGroupedStream = componentsStram.groupByKey();
        componentsGroupedStream.reduce(new Reducer<Component>() {
            public Component apply(Component oldVal, Component newVal) {
                return newVal;
            }
        }, Materialized.<String, Component, KeyValueStore<Bytes, byte[]>>as("comp-store"));
        streams.start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    if (streams.state().isRunning()) {
                        latch.countDown();
                    }
                }
            }
        }).start();
        latch.await();
        Thread.sleep(5000);
        ReadOnlyKeyValueStore<String, Component> localStore = waitUntilStoreIsQueryable(
                "comp-store", QueryableStoreTypes.<String, Component>keyValueStore(), streams);
        System.out.println(localStore.approximateNumEntries());
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    private static Properties getStreamsConfiguration() {
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ProtoSerde.class);
        settings.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
        settings.put("auto.offset.reset", "earliest");
        settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        return settings;
    }

    public static <T> T waitUntilStoreIsQueryable(final String storeName,
                                                  final QueryableStoreType<T> queryableStoreType,
                                                  final KafkaStreams streams) throws InterruptedException {
        while (true) {
            try {
                return streams.store(storeName, queryableStoreType);
            } catch (InvalidStateStoreException ignored) {
                Thread.sleep(100);
            }
        }
    }
}
Exception:
Exception in thread "main" org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, comp-store, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1038)
    at com.mr.streams.MyStream.main(MyStream.java:110)
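Update: after switching to waitUntilStoreIsQueryable, the exception is gone, but I still cannot query the state store. It seems to loop forever, even though data is present in componentsStram.
Am I doing something wrong here?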
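The exception is thrown because the Kafka Streams instance is not ready yet.
According to the documentation (https://docs.confluent.io/current/streams/faq.html#interactive-queries), there are two possible reasons for this exception:
> The local KafkaStreams instance is not yet ready, so its local state stores cannot be queried yet.
> The local KafkaStreams instance is ready, but the particular state store was just migrated to another instance behind the scenes.
The simplest way to handle it is to wait until the state store is queryable: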
public static <T> T waitUntilStoreIsQueryable(final String storeName,
                                              final QueryableStoreType<T> queryableStoreType,
                                              final KafkaStreams streams) throws InterruptedException {
    while (true) {
        try {
            return streams.store(storeName, queryableStoreType);
        } catch (InvalidStateStoreException ignored) {
            // store not yet ready for querying
            Thread.sleep(100);
        }
    }
}
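One caveat with the loop above: if the store never becomes queryable, it spins forever, which is what the question describes after adopting it. A bounded variant makes that failure visible instead of hanging. The following is only a minimal sketch in plain Java; the class name BoundedRetry, the method retryUntil, and its parameters are hypothetical and not part of Kafka Streams.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical helper (not a Kafka Streams API): retries an action until it
// succeeds or a deadline passes.
public class BoundedRetry {

    /**
     * Calls attempt.get() until it returns normally. Exceptions of type retryOn
     * are treated as "not ready yet" and retried after sleeping for pause.
     * Any other runtime exception is rethrown immediately. Once timeout has
     * elapsed, the last "not ready" failure is wrapped in an IllegalStateException.
     */
    public static <T> T retryUntil(Supplier<T> attempt,
                                   Class<? extends RuntimeException> retryOn,
                                   Duration timeout,
                                   Duration pause) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                if (!retryOn.isInstance(e)) {
                    throw e; // unexpected failure, do not hide it
                }
                if (System.nanoTime() - deadline >= 0) {
                    throw new IllegalStateException(
                            "Gave up after " + timeout + ": " + e.getMessage(), e);
                }
                try {
                    Thread.sleep(pause.toMillis());
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw new IllegalStateException("Interrupted while waiting", ie);
                }
            }
        }
    }
}
```

With the code from the question, the call would presumably look like retryUntil(() -> streams.store("comp-store", QueryableStoreTypes.keyValueStore()), InvalidStateStoreException.class, Duration.ofSeconds(30), Duration.ofMillis(100)). The 30-second limit is an arbitrary choice for illustration.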
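Update:
You have to create the KafkaStreams instance only after the whole topology has been defined. The line:
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
should come after:
componentsGroupedStream.reduce(...)
Otherwise builder.build() is called before the reduce step that materializes comp-store has been added, so the store may never become part of the running topology and the wait loop never returns.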