partway towards commands
I got very confused and started mucking about with "spawn" when in fact all I needed was the "inline" extension type in orcx that allows the interpreter to expose custom constants.
This commit is contained in:
12
orchid-async-utils/Cargo.toml
Normal file
12
orchid-async-utils/Cargo.toml
Normal file
@@ -0,0 +1,12 @@
|
||||
[package]
|
||||
name = "orchid-async-utils"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
futures = { version = "0.3.31", default-features = false, features = [
|
||||
"std",
|
||||
"async-await",
|
||||
] }
|
||||
itertools = "0.14.0"
|
||||
task-local = "0.1.0"
|
||||
185
orchid-async-utils/src/debug.rs
Normal file
185
orchid-async-utils/src/debug.rs
Normal file
@@ -0,0 +1,185 @@
|
||||
//! Note that these utilities are safe and simple in order to facilitate
|
||||
//! debugging without adding more points of failure, but they're not efficient;
|
||||
//! they may perform heap allocations, I/O and other expensive operations, or
|
||||
//! even block the thread altogether waiting for input whenever they receive
|
||||
//! control
|
||||
|
||||
use std::cell::RefCell;
|
||||
use std::fmt::Display;
|
||||
use std::pin::pin;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::task::{Context, Poll, Wake, Waker};
|
||||
use std::thread::panicking;
|
||||
|
||||
use futures::Stream;
|
||||
use itertools::Itertools;
|
||||
use task_local::task_local;
|
||||
|
||||
struct OnPollWaker<F: Fn() + 'static>(Waker, F);
|
||||
impl<F: Fn() + 'static> Wake for OnPollWaker<F> {
|
||||
fn wake(self: Arc<Self>) {
|
||||
(self.1)();
|
||||
self.0.wake_by_ref()
|
||||
}
|
||||
}
|
||||
|
||||
/// Attach a callback to the [Future] protocol for testing and debugging.
|
||||
pub async fn on_wake<F: Future>(
|
||||
f: F,
|
||||
wake: impl Fn() + Clone + Send + Sync + 'static,
|
||||
) -> F::Output {
|
||||
let mut f = pin!(f);
|
||||
futures::future::poll_fn(|cx| {
|
||||
let waker = Arc::new(OnPollWaker(cx.waker().clone(), wake.clone())).into();
|
||||
f.as_mut().poll(&mut Context::from_waker(&waker))
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Respond to [Future::poll] with a callback. For maximum flexibility and state
|
||||
/// control, your callback receives the actual poll job as a callback function.
|
||||
/// Failure to call this function will result in an immediate panic.
|
||||
pub async fn wrap_poll<Fut: Future>(
|
||||
f: Fut,
|
||||
mut cb: impl FnMut(Box<dyn FnOnce() -> bool + '_>),
|
||||
) -> Fut::Output {
|
||||
let mut f = pin!(f);
|
||||
futures::future::poll_fn(|cx| {
|
||||
let poll = RefCell::new(None);
|
||||
cb(Box::new(|| {
|
||||
let poll1 = f.as_mut().poll(cx);
|
||||
let ret = poll1.is_ready();
|
||||
*poll.borrow_mut() = Some(poll1);
|
||||
ret
|
||||
}));
|
||||
poll.into_inner().expect("Callback to on_poll failed to call its argument")
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Respond to [Stream::poll_next] with a callback. The semantics of the
|
||||
/// callback are identical to that in [wrap_poll]
|
||||
pub fn wrap_poll_next<'a, S: Stream + 'a>(
|
||||
s: S,
|
||||
mut cb: impl FnMut(Box<dyn FnOnce() -> bool + '_>) + 'a,
|
||||
) -> impl Stream<Item = S::Item> + 'a {
|
||||
let mut s = Box::pin(s);
|
||||
futures::stream::poll_fn(move |cx| {
|
||||
let poll = RefCell::new(None);
|
||||
cb(Box::new(|| {
|
||||
let poll1 = s.as_mut().poll_next(cx);
|
||||
let ret = poll1.is_ready();
|
||||
*poll.borrow_mut() = Some(poll1);
|
||||
ret
|
||||
}));
|
||||
poll.into_inner().expect("Callback to on_poll failed to call its argument")
|
||||
})
|
||||
}
|
||||
|
||||
/// Attach a callback to the [Stream] protocol for testing and debugging.
|
||||
pub fn on_stream_wake<'a, S: Stream + 'a>(
|
||||
s: S,
|
||||
wake: impl Fn() + Clone + Send + Sync + 'static,
|
||||
) -> impl Stream<Item = S::Item> {
|
||||
let mut s = Box::pin(s);
|
||||
futures::stream::poll_fn(move |cx| {
|
||||
let waker = Arc::new(OnPollWaker(cx.waker().clone(), wake.clone())).into();
|
||||
s.as_mut().poll_next(&mut Context::from_waker(&waker))
|
||||
})
|
||||
}
|
||||
|
||||
task_local! {
|
||||
static LABEL_STATE: Vec<Rc<String>>
|
||||
}
|
||||
|
||||
/// Add a label to the "label stack" for the duration of a future that helps you
|
||||
/// efficiently visualize important aspects of the call stack during logging
|
||||
pub async fn with_label<Fut: Future>(label: &str, f: Fut) -> Fut::Output {
|
||||
let mut new_lbl = LABEL_STATE.try_with(|lbl| lbl.clone()).unwrap_or_default();
|
||||
new_lbl.push(Rc::new(label.to_string()));
|
||||
LABEL_STATE.scope(new_lbl, f).await
|
||||
}
|
||||
|
||||
/// Allows to print the label stack
|
||||
pub fn label() -> impl Display + Clone + Send + Sync + 'static {
|
||||
LABEL_STATE.try_with(|lbl| lbl.iter().join("/")).unwrap_or("".to_string())
|
||||
}
|
||||
|
||||
/// Displays the label stack when printed
|
||||
pub struct Label;
|
||||
impl Display for Label {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", label()) }
|
||||
}
|
||||
|
||||
/// Attaches generic eprintln handlers to a future
|
||||
pub async fn eprint_events<Fut: Future>(note: &str, f: Fut) -> Fut::Output {
|
||||
let label = label();
|
||||
let note1 = note.to_string();
|
||||
on_wake(
|
||||
wrap_poll(f, |cb| {
|
||||
eprintln!("{Label} polling {note}");
|
||||
eprintln!("{Label} polled {note} (ready? {})", cb())
|
||||
}),
|
||||
move || eprintln!("{label} woke {note1}"),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Attaches generic eprintln handlers to a stream
|
||||
pub fn eprint_stream_events<'a, S: Stream + 'a>(
|
||||
note: &'a str,
|
||||
s: S,
|
||||
) -> impl Stream<Item = S::Item> + 'a {
|
||||
let label = label();
|
||||
let note1 = note.to_string();
|
||||
on_stream_wake(
|
||||
wrap_poll_next(s, move |cb| {
|
||||
eprintln!("{Label} polling {note}");
|
||||
eprintln!("{Label} polled {note} (ready? {})", cb())
|
||||
}),
|
||||
move || eprintln!("{label} woke {note1}"),
|
||||
)
|
||||
}
|
||||
|
||||
struct SpinWaker(AtomicBool);
|
||||
impl Wake for SpinWaker {
|
||||
fn wake(self: Arc<Self>) { self.0.store(true, Ordering::Relaxed); }
|
||||
}
|
||||
|
||||
/// A dumb executor that keeps synchronously re-running the future as long as it
|
||||
/// keeps synchronously waking itself. This is useful for deterministic tests
|
||||
/// that don't contain side effects or threading.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// If the future doesn't wake itself and doesn't settle.
|
||||
pub fn spin_on<Fut: Future>(f: Fut) -> Fut::Output {
|
||||
let repeat = Arc::new(SpinWaker(AtomicBool::new(false)));
|
||||
let mut f = pin!(f);
|
||||
let waker = repeat.clone().into();
|
||||
let mut cx = Context::from_waker(&waker);
|
||||
loop {
|
||||
match f.as_mut().poll(&mut cx) {
|
||||
Poll::Ready(t) => break t,
|
||||
Poll::Pending if repeat.0.swap(false, Ordering::Relaxed) => (),
|
||||
Poll::Pending => panic!("The future did not exit and did not call its waker."),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Create an object that will panic if dropped. [PanicOnDrop::defuse] must be
|
||||
/// called once the particular constraint preventing a drop has passed
|
||||
pub fn assert_no_drop(msg: &'static str) -> PanicOnDrop { PanicOnDrop(true, msg) }
|
||||
|
||||
/// This object will panic if dropped. Call [Self::defuse] when dropping is safe
|
||||
/// again
|
||||
pub struct PanicOnDrop(bool, &'static str);
|
||||
impl PanicOnDrop {
|
||||
/// Allow dropping the object without causing a panic
|
||||
pub fn defuse(mut self) { self.0 = false; }
|
||||
}
|
||||
impl Drop for PanicOnDrop {
|
||||
fn drop(&mut self) { assert!(panicking() || !self.0, "{}", self.1) }
|
||||
}
|
||||
5
orchid-async-utils/src/lib.rs
Normal file
5
orchid-async-utils/src/lib.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
pub mod debug;
|
||||
mod localset;
|
||||
pub use localset::*;
|
||||
mod task_future;
|
||||
pub use task_future::*;
|
||||
48
orchid-async-utils/src/localset.rs
Normal file
48
orchid-async-utils/src/localset.rs
Normal file
@@ -0,0 +1,48 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::pin::Pin;
|
||||
use std::task::Poll;
|
||||
|
||||
use futures::StreamExt;
|
||||
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded};
|
||||
use futures::future::LocalBoxFuture;
|
||||
|
||||
pub struct LocalSet<'a, E> {
|
||||
receiver: UnboundedReceiver<LocalBoxFuture<'a, Result<(), E>>>,
|
||||
pending: VecDeque<LocalBoxFuture<'a, Result<(), E>>>,
|
||||
}
|
||||
impl<'a, E> LocalSet<'a, E> {
|
||||
pub fn new() -> (UnboundedSender<LocalBoxFuture<'a, Result<(), E>>>, Self) {
|
||||
let (sender, receiver) = unbounded();
|
||||
(sender, Self { receiver, pending: VecDeque::new() })
|
||||
}
|
||||
}
|
||||
impl<E> Future for LocalSet<'_, E> {
|
||||
type Output = Result<(), E>;
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
let mut any_pending = false;
|
||||
loop {
|
||||
match this.receiver.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(fut)) => this.pending.push_back(fut),
|
||||
Poll::Ready(None) => break,
|
||||
Poll::Pending => {
|
||||
any_pending = true;
|
||||
break;
|
||||
},
|
||||
}
|
||||
}
|
||||
let count = this.pending.len();
|
||||
for _ in 0..count {
|
||||
let mut req = this.pending.pop_front().unwrap();
|
||||
match req.as_mut().poll(cx) {
|
||||
Poll::Ready(Ok(())) => (),
|
||||
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
|
||||
Poll::Pending => {
|
||||
any_pending = true;
|
||||
this.pending.push_back(req)
|
||||
},
|
||||
}
|
||||
}
|
||||
if any_pending { Poll::Pending } else { Poll::Ready(Ok(())) }
|
||||
}
|
||||
}
|
||||
92
orchid-async-utils/src/task_future.rs
Normal file
92
orchid-async-utils/src/task_future.rs
Normal file
@@ -0,0 +1,92 @@
|
||||
use std::any::Any;
|
||||
use std::cell::RefCell;
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
use std::rc::Rc;
|
||||
use std::task::{Context, Poll, Waker};
|
||||
|
||||
use futures::future::{FusedFuture, LocalBoxFuture};
|
||||
|
||||
struct State {
|
||||
work: Option<LocalBoxFuture<'static, Box<dyn Any>>>,
|
||||
result: Option<Box<dyn Any>>,
|
||||
waker: Waker,
|
||||
}
|
||||
|
||||
/// A fused future that can be passed to a non-polymorphic executor that doesn't
|
||||
/// process results and doesn't return handles
|
||||
pub struct Pollable(Rc<RefCell<State>>);
|
||||
impl FusedFuture for Pollable {
|
||||
fn is_terminated(&self) -> bool {
|
||||
let g = self.0.borrow();
|
||||
g.work.is_none() || g.result.is_some()
|
||||
}
|
||||
}
|
||||
impl Future for Pollable {
|
||||
type Output = ();
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let mut g = self.0.borrow_mut();
|
||||
match &mut *g {
|
||||
State { result: Some(_), .. } | State { work: None, .. } => Poll::Ready(()),
|
||||
State { work: Some(work), waker, result } => match work.as_mut().poll(cx) {
|
||||
Poll::Pending => {
|
||||
waker.clone_from(cx.waker());
|
||||
Poll::Pending
|
||||
},
|
||||
Poll::Ready(val) => {
|
||||
*result = Some(val);
|
||||
g.work = None;
|
||||
Poll::Ready(())
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An object that can be used to inspect the state of the task
|
||||
pub struct Handle<T: 'static>(Rc<RefCell<State>>, PhantomData<T>);
|
||||
impl<T: 'static> Handle<T> {
|
||||
/// Immediately stop working on this task, and return the result if it has
|
||||
/// already finished
|
||||
pub fn abort(&self) -> Option<T> {
|
||||
let mut g = self.0.borrow_mut();
|
||||
g.work.take();
|
||||
match g.result.take() {
|
||||
Some(val) => Some(*val.downcast().expect("Mismatch between type of future and handle")),
|
||||
None => {
|
||||
g.waker.wake_by_ref();
|
||||
None
|
||||
},
|
||||
}
|
||||
}
|
||||
/// Determine if there's any more work to do on this task
|
||||
pub fn is_finished(&self) -> bool {
|
||||
let g = self.0.borrow();
|
||||
g.result.is_some() || g.work.is_none()
|
||||
}
|
||||
/// "finish" the freestanding task, and return the future instead
|
||||
pub async fn join(self) -> T {
|
||||
let work = {
|
||||
let mut g = self.0.borrow_mut();
|
||||
if let Some(val) = g.result.take() {
|
||||
return *val.downcast().expect("Mistmatch between type of future and handle");
|
||||
}
|
||||
g.waker.wake_by_ref();
|
||||
g.work.take().expect("Attempted to join task that was already aborted")
|
||||
};
|
||||
*work.await.downcast().expect("Mismatch between type of future and handle")
|
||||
}
|
||||
}
|
||||
|
||||
/// Split a future into an object that can be polled and one that returns
|
||||
/// information on its progress and its result. The first one can be passed to
|
||||
/// an executor or localset, the second can be used to manage it
|
||||
pub fn to_task<F: Future<Output: 'static> + 'static>(f: F) -> (Pollable, Handle<F::Output>) {
|
||||
let dyn_future = Box::pin(async { Box::new(f.await) as Box<dyn Any> });
|
||||
let state = Rc::new(RefCell::new(State {
|
||||
result: None,
|
||||
work: Some(dyn_future),
|
||||
waker: Waker::noop().clone(),
|
||||
}));
|
||||
(Pollable(state.clone()), Handle(state, PhantomData))
|
||||
}
|
||||
Reference in New Issue
Block a user