forked from Orchid/orchid
Unit tests pass
Fixed a nasty deadlock in reqnot
This commit is contained in:
@@ -3,13 +3,13 @@ use std::pin::Pin;
|
||||
use std::rc::Rc;
|
||||
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
|
||||
|
||||
use orchid_api::binary::{FutureBin, FutureContextBin, OwnedWakerBin, UnitPoll};
|
||||
use crate::api;
|
||||
|
||||
type WideBox = Box<dyn Future<Output = ()>>;
|
||||
|
||||
static OWNED_VTABLE: RawWakerVTable = RawWakerVTable::new(
|
||||
|data| {
|
||||
let data = unsafe { Rc::<OwnedWakerBin>::from_raw(data as *const _) };
|
||||
let data = unsafe { Rc::<api::binary::OwnedWakerBin>::from_raw(data as *const _) };
|
||||
let val = RawWaker::new(Rc::into_raw(data.clone()) as *const (), &OWNED_VTABLE);
|
||||
// Clone must create a duplicate of the Rc, so it has to be un-leaked, cloned,
|
||||
// then leaked again.
|
||||
@@ -19,19 +19,19 @@ static OWNED_VTABLE: RawWakerVTable = RawWakerVTable::new(
|
||||
|data| {
|
||||
// Wake must awaken the task and then clean up the state, so the waker must be
|
||||
// un-leaked
|
||||
let data = unsafe { Rc::<OwnedWakerBin>::from_raw(data as *const _) };
|
||||
let data = unsafe { Rc::<api::binary::OwnedWakerBin>::from_raw(data as *const _) };
|
||||
(data.wake)(data.data);
|
||||
mem::drop(data);
|
||||
},
|
||||
|data| {
|
||||
// Wake-by-ref must awaken the task while preserving the future, so the Rc is
|
||||
// untouched
|
||||
let data = unsafe { (data as *const OwnedWakerBin).as_ref() }.unwrap();
|
||||
let data = unsafe { (data as *const api::binary::OwnedWakerBin).as_ref() }.unwrap();
|
||||
(data.wake_ref)(data.data);
|
||||
},
|
||||
|data| {
|
||||
// Drop must clean up the state, so the waker must be un-leaked
|
||||
let data = unsafe { Rc::<OwnedWakerBin>::from_raw(data as *const _) };
|
||||
let data = unsafe { Rc::<api::binary::OwnedWakerBin>::from_raw(data as *const _) };
|
||||
(data.drop)(data.data);
|
||||
mem::drop(data);
|
||||
},
|
||||
@@ -39,12 +39,12 @@ static OWNED_VTABLE: RawWakerVTable = RawWakerVTable::new(
|
||||
|
||||
struct BorrowedWakerData<'a> {
|
||||
go_around: &'a mut bool,
|
||||
cx: FutureContextBin,
|
||||
cx: api::binary::FutureContextBin,
|
||||
}
|
||||
static BORROWED_VTABLE: RawWakerVTable = RawWakerVTable::new(
|
||||
|data| {
|
||||
let data = unsafe { (data as *mut BorrowedWakerData).as_mut() }.unwrap();
|
||||
let owned_data = Rc::<OwnedWakerBin>::new((data.cx.waker)(data.cx.data));
|
||||
let owned_data = Rc::<api::binary::OwnedWakerBin>::new((data.cx.waker)(data.cx.data));
|
||||
RawWaker::new(Rc::into_raw(owned_data) as *const (), &OWNED_VTABLE)
|
||||
},
|
||||
|data| *unsafe { (data as *mut BorrowedWakerData).as_mut() }.unwrap().go_around = true,
|
||||
@@ -54,13 +54,13 @@ static BORROWED_VTABLE: RawWakerVTable = RawWakerVTable::new(
|
||||
|
||||
/// Convert a future to a binary-compatible format that can be sent across
|
||||
/// dynamic library boundaries
|
||||
pub fn future_to_vt<Fut: Future<Output = ()> + 'static>(fut: Fut) -> FutureBin {
|
||||
pub fn future_to_vt<Fut: Future<Output = ()> + 'static>(fut: Fut) -> api::binary::FutureBin {
|
||||
let wide_box = Box::new(fut) as WideBox;
|
||||
let data = Box::into_raw(Box::new(wide_box));
|
||||
extern "C" fn drop(raw: *const ()) {
|
||||
mem::drop(unsafe { Box::<WideBox>::from_raw(raw as *mut _) })
|
||||
}
|
||||
extern "C" fn poll(raw: *const (), cx: FutureContextBin) -> UnitPoll {
|
||||
extern "C" fn poll(raw: *const (), cx: api::binary::FutureContextBin) -> api::binary::UnitPoll {
|
||||
let mut this = unsafe { Pin::new_unchecked(&mut **(raw as *mut WideBox).as_mut().unwrap()) };
|
||||
loop {
|
||||
let mut go_around = false;
|
||||
@@ -73,27 +73,27 @@ pub fn future_to_vt<Fut: Future<Output = ()> + 'static>(fut: Fut) -> FutureBin {
|
||||
let mut ctx = Context::from_waker(&borrowed_waker);
|
||||
let result = this.as_mut().poll(&mut ctx);
|
||||
if matches!(result, Poll::Ready(())) {
|
||||
break UnitPoll::Ready;
|
||||
break api::binary::UnitPoll::Ready;
|
||||
}
|
||||
if !go_around {
|
||||
break UnitPoll::Pending;
|
||||
break api::binary::UnitPoll::Pending;
|
||||
}
|
||||
}
|
||||
}
|
||||
FutureBin { data: data as *const _, drop, poll }
|
||||
api::binary::FutureBin { data: data as *const _, drop, poll }
|
||||
}
|
||||
|
||||
struct VirtualFuture {
|
||||
vt: FutureBin,
|
||||
vt: api::binary::FutureBin,
|
||||
}
|
||||
impl Unpin for VirtualFuture {}
|
||||
impl Future for VirtualFuture {
|
||||
type Output = ();
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
extern "C" fn waker(raw: *const ()) -> OwnedWakerBin {
|
||||
extern "C" fn waker(raw: *const ()) -> api::binary::OwnedWakerBin {
|
||||
let waker = unsafe { (raw as *mut Context).as_mut() }.unwrap().waker().clone();
|
||||
let data = Box::into_raw(Box::<Waker>::new(waker)) as *const ();
|
||||
return OwnedWakerBin { data, drop, wake, wake_ref };
|
||||
return api::binary::OwnedWakerBin { data, drop, wake, wake_ref };
|
||||
extern "C" fn drop(raw: *const ()) {
|
||||
mem::drop(unsafe { Box::<Waker>::from_raw(raw as *mut Waker) })
|
||||
}
|
||||
@@ -104,11 +104,11 @@ impl Future for VirtualFuture {
|
||||
unsafe { (raw as *mut Waker).as_mut() }.unwrap().wake_by_ref();
|
||||
}
|
||||
}
|
||||
let cx = FutureContextBin { data: cx as *mut Context as *const (), waker };
|
||||
let cx = api::binary::FutureContextBin { data: cx as *mut Context as *const (), waker };
|
||||
let result = (self.vt.poll)(self.vt.data, cx);
|
||||
match result {
|
||||
UnitPoll::Pending => Poll::Pending,
|
||||
UnitPoll::Ready => Poll::Ready(()),
|
||||
api::binary::UnitPoll::Pending => Poll::Pending,
|
||||
api::binary::UnitPoll::Ready => Poll::Ready(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -118,4 +118,4 @@ impl Drop for VirtualFuture {
|
||||
|
||||
/// Receive a future sent across dynamic library boundaries and convert it into
|
||||
/// an owned object
|
||||
pub fn vt_to_future(vt: FutureBin) -> impl Future<Output = ()> { VirtualFuture { vt } }
|
||||
pub fn vt_to_future(vt: api::binary::FutureBin) -> impl Future<Output = ()> { VirtualFuture { vt } }
|
||||
|
||||
157
orchid-base/src/future_debug.rs
Normal file
157
orchid-base/src/future_debug.rs
Normal file
@@ -0,0 +1,157 @@
|
||||
use std::cell::RefCell;
|
||||
use std::fmt::Display;
|
||||
use std::pin::pin;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::task::{Context, Poll, Wake, Waker};
|
||||
|
||||
use futures::Stream;
|
||||
use itertools::Itertools;
|
||||
use task_local::task_local;
|
||||
|
||||
struct OnPollWaker<F: Fn() + 'static>(Waker, F);
|
||||
impl<F: Fn() + 'static> Wake for OnPollWaker<F> {
|
||||
fn wake(self: Arc<Self>) {
|
||||
(self.1)();
|
||||
self.0.wake_by_ref()
|
||||
}
|
||||
}
|
||||
|
||||
/// Attach a callback to the [Future] protocol for testing and debugging. Note
|
||||
/// that this function is safe and simple in order to facilitate debugging
|
||||
/// without adding more points of failure, but it's not fast; it performs a heap
|
||||
/// allocation on each poll of the returned future.
|
||||
pub async fn on_wake<F: Future>(
|
||||
f: F,
|
||||
wake: impl Fn() + Clone + Send + Sync + 'static,
|
||||
) -> F::Output {
|
||||
let mut f = pin!(f);
|
||||
futures::future::poll_fn(|cx| {
|
||||
let waker = Arc::new(OnPollWaker(cx.waker().clone(), wake.clone())).into();
|
||||
f.as_mut().poll(&mut Context::from_waker(&waker))
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Respond to [Future::poll] with a callback. For maximum flexibility and state
|
||||
/// control, your callback receives the actual poll job as a callback function.
|
||||
/// Failure to call this function will result in an immediate panic.
|
||||
pub async fn wrap_poll<Fut: Future>(
|
||||
f: Fut,
|
||||
mut cb: impl FnMut(Box<dyn FnOnce() -> bool + '_>),
|
||||
) -> Fut::Output {
|
||||
let mut f = pin!(f);
|
||||
futures::future::poll_fn(|cx| {
|
||||
let poll = RefCell::new(None);
|
||||
cb(Box::new(|| {
|
||||
let poll1 = f.as_mut().poll(cx);
|
||||
let ret = poll1.is_ready();
|
||||
*poll.borrow_mut() = Some(poll1);
|
||||
ret
|
||||
}));
|
||||
poll.into_inner().expect("Callback to on_poll failed to call its argument")
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub fn wrap_poll_next<'a, S: Stream + 'a>(
|
||||
s: S,
|
||||
mut cb: impl FnMut(Box<dyn FnOnce() -> bool + '_>) + 'a,
|
||||
) -> impl Stream<Item = S::Item> + 'a {
|
||||
let mut s = Box::pin(s);
|
||||
futures::stream::poll_fn(move |cx| {
|
||||
let poll = RefCell::new(None);
|
||||
cb(Box::new(|| {
|
||||
let poll1 = s.as_mut().poll_next(cx);
|
||||
let ret = poll1.is_ready();
|
||||
*poll.borrow_mut() = Some(poll1);
|
||||
ret
|
||||
}));
|
||||
poll.into_inner().expect("Callback to on_poll failed to call its argument")
|
||||
})
|
||||
}
|
||||
|
||||
pub fn on_stream_wake<'a, S: Stream + 'a>(
|
||||
s: S,
|
||||
wake: impl Fn() + Clone + Send + Sync + 'static,
|
||||
) -> impl Stream<Item = S::Item> {
|
||||
let mut s = Box::pin(s);
|
||||
futures::stream::poll_fn(move |cx| {
|
||||
let waker = Arc::new(OnPollWaker(cx.waker().clone(), wake.clone())).into();
|
||||
s.as_mut().poll_next(&mut Context::from_waker(&waker))
|
||||
})
|
||||
}
|
||||
|
||||
task_local! {
|
||||
static LABEL_STATE: Vec<Rc<String>>
|
||||
}
|
||||
|
||||
pub async fn with_label<Fut: Future>(label: &str, f: Fut) -> Fut::Output {
|
||||
let mut new_lbl = LABEL_STATE.try_with(|lbl| lbl.clone()).unwrap_or_default();
|
||||
new_lbl.push(Rc::new(label.to_string()));
|
||||
LABEL_STATE.scope(new_lbl, f).await
|
||||
}
|
||||
|
||||
pub fn label() -> impl Display + Clone + Send + Sync + 'static {
|
||||
LABEL_STATE.try_with(|lbl| lbl.iter().join("/")).unwrap_or("".to_string())
|
||||
}
|
||||
|
||||
pub struct Label;
|
||||
impl Display for Label {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", label()) }
|
||||
}
|
||||
|
||||
pub async fn eprint_events<Fut: Future>(note: &str, f: Fut) -> Fut::Output {
|
||||
let label = label();
|
||||
let note1 = note.to_string();
|
||||
on_wake(
|
||||
wrap_poll(f, |cb| {
|
||||
eprintln!("{Label} polling {note}");
|
||||
eprintln!("{Label} polled {note} (ready? {})", cb())
|
||||
}),
|
||||
move || eprintln!("{label} woke {note1}"),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub fn eprint_stream_events<'a, S: Stream + 'a>(
|
||||
note: &'a str,
|
||||
s: S,
|
||||
) -> impl Stream<Item = S::Item> + 'a {
|
||||
let label = label();
|
||||
let note1 = note.to_string();
|
||||
on_stream_wake(
|
||||
wrap_poll_next(s, move |cb| {
|
||||
eprintln!("{Label} polling {note}");
|
||||
eprintln!("{Label} polled {note} (ready? {})", cb())
|
||||
}),
|
||||
move || eprintln!("{label} woke {note1}"),
|
||||
)
|
||||
}
|
||||
|
||||
struct SpinWaker(AtomicBool);
|
||||
impl Wake for SpinWaker {
|
||||
fn wake(self: Arc<Self>) { self.0.store(true, Ordering::Relaxed); }
|
||||
}
|
||||
|
||||
/// A dumb executor that keeps synchronously re-running the future as long as it
|
||||
/// keeps synchronously waking itself.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// If the future doesn't wake itself and doesn't settle. This is useful for
|
||||
/// deterministic tests that don't contain side effects or threading.
|
||||
pub fn spin_on<Fut: Future>(f: Fut) -> Fut::Output {
|
||||
let repeat = Arc::new(SpinWaker(AtomicBool::new(false)));
|
||||
let mut f = pin!(f);
|
||||
let waker = repeat.clone().into();
|
||||
let mut cx = Context::from_waker(&waker);
|
||||
loop {
|
||||
match f.as_mut().poll(&mut cx) {
|
||||
Poll::Ready(t) => break t,
|
||||
Poll::Pending if repeat.0.swap(false, Ordering::Relaxed) => (),
|
||||
Poll::Pending => panic!("The future did not exit and did not call its waker."),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,7 @@ pub mod combine;
|
||||
pub mod error;
|
||||
pub mod event;
|
||||
pub mod format;
|
||||
pub mod future_debug;
|
||||
pub mod id_store;
|
||||
pub mod interner;
|
||||
pub mod iter_utils;
|
||||
|
||||
@@ -23,12 +23,12 @@ impl<E> Future for LocalSet<'_, E> {
|
||||
let mut any_pending = false;
|
||||
loop {
|
||||
match this.receiver.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(fut)) => this.pending.push_back(fut),
|
||||
Poll::Ready(None) => break,
|
||||
Poll::Pending => {
|
||||
any_pending = true;
|
||||
break;
|
||||
},
|
||||
Poll::Ready(None) => break,
|
||||
Poll::Ready(Some(fut)) => this.pending.push_back(fut),
|
||||
}
|
||||
}
|
||||
let count = this.pending.len();
|
||||
|
||||
@@ -71,4 +71,7 @@ pub mod test {
|
||||
Self(Rc::new(move |s| clone!(f; Box::pin(async move { f(s).await }))))
|
||||
}
|
||||
}
|
||||
impl Default for TestLogger {
|
||||
fn default() -> Self { TestLogger::new(async |s| eprint!("{s}")) }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -299,12 +299,10 @@ impl<'a> MsgWriter<'a> for IoNotifWriter {
|
||||
|
||||
pub struct CommCtx {
|
||||
exit: Sender<()>,
|
||||
o: Rc<Mutex<Pin<Box<dyn AsyncWrite>>>>,
|
||||
}
|
||||
|
||||
impl CommCtx {
|
||||
pub async fn exit(self) -> io::Result<()> {
|
||||
self.o.lock().await.as_mut().close().await?;
|
||||
self.exit.clone().send(()).await.expect("quit channel dropped");
|
||||
Ok(())
|
||||
}
|
||||
@@ -325,7 +323,7 @@ pub fn io_comm(
|
||||
let o = Rc::new(Mutex::new(o));
|
||||
let (onsub, client) = IoClient::new(o.clone());
|
||||
let (exit, onexit) = channel(1);
|
||||
(client, CommCtx { exit, o: o.clone() }, IoCommServer { o, i, onsub, onexit })
|
||||
(client, CommCtx { exit }, IoCommServer { o, i, onsub, onexit })
|
||||
}
|
||||
pub struct IoCommServer {
|
||||
o: Rc<Mutex<Pin<Box<dyn AsyncWrite>>>>,
|
||||
@@ -345,7 +343,6 @@ impl IoCommServer {
|
||||
Sub(ReplySub),
|
||||
Exit,
|
||||
}
|
||||
let exiting = RefCell::new(false);
|
||||
let input_stream = try_stream(async |mut h| {
|
||||
loop {
|
||||
let mut g = Bound::async_new(i.clone(), async |i| i.lock().await).await;
|
||||
@@ -361,27 +358,21 @@ impl IoCommServer {
|
||||
}
|
||||
});
|
||||
let (mut add_pending_req, fork_future) = LocalSet::new();
|
||||
let mut fork_stream = pin!(fork_future.fuse().into_stream());
|
||||
let mut fork_stream = pin!(fork_future.into_stream());
|
||||
let mut pending_replies = HashMap::new();
|
||||
'body: {
|
||||
let mut shared = pin!(stream_select!(
|
||||
let mut shared = stream_select! {
|
||||
pin!(input_stream) as Pin<&mut dyn Stream<Item = io::Result<Event>>>,
|
||||
onsub.map(|sub| Ok(Event::Sub(sub))),
|
||||
fork_stream.as_mut().map(|res| {
|
||||
res.map(|()| panic!("this substream cannot exit while the loop is running"))
|
||||
res.map(|()| panic!("this substream cannot exit while the loop is running") as Event)
|
||||
}),
|
||||
onexit.map(|()| Ok(Event::Exit)),
|
||||
));
|
||||
};
|
||||
while let Some(next) = shared.next().await {
|
||||
match next {
|
||||
Err(e) => break 'body Err(e),
|
||||
Ok(Event::Exit) => {
|
||||
*exiting.borrow_mut() = true;
|
||||
let mut out = o.lock().await;
|
||||
out.as_mut().flush().await?;
|
||||
out.as_mut().close().await?;
|
||||
break;
|
||||
},
|
||||
Ok(Event::Exit) => break,
|
||||
Ok(Event::Sub(ReplySub { id, ack, cb })) => {
|
||||
pending_replies.insert(id, cb);
|
||||
ack.send(()).unwrap();
|
||||
@@ -415,6 +406,9 @@ impl IoCommServer {
|
||||
while let Some(next) = fork_stream.next().await {
|
||||
next?
|
||||
}
|
||||
let mut out = o.lock().await;
|
||||
out.as_mut().flush().await?;
|
||||
out.as_mut().close().await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -427,11 +421,9 @@ mod test {
|
||||
use futures::{SinkExt, StreamExt, join};
|
||||
use orchid_api_derive::{Coding, Hierarchy};
|
||||
use orchid_api_traits::Request;
|
||||
use test_executors::spin_on;
|
||||
use unsync_pipe::pipe;
|
||||
|
||||
use crate::logging::test::TestLogger;
|
||||
use crate::logging::with_logger;
|
||||
use crate::future_debug::spin_on;
|
||||
use crate::reqnot::{ClientExt, MsgReaderExt, ReqReaderExt, io_comm};
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Coding, Hierarchy)]
|
||||
@@ -440,8 +432,7 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn notification() {
|
||||
let logger = TestLogger::new(async |s| eprint!("{s}"));
|
||||
spin_on(with_logger(logger, async {
|
||||
spin_on(async {
|
||||
let (in1, out2) = pipe(1024);
|
||||
let (in2, out1) = pipe(1024);
|
||||
let (received, mut on_receive) = mpsc::channel(2);
|
||||
@@ -468,7 +459,7 @@ mod test {
|
||||
recv_ctx.exit().await.unwrap();
|
||||
}
|
||||
);
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Coding, Hierarchy)]
|
||||
@@ -480,8 +471,7 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn request() {
|
||||
let logger = TestLogger::new(async |s| eprint!("{s}"));
|
||||
spin_on(with_logger(logger, async {
|
||||
spin_on(async {
|
||||
let (in1, out2) = pipe(1024);
|
||||
let (in2, out1) = pipe(1024);
|
||||
let (_, srv_ctx, srv) = io_comm(Box::pin(in2), Box::pin(out2));
|
||||
@@ -515,13 +505,12 @@ mod test {
|
||||
client_ctx.exit().await.unwrap();
|
||||
}
|
||||
);
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn exit() {
|
||||
let logger = TestLogger::new(async |s| eprint!("{s}"));
|
||||
spin_on(with_logger(logger, async {
|
||||
spin_on(async {
|
||||
let (input1, output1) = pipe(1024);
|
||||
let (input2, output2) = pipe(1024);
|
||||
let (reply_client, reply_context, reply_server) =
|
||||
@@ -565,6 +554,6 @@ mod test {
|
||||
onexit.await.unwrap();
|
||||
}
|
||||
)
|
||||
}));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user