I got very confused and started mucking about with "spawn" when in fact all I needed was the "inline" extension type in orcx that allows the interpreter to expose custom constants.
186 lines
5.8 KiB
Rust
186 lines
5.8 KiB
Rust
//! Note that these utilities are safe and simple in order to facilitate
|
|
//! debugging without adding more points of failure, but they're not efficient;
|
|
//! they may perform heap allocations, I/O and other expensive operations, or
|
|
//! even block the thread altogether waiting for input whenever they receive
|
|
//! control.
|
|
|
|
use std::cell::RefCell;
|
|
use std::fmt::Display;
|
|
use std::pin::pin;
|
|
use std::rc::Rc;
|
|
use std::sync::Arc;
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
use std::task::{Context, Poll, Wake, Waker};
|
|
use std::thread::panicking;
|
|
|
|
use futures::Stream;
|
|
use itertools::Itertools;
|
|
use task_local::task_local;
|
|
|
|
/// Waker adapter that runs a callback and then forwards the wake-up to the
/// waker it wraps.
struct OnPollWaker<F: Fn() + 'static>(Waker, F);
impl<F: Fn() + 'static> Wake for OnPollWaker<F> {
    /// Invoke the callback first, then notify the wrapped waker.
    fn wake(self: Arc<Self>) {
        let OnPollWaker(inner, callback) = &*self;
        callback();
        inner.wake_by_ref();
    }
}
|
|
|
|
/// Attach a callback to the [Future] protocol for testing and debugging.
|
|
pub async fn on_wake<F: Future>(
|
|
f: F,
|
|
wake: impl Fn() + Clone + Send + Sync + 'static,
|
|
) -> F::Output {
|
|
let mut f = pin!(f);
|
|
futures::future::poll_fn(|cx| {
|
|
let waker = Arc::new(OnPollWaker(cx.waker().clone(), wake.clone())).into();
|
|
f.as_mut().poll(&mut Context::from_waker(&waker))
|
|
})
|
|
.await
|
|
}
|
|
|
|
/// Respond to [Future::poll] with a callback. For maximum flexibility and state
|
|
/// control, your callback receives the actual poll job as a callback function.
|
|
/// Failure to call this function will result in an immediate panic.
|
|
pub async fn wrap_poll<Fut: Future>(
|
|
f: Fut,
|
|
mut cb: impl FnMut(Box<dyn FnOnce() -> bool + '_>),
|
|
) -> Fut::Output {
|
|
let mut f = pin!(f);
|
|
futures::future::poll_fn(|cx| {
|
|
let poll = RefCell::new(None);
|
|
cb(Box::new(|| {
|
|
let poll1 = f.as_mut().poll(cx);
|
|
let ret = poll1.is_ready();
|
|
*poll.borrow_mut() = Some(poll1);
|
|
ret
|
|
}));
|
|
poll.into_inner().expect("Callback to on_poll failed to call its argument")
|
|
})
|
|
.await
|
|
}
|
|
|
|
/// Respond to [Stream::poll_next] with a callback. The semantics of the
|
|
/// callback are identical to that in [wrap_poll]
|
|
pub fn wrap_poll_next<'a, S: Stream + 'a>(
|
|
s: S,
|
|
mut cb: impl FnMut(Box<dyn FnOnce() -> bool + '_>) + 'a,
|
|
) -> impl Stream<Item = S::Item> + 'a {
|
|
let mut s = Box::pin(s);
|
|
futures::stream::poll_fn(move |cx| {
|
|
let poll = RefCell::new(None);
|
|
cb(Box::new(|| {
|
|
let poll1 = s.as_mut().poll_next(cx);
|
|
let ret = poll1.is_ready();
|
|
*poll.borrow_mut() = Some(poll1);
|
|
ret
|
|
}));
|
|
poll.into_inner().expect("Callback to on_poll failed to call its argument")
|
|
})
|
|
}
|
|
|
|
/// Attach a callback to the [Stream] protocol for testing and debugging.
|
|
pub fn on_stream_wake<'a, S: Stream + 'a>(
|
|
s: S,
|
|
wake: impl Fn() + Clone + Send + Sync + 'static,
|
|
) -> impl Stream<Item = S::Item> {
|
|
let mut s = Box::pin(s);
|
|
futures::stream::poll_fn(move |cx| {
|
|
let waker = Arc::new(OnPollWaker(cx.waker().clone(), wake.clone())).into();
|
|
s.as_mut().poll_next(&mut Context::from_waker(&waker))
|
|
})
|
|
}
|
|
|
|
// Task-local stack of labels; extended by `with_label` and rendered by `label`.
task_local! { static LABEL_STATE: Vec<Rc<String>> }
|
|
|
|
/// Add a label to the "label stack" for the duration of a future that helps you
|
|
/// efficiently visualize important aspects of the call stack during logging
|
|
pub async fn with_label<Fut: Future>(label: &str, f: Fut) -> Fut::Output {
|
|
let mut new_lbl = LABEL_STATE.try_with(|lbl| lbl.clone()).unwrap_or_default();
|
|
new_lbl.push(Rc::new(label.to_string()));
|
|
LABEL_STATE.scope(new_lbl, f).await
|
|
}
|
|
|
|
/// Allows to print the label stack
|
|
pub fn label() -> impl Display + Clone + Send + Sync + 'static {
|
|
LABEL_STATE.try_with(|lbl| lbl.iter().join("/")).unwrap_or("".to_string())
|
|
}
|
|
|
|
/// Displays the label stack when printed
|
|
pub struct Label;
|
|
impl Display for Label {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", label()) }
|
|
}
|
|
|
|
/// Attaches generic eprintln handlers to a future
|
|
pub async fn eprint_events<Fut: Future>(note: &str, f: Fut) -> Fut::Output {
|
|
let label = label();
|
|
let note1 = note.to_string();
|
|
on_wake(
|
|
wrap_poll(f, |cb| {
|
|
eprintln!("{Label} polling {note}");
|
|
eprintln!("{Label} polled {note} (ready? {})", cb())
|
|
}),
|
|
move || eprintln!("{label} woke {note1}"),
|
|
)
|
|
.await
|
|
}
|
|
|
|
/// Attaches generic eprintln handlers to a stream
|
|
pub fn eprint_stream_events<'a, S: Stream + 'a>(
|
|
note: &'a str,
|
|
s: S,
|
|
) -> impl Stream<Item = S::Item> + 'a {
|
|
let label = label();
|
|
let note1 = note.to_string();
|
|
on_stream_wake(
|
|
wrap_poll_next(s, move |cb| {
|
|
eprintln!("{Label} polling {note}");
|
|
eprintln!("{Label} polled {note} (ready? {})", cb())
|
|
}),
|
|
move || eprintln!("{label} woke {note1}"),
|
|
)
|
|
}
|
|
|
|
/// Waker that does nothing except record that it has been woken.
struct SpinWaker(AtomicBool);
impl Wake for SpinWaker {
    fn wake(self: Arc<Self>) {
        let SpinWaker(woken) = &*self;
        woken.store(true, Ordering::Relaxed);
    }
}

