RFD 400
Dealing with cancel safety in async Rust
RFD
400
Authors
Updated

Introduction

While developing the control plane, we’ve found that one of the bigger practical issues has been async cancel correctness: the relatively unpredictable way an async Rust system can behave if futures are dropped [rfd397]. Cancellation is an example of "spooky action at a distance", and having to think about it imposes a significant cognitive burden on authors and reviewers of asynchronous code. Both of these are quite uncharacteristic of Rust.

This document aims to be a practical guide to dealing with cancel safety in Rust, and covers four main topics:

  1. What is a general overview of async cancellation?

  2. What are the main sources of cancellation in async Rust?

  3. As an author of asynchronous library code, how do you write cancel-safe code? How do you indicate to consumers that some APIs are cancel-unsafe?

  4. As an application that consumes asynchronous code, how do you deal with cancel-unsafe futures?

The intended audience for this document is anyone who writes or reviews asynchronous Rust code, particularly in Omicron but also more generally in any context.

Manually implemented futures vs async blocks

A Rust future is, at its core, a state machine for deferred computation. Rust permits futures to be written in two different ways.

  1. Manually implemented futures are values for which std::future::Future is implemented by hand. For example, this is a manually implemented future that immediately returns a value.

    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    struct MyFuture {
    value: Option<String>,
    }

    impl Future for MyFuture {
    type Output = String;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    return Poll::Ready(self.value.take().expect("future polled after completion"));
    }
    }
  2. Async blocks are sections of code wrapped in async { }, or in async fn functions. For example, this is an async block that immediately returns a value:

    async {
    let output: String = "future-output".to_owned();
    output
    }

While both kinds of futures are susceptible to cancel safety issues, async blocks tend to hide some of the mechanics of how futures work. This means that in practice, a lot of the lurking surprises of async cancellation lie in async blocks, not in manually implemented futures.

Overview of cancellation

A future, on being dropped, is cancelled. This means that the state machine corresponding to the future no longer continues execution past the point at which it is dropped. (A future can even be dropped before it is polled for the first time, in which case it never gets to execute.)

Tip

This is easiest to see with manually implemented futures. It’s like calling the poll method a few times, having it return Poll::Pending, and then dropping the future before it gets a chance to return Poll::Ready.

On being cancelled, any values owned by the future are dropped. This can include other futures owned by the dropped future. In other words, cancellation propagates from parent futures to child futures.

If any values owned by the future have a Drop implementation, it is called. For example, tokio::sync::SemaphorePermit (returned by Semaphore::acquire) has a Drop implementation. In the below code:

let semaphore = Semaphore::new(2);

async {
let permit = semaphore.acquire().await.unwrap();
println!("permit acquired");
tokio::time::sleep(Duration::from_secs(5)).await;
}
  • If the async block is dropped before the permit is acquired, the future returned by semaphore.acquire() is dropped.

  • If the async block is dropped after the permit is acquired, during the sleep, permit is dropped and its Drop implementation is called.

In either case, since the async block doesn’t take ownership of semaphore (it isn’t async move), semaphore isn’t dropped.

Cancellation can only happen at await points, due to the cooperative multitasking model that futures use. For example, consider this Rust code:

use std::time::Duration;

async fn my_func() {
print!("1");
print!(", 2");
tokio::time::sleep(Duration::from_secs(5)).await;
print!(", 3");
print!(", 4");
}

This can print any one of:

  • 1, 2, 3, 4, if the future returned by my_func() runs to completion.

  • 1, 2, if my_func() runs up to the await point but is cancelled after that. (For example, tokio::time::timeout(Duration::from_secs(1), my_func()));

  • Nothing, if my_func() is cancelled before ever being awaited.

my_func() cannot print 1 or 1, 2, 3, or any other kind of intermediate value.

Tip

This is easiest to see with manually implemented futures as well. You cannot drop a future while its poll method is being called, but you can drop it in between poll calls.

Cancellation can only call synchronous code. This is due to the Drop function not being async. There are plans to add AsyncDrop, but as of 2023-06 it’s still a long way away from being done.

Cancellation is often a feature, not a bug. Many cancellations are wanted and desirable. For example, if you’re waiting for a tokio::sync::SemaphorePermit, and you drop that future, you lose your place in line. This is good because the system doesn’t waste resources waiting on an acquirer that no longer exists.

As an alternative, more explicit cancellation models have their own ways of dealing with this[1].

Cancel safety

We say that a future is cancel-safe if it can be dropped before being completed, without any implications for the rest of the system.

As an example, Tokio’s MPSC channels have an asynchronous method to receive a message, called Receiver::recv. This method is cancel-safe: it either completes and resolves to a message, or is dropped and no messages are received. Receiver::recv does not have an intermediate state where a message has been retrieved from the queue but has not been delivered to the caller.

Notably, the cancel safety of Receiver::recv is a matter of careful API design and implementation. It is certainly possible to write a version of Receiver::recv that holds on to a message across an await point, causing potential data loss. But that’s not how the current version of recv works. That it will continue to work this way is guaranteed in the API documentation (and presumably Tokio’s tests).

In our definition, cancel safety is a local property of a future.

Cancel correctness

We define cancel correctness as a global property: the ability of a system to behave correctly, considering that futures within it might be cancelled.

Cancel correctness is violated if all three of the following conditions are met:

  1. The system has a cancel-unsafe future somewhere in it.

  2. This future is cancelled.

  3. Cancelling the future violates some property of the system.

Breaking these down one by one:

The system has a cancel-unsafe future somewhere in it. A system that consists entirely of cancel-safe futures cannot have cancel correctness bugs. [making_cancel_safe] covers ways to turn cancel-unsafe futures into cancel-safe ones.

This future is cancelled. Cancel correctness issues can only occur if futures are actually cancelled. This might seem like a trivial point at first! But it’s worth keeping in mind as there are usually ways to arrange for futures to not be cancelled.

