// Shared stop flag: `true` while the background loop should keep going.
let run_state = Arc::new(AtomicBool::new(true));
let run2 = Arc::clone(&run_state);
// A `smol::Task` cancels its future when the handle is dropped, so the handle is
// detached to let the loop keep running in the background.
smol::spawn(async move {
    while run2.load(Ordering::Relaxed) {
        // Do things here...
    }
})
.detach();
// Somewhere else in your program
// The previous value is not needed, so a plain store is enough to clear the flag.
run_state.store(false, Ordering::Relaxed);
// Keep looping for as long as the shared flag reads `true`.
// `AtomicBool` is read with `load`; it has no `fetch` method.
while run2.load(Ordering::Relaxed) {
    // Do things here...
}
All tasks are connected to each other via channels. A task shuts down when its channel sender stops existing.
F will still be running while B and C are already shut down.
The struct must be `Clone`, so we wrap the `AtomicBool` into an `Arc`. The `Waker` type is `Clone` by default.
use std::sync::{Arc, atomic::AtomicBool};
use std::task::Waker;
/// A cloneable future that completes once shutdown has been requested.
///
/// * Field `0` is the shutdown flag. It lives behind an `Arc`, so every clone
///   reads and writes the same `AtomicBool`. The derived `Default` starts it at `false`.
/// * Field `1` is the most recently stored `Waker`, or `None` if nothing has been
///   stored yet (which is also the `Default`).
///
/// NOTE(review): the waker slot is a plain field, so the derived `Clone` copies it
/// and each clone keeps its own slot afterwards. A waker stored through one clone is
/// not visible through another clone; confirm that shutdown is triggered on the same
/// instance that was polled, or share the slot as well.
#[derive(Clone, Default)]
struct ShutdownFuture(Arc<AtomicBool>, Option<Waker>);
- Implement the `Future` trait
- Return `Poll::Pending` or `Poll::Ready`
- Store the `Waker` for future updates
use std::{future::Future, pin::Pin, task::Context};
use std::sync::atomic::Ordering;
impl Future for ShutdownFuture {
type Output = (); // Output type does not matter
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
if self.0.load(Ordering::Relaxed) {
Poll::Ready(()) // System should shut down
} else {
self.1 = Some(cx.waker().clone());
Poll::Pending // System should keep running
}
}
}
- Set the flag so that the next poll returns `Poll::Ready(())`
- Wake the `Waker`! That is important
impl ShutdownFuture {
/// Requests shutdown: raises the shared flag, then wakes the waker held by this
/// instance, if it holds one.
pub fn stop(&self) {
    // The previous flag value is irrelevant, so it is simply overwritten.
    self.0.store(true, Ordering::Relaxed);
    match &self.1 {
        Some(waker) => waker.wake_by_ref(),
        None => {}
    }
}
}
What does this code do?
- With `tokio`, this is `tokio::select!`.
- With `async_std`, you need to use `futures`.
crateasync fn next_event(&self, shutdown: &ShutdownFuture) -> bool {
let mut next_event = self.event_rx.recv();
let mut shutdown = shutdown.clone();
tokio::select! {
_ = shutdown => {
println!("Received shutdown signal!");
false
},
event = next_event => {
handle_event(event).await;
true
}
}
}
use crate::{event::MyEvent, task::TaskHandle}; // Some other fake code
use tokio::sync::mpsc::bounded;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (tx, rx) = bounded(32);
let shutdown = ShutdownFuture::default();
let task_handle = TaskHandle { shutdown: shutdown.clone(), tx: tx.clone(), rx };
// Run this loop until next_event returns 'false'
tokio::spawn(async move { while task_handle.next_event().await {} });
tx.send(MyEvent::Nop).await?;
shutdown.stop();
}