/// A minimal executor that polls the future again and again on the calling
/// thread for as long as each pending poll was accompanied by a wake-up. This
/// is useful for deterministic tests that don't contain side effects or
/// threading.
///
/// # Panics
///
/// If a poll returns pending without the future's waker having been woken
/// since the previous poll.
pub fn spin_on<Fut: Future>(f: Fut) -> Fut::Output {
    let flag = Arc::new(SpinWaker(AtomicBool::new(false)));
    let waker = Waker::from(Arc::clone(&flag));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(f);
    loop {
        if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
            return output;
        }
        // Still pending: consume the wake-up flag, and give up if it was not set
        // because nothing would ever trigger another poll
        let woke_itself = flag.0.swap(false, Ordering::Relaxed);
        if !woke_itself {
            panic!("The future did not exit and did not call its waker.");
        }
    }
}
|
|
|
|
/// Create an object that will panic if dropped. [PanicOnDrop::defuse] must be
|
|
/// called once the particular constraint preventing a drop has passed
|
|
pub fn assert_no_drop(msg: &'static str) -> PanicOnDrop { PanicOnDrop(true, msg) }
|
|
|
|
/// Guard that panics with its stored message when dropped while still armed.
/// Call [Self::defuse] once dropping is safe again.
///
/// The first field is the "armed" flag, the second is the panic message.
pub struct PanicOnDrop(bool, &'static str);
impl PanicOnDrop {
    /// Disarm and consume the guard so that it is dropped without panicking
    pub fn defuse(mut self) {
        let armed = &mut self.0;
        *armed = false;
    }
}
impl Drop for PanicOnDrop {
    fn drop(&mut self) {
        // Stay silent while the thread is already unwinding from another panic
        let still_armed = self.0;
        if !panicking() && still_armed {
            panic!("{}", self.1)
        }
    }
}
|