我有一个接口,我必须实现它,它需要一个Stream响应。我的源中的一些元素缺少数据,我必须使用源中的其他元素来找到它。它太大了,无法将所有元素保存在内存中。我可以编写一个例程来查找丢失的数据,但前提是我最后处理丢失数据的元素。
这是我试图解决这个问题的一个简化示例。在这种情况下,我试图保存30个元素,以便在addOne的附加例程之后在最后进行处理。但是当程序尝试从List Stream读取时,我收到了一个并发修改异常。
package test;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
public class TestStreams {
private static List<Integer> savedForLater = new ArrayList<>();
public static void main(String[] args) {
Stream<Integer> origStream = Stream.of(10, 20, 30, 40, 50).filter(
i -> saveThirtyForLater(i));
Stream<Integer> savedForLaterStream = savedForLater.stream().map(
i -> addOne(i));
// Exception
Stream.concat(origStream, savedForLaterStream).forEach(
i -> System.out.println(i));
// No Exception
// origStream.forEach(i -> System.out.println(i));
// savedForLaterStream.forEach(i -> System.out.println(i));
}
private static Integer addOne(Integer i) {
return new Integer(i + 1);
}
private static boolean saveThirtyForLater(Integer i) {
if (i == 30) {
savedForLater.add(i);
return false;
}
return true;
}
}
此代码产生以下结果:
10
20
40
50
Exception in thread "main" java.util.ConcurrentModificationException
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:502)
at java.util.stream.StreamSpliterators$WrappingSpliterator.forEachRemaining(StreamSpliterators.java:312)
at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
at test.TestStreams.main(TestStreams.java:17)
我尝试过使用线程安全列表,但它也没有产生所需的结果。
创建一个延迟连接的流,其元素是第一个流的所有元素,后跟第二个流的所有元素。
流上的conat不应调用List的流,直到它从中拉取一个对象,此时列表不会更改。
如果所有其他方法都失败了,我可以读取文件两次,但我真的很想知道为什么这不起作用,以及是否有人对操纵流以避免第二次通过有其他想法。
流是惰性的。除非您使用for每
或收集
等终端操作,否则不会执行中间操作(如filter
或map
)。
Stream<Integer> origStream = Stream.of(10, 20, 30, 40, 50).filter(
i -> saveThirtyForLater(i));
执行上述代码行后,您的savedFor后来
列表保持不变。只有在您对该流使用终端操作后,它才会被修改。
在您的最终表达式Stream. concat(的原始流,savedForLterStream).for每(i-
修改filter
方法中的字段是一种非常糟糕的方法,它实际上违反了filter
方法的契约。从它的javadoc:
谓词-一个非干扰的、无状态的谓词,应用于每个元素以确定是否应该包含它
您的谓词save三十日晚
不是无状态的,因为它修改了savedFor稍后
列表。
解决方案:
您可以一个接一个地单独处理这些流,而不是使用concat
:
origStream.forEach(i -> System.out.println(i));
savedForLaterStream.forEach(i -> System.out.println(i));
这些产生期望的结果:
10
20
40
50
31
你不能用concat
来完成这个技巧,因为它破坏了后期绑定。它在调用时立即请求两个流的大小,所以你应该提前知道有多少元素将被保存以供以后使用。但是,由于后期绑定,可以使用plitMap
来做到这一点:
public static void main(String[] args) {
Stream<Integer> origStream = Stream.of(10, 20, 30, 40, 50).filter(
i -> saveThirtyForLater(i));
Stream<Integer> savedForLaterStream = savedForLater.stream().map(
i -> addOne(i));
Stream.of(origStream, savedForLaterStream)
.flatMap(Function.identity())
.forEach(
i -> System.out.println(i));
}
此代码运行良好并打印10/20/40/50/31
。尽管如果您并行化它,它将无法预测。
请注意,我的解决方案严重依赖于OpenJDK/OracleJDK中流API的当前实现。流API规范明确表示filter
中使用的谓词必须是无状态且无干扰的。由于这里违反了这些属性,因此根据规范,结果是不可预测的。
我很感激别人的帮助,但我确实想发布我的最终解决方案。
我使用了LinkedBlockingQueue和自定义Spliterator而不是ArrayList。调用Stream. concat会立即生成参数流的Spliterator(可以说不必要)。ArrayListSpliterator不允许修改列表,因为它已经被其他人指出。
默认情况下,LinkedBlockingQueue有一个弱一致性的拆分器,它可能会返回在拆分器初始化后添加到底层队列的项目。在我的测试中,它始终如此,但是,为了避免任何不同生产行为的机会,我提供了一个自定义拆分器,它将返回初始化后添加到底层队列的项目。QSpliterator代码是从以下地方复制的:https://codereview.stackexchange.com/a/105308
package test;
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class TestStreams {
private static LinkedBlockingQueue<Integer> savedForLater = new LinkedBlockingQueue<>();
public static void main(String[] args) {
Stream<Integer> origStream = Stream.of(10, 20, 30, 40, 50).filter(
i -> saveThirtyForLater(i));
Spliterator<Integer> qSpliterator = new QSpliterator<>(savedForLater);
Stream<Integer> savedForLaterStream = StreamSupport.stream(
qSpliterator, false).map(i -> addOne(i));
Stream.concat(origStream, savedForLaterStream).forEach(
i -> System.out.println(i));
}
private static Integer addOne(Integer i) {
return new Integer(i + 1);
}
private static boolean saveThirtyForLater(Integer i) {
if (i == 30) {
savedForLater.add(i);
return false;
}
return true;
}
private static final class QSpliterator<T> implements Spliterator<T> {
private final BlockingQueue<T> queue;
public QSpliterator(BlockingQueue<T> queue) {
this.queue = queue;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
try {
action.accept(queue.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Take interrupted.", e);
}
return true;
}
@Override
public Spliterator<T> trySplit() {
try {
final int size = queue.size();
List<T> vals = new ArrayList<>(size + 1);
vals.add(queue.take());
queue.drainTo(vals);
return vals.spliterator();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(
"Thread interrupted during trySplit.", e);
}
}
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public int characteristics() {
return Spliterator.CONCURRENT;
}
}
}