Literally just channels



How to communicate between tasks?

Channels!


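fn main() {
    let (tx, rx) = async_channel::bounded(64);
    smol::spawn(async move {
        // Producer loop
        while alive() {
            let data = get_data_via_magic().await;
            // send() fails once the receiver is gone: stop producing
            if tx.send(data).await.is_err() {
                break;
            }
        }
    })
    // Dropping a smol Task cancels it, so detach to keep the producer running
    .detach();

    smol::block_on(async move {
        // Consumer loop: recv() returns Err once all senders are dropped
        while let Ok(data) = rx.recv().await {
            process_data_somehow(data).await;
        }
    });
}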

Channels wake up consumers!

Type separation (with trait objects)


  • In a system with many generic types, channels can act as barriers between them
  • Recover the type information on the other side, in a different future
  • This was very relevant when writing the Ockam Node runtime (https://github.com/build-trust/ockam) but might be a bit esoteric for your application

type-separation.png

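use std::{any::Any, collections::BTreeMap, marker::PhantomData};
// Assumption: blocking std channels, which match the send/recv calls below
use std::sync::mpsc::{Receiver, Sender};

trait Bonkers {
    // Lets the receiver downcast the payload later on
    fn as_any(&self) -> &dyn Any;
}
struct Envelope(pub Box<dyn Bonkers + 'static>);

// PhantomData keeps the otherwise unused type parameter T legal
pub struct A<T> { recv: Receiver<Envelope>, _type: PhantomData<T> }
pub struct B<T> { recv: Receiver<Envelope>, _type: PhantomData<T> }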

pub struct Lookup {
    map: BTreeMap<&'static str, Sender<Envelope>>,
}

impl Lookup {
    fn get_sender_for(&self, id: &str) -> Sender<Envelope> {
        self.map.get(id).cloned().unwrap()
    }
}

impl A<Foo> {
    fn send_to_b(&self, l: &Lookup, b: Bar) {
        let sender = l.get_sender_for("b-my-friend");
        sender.send(Envelope(Box::new(b))).unwrap();
    }
}


impl B<Bar> {
    fn handle_new_bar(&self) {
        let env = self.recv.recv().unwrap();
        let env_any = env.0.as_any();
        let b: &Bar = env_any.downcast_ref::<Bar>().unwrap();

        // Tada >.>
    }
}

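impl B<Bar> {
    fn get_bar(&self) -> Box<Bar> {
        let env = self.recv.recv().unwrap();
        // Don't do this at home: it relies on the layout of the fat pointer
        // and on the payload really being a Bar
        let bar = unsafe {
            core::mem::transmute_copy::<Box<dyn Bonkers>, Box<Bar>>(&env.0)
        };
        // The copy now owns the allocation: forget the envelope to avoid a double free
        core::mem::forget(env);
        bar
    }
}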
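
For comparison, here is a minimal, self-contained sketch of the same envelope idea without any unsafe code. It is not the Ockam implementation: it assumes blocking std::sync::mpsc channels and uses std's Any instead of the Bonkers trait. The Bar payload and the send_bar/recv_bar helpers are made up for illustration.

```rust
use std::any::Any;
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical payload type standing in for `Bar` from the slides.
#[derive(Debug, PartialEq)]
pub struct Bar(pub u32);

// The envelope erases the payload type, so every channel carries the same type.
pub struct Envelope(pub Box<dyn Any + Send>);

// Sending side: knows the concrete type and boxes it up.
pub fn send_bar(tx: &Sender<Envelope>, bar: Bar) {
    tx.send(Envelope(Box::new(bar))).expect("receiver dropped");
}

// Receiving side: recovers the concrete type, or hands the envelope back
// if the payload turns out to be something else.
pub fn recv_bar(rx: &Receiver<Envelope>) -> Result<Box<Bar>, Envelope> {
    let env = rx.recv().expect("all senders dropped");
    env.0.downcast::<Bar>().map_err(Envelope)
}

fn main() {
    let (tx, rx) = channel::<Envelope>();
    send_bar(&tx, Bar(42));
    match recv_bar(&rx) {
        Ok(bar) => println!("got {:?}", bar),
        Err(_) => println!("payload was not a Bar"),
    }
}
```

Neither side's generic parameters leak through the channel: the only type both ends have to agree on is Envelope, and a wrong guess on the receiving side is an error value instead of undefined behaviour.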

Bounded vs unbounded

Not so controversial opinion time: don't use unbounded channels! Why?


  • Bounded channels: fixed-size channels that are predictable and provide backpressure
  • Unbounded channels: grow and shrink dynamically with demand. More flexible, but they can keep growing until memory runs out
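
A rough sketch of the difference, using the blocking channels from std::sync::mpsc rather than an async crate (the behaviour at the buffer limit is the same idea). The fill_unbounded and fill_bounded helpers are made up for this example: nobody ever receives, so we see how many messages each channel is willing to hold.

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};

// Queue `n` messages on an unbounded channel without receiving in between.
// Every send succeeds immediately, so the buffer grows to `n` items.
pub fn fill_unbounded(n: usize) -> usize {
    let (tx, rx) = channel::<usize>();
    for i in 0..n {
        tx.send(i).expect("receiver is still alive");
    }
    drop(tx);
    // Count what piled up in the queue.
    rx.try_iter().count()
}

// Try to queue `n` messages on a bounded channel with room for `cap`.
// Returns how many were accepted before the channel refused more.
pub fn fill_bounded(cap: usize, n: usize) -> usize {
    let (tx, _rx) = sync_channel::<usize>(cap);
    let mut accepted = 0;
    for i in 0..n {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // Buffer is full: a blocking send() would wait here instead.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

fn main() {
    println!("unbounded accepted {}", fill_unbounded(1000));
    println!("bounded(4) accepted {}", fill_bounded(4, 1000));
}
```

The unbounded channel takes everything it is given; the bounded one stops at its capacity and pushes the problem back to the producer.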

What is backpressure?


  • A mechanism to bind the production rate to the consumption rate of two components
  • Or: slow down producers for slow consumers
  • This can slow the system down overall, but it keeps it from falling over under load
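
To make this concrete, here is a small sketch with blocking std threads instead of async tasks (an assumption made to keep it dependency-free). The producer_time helper is hypothetical: it measures how long a fast producer is held up by a slow consumer behind a bounded channel.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::{Duration, Instant};

// Send `n` items through a bounded channel of size `cap` to a consumer that
// needs `delay` per item. Returns how long the producer spent sending.
pub fn producer_time(cap: usize, n: u32, delay: Duration) -> Duration {
    let (tx, rx) = sync_channel::<u32>(cap);
    let start = Instant::now();

    // Slow consumer: takes one item, then "works" for `delay`.
    let consumer = thread::spawn(move || {
        let mut received = 0u32;
        for _item in rx {
            received += 1;
            thread::sleep(delay);
        }
        received
    });

    // Fast producer: send() blocks whenever the buffer is full.
    for i in 0..n {
        tx.send(i).expect("consumer is still alive");
    }
    let elapsed = start.elapsed();

    drop(tx); // close the channel so the consumer loop ends
    assert_eq!(consumer.join().unwrap(), n);
    elapsed
}

fn main() {
    let t = producer_time(1, 5, Duration::from_millis(20));
    println!("producer needed {:?} to hand over 5 items", t);
}
```

The producer could generate all five items instantly, but with room for only one message in the channel it ends up running at the consumer's pace.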

Backpressure example


bp0.png

bp1.png

bp2.png

bp3.png

bp6.png

bp8.png

bp10.png

Demo bounded

use async_std::{channel, task};
use std::time::Duration;

#[async_std::main]
async fn main() {
    // At most 4 messages in flight: send() waits while the channel is full
    let (tx, rx) = channel::bounded(4);

    task::spawn(async move {
        for _ in 0..100 {
            tx.send((0..100_000).map(|i| i as u8).collect::<Vec<u8>>())
                .await
                .unwrap();
        }
    });

    while let Ok(i) = rx.recv().await {
        println!("Received message: {:?}", i.first());
        task::sleep(Duration::from_millis(100)).await;
    }
}

$ time -v target/debug/02_backpressure_bounded
      Command being timed: "target/debug/02_backpressure_bounded"
      Percent of CPU this job got: 5%
      Elapsed (wall clock) time (h:mm:ss or m:ss): 0:10.02
      Maximum resident set size (kbytes): 4136
      Major (requiring I/O) page faults: 291
      Minor (reclaiming a frame) page faults: 457

Demo unbounded

use async_std::{channel, task};
use std::time::Duration;

#[async_std::main]
async fn main() {
    // No limit: send() never waits, so unreceived messages pile up in memory
    let (tx, rx) = channel::unbounded();

    task::spawn(async move {
        for _ in 0..100 {
            tx.send((0..100_000).map(|i| i as u8).collect::<Vec<u8>>())
                .await
                .unwrap();
        }
    });

    while let Ok(i) = rx.recv().await {
        println!("Received message: {:?}", i.first());
        task::sleep(Duration::from_millis(100)).await;
    }
}

$ time -v target/debug/02_backpressure_unbounded
      Command being timed: "target/debug/02_backpressure_unbounded"
      Percent of CPU this job got: 3%
      Elapsed (wall clock) time (h:mm:ss or m:ss): 0:10.02
      Maximum resident set size (kbytes): 11984
      Major (requiring I/O) page faults: 378
      Minor (reclaiming a frame) page faults: 2775

Types of channels


  • Bounded
  • Unbounded
  • Single Producer, Single Consumer (SPSC)
  • Multi Producer, Single Consumer (MPSC)
  • Multi Producer, Multi Consumer (MPMC)
  • Oneshot
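
As one example from this list, a minimal MPSC sketch using std::sync::mpsc (blocking threads rather than async tasks, to keep it self-contained). The mpsc_sum helper is made up for illustration: several producers share one channel by cloning the Sender, and a single consumer drains it.

```rust
use std::sync::mpsc::channel;
use std::thread;

// Spawn `producers` threads that each send the numbers 1..=per_producer
// into one shared channel, then sum everything on the single consumer.
pub fn mpsc_sum(producers: u64, per_producer: u64) -> u64 {
    let (tx, rx) = channel::<u64>();

    for _ in 0..producers {
        let tx = tx.clone(); // multi producer: one Sender clone per thread
        thread::spawn(move || {
            for n in 1..=per_producer {
                tx.send(n).expect("consumer is still alive");
            }
        });
    }
    // Drop the original sender so the channel closes once all clones are gone.
    drop(tx);

    // Single consumer: the iterator ends when every sender has been dropped.
    rx.iter().sum()
}

fn main() {
    println!("sum = {}", mpsc_sum(3, 10));
}
```

Note that the Receiver here cannot be cloned; an MPMC channel is what you reach for when several consumers need to pull from the same queue.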
