A simple scheduler

If the function is just not doing anything, why do I have to keep the CPU idle?

Let's spin up a simple scheduler and tell it whenever the CPU is idle. The scheduler can then check whether there is any other work waiting and, if so, run it. Simple.

But for the first take, let's keep sleep around and see how to interleave the calls.

Dissecting what a scheduler is

A scheduler is a component that takes in requests and runs them in a fixed sequence. Think of it as an orchestrator: it accepts requests and queues them up for execution.

Observe the word queue: the scheduler is essentially a queue that holds a list of callables and executes them one by one. Python's standard library provides deque (in the collections module) for queuing, so let's build a scheduler using deque.

# Scheduler
import time
from collections import deque
 
 
class Scheduler:
    def __init__(self):
        self.ready = deque()
 
    def call_soon(self, func):
        self.ready.append(func)
 
    def run(self):
        while self.ready:
            func = self.ready.popleft()
            func()
 
 
scheduler = Scheduler()
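
Before moving on, here is a minimal sketch of the property the rest of the chapter leans on: a function queued while run is already looping goes to the back of the line, so two tasks take turns. The Scheduler is repeated so the snippet runs on its own; task, demo and log are hypothetical names used only for illustration.

```python
from collections import deque


class Scheduler:
    """Same shape as the Scheduler above, repeated so this snippet runs alone."""

    def __init__(self):
        self.ready = deque()

    def call_soon(self, func):
        self.ready.append(func)

    def run(self):
        while self.ready:
            self.ready.popleft()()


demo = Scheduler()
log = []


def task(name, remaining):
    # Record one step, then queue the next step instead of calling it directly.
    log.append(f"{name} {remaining}")
    if remaining > 1:
        demo.call_soon(lambda: task(name, remaining - 1))


demo.call_soon(lambda: task("a", 2))
demo.call_soon(lambda: task("b", 2))
demo.run()
print(log)  # ['a 2', 'b 2', 'a 1', 'b 1']
```

Because each step re-queues itself rather than recursing, "b" gets its turn before "a" takes its second step.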

Now that the scheduler is in place, let's swap the implementation of countdown and countup to use the scheduler.

countdown is straightforward. After sleeping, ask the scheduler to queue up the next execution.

def countdown(n):
    if n > 0:
        print("down", n)
        time.sleep(1)
        scheduler.call_soon(lambda: countdown(n - 1))

countup is more interesting. countdown takes in a number and calls itself with a smaller one, so there is no extra state to maintain.

countup needs to keep track of two values: where to stop (which comes from the argument) and how far it has counted so far. Where do we keep them?

One way of solving it is with closures. An inner function takes the counter as an argument and reads stop from the enclosing scope, so both values are available on every step.

Finally, kick it off by calling the inner function with zero.

def countup(stop):
    def _run(counter: int):
        if counter < stop:
            print("up", counter)
            time.sleep(1)
            scheduler.call_soon(lambda: _run(counter + 1))
 
    _run(0)
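
Closures are one way; another is to pass both values as explicit arguments and bind them with functools.partial. The sketch below is only an illustration of that alternative, not the approach used in this chapter. A bare deque stands in for the scheduler, and countup_step and seen are hypothetical names.

```python
from collections import deque
from functools import partial

ready = deque()  # stand-in for scheduler.ready
seen = []        # collects the numbers instead of printing them


def countup_step(stop, counter):
    # Both pieces of state travel as plain arguments; no enclosing scope needed.
    if counter < stop:
        seen.append(counter)
        ready.append(partial(countup_step, stop, counter + 1))


ready.append(partial(countup_step, 3, 0))
while ready:
    ready.popleft()()

print(seen)  # [0, 1, 2]
```

The closure version keeps stop out of every call site, which is why it reads a little cleaner once there is more than one fixed value to carry around.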

Piecing it all together

scheduler.call_soon(lambda: countdown(5))
scheduler.call_soon(lambda: countup(5))
scheduler.run()

Output:

down 5
up 0
down 4
up 1
down 3
up 2
down 2
up 3
down 1
up 4

Awesome, it works as expected!

But still, during sleep the thread is just idle! That is not acceptable, we'll need to change that now!

Time sequencing

time.sleep is blocking. During that time, the CPU is not really doing anything. If we can let the scheduler know, then the scheduler can instead run something in that time box.

Let's add a new function to the scheduler called call_later which will schedule a call to the function after some time. Not sleep, but schedule. And during the free time, it can keep looping around and running functions in its queue.

But how can we pull that off?

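Let's start by creating another queue to hold all the functions that are asleep. It should be ordered by deadline, so that the next function to run is always the one with the earliest deadline, and it should stay ordered without re-sorting everything on every insert. Python has a module for exactly this: heapq. It keeps a plain list arranged as a binary heap, so the smallest item is always at index 0 and each push or pop costs O(log n). Items are compared directly, so if we push tuples that start with the deadline, the heap orders them by deadline.

There is one small edge case: two functions can be scheduled with the same deadline. Tuples compare element by element, so on a tie heapq would move on to comparing the functions themselves, which raises a TypeError. Let's add a sequence number as a tie-breaker; it also keeps functions with equal deadlines in the order they were scheduled.

In run, instead of only running items off the ready queue, we also check the sleeping queue. If there is nothing in the ready queue, take the function with the earliest deadline from the sleeping queue, wait until that deadline has passed, and move it to the ready queue.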
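
As a quick illustration of how heapq behaves, here is a minimal sketch with made-up deadlines and string labels in place of functions. Whatever order the items are pushed in, the one with the smallest deadline comes out first.

```python
import heapq

sleeping = []  # a plain list; heapq keeps it arranged as a heap

# Push out of order on purpose: (deadline, label).
heapq.heappush(sleeping, (30.0, "c"))
heapq.heappush(sleeping, (10.0, "a"))
heapq.heappush(sleeping, (20.0, "b"))

earliest = sleeping[0]  # peek without removing
print(earliest)  # (10.0, 'a')

order = []
while sleeping:
    deadline, label = heapq.heappop(sleeping)
    order.append(label)

print(order)  # ['a', 'b', 'c']
```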
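
To see why the tie-breaker matters, here is a small sketch using two hypothetical functions, first and second, with an identical made-up deadline. Without a sequence number, heapq ends up comparing the functions and fails; with one, the comparison is settled before the functions are ever looked at.

```python
import heapq


def first():
    return "first"


def second():
    return "second"


# Without a tie-breaker: equal deadlines force a function-to-function comparison.
heap = []
heapq.heappush(heap, (5.0, first))
try:
    heapq.heappush(heap, (5.0, second))
    error = None
except TypeError as exc:
    error = exc

print(type(error).__name__)  # TypeError

# With a sequence number in the middle, the tuples are always comparable.
heap = []
heapq.heappush(heap, (5.0, 1, first))
heapq.heappush(heap, (5.0, 2, second))

order = []
while heap:
    _, _, func = heapq.heappop(heap)
    order.append(func())

print(order)  # ['first', 'second']
```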

import time
from collections import deque
import heapq
 
 
class Scheduler:
    def __init__(self):
        self.ready = deque()
        self.sleeping = []
        self.sequence = 0
 
    def call_soon(self, func):
        self.ready.append(func)
 
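    def call_later(self, delay, func):
        # The sequence number breaks ties between equal deadlines,
        # so heapq never has to compare two functions.
        self.sequence += 1
        # A monotonic clock cannot jump if the system time is changed.
        deadline = time.monotonic() + delay
        heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
        while self.ready or self.sleeping:

            if not self.ready:
                # Nothing is ready: take the sleeper with the earliest deadline.
                deadline, _, func = heapq.heappop(self.sleeping)
                delta = deadline - time.monotonic()
                if delta > 0:
                    # Question: is this not blocking? It is, but only when the
                    # ready queue is empty, so no runnable function is kept waiting.
                    time.sleep(delta)
                self.ready.append(func)

            while self.ready:
                func = self.ready.popleft()
                func()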

Let's implement countdown and countup again using call_later instead of time.sleep. I've set up the delays at different rates so that you can see how the event loop interleaves them.

def countdown(n):
    if n > 0:
        print("down", n)
        scheduler.call_later(4, lambda: countdown(n - 1))
 
 
def countup(stop):
    def _run(counter: int):
        if counter < stop:
            print("up", counter)
            scheduler.call_later(1, lambda: _run(counter + 1))
 
    _run(0)
 
 
scheduler = Scheduler()
scheduler.call_soon(lambda: countdown(5))
scheduler.call_soon(lambda: countup(5))
scheduler.run()

Output:

down 5
up 0
up 1
up 2
up 3
down 4
up 4
down 3
down 2
down 1
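
If you want to check the interleaving without waiting twenty seconds, the sketch below is one way to do it. It is an illustration under a few assumptions rather than the chapter's code: the delays are shrunk to 0.2 s and 0.05 s, events are collected in a hypothetical events list instead of printed, and time.monotonic is used for the deadlines. The Scheduler is repeated so the snippet runs on its own.

```python
import heapq
import time
from collections import deque


class Scheduler:
    def __init__(self):
        self.ready = deque()
        self.sleeping = []
        self.sequence = 0

    def call_soon(self, func):
        self.ready.append(func)

    def call_later(self, delay, func):
        self.sequence += 1
        deadline = time.monotonic() + delay
        heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                deadline, _, func = heapq.heappop(self.sleeping)
                delta = deadline - time.monotonic()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(func)
            while self.ready:
                self.ready.popleft()()


scheduler = Scheduler()
events = []  # (label, value) in the order the steps actually ran


def countdown(n):
    if n > 0:
        events.append(("down", n))
        scheduler.call_later(0.2, lambda: countdown(n - 1))


def countup(stop):
    def _run(counter):
        if counter < stop:
            events.append(("up", counter))
            scheduler.call_later(0.05, lambda: _run(counter + 1))

    _run(0)


start = time.monotonic()
scheduler.call_soon(lambda: countdown(2))
scheduler.call_soon(lambda: countup(3))
scheduler.run()
elapsed = time.monotonic() - start

print(events)
print(f"elapsed: {elapsed:.2f}s")
```

With these delays the countup steps all fit inside countdown's first wait, and the total run time is set by the slower task alone (roughly 0.4 s) rather than by the sum of every delay (0.55 s with blocking sleeps).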

There you have it: two tasks making progress concurrently on a single thread!

There are some classic problems in concurrency which we'll need to visit to go forward from here. Let's pick the producer-consumer problem.

Here's the deal. There is a producer which produces events at random times of its own choosing. There is a consumer which listens and responds to the producer.

But how do you listen?

Is listening not just waiting, and as a result blocking?