ForkJoinPool
ForkJoinPool is an ExecutorService implementation designed specifically for divide-and-conquer parallelism: splitting a large task into smaller subtasks recursively, executing them in parallel, and merging results bottom-up. It was introduced in Java 7 and became significantly more prominent in Java 8, where it powers parallel streams and serves as CompletableFuture's default asynchronous executor. ForkJoinPool differs from ThreadPoolExecutor in two fundamental ways. First, it uses work-stealing scheduling: each worker pushes and pops tasks at one end of its own deque, while idle workers steal the oldest tasks from the opposite end of busy workers' deques, keeping CPUs busy with little contention. Second, it provides first-class support for recursive task decomposition through ForkJoinTask, RecursiveTask<V>, and RecursiveAction. The JVM maintains a single shared instance — ForkJoinPool.commonPool() — that is used by parallel streams and CompletableFuture by default. This entry covers the work-stealing algorithm in depth, ForkJoinTask and its fork/join semantics, RecursiveTask and RecursiveAction with complete divide-and-conquer examples, the commonPool and its configuration, when to use ForkJoinPool versus ThreadPoolExecutor, and the ManagedBlocker interface for integrating blocking operations without starving the pool.
Work-Stealing Architecture and the ForkJoinPool Model
// ── ForkJoinPool construction and configuration ──────────────────────
// Default constructor: parallelism = Runtime.getRuntime().availableProcessors().
// (Only the commonPool defaults to availableProcessors - 1, minimum 1.)
ForkJoinPool defaultPool = new ForkJoinPool();
System.out.println("Default parallelism: " + defaultPool.getParallelism()); // e.g., 8 on 8-core
// Custom parallelism level:
ForkJoinPool customPool = new ForkJoinPool(
    4,                                               // parallelism = 4 worker threads
    ForkJoinPool.defaultForkJoinWorkerThreadFactory,
    null,                                            // uncaught exception handler (none)
    false                                            // asyncMode: LIFO (false) or FIFO (true)
);
// ── commonPool — the JVM-wide shared instance ─────────────────────────
ForkJoinPool common = ForkJoinPool.commonPool();
System.out.println("Common pool parallelism: " + common.getParallelism());
// = availableProcessors - 1, minimum 1 (configurable via system property)
// Configure commonPool via system property (before any use):
// java -Djava.util.concurrent.ForkJoinPool.common.parallelism=16 MyApp
// ── Work-stealing in action — observable through pool stats ───────────
ForkJoinPool observable = new ForkJoinPool(4);
// join() waits for the submitted task to complete; a fixed Thread.sleep would
// merely race with it (and may print stats before the task has even run).
observable.submit(() -> {
    System.out.println("Running on: " + Thread.currentThread().getName());
    // A single task that forks nothing offers little to steal; steal counts become
    // meaningful once tasks fork subtasks that other workers pick up.
    System.out.println("Steal count: " + observable.getStealCount());
}).join();
// Monitor work-stealing activity (activeThreads and steals are estimates/snapshots):
System.out.printf("Pool stats: parallelism=%d poolSize=%d activeThreads=%d steals=%d%n",
    observable.getParallelism(),
    observable.getPoolSize(),
    observable.getActiveThreadCount(),
    observable.getStealCount()
);
observable.shutdown();
// ── asyncMode — FIFO vs LIFO task ordering ────────────────────────────
// asyncMode=false (default, LIFO): tasks are popped newest-first from own deque
// → better cache locality (recently computed data still in cache)
// → standard choice for recursive divide-and-conquer
// asyncMode=true (FIFO): tasks are popped oldest-first from own deque
// → tasks execute in submission order within a thread
// → better for event-processing pipelines (not recursive tasks)
ForkJoinPool fifoPool = new ForkJoinPool(
    Runtime.getRuntime().availableProcessors(),
    ForkJoinPool.defaultForkJoinWorkerThreadFactory,
    null,
    true // asyncMode=true — FIFO ordering for event-style tasks
);
// ── ForkJoinPool vs ThreadPoolExecutor — decision ─────────────────────
// Use ForkJoinPool when:
// - Recursive divide-and-conquer (merge sort, quick sort, tree traversal)
// - CPU-bound computation that decomposes into many fine-grained tasks
// - Parallel streams (uses commonPool automatically)
// - Work has irregular structure — some subtrees deeper than others
// Use ThreadPoolExecutor when:
// - I/O-bound work (HTTP calls, database queries, file operations)
// - Fixed number of independent tasks with no hierarchical structure
// - Need precise control over queue type, capacity, rejection policy
// - Tasks may block on external resources
// Release the demo pools created at the top of this snippet
// (the common pool is JVM-managed and must not be shut down by application code).
defaultPool.shutdown();
customPool.shutdown();
fifoPool.shutdown();
RecursiveTask and RecursiveAction — Divide and Conquer
// ── RecursiveTask — parallel sum of array ────────────────────────────
class ParallelSum extends RecursiveTask<Long> {
private static final int THRESHOLD = 10_000; // tune empirically
private final long[] array;
private final int from, to;
ParallelSum(long[] array, int from, int to) {
this.array = array;
this.from = from;
this.to = to;
}
@Override
protected Long compute() {
int length = to - from;
// Base case: problem small enough to solve directly
if (length <= THRESHOLD) {
long sum = 0;
for (int i = from; i < to; i++) sum += array[i];
return sum;
}
// Recursive case: split in half
int mid = from + length / 2;
ParallelSum left = new ParallelSum(array, from, mid);
ParallelSum right = new ParallelSum(array, mid, to);
// Fork LEFT into pool (another thread may steal it):
left.fork();
// Compute RIGHT directly in this thread (avoids unnecessary task creation):
long rightResult = right.compute();
// Join LEFT (wait for it; meanwhile this thread may help with other stolen tasks):
long leftResult = left.join();
return leftResult + rightResult;
}
}
long[] data = LongStream.rangeClosed(1L, 10_000_000L).toArray();
ForkJoinPool pool = new ForkJoinPool();
// invoke() runs the root task in the pool and returns its result once the whole task tree completes.
ParallelSum rootTask = new ParallelSum(data, 0, data.length);
long result = pool.invoke(rootTask);
System.out.println("Parallel sum: " + result); // 50000005000000
// Cross-check against a plain sequential reduction:
long sequential = LongStream.of(data).sum();
System.out.println("Sequential sum: " + sequential); // same
// ── RecursiveAction — parallel array fill (no return value) ──────────
/**
 * Writes {@code fillValue} into every slot of {@code array} in {@code [from, to)}.
 *
 * <p>Ranges no larger than {@code THRESHOLD} are filled directly with
 * {@link Arrays#fill(int[], int, int, int)}. Larger ranges are halved: the lower half
 * is forked, the upper half is filled on the current thread, then the lower half is joined.
 */
