From 5e474069e03a56a0680ec542104a5ba05364000f Mon Sep 17 00:00:00 2001 From: Lawrence Bethlenfalvy Date: Sat, 22 Feb 2025 19:01:05 +0100 Subject: [PATCH] reference cycles resolved --- Cargo.lock | 6 +- orchid-api-traits/Cargo.toml | 2 +- orchid-api-traits/src/coding.rs | 10 +- orchid-api/Cargo.toml | 2 +- orchid-base/Cargo.toml | 2 +- orchid-base/src/builtin.rs | 5 +- orchid-base/src/macros.rs | 9 +- orchid-base/src/reqnot.rs | 2 +- orchid-base/src/tree.rs | 5 +- orchid-extension/Cargo.toml | 4 +- orchid-extension/src/atom.rs | 4 +- orchid-extension/src/atom_owned.rs | 38 +++-- orchid-extension/src/atom_thin.rs | 13 +- orchid-extension/src/entrypoint.rs | 210 +++++++++++++-------------- orchid-extension/src/func_atom.rs | 8 +- orchid-extension/src/gen_expr.rs | 4 +- orchid-extension/src/lib.rs | 1 + orchid-extension/src/macros.rs | 6 +- orchid-extension/src/tokio.rs | 54 +++++++ orchid-extension/src/tree.rs | 5 +- orchid-host/Cargo.toml | 2 +- orchid-host/src/ctx.rs | 4 +- orchid-host/src/execute.rs | 44 ++---- orchid-host/src/extension.rs | 44 +++--- orchid-host/src/macros.rs | 5 +- orchid-host/src/subprocess.rs | 23 +-- orchid-host/src/system.rs | 20 +-- orchid-host/src/tree.rs | 48 +++++-- orchid-std/Cargo.toml | 6 +- orchid-std/src/main.rs | 12 +- orchid.code-workspace | 2 + orcx/src/main.rs | 221 +++++++++++++++-------------- 32 files changed, 433 insertions(+), 388 deletions(-) create mode 100644 orchid-extension/src/tokio.rs diff --git a/Cargo.lock b/Cargo.lock index a6ae50a..12261ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1137,6 +1137,7 @@ dependencies = [ "ahash 0.8.11", "async-once-cell", "async-std", + "async-stream", "derive_destructure", "dyn-clone", "futures", @@ -1155,6 +1156,7 @@ dependencies = [ "paste", "some_executor 0.4.0", "substack", + "tokio", "trait-set", ] @@ -1223,9 +1225,9 @@ dependencies = [ [[package]] name = "ordered-float" -version = "4.6.0" +version = "5.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7bb71e1b3fa6ca1c61f383464aaf2bb0e2f8e772a1f01d486832464de363b951" +checksum = "e2c1f9f56e534ac6a9b8a4600bdf0f530fb393b5f393e7b4d03489c3cf0c3f01" dependencies = [ "num-traits", ] diff --git a/orchid-api-traits/Cargo.toml b/orchid-api-traits/Cargo.toml index 6712518..e5097e6 100644 --- a/orchid-api-traits/Cargo.toml +++ b/orchid-api-traits/Cargo.toml @@ -11,4 +11,4 @@ async-stream = "0.3.6" futures = "0.3.31" itertools = "0.14.0" never = "0.1.0" -ordered-float = "4.6.0" +ordered-float = "5.0.0" diff --git a/orchid-api-traits/src/coding.rs b/orchid-api-traits/src/coding.rs index 1fea9dd..f6fff0d 100644 --- a/orchid-api-traits/src/coding.rs +++ b/orchid-api-traits/src/coding.rs @@ -9,8 +9,7 @@ use std::sync::Arc; use async_std::io::{Read, ReadExt, Write, WriteExt}; use async_stream::stream; -use futures::future::LocalBoxFuture; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use never::Never; use ordered_float::NotNan; @@ -28,11 +27,8 @@ pub trait Encode { pub trait Coding: Encode + Decode + Clone { fn get_decoder + 'static>( map: impl Fn(Self) -> F + Clone + 'static, - ) -> impl for<'a> Fn(Pin<&'a mut dyn Read>) -> LocalBoxFuture<'a, T> { - move |r| { - let map = map.clone(); - async move { map(Self::decode(r).await).await }.boxed_local() - } + ) -> impl AsyncFn(Pin<&mut dyn Read>) -> T { + async move |r| map(Self::decode(r).await).await } } impl Coding for T {} diff --git a/orchid-api/Cargo.toml b/orchid-api/Cargo.toml index ea2b2b0..ba53184 100644 --- a/orchid-api/Cargo.toml +++ b/orchid-api/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -ordered-float = "4.6.0" +ordered-float = "5.0.0" orchid-api-traits = { version = "0.1.0", path = "../orchid-api-traits" } orchid-api-derive = { version = "0.1.0", path = "../orchid-api-derive" } async-std = "1.13.0" diff --git a/orchid-base/Cargo.toml b/orchid-base/Cargo.toml index 4944ceb..730fb16 100644 --- a/orchid-base/Cargo.toml +++ b/orchid-base/Cargo.toml @@ -20,7 +20,7 @@ num-traits = "0.2.19" orchid-api = { version = "0.1.0", path = "../orchid-api" } orchid-api-derive = { version = "0.1.0", path = "../orchid-api-derive" } orchid-api-traits = { version = "0.1.0", path = "../orchid-api-traits" } -ordered-float = "4.6.0" +ordered-float = "5.0.0" regex = "1.11.1" rust-embed = "8.5.0" some_executor = "0.4.0" diff --git a/orchid-base/src/builtin.rs b/orchid-base/src/builtin.rs index d441a70..ec59f54 100644 --- a/orchid-base/src/builtin.rs +++ b/orchid-base/src/builtin.rs @@ -6,7 +6,6 @@ use futures::future::LocalBoxFuture; use crate::api; pub type Spawner = Rc)>; -pub type RecvCB<'a> = Box FnOnce(&'b [u8]) -> LocalBoxFuture<'b, ()> + 'a>; /// The 3 primary contact points with an extension are /// - send a message @@ -16,7 +15,7 @@ pub type RecvCB<'a> = Box FnOnce(&'b [u8]) -> LocalBoxFuture<'b, ()> /// There are no ordering guarantees about these pub trait ExtPort { fn send<'a>(&'a self, msg: &'a [u8]) -> LocalBoxFuture<'a, ()>; - fn recv<'a>(&'a self, cb: RecvCB<'a>) -> LocalBoxFuture<'a, ()>; + fn recv(&self) -> LocalBoxFuture<'_, Option>>; } pub struct ExtInit { @@ -25,7 +24,7 @@ pub struct ExtInit { } impl ExtInit { pub async fn send(&self, msg: &[u8]) { self.port.send(msg).await } - pub async fn recv<'a, 's: 'a>(&'s self, cb: RecvCB<'a>) { self.port.recv(Box::new(cb)).await } + pub async fn recv(&self) -> Option> { self.port.recv().await } } impl Deref for ExtInit { type Target = api::ExtensionHeader; diff --git a/orchid-base/src/macros.rs b/orchid-base/src/macros.rs index c1fd1fc..73ea44f 100644 --- a/orchid-base/src/macros.rs +++ b/orchid-base/src/macros.rs @@ -3,7 +3,7 @@ use std::rc::Rc; use std::sync::Arc; use async_stream::stream; -use futures::future::{LocalBoxFuture, join_all}; +use futures::future::join_all; use futures::{FutureExt, StreamExt}; use never::Never; use trait_set::trait_set; @@ -22,9 +22,8 @@ impl MacroSlot<'_> { } trait_set! { - pub trait MacroAtomToApi = for<'a> FnMut(&'a A) -> LocalBoxFuture<'a, api::MacroToken>; - pub trait MacroAtomFromApi<'a, A> = - for<'b> FnMut(&'b api::Atom) -> LocalBoxFuture<'b, MTok<'a, A>>; + pub trait MacroAtomToApi = AsyncFnMut(&A) -> api::MacroToken; + pub trait MacroAtomFromApi<'a, A> = AsyncFnMut(&api::Atom) -> MTok<'a, A>; } #[derive(Clone, Debug)] @@ -87,7 +86,7 @@ impl<'a, A> MTok<'a, A> { }) } pub(crate) async fn to_api(&self, do_atom: &mut impl MacroAtomToApi) -> api::MacroToken { - fn sink(n: &Never) -> LocalBoxFuture<'_, T> { match *n {} } + async fn sink(n: &Never) -> T { match *n {} } match_mapping!(&self, MTok => api::MacroToken { Lambda(x => mtreev_to_api(x, do_atom).await, b => mtreev_to_api(b, do_atom).await), Name(t.tok().to_api()), diff --git a/orchid-base/src/reqnot.rs b/orchid-base/src/reqnot.rs index 2ec7e2b..60fd53d 100644 --- a/orchid-base/src/reqnot.rs +++ b/orchid-base/src/reqnot.rs @@ -17,8 +17,8 @@ use hashbrown::HashMap; use orchid_api_traits::{Channel, Coding, Decode, Encode, MsgSet, Request}; use trait_set::trait_set; +use crate::clone; use crate::logging::Logger; -use crate::{api, clone}; pub struct Receipt<'a>(PhantomData<&'a mut ()>); diff --git a/orchid-base/src/tree.rs b/orchid-base/src/tree.rs index d1c4464..a145935 100644 --- a/orchid-base/src/tree.rs +++ b/orchid-base/src/tree.rs @@ -8,7 +8,7 @@ use std::sync::Arc; pub use api::PhKind; use async_stream::stream; -use futures::future::{LocalBoxFuture, join_all}; +use futures::future::join_all; use futures::{FutureExt, StreamExt}; use itertools::Itertools; use never::Never; @@ -26,8 +26,7 @@ use crate::{api, match_mapping, tl_cache}; trait_set! { pub trait RecurCB<'a, A: AtomRepr, X: ExtraTok> = Fn(TokTree<'a, A, X>) -> TokTree<'a, A, X>; pub trait ExtraTok = Format + Clone + fmt::Debug; - pub trait RefDoExtra = - for<'b> FnMut(&'b X, Range) -> LocalBoxFuture<'b, api::TokenTree>; + pub trait RefDoExtra = AsyncFnMut(&X, Range) -> api::TokenTree; } pub fn recur<'a, A: AtomRepr, X: ExtraTok>( diff --git a/orchid-extension/Cargo.toml b/orchid-extension/Cargo.toml index 0dd39e5..1ae60aa 100644 --- a/orchid-extension/Cargo.toml +++ b/orchid-extension/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" ahash = "0.8.11" async-once-cell = "0.5.4" async-std = "1.13.0" +async-stream = "0.3.6" derive_destructure = "1.0.0" dyn-clone = "1.0.17" futures = "0.3.31" @@ -23,8 +24,9 @@ orchid-api = { version = "0.1.0", path = "../orchid-api" } orchid-api-derive = { version = "0.1.0", path = "../orchid-api-derive" } orchid-api-traits = { version = "0.1.0", path = "../orchid-api-traits" } orchid-base = { version = "0.1.0", path = "../orchid-base" } -ordered-float = "4.6.0" +ordered-float = "5.0.0" paste = "1.0.15" some_executor = "0.4.0" substack = "1.1.1" +tokio = { version = "1.43.0", optional = true } trait-set = "0.3.0" diff --git a/orchid-extension/src/atom.rs b/orchid-extension/src/atom.rs index 439cc48..3062dc0 100644 --- a/orchid-extension/src/atom.rs +++ b/orchid-extension/src/atom.rs @@ -319,8 +319,8 @@ trait_set! { } pub struct AtomFactory(Box); impl AtomFactory { - pub fn new + 'static>( - f: impl FnOnce(SysCtx) -> F + Clone + 'static, + pub fn new( + f: impl AsyncFnOnce(SysCtx) -> api::Atom + Clone + 'static, ) -> Self { Self(Box::new(|ctx| f(ctx).boxed_local())) } diff --git a/orchid-extension/src/atom_owned.rs b/orchid-extension/src/atom_owned.rs index 8336b67..ae9dbd3 100644 --- a/orchid-extension/src/atom_owned.rs +++ b/orchid-extension/src/atom_owned.rs @@ -35,7 +35,7 @@ pub struct OwnedVariant; impl AtomicVariant for OwnedVariant {} impl> AtomicFeaturesImpl for A { fn _factory(self) -> AtomFactory { - AtomFactory::new(move |ctx| async move { + AtomFactory::new(async move |ctx| { let serial = ctx.get_or_default::().next_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let atom_id = api::AtomId(NonZero::new(serial + 1).unwrap()); @@ -82,25 +82,26 @@ impl AtomDynfo for OwnedAtomDynfo { fn tid(&self) -> TypeId { TypeId::of::() } fn name(&self) -> &'static str { type_name::() } fn decode<'a>(&'a self, AtomCtx(data, ..): AtomCtx<'a>) -> LocalBoxFuture<'a, Box> { - async { + Box::pin(async { Box::new(::Data::decode(Pin::new(&mut &data[..])).await) as Box - } - .boxed_local() + }) } fn call(&self, AtomCtx(_, id, ctx): AtomCtx, arg: api::ExprTicket) -> LocalBoxFuture<'_, GExpr> { - async move { take_atom(id.unwrap(), &ctx).await.dyn_call(ctx.clone(), arg).await }.boxed_local() + Box::pin(async move { take_atom(id.unwrap(), &ctx).await.dyn_call(ctx.clone(), arg).await }) } fn call_ref<'a>( &'a self, AtomCtx(_, id, ctx): AtomCtx<'a>, arg: api::ExprTicket, ) -> LocalBoxFuture<'a, GExpr> { - async move { AtomReadGuard::new(id.unwrap(), &ctx).await.dyn_call_ref(ctx.clone(), arg).await } - .boxed_local() + Box::pin(async move { + AtomReadGuard::new(id.unwrap(), &ctx).await.dyn_call_ref(ctx.clone(), arg).await + }) } fn print(&self, AtomCtx(_, id, ctx): AtomCtx<'_>) -> LocalBoxFuture<'_, FmtUnit> { - async move { AtomReadGuard::new(id.unwrap(), &ctx).await.dyn_print(ctx.clone()).await } - .boxed_local() + Box::pin( + async move { AtomReadGuard::new(id.unwrap(), &ctx).await.dyn_print(ctx.clone()).await }, + ) } fn handle_req<'a, 'b: 'a, 'c: 'a>( &'a self, @@ -109,34 +110,32 @@ impl AtomDynfo for OwnedAtomDynfo { req: Pin<&'b mut dyn Read>, rep: Pin<&'c mut dyn Write>, ) -> LocalBoxFuture<'a, bool> { - async move { + Box::pin(async move { let a = AtomReadGuard::new(id.unwrap(), &ctx).await; let ms = self.ms.get_or_init(self.msbuild.pack(ctx.clone())).await; ms.dispatch(a.as_any_ref().downcast_ref().unwrap(), ctx.clone(), key, req, rep).await - } - .boxed_local() + }) } fn command<'a>( &'a self, AtomCtx(_, id, ctx): AtomCtx<'a>, ) -> LocalBoxFuture<'a, OrcRes>> { - async move { take_atom(id.unwrap(), &ctx).await.dyn_command(ctx.clone()).await }.boxed_local() + Box::pin(async move { take_atom(id.unwrap(), &ctx).await.dyn_command(ctx.clone()).await }) } fn drop(&self, AtomCtx(_, id, ctx): AtomCtx) -> LocalBoxFuture<'_, ()> { - async move { take_atom(id.unwrap(), &ctx).await.dyn_free(ctx.clone()).await }.boxed_local() + Box::pin(async move { take_atom(id.unwrap(), &ctx).await.dyn_free(ctx.clone()).await }) } fn serialize<'a, 'b: 'a>( &'a self, AtomCtx(_, id, ctx): AtomCtx<'a>, mut write: Pin<&'b mut dyn Write>, ) -> LocalBoxFuture<'a, Option>> { - async move { + Box::pin(async move { let id = id.unwrap(); id.encode(write.as_mut()).await; let refs = AtomReadGuard::new(id, &ctx).await.dyn_serialize(ctx.clone(), write).await; refs.map(|v| v.into_iter().map(|t| t.handle().tk).collect_vec()) - } - .boxed_local() + }) } fn deserialize<'a>( &'a self, @@ -144,13 +143,12 @@ impl AtomDynfo for OwnedAtomDynfo { data: &'a [u8], refs: &'a [api::ExprTicket], ) -> LocalBoxFuture<'a, api::Atom> { - async move { + Box::pin(async move { let refs = refs.iter().map(|tk| Expr::from_handle(Rc::new(ExprHandle::from_args(ctx.clone(), *tk)))); let obj = T::deserialize(DeserCtxImpl(data, &ctx), T::Refs::from_iter(refs)).await; obj._factory().build(ctx).await - } - .boxed_local() + }) } } diff --git a/orchid-extension/src/atom_thin.rs b/orchid-extension/src/atom_thin.rs index 6742ae6..f68bb4e 100644 --- a/orchid-extension/src/atom_thin.rs +++ b/orchid-extension/src/atom_thin.rs @@ -25,7 +25,7 @@ pub struct ThinVariant; impl AtomicVariant for ThinVariant {} impl> AtomicFeaturesImpl for A { fn _factory(self) -> AtomFactory { - AtomFactory::new(move |ctx| async move { + AtomFactory::new(async move |ctx| { let (id, _) = get_info::(ctx.get::().inst().card()); let mut buf = enc_vec(&id).await; self.encode(Pin::new(&mut buf)).await; @@ -42,12 +42,12 @@ pub struct ThinAtomDynfo { } impl AtomDynfo for ThinAtomDynfo { fn print<'a>(&self, AtomCtx(buf, _, ctx): AtomCtx<'a>) -> LocalBoxFuture<'a, FmtUnit> { - async move { T::decode(Pin::new(&mut &buf[..])).await.print(ctx).await }.boxed_local() + Box::pin(async move { T::decode(Pin::new(&mut &buf[..])).await.print(ctx).await }) } fn tid(&self) -> TypeId { TypeId::of::() } fn name(&self) -> &'static str { type_name::() } fn decode<'a>(&'a self, AtomCtx(buf, ..): AtomCtx<'a>) -> LocalBoxFuture<'a, Box> { - async { Box::new(T::decode(Pin::new(&mut &buf[..])).await) as Box }.boxed_local() + Box::pin(async { Box::new(T::decode(Pin::new(&mut &buf[..])).await) as Box }) } fn call<'a>( &'a self, @@ -102,14 +102,13 @@ impl AtomDynfo for ThinAtomDynfo { refs: &'a [api::ExprTicket], ) -> LocalBoxFuture<'a, api::Atom> { assert!(refs.is_empty(), "Refs found when deserializing thin atom"); - async { T::decode(Pin::new(&mut &data[..])).await._factory().build(ctx).await }.boxed_local() + Box::pin(async { T::decode(Pin::new(&mut &data[..])).await._factory().build(ctx).await }) } fn drop<'a>(&'a self, AtomCtx(buf, _, ctx): AtomCtx<'a>) -> LocalBoxFuture<'a, ()> { - async move { + Box::pin(async move { let string_self = T::decode(Pin::new(&mut &buf[..])).await.print(ctx.clone()).await; writeln!(ctx.logger(), "Received drop signal for non-drop atom {string_self:?}"); - } - .boxed_local() + }) } } diff --git a/orchid-extension/src/entrypoint.rs b/orchid-extension/src/entrypoint.rs index f12aed4..116d55e 100644 --- a/orchid-extension/src/entrypoint.rs +++ b/orchid-extension/src/entrypoint.rs @@ -1,23 +1,20 @@ use std::cell::RefCell; use std::future::Future; -use std::io::Write; use std::mem; use std::num::NonZero; use std::pin::Pin; use std::rc::Rc; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; -use async_std::channel::{Receiver, Sender}; +use async_std::channel::{self, Receiver, RecvError, Sender}; use async_std::stream; use async_std::sync::Mutex; use futures::future::{LocalBoxFuture, join_all}; -use futures::{FutureExt, StreamExt}; +use futures::{FutureExt, StreamExt, stream_select}; use hashbrown::HashMap; use itertools::Itertools; use orchid_api::{ApplyMacro, ExtMsgSet}; -use orchid_api_traits::{Decode, Encode, enc_vec}; -use orchid_base::builtin::{ExtPort, Spawner}; +use orchid_api_traits::{Decode, enc_vec}; +use orchid_base::builtin::{ExtInit, ExtPort, Spawner}; use orchid_base::char_filter::{char_filter_match, char_filter_union, mk_char_filter}; use orchid_base::clone; use orchid_base::interner::{Interner, Tok}; @@ -36,7 +33,6 @@ use crate::atom_owned::take_atom; use crate::fs::VirtFS; use crate::lexer::{LexContext, err_cascade, err_not_applicable}; use crate::macros::{Rule, RuleCtx}; -use crate::msg::{recv_parent_msg, send_parent_msg}; use crate::system::{SysCtx, atom_by_idx}; use crate::system_ctor::{CtedObj, DynSystemCtor}; use crate::tree::{GenTok, GenTokTree, LazyMemberFactory, TreeIntoApiCtxImpl, do_extra}; @@ -69,18 +65,18 @@ pub struct SystemRecord { } trait_set! { - pub trait WARCallback<'a, T> = FnOnce( + pub trait WithAtomRecordCallback<'a, T> = AsyncFnOnce( Box, SysCtx, AtomTypeId, &'a [u8] - ) -> LocalBoxFuture<'a, T> + ) -> T } pub async fn with_atom_record<'a, F: Future, T>( get_sys_ctx: &impl Fn(api::SysId) -> F, atom: &'a api::Atom, - cb: impl WARCallback<'a, T>, + cb: impl WithAtomRecordCallback<'a, T>, ) -> T { let mut data = &atom.data[..]; let ctx = get_sys_ctx(atom.owner).await; @@ -104,49 +100,41 @@ pub async fn with_atom_record<'a, F: Future, T>( // } pub struct ExtensionOwner { - rn: ReqNot, + _interner_cell: Rc>>, + _systems_lock: Rc>>, out_recv: Receiver>, out_send: Sender>, - ext_header: api::ExtensionHeader, } impl ExtPort for ExtensionOwner { fn send<'a>(&'a self, msg: &'a [u8]) -> LocalBoxFuture<'a, ()> { - self.rn.receive(msg).boxed_local() + Box::pin(async { self.out_send.send(msg.to_vec()).boxed_local().await.unwrap() }) } - fn recv<'a>( - &'a self, - cb: Box LocalBoxFuture<'_, ()> + 'a>, - ) -> LocalBoxFuture<'a, ()> { - async { - let msg = self.out_recv.recv().await.unwrap(); - cb(&msg[..]).await - } - .boxed_local() + fn recv(&self) -> LocalBoxFuture<'_, Option>> { + Box::pin(async { + match self.out_recv.recv().await { + Ok(v) => Some(v), + Err(RecvError) => None, + } + }) } } -impl ExtensionOwner { - pub fn ext_header(&self) -> &api::ExtensionHeader { &self.ext_header } - // pub async fn new(data: ExtensionData, spawner: Spawner, header: - // api::HostHeader) -> Self { let decls = - // } -} -pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) { - let api::HostHeader { log_strategy, msg_logs } = - api::HostHeader::decode(Pin::new(&mut async_std::io::stdin())).await; - let mut buf = Vec::new(); +pub fn extension_init( + data: ExtensionData, + host_header: api::HostHeader, + spawner: Spawner, +) -> ExtInit { + let api::HostHeader { log_strategy, msg_logs } = host_header; let decls = (data.systems.iter().enumerate()) .map(|(id, sys)| (u16::try_from(id).expect("more than u16max system ctors"), sys)) .map(|(id, sys)| sys.decl(api::SysDeclId(NonZero::new(id + 1).unwrap()))) .collect_vec(); let systems_lock = Rc::new(Mutex::new(HashMap::::new())); - api::ExtensionHeader { name: data.name.to_string(), systems: decls.clone() } - .encode(Pin::new(&mut buf)) - .await; - std::io::stdout().write_all(&buf).unwrap(); - std::io::stdout().flush().unwrap(); - let exiting = Arc::new(AtomicBool::new(false)); + let ext_header = api::ExtensionHeader { name: data.name.to_string(), systems: decls.clone() }; + let (out_send, in_recv) = channel::bounded::>(1); + let (in_send, out_recv) = channel::bounded::>(1); + let (exit_send, exit_recv) = channel::bounded(1); let logger = Logger::new(log_strategy); let msg_logger = Logger::new(msg_logs); let interner_cell = Rc::new(RefCell::new(None::)); @@ -159,9 +147,9 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) { x })); let init_ctx = { - clone!(systems_weak, interner_weak, spawner, logger); + clone!(interner_weak, spawner, logger); move |id: api::SysId, cted: CtedObj, reqnot: ReqNot| { - clone!(systems_weak, interner_weak, spawner, logger; async move { + clone!(interner_weak, spawner, logger; async move { let interner_rc = interner_weak.upgrade().expect("System construction order while shutting down"); let i = interner_rc.borrow().clone().expect("mk_ctx called very early, no interner!"); @@ -171,11 +159,11 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) { }; let rn = ReqNot::::new( msg_logger.clone(), - move |a, _| async move { send_parent_msg(a).await.unwrap() }.boxed_local(), - clone!(systems_weak, exiting, get_ctx; move |n, reqnot| { - clone!(systems_weak, exiting, get_ctx; async move { + move |a, _| clone!(in_send; Box::pin(async move { in_send.send(a.to_vec()).await.unwrap() })), + clone!(systems_weak, exit_send, get_ctx; move |n, _| { + clone!(systems_weak, exit_send, get_ctx; async move { match n { - api::HostExtNotif::Exit => exiting.store(true, Ordering::Relaxed), + api::HostExtNotif::Exit => exit_send.send(()).await.unwrap(), api::HostExtNotif::SystemDrop(api::SystemDrop(sys_id)) => if let Some(rc) = systems_weak.upgrade() { mem::drop(rc.lock().await.remove(&sys_id)) @@ -188,9 +176,9 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) { }.boxed_local()) }), { - clone!(logger, get_ctx, init_ctx, systems_weak, interner_weak, spawner, decls, msg_logger); + clone!(logger, get_ctx, init_ctx, systems_weak, interner_weak, decls, msg_logger); move |hand, req| { - clone!(logger, get_ctx, init_ctx, systems_weak, interner_weak, spawner, decls, msg_logger); + clone!(logger, get_ctx, init_ctx, systems_weak, interner_weak, decls, msg_logger); async move { let interner_cell = interner_weak.upgrade().expect("Interner dropped before request"); let i = interner_cell.borrow().clone().expect("Request arrived before interner set"); @@ -302,11 +290,8 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) { return hand.handle(&lex, &eopt).await; }, Ok((s, expr)) => { - let expr = expr - .to_api(&mut |f, r| { - clone!(sys_ctx; async move { do_extra(f, r, sys_ctx).await }).boxed_local() - }) - .await; + let expr = + expr.to_api(&mut async |f, r| do_extra(f, r, sys_ctx.clone()).await).await; let pos = (text.len() - s.len()) as u32; return hand.handle(&lex, &Some(Ok(api::LexedExpr { pos, expr }))).await; }, @@ -329,12 +314,9 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) { let o_line = match parser.parse(*exported, comments, tail) { Err(e) => Err(e.to_api()), Ok(t) => Ok( - ttv_to_api(t, &mut |f, range| { - clone!(ctx); - async move { - api::TokenTree { range, token: api::Token::Atom(f.clone().build(ctx).await) } - } - .boxed_local() + ttv_to_api(t, &mut async move |f, range| api::TokenTree { + range, + token: api::Token::Atom(f.clone().build(ctx.clone()).await), }) .await, ), @@ -344,52 +326,49 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) { api::HostExtReq::AtomReq(atom_req) => { let atom = atom_req.get_atom(); let atom_req = atom_req.clone(); - with_atom_record(&get_ctx, atom, move |nfo, ctx, id, buf| { - async move { - let actx = AtomCtx(buf, atom.drop, ctx.clone()); - match &atom_req { - api::AtomReq::SerializeAtom(ser) => { - let mut buf = enc_vec(&id).await; - let refs_opt = nfo.serialize(actx, Pin::<&mut Vec<_>>::new(&mut buf)).await; - hand.handle(ser, &refs_opt.map(|refs| (buf, refs))).await - }, - api::AtomReq::AtomPrint(print @ api::AtomPrint(_)) => - hand.handle(print, &nfo.print(actx).await.to_api()).await, - api::AtomReq::Fwded(fwded) => { - let api::Fwded(_, key, payload) = &fwded; - let mut reply = Vec::new(); - let key = Sym::from_api(*key, &i).await; - let some = nfo - .handle_req( - actx, - key, - Pin::<&mut &[u8]>::new(&mut &payload[..]), - Pin::<&mut Vec<_>>::new(&mut reply), - ) - .await; - hand.handle(fwded, &some.then_some(reply)).await - }, - api::AtomReq::CallRef(call @ api::CallRef(_, arg)) => { - let ret = nfo.call_ref(actx, *arg).await; - hand.handle(call, &ret.api_return(ctx.clone(), &hand).await).await - }, - api::AtomReq::FinalCall(call @ api::FinalCall(_, arg)) => { - let ret = nfo.call(actx, *arg).await; - hand.handle(call, &ret.api_return(ctx.clone(), &hand).await).await - }, - api::AtomReq::Command(cmd @ api::Command(_)) => match nfo.command(actx).await { - Err(e) => hand.handle(cmd, &Err(e.to_api())).await, - Ok(opt) => match opt { - None => hand.handle(cmd, &Ok(api::NextStep::Halt)).await, - Some(cont) => { - let cont = cont.api_return(ctx.clone(), &hand).await; - hand.handle(cmd, &Ok(api::NextStep::Continue(cont))).await - }, + with_atom_record(&get_ctx, atom, async move |nfo, ctx, id, buf| { + let actx = AtomCtx(buf, atom.drop, ctx.clone()); + match &atom_req { + api::AtomReq::SerializeAtom(ser) => { + let mut buf = enc_vec(&id).await; + let refs_opt = nfo.serialize(actx, Pin::<&mut Vec<_>>::new(&mut buf)).await; + hand.handle(ser, &refs_opt.map(|refs| (buf, refs))).await + }, + api::AtomReq::AtomPrint(print @ api::AtomPrint(_)) => + hand.handle(print, &nfo.print(actx).await.to_api()).await, + api::AtomReq::Fwded(fwded) => { + let api::Fwded(_, key, payload) = &fwded; + let mut reply = Vec::new(); + let key = Sym::from_api(*key, &i).await; + let some = nfo + .handle_req( + actx, + key, + Pin::<&mut &[u8]>::new(&mut &payload[..]), + Pin::<&mut Vec<_>>::new(&mut reply), + ) + .await; + hand.handle(fwded, &some.then_some(reply)).await + }, + api::AtomReq::CallRef(call @ api::CallRef(_, arg)) => { + let ret = nfo.call_ref(actx, *arg).await; + hand.handle(call, &ret.api_return(ctx.clone(), &hand).await).await + }, + api::AtomReq::FinalCall(call @ api::FinalCall(_, arg)) => { + let ret = nfo.call(actx, *arg).await; + hand.handle(call, &ret.api_return(ctx.clone(), &hand).await).await + }, + api::AtomReq::Command(cmd @ api::Command(_)) => match nfo.command(actx).await { + Err(e) => hand.handle(cmd, &Err(e.to_api())).await, + Ok(opt) => match opt { + None => hand.handle(cmd, &Ok(api::NextStep::Halt)).await, + Some(cont) => { + let cont = cont.api_return(ctx.clone(), &hand).await; + hand.handle(cmd, &Ok(api::NextStep::Continue(cont))).await }, }, - } + }, } - .boxed_local() }) .await }, @@ -411,7 +390,7 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) { for (k, v) in params { ctx.args.insert( Tok::from_api(k, &i).await, - mtreev_from_api(&v, &i, &mut |_| panic!("No atom in macro prompt!")).await, + mtreev_from_api(&v, &i, &mut async |_| panic!("No atom in macro prompt!")).await, ); } let err_cascade = err_cascade(&i).await; @@ -424,10 +403,9 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) { hand.handle_as(tok, &new_errors.map(|e| Err(e.to_api()))).await }, Ok(t) => { - let result = mtreev_to_api(&t, &mut |a| { - clone!(sys_ctx; async move { - api::MacroToken::Atom(a.clone().build(sys_ctx.clone()).await) - }.boxed_local()) + clone!(sys_ctx); + let result = mtreev_to_api(&t, &mut async |a| { + api::MacroToken::Atom(a.clone().build(sys_ctx.clone()).await) }) .await; hand.handle_as(tok, &Some(Ok(result))).await @@ -441,8 +419,22 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) { }, ); *interner_cell.borrow_mut() = Some(Interner::new_replica(rn.clone().map())); - while !exiting.load(Ordering::Relaxed) { - let rcvd = recv_parent_msg().await.unwrap(); - spawner(Box::pin(clone!(rn; async move { rn.receive(&rcvd).await }))) + spawner(Box::pin(clone!(spawner; async move { + let mut streams = stream_select! { in_recv.map(Some), exit_recv.map(|_| None) }; + while let Some(item) = streams.next().await { + match item { + Some(rcvd) => spawner(Box::pin(clone!(rn; async move { rn.receive(&rcvd[..]).await }))), + None => break, + } + } + }))); + ExtInit { + header: ext_header, + port: Box::new(ExtensionOwner { + out_recv, + out_send, + _interner_cell: interner_cell, + _systems_lock: systems_lock, + }), } } diff --git a/orchid-extension/src/func_atom.rs b/orchid-extension/src/func_atom.rs index 69e397d..6a8d885 100644 --- a/orchid-extension/src/func_atom.rs +++ b/orchid-extension/src/func_atom.rs @@ -133,8 +133,6 @@ impl OwnedAtom for Lambda { } mod expr_func_derives { - use std::future::Future; - use orchid_base::error::OrcRes; use super::ExprFunc; @@ -147,9 +145,9 @@ mod expr_func_derives { paste::paste!{ impl< $($t: TryFromExpr, )* - Fut: Future, - Func: Fn($($t,)*) -> Fut + Clone + Send + Sync + 'static - > ExprFunc<($($t,)*), Fut::Output> for Func { + Out: ToExpr, + Func: AsyncFn($($t,)*) -> Out + Clone + Send + Sync + 'static + > ExprFunc<($($t,)*), Out> for Func { const ARITY: u8 = $arity; async fn apply(&self, v: Vec) -> OrcRes { assert_eq!(v.len(), Self::ARITY.into(), "Arity mismatch"); diff --git a/orchid-extension/src/gen_expr.rs b/orchid-extension/src/gen_expr.rs index 3bd70e9..c4c14b0 100644 --- a/orchid-extension/src/gen_expr.rs +++ b/orchid-extension/src/gen_expr.rs @@ -100,9 +100,9 @@ pub fn bot(ev: impl IntoIterator) -> GExpr { inherit(GExprKind::Bottom(OrcErrv::new(ev).unwrap())) } -pub fn with>( +pub fn with( expr: GExpr, - cont: impl Fn(I) -> Fut + Clone + Send + Sync + 'static, + cont: impl AsyncFn(I) -> O + Clone + Send + Sync + 'static, ) -> GExpr { call([lambda(0, [seq([arg(0), call([Lambda::new(cont).to_expr(), arg(0)])])]), expr]) } diff --git a/orchid-extension/src/lib.rs b/orchid-extension/src/lib.rs index 416bca5..650f1ca 100644 --- a/orchid-extension/src/lib.rs +++ b/orchid-extension/src/lib.rs @@ -16,4 +16,5 @@ pub mod other_system; pub mod parser; pub mod system; pub mod system_ctor; +pub mod tokio; pub mod tree; diff --git a/orchid-extension/src/macros.rs b/orchid-extension/src/macros.rs index dbd102f..aa23d08 100644 --- a/orchid-extension/src/macros.rs +++ b/orchid-extension/src/macros.rs @@ -43,13 +43,13 @@ impl<'a> RuleCtx<'a> { pub async fn recurse(&mut self, tree: &[MTree<'a, Never>]) -> OrcRes>> { let req = api::RunMacros { run_id: self.run_id, - query: mtreev_to_api(tree, &mut |b| match *b {}).await, + query: mtreev_to_api(tree, &mut async |b| match *b {}).await, }; let Some(treev) = self.sys.get::>().request(req).await else { return Err(err_cascade(self.sys.i()).await.into()); }; static ATOM_MSG: &str = "Returned atom from Rule recursion"; - Ok(mtreev_from_api(&treev, self.sys.i(), &mut |_| panic!("{ATOM_MSG}")).await) + Ok(mtreev_from_api(&treev, self.sys.i(), &mut async |_| panic!("{ATOM_MSG}")).await) } pub fn getv(&mut self, key: &Tok) -> Vec> { self.args.remove(key).expect("Key not found") @@ -83,7 +83,7 @@ impl Rule { })) .await, location: api::Location::Inherit, - pattern: mtreev_to_api(&self.pattern, &mut |b| match *b {}).await, + pattern: mtreev_to_api(&self.pattern, &mut async |b| match *b {}).await, id: ctx.with_rule(Rc::new(self)), } } diff --git a/orchid-extension/src/tokio.rs b/orchid-extension/src/tokio.rs new file mode 100644 index 0000000..4eb7428 --- /dev/null +++ b/orchid-extension/src/tokio.rs @@ -0,0 +1,54 @@ +use crate::entrypoint::ExtensionData; + +#[cfg(feature = "tokio")] +pub async fn tokio_main(data: ExtensionData) { + use std::future::Future; + use std::io::Write; + use std::mem; + use std::pin::{Pin, pin}; + use std::rc::Rc; + + use async_std::io; + use async_stream::stream; + use futures::future::LocalBoxFuture; + use futures::stream::FuturesUnordered; + use futures::{StreamExt, stream, stream_select}; + use orchid_api_traits::{Decode, Encode}; + use orchid_base::clone; + use tokio::task::{LocalSet, spawn_local}; + + use crate::api; + use crate::entrypoint::extension_init; + use crate::msg::{recv_parent_msg, send_parent_msg}; + + let local_set = LocalSet::new(); + local_set.spawn_local(async { + let host_header = api::HostHeader::decode(Pin::new(&mut async_std::io::stdin())).await; + let init = + Rc::new(extension_init(data, host_header, Rc::new(|fut| mem::drop(spawn_local(fut))))); + let mut buf = Vec::new(); + init.header.encode(Pin::new(&mut buf)).await; + std::io::stdout().write_all(&buf).unwrap(); + std::io::stdout().flush().unwrap(); + // These are concurrent processes that never exit, so if the FuturesUnordered + // produces any result the extension should exit + let mut io = FuturesUnordered::>::new(); + io.push(Box::pin(async { + loop { + match recv_parent_msg().await { + Ok(msg) => init.send(&msg[..]).await, + Err(e) if e.kind() == io::ErrorKind::BrokenPipe => break, + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break, + Err(e) => panic!("{e}"), + } + } + })); + io.push(Box::pin(async { + while let Some(msg) = init.recv().await { + send_parent_msg(&msg[..]).await.unwrap(); + } + })); + io.next().await; + }); + local_set.await; +} diff --git a/orchid-extension/src/tree.rs b/orchid-extension/src/tree.rs index 4918491..4818c83 100644 --- a/orchid-extension/src/tree.rs +++ b/orchid-extension/src/tree.rs @@ -1,4 +1,3 @@ -use std::future::Future; use std::num::NonZero; use std::ops::Range; use std::rc::Rc; @@ -133,9 +132,7 @@ trait_set! { } pub struct LazyMemberFactory(Box); impl LazyMemberFactory { - pub fn new + 'static>( - cb: impl FnOnce(Sym, SysCtx) -> F + Clone + 'static, - ) -> Self { + pub fn new(cb: impl AsyncFnOnce(Sym, SysCtx) -> MemKind + Clone + 'static) -> Self { Self(Box::new(|s, ctx| cb(s, ctx).boxed_local())) } pub async fn build(self, path: Sym, ctx: SysCtx) -> MemKind { (self.0)(path, ctx).await } diff --git a/orchid-host/Cargo.toml b/orchid-host/Cargo.toml index 3b743b8..2695b69 100644 --- a/orchid-host/Cargo.toml +++ b/orchid-host/Cargo.toml @@ -21,7 +21,7 @@ num-traits = "0.2.19" orchid-api = { version = "0.1.0", path = "../orchid-api" } orchid-api-traits = { version = "0.1.0", path = "../orchid-api-traits" } orchid-base = { version = "0.1.0", path = "../orchid-base" } -ordered-float = "4.6.0" +ordered-float = "5.0.0" paste = "1.0.15" substack = "1.1.1" test_executors = "0.3.2" diff --git a/orchid-host/src/ctx.rs b/orchid-host/src/ctx.rs index e081728..3ca36aa 100644 --- a/orchid-host/src/ctx.rs +++ b/orchid-host/src/ctx.rs @@ -20,7 +20,7 @@ pub struct CtxData { pub systems: RwLock>, pub system_id: RefCell, pub owned_atoms: RwLock>, - pub root: RwLock, + // pub root: RwLock, } #[derive(Clone)] pub struct Ctx(Rc); @@ -36,7 +36,7 @@ impl Ctx { systems: RwLock::default(), system_id: RefCell::new(NonZero::new(1).unwrap()), owned_atoms: RwLock::default(), - root: RwLock::new(Module::default()), + // root: RwLock::new(Module::default()), })) } pub(crate) async fn system_inst(&self, id: api::SysId) -> Option { diff --git a/orchid-host/src/execute.rs b/orchid-host/src/execute.rs index 05a9663..381357b 100644 --- a/orchid-host/src/execute.rs +++ b/orchid-host/src/execute.rs @@ -11,7 +11,7 @@ use orchid_base::name::NameLike; use crate::ctx::Ctx; use crate::expr::{Expr, ExprKind, PathSet, Step}; -use crate::tree::{ItemKind, MemberKind}; +use crate::tree::{ItemKind, MemberKind, Module, Root}; type ExprGuard = Bound, Expr>; @@ -38,12 +38,13 @@ pub struct ExecCtx { cur_pos: Pos, did_pop: bool, logger: Logger, + root: Root, } impl ExecCtx { - pub async fn new(ctx: Ctx, logger: Logger, init: Expr) -> Self { + pub async fn new(ctx: Ctx, logger: Logger, root: Root, init: Expr) -> Self { let cur_pos = init.pos(); let cur = Bound::async_new(init, |init| init.kind().write()).await; - Self { ctx, gas: None, stack: vec![], cur, cur_pos, did_pop: false, logger } + Self { ctx, gas: None, stack: vec![], cur, cur_pos, did_pop: false, root, logger } } pub fn remaining_gas(&self) -> u64 { self.gas.expect("queried remaining_gas but no gas was set") } pub fn set_gas(&mut self, gas: Option) { self.gas = gas } @@ -91,38 +92,11 @@ impl ExecCtx { }, ExprKind::Seq(a, b) if !self.did_pop => (ExprKind::Seq(a.clone(), b), StackOp::Push(a)), ExprKind::Seq(_, b) => (ExprKind::Identity(b), StackOp::Nop), - ExprKind::Const(name) => { - let (cn, mp) = name.split_last(); - let root_lock = self.ctx.root.read().await; - let module = root_lock.walk(true, mp.iter().cloned()).await.unwrap(); - let member = (module.items.iter()) - .filter_map(|it| if let ItemKind::Member(m) = &it.kind { Some(m) } else { None }) - .find(|m| m.name() == cn); - match member { - None => ( - ExprKind::Bottom(mk_errv( - self.ctx.i.i("Constant does not exist").await, - format!("{name} does not refer to a constant"), - [self.cur_pos.clone().into()], - )), - StackOp::Pop, - ), - Some(mem) => match mem.kind().await { - MemberKind::Mod(_) => ( - ExprKind::Bottom(mk_errv( - self.ctx.i.i("module used as constant").await, - format!("{name} is a module"), - [self.cur_pos.clone().into()], - )), - StackOp::Pop, - ), - MemberKind::Const(c) => { - let value = c.get_bytecode(&self.ctx).await; - (ExprKind::Identity(value.clone()), StackOp::Nop) - }, - }, - } - }, + ExprKind::Const(name) => + match self.root.get_const_value(name, self.cur_pos.clone(), self.ctx.clone()).await { + Err(e) => (ExprKind::Bottom(e), StackOp::Pop), + Ok(v) => (ExprKind::Identity(v), StackOp::Nop), + }, ExprKind::Arg => panic!("This should not appear outside function bodies"), ek @ ExprKind::Atom(_) => (ek, StackOp::Pop), ExprKind::Bottom(bot) => (ExprKind::Bottom(bot.clone()), StackOp::Unwind(bot)), diff --git a/orchid-host/src/extension.rs b/orchid-host/src/extension.rs index 9e88e53..e176c1a 100644 --- a/orchid-host/src/extension.rs +++ b/orchid-host/src/extension.rs @@ -2,13 +2,15 @@ use std::cell::RefCell; use std::future::Future; use std::io; use std::num::NonZeroU64; +use std::pin::pin; use std::rc::{Rc, Weak}; use async_std::channel::{self, Sender}; use async_std::sync::Mutex; +use async_stream::stream; use derive_destructure::destructure; -use futures::FutureExt; use futures::future::{join, join_all}; +use futures::{FutureExt, StreamExt, stream, stream_select}; use hashbrown::HashMap; use itertools::Itertools; use orchid_api::HostMsgSet; @@ -41,13 +43,18 @@ pub struct ExtensionData { logger: Logger, next_pars: RefCell, exprs: ExprStore, + exiting_snd: Sender<()>, lex_recur: Mutex>>>, mac_recur: Mutex>>>, } impl Drop for ExtensionData { fn drop(&mut self) { let reqnot = self.reqnot.clone(); - (self.ctx.spawn)(Box::pin(async move { reqnot.notify(api::HostExtNotif::Exit).await })) + let exiting_snd = self.exiting_snd.clone(); + (self.ctx.spawn)(Box::pin(async move { + reqnot.notify(api::HostExtNotif::Exit).await; + exiting_snd.send(()).await.unwrap() + })) } } @@ -57,43 +64,34 @@ impl Extension { pub fn new(init: ExtInit, logger: Logger, msg_logger: Logger, ctx: Ctx) -> io::Result { Ok(Self(Rc::new_cyclic(|weak: &Weak| { let init = Rc::new(init); + let (exiting_snd, exiting_rcv) = channel::bounded::<()>(1); (ctx.spawn)(clone!(init, weak, ctx; Box::pin(async move { - let reqnot_opt = weak.upgrade().map(|rc| rc.reqnot.clone()); - if let Some(reqnot) = reqnot_opt { - let mut repeat = true; - while repeat { - repeat = false; - (init.recv(Box::new(|msg| { - repeat = true; - Box::pin(clone!(reqnot, ctx; async move { - let msg = msg.to_vec(); - let reqnot = reqnot.clone(); - (ctx.spawn)(Box::pin(async move { - reqnot.receive(&msg).await; - })) - })) - }))) - .await; + let rcv_stream = stream! { loop { yield init.recv().await } }; + let mut event_stream = pin!(stream::select(exiting_rcv.map(|()| None), rcv_stream)); + while let Some(Some(msg)) = event_stream.next().await { + if let Some(reqnot) = weak.upgrade().map(|rc| rc.reqnot.clone()) { + let reqnot = reqnot.clone(); + (ctx.spawn)(Box::pin(async move { + reqnot.receive(&msg).await; + })) } } }))); ExtensionData { + exiting_snd, exprs: ExprStore::default(), ctx: ctx.clone(), systems: (init.systems.iter().cloned()) .map(|decl| SystemCtor { decl, ext: WeakExtension(weak.clone()) }) .collect(), logger: logger.clone(), - init, + init: init.clone(), next_pars: RefCell::new(NonZeroU64::new(1).unwrap()), lex_recur: Mutex::default(), mac_recur: Mutex::default(), reqnot: ReqNot::new( msg_logger, - clone!(weak; move |sfn, _| clone!(weak; async move { - let data = weak.upgrade().unwrap(); - data.init.send(sfn).await - }.boxed_local())), + move |sfn, _| clone!(init; Box::pin(async move { init.send(sfn).await })), clone!(weak; move |notif, _| { clone!(weak; Box::pin(async move { let this = Extension(weak.upgrade().unwrap()); diff --git a/orchid-host/src/macros.rs b/orchid-host/src/macros.rs index dff356c..ab11de1 100644 --- a/orchid-host/src/macros.rs +++ b/orchid-host/src/macros.rs @@ -33,9 +33,8 @@ pub async fn macro_treev_to_api(mtree: Vec, slots: &mut Slots) -> Vec, ctx: Ctx) -> Vec { - mtreev_from_api(&api, &ctx.clone().i, &mut move |atom| { - clone!(ctx); - Box::pin(async move { MacTok::Atom(AtomHand::new(atom.clone(), &ctx).await) }) + mtreev_from_api(&api, &ctx.clone().i, &mut async move |atom| { + MacTok::Atom(AtomHand::new(atom.clone(), &ctx).await) }) .await } diff --git a/orchid-host/src/subprocess.rs b/orchid-host/src/subprocess.rs index e0b7aad..4f1caf0 100644 --- a/orchid-host/src/subprocess.rs +++ b/orchid-host/src/subprocess.rs @@ -4,7 +4,7 @@ use std::pin::Pin; use async_process::{self, Child, ChildStdin, ChildStdout}; use async_std::io::{self, BufReadExt, BufReader}; use async_std::sync::Mutex; -use futures::FutureExt; +use futures::AsyncWriteExt; use futures::future::LocalBoxFuture; use orchid_api_traits::{Decode, Encode}; use orchid_base::builtin::{ExtInit, ExtPort}; @@ -46,7 +46,7 @@ pub async fn ext_command( header, port: Box::new(Subprocess { child: RefCell::new(Some(child)), - stdin: Mutex::new(Box::pin(stdin)), + stdin: Some(Mutex::new(Box::pin(stdin))), stdout: Mutex::new(Box::pin(stdout)), ctx, }), @@ -55,14 +55,16 @@ pub async fn ext_command( pub struct Subprocess { child: RefCell>, - stdin: Mutex>>, + stdin: Option>>>, stdout: Mutex>>, ctx: Ctx, } impl Drop for Subprocess { fn drop(&mut self) { let mut child = self.child.borrow_mut().take().unwrap(); + let stdin = self.stdin.take().unwrap(); (self.ctx.spawn)(Box::pin(async move { + stdin.lock().await.close().await.unwrap(); let status = child.status().await.expect("Extension exited with error"); assert!(status.success(), "Extension exited with error {status}"); })) @@ -70,18 +72,17 @@ impl Drop for Subprocess { } impl ExtPort for Subprocess { fn send<'a>(&'a self, msg: &'a [u8]) -> LocalBoxFuture<'a, ()> { - async { send_msg(Pin::new(&mut *self.stdin.lock().await), msg).await.unwrap() }.boxed_local() + Box::pin(async { + send_msg(Pin::new(&mut *self.stdin.as_ref().unwrap().lock().await), msg).await.unwrap() + }) } - fn recv<'a>( - &'a self, - cb: Box LocalBoxFuture<'_, ()> + 'a>, - ) -> LocalBoxFuture<'a, ()> { + fn recv(&self) -> LocalBoxFuture<'_, Option>> { Box::pin(async { std::io::Write::flush(&mut std::io::stderr()).unwrap(); match recv_msg(self.stdout.lock().await.as_mut()).await { - Ok(msg) => cb(&msg).await, - Err(e) if e.kind() == io::ErrorKind::BrokenPipe => (), - Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => (), + Ok(msg) => Some(msg), + Err(e) if e.kind() == io::ErrorKind::BrokenPipe => None, + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => None, Err(e) => panic!("Failed to read from stdout: {}, {e}", e.kind()), } }) diff --git a/orchid-host/src/system.rs b/orchid-host/src/system.rs index 22b9782..912e9b2 100644 --- a/orchid-host/src/system.rs +++ b/orchid-host/src/system.rs @@ -24,7 +24,7 @@ use substack::{Stackframe, Substack}; use crate::api; use crate::ctx::Ctx; use crate::extension::{Extension, WeakExtension}; -use crate::tree::{ItemKind, Member, Module, ParsTokTree}; +use crate::tree::{ItemKind, Member, Module, ParsTokTree, Root}; #[derive(destructure)] struct SystemInstData { @@ -80,7 +80,7 @@ impl System { comments: Vec, ) -> OrcRes> { let line = - join_all(line.iter().map(|t| async { t.to_api(&mut |n, _| match *n {}).await })).await; + join_all(line.iter().map(|t| async { t.to_api(&mut async |n, _| match *n {}).await })).await; let comments = comments.iter().map(Comment::to_api).collect_vec(); match self.reqnot().request(api::ParseLine { exported, sys: self.id(), comments, line }).await { Ok(parsed) => Ok(ttv_from_api(parsed, &mut self.ctx().clone(), &self.ctx().i).await), @@ -122,7 +122,7 @@ impl SystemCtor { self.decl.depends.iter().map(|s| &**s) } pub fn id(&self) -> api::SysDeclId { self.decl.id } - pub async fn run<'a>(&self, depends: impl IntoIterator) -> System { + pub async fn run<'a>(&self, depends: impl IntoIterator) -> (Module, System) { let depends = depends.into_iter().map(|si| si.id()).collect_vec(); debug_assert_eq!(depends.len(), self.decl.depends.len(), "Wrong number of deps provided"); let ext = self.ext.upgrade().expect("SystemCtor should be freed before Extension"); @@ -150,10 +150,8 @@ impl SystemCtor { .collect::>() .await; ext.ctx().systems.write().await.insert(id, data.downgrade()); - let mut swap = Module::default(); - mem::swap(&mut swap, &mut *ext.ctx().root.write().await); - *ext.ctx().root.write().await = Module::new(swap.items.into_iter().chain(const_root)); - data + let root = Module::new(const_root); + (root, data) } } @@ -166,7 +164,7 @@ pub enum SysResolvErr { pub async fn init_systems( tgts: &[String], exts: &[Extension], -) -> Result, SysResolvErr> { +) -> Result<(Root, Vec), SysResolvErr> { let mut to_load = HashMap::<&str, &SystemCtor>::new(); let mut to_find = tgts.iter().map(|s| s.as_str()).collect::>(); while let Some(target) = to_find.pop_front() { @@ -205,9 +203,11 @@ pub async fn init_systems( walk_deps(&mut to_load, &mut to_load_ordered, Substack::Bottom.new_frame(tgt))?; } let mut systems = HashMap::<&str, System>::new(); + let mut root = Module::default(); for ctor in to_load_ordered.iter() { - let sys = ctor.run(ctor.depends().map(|n| &systems[n])).await; + let (sys_root, sys) = ctor.run(ctor.depends().map(|n| &systems[n])).await; systems.insert(ctor.name(), sys); + root.merge(sys_root); } - Ok(systems.into_values().collect_vec()) + Ok((Root::new(root), systems.into_values().collect_vec())) } diff --git a/orchid-host/src/tree.rs b/orchid-host/src/tree.rs index f5398fa..cb55f67 100644 --- a/orchid-host/src/tree.rs +++ b/orchid-host/src/tree.rs @@ -2,17 +2,18 @@ use std::fmt::Debug; use std::rc::Rc; use async_once_cell::OnceCell; -use async_std::sync::Mutex; +use async_std::sync::{Mutex, RwLock}; use async_stream::stream; use futures::future::join_all; use futures::{FutureExt, StreamExt}; use itertools::Itertools; use never::Never; +use orchid_base::error::{OrcRes, mk_errv}; use orchid_base::format::{FmtCtx, FmtUnit, Format, Variants}; use orchid_base::interner::Tok; use orchid_base::location::Pos; use orchid_base::macros::{mtreev_fmt, mtreev_from_api}; -use orchid_base::name::Sym; +use orchid_base::name::{NameLike, Sym}; use orchid_base::parse::{Comment, Import}; use orchid_base::tree::{AtomRepr, TokTree, Token}; use orchid_base::{clone, tl_cache}; @@ -52,7 +53,7 @@ impl Item { let kind = match tree.kind { api::ItemKind::Member(m) => ItemKind::Member(Member::from_api(m, path, sys).await), api::ItemKind::Import(name) => ItemKind::Import(Import { - path: Sym::from_api(name, &sys.ctx().i).await.iter().cloned().collect(), + path: Sym::from_api(name, &sys.ctx().i).await.iter().collect(), name: None, }), api::ItemKind::Export(e) => ItemKind::Export(Tok::from_api(e, &sys.ctx().i).await), @@ -66,11 +67,8 @@ impl Item { let pos = Pos::from_api(&rule.location, &sys.ctx().i).await; let pattern = mtreev_from_api(&rule.pattern, &sys.ctx().i, &mut { clone!(pos, sys); - move |a| { - clone!(pos, sys); - Box::pin(async move { - MacTok::Atom(AtomHand::from_api(a, pos.clone(), &mut sys.ctx().clone()).await) - }) + async move |a| { + MacTok::Atom(AtomHand::from_api(a, pos.clone(), &mut sys.ctx().clone()).await) } }) .await; @@ -182,6 +180,11 @@ impl Module { .collect_vec(); Self { imports: vec![], exports, items } } + pub fn merge(&mut self, other: Module) { + let mut swap = Module::default(); + std::mem::swap(self, &mut swap); + *self = Module::new(swap.items.into_iter().chain(other.items)) + } pub async fn from_api(m: api::Module, path: &mut Vec>, sys: &System) -> Self { Self::new( stream! { for item in m.items { yield Item::from_api(item, path, sys).boxed_local().await } } @@ -340,3 +343,32 @@ impl CodeLocator { Self { steps, rule_loc: Some((macro_i, rule_i)) } } } + +#[derive(Clone)] +pub struct Root(Rc>); +impl Root { + pub fn new(module: Module) -> Self { Self(Rc::new(RwLock::new(module))) } + pub async fn get_const_value(&self, name: impl NameLike, pos: Pos, ctx: Ctx) -> OrcRes { + let (cn, mp) = name.split_last(); + let root_lock = self.0.read().await; + let module = root_lock.walk(true, mp.iter().cloned()).await.unwrap(); + let member = (module.items.iter()) + .filter_map(|it| if let ItemKind::Member(m) = &it.kind { Some(m) } else { None }) + .find(|m| m.name() == cn); + match member { + None => Err(mk_errv( + ctx.i.i("Constant does not exist").await, + format!("{name} does not refer to a constant"), + [pos.clone().into()], + )), + Some(mem) => match mem.kind().await { + MemberKind::Mod(_) => Err(mk_errv( + ctx.i.i("module used as constant").await, + format!("{name} is a module, not a constant"), + [pos.clone().into()], + )), + MemberKind::Const(c) => Ok((c.get_bytecode(&ctx).await).clone()), + }, + } + } +} diff --git a/orchid-std/Cargo.toml b/orchid-std/Cargo.toml index 46affd6..4fb3210 100644 --- a/orchid-std/Cargo.toml +++ b/orchid-std/Cargo.toml @@ -14,7 +14,9 @@ orchid-api = { version = "0.1.0", path = "../orchid-api" } orchid-api-derive = { version = "0.1.0", path = "../orchid-api-derive" } orchid-api-traits = { version = "0.1.0", path = "../orchid-api-traits" } orchid-base = { version = "0.1.0", path = "../orchid-base" } -orchid-extension = { version = "0.1.0", path = "../orchid-extension" } -ordered-float = "4.6.0" +orchid-extension = { version = "0.1.0", path = "../orchid-extension", features = [ + "tokio", +] } +ordered-float = "5.0.0" rust_decimal = "1.36.0" tokio = { version = "1.43.0", features = ["full"] } diff --git a/orchid-std/src/main.rs b/orchid-std/src/main.rs index d4b9cf0..f18edb0 100644 --- a/orchid-std/src/main.rs +++ b/orchid-std/src/main.rs @@ -1,16 +1,10 @@ use std::mem; use std::rc::Rc; -use orchid_extension::entrypoint::{ExtensionData, extension_main_logic}; +use orchid_extension::entrypoint::ExtensionData; +use orchid_extension::tokio::tokio_main; use orchid_std::StdSystem; use tokio::task::{LocalSet, spawn_local}; #[tokio::main(flavor = "current_thread")] -pub async fn main() { - LocalSet::new() - .run_until(async { - let data = ExtensionData::new("orchid-std::main", &[&StdSystem]); - extension_main_logic(data, Rc::new(|fut| mem::drop(spawn_local(fut)))).await; - }) - .await -} +pub async fn main() { tokio_main(ExtensionData::new("orchid-std::main", &[&StdSystem])).await } diff --git a/orchid.code-workspace b/orchid.code-workspace index 9851040..f942232 100644 --- a/orchid.code-workspace +++ b/orchid.code-workspace @@ -32,6 +32,8 @@ "rust-analyzer.rustfmt.extraArgs": [ "+nightly" ], + "rust-analyzer.cargo.features": "all", + "rust-analyzer.check.features": "all", "files.associations": { "*.mjsd": "markdown" }, diff --git a/orcx/src/main.rs b/orcx/src/main.rs index b38c96b..a0a7288 100644 --- a/orcx/src/main.rs +++ b/orcx/src/main.rs @@ -1,3 +1,4 @@ +use std::cell::RefCell; use std::fs::File; use std::io::{Read, Write}; use std::mem; @@ -35,7 +36,9 @@ pub struct Args { #[arg(short, long, env = "ORCHID_DEFAULT_SYSTEMS", value_delimiter = ';')] system: Vec, #[arg(short, long)] - verbose: bool, + logs: bool, + #[arg(short, long)] + msg_logs: bool, #[command(subcommand)] command: Commands, } @@ -78,113 +81,117 @@ fn get_all_extensions<'a>( } } -#[tokio::main(flavor = "current_thread")] +#[tokio::main] async fn main() -> io::Result { - let mut code = ExitCode::SUCCESS; - LocalSet::new() - .run_until(async { - let args = Args::parse(); - let ctx = &Ctx::new(Rc::new(|fut| mem::drop(spawn_local(fut)))); - let i = &ctx.i; - let logger = - Logger::new(if args.verbose { LogStrategy::StdErr } else { LogStrategy::Discard }); - let extensions = get_all_extensions(&args, &logger, &Logger::new(LogStrategy::Discard), ctx) - .try_collect::>() + let code = Rc::new(RefCell::new(ExitCode::SUCCESS)); + let local_set = LocalSet::new(); + let code1 = code.clone(); + local_set.spawn_local(async move { + let args = Args::parse(); + let ctx = &Ctx::new(Rc::new(|fut| mem::drop(spawn_local(fut)))); + let i = &ctx.i; + let logger = Logger::new(if args.logs { LogStrategy::StdErr } else { LogStrategy::Discard }); + let msg_logger = + Logger::new(if args.msg_logs { LogStrategy::StdErr } else { LogStrategy::Discard }); + let extensions = get_all_extensions(&args, &logger, &msg_logger, ctx) + .try_collect::>() + .await + .unwrap(); + match args.command { + Commands::Lex { file } => { + let (_, systems) = init_systems(&args.system, &extensions).await.unwrap(); + let mut file = File::open(file.as_std_path()).unwrap(); + let mut buf = String::new(); + file.read_to_string(&mut buf).unwrap(); + let lexemes = lex(i.i(&buf).await, &systems, ctx).await.unwrap(); + println!("{}", take_first(&ttv_fmt(&lexemes, &FmtCtxImpl { i }).await, true)) + }, + Commands::Parse { file } => { + let (_, systems) = init_systems(&args.system, &extensions).await.unwrap(); + let mut file = File::open(file.as_std_path()).unwrap(); + let mut buf = String::new(); + file.read_to_string(&mut buf).unwrap(); + let lexemes = lex(i.i(&buf).await, &systems, ctx).await.unwrap(); + let Some(first) = lexemes.first() else { + println!("File empty!"); + return; + }; + let reporter = ReporterImpl::new(); + let pctx = ParseCtxImpl { reporter: &reporter, systems: &systems }; + let snip = Snippet::new(first, &lexemes, i); + let ptree = parse_items(&pctx, Substack::Bottom, snip).await.unwrap(); + if let Some(errv) = reporter.errv() { + eprintln!("{errv}"); + *code1.borrow_mut() = ExitCode::FAILURE; + return; + } + if ptree.is_empty() { + eprintln!("File empty only after parsing, but no errors were reported"); + *code1.borrow_mut() = ExitCode::FAILURE; + return; + } + for item in ptree { + println!("{}", take_first(&item.print(&FmtCtxImpl { i }).await, true)) + } + }, + Commands::Repl => loop { + let (root, systems) = init_systems(&args.system, &extensions).await.unwrap(); + print!("\\.> "); + std::io::stdout().flush().unwrap(); + let mut prompt = String::new(); + stdin().read_line(&mut prompt).await.unwrap(); + let lexemes = lex(i.i(prompt.trim()).await, &systems, ctx).await.unwrap(); + if args.logs { + println!("lexed: {}", take_first(&ttv_fmt(&lexemes, &FmtCtxImpl { i }).await, true)); + } + let mtreev = parse_mtree( + Snippet::new(&lexemes[0], &lexemes, i), + Substack::Bottom.push(i.i("orcx").await).push(i.i("input").await), + ) .await .unwrap(); - match args.command { - Commands::Lex { file } => { - let systems = init_systems(&args.system, &extensions).await.unwrap(); - let mut file = File::open(file.as_std_path()).unwrap(); - let mut buf = String::new(); - file.read_to_string(&mut buf).unwrap(); - let lexemes = lex(i.i(&buf).await, &systems, ctx).await.unwrap(); - println!("{}", take_first(&ttv_fmt(&lexemes, &FmtCtxImpl { i }).await, true)) - }, - Commands::Parse { file } => { - let systems = init_systems(&args.system, &extensions).await.unwrap(); - let mut file = File::open(file.as_std_path()).unwrap(); - let mut buf = String::new(); - file.read_to_string(&mut buf).unwrap(); - let lexemes = lex(i.i(&buf).await, &systems, ctx).await.unwrap(); - let Some(first) = lexemes.first() else { - println!("File empty!"); - return; - }; - let reporter = ReporterImpl::new(); - let pctx = ParseCtxImpl { reporter: &reporter, systems: &systems }; - let snip = Snippet::new(first, &lexemes, i); - let ptree = parse_items(&pctx, Substack::Bottom, snip).await.unwrap(); - if let Some(errv) = reporter.errv() { - eprintln!("{errv}"); - code = ExitCode::FAILURE; - return; - } - if ptree.is_empty() { - eprintln!("File empty only after parsing, but no errors were reported"); - code = ExitCode::FAILURE; - return; - } - for item in ptree { - println!("{}", take_first(&item.print(&FmtCtxImpl { i }).await, true)) - } - }, - Commands::Repl => loop { - let systems = init_systems(&args.system, &extensions).await.unwrap(); - print!("\\.> "); - std::io::stdout().flush().unwrap(); - let mut prompt = String::new(); - stdin().read_line(&mut prompt).await.unwrap(); - let lexemes = lex(i.i(prompt.trim()).await, &systems, ctx).await.unwrap(); - if args.verbose { - println!("lexed: {}", take_first(&ttv_fmt(&lexemes, &FmtCtxImpl { i }).await, true)); - } - let mtreev = parse_mtree( - Snippet::new(&lexemes[0], &lexemes, i), - Substack::Bottom.push(i.i("orcx").await).push(i.i("input").await), - ) - .await - .unwrap(); - if args.verbose { - let fmt = mtreev_fmt(&mtreev, &FmtCtxImpl { i }).await; - println!("parsed: {}", take_first(&fmt, true)); - } - let expr = mtreev_to_expr(&mtreev, Substack::Bottom, ctx).await; - let mut xctx = ExecCtx::new(ctx.clone(), logger.clone(), expr.at(Pos::None)).await; - xctx.set_gas(Some(1000)); - xctx.execute().await; - match xctx.result() { - ExecResult::Value(val) => - println!("{}", take_first(&val.print(&FmtCtxImpl { i }).await, false)), - ExecResult::Err(e) => println!("error: {e}"), - ExecResult::Gas(_) => println!("Ran out of gas!"), - } - }, - Commands::Execute { code } => { - let systems = init_systems(&args.system, &extensions).await.unwrap(); - let lexemes = lex(i.i(code.trim()).await, &systems, ctx).await.unwrap(); - if args.verbose { - println!("lexed: {}", take_first(&ttv_fmt(&lexemes, &FmtCtxImpl { i }).await, true)); - } - let mtreev = - parse_mtree(Snippet::new(&lexemes[0], &lexemes, i), Substack::Bottom).await.unwrap(); - if args.verbose { - let fmt = mtreev_fmt(&mtreev, &FmtCtxImpl { i }).await; - println!("parsed: {}", take_first(&fmt, true)); - } - let expr = mtreev_to_expr(&mtreev, Substack::Bottom, ctx).await; - let mut xctx = ExecCtx::new(ctx.clone(), logger.clone(), expr.at(Pos::None)).await; - xctx.set_gas(Some(1000)); - xctx.execute().await; - match xctx.result() { - ExecResult::Value(val) => - println!("{}", take_first(&val.print(&FmtCtxImpl { i }).await, false)), - ExecResult::Err(e) => println!("error: {e}"), - ExecResult::Gas(_) => println!("Ran out of gas!"), - } - }, - } - }) - .await; - Ok(code) + if args.logs { + let fmt = mtreev_fmt(&mtreev, &FmtCtxImpl { i }).await; + println!("parsed: {}", take_first(&fmt, true)); + } + let expr = mtreev_to_expr(&mtreev, Substack::Bottom, ctx).await; + let mut xctx = + ExecCtx::new(ctx.clone(), logger.clone(), root.clone(), expr.at(Pos::None)).await; + xctx.set_gas(Some(1000)); + xctx.execute().await; + match xctx.result() { + ExecResult::Value(val) => + println!("{}", take_first(&val.print(&FmtCtxImpl { i }).await, false)), + ExecResult::Err(e) => println!("error: {e}"), + ExecResult::Gas(_) => println!("Ran out of gas!"), + } + }, + Commands::Execute { code } => { + let (root, systems) = init_systems(&args.system, &extensions).await.unwrap(); + let lexemes = lex(i.i(code.trim()).await, &systems, ctx).await.unwrap(); + if args.logs { + println!("lexed: {}", take_first(&ttv_fmt(&lexemes, &FmtCtxImpl { i }).await, true)); + } + let mtreev = + parse_mtree(Snippet::new(&lexemes[0], &lexemes, i), Substack::Bottom).await.unwrap(); + if args.logs { + let fmt = mtreev_fmt(&mtreev, &FmtCtxImpl { i }).await; + println!("parsed: {}", take_first(&fmt, true)); + } + let expr = mtreev_to_expr(&mtreev, Substack::Bottom, ctx).await; + let mut xctx = ExecCtx::new(ctx.clone(), logger.clone(), root, expr.at(Pos::None)).await; + xctx.set_gas(Some(1000)); + xctx.execute().await; + match xctx.result() { + ExecResult::Value(val) => + println!("{}", take_first(&val.print(&FmtCtxImpl { i }).await, false)), + ExecResult::Err(e) => println!("error: {e}"), + ExecResult::Gas(_) => println!("Ran out of gas!"), + } + }, + } + }); + local_set.await; + let x = *code.borrow(); + Ok(x) }