请原谅标题。我有点困惑,因为我的要求有这么多变量。我会试着解释。
所以基本上我有一个触发服务的调度程序作业。服务进程N(假设这个N是以千为单位,或者在最坏的情况下可能是以十万为单位)的任务数量。现在这些任务也连接到数据库,获取并插入一些数据到数据库中。
现在,在通过各种来源以及堆栈溢出来研究和挖掘有关Execator框架的所有内容之后,有两个选项听起来很合理newFixedThreadPool
和newCachedThreadPool
。所以看看这些的实现
List<Tasks> taskLists = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<Future<?>> futures = new ArrayList<>();
for(Tasks tasks : tasksLists) {
futures.add(executorService.submit(() -> tasks.execute()))
}
理想情况下,对于newFixedThreadPool,我们分配我们可以创建的线程池的最大数量。我不能做这个巨大的数字,因为如果这些子任务并行执行,它将全部尝试通过数据库(postgreSQL)进行通信,这也有连接限制。并且一些线程可能会因为这些连接拒绝而失败。所以我必须给出一些合理的数字,但是在这种情况下,所有任务的执行都需要很多时间。
另一方面,如果我使用newCachedThreadPool
List<Tasks> taskLists = new ArrayList<>();
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<?>> futures = new ArrayList<>();
for(Tasks tasks : tasksLists) {
futures.add(executorService.submit(() -> tasks.execute()))
}
这种情况下的问题是它将并行执行并重用那些完成的线程。这里的连接拒绝问题也是一样的,有些线程甚至可能在没有完成任务的情况下被杀死。
我也不能最大化postgreSQL连接限制,因为也有一些限制,如果我没有错,它的最大100或215如果可配置。
关于我应该如何处理这种情况,有什么建议或想法吗?
在这种情况下,您可以考虑结合使用newFixedThreadPool
和newCachedThreadPool
来实现最佳性能,同时避免连接限制。
首先,您可以将newFixedThreadPool与合理数量的线程一起使用(假设为10,如您的示例所示),但将可以同时访问数据库的线程数量限制为较小的数量(假设为5)。这可以使用Semaphore来实现,它只允许有限数量的线程同时访问数据库。例如:
List<Tasks> taskLists = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(10);
Semaphore dbSemaphore = new Semaphore(5);
List<Future<?>> futures = new ArrayList<>();
for(Tasks tasks : tasksLists) {
futures.add(executorService.submit(() -> {
dbSemaphore.acquire();
try {
tasks.execute();
} finally {
dbSemaphore.release();
}
}))
}
这将确保只有5个线程可以同时访问数据库,这将有助于避免连接拒绝。
但是,某些任务可能比其他任务执行时间更长,因此newFixedThreadPool
可能没有得到充分利用。为了解决这个问题,您可以在与newFixedThreadPool
的编译中使用newCachedThreadPool
,并将任务提交到两个线程池。这样,执行时间更长的任务可以被newChachedThreadPool
拾取,而执行速度更快的任务可以继续使用newFixedThreadPool
。例如:
List<Tasks> taskLists = new ArrayList<>();
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
Semaphore dbSemaphore = new Semaphore(5);
List<Future<?>> futures = new ArrayList<>();
for(Tasks tasks : tasksLists) {
if (tasks.isLongRunning()) { // use cachedThreadPool for long running tasks
futures.add(cachedThreadPool.submit(() -> {
dbSemaphore.acquire();
try {
tasks.execute();
} finally {
dbSemaphore.release();
}
}));
} else { // use fixedThreadPool for short running tasks
futures.add(fixedThreadPool.submit(() -> {
dbSemaphore.acquire();
try {
tasks.execute();
} finally {
dbSemaphore.release();
}
}));
}
}
这样newFixedThreadPool
可以用于短时间运行的任务,而newCachedThreadPool
可以用于长时间运行的任务。信号量将确保只有有限数量的线程可以同时访问数据库,这将有助于避免连接拒绝。