class ParallelFill extends RecursiveAction {
    private static final int THRESHOLD = 5_000;

    private final int[] array;
    private final int from, to;
    private final int fillValue;

    ParallelFill(int[] array, int from, int to, int fillValue) {
        this.array = array;
        this.from = from;
        this.to = to;
        this.fillValue = fillValue;
    }

    @Override
    protected void compute() {
        if (to - from > THRESHOLD) {
            int split = from + (to - from) / 2;
            ParallelFill lowerHalf = new ParallelFill(array, from, split, fillValue);
            lowerHalf.fork();                                    // lower half: may be stolen
            new ParallelFill(array, split, to, fillValue).compute(); // upper half: this thread
            lowerHalf.join();                                    // wait for the lower half
        } else {
            Arrays.fill(array, from, to, fillValue); // small range: fill directly
        }
    }
}
int[] arr = new int[1_000_000];
ParallelFill fillAllWithSeven = new ParallelFill(arr, 0, arr.length, 7);
pool.invoke(fillAllWithSeven);
// Spot-check the middle element once invoke() has returned:
int middle = arr[500_000];
System.out.println("Filled: " + middle); // 7
// ── RecursiveTask — merge sort ────────────────────────────────────────
class ParallelMergeSort extends RecursiveAction {
private static final int THRESHOLD = 2048;
private final int[] array;
private final int from, to;
ParallelMergeSort(int[] array, int from, int to) {
this.array = array; this.from = from; this.to = to;
}
@Override
protected void compute() {
if (to - from <= THRESHOLD) {
Arrays.sort(array, from, to); // base case: use Arrays.sort for small ranges
return;
}
int mid = from + (to - from) / 2;
ParallelMergeSort left = new ParallelMergeSort(array, from, mid);
ParallelMergeSort right = new ParallelMergeSort(array, mid, to);
left.fork();
right.compute();
left.join();
mergeInPlace(array, from, mid, to);
}
private void mergeInPlace(int[] arr, int lo, int mid, int hi) {
int[] temp = Arrays.copyOfRange(arr, lo, mid);
int i = 0, j = mid, k = lo;
while (i < temp.length && j < hi) {
if (temp[i] <= arr[j]) arr[k++] = temp[i++];
else arr[k++] = arr[j++];
}
while (i < temp.length) arr[k++] = temp[i++];
}
}
int[] toSort = new Random().ints(1_000_000, 0, Integer.MAX_VALUE).toArray();
pool.invoke(new ParallelMergeSort(toSort, 0, toSort.length));
System.out.println("Sorted: " + IntStream.of(toSort).limit(5).boxed().collect(Collectors.toList()));
// A printed prefix proves little — verify that every adjacent pair is in order:
boolean ascending = IntStream.range(1, toSort.length).allMatch(idx -> toSort[idx - 1] <= toSort[idx]);
System.out.println("Fully sorted: " + ascending); // true
pool.shutdown();
commonPool, ManagedBlocker, and Integration with Parallel Streams
// ── commonPool usage by parallel streams and CompletableFuture ────────
// Parallel stream uses commonPool automatically:
long parallelSum = LongStream.rangeClosed(1, 1_000_000)
    .parallel() // submits work to commonPool
    .sum();
