Parallel Streams
Parallel streams execute stream operations concurrently on the common ForkJoinPool. They transparently split the data source into chunks, and the pool's worker threads process those chunks in parallel; the thread that calls the terminal operation also takes part. A stream becomes parallel when you call parallelStream() on a collection or parallel() on an existing stream, and sequential() switches it back. These calls set one execution mode for the entire pipeline, and the last call before the terminal operation wins. The Fork/Join framework divides the work recursively: the source's Spliterator splits the data, ideally into halves, each part is processed independently, and the partial results are combined. This makes data-parallel computations efficient on multi-core hardware. Parallel streams are most effective when four conditions hold: the computation is CPU-bound, the data set is large, each element can be processed independently (no shared mutable state, no ordering dependencies), and the cost of splitting and combining is small relative to the per-element computation. This entry covers the splitting and combining model in depth, the common ForkJoinPool and how to use a custom pool, the effect of encounter order and ordered vs unordered operations, and thread safety requirements for reduction and collection. It also covers performance pitfalls (autoboxing, small data sets, stateful operations, sources that split poorly) and measurement-driven guidelines for when parallel streams help vs hurt.
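The split, process and combine model described above can be sketched by hand with a RecursiveTask. This illustrates the same Fork/Join idea. It is not the stream library's internal code. The class name ForkJoinSumSketch and the THRESHOLD of 10_000 elements are arbitrary choices for the example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative only: sums a long[] by recursive halving on the common pool.
class ForkJoinSumSketch extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10_000; // arbitrary cut-off below which we stop splitting
    private final long[] data;
    private final int from, to; // half-open range [from, to)

    ForkJoinSumSketch(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0; // small enough: process this chunk sequentially
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        int mid = (from + to) >>> 1; // split the range roughly in half
        ForkJoinSumSketch left = new ForkJoinSumSketch(data, from, mid);
        ForkJoinSumSketch right = new ForkJoinSumSketch(data, mid, to);
        left.fork();                     // schedule the left half asynchronously
        long rightSum = right.compute(); // process the right half in the current thread
        return left.join() + rightSum;   // combine the two partial results
    }

    static long sum(long[] data) {
        return ForkJoinPool.commonPool().invoke(new ForkJoinSumSketch(data, 0, data.length));
    }

    public static void main(String[] args) {
        long[] data = new long[1_000_000];
        for (int i = 0; i < data.length; i++) data[i] = i;
        System.out.println(sum(data)); // 499999500000
    }
}
```

Forking one half and computing the other in the current thread keeps that thread busy instead of idling while it waits for both halves.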
How Parallel Streams Work — Splitting, Processing, and Combining
import java.util.*;
import java.util.stream.*;
// ── Creating parallel streams ─────────────────────────────────────────
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// From a collection directly:
numbers.parallelStream()
.map(n -> n * n)
.forEach(System.out::println); // order NOT guaranteed in parallel; use forEachOrdered() if order matters
// Convert existing sequential stream to parallel:
numbers.stream() // sequential
.parallel() // switch to parallel
.map(n -> n * n)
.forEach(System.out::println);
// Convert parallel back to sequential (the last parallel()/sequential() call sets the mode for the whole pipeline):
numbers.parallelStream()
.sequential() // back to sequential
.map(n -> n * n)
.forEach(System.out::println); // sequential: printed in encounter order
// Check if a stream is parallel:
Stream<Integer> s = numbers.parallelStream();
System.out.println(s.isParallel()); // true
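Two details of the calls above are easy to miss. First, parallel() and sequential() set a single mode for the whole pipeline, so the last call wins. Second, forEachOrdered() restores encounter order for side effects in a parallel stream. The following is a small sketch; the class name ParallelModeDemo is chosen for illustration.

```java
import java.util.List;
import java.util.stream.Stream;

class ParallelModeDemo {
    // The last parallel()/sequential() call decides the mode of the entire pipeline.
    static boolean endsParallel() {
        Stream<Integer> s = List.of(1, 2, 3).stream()
                .parallel()
                .map(n -> n * 2)
                .sequential(); // overrides the earlier parallel() for every stage
        return s.isParallel();
    }

    // forEachOrdered() applies the action in encounter order even on a parallel stream.
    // Each action happens-before the next one, so appending to a StringBuilder is safe here.
    static String squaresInOrder(List<Integer> numbers) {
        StringBuilder sb = new StringBuilder();
        numbers.parallelStream()
                .map(n -> n * n)
                .forEachOrdered(sq -> sb.append(sq).append(' '));
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(endsParallel());                         // false
        System.out.println(squaresInOrder(List.of(1, 2, 3, 4, 5))); // 1 4 9 16 25
    }
}
```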
// ── Splitting: ArrayList vs LinkedList performance ────────────────────
// ArrayList: SIZED + SUBSIZED + ORDERED → perfect parallel splits
List<Integer> arrayList = new ArrayList<>(Collections.nCopies(1_000_000, 1));
long arraySum = arrayList.parallelStream().mapToLong(Integer::longValue).sum();
// Splits perfectly: [0,500k), [500k,1M) → [0,250k), [250k,500k) etc.
// LinkedList: reports SIZED, but trySplit() must walk the nodes and copy a batch
// into an array, so splitting is sequential and the resulting chunks are unbalanced
List<Integer> linkedList = new LinkedList<>(Collections.nCopies(1_000_000, 1));
long linkedSum = linkedList.parallelStream().mapToLong(Integer::longValue).sum();
// Splits poorly: expect little or no speedup over a sequential stream
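You can observe the split behavior directly by calling trySplit() once on each list's Spliterator. In this sketch, SplitInspector is an illustrative name. The ArrayList halves shown follow from its midpoint split. The LinkedList batch size is a JDK implementation detail, so the sketch only prints it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Spliterator;

class SplitInspector {
    // Splits once and returns {prefixSize, remainingSize}, or null if the source refused to split.
    static long[] splitOnce(List<Integer> list) {
        Spliterator<Integer> rest = list.spliterator();
        Spliterator<Integer> prefix = rest.trySplit(); // hands off a prefix; rest keeps the remainder
        if (prefix == null) return null;
        return new long[] { prefix.estimateSize(), rest.estimateSize() };
    }

    static boolean sizedAndSubsized(List<Integer> list) {
        return list.spliterator().hasCharacteristics(Spliterator.SIZED | Spliterator.SUBSIZED);
    }

    public static void main(String[] args) {
        List<Integer> array = new ArrayList<>(Collections.nCopies(1_000_000, 1));
        List<Integer> linked = new LinkedList<>(Collections.nCopies(1_000_000, 1));
        long[] a = splitOnce(array);
        long[] l = splitOnce(linked);
        System.out.println("ArrayList  split: " + a[0] + " / " + a[1]); // 500000 / 500000
        System.out.println("LinkedList split: " + l[0] + " / " + l[1]); // a small batch / the rest
        System.out.println(sizedAndSubsized(array));                     // true
    }
}
```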
// ── Ordered vs unordered: ORDERED constraint limits parallelism ────────
List<String> words = List.of("banana","apple","cherry","date","elderberry");
// With ordering (ORDERED source): limit() must preserve first 3 in encounter order
List<String> orderedLimit = words.parallelStream()
.limit(3) // must preserve encounter order: costly in parallel on large streams
.collect(Collectors.toList());
System.out.println(orderedLimit); // always [banana, apple, cherry]
// Without ordering: any 3 elements may be returned, which is much cheaper in parallel on large streams
List<String> unorderedLimit = words.parallelStream()
.unordered() // remove ORDERED constraint
.limit(3) // may return any 3 elements
.collect(Collectors.toList());
System.out.println(unorderedLimit); // 3 elements; which ones, and in what order, may vary
Thread Safety, Custom ForkJoinPool, and Reduction Requirements
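import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.*;
// ── Thread safety: shared mutable state causes race conditions ────────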
// WRONG: parallel stream mutating shared list:
List<Integer> shared = new ArrayList<>();
IntStream.range(0, 1000).parallel().forEach(shared::add); // RACE CONDITION
System.out.println(shared.size()); // often NOT 1000 (lost updates); may even throw ArrayIndexOutOfBoundsException
// CORRECT: use collect() instead:
List<Integer> correct = IntStream.range(0, 1000)
.parallel()
.boxed()
.collect(Collectors.toList()); // thread-safe: internal partial lists combined
System.out.println(correct.size()); // always 1000
// CORRECT: concurrent collector for parallel grouping:
Map<Integer, List<Integer>> grouped = IntStream.range(0, 1000)
.parallel()
.boxed()
.collect(Collectors.groupingByConcurrent(n -> n % 10)); // ConcurrentHashMap-backed; element order within each group is not preserved
System.out.println(grouped.size()); // 10 groups
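Collectors.toList() is thread-safe because the library never shares one container between threads. The three-argument collect() overload makes that visible. This sketch spells out the supplier, accumulator and combiner; ManualCollect is an illustrative name.

```java
import java.util.ArrayList;
import java.util.stream.IntStream;

class ManualCollect {
    // Each leaf task gets its own list, fills it without sharing,
    // and partial lists are merged pairwise, left before right.
    static ArrayList<Integer> collectRange(int n) {
        return IntStream.range(0, n)
                .parallel()
                .boxed()
                .collect(
                        () -> new ArrayList<Integer>(),       // supplier: a fresh list per leaf task
                        (list, x) -> list.add(x),             // accumulator: touches only that task's list
                        (left, right) -> left.addAll(right)); // combiner: appends right after left
    }

    public static void main(String[] args) {
        ArrayList<Integer> result = collectRange(1000);
        System.out.println(result.size());                         // 1000
        System.out.println(result.get(0) + " " + result.get(999)); // 0 999
    }
}
```

Because the source is ordered and the combiner appends the right part after the left, the merged list keeps encounter order with no locking.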
// ── AtomicLong for parallel side-effect counting (antipattern but safe) ─
AtomicLong count = new AtomicLong();
IntStream.range(0, 1_000_000).parallel().forEach(n -> {
if (n % 2 == 0) count.incrementAndGet(); // thread-safe but suboptimal
});
// Better: use stream operations directly:
long countBetter = IntStream.range(0, 1_000_000).parallel().filter(n -> n % 2 == 0).count();
// ── Custom ForkJoinPool: separate from common pool ────────────────────
ForkJoinPool customPool = new ForkJoinPool(4); // 4 worker threads
try {
long result = customPool.submit(() ->
LongStream.range(0, 1_000_000)
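.parallel() // runs in customPool in current JDKs (an implementation detail, not a documented guarantee)
.sum()
).get();
System.out.println("Sum: " + result);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the interrupt flag
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause()); // propagate the task's own failure
} finally {
customPool.shutdown(); // must shut down to release threads
}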
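The following sketch checks which threads actually process the elements when a parallel stream runs inside a custom pool. It counts elements handled by any thread that is not a worker of that pool. It relies on current JDK behavior: ForkJoinTask.fork() schedules work in the pool of the current worker thread, and streams fork their subtasks. That is an implementation detail, not a stream API guarantee. CustomPoolCheck is an illustrative name.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

class CustomPoolCheck {
    // Counts elements processed by a thread that is NOT a worker of the custom pool.
    static long elementsOutsidePool(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() ->
                    IntStream.range(0, 10_000)
                            .parallel()
                            .filter(i -> {
                                Thread t = Thread.currentThread();
                                return !(t instanceof ForkJoinWorkerThread)
                                        || ((ForkJoinWorkerThread) t).getPool() != pool;
                            })
                            .count()
            ).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown(); // release the pool's threads
        }
    }

    public static void main(String[] args) {
        System.out.println(elementsOutsidePool(4)); // 0 on current JDKs
    }
}
```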
// ── Reduction identity and associativity requirements ─────────────────
// CORRECT: identity=0, accumulator/combiner are associative:
int sum = IntStream.range(1, 11).parallel().reduce(0, Integer::sum);
System.out.println(sum); // always 55
// WRONG: 10 is not a true identity (violates: accumulate(identity, v) == v):
int wrongResult = Stream.of(1, 2, 3, 4, 5).parallel().reduce(10, Integer::sum);
// Every leaf sub-task starts from identity=10, so 10 is added once per leaf:
// result = 15 + 10 * (number of leaves), e.g. 65 if each element ends up in its own leaf
System.out.println(wrongResult); // NOT 25; depends on how the source was split
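A corrected version of the offset example reduces with the true identity and adds the offset once. The three-argument reduce() form obeys the same rules. The identity must be neutral for the accumulator, and the combiner must agree with the accumulator: combiner(u, accumulator(identity, t)) must equal accumulator(u, t). In this sketch, ReductionRules and the sample words are illustrative.

```java
import java.util.stream.Stream;

class ReductionRules {
    // Correct way to add an offset: reduce with the true identity (0), then add 10 once.
    static int sumPlusTen() {
        return 10 + Stream.of(1, 2, 3, 4, 5).parallel().reduce(0, Integer::sum);
    }

    // Three-argument reduce: the accumulator folds one String into a running int,
    // and the combiner merges two partial ints. Identity 0 is neutral for both.
    static int totalLength(Stream<String> words) {
        return words.parallel().reduce(0, (acc, w) -> acc + w.length(), Integer::sum);
    }

    // The faulty identity: each leaf adds 10, so the result is 15 + 10 * (number of leaves).
    static int brokenSum() {
        return Stream.of(1, 2, 3, 4, 5).parallel().reduce(10, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(sumPlusTen());                                     // 25
        System.out.println(totalLength(Stream.of("banana", "apple", "fig")));  // 14
        System.out.println(brokenSum()); // 25 only if the stream was not split at all
    }
}
```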
// WRONG: non-associative accumulator:
// bracketed concatenation gives different results depending on how partial results are grouped:
String s = Stream.of("a","b","c","d").parallel()
.reduce("", (a, b) -> a + "[" + b + "]"); // not associative: wrong in parallel
System.out.println(s); // typically not the sequential "[a][b][c][d]": leaf results "[a]" and "[b]" combine to "[a][[b]]"
// CORRECT: use Collectors.joining() which handles parallel combining:
String joined = Stream.of("a","b","c","d").parallel()
.collect(Collectors.joining(", ")); // always "a, b, c, d"
Performance Pitfalls and When to Use Parallel Streams
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
// ── Small data: parallel is SLOWER ───────────────────────────────────
List<Integer> small = List.of(1, 2, 3, 4, 5);
// Sequential: fast (no thread coordination overhead)
long seqResult = small.stream().mapToLong(n -> n * n).sum();
// Parallel: slow (thread overhead > computation cost for 5 elements)
long parResult = small.parallelStream().mapToLong(n -> n * n).sum();
// Illustrative timing only (varies by machine and JIT warm-up): sequential ~0.001ms, parallel ~0.1ms, about 100x SLOWER in parallel
// ── Autoboxing pitfall ────────────────────────────────────────────────
List<Integer> boxedNumbers = new ArrayList<>();
for (int i = 0; i < 1_000_000; i++) boxedNumbers.add(i);
// SLOWER: each element is a heap-allocated Integer that must be unboxed, which eats into the parallel gain:
long boxedSum = boxedNumbers.parallelStream()
.mapToLong(Integer::longValue) // unboxing 1M Integer objects
.sum();
// FAST: no boxing at all:
long primitiveSum = LongStream.range(0, 1_000_000).parallel().sum();
// ── Poor Spliterator: LinkedList doesn't split well ────────────────────
List<Integer> linked = new LinkedList<>();
for (int i = 0; i < 100_000; i++) linked.add(i);
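// parallelStream on LinkedList: splits poorly, so the speedup is usually small
// Fix: copy to an ArrayList first; the copy is itself a sequential O(n) pass, so it pays off
// only when per-element work is heavy or the list is processed repeatedly: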
List<Integer> arrayList2 = new ArrayList<>(linked);
long fastSum = arrayList2.parallelStream().mapToLong(Integer::longValue).sum();
// ── sorted() in parallel: requires full materialization ───────────────
// sorted() must buffer all elements before emitting any; the sort itself can run in
// parallel (OpenJDK uses Arrays.parallelSort), but the barrier and buffering limit the speedup:
List<Integer> toSort = IntStream.range(0, 100_000).boxed()
.collect(Collectors.toCollection(ArrayList::new)); // distinct values, so shuffling has an effect
Collections.shuffle(toSort);
// Often not much faster than sequential; measure before relying on it:
List<Integer> sortedParallel = toSort.parallelStream()
.sorted() // stateful barrier: buffers everything, then sorts
.collect(Collectors.toList());
// ── NQ model: N = number of elements, Q = cost per element ────────────
// N=10, Q=1ms per element: N*Q = 10ms → probably worth parallelizing
// N=10_000, Q=0.001ms: N*Q = 10ms → borderline: same N*Q, but per-element overhead weighs more when Q is tiny, so measure
// N=10, Q=0.001ms: N*Q = 0.01ms → sequential wins easily
// CPU-intensive computation: good parallel candidate
long[] primes = LongStream.range(2, 1_000_000).parallel()
.filter(n -> {
for (long i = 2; i * i <= n; i++) {
if (n % i == 0) return false;
}
return true; // expensive check — Q is high → parallel beneficial
}).toArray();
System.out.println("Primes found: " + primes.length);
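The primality predicate is stateless, so sequential and parallel pipelines must agree on the count; only the running time should differ. This sketch checks both. PrimeCount and the crude System.nanoTime() timing are illustrative assumptions, not a benchmark methodology. JIT warm-up and machine load skew single runs, so a harness such as JMH is the better basis for real decisions.

```java
import java.util.stream.LongStream;

class PrimeCount {
    // Trial division: deliberately expensive per element (high Q in the NQ model).
    static boolean isPrime(long n) {
        if (n < 2) return false;
        for (long i = 2; i * i <= n; i++) {
            if (n % i == 0) return false;
        }
        return true;
    }

    static long countPrimesBelow(long limit, boolean parallel) {
        LongStream s = LongStream.range(2, limit);
        if (parallel) s = s.parallel();
        return s.filter(PrimeCount::isPrime).count();
    }

    public static void main(String[] args) {
        for (boolean parallel : new boolean[] { false, true }) {
            long start = System.nanoTime();
            long count = countPrimesBelow(1_000_000, parallel);
            long ms = (System.nanoTime() - start) / 1_000_000;
            // Single, unwarmed runs are noisy: treat these timings as rough indications only.
            System.out.println((parallel ? "parallel  " : "sequential") + ": " + count + " primes in " + ms + " ms");
        }
    }
}
```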
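// I/O inside a parallel stream: BAD, it blocks common ForkJoinPool workers
// (urls, downloadContent() and process() are hypothetical placeholders for your own code)
// DON'T DO THIS: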
urls.parallelStream().forEach(url -> {
String content = downloadContent(url); // blocks ForkJoinWorkerThread!
process(content); // pool starved while waiting for I/O
});
// DO THIS INSTEAD: CompletableFuture with a dedicated I/O thread pool
ExecutorService ioPool = Executors.newFixedThreadPool(20); // size 20 is arbitrary; tune to the workload
try {
List<CompletableFuture<Void>> futures = urls.stream()
.map(url -> CompletableFuture.runAsync(() -> {
String content = downloadContent(url); // I/O runs in ioPool, not the ForkJoinPool
process(content);
}, ioPool))
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); // wait for all tasks
} finally {
ioPool.shutdown(); // release the pool's threads even if a task fails
}
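When the I/O results are needed back, not just processed, the same pattern works with supplyAsync() and join(). In this runnable sketch, fakeDownload() is a hypothetical stand-in for real network I/O; it only sleeps and echoes its input. The pool size of 8 is an arbitrary assumption.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class AsyncFetch {
    // Hypothetical stand-in for blocking network I/O.
    static String fakeDownload(String url) {
        try {
            Thread.sleep(50); // simulate waiting on the network
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "content of " + url;
    }

    static List<String> fetchAll(List<String> urls) {
        ExecutorService ioPool = Executors.newFixedThreadPool(8); // arbitrary size for the sketch
        try {
            List<CompletableFuture<String>> futures = urls.stream()
                    .map(url -> CompletableFuture.supplyAsync(() -> fakeDownload(url), ioPool))
                    .collect(Collectors.toList()); // start every download before waiting on any
            return futures.stream()
                    .map(CompletableFuture::join)  // results come back in input order
                    .collect(Collectors.toList());
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(List.of("a.example", "b.example", "c.example")));
        // [content of a.example, content of b.example, content of c.example]
    }
}
```

Collecting the futures into a list before joining matters. A sequential stream is lazy and processes one element at a time, so a single map(supplyAsync).map(join) pipeline would start and wait for each download in turn.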