diff --git a/orchid-async-utils/src/cancel_cleanup.rs b/orchid-async-utils/src/cancel_cleanup.rs index 22aebe2..4ae56e1 100644 --- a/orchid-async-utils/src/cancel_cleanup.rs +++ b/orchid-async-utils/src/cancel_cleanup.rs @@ -62,7 +62,7 @@ mod test { #[test] fn not_called_if_finished() { - spin_on(false, async { + spin_on(async { let (mut req_in, mut req_out) = mpsc::channel(0); let (mut rep_in, mut rep_out) = mpsc::channel(0); join( diff --git a/orchid-async-utils/src/debug.rs b/orchid-async-utils/src/debug.rs index 6b01aa2..6a0a455 100644 --- a/orchid-async-utils/src/debug.rs +++ b/orchid-async-utils/src/debug.rs @@ -143,15 +143,26 @@ pub fn eprint_stream_events<'a, S: Stream + 'a>( ) } +thread_local! { + static WAKE_LODUD: RefCell = const { RefCell::new(false) }; +} + +/// Equivalent to [spin_on], but also logs on wake +pub fn spin_on_loud(fut: Fut) -> Fut::Output { + let prev = WAKE_LODUD.replace(true); + let ret = spin_on(fut); + WAKE_LODUD.set(prev); + ret +} + struct SpinWaker { repeat: AtomicBool, - loud: bool, } impl Wake for SpinWaker { fn wake(self: Arc) { self.repeat.store(true, Ordering::SeqCst); - if self.loud { - eprintln!("Triggered repeat for spin_on") + if WAKE_LODUD.with_borrow(|k| *k) { + eprintln!("{Label} Triggered repeat for spin_on") } } } @@ -160,11 +171,13 @@ impl Wake for SpinWaker { /// keeps synchronously waking itself. This is useful for deterministic tests /// that don't contain side effects or threading. /// +/// Use [spin_on_loud] to get messages on wake for debugging +/// /// # Panics /// /// If the future doesn't wake itself and doesn't settle. -pub fn spin_on(loud: bool, f: Fut) -> Fut::Output { - let spin_waker = Arc::new(SpinWaker { repeat: AtomicBool::new(false), loud }); +pub fn spin_on(f: Fut) -> Fut::Output { + let spin_waker = Arc::new(SpinWaker { repeat: AtomicBool::new(false) }); let mut f = pin!(f); let waker = spin_waker.clone().into(); let mut cx = Context::from_waker(&waker); diff --git a/orchid-base/src/comm.rs b/orchid-base/src/comm.rs index f840047..abcdb70 100644 --- a/orchid-base/src/comm.rs +++ b/orchid-base/src/comm.rs @@ -354,7 +354,7 @@ struct ReplySub { cb: oneshot::Sender, } -struct IoClient { +pub struct IoClient { output: IoLock, id: Rc>, subscribe: Rc>, @@ -491,30 +491,33 @@ impl MsgWriter for IoNotifWriter { } } -pub struct CommCtx { +pub struct CommCx { exit: Sender<()>, } -impl CommCtx { +impl CommCx { pub async fn exit(self) -> io::Result<()> { self.exit.clone().send(()).await.expect("quit channel dropped"); Ok(()) } } +pub struct IoComm { + pub client: IoClient, + pub cx: CommCx, + pub srv: IoCommServer, +} + /// Establish bidirectional request-notification communication over a duplex /// channel. The returned [IoClient] can be used for notifications immediately, /// but requests can only be received while the future is running. The future /// will only resolve when [CommCtx::exit] is called. -pub fn io_comm( - o: Pin>, - i: Pin>, -) -> (impl Client + 'static, CommCtx, IoCommServer) { +pub fn io_comm(o: Pin>, i: Pin>) -> IoComm { let i = Rc::new(Mutex::new(i)); let o = Rc::new(Mutex::new(o)); let (onsub, client) = IoClient::new(o.clone()); let (exit, onexit) = channel(1); - (client, CommCtx { exit }, IoCommServer { o, i, onsub, onexit }) + IoComm { client, cx: CommCx { exit }, srv: IoCommServer { o, i, onsub, onexit } } } pub struct IoCommServer { o: Rc>>>, @@ -657,53 +660,61 @@ impl IoCommServer { } #[cfg(test)] -mod test { +pub mod test { use std::cell::RefCell; use futures::channel::mpsc; - use futures::{FutureExt, SinkExt, StreamExt, join, select}; + use futures::future::{join3, select}; + use futures::{SinkExt, StreamExt}; use orchid_api_derive::{Coding, Hierarchy}; use orchid_api_traits::Request; use orchid_async_utils::debug::{spin_on, with_label}; use unsync_pipe::pipe; - use crate::comm::{ClientExt, MsgReaderExt, ReqReaderExt, io_comm}; + use super::*; use crate::with_stash; + pub fn test_pair() -> (IoComm, IoComm) { + let (right_in, left_out) = pipe(1024); + let (left_in, right_out) = pipe(1024); + ( + io_comm(Box::pin(left_in), Box::pin(left_out)), + io_comm(Box::pin(right_in), Box::pin(right_out)), + ) + } + + pub async fn listen_no_ingress(srv: IoCommServer) { + srv + .listen( + async |_| panic!("Not expecting ingress notif"), + async |_| panic!("Not expecting ingress req"), + ) + .await + .unwrap() + } + #[derive(Clone, Debug, PartialEq, Coding, Hierarchy)] #[extendable] struct TestNotif(u64); #[test] fn notification() { - spin_on(false, async { - let (in1, out2) = pipe(1024); - let (in2, out1) = pipe(1024); - let (received, mut on_receive) = mpsc::channel(2); - let (_, recv_ctx, recv_srv) = io_comm(Box::pin(in2), Box::pin(out2)); - let (sender, ..) = io_comm(Box::pin(in1), Box::pin(out1)); - join!( - async { - recv_srv - .listen( - async |notif| { - received.clone().send(notif.read::().await?).await.unwrap(); - Ok(()) - }, - async |_| panic!("Should receive notif, not request"), - ) - .await - .unwrap() - }, - async { - sender.notify(TestNotif(3)).await.unwrap(); - assert_eq!(on_receive.next().await, Some(TestNotif(3))); - sender.notify(TestNotif(4)).await.unwrap(); - assert_eq!(on_receive.next().await, Some(TestNotif(4))); - recv_ctx.exit().await.unwrap(); - } - ); - }) + let (left, right) = test_pair(); + let (received, mut on_receive) = mpsc::channel(2); + let right_srv_fut = right.srv.listen( + async |notif| { + received.clone().send(notif.read::().await?).await.unwrap(); + Ok(()) + }, + async |_| panic!("Should receive notif, not request"), + ); + spin_on(join(async { right_srv_fut.await.unwrap() }, async { + left.client.notify(TestNotif(3)).await.unwrap(); + assert_eq!(on_receive.next().await, Some(TestNotif(3))); + left.client.notify(TestNotif(4)).await.unwrap(); + assert_eq!(on_receive.next().await, Some(TestNotif(4))); + right.cx.exit().await.unwrap(); + })); } #[derive(Clone, Debug, Coding, Hierarchy)] @@ -715,140 +726,144 @@ mod test { #[test] fn request() { - spin_on(false, async { - let (in1, out2) = pipe(1024); - let (in2, out1) = pipe(1024); - let (_, srv_ctx, srv) = io_comm(Box::pin(in2), Box::pin(out2)); - let (client, client_ctx, client_srv) = io_comm(Box::pin(in1), Box::pin(out1)); - join!( - async { - srv - .listen( - async |_| panic!("No notifs expected"), - async |mut req| { - let val = req.read_req::().await?; - req.reply(&val, val.0 + 1).await - }, - ) - .await - .unwrap() - }, - async { - client_srv - .listen( - async |_| panic!("Not expecting ingress notif"), - async |_| panic!("Not expecting ingress req"), - ) - .await - .unwrap() - }, - async { - let response = client.request(DummyRequest(5)).await.unwrap(); - assert_eq!(response, 6); - srv_ctx.exit().await.unwrap(); - client_ctx.exit().await.unwrap(); - } - ); - }) + let (left, right) = test_pair(); + let right_srv_fut = right.srv.listen( + async |_| panic!("No notifs expected"), + async |mut req| { + let val = req.read_req::().await?; + req.reply(&val, val.0 + 1).await + }, + ); + let left_srv_fut = left.srv.listen( + async |_| panic!("Not expecting ingress notif"), + async |_| panic!("Not expecting ingress req"), + ); + spin_on(join3( + async { right_srv_fut.await.unwrap() }, + async { left_srv_fut.await.unwrap() }, + async { + let response = left.client.request(DummyRequest(5)).await.unwrap(); + assert_eq!(response, 6); + right.cx.exit().await.unwrap(); + left.cx.exit().await.unwrap(); + }, + )); } #[test] fn exit() { - spin_on(false, async { - let (input1, output1) = pipe(1024); - let (input2, output2) = pipe(1024); - let (reply_client, reply_context, reply_server) = - io_comm(Box::pin(input1), Box::pin(output2)); - let (req_client, req_context, req_server) = io_comm(Box::pin(input2), Box::pin(output1)); - let reply_context = RefCell::new(Some(reply_context)); - let (exit, onexit) = futures::channel::oneshot::channel::<()>(); - join!( - with_label("reply", async move { - reply_server - .listen( - async |hand| { - let _notif = hand.read::().await.unwrap(); - let context = reply_context.borrow_mut().take().unwrap(); - context.exit().await?; - Ok(()) - }, - async |mut hand| { - let req = hand.read_req::().await?; - hand.reply(&req, req.0 + 1).await - }, - ) - .await - .unwrap(); - exit.send(()).unwrap(); - let _client = reply_client; - }), - with_label("client", async move { - req_server - .listen( - async |_| panic!("Only the other server expected notifs"), - async |_| panic!("Only the other server expected requests"), - ) - .await - .unwrap(); - let _ctx = req_context; - }), - async move { - req_client.request(DummyRequest(0)).await.unwrap(); - req_client.notify(TestNotif(0)).await.unwrap(); - onexit.await.unwrap(); - } - ) - }); + let (left, right) = test_pair(); + let reply_context = RefCell::new(Some(right.cx)); + let (exit, onexit) = oneshot::channel::<()>(); + let right_srv_fut = right.srv.listen( + async |hand| { + let _notif = hand.read::().await.unwrap(); + let context = reply_context.borrow_mut().take().unwrap(); + context.exit().await?; + Ok(()) + }, + async |mut hand| { + let req = hand.read_req::().await?; + hand.reply(&req, req.0 + 1).await + }, + ); + let left_srv_fut = left.srv.listen( + async |_| panic!("Only the other server expected notifs"), + async |_| panic!("Only the other server expected requests"), + ); + spin_on(join3( + with_label("reply", async move { + right_srv_fut.await.unwrap(); + exit.send(()).unwrap(); + let _client = right.client; + }), + with_label("client", async move { + left_srv_fut.await.unwrap(); + let _ctx = left.cx; + }), + async move { + left.client.request(DummyRequest(0)).await.unwrap(); + left.client.notify(TestNotif(0)).await.unwrap(); + onexit.await.unwrap(); + }, + )); } #[test] fn timely_cancel() { - spin_on(false, async { - let (in1, out2) = pipe(1024); - let (in2, out1) = pipe(1024); - let (wait_in, mut wait_out) = mpsc::channel(0); - let (_, srv_ctx, srv) = io_comm(Box::pin(in2), Box::pin(out2)); - let (client, client_ctx, client_srv) = io_comm(Box::pin(in1), Box::pin(out1)); - join!( - with_label("server", async { - srv - .listen( - async |_| panic!("No notifs expected"), - async |mut req| { - let _ = req.read_req::().await?; - let _ = req.finish().await; - wait_in.clone().send(()).await.unwrap(); - // This will never return, so if the cancellation does not work, it would block - // the loop - futures::future::pending().await - }, - ) - .await - .unwrap(); - }), - with_label("client", async { - client_srv - .listen( - async |_| panic!("Not expecting ingress notif"), - async |_| panic!("Not expecting ingress req"), - ) - .await - .unwrap(); - }), + let (left, right) = test_pair(); + let (wait_in, mut wait_out) = mpsc::channel(0); + let right_srv_fut = right.srv.listen( + async |_| panic!("No notifs expected"), + async |mut req| { + let _ = req.read_req::().await?; + let _ = req.finish().await; + wait_in.clone().send(()).await.unwrap(); + // This will never return, so if the cancellation does not work, it would block + // the loop + futures::future::pending().await + }, + ); + spin_on(join3( + with_label("server", async { right_srv_fut.await.unwrap() }), + with_label("client", listen_no_ingress(left.srv)), + with_label( + "outer_stash", with_stash(async { with_stash(async { - select! { - _ = client.request(DummyRequest(5)).fuse() => { - panic!("This one should not run") - }, - rep = wait_out.next() => rep.expect("something?"), - } + select( + Box::pin(async { + left.client.request(DummyRequest(5)).await.unwrap(); + panic!("This one should not run"); + }), + Box::pin(async { + wait_out.next().await.expect("something?"); + }), + ) + .await; }) .await; - srv_ctx.exit().await.unwrap(); - client_ctx.exit().await.unwrap(); - }) - ); - }) + right.cx.exit().await.unwrap(); + left.cx.exit().await.unwrap(); + }), + ), + )); + } + + #[test] + fn late_cancel() { + let (left, right) = test_pair(); + let (send, mut recv) = mpsc::channel(0); + let right_srv_fut = right.srv.listen( + async |_| panic!("Expected a request"), + async |mut req| { + req.read_req::().await?; + let mut reply_writer = req.start_reply().await?; + let (stop_wait, wait) = oneshot::channel(); + send.clone().send(stop_wait).await.unwrap(); + wait.await.unwrap(); + (1 as ::Response).encode(reply_writer.writer()).await?; + reply_writer.finish().await + }, + ); + spin_on(join3( + async { right_srv_fut.await.unwrap() }, + with_label("client", listen_no_ingress(left.srv)), + async { + with_stash(Box::pin(async { + select( + Box::pin(async { + left.client.request(DummyRequest(5)).await.unwrap(); + panic!("This one should not run"); + }), + Box::pin(async { recv.next().await.unwrap().send(()) }), + ) + .await; + })) + .await; + right.cx.exit().await.unwrap(); + left.cx.exit().await.unwrap(); + }, + )); } } diff --git a/orchid-base/src/stash.rs b/orchid-base/src/stash.rs index d761899..fff9c2a 100644 --- a/orchid-base/src/stash.rs +++ b/orchid-base/src/stash.rs @@ -148,41 +148,34 @@ mod test { #[test] fn run_stashed_future() { let (mut send, recv) = mpsc::channel(0); - spin_on( - false, - join( - with_stash(async { - let mut send1 = send.clone(); + spin_on(join( + with_stash(async { + let mut send1 = send.clone(); + stash(async move { + send1.send(1).await.unwrap(); + }); + let mut send1 = send.clone(); + stash(async move { + let mut send2 = send1.clone(); stash(async move { - send1.send(1).await.unwrap(); + send2.send(2).await.unwrap(); }); - let mut send1 = send.clone(); + send1.send(3).await.unwrap(); stash(async move { - let mut send2 = send1.clone(); - stash(async move { - send2.send(2).await.unwrap(); - }); - send1.send(3).await.unwrap(); - stash(async move { - send1.send(4).await.unwrap(); - }) - }); - let mut send1 = send.clone(); - stash(async move { - send1.send(5).await.unwrap(); - }); - send.send(6).await.unwrap(); - }), - async { - let mut results = recv.take(6).collect::>().await; - results.sort(); - assert_eq!( - &results, - &[1, 2, 3, 4, 5, 6], - "all variations completed in unspecified order" - ); - }, - ), - ); + send1.send(4).await.unwrap(); + }) + }); + let mut send1 = send.clone(); + stash(async move { + send1.send(5).await.unwrap(); + }); + send.send(6).await.unwrap(); + }), + async { + let mut results = recv.take(6).collect::>().await; + results.sort(); + assert_eq!(&results, &[1, 2, 3, 4, 5, 6], "all variations completed in unspecified order"); + }, + )); } } diff --git a/orchid-extension/src/entrypoint.rs b/orchid-extension/src/entrypoint.rs index b047120..cfb5c33 100644 --- a/orchid-extension/src/entrypoint.rs +++ b/orchid-extension/src/entrypoint.rs @@ -15,7 +15,7 @@ use itertools::Itertools; use orchid_api_traits::{Decode, Encode, Request, UnderRoot, enc_vec}; use orchid_async_utils::{Handle, JoinError, to_task}; use orchid_base::{ - Client, ClientExt, CommCtx, Comment, MsgReader, MsgReaderExt, ReqHandleExt, ReqReaderExt, + Client, ClientExt, CommCx, Comment, IoComm, MsgReader, MsgReaderExt, ReqHandleExt, ReqReaderExt, Snippet, Sym, TokenVariant, Witness, char_filter_match, char_filter_union, es, io_comm, is, log, mk_char_filter, try_with_reporter, ttv_from_api, with_interner, with_logger, with_stash, }; @@ -37,7 +37,7 @@ use crate::{ task_local::task_local! { static CLIENT: Rc; - static CTX: Rc>>; + static CTX: Rc>>; } fn get_client() -> Rc { CLIENT.get() } @@ -51,7 +51,7 @@ pub async fn exit() { /// Set the client used for global [request] and [notify] functions within the /// runtime of this future -pub async fn with_comm(c: Rc, ctx: CommCtx, fut: F) -> F::Output { +pub async fn with_comm(c: Rc, ctx: CommCx, fut: F) -> F::Output { CLIENT.scope(c, CTX.scope(Rc::new(RefCell::new(Some(ctx))), fut)).await } @@ -194,9 +194,9 @@ impl ExtensionBuilder { ctx.output.as_mut().flush().await.unwrap(); let logger1 = LoggerImpl::from_api(&host_header.logger); let logger2 = logger1.clone(); - let (client, comm_ctx, extension_srv) = io_comm(ctx.output, ctx.input); + let IoComm { client, cx: comm_ctx, srv } = io_comm(ctx.output, ctx.input); // this future will be ready once the extension cleanly exits - let extension_fut = extension_srv.listen( + let extension_fut = srv.listen( async |n: Box>| { let notif = n.read().await.unwrap(); match notif { diff --git a/orchid-host/src/extension.rs b/orchid-host/src/extension.rs index 2f1da00..9920fb5 100644 --- a/orchid-host/src/extension.rs +++ b/orchid-host/src/extension.rs @@ -17,7 +17,7 @@ use hashbrown::{HashMap, HashSet}; use itertools::Itertools; use orchid_api_traits::{Decode, Encode, Request}; use orchid_base::{ - AtomRepr, Client, ClientExt, CommCtx, FmtCtxImpl, Format, IStr, IStrv, MsgReaderExt, Pos, + AtomRepr, Client, ClientExt, CommCx, FmtCtxImpl, Format, IStr, IStrv, IoComm, MsgReaderExt, Pos, ReqHandleExt, ReqReaderExt, Sym, Witness, es, ev, io_comm, is, iv, log, stash, with_stash, }; @@ -45,7 +45,7 @@ pub struct ReqPair(R, Sender); pub struct ExtensionData { name: String, ctx: Ctx, - comm_cx: Option, + comm_cx: Option, join_ext: Option>, client: Rc, systems: Vec, @@ -82,12 +82,12 @@ impl Extension { let header2 = header.clone(); Ok(Self(Rc::new_cyclic(|weak: &Weak| { // context not needed because exit is extension-initiated - let (client, comm_cx, comm) = io_comm(init.input, init.output); + let IoComm { client, cx: comm_cx, srv } = io_comm(init.input, init.output); let weak2 = weak; let weak = weak.clone(); let ctx2 = ctx.clone(); let join_ext = ctx.clone().spawn(Duration::ZERO, async move { - comm + srv .listen( async |reader| { with_stash(async { diff --git a/orchid.code-workspace b/orchid.code-workspace index e3120e1..5cdc9f4 100644 --- a/orchid.code-workspace +++ b/orchid.code-workspace @@ -47,6 +47,9 @@ "--logs=msg>stderr", "exec", "1 + 1" + ], + "initCommands": [ + "settings set target.disable-aslr false" ] }, {