System.out.println("Parallel sum: " + parallelSum);
// CompletableFuture without executor uses commonPool:
CompletableFuture<Integer> cfDefault = CompletableFuture.supplyAsync(() -> 42);
// → runs on commonPool thread
// PROBLEM: blocking in commonPool starves everyone.
// Supplier.get() cannot throw checked exceptions, so the InterruptedException from
// Thread.sleep must be handled inside the lambda (restoring the interrupt flag).
CompletableFuture<String> blockingCf = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(5000); // blocks a commonPool thread for 5 seconds!
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return "result";
});
// Meanwhile, parallel streams have one fewer thread → reduced parallelism
// SOLUTION: always use dedicated pool for blocking/I/O work:
ExecutorService ioPool = Executors.newFixedThreadPool(20);
CompletableFuture<String> safeBlockingCf = CompletableFuture.supplyAsync(
    () -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "result";
    },
    ioPool // doesn't touch commonPool
);
// shutdown() still runs already-submitted work, but stops the pool's non-daemon
// threads from keeping the JVM alive once that work is done.
ioPool.shutdown();
// ── Running a parallel stream inside a custom ForkJoinPool ────────────
// NOTE: Streams do not document which pool they use. This works because the stream
// implementation forks its subtasks, and ForkJoinTask.fork() runs a task in the pool
// of the current worker thread — treat it as an implementation detail, not a guarantee.
ForkJoinPool customParallelism = new ForkJoinPool(2); // only 2 threads for this stream
long result = customParallelism.submit(() ->
    LongStream.rangeClosed(1, 1_000_000)
        .parallel() // forks into customParallelism (2 threads), NOT commonPool
        // Sample a few elements only: printing all 1,000,000 would flood the console.
        .peek(n -> {
            if (n % 250_000 == 0) {
                System.out.println("Thread: " + Thread.currentThread().getName());
            }
        })
        .sum()
).get();
System.out.println("Result (2-thread parallel): " + result);
customParallelism.shutdown();
// ── ManagedBlocker — safe blocking in ForkJoinPool tasks ──────────────
// Blocking in a ForkJoinTask without ManagedBlocker:
/**
 * Anti-pattern demo: sleeps for one second directly inside {@code compute()}, occupying
 * a pool worker without telling the pool. If the sleep is interrupted, the interrupt
 * flag is restored and the task still returns {@code "done"}.
 */
class UnsafeBlockingTask extends RecursiveTask<String> {
    @Override
    protected String compute() {
        final long pauseMillis = 1000L;
        try {
            Thread.sleep(pauseMillis); // blocks pool thread — reduces parallelism!
        } catch (InterruptedException interrupted) {
            // Preserve the interrupt status for whoever runs this task.
            Thread.currentThread().interrupt();
        }
        return "done";
    }
}
// Correct: use ManagedBlocker to inform pool about blocking:
/**
 * ManagedBlocker that sleeps once for {@code millis} milliseconds.
 *
 * <p>{@code isReleasable()} reports {@code true} only after a sleep has completed.
 * {@code block()} sleeps at most once; later calls return {@code true} immediately.
 * If the sleep is interrupted, the InterruptedException propagates and the blocker
 * remains unreleased.
 */
