Shutting down async systems


Back to index

Q: How do you stop disconnected, infinite futures?

A: Run state!


let run_state = Arc::new(AtomicBool::new(true));

let run2 = Arc::clone(&run_state);
smol::spawn(async move {
    while run2.load(Ordering::Relaxed) {
        // Do things here...
    }
});

// Somewhere else in your program
run_state.fetch_and(false, Ordering::Relaxed);

What problems might there be?

while run2.fetch(Ordering::Relaxed) {
  // Do things here...
}
  • Only check run-state once per loop
  • Once the loop has started it can't be stopped again!
  • Is this a good or a bad thing? YOU DECIDE!

Another scenario


All tasks are connected to each other via channels. A task shuts down when its channel sender stops existing.

  • A produces data for B and C
  • B produces data for D and E
  • C produces data for F and G

Shutdown will be staggered!

F will still be running while B and C are already shut down.

Basic structure

  • First we start by defining the internal structure of our Future.
  • We need it to be Clone so we wrap AtomicBool into an Arc.
  • The Waker type is Clone by default.
use std::sync::{Arc, atomic::AtomicBool};
use std::task::Waker;

#[derive(Clone, Default)]
struct ShutdownFuture(Arc<AtomicBool>, Option<Waker>);

  • Next up we implement the Future trait
  • When we are polled, we check the boolean for its state
    • Either return Poll::Pending or Poll::Ready
    • This determines the shutdown state of our state machine!
  • We also store a Waker for future updates
use std::{future::Future, pin::Pin, task::Context};
use std::sync::atomic::Ordering;

impl Future for ShutdownFuture {
    type Output = (); // Output type does not matter

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        if self.0.load(Ordering::Relaxed) {
            Poll::Ready(()) // System should shut down
        }  else {
            self.1 = Some(cx.waker().clone());
            Poll::Pending // System should keep running
        }
    }
}

What does this do so far?

  • Polling this future will never return Poll::Ready(())
  • This future will only be polled once
  • But we stored a Waker! That is important
impl ShutdownFuture {
    pub fn stop(&self) {
        self.0.fetch_or(true, Ordering::Relaxed);
        if let Some(w) = self.1.as_ref() {
            w.wake_by_ref();
        }
    }
}

What does this code do?

Selecting the right future

  • Putting all this together we need another mechanism: select!
  • In tokio this is tokio::future::select
  • For async_std you need to use futures crate
async fn next_event(&self, shutdown: &ShutdownFuture) -> bool {
    let mut next_event = self.event_rx.recv();
    let mut shutdown = shutdown.clone();

    tokio::select! {
        _ = shutdown => {
            println!("Received shutdown signal!");
            false
        },
        event = next_event => {
            handle_event(event).await;
            true
        }
    }
}

Putting it all together

  • The last thing missing is creating a task :)
  • Let's use a channel to send some data to a task, then shut it down via our shutdown signal
use crate::{event::MyEvent, task::TaskHandle}; // Some other fake code
use tokio::sync::mpsc::bounded;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, rx) = bounded(32);
    let shutdown = ShutdownFuture::default();
    let task_handle = TaskHandle { shutdown: shutdown.clone(), tx: tx.clone(), rx };

    // Run this loop until next_event returns 'false'
    tokio::spawn(async move { while task_handle.next_event().await {} });

    tx.send(MyEvent::Nop).await?;
    shutdown.stop();
}

Back to index