Software
Code Lab - Job Queues With Postgres
A working example with async Python and Quart
By: Andrei Taranchenko (LinkedIn)
Created: 30 Nov 2023

Introduction

Friendly Fire needs to periodically execute scheduled jobs - to remind Slack users to review GitHub pull requests. Instead of bolting on a new system just for this, I decided to leverage Postgres instead.

The must-have requirement was the ability to schedule a job to run in the future, with workers polling for “ripe” jobs, executing them and retrying on failure, with exponential backoff. With SKIP LOCKED, Postgres has the needed functionality, allowing a single worker to atomically pull a job from the job queue without another worker pulling the same one.

This project is a demo of this system, slightly simplified.

This example, available on GitHub is a playground for the following:

  • How to set up a base Quart web app with Postgres using Poetry
  • How to process a queue of immediate and delayed jobs using only the database
  • How to retry failed jobs with exponential backoff
  • How to use custom decorators to ensure atomic HTTP requests (success - commit, failure - rollback)
  • How to use Pydantic for stricter Python models
  • How to use asyncpg and asynchronously query Postgres with connection pooling
  • How to test asyncio code using pytest and unittest.IsolatedAsyncioTestCase
  • How to manipulate the clock in tests using freezegun
  • How to use mypy, flake8, isort, and black to format and lint the code
  • How to use Make to simplify local commands

ALTER MODE SKIP COMPLEXITY

Postgres introduced SKIP LOCKED years ago, but recently there was a noticeable uptick in the interest around this feature. In particular regarding its obvious use for simpler queuing systems, allowing us to bypass libraries or maintenance-hungry third-party messaging systems.

Why now? It’s hard to say, but my guess is that the tech sector is adjusting to the leaner times, looking for more efficient and cheaper ways of achieving the same goals at common-scale but with fewer resources. Or shall we say - reasonable resources.

What’s Quart?

Quart is the asynchronous version of Flask. If you know about the g - the global request context - you will be right at home. Multiple quality frameworks have entered Python-scape in recent years - FastAPI, Sanic, Falcon, Litestar. There is also Bottle and Carafe. Apparently naming Python frameworks after liquid containers is now a running joke.

Seeing that both Flask and Quart are now part of the Pallets project, Quart has been curiously devoid of hype. These two are in the process of being merged and at some point will become one framework - classic synchronous Flask and asynchronous Quart in one.

How it works

Writing about SKIP LOCKED is going to be redundant as this has been covered plenty elsewhere. For example, in this article. Even more in-depth are these slides from 2016 PGCON.

The central query looks like this:

DELETE FROM job
  WHERE id = (
         SELECT id FROM job
          WHERE ripe_at 
             IS NULL 
             OR [current_time] >= ripe_at
     FOR UPDATE
    SKIP LOCKED LIMIT 1
  )
RETURNING *, id::text

Each worker is added as a background task, periodically querying the database for “ripe” jobs (the ones ready to execute), and then runs the code for that specific job type.

A job that does not have the “ripe” time set will be executed whenever a worker is available.

A job that fails will be retried with exponential backoff, up to Job.max_retries times:

Creating a job is simple:

job: Job = Job(
    job_type=JobType.MY_JOB_TYPE,
    arguments={"user_id": user_id},
).runs_in(hours=1)

await jobq.service.job_db.save(job)

SKIP LOCKED and DELETE ... SELECT FOR UPDATE tango together to make sure that no worker gets the same job at the same time. To keep things interesting, at the Postgres level we have an MD5-based auto-generated column to make sure that no job of the same type and with the same arguments gets queued up more than once.

This project also demonstrates the usage of custom DB transaction decorators in order to have a cleaner transaction notation:

@write_transaction
@api.put("/user")
async def add_user():
    # DB write logic
@read_transaction
@api.get("/user")
async def get_user():
    # DB read logic

A request (or a function) annotated with one of these decorators will be in an atomic transaction until it exits, and rolled back if it fails.

At shutdown, the “stop” flag in each worker is set, and the server waits until all the workers complete their sleep cycles, peacing out gracefully.

async def stop(self):
    for worker in self.workers:
        worker.request_stop()

    while not all([w.stopped for w in self.workers]):
        await asyncio.sleep(1)

    logger.info("All workers have stopped")

Testing

The test suite leverages unittest.IsolatedAsyncioTestCase (Python 3.8 and up) to grant us access to asyncSetUp() - this way we can call await in our test setup functions:

async def asyncSetUp(self) -> None:
    self.conn = await asyncpg.connect(...)
    conn_manager.set_connection(self.conn)
    self.transaction = self.conn.transaction()

    await self.transaction.start()

async def asyncTearDown(self) -> None:
    await self.transaction.rollback()
    await self.conn.close()
    await self.ctx.pop()

Note that we set up the database only once for our test class. At the end of each test, the connection is rolled back, returning the database to its pristine state for the next test. This is a speed trick to make sure we don’t have to run database setup code each single time. In this case it doesn’t really matter, but in a test suite large enough, this is going to add up.

For delayed jobs, we simulate the future by freezing the clock at a specific time (relative to now):

# jump to the FUTURE
with freeze_time(now + datetime.timedelta(hours=2)):
    ripe_job = await job_db.get_one_ripe_job()
    assert ripe_job

Improvements

Batching - pulling more than one job at once would add major dragonforce to this system. This is not part of the example as to not overcomplicate it. You just need to be careful and return the failed jobs back in the queue while deleting the completed ones. With enough workers, a system like this could really be capable of handling serious common-scale workloads.

Server exit - there are less than trivial ways of interrupting worker sleep cycles. This could improve the experience of running the service locally. In its current form, you have to wait a few seconds until all worker loops get out of sleep() and read the STOP flag.

Try our service for streamlined code review assignments and notifications - Friendly Fire:
  • Notify of PRs and comments in Slack directly
  • Save Slack PR discussions to GitHub
  • Skip reviewers who are not available
  • File pattern matching
  • Individual code review reminders
  • No access to your codebase needed