Producer consumer problem
As stated in the last chapter, there is a producer that produces an event at some arbitrary time, and a consumer that waits and listens. When the consumer receives an event, it acts on it.
Let's start by implementing this using threading, like before:
import queue
import threading
import time

def producer(q: queue.Queue, count):
    for n in range(count):
        print("producing", n)
        q.put(n)
        time.sleep(1)
    print("producer done")
    q.put(None)  # sentinel: tells the consumer to stop

def consumer(q: queue.Queue):
    while True:
        item = q.get()  # blocks until an item is available
        if item is None:
            break
        print("consuming", item)
    print("consumer done")
We start by adding a queue into the picture. In this implementation the producer adds an item to the queue every second.
The consumer keeps looping, and whenever it sees an item it prints it out. If it sees None, it breaks out of the loop and announces that it's done.
Finally, let's kick off two threads and start them both.
q = queue.Queue()  # named q so the queue module is not shadowed
threading.Thread(target=producer, args=(q, 10)).start()
threading.Thread(target=consumer, args=(q,)).start()
producing 0
consuming 0
producing 1
consuming 1
producing 2
consuming 2
producing 3
consuming 3
producing 4
consuming 4
producing 5
consuming 5
producing 6
consuming 6
producing 7
consuming 7
producing 8
consuming 8
producing 9
consuming 9
producer done
consumer done
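The run above relies on reading interleaved prints. As a minimal sketch of the same idea that is easier to check, the variant below drops the sleep, collects items in a list, and joins both threads before inspecting the result. The names work_queue and received are illustrative and not part of the chapter's code.

```python
import queue
import threading

def producer(q, count):
    for n in range(count):
        q.put(n)
    q.put(None)  # sentinel: tells the consumer to stop

def consumer(q, received):
    while True:
        item = q.get()  # blocks until an item is available
        if item is None:
            break
        received.append(item)

work_queue = queue.Queue()  # hypothetical name, avoids shadowing the module
received = []               # hypothetical list used instead of print

threads = [
    threading.Thread(target=producer, args=(work_queue, 5)),
    threading.Thread(target=consumer, args=(work_queue, received)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for both threads before reading the result

print(received)  # [0, 1, 2, 3, 4]
```

Because queue.Queue is first in, first out and there is a single consumer, the items arrive in the order they were produced.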
This works as planned. Now the challenge is to run this concurrently on a single thread, driven by our scheduler, instead of on separate threads.
We cannot use the existing queue setup, since the default queue does not communicate with the scheduler in any way. We'll need a custom queue implementation. Let's bring back the scheduler from the previous chapter and start thinking about the implementation of AsyncQueue:
import heapq
import time
from collections import deque

class Scheduler:
    def __init__(self):
        self.ready = deque()
        self.sleeping = []  # priority queue
        self.sequence = 0

    def call_soon(self, func):
        self.ready.append(func)

    def call_later(self, delay, func):
        self.sequence += 1
        deadline = time.time() + delay
        heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
        run_counter = 0
        while self.ready or self.sleeping:
            run_counter += 1
            if not self.ready:
                deadline, _, func = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    # Question: is this not blocking?
                    # print(f"blocking time.sleep for {delta} for {func}")
                    time.sleep(delta)
                self.ready.append(func)
            while self.ready:
                func = self.ready.popleft()
                func()

scheduler = Scheduler()
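As a quick refresher on how this scheduler orders work, here is a small sketch that repeats the class in condensed form and records the order in which three callbacks run. The order list, the labels, and the short delays are illustrative assumptions. It also hints at the question in the comment: time.sleep does block the thread, but it is only reached when nothing is ready to run.

```python
import heapq
import time
from collections import deque

class Scheduler:
    def __init__(self):
        self.ready = deque()
        self.sleeping = []  # priority queue ordered by deadline
        self.sequence = 0   # tie-breaker so functions are never compared

    def call_soon(self, func):
        self.ready.append(func)

    def call_later(self, delay, func):
        self.sequence += 1
        deadline = time.time() + delay
        heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                # Nothing is runnable, so sleeping until the nearest deadline is safe.
                deadline, _, func = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(func)
            while self.ready:
                self.ready.popleft()()

sched = Scheduler()
order = []  # hypothetical list that records execution order

sched.call_later(0.02, lambda: order.append("later-2"))
sched.call_later(0.01, lambda: order.append("later-1"))
sched.call_soon(lambda: order.append("soon"))
sched.run()

print(order)  # ['soon', 'later-1', 'later-2']
```

Ready callbacks always run first, and sleeping callbacks are woken in deadline order regardless of the order in which they were registered.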
Now let's think about an async queue implementation. Putting an item in the queue will still be pretty much the same. For example, a simple implementation of put could be:
    def put(self, item):
        self.items.append(item)
assuming items is a plain Python deque.
The tricky part is the get function. Who will trigger the get if the get is not constantly listening?
Let's start with a simple mock implementation of get:
    def get(self):
        if self.items:
            return self.items[0]  # returns None implicitly when the queue is empty
This is not useful for us, because for this to work someone needs to keep calling get continuously. If we think about it, we'll need to somehow communicate / signal a listener that we have an event.
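To make the cost of that concrete, here is a small sketch of a caller that polls get in a loop. PollingQueue, poll_until_item, and the put_on_poll parameter are hypothetical names used only for this illustration, and this get removes the item it returns.

```python
from collections import deque

class PollingQueue:
    """Hypothetical queue whose get() returns None when nothing is available."""

    def __init__(self):
        self.items = deque()

    def put(self, item):
        self.items.append(item)

    def get(self):
        if self.items:
            return self.items.popleft()
        return None

def poll_until_item(q, max_polls, put_on_poll):
    """Call get() repeatedly and count how many calls came back empty."""
    wasted = 0
    for attempt in range(max_polls):
        if attempt == put_on_poll:
            q.put("event")  # simulate the producer firing at some arbitrary time
        item = q.get()
        if item is not None:
            return item, wasted
        wasted += 1
    return None, wasted

item, wasted = poll_until_item(PollingQueue(), max_polls=10, put_on_poll=5)
print(item, wasted)  # event 5
```

Every empty call is work done for nothing, and the caller cannot do anything else while it spins.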
Why not use callbacks?
    def get(self, callback):
        if self.items:
            return callback(self.items.popleft())
Now we can pass a callback and the callback will be triggered when there are items?
Nope, even with the callback some other orchestrator should keep calling the get function. So that when there are items, it'll call the callback function. How would you solve the problem of not constantly calling get?
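Before working out a fix, a short sketch makes the failure concrete: if get is called while the queue is empty, the callback is simply forgotten, and a later put does nothing to bring it back. CallbackQueue and the seen list are hypothetical names for this illustration.

```python
from collections import deque

class CallbackQueue:
    """Hypothetical queue with the callback-based get from above."""

    def __init__(self):
        self.items = deque()

    def put(self, item):
        self.items.append(item)

    def get(self, callback):
        if self.items:
            return callback(self.items.popleft())
        # Queue is empty: the callback is dropped and nothing remembers it.

seen = []
q = CallbackQueue()

q.get(seen.append)      # called too early, the callback is lost
q.put("event")          # put has no idea anyone was interested
after_put = list(seen)  # still empty

q.get(seen.append)      # someone has to call get again to receive the item
print(after_put, seen)  # [] ['event']
```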
Think for a minute.
Pay close attention. What if the AsyncQueue had a second queue, called the waiting queue? On each put, check the waiting queue. If there is an element in it, call_soon that function.
And in the get call, if there are no items, queue another get call in the waiting queue.
Here's the implementation:
class AsyncQueue:
    def __init__(self):
        self.items = deque()
        self.waiting = deque()  # all getters waiting for later

    def put(self, item):
        self.items.append(item)
        if self.waiting:
            func = self.waiting.popleft()
            scheduler.call_soon(func)

    def get(self, callback):
        if self.items:
            callback(self.items.popleft())
        else:
            self.waiting.append(lambda: self.get(callback))
Observe carefully. Whenever a put happens, the waiting queue is checked, and if something is waiting it is handed to the scheduler to be called.
Whenever a get happens, if there are items, the first one is passed to the callback. Otherwise, a retry of the same get is queued in the waiting queue.
This is a neat trick. Every put also schedules a pending get, and every get that finds nothing parks itself in the waiting queue.
The details might be more clear when this is used.
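Here is a minimal sketch of that handshake in isolation. To keep it short, the full Scheduler is replaced by a bare ready deque with call_soon and run helpers; those stand-ins, and the seen list, are assumptions for illustration only.

```python
from collections import deque

ready = deque()  # stand-in for the scheduler's ready queue

def call_soon(func):
    ready.append(func)

def run():
    while ready:
        ready.popleft()()

class AsyncQueue:
    def __init__(self):
        self.items = deque()
        self.waiting = deque()  # getters parked until an item arrives

    def put(self, item):
        self.items.append(item)
        if self.waiting:
            call_soon(self.waiting.popleft())  # wake one parked getter

    def get(self, callback):
        if self.items:
            callback(self.items.popleft())
        else:
            # Nothing yet: park a retry of this same get.
            self.waiting.append(lambda: self.get(callback))

q = AsyncQueue()
seen = []

q.get(seen.append)       # queue is empty, so the getter is parked
parked = len(q.waiting)  # 1

q.put("event")           # moves the parked getter onto the ready queue
scheduled = len(ready)   # 1

run()                    # the retry runs and finally delivers the item
print(parked, scheduled, seen)  # 1 1 ['event']
```

Note that put never invokes the callback directly. It only hands the parked getter to the scheduler, so delivery happens on the scheduler's next pass.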
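For the producer, we'll need to keep track of the current count and the target count, which means we'll have to use closures again.
We'll still keep the rule that if get delivers None, the queue is complete. Note, however, that the producer below never actually puts None. The scheduler simply stops once nothing is ready or sleeping.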
def producer(q: AsyncQueue, count):
    def _run(n):
        # note: <= means count + 1 items are produced (0 through count)
        if n <= count:
            print("producing", n)
            q.put(n)
            scheduler.call_later(1, lambda: _run(n + 1))
        else:
            print("producer done")

    _run(0)
For the consumer, we'll need to pass a callback to the async queue so that it can keep queuing calls. If the callback receives None, we print "no item found" and stop rescheduling the consumer.
def consumer(q: AsyncQueue):
    def _consume(item):
        if item is None:
            print("no item found")
        else:
            print("consuming", item)
            # schedule the next get so the consumer keeps listening
            scheduler.call_soon(lambda: consumer(q))

    q.get(callback=_consume)
async_queue = AsyncQueue()
scheduler.call_soon(lambda: producer(async_queue, 3))
scheduler.call_soon(
    lambda: consumer(
        q=async_queue,
    )
)
scheduler.run()
producing 0
consuming 0
producing 1
consuming 1
producing 2
consuming 2
producing 3
consuming 3
producer done
Works!
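The run above never prints "no item found", because the producer never puts None. As a hedged sketch of how the sentinel rule could be wired in, the variant below has the producer put None once it is done, so the consumer can announce that it has finished. The log list, the 0.01 second delay, and the use of n < count (mirroring range(count) in the threaded version) are assumptions of this sketch, not the chapter's code.

```python
import heapq
import time
from collections import deque

class Scheduler:
    def __init__(self):
        self.ready, self.sleeping, self.sequence = deque(), [], 0

    def call_soon(self, func):
        self.ready.append(func)

    def call_later(self, delay, func):
        self.sequence += 1
        heapq.heappush(self.sleeping, (time.time() + delay, self.sequence, func))

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                deadline, _, func = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(func)
            while self.ready:
                self.ready.popleft()()

scheduler = Scheduler()

class AsyncQueue:
    def __init__(self):
        self.items, self.waiting = deque(), deque()

    def put(self, item):
        self.items.append(item)
        if self.waiting:
            scheduler.call_soon(self.waiting.popleft())

    def get(self, callback):
        if self.items:
            callback(self.items.popleft())
        else:
            self.waiting.append(lambda: self.get(callback))

log = []  # hypothetical event log used instead of print

def producer(q, count):
    def _run(n):
        if n < count:
            log.append(("produced", n))
            q.put(n)
            scheduler.call_later(0.01, lambda: _run(n + 1))
        else:
            log.append("producer done")
            q.put(None)  # sentinel: wakes the parked consumer one last time
    _run(0)

def consumer(q):
    def _consume(item):
        if item is None:
            log.append("consumer done")  # do not reschedule: we are finished
        else:
            log.append(("consumed", item))
            scheduler.call_soon(lambda: consumer(q))
    q.get(callback=_consume)

async_queue = AsyncQueue()
scheduler.call_soon(lambda: producer(async_queue, 2))
scheduler.call_soon(lambda: consumer(async_queue))
scheduler.run()
print(log)
```

The check `if self.items` still passes when the only item is None, because it tests the length of the deque rather than the value inside it.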
This might be slightly mind bending, so let's recap what exactly is happening.
I won't get into the scheduler, since that's carried over from the previous chapter.
- call_soon the producer, then call_soon the consumer. At this point the scheduler's ready queue holds two functions waiting to execute.
- Kick off the scheduler. The producer runs and produces one element. It also checks the waiting queue, which is empty, so there is nothing to wake up. It then uses call_later to schedule itself again in one second. The next function in the ready queue is the consumer.
- The consumer makes a get call, sees an element, and calls the callback with that element. The callback reschedules the consumer, which finds the queue empty on its next get and parks itself in the waiting queue. When the earlier call_later fires, another element is produced and the parked getter is woken up.
And the loop goes on...
This looks pretty nice, but what happens if we flip the call_soon calls, scheduling the consumer first and then the producer?
Pause and think
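One way to check your answer is to run both orders and compare. The sketch below wraps the setup in a hypothetical run_demo helper, passes the scheduler into AsyncQueue instead of using a global, and logs how many getters were parked just before each put. Those choices, the 0.01 second delay, and the n < count bound are assumptions made for the illustration.

```python
import heapq
import time
from collections import deque

class Scheduler:
    def __init__(self):
        self.ready, self.sleeping, self.sequence = deque(), [], 0

    def call_soon(self, func):
        self.ready.append(func)

    def call_later(self, delay, func):
        self.sequence += 1
        heapq.heappush(self.sleeping, (time.time() + delay, self.sequence, func))

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                deadline, _, func = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(func)
            while self.ready:
                self.ready.popleft()()

class AsyncQueue:
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.items, self.waiting = deque(), deque()

    def put(self, item):
        self.items.append(item)
        if self.waiting:
            self.scheduler.call_soon(self.waiting.popleft())

    def get(self, callback):
        if self.items:
            callback(self.items.popleft())
        else:
            self.waiting.append(lambda: self.get(callback))

def run_demo(consumer_first, count=3):
    scheduler = Scheduler()
    q = AsyncQueue(scheduler)
    log = []

    def produce(n):
        if n < count:
            # Third field: getters parked just before this put.
            log.append(("produced", n, len(q.waiting)))
            q.put(n)
            scheduler.call_later(0.01, lambda: produce(n + 1))

    def consume():
        def on_item(item):
            log.append(("consumed", item))
            scheduler.call_soon(consume)
        q.get(on_item)

    start_producer = lambda: produce(0)
    tasks = [consume, start_producer] if consumer_first else [start_producer, consume]
    for task in tasks:
        scheduler.call_soon(task)
    scheduler.run()
    return log

producer_first_log = run_demo(consumer_first=False)
consumer_first_log = run_demo(consumer_first=True)
print(producer_first_log[0], consumer_first_log[0])
# ('produced', 0, 0) ('produced', 0, 1)
```

In this sketch both orders consume the same items. The only difference is that with the consumer first, a getter is already parked when the first item is produced, so the first delivery goes through the waiting queue.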
This is getting hairy. At this point, with all the callbacks and closures, parsing the code is becoming a nightmare.
Let's use a construct that Python provides out of the box to clean it up a bit: yield.
yield pauses the execution of a function and hands a value back to the caller. The function picks up from the same spot the next time it is resumed.
If you are not familiar with yield, please read more about it before jumping into the next chapter.
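As a tiny taste of what yield does, here is a minimal generator sketch. The countdown function is a hypothetical example and not part of the scheduler code.

```python
def countdown(n):
    while n > 0:
        yield n  # pause here and hand n to the caller
        n -= 1   # execution resumes from this line on the next request

gen = countdown(3)   # nothing runs yet, this only creates the generator
first = next(gen)    # runs until the first yield
second = next(gen)   # resumes after the yield, then pauses again
rest = list(gen)     # drains whatever is left

print(first, second, rest)  # 3 2 [1]
```

The local variable n survives between calls to next, which is exactly the state we have been carrying around by hand with closures.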