I learnt a surprising lesson about mixing actor-style and Arc<Mutex<_>>-style concurrency. Time to share it with the world.
I'm writing a driver crate for Late Mate. Late Mate works over USB, which is just a byte pipe with no concept of request/response[1]. It's alright to be a byte pipe! It's not the Universal Serial Bus, it's me. I need request/response to stay sane.
The way to do this is to have a unique RequestId that is sent over USB with every request and echoed back with every response. The RequestId is then used to match requests and responses.
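To make the matching concrete, here's a hypothetical framing. The post doesn't describe Late Mate's wire format, so the 4-byte little-endian RequestId prefix and the u32 payload below are assumptions made for illustration only.

```rust
// Hypothetical wire format: 4-byte little-endian RequestId followed by a
// 4-byte little-endian u32 payload. Not the real Late Mate protocol.
type RequestId = u32;

/// Tag a payload with its RequestId before writing it to the byte pipe.
fn encode(request_id: RequestId, payload: u32) -> [u8; 8] {
    let mut frame = [0u8; 8];
    frame[..4].copy_from_slice(&request_id.to_le_bytes());
    frame[4..].copy_from_slice(&payload.to_le_bytes());
    frame
}

/// Split a frame read from the pipe back into (RequestId, payload), so the
/// id can be used to find whoever is waiting for this response.
fn decode(frame: &[u8; 8]) -> (RequestId, u32) {
    let mut id = [0u8; 4];
    let mut payload = [0u8; 4];
    id.copy_from_slice(&frame[..4]);
    payload.copy_from_slice(&frame[4..]);
    (u32::from_le_bytes(id), u32::from_le_bytes(payload))
}

fn main() {
    let frame = encode(7, 42);
    assert_eq!(decode(&frame), (7, 42));
}
```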
Here's an outline of the original, buggy architecture. Request and Response are u32 for simplicity.
Scroll down if it's too much code at once.
use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Mutex};

type RequestId = u32;
type Request = u32;
type Response = u32;

#[derive(Default)]
struct Dispatcher {
    next_request_id: RequestId,
    pending: BTreeMap<RequestId, oneshot::Sender<Response>>,
}

impl Dispatcher {
    pub fn register(&mut self) -> (RequestId, oneshot::Receiver<Response>) {
        todo!()
    }
}

pub async fn dispatcher_loop(
    dispatcher: Arc<Mutex<Dispatcher>>,
    mut usb_rx: mpsc::Receiver<(RequestId, Response)>,
) {
    loop {
        match usb_rx.recv().await {
            Some((request_id, response)) => {
                if let Some(respond_to) =
                    dispatcher.lock().await.pending.remove(&request_id)
                {
                    // it's OK to ignore a closed receiver
                    let _ = respond_to.send(response);
                }
            }
            // USB channel was closed
            None => break,
        }
    }
}

struct Driver {
    dispatcher: Arc<Mutex<Dispatcher>>,
    usb_tx: mpsc::Sender<(RequestId, Request)>,
}

impl Driver {
    pub fn init() -> Self {
        // USB plumbing elided: usb_rx yields responses, usb_tx accepts requests
        let (usb_rx, usb_tx) = todo!();
        let dispatcher = Arc::new(Mutex::new(Dispatcher::default()));
        tokio::spawn(dispatcher_loop(dispatcher.clone(), usb_rx));
        Driver { dispatcher, usb_tx }
    }

    pub async fn send_request(&self, request: Request) -> Response {
        // the lock guard is a temporary, released at the end of this statement
        let (request_id, receiver) = self.dispatcher.lock().await.register();
        self.usb_tx.send((request_id, request)).await.unwrap();
        receiver.await.expect("Dispatcher must be alive")
    }
}
Let's break it down a bit.
First, there is this struct:
#[derive(Default)]
struct Dispatcher {
    next_request_id: RequestId,
    pending: BTreeMap<RequestId, oneshot::Sender<Response>>,
}

impl Dispatcher {
    pub fn register(&mut self) -> (RequestId, oneshot::Receiver<Response>) {
        todo!()
    }
}
Dispatcher keeps track of pending requests and the next RequestId to use. Functions wanting to talk over USB register their upcoming requests and get back a new RequestId and a oneshot channel. The channel will eventually contain their reply.
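The register body is left as todo!() above. One plausible way to fill it in is sketched below. This is a hypothetical implementation, not the post's code, and it uses std::sync::mpsc channels as a stand-in for tokio's oneshot so it runs without dependencies. The synchronous handle_usb_rx mirrors what the dispatch loop does with each response and is likewise an assumption.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc;

type RequestId = u32;
type Response = u32;

// std channels standing in for tokio::sync::oneshot in this sketch.
type ReplySender = mpsc::Sender<Response>;
type ReplyReceiver = mpsc::Receiver<Response>;

#[derive(Default)]
struct Dispatcher {
    next_request_id: RequestId,
    pending: BTreeMap<RequestId, ReplySender>,
}

impl Dispatcher {
    /// Hand out the next id and park the reply sender until the matching
    /// response arrives.
    fn register(&mut self) -> (RequestId, ReplyReceiver) {
        let request_id = self.next_request_id;
        // Wrap around instead of panicking on overflow after u32::MAX requests.
        self.next_request_id = self.next_request_id.wrapping_add(1);
        let (tx, rx) = mpsc::channel();
        self.pending.insert(request_id, tx);
        (request_id, rx)
    }

    /// Route a response to whoever registered its id.
    fn handle_usb_rx(&mut self, request_id: RequestId, response: Response) {
        if let Some(respond_to) = self.pending.remove(&request_id) {
            // The requester may have given up; that's fine.
            let _ = respond_to.send(response);
        }
    }
}

fn main() {
    let mut dispatcher = Dispatcher::default();
    let (id_a, rx_a) = dispatcher.register();
    let (id_b, rx_b) = dispatcher.register();
    // Responses may arrive in any order; the id routes each one.
    dispatcher.handle_usb_rx(id_b, 200);
    dispatcher.handle_usb_rx(id_a, 100);
    assert_eq!(rx_a.recv(), Ok(100));
    assert_eq!(rx_b.recv(), Ok(200));
    assert!(dispatcher.pending.is_empty());
}
```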
Next is the dispatch loop/actor:
pub async fn dispatcher_loop(
    dispatcher: Arc<Mutex<Dispatcher>>,
    mut usb_rx: mpsc::Receiver<(RequestId, Response)>,
) {
    loop {
        match usb_rx.recv().await {
            Some((request_id, response)) => {
                if let Some(respond_to) =
                    dispatcher.lock().await.pending.remove(&request_id)
                {
                    // it's OK to ignore a closed receiver
                    let _ = respond_to.send(response);
                }
            }
            // USB channel was closed
            None => break,
        }
    }
}
The loop listens to messages coming out of the USB pipe[2] and dispatches them to the appropriate response channels.
Two things to pay attention to:
- Arc<Mutex<Dispatcher>>. The Dispatcher is created once and shared between futures/threads through a refcounted pointer and a lock[3].
- When the USB channel is closed, .recv() returns None and the loop exits, because it has nothing more to do.
Finally, the last part shows how it all works together:
struct Driver {
    dispatcher: Arc<Mutex<Dispatcher>>,
    usb_tx: mpsc::Sender<(RequestId, Request)>,
}

impl Driver {
    pub fn init() -> Self {
        let (usb_rx, usb_tx) = todo!();
        let dispatcher = Arc::new(Mutex::new(Dispatcher::default()));
        tokio::spawn(dispatcher_loop(dispatcher.clone(), usb_rx));
        Driver { dispatcher, usb_tx }
    }

    pub async fn send_request(&self, request: Request) -> Response {
        let (request_id, receiver) = self.dispatcher.lock().await.register();
        self.usb_tx.send((request_id, request)).await.unwrap();
        receiver.await.expect("Dispatcher must be alive")
    }
}
The driver creates a single Arc<Mutex<_>>-wrapped Dispatcher and spawns the dispatcher loop on initialisation.
To send a request, the request is first registered with the dispatcher (locking it for an increment and a map insert) and then written directly to the USB pipe.
Still with me?
Great.
This solution works!
Except…
What happens when the USB side goes down?
loop {
    match usb_rx.recv().await {
        Some((request_id, response)) => {
            // ...
        }
        None => break,
    }
}
…the dispatcher loop exits, and everything is dropped, including the reply Senders…
pub async fn send_request(&self, request: Request) -> Response {
    let (request_id, receiver) = self.dispatcher.lock().await.register();
    // ...
    receiver.await.expect("Dispatcher must be alive")
}
…and then the Receivers resolve to Err(RecvError) and the requesting function panics. Right?
Nope, it silently hangs. Forever. Why?
Here:
pub async fn dispatcher_loop(
    dispatcher: Arc<Mutex<Dispatcher>>,
    // ...
) {
    // ...
}
When the dispatcher loop exits, it doesn't bring the Dispatcher down, because there are still live Arcs on the Driver side. The loop doesn't own the Dispatcher; it merely owns a refcounted pointer to it.
The Dispatcher doesn't get dropped, so pending doesn't get dropped either, so the response channels stay open, but now there is no one left to write into them.
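The hang can be reproduced without tokio. The sketch below is a std-only analogue of the buggy design, assuming threads and std::sync::mpsc in place of tokio tasks and channels. The loop thread gets one Arc clone and the "driver" keeps another. A bounded recv_timeout stands in for the await that would hang forever, so the sketch terminates.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

type RequestId = u32;
type Response = u32;

#[derive(Default)]
struct Dispatcher {
    pending: BTreeMap<RequestId, mpsc::Sender<Response>>,
}

/// Returns what the waiting requester observes (before, after) the driver
/// drops its own Arc.
fn reproduce_hang() -> (RecvTimeoutError, RecvTimeoutError) {
    let dispatcher = Arc::new(Mutex::new(Dispatcher::default()));
    let (usb_tx, usb_rx) = mpsc::channel::<(RequestId, Response)>();

    let loop_handle = {
        let dispatcher = Arc::clone(&dispatcher);
        thread::spawn(move || {
            // Exits as soon as the "USB" side is closed.
            for (request_id, response) in usb_rx {
                if let Some(tx) = dispatcher.lock().unwrap().pending.remove(&request_id) {
                    let _ = tx.send(response);
                }
            }
            // Only this Arc clone is dropped here, not the Dispatcher.
        })
    };

    // Driver side: register a request...
    let (reply_tx, reply_rx) = mpsc::channel();
    dispatcher.lock().unwrap().pending.insert(0, reply_tx);

    // ...then the USB side goes down and the loop exits.
    drop(usb_tx);
    loop_handle.join().unwrap();

    // The driver's Arc keeps the Dispatcher (and the Sender in `pending`)
    // alive, so a plain recv() here would block forever.
    let before = reply_rx.recv_timeout(Duration::from_millis(100)).unwrap_err();

    // Dropping the last Arc finally drops the Dispatcher and its Senders.
    drop(dispatcher);
    let after = reply_rx.recv_timeout(Duration::from_millis(100)).unwrap_err();
    (before, after)
}

fn main() {
    let (before, after) = reproduce_hang();
    assert_eq!(before, RecvTimeoutError::Timeout);
    assert_eq!(after, RecvTimeoutError::Disconnected);
}
```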
There are many ways to solve this, but I think the cleanest is to avoid mixing Arc<Mutex<_>> and actors and just go full actors:
pub async fn dispatcher_loop(
    mut dispatcher: Dispatcher,
    mut usb_rx: mpsc::Receiver<(RequestId, Response)>,
    mut registration_requests: mpsc::Receiver<
        oneshot::Sender<(RequestId, oneshot::Receiver<Response>)>,
    >,
) {
    loop {
        tokio::select! {
            usb_response = usb_rx.recv() => match usb_response {
                Some((request_id, response)) =>
                    dispatcher.handle_usb_rx(request_id, response).await,
                None => break,
            },
            request = registration_requests.recv() => match request {
                Some(reply_to) => dispatcher.register(reply_to).await,
                None => break,
            }
        }
    }
    // returning drops `dispatcher`, and with it every pending reply Sender
}
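For comparison, here's the full-actor shape sketched with std threads. std has no select!, so this version (an assumption, not the post's code) merges both inputs into one hypothetical Msg enum and has the USB reader send an explicit UsbClosed message. The DispatcherHandle name is borrowed from the post, but its register signature here is made up. The property it demonstrates is the same: the loop owns its Dispatcher, so exiting the loop closes every pending reply channel instead of leaving callers hanging.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{self, Receiver, RecvError, Sender};
use std::thread;

type RequestId = u32;
type Response = u32;

/// Everything the actor can be told. Both inputs share one inbox, and
/// shutdown of the USB side is announced explicitly.
enum Msg {
    UsbResponse(RequestId, Response),
    Register(Sender<(RequestId, Receiver<Response>)>),
    UsbClosed,
}

#[derive(Default)]
struct Dispatcher {
    next_request_id: RequestId,
    pending: BTreeMap<RequestId, Sender<Response>>,
}

/// The actor takes the Dispatcher by value: no Arc, no Mutex.
fn dispatcher_loop(mut dispatcher: Dispatcher, inbox: Receiver<Msg>) {
    for msg in inbox {
        match msg {
            Msg::UsbResponse(request_id, response) => {
                if let Some(respond_to) = dispatcher.pending.remove(&request_id) {
                    let _ = respond_to.send(response);
                }
            }
            Msg::Register(reply_to) => {
                let request_id = dispatcher.next_request_id;
                dispatcher.next_request_id = request_id.wrapping_add(1);
                let (tx, rx) = mpsc::channel();
                dispatcher.pending.insert(request_id, tx);
                let _ = reply_to.send((request_id, rx));
            }
            Msg::UsbClosed => break,
        }
    }
    // Returning drops `dispatcher`, and with it every pending reply Sender.
}

#[derive(Clone)]
struct DispatcherHandle {
    inbox: Sender<Msg>,
}

impl DispatcherHandle {
    /// Hides the message passing. Returns None once the actor has shut down.
    fn register(&self) -> Option<(RequestId, Receiver<Response>)> {
        let (reply_to, reply_rx) = mpsc::channel();
        self.inbox.send(Msg::Register(reply_to)).ok()?;
        reply_rx.recv().ok()
    }
}

fn main() {
    let (inbox_tx, inbox_rx) = mpsc::channel();
    let actor = thread::spawn(move || dispatcher_loop(Dispatcher::default(), inbox_rx));
    let handle = DispatcherHandle { inbox: inbox_tx.clone() };

    let (first, first_rx) = handle.register().unwrap();
    let (_second, second_rx) = handle.register().unwrap();

    // One response arrives, then the USB side goes down.
    inbox_tx.send(Msg::UsbResponse(first, 100)).unwrap();
    inbox_tx.send(Msg::UsbClosed).unwrap();
    actor.join().unwrap();

    assert_eq!(first_rx.recv(), Ok(100));
    // No hang: the dropped Dispatcher closed the unanswered channel.
    assert_eq!(second_rx.recv(), Err(RecvError));
    assert!(handle.register().is_none());
}
```

The tokio version keeps USB traffic and registrations on separate channels and uses select! to wait on both. The single-inbox version trades that separation for simplicity.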
Points of interest:
- The loop now owns the Dispatcher itself. When it exits, the Dispatcher is dropped, closing all reply channels as expected.
- …Map. I followed Alice Ryhl's suggestion and hid the message passing behind DispatcherHandle.register(), and it's all quite neat now.
- …select!: it works in a very special way.
One thing Alice mentions in her talk on actors in Rust is how Rust's ownership model interacts with actor-based concurrency.
The specific example in the talk was message passing forcing a transfer of ownership: because of the way message passing works, it's hard to pass a reference through a channel (unless it's an Arc).
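Here's a small std-only illustration of that point, using a thread as a stand-in actor. The worker receives an owned String and an Arc clone, so nothing it holds borrows from the sender. The function name run is made up for the example.

```rust
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

/// Returns what the worker saw, plus the Arc's strong count afterwards.
fn run() -> (String, usize) {
    let shared: Arc<str> = Arc::from("config");
    let (tx, rx) = mpsc::channel::<(String, Arc<str>)>();

    let worker = thread::spawn(move || {
        // The worker owns everything it received.
        let (owned, cfg) = rx.recv().unwrap();
        format!("{} with {}", owned, cfg)
        // `cfg`, the worker's Arc clone, is dropped when the closure returns.
    });

    // `owned` moves into the channel; the caller can no longer use it.
    // Sending `&owned` instead would not compile: thread::spawn needs a
    // 'static closure, so the Receiver it captures must carry 'static items.
    let owned = String::from("request");
    tx.send((owned, Arc::clone(&shared))).unwrap();

    let seen = worker.join().unwrap();
    (seen, Arc::strong_count(&shared))
}

fn main() {
    assert_eq!(run(), (String::from("request with config"), 1));
}
```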
Passing a "naked" non-'static reference to an actor would require the code to prove that the reference stays valid for as long as the receiving actor might hold it, which is ~impossible to model in today's Rust. I believe it might be impossible to model in any practical language, period.
The bug above is another example of the interaction between ownership and actors: it's much easier to propagate shutdown and cleanup through Drop, but that requires actors to own their resources. Actors Want Ownership, which is a fair demand, even if Hollywood execs disagree.
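Here's the Drop side of that argument in miniature, again with std threads standing in for actors. The Resource type and its cleanup flag are hypothetical, and its Drop impl plays the role of "closing reply channels". When the actor holds the only handle, cleanup runs as the actor exits. When the caller keeps an Arc clone, as Driver did, cleanup does not run.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// A resource whose cleanup happens in Drop.
struct Resource {
    cleaned_up: Arc<AtomicBool>,
}

impl Drop for Resource {
    fn drop(&mut self) {
        self.cleaned_up.store(true, Ordering::SeqCst);
    }
}

/// Reports whether cleanup had run by the time the actor thread finished.
fn cleaned_up_when_actor_exits(shared_with_caller: bool) -> bool {
    let flag = Arc::new(AtomicBool::new(false));
    let resource = Arc::new(Resource { cleaned_up: Arc::clone(&flag) });
    // Optionally keep a second handle on the caller side.
    let caller_copy = if shared_with_caller { Some(Arc::clone(&resource)) } else { None };

    thread::spawn(move || {
        let _actor_handle = resource;
        // ...actor work; returning drops the actor's handle...
    })
    .join()
    .unwrap();

    let cleaned_up = flag.load(Ordering::SeqCst);
    drop(caller_copy);
    cleaned_up
}

fn main() {
    assert!(cleaned_up_when_actor_exits(false));
    assert!(!cleaned_up_when_actor_exits(true));
}
```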
In short, it's a footgun! So if you decide to pass an Arc<Mutex<_>> to your actors, handle it with care.
[1] This is not entirely true. The underlying USB protocol is request/response, but as I'm using a bulk transfer endpoint with packets that can be larger than a single buffer, it's a byte pipe for me. If you're interested in a detailed explanation of how pre-3.0 USB works, USB Made Simple and USB in a NutShell are great.
[2] Serialisation/deserialisation are handled elsewhere; let's pretend it's just a channel of structs.
[3] It could be an std lock, but either would do fine in this context.