Event loop
The producer consumer problem

As stated in the last chapter, there is a producer that produces an event at any arbitrary time, and a consumer that waits and listens; when an event arrives, it acts on it.

Let's start by implementing this using threading, like before.

import queue
import threading
import time
 
 
def producer(q: queue.Queue, count):
    for n in range(count):
        print("producing", n)
        q.put(n)
        time.sleep(1)
    print("producer done")
    q.put(None)
 
 
def consumer(q: queue.Queue):
    while True:
        item = q.get()
        if item is None:
            break
        print("consuming", item)
    print("consumer done")

Start by adding a queue into the picture; the producer will add an item to the queue every second in this implementation.

The consumer will keep looping all the time; whenever it sees an item, it prints it out. If it sees None, it'll break and announce that it's done.

Finally, let's kick off two threads and start them both.

q = queue.Queue()  # avoid shadowing the queue module itself
threading.Thread(target=producer, args=(q, 10)).start()
threading.Thread(target=consumer, args=(q,)).start()
producing 0
consuming 0
producing 1
consuming 1
producing 2
consuming 2
producing 3
consuming 3
producing 4
consuming 4
producing 5
consuming 5
producing 6
consuming 6
producing 7
consuming 7
producing 8
consuming 8
producing 9
consuming 9
producer done
consumer done

This works as planned. Now the challenge is to run this concurrently instead of in parallel.

We cannot use the existing queue setup, since the default queue does not communicate with the scheduler in any way. We'll need a custom queue implementation. Let's bring back the scheduler from the previous chapter and start thinking about the implementation of AsyncQueue.

import heapq
import time
from collections import deque
 
 
class Scheduler:
    def __init__(self):
        self.ready = deque()
        self.sleeping = []  # priority queue
        self.sequence = 0
 
    def call_soon(self, func):
        self.ready.append(func)
 
    def call_later(self, delay, func):
        self.sequence += 1
        deadline = time.time() + delay
        heapq.heappush(self.sleeping, (deadline, self.sequence, func))
 
    def run(self):
        run_counter = 0
        while self.ready or self.sleeping:
            run_counter += 1
 
            if not self.ready:
                deadline, _, func = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    # Question: is this not blocking?
                    # print(f"blocking time.sleep for {delta} for {func}")
                    time.sleep(delta)
                self.ready.append(func)
 
            while self.ready:
                func = self.ready.popleft()
                func()
 
 
scheduler = Scheduler()

Now let's think about an async queue implementation. The put on the queue will still be pretty much the same; for example, a simple implementation of put could be

def put(self, item):
    self.items.append(item)

assuming items is a normal Python deque.

The tricky part is the get function. Who will trigger the get if the get is not constantly listening?

Let's start with a simple mock implementation of get

def get(self):
    if self.items:
        return self.items.popleft()

This is not useful for us, because for this to work someone needs to keep calling get continuously. If we think about it, we'll need to somehow communicate / signal a listener that we have an event.
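
For instance, the only way to drive this version would be a busy-wait loop like the hypothetical one below, which keeps calling get in a tight loop and burns CPU even when the queue is empty:

def poll_consumer(q):
    # hypothetical driver: polls get() continuously,
    # wasting cycles whenever the queue is empty
    while True:
        item = q.get()
        if item is not None:
            print("consuming", item)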

Why not use callbacks?

def get(self, callback):
    if self.items:
        return callback(self.items.popleft()) 

Now we can pass a callback and the callback will be triggered when there are items?

Nope. Even with the callback, some other orchestrator would have to keep calling the get function so that when there are items, it calls the callback. How would you solve the problem of not constantly calling get?

Think for a minute.

Pay close attention: what if you had another queue, a waiting queue, inside the AsyncQueue? On each put, check the waiting queue; if there is an element waiting, call_soon that function.

And in the get call, if there are no items yet, queue another get call in the waiting queue.

Here's the implementation

class AsyncQueue:
    def __init__(self):
        self.items = deque()
        self.waiting = deque()  # all getters waiting for later
 
    def put(self, item):
        self.items.append(item)
        if self.waiting:
            func = self.waiting.popleft()
            scheduler.call_soon(func)
 
    def get(self, callback):
        if self.items:
            callback(self.items.popleft())
        else:
            self.waiting.append(lambda: self.get(callback))

Observe carefully: whenever a put happens, the waiting queue is checked, and if something is waiting, it's called.

Whenever a get happens, if there are items, the callback fires immediately. Otherwise, another get call is queued in the waiting queue.

This is a neat trick: every put also drives a pending get, and every unsuccessful get queues itself in the waiting queue.

The details might become clearer once it's used.
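
Before wiring it into the producer and consumer, here's a tiny standalone demo of the trick (a sketch, assuming the scheduler instance created above):

demo_queue = AsyncQueue()
demo_queue.get(callback=print)  # no items yet, so this parks a retry in waiting
demo_queue.put(42)              # put finds the waiting getter and call_soons it
scheduler.run()                 # the retried get now finds 42 and prints it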

For the producer, we'll need to maintain a counter of the current state and the target state, which means we'll have to use closures again.

We'll still keep the rule that if get emits None, the queue is complete.

def producer(q: AsyncQueue, count):
    def _run(n):
        if n <= count:
            print("producing ", n)
            q.put(n)
            scheduler.call_later(1, lambda: _run(n + 1))
        else:
            print("producer done")
 
    _run(0)
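
Note that this producer never actually puts None when it finishes, so the consumer's final get will simply be left parked in the waiting queue when the scheduler exits. A variant that signals completion like the threaded version is a one-line change; a sketch:

def producer(q: AsyncQueue, count):
    def _run(n):
        if n <= count:
            print("producing ", n)
            q.put(n)
            scheduler.call_later(1, lambda: _run(n + 1))
        else:
            print("producer done")
            q.put(None)  # signal completion, mirroring the threaded version

    _run(0)

With this variant, the consumer below would print "no item found" once instead of waiting forever.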

For the consumer, we'll need to pass a callback to the async queue so that it can keep queuing calls. If it receives None, we print that no item was found and move on.

def consumer(q: AsyncQueue):
    def _consume(item):
        if item is None:
            print("no item found")
        else:
            print("consuming", item)
            scheduler.call_soon(lambda: consumer(q))

    q.get(callback=_consume)

async_queue = AsyncQueue()
scheduler.call_soon(lambda: producer(async_queue, 3))
scheduler.call_soon(
    lambda: consumer(
        q=async_queue,
    )
)
scheduler.run()
producing  0
consuming 0
producing  1
consuming 1
producing  2
consuming 2
producing  3
consuming 3
producer done

Works!

This might be slightly mind-bending, but let's recap what exactly is happening.

I won't get into the scheduler since that's carried forward from the previous chapter.

  • call_soon the producer, then call_soon the consumer. At this point the scheduler's ready queue holds two functions ready to execute.
  • Kick off the scheduler. The producer runs, produces one element, and checks the waiting queue; it's empty, so there's nothing to do there. It then does a call_later to schedule itself again after one second. The next entry in the ready queue is the consumer call.
  • The consumer does a get call, sees an element, and calls the callback with that element. On the next loop iteration, the earlier call_later fires and another element is produced.

And the loop goes on...

This looks pretty nice, but what happens if we flip the call_soon order and call the consumer first, then the producer?

Pause and think
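
One way to check your answer is simply to try it; a sketch of the flipped order:

flipped_queue = AsyncQueue()
scheduler.call_soon(lambda: consumer(q=flipped_queue))  # consumer first this time
scheduler.call_soon(lambda: producer(flipped_queue, 3))
scheduler.run()

It still works: the consumer's first get finds no items and parks a retry in the waiting queue, and the producer's first put finds that waiting getter and call_soons it. The output is the same.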


This is getting hairy. At this point, with all the callbacks and closures, parsing the code is becoming a nightmare.

Let's clean it up a bit using a construct Python provides out of the box: yield.

A yield pauses the execution of a function until the next time it's resumed.
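
For a quick refresher, a minimal sketch:

def count_up():
    n = 0
    while True:
        yield n  # pause here; resume right after this line on the next next()
        n += 1

gen = count_up()
print(next(gen))  # 0
print(next(gen))  # 1

Each next call resumes the function exactly where it paused, with all local state intact.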

If you are not familiar with yield, please read more about it before jumping into the next chapter.