提问者:小点点

java中具有并发映射的生产者消费者模式


我有以下问题,我不确定如何设计部分解决方案:

我有一个逐行阅读的大文本文件。我需要处理每一行并更新HashMap。

AFAIK我需要一个生产者线程来读取文件中的行,并将这些行分派给一个消费者线程池。消费者线程应该更新并发哈希映射,然后获取新行。

我的问题是:消费者线程如何访问ConCurrentHashMap?如果我使用固定的线程池,生产者是否需要先将该行添加到队列中,或者它可以简单地提交或执行一个新的消费者?

编辑:Zim-Zam是正确的;我希望消费者在完成时将他们的结果转储到并发哈希映射中。

我在主线程中创建了ConCurrentHashMap,并将对它的引用传递给构造函数中的消费者。消费者应该在他们的运行方法中添加或增加一个原子整数。我如何在主线程中判断何时读取了所有行并且消费者完成了?

再次感谢。


共3个答案

匿名用户

您可以让所有消费者共享生产者添加到的相同队列,或者您可以为每个消费者提供生产者通过循环链表或类似数据结构访问的自己的队列,以便每个消费者的队列接收或多或少相同的数据量(例如,如果您有3个消费者,那么生产者将向queue1、queue2、queue3、queue1等添加数据)。

您可以为每个消费者提供对相同的ContranstHashMap的引用(例如在消费者的构造函数中),或者您可以通过静态getter方法访问ConCurrentHashMap

匿名用户

我认为您真的不需要按照您建议的方式使用生产者消费者队列。

简单地让主队列读取文件,并为您读取的每一行创建一个相应的Runnable对象(将其视为命令)并将其放入线程池执行器。Runnable对象的内容只是处理该行并将结果放入并发HashMap的逻辑

ThreadPoolExecator可以使用有界或无界阻塞队列创建,具体取决于您想要的行为。

在伪代码中,它是这样的:

class LineHandler implements Runnable {
    String line;
    ConcurrentHashMap resultMap;
    public LineHandler(String line, ConcurrentHashMap resultMap) {
        this.line = line;
        this.resultMap = resultMap;
    }

    @Override
    public void run() {
        // work on line
        // update resultMap
    }
}

// logic in your file reader thread, supposed to be in a loop:

while (moreLinesInFile()) {
    String line = readFromFile();
    threadPoolExecutor.submit(new LineHandler(line, concurrentHashMap));
}

threadPoolExecutor.shutdown();

匿名用户

使用CountDownLatch。

// in main thread
// assume consumers are in some kind of container
List<MyConsumer> consumers...
CountDownLatch latch = new CountDownLatch( consumers.size() );

for( MyConsumer c : consumers ) {
    c.setLatch( latch );
    c.start(); // starts asychronous, or submit to executor, whatever you're doing
}

// block main thread, optionally timing out
latch.await();


// Then in consumer when it's done it's work:
latch.countDown();