跳到主要内容

forkjoin

ForkJoinPool 与工作窃取

ToC

  • 设计理念与适用场景
  • RecursiveTask/RecursiveAction
  • 工作窃取与并行度
  • 与 ThreadPoolExecutor 的取舍
  • 实战与坑点

设计理念与适用场景

Fork/Join 面向“可拆分、可合并”的并行任务(分治),通过工作窃取提升 CPU 利用率。适合 CPU 密集型、递归/区间类计算;不适合大量阻塞 I/O。

基本用法:RecursiveTask

import java.util.concurrent.*;

class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10_000;
final long[] arr; final int lo, hi;
SumTask(long[] arr, int lo, int hi) { this.arr=arr; this.lo=lo; this.hi=hi; }
@Override protected Long compute() {
int len = hi - lo;
if (len <= THRESHOLD) {
long s = 0; for (int i=lo;i<hi;i++) s += arr[i]; return s;
}
int mid = lo + len/2;
SumTask left = new SumTask(arr, lo, mid);
SumTask right = new SumTask(arr, mid, hi);
left.fork(); // 异步
long r = right.compute(); // 当前线程继续右半
long l = left.join(); // 等左半
return l + r;
}
}

ForkJoinPool pool = new ForkJoinPool();
long sum = pool.invoke(new SumTask(arr, 0, arr.length));

RecursiveAction 类似,但无返回值。

工作窃取与并行度

  • 每个工作线程维护双端队列(deque),本地 LIFO;空闲线程从他人队列尾部“窃取”;
  • 并行度默认=CPU 核数;new ForkJoinPool(parallelism) 可自定义;
  • ForkJoinPool.commonPool() 作为全局公共池,CompletableFuture 默认使用它。

与 ThreadPoolExecutor 的取舍

  • ForkJoin 偏向细粒度、递归任务;ThreadPool 更通用、适合 I/O 与显式队列;
  • 避免在 ForkJoin 任务内执行长时间阻塞 I/O;如必须,使用 ManagedBlocker
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker(){
public boolean block() throws InterruptedException { /* 阻塞 I/O */ return true; }
public boolean isReleasable() { return false; }
});

实战要点

  • 合理设置阈值(切分粒度),避免过度拆分或粒度过大;
  • 递归树不平衡时可能产生偏斜,必要时随机拆分/分块;
  • parallelStream():其底层通常用 commonPool;重度使用建议自建池避免干扰其他任务。