提问者:小点点

最后处理某些流元素


我有一个接口,我必须实现它,它需要一个Stream响应。我的源中的一些元素缺少数据,我必须使用源中的其他元素来找到它。它太大了,无法将所有元素保存在内存中。我可以编写一个例程来查找丢失的数据,但前提是我最后处理丢失数据的元素。

这是我试图解决这个问题的一个简化示例。在这种情况下,我试图保存30个元素,以便在addOne的附加例程之后在最后进行处理。但是当程序尝试从List Stream读取时,我收到了一个并发修改异常。

package test;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class TestStreams {
    private static List<Integer> savedForLater = new ArrayList<>();

    public static void main(String[] args) {
        Stream<Integer> origStream = Stream.of(10, 20, 30, 40, 50).filter(
                i -> saveThirtyForLater(i));
        Stream<Integer> savedForLaterStream = savedForLater.stream().map(
                i -> addOne(i));

        // Exception
        Stream.concat(origStream, savedForLaterStream).forEach(
            i -> System.out.println(i));

        // No Exception
        // origStream.forEach(i -> System.out.println(i));
        // savedForLaterStream.forEach(i -> System.out.println(i));
    }

    private static Integer addOne(Integer i) {
        return new Integer(i + 1);
    }

    private static boolean saveThirtyForLater(Integer i) {
        if (i == 30) {
            savedForLater.add(i);
            return false;
        }
        return true;
    }
}

此代码产生以下结果:

10
20
40
50
Exception in thread "main" java.util.ConcurrentModificationException
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:502)
    at java.util.stream.StreamSpliterators$WrappingSpliterator.forEachRemaining(StreamSpliterators.java:312)
    at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at test.TestStreams.main(TestStreams.java:17)

我尝试过使用线程安全列表,但它也没有产生所需的结果。

创建一个延迟连接的流,其元素是第一个流的所有元素,后跟第二个流的所有元素。

流上的conat不应调用List的流,直到它从中拉取一个对象,此时列表不会更改。

如果所有其他方法都失败了,我可以读取文件两次,但我真的很想知道为什么这不起作用,以及是否有人对操纵流以避免第二次通过有其他想法。


共3个答案

匿名用户

流是惰性的。除非您使用for每收集等终端操作,否则不会执行中间操作(如filtermap)。

Stream<Integer> origStream = Stream.of(10, 20, 30, 40, 50).filter(
        i -> saveThirtyForLater(i));

执行上述代码行后,您的savedFor后来列表保持不变。只有在您对该流使用终端操作后,它才会被修改。

在您的最终表达式Stream. concat(的原始流,savedForLterStream).for每(i-

修改filter方法中的字段是一种非常糟糕的方法,它实际上违反了filter方法的契约。从它的javadoc:

谓词-一个非干扰的、无状态的谓词,应用于每个元素以确定是否应该包含它

您的谓词save三十日晚不是无状态的,因为它修改了savedFor稍后列表。

解决方案:

您可以一个接一个地单独处理这些流,而不是使用concat

origStream.forEach(i -> System.out.println(i));
savedForLaterStream.forEach(i -> System.out.println(i));

这些产生期望的结果:

10
20
40
50
31

匿名用户

你不能用concat来完成这个技巧,因为它破坏了后期绑定。它在调用时立即请求两个流的大小,所以你应该提前知道有多少元素将被保存以供以后使用。但是,由于后期绑定,可以使用plitMap来做到这一点:

public static void main(String[] args) {
    Stream<Integer> origStream = Stream.of(10, 20, 30, 40, 50).filter(
            i -> saveThirtyForLater(i));
    Stream<Integer> savedForLaterStream = savedForLater.stream().map(
            i -> addOne(i));

    Stream.of(origStream, savedForLaterStream)
        .flatMap(Function.identity())
        .forEach(
        i -> System.out.println(i));
}

此代码运行良好并打印10/20/40/50/31。尽管如果您并行化它,它将无法预测。

请注意,我的解决方案严重依赖于OpenJDK/OracleJDK中流API的当前实现。流API规范明确表示filter中使用的谓词必须是无状态且无干扰的。由于这里违反了这些属性,因此根据规范,结果是不可预测的。

匿名用户

我很感激别人的帮助,但我确实想发布我的最终解决方案。

我使用了LinkedBlockingQueue和自定义Spliterator而不是ArrayList。调用Stream. concat会立即生成参数流的Spliterator(可以说不必要)。ArrayListSpliterator不允许修改列表,因为它已经被其他人指出。

默认情况下,LinkedBlockingQueue有一个弱一致性的拆分器,它可能会返回在拆分器初始化后添加到底层队列的项目。在我的测试中,它始终如此,但是,为了避免任何不同生产行为的机会,我提供了一个自定义拆分器,它将返回初始化后添加到底层队列的项目。QSpliterator代码是从以下地方复制的:https://codereview.stackexchange.com/a/105308

package test;

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class TestStreams {
    private static LinkedBlockingQueue<Integer> savedForLater = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        Stream<Integer> origStream = Stream.of(10, 20, 30, 40, 50).filter(
                i -> saveThirtyForLater(i));
        Spliterator<Integer> qSpliterator = new QSpliterator<>(savedForLater);
        Stream<Integer> savedForLaterStream = StreamSupport.stream(
                qSpliterator, false).map(i -> addOne(i));

        Stream.concat(origStream, savedForLaterStream).forEach(
                i -> System.out.println(i));
    }

    private static Integer addOne(Integer i) {
        return new Integer(i + 1);
    }

    private static boolean saveThirtyForLater(Integer i) {
        if (i == 30) {
            savedForLater.add(i);
            return false;
        }
        return true;
    }

    private static final class QSpliterator<T> implements Spliterator<T> {

        private final BlockingQueue<T> queue;

        public QSpliterator(BlockingQueue<T> queue) {
            this.queue = queue;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            try {
                action.accept(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Take interrupted.", e);
            }
            return true;
        }

        @Override
        public Spliterator<T> trySplit() {
            try {
                final int size = queue.size();
                List<T> vals = new ArrayList<>(size + 1);
                vals.add(queue.take());
                queue.drainTo(vals);
                return vals.spliterator();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                        "Thread interrupted during trySplit.", e);
            }
        }

        @Override
        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        @Override
        public int characteristics() {
            return Spliterator.CONCURRENT;
        }

    }
}