免责声明:这是我第一次使用Java的Fork-Join框架,所以我不能100%确定我是否正确使用它。Java也不是我的主要编程语言,所以这也可能是相关的。
给定以下SSCCE:
import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
class ForkCalculator extends RecursiveAction
{
private final Integer[] delayTasks;
public ForkCalculator(Integer[] delayTasks)
{
this.delayTasks = delayTasks;
}
@Override
protected void compute()
{
if (this.delayTasks.length == 1) {
this.computeDirectly();
return;
}
Integer halfway = this.delayTasks.length / 2;
ForkJoinTask.invokeAll(
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, 0, halfway)
),
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, halfway, this.delayTasks.length)
)
);
}
private void computeDirectly()
{
Integer delayTask = this.delayTasks[0];
try {
Thread.sleep(delayTask);
} catch (InterruptedException ex) {
System.err.println(ex.getMessage());
System.exit(2);
}
System.out.println("Finished computing task with delay " + delayTask);
}
}
public final class ForkJoinBlocker
{
public static void main(String[] args)
{
ForkCalculator calculator = new ForkCalculator(
new Integer[]{1500, 1400, 1950, 2399, 4670, 880, 5540, 1975, 3010, 4180, 2290, 1940, 510}
);
ForkJoinPool pool = new ForkJoinPool(
Runtime.getRuntime().availableProcessors()
);
pool.invoke(calculator);
//make it a daemon thread
Timer timer = new Timer(true);
timer.scheduleAtFixedRate(
new TimerTask() {
@Override
public void run()
{
System.out.println(pool.toString());
}
},
100,
2000
);
}
}
因此,我创建了一个ForkJoinPool
,我向其提交了一些执行一些处理的任务。出于本示例的目的,我将它们替换为Thread.睡眠()
,以保持简单。
在我的实际程序中,这是一个很长的任务列表,因此我想定期在标准输出上打印当前状态。我尝试使用计划的TimerTask
在单独的线程上执行此操作。
但是,我注意到一些我没有预料到的事情:在我的示例中,输出类似于:
Finished computing task with delay 1500
Finished computing task with delay 2399
Finished computing task with delay 1400
Finished computing task with delay 4180
Finished computing task with delay 1950
Finished computing task with delay 5540
Finished computing task with delay 880
.......
这意味着“状态任务”永远不会执行。
但是,如果我修改我的代码以移动pool. invoke(计算器);
在最后,然后它按预期工作:
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 1500
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 2399
Finished computing task with delay 1400
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 4, submissions = 0]
Finished computing task with delay 4180
Finished computing task with delay 1950
......
我能得出的唯一结论是ForkJoinPool::invoke()
阻塞主线程(它只在池中的所有任务完成后返回)。
我希望主线程中的代码继续执行,而fork-join-pool中的任务是异步处理的。
我的问题是:发生这种情况是因为我错误地使用了框架吗?我的代码中有什么需要纠正的吗?
我注意到一个ForkJoinPool
的构造函数有一个boolean asyncMode
参数,但是,从我可以从实现中看出,这只是在FIFO_QUEUE
和LIFO_QUEUE
执行模式之间做出决定(不完全确定这些是什么):
public ForkJoinPool(
int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode
) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
基本上invoke()
会等待整个任务完成再返回,所以是的主线程阻塞了。之后,Timer
没有时间执行,因为它运行在守护线程上。
您可以简单地使用执行()
而不是异步运行任务的调用()
。然后您可以在ForkJoinTask
上的join()
等待结果,在此期间Timer
将运行:
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
pool.execute(calculator);
//make it a daemon thread
Timer timer = new Timer(true);
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
System.out.println(pool.toString());
}
}, 100, 2000);
calculator.join(); // wait for computation