//! Note that these utilities are safe and simple in order to facilitate //! debugging without adding more points of failure, but they're not efficient; //! they may perform heap allocations, I/O and other expensive operations, or //! even block the thread altogether waiting for input whenever they receive //! control use std::cell::RefCell; use std::fmt::Display; use std::pin::pin; use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::task::{Context, Poll, Wake, Waker}; use std::thread::panicking; use futures::Stream; use itertools::Itertools; use task_local::task_local; struct OnPollWaker(Waker, F); impl Wake for OnPollWaker { fn wake(self: Arc) { (self.1)(); self.0.wake_by_ref() } } /// Attach a callback to the [Future] protocol for testing and debugging. pub async fn on_wake( f: F, wake: impl Fn() + Clone + Send + Sync + 'static, ) -> F::Output { let mut f = pin!(f); futures::future::poll_fn(|cx| { let waker = Arc::new(OnPollWaker(cx.waker().clone(), wake.clone())).into(); f.as_mut().poll(&mut Context::from_waker(&waker)) }) .await } /// Respond to [Future::poll] with a callback. For maximum flexibility and state /// control, your callback receives the actual poll job as a callback function. /// Failure to call this function will result in an immediate panic. pub async fn wrap_poll( f: Fut, mut cb: impl FnMut(Box bool + '_>), ) -> Fut::Output { let mut f = pin!(f); futures::future::poll_fn(|cx| { let poll = RefCell::new(None); cb(Box::new(|| { let poll1 = f.as_mut().poll(cx); let ret = poll1.is_ready(); *poll.borrow_mut() = Some(poll1); ret })); poll.into_inner().expect("Callback to on_poll failed to call its argument") }) .await } /// Respond to [Stream::poll_next] with a callback. The semantics of the /// callback are identical to that in [wrap_poll] pub fn wrap_poll_next<'a, S: Stream + 'a>( s: S, mut cb: impl FnMut(Box bool + '_>) + 'a, ) -> impl Stream + 'a { let mut s = Box::pin(s); futures::stream::poll_fn(move |cx| { let poll = RefCell::new(None); cb(Box::new(|| { let poll1 = s.as_mut().poll_next(cx); let ret = poll1.is_ready(); *poll.borrow_mut() = Some(poll1); ret })); poll.into_inner().expect("Callback to on_poll failed to call its argument") }) } /// Attach a callback to the [Stream] protocol for testing and debugging. pub fn on_stream_wake<'a, S: Stream + 'a>( s: S, wake: impl Fn() + Clone + Send + Sync + 'static, ) -> impl Stream { let mut s = Box::pin(s); futures::stream::poll_fn(move |cx| { let waker = Arc::new(OnPollWaker(cx.waker().clone(), wake.clone())).into(); s.as_mut().poll_next(&mut Context::from_waker(&waker)) }) } task_local! { static LABEL_STATE: Vec> } /// Add a label to the "label stack" for the duration of a future that helps you /// efficiently visualize important aspects of the call stack during logging pub async fn with_label(label: &str, f: Fut) -> Fut::Output { let mut new_lbl = LABEL_STATE.try_with(|lbl| lbl.clone()).unwrap_or_default(); new_lbl.push(Rc::new(label.to_string())); LABEL_STATE.scope(new_lbl, f).await } /// Allows to print the label stack pub fn label() -> impl Display + Clone + Send + Sync + 'static { LABEL_STATE.try_with(|lbl| lbl.iter().join("/")).unwrap_or("".to_string()) } /// Displays the label stack when printed pub struct Label; impl Display for Label { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", label()) } } /// Attaches generic eprintln handlers to a future pub async fn eprint_events(note: &str, f: Fut) -> Fut::Output { let label = label(); let note1 = note.to_string(); on_wake( wrap_poll(f, |cb| { eprintln!("{Label} polling {note}"); eprintln!("{Label} polled {note} (ready? {})", cb()) }), move || eprintln!("{label} woke {note1}"), ) .await } /// Attaches generic eprintln handlers to a stream pub fn eprint_stream_events<'a, S: Stream + 'a>( note: &'a str, s: S, ) -> impl Stream + 'a { let label = label(); let note1 = note.to_string(); on_stream_wake( wrap_poll_next(s, move |cb| { eprintln!("{Label} polling {note}"); eprintln!("{Label} polled {note} (ready? {})", cb()) }), move || eprintln!("{label} woke {note1}"), ) } struct SpinWaker(AtomicBool); impl Wake for SpinWaker { fn wake(self: Arc) { self.0.store(true, Ordering::Relaxed); } } /// A dumb executor that keeps synchronously re-running the future as long as it /// keeps synchronously waking itself. This is useful for deterministic tests /// that don't contain side effects or threading. /// /// # Panics /// /// If the future doesn't wake itself and doesn't settle. pub fn spin_on(f: Fut) -> Fut::Output { let repeat = Arc::new(SpinWaker(AtomicBool::new(false))); let mut f = pin!(f); let waker = repeat.clone().into(); let mut cx = Context::from_waker(&waker); loop { match f.as_mut().poll(&mut cx) { Poll::Ready(t) => break t, Poll::Pending if repeat.0.swap(false, Ordering::Relaxed) => (), Poll::Pending => panic!("The future did not exit and did not call its waker."), } } } /// Create an object that will panic if dropped. [PanicOnDrop::defuse] must be /// called once the particular constraint preventing a drop has passed pub fn assert_no_drop(msg: &'static str) -> PanicOnDrop { PanicOnDrop(true, msg) } /// This object will panic if dropped. Call [Self::defuse] when dropping is safe /// again pub struct PanicOnDrop(bool, &'static str); impl PanicOnDrop { /// Allow dropping the object without causing a panic pub fn defuse(mut self) { self.0 = false; } } impl Drop for PanicOnDrop { fn drop(&mut self) { assert!(panicking() || !self.0, "{}", self.1) } }