Cancelling the future violates some property of the system. Properties that need to be upheld for correctness include, but are not limited to:

  • No data loss. If a future that reads from or writes to a stream is cancelled, data currently in flight must not be lost. Case study: the Oxide serial console. If necessary, there should be a way to create new futures that resume reading or writing data at the point at which it was cancelled.

  • Invariants upheld. If a future is cancelled, any shared state it was operating on must not be left in an invalid state.

    In Rust code, it is generally okay to temporarily have invalid state while holding a &mut reference to it, so long as the state is made valid by the time the &mut reference is released. This is a big part of Rust’s claims about "fearless concurrency", at least when it comes to non-async code. The analogue of this in async code is that in cancel-safe futures, shared state must not be invalid across await points. Case study: wicketd’s installinator progress tracker.

    Tip

    One way to look at this issue is via a future’s "cancellation blast radius": the other parts of a system that are affected when a future is cancelled.

    For example, tokio::time::sleep is entirely self-contained. Cancelling it has no impact on the rest of the system.

    A cancellation in code that has some internal mutable state but touches shared state in a read-only fashion is also okay. That’s because even if that internal state becomes invalid, it is immediately discarded.

    Cancellations that make shared mutable state invalid cause problems. This occurs most commonly with Tokio’s mutexes, which is why this guide recommends not using them.

    Cancellations that affect state outside of the process are especially fraught. For an example, see this case study.

  • Cleanup. If a system sets up futures that perform some cleanup at the end of execution, the system must be robust to some or all of those futures being cancelled. In the case of a database connection pool, this can mean an additional check that the connection is in a pristine state at the beginning of transactions, along with cleanup at the end of transactions. Case study: async_bb8_diesel.

  • Fairness. Some futures (e.g. tokio::sync::Mutex::lock), when dropped, make you "lose your place in line". Systems that cancel such futures need to be robust against the lack of fairness.

    Fairness tends to be less of a concern than other issues, especially in uncontended queues. Futures that are cancel-safe except for fairness can be described as mostly cancel-safe.

Which properties of a system must be upheld can also change over time. For example, consider a curl equivalent that fetches an HTTP URL and writes it to disk. If the operation to write data to disk is modeled as a future, canceling it would likely lead to correctness bugs. However, if the user presses Ctrl-C to terminate the program, canceling this future becomes okay.

Sources of cancellation

While a future is cancelled whenever it’s dropped, some patterns of code cause futures to be cancelled frequently. This section contains a list of common patterns that involve future cancellation. (This isn’t meant to be an exhaustive list.)

The select! macro

Perhaps the most common source of async cancellation bugs is tokio::select!, which polls a set of futures concurrently until the first one completes.

Consider this example:

use std::time::Duration;

let (sender, receiver) = tokio::sync::mpsc::channel(8);
spawn_send_task(sender); // give sender off to a task to send values from

let sleep = tokio::time::sleep(Duration::from_secs(5));

tokio::select! {
value = receiver.recv() => {
// ... process value
}
_ = sleep => {
println!("sleep completed");
}
}
  • If the sleep completes before receiver.recv() returns a value, then the receiver.recv() future is cancelled. That is okay because Receiver::recv is documented to be cancel safe.

  • If receiver.recv() completes before the sleep does, then the sleep future is cancelled. The sleep future is notionally stateless[2], and cancelling it has no effect on the rest of the system.

select! loops

While select! is sometimes used as a one-off, in practice most uses of select! involve some sort of looping. Following on from the above example, consider an example spawn_send_task function:

fn spawn_send_task(sender: tokio::sync::mpsc::Sender<String>) {
let strings: Vec<String> = vec![
"foo".to_owned(),
"bar".to_owned(),
"baz".to_owned(),
];

tokio::task::spawn(async move {
for value in strings {
sender.send(value).await;
}
});
}

This looks like idiomatic Rust code. There’s nothing wrong with it. But let’s say that in practice we’ve been noticing slowdowns sending values to the channel, and we’d like to add some logging to see why it’s slow. Our first attempt is to put a simple select! around the sender.send(value) future, logging at periodic intervals:

fn spawn_send_task(sender: tokio::sync::mpsc::Sender<usize>) {
let strings: Vec<String> = vec![
"foo".to_owned(),
"bar".to_owned(),
"baz".to_owned(),
];

tokio::task::spawn(async move {
// interval.tick() completes execution at 0s, 1s, 2s...
let mut interval = tokio::time::interval(Duration::from_secs(1));

for value in strings {
tokio::select! {
res = sender.send(value) => {
if res.is_err() {
println!("receiver closed");
break;
}
}
_ = interval.tick() => {
println!("interval tick");
}
}
}
});
}

It turns out that the above code is incorrect, in a somewhat non-obvious fashion. That is because sender.send(value) is not cancel-safe.

Why is sender.send not cancel-safe? Consider what happens if interval.tick() completes before sender.send(value) does. In that case, value will not be sent. Per Rust’s single-ownership model, the value will be dropped—lost in the ether.

Note
Sender::send was only documented to be cancel-unsafe after this RFD was written and the author submitted a PR to the Tokio project, in Tokio 1.33. Before that, you had to infer the lack of cancel safety from the type signature: since send owns the value, if the future is dropped before it completes, the value is lost.

There is a convenient way to fix this issue, which is covered in [spawn_send_task_2].

Timeouts

Another common source of cancellation issues is timeouts. Consider the following code that uses tokio::time::timeout:

use std::time::Duration;

let (sender, receiver) = tokio::sync::mpsc::channel();
tokio::time::timeout(Duration::from_secs(5), sender.send(20)).await;

This is equivalent to running select! over a sender.send() and a tokio::time::sleep, and just like in that case the value is lost if the timeout occurs. The solution covered in [complex_ops] also addresses this issue.

