diff --git a/examples/hello-world/main.orc b/examples/hello-world/main.orc index 84d421b..0806b48 100644 --- a/examples/hello-world/main.orc +++ b/examples/hello-world/main.orc @@ -5,5 +5,11 @@ let main = "foo" + string::slice "hello" 1 3 + "bar" let io_main = ( stdio::get_stdout \stdout - -- TODO: missing output commands in std + std::stream::write_str stdout "Hello, World!" + (std::stream::flush + (std::stream::close + orchid::cmd::exit + \e e) + \e e) + \e e ) \ No newline at end of file diff --git a/orchid-async-utils/src/cancel_cleanup.rs b/orchid-async-utils/src/cancel_cleanup.rs new file mode 100644 index 0000000..22aebe2 --- /dev/null +++ b/orchid-async-utils/src/cancel_cleanup.rs @@ -0,0 +1,87 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Future returned by [cancel_cleanup] +pub struct CancelCleanup { + /// Set to None when Ready + fut: Option, + /// Only set to None in Drop + on_drop: Option, +} +impl Future for CancelCleanup { + type Output = Fut::Output; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { fut, .. } = unsafe { self.get_unchecked_mut() }; + if let Some(future) = fut { + let future = unsafe { Pin::new_unchecked(future) }; + let poll = future.poll(cx); + if poll.is_ready() { + *fut = None; + } + poll + } else { + Poll::Pending + } + } +} +impl Drop for CancelCleanup { + fn drop(&mut self) { + if let Some(fut) = self.fut.take() { + (self.on_drop.take().unwrap())(fut) + } + } +} + +/// Handle a Future's Drop. The callback is only called if the future has not +/// yet returned and would be cancelled, and it receives the future as an +/// argument +pub fn cancel_cleanup( + fut: Fut, + on_drop: Fun, +) -> CancelCleanup { + CancelCleanup { fut: Some(fut), on_drop: Some(on_drop) } +} + +#[cfg(test)] +mod test { + use std::pin::pin; + + use futures::channel::mpsc; + use futures::future::join; + use futures::{SinkExt, StreamExt}; + + use super::*; + use crate::debug::spin_on; + + #[test] + fn called_on_drop() { + let mut called = false; + cancel_cleanup(pin!(async {}), |_| called = true); + assert!(called, "cleanup was called when the future was dropped"); + } + + #[test] + fn not_called_if_finished() { + spin_on(false, async { + let (mut req_in, mut req_out) = mpsc::channel(0); + let (mut rep_in, mut rep_out) = mpsc::channel(0); + join( + async { + req_out.next().await.unwrap(); + rep_in.send(()).await.unwrap(); + }, + async { + cancel_cleanup( + pin!(async { + req_in.send(()).await.unwrap(); + rep_out.next().await.unwrap(); + }), + |_| panic!("Callback called on drop even though the future was finished"), + ) + .await + }, + ) + .await + }); + } +} diff --git a/orchid-async-utils/src/debug.rs b/orchid-async-utils/src/debug.rs index a0ed65d..6b01aa2 100644 --- a/orchid-async-utils/src/debug.rs +++ b/orchid-async-utils/src/debug.rs @@ -143,9 +143,17 @@ pub fn eprint_stream_events<'a, S: Stream + 'a>( ) } -struct SpinWaker(AtomicBool); +struct SpinWaker { + repeat: AtomicBool, + loud: bool, +} impl Wake for SpinWaker { - fn wake(self: Arc) { self.0.store(true, Ordering::Relaxed); } + fn wake(self: Arc) { + self.repeat.store(true, Ordering::SeqCst); + if self.loud { + eprintln!("Triggered repeat for spin_on") + } + } } /// A dumb executor that keeps synchronously re-running the future as long as it @@ -155,15 +163,15 @@ impl Wake for SpinWaker { /// # Panics /// /// If the future doesn't wake itself and doesn't settle. -pub fn spin_on(f: Fut) -> Fut::Output { - let repeat = Arc::new(SpinWaker(AtomicBool::new(false))); +pub fn spin_on(loud: bool, f: Fut) -> Fut::Output { + let spin_waker = Arc::new(SpinWaker { repeat: AtomicBool::new(false), loud }); let mut f = pin!(f); - let waker = repeat.clone().into(); + let waker = spin_waker.clone().into(); let mut cx = Context::from_waker(&waker); loop { match f.as_mut().poll(&mut cx) { Poll::Ready(t) => break t, - Poll::Pending if repeat.0.swap(false, Ordering::Relaxed) => (), + Poll::Pending if spin_waker.repeat.swap(false, Ordering::SeqCst) => (), Poll::Pending => panic!("The future did not exit and did not call its waker."), } } diff --git a/orchid-async-utils/src/lib.rs b/orchid-async-utils/src/lib.rs index 3147a6b..17629e0 100644 --- a/orchid-async-utils/src/lib.rs +++ b/orchid-async-utils/src/lib.rs @@ -1,5 +1,7 @@ pub mod debug; +mod cancel_cleanup; +pub use cancel_cleanup::*; mod localset; pub use localset::*; mod task_future; -pub use task_future::*; +pub use task_future::*; \ No newline at end of file diff --git a/orchid-async-utils/src/localset.rs b/orchid-async-utils/src/localset.rs index e86a2d6..807ffbb 100644 --- a/orchid-async-utils/src/localset.rs +++ b/orchid-async-utils/src/localset.rs @@ -1,21 +1,35 @@ -use std::collections::VecDeque; use std::pin::Pin; use std::task::Poll; -use futures::StreamExt; -use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded}; +use futures::channel::mpsc::{SendError, UnboundedReceiver, UnboundedSender, unbounded}; use futures::future::LocalBoxFuture; +use futures::stream::FuturesUnordered; +use futures::{SinkExt, StreamExt}; -pub struct LocalSet<'a, E> { - receiver: UnboundedReceiver>>, - pending: VecDeque>>, +pub struct LocalSetController<'a, E> { + sender: UnboundedSender>>, } -impl<'a, E> LocalSet<'a, E> { - pub fn new() -> (UnboundedSender>>, Self) { - let (sender, receiver) = unbounded(); - (sender, Self { receiver, pending: VecDeque::new() }) +impl<'a, E> LocalSetController<'a, E> { + pub async fn spawn> + 'a>( + &mut self, + fut: F, + ) -> Result<(), SendError> { + self.sender.send(Box::pin(fut)).await } } + +pub fn local_set<'a, E: 'a>() +-> (LocalSetController<'a, E>, impl Future> + 'a) { + let (sender, receiver) = unbounded(); + let controller = LocalSetController { sender }; + let set = LocalSet { receiver, pending: FuturesUnordered::new() }; + (controller, set) +} + +struct LocalSet<'a, E> { + receiver: UnboundedReceiver>>, + pending: FuturesUnordered>>, +} impl Future for LocalSet<'_, E> { type Output = Result<(), E>; fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { @@ -23,7 +37,7 @@ impl Future for LocalSet<'_, E> { let mut any_pending = false; loop { match this.receiver.poll_next_unpin(cx) { - Poll::Ready(Some(fut)) => this.pending.push_back(fut), + Poll::Ready(Some(fut)) => this.pending.push(fut), Poll::Ready(None) => break, Poll::Pending => { any_pending = true; @@ -31,15 +45,14 @@ impl Future for LocalSet<'_, E> { }, } } - let count = this.pending.len(); - for _ in 0..count { - let mut req = this.pending.pop_front().unwrap(); - match req.as_mut().poll(cx) { - Poll::Ready(Ok(())) => (), - Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + loop { + match this.pending.poll_next_unpin(cx) { + Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e)), + Poll::Ready(Some(Ok(()))) => continue, + Poll::Ready(None) => break, Poll::Pending => { any_pending = true; - this.pending.push_back(req) + break; }, } } diff --git a/orchid-async-utils/src/task_future.rs b/orchid-async-utils/src/task_future.rs index 2332b0e..4a8cdf6 100644 --- a/orchid-async-utils/src/task_future.rs +++ b/orchid-async-utils/src/task_future.rs @@ -1,10 +1,11 @@ use std::any::Any; use std::cell::RefCell; -use std::marker::PhantomData; -use std::pin::Pin; +use std::pin::{Pin, pin}; use std::rc::Rc; use std::task::{Context, Poll, Waker}; +use futures::FutureExt; +use futures::channel::oneshot::{self, Canceled}; use futures::future::{FusedFuture, LocalBoxFuture}; struct State { @@ -43,50 +44,56 @@ impl Future for Pollable { } } +pub struct JoinError; + /// An object that can be used to inspect the state of the task -pub struct Handle(Rc>, PhantomData); +pub struct Handle { + send_abort: RefCell>>, + ready: Rc>, + recv_output: RefCell>, +} impl Handle { /// Immediately stop working on this task, and return the result if it has /// already finished pub fn abort(&self) -> Option { - let mut g = self.0.borrow_mut(); - g.work.take(); - match g.result.take() { - Some(val) => Some(*val.downcast().expect("Mismatch between type of future and handle")), - None => { - g.waker.wake_by_ref(); - None - }, + if let Some(abort) = self.send_abort.take() { + let _ = abort.send(()); } + self.recv_output.borrow_mut().try_recv().ok().flatten() } /// Determine if there's any more work to do on this task - pub fn is_finished(&self) -> bool { - let g = self.0.borrow(); - g.result.is_some() || g.work.is_none() - } + pub fn is_finished(&self) -> bool { *self.ready.borrow() } /// "finish" the freestanding task, and return the future instead - pub async fn join(self) -> T { - let work = { - let mut g = self.0.borrow_mut(); - if let Some(val) = g.result.take() { - return *val.downcast().expect("Mistmatch between type of future and handle"); - } - g.waker.wake_by_ref(); - g.work.take().expect("Attempted to join task that was already aborted") - }; - *work.await.downcast().expect("Mismatch between type of future and handle") + pub async fn join(self) -> Result { + self.recv_output.into_inner().await.map_err(|Canceled| JoinError) } } /// Split a future into an object that can be polled and one that returns /// information on its progress and its result. The first one can be passed to /// an executor or localset, the second can be used to manage it -pub fn to_task + 'static>(f: F) -> (Pollable, Handle) { - let dyn_future = Box::pin(async { Box::new(f.await) as Box }); - let state = Rc::new(RefCell::new(State { - result: None, - work: Some(dyn_future), - waker: Waker::noop().clone(), - })); - (Pollable(state.clone()), Handle(state, PhantomData)) +pub fn to_task<'a, F: Future + 'a>( + f: F, +) -> (impl Future + 'a, Handle) { + let (send_abort, mut on_abort) = oneshot::channel(); + let (send_output, on_output) = oneshot::channel(); + let ready = Rc::new(RefCell::new(false)); + let ready2 = ready.clone(); + let fut = async move { + let mut fut = pin!(f.fuse()); + let output = futures::select_biased! { + res = on_abort => match res { + Ok(()) => return, + Err(_) => fut.await, + }, + output = fut => output, + }; + ready2.replace(true); + let _: Result<_, _> = send_output.send(output); + }; + (fut, Handle { + ready, + recv_output: RefCell::new(on_output), + send_abort: RefCell::new(Some(send_abort)), + }) } diff --git a/orchid-base/src/comm.rs b/orchid-base/src/comm.rs index 2b89a51..81e5139 100644 --- a/orchid-base/src/comm.rs +++ b/orchid-base/src/comm.rs @@ -1,4 +1,4 @@ -use std::cell::RefCell; +use std::cell::{BorrowMutError, RefCell}; use std::marker::PhantomData; use std::pin::{Pin, pin}; use std::rc::Rc; @@ -16,56 +16,95 @@ use futures::{ }; use hashbrown::HashMap; use orchid_api_traits::{Decode, Encode, Request, UnderRoot}; -use orchid_async_utils::LocalSet; use orchid_async_utils::debug::{PanicOnDrop, assert_no_drop}; +use orchid_async_utils::{cancel_cleanup, local_set, to_task}; +use crate::{clone, finish_or_stash, stash, with_stash}; + +// TODO: revise error handling; error recovery is never partial, it always +// requires dropping the server, client, and all requests + +/// A token indicating that a reply to a request has been sent. Returned from +/// [RepWriter::finish] which is the raw reply channel, or [ReqHandleExt::reply] +/// or [ReqReaderExt::reply] which are type checked #[must_use = "Receipts indicate that a required action has been performed within a function. \ Most likely this should be returned somewhere."] -pub struct Receipt<'a>(PhantomData<&'a mut ()>); -impl Receipt<'_> { - /// Only call this function from a custom implementation of [RepWriter] - pub fn _new() -> Self { Self(PhantomData) } +pub struct Receipt; +impl Receipt { + /// Only ever call this function from a custom implementation of + /// [RepWriter::finish] + pub fn _new() -> Self { Self } +} + +/// Return data while waiting for the response to a request. [Self::future] must +/// be awaited in order to ensure that progress is being made +pub struct ReqWait { + /// Future representing waiting for a request. This must be steadily polled. + pub future: LocalBoxFuture<'static, io::Result>>, + /// Since the [Self::future] must be awaited which exclusively borrows it, + /// this separate handle can be used for cancellation. + pub canceller: Box, } /// Write guard to outbound for the purpose of serializing a request. Only one /// can exist at a time. Dropping this object should panic. -pub trait ReqWriter<'a> { +pub trait ReqWriter { /// Access to the underlying channel. This may be buffered. fn writer(&mut self) -> Pin<&mut dyn AsyncWrite>; /// Finalize the request, release the outbound channel, then queue for the /// reply on the inbound channel. - fn send(self: Box) -> LocalBoxFuture<'a, io::Result + 'a>>>; + fn send(self: Box) -> ReqWait; } /// Write guard to inbound for the purpose of deserializing a reply. While held, /// no inbound requests or other replies can be processed. /// -/// Dropping this object should panic even if [RepReader::finish] returns -/// synchronously, because the API isn't cancellation safe in general so it is a -/// programmer error in all cases to drop an object related to it without proper -/// cleanup. -pub trait RepReader<'a> { +/// # Cancellation +/// +/// If the request has been cancelled and the server has accepted the +/// cancellation instead of writing a reply (which is never guaranteed), then +/// this object is inert and should be dropped. +/// +/// Dropping this object if [Self::reader] returns [Some] should panic even if +/// [RepReader::finish] returns synchronously, because the API isn't +/// cancellation safe in general so it is a programmer error to drop an object +/// related to it without proper cleanup. +pub trait RepReader { /// Access to the underlying channel. The length of the message is inferred - /// from the number of bytes read so this must not be buffered. - fn reader(&mut self) -> Pin<&mut dyn AsyncRead>; + /// from the number of bytes read so this must not be buffered and a full + /// reply must always be read from it if available + /// + /// This returns None if the request has successfully been cancelled, in which + /// case this object can be dropped without calling [Self::finish] + fn reader(&mut self) -> Option>; /// Finish reading the request - fn finish(self: Box) -> LocalBoxFuture<'a, ()>; + fn finish(self: Box) -> LocalBoxFuture<'static, ()>; +} + +/// A handle for cancelling in-flight requests without a reference to +/// the wait future (which would be mutably borrowed by an await at this point) +pub trait CancelNotifier { + /// Upon cancellation the future may resolve to a stub version of [RepReader] + /// with no reader access, but since the cancellation is not synchronized + /// with the server, a full reply may still be received, and if it is, the + /// original reply must still be read from it. + fn cancel(self: Box) -> LocalBoxFuture<'static, ()>; } /// Write guard to outbound for the purpose of serializing a notification. /// /// Dropping this object should panic for the same reason [RepReader] panics -pub trait MsgWriter<'a> { +pub trait MsgWriter { /// Access to the underlying channel. This may be buffered. fn writer(&mut self) -> Pin<&mut dyn AsyncWrite>; /// Send the notification - fn finish(self: Box) -> LocalBoxFuture<'a, io::Result<()>>; + fn finish(self: Box) -> LocalBoxFuture<'static, io::Result<()>>; } /// For initiating outbound requests and notifications pub trait Client { - fn start_request(&self) -> LocalBoxFuture<'_, io::Result + '_>>>; - fn start_notif(&self) -> LocalBoxFuture<'_, io::Result + '_>>>; + fn start_request(&self) -> LocalBoxFuture<'static, io::Result>>; + fn start_notif(&self) -> LocalBoxFuture<'static, io::Result>>; } impl ClientExt for T {} @@ -73,62 +112,146 @@ impl ClientExt for T {} /// notif lifecycle and typing #[allow(async_fn_in_trait)] pub trait ClientExt: Client { + #[allow( + clippy::await_holding_refcell_ref, + reason = "Must bypass a future return point by sharing the common path" + )] async fn request>(&self, t: T) -> io::Result { - let mut req = self.start_request().await?; - t.into_root().encode(req.writer().as_mut()).await?; - let mut rep = req.send().await?; - let response = T::Response::decode(rep.reader()).await; - rep.finish().await; - response + let start_req = self.start_request(); + // This section must finish if it has started, and the returned writer's `send` + // must be called as well. + let common = Rc::new(RefCell::new(Some(Box::pin(async move { + let mut writer = start_req.await?; + t.into_root().encode(writer.writer().as_mut()).await?; + io::Result::Ok(writer) + })))); + // Initialized in the cancelable section if common returns here. If set, the + // future inside must be finished on stash after the notification is sent + // to ensure that the server acknowledges the cancellation, or to decode the + // result if the cancellation was in fact too late. + let req_wait_rc = Rc::new(RefCell::new(None)); + // If both this and common are None, that means the cancelable section is + // already past its last interruptible point, and must be finished on stash + cancel_cleanup( + clone!(req_wait_rc, common; Box::pin(async move { + let req_wait; + { + let mut common_g = common.try_borrow_mut().expect("cancel will drop us before locking"); + let common = (common_g.as_mut()) + .expect("Only unset by us below or by cancel after dropping us"); + // cancel handler may take over here + req_wait = common.await?.send(); + common_g.take(); + } + let mut rep; + { + let mut req_wait_g = (req_wait_rc.try_borrow_mut()) + .expect("We are the first ones to access this"); + *req_wait_g = Some(req_wait); + let req_wait = req_wait_g.as_mut().expect("Initialized right above"); + // cancel handler may take over here + rep = req_wait.future.as_mut().await?; + req_wait_g.take(); + }; + // cancel handler will not interrupt if we've gotten this far + let reader = rep.reader().expect("Not been cancelled thus far"); + let result = T::Response::decode(reader).await; + rep.finish().await; + result + })), + |fut| { + stash(async move { + // TODO: strategy for IO errors on stash + let req_wait = if common.try_borrow_mut().is_ok_and(|r| r.is_none()) { + // fut was already past common + match req_wait_rc.try_borrow_mut() { + Ok(mut opt) => { + let Some(req_wait) = opt.take() else { + // fut was already reading, finish that read and exit + fut.await.expect("IO error on stash"); + return; + }; + req_wait + }, + Err(BorrowMutError { .. }) => { + // fut was in waiting, take over and do our own thing + std::mem::drop(fut); + req_wait_rc.take().expect("If it was borrowed then it was still set") + }, + } + } else { + // fut was still in common, take over and finish common + std::mem::drop(fut); + let common = + (common.take()).expect("If it was still borrowed in fut, it was not yet unset"); + common.await.expect("IO error on stash").send() + }; + req_wait.canceller.cancel().await; + let mut rep = req_wait.future.await.expect("IO error on stash"); + let Some(reader) = rep.reader() else { return }; + T::Response::decode(reader).await.expect("IO error on stash"); + rep.finish().await; + }) + }, + ) + .await } - async fn notify>(&self, t: T) -> io::Result<()> { - let mut notif = self.start_notif().await?; - t.into_root().encode(notif.writer().as_mut()).await?; - notif.finish().await?; - Ok(()) + async fn notify + 'static>(&self, t: T) -> io::Result<()> { + let start_notif = self.start_notif(); + finish_or_stash(Box::pin(async { + let mut notif = start_notif.await?; + t.into_root().encode(notif.writer().as_mut()).await?; + notif.finish().await?; + Ok(()) + })) + .await } } -pub trait ReqReader<'a> { +pub trait ReqReader { fn reader(&mut self) -> Pin<&mut dyn AsyncRead>; - fn finish(self: Box) -> LocalBoxFuture<'a, Box + 'a>>; + fn finish(self: Box) -> LocalBoxFuture<'static, Box>; } -impl<'a, T: ReqReader<'a> + ?Sized> ReqReaderExt<'a> for T {} +impl ReqReaderExt for T {} #[allow(async_fn_in_trait)] -pub trait ReqReaderExt<'a>: ReqReader<'a> { +pub trait ReqReaderExt: ReqReader { async fn read_req(&mut self) -> io::Result { R::decode(self.reader()).await } async fn reply( self: Box, req: impl Evidence, - rep: &R::Response, - ) -> io::Result> { + rep: R::Response, + ) -> io::Result { self.finish().await.reply(req, rep).await } - async fn start_reply(self: Box) -> io::Result + 'a>> { + async fn start_reply(self: Box) -> io::Result> { self.finish().await.start_reply().await } } -pub trait ReqHandle<'a> { - fn start_reply(self: Box) -> LocalBoxFuture<'a, io::Result + 'a>>>; +pub trait ReqHandle { + fn start_reply(self: Box) -> LocalBoxFuture<'static, io::Result>>; } -impl<'a, T: ReqHandle<'a> + ?Sized> ReqHandleExt<'a> for T {} +impl ReqHandleExt for T {} #[allow(async_fn_in_trait)] -pub trait ReqHandleExt<'a>: ReqHandle<'a> { +pub trait ReqHandleExt: ReqHandle { async fn reply( self: Box, _: impl Evidence, - rep: &Req::Response, - ) -> io::Result> { - let mut reply = self.start_reply().await?; - rep.encode(reply.writer()).await?; - reply.finish().await + rep: Req::Response, + ) -> io::Result { + let start_reply = self.start_reply(); + finish_or_stash(Box::pin(async move { + let mut reply = start_reply.await?; + rep.encode(reply.writer()).await?; + reply.finish().await + })) + .await } } -pub trait RepWriter<'a> { +pub trait RepWriter { fn writer(&mut self) -> Pin<&mut dyn AsyncWrite>; - fn finish(self: Box) -> LocalBoxFuture<'a, io::Result>>; + fn finish(self: Box) -> LocalBoxFuture<'static, io::Result>; } pub trait MsgReader<'a> { @@ -166,41 +289,43 @@ type IoLock = Rc>>>; type IoGuard = Bound>>, IoLock>; /// An incoming request. This holds a lock on the ingress channel. -pub struct IoReqReader<'a> { - prefix: &'a [u8], +pub struct IoReqReader { + prefix: u64, read: IoGuard, - write: &'a Mutex>, + o: Rc>>, } -impl<'a> ReqReader<'a> for IoReqReader<'a> { +impl ReqReader for IoReqReader { fn reader(&mut self) -> Pin<&mut dyn AsyncRead> { self.read.as_mut() } - fn finish(self: Box) -> LocalBoxFuture<'a, Box + 'a>> { + fn finish(self: Box) -> LocalBoxFuture<'static, Box> { Box::pin(async { - Box::new(IoReqHandle { prefix: self.prefix, write: self.write }) as Box> + Box::new(IoReqHandle { prefix: self.prefix, write: self.o }) as Box }) } } -pub struct IoReqHandle<'a> { - prefix: &'a [u8], - write: &'a Mutex>, + +pub struct IoReqHandle { + prefix: u64, + write: IoLock, } -impl<'a> ReqHandle<'a> for IoReqHandle<'a> { - fn start_reply(self: Box) -> LocalBoxFuture<'a, io::Result + 'a>>> { +impl ReqHandle for IoReqHandle { + fn start_reply(self: Box) -> LocalBoxFuture<'static, io::Result>> { + let write = self.write.clone(); Box::pin(async move { - let mut write = self.write.lock().await; - write.as_mut().write_all(self.prefix).await?; - Ok(Box::new(IoRepWriter { write }) as Box>) + let mut write = Bound::async_new(write, |l| l.lock()).await; + self.prefix.encode(write.as_mut()).await?; + Ok(Box::new(IoRepWriter { write }) as Box) }) } } -pub struct IoRepWriter<'a> { - write: MutexGuard<'a, IoRef>, +pub struct IoRepWriter { + write: IoGuard, } -impl<'a> RepWriter<'a> for IoRepWriter<'a> { +impl RepWriter for IoRepWriter { fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { self.write.as_mut() } - fn finish(mut self: Box) -> LocalBoxFuture<'a, io::Result>> { + fn finish(mut self: Box) -> LocalBoxFuture<'static, io::Result> { Box::pin(async move { self.writer().flush().await?; - Ok(Receipt(PhantomData)) + Ok(Receipt) }) } } @@ -214,11 +339,16 @@ impl<'a> MsgReader<'a> for IoMsgReader<'a> { fn finish(self: Box) -> LocalBoxFuture<'static, ()> { Box::pin(async {}) } } +pub enum ReplyRecord { + Cancelled, + Ready(IoGuard), +} + #[derive(Debug)] struct ReplySub { id: u64, ack: oneshot::Sender<()>, - cb: oneshot::Sender>, + cb: oneshot::Sender, } struct IoClient { @@ -231,37 +361,42 @@ impl IoClient { let (req, rep) = mpsc::channel(0); (rep, Self { output, id: Rc::new(RefCell::new(0)), subscribe: Rc::new(req) }) } - async fn lock_out(&self) -> IoGuard { - Bound::async_new(self.output.clone(), async |o| o.lock().await).await - } } impl Client for IoClient { - fn start_notif(&self) -> LocalBoxFuture<'_, io::Result + '_>>> { + fn start_notif(&self) -> LocalBoxFuture<'static, io::Result>> { + let output = self.output.clone(); Box::pin(async { let drop_g = assert_no_drop("Notif future dropped"); - let mut o = self.lock_out().await; + let mut o = Bound::async_new(output, |o| o.lock()).await; 0u64.encode(o.as_mut()).await?; drop_g.defuse(); Ok(Box::new(IoNotifWriter { o, drop_g: assert_no_drop("Notif writer dropped") }) as Box) }) } - fn start_request(&self) -> LocalBoxFuture<'_, io::Result + '_>>> { - Box::pin(async { - let id = { - let mut id_g = self.id.borrow_mut(); - *id_g += 1; - *id_g - }; - let (cb, reply) = oneshot::channel(); - let (ack, got_ack) = oneshot::channel(); - let drop_g = assert_no_drop("Request future dropped"); - self.subscribe.as_ref().clone().send(ReplySub { id, ack, cb }).await.unwrap(); + fn start_request(&self) -> LocalBoxFuture<'static, io::Result>> { + let output = self.output.clone(); + let id = { + let mut id_g = self.id.borrow_mut(); + *id_g += 1; + *id_g + }; + let (cb, reply) = oneshot::channel(); + let (ack, got_ack) = oneshot::channel(); + let mut subscribe = self.subscribe.as_ref().clone(); + let start_req_drop_g = assert_no_drop("Request future dropped"); + Box::pin(async move { + subscribe.send(ReplySub { id, ack, cb }).await.unwrap(); got_ack.await.unwrap(); - let mut w = self.lock_out().await; - id.encode(w.as_mut()).await?; - drop_g.defuse(); + let mut xfer_bytes = id.to_be_bytes(); + xfer_bytes[0] = 0x00; + let req_prefix = u64::from_be_bytes(xfer_bytes); + let mut w = Bound::async_new(output.clone(), |o| o.lock()).await; + req_prefix.encode(w.as_mut()).await?; + start_req_drop_g.defuse(); Ok(Box::new(IoReqWriter { + id, + output, reply, w, drop_g: assert_no_drop("Request reader dropped without reply"), @@ -270,34 +405,62 @@ impl Client for IoClient { } } -struct IoReqWriter { - reply: oneshot::Receiver>, - w: IoGuard, - drop_g: PanicOnDrop, +struct IoReqCanceller { + id: u64, + output: IoLock, } -impl<'a> ReqWriter<'a> for IoReqWriter { - fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { self.w.as_mut() } - fn send(self: Box) -> LocalBoxFuture<'a, io::Result + 'a>>> { - Box::pin(async { - let Self { reply, mut w, drop_g } = *self; - w.flush().await?; - mem::drop(w); - let i = reply.await.expect("Client dropped before reply received"); - drop_g.defuse(); - Ok(Box::new(IoRepReader { - i, - drop_g: assert_no_drop("Reply reader dropped without finishing"), - }) as Box) +impl CancelNotifier for IoReqCanceller { + fn cancel(self: Box) -> LocalBoxFuture<'static, ()> { + let mut xfer_bytes = self.id.to_be_bytes(); + xfer_bytes[0] = 0x02; + let cancel_id = u64::from_be_bytes(xfer_bytes); + let cancel_signal_drop_g = assert_no_drop("Cannot cancel the sending of a cancellation"); + let o = self.output.clone(); + Box::pin(async move { + let mut o = o.lock().await; + let _ = cancel_id.encode(o.as_mut()).await; + cancel_signal_drop_g.defuse(); }) } } -struct IoRepReader { - i: IoGuard, +struct IoReqWriter { + id: u64, + reply: oneshot::Receiver, + output: IoLock, + w: IoGuard, drop_g: PanicOnDrop, } -impl<'a> RepReader<'a> for IoRepReader { - fn reader(&mut self) -> Pin<&mut dyn AsyncRead> { self.i.as_mut() } +impl ReqWriter for IoReqWriter { + fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { self.w.as_mut() } + fn send(self: Box) -> ReqWait { + let Self { id, output, reply, mut w, drop_g } = *self; + let canceller = IoReqCanceller { id, output }; + let future = async { + w.flush().await?; + mem::drop(w); + let reply_record = reply.await.expect("Client dropped before reply received"); + drop_g.defuse(); + Ok(Box::new(IoRepReader { + reply_record, + drop_g: assert_no_drop("Reply reader dropped without finishing"), + }) as Box) + }; + ReqWait { future: Box::pin(future), canceller: Box::new(canceller) } + } +} + +struct IoRepReader { + reply_record: ReplyRecord, + drop_g: PanicOnDrop, +} +impl RepReader for IoRepReader { + fn reader(&mut self) -> Option> { + match &mut self.reply_record { + ReplyRecord::Cancelled => None, + ReplyRecord::Ready(guard) => Some(guard.as_mut()), + } + } fn finish(self: Box) -> LocalBoxFuture<'static, ()> { Box::pin(async { self.drop_g.defuse() }) } @@ -308,7 +471,7 @@ struct IoNotifWriter { o: IoGuard, drop_g: PanicOnDrop, } -impl<'a> MsgWriter<'a> for IoNotifWriter { +impl MsgWriter for IoNotifWriter { fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { self.o.as_mut() } fn finish(mut self: Box) -> LocalBoxFuture<'static, io::Result<()>> { Box::pin(async move { @@ -333,10 +496,7 @@ impl CommCtx { /// Establish bidirectional request-notification communication over a duplex /// channel. The returned [IoClient] can be used for notifications immediately, /// but requests can only be received while the future is running. The future -/// will only resolve when [CommCtx::quit] is called. The generic type -/// parameters are associated with the client and serve to ensure with a runtime -/// check that the correct message families are sent in the correct directions -/// across the channel. +/// will only resolve when [CommCtx::exit] is called. pub fn io_comm( o: Pin>, i: Pin>, @@ -356,8 +516,8 @@ pub struct IoCommServer { impl IoCommServer { pub async fn listen( self, - notif: impl for<'a> AsyncFn(Box + 'a>) -> io::Result<()>, - req: impl for<'a> AsyncFn(Box + 'a>) -> io::Result>, + notif: impl AsyncFn(Box) -> io::Result<()>, + req: impl AsyncFn(Box) -> io::Result, ) -> io::Result<()> { let Self { o, i, onexit, onsub } = self; enum Event { @@ -379,7 +539,9 @@ impl IoCommServer { } } }); - let (mut add_pending_req, fork_future) = LocalSet::new(); + + let running_requests = RefCell::new(HashMap::new()); + let (mut task_pool, fork_future) = local_set(); let mut fork_stream = pin!(fork_future.into_stream()); let mut pending_replies = HashMap::new(); 'body: { @@ -400,32 +562,73 @@ impl IoCommServer { // this is detected and logged on client let _ = ack.send(()); }, + // ID 0 is reserved for single-fire notifications Ok(Event::Input(0, read)) => { let notif = ¬if; - let notif_job = - async move { notif(Box::new(IoMsgReader { _pd: PhantomData, read })).await }; - add_pending_req.send(Box::pin(notif_job)).await.unwrap(); - }, - // MSB == 0 is a request, !id where MSB == 1 is the corresponding response - Ok(Event::Input(id, read)) if (id & (1 << (u64::BITS - 1))) == 0 => { - let (o, req) = (o.clone(), &req); - let req_job = async move { - let mut prefix = Vec::new(); - (!id).encode_vec(&mut prefix); - let _ = req(Box::new(IoReqReader { prefix: &pin!(prefix), read, write: &o })).await; - Ok(()) - }; - add_pending_req.send(Box::pin(req_job)).await.unwrap(); + task_pool.spawn(notif(Box::new(IoMsgReader { _pd: PhantomData, read }))).await.unwrap(); }, + // non-zero IDs are associated with requests Ok(Event::Input(id, read)) => { - let cb = pending_replies.remove(&!id).expect("Reply to unrecognized request"); - cb.send(read).unwrap_or_else(|_| panic!("Failed to send reply")); + // the MSb decides what kind of message this is + let mut id_bytes = id.to_be_bytes(); + let discr = std::mem::replace(&mut id_bytes[0], 0x00); + let id = u64::from_be_bytes(id_bytes); + match discr { + // request + 0x00 => { + let (o, req, reqs) = (o.clone(), &req, &running_requests); + task_pool + .spawn(async move { + id_bytes[0] = 0x01; + let prefix = u64::from_be_bytes(id_bytes); + let reader = Box::new(IoReqReader { prefix, read, o }); + let (fut, handle) = to_task(async { req(reader).await.map(|Receipt| ()) }); + reqs.borrow_mut().insert(id, handle); + with_stash(fut).await; + // during this await the read guard is released and thus we may receive a + // cancel notification from below + Ok(()) + }) + .await + .unwrap(); + }, + // response + 0x01 => { + let cb = pending_replies.remove(&id).expect("Reply to unrecognized request"); + cb.send(ReplyRecord::Ready(read)) + .unwrap_or_else(|_| panic!("Failed to send reply")); + }, + // cancellation + 0x02 => { + match running_requests.borrow().get(&id) { + Some(handle) => handle.abort(), + // assuming that the client is correct, if there is no record + // then the reply was already sent + None => continue, + }; + // if the request starts writing back before our abort arrives, we only + // get this mutex once it's done + let mut write = o.lock().await; + // if the request is still in the store, the write didn't begin + let Some(_) = running_requests.borrow_mut().remove(&id) else { continue }; + id_bytes[0] = 0x03; + let cancel_code = u64::from_be_bytes(id_bytes); + cancel_code.encode(write.as_mut()).await?; + }, + // stub reply for cancelled request + 0x03 => { + let cb = pending_replies.remove(&id).expect("Cancelling unrecognized request"); + cb.send(ReplyRecord::Cancelled) + .unwrap_or_else(|_| panic!("Failed to send reply cancellation")) + }, + n => panic!("Unrecognized message type code {n}"), + } }, } } Ok(()) }?; - mem::drop(add_pending_req); + mem::drop(task_pool); while let Some(next) = fork_stream.next().await { next? } @@ -441,13 +644,15 @@ mod test { use std::cell::RefCell; use futures::channel::mpsc; - use futures::{SinkExt, StreamExt, join}; + use futures::{FutureExt, SinkExt, StreamExt, join, select}; + use never::Never; use orchid_api_derive::{Coding, Hierarchy}; use orchid_api_traits::Request; use orchid_async_utils::debug::spin_on; use unsync_pipe::pipe; use crate::comm::{ClientExt, MsgReaderExt, ReqReaderExt, io_comm}; + use crate::with_stash; #[derive(Clone, Debug, PartialEq, Coding, Hierarchy)] #[extendable] @@ -455,7 +660,7 @@ mod test { #[test] fn notification() { - spin_on(async { + spin_on(false, async { let (in1, out2) = pipe(1024); let (in2, out1) = pipe(1024); let (received, mut on_receive) = mpsc::channel(2); @@ -494,7 +699,7 @@ mod test { #[test] fn request() { - spin_on(async { + spin_on(false, async { let (in1, out2) = pipe(1024); let (in2, out1) = pipe(1024); let (_, srv_ctx, srv) = io_comm(Box::pin(in2), Box::pin(out2)); @@ -506,7 +711,7 @@ mod test { async |_| panic!("No notifs expected"), async |mut req| { let val = req.read_req::().await?; - req.reply(&val, &(val.0 + 1)).await + req.reply(&val, val.0 + 1).await }, ) .await @@ -533,7 +738,7 @@ mod test { #[test] fn exit() { - spin_on(async { + spin_on(false, async { let (input1, output1) = pipe(1024); let (input2, output2) = pipe(1024); let (reply_client, reply_context, reply_server) = @@ -553,7 +758,7 @@ mod test { }, async |mut hand| { let req = hand.read_req::().await?; - hand.reply(&req, &(req.0 + 1)).await + hand.reply(&req, req.0 + 1).await }, ) .await @@ -579,4 +784,49 @@ mod test { ) }); } + + #[test] + fn timely_cancel() { + spin_on(false, async { + let (in1, out2) = pipe(1024); + let (in2, out1) = pipe(1024); + let (wait_in, mut wait_out) = mpsc::channel(0); + let (_, srv_ctx, srv) = io_comm(Box::pin(in2), Box::pin(out2)); + let (client, client_ctx, client_srv) = io_comm(Box::pin(in1), Box::pin(out1)); + join!( + async { + srv + .listen( + async |_| panic!("No notifs expected"), + async |mut req| { + let _ = req.read_req::().await?; + wait_in.clone().send(()).await.unwrap(); + // TODO: verify cancellation + futures::future::pending::().await; + unreachable!("request should be cancelled before resume is triggered") + }, + ) + .await + .unwrap() + }, + async { + client_srv + .listen( + async |_| panic!("Not expecting ingress notif"), + async |_| panic!("Not expecting ingress req"), + ) + .await + .unwrap() + }, + with_stash(async { + select! { + _ = client.request(DummyRequest(5)).fuse() => panic!("This one should not run"), + rep = wait_out.next() => rep.expect("something?"), + }; + srv_ctx.exit().await.unwrap(); + client_ctx.exit().await.unwrap(); + }) + ); + }) + } } diff --git a/orchid-base/src/stash.rs b/orchid-base/src/stash.rs index b525284..d761899 100644 --- a/orchid-base/src/stash.rs +++ b/orchid-base/src/stash.rs @@ -1,44 +1,188 @@ //! A pattern for running async code from sync destructors and other -//! unfortunately sync callbacks +//! unfortunately sync callbacks, and for ensuring that these futures finish in +//! a timely fashion //! //! We create a task_local vecdeque which is moved into a thread_local whenever //! the task is being polled. A call to [stash] pushes the future onto this -//! deque. Before [with_stash] returns, it pops everything from the deque -//! individually and awaits each of them, pushing any additionally stashed -//! futures onto the back of the same deque. +//! deque. Before [with_stash] returns, it awaits everything stashed up to that +//! point or inside the stashed futures. use std::cell::RefCell; -use std::collections::VecDeque; use std::pin::Pin; +use std::task::{Context, Poll}; -use task_local::task_local; +use futures::StreamExt; +use futures::future::LocalBoxFuture; +use futures::stream::FuturesUnordered; +use orchid_async_utils::cancel_cleanup; -#[derive(Default)] -struct StashedFutures { - queue: RefCell>>>>, -} - -task_local! { - static STASHED_FUTURES: StashedFutures; +thread_local! { + /// # Invariant + /// + /// Any function that changes the value of this thread_local must restore it before returning + static CURRENT_STASH: RefCell>>> = RefCell::default(); } /// Complete the argument future, and any futures spawned from it via [stash]. /// This is useful mostly to guarantee that messaging destructors have run. -pub async fn with_stash(fut: F) -> F::Output { - STASHED_FUTURES - .scope(StashedFutures::default(), async { - let val = fut.await; - while let Some(fut) = STASHED_FUTURES.with(|sf| sf.queue.borrow_mut().pop_front()) { - fut.await; - } - val - }) - .await +/// +/// # Cancellation +/// +/// To ensure that stashed futures run, the returned future re-stashes them a +/// layer above when dropped. Therefore cancelling `with_stash` is only safe +/// within an enclosing `with_stash` outside of a panic. +pub fn with_stash(fut: F) -> impl Future { + WithStash { stash: FuturesUnordered::new(), state: WithStashState::Main(fut) } } /// Schedule a future to be run before the next [with_stash] guard ends. This is /// most useful for sending messages from destructors. +/// +/// # Panics +/// +/// If no enclosing stash is found, this function panics, unless we are already +/// panicking. The assumption is that a panic is a vis-major where proper +/// cleanup is secondary to avoiding an abort. pub fn stash + 'static>(fut: F) { - (STASHED_FUTURES.try_with(|sf| sf.queue.borrow_mut().push_back(Box::pin(fut)))) - .expect("No stash! Timely completion cannot be guaranteed") + CURRENT_STASH.with(|stash| { + let mut g = stash.borrow_mut(); + let Some(stash) = g.as_mut() else { + if !std::thread::panicking() { + panic!("No stash! Timely completion cannot be guaranteed"); + } + return; + }; + stash.push(Box::pin(fut)) + }) +} + +pub fn finish_or_stash( + fut: F, +) -> impl Future + Unpin + 'static { + cancel_cleanup(fut, |fut| { + stash(async { + fut.await; + }) + }) +} + +enum WithStashState { + Main(F), + Stash { + /// Optional to simplify state management, but only ever null on a very + /// short stretch + output: Option, + }, +} + +struct WithStash { + stash: FuturesUnordered>, + state: WithStashState, +} +impl Future for WithStash { + type Output = F::Output; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // SAFETY: the only non-Unpin item is Main#main, and it's pinned right back + let Self { state, stash } = unsafe { Pin::get_unchecked_mut(self) }; + if let WithStashState::Main(main) = state { + // SAFETY: this comes from the pin we break on the line above + let main = unsafe { Pin::new_unchecked(main) }; + let prev = CURRENT_STASH.with_borrow_mut(|key| key.replace(Vec::new())); + let poll = main.poll(cx); + let stash_init = CURRENT_STASH + .with_borrow_mut(|key| std::mem::replace(key, prev)) + .expect("We put a Some() in here and CURRENT_STASH demands restoration"); + stash.extend(stash_init); + if let Poll::Ready(o) = poll { + // skip this branch from this point onwards + *state = WithStashState::Stash { output: Some(o) }; + } + } + match state { + WithStashState::Main(_) | WithStashState::Stash { output: None, .. } => Poll::Pending, + WithStashState::Stash { output: output @ Some(_) } => loop { + // if the queue has new elements, poll_next_unpin has to be called in the next + // loop to ensure that wake-ups are triggered for them too, and if + // poll_next_unpin is called, the queue may get yet more elements synchronously, + // hence the loop + let prev = CURRENT_STASH.with_borrow_mut(|key| key.replace(Vec::new())); + let poll = stash.poll_next_unpin(cx); + let stash_new = CURRENT_STASH + .with_borrow_mut(|key| std::mem::replace(key, prev)) + .expect("We put a Some() in here and CURRENT_STASH demands restoration"); + stash.extend(stash_new); + match poll { + Poll::Ready(None) if stash.is_empty() => { + let output = output.take().expect("Checked in branching"); + break Poll::Ready(output); + }, + Poll::Pending => { + break Poll::Pending; + }, + Poll::Ready(_) => continue, + } + }, + } + } +} +impl Drop for WithStash { + fn drop(&mut self) { + if std::thread::panicking() { + eprintln!("Panicking through with_stash may silently drop stashed cleanup work") + } + for future in std::mem::take(&mut self.stash) { + stash(future); + } + } +} + +#[cfg(test)] +mod test { + use futures::SinkExt; + use futures::channel::mpsc; + use futures::future::join; + use orchid_async_utils::debug::spin_on; + + use super::*; + + #[test] + fn run_stashed_future() { + let (mut send, recv) = mpsc::channel(0); + spin_on( + false, + join( + with_stash(async { + let mut send1 = send.clone(); + stash(async move { + send1.send(1).await.unwrap(); + }); + let mut send1 = send.clone(); + stash(async move { + let mut send2 = send1.clone(); + stash(async move { + send2.send(2).await.unwrap(); + }); + send1.send(3).await.unwrap(); + stash(async move { + send1.send(4).await.unwrap(); + }) + }); + let mut send1 = send.clone(); + stash(async move { + send1.send(5).await.unwrap(); + }); + send.send(6).await.unwrap(); + }), + async { + let mut results = recv.take(6).collect::>().await; + results.sort(); + assert_eq!( + &results, + &[1, 2, 3, 4, 5, 6], + "all variations completed in unspecified order" + ); + }, + ), + ); + } } diff --git a/orchid-extension/src/atom.rs b/orchid-extension/src/atom.rs index 8f83d73..860f23f 100644 --- a/orchid-extension/src/atom.rs +++ b/orchid-extension/src/atom.rs @@ -184,27 +184,15 @@ pub trait AtomMethod: Coding + InHierarchy { /// A handler for an [AtomMethod] on an [Atomic]. The [AtomMethod] must also be /// registered in [Atomic::reg_methods] pub trait Supports: Atomic { - fn handle<'a>( - &self, - hand: Box + '_>, - req: M, - ) -> impl Future>>; + fn handle(&self, hand: Box, req: M) -> impl Future>; } trait HandleAtomMethod { - fn handle<'a, 'b: 'a>( - &'a self, - atom: &'a A, - reader: Box + 'a>, - ) -> LocalBoxFuture<'a, ()>; + fn handle<'a>(&'a self, atom: &'a A, reader: Box) -> LocalBoxFuture<'a, ()>; } struct AtomMethodHandler(PhantomData, PhantomData); impl> HandleAtomMethod for AtomMethodHandler { - fn handle<'a, 'b: 'a>( - &'a self, - atom: &'a A, - mut reader: Box + 'a>, - ) -> LocalBoxFuture<'a, ()> { + fn handle<'a>(&'a self, atom: &'a A, mut reader: Box) -> LocalBoxFuture<'a, ()> { Box::pin(async { let req = reader.read_req::().await.unwrap(); let _ = Supports::::handle(atom, reader.finish().await, req).await.unwrap(); @@ -244,12 +232,7 @@ pub(crate) struct MethodSet { handlers: HashMap>>, } impl MethodSet { - pub(crate) async fn dispatch<'a>( - &self, - atom: &'_ A, - key: Sym, - req: Box + 'a>, - ) -> bool { + pub(crate) async fn dispatch(&self, atom: &A, key: Sym, req: Box) -> bool { match self.handlers.get(&key) { None => false, Some(handler) => { @@ -341,7 +324,7 @@ pub trait AtomOps: 'static { &'a self, ctx: AtomCtx<'a>, key: Sym, - req: Box + 'a>, + req: Box, ) -> LocalBoxFuture<'a, bool>; fn serialize<'a, 'b: 'a>( &'a self, diff --git a/orchid-extension/src/atom_owned.rs b/orchid-extension/src/atom_owned.rs index 36cd99e..500b47a 100644 --- a/orchid-extension/src/atom_owned.rs +++ b/orchid-extension/src/atom_owned.rs @@ -119,7 +119,7 @@ impl AtomOps for OwnedAtomOps { &'a self, AtomCtx(_, id): AtomCtx<'a>, key: Sym, - req: Box + 'a>, + req: Box, ) -> LocalBoxFuture<'a, bool> { Box::pin(async move { let a = AtomReadGuard::new(id.unwrap()).await; diff --git a/orchid-extension/src/atom_thin.rs b/orchid-extension/src/atom_thin.rs index 7b9ae61..b8685f6 100644 --- a/orchid-extension/src/atom_thin.rs +++ b/orchid-extension/src/atom_thin.rs @@ -53,7 +53,7 @@ impl AtomOps for ThinAtomOps { &'a self, AtomCtx(buf, ..): AtomCtx<'a>, key: Sym, - req: Box + 'a>, + req: Box, ) -> LocalBoxFuture<'a, bool> { Box::pin(async move { let ms = self.ms.get_or_init(self.msbuild.pack()).await; diff --git a/orchid-extension/src/cmd_atom.rs b/orchid-extension/src/cmd_atom.rs index c03bbef..3e89e6b 100644 --- a/orchid-extension/src/cmd_atom.rs +++ b/orchid-extension/src/cmd_atom.rs @@ -6,7 +6,7 @@ use never::Never; use orchid_base::{Receipt, ReqHandle, ReqHandleExt}; use crate::gen_expr::{GExpr, new_atom, serialize}; -use crate::std_reqs::RunCommand; +use crate::std_reqs::StartCommand; use crate::{Atomic, MethodSetBuilder, OwnedAtom, OwnedVariant, Supports, ToExpr}; pub trait AsyncFnDyn { @@ -21,18 +21,14 @@ pub struct CmdAtom(Rc); impl Atomic for CmdAtom { type Data = (); type Variant = OwnedVariant; - fn reg_methods() -> MethodSetBuilder { MethodSetBuilder::new().handle::() } + fn reg_methods() -> MethodSetBuilder { MethodSetBuilder::new().handle::() } } -impl Supports for CmdAtom { - async fn handle<'a>( - &self, - hand: Box + '_>, - req: RunCommand, - ) -> std::io::Result> { +impl Supports for CmdAtom { + async fn handle(&self, hand: Box, req: StartCommand) -> std::io::Result { let reply = self.0.call().await; match reply { - None => hand.reply(&req, &None).await, - Some(next) => hand.reply(&req, &Some(serialize(next).await)).await, + None => hand.reply(&req, None).await, + Some(next) => hand.reply(&req, Some(serialize(next).await)).await, } } } diff --git a/orchid-extension/src/entrypoint.rs b/orchid-extension/src/entrypoint.rs index 13ce4f4..b047120 100644 --- a/orchid-extension/src/entrypoint.rs +++ b/orchid-extension/src/entrypoint.rs @@ -8,12 +8,12 @@ use std::pin::Pin; use std::rc::Rc; use std::time::Duration; -use futures::future::{LocalBoxFuture, join_all}; -use futures::{AsyncWriteExt, StreamExt, stream}; +use futures::future::{LocalBoxFuture, join_all, join3}; +use futures::{AsyncReadExt, AsyncWriteExt, StreamExt, stream}; use hashbrown::HashMap; use itertools::Itertools; use orchid_api_traits::{Decode, Encode, Request, UnderRoot, enc_vec}; -use orchid_async_utils::{Handle, to_task}; +use orchid_async_utils::{Handle, JoinError, to_task}; use orchid_base::{ Client, ClientExt, CommCtx, Comment, MsgReader, MsgReaderExt, ReqHandleExt, ReqReaderExt, Snippet, Sym, TokenVariant, Witness, char_filter_match, char_filter_union, es, io_comm, is, log, @@ -21,6 +21,7 @@ use orchid_base::{ }; use substack::Substack; use task_local::task_local; +use unsync_pipe::pipe; use crate::gen_expr::serialize; use crate::interner::new_interner; @@ -74,7 +75,7 @@ pub async fn request>(t: T) -> T: } /// Send a notification through the global client's [ClientExt::notify] -pub async fn notify>(t: T) { +pub async fn notify + 'static>(t: T) { get_client().notify(t).await.unwrap() } @@ -106,7 +107,7 @@ impl) + 'static> ContextModifier for F { pub(crate) trait DynTaskHandle: 'static { fn abort(self: Box); - fn join(self: Box) -> LocalBoxFuture<'static, Box>; + fn join(self: Box) -> LocalBoxFuture<'static, Result, JoinError>>; } task_local! { @@ -124,7 +125,7 @@ impl TaskHandle { /// Stop working on the task and return the nested future. The distinction /// between this and waiting until the task is complete without reparenting it /// is significant for the purpose of [task_local] context - pub async fn join(self) -> T { *self.0.join().await.downcast().unwrap() } + pub async fn join(self) -> Result { Ok(*self.0.join().await?.downcast().unwrap()) } } /// Spawn a future that is not associated with a pending request or a past @@ -138,7 +139,9 @@ pub fn spawn + 'static>(delay: Duration, f: F) -> Tas impl DynTaskHandle for Handle> { fn abort(self: Box) { Self::abort(&self); } - fn join(self: Box) -> LocalBoxFuture<'static, Box> { Box::pin(Self::join(*self)) } + fn join(self: Box) -> LocalBoxFuture<'static, Result, JoinError>> { + Box::pin(Self::join(*self)) + } } /// A new Orchid extension as specified in loaders. An extension is a unit of @@ -213,15 +216,15 @@ impl ExtensionBuilder { match req { api::HostExtReq::SystemDrop(sys_drop) => { SYSTEM_TABLE.with(|l| l.borrow_mut().remove(&sys_drop.0)); - handle.reply(&sys_drop, &()).await + handle.reply(&sys_drop, ()).await }, api::HostExtReq::AtomDrop(atom_drop @ api::AtomDrop(sys_id, atom)) => with_sys_record(sys_id, async { take_atom(atom).await.dyn_free().await; - handle.reply(&atom_drop, &()).await + handle.reply(&atom_drop, ()).await }) .await, - api::HostExtReq::Ping(ping @ api::Ping) => handle.reply(&ping, &()).await, + api::HostExtReq::Ping(ping @ api::Ping) => handle.reply(&ping, ()).await, api::HostExtReq::Sweep(api::Sweep) => todo!(), api::HostExtReq::SysReq(api::SysReq::NewSystem(new_sys)) => { let (ctor_idx, _) = (decls.iter().enumerate().find(|(_, s)| s.id == new_sys.system)) @@ -257,7 +260,7 @@ impl ExtensionBuilder { .await; let response = api::NewSystemResponse { lex_filter, const_root, line_types, prelude }; - handle.reply(&new_sys, &response).await + handle.reply(&new_sys, response).await }) .await }, @@ -266,17 +269,23 @@ impl ExtensionBuilder { let (path, tree) = get_lazy(tree_id).await; let mut tia_ctx = TreeIntoApiCtxImpl { path: Substack::Bottom, basepath: &path[..] }; - handle.reply(&get_tree, &tree.into_api(&mut tia_ctx).await).await + handle.reply(&get_tree, tree.into_api(&mut tia_ctx).await).await }) .await, api::HostExtReq::SysReq(api::SysReq::SysFwded(fwd)) => { let fwd_tok = Witness::of(&fwd); let api::SysFwded(sys_id, payload) = fwd; with_sys_record(sys_id, async { + let (mut req_in, req) = pipe(1024); + let (rep, mut rep_out) = pipe(1024); let mut reply = Vec::new(); - let req = TrivialReqCycle { req: &payload, rep: &mut reply }; - let _ = dyn_cted().inst().dyn_request(Box::new(req)).await; - handle.reply(fwd_tok, &reply).await + let (..) = join3( + async { req_in.write_all(&payload).await.expect("Ingress failed") }, + async { rep_out.read_to_end(&mut reply).await.expect("Egress failed") }, + dyn_cted().inst().dyn_request(Box::new(TrivialReqCycle { req, rep })), + ) + .await; + handle.reply(fwd_tok, reply).await }) .await }, @@ -298,7 +307,7 @@ impl ExtensionBuilder { Err(e) => { let eopt = e.keep_only(|e| *e != ekey_cascade).map(|e| Err(e.to_api())); expr_store.dispose().await; - return handle.reply(&lex, &eopt).await; + return handle.reply(&lex, eopt).await; }, Ok((s, expr)) => { let expr = join_all( @@ -308,13 +317,13 @@ impl ExtensionBuilder { .await; let pos = (text.len() - s.len()) as u32; expr_store.dispose().await; - return handle.reply(&lex, &Some(Ok(api::LexedExpr { pos, expr }))).await; + return handle.reply(&lex, Some(Ok(api::LexedExpr { pos, expr }))).await; }, } } writeln!(log("warn"), "Got notified about n/a character '{trigger_char}'").await; expr_store.dispose().await; - handle.reply(&lex, &None).await + handle.reply(&lex, None).await }) .await, api::HostExtReq::ParseLine(pline) => { @@ -338,14 +347,14 @@ impl ExtensionBuilder { }; mem::drop(line); expr_store.dispose().await; - handle.reply(req, &o_line).await + handle.reply(req, o_line).await }) .await }, api::HostExtReq::FetchParsedConst(ref fpc @ api::FetchParsedConst(sys, id)) => with_sys_record(sys, async { let cnst = get_const(id).await; - handle.reply(fpc, &serialize(cnst).await).await + handle.reply(fpc, serialize(cnst).await).await }) .await, api::HostExtReq::AtomReq(atom_req) => { @@ -357,24 +366,30 @@ impl ExtensionBuilder { api::AtomReq::SerializeAtom(ser) => { let mut buf = enc_vec(&id); match nfo.serialize(actx, Pin::<&mut Vec<_>>::new(&mut buf)).await { - None => handle.reply(ser, &None).await, + None => handle.reply(ser, None).await, Some(refs) => { let refs = join_all(refs.into_iter().map(async |ex| ex.into_api(&mut ()).await)) .await; - handle.reply(ser, &Some((buf, refs))).await + handle.reply(ser, Some((buf, refs))).await }, } }, api::AtomReq::AtomPrint(print @ api::AtomPrint(_)) => - handle.reply(print, &nfo.print(actx).await.to_api()).await, + handle.reply(print, nfo.print(actx).await.to_api()).await, api::AtomReq::Fwded(fwded) => { let api::Fwded(_, key, payload) = &fwded; + let (mut req_in, req) = pipe(1024); + let (rep, mut rep_out) = pipe(1024); let mut reply = Vec::new(); let key = Sym::from_api(*key).await; - let req = TrivialReqCycle { req: payload, rep: &mut reply }; - let some = nfo.handle_req_ref(actx, key, Box::new(req)).await; - handle.reply(fwded, &some.then_some(reply)).await + let (.., some) = join3( + async { req_in.write_all(payload).await.expect("Ingress failed") }, + async { rep_out.read_to_end(&mut reply).await.expect("Egress failed") }, + nfo.handle_req_ref(actx, key, Box::new(TrivialReqCycle { req, rep })), + ) + .await; + handle.reply(fwded, some.then_some(reply)).await }, api::AtomReq::CallRef(call @ api::CallRef(_, arg)) => { let expr_store = BorrowedExprStore::new(); @@ -383,7 +398,7 @@ impl ExtensionBuilder { let api_expr = serialize(ret).await; mem::drop(expr_handle); expr_store.dispose().await; - handle.reply(call, &api_expr).await + handle.reply(call, api_expr).await }, api::AtomReq::FinalCall(call @ api::FinalCall(_, arg)) => { let expr_store = BorrowedExprStore::new(); @@ -392,7 +407,7 @@ impl ExtensionBuilder { let api_expr = serialize(ret).await; mem::drop(expr_handle); expr_store.dispose().await; - handle.reply(call, &api_expr).await + handle.reply(call, api_expr).await }, } }) @@ -409,7 +424,7 @@ impl ExtensionBuilder { let id = AtomTypeId::decode_slice(read); let nfo = (dyn_cted().inst().card().ops_by_atid(id)) .expect("Deserializing atom with invalid ID"); - handle.reply(&deser, &nfo.deserialize(read, &refs).await).await + handle.reply(&deser, nfo.deserialize(read, &refs).await).await }) .await }, diff --git a/orchid-extension/src/std_reqs.rs b/orchid-extension/src/std_reqs.rs index 3b1f642..215ce64 100644 --- a/orchid-extension/src/std_reqs.rs +++ b/orchid-extension/src/std_reqs.rs @@ -16,11 +16,11 @@ impl Request for Spawn { /// Execute the atom as a command. #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, Coding, Hierarchy)] -pub struct RunCommand; -impl Request for RunCommand { +pub struct StartCommand; +impl Request for StartCommand { type Response = Option; } -impl AtomMethod for RunCommand { +impl AtomMethod for StartCommand { const NAME: &str = "orchid::cmd::run"; } diff --git a/orchid-extension/src/streams.rs b/orchid-extension/src/streams.rs index 3fab669..f208578 100644 --- a/orchid-extension/src/streams.rs +++ b/orchid-extension/src/streams.rs @@ -40,23 +40,19 @@ impl OwnedAtom for WriterAtom { async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(()) } } impl Supports for WriterAtom { - async fn handle<'a>( - &self, - hand: Box + '_>, - req: OutputReq, - ) -> Result> { + async fn handle(&self, hand: Box, req: OutputReq) -> Result { match req { OutputReq::WriteReq(ref wr @ WriteReq { ref data }) => { self.0.lock().await.buf.extend(data); - hand.reply(wr, &Ok(())).await + hand.reply(wr, Ok(())).await }, OutputReq::FlushReq(ref fr @ FlushReq) => { let mut g = self.0.lock().await; let WriterState { buf, writer } = &mut *g; - hand.reply(fr, &writer.write_all(&buf[..]).await.map_err(|e| e.into())).await + hand.reply(fr, writer.write_all(&buf[..]).await.map_err(|e| e.into())).await }, OutputReq::CloseReq(ref cr @ CloseReq) => - hand.reply(cr, &self.0.lock().await.writer.close().await.map_err(|e| e.into())).await, + hand.reply(cr, self.0.lock().await.writer.close().await.map_err(|e| e.into())).await, } } } @@ -80,11 +76,7 @@ impl OwnedAtom for ReaderAtom { async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(()) } } impl Supports for ReaderAtom { - async fn handle<'a>( - &self, - hand: Box + '_>, - req: ReadReq, - ) -> Result> { + async fn handle(&self, hand: Box, req: ReadReq) -> Result { let mut buf = Vec::new(); let mut reader = self.0.lock().await; let rep = match match req.limit { @@ -98,6 +90,6 @@ impl Supports for ReaderAtom { Err(e) => Err(e.into()), Ok(()) => Ok(buf), }; - hand.reply(&req, &rep).await + hand.reply(&req, rep).await } } diff --git a/orchid-extension/src/system.rs b/orchid-extension/src/system.rs index 7bc3c33..95073d9 100644 --- a/orchid-extension/src/system.rs +++ b/orchid-extension/src/system.rs @@ -20,11 +20,11 @@ pub trait System: Debug + 'static { fn env(&self) -> impl Future> { futures::future::ready(Vec::new()) } fn lexers(&self) -> Vec { Vec::new() } fn parsers(&self) -> Vec { Vec::new() } - fn request<'a>( + fn request( &self, - hand: Box + 'a>, + hand: Box, req: ReqForSystem, - ) -> impl Future>; + ) -> impl Future; } pub trait DynSystem: Debug + 'static { @@ -32,10 +32,7 @@ pub trait DynSystem: Debug + 'static { fn dyn_env(&self) -> LocalBoxFuture<'_, Vec>; fn dyn_lexers(&self) -> Vec; fn dyn_parsers(&self) -> Vec; - fn dyn_request<'a, 'b: 'a>( - &'a self, - hand: Box + 'b>, - ) -> LocalBoxFuture<'a, Receipt<'b>>; + fn dyn_request<'a, 'b: 'a>(&'a self, hand: Box) -> LocalBoxFuture<'a, Receipt>; fn card(&self) -> Box; } @@ -46,8 +43,8 @@ impl DynSystem for T { fn dyn_parsers(&self) -> Vec { self.parsers() } fn dyn_request<'a, 'b: 'a>( &'a self, - mut hand: Box + 'b>, - ) -> LocalBoxFuture<'a, Receipt<'b>> { + mut hand: Box, + ) -> LocalBoxFuture<'a, Receipt> { Box::pin(async move { let value = hand.read_req().await.unwrap(); self.request(hand.finish().await, value).await diff --git a/orchid-extension/src/trivial_req.rs b/orchid-extension/src/trivial_req.rs index 5d30eb1..6757d32 100644 --- a/orchid-extension/src/trivial_req.rs +++ b/orchid-extension/src/trivial_req.rs @@ -4,25 +4,26 @@ use std::pin::Pin; use futures::future::LocalBoxFuture; use futures::{AsyncRead, AsyncWrite}; use orchid_base::{Receipt, RepWriter, ReqHandle, ReqReader}; +use unsync_pipe::{Reader, Writer}; -pub struct TrivialReqCycle<'a> { - pub req: &'a [u8], - pub rep: &'a mut Vec, +pub struct TrivialReqCycle { + pub req: Reader, + pub rep: Writer, } -impl<'a> ReqReader<'a> for TrivialReqCycle<'a> { +impl ReqReader for TrivialReqCycle { fn reader(&mut self) -> Pin<&mut dyn AsyncRead> { Pin::new(&mut self.req) as Pin<&mut _> } - fn finish(self: Box) -> LocalBoxFuture<'a, Box + 'a>> { + fn finish(self: Box) -> LocalBoxFuture<'static, Box> { Box::pin(async { self as Box<_> }) } } -impl<'a> ReqHandle<'a> for TrivialReqCycle<'a> { - fn start_reply(self: Box) -> LocalBoxFuture<'a, io::Result + 'a>>> { +impl ReqHandle for TrivialReqCycle { + fn start_reply(self: Box) -> LocalBoxFuture<'static, io::Result>> { Box::pin(async { Ok(self as Box<_>) }) } } -impl<'a> RepWriter<'a> for TrivialReqCycle<'a> { +impl RepWriter for TrivialReqCycle { fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { Pin::new(&mut self.rep) as Pin<&mut _> } - fn finish(self: Box) -> LocalBoxFuture<'a, io::Result>> { + fn finish(self: Box) -> LocalBoxFuture<'static, io::Result> { Box::pin(async { Ok(Receipt::_new()) }) } } diff --git a/orchid-host/src/cmd_system.rs b/orchid-host/src/cmd_system.rs index 46c7f59..b3bcdcf 100644 --- a/orchid-host/src/cmd_system.rs +++ b/orchid-host/src/cmd_system.rs @@ -13,7 +13,7 @@ use futures::future::LocalBoxFuture; use futures::stream::FuturesUnordered; use futures::{SinkExt, StreamExt, select}; use never::Never; -use orchid_base::{OrcErrv, Receipt, ReqHandle, Sym, fmt, is, log, mk_errv}; +use orchid_base::{OrcErrv, Receipt, ReqHandle, Sym, fmt, is, log, mk_errv, sym}; use orchid_extension::{self as ox, AtomicFeatures as _, get_arg}; use crate::ctx::Ctx; @@ -100,7 +100,6 @@ pub enum CmdEvent { } pub struct CmdRunner { - root: Root, queue: CommandQueue, gas: Option, interrupted: Option, @@ -108,7 +107,7 @@ pub struct CmdRunner { futures: FuturesUnordered>>, } impl CmdRunner { - pub async fn new(root: Root, ctx: Ctx, init: impl IntoIterator) -> Self { + pub async fn new(root: &mut Root, ctx: Ctx, init: impl IntoIterator) -> Self { let queue = CommandQueue::new(ctx.clone(), init); let ext_builder = ox::ExtensionBuilder::new("orchid::cmd").system(CmdSystemCtor(queue.clone())); let extension = (Extension::new(ext_inline(ext_builder, ctx.clone()).await, ctx).await) @@ -116,16 +115,17 @@ impl CmdRunner { let system_ctor = (extension.system_ctors().find(|ctor| ctor.decl.name == "orchid::cmd")) .expect("Missing command system ctor"); let (cmd_root, system) = system_ctor.run(vec![]).await; - let root = root.merge(&cmd_root).await.expect("Could not merge command system into tree"); - Self { futures: FuturesUnordered::new(), gas: None, root, interrupted: None, queue, system } + *root = root.merge(&cmd_root).await.expect("Could not merge command system into tree"); + Self { futures: FuturesUnordered::new(), gas: None, interrupted: None, queue, system } } + pub fn push(&self, expr: Expr) { self.queue.push(Task::RunCommand(expr)); } #[must_use] pub fn sys(&self) -> &System { &self.system } #[must_use] pub fn get_gas(&self) -> u64 { self.gas.expect("queried gas but no gas was set") } pub fn set_gas(&mut self, gas: u64) { self.gas = Some(gas) } pub fn disable_gas(&mut self) { self.gas = None } - pub async fn execute(&mut self) -> CmdEvent { + pub async fn execute(&mut self, root: &Root) -> CmdEvent { let waiting_on_queue = RefCell::new(false); let (mut spawn, mut on_spawn) = mpsc::channel::>>(1); let mut normalize_stream = pin!( @@ -134,7 +134,7 @@ impl CmdRunner { waiting_on_queue.replace(false); let mut xctx = match self.interrupted.take() { None => match self.queue.get_new().await { - Task::RunCommand(expr) => ExecCtx::new(self.root.clone(), expr).await, + Task::RunCommand(expr) => ExecCtx::new(root.clone(), expr).await, Task::Sleep(until, expr) => { let queue = self.queue.clone(); let ctx = queue.0.borrow_mut().ctx.clone(); @@ -186,7 +186,7 @@ impl CmdRunner { let ctx = queue.0.borrow_mut().ctx.clone(); spawn .send(Box::pin(async move { - match atom.ipc(ox::std_reqs::RunCommand).await { + match atom.ipc(ox::std_reqs::StartCommand).await { None => Some(CmdEvent::NonCommand(val)), Some(None) => None, Some(Some(expr)) => { @@ -209,8 +209,14 @@ impl CmdRunner { loop { let task = select!( r_opt = self.futures.by_ref().next() => match r_opt { - Some(Some(r)) => break r, - None if *waiting_on_queue.borrow() => break CmdEvent::Settled, + Some(Some(r)) => { + eprintln!("Exiting because "); + break r + }, + None if *waiting_on_queue.borrow() => { + eprintln!("Exiting because settled"); + break CmdEvent::Settled + }, None | Some(None) => continue, }, r = normalize_stream.by_ref().next() => match r { @@ -218,7 +224,7 @@ impl CmdRunner { Some(r) => break r, }, task = on_spawn.by_ref().next() => match task { - None => break CmdEvent::Exit, + None => continue, Some(r) => r, }, ); @@ -301,14 +307,10 @@ impl ox::System for CmdSystemInst { }), ]) } - async fn prelude(&self) -> Vec { vec![] } + async fn prelude(&self) -> Vec { vec![sym!("orchid")] } fn lexers(&self) -> Vec { vec![] } fn parsers(&self) -> Vec { vec![] } - async fn request<'a>( - &self, - _hand: Box + 'a>, - req: ox::ReqForSystem, - ) -> Receipt<'a> { + async fn request(&self, _hand: Box, req: ox::ReqForSystem) -> Receipt { match req {} } } diff --git a/orchid-host/src/extension.rs b/orchid-host/src/extension.rs index 9103d2c..2f1da00 100644 --- a/orchid-host/src/extension.rs +++ b/orchid-host/src/extension.rs @@ -133,31 +133,31 @@ impl Extension { } let this = Self(weak.upgrade().unwrap()); match req { - api::ExtHostReq::Ping(ping) => handle.reply(&ping, &()).await, + api::ExtHostReq::Ping(ping) => handle.reply(&ping, ()).await, api::ExtHostReq::IntReq(intreq) => match intreq { api::IntReq::InternStr(s) => { let i = is(&s.0).await; this.0.strings.borrow_mut().insert(i.clone()); - handle.reply(&s, &i.to_api()).await + handle.reply(&s, i.to_api()).await }, api::IntReq::InternStrv(v) => { let tokens = join_all(v.0.iter().map(|m| es(*m))).await; this.0.strings.borrow_mut().extend(tokens.iter().cloned()); let i = iv(&tokens).await; this.0.string_vecs.borrow_mut().insert(i.clone()); - handle.reply(&v, &i.to_api()).await + handle.reply(&v, i.to_api()).await }, api::IntReq::ExternStr(si) => { let i = es(si.0).await; this.0.strings.borrow_mut().insert(i.clone()); - handle.reply(&si, &i.to_string()).await + handle.reply(&si, i.to_string()).await }, api::IntReq::ExternStrv(vi) => { let i = ev(vi.0).await; this.0.strings.borrow_mut().extend(i.iter().cloned()); this.0.string_vecs.borrow_mut().insert(i.clone()); let markerv = i.iter().map(|t| t.to_api()).collect_vec(); - handle.reply(&vi, &markerv).await + handle.reply(&vi, markerv).await }, }, api::ExtHostReq::Fwd(ref fw @ api::Fwd { ref target, ref method, ref body }) => { @@ -168,11 +168,11 @@ impl Extension { .request(api::Fwded(target.clone(), *method, body.clone())) .await .unwrap(); - handle.reply(fw, &reply).await + handle.reply(fw, reply).await }, api::ExtHostReq::SysFwd(ref fw @ api::SysFwd(id, ref body)) => { let sys = ctx.system_inst(id).await.unwrap(); - handle.reply(fw, &sys.request(body.clone()).await).await + handle.reply(fw, sys.request(body.clone()).await).await }, api::ExtHostReq::SubLex(sl) => { let (rep_in, mut rep_out) = channel(0); @@ -182,13 +182,13 @@ impl Extension { lex_g.get(&sl.id).cloned().expect("Sublex for nonexistent lexid"); req_in.send(ReqPair(sl.clone(), rep_in)).await.unwrap(); } - handle.reply(&sl, &rep_out.next().await.unwrap()).await + handle.reply(&sl, rep_out.next().await.unwrap()).await }, api::ExtHostReq::ExprReq(expr_req) => match expr_req { api::ExprReq::Inspect(ins @ api::Inspect { target }) => { let expr = ctx.exprs.get_expr(target).expect("Invalid ticket"); handle - .reply(&ins, &api::Inspected { + .reply(&ins, api::Inspected { refcount: expr.strong_count() as u32, location: expr.pos().to_api(), kind: expr.to_api().await, @@ -201,7 +201,7 @@ impl Extension { Some(expr) => expr.print(&FmtCtxImpl::default()).await, } .to_api(); - handle.reply(&prt, &msg).await + handle.reply(&prt, msg).await }, api::ExprReq::Create(cre) => { let req = Witness::of(&cre); @@ -213,7 +213,7 @@ impl Extension { .await; let expr_id = expr.id(); ctx.exprs.give_expr(expr); - handle.reply(req, &expr_id).await + handle.reply(req, expr_id).await }, }, api::ExtHostReq::LsModule(ref ls @ api::LsModule(_sys, path)) => { @@ -246,7 +246,7 @@ impl Extension { } Ok(api::ModuleInfo { members }) }; - handle.reply(ls, &reply).await + handle.reply(ls, reply).await }, api::ExtHostReq::ResolveNames(ref rn) => { let api::ResolveNames { constid, names, sys } = rn; @@ -275,12 +275,12 @@ impl Extension { }) .collect() .await; - handle.reply(rn, &responses).await + handle.reply(rn, responses).await }, api::ExtHostReq::ExtAtomPrint(ref eap @ api::ExtAtomPrint(ref atom)) => { let atom = AtomHand::from_api(atom, Pos::None, &mut ctx.clone()).await; let unit = atom.print(&FmtCtxImpl::default()).await; - handle.reply(eap, &unit.to_api()).await + handle.reply(eap, unit.to_api()).await }, } }) diff --git a/orchid-std/src/macros/macro_system.rs b/orchid-std/src/macros/macro_system.rs index 9e5b216..3c1196c 100644 --- a/orchid-std/src/macros/macro_system.rs +++ b/orchid-std/src/macros/macro_system.rs @@ -48,9 +48,7 @@ pub struct MacroSystemInst { } impl System for MacroSystemInst { type Ctor = MacroSystem; - async fn request<'a>(&self, _: Box + 'a>, req: Never) -> Receipt<'a> { - match req {} - } + async fn request(&self, _: Box, req: Never) -> Receipt { match req {} } async fn prelude(&self) -> Vec { vec![ sym!(macros::common::+), diff --git a/orchid-std/src/std/number/num_atom.rs b/orchid-std/src/std/number/num_atom.rs index 6ebdaef..627626f 100644 --- a/orchid-std/src/std/number/num_atom.rs +++ b/orchid-std/src/std/number/num_atom.rs @@ -45,13 +45,9 @@ impl ToExpr for Int { } } impl Supports for Int { - async fn handle<'a>( - &self, - hand: Box + '_>, - req: ProtocolMethod, - ) -> io::Result> { + async fn handle(&self, hand: Box, req: ProtocolMethod) -> io::Result { match req { - ProtocolMethod::GetTagId(req) => hand.reply(&req, &sym!(std::number::Int).to_api()).await, + ProtocolMethod::GetTagId(req) => hand.reply(&req, sym!(std::number::Int).to_api()).await, ProtocolMethod::GetImpl(ref req @ GetImpl(key)) => { let name = Sym::from_api(key).await; let val = if name == sym!(std::ops::add) { @@ -65,20 +61,16 @@ impl Supports for Int { } else if name == sym!(std::ops::mod) { sym!(std::number::imod) } else { - return hand.reply(req, &None).await; + return hand.reply(req, None).await; }; - hand.reply(req, &Some(val.to_expr().await.serialize().await)).await + hand.reply(req, Some(val.to_expr().await.serialize().await)).await }, } } } impl Supports for Int { - async fn handle<'a>( - &self, - hand: Box + '_>, - req: ToStringMethod, - ) -> io::Result> { - hand.reply(&req, &self.0.to_string()).await + async fn handle(&self, hand: Box, req: ToStringMethod) -> io::Result { + hand.reply(&req, self.0.to_string()).await } } @@ -112,13 +104,9 @@ impl ToExpr for Float { } } impl Supports for Float { - async fn handle<'a>( - &self, - hand: Box + '_>, - req: ProtocolMethod, - ) -> io::Result> { + async fn handle(&self, hand: Box, req: ProtocolMethod) -> io::Result { match req { - ProtocolMethod::GetTagId(req) => hand.reply(&req, &sym!(std::number::Float).to_api()).await, + ProtocolMethod::GetTagId(req) => hand.reply(&req, sym!(std::number::Float).to_api()).await, ProtocolMethod::GetImpl(ref req @ GetImpl(key)) => { let name = Sym::from_api(key).await; let val = if name == sym!(std::ops::add) { @@ -132,20 +120,16 @@ impl Supports for Float { } else if name == sym!(std::ops::mod) { sym!(std::number::fmod) } else { - return hand.reply(req, &None).await; + return hand.reply(req, None).await; }; - hand.reply(req, &Some(val.to_expr().await.serialize().await)).await + hand.reply(req, Some(val.to_expr().await.serialize().await)).await }, } } } impl Supports for Float { - async fn handle<'a>( - &self, - hand: Box + '_>, - req: ToStringMethod, - ) -> io::Result> { - hand.reply(&req, &self.0.to_string()).await + async fn handle(&self, hand: Box, req: ToStringMethod) -> io::Result { + hand.reply(&req, self.0.to_string()).await } } diff --git a/orchid-std/src/std/protocol/types.rs b/orchid-std/src/std/protocol/types.rs index c35f351..c87653c 100644 --- a/orchid-std/src/std/protocol/types.rs +++ b/orchid-std/src/std/protocol/types.rs @@ -36,16 +36,16 @@ impl OwnedAtom for Tag { async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(self.id.to_api()) } } impl Supports for Tag { - async fn handle<'a>( + async fn handle( &self, - hand: Box + '_>, + hand: Box, req: ProtocolMethod, - ) -> std::io::Result> { + ) -> std::io::Result { match req { - ProtocolMethod::GetTagId(req) => hand.reply(&req, &self.id.to_api()).await, + ProtocolMethod::GetTagId(req) => hand.reply(&req, self.id.to_api()).await, ProtocolMethod::GetImpl(ref req @ GetImpl(key)) => hand - .reply(req, &self.impls.get(&Sym::from_api(key).await).map(|expr| expr.handle().ticket())) + .reply(req, self.impls.get(&Sym::from_api(key).await).map(|expr| expr.handle().ticket())) .await, } } @@ -87,11 +87,11 @@ impl OwnedAtom for Tagged { async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(self.tag.id.to_api()) } } impl Supports for Tagged { - async fn handle<'a>( + async fn handle( &self, - hand: Box + '_>, + hand: Box, req: ProtocolMethod, - ) -> io::Result> { + ) -> io::Result { self.tag.handle(hand, req).await } } diff --git a/orchid-std/src/std/record/record_atom.rs b/orchid-std/src/std/record/record_atom.rs index 990bca7..951b741 100644 --- a/orchid-std/src/std/record/record_atom.rs +++ b/orchid-std/src/std/record/record_atom.rs @@ -40,13 +40,9 @@ impl OwnedAtom for RecordAtom { async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(()) } } impl Supports for RecordAtom { - async fn handle<'a>( - &self, - hand: Box + '_>, - req: ProtocolMethod, - ) -> io::Result> { + async fn handle(&self, hand: Box, req: ProtocolMethod) -> io::Result { match req { - ProtocolMethod::GetTagId(req) => hand.reply(&req, &sym!(std::record::Record).to_api()).await, + ProtocolMethod::GetTagId(req) => hand.reply(&req, sym!(std::record::Record).to_api()).await, ProtocolMethod::GetImpl(ref req @ GetImpl(key)) => { let name = Sym::from_api(key).await; let val = if name == sym!(std::ops::get) { @@ -54,9 +50,9 @@ impl Supports for RecordAtom { } else if name == sym!(std::ops::set) { sym!(std::record::set) } else { - return hand.reply(req, &None).await; + return hand.reply(req, None).await; }; - return hand.reply(req, &Some(val.to_expr().await.serialize().await)).await; + return hand.reply(req, Some(val.to_expr().await.serialize().await)).await; }, } } diff --git a/orchid-std/src/std/reflection/sym_atom.rs b/orchid-std/src/std/reflection/sym_atom.rs index f4a3782..153fd27 100644 --- a/orchid-std/src/std/reflection/sym_atom.rs +++ b/orchid-std/src/std/reflection/sym_atom.rs @@ -27,12 +27,12 @@ impl OwnedAtom for SymAtom { async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(SymAtomData(self.0.tok().to_api())) } } impl Supports for SymAtom { - async fn handle<'a>( + async fn handle( &self, - hand: Box + '_>, + hand: Box, req: ToStringMethod, - ) -> std::io::Result> { - hand.reply(&req, &self.0.to_string()).await + ) -> std::io::Result { + hand.reply(&req, self.0.to_string()).await } } diff --git a/orchid-std/src/std/std_system.rs b/orchid-std/src/std/std_system.rs index d5ceaf8..bc692d7 100644 --- a/orchid-std/src/std/std_system.rs +++ b/orchid-std/src/std/std_system.rs @@ -27,6 +27,7 @@ use crate::std::protocol::types::{CreateTag, Tag, Tagged, gen_protocol_lib}; use crate::std::record::record_atom::{CreateRecord, RecordAtom}; use crate::std::record::record_lib::gen_record_lib; use crate::std::reflection::sym_atom::{CreateSymAtom, SymAtom, gen_sym_lib}; +use crate::std::stream::stream_cmds::{ReadStreamCmd, WriteStreamCmd}; use crate::std::stream::stream_lib::gen_stream_lib; use crate::std::string::str_lexer::StringLexer; use crate::std::time::{CreateDT, gen_time_lib}; @@ -72,27 +73,25 @@ impl SystemCard for StdSystem { Some(TupleBuilder::ops()), Some(Tag::ops()), Some(Tagged::ops()), + Some(ReadStreamCmd::ops()), + Some(WriteStreamCmd::ops()), ] } } impl System for StdSystem { type Ctor = Self; - async fn request<'a>( - &self, - xreq: Box + 'a>, - req: ReqForSystem, - ) -> Receipt<'a> { + async fn request(&self, xreq: Box, req: ReqForSystem) -> Receipt { match req { StdReq::CreateInt(ref req @ CreateInt(int)) => - xreq.reply(req, &new_atom(int).to_expr().await.serialize().await).await.unwrap(), + xreq.reply(req, new_atom(int).to_expr().await.serialize().await).await.unwrap(), StdReq::CreateFloat(ref req @ CreateFloat(float)) => - xreq.reply(req, &new_atom(float).to_expr().await.serialize().await).await.unwrap(), + xreq.reply(req, new_atom(float).to_expr().await.serialize().await).await.unwrap(), StdReq::CreateDT(ref req @ CreateDT(dt)) => - xreq.reply(req, &new_atom(dt).to_expr().await.serialize().await).await.unwrap(), + xreq.reply(req, new_atom(dt).to_expr().await.serialize().await).await.unwrap(), StdReq::CreateTuple(ref req @ CreateTuple(ref items)) => { let tpl = Tuple(Rc::new(join_all(items.iter().copied().map(Expr::deserialize)).await)); let tk = new_atom(tpl).to_expr().await.serialize().await; - xreq.reply(req, &tk).await.unwrap() + xreq.reply(req, tk).await.unwrap() }, StdReq::CreateRecord(ref req @ CreateRecord(ref items)) => { let values = @@ -100,11 +99,11 @@ impl System for StdSystem { .await; let rec = RecordAtom(Rc::new(values.into_iter().collect())); let tk = new_atom(rec).to_expr().await.serialize().await; - xreq.reply(req, &tk).await.unwrap() + xreq.reply(req, tk).await.unwrap() }, StdReq::CreateSymAtom(ref req @ CreateSymAtom(sym_tok)) => { let sym_atom = SymAtom(Sym::from_api(sym_tok).await); - xreq.reply(req, &new_atom(sym_atom).to_expr().await.serialize().await).await.unwrap() + xreq.reply(req, new_atom(sym_atom).to_expr().await.serialize().await).await.unwrap() }, StdReq::CreateTag(ref req @ CreateTag { name, ref impls }) => { let tag_atom = Tag { @@ -119,7 +118,7 @@ impl System for StdSystem { .collect(), ), }; - xreq.reply(req, &new_atom(tag_atom).to_expr().await.serialize().await).await.unwrap() + xreq.reply(req, new_atom(tag_atom).to_expr().await.serialize().await).await.unwrap() }, } } diff --git a/orchid-std/src/std/stream/stream_cmds.rs b/orchid-std/src/std/stream/stream_cmds.rs index c4e5f4b..7d30de1 100644 --- a/orchid-std/src/std/stream/stream_cmds.rs +++ b/orchid-std/src/std/stream/stream_cmds.rs @@ -3,12 +3,15 @@ use std::io; use std::rc::Rc; use never::Never; -use orchid_base::{ReqHandleExt, fmt, is, mk_errv}; +use orchid_base::{Receipt, ReqHandle, ReqHandleExt, fmt, is, mk_errv}; use orchid_extension::gen_expr::{bot, call, new_atom, serialize}; -use orchid_extension::std_reqs::{ReadLimit, ReadReq, RunCommand}; -use orchid_extension::{Atomic, Expr, ForeignAtom, OwnedAtom, OwnedVariant, Supports, ToExpr}; +use orchid_extension::std_reqs::{CloseReq, FlushReq, ReadLimit, ReadReq, StartCommand, WriteReq}; +use orchid_extension::{ + Atomic, Expr, ForeignAtom, MethodSetBuilder, OwnedAtom, OwnedVariant, Supports, ToExpr, +}; use crate::std::binary::binary_atom::BlobAtom; +use crate::std::string::str_atom::StrAtom; #[derive(Clone, Debug)] pub struct ReadStreamCmd { @@ -16,40 +19,99 @@ pub struct ReadStreamCmd { pub limit: ReadLimit, pub succ: Expr, pub fail: Expr, + pub as_str: bool, } impl Atomic for ReadStreamCmd { type Variant = OwnedVariant; type Data = (); + fn reg_methods() -> MethodSetBuilder { MethodSetBuilder::new().handle::() } } impl OwnedAtom for ReadStreamCmd { type Refs = Never; async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(()) } } -impl Supports for ReadStreamCmd { - async fn handle<'a>( - &self, - hand: Box + '_>, - req: RunCommand, - ) -> io::Result> { - let ret = match self.hand.call(ReadReq { limit: self.limit.clone() }).await { - None => Err(mk_errv( - is("Atom is not readable").await, - format!("Expected a readable stream handle, found {}", fmt(&self.hand).await), - [self.hand.pos()], +impl Supports for ReadStreamCmd { + async fn handle(&self, hand: Box, req: StartCommand) -> io::Result { + let ret = 'ret: { + let Some(read_res) = self.hand.call(ReadReq { limit: self.limit.clone() }).await else { + break 'ret Err(mk_errv( + is("Atom is not readable").await, + format!("Expected a readable stream handle, found {}", fmt(&self.hand).await), + [self.hand.pos()], + )); + }; + let res = match read_res { + Err(e) => Err(mk_errv( + is(e.kind.message()).await, + format!("An error occurred while reading: {}", e.message), + [self.hand.pos(), self.succ.pos().await], + )), + Ok(v) if !self.as_str => Ok(new_atom(BlobAtom(Rc::new(v)))), + Ok(v) => match String::from_utf8(v) { + Ok(s) => Ok(new_atom(StrAtom(Rc::new(s)))), + Err(e) => Err(mk_errv(is("Invalid utf8 in input string").await, e.to_string(), [ + self.hand.pos(), + self.succ.pos().await, + ])), + }, + }; + Ok(match res { + Err(e) => call(self.fail.clone(), bot(e)), + Ok(gex) => call(self.succ.clone(), gex), + }) + }; + hand.reply(&req, Some(serialize(ret.to_gen().await).await)).await + } +} + +#[derive(Clone, Debug)] +pub enum WriteAction { + Write(Rc>), + Flush, + Close, +} + +#[derive(Clone, Debug)] +pub struct WriteStreamCmd { + pub hand: ForeignAtom, + pub action: WriteAction, + pub succ: Expr, + pub fail: Expr, +} +impl Atomic for WriteStreamCmd { + type Variant = OwnedVariant; + type Data = (); + fn reg_methods() -> MethodSetBuilder { MethodSetBuilder::new().handle::() } +} +impl OwnedAtom for WriteStreamCmd { + type Refs = Never; + async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(()) } +} +impl Supports for WriteStreamCmd { + async fn handle(&self, hand: Box, req: StartCommand) -> io::Result { + let result = match &self.action { + WriteAction::Write(bin) => self.hand.call(WriteReq { data: bin.to_vec() }).await, + WriteAction::Flush => self.hand.call(FlushReq).await, + WriteAction::Close => self.hand.call(CloseReq).await, + }; + let cont = match result { + None => bot(mk_errv( + is("Not a writer").await, + format!("{} cannot be written to", fmt(&self.hand).await), + [self.hand.pos(), self.succ.pos().await], )), - Some(Err(e)) => Ok( + Some(Err(e)) => call( self.fail.clone(), bot(mk_errv( is(e.kind.message()).await, - format!("An error occurred while reading: {}", e.message), + format!("An error occurred while writing: {}", e.message), [self.hand.pos(), self.succ.pos().await], )), ) .await, - ), - Some(Ok(v)) => Ok(call(self.succ.clone(), new_atom(BlobAtom(Rc::new(v)))).await), + Some(Ok(())) => self.succ.clone().to_gen().await, }; - hand.reply(&req, &Some(serialize(ret.to_gen().await).await)).await + hand.reply(&req, Some(serialize(cont.to_gen().await).await)).await } } diff --git a/orchid-std/src/std/stream/stream_lib.rs b/orchid-std/src/std/stream/stream_lib.rs index 745af9d..cb3d053 100644 --- a/orchid-std/src/std/stream/stream_lib.rs +++ b/orchid-std/src/std/stream/stream_lib.rs @@ -1,22 +1,26 @@ use std::num::NonZero; use std::rc::Rc; +use itertools::Itertools; use orchid_base::{is, mk_errv}; use orchid_extension::gen_expr::{call, new_atom}; use orchid_extension::std_reqs::ReadLimit; use orchid_extension::tree::{GenMember, comments, fun, prefix}; -use orchid_extension::{Expr, ForeignAtom, get_arg}; +use orchid_extension::{Expr, ForeignAtom, TAtom, get_arg}; -use crate::Int; use crate::std::binary::binary_atom::BlobAtom; -use crate::std::stream::stream_cmds::ReadStreamCmd; +use crate::std::stream::stream_cmds::{ReadStreamCmd, WriteAction, WriteStreamCmd}; +use crate::{Int, OrcString}; pub fn gen_stream_lib() -> Vec { prefix("std", [comments( ["Read from and write to byte streams"], prefix("stream", [ fun(true, "read_bin", async |hand: ForeignAtom, succ: Expr, fail: Expr| { - new_atom(ReadStreamCmd { hand, succ, fail, limit: ReadLimit::End }) + new_atom(ReadStreamCmd { hand, succ, fail, as_str: false, limit: ReadLimit::End }) + }), + fun(true, "read_str", async |hand: ForeignAtom, succ: Expr, fail: Expr| { + new_atom(ReadStreamCmd { hand, succ, fail, as_str: true, limit: ReadLimit::End }) }), fun(true, "read_until", async |hand: ForeignAtom, delim: Int, succ: Expr, fail: Expr| { let Ok(end) = delim.0.try_into() else { @@ -29,12 +33,15 @@ pub fn gen_stream_lib() -> Vec { [get_arg(1).pos().await], )); }; - Ok(new_atom(ReadStreamCmd { hand, succ, fail, limit: ReadLimit::Delimiter(end) })) + let limit = ReadLimit::Delimiter(end); + Ok(new_atom(ReadStreamCmd { hand, succ, fail, as_str: false, limit })) }), fun(true, "read_bytes", async |hand: ForeignAtom, count: Int, succ: Expr, fail: Expr| { match count.0.try_into().map(NonZero::new) { - Ok(Some(nzlen)) => - Ok(new_atom(ReadStreamCmd { hand, succ, fail, limit: ReadLimit::Length(nzlen) })), + Ok(Some(nzlen)) => { + let limit = ReadLimit::Length(nzlen); + Ok(new_atom(ReadStreamCmd { hand, succ, fail, as_str: false, limit })) + }, Ok(None) => Ok(call(succ, new_atom(BlobAtom(Rc::default()))).await), Err(_) => Err(mk_errv( is("Length cannot be negative").await, @@ -43,6 +50,28 @@ pub fn gen_stream_lib() -> Vec { )), } }), + fun(true, "read_line", async |hand: ForeignAtom, succ: Expr, fail: Expr| { + const LIMIT_BR: ReadLimit = ReadLimit::Delimiter(b'\n'); + new_atom(ReadStreamCmd { hand, succ, fail, as_str: true, limit: LIMIT_BR }) + }), + fun(true, "write_str", async |hand: ForeignAtom, str: OrcString, succ: Expr, fail: Expr| { + let action = WriteAction::Write(Rc::new(str.get_string().await.bytes().collect_vec())); + new_atom(WriteStreamCmd { hand, action, succ, fail }) + }), + fun( + true, + "write_bin", + async |hand: ForeignAtom, bin: TAtom, succ: Expr, fail: Expr| { + let action = WriteAction::Write(bin.own().await.0.clone()); + new_atom(WriteStreamCmd { hand, action, succ, fail }) + }, + ), + fun(true, "flush", async |hand: ForeignAtom, succ: Expr, fail: Expr| { + new_atom(WriteStreamCmd { hand, action: WriteAction::Flush, succ, fail }) + }), + fun(true, "close", async |hand: ForeignAtom, succ: Expr, fail: Expr| { + new_atom(WriteStreamCmd { hand, action: WriteAction::Close, succ, fail }) + }), ]), )]) } diff --git a/orchid-std/src/std/string/str_atom.rs b/orchid-std/src/std/string/str_atom.rs index a5e6c08..8d36c75 100644 --- a/orchid-std/src/std/string/str_atom.rs +++ b/orchid-std/src/std/string/str_atom.rs @@ -28,7 +28,7 @@ impl AtomMethod for StringGetValMethod { } #[derive(Clone)] -pub struct StrAtom(Rc); +pub struct StrAtom(pub(crate) Rc); impl Atomic for StrAtom { type Variant = OwnedVariant; type Data = (); @@ -60,39 +60,39 @@ impl OwnedAtom for StrAtom { } } impl Supports for StrAtom { - async fn handle<'a>( + async fn handle( &self, - hand: Box + '_>, + hand: Box, req: StringGetValMethod, - ) -> io::Result> { - hand.reply(&req, &self.0).await + ) -> io::Result { + hand.reply(&req, self.0.clone()).await } } impl Supports for StrAtom { - async fn handle<'a>( + async fn handle( &self, - hand: Box + '_>, + hand: Box, req: ToStringMethod, - ) -> io::Result> { - hand.reply(&req, &self.0).await + ) -> io::Result { + hand.reply(&req, self.0.to_string()).await } } impl Supports for StrAtom { - async fn handle<'a>( + async fn handle( &self, - hand: Box + '_>, + hand: Box, req: ProtocolMethod, - ) -> io::Result> { + ) -> io::Result { match req { - ProtocolMethod::GetTagId(req) => hand.reply(&req, &sym!(std::string::StrAtom).to_api()).await, + ProtocolMethod::GetTagId(req) => hand.reply(&req, sym!(std::string::StrAtom).to_api()).await, ProtocolMethod::GetImpl(ref req @ GetImpl(key)) => { let name = Sym::from_api(key).await; let val = if name == sym!(std::ops::add) { sym!(std::string::concat) } else { - return hand.reply(req, &None).await; + return hand.reply(req, None).await; }; - hand.reply(req, &Some(val.to_expr().await.serialize().await)).await + hand.reply(req, Some(val.to_expr().await.serialize().await)).await }, } } @@ -130,31 +130,31 @@ impl TryFromExpr for IntStrAtom { } } impl Supports for IntStrAtom { - async fn handle<'a>( + async fn handle( &self, - hand: Box + '_>, + hand: Box, req: ToStringMethod, - ) -> io::Result> { - hand.reply(&req, &self.0.rc()).await + ) -> io::Result { + hand.reply(&req, self.0.to_string()).await } } impl Supports for IntStrAtom { - async fn handle<'a>( + async fn handle( &self, - hand: Box + '_>, + hand: Box, req: ProtocolMethod, - ) -> io::Result> { + ) -> io::Result { match req { ProtocolMethod::GetTagId(req) => - hand.reply(&req, &sym!(std::string::IntStrAtom).to_api()).await, + hand.reply(&req, sym!(std::string::IntStrAtom).to_api()).await, ProtocolMethod::GetImpl(ref req @ GetImpl(key)) => { let name = Sym::from_api(key).await; let val = if name == sym!(std::ops::add) { sym!(std::string::concat) } else { - return hand.reply(req, &None).await; + return hand.reply(req, None).await; }; - hand.reply(req, &Some(val.to_expr().await.serialize().await)).await + hand.reply(req, Some(val.to_expr().await.serialize().await)).await }, } } diff --git a/orchid-std/src/std/time.rs b/orchid-std/src/std/time.rs index 9ce4252..39724d6 100644 --- a/orchid-std/src/std/time.rs +++ b/orchid-std/src/std/time.rs @@ -8,7 +8,7 @@ use orchid_api_derive::{Coding, Hierarchy}; use orchid_api_traits::Request; use orchid_base::{Numeric, OrcRes, Receipt, ReqHandle, ReqHandleExt}; use orchid_extension::gen_expr::{GExpr, call, new_atom, serialize}; -use orchid_extension::std_reqs::{AsInstant, RunCommand}; +use orchid_extension::std_reqs::{AsInstant, StartCommand}; use orchid_extension::tree::{GenMember, fun, prefix}; use orchid_extension::{ Atomic, Expr, MethodSetBuilder, OwnedAtom, OwnedVariant, Supports, TAtom, ThinAtom, ThinVariant, @@ -55,12 +55,8 @@ impl OwnedAtom for InstantAtom { async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(()) } } impl Supports for InstantAtom { - async fn handle<'a>( - &self, - hand: Box + '_>, - req: AsInstant, - ) -> std::io::Result> { - hand.reply(&req, &self.0).await + async fn handle(&self, hand: Box, req: AsInstant) -> std::io::Result { + hand.reply(&req, self.0).await } } @@ -69,20 +65,16 @@ struct Now(Expr); impl Atomic for Now { type Variant = OwnedVariant; type Data = (); - fn reg_methods() -> MethodSetBuilder { MethodSetBuilder::new().handle::() } + fn reg_methods() -> MethodSetBuilder { MethodSetBuilder::new().handle::() } } impl OwnedAtom for Now { type Refs = Never; async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(()) } } -impl Supports for Now { - async fn handle<'a>( - &self, - hand: Box + '_>, - req: RunCommand, - ) -> io::Result> { +impl Supports for Now { + async fn handle(&self, hand: Box, req: StartCommand) -> io::Result { let cont = serialize(call(self.0.clone(), new_atom(InstantAtom(Utc::now()))).await).await; - hand.reply(&req, &Some(cont)).await + hand.reply(&req, Some(cont)).await } } diff --git a/orchid-std/src/std/tuple.rs b/orchid-std/src/std/tuple.rs index b9381d5..259ff01 100644 --- a/orchid-std/src/std/tuple.rs +++ b/orchid-std/src/std/tuple.rs @@ -48,13 +48,13 @@ impl OwnedAtom for Tuple { } } impl Supports for Tuple { - async fn handle<'a>( + async fn handle( &self, - hand: Box + '_>, + hand: Box, req: ProtocolMethod, - ) -> std::io::Result> { + ) -> std::io::Result { match req { - ProtocolMethod::GetTagId(req) => hand.reply(&req, &sym!(std::tuple).to_api()).await, + ProtocolMethod::GetTagId(req) => hand.reply(&req, sym!(std::tuple).to_api()).await, ProtocolMethod::GetImpl(ref req @ GetImpl(key)) => { let name = Sym::from_api(key).await; let val = if name == sym!(std::ops::get) { @@ -62,9 +62,9 @@ impl Supports for Tuple { } else if name == sym!(std::ops::set) { sym!(std::tuple::set) } else { - return hand.reply(req, &None).await; + return hand.reply(req, None).await; }; - hand.reply(req, &Some(val.to_expr().await.serialize().await)).await + hand.reply(req, Some(val.to_expr().await.serialize().await)).await }, } } diff --git a/orcx/src/main.rs b/orcx/src/main.rs index faf4a81..725fd79 100644 --- a/orcx/src/main.rs +++ b/orcx/src/main.rs @@ -163,7 +163,7 @@ pub enum Commands { /// Execute effectful Orchid code Exec { /// Entrypoint or startup command - #[arg()] + #[arg(long)] main: String, }, } @@ -288,11 +288,7 @@ impl ox::SystemCtor for StdIoSystem { } impl ox::System for StdIoSystem { type Ctor = Self; - async fn request<'a>( - &self, - _: Box + 'a>, - req: ox::ReqForSystem, - ) -> Receipt<'a> { + async fn request(&self, _: Box, req: ox::ReqForSystem) -> Receipt { match req {} } async fn env(&self) -> Vec { @@ -450,20 +446,21 @@ fn main() -> io::Result { let io_ext_init = ext_inline(io_ext_builder, ctx.clone()).await; let io_ext = Extension::new(io_ext_init, ctx.clone()).await.map_err(|e| e.to_string())?; - let io_ctor = (io_ext.system_ctors().find(|ctor| ctor.name() == "orchid::cmd")) - .expect("Missing command system ctor"); + let io_ctor = (io_ext.system_ctors().find(|ctor| ctor.name() == "orcx::stdio")) + .expect("Missing io system ctor"); let (io_root, io_system) = io_ctor.run(vec![]).await; root = root.merge(&io_root).await.expect("Failed to merge stdio root into tree"); systems.push(io_system); extensions.push(io_ext); + let mut crun = CmdRunner::new(&mut root, ctx.clone(), []).await; + systems.push(crun.sys().clone()); load_proj_if_set(&mut root, &args, ctx).await?; add_const_at(&mut root, ctx, &systems[..], path.clone(), is(main).await).await?; - let expr = ExprKind::Const(path.clone()).at(SrcRange::zw(path.clone(), 0).pos()); - let mut crun = CmdRunner::new(root, ctx.clone(), [expr]).await; + crun.push(ExprKind::Const(path.clone()).at(SrcRange::zw(path.clone(), 0).pos())); if !args.no_gas { crun.set_gas(args.gas); } - match crun.execute().await { + match crun.execute(&root).await { CmdEvent::Exit | CmdEvent::Settled => (), CmdEvent::Err(e) => println!("error: {e}"), CmdEvent::Gas => println!("Exceeded gas limit of {}", args.gas),