CompletableFuture
CompletableFuture<T>, introduced in Java 8, is a Future implementation that supports manual completion, non-blocking callbacks, and a rich set of composition and transformation operators that enable building complex asynchronous pipelines without blocking threads. It implements both Future<T> and CompletionStage<T>. The CompletionStage interface defines over 40 methods for chaining, combining, transforming, and handling exceptions in asynchronous computations, all of which return new CompletionStages that complete when their input stage completes and their operation finishes. CompletableFuture can be created from an async computation (supplyAsync, runAsync), completed manually by any thread (complete, completeExceptionally), or combined from multiple CompletableFutures (allOf, anyOf). This entry covers the creation methods, the full transformation and chaining API (thenApply, thenAccept, thenRun, thenCompose, thenCombine and their Async variants), exception handling (exceptionally, handle, whenComplete), fan-out and fan-in patterns (allOf, anyOf), the executor model and the default ForkJoinPool.commonPool(), manual completion for testing and bridging, and performance considerations including thread pool selection for different stage types.
Creation, Manual Completion, and the Executor Model
// ── supplyAsync — start an async computation ─────────────────────────
ExecutorService ioPool = Executors.newFixedThreadPool(20);
// Default executor (ForkJoinPool.commonPool) — only for CPU-bound work:
CompletableFuture<Integer> cpuFuture = CompletableFuture.supplyAsync(() -> {
return IntStream.rangeClosed(1, 1_000_000).sum(); // CPU-bound: uses commonPool OK
});
// Custom executor — always for I/O-bound work:
CompletableFuture<String> ioFuture = CompletableFuture.supplyAsync(
() -> fetchFromDatabase(42), // I/O-bound: use dedicated pool
ioPool
);
// runAsync — no result:
CompletableFuture<Void> fireAndForget = CompletableFuture.runAsync(
() -> sendEmail("user@example.com"),
ioPool
);
// ── Manual completion — bridging to callback APIs ─────────────────────
// Example: wrap a callback-based HTTP client into CompletableFuture:
CompletableFuture<String> bridged = new CompletableFuture<>();
httpClient.getAsync("https://api.example.com/data", new Callback() {
@Override void onSuccess(String body) {
bridged.complete(body); // completes the future with the result
}
@Override void onFailure(Throwable ex) {
bridged.completeExceptionally(ex); // completes the future with an exception
}
});
// bridged is now a CompletableFuture that completes when the callback fires
// ── Manual completion for testing ────────────────────────────────────
class UserService {
CompletableFuture<User> fetchUser(long id) {
return CompletableFuture.supplyAsync(() -> db.findUser(id), ioPool);
}
}
// In tests: complete manually to control behavior without I/O:
CompletableFuture<User> mockFuture = new CompletableFuture<>();
UserService service = mock(UserService.class);
when(service.fetchUser(42L)).thenReturn(mockFuture);
// Test immediate completion:
mockFuture.complete(new User(42L, "Alice"));
// Test failure:
CompletableFuture<User> failFuture = new CompletableFuture<>();
failFuture.completeExceptionally(new DatabaseException("timeout"));
// ── completedFuture — synchronous fast path ───────────────────────────
Map<Long, User> cache = new ConcurrentHashMap<>();
CompletableFuture<User> fetchWithCache(long id) {
User cached = cache.get(id);
if (cached != null) {
return CompletableFuture.completedFuture(cached); // no async needed
}
return CompletableFuture.supplyAsync(() -> {
User user = db.findUser(id);
cache.put(id, user);
return user;
}, ioPool);
}
// ── The default executor problem ──────────────────────────────────────
// WRONG: I/O in commonPool starves CPU tasks (parallel streams, etc.):
CompletableFuture<String> badIo = CompletableFuture.supplyAsync(() -> {
return slowDatabaseCall(); // blocks a commonPool thread — BAD
});
// CORRECT: dedicated I/O pool for I/O operations:
ExecutorService dbPool = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * 10, // I/O-bound: 10x cores
new NamedThreadFactory("db-pool", false)
);
CompletableFuture<String> goodIo = CompletableFuture.supplyAsync(
() -> slowDatabaseCall(),
dbPool // I/O-bound work on I/O pool — commonPool remains available for CPU work
);Transformation, Chaining, and Combining — thenApply, thenCompose, thenCombine
// ── thenApply — transform result ─────────────────────────────────────
CompletableFuture<String> rawData = CompletableFuture.supplyAsync(
() -> fetchJson(), ioPool
);
CompletableFuture<User> parsedUser = rawData
.thenApply(json -> parseJson(json, User.class)); // transform String → User
CompletableFuture<String> userEmail = parsedUser
.thenApply(User::getEmail); // transform User → String
// Chain directly:
CompletableFuture<String> pipeline = CompletableFuture
.supplyAsync(() -> fetchJson(), ioPool)
.thenApply(json -> parseJson(json, User.class)) // in pool thread (fast — OK)
.thenApplyAsync(user -> enrichFromLdap(user), ioPool) // needs I/O — use pool
.thenApply(User::getEmail); // fast transform — no pool needed
// ── thenCompose — chain dependent async operations ────────────────────
// WRONG: thenApply returns CompletableFuture<CompletableFuture<User>> — nested!
CompletableFuture<CompletableFuture<User>> nested = CompletableFuture
.supplyAsync(() -> 42L, ioPool)
.thenApply(id -> fetchUserAsync(id)); // returns a future — nested!
// CORRECT: thenCompose flattens to CompletableFuture<User>:
CompletableFuture<User> user = CompletableFuture
.supplyAsync(() -> 42L, ioPool)
.thenCompose(id -> fetchUserAsync(id)); // id → CompletableFuture<User> → flat
// Multi-step sequential async pipeline:
CompletableFuture<OrderConfirmation> orderPipeline = CompletableFuture
.supplyAsync(() -> validateOrder(order), ioPool)
.thenCompose(valid -> reserveInventory(valid), ioPool)
.thenCompose(reserved -> chargePayment(reserved), ioPool)
.thenCompose(charged -> sendConfirmation(charged), ioPool);
// Each step starts only after the previous async step fully completes
// ── thenCombine — combine two independent futures ─────────────────────
CompletableFuture<User> userFuture = fetchUserAsync(userId, ioPool);
CompletableFuture<Account> acctFuture = fetchAccountAsync(accountId, ioPool);
// Both fetches run concurrently — neither depends on the other
CompletableFuture<Dashboard> dashboard = userFuture.thenCombine(
acctFuture,
(user, account) -> new Dashboard(user, account) // called when BOTH complete
);
// ── allOf — wait for ALL, then collect results ────────────────────────
List<Long> userIds = List.of(1L, 2L, 3L, 4L, 5L);
// Start all fetches concurrently:
List<CompletableFuture<User>> userFutures = userIds.stream()
.map(id -> CompletableFuture.supplyAsync(() -> fetchUser(id), ioPool))
.collect(Collectors.toList());
// Wait for all, then collect results (standard allOf pattern):
CompletableFuture<List<User>> allUsers = CompletableFuture
.allOf(userFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> userFutures.stream()
.map(CompletableFuture::join) // join() never blocks here — all are done
.collect(Collectors.toList())
);
List<User> users = allUsers.get(10, TimeUnit.SECONDS);
System.out.println("Fetched " + users.size() + " users");
// ── anyOf — first to complete wins ───────────────────────────────────
List<String> endpoints = List.of("primary.api.com", "secondary.api.com", "tertiary.api.com");
CompletableFuture<Object> firstResponse = CompletableFuture.anyOf(
endpoints.stream()
.map(endpoint -> CompletableFuture.supplyAsync(() -> callEndpoint(endpoint), ioPool))
.toArray(CompletableFuture[]::new)
);
String response = (String) firstResponse.get(); // first successful response
System.out.println("Got: " + response);
// ── thenAccept and thenRun — side effects ────────────────────────────
CompletableFuture.supplyAsync(() -> processOrder(), ioPool)
.thenAccept(order -> auditLog.record(order)) // consume result, return Void
.thenRun(() -> metrics.increment("orders.processed")); // no result accessException Handling, whenComplete, handle, and Practical Patterns
// ── exceptionally — recover from failure ─────────────────────────────
CompletableFuture<String> withRecovery = CompletableFuture
.supplyAsync(() -> {
if (Math.random() < 0.5) throw new RuntimeException("Network error");
return "live data";
}, ioPool)
.exceptionally(ex -> {
System.err.println("Recovering from: " + ex.getMessage());
return "cached fallback"; // recovery value — stage completes normally
});
System.out.println("Result: " + withRecovery.get()); // "live data" or "cached fallback"
// ── handle — always runs, transforms both success and failure ─────────
CompletableFuture<Response> handled = CompletableFuture
.supplyAsync(() -> riskyDatabaseCall(), ioPool)
.handle((data, ex) -> {
if (ex != null) {
log.error("DB call failed", ex);
return Response.error(ex.getMessage()); // transform exception → Response
}
return Response.success(data); // transform success → Response
});
// ── whenComplete — observe without altering ───────────────────────────
CompletableFuture<Order> withLogging = CompletableFuture
.supplyAsync(() -> processOrder(), ioPool)
.whenComplete((order, ex) -> {
if (ex != null) {
log.error("Order processing failed", ex); // observe failure
metrics.increment("orders.failed");
} else {
log.info("Order processed: " + order.getId()); // observe success
metrics.increment("orders.succeeded");
}
// Does NOT change the result — exception or value passes through unchanged
});
// ── Exception propagation through chains ──────────────────────────────
CompletableFuture<String> chain = CompletableFuture
.supplyAsync(() -> "raw data", ioPool)
.thenApply(s -> { throw new RuntimeException("Step 2 failed"); }) // throws
.thenApply(s -> s.toUpperCase()) // SKIPPED — exception propagates
.thenApply(s -> s.trim()) // SKIPPED — exception propagates
.exceptionally(ex -> "recovered"); // catches the exception from step 2
System.out.println(chain.get()); // "recovered"
// ── Complete practical pipeline ────────────────────────────────────────
record UserProfile(User user, List<Order> orders, AccountBalance balance) {}
CompletableFuture<UserProfile> buildProfile(long userId) {
// Step 1: Fetch user (sequential first step)
CompletableFuture<User> userFuture = CompletableFuture
.supplyAsync(() -> userService.findById(userId), ioPool)
.exceptionally(ex -> User.anonymous()); // fallback on user fetch failure
// Step 2 & 3: Fetch orders and balance in PARALLEL after user:
return userFuture.thenCompose(user -> {
CompletableFuture<List<Order>> ordersFuture = CompletableFuture
.supplyAsync(() -> orderService.findByUser(user.getId()), ioPool)
.exceptionally(ex -> Collections.emptyList());
CompletableFuture<AccountBalance> balanceFuture = CompletableFuture
.supplyAsync(() -> accountService.getBalance(user.getId()), ioPool)
.exceptionally(ex -> AccountBalance.zero());
// Combine both parallel results:
return ordersFuture.thenCombine(balanceFuture,
(orders, balance) -> new UserProfile(user, orders, balance)
);
})
.whenComplete((profile, ex) -> {
if (ex != null) log.error("Profile build failed for user " + userId, ex);
else metrics.recordProfileBuildTime(userId);
});
}
// Usage: non-blocking
buildProfile(42L)
.thenAccept(profile -> renderPage(profile))
.exceptionally(ex -> { renderErrorPage(ex); return null; });
// ── Timeout (Java 9+) — complete exceptionally after deadline ─────────
CompletableFuture<String> withTimeout = CompletableFuture
.supplyAsync(() -> slowExternalCall(), ioPool)
.orTimeout(2, TimeUnit.SECONDS) // completes exceptionally with TimeoutException after 2s
.exceptionally(ex -> ex instanceof TimeoutException ? "timeout-default" : "other-error");
// completeOnTimeout (Java 9+) — complete with value instead of exception:
CompletableFuture<String> withDefault = CompletableFuture
.supplyAsync(() -> slowExternalCall(), ioPool)
.completeOnTimeout("default-value", 2, TimeUnit.SECONDS); // no exception, just a value