In general, any operations that are in some sense equivalent to select! are going to run into the same issues as select!.

Try-joins

The Rust async ecosystem has several implementations of "try-joins":

These implementations await multiple fallible futures, completing when either all futures return Ok or any future returns Err. The common property shared by these adapters is that they all perform early cancellations: as soon as a future fails, all remaining futures are cancelled.

This is often the right thing to do. This code runs two HTTP GET queries, and if one of them fails, it cancels the other one (Rust playground).

use hyper::{client::HttpConnector, Uri}; // 0.14.27

struct Users { /* ... */ }
struct Posts { /* ... */ }

async fn fetch_users_and_posts(,
client: &hyper::Client<HttpConnector>,
) -> hyper::Result<(Users, Posts)> {
let users_fut = client.get(Uri::from_static("https://my.domain/api/users"));
let posts_fut = client.get(Uri::from_static("https://my.domain/api/posts"));

let (users_response, posts_response) = tokio::try_join!(
users_fut,
posts_fut
)?;

// -- process users_response and posts_response here

let users = Users { /* ... */ };
let posts = Posts { /* ... */ };
Ok((users, posts))
}

But for operations that have side effects, try_join can lead to bugs. For example, consider this code which flushes two tokio::io::AsyncWrite implementations:

use tokio::io::{AsyncWrite, AsyncWriteExt};

let write1 = /* AsyncWrite impl, eg tokio::fs::File */;
let write2 = /* another AsyncWrite impl */;

let result = tokio::try_join!(write1.flush(), write2.flush());
result?;

In this case, if either flush fails, the other flush is cancelled immediately. This is incorrect most of the time: we’d like both flush operations to make as much progress as possible.

A solution is covered in [then_try_adapters].

Task aborts

Unlike a future which has no existence outside of its owner, a task is owned by the executor (typically Tokio). Tasks are spawned using the tokio::task::spawn method. This method returns a JoinHandle, which can be used to abort the spawned task. The task is cancelled at the next await point. Any running futures within it are cancelled as well.

For example, consider this task:

let join_handle = tokio::task::spawn(async move {
do_long_running_operation().await;
});

// some time later...
join_handle.abort();

With the abort(), the future returned by do_long_running_operation() can be cancelled at any await point. It is unlikely that arbitrary async Rust code is resilient to cancellations anywhere during control flow[3].

This makes the cancellation problem global rather than something scoped to select! and other similar operations. For this reason, the author believes that task aborts should be avoided entirely. In particular, in most cases task aborts should not be treated as part of normal control flow, just like panics aren’t part of normal control flow.

For a practical solution to cancelling long-running tasks, see [explicit_cancellation].

Note

Unlike with individual futures, dropping a Tokio task’s JoinHandle does not cancel the corresponding task. While this is an implementation choice—some other executors choose to cancel the task on drop—it is one that is only afforded to tasks because they’re owned by the executor. Individual futures must be cancelled on being dropped.

Runtime shutdowns

If a Tokio runtime is shut down, all tasks running within it are cancelled at the next await point. While that is expected behavior, it can lead to some unexpected consequences.

With common patterns like #[tokio::main], runtime shutdowns happen at process exit. However, they can also happen at other times (e.g. within tests, or if a runtime is manually created). In particular, a panic can cause an unwind and a runtime shutdown outside of typical conditions.

This has some consequences:

  1. Let’s say you create a task, and then immediately drop its JoinHandle. You might expect that the task will run forever. That is not true: if the runtime is shut down due to a panic, then the task is cancelled. Code that interacts with this task should handle the case where the task has been cancelled.

  2. If there are two tasks A and B running on the same runtime, and the runtime is shut down, then the tasks are cancelled in arbitrary order. This can lead to nondeterministic panics. See Dropshot issue #709 for an example.

Cancel safety while writing async APIs

If you’re writing async APIs or returning futures, you need to think about how your users are going to approach cancel safety. The general strategy is using a combination of two approaches:

  1. Making code cancel-safe as much as reasonably possible.

  2. Clearly marking the rest as not cancel-safe.

Making code cancel-safe

There is sadly no silver bullet for making code cancel-safe. This section lists out a number of designs that authors can use and reviewers can look for, and example case studies. The hope is that the vast majority of cancel-safety issues can be addressed via these patterns.

Split up complex operations

In [select_loops] we observed that Tokio’s mpsc::Sender::send is not cancel-safe, and can lead to data loss.

There is a solution to this, which we can figure out from looking at the source code for Sender::send. Sender also exposes a reserve method, which returns a permit that can be used to send a value. reserve is mostly cancel-safe: dropping it makes you lose your place in line, but is otherwise okay.

Example: A corrected spawn_send_task

The [select_loops] section had an example spawn_send_task function which was identified as buggy. By using Sender::reserve, this code can be made correct.

This version of the code is a bit more complex. It reads as:

Code sample
fn spawn_send_task(sender: tokio::sync::mpsc::Sender<String>) {
let strings: Vec<String> = vec![
"foo".to_owned(),
"bar".to_owned(),
"baz".to_owned(),
];

tokio::task::spawn(async move {
// interval.tick() completes execution at 0s, 1s, 2s...
let mut interval = tokio::time::interval(Duration::from_secs(1));
let mut strings = strings.into_iter();

// std::vec::IntoIter implements ExactSizeIterator, which has a len()
// method which tells us if the iterator is non-empty. If your
// iterator doesn't implement ExactSizeIterator, you can instead
// use iterator.peekable(), and peek() to see if any values remain.
while strings.len() > 0 {
tokio::select! {
permit = sender.reserve() => {
match permit {
Ok(permit) => {
let value = strings.next().unwrap();
// send() is synchronous because the async part
// (waiting for a slot to be available) has
// already been completed by the time the permit is
// acquired.
permit.send(value);
}
Err(_) => {
println!("receiver closed");
break;
}
}
}
_ = interval.tick() => {
println!("interval tick");
}
}
}
});
}

