diff --git a/Cargo.lock b/Cargo.lock index 897b2f5..8c16c87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1015,6 +1015,7 @@ version = "0.1.0" dependencies = [ "async-fn-stream", "async-once-cell", + "bound", "derive_destructure", "dyn-clone", "futures", diff --git a/orchid-api-traits/src/hierarchy.rs b/orchid-api-traits/src/hierarchy.rs index 8ded423..fa738db 100644 --- a/orchid-api-traits/src/hierarchy.rs +++ b/orchid-api-traits/src/hierarchy.rs @@ -29,25 +29,21 @@ pub trait Extends: InHierarchy + Into { pub trait UnderRootImpl: Sized { type __Root: UnderRoot; fn __into_root(self) -> Self::__Root; - fn __try_from_root(root: Self::__Root) -> Result; } pub trait UnderRoot: InHierarchy { type Root: UnderRoot; fn into_root(self) -> Self::Root; - fn try_from_root(root: Self::Root) -> Result; } impl> UnderRoot for T { type Root = ::IsRoot>>::__Root; fn into_root(self) -> Self::Root { self.__into_root() } - fn try_from_root(root: Self::Root) -> Result { Self::__try_from_root(root) } } impl> UnderRootImpl for T { type __Root = Self; fn __into_root(self) -> Self::__Root { self } - fn __try_from_root(root: Self::__Root) -> Result { Ok(root) } } impl + Extends> UnderRootImpl for T { @@ -57,8 +53,4 @@ impl + Extends> UnderRootImpl for T { fn __into_root(self) -> Self::__Root { ::Parent>>::into(self).into_root() } - fn __try_from_root(root: Self::__Root) -> Result { - let parent = ::Parent::try_from_root(root)?; - parent.clone().try_into().map_err(|_| parent.into_root()) - } } diff --git a/orchid-api-traits/src/relations.rs b/orchid-api-traits/src/relations.rs index 3d473d8..b86b619 100644 --- a/orchid-api-traits/src/relations.rs +++ b/orchid-api-traits/src/relations.rs @@ -9,12 +9,6 @@ pub trait Request: fmt::Debug + Sized + 'static { } pub async fn respond(_: &R, rep: R::Response) -> Vec { enc_vec(&rep).await } -pub async fn respond_with>( - r: &R, - f: impl FnOnce(&R) -> F, -) -> Vec { - respond(r, f(r).await).await -} pub trait Channel: 'static { type Req: Coding + Sized + 'static; diff --git a/orchid-api/src/interner.rs b/orchid-api/src/interner.rs index 1263d1d..d663c23 100644 --- a/orchid-api/src/interner.rs +++ b/orchid-api/src/interner.rs @@ -3,7 +3,7 @@ use std::num::NonZeroU64; use orchid_api_derive::{Coding, Hierarchy}; use orchid_api_traits::Request; -use crate::{ExtHostReq, HostExtReq}; +use crate::{ExtHostNotif, ExtHostReq, HostExtReq}; /// Intern requests sent by the replica to the master. These requests are /// repeatable. @@ -71,18 +71,21 @@ pub struct TStr(pub NonZeroU64); pub struct TStrv(pub NonZeroU64); /// A request to sweep the replica. The master will not be sweeped until all -/// replicas respond, as it must retain everything the replicas retained +/// replicas respond. For efficiency, replicas should make sure to send the +/// [Sweeped] notif before returning. #[derive(Clone, Copy, Debug, Coding, Hierarchy)] #[extends(HostExtReq)] pub struct Sweep; impl Request for Sweep { - type Response = Retained; + type Response = (); } -/// List of keys in this replica that couldn't be sweeped because local -/// datastructures reference their value. -#[derive(Clone, Debug, Coding)] -pub struct Retained { +/// List of keys in this replica that were removed during a sweep. This may have +/// been initiated via a [Sweep] request, but can also be triggered by the +/// replica autonomously. +#[derive(Clone, Debug, Coding, Hierarchy)] +#[extends(ExtHostNotif)] +pub struct Sweeped { pub strings: Vec, pub vecs: Vec, } diff --git a/orchid-api/src/proto.rs b/orchid-api/src/proto.rs index 8df1d73..4763b16 100644 --- a/orchid-api/src/proto.rs +++ b/orchid-api/src/proto.rs @@ -28,7 +28,7 @@ use futures::{AsyncRead, AsyncWrite}; use orchid_api_derive::{Coding, Hierarchy}; use orchid_api_traits::{Channel, Decode, Encode, MsgSet, Request, read_exact, write_exact}; -use crate::{atom, expr, interner, lexer, logging, parser, system, tree}; +use crate::{Sweeped, atom, expr, interner, lexer, logging, parser, system, tree}; static HOST_INTRO: &[u8] = b"Orchid host, binary API v0\n"; pub struct HostHeader { @@ -99,6 +99,7 @@ pub enum ExtHostReq { pub enum ExtHostNotif { ExprNotif(expr::ExprNotif), Log(logging::Log), + Sweeped(Sweeped), } pub struct ExtHostChannel; diff --git a/orchid-base/Cargo.toml b/orchid-base/Cargo.toml index 3a49db5..7168160 100644 --- a/orchid-base/Cargo.toml +++ b/orchid-base/Cargo.toml @@ -8,6 +8,7 @@ edition = "2024" [dependencies] async-fn-stream = { version = "0.1.0", path = "../async-fn-stream" } async-once-cell = "0.5.4" +bound = "0.6.0" derive_destructure = "1.0.0" dyn-clone = "1.0.20" futures = { version = "0.3.31", features = ["std"], default-features = false } diff --git a/orchid-base/src/interner.rs b/orchid-base/src/interner.rs index 7e1e0b7..8675fa9 100644 --- a/orchid-base/src/interner.rs +++ b/orchid-base/src/interner.rs @@ -1,20 +1,99 @@ use std::borrow::Borrow; +use std::fmt::{Debug, Display}; use std::future::Future; -use std::hash::BuildHasher as _; +use std::hash::{BuildHasher as _, Hash}; use std::num::NonZeroU64; use std::ops::Deref; use std::rc::Rc; use std::sync::atomic; use std::{fmt, hash}; +use futures::future::LocalBoxFuture; use futures::lock::Mutex; use hashbrown::{HashMap, HashSet}; use itertools::Itertools as _; use orchid_api_traits::Request; +use some_executor::task_local; use crate::api; use crate::reqnot::{DynRequester, Requester}; +pub trait IStrHandle: AsRef {} +pub trait IStrvHandle: AsRef<[IStr]> {} + +#[derive(Clone)] +pub struct IStr(pub api::TStr, pub Rc); +impl Deref for IStr { + type Target = str; + fn deref(&self) -> &Self::Target { self.1.as_ref().as_ref() } +} +impl Eq for IStr {} +impl PartialEq for IStr { + fn eq(&self, other: &Self) -> bool { self.0 == other.0 } +} +impl Hash for IStr { + fn hash(&self, state: &mut H) { self.0.hash(state) } +} +impl Display for IStr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.deref()) } +} +impl Debug for IStr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "IStr({self}") } +} +#[derive(Clone)] +pub struct IStrv(pub api::TStrv, pub Rc); +impl Deref for IStrv { + type Target = [IStr]; + fn deref(&self) -> &Self::Target { self.1.as_ref().as_ref() } +} +impl Eq for IStrv {} +impl PartialEq for IStrv { + fn eq(&self, other: &Self) -> bool { self.0 == other.0 } +} +impl Hash for IStrv { + fn hash(&self, state: &mut H) { self.0.0.hash(state) } +} +impl Display for IStrv { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut iter = self.deref().iter(); + match iter.next() { + None => return Ok(()), + Some(s) => write!(f, "{s}")?, + } + for s in iter { + write!(f, "::{s}")? + } + Ok(()) + } +} +impl Debug for IStrv { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "IStrv({self})") } +} + +pub trait InternerSrv { + fn is(&self, v: &str) -> LocalBoxFuture<'static, IStr>; + fn es(&self, t: api::TStr) -> LocalBoxFuture<'static, IStr>; + fn iv(&self, v: &[IStr]) -> LocalBoxFuture<'static, IStrv>; + fn ev(&self, t: api::TStrv) -> LocalBoxFuture<'static, IStrv>; +} + +task_local! { + static INTERNER: Rc; +} + +pub async fn with_interner(val: Rc, fut: F) -> F::Output { + INTERNER.scope(val, fut).await +} + +fn get_interner() -> Rc { + INTERNER.with(|i| i.expect("Interner not initialized").clone()) +} + +pub async fn is(v: &str) -> IStr { get_interner().is(v).await } +pub async fn iv(v: &[IStr]) -> IStrv { get_interner().iv(v).await } +pub async fn es(v: api::TStr) -> IStr { get_interner().es(v).await } +pub async fn ev(v: api::TStrv) -> IStrv { get_interner().ev(v).await } + /// Clippy crashes while verifying `Tok: Sized` without this and I cba to create /// a minimal example #[derive(Clone)] @@ -255,12 +334,7 @@ impl Interner { M::Interned::bimap(&mut *self.0.interners.lock().await).insert(token.clone()); token } - pub async fn sweep_replica(&self) -> api::Retained { - assert!(self.0.master.is_some(), "Not a replica"); - let mut g = self.0.interners.lock().await; - api::Retained { strings: g.strings.sweep_replica(), vecs: g.vecs.sweep_replica() } - } - pub async fn sweep_master(&self, retained: api::Retained) { + pub async fn sweep_master(&self, retained: api::Sweeped) { assert!(self.0.master.is_none(), "Not master"); let mut g = self.0.interners.lock().await; g.strings.sweep_master(retained.strings.into_iter().collect()); @@ -275,7 +349,7 @@ impl fmt::Debug for Interner { static ID: atomic::AtomicU64 = atomic::AtomicU64::new(1); -pub fn merge_retained(into: &mut api::Retained, from: &api::Retained) { +pub fn merge_retained(into: &mut api::Sweeped, from: &api::Sweeped) { into.strings = into.strings.iter().chain(&from.strings).copied().unique().collect(); into.vecs = into.vecs.iter().chain(&from.vecs).copied().unique().collect(); } diff --git a/orchid-base/src/lib.rs b/orchid-base/src/lib.rs index 42a07c4..b812005 100644 --- a/orchid-base/src/lib.rs +++ b/orchid-base/src/lib.rs @@ -25,6 +25,7 @@ pub mod pure_seq; pub mod reqnot; pub mod sequence; pub mod side; +pub mod stash; mod tl_cache; pub mod tokens; pub mod tree; diff --git a/orchid-base/src/reqnot.rs b/orchid-base/src/reqnot.rs index 4b24d24..e42daea 100644 --- a/orchid-base/src/reqnot.rs +++ b/orchid-base/src/reqnot.rs @@ -1,20 +1,28 @@ +use std::any::TypeId; use std::cell::RefCell; +use std::collections::VecDeque; use std::future::Future; use std::marker::PhantomData; use std::mem; use std::ops::{BitAnd, Deref}; -use std::pin::Pin; +use std::pin::{Pin, pin}; +use std::rc::Rc; use std::sync::Arc; +use std::task::Poll; use std::thread::panicking; +use async_fn_stream::stream; +use bound::Bound; use derive_destructure::destructure; use dyn_clone::{DynClone, clone_box}; -use futures::channel::mpsc; -use futures::future::LocalBoxFuture; -use futures::lock::Mutex; -use futures::{SinkExt, StreamExt}; +use futures::channel::mpsc::{self, Receiver, Sender, channel}; +use futures::channel::oneshot; +use futures::future::{LocalBoxFuture, select_all}; +use futures::lock::{Mutex, MutexGuard}; +use futures::{AsyncRead, AsyncWrite, SinkExt, Stream, StreamExt, stream, stream_select}; use hashbrown::HashMap; -use orchid_api_traits::{Channel, Coding, Decode, Encode, MsgSet, Request, enc_vec}; +use orchid_api_traits::{Channel, Coding, Decode, Encode, MsgSet, Request, UnderRoot, enc_vec}; +use some_executor::task_local; use trait_set::trait_set; use crate::clone; @@ -22,12 +30,347 @@ use crate::logging::Logger; pub struct Receipt<'a>(PhantomData<&'a mut ()>); +/// Write guard to outbound for the purpose of serializing a request. Only one +/// can exist at a time. Dropping this object should panic. +pub trait ReqWriter { + /// Access to the underlying channel. This may be buffered. + fn writer(&mut self) -> Pin<&mut dyn AsyncWrite>; + /// Finalize the request, release the outbound channel, then queue for the + /// reply on the inbound channel. + fn send(self: Box) -> LocalBoxFuture<'static, Box>; +} + +/// Write guard to inbound for the purpose of deserializing a reply. While held, +/// no inbound requests or other replies can be processed. +/// +/// Dropping this object should panic even if [RepReader::finish] returns +/// synchronously, because the API isn't cancellation safe in general so it is a +/// programmer error in all cases to drop an object related to it without proper +/// cleanup. +pub trait RepReader { + /// Access to the underlying channel. The length of the message is inferred + /// from the number of bytes read so this must not be buffered. + fn reader(&mut self) -> Pin<&mut dyn AsyncRead>; + /// Finish reading the request + fn finish(self: Box) -> LocalBoxFuture<'static, ()>; +} + +/// Write guard to outbound for the purpose of serializing a notification. +/// +/// Dropping this object should panic for the same reason [RepReader] panics +pub trait MsgWriter { + /// Access to the underlying channel. This may be buffered. + fn writer(&mut self) -> Pin<&mut dyn AsyncWrite>; + /// Send the notification + fn finish(self: Box) -> LocalBoxFuture<'static, ()>; +} + +/// For initiating outbound requests and notifications +pub trait Client { + fn root_req_tid(&self) -> TypeId; + fn root_notif_tid(&self) -> TypeId; + fn start_request(&self) -> LocalBoxFuture<'_, Box>; + fn start_notif(&self) -> LocalBoxFuture<'_, Box>; +} + +/// Extension trait with convenience methods that handle outbound request and +/// notif lifecycle and typing +#[allow(async_fn_in_trait)] +pub trait ClientExt: Client { + async fn request>(&self, t: T) -> T::Response { + assert_eq!(TypeId::of::<::Root>(), self.root_req_tid()); + let mut req = self.start_request().await; + t.into_root().encode(req.writer().as_mut()).await; + let mut rep = req.send().await; + let response = T::Response::decode(rep.reader()).await; + rep.finish().await; + response + } + async fn notify + 'static>(&self, t: T) { + assert_eq!(TypeId::of::<::Root>(), self.root_notif_tid()); + let mut notif = self.start_notif().await; + t.into_root().encode(notif.writer().as_mut()).await; + notif.finish().await; + } +} +impl ClientExt for T {} + +/// A form of [Evidence] that doesn't require the value to be kept around +pub struct Witness(PhantomData); +impl Witness { + fn of(t: &T) -> Self { Self(PhantomData) } +} +impl Copy for Witness {} +impl Clone for Witness { + fn clone(&self) -> Self { Self(PhantomData) } +} + +/// A proxy for the type of a value either previously saved into a [Witness] or +/// still available. +pub trait Evidence {} +impl Evidence for &'_ T {} +impl Evidence for Witness {} + +type IoRef = Pin>; +type IoLock = Rc>>>; +type IoGuard = Bound>>, IoLock>; + +/// An incoming request. This holds a lock on the ingress channel. +pub struct ReqReader<'a> { + id: u64, + read: MutexGuard<'a, IoRef>, + write: &'a Mutex>, +} +impl<'a> ReqReader<'a> { + /// Access + pub fn reader(&mut self) -> Pin<&mut dyn AsyncRead> { self.read.as_mut() } + pub async fn read_req(&mut self) -> R { R::decode(self.reader()).await } + pub async fn start_reply(self) -> RepWriter<'a> { self.branch().await.start_reply().await } + pub async fn reply(self, req: impl Evidence, rep: &R::Response) -> Receipt<'a> { + self.branch().await.reply(req, rep).await + } + pub async fn branch(self) -> ReqHandle<'a> { ReqHandle { id: self.id, write: self.write } } +} +pub struct ReqHandle<'a> { + id: u64, + write: &'a Mutex>, +} +impl<'a> ReqHandle<'a> { + pub async fn reply( + self, + _: impl Evidence, + rep: &Req::Response, + ) -> Receipt<'a> { + let mut reply = self.start_reply().await; + rep.encode(reply.writer()).await; + reply.send().await + } + pub async fn start_reply(self) -> RepWriter<'a> { + let mut write = self.write.lock().await; + (!self.id).encode(write.as_mut()).await; + RepWriter { write } + } +} +pub struct RepWriter<'a> { + write: MutexGuard<'a, IoRef>, +} +impl<'a> RepWriter<'a> { + pub fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { self.write.as_mut() } + pub async fn send(self) -> Receipt<'a> { Receipt(PhantomData) } +} + +pub struct NotifReader<'a> { + read: MutexGuard<'a, IoRef>, +} +impl<'a> NotifReader<'a> { + pub fn reader(&mut self) -> Pin<&mut dyn AsyncRead> { self.read.as_mut() } + pub async fn read(mut self) -> N { + let n = N::decode(self.reader()).await; + self.release().await; + n + } + pub async fn release(self) {} +} + +#[derive(Clone)] +struct IO { + i: IoLock, + o: IoLock, +} + +struct ReplySub { + id: u64, + ack: oneshot::Sender<()>, + cb: oneshot::Sender>, +} + +struct IoClient { + output: IoLock, + id: Rc>, + subscribe: Rc>, + req_tid: TypeId, + notif_tid: TypeId, +} +impl IoClient { + pub async fn new( + output: IoLock, + ) -> (Receiver, Self) { + let (req, rep) = mpsc::channel(0); + (rep, Self { + output, + id: Rc::new(RefCell::new(0)), + req_tid: TypeId::of::(), + notif_tid: TypeId::of::(), + subscribe: Rc::new(req), + }) + } + async fn lock_out(&self) -> IoGuard { + Bound::async_new(self.output.clone(), async |o| o.lock().await).await + } +} +impl Client for IoClient { + fn root_notif_tid(&self) -> TypeId { self.notif_tid } + fn root_req_tid(&self) -> TypeId { self.req_tid } + fn start_notif(&self) -> LocalBoxFuture<'_, Box> { + Box::pin(async { + let mut o = self.lock_out().await; + 0u64.encode(o.as_mut()).await; + Box::new(IoNotifWriter { o }) as Box + }) + } + fn start_request(&self) -> LocalBoxFuture<'_, Box> { + Box::pin(async { + let id = { + let mut id_g = self.id.borrow_mut(); + *id_g += 1; + *id_g + }; + let (cb, reply) = oneshot::channel(); + let (ack, got_ack) = oneshot::channel(); + self.subscribe.as_ref().clone().send(ReplySub { id, ack, cb }).await; + got_ack.await; + let mut w = self.lock_out().await; + id.encode(w.as_mut()).await; + Box::new(IoReqWriter { reply, w }) as Box + }) + } +} + +struct IoReqWriter { + reply: oneshot::Receiver>, + w: IoGuard, +} +impl ReqWriter for IoReqWriter { + fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { self.w.as_mut() } + fn send(self: Box) -> LocalBoxFuture<'static, Box> { + Box::pin(async { + let Self { reply, .. } = *self; + let i = reply.await.expect("Client dropped before reply received"); + Box::new(IoRepReader { i }) as Box + }) + } +} + +struct IoRepReader { + i: IoGuard, +} +impl RepReader for IoRepReader { + fn reader(&mut self) -> Pin<&mut dyn AsyncRead> { self.i.as_mut() } + fn finish(self: Box) -> LocalBoxFuture<'static, ()> { Box::pin(async {}) } +} + +#[derive(destructure)] +struct IoNotifWriter { + o: IoGuard, +} +impl MsgWriter for IoNotifWriter { + fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { self.o.as_mut() } + fn finish(self: Box) -> LocalBoxFuture<'static, ()> { + self.destructure(); + Box::pin(async {}) + } +} + +pub struct CommCtx { + quit: Sender<()>, + client: Rc, +} + +impl CommCtx { + pub async fn quit(self) { self.quit.clone().send(()).await.expect("quit channel dropped"); } + pub fn client(&self) -> Rc { self.client.clone() as Rc } +} + +pub async fn io_comm( + o: Rc>>>, + i: Mutex>>, + notif: impl for<'a> AsyncFn(&mut CommCtx, NotifReader<'a>), + req: impl for<'a> AsyncFn(&mut CommCtx, ReqReader<'a>), +) { + let i = Rc::new(i); + let (onsub, client) = IoClient::new::(o.clone()).await; + let client = Rc::new(client); + let (exit, onexit) = channel(1); + enum Event { + Input(u64), + Sub(ReplySub), + Exit, + } + let exiting = RefCell::new(false); + let input_stream = stream(async |mut h| { + loop { + let id = u64::decode(i.lock().await.as_mut()).await; + h.emit(Event::Input(id)).await; + } + }); + let pending_reqs = RefCell::new(VecDeque::>::new()); + // this stream will never yield a value + let mut fork_stream = pin!( + stream::poll_fn(|cx| { + let mut reqs_g = pending_reqs.borrow_mut(); + reqs_g.retain_mut(|req| match req.as_mut().poll(cx) { + Poll::Pending => true, + Poll::Ready(()) => false, + }); + if *exiting.borrow() { Poll::Ready(None) } else { Poll::Pending } + }) + .fuse() + ); + { + let mut shared = pin!(stream_select!( + Box::pin(input_stream) as Pin>>, + Box::pin(onsub.map(Event::Sub)) as Pin>>, + Box::pin(fork_stream.as_mut()) as Pin>>, + Box::pin(onexit.map(|()| Event::Exit)) as Pin>>, + )); + let mut pending_replies = HashMap::new(); + while let Some(next) = shared.next().await { + match next { + Event::Exit => { + *exiting.borrow_mut() = true; + break; + }, + Event::Sub(ReplySub { id, ack, cb }) => { + pending_replies.insert(id, cb); + ack.send(()); + }, + Event::Input(id) if id == 0 => { + let (i, notif, exit, client) = (i.clone(), ¬if, exit.clone(), client.clone()); + pending_reqs.borrow_mut().push_back(Box::pin(async move { + let g = i.lock().await; + notif(&mut CommCtx { client, quit: exit.clone() }, NotifReader { read: g }).await + })); + }, + // id.msb == 0 is a request, !id where id.msb == 1 is the equivalent response + Event::Input(id) => + if (id & (1 << (u64::BITS - 1))) == 0 { + let (i, o, req, exit, client) = + (i.clone(), o.clone(), &req, exit.clone(), client.clone()); + pending_reqs.borrow_mut().push_back(Box::pin(async move { + let g = i.lock().await; + req(&mut CommCtx { client, quit: exit.clone() }, ReqReader { + id, + read: g, + write: &*o, + }) + .await + }) as LocalBoxFuture<()>); + } else { + let cb = pending_replies.remove(&!id).expect("Reply to unrecognized request"); + let _ = cb.send(Bound::async_new(i.clone(), |i| i.lock()).await); + }, + } + } + } + fork_stream.as_mut().count().await; +} + trait_set! { pub trait SendFn = for<'a> FnMut(&'a [u8], ReqNot) -> LocalBoxFuture<'a, ()> + DynClone + 'static; pub trait ReqFn = - for<'a> FnMut(RequestHandle<'a, T>, ::Req) + for<'a> FnMut(ReqReader<'a, T>, ::Req) -> LocalBoxFuture<'a, Receipt<'a>> + DynClone + 'static; pub trait NotifFn = @@ -59,7 +402,7 @@ pub struct RequestHandle<'a, MS: MsgSet> { parent: ReqNot, raw_reply: RefCell>, } -impl<'a, MS: MsgSet + 'static> RequestHandle<'a, MS> { +impl<'a, MS: MsgSet + 'static> ReqReader<'a, MS> { pub fn new(parent: ReqNot, raw_reply: impl AsyncFnOnce(Vec) + 'static) -> Self { Self { defer: RefCell::default(), @@ -87,12 +430,12 @@ impl<'a, MS: MsgSet + 'static> RequestHandle<'a, MS> { Receipt(PhantomData) } } -impl ReqHandlish for RequestHandle<'_, MS> { +impl ReqHandlish for ReqReader<'_, MS> { fn defer_objsafe(&self, val: Pin>>) { self.defer.borrow_mut().push(val) } } -impl Drop for RequestHandle<'_, MS> { +impl Drop for ReqReader<'_, MS> { fn drop(&mut self) { if !panicking() { debug_assert!(self.raw_reply.borrow().is_none(), "Request dropped without response") @@ -158,7 +501,7 @@ impl ReqNot { let rn = self.clone(); let rn2 = self.clone(); req_cb( - RequestHandle::new(rn, async move |vec| { + ReqReader::new(rn, async move |vec| { let mut buf = (!id).to_be_bytes().to_vec(); buf.extend(vec); let mut send = clone_box(&*rn2.0.lock().await.send); diff --git a/orchid-base/src/stash.rs b/orchid-base/src/stash.rs new file mode 100644 index 0000000..83290ab --- /dev/null +++ b/orchid-base/src/stash.rs @@ -0,0 +1,42 @@ +//! A pattern for running async code from sync destructors and other +//! unfortunately sync callbacks +//! +//! We create a task_local + +use std::collections::VecDeque; +use std::pin::Pin; + +use some_executor::task_local; + +#[derive(Default)] +struct StashedFutures { + queue: VecDeque>>>, +} + +task_local! { + static STASHED_FUTURES: StashedFutures; +} + +/// Complete the argument future, and any futures spawned from it via [stash]. +/// This is useful mostly to guarantee that messaging destructors have run. +pub async fn with_stash(fut: F) -> F::Output { + STASHED_FUTURES + .scope(StashedFutures::default(), async { + let val = fut.await; + while let Some(fut) = STASHED_FUTURES.with_mut(|sf| sf.unwrap().queue.pop_front()) { + fut.await; + } + val + }) + .await +} + +/// Schedule a future to be run before the next [with_stash] guard ends. This is +/// most useful for sending messages from destructors. +pub fn stash(fut: F) { + STASHED_FUTURES.with_mut(|sf| { + sf.expect("No stash! Timely completion cannot be guaranteed").queue.push_back(Box::pin(async { + fut.await; + })) + }) +} diff --git a/orchid-extension/src/entrypoint.rs b/orchid-extension/src/entrypoint.rs index f871ef0..1eb3549 100644 --- a/orchid-extension/src/entrypoint.rs +++ b/orchid-extension/src/entrypoint.rs @@ -3,7 +3,7 @@ use std::future::Future; use std::mem; use std::num::NonZero; use std::pin::Pin; -use std::rc::Rc; +use std::rc::{Rc, Weak}; use futures::channel::mpsc::{Receiver, Sender, channel}; use futures::future::{LocalBoxFuture, join_all}; @@ -21,7 +21,7 @@ use orchid_base::interner::{Interner, Tok}; use orchid_base::logging::Logger; use orchid_base::name::Sym; use orchid_base::parse::{Comment, Snippet}; -use orchid_base::reqnot::{ReqNot, RequestHandle, Requester}; +use orchid_base::reqnot::{ReqNot, ReqReader, Requester}; use orchid_base::tree::{TokenVariant, ttv_from_api}; use substack::Substack; use trait_set::trait_set; @@ -37,7 +37,29 @@ use crate::system::atom_by_idx; use crate::system_ctor::{CtedObj, DynSystemCtor}; use crate::tree::{LazyMemberFactory, TreeIntoApiCtxImpl}; -pub type ExtReq<'a> = RequestHandle<'a, api::ExtMsgSet>; +task_local::task_local! { + static CLIENT: Rc; +} + +fn get_client() -> Rc { + CLIENT.with(|c| c.expect("Client not set, not running inside a duplex reqnot channel!").clone()) +} + +/// Sent the client used for global [request] and [notify] functions within the +/// runtime of this future +pub async fn with_client(c: Rc, fut: F) -> F::Output { + CLIENT.scope(c, fut).await +} + +/// Send a request through the global client's [ClientExt::request] +pub async fn request>(t: T) -> T::Response { + get_client().request(t).await +} + +/// Send a notification through the global client's [ClientExt::notify] +pub async fn notify + 'static>(t: T) { get_client().notify(t).await } + +pub type ExtReq<'a> = ReqReader<'a, api::ExtMsgSet>; pub type ExtReqNot = ReqNot; pub struct ExtensionData { @@ -177,8 +199,7 @@ pub fn extension_init( }) .await, api::HostExtReq::Ping(ping @ api::Ping) => hand.handle(&ping, &()).await, - api::HostExtReq::Sweep(sweep @ api::Sweep) => - hand.handle(&sweep, &interner.sweep_replica().await).await, + api::HostExtReq::Sweep(api::Sweep) => todo!(), api::HostExtReq::SysReq(api::SysReq::NewSystem(new_sys)) => { let (sys_id, _) = (decls.iter().enumerate().find(|(_, s)| s.id == new_sys.system)) .expect("NewSystem call received for invalid system"); diff --git a/orchid-extension/src/interner.rs b/orchid-extension/src/interner.rs new file mode 100644 index 0000000..49700e3 --- /dev/null +++ b/orchid-extension/src/interner.rs @@ -0,0 +1,73 @@ +use std::borrow::Borrow; +use std::cell::RefCell; +use std::fmt::Debug; +use std::hash::Hash; +use std::rc::{Rc, Weak}; + +use hashbrown::HashMap; +use orchid_api_traits::Coding; +use orchid_base::interner::{IStr, IStrHandle, IStrv, IStrvHandle}; + +use crate::api; + +trait Branch: 'static { + type Token: Clone + Copy + Debug + Hash + PartialEq + Eq + PartialOrd + Ord + Coding + 'static; + type Data: 'static + Borrow; + type Borrow: ToOwned + ?Sized; + type Handle: AsRef; + type Interned: Clone; + fn mk_interned(t: Self::Token, h: Rc) -> Self::Interned; +} + +struct StrBranch; +impl Branch for StrBranch { + type Data = String; + type Token = api::TStr; + type Borrow = str; + type Handle = Handle; + type Interned = IStr; + fn mk_interned(t: Self::Token, h: Rc) -> Self::Interned { IStr(t, h) } +} +struct StrvBranch; +impl Branch for StrvBranch { + type Data = Vec; + type Token = api::TStrv; + type Borrow = [IStr]; + type Handle = Handle; + type Interned = IStrv; + fn mk_interned(t: Self::Token, h: Rc) -> Self::Interned { IStrv(t, h) } +} + +struct Data { + token: B::Token, + data: Rc, +} + +struct Handle { + data: Rc>, + parent: Weak>>, +} +impl IStrHandle for Handle {} +impl AsRef for Handle { + fn as_ref(&self) -> &str { self.data.data.as_ref().as_ref() } +} +impl IStrvHandle for Handle {} +impl AsRef<[IStr]> for Handle { + fn as_ref(&self) -> &[IStr] { self.data.data.as_ref().as_ref() } +} + +struct Rec { + handle: Weak, + data: Rc>, +} + +struct IntData { + by_tok: HashMap>, + by_data: HashMap, Rec>, +} +impl IntData { + async fn i(&mut self, q: &B::Borrow) -> B::Interned { todo!() } + async fn e(&mut self, q: &B::Token) -> B::Interned { todo!() } +} + +struct Int(Rc>>); diff --git a/orchid-extension/src/lib.rs b/orchid-extension/src/lib.rs index 41db629..fd71ca2 100644 --- a/orchid-extension/src/lib.rs +++ b/orchid-extension/src/lib.rs @@ -12,6 +12,7 @@ pub mod gen_expr; pub mod lexer; // pub mod msg; pub mod context; +pub mod interner; pub mod other_system; pub mod parser; pub mod reflection; diff --git a/preview.svg b/preview.svg index def81d8..31b2381 100644 --- a/preview.svg +++ b/preview.svg @@ -11,7 +11,7 @@ inkscape:export-filename="preview.png" inkscape:export-xdpi="96" inkscape:export-ydpi="96" - inkscape:version="1.2.2 (b0a8486541, 2022-12-01)" + inkscape:version="1.3 (0e150ed6c4, 2023-07-21)" xml:space="preserve" xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape" xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd" @@ -28,21 +28,26 @@ inkscape:deskcolor="#505050" inkscape:document-units="px" showgrid="false" - inkscape:zoom="0.81037995" - inkscape:cx="698.43781" - inkscape:cy="389.32355" - inkscape:window-width="1536" - inkscape:window-height="792" - inkscape:window-x="0" - inkscape:window-y="0" + inkscape:zoom="0.57302516" + inkscape:cx="643.07822" + inkscape:cy="350.76994" + inkscape:window-width="1920" + inkscape:window-height="991" + inkscape:window-x="-9" + inkscape:window-y="-9" inkscape:window-maximized="1" inkscape:current-layer="layer1" showguides="true" - shape-rendering="crispEdges">OrchidOrchidEmbeddable scripting languageEmbeddable scripting language