RFD 397
Challenges with async/await in the control plane

Very early in the development of the control plane, we considered whether to build our big new distributed system using Rust’s async/await or more traditional synchronous threaded code. [rfd79] discussed this issue and came to the weak conclusion to prefer async/await, in no small part because much of the ecosystem we wanted to use (e.g., low-level http crates) was already async and seemed to be moving further in that direction, and we’d already written some async code.

Three years on, we’ve successfully built a big, complicated system using async Rust. But recently-discovered behavior around async task cancellation has shaken our confidence. We’ve encountered a few instances of hard-to-debug state corruption that resulted from a class of programming error that’s easy to introduce and that Rust itself is unable to help identify. In this RFD, we’ll discuss the problem space in more detail.

Note
[rfd400] follows up on this RFD with patterns for dealing with cancel safety in Rust.

Goals of this RFD

The goals of this RFD are to clearly explain the problem we’ve run into, its scope, risk factors, etc., and summarize a few areas of the solution space enough to show that we probably need a separate RFD (or RFDs) for solutions.

The goal of this RFD is not to rearchitect the whole stack or pick any other solution. That may be a thing we choose to do later (or even next). It may also not be a black-and-white decision.

Technical problem summary

Briefest summary: task cancellation in Rust is an undocumented[1] behavior whereby if the owner of a Future drops it (which can only happen either before the Future has ever been polled or when it is paused at an await point), then the Future stops running. As the author of a Future (including any async function), if you want it to be cancellation-safe, you need to assume that it might stop executing at any point where you use .await: nothing after that point runs except the destructors of whatever values the Future holds. If your Future is not cancellation-safe, and it gets cancelled, arbitrarily broken behavior can result. We’ve hit such issues several times now: omicron#3098, omicron#3351, omicron#1714, omicron#3356.
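To make the mechanics concrete, here is a minimal sketch that uses only the standard library and polls a Future by hand instead of using a real executor. The names `YieldOnce`, `NoopWake`, `Progress`, `work`, and `poll_once_then_drop` are all hypothetical and exist only for this illustration. The Future is polled once, pauses at its await point, and is then dropped; the code after the `.await` never runs.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A Future that is Pending the first time it is polled and Ready afterwards.
/// It stands in for any real await point (a timer, a socket read, ...).
/// It never wakes its waker, which is fine here because we poll by hand.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// A waker that does nothing (hypothetical helper for manual polling).
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Records how far `work` got.
#[derive(Default)]
struct Progress {
    before_await: Cell<bool>,
    after_await: Cell<bool>,
}

async fn work(progress: Rc<Progress>) {
    progress.before_await.set(true);
    YieldOnce(false).await;
    // Never reached if the owner drops the Future while it is paused above.
    progress.after_await.set(true);
}

/// Polls `work` exactly once, then drops it.
/// Returns (ran code before the await, ran code after the await).
fn poll_once_then_drop() -> (bool, bool) {
    let progress = Rc::new(Progress::default());
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(work(Rc::clone(&progress)));
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    drop(fut); // This is all that "cancellation" is.

    (progress.before_await.get(), progress.after_await.get())
}
```

Calling `poll_once_then_drop()` yields `(true, false)`: the first half of `work` ran, the second half silently did not, and nothing in the type system flagged it.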

Example with Mutexes

Let’s take a look at a simplified example based on real issues we’ve hit. (Much thanks to John Gallagher for the basis of these examples plus much of the work on which this RFD is based.)

A key use case for mutexes is to implement a critical section to enable enforcement of non-atomic invariants. It’s a common pattern to acquire a mutex, modify multiple fields (which is not atomic), then release the mutex, knowing that no other code could see the intermediate state where some of those fields were updated but not the others. Here’s a working example using sync Rust:

use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;

/// Some arbitrary state tracked by the program, part of a state machine
#[derive(Debug)]
enum MyState {
    One,
    Two,
    Three,
}

impl MyState {
    /// Advances from one state to the next
    // The assumption is that this does some non-trivial work represented by
    // the "sleep" call. The state itself is consumed by value and a new state
    // is returned.
    fn advance(self) -> Self {
        std::thread::sleep(Duration::from_millis(100));
        match self {
            Self::One => Self::Two,
            Self::Two => Self::Three,
            Self::Three => Self::One,
        }
    }
}

/// The simple state above is composed into a bigger system with `SystemState`.
/// There may be multiple threads with access to this.
// The underlying `MyState` is protected by a Mutex. This implementation stores
// the state as an `Option` because `MyState::advance()` consumes the current
// state, leaving us having no state at all (briefly, and only with the lock
// held).
//
// INVARIANT: when the lock is acquired, the `Option` is always `Some`.
struct SystemState {
    state: Mutex<Option<MyState>>,
}

impl SystemState {
    fn new() -> Self {
        Self { state: Mutex::new(Some(MyState::One)) }
    }

    /// Advances the underlying `MyState` (taking the lock and blocking until
    /// the advance operation completes)
    fn advance(&self) {
        let mut system_state = self.state.lock().unwrap();
        // Take the inner state enum out, but always replace it!
        let current_state =
            system_state.take().expect("invariant violated: `state` was None!");
        *system_state = Some(current_state.advance());
    }

    /// Checks whether the invariant has been violated by acquiring the lock and
    /// checking whether the state is None
    fn check(&self) {
        let state = self.state.lock().unwrap();
        assert!(state.is_some());
    }
}

fn main() {
    let system_state = Arc::new(SystemState::new());

    // Spawn a thread to repeatedly advance the state machine.
    let s2 = Arc::clone(&system_state);
    std::thread::spawn(move || {
        let mut count: u64 = 0;
        loop {
            s2.advance();
            count = count + 1;
            eprintln!("advanced {} times", count);
        }
    });

    // Now, repeatedly check the invariant.
    let mut count: u64 = 0;
    loop {
        system_state.check();
        count = count + 1;
        if count % 50 == 0 {
            eprintln!("checked {} times", count);
        }
    }
}

This is not a great way to write this code. The hope is that it’s realistic enough to illustrate the problem. This is a basic use of Mutexes and works fine. You can run this program (which is in this repo) indefinitely and it will report that it’s both advancing and checking the invariant regularly:

...
checked 400 times
checked 450 times
advanced 1 times
checked 500 times
advanced 2 times
checked 550 times
advanced 3 times
checked 600 times
...

Now let’s look at an obvious (but wrong) way to translate this to async code.

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

/// Some arbitrary state tracked by the program, part of a state machine
#[derive(Debug)]
enum MyState {
    One,
    Two,
    Three,
}

impl MyState {
    /// Advances from one state to the next
    // The assumption is that this does some non-trivial work represented by
    // the "sleep" call. The state itself is consumed by value and a new state
    // is returned.
    async fn advance(self) -> Self {
        tokio::time::sleep(Duration::from_millis(100)).await;
        match self {
            Self::One => Self::Two,
            Self::Two => Self::Three,
            Self::Three => Self::One,
        }
    }
}

/// The simple state above is composed into a bigger system with `SystemState`.
/// There may be multiple threads with access to this.
// The underlying `MyState` is protected by a Mutex. This implementation stores
// the state as an `Option` because `MyState::advance()` consumes the current
// state, leaving us having no state at all (briefly, and only with the lock
// held).
//
// INVARIANT: when the lock is acquired, the `Option` is always `Some`.
struct SystemState {
    state: Mutex<Option<MyState>>,
}

impl SystemState {
    fn new() -> Self {
        Self { state: Mutex::new(Some(MyState::One)) }
    }

    /// Advances the underlying `MyState` (taking the lock and blocking until
    /// the advance operation completes)
    async fn advance(&self) {
        let mut system_state = self.state.lock().await;
        // Take the inner state enum out, but always replace it!
        let current_state =
            system_state.take().expect("invariant violated: `state` was None!");
        *system_state = Some(current_state.advance().await);
    }

    /// Checks whether the invariant has been violated by acquiring the lock and
    /// checking whether the state is None
    async fn check(&self) {
        let state = self.state.lock().await;
        assert!(state.is_some(), "invariant violated: `state` was None!");
    }
}

#[tokio::main]
async fn main() {
    let system_state = Arc::new(SystemState::new());

    // Spawn a task to repeatedly advance the state machine.
    let s2 = Arc::clone(&system_state);
    let advancer_task = tokio::spawn(async move {
        let mut count: u64 = 0;
        loop {
            s2.advance().await;
            count = count + 1;
            eprintln!("advanced {} times", count);
        }
    });

    // Throw a wrench into things: at some point, let's cancel the advancer
    // task. (There is no direct analog with threads.)
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(1500)).await;
        advancer_task.abort();
    });

    // Now, repeatedly check the invariant.
    let mut count: u64 = 0;
    loop {
        system_state.check().await;
        count = count + 1;
        if count % 50 == 0 {
            eprintln!("checked {} times", count);
        }
    }
}

For reference, here’s the difference between these two files:

 $ diff example-{,a}sync/src/main.rs
2d1
< use std::sync::Mutex;
3a3
> use tokio::sync::Mutex;
18,19c18,19
< fn advance(self) -> Self {
< std::thread::sleep(Duration::from_millis(100));
---
> async fn advance(self) -> Self {
> tokio::time::sleep(Duration::from_millis(100)).await;
47,48c47,48
< fn advance(&self) {
< let mut system_state = self.state.lock().unwrap();
---
> async fn advance(&self) {
> let mut system_state = self.state.lock().await;
52c52
< *system_state = Some(current_state.advance());
---
> *system_state = Some(current_state.advance().await);
57,58c57,58
< fn check(&self) {
< let state = self.state.lock().unwrap();
---
> async fn check(&self) {
> let state = self.state.lock().await;
63c63,64
< fn main() {
---
> #[tokio::main]
> async fn main() {
66c67
< // Spawn a thread to repeatedly advance the state machine.
---
> // Spawn a task to repeatedly advance the state machine.
68c69
< std::thread::spawn(move || {
---
> let advancer_task = tokio::spawn(async move {
71c72
< s2.advance();
---
> s2.advance().await;
76a78,84
> // Throw a wrench into things: at some point, let's cancel the advancer
> // task. (There is no direct analog with threads.)
> tokio::spawn(async move {
> tokio::time::sleep(Duration::from_millis(1500)).await;
> advancer_task.abort();
> });
>
80c88
< system_state.check();
---
> system_state.check().await;

There are some straightforward translations from sync to async code:

  • change blocking functions to "async"

  • change call sites of these functions to use ".await"

  • change from a std Mutex to a tokio Mutex

  • change main() to spawn tasks instead of threads

This is consistent with the official documentation for async/await that suggests several times that async code is supposed to basically look like sync code.

There’s one other change: main() in the async version spawns a task that sleeps for 1500ms and then cancels the advancer task. While it’s explicit and maybe obviously suspicious, this is a stand-in for what can happen implicitly to any async Future: something else may trigger its cancellation at any point, and this is outside the Future’s control. There is no analog to this behavior in the threaded version because there is no way to cancel a thread. (On the other hand, that means if you do want to be able to cancel a long-running operation in synchronous threaded code, you need to figure out how to implement that.)
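For comparison, here is a sketch of what "figuring out how to implement that" often looks like in threaded code: a shared stop flag that the worker checks at points of its own choosing. The function name `run_until_cancelled` and the 5ms "unit of work" are assumptions made up for this example. The key difference from async cancellation is that the worker, not its owner, decides where it is safe to stop.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Runs a worker thread that does "units of work" until asked to stop.
/// Returns the number of units it completed.
fn run_until_cancelled(run_for: Duration) -> u64 {
    let stop = Arc::new(AtomicBool::new(false));
    let completed = Arc::new(AtomicU64::new(0));

    let worker = {
        let stop = Arc::clone(&stop);
        let completed = Arc::clone(&completed);
        thread::spawn(move || {
            // The flag is only checked between units of work, so a unit is
            // never abandoned halfway through.
            while !stop.load(Ordering::Acquire) {
                thread::sleep(Duration::from_millis(5)); // one unit of work
                completed.fetch_add(1, Ordering::Relaxed);
            }
        })
    };

    // Let the worker run for a while, then request cancellation and wait
    // for it to notice.
    thread::sleep(run_for);
    stop.store(true, Ordering::Release);
    worker.join().expect("worker panicked");

    completed.load(Ordering::Relaxed)
}
```

The cost is the mirror image of the async case: cancellation only takes effect when the worker next checks the flag, so a worker blocked in a long system call will not notice until that call returns.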

So what happens with the async program? It nearly always crashes after about 1.5 seconds with:

thread 'main' panicked at 'invariant violated: `state` was None!', src/main.rs:59:9

What’s going on?

  • The advancer task enters a loop acquiring the mutex, grabbing the current state out of the Option, sleeping for 100ms, storing the new state back into the Option, and dropping the Mutex. Critically, "sleeping for 100ms" is an "await" point at which task cancellation might happen.

  • After 1.5s, advancer_task.abort() is invoked. The purpose of this function is to cause tokio to drop the task (the advancer task, in this case). The task is not interrupted, but the next time it reaches an await point, tokio sees that the (tokio) task has been aborted and drops the (async/await) task, triggering task cancellation.

  • Cancellation isn’t an explicit behavior so much as just what happens if you stop polling a Future and drop it. It, plus any state stored in it, gets dropped. In our case, at the "await" point, one of the pieces of state is the tokio::sync::MutexGuard. When that gets dropped, as with the std::sync::MutexGuard, the lock is released. So the lock gets released without the invariant that it protects having been restored.

To be clear, the code above is buggy. Given how async/await and task cancellation work in Rust, this code is not cancellation-safe, meaning that it violates its own invariants when it gets cancelled.

At the same time, it must be emphasized that cancellation is not mentioned in the "Asynchronous Programming in Rust" book or the other official docs that we searched, so it’s not clear how anybody would be expected to know about this problem.

Other examples

tokio is a widely-used crate providing an executor and a suite of async functions for common kinds of blocking work: file I/O, network I/O, timers, etc. It includes a select! macro (similar in spirit to the classic C interface of the same name) whose docs provide the best summary of cancellation we’ve seen in almost-official docs. It talks specifically about cancellation safety and enumerates:

  • a bunch of functions that are cancellation-safe

  • a bunch of functions that are not cancellation-safe in ways that can lead to data loss (!) (e.g., AsyncReadExt::read_to_string)

  • a bunch of functions that are not cancellation-safe apparently only in that fairness properties are lost when cancelled (e.g., if you use Mutex::lock() in a select! loop, which cancels the not-taken branches, you’ll lose your place in line each time your select! picks a different branch).
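The "data loss" category can be illustrated with a small sketch. This is not how any tokio function is implemented; `read_pair`, `cancel_mid_read`, `YieldOnce`, and `NoopWake` are hypothetical names, and an in-memory queue stands in for a socket or file. The Future takes one item out of the shared source, pauses, and is dropped: the item it had already taken lives only inside the Future and disappears with it.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Pending on the first poll, Ready afterwards; stands in for "wait for
/// more data to arrive". Polled by hand here, so it never wakes its waker.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Reads two items from the queue, pausing between them.
/// NOT cancellation-safe: `first` is held only in this Future's state.
async fn read_pair(queue: Rc<RefCell<VecDeque<u32>>>) -> Option<(u32, u32)> {
    let first = queue.borrow_mut().pop_front()?;
    YieldOnce(false).await;
    let second = queue.borrow_mut().pop_front()?;
    Some((first, second))
}

/// Starts a read, cancels it at the await point, and returns what is left
/// in the queue afterwards.
fn cancel_mid_read() -> Vec<u32> {
    let queue = Rc::new(RefCell::new(VecDeque::from([1u32, 2, 3])));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(read_pair(Rc::clone(&queue)));
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    drop(fut); // cancelled after consuming `1` but before returning it

    let remaining: Vec<u32> = queue.borrow().iter().copied().collect();
    remaining
}
```

After `cancel_mid_read()`, the queue holds `[2, 3]`. The value `1` was consumed from the source and never delivered to anyone, which is the kind of loss the select! docs warn about for functions that buffer partial results internally.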

Compounding the confusion about this issue:

  • These lists are not exhaustive. They do not comprise all the functions in tokio.

  • Functions like tokio::time::sleep() are not listed here, but are separately documented as cancellation-safe.

  • The definition of "cancellation safety" here talks about recreating the Future, but a Future seems pretty unsafe with respect to cancellation if it leaves the program in an invalid state even if the Future itself is never recreated.

In general

As an author of a Future that you want to be cancellation-safe, if you call any other async function, you often want to know if that function is cancellation-safe. There is no way for the compiler to know whether a Future is cancellation-safe. There is no standard marker trait used to denote that something is cancellation-safe (or not). Even if there were, there would be no way to know it was applied correctly. (This is pretty different from Send, where it’s pretty rare that it needs to be applied by hand.) You either have to hope that fact is documented (and accurate) or verify it yourself. You also need to ensure that it remains true as that function changes over time. As we saw, even in tokio, one of the most widely used pieces of the async ecosystem, it isn’t always clear whether a function is cancellation-safe or even where that would be documented.

Aside: this sucks

There’s a pretty big emotional component to this problem that’s important to acknowledge. I call this out explicitly because I’ve seen this problem revealed to one team member after another and each of us (myself included) quickly entered some combination of anger/denial/bargaining, which has led to some heated discussions and rash suggestions ("you can’t use this correctly" and "can’t we just …​ ?"). It’s helpful to call out that yeah, this sucks[2]. And also: it’s real, it’s working by design, some people like this approach, and it is possible to use it correctly. That doesn’t mean we have to like it or keep using it or that it’s a good fit for Omicron.

Risks in Omicron

It’s hard to assess the risk. See the FAQ entry "Can’t we just make everything cancellation-safe?" below. Still, we can say a few things.

Some risk factors that we know of:

  • Using tokio::sync::Mutex is risky. If the guard is held across an await point and the Future is cancelled there, the lock is released without any more code running in that Future.

  • Any code running from a Dropshot server request handler can be cancelled if the client disconnects while the request is running. Work is ongoing to mitigate this risk by running request handlers in their own tasks.

  • Using anything that explicitly cancels Futures (see below for examples) carries the risk that those Futures are not cancellation-safe.

Some areas where we believe there’s low risk:

  • Saga actions and undo actions should be safe. Each of these executes in its own tokio task. The only way they could be cancelled is if they were explicitly aborted. Steno keeps track of the JoinHandle that’s needed to do this, never aborts them itself, and never exposes the JoinHandle.

  • A potential risk might be that dropping the Executor itself might cancel all of its tasks. Most of our programs spin up an Executor in main that wouldn’t be dropped until the program is exiting anyway.

In Omicron, tasks can be cancelled:

  • as mentioned above, if they’re part of a Dropshot request handler function and the client disconnects

  • if they’re used in a tokio::select! call

  • if they’re spawned as a new tokio task and the caller explicitly aborts the task (with JoinHandle::abort())

  • if they’re used as part of try_join() or try_join_all() (used to run several Futures in parallel and stop them all when any of them fails)

  • if they’re used in any variant of tokio::time::timeout(), used to run a Future for no longer than a given time interval (i.e., cancel it if it’s taking too long)

  • if they’re used by any Future that can be cancelled for any of these reasons
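Several of these mechanisms (select!, try_join, timeouts) share one shape: poll a few Futures together, and when one finishes, drop the rest. The sketch below hand-rolls that shape with the standard library only. It is not how tokio implements any of these; `race`, `block_on`, `YieldTimes`, `NoopWake`, and `race_fast_against_slow` are hypothetical names, and `block_on` is a toy busy-polling executor.

```rust
use std::cell::RefCell;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Returns Pending the given number of times, then Ready.
struct YieldTimes(u32);

impl Future for YieldTimes {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            return Poll::Ready(());
        }
        self.0 -= 1;
        Poll::Pending
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls in a loop until the Future completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Resolves to the output of whichever Future finishes first.
async fn race<T>(a: impl Future<Output = T>, b: impl Future<Output = T>) -> T {
    let mut a = Box::pin(a);
    let mut b = Box::pin(b);
    poll_fn(|cx: &mut Context<'_>| {
        if let Poll::Ready(value) = a.as_mut().poll(cx) {
            return Poll::Ready(value);
        }
        b.as_mut().poll(cx)
    })
    .await
    // `a` and `b` are dropped here; the one that had not finished is cancelled.
}

/// Returns the winner and the log of what the slow Future managed to do.
fn race_fast_against_slow() -> (&'static str, Vec<&'static str>) {
    let log = Rc::new(RefCell::new(Vec::new()));
    let fast = async {
        YieldTimes(1).await;
        "fast"
    };
    let slow = {
        let log = Rc::clone(&log);
        async move {
            log.borrow_mut().push("slow: started");
            YieldTimes(5).await;
            log.borrow_mut().push("slow: finished"); // never runs
            "slow"
        }
    };
    let winner = block_on(race(fast, slow));
    let events = log.borrow().clone();
    (winner, events)
}
```

`race_fast_against_slow()` returns `("fast", ["slow: started"])`. The slow branch began its work and was then dropped mid-flight, without any explicit "cancel" call appearing anywhere in the code.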

Short-term plan

Most of the bugs we’ve root-caused related to task cancellation involved a Dropshot handler function being cancelled. Most of those involved a tokio::sync::Mutex. With dropshot#701, Dropshot will be able to run request handlers in their own tokio tasks. This should eliminate the cause of most of the cases that we’ve found so far.

Why isn’t this a complete answer? Read on.

Note
[rfd400] follows up on this RFD with patterns that we can apply going forward to help mitigate these problems.

FAQ: "Can’t we just …​ ?"

FAQ: "Can’t we just fix this in Dropshot?"

This doesn’t fully solve the problem because:

  • It just eliminates the one source of cancellation we’ve seen be a problem. The Futures involved are still not cancellation-safe. They remain a land mine in that they’ll break if they’re ever used in a context that might cancel them (e.g., tokio::select!).

  • An unknown amount of code (presumably most of it) has not been checked for cancellation safety.

  • We don’t have an approach, methods, tools, etc. to ensure that new code is written with cancellation safety in mind.

omicron#3356 is an example bug that did not involve a Dropshot endpoint handler.

FAQ: "Can’t we just make everything cancellation-safe?"

It is certainly possible to write cancellation-safe Futures. Patterns include:

  • Use channels to communicate with a single background task that’s responsible for the work. This is a nice pattern but does involve a fair bit of boilerplate each time.

  • Use a Drop impl to restore any invariants temporarily broken by code across an await point. scope_guard is one way to do this.
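As a sketch of the Drop-based pattern, the example below reworks the earlier state machine so that a guard puts a valid state back if the Future is dropped mid-transition. It makes two loud assumptions that differ from the earlier listing: `MyState` is `Copy`, so the guard can keep the pre-transition value as a fallback, and a `RefCell` stands in for the lock so the example needs only the standard library. `Restore`, `advance_slot`, `run_advance`, `YieldOnce`, and `NoopWake` are hypothetical names.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Pending on the first poll, Ready afterwards (stands in for the sleep).
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum MyState {
    One,
    Two,
    Three,
}

impl MyState {
    async fn advance(self) -> Self {
        YieldOnce(false).await;
        match self {
            Self::One => Self::Two,
            Self::Two => Self::Three,
            Self::Three => Self::One,
        }
    }
}

/// Writes `value` back into `slot` when dropped, whether the enclosing
/// Future ran to completion or was cancelled.
struct Restore<'a> {
    slot: &'a RefCell<Option<MyState>>,
    value: MyState,
}

impl Drop for Restore<'_> {
    fn drop(&mut self) {
        *self.slot.borrow_mut() = Some(self.value);
    }
}

async fn advance_slot(slot: &RefCell<Option<MyState>>) {
    let current = slot.borrow_mut().take().expect("invariant violated");
    // The slot is now None. The guard starts out holding the old state and
    // is updated to the new state only once the transition has finished.
    let mut restore = Restore { slot, value: current };
    restore.value = current.advance().await;
}

/// Polls `advance_slot` once, then either cancels it or lets it finish.
/// Returns the contents of the slot afterwards.
fn run_advance(cancel: bool) -> Option<MyState> {
    let slot = RefCell::new(Some(MyState::One));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(advance_slot(&slot));
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    assert!(slot.borrow().is_none()); // invariant is broken while paused
    if !cancel {
        assert!(fut.as_mut().poll(&mut cx).is_ready());
    }
    drop(fut);

    let state = *slot.borrow();
    state
}
```

`run_advance(true)` returns `Some(MyState::One)` (the transition was rolled back) and `run_advance(false)` returns `Some(MyState::Two)`. With a real lock guard, the restoring guard would need to be declared after the lock guard so that it is dropped first, since locals are dropped in reverse declaration order. When the state cannot be copied and is consumed by the transition, there is no old value to put back, and some other recovery (such as a dedicated "poisoned" state) would be needed.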

It’s impossible to automatically know if any Future is cancellation-safe. In general, auditing a Future for cancellation safety is laborious: you have to identify any invariants that it or its callers rely on and see if those remain true if the Future stops running at any of the await points. Even if you verify that it’s correct today, that could change if the Future or any of the Futures that it waits on changes.

We know we’ve written a bunch of cancellation-unsafe code. So if we were just going to rely on writing correct code, presumably we’d at least need to work on getting the existing team up to speed and making sure new team members learn about these details, too. We’d almost certainly want to develop some guidance around this, too. Where does responsibility for cancellation safety lie? Must every Future be cancellation-safe? Or might it be okay sometimes to rely on the caller to avoid cancellation? If so, how do we communicate that? How do we help ourselves get any of this right? Again, the lack of official docs here makes this hard because it’s not clear what Rust programmers at large expect about cancellation safety of some arbitrary Future.

FAQ: "Can’t we just write an executor that doesn’t cancel tasks?"

Unclear if you could, but it doesn’t seem correct. We use task cancellation every time we use tokio::select! and most of the time we probably rely on it for correctness. Take this example[3] from gateway-cli. In a tokio::select! loop, it waits in parallel for (1) a line of input from stdin or (2) the next message from a stream of WebSocket messages. If we didn’t cancel these Futures, then say the first one to complete is the read from stdin, then the Future that waits for a WebSocket message will still run and consume that message, but you’ll never get that message. The message will just be lost (i.e., arbitrarily broken behavior, data corruption, etc.).

FAQ: "Can’t we just use the std Mutex?"

Even if we eliminated all use of the tokio Mutex, that doesn’t really solve the deeper problem. We’ve seen at least one task cancellation bug that did not involve the tokio Mutex. This might be an okay part of a short-term approach but doesn’t solve the long-term problem.

Could we do it as a short-term option? The tokio docs on its Mutex explain that (surprisingly) it’s potentially okay to use a std::sync::Mutex in asynchronous code. One problem with doing this is that if you block on the Mutex, you wind up blocking one of the async worker threads, which can starve every other async task that wants to run.

Also, the std::sync::MutexGuard is not Send, which means in practice we can’t hold it across an await point.[4] So it wouldn’t be a drop-in substitute for tokio::sync::Mutex — in fact, it seems like one likely couldn’t use it in the situations where the tokio Mutex would have done the wrong thing.
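A sketch of what using the std Mutex from async code tends to look like in practice: the guard is confined to a block that ends before the await point, so the Future stays Send. `Counter`, `bump`, `assert_send`, `bump_once`, and the toy `block_on` executor are hypothetical names for this illustration. The `assert_send` call is a compile-time check; holding the guard across the `.await` instead would make that line fail to compile.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Pending on the first poll, Ready afterwards; stands in for real async work.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls in a loop until the Future completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Compiles only if `value` is Send.
fn assert_send<T: Send>(value: T) -> T {
    value
}

struct Counter {
    value: Mutex<u64>,
}

async fn bump(counter: Arc<Counter>) -> u64 {
    let snapshot = {
        let mut guard = counter.value.lock().unwrap();
        *guard += 1;
        *guard
    }; // guard dropped here, before any await point
    YieldOnce(false).await;
    snapshot
}

fn bump_once() -> u64 {
    let counter = Arc::new(Counter { value: Mutex::new(0) });
    let fut = assert_send(bump(Arc::clone(&counter)));
    block_on(fut)
}
```

Because the critical section contains no await point, cancellation cannot land inside it: the update either has not started or has fully completed. That is also why this shape cannot express the earlier example, where the work being protected was itself asynchronous.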

As an aside, the docs go on to say:

The primary use case for the async mutex is to provide shared mutable access to IO resources such as a database connection. If the value behind the mutex is just data, it’s usually appropriate to use a blocking mutex such as the one in the standard library or parking_lot.

but later:

Additionally, when you do want shared access to an IO resource, it is often better to spawn a task to manage the IO resource, and to use message passing to communicate with that task.

It sounds like if you take these docs at face value, you usually don’t want tokio::sync::Mutex anyway?
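The message-passing arrangement those docs recommend is the same idea as the "single background task" pattern mentioned earlier. Here is a minimal sketch of its shape, using a std thread and std channels as stand-ins for a tokio task and tokio channels so that it needs no external crates. `Request`, `spawn_owner`, and `advance_twice_then_get` are hypothetical names, and `MyState` is assumed to be `Copy` so it can be sent in a reply.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

#[derive(Clone, Copy, Debug, PartialEq)]
enum MyState {
    One,
    Two,
    Three,
}

impl MyState {
    fn advance(self) -> Self {
        thread::sleep(Duration::from_millis(10)); // non-trivial work
        match self {
            Self::One => Self::Two,
            Self::Two => Self::Three,
            Self::Three => Self::One,
        }
    }
}

/// Messages understood by the owner of the state.
enum Request {
    Advance,
    Get(mpsc::Sender<MyState>),
}

/// Spawns the single owner of the state. Nobody else can touch `state`,
/// so there is no lock and no `Option` to leave empty.
fn spawn_owner() -> (mpsc::Sender<Request>, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        let mut state = MyState::One;
        // The loop ends once every Sender has been dropped.
        for request in rx {
            match request {
                Request::Advance => state = state.advance(),
                Request::Get(reply) => {
                    // The requester may have gone away; that is not an
                    // error for the owner, and `state` is unaffected.
                    let _ = reply.send(state);
                }
            }
        }
    });
    (tx, handle)
}

fn advance_twice_then_get() -> MyState {
    let (tx, owner) = spawn_owner();
    tx.send(Request::Advance).unwrap();
    tx.send(Request::Advance).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Request::Get(reply_tx)).unwrap();
    let state = reply_rx.recv().unwrap();

    drop(tx); // lets the owner's loop finish
    owner.join().unwrap();
    state
}
```

`advance_twice_then_get()` returns `MyState::Three`. A requester that gives up after sending a message cannot leave the state half-updated, because each transition runs to completion inside the owner. The boilerplate cost noted earlier is visible even at this size: a request enum, a reply channel per query, and explicit shutdown.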

FAQ: "Can’t we just stop using async?"

This is probably something we could do. On the plus side, we’d probably gain a lot in debuggability. With synchronous threaded code, information about what’s blocked and why is evident on the call stack, which is available both in situ and post mortem. With async, this information is potentially available but not as easily accessed in either mode. This is discussed in more detail in [rfd79].

There’d be a bunch of work to figure out how to get to a synchronous threaded world, and presumably that’d need to happen incrementally.

Might we lose something we like about async/await? How would we implement things like tokio::time::timeout, which is used widely to "do this thing but stop if it takes longer than this amount of time"? How would we actually cancel things? If a client disconnects while we’re doing some expensive database operation, we do want to abort that. If we’re blocked reading the response from the database, how do we cancel it?
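One common approximation of a timeout in threaded code is sketched below: run the work on another thread and wait for its result with a deadline. `with_timeout` is a hypothetical helper, not an existing API. It shows the gap these questions point at: the caller stops waiting, but the work itself is not cancelled.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Runs `work` on a new thread and waits up to `limit` for its result.
fn with_timeout<T, F>(limit: Duration, work: F) -> Result<T, RecvTimeoutError>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // If the caller has already given up, the send fails and the
        // result is discarded. The work still ran to completion.
        let _ = tx.send(work());
    });
    rx.recv_timeout(limit)
}
```

For example, `with_timeout(Duration::from_millis(20), || { thread::sleep(Duration::from_millis(300)); 42 })` returns `Err(RecvTimeoutError::Timeout)` after about 20ms, while the spawned thread keeps sleeping for the full 300ms. Actually stopping the work would need cooperation from the work itself (a stop flag, a socket timeout, or similar).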

Another advantage of async code is that it’s easy to do something like make an HTTP request to several dependencies in parallel (a common pattern in distributed systems to reduce latency and improve availability). How would we do this in sync code? (You obviously can do this, but if it involves a lot of ceremony, and we find ourselves doing this often, that’s a notable downside.)
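For a sense of the ceremony involved, here is a sketch of a parallel fan-out in sync code using scoped threads from the standard library. `fetch` is a hypothetical stand-in for a blocking HTTP request, and the service names in the usage note are made up.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a blocking HTTP request to one dependency.
fn fetch(service: &str) -> Result<String, String> {
    thread::sleep(Duration::from_millis(20));
    if service.is_empty() {
        Err("empty service name".to_string())
    } else {
        Ok(format!("response from {service}"))
    }
}

/// Issues one request per service in parallel and returns the results in
/// the same order as `services`.
fn fetch_all(services: &[&str]) -> Vec<Result<String, String>> {
    thread::scope(|scope| {
        // Start every request before waiting on any of them.
        let handles: Vec<_> = services
            .iter()
            .map(|&service| scope.spawn(move || fetch(service)))
            .collect();
        handles
            .into_iter()
            .map(|handle| handle.join().expect("fetch thread panicked"))
            .collect()
    })
}
```

`fetch_all(&["service-a", "service-b"])` takes roughly as long as one request rather than two. This version uses one thread per request and always waits for all of them; stopping early when one fails (as try_join does) would again require some cooperative cancellation mechanism.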

Debatable? FUD? Synchronous threaded systems sometimes have a bigger blast radius when things go out to lunch. Imagine if all CockroachDB queries hang: we would likely find that all of Nexus’s threads were blocked on queries or blocked waiting for a connection to the database. We wouldn’t be able to ask Nexus anything else, like, say, "why are you stuck?". Nexus might be dead in the water without CockroachDB anyway, so a better example might be if some other internal service that’s used in, say, 5% of requests went similarly out to lunch. Again, with even modest load, we’d eventually expect all Nexus threads to get stuck there. These problems can certainly be addressed (e.g., separate thread pools for separate chunks of work, aggressive timeouts, large caps on the thread pool, etc.), and in principle the same thing could happen with async/await if there were a cap on the total number of async tasks. The fear here is that it’s very easy to accidentally wind up here by not putting the right controls in all the places they need to be, whereas with async the system could often limp along in these situations, just using more memory for the stuck tasks.

FAQ: "Can’t we just …​ "

Maybe! This is a complex problem with many solutions that have unintended consequences. When we’re talking about the longer-term plan, we should probably spend a little time thinking through these consequences.

Next steps

It’s worth considering if there are other actions we want to take to mitigate the problem in the short term. By "short term", we mean things that can be done in like a week, like writing up specific guidelines for avoiding these problems.

We probably need a bigger discussion about direction, like do we try to move to sync? Maybe this should be scheduled for the next meet-up?


Footnotes
  • 1

    It might be controversial to say that this is undocumented? But no mention of task cancellation appears in the "Learn Rust" book, the "Asynchronous Programming in Rust" book, the Rust Reference book, or the Rustonomicon book. The futures crate mentions the word but never explains it, only treating it as synonymous with some Future being dropped. The tokio crate documentation mentions it in a few places, primarily in the documentation for tokio::select!. This is probably the best explanation we’ve found in any official documentation. It would be easy for someone to read all available documentation on async/await and not even know that cancellation was a thing, much less a critical concept for writing async code, unless they chose tokio as their executor and happened to reach for tokio::select!.

  • 2

    Why does this situation suck? It’s clear that many of us haven’t been aware of cancellation safety and it seems likely there are many cancellation issues all over Omicron. It’s awfully stressful to find out while we’re working so hard to ship a product ASAP that we have some unknown number of arbitrarily bad bugs that we cannot easily even find. It’s also frustrating that this feels just like the memory safety issues in C that we adopted Rust to get away from: there’s some dynamic property that the programmer is responsible for guaranteeing, the compiler is unable to provide any help with it, the failure mode for getting it wrong is often undebuggable (by construction, the program has not done something it should have, so it’s not like there’s a log message or residual state you could see in a debugger or console), and the failure mode for getting it wrong can be arbitrarily damaging (crashes, hangs, data corruption, you name it). Add on that this behavior is apparently mostly undocumented outside of one macro in one (popular) crate in the async/await ecosystem and yeah, this is frustrating. This feels antithetical to what many of us understood to be a core principle of Rust, that we avoid such insidious runtime behavior by forcing the programmer to demonstrate at compile-time that the code is well-formed.

  • 3

    Not to pick on any code: this was the first result found with git grep tokio::select! that seemed probably correct.

  • 4

    Really, it means holding it across the await point makes the Future not Send, which means neither it nor any Future that waits for it can be used with tokio::spawn(). There are special cases where this could be useful, but they don’t cover most of our uses (including any Futures waited on from Dropshot handlers, saga actions, or "background tasks").