There is also a way to solve this issue that doesn’t make you lose your place in line. This is covered in [spawn_send_task_3].

Reserve and related patterns

mpsc::Sender::reserve is an example of a common pattern, where the general sequence of async operations is:

  1. An initial async operation that is at least mostly cancel-safe.

  2. A synchronous operation and/or a followup async operation, that isn’t cancel-safe[4].

Some convenience methods like Sender::send combine both steps. These APIs can be easier to use sometimes. But providing these kinds of convenience APIs can lead to users accidentally introducing bugs, as seen in [select_loops].

If you’re writing such APIs, consider not providing convenience methods like Sender::send, and instead mentioning cancel safety in the documentation. While it can be a bit surprising at first (a sender that can’t just send values?), not providing such convenience methods makes it easier to train users into writing correct code. Whether this is a good idea or not depends on the situation.

Now let’s look at a case study which shows how to split up complex operations in practice.

Case study: Oxide serial console

The Oxide control plane provides a serial console, with an interface to web and command-line clients. Nexus has a function called proxy_instance_serial_ws, which proxies data back and forth between Propolis and client. In omicron#3356, it was discovered that proxy_instance_serial_ws used tokio::select! in a cancel-unsafe manner.

Specifically, this function selected against four different futures:

  1. A StreamExt::next future.

  2. A future returned from a method called InstanceSerialConsoleHelper::recv.

  3. An instance of SinkExt::send.

  4. InstanceSerialConsoleHelper::send, which was a wrapper around SinkExt::send.

Out of these four futures, only future 1 was cancel-safe.

  • Future 2 was not cancel-safe because it read a value off of a stream, then processed it using further awaits in the middle. If this future was cancelled in the middle, the value could have been read off the stream but not processed, and lost.

  • Future 3 was not cancel-safe because SinkExt::send is not cancel-safe, for the same reason that mpsc::sender::Send is not cancel-safe.

  • Future 4 was not cancel-safe because it was a wrapper over SinkExt::send.

Note

StreamExt::next is not documented to be cancel-safe, and its cancel safety depends on how the underlying Stream behaves. However, a Stream that couldn’t handle interruptions while generating its values would clearly be buggy. After all, the entire point of a Stream is to iterate over values in situations where the next item isn’t immediately available[5].

With SinkExt::send, no matter how well-written the underlying Sink is, the operation is not cancel-safe. In other words, the cancel unsafety is inherent to the send operation.

These issues were tackled separately, by splitting up complex operations in two different ways.

Making InstanceSerialConsoleHelper::recv cancel-safe
Description, splitting up a complex operation

InstanceSerialConsoleHelper::recv is a method that did two things:

  1. The method first retrieved the next message from an incoming Stream, using StreamExt::next.

  2. After that, recv processed the incoming message.

    • Most of the time, the message would simply be returned to the user.

    • However, if the message was an in-band control instruction to migrate to another server, recv would actually perform that reconnection—an operation with two more await points. After that, the message would be returned to the user.

Step 1 is cancel-safe, since it is just StreamExt::next. However, the migration in step 2 is not cancel-safe, since cancelling the recv future in the middle would mean that an in-progress migration might not be completed and the message might be lost. Here’s the way we chose to solve the cancel-safety issue:

  • Split recv into steps 1 and 2. Perform step 1 in recv(), making it return a future rather than the message.

  • Make the future returned from recv, called InstanceSerialConsoleMessage, perform step 2 and return the message[6].

  • Document that the future returned by step 1 is not cancel-safe and must be awaited for system correctness. (As an improvement, we can check for this explicitly by setting a flag in InstanceSerialConsoleHelper, but the natural flow of using messages leads to correctness so it hasn’t been necessary.)

Operating on the helper then becomes:

let mut helper: InstanceSerialConsoleHelper = /* ... */;

tokio::select! {
res = helper.recv() => {
let message = res?.await?;
// operate on message
}
// ... other select branches
}
What if the parent function is cancelled?

As covered in [overview], cancellation propagates from parent futures to child futures. A natural question to ask is, what if the parent function is cancelled while at the await point in let message = res?.await?;?

The answer to that is that the parent function also owns and is responsible for the client and server streams. If the parent function is cancelled, the stream is going to be closed anyway, so any state that becomes invalid is on its way to being destroyed.

Alternative solutions

These solutions aim to make all of recv cancel-safe, rather than splitting recv into cancel-safe and cancel-unsafe parts. This can be done through careful state management.

One alternative is to spawn a background task to perform the migration. propolis#438 implements this approach.

Another alternative is to resume partial progress by storing in-progress migration messages on self:

  • On receiving a migration message, store a copy of it in a field on self, say, self.migration_message (an Option<T>), before operating on it.

  • After completing the migration, set self.migration_message to None.

  • At the start of recv(), if self.migration_message is Some, process self.migration_message rather than reading a message from the incoming Stream.

Either of these would have been a fine way to solve this problem as well, but they weren’t chosen for domain-specific reasons.

Making SinkExt::send cancel-safe
Description, using the reserve pattern

The SinkExt::send method performs three operations in sequence:

  1. Call the Sink::poll_ready method until it completes.

  2. Call the (synchronous) Sink::start_send method.

  3. After that succeeds, call the Sink::poll_flush method until it completes.

This isn’t cancel-safe for the same reason that mpsc::Sender::send isn’t cancel-safe: the value might not be sent and instead be lost. But also, a reserve-based solution works just as well here as it does with mpsc::Sender::send!

The upstream futures crate doesn’t provide a reserve-pattern API, so we wrote our own (documentation). This API returns a Permit which indicates that step 1 above has been completed. This Permit is then used to send a message, performing steps 2 and 3.

