async programming

(sync (obtain-shork))

this blag post is, unfortunately, not about blåhaj, although they may make an appearance later on, so keep on reading (footnote 1)

i wanted to talk a bit about async programming because it seems to be a bit of a complex topic and idk it might be helpful to understand what the point of this pile of extra programming features actually is

synchronous programming

haskal you lied to me

shush. let's write a program!

#include <netinet/in.h>

haskal you're writing C

we'll use a better language later i promise

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    // s_addr is a 32-bit value, so it wants htonl, not htons
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(1337);

    bind(sockfd, (struct sockaddr*)&addr, sizeof(addr));
    listen(sockfd, 10);
    while (true) {
        int fd = accept(sockfd, (struct sockaddr*)NULL, NULL);
        char buf[128];
        ssize_t sz = read(fd, buf, sizeof(buf));
        if (sz > 0) {
            write(fd, buf, sz);
        }
        close(fd);
    }
}

this is a pretty trivial TCP echo server. it accepts connections and echoes back up to 128 bytes of input

$ ncat localhost 1337
hello world
hello world

but there's a problem:

$ ncat localhost 1337 &
[1] ...
$ ncat localhost 1337
hello world

# no response!

the server only handles one connection at a time! which is kind of a problem!

because of the purely sequential way we wrote this program, we can run into situations where it takes longer than really necessary for clients to get responses from the server. for example

client 1 connects. client 1 is on a slow connection
client 2 connects

server accepts client 1
server waits for client 1 to send data

client 2 sends data

client 1 is taking a while

...
client 2 waits
client 1 is still taking its time
...

server finally gets client 1 data
server responds to client 1
server gets client 2 data
server responds to client 2
client 2 goes like, idk, why did this take so long

clearly the real world doesn't always fit our sequential programming models

the key observation is when you make programs that interact with the Real World™, the Real World™ might not do stuff in the order that you expect. so instead of sequentially programming in a certain order, where each step blocks until a specific Real World™ thing has occurred, it's smarter to handle Real World Events™ in whatever order they actually happen in.

event loop

so in order to make a Smarter Program we can design it around an abstraction

while (true) {
    wait_for_next_event();
    event* evt = get_next_event();
    handle_event(evt);
}

the event loop is a way to structure programs that can respond to events in the order they actually happen in. which is useful!

the typical mode of operation is the program registers interest in a certain event occurring, in the form of some code that will get invoked when the event does occur. then, the event loop runs the code once it gets that event. events could be Real World things, like getting a new TCP connection, or data being available to read, or an alarm set to go off at a certain time, or even internal events that are manually triggered by other parts of the program
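to make that concrete, here's a tiny sketch of the idea in python. the EventLoop class, the event names, and the emit function are all made up for illustration, and a real loop would block waiting on the operating system instead of quitting when its queue runs dry. but the register-then-get-called-back shape is the same

```python
from collections import deque


class EventLoop:
    """toy event loop (hypothetical, for illustration only)"""

    def __init__(self):
        self.handlers = {}      # event name -> list of callbacks
        self.pending = deque()  # events that happened but aren't handled yet

    def on(self, name, callback):
        # register interest in an event
        self.handlers.setdefault(name, []).append(callback)

    def emit(self, name, payload=None):
        # something happened (in a real loop the OS tells us this)
        self.pending.append((name, payload))

    def run(self):
        # handle events in the order they actually occurred
        while self.pending:
            name, payload = self.pending.popleft()
            for callback in self.handlers.get(name, []):
                callback(payload)


log = []
loop = EventLoop()
loop.on("connect", lambda who: log.append(f"hello {who}"))
loop.on("data", lambda msg: log.append(f"echo {msg}"))

# client 2 sends data before client 1 does anything, and that's fine
loop.emit("connect", "client 1")
loop.emit("connect", "client 2")
loop.emit("data", "from client 2")
loop.run()
print(log)  # ['hello client 1', 'hello client 2', 'echo from client 2']
```

nothing in here ever sits around waiting for client 1 specifically, which is the whole trick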

so let's upgrade this code. luckily there are cool libraries (footnote 2) that do the event loop infrastructure for us, so we just have to add callbacks and get notified

#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// cool library!
#include <ev.h>

// helper datatype that contains an event loop object along with a file
// descriptor
struct haskal_watcher {
    ev_io io;
    int fd;
};

// this function gets called when a socket is ready to read
void read_ready_cb(struct ev_loop* loop, ev_io* w, int revents) {
    int fd = ((struct haskal_watcher*)w)->fd;

    // get client data
    char buf[128];
    ssize_t sz = read(fd, buf, sizeof(buf));
    if (sz < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
        // sometimes it's not actually ready (footnote 3)
        // (this is why we set the sockets to nonblocking mode)
        // keep waiting
        return;
    }

    // respond
    if (sz > 0) {
        write(fd, buf, sz);
    }
    close(fd);

    // unregister interest in this socket
    ev_io_stop(loop, w);
    free(w);
}

void accept_ready_cb(struct ev_loop* loop, ev_io* w, int revents) {
    // try to accept()
    int fd =
        accept(((struct haskal_watcher*)w)->fd, (struct sockaddr*)NULL, NULL);
    if (fd < 0) {
        return;
    }
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);

    // register interest in the "read event" for this socket
    // this causes the event loop to notify us when we can call read()
    struct haskal_watcher* watcher = malloc(sizeof(struct haskal_watcher));
    watcher->fd = fd;
    ev_init(&watcher->io, read_ready_cb);
    ev_io_set(&watcher->io, fd, EV_READ);
    ev_io_start(loop, &watcher->io);
}

int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL, 0) | O_NONBLOCK);

    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    // s_addr is a 32-bit value, so it wants htonl, not htons
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(1337);

    bind(sockfd, (struct sockaddr*)&addr, sizeof(addr));
    listen(sockfd, 10);

    // an event loop!
    struct ev_loop* loop = ev_default_loop(0);

    // tell the loop we're interested in when we can call accept() on the socket
    // (it's a "read event" for obtuse reasons)
    struct haskal_watcher watcher;
    watcher.fd = sockfd;
    ev_init(&watcher.io, accept_ready_cb);
    ev_io_set(&watcher.io, sockfd, EV_READ);
    ev_io_start(loop, &watcher.io);

    // run the loop (this function contains the actual loop)
    ev_run(loop, 0);
}

notice how in this version, instead of just calling functions that depend on Real World Interactions whenever we want, we call the functions only when the event loop library has notified us that the associated event has actually occurred (actually not really, see footnote 3)
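if the C is hard to squint at, here's roughly the same "only call read once you've been told it's ready" shape using python's standard selectors module. this is a sketch and not a port of the server: it uses a socketpair instead of a listening socket so it can run on its own, and run_once and echo_when_readable are names i made up

```python
import selectors
import socket

sel = selectors.DefaultSelector()

# two connected sockets, so the demo needs no real network
server_side, client_side = socket.socketpair()
server_side.setblocking(False)


def echo_when_readable(conn):
    # only called once the selector says conn has something to read
    try:
        data = conn.recv(128)
    except BlockingIOError:
        return  # spurious wakeup, keep waiting
    if data:
        conn.sendall(data)
    # unregister interest in this socket
    sel.unregister(conn)
    conn.close()


# register interest in the "read event", with the callback attached as data
sel.register(server_side, selectors.EVENT_READ, echo_when_readable)


def run_once(timeout=1.0):
    # one turn of the loop: wait for events, then call whatever was registered
    events = sel.select(timeout)
    for key, _mask in events:
        key.data(key.fileobj)
    return len(events)


idle = run_once(timeout=0)  # nothing has happened yet, so no callback runs
client_side.sendall(b"hello world")
busy = run_once()           # now the read event fires and the callback echoes
reply = client_side.recv(128)
print(idle, busy, reply)
```

the first run_once sees zero events and calls nothing. the callback only runs after the client has actually sent something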

haskal this code is awful and verbose and really clunky

yeah! event looping is not a zero cost abstraction (sad ferris noises), it makes you have to split up your code into all these chunks that go into callback functions which handle responding to events and then schedule more callbacks to run later. it's verbose. but despite this, some programs are written in this clunky callback-based style. notably, JavaScript, before the standards people (footnote 4) got ahold of themselves in 2017.

unfortunately we can't really fix this problem in a bad programming language. this is where we introduce an important new language feature that makes writing evented code way easier

✨ coroutines, continuations, green threads, etc ✨

a coroutine is an extension of a function. or like, precisely, the set of coroutines is a superset of the set of functions. coroutines add 2 cool new features to the function we all know and love

  • suspend. a coroutine has a list of breakpoints where execution can be suspended. when suspended, execution returns to the caller of the coroutine
  • resume. a suspended coroutine can be resumed at any time after being suspended

here's an example

async def my_coroutine(a: int) -> int:
    b = a + 1
    await other_function()
    return b

this is python, and in python we use the async keyword to mark that a function is a coroutine, not just a regular function. additionally, we use await to define a sort of "chained suspend point", which will pass any suspends of other_function through to the caller of my_coroutine, and if something else has been awaiting my_coroutine then it will be passed to the caller of that, and so on. to define a single non-chained suspend point, you make an object and define the __await__ magic function, then await that (footnote 5) but your runtime probably won't like you if you do that
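you can poke at suspend and resume directly, without any asyncio involved. in this sketch Suspend is a made-up class that defines `__await__` to create one raw suspend point, and i changed my_coroutine slightly so it returns something that depends on the resumed value

```python
class Suspend:
    """a single non-chained suspend point (hypothetical helper, not part of asyncio)"""

    def __await__(self):
        # this yield is what actually suspends the whole chain of awaits
        value = yield "suspended"
        return value


async def other_function():
    return await Suspend()


async def my_coroutine(a: int) -> int:
    b = a + 1
    got = await other_function()  # chained suspend point
    return b + got


coro = my_coroutine(1)
first = coro.send(None)  # run until the suspend point, control comes back to us
try:
    coro.send(40)        # resume, passing a value in at the suspend point
except StopIteration as stop:
    result = stop.value  # the coroutine's return value rides on StopIteration
print(first, result)     # suspended 42
```

normally you'd never call send yourself like this. driving coroutines is the runtime's job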

runtime?

coroutines that can suspend are great, but they need a caller to suspend to. the runtime is the boundary between coroutine land and the boring normal world. the runtime invokes coroutines, which will suspend and return back to the runtime at some point, and then the runtime can go call or resume other coroutines

that sounds familiar

yeah! the coroutine runtime is an event loop, except instead of being constrained to invoking callback functions it can also resume coroutines in response to events that it gets. so at a high level, await registers interest in some event and suspends the coroutine. then, the runtime resumes execution from the await keyword once the event occurs. this allows writing code that looks sequential but can actually be run concurrently with other code
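here's about the smallest runtime i can think of, as a sketch. Yield and run_all are made-up names, and the only "event" this runtime understands is "it's your turn again", whereas a real one like asyncio would only resume a coroutine once the thing it's waiting for has happened

```python
from collections import deque


class Yield:
    """hypothetical awaitable: suspend and let the runtime run something else"""

    def __await__(self):
        yield


async def worker(name, steps, log):
    for i in range(steps):
        log.append(f"{name} step {i}")
        await Yield()  # suspend back to the runtime


def run_all(coros):
    # the "runtime": resume each coroutine in turn until they all finish
    ready = deque(coros)
    while ready:
        coro = ready.popleft()
        try:
            coro.send(None)
        except StopIteration:
            continue        # this one finished, drop it
        ready.append(coro)  # it suspended, so schedule it again


log = []
run_all([worker("a", 2, log), worker("b", 2, log)])
print(log)  # ['a step 0', 'b step 0', 'a step 1', 'b step 1']
```

each worker reads like a plain sequential loop, but the log shows the two of them interleaved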

haskal this sounds a lot like the operating system's thread scheduler. why bother?

yeah, but the main advantage is scheduling in userspace is a lot faster. switching between userspace and kernel space can be really expensive, and OS thread objects are heavy, while coroutines can be suspended and resumed without entering the kernel at all, and you can easily create like, thousands, or even millions of them, which OS threads really don't handle well
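a quick way to get a feel for this, as a sketch with made-up names (tiny_job and the count of 10,000 are arbitrary, and i'm not making any promises about timing on your machine):

```python
import asyncio


async def tiny_job(i):
    await asyncio.sleep(0.01)  # pretend to wait on the Real World
    return i


async def main(n):
    # n coroutines, all waiting concurrently on a single OS thread
    results = await asyncio.gather(*(tiny_job(i) for i in range(n)))
    return len(results)


count = asyncio.run(main(10_000))
print(count)
```

all ten thousand sleeps overlap, so the whole thing takes way less than ten thousand times 0.01 seconds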

so here's the same server but in python

import asyncio

# handle one connection
async def handle_connection(reader, writer):
    # await suspends this coroutine until the socket is available for reading
    # this is at a low level kind of equivalent to `yield MagicRuntimeReadRequest(reader, 128)`
    # it sends the runtime a notice that we're interested in reading and suspends
    # then, the coroutine is resumed with the read data by the runtime
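    data = await reader.read(128)
    writer.write(data)
    # make sure the echo is actually flushed before we close
    await writer.drain()
    writer.close()
    await writer.wait_closed()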

# this coroutine sets up the server and runs it forever
async def main():
    server = await asyncio.start_server(handle_connection, '0.0.0.0', 1337)
    await server.serve_forever()

# this sets up the coroutine runtime and runs the event loop
# calling the given coroutine as an entry function
asyncio.run(main())

here, asyncio.start_server expects a callback but the content of the callback can be written in a sequential-looking way. but don't be fooled! this server can handle multiple clients at the same time just as well as our callback-based C code. and for larger scale programs, trust me, this style of programming is way cleaner than the billions of callbacks you'd have in a callback-based event loop
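if you don't want to take my word for it, here's a sketch that replays the slow-client scenario from earlier against an asyncio echo server. everything runs in one script on a throwaway localhost port, and the client helper, the delays, and the finished list are made up for the demo

```python
import asyncio


async def handle_connection(reader, writer):
    data = await reader.read(128)
    writer.write(data)
    await writer.drain()
    writer.close()


async def client(port, message, delay, finished):
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    await asyncio.sleep(delay)  # a slow client dawdles here
    writer.write(message)
    await writer.drain()
    finished.append(await reader.read(128))
    writer.close()


async def demo():
    # port 0 asks the OS for any free port
    server = await asyncio.start_server(handle_connection, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    finished = []
    try:
        slow = asyncio.create_task(client(port, b"slow", 0.2, finished))
        await asyncio.sleep(0.05)  # make sure the slow client connects first
        fast = asyncio.create_task(client(port, b"fast", 0.0, finished))
        await asyncio.gather(slow, fast)
    finally:
        server.close()
    return finished


order = asyncio.run(demo())
print(order)
```

the slow client connects first, but the fast client gets its echo back first. with the sequential C server it would've been stuck in line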

aside: coroutines actually aren't magic either

suppose C had some magic coroutine mechanism so we could write this

async int my_coroutine(int a) {
    int b = a + 1;
    await other_function();
    return b;
}

we can actually transform this into a plain function with a few steps. first, move the stack to the heap

struct my_coroutine_frame {
   int a;
   int b;
};

async int my_coroutine(struct my_coroutine_frame* frame) {
    frame->b = frame->a + 1;
    await other_function();
    return frame->b;
}

create a state variable, and lift the code into a state machine split around each await keyword

struct my_coroutine_frame {
   enum { STATE_1, STATE_2 } state;
   int a;
   int b;
};

int my_coroutine(struct my_coroutine_frame* frame) {
    switch (frame->state) {
        case STATE_1: {
            // locals live in the frame now, so a has to be read from there
            frame->b = frame->a + 1;
            return other_function();
        }
        case STATE_2: {
            return frame->b;
        }
    }
}

and convert calls to set up a frame on the heap. to resume the coroutine, increment the state variable in the frame and call the coroutine function again. and that's basically it
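since the magic-C version can't actually be compiled, here's the same transformation sketched in python, where it does run. MyCoroutineFrame and resume are made-up names, and one small difference from the description above is that resume advances the state variable itself instead of making the caller do it

```python
class MyCoroutineFrame:
    """the coroutine's "stack", moved to the heap (hypothetical, for illustration)"""

    def __init__(self, a):
        self.state = 1
        self.a = a
        self.b = None


def resume(frame):
    # returns ("suspended", None) or ("done", return_value)
    if frame.state == 1:
        frame.b = frame.a + 1
        frame.state = 2             # remember where to pick up next time
        return ("suspended", None)  # this is where the await was
    if frame.state == 2:
        frame.state = 3
        return ("done", frame.b)
    raise RuntimeError("coroutine already finished")


frame = MyCoroutineFrame(1)
first = resume(frame)   # runs up to the suspend point
second = resume(frame)  # picks up after it
print(first, second)    # ('suspended', None) ('done', 2)
```

between the two resume calls the frame just sits on the heap holding a and b, and the caller is free to go do anything else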

this is also most of how LLVM implements the C++ coroutines TS at a low level (footnote 6)

when your language has actual continuations

i wanna demo racket because it does things a bit differently

coroutine syntax in python, for example, is very clearly slapped onto the language after the fact. but in racket, userspace scheduling is an integral part of the language, and rather than having explicit syntax like async and await, racket code will be suspended, scheduled, and resumed fully automatically. racket has first-class continuations (footnote 7) so it's pretty natural

#lang racket

(define (handle-client in out)
  ;; i'm being lazy. read actually reads and parses an s-expression
  (define something (read in))
  (write something out)
  (close-input-port in)
  (close-output-port out))

(define listener (tcp-listen 1337))
(let loop ()
  (define-values [in out] (tcp-accept listener))
  (thread (lambda () (handle-client in out)))
  (loop))

no visible await or anything! but this code will still handle multiple clients at the same time, like you'd expect. thread doesn't create an OS thread but a green thread that is scheduled in userspace. and functions like tcp-accept and read will automatically suspend and be scheduled for resumption when the corresponding event is ready -- and of course in the meantime other code can run

high level async coding

with async-style programming you can really scale up to doing a legitimately ridiculous amount of stuff concurrently. to leverage async concurrency you basically need 3 tools

  • create a new green thread and add it to the event loop/scheduler. in python this is asyncio.create_task, in racket it's thread (footnote 8)
  • wait for a group of events, and get the first event that completes. in python it's asyncio.wait with return_when=asyncio.FIRST_COMPLETED (also takes an optional timeout parameter), in racket it's sync (or sync/timeout) (footnote 9)
  • it's also helpful to have some sort of "channel" to send stuff between coroutines in a FIFO fashion. in python you can use asyncio.Queue, and in racket there are thread mailboxes and async channels (footnote 10)
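here are all three tools in one small python sketch. the producer/consumer split, the None "no more items" marker, and the 5 second timer are things i made up to have something to wait on

```python
import asyncio


async def producer(queue):
    for i in range(3):
        await queue.put(i)  # tool 3: a FIFO channel between coroutines
    await queue.put(None)   # hypothetical "no more items" marker


async def consumer(queue):
    seen = []
    while (item := await queue.get()) is not None:
        seen.append(item)
    return seen


async def main():
    queue = asyncio.Queue()
    # tool 1: create new green threads and hand them to the scheduler
    prod = asyncio.create_task(producer(queue))
    cons = asyncio.create_task(consumer(queue))
    timer = asyncio.create_task(asyncio.sleep(5))
    # tool 2: wait on a group of things, wake up when the first one completes
    done, pending = await asyncio.wait(
        {cons, timer}, return_when=asyncio.FIRST_COMPLETED
    )
    timer.cancel()  # we don't care about the timer anymore
    await prod
    return cons in done, cons.result()


consumer_won, items = asyncio.run(main())
print(consumer_won, items)
```

the consumer finishes long before the timer does, so asyncio.wait hands it back in done and the timer is still sitting in pending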

for example, a common task in Highly Concurrent Async Code might be to get through a list of tasks using a certain number of "active jobs", maybe also while printing progress to the console. as an example application, consider a web crawler that has to go through thousands of pages, and ideally wants to go as fast as possible.

(define (run-all-tasks tasks)
  (define NUM-JOBS 128)
  (define total-tasks (length tasks))

  (define (update-progress progress active total)
    (write-string (format "\r\x1b[J[~a%] ~a/~a (~a active)"
                          (floor (/ (* 100 progress) total))
                          progress total active))
    (flush-output))

  (let loop ([tasks tasks] [active-workers '()] [done-cnt 0])
    (if (and (not (empty? tasks)) (< (length active-workers) NUM-JOBS))
      (let* ([task-input (first tasks)]
             [task-thd (thread (lambda () (do-task-work task-input)))])
        (loop (rest tasks) (cons task-thd active-workers) done-cnt))
      (let* ([done-thd (apply sync active-workers)]
             [rest-thds (filter (compose not (curry equal? done-thd)) active-workers)])
        ;; only stop once there's nothing running and nothing left to start
        (unless (and (empty? tasks) (empty? rest-thds))
          (update-progress done-cnt (length active-workers) total-tasks)
          (loop tasks rest-thds (add1 done-cnt))))))

  (write-string "\r\x1b[Jdone\n")
  (flush-output))

this maintains a list of NUM-JOBS active green threads executing tasks, and adds to the list as necessary, while printing a fancy status to the console, and it's pretty straightforward. imagine writing something like this with callback-based code. yikes.
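for comparison, here's a rough python take on the same pattern. it's a sketch and not a line-for-line port: do_task_work is a fake stand-in that just sleeps, i left out the progress printing, and i added a max_active counter purely so you can check the job limit is respected

```python
import asyncio


async def do_task_work(task_input):
    # hypothetical stand-in for real work, like fetching a page
    await asyncio.sleep(0.001)
    return task_input * 2


async def run_all_tasks(tasks, num_jobs=128):
    todo = list(tasks)
    active = set()
    results = []
    max_active = 0
    while todo or active:
        # top up to num_jobs active green threads
        while todo and len(active) < num_jobs:
            active.add(asyncio.create_task(do_task_work(todo.pop())))
        max_active = max(max_active, len(active))
        # wait until at least one finishes, keep the rest as the new active set
        done, active = await asyncio.wait(
            active, return_when=asyncio.FIRST_COMPLETED
        )
        results.extend(task.result() for task in done)
    return results, max_active


results, max_active = asyncio.run(run_all_tasks(range(1000), num_jobs=16))
print(len(results), max_active)
```

asyncio.wait returns the finished tasks and the still-pending ones as two sets, so there's no need to filter the finished thread out of a list by hand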

the point

this maybe didn't go as in depth as might be useful. but the point is, async-style code and event loops aren't meant to be scary or magic, it's not a mega complicated programming monster, it's just a way to write efficient concurrent programs. coroutines and green threads are a way to allow parts of code to be suspended and scheduled in userspace, which lets you write programs almost the same way as you would in fully blocking sequential code, but with the advantage that other parts of the code can run while it waits for the necessary Real World™ events, in a manner that's more efficient and scalable than OS threads. so hopefully you get why this is cool and useful now

(sync (obtain-shork))

#<shork>

a tactical smolhaj with its head sticking out of a backpack. ready for instant cuddles anywhere


1

https://social.pixie.town/users/f0x/statuses/104344539354792377

2

i also use the docs for this as an example of how to write good fucking software documentation. i wish all library docs were like this. gods.

3

for certain reasons these things are allowed per spec to lie to you about events -- eg telling you that a socket is available for read() when the read call would actually block. this might not actually happen in practice but we code defensively anyway

for example from the man page of select(2), one of the POSIX mechanisms for getting event notifications from the operating system for your event loop, which libev might use as a backend:

On Linux, select() may report a socket file descriptor as "ready for reading", while nevertheless a subsequent read blocks. This could for example happen when data has arrived but upon examination has the wrong checksum and is discarded. There may be other circumstances in which a file descriptor is spuriously reported as ready. Thus it may be safer to use O_NONBLOCK on sockets that should not block.
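you can see what nonblocking mode buys you with a small python sketch. the socketpair and the try_read helper are made up for the demo. the point is just that a read with nothing to read fails right away instead of freezing the whole event loop

```python
import socket

a, b = socket.socketpair()
a.setblocking(False)  # same idea as setting O_NONBLOCK in the C code


def try_read(sock):
    try:
        return sock.recv(128)
    except BlockingIOError:
        # EAGAIN / EWOULDBLOCK: nothing there after all, go back to the loop
        return None


before = try_read(a)  # nothing has been sent yet
b.sendall(b"hi")
after = try_read(a)
print(before, after)
```

a blocking socket would have hung forever on that first read, and every other client would hang along with it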

4

as i understand, the standards people are Google. and also Google. and maybe some more Google

5

you might recognize this sort of behavior as generators, with await as yield from and __await__ as __iter__, and that's because yes, in python coroutines are actually generators, and the syntax actually used to be yield from once upon a time. now, coroutines exist as a second generator implementation parallel to real generators because that allows you to make coroutine generators
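to see the family resemblance, here's a sketch of a chained suspend written with plain generators and yield from. inner and outer are made-up names

```python
def inner():
    # a raw suspend point: hand control (and a value) back to whoever drives us
    got = yield "suspend from inner"
    return got * 2


def outer(a):
    b = a + 1
    doubled = yield from inner()  # old-style spelling of `await inner()`
    return b + doubled


gen = outer(1)
first = next(gen)        # the suspend in inner passes straight through outer
try:
    gen.send(20)         # resume inner with a value
except StopIteration as stop:
    result = stop.value  # outer's return value
print(first, result)     # suspend from inner 42
```

swap def for async def and yield from for await, and it's the same shape as the coroutine version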

6

the coroutines TS is really cool. the problem is it requires ridiculous amounts of supporting line noise to actually work :(

i suppose python asyncio is basically line noise too though

7

a continuation is an abstract unit of program execution. continuations can be used to implement all the program control flow structures you're used to (including coroutines!). exceptions are a limited form of continuation in some languages. C setjmp/longjmp is also kind of a limited form of continuation

8

in javascript this is: ???????

9

in javascript this is: ???????

10

in javascript this is: ???????