/// Wires a producer task to a consumer loop through a bounded channel of
/// capacity 64.
///
/// The producer is spawned as a background task and keeps sending for as long
/// as `alive()` returns `true`; the consumer runs in this function and handles
/// every item it receives until the channel reports that it is closed.
///
/// NOTE(review): a bare `async fn main` needs a runtime entry-point attribute
/// (or a caller that drives the returned future) — presumably elided here.
async fn main() {
    let (tx, rx) = async_channel::bounded(64);

    // A `smol::Task` is cancelled when its handle is dropped, so the handle
    // must be detached for the producer to keep running in the background.
    smol::spawn(async move {
        // Producer loop
        while alive() {
            let data = get_data_via_magic().await;
            // `send` only fails once the channel is closed; nobody is left to
            // read the data, so stop producing instead of spinning.
            if tx.send(data).await.is_err() {
                break;
            }
        }
    })
    .detach();

    // Consumer loop
    //
    // Awaited directly rather than through a nested `block_on`, which would
    // block the thread that is polling this future. `recv` returns a
    // `Result`, and the `Err` case signals a closed, drained channel.
    while let Ok(data) = rx.recv().await {
        process_data_somehow(data).await;
    }
}
/// Marker trait for values that can be boxed up as `dyn Bonkers` trait
/// objects.
///
/// It declares no methods of its own.
///
/// NOTE(review): `handle_new_bar` calls `as_any()` on a boxed `dyn Bonkers`;
/// that method is not declared here — presumably elided, confirm.
trait Bonkers {}
/// Type-erased message: a newtype around an owned, boxed `dyn Bonkers`.
///
/// The payload is public so that whoever receives the envelope can take the
/// box back out via `.0`.
// A boxed trait object in a struct field defaults to the `'static` object
// lifetime bound, so it does not need to be spelled out.
struct Envelope(pub Box<dyn Bonkers>);
/// Holds the receiving end of an [`Envelope`] channel.
///
/// `T` is a type-level tag only: no value of type `T` is stored. Behaviour is
/// attached per tag through dedicated `impl A<...>` blocks.
pub struct A<T> {
    /// Incoming envelopes.
    recv: Receiver<Envelope>,
    /// Zero-sized marker that "uses" `T`; without it the compiler rejects the
    /// otherwise unused type parameter (E0392).
    _marker: core::marker::PhantomData<T>,
}
/// Holds the receiving end of an [`Envelope`] channel.
///
/// `T` is a type-level tag only: no value of type `T` is stored. Behaviour is
/// attached per tag through dedicated `impl B<...>` blocks.
pub struct B<T> {
    /// Incoming envelopes.
    recv: Receiver<Envelope>,
    /// Zero-sized marker that "uses" `T`; without it the compiler rejects the
    /// otherwise unused type parameter (E0392).
    _marker: core::marker::PhantomData<T>,
}
/// Registry that maps a static string id to the [`Envelope`] sender
/// registered under that id.
pub struct Lookup {
    /// Senders keyed by id, kept in an ordered map.
    map: BTreeMap<&'static str, Sender<Envelope>>,
}
impl Lookup {
    /// Returns a clone of the sender registered under `id`.
    ///
    /// # Panics
    ///
    /// Panics if no sender is registered under `id`.
    fn get_sender_for(&self, id: &str) -> Sender<Envelope> {
        let registered = self.map.get(id);
        // Unwrap the borrowed entry first, then clone it for the caller.
        registered.unwrap().clone()
    }
}
impl A<Foo> {
    /// Boxes `b` into an [`Envelope`] and sends it through the sender that
    /// `l` returns for the id `"b-my-friend"`.
    ///
    /// # Panics
    ///
    /// Panics if the send returns an error (the result is unwrapped), in
    /// addition to whatever `Lookup::get_sender_for` itself panics on.
    fn send_to_b(&self, l: &Lookup, b: Bar) {
        // The sender is looked up first; the envelope is only built once a
        // sender has been obtained.
        l.get_sender_for("b-my-friend")
            .send(Envelope(Box::new(b)))
            .unwrap();
    }
}
impl B<Bar> {
    /// Receives one envelope and views its payload as a `&Bar` through a
    /// checked downcast.
    ///
    /// The resulting reference is not used any further here.
    ///
    /// # Panics
    ///
    /// Panics if receiving returns an error, or if the payload is not a `Bar`
    /// (both results are unwrapped).
    fn handle_new_bar(&self) {
        let envelope = self.recv.recv().unwrap();
        // Payload -> `Any` view -> concrete `Bar`, in a single chain.
        let bar: &Bar = envelope
            .0
            .as_any()
            .downcast_ref::<Bar>()
            .unwrap();
        // Tada >.>
    }
}
impl B<Bar> {
fn get_bar(&self) -> Box<Bar> {
let env = self.recv.recv().unwrap();
core::mem::transmute_copy<Box<dyn Bonkers>, Box<Bar>>(&env.0)
}
}
Not-so-controversial opinion time: don't use unbounded channels! Why?
use async_std::{channel, task};
use std::time::Duration;
/// Bounded-channel variant of the back-pressure demo.
///
/// A spawned task sends 100 buffers of 100_000 bytes each through a channel
/// created with `channel::bounded(4)`, while this function receives them one
/// at a time, prints the first byte of each and then sleeps for 100 ms.
#[async_std::main]
async fn main() {
    let (sender, receiver) = channel::bounded(4);

    // Producer: builds each buffer, then awaits the send.
    task::spawn(async move {
        for _ in 0..100 {
            let payload: Vec<u8> = (0..100_000).map(|i| i as u8).collect();
            sender.send(payload).await.unwrap();
        }
    });

    // Consumer: runs until `recv` returns an error.
    loop {
        match receiver.recv().await {
            Ok(message) => {
                println!("Received message: {:?}", message.first());
                task::sleep(Duration::from_millis(100)).await;
            }
            Err(_) => break,
        }
    }
}
$ time -v target/debug/02_backpressure_bounded
Command being timed: "target/debug/02_backpressure_bounded"
Percent of CPU this job got: 5%
Elapsed (wall clock) time (h:mm:ss or m:ss): 0:10.02
Maximum resident set size (kbytes): 4136
Major (requiring I/O) page faults: 291
Minor (reclaiming a frame) page faults: 457
use async_std::{channel, task};
use std::time::Duration;
/// Unbounded-channel variant of the back-pressure demo.
///
/// A spawned task sends 100 buffers of 100_000 bytes each through a channel
/// created with `channel::unbounded()`, while this function receives them one
/// at a time, prints the first byte of each and then sleeps for 100 ms.
#[async_std::main]
async fn main() {
    let (sender, receiver) = channel::unbounded();

    // Producer: builds each buffer, then awaits the send.
    task::spawn(async move {
        for _ in 0..100 {
            let payload: Vec<u8> = (0..100_000).map(|i| i as u8).collect();
            sender.send(payload).await.unwrap();
        }
    });

    // Consumer: runs until `recv` returns an error.
    loop {
        match receiver.recv().await {
            Ok(message) => {
                println!("Received message: {:?}", message.first());
                task::sleep(Duration::from_millis(100)).await;
            }
            Err(_) => break,
        }
    }
}
$ time -v target/debug/02_backpressure_unbounded
Command being timed: "target/debug/02_backpressure_unbounded"
Percent of CPU this job got: 3%
Elapsed (wall clock) time (h:mm:ss or m:ss): 0:10.02
Maximum resident set size (kbytes): 11984
Major (requiring I/O) page faults: 378
Minor (reclaiming a frame) page faults: 2775