use cancel_safe_futures::prelude::*;

let mut sink = /* ... */;

tokio::select! {
res = sink.reserve() => {
let permit = res?;
res.send(value).await?;
// operate on message
}
// ... other select branches
}

The Permit holds on to a mutable reference to the sink, so the sink can’t be used for other purposes while the permit is active[7].

Making InstanceSerialConsoleHelper::send cancel-safe

InstanceSerialConsoleHelper::send was a wrapper around SinkExt::send that didn’t do anything else. To address cancel-safety issues with this method, we removed it and instead made InstanceSerialConsoleHelper implement Sink. Then, the reserve method implemented above can be used for this select branch as well.

Resume from partial progress

If an async function performs several asynchronous steps in succession, it can often be made robust against cancellation issues in select! loops by storing and resuming partial progress.

The basic idea here is to store the fact that some progress has happened, either

  • internally in a field, or

  • externally via a &mut parameter.

Progress can then be resumed from the point at which the function is called again.

For an example showing how to store partial progress internally, see [serial_console_recv] (an alternative solution).

For an example showing how to externally track partial state, see [case_study_write_all_buf] below.

Case study: AsyncWriteExt::write_all_buf

For synchronous code, the standard library’s std::io::Write has a write_all method that attempts to write an entire buffer into a writer. This method works for simple cases, but since there’s no reporting of partial progress, it does not provide the ability to recover from errors—or even know how much has been written out. This means that many users have to hand-roll their own version of write_all, which is easy to get wrong[8].

When this method is ported to async code, users not only have to worry about errors, they also need to think about situations where a write operation is interrupted by a different branch of a select! firing. This issue was solved in Tokio’s AsyncWriteExt through some clever API design.

First, a trait called bytes::Buf was designed to enable recording partial progress. This trait is an abstract way to represent a read-only buffer of bytes, and has three important methods:

  1. fn remaining(&self) → usize: Reports the number of remaining bytes. (There’s also a has_remaining(&self) → bool method.)

  2. fn chunk(&self) → &[u8]: Returns the next chunk of bytes.

  3. fn advance(&mut self, cnt: usize): Advances the buffer by a particular count of bytes.

A Cursor over any sort of contiguous memory buffer implements the bytes::Buf trait[9].

Then, AsyncWriteExt::write_all_buf accepts any &mut B where B: bytes::Buf. It reports partial progress to the Buf implementer: advance is called on it with however many bytes were successfully written.

This means that code that calls write_all_buf inside a select! loop is correct:

async fn write(writer: &mut W, data: &[u8]) -> std::io::Result<()>
where
W: AsyncWrite + Unpin,
{
// Cursor<&[u8]> implements the bytes::Buf trait.
let mut cursor = Cursor::new(data);
while cursor.has_remaining() {
tokio::select! {
res = writer.write_all_buf(&mut cursor) => {
res?;
}
// ... some other branch
}
}

Ok(())
}

Use cooperative (explicit) cancellation

In [task_aborts], we saw that aborting Tokio tasks is problematic because a task could potentially be cancelled in the middle of a cancel-unsafe operation. However, it often is a domain requirement to cancel tasks. One way to solve this is by using cooperative, or explicit, cancellation channels.

The cancel-safe-futures library maintained by Oxide has a coop_cancel module that implements cooperative cancellation with a "fan-in" model: many potential cancelers and one receiver.

See the coop_cancel documentation for more, including examples.

Don’t use tokio::sync::Mutex

Tokio comes with a Mutex that is specifically designed for use in asynchronous contexts. The official recommendation is to prefer std::sync::Mutex if locks are not held across await points, and tokio::sync::Mutex only if locks are held across awaits.

However, given that:

  • Almost all uses of mutexes are to temporarily violate code invariants within a critical section, restoring them by the end of the operation.

  • Mutexes are almost always shared between futures (otherwise why would a mutex be used at all), which means that the "cancellation blast radius" of a future holding a mutex extends to other futures.

  • std::sync::Mutex has a notion of lock poisoning. Tokio’s mutexes don’t[10].

What this suggests is that if a future that is currently holding on to a mutex is cancelled, the state guarded by the mutex is likely invalid. This is such a big problem that this guide recommends avoiding tokio::sync::Mutex to the greatest extent possible.

Alternatives to Tokio mutexes

The recommended alternative to a Tokio mutex is a message-passing design, as described in this Tokio tutorial. Rather than having futures access some common shared state, this scheme has a single manager task that has full (non-shared) ownership of this state. This task receives requests in serial order, via an MPSC channel, and sends responses over (typically) oneshot channels.

TODO: add note about RobustMutex here.

A final option is to switch to std::sync::Mutex, and not hold locks across await points. Be aware, however, that the worker thread is fully occupied while waiting for the lock. This is not a problem if the lock is uncontended and only held for short periods of time.

If you really must use a Tokio mutex

It isn’t impossible to write correct code that manages Tokio mutexes, just very difficult. Some approaches you could take:

  1. Declare that your code isn’t cancel-safe.

  2. Ensure that invariants are always restored between await points. Having invalid state between await points is okay[11].

  3. Allow state to become invalid, but clean it up at the start of every critical section (i.e. every time the lock is acquired).

All of these options require careful analysis, which isn’t true for any of the recommended suggestions.

Spawn background tasks to perform cancel-unsafe operations

Unlike a future which has no existence outside of its owner, a task is owned by the executor (typically Tokio). This means that in some situations, a background task can be used to create a cancel-safe interface around a cancel-unsafe future.

For example, consider a next method that does something cancel-unsafe:

struct MyHandler {
inner: /* some stream of data */,
}

impl MyHandler {
// This is cancel-unsafe because message will be dropped if
// the future is cancelled in the middle of the operation.
async fn next(&mut self) -> MyMessage {
let message = self.inner.next().await;
process_message(&message).await;
message
}
}

