
The Bounded Buffer

[interactive demo: a 5-slot bounded buffer with two producers and two consumers — step or play the simulation to watch the lock, the not_full and not_empty conditions, and the empty_slots and full_slots counters change]

the pattern

Imagine a web scraper feeding URLs into a pool of workers. The scraper discovers pages faster than the workers can process them. Without a buffer, the scraper has to wait for a worker to finish before handing off the next URL. Producer and consumer are coupled, forced to run in lockstep.

A bounded buffer decouples them. The producer drops items into the buffer and keeps going. The consumer pulls items out when it’s ready. The buffer absorbs bursts from the producer and smooths out pauses from the consumer. But it has a fixed capacity, so a runaway producer can’t consume infinite memory.

from queue import Queue
import threading

buffer = Queue(maxsize=5)

def producer(name):
    for i in range(10):
        item = f"{name}-item-{i}"
        buffer.put(item)  # blocks if full
        print(f"{name} produced {item}")

def consumer(name):
    while True:
        item = buffer.get()  # blocks if empty
        print(f"{name} consumed {item}")
        buffer.task_done()

# Start threads; keep producer handles so we can wait for them
producers = [
    threading.Thread(target=producer, args=(f"P{i+1}",)) for i in range(2)
]
for t in producers:
    t.start()
for i in range(2):
    threading.Thread(target=consumer, args=(f"C{i+1}",), daemon=True).start()

for t in producers:
    t.join()       # wait until every item has been put
buffer.join()      # then wait until every item is processed

Queue.put() blocks when the buffer is full. Queue.get() blocks when it’s empty. No busy-waiting. No race conditions. The threads sleep until there’s work to do, and the OS wakes them up when the buffer state changes.
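Blocking isn't the only option. put() and get() also come in non-blocking and timed flavors; a quick sketch of what each does when the buffer is full:

```python
import queue

buffer = queue.Queue(maxsize=2)
buffer.put("a")
buffer.put("b")

# put_nowait() raises queue.Full instead of blocking
try:
    buffer.put_nowait("c")
except queue.Full:
    print("full, not blocking")

# a timeout bounds how long put() will wait for space
try:
    buffer.put("c", timeout=0.1)
except queue.Full:
    print("gave up after 0.1s")

print(buffer.get())  # "a" — items come out in FIFO order
```

queue.Full and its counterpart queue.Empty are only raised by the non-blocking and timed variants; the plain blocking calls in the example above never raise them.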

building it yourself

queue.Queue is the right choice for production code. But understanding what it does internally matters. Here’s a bounded buffer built with threading.Condition:

import threading

class BoundedBuffer:
    def __init__(self, capacity):
        self.buffer = []
        self.capacity = capacity
        self.lock = threading.Lock()
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)

    def put(self, item):
        with self.not_full:
            while len(self.buffer) >= self.capacity:
                self.not_full.wait()
            self.buffer.append(item)
            self.not_empty.notify()

    def get(self):
        with self.not_empty:
            while len(self.buffer) == 0:
                self.not_empty.wait()
            item = self.buffer.pop(0)  # FIFO; pop(0) is O(n) on a list — fine for a demo
            self.not_full.notify()
            return item

Two condition variables share a single lock. not_full is for producers waiting for space. not_empty is for consumers waiting for items. When a producer adds an item, it signals not_empty to wake a waiting consumer. When a consumer removes an item, it signals not_full to wake a waiting producer.

Notice the while loop around the wait() call. You might think a plain if would work. It won't, for two reasons. First, spurious wakeups: the OS can wake a waiting thread even without a corresponding notify(). The POSIX spec explicitly allows this. Second, with multiple producers or consumers, another thread can grab the item between the notify() and the wakeup: Thread A gets notified, but before it reacquires the lock, Thread B swoops in and takes the last item, and Thread A wakes up to an empty buffer. The while loop re-checks the condition and goes back to sleep if needed. It is not optional.

semaphores

Condition variables require you to write the predicate check yourself. Semaphores bake the counting into the primitive:

import threading

class BoundedBufferSemaphore:
    def __init__(self, capacity):
        self.buffer = []
        self.mutex = threading.Lock()
        self.empty_slots = threading.Semaphore(capacity)
        self.full_slots = threading.Semaphore(0)

    def put(self, item):
        self.empty_slots.acquire()  # wait for an empty slot
        with self.mutex:
            self.buffer.append(item)
        self.full_slots.release()   # signal one more full slot

    def get(self):
        self.full_slots.acquire()   # wait for a full slot
        with self.mutex:
            item = self.buffer.pop(0)
        self.empty_slots.release()  # signal one more empty slot
        return item

empty_slots starts at capacity. Every put() decrements it; when it hits zero, the next producer blocks. full_slots starts at zero. Every put() increments it; when a consumer calls acquire() on full_slots at zero, the consumer blocks. The counting replaces the while loop condition check entirely. There is no spurious wakeup problem, because Semaphore.acquire() does the count check and the decrement atomically.

Notice that the semaphore acquires happen outside the mutex. This is deliberate. If you moved empty_slots.acquire() inside the with self.mutex block, a producer holding the mutex could block on a full buffer, and no consumer could ever acquire the mutex to remove an item. Deadlock.