我将其定义为全局:
Processor<Integer, Integer> p = RingBufferProcessor.create("test", 32);
Stream<List<Integer>> s = Streams.wrap(p).distinct().buffer(5, TimeUnit.SECONDS).log().unbounded();
关于导师:
s.consume(i -> System.err.println(Thread.currentThread() + " data=" + i));
现在我调用这个函数两次:
for (int i = 0; i < 1000; i++) {
p.onNext(i % 3);
}
不同的工作正常,我第一次消费。当我再次调用这个方法时,他仍然记得不同,不会触发消费。
每次消费后是否有任何选择来清理不同。我需要实现的想法是,我将缓冲所有输入,每次我将只消费唯一的项目…
有人知道吗?
Tnx
经过深入搜索,我得出结论,我相信这不是一个选择。我设法做的事情是,当我消费时,我正在转换为Set。这样,每个时间流都在消耗我只处理唯一的值