As described in [marking_unsafe], a method called next must be cancel-safe. One way to achieve that is to spin up a background task to process the message:

Code sample
struct MyHandler {
inner: /* some stream of data */
join_handle: Option<MyJoinHandle>,
}

impl MyHandler {
async fn next(&mut self) -> MyMessage {
// If an existing background task exists, wait for
// that to complete.
if let Some(join_handle) = &mut self.join_handle {
let message = join_handle.await_completion().await;
self.join_handle = None;
return message;
}

let message = self.inner.next().await;
let join_handle = MyJoinHandle::new(message);

// Set self.join_handle to Some before the next await point.
// This enables resumption if this future gets cancelled.
self.join_handle = Some(join_handle);

let join_handle = self.join_handle.as_mut().unwrap();
message = join_handle.await_completion().await;
self.join_handle = None;
message
}
}

struct MyJoinHandle {
handle: JoinHandle<MyMessage>,
}

impl MyJoinHandle {
fn new(message: MyMessage) -> Self {
let handle = tokio::task::spawn(async move {
process_message(&message).await;
message
});
Self { handle }
}

async fn await_completion(&mut self) -> MyMessage {
// A simple `self.handle.await` results in "cannot move out of
// `self.handle` which is behind a mutable reference". Need to
// use `&mut self.handle` explicitly to guide the type checker.
let handle = &mut self.handle;
// Can also return an error rather than expecting.
handle.await.expect("task panicked")

// After this method completes, the handle should never be
// polled again. If it is polled again, then it will panic.
// That is managed in `MyHandle::next` by setting `join_handle`
// to None immediately after returning from this method.
}
}

This approach works well, but has some downsides:

  1. There’s extra ceremony involved with carefully setting up the join handle. This is easy to get wrong. (Is there an abstraction we can write to make this easier?)

  2. If cancellation is actually desired in some situations, then the task has to be manually cancelled (either via aborts or by using an explicit cancellation channel).

  3. tokio::task::spawn requires a 'static bound, so it can’t borrow data from self or elsewhere. Whether this is okay, an inconvenience, or a deal-breaker, depends on the situation[12].

Use alternate MPSC channel modes

Tokio provides bounded MPSC channels. While bounded channels are a good default since they add backpressure to a system, they additionally come with the issue that their send method is asynchronous (it blocks if the channel is full).

In several real-world cases, it has been observed that the only source of asynchronicity in part of a system is the use of bounded MPSC channels. Removing this source of asynchronicity can make the design of part of a system much simpler.

Consider using these alternative, synchronous channel modes:

  1. try_send on bounded channels: A bounded channel’s Sender provides a synchronous try_send method, which returns an error if the channel is full. This method can be used to externalize backpressure to clients, e.g. by returning an HTTP 429 Too Many Requests error if the channel is full.

  2. Watch channels: If you only care about the last value sent to a channel, consider using a Tokio watch channel. A watch channel is a single-producer, multi-consumer channel which only stores one value and discards the current value on receiving a new one, eliminating backpressure concerns.

  3. Unbounded channels: A final option that is not recommended, but still available, is to use unbounded MPSC channels. Unbounded channels do not have a capacity limit, and their send method is synchronous. Using unbounded channels has a significant downside, namely the lack of backpressure. Among other things, this can result in memory consumption blowing up if the unbounded channel isn’t emptied. How this concern trades off against cancel-safety issues is a case-by-case decision.

For a practical case study, see [case_study_wicketd].

Case study: wicketd’s installinator progress tracker

In Omicron, the service running on the technician port is called wicketd. This service manages recovery and offline updates, and part of that management is recording progress reports sent by another component: the installinator [rfd345].

Previously, the report tracker’s report_progress method used bounded channels to send reports. This had several consequences:

  • The code that sent reports had to be asynchronous.

  • Since this code used a mutex that was held across an await point, that mutex had to be a Tokio mutex rather than std::sync::Mutex.

  • Other code that used the mutex also had to be asynchronous.

The report_progress method wasn’t cancel-safe, since an aborted report could lead to the update be stuck in an Invalid state. (This is exactly the sort of issue with Tokio mutexes that resulted in the recommendation to not use them).

The solution

In Omicron PR #3950 we switched to using a watch channel. We used the fact that progress reports were cumulative; only the last progress was interesting to the tracker.

This had ripple effects, all positive:

Previously…​

In Omicron #3579, we first tried a couple of attempts to make this code cancel-safe. However, both of the attempts had subtle flaws related to cancellation. As a result, we settled on switching to an unbounded channel.

While the benefit of cancel safety outweighed potential backpressure issues at the time, we later realized that a watch channel would have the same benefits without any backpressure-related issues.

An alternative would have been to rewrite this code into a message-passing style, but this was a more expedient way to achieve cancel safety.

Perform cleanup separately

Consider a method on a database connection pool that executes a future within the context of a database transaction:

use std::future::Future;

struct TransactionContext {
// ...
}

struct ConnectionPool {
// ...
}

impl ConnectionPool {
fn execute_transaction<F, Fut>(future_fn: F) -> Result<T, E>
where
F: FnOnce(TransactionContext) -> Fut,
Fut: Future<Output = Result<R, E>>,
{
let cx: TransactionContext = /* ... */;
// ... begin transaction

match future_fn(cx).await {
Ok(value) => {
// ... commit transaction
Ok(value)
}
Err(error) => {
// ... rollback transaction
Err(error)
}
}
}
}

If execute_transaction is cancelled in the middle of being run, the database connection represented by the transaction context might be left in an inconsistent state. For example, it might be in a state where a transaction has begun but not ended.

For cancel correctness, it is important that ConnectionPool perform cleanup on the connection at the time it is returned to the pool, or at the time a new connection is allocated. For example, if the connection is in the middle of a transaction, it should be rolled back.