class SafeSleepBlocker implements ForkJoinPool.ManagedBlocker {
    private final long millis;
    private boolean done = false;

    SafeSleepBlocker(long millis) {
        this.millis = millis;
    }

    @Override
    public boolean isReleasable() {
        return done;
    }

    @Override
    public boolean block() throws InterruptedException {
        if (done) {
            return true; // already slept once — nothing more to wait for
        }
        Thread.sleep(millis); // actual blocking — pool may compensate with extra thread
        done = true;
        return true;
    }
}
/**
 * Sleeps for one second via {@link ForkJoinPool#managedBlock}, so the pool is told that
 * this worker is about to block and may compensate with an extra thread. Returns
 * {@code "done safely"}; if interrupted, the interrupt flag is restored first.
 */
class SafeBlockingTask extends RecursiveTask<String> {
    @Override
    protected String compute() {
        ForkJoinPool.ManagedBlocker pause = new SafeSleepBlocker(1000);
        try {
            // The pool knows this thread is blocking and may create a compensation thread.
            ForkJoinPool.managedBlock(pause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "done safely";
    }
}
ForkJoinPool blockingPool = new ForkJoinPool(4);
SafeBlockingTask sleeper = new SafeBlockingTask();
String safe = blockingPool.invoke(sleeper); // returns once the managed sleep has finished
System.out.println(safe);
blockingPool.shutdown();
// ── commonPool configuration via system property ──────────────────────
// The property must be set before the commonPool is first used
// (early in main, or in a static initializer):
// System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "16");
// OR via JVM flag:
// java -Djava.util.concurrent.ForkJoinPool.common.parallelism=16 MyApp
// Check current setting:
int commonParallelism = ForkJoinPool.commonPool().getParallelism();
System.out.println("CommonPool parallelism: " + commonParallelism);
// ── Threshold tuning — empirical measurement ──────────────────────────
// Sums arr[from, to): ranges of at most `thresh` elements are summed in a loop;
// larger ranges fork the lower half and compute the upper half on this thread.
class ThresholdSum extends RecursiveTask<Long> {
    private final long[] arr;
    private final int from, to, thresh;

    ThresholdSum(long[] arr, int from, int to, int thresh) {
        this.arr = arr; this.from = from; this.to = to; this.thresh = thresh;
    }

    @Override
    protected Long compute() {
        if (to - from <= thresh) {
            long s = 0;
            for (int i = from; i < to; i++) s += arr[i];
            return s;
        }
        int mid = from + (to - from) / 2;
        ThresholdSum left = new ThresholdSum(arr, from, mid, thresh);
        left.fork();
        long right = new ThresholdSum(arr, mid, to, thresh).compute();
        return left.join() + right;
    }
}
// The input is loop-invariant: build it once. Re-allocating 10M longs (~80 MB) per
// iteration wastes time and adds GC noise to the timings.
long[] tuningData = LongStream.rangeClosed(1, 10_000_000).toArray();
// One untimed warm-up run reduces the JIT-compilation cost charged to the first threshold.
ForkJoinPool warmUp = new ForkJoinPool(4);
try {
    warmUp.invoke(new ThresholdSum(tuningData, 0, tuningData.length, 10_000));
} finally {
    warmUp.shutdown();
}
for (int threshold : new int[]{100, 1000, 10_000, 100_000}) {
    ForkJoinPool p = new ForkJoinPool(4);
    try {
        long start = System.nanoTime();
        long sum = p.invoke(new ThresholdSum(tuningData, 0, tuningData.length, threshold));
        long time = System.nanoTime() - start;
        System.out.printf("threshold=%,7d time=%,6d µs sum=%d%n", threshold, time / 1000, sum);
    } finally {
        p.shutdown();
    }
}
// Results depend on hardware and input size. Very small thresholds (e.g. 100) create so
// many tiny tasks that per-task overhead dominates. Very large thresholds only hurt once
// the input splits into too few leaves to keep every worker busy (10M / 100_000 still
// yields ~128 leaves here). Single runs are noisy — repeat measurements (or use JMH).