# Threads & Synchronization
Threads are the fundamental building blocks of concurrent programs. A thread is a lightweight unit of execution within a process — all threads in a process share the same memory space, which makes communication fast but also introduces synchronization challenges.
## Thread Fundamentals
### What Is a Thread?
A process can contain one or more threads. Each thread has its own:
- Program counter — tracks which instruction is executing
- Stack — stores local variables and function call frames
- Register set — the thread's own saved copy of the CPU register values
But threads within the same process share:
- Heap memory — dynamically allocated objects
- Global variables — static/global data
- File descriptors — open files, sockets, and other I/O resources
- Code segment — the executable instructions
```
┌──────────────────── Process ─────────────────────┐
│                                                  │
│  Shared: Heap, Global Vars, Code, File Descs     │
│                                                  │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐         │
│  │ Thread 1│   │ Thread 2│   │ Thread 3│         │
│  │  Stack  │   │  Stack  │   │  Stack  │         │
│  │   PC    │   │   PC    │   │   PC    │         │
│  │  Regs   │   │  Regs   │   │  Regs   │         │
│  └─────────┘   └─────────┘   └─────────┘         │
└──────────────────────────────────────────────────┘
```

### Thread Lifecycle
A thread transitions through several states during its lifetime:
```
┌────────┐   start()   ┌─────────┐
│  New   │────────────►│ Runnable│◄─────────────────┐
└────────┘             └────┬────┘                  │
                            │                       │
                        scheduled           signal/ │
                            │                notify │
                            ▼                       │
                       ┌─────────┐           ┌──────┴────┐
                       │ Running │──wait()──►│  Waiting  │
                       └────┬────┘           └───────────┘
                            │
                        completed
                            │
                            ▼
                       ┌─────────┐
                       │  Dead   │
                       └─────────┘
```

- New: Thread object is created but not yet started
- Runnable: Thread is ready to run and waiting for CPU time
- Running: Thread is actively executing on a CPU core
- Waiting/Blocked: Thread is waiting for a lock, I/O, or a signal
- Dead/Terminated: Thread has finished execution
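As a minimal sketch of these transitions in Python (which exposes only an alive/not-alive view of the lifecycle rather than the full state machine), a thread can be observed before starting, while blocked, and after joining:

```python
import threading

done = threading.Event()

t = threading.Thread(target=done.wait)  # Worker blocks until signaled

print(t.is_alive())  # False -- "New": created but not yet started
t.start()
print(t.is_alive())  # True -- somewhere in Runnable/Running/Waiting

done.set()           # Let the worker finish
t.join()             # Wait for it to terminate
print(t.is_alive())  # False -- "Dead": finished execution
```

Languages such as Java expose the states directly (`Thread.getState()` returns `NEW`, `RUNNABLE`, `WAITING`, `TERMINATED`, and so on); Python collapses them into `is_alive()`.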
### Creating and Managing Threads
```python
import threading
import time

# Method 1: Pass a target function
def worker(name, delay):
    print(f"Thread {name} starting")
    time.sleep(delay)
    print(f"Thread {name} finished")

t1 = threading.Thread(target=worker, args=("A", 2))
t2 = threading.Thread(target=worker, args=("B", 1))

t1.start()
t2.start()

# Wait for both threads to complete
t1.join()
t2.join()
print("All threads done")

# Method 2: Subclass Thread
class MyThread(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f"Thread {self.name} running")
        time.sleep(1)
        print(f"Thread {self.name} done")

threads = [MyThread(f"Worker-{i}") for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Daemon threads -- automatically killed when main thread exits
daemon = threading.Thread(target=worker, args=("Daemon", 10), daemon=True)
daemon.start()
# Program exits without waiting for daemon to finish
```

```javascript
// Node.js Worker Threads
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

if (isMainThread) {
  // Main thread -- create workers
  const worker1 = new Worker(__filename, {
    workerData: { name: 'A', delay: 2000 }
  });
  const worker2 = new Worker(__filename, {
    workerData: { name: 'B', delay: 1000 }
  });

  worker1.on('message', (msg) => console.log(`Worker A says: ${msg}`));
  worker2.on('message', (msg) => console.log(`Worker B says: ${msg}`));

  worker1.on('exit', () => console.log('Worker A finished'));
  worker2.on('exit', () => console.log('Worker B finished'));

  // Shared memory between workers
  const sharedBuffer = new SharedArrayBuffer(4);
  const sharedArray = new Int32Array(sharedBuffer);
  sharedArray[0] = 42;

  const worker3 = new Worker(__filename, {
    workerData: { name: 'C', sharedBuffer }
  });
} else {
  // Worker thread
  const { name, delay, sharedBuffer } = workerData;

  if (sharedBuffer) {
    const sharedArray = new Int32Array(sharedBuffer);
    console.log(`Worker ${name} reads shared value: ${sharedArray[0]}`);
    // Use Atomics for thread-safe operations
    Atomics.add(sharedArray, 0, 1);
    parentPort.postMessage(`Incremented shared value to ${sharedArray[0]}`);
  } else {
    parentPort.postMessage(`${name} starting`);
    setTimeout(() => {
      parentPort.postMessage(`${name} done`);
      process.exit(0);
    }, delay);
  }
}
```

```java
import java.util.concurrent.*;

public class ThreadDemo {
    // main throws Exception because Future.get() can throw ExecutionException
    public static void main(String[] args) throws Exception {
        // Method 1: Implement Runnable
        Runnable task = () -> {
            String name = Thread.currentThread().getName();
            System.out.println(name + " starting");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println(name + " finished");
        };

        Thread t1 = new Thread(task, "Worker-A");
        Thread t2 = new Thread(task, "Worker-B");

        t1.start();
        t2.start();
        t1.join();
        t2.join();

        // Method 2: Extend Thread
        class MyThread extends Thread {
            public MyThread(String name) { super(name); }

            @Override
            public void run() {
                System.out.println(getName() + " running");
            }
        }

        new MyThread("Custom-Thread").start();

        // Method 3: Callable with return values
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Future<Integer> future = executor.submit(() -> {
            Thread.sleep(1000);
            return 42;
        });

        System.out.println("Result: " + future.get()); // Blocks until done
        executor.shutdown();

        // Daemon threads
        Thread daemon = new Thread(() -> {
            while (true) {
                System.out.println("Daemon running...");
            }
        });
        daemon.setDaemon(true);
        daemon.start(); // JVM exits without waiting for daemon
    }
}
```

```cpp
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <chrono>
#include <future>

void worker(const std::string& name, int delay_ms) {
    std::cout << "Thread " << name << " starting\n";
    std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
    std::cout << "Thread " << name << " finished\n";
}

int main() {
    // Method 1: Pass a function
    std::thread t1(worker, "A", 2000);
    std::thread t2(worker, "B", 1000);

    t1.join();
    t2.join();
    std::cout << "All threads done\n";

    // Method 2: Lambda
    std::thread t3([]() {
        std::cout << "Lambda thread running\n";
    });
    t3.join();

    // Method 3: Multiple threads in a vector
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back([i]() {
            std::cout << "Worker " << i << " running\n";
        });
    }
    for (auto& t : threads) {
        t.join();
    }

    // Method 4: std::async with return value
    auto future = std::async(std::launch::async, []() -> int {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        return 42;
    });
    std::cout << "Result: " << future.get() << "\n"; // Blocks until done

    // Detached threads (similar to daemon)
    std::thread detached(worker, "Detached", 5000);
    detached.detach(); // Thread runs independently
    // Main can exit; detached thread may be killed

    return 0;
}
```

## Synchronization Primitives
When multiple threads access shared data, you need synchronization to prevent data corruption. Below are the essential primitives.
### Mutex (Mutual Exclusion Lock)
A mutex ensures that only one thread can enter a critical section at a time. If a thread tries to acquire a locked mutex, it blocks until the mutex is released.
```python
import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        lock.acquire()
        try:
            counter += 1  # Critical section
        finally:
            lock.release()

# Preferred: use the context manager
def increment_safe(n):
    global counter
    for _ in range(n):
        with lock:  # Automatically acquires and releases
            counter += 1

threads = [threading.Thread(target=increment_safe, args=(100000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Counter: {counter}")  # Always 400000
```

```javascript
// JavaScript is single-threaded in the main event loop,
// but Worker threads can share memory via SharedArrayBuffer.
// Use Atomics for synchronization.
const { Worker, isMainThread, workerData } = require('worker_threads');

if (isMainThread) {
  const sharedBuffer = new SharedArrayBuffer(4);
  const sharedArray = new Int32Array(sharedBuffer);
  sharedArray[0] = 0;

  const workers = [];
  for (let i = 0; i < 4; i++) {
    workers.push(new Worker(__filename, { workerData: { sharedBuffer } }));
  }

  Promise.all(workers.map(w => new Promise(res => w.on('exit', res))))
    .then(() => {
      console.log(`Counter: ${sharedArray[0]}`); // Always 400000
    });
} else {
  const { sharedBuffer } = workerData;
  const sharedArray = new Int32Array(sharedBuffer);

  for (let i = 0; i < 100000; i++) {
    // Atomics.add is an atomic read-modify-write operation
    Atomics.add(sharedArray, 0, 1);
  }
}
```

```java
import java.util.concurrent.locks.ReentrantLock;

public class MutexDemo {
    private int counter = 0;

    // Method 1: synchronized keyword
    public synchronized void incrementSync() {
        counter++;
    }

    // Method 2: synchronized block (finer granularity)
    private final Object lock = new Object();

    public void incrementBlock() {
        synchronized (lock) {
            counter++;
        }
    }

    // Method 3: ReentrantLock (more flexible)
    private final ReentrantLock reentrantLock = new ReentrantLock();

    public void incrementLock() {
        reentrantLock.lock();
        try {
            counter++;
        } finally {
            reentrantLock.unlock(); // Always unlock in finally
        }
    }

    // Method 4: tryLock (non-blocking attempt)
    public boolean tryIncrement() {
        if (reentrantLock.tryLock()) {
            try {
                counter++;
                return true;
            } finally {
                reentrantLock.unlock();
            }
        }
        return false; // Could not acquire lock
    }
}
```

```cpp
#include <mutex>
#include <thread>
#include <vector>
#include <iostream>

int counter = 0;
std::mutex mtx;

void increment(int n) {
    for (int i = 0; i < n; ++i) {
        mtx.lock();
        counter++; // Critical section
        mtx.unlock();
    }
}

// Preferred: use lock_guard (RAII)
void increment_safe(int n) {
    for (int i = 0; i < n; ++i) {
        std::lock_guard<std::mutex> guard(mtx);
        counter++; // Automatically unlocks when guard goes out of scope
    }
}

// C++17: std::scoped_lock for multiple mutexes
std::mutex mtx_a, mtx_b;

void transfer() {
    std::scoped_lock lock(mtx_a, mtx_b); // Locks both, avoids deadlock
    // ... transfer between two protected resources
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back(increment_safe, 100000);
    }
    for (auto& t : threads) t.join();

    std::cout << "Counter: " << counter << "\n"; // Always 400000
    return 0;
}
```

### Semaphores
A semaphore maintains a counter and allows up to N threads to access a resource concurrently. A binary semaphore (N = 1) behaves much like a mutex, with one important difference: a mutex is owned by the thread that locked it, whereas a semaphore can be released by any thread.
```python
import threading
import time

# Allow at most 3 concurrent connections
connection_pool = threading.Semaphore(3)

def access_database(thread_id):
    print(f"Thread {thread_id} waiting for connection...")
    with connection_pool:
        print(f"Thread {thread_id} connected (slot acquired)")
        time.sleep(2)  # Simulate database work
        print(f"Thread {thread_id} disconnected (slot released)")

threads = [threading.Thread(target=access_database, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

```javascript
// JavaScript does not have a built-in Semaphore,
// but you can implement one with Atomics.
class Semaphore {
  constructor(buffer, index, initialCount) {
    this.buffer = buffer;
    this.index = index;
    if (initialCount !== undefined) {
      Atomics.store(this.buffer, this.index, initialCount);
    }
  }

  acquire() {
    while (true) {
      const current = Atomics.load(this.buffer, this.index);
      if (current > 0) {
        // Try to decrement
        if (Atomics.compareExchange(this.buffer, this.index,
                                    current, current - 1) === current) {
          return; // Acquired
        }
      } else {
        // Wait until someone releases
        Atomics.wait(this.buffer, this.index, 0);
      }
    }
  }

  release() {
    Atomics.add(this.buffer, this.index, 1);
    Atomics.notify(this.buffer, this.index, 1);
  }
}

// Usage with SharedArrayBuffer in Worker threads
const shared = new SharedArrayBuffer(4);
const buffer = new Int32Array(shared);
const sem = new Semaphore(buffer, 0, 3); // Max 3 concurrent
```

```java
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    // Allow at most 3 concurrent connections
    private static final Semaphore connectionPool = new Semaphore(3);

    public static void accessDatabase(int threadId) {
        System.out.println("Thread " + threadId + " waiting...");
        try {
            connectionPool.acquire();
            try {
                System.out.println("Thread " + threadId + " connected");
                Thread.sleep(2000); // Simulate work
            } finally {
                // Release only after a successful acquire -- releasing in an
                // outer finally would add a permit even if acquire() was interrupted
                connectionPool.release();
                System.out.println("Thread " + threadId + " disconnected");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 8; i++) {
            final int id = i;
            new Thread(() -> accessDatabase(id)).start();
        }
    }
}
```

```cpp
#include <iostream>
#include <thread>
#include <vector>
#include <semaphore> // C++20
#include <chrono>

// C++20 counting semaphore: allow 3 concurrent
std::counting_semaphore<3> connection_pool(3);

void access_database(int thread_id) {
    std::cout << "Thread " << thread_id << " waiting...\n";
    connection_pool.acquire();
    std::cout << "Thread " << thread_id << " connected\n";
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Thread " << thread_id << " disconnected\n";
    connection_pool.release();
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 8; ++i) {
        threads.emplace_back(access_database, i);
    }
    for (auto& t : threads) t.join();
    return 0;
}
```

### Condition Variables
A condition variable allows threads to wait for a specific condition to become true, rather than busy-waiting or polling. Threads waiting on a condition variable are woken up by a signal or broadcast from another thread.
```python
import threading
import time
import random

queue = []
MAX_SIZE = 5
condition = threading.Condition()

def producer():
    for i in range(10):
        with condition:
            while len(queue) >= MAX_SIZE:
                print("Producer waiting -- queue full")
                condition.wait()
            item = random.randint(1, 100)
            queue.append(item)
            print(f"Produced: {item} (queue size: {len(queue)})")
            condition.notify_all()  # Wake up consumers
        time.sleep(random.uniform(0.1, 0.5))

def consumer(name):
    for _ in range(5):
        with condition:
            while len(queue) == 0:
                print(f"Consumer {name} waiting -- queue empty")
                condition.wait()
            item = queue.pop(0)
            print(f"Consumer {name} consumed: {item} (queue size: {len(queue)})")
            condition.notify_all()  # Wake up producer
        time.sleep(random.uniform(0.2, 0.6))

p = threading.Thread(target=producer)
c1 = threading.Thread(target=consumer, args=("A",))
c2 = threading.Thread(target=consumer, args=("B",))

p.start(); c1.start(); c2.start()
p.join(); c1.join(); c2.join()
```

```javascript
// Condition variable semantics using Atomics.wait / Atomics.notify
// in a Worker thread environment
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

if (isMainThread) {
  // Shared memory layout:
  // [0] = item count, [1] = signal flag, [2..6] = queue slots
  const shared = new SharedArrayBuffer(7 * 4);
  const arr = new Int32Array(shared);

  const producer = new Worker(__filename, { workerData: { role: 'producer', shared } });
  const consumer = new Worker(__filename, { workerData: { role: 'consumer', shared } });

  producer.on('exit', () => console.log('Producer done'));
  consumer.on('exit', () => console.log('Consumer done'));
} else {
  const { role, shared } = workerData;
  const arr = new Int32Array(shared);
  const COUNT_IDX = 0;
  const SIGNAL_IDX = 1;
  const QUEUE_START = 2;

  if (role === 'producer') {
    for (let i = 0; i < 5; i++) {
      // Simple spin-wait + Atomics approach
      while (Atomics.load(arr, COUNT_IDX) >= 5) {
        Atomics.wait(arr, SIGNAL_IDX, 0, 100); // Wait up to 100ms
      }
      const count = Atomics.load(arr, COUNT_IDX);
      arr[QUEUE_START + count] = i + 1;
      Atomics.add(arr, COUNT_IDX, 1);
      Atomics.store(arr, SIGNAL_IDX, 1);
      Atomics.notify(arr, SIGNAL_IDX, 1);
      console.log(`Produced: ${i + 1}`);
    }
  } else {
    for (let i = 0; i < 5; i++) {
      while (Atomics.load(arr, COUNT_IDX) <= 0) {
        Atomics.wait(arr, SIGNAL_IDX, 0, 100);
      }
      const count = Atomics.load(arr, COUNT_IDX);
      const item = arr[QUEUE_START + count - 1];
      Atomics.sub(arr, COUNT_IDX, 1);
      Atomics.store(arr, SIGNAL_IDX, 0);
      Atomics.notify(arr, SIGNAL_IDX, 1);
      console.log(`Consumed: ${item}`);
    }
  }
}
```

```java
import java.util.*;
import java.util.concurrent.locks.*;

public class ConditionVariableDemo {
    private final Queue<Integer> queue = new LinkedList<>();
    private final int MAX_SIZE = 5;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public void produce() throws InterruptedException {
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            lock.lock();
            try {
                while (queue.size() >= MAX_SIZE) {
                    System.out.println("Producer waiting -- queue full");
                    notFull.await(); // Release lock and wait
                }
                int item = random.nextInt(100);
                queue.add(item);
                System.out.println("Produced: " + item + " (queue size: " + queue.size() + ")");
                notEmpty.signalAll(); // Wake up consumers
            } finally {
                lock.unlock();
            }
            Thread.sleep(random.nextInt(400) + 100);
        }
    }

    public void consume(String name) throws InterruptedException {
        Random random = new Random();
        for (int i = 0; i < 5; i++) {
            lock.lock();
            try {
                while (queue.isEmpty()) {
                    System.out.println("Consumer " + name + " waiting -- queue empty");
                    notEmpty.await();
                }
                int item = queue.poll();
                System.out.println("Consumer " + name + " consumed: " + item);
                notFull.signalAll(); // Wake up producer
            } finally {
                lock.unlock();
            }
            Thread.sleep(random.nextInt(500) + 200);
        }
    }
}
```

```cpp
#include <iostream>
#include <string>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <random>
#include <chrono>

std::queue<int> buffer;
const int MAX_SIZE = 5;
std::mutex mtx;
std::condition_variable not_full;
std::condition_variable not_empty;

void producer() {
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dist(1, 100);

    for (int i = 0; i < 10; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        not_full.wait(lock, [] { return buffer.size() < MAX_SIZE; });

        int item = dist(gen);
        buffer.push(item);
        std::cout << "Produced: " << item << " (queue size: " << buffer.size() << ")\n";

        lock.unlock();
        not_empty.notify_all();

        std::this_thread::sleep_for(std::chrono::milliseconds(100 + gen() % 400));
    }
}

void consumer(const std::string& name, int count) {
    for (int i = 0; i < count; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        not_empty.wait(lock, [] { return !buffer.empty(); });

        int item = buffer.front();
        buffer.pop();
        std::cout << "Consumer " << name << " consumed: " << item << "\n";

        lock.unlock();
        not_full.notify_all();

        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}

int main() {
    std::thread p(producer);
    std::thread c1(consumer, "A", 5);
    std::thread c2(consumer, "B", 5);

    p.join(); c1.join(); c2.join();
    return 0;
}
```

### Read-Write Locks
A read-write lock allows multiple concurrent readers but only one writer. This is ideal when reads are far more frequent than writes.
```python
import threading

# Python does not have a built-in RWLock, but you can use a simple implementation
class ReadWriteLock:
    def __init__(self):
        self._read_ready = threading.Condition(threading.Lock())
        self._readers = 0

    def acquire_read(self):
        with self._read_ready:
            self._readers += 1

    def release_read(self):
        with self._read_ready:
            self._readers -= 1
            if self._readers == 0:
                self._read_ready.notify_all()

    def acquire_write(self):
        self._read_ready.acquire()
        while self._readers > 0:
            self._read_ready.wait()

    def release_write(self):
        self._read_ready.release()

# Usage
rw_lock = ReadWriteLock()
shared_data = {"value": 0}

def reader(reader_id):
    rw_lock.acquire_read()
    try:
        print(f"Reader {reader_id} reads: {shared_data['value']}")
    finally:
        rw_lock.release_read()

def writer(value):
    rw_lock.acquire_write()
    try:
        shared_data["value"] = value
        print(f"Writer set value to {value}")
    finally:
        rw_lock.release_write()
```

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDemo {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private int sharedData = 0;

    public int read() {
        rwLock.readLock().lock();
        try {
            // Multiple threads can read simultaneously
            return sharedData;
        } finally {
            rwLock.readLock().unlock();
        }
    }

    public void write(int value) {
        rwLock.writeLock().lock();
        try {
            // Only one thread can write at a time
            sharedData = value;
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}
```

```cpp
#include <shared_mutex> // C++17
#include <iostream>
#include <thread>
#include <vector>

std::shared_mutex rw_mutex;
int shared_data = 0;

void reader(int id) {
    std::shared_lock lock(rw_mutex); // Multiple readers allowed
    std::cout << "Reader " << id << " reads: " << shared_data << "\n";
}

void writer(int value) {
    std::unique_lock lock(rw_mutex); // Exclusive access
    shared_data = value;
    std::cout << "Writer set value to " << value << "\n";
}

int main() {
    std::vector<std::thread> threads;

    // Start multiple readers and one writer
    threads.emplace_back(reader, 1);
    threads.emplace_back(reader, 2);
    threads.emplace_back(writer, 42);
    threads.emplace_back(reader, 3);
    threads.emplace_back(reader, 4);

    for (auto& t : threads) t.join();
    return 0;
}
```

### Barriers
A barrier is a synchronization point where all threads must arrive before any of them can proceed. Useful for phased computations where each phase depends on the results of the previous phase.
```python
import threading

barrier = threading.Barrier(3)  # Wait for 3 threads

def phase_worker(thread_id):
    # Phase 1
    print(f"Thread {thread_id}: Phase 1 complete")
    barrier.wait()  # Wait for all threads to finish Phase 1

    # Phase 2 -- only starts after all threads finish Phase 1
    print(f"Thread {thread_id}: Phase 2 complete")
    barrier.wait()

    print(f"Thread {thread_id}: All phases done")

threads = [threading.Thread(target=phase_worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;

public class BarrierDemo {
    public static void main(String[] args) {
        // CyclicBarrier resets automatically -- can be reused across phases
        CyclicBarrier barrier = new CyclicBarrier(3, () -> {
            System.out.println("--- All threads reached the barrier ---");
        });

        for (int i = 0; i < 3; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    System.out.println("Thread " + id + ": Phase 1 complete");
                    barrier.await(); // Wait for all threads

                    System.out.println("Thread " + id + ": Phase 2 complete");
                    barrier.await(); // Reusable barrier

                    System.out.println("Thread " + id + ": All phases done");
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}
```

```cpp
#include <barrier> // C++20
#include <iostream>
#include <thread>
#include <vector>

int main() {
    auto on_completion = []() noexcept {
        std::cout << "--- All threads reached the barrier ---\n";
    };

    std::barrier sync_point(3, on_completion);

    auto worker = [&](int id) {
        std::cout << "Thread " << id << ": Phase 1 complete\n";
        sync_point.arrive_and_wait();

        std::cout << "Thread " << id << ": Phase 2 complete\n";
        sync_point.arrive_and_wait();

        std::cout << "Thread " << id << ": All phases done\n";
    };

    std::vector<std::thread> threads;
    for (int i = 0; i < 3; ++i) {
        threads.emplace_back(worker, i);
    }
    for (auto& t : threads) t.join();

    return 0;
}
```

## Thread-Safe Data Structures
Rather than adding locks around every access to a standard data structure, many languages offer data structures that handle synchronization internally.
| Language | Thread-Safe Collections |
|---|---|
| Python | queue.Queue, collections.deque (append/popleft are thread-safe), multiprocessing.Queue |
| Java | ConcurrentHashMap, CopyOnWriteArrayList, BlockingQueue, ConcurrentLinkedQueue |
| C++ | No standard concurrent containers; use std::mutex with standard containers, or third-party libraries like Intel TBB |
| Go | sync.Map, channels as concurrent queues |
| Rust | Arc<Mutex<T>>, crossbeam crate for lock-free structures |
```python
import queue
import threading

# Thread-safe queue (blocks automatically)
q = queue.Queue(maxsize=10)

def producer():
    for i in range(20):
        q.put(i)  # Blocks if full
        print(f"Produced: {i}")

def consumer():
    while True:
        item = q.get()  # Blocks if empty
        print(f"Consumed: {item}")
        q.task_done()

# Start consumer as daemon thread
t = threading.Thread(target=consumer, daemon=True)
t.start()

producer()
q.join()  # Wait until all items are processed
```

```java
import java.util.concurrent.*;

public class ConcurrentCollectionsDemo {
    public static void main(String[] args) {
        // ConcurrentHashMap -- thread-safe map
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("counter", 0);
        map.compute("counter", (key, val) -> val + 1); // Atomic update

        // BlockingQueue -- thread-safe producer-consumer queue
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

        // Producer
        new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    queue.put(i); // Blocks if full
                    System.out.println("Produced: " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        // Consumer
        new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    int item = queue.take(); // Blocks if empty
                    System.out.println("Consumed: " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}
```

## Thread Pools
Creating a new thread for every task is expensive. Thread pools maintain a set of reusable threads that pick up tasks from a work queue, reducing overhead.
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch_url(url):
    """Simulate fetching a URL"""
    time.sleep(1)  # Simulate network delay
    return f"Content from {url}"

urls = [f"https://example.com/page/{i}" for i in range(10)]

# ThreadPoolExecutor manages a pool of worker threads
with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit all tasks
    futures = {executor.submit(fetch_url, url): url for url in urls}

    # Process results as they complete
    for future in as_completed(futures):
        url = futures[future]
        try:
            result = future.result()
            print(f"{url}: {result}")
        except Exception as e:
            print(f"{url} generated an exception: {e}")

# Using executor.map for simpler cases
with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(fetch_url, urls)
    for url, result in zip(urls, results):
        print(f"{url}: {result}")
```

```java
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;

public class ThreadPoolDemo {
    public static void main(String[] args) throws Exception {
        // Fixed thread pool: exactly 4 threads
        ExecutorService fixedPool = Executors.newFixedThreadPool(4);

        // Cached thread pool: creates threads as needed, reuses idle ones
        ExecutorService cachedPool = Executors.newCachedThreadPool();

        // Scheduled thread pool: for delayed or periodic tasks
        ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);

        // Submit tasks and collect Futures
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            final int page = i;
            futures.add(fixedPool.submit(() -> {
                Thread.sleep(1000);
                return "Content from page " + page;
            }));
        }

        // Collect results
        for (Future<String> future : futures) {
            System.out.println(future.get()); // Blocks until result is ready
        }

        // Schedule a task with a delay
        scheduledPool.schedule(() -> {
            System.out.println("Delayed task executed");
        }, 2, TimeUnit.SECONDS);

        // Schedule a recurring task
        scheduledPool.scheduleAtFixedRate(() -> {
            System.out.println("Periodic task");
        }, 0, 1, TimeUnit.SECONDS);

        // Always shut down executors
        fixedPool.shutdown();
        fixedPool.awaitTermination(30, TimeUnit.SECONDS);
        cachedPool.shutdown();
        Thread.sleep(5000); // Let the scheduled tasks run for a bit
        scheduledPool.shutdown();
    }
}
```

```cpp
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>
#include <chrono>
#include <type_traits>

class ThreadPool {
public:
    ThreadPool(size_t num_threads) : stop(false) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex);
                        condition.wait(lock, [this] { return stop || !tasks.empty(); });
                        if (stop && tasks.empty()) return;
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::invoke_result<F, Args...>::type> {
        using return_type = typename std::invoke_result<F, Args...>::type;
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        std::future<return_type> result = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            tasks.emplace([task]() { (*task)(); });
        }
        condition.notify_one();
        return result;
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (auto& worker : workers) worker.join();
    }

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

int main() {
    ThreadPool pool(4);

    std::vector<std::future<std::string>> results;
    for (int i = 0; i < 10; ++i) {
        results.push_back(pool.enqueue([i] {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            return "Result from task " + std::to_string(i);
        }));
    }

    for (auto& result : results) {
        std::cout << result.get() << "\n";
    }

    return 0;
}
```

## Synchronization Primitives Summary
| Primitive | Purpose | Use When |
|---|---|---|
| Mutex | Exclusive access to a critical section | Protecting shared mutable state |
| Semaphore | Limit concurrent access to N | Connection pools, rate limiting |
| Condition Variable | Wait for a condition to become true | Producer-consumer queues, event signaling |
| Read-Write Lock | Multiple readers OR one writer | Read-heavy workloads with infrequent writes |
| Barrier | Synchronize threads at a checkpoint | Phased parallel algorithms |
| Atomic Operations | Lock-free single-variable updates | Counters, flags, simple state |
## Best Practices
- Minimize the critical section — Hold locks for as short a time as possible to reduce contention
- Use high-level abstractions — Prefer thread pools and concurrent collections over raw threads and locks
- Avoid nested locks — Acquiring multiple locks increases deadlock risk; if unavoidable, use a consistent lock ordering
- Prefer immutability — Immutable data is inherently thread-safe and requires no synchronization
- Use RAII for locks — Always use context managers, try/finally, or lock guards to ensure locks are released
- Test under load — Concurrency bugs often surface only under high contention; use stress tests and thread sanitizers
- Document thread safety — Clearly state which methods and classes are thread-safe and which are not
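The lock-ordering advice can be sketched in Python: when a transfer must hold two account locks, always acquiring them in a fixed order (here, by object id) guarantees that no two threads can each hold one lock while waiting for the other:

```python
import threading

class Account:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.Lock()

def transfer(src, dst, amount):
    # Acquire locks in a globally consistent order to avoid deadlock.
    # Locking (src, dst) in argument order instead would let two opposite
    # transfers deadlock, each holding the lock the other needs.
    first, second = sorted((src, dst), key=id)
    with first.lock:
        with second.lock:
            src.balance -= amount
            dst.balance += amount

a = Account(100)
b = Account(100)

t1 = threading.Thread(target=transfer, args=(a, b, 30))
t2 = threading.Thread(target=transfer, args=(b, a, 10))
t1.start(); t2.start()
t1.join(); t2.join()

print(a.balance, b.balance)  # 80 120
```

Any total order works (account number, creation order, memory address); what matters is that every thread uses the same one.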