Marking APIs as cancel-unsafe

Many async functions are likely to be cancel-unsafe, with no easy way to change them. How can APIs that aren’t cancel-safe be designated, so consumers use them with care? This can be done through a combination of naming and documentation.

Names and function signatures

Cancel-unsafe APIs should have names and function signatures where it "feels like" using them in select! statements is wrong. Getting these semiotics right is a matter of experience and judgment, but here are some general guidelines:

  • Don’t name cancel-unsafe methods next() or recv(). It is natural to use these sorts of methods in select! loops.

  • Don’t name methods reserve() or acquire() unless they’re mostly cancel-safe. This pattern-matches against the Tokio methods that are similar.

  • Use names that indicate an irrevocable action that shouldn’t be repeated. For example, a method named self.http_post_data(), by referring to an HTTP POST, clearly indicates an action that shouldn’t be recreated in a select! loop.

  • Use the type system if possible. Consider a method like fn execute(self), where self isn’t cloneable. It is not going to be possible to use it in a select! loop. Instead, users must create the future returned by the method outside of the select! loop.

    • Note that a method like execute(self) can’t be used in a select! loop, but it can can still be called once and cancelled (e.g. with a timeout). Often this is okay since the entire operation is aborted in that case.

Documentation

Documenting the cancel safety of your APIs to a reasonable degree helps users understand what can go wrong if a future is cancelled. It’s recommended that each method have a "cancel safety" section associated with it.

For an example, see the documentation for AsyncWriteExt::write_all_buf.

Documenting cancel safety for every single async API can be too much of a lift, but it’s worth doing this at least for the most commonly used methods.

Cancel safety while consuming async code

Pay attention to names, function signatures and documentation

On the flip side of marking APIs as cancel-unsafe, it’s important to pay attention to what library authors are trying to indicate. For example, if a method performs an HTTP POST or is called execute(), it likely can’t be used in a select! loop.

"Read the documentation" is an unsatisfying answer for many reasons, but in many cases it’s the best we have.

Resume futures in select! loops rather than recreating them

If you’re using a cancel-unsafe future in a select! loop, you cannot recreate the future in each loop iteration. Instead, create the future once, outside of the select loop. Then, select! over a &mut reference to the future. In other words, resume futures rather than recreating them from scratch.

Here’s an example that performs an HTTP request with periodic ticks.

Code sample
use hyper::{client::HttpConnector, Body, Request, Response};
use std::time::Duration;

struct MyHttpClient {
client: hyper::Client<HttpConnector>,

}

impl MyHttpClient {
async fn request_with_ticks(
&self,
req: Request<Body>,
) -> Result<Response<Body>, hyper::Error> {
let mut interval = tokio::time::interval(Duration::from_millis(100));
let mut response_fut = std::pin::pin!(self.client.request(req));

loop {
tokio::select! {
response = &mut response_fut => {
// The initial response has arrived. Exit the loop
// and return the response.
return response;
}
_ = interval.tick() => {
println!("interval tick");
}
}
}
}
}

For why std::pin::pin! is necessary, see the Tokio documentation[13].

Example: spawn_send_task redux

In [spawn_send_task_2] we saw an example of how to use tokio::mpsc::Sender::reserve to solve data loss issues with Sender::send. However, we noted that there were still potential fairness issues, since dropping the future returned by Sender::reserve would lead to losing one’s place in line.

In this example, we further modify spawn_send_task so that we resume futures rather than recreating them. This means that we no longer lose our place in line if the future is cancelled.

We cannot use std::pin::pin! in this example because we’re replacing the value of reserve_fut in the middle of the loop. Instead, we call Box::pin, which moves the future returned by sender.reserve() to the heap and then pins it.

Here’s the example (Rust playground):

Code sample
fn spawn_send_task(sender: tokio::sync::mpsc::Sender<String>) {
let strings: Vec<String> = vec![
"foo".to_owned(),
"bar".to_owned(),
"baz".to_owned(),
];

tokio::task::spawn(async move {
// interval.tick() completes execution at 0s, 1s, 2s...
let mut interval = tokio::time::interval(Duration::from_secs(1));

let mut strings = strings.into_iter();
let mut reserve_fut = if strings.len() > 0 {
// Create a future to obtain a permit to send the first value.
Box::pin(sender.reserve())
} else {
// No values to send -- exit the loop.
return;
};

loop {
tokio::select! {
res = &mut reserve_fut => {
if res.is_err() {
println!("receiver closed");
break;
}
match res {
Ok(permit) => {
permit.send(strings.next().unwrap());
reserve_fut = if strings.len() > 0 {
// Create a future to send the next value.
Box::pin(sender.reserve())
} else {
// All values sent -- exit the loop.
break;
}
}
Err(_) => {
println!("receiver closed");
break;
}
}
}
_ = interval.tick() => {
println!("interval tick");
}
}
}
});
}

This code is a bit convoluted to write, and for complex enough operations it is sometimes not possible to express code in this manner at all.

Exercise: Try rewriting spawn_send_task to create Box::pin(sender.send(value)) futures rather than Box::pin(sender.reserve()).

  1. What are the upsides and downsides of using send()? Is data loss still possible with send() given the problem statement that spawn_send_task solves?

  2. The reserve() function is still more general than send(). Can you modify the problem statement to create a situation which would lead to data loss with send(), even if resumed rather than recreated, but not with reserve()? (Hint: Construct a scenario that’s analogous to write_all_buf.)

Use then_try adapters

As discussed in [try_joins], use of try_ adapters can be problematic because they lead to early cancellations of futures that should not be cancelled.

The example in [try_joins] can be made correct by being rewritten as:

let (result1, result2) = tokio::join!(write1.flush(), write2.flush());
result1?;
result2?;

