Inter-thread Communication
Inter-thread communication is the mechanism by which Java threads coordinate their activities — not just prevent interference, but actively signal each other about state changes. Mutual exclusion via synchronized prevents concurrent corruption, but many concurrent algorithms require that one thread pause until another has completed some work or produced a value. Java's built-in mechanism for this is the monitor's wait/notify protocol: a thread waiting for a condition calls wait() to release its lock and suspend; another thread that produces the awaited condition calls notify() or notifyAll() to wake the waiter. Both calls must be made while holding the monitor of the object they are invoked on; otherwise they throw IllegalMonitorStateException. This entry covers the producer-consumer pattern as the canonical use case, the why and how of releasing the lock during the wait, the notify-then-proceed contract, spurious wakeups and why the while-loop guard is mandatory, the limitations of the built-in mechanism, and when to use higher-level alternatives (BlockingQueue, Condition, CountDownLatch, CyclicBarrier, Semaphore).
The Producer-Consumer Problem and Why Polling Fails
// ── Spin-wait: WRONG — burns CPU, may starve producer ─────────────────
/**
 * Anti-pattern demo: a consumer that busy-waits on a queue it was handed.
 *
 * <p>Shown only to illustrate why polling is the wrong tool. {@link #consume()}
 * never blocks, so the calling thread stays runnable for as long as the queue
 * is empty.
 */
public class SpinningConsumer {
    private final Queue<String> buffer;

    public SpinningConsumer(Queue<String> buffer) {
        this.buffer = buffer;
    }

    /**
     * Spins until the queue reports an element, then removes and returns the head.
     *
     * <p>The emptiness check and the removal are two separate calls with no
     * lock around them, so if another thread drains the queue in between,
     * {@code poll()} returns {@code null}.
     *
     * @return the element removed from the head of the queue
     */
    public String consume() {
        for (;;) {
            if (!buffer.isEmpty()) {
                return buffer.poll();
            }
            // Nothing to do but check again — tight loop: 100% CPU usage, no progress — BAD
        }
    }
}
// ── Yielding: slightly better but still wrong ─────────────────────────
/**
 * Polls {@code buffer} (declared outside this snippet) and yields the CPU
 * between checks instead of spinning flat out.
 *
 * <p>Still a polling loop: the thread never actually blocks, it only offers
 * its time slice back to the scheduler on each pass.
 *
 * @return the element removed from the head of the buffer
 */
public String consumeWithYield() {
    for (;;) {
        if (!buffer.isEmpty()) {
            return buffer.poll();
        }
        // A hint to the scheduler to run other threads — it may be ignored.
        Thread.yield();
    }
}
// ── Sleeping: better but still wrong — arbitrary latency ─────────────
/**
 * Polls {@code buffer} (declared outside this snippet), sleeping 1 ms
 * between checks.
 *
 * <p>The sleep gives the CPU back, but an item that arrives just after a
 * check is not noticed until the current sleep ends, so every hand-off can
 * be delayed by the sleep interval.
 *
 * @return the element removed from the head of the buffer
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
public String consumeWithSleep() throws InterruptedException {
    boolean ready = !buffer.isEmpty();
    while (!ready) {
        Thread.sleep(1); // off the CPU, at the price of added latency
        ready = !buffer.isEmpty();
    }
    return buffer.poll();
}
// ── The correct solution: wait/notify ─────────────────────────────────
/**
 * A bounded FIFO buffer whose producers and consumers coordinate through the
 * intrinsic monitor of the buffer instance ({@code wait()}/{@code notifyAll()}).
 *
 * <p>Both operations are {@code synchronized} on {@code this}; a caller that
 * cannot proceed waits on the same monitor and is woken by the opposite
 * operation.
 *
 * @param <T> type of the buffered items
 */
public class WaitNotifyBuffer<T> {
    private final Queue<T> queue = new LinkedList<>();
    private final int capacity;

    /**
     * Creates an empty buffer.
     *
     * @param capacity maximum number of items held at once; must be at least 1
     * @throws IllegalArgumentException if {@code capacity} is less than 1 —
     *         zero would make every {@code produce} wait forever, and a negative
     *         value would never match {@code queue.size()}, silently removing
     *         the bound
     */
    public WaitNotifyBuffer(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1, got " + capacity);
        }
        this.capacity = capacity;
    }

    /**
     * Appends an item, waiting while the buffer is full.
     *
     * @param item the item to append
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public synchronized void produce(T item) throws InterruptedException {
        // Loop, not 'if': the condition must be re-checked after every wakeup,
        // because a wakeup does not guarantee that space is still available.
        while (queue.size() == capacity) {
            wait(); // buffer full: release lock, suspend thread
        }
        queue.add(item);
        // Producers and consumers wait on the same monitor, so notify() could
        // pick another producer; notifyAll() makes sure a consumer is woken.
        notifyAll();
    }

    /**
     * Removes and returns the oldest item, waiting while the buffer is empty.
     *
     * @return the item at the head of the buffer
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public synchronized T consume() throws InterruptedException {
        while (queue.isEmpty()) {
            wait(); // buffer empty: release lock, suspend thread
        }
        T item = queue.poll();
        notifyAll(); // a slot was freed: wake any waiting producers
        return item;
    }
}
// Demo: one producer and one consumer pass 20 integers through a 5-slot buffer.
WaitNotifyBuffer<Integer> buf = new WaitNotifyBuffer<>(5);

Thread producer = new Thread(() -> {
    int next = 0;
    try {
        while (next < 20) {
            buf.produce(next);
            System.out.println("Produced: " + next);
            next++;
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag rather than swallowing the interruption.
        Thread.currentThread().interrupt();
    }
});

Thread consumer = new Thread(() -> {
    int remaining = 20;
    try {
        while (remaining > 0) {
            int item = buf.consume();
            System.out.println("Consumed: " + item);
            remaining--;
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

producer.start();
consumer.start();
producer.join(); consumer.join();
Higher-Level Alternatives and the java.util.concurrent Package
// ── java.util.concurrent.locks.Condition — multiple conditions per lock ─
import java.util.concurrent.locks.*;
/**
 * A bounded FIFO buffer guarded by a {@link ReentrantLock} with two
 * {@link Condition}s, so producers and consumers wait in separate queues.
 *
 * @param <T> type of the buffered items
 */
