提问者:小点点

FixedThreadPool vs newCachedThreadPool用于执行N个任务并运行DB查询


请原谅标题。我有点困惑,因为我的要求有这么多变量。我会试着解释。

所以基本上我有一个触发服务的调度程序作业。服务进程N(假设这个N是以千为单位,或者在最坏的情况下可能是以十万为单位)的任务数量。现在这些任务也连接到数据库,获取并插入一些数据到数据库中。

现在,在通过各种来源以及堆栈溢出来研究和挖掘有关Execator框架的所有内容之后,有两个选项听起来很合理newFixedThreadPoolnewCachedThreadPool。所以看看这些的实现

List<Tasks> taskLists = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<Future<?>> futures = new ArrayList<>();
for(Tasks tasks : tasksLists) {
    futures.add(executorService.submit(() -> tasks.execute()))
}

理想情况下,对于newFixedThreadPool,我们分配我们可以创建的线程池的最大数量。我不能做这个巨大的数字,因为如果这些子任务并行执行,它将全部尝试通过数据库(postgreSQL)进行通信,这也有连接限制。并且一些线程可能会因为这些连接拒绝而失败。所以我必须给出一些合理的数字,但是在这种情况下,所有任务的执行都需要很多时间。

另一方面,如果我使用newCachedThreadPool

List<Tasks> taskLists = new ArrayList<>();
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<?>> futures = new ArrayList<>();
for(Tasks tasks : tasksLists) {
    futures.add(executorService.submit(() -> tasks.execute()))
}

这种情况下的问题是它将并行执行并重用那些完成的线程。这里的连接拒绝问题也是一样的,有些线程甚至可能在没有完成任务的情况下被杀死。

我也不能最大化postgreSQL连接限制,因为也有一些限制,如果我没有错,它的最大100或215如果可配置。

关于我应该如何处理这种情况,有什么建议或想法吗?


共1个答案

匿名用户

在这种情况下,您可以考虑结合使用newFixedThreadPoolnewCachedThreadPool来实现最佳性能,同时避免连接限制。

首先,您可以将newFixedThreadPool与合理数量的线程一起使用(假设为10,如您的示例所示),但将可以同时访问数据库的线程数量限制为较小的数量(假设为5)。这可以使用Semaphore来实现,它只允许有限数量的线程同时访问数据库。例如:

List<Tasks> taskLists = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(10);
Semaphore dbSemaphore = new Semaphore(5);
List<Future<?>> futures = new ArrayList<>();
for(Tasks tasks : tasksLists) {
    futures.add(executorService.submit(() -> {
        dbSemaphore.acquire();
        try {
            tasks.execute();
        } finally {
            dbSemaphore.release();
        }
    }))
}

这将确保只有5个线程可以同时访问数据库,这将有助于避免连接拒绝。

但是,某些任务可能比其他任务执行时间更长,因此newFixedThreadPool可能没有得到充分利用。为了解决这个问题,您可以在与newFixedThreadPool的编译中使用newCachedThreadPool,并将任务提交到两个线程池。这样,执行时间更长的任务可以被newChachedThreadPool拾取,而执行速度更快的任务可以继续使用newFixedThreadPool。例如:

List<Tasks> taskLists = new ArrayList<>();
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
Semaphore dbSemaphore = new Semaphore(5);
List<Future<?>> futures = new ArrayList<>();
for(Tasks tasks : tasksLists) {
    if (tasks.isLongRunning()) { // use cachedThreadPool for long running tasks
        futures.add(cachedThreadPool.submit(() -> {
            dbSemaphore.acquire();
            try {
                tasks.execute();
            } finally {
                dbSemaphore.release();
            }
        }));
    } else { // use fixedThreadPool for short running tasks
        futures.add(fixedThreadPool.submit(() -> {
            dbSemaphore.acquire();
            try {
                tasks.execute();
            } finally {
                dbSemaphore.release();
            }
        }));
    }
}

这样newFixedThreadPool可以用于短时间运行的任务,而newCachedThreadPool可以用于长时间运行的任务。信号量将确保只有有限数量的线程可以同时访问数据库,这将有助于避免连接拒绝。