Introduction
While developing the control plane, we’ve found that one of the bigger practical issues has been async cancel correctness: the relatively unpredictable way an async Rust system can behave if futures are dropped [rfd397]. Cancellation is an example of "spooky action at a distance", and having to think about it imposes a significant cognitive burden on authors and reviewers of asynchronous code. Both of these are quite uncharacteristic of Rust.
This document aims to be a practical guide to dealing with cancel safety in Rust, and covers four main topics:
What is async cancellation, and how does it behave in general?
What are the main sources of cancellation in async Rust?
As an author of asynchronous library code, how do you write cancel-safe code? How do you indicate to consumers that some APIs are cancel-unsafe?
As the author of an application that consumes asynchronous code, how do you deal with cancel-unsafe futures?
The intended audience for this document is anyone who writes or reviews asynchronous Rust code, particularly in Omicron but also more generally in any context.
Manually implemented futures vs async blocks
A Rust future is, at its core, a state machine for deferred computation. Rust permits futures to be written in two different ways.
Manually implemented futures are values for which std::future::Future is implemented by hand. For example, this is a manually implemented future that immediately returns a value:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct MyFuture {
value: Option<String>,
}
impl Future for MyFuture {
type Output = String;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Ready(self.value.take().expect("future polled after completion"))
}
}
Async blocks are sections of code wrapped in async { }, or in async fn functions. For example, this is an async block that immediately returns a value:
async {
let output: String = "future-output".to_owned();
output
}
While both kinds of futures are susceptible to cancel safety issues, async blocks tend to hide some of the mechanics of how futures work. This means that in practice, a lot of the lurking surprises of async cancellation lie in async blocks, not in manually implemented futures.
Overview of cancellation
A future, on being dropped, is cancelled. This means that the state machine corresponding to the future no longer continues execution past the point at which it is dropped. (A future can even be dropped before it is polled for the first time, in which case it never gets to execute.)
This is easiest to see with manually implemented futures. It’s like calling the poll method a few times, having it return Poll::Pending, and then dropping the future before it gets a chance to return Poll::Ready.
On being cancelled, any values owned by the future are dropped. This can include other futures owned by the dropped future. In other words, cancellation propagates from parent futures to child futures.
If any values owned by the future have a Drop implementation, it is called. For example, tokio::sync::SemaphorePermit (returned by Semaphore::acquire) has a Drop implementation. In the below code:
let semaphore = Semaphore::new(2);
async {
let permit = semaphore.acquire().await.unwrap();
println!("permit acquired");
tokio::time::sleep(Duration::from_secs(5)).await;
}
If the async block is dropped before the permit is acquired, the future returned by semaphore.acquire() is dropped.
If the async block is dropped after the permit is acquired, during the sleep, permit is dropped and its Drop implementation is called.
In either case, since the async block doesn’t take ownership of semaphore (it isn’t async move), semaphore isn’t dropped.
Cancellation can only happen at await points, due to the cooperative multitasking model that futures use. For example, consider this Rust code:
use std::time::Duration;
async fn my_func() {
print!("1");
print!(", 2");
tokio::time::sleep(Duration::from_secs(5)).await;
print!(", 3");
print!(", 4");
}
This can print any one of:
1, 2, 3, 4, if the future returned by my_func() runs to completion.
1, 2, if my_func() runs up to the await point but is cancelled after that (for example, by tokio::time::timeout(Duration::from_secs(1), my_func())).
Nothing, if my_func() is cancelled before ever being awaited.
my_func() cannot print 1 or 1, 2, 3, or any other kind of intermediate value.
This is easiest to see with manually implemented futures as well. You cannot drop a future while its poll method is being called, but you can drop it in between poll calls.
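To make this concrete, here is a minimal self-contained sketch (using the futures crate’s noop_waker; purely illustrative) that polls a future by hand once, then drops it between polls:
use futures::task::noop_waker; // futures 0.3
use std::future::Future;
use std::task::{Context, Poll};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    // Box::pin gives us a Pin<Box<_>> that we can poll by hand.
    let mut fut = Box::pin(tokio::time::sleep(Duration::from_secs(5)));
    // Poll once: the sleep isn't done, so we get Poll::Pending back.
    assert!(matches!(fut.as_mut().poll(&mut cx), Poll::Pending));
    // We cannot drop the future during a poll, but between polls we can.
    // This drop is the cancellation: the sleep never completes.
    drop(fut);
}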
Cancellation can only run synchronous code, because Drop::drop is a synchronous function. There are plans to add AsyncDrop, but as of 2023-06 it’s still a long way away from being done.
Cancellation is often a feature, not a bug. Many cancellations are wanted and desirable. For example, if you’re waiting for a tokio::sync::SemaphorePermit, and you drop that future, you lose your place in line. This is good because the system doesn’t waste resources waiting on an acquirer that no longer exists.
As an alternative, more explicit cancellation models have their own ways of dealing with this[1].
Cancel safety
We say that a future is cancel-safe if it can be dropped before being completed, without any implications for the rest of the system.
As an example, Tokio’s MPSC channels have an asynchronous method to receive a message, called Receiver::recv. This method is cancel-safe: it either completes and resolves to a message, or is dropped and no messages are received. Receiver::recv does not have an intermediate state where a message has been retrieved from the queue but has not been delivered to the caller.
Notably, the cancel safety of Receiver::recv is a matter of careful API design and implementation. It is certainly possible to write a version of Receiver::recv that holds on to a message across an await point, causing potential data loss. But that’s not how the current version of recv works. That it will continue to work this way is guaranteed in the API documentation (and presumably Tokio’s tests).
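This property is what makes it safe to, for example, wrap recv() in a timeout. A small self-contained sketch:
use std::time::Duration;

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = tokio::sync::mpsc::channel::<String>(8);
    sender.send("hello".to_owned()).await.unwrap();
    match tokio::time::timeout(Duration::from_millis(100), receiver.recv()).await {
        // A message was delivered, and no other copy of it exists anywhere.
        Ok(Some(message)) => println!("received {message}"),
        // The channel is closed: all senders were dropped.
        Ok(None) => println!("channel closed"),
        // The recv() future was cancelled by the timeout. Because recv() is
        // cancel-safe, any message still in the channel stays there and will
        // be returned by the next call to recv().
        Err(_elapsed) => println!("timed out"),
    }
}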
In our definition, cancel safety is a local property of a future.
Cancel correctness
We define cancel correctness as a global property: the ability of a system to behave correctly, considering that futures within it might be cancelled.
Cancel correctness is violated if all three of the following conditions are met:
The system has a cancel-unsafe future somewhere in it.
This future is cancelled.
Cancelling the future violates some property of the system.
Breaking these down one by one:
The system has a cancel-unsafe future somewhere in it. A system that consists entirely of cancel-safe futures cannot have cancel correctness bugs. [making_cancel_safe] covers ways to turn cancel-unsafe futures into cancel-safe ones.
This future is cancelled. Cancel correctness issues can only occur if futures are actually cancelled. This might seem like a trivial point at first! But it’s worth keeping in mind as there are usually ways to arrange for futures to not be cancelled.
Cancelling the future violates some property of the system. Properties that need to be upheld for correctness include, but are not limited to:
No data loss. If a future that reads from or writes to a stream is cancelled, data currently in flight must not be lost. Case study: the Oxide serial console. If necessary, there should be a way to create new futures that resume reading or writing data at the point at which it was cancelled.
Invariants upheld. If a future is cancelled, any shared state it was operating on must not be left in an invalid state.
In Rust code, it is generally okay to temporarily have invalid state while holding a &mut reference to it, so long as the state is made valid by the time the &mut reference is released. This is a big part of Rust’s claims about "fearless concurrency", at least when it comes to non-async code. The analogue of this in async code is that in cancel-safe futures, shared state must not be invalid across await points. Case study: wicketd’s installinator progress tracker.
Tip: One way to look at this issue is via a future’s "cancellation blast radius": the other parts of a system that are affected when a future is cancelled.
For example, tokio::time::sleep is entirely self-contained. Cancelling it has no impact on the rest of the system.
A cancellation in code that has some internal mutable state but touches shared state in a read-only fashion is also okay. That’s because even if that internal state becomes invalid, it is immediately discarded.
Cancellations that make shared mutable state invalid cause problems. This occurs most commonly with Tokio’s mutexes, which is why this guide recommends not using them.
Cancellations that affect state outside of the process are especially fraught. For an example, see this case study.
Cleanup. If a system sets up futures that perform some cleanup at the end of execution, the system must be robust to some or all of those futures being cancelled. In the case of a database connection pool, this can mean an additional check that the connection is in a pristine state at the beginning of transactions, along with cleanup at the end of transactions. Case study: async_bb8_diesel.
Fairness. Some futures (e.g. tokio::sync::Mutex::lock), when dropped, make you "lose your place in line". Systems that cancel such futures need to be robust against the lack of fairness.
Fairness tends to be less of a concern than other issues, especially in uncontended queues. Futures that are cancel-safe except for fairness can be described as mostly cancel-safe.
Which properties of a system must be upheld can also change over time. For example, consider a curl equivalent that fetches an HTTP URL and writes it to disk. If the operation to write data to disk is modeled as a future, cancelling it would likely lead to correctness bugs. However, if the user presses Ctrl-C to terminate the program, cancelling this future becomes okay.
Sources of cancellation
While a future is cancelled whenever it’s dropped, some patterns of code cause futures to be cancelled frequently. This section contains a list of common patterns that involve future cancellation. (This isn’t meant to be an exhaustive list.)
The select! macro
Perhaps the most common source of async cancellation bugs is tokio::select!, which polls a set of futures concurrently until the first one completes.
Consider this example:
use std::time::Duration;
let (sender, mut receiver) = tokio::sync::mpsc::channel(8);
spawn_send_task(sender); // give sender off to a task to send values from
let sleep = tokio::time::sleep(Duration::from_secs(5));
tokio::select! {
value = receiver.recv() => {
// ... process value
}
_ = sleep => {
println!("sleep completed");
}
}
If the sleep completes before receiver.recv() returns a value, then the receiver.recv() future is cancelled. That is okay because Receiver::recv is documented to be cancel safe.
If receiver.recv() completes before the sleep does, then the sleep future is cancelled. The sleep future is notionally stateless[2], and cancelling it has no effect on the rest of the system.
select! loops
While select! is sometimes used as a one-off, in practice most uses of select! involve some sort of looping. Following on from the above example, consider an example spawn_send_task function:
fn spawn_send_task(sender: tokio::sync::mpsc::Sender<String>) {
let strings: Vec<String> = vec![
"foo".to_owned(),
"bar".to_owned(),
"baz".to_owned(),
];
tokio::task::spawn(async move {
for value in strings {
sender.send(value).await;
}
});
}
This looks like idiomatic Rust code. There’s nothing wrong with it. But
let’s say that in practice we’ve been noticing slowdowns sending values to the
channel, and we’d like to add some logging to see why it’s slow. Our first
attempt is to put a simple select!
around the sender.send(value)
future,
logging at periodic intervals:
fn spawn_send_task(sender: tokio::sync::mpsc::Sender<String>) {
let strings: Vec<String> = vec![
"foo".to_owned(),
"bar".to_owned(),
"baz".to_owned(),
];
tokio::task::spawn(async move {
// interval.tick() completes execution at 0s, 1s, 2s...
let mut interval = tokio::time::interval(Duration::from_secs(1));
for value in strings {
tokio::select! {
res = sender.send(value) => {
if res.is_err() {
println!("receiver closed");
break;
}
}
_ = interval.tick() => {
println!("interval tick");
}
}
}
});
}
It turns out that the above code is incorrect, in a somewhat non-obvious fashion. That is because sender.send(value) is not cancel-safe.
Why is sender.send not cancel-safe? Consider what happens if interval.tick() completes before sender.send(value) does. In that case, value will not be sent. Per Rust’s single-ownership model, the value will be dropped—lost in the ether.
Sender::send was only documented to be cancel-unsafe after this RFD was written and the author submitted a PR to the Tokio project; the documentation change shipped in Tokio 1.33. Before that, you had to infer the lack of cancel safety from the type signature: since send owns the value, if the future is dropped before it completes, the value is lost.
There is a convenient way to fix this issue, which is covered in [spawn_send_task_2].
Timeouts
Another common source of cancellation issues is timeouts. Consider the following code that uses tokio::time::timeout:
use std::time::Duration;
let (sender, receiver) = tokio::sync::mpsc::channel(8);
tokio::time::timeout(Duration::from_secs(5), sender.send(20)).await;
This is equivalent to running select! over a sender.send() and a tokio::time::sleep, and just like in that case the value is lost if the timeout occurs. The solution covered in [complex_ops] also addresses this issue.
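As a preview of that reserve-based fix: only the cancel-safe reservation needs to run under the timeout, and the send itself is synchronous, so the value can no longer be lost. A minimal sketch:
use std::time::Duration;

#[tokio::main]
async fn main() {
    let (sender, _receiver) = tokio::sync::mpsc::channel(8);
    match tokio::time::timeout(Duration::from_secs(5), sender.reserve()).await {
        // A slot was reserved: the synchronous send cannot lose the value.
        Ok(Ok(permit)) => permit.send(20),
        // The receiver was dropped.
        Ok(Err(_)) => println!("receiver closed"),
        // The reservation timed out; no value was involved yet.
        Err(_elapsed) => println!("timed out waiting for channel capacity"),
    }
}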
In general, any operations that are in some sense equivalent to select! are going to run into the same issues as select!.
Try-joins
The Rust async ecosystem has several implementations of "try-joins":
Tokio has a try_join! macro.
The futures crate also has a number of try_ methods defined on TryStreamExt, including try_for_each and try_collect.
These implementations await multiple fallible futures, completing when either all futures return Ok or any future returns Err. The common property shared by these adapters is that they all perform early cancellations: as soon as a future fails, all remaining futures are cancelled.
This is often the right thing to do. This code runs two HTTP GET queries, and if one of them fails, it cancels the other one (Rust playground).
use hyper::{client::HttpConnector, Uri}; // 0.14.27
struct Users { /* ... */ }
struct Posts { /* ... */ }
async fn fetch_users_and_posts(
client: &hyper::Client<HttpConnector>,
) -> hyper::Result<(Users, Posts)> {
let users_fut = client.get(Uri::from_static("https://my.domain/api/users"));
let posts_fut = client.get(Uri::from_static("https://my.domain/api/posts"));
let (users_response, posts_response) = tokio::try_join!(
users_fut,
posts_fut
)?;
// -- process users_response and posts_response here
let users = Users { /* ... */ };
let posts = Posts { /* ... */ };
Ok((users, posts))
}
But for operations that have side effects, try_join can lead to bugs. For example, consider this code which flushes two tokio::io::AsyncWrite implementations:
use tokio::io::{AsyncWrite, AsyncWriteExt};
let write1 = /* AsyncWrite impl, eg tokio::fs::File */;
let write2 = /* another AsyncWrite impl */;
let result = tokio::try_join!(write1.flush(), write2.flush());
result?;
In this case, if either flush fails, the other flush is cancelled immediately. This is incorrect most of the time: we’d like both flush operations to make as much progress as possible.
A solution is covered in [then_try_adapters].
Task aborts
Unlike a future, which has no existence outside of its owner, a task is owned by the executor (typically Tokio). Tasks are spawned using the tokio::task::spawn method. This method returns a JoinHandle, which can be used to abort the spawned task. The task is cancelled at the next await point. Any running futures within it are cancelled as well.
For example, consider this task:
let join_handle = tokio::task::spawn(async move {
do_long_running_operation().await;
});
// some time later...
join_handle.abort();
With the abort(), the future returned by do_long_running_operation() can be cancelled at any await point. It is unlikely that arbitrary async Rust code is resilient to cancellations anywhere during control flow[3].
This makes the cancellation problem global rather than something scoped to
select!
and other similar operations. For this reason, the author believes
that task aborts should be avoided entirely. In particular, in most cases task
aborts should not be treated as part of normal control flow, just like panics
aren’t part of normal control flow.
For a practical solution to cancelling long-running tasks, see [explicit_cancellation].
Unlike with individual futures, dropping a Tokio task’s JoinHandle does not cancel the corresponding task. While this is an implementation choice—some other executors choose to cancel the task on drop—it is one that is only afforded to tasks because they’re owned by the executor. Individual futures must be cancelled on being dropped.
Runtime shutdowns
If a Tokio runtime is shut down, all tasks running within it are cancelled at the next await point. While that is expected behavior, it can lead to some unexpected consequences.
With common patterns like #[tokio::main], runtime shutdowns happen at process exit. However, they can also happen at other times (e.g. within tests, or if a runtime is manually created). In particular, a panic can cause an unwind and a runtime shutdown outside of typical conditions.
This has some consequences:
Let’s say you create a task, and then immediately drop its JoinHandle. You might expect that the task will run forever. That is not true: if the runtime is shut down due to a panic, then the task is cancelled. Code that interacts with this task should handle the case where the task has been cancelled.
If there are two tasks A and B running on the same runtime, and the runtime is shut down, then the tasks are cancelled in arbitrary order. This can lead to nondeterministic panics. See Dropshot issue #709 for an example.
Cancel safety while writing async APIs
If you’re writing async APIs or returning futures, you need to think about how your users are going to approach cancel safety. The general strategy is to use a combination of two approaches:
Making code cancel-safe as much as reasonably possible.
Clearly marking the rest as not cancel-safe.
Making code cancel-safe
There is sadly no silver bullet for making code cancel-safe. This section lists out a number of designs that authors can use and reviewers can look for, and example case studies. The hope is that the vast majority of cancel-safety issues can be addressed via these patterns.
Split up complex operations
In [select_loops] we observed that Tokio’s mpsc::Sender::send is not cancel-safe, and can lead to data loss.
There is a solution to this, which we can figure out from looking at the source code for Sender::send. Sender also exposes a reserve method, which returns a permit that can be used to send a value. reserve is mostly cancel-safe: dropping the future it returns makes you lose your place in line, but is otherwise okay.
Example: A corrected spawn_send_task
The [select_loops] section had an example spawn_send_task function which was identified as buggy. By using Sender::reserve, this code can be made correct. This version of the code is a bit more complex. It reads as:
Code sample
fn spawn_send_task(sender: tokio::sync::mpsc::Sender<String>) {
let strings: Vec<String> = vec![
"foo".to_owned(),
"bar".to_owned(),
"baz".to_owned(),
];
tokio::task::spawn(async move {
// interval.tick() completes execution at 0s, 1s, 2s...
let mut interval = tokio::time::interval(Duration::from_secs(1));
let mut strings = strings.into_iter();
// std::vec::IntoIter implements ExactSizeIterator, which has a len()
// method which tells us if the iterator is non-empty. If your
// iterator doesn't implement ExactSizeIterator, you can instead
// use iterator.peekable(), and peek() to see if any values remain.
while strings.len() > 0 {
tokio::select! {
permit = sender.reserve() => {
match permit {
Ok(permit) => {
let value = strings.next().unwrap();
// send() is synchronous because the async part
// (waiting for a slot to be available) has
// already been completed by the time the permit is
// acquired.
permit.send(value);
}
Err(_) => {
println!("receiver closed");
break;
}
}
}
_ = interval.tick() => {
println!("interval tick");
}
}
}
});
}
There is also a way to solve this issue that doesn’t make you lose your place in line. This is covered in [spawn_send_task_3].
Reserve and related patterns
mpsc::Sender::reserve is an example of a common pattern, where the general sequence of async operations is:
An initial async operation that is at least mostly cancel-safe.
A synchronous operation and/or a followup async operation that isn’t cancel-safe[4].
Some convenience methods like Sender::send combine both steps. These APIs can be easier to use sometimes. But providing these kinds of convenience APIs can lead to users accidentally introducing bugs, as seen in [select_loops].
If you’re writing such APIs, consider not providing convenience methods like Sender::send, and instead mentioning cancel safety in the documentation. While it can be a bit surprising at first (a sender that can’t just send values?), not providing such convenience methods makes it easier to train users into writing correct code. Whether this is a good idea or not depends on the situation.
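As an illustration, here is a sketch of what such a reserve-only API surface might look like, wrapping a Tokio channel (QueueSender is a hypothetical type, not a real library API):
use tokio::sync::mpsc::{self, error::SendError, Permit};

/// A sender wrapper that deliberately omits a `send` method, steering
/// users toward the two-step reserve-then-send pattern.
pub struct QueueSender {
    inner: mpsc::Sender<String>,
}

impl QueueSender {
    /// Waits for a slot in the queue.
    ///
    /// # Cancel safety
    ///
    /// Mostly cancel-safe: dropping the returned future loses your place
    /// in line, but no value can be lost because no value is involved yet.
    pub async fn reserve(&self) -> Result<Permit<'_, String>, SendError<()>> {
        self.inner.reserve().await
    }
}
Callers then send values synchronously through the returned Permit, at which point the value can no longer be lost.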
Now let’s look at a case study which shows how to split up complex operations in practice.
Case study: Oxide serial console
The Oxide control plane provides a serial console, with an interface to web and command-line clients. Nexus has a function called proxy_instance_serial_ws, which proxies data back and forth between Propolis and the client. In omicron#3356, it was discovered that proxy_instance_serial_ws used tokio::select! in a cancel-unsafe manner.
Specifically, this function selected against four different futures:
A StreamExt::next future.
A future returned from a method called InstanceSerialConsoleHelper::recv.
An instance of SinkExt::send.
InstanceSerialConsoleHelper::send, which was a wrapper around SinkExt::send.
Out of these four futures, only future 1 was cancel-safe.
Future 2 was not cancel-safe because it read a value off of a stream, then processed it using further awaits in the middle. If this future was cancelled in the middle, the value could have been read off the stream but not processed, and lost.
Future 3 was not cancel-safe because SinkExt::send is not cancel-safe, for the same reason that mpsc::Sender::send is not cancel-safe.
Future 4 was not cancel-safe because it was a wrapper over SinkExt::send.
StreamExt::next is not documented to be cancel-safe, and its cancel safety depends on how the underlying Stream behaves. However, a Stream that couldn’t handle interruptions while generating its values would clearly be buggy. After all, the entire point of a Stream is to iterate over values in situations where the next item isn’t immediately available[5].
With SinkExt::send, no matter how well-written the underlying Sink is, the operation is not cancel-safe. In other words, the cancel unsafety is inherent to the send operation.
These issues were tackled separately, by splitting up complex operations in two different ways.
Making InstanceSerialConsoleHelper::recv cancel-safe
Description: splitting up a complex operation
InstanceSerialConsoleHelper::recv is a method that did two things:
The method first retrieved the next message from an incoming Stream, using StreamExt::next.
After that, recv processed the incoming message. Most of the time, the message would simply be returned to the user. However, if the message was an in-band control instruction to migrate to another server, recv would actually perform that reconnection—an operation with two more await points. After that, the message would be returned to the user.
Step 1 is cancel-safe, since it is just StreamExt::next. However, the migration in step 2 is not cancel-safe, since cancelling the recv future in the middle would mean that an in-progress migration might not be completed and the message might be lost. Here’s the way we chose to solve the cancel-safety issue:
Split recv into steps 1 and 2. Perform step 1 in recv(), making it return a future rather than the message.
Make the future returned from recv, called InstanceSerialConsoleMessage, perform step 2 and return the message[6].
Document that the future returned by step 1 is not cancel-safe and must be awaited for system correctness. (As an improvement, we can check for this explicitly by setting a flag in InstanceSerialConsoleHelper, but the natural flow of using messages leads to correctness so it hasn’t been necessary.)
Operating on the helper then becomes:
let mut helper: InstanceSerialConsoleHelper = /* ... */;
tokio::select! {
res = helper.recv() => {
let message = res?.await?;
// operate on message
}
// ... other select branches
}
What if the parent function is cancelled?
As covered in [overview], cancellation propagates from parent futures to child futures. A natural question to ask is: what if the parent function is cancelled while at the await point in let message = res?.await?;?
The answer to that is that the parent function also owns and is responsible for the client and server streams. If the parent function is cancelled, the stream is going to be closed anyway, so any state that becomes invalid is on its way to being destroyed.
Alternative solutions
These solutions aim to make all of recv cancel-safe, rather than splitting recv into cancel-safe and cancel-unsafe parts. This can be done through careful state management.
One alternative is to spawn a background task to perform the migration. propolis#438 implements this approach.
Another alternative is to resume partial progress by storing in-progress migration messages on self:
On receiving a migration message, store a copy of it in a field on self, say, self.migration_message (an Option<T>), before operating on it.
After completing the migration, set self.migration_message to None.
At the start of recv(), if self.migration_message is Some, process self.migration_message rather than reading a message from the incoming Stream.
Either of these would have been a fine way to solve this problem as well, but they weren’t chosen for domain-specific reasons.
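For concreteness, here is a minimal sketch of the second alternative; Message, Helper, and perform_migration are hypothetical stand-ins for the real types:
#[derive(Clone)]
enum Message {
    Data(String),
    Migrate(String),
}

async fn perform_migration(_target: &str) {
    // ... two more await points in the real code
}

struct Helper {
    stream: tokio::sync::mpsc::Receiver<Message>,
    migration_message: Option<Message>,
}

impl Helper {
    async fn recv(&mut self) -> Option<Message> {
        // Resume any migration that a cancelled call left behind, rather
        // than reading (and potentially losing) a fresh message.
        let message = match self.migration_message.take() {
            Some(message) => message,
            None => self.stream.recv().await?,
        };
        if let Message::Migrate(target) = &message {
            // Store before the await point so a cancellation can resume.
            self.migration_message = Some(message.clone());
            perform_migration(target).await;
            self.migration_message = None;
        }
        Some(message)
    }
}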
Making SinkExt::send cancel-safe
Description: using the reserve pattern
The SinkExt::send method performs three operations in sequence:
Call the Sink::poll_ready method until it completes.
Call the (synchronous) Sink::start_send method.
After that succeeds, call the Sink::poll_flush method until it completes.
This isn’t cancel-safe for the same reason that mpsc::Sender::send isn’t cancel-safe: the value might not be sent and instead be lost. But also, a reserve-based solution works just as well here as it does with mpsc::Sender::send!
The upstream futures crate doesn’t provide a reserve-pattern API, so we wrote our own (documentation). This API returns a Permit which indicates that step 1 above has been completed. This Permit is then used to send a message, performing steps 2 and 3.
use cancel_safe_futures::prelude::*;
let mut sink = /* ... */;
tokio::select! {
res = sink.reserve() => {
let permit = res?;
permit.send(value).await?;
// operate on message
}
// ... other select branches
}
The Permit holds on to a mutable reference to the sink, so the sink can’t be used for other purposes while the permit is active[7].
Making InstanceSerialConsoleHelper::send cancel-safe
InstanceSerialConsoleHelper::send was a wrapper around SinkExt::send that didn’t do anything else. To address cancel-safety issues with this method, we removed it and instead made InstanceSerialConsoleHelper implement Sink. Then, the reserve method implemented above can be used for this select branch as well.
Resume from partial progress
If an async function performs several asynchronous steps in succession, it can often be made robust against cancellation issues in select! loops by storing and resuming partial progress.
The basic idea here is to store the fact that some progress has happened, either internally in a field, or externally via a &mut parameter. Progress can then be resumed from the point at which the function is called again.
For an example showing how to store partial progress internally, see [serial_console_recv] (an alternative solution).
For an example showing how to externally track partial state, see [case_study_write_all_buf] below.
Case study: AsyncWriteExt::write_all_buf
For synchronous code, the standard library’s std::io::Write has a write_all method that attempts to write an entire buffer into a writer. This method works for simple cases, but since there’s no reporting of partial progress, it does not provide the ability to recover from errors—or even know how much has been written out. This means that many users have to hand-roll their own version of write_all, which is easy to get wrong[8].
When this method is ported to async code, users not only have to worry about errors, they also need to think about situations where a write operation is interrupted by a different branch of a select! firing. This issue was solved in Tokio’s AsyncWriteExt through some clever API design.
First, a trait called bytes::Buf was designed to enable recording partial progress. This trait is an abstract way to represent a read-only buffer of bytes, and has three important methods:
fn remaining(&self) -> usize: Reports the number of remaining bytes. (There’s also a has_remaining(&self) -> bool method.)
fn chunk(&self) -> &[u8]: Returns the next chunk of bytes.
fn advance(&mut self, cnt: usize): Advances the buffer by a particular count of bytes.
Then, AsyncWriteExt::write_all_buf accepts any &mut B where B: bytes::Buf. It reports partial progress to the Buf implementer: advance is called on it with however many bytes were successfully written.
This means that code that calls write_all_buf inside a select! loop is correct:
use bytes::Buf;
use std::io::Cursor;
use tokio::io::{AsyncWrite, AsyncWriteExt};

async fn write<W>(writer: &mut W, data: &[u8]) -> std::io::Result<()>
where
    W: AsyncWrite + Unpin,
{
    // Cursor<&[u8]> implements the bytes::Buf trait.
    let mut cursor = Cursor::new(data);
while cursor.has_remaining() {
tokio::select! {
res = writer.write_all_buf(&mut cursor) => {
res?;
}
// ... some other branch
}
}
Ok(())
}
Use cooperative (explicit) cancellation
In [task_aborts], we saw that aborting Tokio tasks is problematic because a task could potentially be cancelled in the middle of a cancel-unsafe operation. However, it often is a domain requirement to cancel tasks. One way to solve this is by using cooperative, or explicit, cancellation channels.
The cancel-safe-futures library maintained by Oxide has a coop_cancel module that implements cooperative cancellation with a "fan-in" model: many potential cancelers and one receiver. See the coop_cancel documentation for more, including examples.
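For illustration, the same cooperative model can be sketched with tokio-util’s CancellationToken (purely illustrative; coop_cancel’s fan-in API differs in its details):
use std::time::Duration;
use tokio_util::sync::CancellationToken; // tokio-util 0.7

async fn worker(token: CancellationToken) {
    loop {
        tokio::select! {
            // cancelled() is documented to be cancel-safe, and the worker
            // only stops at this await point -- never mid-operation.
            _ = token.cancelled() => {
                // Run any async cleanup here before exiting.
                println!("worker cancelled");
                return;
            }
            _ = tokio::time::sleep(Duration::from_millis(100)) => {
                // One cancel-safe unit of work per iteration.
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let handle = tokio::task::spawn(worker(token.clone()));
    token.cancel(); // cooperative: the worker decides where it stops
    handle.await.unwrap();
}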
Don’t use tokio::sync::Mutex
Tokio comes with a Mutex that is specifically designed for use in asynchronous contexts. The official recommendation is to prefer std::sync::Mutex if locks are not held across await points, and tokio::sync::Mutex only if locks are held across awaits.
However, given that:
Almost all uses of mutexes are to temporarily violate code invariants within a critical section, restoring them by the end of the operation.
Mutexes are almost always shared between futures (otherwise why would a mutex be used at all), which means that the "cancellation blast radius" of a future holding a mutex extends to other futures.
std::sync::Mutex has a notion of lock poisoning. Tokio’s mutexes don’t[10].
What this suggests is that if a future that is currently holding on to a mutex is cancelled, the state guarded by the mutex is likely invalid. This is such a big problem that this guide recommends avoiding tokio::sync::Mutex to the greatest extent possible.
Alternatives to Tokio mutexes
The recommended alternative to a Tokio mutex is a message-passing design, as described in this Tokio tutorial. Rather than having futures access some common shared state, this scheme has a single manager task that has full (non-shared) ownership of this state. This task receives requests in serial order, via an MPSC channel, and sends responses over (typically) oneshot channels.
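Here is a minimal sketch of that manager-task shape; the Command enum and counter state are hypothetical:
use tokio::sync::{mpsc, oneshot};

// One variant per operation on the state.
enum Command {
    Increment,
    Get { resp: oneshot::Sender<u64> },
}

fn spawn_manager() -> mpsc::Sender<Command> {
    let (tx, mut rx) = mpsc::channel(16);
    tokio::task::spawn(async move {
        // The manager owns the state outright: no mutex is required, and a
        // cancelled requester can never leave the state half-updated.
        let mut counter: u64 = 0;
        while let Some(command) = rx.recv().await {
            match command {
                Command::Increment => counter += 1,
                Command::Get { resp } => {
                    // The requester may have gone away; ignore send errors.
                    let _ = resp.send(counter);
                }
            }
        }
    });
    tx
}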
TODO: add note about RobustMutex here.
A final option is to switch to std::sync::Mutex, and not hold locks across await points. Be aware, however, that the worker thread is fully occupied while waiting for the lock. This is not a problem if the lock is uncontended and only held for short periods of time.
If you really must use a Tokio mutex
It isn’t impossible to write correct code that manages Tokio mutexes, just very difficult. Some approaches you could take:
Declare that your code isn’t cancel-safe.
Ensure that invariants are always restored between await points. Having invalid state between await points is okay[11].
Allow state to become invalid, but clean it up at the start of every critical section (i.e. every time the lock is acquired).
All of these options require careful analysis, which isn’t true for any of the recommended suggestions.
Spawn background tasks to perform cancel-unsafe operations
Unlike a future which has no existence outside of its owner, a task is owned by the executor (typically Tokio). This means that in some situations, a background task can be used to create a cancel-safe interface around a cancel-unsafe future.
For example, consider a next method that does something cancel-unsafe:
struct MyHandler {
inner: /* some stream of data */,
}
impl MyHandler {
// This is cancel-unsafe because message will be dropped if
// the future is cancelled in the middle of the operation.
async fn next(&mut self) -> MyMessage {
let message = self.inner.next().await;
process_message(&message).await;
message
}
}
As described in [marking_unsafe], a method called next must be cancel-safe. One way to achieve that is to spin up a background task to process the message:
Code sample
struct MyHandler {
inner: /* some stream of data */,
join_handle: Option<MyJoinHandle>,
}
impl MyHandler {
async fn next(&mut self) -> MyMessage {
// If an existing background task exists, wait for
// that to complete.
if let Some(join_handle) = &mut self.join_handle {
let message = join_handle.await_completion().await;
self.join_handle = None;
return message;
}
let message = self.inner.next().await;
let join_handle = MyJoinHandle::new(message);
// Set self.join_handle to Some before the next await point.
// This enables resumption if this future gets cancelled.
self.join_handle = Some(join_handle);
let join_handle = self.join_handle.as_mut().unwrap();
let message = join_handle.await_completion().await;
self.join_handle = None;
message
}
}
struct MyJoinHandle {
handle: JoinHandle<MyMessage>,
}
impl MyJoinHandle {
fn new(message: MyMessage) -> Self {
let handle = tokio::task::spawn(async move {
process_message(&message).await;
message
});
Self { handle }
}
async fn await_completion(&mut self) -> MyMessage {
// A simple `self.handle.await` results in "cannot move out of
// `self.handle` which is behind a mutable reference". Need to
// use `&mut self.handle` explicitly to guide the type checker.
let handle = &mut self.handle;
// Can also return an error rather than expecting.
handle.await.expect("task panicked")
// After this method completes, the handle should never be
// polled again. If it is polled again, then it will panic.
// That is managed in `MyHandler::next` by setting `join_handle`
// to None immediately after returning from this method.
}
}
This approach works well, but has some downsides:
There’s extra ceremony involved with carefully setting up the join handle. This is easy to get wrong. (Is there an abstraction we can write to make this easier?)
If cancellation is actually desired in some situations, then the task has to be manually cancelled (either via aborts or by using an explicit cancellation channel).
tokio::task::spawn requires a 'static bound, so it can’t borrow data from self or elsewhere. Whether this is okay, an inconvenience, or a deal-breaker depends on the situation[12].
Use alternate MPSC channel modes
Tokio provides bounded MPSC channels. While bounded channels are a good default since they add backpressure to a system, they additionally come with the issue that their send method is asynchronous (it blocks if the channel is full).
In several real-world cases, it has been observed that the only source of asynchronicity in part of a system is the use of bounded MPSC channels. Removing this source of asynchronicity can make the design of part of a system much simpler.
Consider using these alternative, synchronous channel modes:
try_send on bounded channels: A bounded channel’s Sender provides a synchronous try_send method, which returns an error if the channel is full. This method can be used to externalize backpressure to clients, e.g. by returning an HTTP 429 Too Many Requests error if the channel is full.
Watch channels: If you only care about the last value sent to a channel, consider using a Tokio watch channel. A watch channel is a single-producer, multi-consumer channel which only stores one value and discards the current value on receiving a new one, eliminating backpressure concerns.
Unbounded channels: A final option that is not recommended, but still available, is to use unbounded MPSC channels. Unbounded channels do not have a capacity limit, and their send method is synchronous. Using unbounded channels has a significant downside, namely the lack of backpressure. Among other things, this can result in memory consumption blowing up if the unbounded channel isn’t emptied. How this concern trades off against cancel-safety issues is a case-by-case decision.
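As a sketch of the try_send approach (the enqueue function and numeric status codes are hypothetical):
use tokio::sync::mpsc::{self, error::TrySendError};

// Returns an HTTP-style status code on failure, for illustration.
fn enqueue(sender: &mpsc::Sender<String>, value: String) -> Result<(), u16> {
    match sender.try_send(value) {
        Ok(()) => Ok(()),
        // Channel full: surface backpressure to the client as HTTP 429.
        Err(TrySendError::Full(_value)) => Err(429),
        // Receiver gone: the subsystem is shutting down.
        Err(TrySendError::Closed(_value)) => Err(503),
    }
}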
For a practical case study, see [case_study_wicketd].
Case study: wicketd’s installinator progress tracker
In Omicron, the service running on the technician port is called wicketd. This service manages recovery and offline updates, and part of that management is recording progress reports sent by another component: the installinator [rfd345].
Previously, the report tracker’s report_progress method used bounded channels to send reports. This had several consequences:
The code that sent reports had to be asynchronous.
Since this code used a mutex that was held across an await point, that mutex had to be a Tokio mutex rather than std::sync::Mutex.
Other code that used the mutex also had to be asynchronous.
The report_progress method wasn’t cancel-safe, since an aborted report could leave the update stuck in an Invalid state. (This is exactly the sort of issue with Tokio mutexes that resulted in the recommendation to not use them.)
The solution
In Omicron PR #3950 we switched to using a watch channel, relying on the fact that progress reports are cumulative: only the latest report is interesting to the tracker.
This had ripple effects, all positive:
The report_progress method was no longer async.
The internal mutex used by this code was changed from a Tokio mutex to a std::sync::Mutex, which doesn’t have cancel-safety issues.
Other code that used the mutex could be made synchronous as well.
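A simplified sketch of the shape of this fix (Report and ProgressTracker are stand-ins, not the actual wicketd types):
use tokio::sync::watch;

#[derive(Clone, Debug)]
struct Report {
    percent_complete: u8,
}

struct ProgressTracker {
    tx: watch::Sender<Option<Report>>,
}

impl ProgressTracker {
    fn new() -> (Self, watch::Receiver<Option<Report>>) {
        let (tx, rx) = watch::channel(None);
        (Self { tx }, rx)
    }

    // Synchronous: watch::Sender::send never waits, so there is no await
    // point to cancel and no need for an async mutex anywhere nearby.
    fn report_progress(&self, report: Report) {
        // Reports are cumulative; overwriting the previous one is fine.
        let _ = self.tx.send(Some(report));
    }
}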
Previously…
In Omicron #3579, we made a couple of attempts to make this code cancel-safe. However, both of the attempts had subtle flaws related to cancellation. As a result, we settled on switching to an unbounded channel.
While the benefit of cancel safety outweighed potential backpressure issues at the time, we later realized that a watch channel would have the same benefits without any backpressure-related issues.
An alternative would have been to rewrite this code into a message-passing style, but this was a more expedient way to achieve cancel safety.
Perform cleanup separately
Consider a method on a database connection pool that executes a future within the context of a database transaction:
use std::future::Future;
struct TransactionContext {
// ...
}
struct ConnectionPool {
// ...
}
impl ConnectionPool {
    async fn execute_transaction<F, Fut, R, E>(&self, future_fn: F) -> Result<R, E>
    where
        F: FnOnce(TransactionContext) -> Fut,
        Fut: Future<Output = Result<R, E>>,
    {
let cx: TransactionContext = /* ... */;
// ... begin transaction
match future_fn(cx).await {
Ok(value) => {
// ... commit transaction
Ok(value)
}
Err(error) => {
// ... rollback transaction
Err(error)
}
}
}
}
If execute_transaction is cancelled in the middle of being run, the database connection represented by the transaction context might be left in an inconsistent state. For example, it might be in a state where a transaction has begun but not ended.
For cancel correctness, it is important that ConnectionPool perform cleanup on the connection at the time it is returned to the pool, or at the time a new connection is allocated. For example, if the connection is in the middle of a transaction, it should be rolled back.
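A sketch of what checkout-time cleanup can look like, with a hypothetical Connection type standing in for a real database connection:
struct Connection {
    in_transaction: bool,
}

impl Connection {
    async fn execute(&mut self, sql: &str) -> Result<(), std::io::Error> {
        // ... send `sql` to the database
        if sql == "ROLLBACK" {
            self.in_transaction = false;
        }
        Ok(())
    }
}

struct ConnectionPool {
    idle: Vec<Connection>,
}

impl ConnectionPool {
    async fn checkout(&mut self) -> Option<Connection> {
        let mut conn = self.idle.pop()?;
        // Assume nothing about how the previous user's future ended: if an
        // execute_transaction future was cancelled mid-transaction, restore
        // the connection to a pristine state before handing it out.
        if conn.in_transaction {
            // Discard the connection if even the rollback fails.
            conn.execute("ROLLBACK").await.ok()?;
        }
        Some(conn)
    }
}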
Case study: async_bb8_diesel
TODO, pointing to async-bb8-diesel#47.
Marking APIs as cancel-unsafe
Many async functions are likely to be cancel-unsafe, with no easy way to change them. How can APIs that aren’t cancel-safe be designated, so consumers use them with care? This can be done through a combination of naming and documentation.
Names and function signatures
Cancel-unsafe APIs should have names and function signatures where it "feels like" using them in select! statements is wrong. Getting these semiotics right is a matter of experience and judgment, but here are some general guidelines:
Don’t name cancel-unsafe methods next() or recv(). It is natural to use these sorts of methods in select! loops.
Don’t name methods reserve() or acquire() unless they’re mostly cancel-safe. This pattern-matches against the Tokio methods that are similar.
Use names that indicate an irrevocable action that shouldn’t be repeated. For example, a method named self.http_post_data(), by referring to an HTTP POST, clearly indicates an action that shouldn’t be recreated in a select! loop.
Use the type system if possible. Consider a method like fn execute(self), where self isn’t cloneable. It is not going to be possible to use it in a select! loop. Instead, users must create the future returned by the method outside of the select! loop.
Note that a method like execute(self) can’t be used in a select! loop, but it can still be called once and cancelled (e.g. with a timeout). Often this is okay since the entire operation is aborted in that case.
Documentation
Documenting the cancel safety of your APIs to a reasonable degree helps users understand what can go wrong if a future is cancelled. It’s recommended that each method have a "cancel safety" section associated with it.
For an example, see the documentation for AsyncWriteExt::write_all_buf.
Documenting cancel safety for every single async API can be too much of a lift, but it’s worth doing this at least for the most commonly used methods.
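Here is a sketch of what such a section can look like, modeled on Tokio’s wording for Receiver::recv (the wrapper type is hypothetical):
use tokio::sync::mpsc;

struct Receiver<T> {
    inner: mpsc::Receiver<T>,
}

impl<T> Receiver<T> {
    /// Receives the next message on this channel.
    ///
    /// # Cancel safety
    ///
    /// This method is cancel-safe. If `recv` is used as the event in a
    /// `tokio::select!` statement and some other branch completes first,
    /// it is guaranteed that no message was received on this channel.
    async fn recv(&mut self) -> Option<T> {
        self.inner.recv().await
    }
}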
Cancel safety while consuming async code
Pay attention to names, function signatures and documentation
On the flip side of marking APIs as cancel-unsafe, it’s important to pay attention to what library authors are trying to indicate. For example, if a method performs an HTTP POST or is called execute(), it likely can’t be used in a select! loop.
"Read the documentation" is an unsatisfying answer for many reasons, but in many cases it’s the best we have.
Resume futures in select! loops rather than recreating them
If you’re using a cancel-unsafe future in a select! loop, you cannot recreate the future in each loop iteration. Instead, create the future once, outside of the select! loop. Then, select! over a &mut reference to the future. In other words, resume futures rather than recreating them from scratch.
Here’s an example that performs an HTTP request with periodic ticks.
Code sample
use hyper::{client::HttpConnector, Body, Request, Response};
use std::time::Duration;
struct MyHttpClient {
client: hyper::Client<HttpConnector>,
}
impl MyHttpClient {
async fn request_with_ticks(
&self,
req: Request<Body>,
) -> Result<Response<Body>, hyper::Error> {
let mut interval = tokio::time::interval(Duration::from_millis(100));
let mut response_fut = std::pin::pin!(self.client.request(req));
loop {
tokio::select! {
response = &mut response_fut => {
// The initial response has arrived. Exit the loop
// and return the response.
return response;
}
_ = interval.tick() => {
println!("interval tick");
}
}
}
}
}
For why std::pin::pin! is necessary, see the Tokio documentation[13].
Example: spawn_send_task redux
In [spawn_send_task_2] we saw an example of how to use tokio::sync::mpsc::Sender::reserve to solve data loss issues with Sender::send. However, we noted that there were still potential fairness issues, since dropping the future returned by Sender::reserve would lead to losing one’s place in line.
In this example, we further modify spawn_send_task so that we resume futures rather than recreating them. This means that we no longer lose our place in line if the future is cancelled.
We cannot use std::pin::pin! in this example because we’re replacing the value of reserve_fut in the middle of the loop. Instead, we call Box::pin, which moves the future returned by sender.reserve() to the heap and then pins it.
Here’s the example (Rust playground):
Code sample
fn spawn_send_task(sender: tokio::sync::mpsc::Sender<String>) {
let strings: Vec<String> = vec![
"foo".to_owned(),
"bar".to_owned(),
"baz".to_owned(),
];
tokio::task::spawn(async move {
// interval.tick() completes execution at 0s, 1s, 2s...
let mut interval = tokio::time::interval(Duration::from_secs(1));
let mut strings = strings.into_iter();
let mut reserve_fut = if strings.len() > 0 {
// Create a future to obtain a permit to send the first value.
Box::pin(sender.reserve())
} else {
// No values to send -- nothing to do.
return;
};
loop {
tokio::select! {
res = &mut reserve_fut => {
match res {
Ok(permit) => {
permit.send(strings.next().unwrap());
reserve_fut = if strings.len() > 0 {
// Create a future to send the next value.
Box::pin(sender.reserve())
} else {
// All values sent -- exit the loop.
break;
}
}
Err(_) => {
println!("receiver closed");
break;
}
}
}
_ = interval.tick() => {
println!("interval tick");
}
}
}
});
}
This code is a bit convoluted to write, and for complex enough operations it is sometimes not possible to express code in this manner at all.
Exercise: Try rewriting spawn_send_task to create Box::pin(sender.send(value)) futures rather than Box::pin(sender.reserve()).
What are the upsides and downsides of using send()? Is data loss still possible with send() given the problem statement that spawn_send_task solves?
The reserve() function is still more general than send(). Can you modify the problem statement to create a situation which would lead to data loss with send(), even if resumed rather than recreated, but not with reserve()? (Hint: Construct a scenario that’s analogous to write_all_buf.)
Use then_try adapters
As discussed in [try_joins], use of try_ adapters can be problematic because they lead to early cancellations of futures that should not be cancelled. The example in [try_joins] can be made correct by being rewritten as:
let (result1, result2) = tokio::join!(write1.flush(), write2.flush());
result1?;
result2?;
tokio::join! is unaware of the Result type, and so it doesn’t do early cancellations. However, this is a bit unwieldy to write, especially for more complicated try-joins and other adapters.
To solve this issue, Oxide’s cancel-safe-futures library introduces then_try adapters that are aware of the Result type, but don’t perform early cancellations. For example, the join_then_try macro first runs all the futures passed in to completion, then errors out if any of them failed[14]:
let result = cancel_safe_futures::join_then_try!(
write1.flush(),
write2.flush(),
);
result?;
This does exactly the same thing as the tokio::join! example above.
Note: The cancel-safe-futures library is incomplete; then_try adapters are added on an as-needed basis. If you need an adapter that isn’t present, please feel free to add more and send a PR!
Case study: sled-agent zone deletion
The Oxide sled-agent has code to shut down and uninstall locked zones. Previously, this code used the try_for_each_concurrent adapter to delete zones in parallel.
This code was subtly incorrect: if one zone failed to be deleted, all other zone deletion operations would be cancelled, potentially leaving zones (which are external to the process) in an inconsistent state.
In Omicron PR #3758, the code was changed to use the for_each_concurrent_then_try adapter provided by cancel-safe-futures. With this adapter in use, the code will now attempt to progress all zone deletions as far as possible.
Use background tasks
As described in [spawn_tasks], a task is owned by the executor rather than the code that created it. The strategy mentioned there for library code can also be used in application code to move cancel-unsafe futures to background tasks.
For an example of how using background tasks addressed practical issues with cancel safety, see [case_study_dropshot].
Avoid task aborts
As outlined in [task_aborts], it is very difficult to write code that is cancel-safe at any arbitrary await point. Avoid task aborts, and instead prefer explicit cancellation channels wherever necessary.
Case study: Dropshot and client disconnects
Dropshot is the REST HTTP server used throughout Omicron. Dropshot is written in an asynchronous style, with users registering async request handlers and Dropshot responsible for executing them.
Consider what happens if a client initiates an HTTP request, then disconnects in the middle of the request. Previously, on disconnection, the future would be cancelled. Since a lot of the Omicron code wasn’t written with cancel safety in mind (for example, by using Tokio mutexes), this resulted in many potential cancel-safety issues. Even worse, these issues could be triggered by client-initiated events.
To handle cancel-unsafe Omicron code in a systemic fashion, we changed Dropshot in PRs #701 and #702 to add a mode where:
Each request is spawned on its own task.
If a client disconnects, the corresponding task is no longer aborted. Instead, it is kept running in the background.
When the server shuts down, any pending tasks are run to completion.
After verifying that the new mode didn’t break things, we made it the default for Dropshot. While cancellation from (say) select! loops is still a concern, client-initiated cancellations are no longer possible.
Future directions
Working with upstreams to clarify cancel safety
This document has hopefully succeeded at its task of showing the perils of async cancellation. As an action item, we should work with upstreams to clarify the cancel safety of their APIs, fixing issues as necessary.
Testing cancel safety
While it is possible to test the cancel safety of async code in an ad-hoc fashion, there are no current mechanisms the author knows about for performing systematic testing. It would be good to design and build something along those lines.
External References
[rfd397] Oxide Computer Co. RFD 397: Challenges with async/await in the control plane. 2023.
[rfd345] Oxide Computer Co. RFD 345: SP-Driven Gimlet Recovery (also MUPdate). 2023.