是否可以实现一个一旦有答案就停止处理流的收集器?
例如,如果收集器正在计算平均值,其中一个值是NaN,我知道答案是NaN而没有看到更多的值,因此进一步的计算是没有意义的。
感谢您的回复。这些评论指出了解决问题的途径,我将在这里对此进行描述。它深受StreamEx的启发,但适应了我的特殊情况。
首先,我定义了一个名为XdmStream的Stream实现,它通常将所有方法委托给它包装的底层Stream。
这立即使我有机会定义新的方法,因此例如,我的用户可以执行流.last()
而不是流.reduce((第一,第二)-
作为一个短路方法的例子,我实现了< code > xdmstream . until first(Predicate),如下所示(< code>base是包装的流)。该方法的思想是返回一个流,该流传递与原始流相同的结果,除了当一个谓词被满足时,不再传递更多的结果。
public XdmStream<T> untilFirst(Predicate<? super XdmItem> predicate) {
Stream<T> stoppable = base.peek(item -> {
if (predicate.test(item)) {
base.close();
}
});
return new XdmStream<T>(stoppable);
}
当我第一次创建基本Stream时,我调用它的onCloch()
方法,以便对开()的调用触发数据提供者停止提供数据。
close()
机制似乎没有很好的文档说明(它依赖于“流管道”的概念,并且当某个方法返回的新流与原始流属于同一管道时,还不完全清楚),但它对我来说很管用。我想我应该确保这只是一个优化,这样,即使数据流没有立即关闭(例如,如果流中有任何缓冲),结果也仍然正确。
除了费德里科的评论之外,一旦满足特定条件,就可以通过停止积累来模拟短路收集器
。但是,这种方法只有在积累成本高昂的情况下才会有益。下面是一个示例,但请记住,此实现存在缺陷:
public class AveragingCollector implements Collector<Double, double[], Double> {
private final AtomicBoolean hasFoundNaN = new AtomicBoolean();
@Override
public Supplier<double[]> supplier() {
return () -> new double[2];
}
@Override
public BiConsumer<double[], Double> accumulator() {
return (a, b) -> {
if (hasFoundNaN.get()) {
return;
}
if (b.equals(Double.NaN)) {
hasFoundNaN.set(true);
return;
}
a[0] += b;
a[1]++;
};
}
@Override
public BinaryOperator<double[]> combiner() {
return (a, b) -> {
a[0] += b[0];
a[1] += b[1];
return a;
};
}
@Override
public Function<double[], Double> finisher() {
return average -> average[0] / average[1];
}
@Override
public Set<Characteristics> characteristics() {
return new HashSet<>();
}
}
以下用例返回Double。NaN
,如预期:
public static void main(String args[]) throws IOException {
DoubleStream.of(1, 2, 3, 4, 5, 6, 7, Double.NaN)
.boxed()
.collect(new AveragingCollector()));
}
代替使用Collector
,您可以使用Stream.allMatch(…)
提前终止Stream
,并直接使用LongSummary统计
等util类。如果所有值(至少一个)都存在,则返回它们,例如:
Optional<LongSummaryStatistics> toLongStats(Stream<OptionalLong> stream) {
LongSummaryStatistics stat = new LongSummaryStatistics();
boolean allPresent = stream.allMatch(opt -> {
if (opt.isEmpty()) return false;
stat.accept(opt.getAsLong());
return true;
});
return allPresent && stat.getCount() > 0 ? Optional.of(stat) : Optional.empty();
}
而不是Stream