VOOZH about

URL: https://dev.to/odmikes/building-a-simple-async-scheduler-with-generators-in-python-543e

⇱ Building a simple async scheduler with generators in Python - DEV Community


Asyncio in Python is built on coroutines and an event loop — a mechanism that manages their execution.

In this article, we’ll build a simplified model of this approach and recreate similar behavior using generators (yield). This will help us clearly see how tasks pause execution and hand control back to the scheduler — without any “magic” behind async/await.
In this article, we treat generators as coroutines.
We will:

  • build a primitive task scheduler
  • understand how tasks “pause”
  • add time-based waiting
  • and end up with a simplified version of asyncio

No magic — just yield.

The Simplest Scheduler

Let’s start with a minimal example:

def task1():
 print("Start Apollo-01")
 yield
 print("Start Apollo-02")
 yield
 print("Start Apollo-03")


def task2():
 print("Start Artemis-01")
 yield
 print("Start Artemis-02")
 yield
 print("Start Artemis-03")


tasks = [task1(), task2()]

while tasks:
 new_tasks = []

 for t in tasks:
 try:
 next(t)
 new_tasks.append(t)
 except StopIteration:
 pass

 tasks = new_tasks

What’s happening here

  • task1() and task2() are generator-based coroutines, not regular functions
  • yield is the point where a task pauses
  • next(t) resumes execution until the next yield
  • the while loop is our scheduler (event loop)

Output:

Start Apollo-01
Start Artemis-01
Start Apollo-02
Start Artemis-02
Start Apollo-03
Start Artemis-03

Tasks run in turns, yielding control to each other.

The Problem

This scheduler:

  • cannot wait (sleep)
  • does not handle time
  • simply switches tasks in a loop

There’s no way to say:
“wake me up in 2 seconds”

Adding a Wait Signal

Let’s allow tasks to communicate with the scheduler.

from enum import Enum, auto

class Op(Enum):
 WAIT = auto()

Now define sleep:

def sleep(delay):
 yield Op.WAIT, delay

Now a task can yield not just control, but also an instruction.

Tasks Control the Scheduler

def task():
 print("start")
 yield from sleep(2)
 print("end")

Now the task explicitly tells the scheduler:

  • when to pause
  • and for how long

Scaling Problem

Imagine we have 10,000 tasks:

  • one wakes up in 1 second
  • another in 3 seconds
  • another in 0.5 seconds

We need an efficient way to always pick the next task to run.

Heap (Priority Queue)

A heap allows us to efficiently retrieve the smallest element.

In Python — heapq.

We store:

(step_at, index, coroutine)

Where:

  • step_at — when the task should resume
  • index — prevents comparison issues
  • coroutine — the task itself

Final Example — Rocket Launches

import heapq
import random
import time
from enum import Enum, auto
from typing import Generator

Rocket = tuple[str, float, int]


class Op(Enum):
 WAIT = auto()


def random_delay() -> float:
 return random.random() * 5


def sleep(delay):
 yield Op.WAIT, delay


def now():
 return time.time()


def random_countdown() -> int:
 return random.randrange(5)


def launch_rocket(rocket_name: str, delay: float, countdown: int):
 yield from sleep(delay)

 for i in reversed(range(countdown)):
 print(f"{rocket_name}: {i + 1}...")
 yield from sleep(1)

 print(f"Rocket {rocket_name} is launched")


def create_rockets(n: int = 10_000) -> Generator[Rocket, None, None]:
 for i in range(n):
 yield (f"Artemis-{i}", random_delay(), random_countdown())


def run():
 rockets = create_rockets()

 work = [
 (
 now(),
 index,
 launch_rocket(name, delay, countdown),
 )
 for index, (name, delay, countdown) in enumerate(rockets)
 ]

 heapq.heapify(work)

 while work:
 step_at, index, coro = heapq.heappop(work)

 wait = step_at - now()
 if wait > 0:
 time.sleep(wait)

 try:
 op, arg = coro.send(None)
 except StopIteration:
 continue

 if op == Op.WAIT:
 step_at = now() + arg
 heapq.heappush(work, (step_at, index, coro))


if __name__ == "__main__":
 run()

How It Works

  • All tasks are placed into a heap
  • We pick the one with the smallest step_at
  • If needed — we wait (time.sleep)
  • Resume execution (send(None))
  • The task returns:
  • - WAIT → reschedule
  • - StopIteration → task is finished Calling send(None) is equivalent to next() — it resumes the coroutine until the next yield.

Conclusion

We’ve built a simplified model of asyncio:

  • yield — pause point
  • generator — coroutine
  • loop — event loop
  • heapq — time-based scheduler

And most importantly — no magic.

async/await is just a more convenient syntax built on top of these same ideas.