tokio::join! is unaware of the Result type, and so it doesn’t do early cancellations. However, this is a bit unwieldy to write, especially for more complicated try-joins and other adapters.

To solve this issue, Oxide’s cancel-safe-futures library introduces then_try adapters that are aware of the Result type, but don’t perform early cancellations.

For example, the join_then_try macro first runs all the futures passed in to completion, then errors out if any of them failed[14]

For example:

let result = cancel_safe_futures::join_then_try!(
write1.flush(),
write2.flush(),
);
result?;

This does exactly the same thing as the tokio::join! example above.

Note
The cancel-safe-futures library is incomplete. then_try adapters are added on an as-needed basis. If you need an adapter that isn’t present, please feel free to add more and send a PR!

Case study: sled-agent zone deletion

The Oxide sled-agent has code to shut down and uninstall locked zones. Previously, this code used the try_for_each_concurrent adapter to delete zones in parallel.

This code was subtly incorrect, because if one zone failed to be cancelled, it would cause all other zone deletion operations to be cancelled, potentially leaving zones (which are external to the process) in an inconsistent state.

In Omicron PR #3758, the code was changed to use the for_each_concurrent_then_try adapter provided by cancel-safe-futures. With this adapter in use, the code will now attempt to progress all zone deletions as far as possible.

Use background tasks

As described in [spawn_tasks], a task is owned by the executor rather than the code that created it. The strategy mentioned there for library code can also be used in application code to move cancel-unsafe futures to background tasks.

For an example of how using background tasks addressed practical issues with cancel safety, see [case_study_dropshot].

Avoid task aborts

As outlined in [task_aborts], it is very difficult to write code that is cancel-safe at any arbitrary await point. Avoid task aborts, and instead prefer explicit cancellation channels wherever necessary.

Case study: Dropshot and client disconnects

Dropshot is the REST HTTP server used throughout Omicron. Dropshot is written in an asynchronous style, with users registering async request handlers and Dropshot responsible for executing them.

Consider what happens if a client initiates an HTTP request, then disconnects in the middle of the request. Previously, on disconnection, the future would be cancelled. Since a lot of the Omicron code wasn’t written with cancel safety in mind (for example, by using Tokio mutexes), this resulted in many potential cancel-safety issues. Even worse, these issues could be triggered by client-initiated events.

To handle cancel-unsafe Omicron code in a systemic fashion, we changed Dropshot in PRs #701 and #702 to add a mode where:

  1. Each request is spawned on its own task.

  2. If a client disconnects, the corresponding task is no longer aborted. Instead, it is kept running in the background.

  3. When the server shuts down, any pending tasks are run to completion.

After verifying that the new mode didn’t break things, we made it the default for Dropshot. While cancellation from (say) select! loops is still a concern, client-initiated cancellations are no longer possible.

Future directions

Working with upstreams to clarify cancel safety

This document has hopefully succeeded at its task of showing the perils of async cancellation. As an action item, we should work with upstreams to clarify the cancel safety of their APIs, fixing issues as necessary.

Testing cancel safety

While it is possible to test the cancel safety of async code in an ad-hoc fashion, there are no current mechanisms the author knows about for performing systematic testing. It would be good to design and build something along those lines.

Footnotes
  • 1

    For example, Go requires that you pass around an explicit Context that carries a cancellation token. Cancellations can only happen when the cancellation token is checked, and not at await points generally. For a Rust equivalent, see the section on explicit cancellations in this document.

    View
  • 2

    "Notionally" because in reality the sleep future does register itself with the Tokio executor on creation, and has to clean itself up when the future is dropped.

    View
  • 3

    This isn’t entirely true—some Rust code is OK being cancelled at any point. For example, an operation that is entirely self-contained and read-only can be cancelled at any point. An operation that downloads a file from a server can resume from where it left off, or restart from the beginning. An operation that’s idempotent can be retried.

    View
  • 4

    The specific case of reserve/send is one that shows up throughout software systems. This is a more general version of reserve, where a cancel-safe operation is followed by a cancel-unsafe one.

    View
  • 5

    As an action item, we should work with futures upstream to clarify the cancel safety of all futures adapters, including these ones.

    View
  • 6

    The future returned from recv holds a mutable reference to the InstanceSerialConsoleHelper. This makes it so that InstanceSerialConsoleHelper::recv can’t be called while an InstanceSerialConsoleMessage is alive. This is correct behavior.

    View
  • 7

    Holding on to a mutable reference is necessary in this case to call start_send and poll_flush on the sink. There are other cases in which a mutable reference to the parent isn’t really necessary, but might still be desirable to ensure that other methods on the parent can’t be called in the meantime. In those cases, use a PhantomData<&'a mut ()>.

    View
  • 8

    Specifically, std::io::ErrorKind::Interrupted errors must be retried. Many hand-rolled implementations of write_all do not do that.

    View
  • 9

    Why define Buf rather than just accepting Cursor? It’s to allow alternative implementations. For example, bytes::Bytes, which has a different design from Cursor, also implements the Buf trait.

    View
  • 10

    The author’s understanding is that implementing lock poisoning with Tokio mutexes is very difficult to do, due to particulars of Rust.

    View
  • 11

    This is true with a big asterisk: panic safety. If the critical section panics, the mutex is silently unlocked. This isn’t true for std::sync::Mutex which poisons the lock on panicking.

    View
  • 12

    An alternative is to use the async-scoped crate. While some callers can use the safe scope_and_block method, many other callers would need to use unsafe code. The unsafety manifests in a way where providing a safe abstraction over it can be impossible.

    View
  • 13

    The Tokio documentation uses tokio::pin!, which is an older and less ergonomic equivalent to std::pin::pin!. The latter was stabilized in Rust 1.68 and is recommended.

    View
  • 14

    What if more than one future fails? In that case, join_then_try returns the error from the first future that failed.

    View