public class BoundedBuffer<T> {
    private final Queue<T> queue = new LinkedList<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here

    /**
     * Creates an empty buffer.
     *
     * @param capacity maximum number of items held at once; must be at least 1
     * @throws IllegalArgumentException if {@code capacity} is less than 1 —
     *         zero would block every {@code put} forever, and a negative value
     *         would never match {@code queue.size()}, silently removing the bound
     */
    public BoundedBuffer(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1, got " + capacity);
        }
        this.capacity = capacity;
    }

    /**
     * Appends an item, waiting while the buffer is full.
     *
     * @param item the item to append
     * @throws InterruptedException if the thread is interrupted while acquiring
     *         the lock or while waiting for space
     */
    public void put(T item) throws InterruptedException {
        // Interruptible acquisition: the method already advertises
        // InterruptedException, so a thread queued for the lock should honour
        // an interrupt too. Must stay outside the try — if it throws, the lock
        // is not held and unlock() would fail.
        lock.lockInterruptibly();
        try {
            while (queue.size() == capacity) {
                notFull.await(); // only producers wake here
            }
            queue.add(item);
            notEmpty.signal(); // wake ONE consumer — precise, no thundering herd
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest item, waiting while the buffer is empty.
     *
     * @return the item at the head of the buffer
     * @throws InterruptedException if the thread is interrupted while acquiring
     *         the lock or while waiting for an item
     */
    public T take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); // only consumers wake here
            }
            T item = queue.poll();
            notFull.signal(); // wake ONE producer
            return item;
        } finally {
            lock.unlock();
        }
    }
}
// ── BlockingQueue — the recommended production solution ───────────────
import java.util.concurrent.*;
// Demo: the same 20-item hand-off, with the queue doing all the blocking.
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(5);

Thread producer = new Thread(() -> {
    try {
        int value = 0;
        while (value < 20) {
            blockingQueue.put(value); // waits by itself while the queue is full
            System.out.println("Put: " + value);
            value++;
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag rather than swallowing the interruption.
        Thread.currentThread().interrupt();
    }
});

Thread consumer = new Thread(() -> {
    try {
        int taken = 0;
        while (taken < 20) {
            int item = blockingQueue.take(); // waits by itself while the queue is empty
            System.out.println("Took: " + item);
            taken++;
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

producer.start();
consumer.start();
// ── CountDownLatch — wait until N tasks complete ──────────────────────
// Demo: the submitting thread blocks until all TASK_COUNT tasks have finished.
int TASK_COUNT = 5;
CountDownLatch latch = new CountDownLatch(TASK_COUNT);
ExecutorService executor = Executors.newFixedThreadPool(TASK_COUNT);
try {
    for (int i = 0; i < TASK_COUNT; i++) {
        final int taskId = i;
        executor.submit(() -> {
            try {
                // Simulated work of random length (100–500 ms).
                Thread.sleep(100 + (long) (Math.random() * 400));
                System.out.println("Task " + taskId + " complete");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Count down in finally so an interrupted task cannot leave
                // the waiting thread blocked forever.
                latch.countDown();
            }
        });
    }
    latch.await(); // blocks until the count reaches zero
    System.out.println("All tasks complete");
} finally {
    // Shut down even if await() is interrupted; otherwise the pool's worker
    // threads stay alive and can keep the JVM from exiting.
    executor.shutdown();
}
// ── CyclicBarrier — synchronize threads at a meeting point ────────────
// Demo: THREAD_COUNT threads finish phase 1 at different times and meet at
// the barrier before any of them starts phase 2.
int THREAD_COUNT = 3;
CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () ->
        System.out.println("All threads reached barrier — proceeding to phase 2"));
for (int i = 0; i < THREAD_COUNT; i++) {
    final int id = i;
    new Thread(() -> {
        try {
            System.out.println("Thread " + id + " doing phase 1 work");
            Thread.sleep((long) (Math.random() * 500));
            barrier.await(); // wait until all THREAD_COUNT threads arrive
            System.out.println("Thread " + id + " doing phase 2 work");
        } catch (InterruptedException e) {
            // This thread itself was interrupted: restore the flag.
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
            // Another party failed or was interrupted; this thread was NOT
            // interrupted, so setting its interrupt flag would be wrong.
            System.err.println("Thread " + id + " skipping phase 2: barrier was broken");
        }
    }).start();
}
// ── Semaphore — limit concurrent access ───────────────────────────────
// Demo: ten threads compete for a resource that admits three at a time.
Semaphore semaphore = new Semaphore(3);

Runnable limitedTask = () -> {
    try {
        // Acquire before entering the inner try: if acquire() is interrupted,
        // no permit is held and the finally below must not run.
        semaphore.acquire();
        try {
            String who = Thread.currentThread().getName();
            System.out.println(who + " accessing resource");
            Thread.sleep(200);
        } finally {
            semaphore.release(); // hand the permit back on every exit path
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
};

int started = 0;
while (started < 10) {
    new Thread(limitedTask).start();
    started++;
}