reference cycles resolved

This commit is contained in:
2025-02-22 19:01:05 +01:00
parent cfa8b6ee52
commit 5e474069e0
32 changed files with 433 additions and 388 deletions

View File

@@ -9,6 +9,7 @@ edition = "2021"
ahash = "0.8.11"
async-once-cell = "0.5.4"
async-std = "1.13.0"
async-stream = "0.3.6"
derive_destructure = "1.0.0"
dyn-clone = "1.0.17"
futures = "0.3.31"
@@ -23,8 +24,9 @@ orchid-api = { version = "0.1.0", path = "../orchid-api" }
orchid-api-derive = { version = "0.1.0", path = "../orchid-api-derive" }
orchid-api-traits = { version = "0.1.0", path = "../orchid-api-traits" }
orchid-base = { version = "0.1.0", path = "../orchid-base" }
ordered-float = "4.6.0"
ordered-float = "5.0.0"
paste = "1.0.15"
some_executor = "0.4.0"
substack = "1.1.1"
tokio = { version = "1.43.0", optional = true }
trait-set = "0.3.0"

View File

@@ -319,8 +319,8 @@ trait_set! {
}
pub struct AtomFactory(Box<dyn AtomFactoryFn>);
impl AtomFactory {
pub fn new<F: Future<Output = api::Atom> + 'static>(
f: impl FnOnce(SysCtx) -> F + Clone + 'static,
pub fn new(
f: impl AsyncFnOnce(SysCtx) -> api::Atom + Clone + 'static,
) -> Self {
Self(Box::new(|ctx| f(ctx).boxed_local()))
}

View File

@@ -35,7 +35,7 @@ pub struct OwnedVariant;
impl AtomicVariant for OwnedVariant {}
impl<A: OwnedAtom + Atomic<Variant = OwnedVariant>> AtomicFeaturesImpl<OwnedVariant> for A {
fn _factory(self) -> AtomFactory {
AtomFactory::new(move |ctx| async move {
AtomFactory::new(async move |ctx| {
let serial =
ctx.get_or_default::<ObjStore>().next_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let atom_id = api::AtomId(NonZero::new(serial + 1).unwrap());
@@ -82,25 +82,26 @@ impl<T: OwnedAtom> AtomDynfo for OwnedAtomDynfo<T> {
fn tid(&self) -> TypeId { TypeId::of::<T>() }
fn name(&self) -> &'static str { type_name::<T>() }
fn decode<'a>(&'a self, AtomCtx(data, ..): AtomCtx<'a>) -> LocalBoxFuture<'a, Box<dyn Any>> {
async {
Box::pin(async {
Box::new(<T as AtomCard>::Data::decode(Pin::new(&mut &data[..])).await) as Box<dyn Any>
}
.boxed_local()
})
}
fn call(&self, AtomCtx(_, id, ctx): AtomCtx, arg: api::ExprTicket) -> LocalBoxFuture<'_, GExpr> {
async move { take_atom(id.unwrap(), &ctx).await.dyn_call(ctx.clone(), arg).await }.boxed_local()
Box::pin(async move { take_atom(id.unwrap(), &ctx).await.dyn_call(ctx.clone(), arg).await })
}
fn call_ref<'a>(
&'a self,
AtomCtx(_, id, ctx): AtomCtx<'a>,
arg: api::ExprTicket,
) -> LocalBoxFuture<'a, GExpr> {
async move { AtomReadGuard::new(id.unwrap(), &ctx).await.dyn_call_ref(ctx.clone(), arg).await }
.boxed_local()
Box::pin(async move {
AtomReadGuard::new(id.unwrap(), &ctx).await.dyn_call_ref(ctx.clone(), arg).await
})
}
fn print(&self, AtomCtx(_, id, ctx): AtomCtx<'_>) -> LocalBoxFuture<'_, FmtUnit> {
async move { AtomReadGuard::new(id.unwrap(), &ctx).await.dyn_print(ctx.clone()).await }
.boxed_local()
Box::pin(
async move { AtomReadGuard::new(id.unwrap(), &ctx).await.dyn_print(ctx.clone()).await },
)
}
fn handle_req<'a, 'b: 'a, 'c: 'a>(
&'a self,
@@ -109,34 +110,32 @@ impl<T: OwnedAtom> AtomDynfo for OwnedAtomDynfo<T> {
req: Pin<&'b mut dyn Read>,
rep: Pin<&'c mut dyn Write>,
) -> LocalBoxFuture<'a, bool> {
async move {
Box::pin(async move {
let a = AtomReadGuard::new(id.unwrap(), &ctx).await;
let ms = self.ms.get_or_init(self.msbuild.pack(ctx.clone())).await;
ms.dispatch(a.as_any_ref().downcast_ref().unwrap(), ctx.clone(), key, req, rep).await
}
.boxed_local()
})
}
fn command<'a>(
&'a self,
AtomCtx(_, id, ctx): AtomCtx<'a>,
) -> LocalBoxFuture<'a, OrcRes<Option<GExpr>>> {
async move { take_atom(id.unwrap(), &ctx).await.dyn_command(ctx.clone()).await }.boxed_local()
Box::pin(async move { take_atom(id.unwrap(), &ctx).await.dyn_command(ctx.clone()).await })
}
fn drop(&self, AtomCtx(_, id, ctx): AtomCtx) -> LocalBoxFuture<'_, ()> {
async move { take_atom(id.unwrap(), &ctx).await.dyn_free(ctx.clone()).await }.boxed_local()
Box::pin(async move { take_atom(id.unwrap(), &ctx).await.dyn_free(ctx.clone()).await })
}
fn serialize<'a, 'b: 'a>(
&'a self,
AtomCtx(_, id, ctx): AtomCtx<'a>,
mut write: Pin<&'b mut dyn Write>,
) -> LocalBoxFuture<'a, Option<Vec<api::ExprTicket>>> {
async move {
Box::pin(async move {
let id = id.unwrap();
id.encode(write.as_mut()).await;
let refs = AtomReadGuard::new(id, &ctx).await.dyn_serialize(ctx.clone(), write).await;
refs.map(|v| v.into_iter().map(|t| t.handle().tk).collect_vec())
}
.boxed_local()
})
}
fn deserialize<'a>(
&'a self,
@@ -144,13 +143,12 @@ impl<T: OwnedAtom> AtomDynfo for OwnedAtomDynfo<T> {
data: &'a [u8],
refs: &'a [api::ExprTicket],
) -> LocalBoxFuture<'a, api::Atom> {
async move {
Box::pin(async move {
let refs =
refs.iter().map(|tk| Expr::from_handle(Rc::new(ExprHandle::from_args(ctx.clone(), *tk))));
let obj = T::deserialize(DeserCtxImpl(data, &ctx), T::Refs::from_iter(refs)).await;
obj._factory().build(ctx).await
}
.boxed_local()
})
}
}

View File

@@ -25,7 +25,7 @@ pub struct ThinVariant;
impl AtomicVariant for ThinVariant {}
impl<A: ThinAtom + Atomic<Variant = ThinVariant>> AtomicFeaturesImpl<ThinVariant> for A {
fn _factory(self) -> AtomFactory {
AtomFactory::new(move |ctx| async move {
AtomFactory::new(async move |ctx| {
let (id, _) = get_info::<A>(ctx.get::<CtedObj>().inst().card());
let mut buf = enc_vec(&id).await;
self.encode(Pin::new(&mut buf)).await;
@@ -42,12 +42,12 @@ pub struct ThinAtomDynfo<T: ThinAtom> {
}
impl<T: ThinAtom> AtomDynfo for ThinAtomDynfo<T> {
fn print<'a>(&self, AtomCtx(buf, _, ctx): AtomCtx<'a>) -> LocalBoxFuture<'a, FmtUnit> {
async move { T::decode(Pin::new(&mut &buf[..])).await.print(ctx).await }.boxed_local()
Box::pin(async move { T::decode(Pin::new(&mut &buf[..])).await.print(ctx).await })
}
fn tid(&self) -> TypeId { TypeId::of::<T>() }
fn name(&self) -> &'static str { type_name::<T>() }
fn decode<'a>(&'a self, AtomCtx(buf, ..): AtomCtx<'a>) -> LocalBoxFuture<'a, Box<dyn Any>> {
async { Box::new(T::decode(Pin::new(&mut &buf[..])).await) as Box<dyn Any> }.boxed_local()
Box::pin(async { Box::new(T::decode(Pin::new(&mut &buf[..])).await) as Box<dyn Any> })
}
fn call<'a>(
&'a self,
@@ -102,14 +102,13 @@ impl<T: ThinAtom> AtomDynfo for ThinAtomDynfo<T> {
refs: &'a [api::ExprTicket],
) -> LocalBoxFuture<'a, api::Atom> {
assert!(refs.is_empty(), "Refs found when deserializing thin atom");
async { T::decode(Pin::new(&mut &data[..])).await._factory().build(ctx).await }.boxed_local()
Box::pin(async { T::decode(Pin::new(&mut &data[..])).await._factory().build(ctx).await })
}
fn drop<'a>(&'a self, AtomCtx(buf, _, ctx): AtomCtx<'a>) -> LocalBoxFuture<'a, ()> {
async move {
Box::pin(async move {
let string_self = T::decode(Pin::new(&mut &buf[..])).await.print(ctx.clone()).await;
writeln!(ctx.logger(), "Received drop signal for non-drop atom {string_self:?}");
}
.boxed_local()
})
}
}

View File

@@ -1,23 +1,20 @@
use std::cell::RefCell;
use std::future::Future;
use std::io::Write;
use std::mem;
use std::num::NonZero;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use async_std::channel::{Receiver, Sender};
use async_std::channel::{self, Receiver, RecvError, Sender};
use async_std::stream;
use async_std::sync::Mutex;
use futures::future::{LocalBoxFuture, join_all};
use futures::{FutureExt, StreamExt};
use futures::{FutureExt, StreamExt, stream_select};
use hashbrown::HashMap;
use itertools::Itertools;
use orchid_api::{ApplyMacro, ExtMsgSet};
use orchid_api_traits::{Decode, Encode, enc_vec};
use orchid_base::builtin::{ExtPort, Spawner};
use orchid_api_traits::{Decode, enc_vec};
use orchid_base::builtin::{ExtInit, ExtPort, Spawner};
use orchid_base::char_filter::{char_filter_match, char_filter_union, mk_char_filter};
use orchid_base::clone;
use orchid_base::interner::{Interner, Tok};
@@ -36,7 +33,6 @@ use crate::atom_owned::take_atom;
use crate::fs::VirtFS;
use crate::lexer::{LexContext, err_cascade, err_not_applicable};
use crate::macros::{Rule, RuleCtx};
use crate::msg::{recv_parent_msg, send_parent_msg};
use crate::system::{SysCtx, atom_by_idx};
use crate::system_ctor::{CtedObj, DynSystemCtor};
use crate::tree::{GenTok, GenTokTree, LazyMemberFactory, TreeIntoApiCtxImpl, do_extra};
@@ -69,18 +65,18 @@ pub struct SystemRecord {
}
trait_set! {
pub trait WARCallback<'a, T> = FnOnce(
pub trait WithAtomRecordCallback<'a, T> = AsyncFnOnce(
Box<dyn AtomDynfo>,
SysCtx,
AtomTypeId,
&'a [u8]
) -> LocalBoxFuture<'a, T>
) -> T
}
pub async fn with_atom_record<'a, F: Future<Output = SysCtx>, T>(
get_sys_ctx: &impl Fn(api::SysId) -> F,
atom: &'a api::Atom,
cb: impl WARCallback<'a, T>,
cb: impl WithAtomRecordCallback<'a, T>,
) -> T {
let mut data = &atom.data[..];
let ctx = get_sys_ctx(atom.owner).await;
@@ -104,49 +100,41 @@ pub async fn with_atom_record<'a, F: Future<Output = SysCtx>, T>(
// }
pub struct ExtensionOwner {
rn: ReqNot<api::ExtMsgSet>,
_interner_cell: Rc<RefCell<Option<Interner>>>,
_systems_lock: Rc<Mutex<HashMap<api::SysId, SystemRecord>>>,
out_recv: Receiver<Vec<u8>>,
out_send: Sender<Vec<u8>>,
ext_header: api::ExtensionHeader,
}
impl ExtPort for ExtensionOwner {
fn send<'a>(&'a self, msg: &'a [u8]) -> LocalBoxFuture<'a, ()> {
self.rn.receive(msg).boxed_local()
Box::pin(async { self.out_send.send(msg.to_vec()).boxed_local().await.unwrap() })
}
fn recv<'a>(
&'a self,
cb: Box<dyn FnOnce(&[u8]) -> LocalBoxFuture<'_, ()> + 'a>,
) -> LocalBoxFuture<'a, ()> {
async {
let msg = self.out_recv.recv().await.unwrap();
cb(&msg[..]).await
}
.boxed_local()
fn recv(&self) -> LocalBoxFuture<'_, Option<Vec<u8>>> {
Box::pin(async {
match self.out_recv.recv().await {
Ok(v) => Some(v),
Err(RecvError) => None,
}
})
}
}
impl ExtensionOwner {
pub fn ext_header(&self) -> &api::ExtensionHeader { &self.ext_header }
// pub async fn new(data: ExtensionData, spawner: Spawner, header:
// api::HostHeader) -> Self { let decls =
// }
}
pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) {
let api::HostHeader { log_strategy, msg_logs } =
api::HostHeader::decode(Pin::new(&mut async_std::io::stdin())).await;
let mut buf = Vec::new();
pub fn extension_init(
data: ExtensionData,
host_header: api::HostHeader,
spawner: Spawner,
) -> ExtInit {
let api::HostHeader { log_strategy, msg_logs } = host_header;
let decls = (data.systems.iter().enumerate())
.map(|(id, sys)| (u16::try_from(id).expect("more than u16max system ctors"), sys))
.map(|(id, sys)| sys.decl(api::SysDeclId(NonZero::new(id + 1).unwrap())))
.collect_vec();
let systems_lock = Rc::new(Mutex::new(HashMap::<api::SysId, SystemRecord>::new()));
api::ExtensionHeader { name: data.name.to_string(), systems: decls.clone() }
.encode(Pin::new(&mut buf))
.await;
std::io::stdout().write_all(&buf).unwrap();
std::io::stdout().flush().unwrap();
let exiting = Arc::new(AtomicBool::new(false));
let ext_header = api::ExtensionHeader { name: data.name.to_string(), systems: decls.clone() };
let (out_send, in_recv) = channel::bounded::<Vec<u8>>(1);
let (in_send, out_recv) = channel::bounded::<Vec<u8>>(1);
let (exit_send, exit_recv) = channel::bounded(1);
let logger = Logger::new(log_strategy);
let msg_logger = Logger::new(msg_logs);
let interner_cell = Rc::new(RefCell::new(None::<Interner>));
@@ -159,9 +147,9 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) {
x
}));
let init_ctx = {
clone!(systems_weak, interner_weak, spawner, logger);
clone!(interner_weak, spawner, logger);
move |id: api::SysId, cted: CtedObj, reqnot: ReqNot<ExtMsgSet>| {
clone!(systems_weak, interner_weak, spawner, logger; async move {
clone!(interner_weak, spawner, logger; async move {
let interner_rc =
interner_weak.upgrade().expect("System construction order while shutting down");
let i = interner_rc.borrow().clone().expect("mk_ctx called very early, no interner!");
@@ -171,11 +159,11 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) {
};
let rn = ReqNot::<api::ExtMsgSet>::new(
msg_logger.clone(),
move |a, _| async move { send_parent_msg(a).await.unwrap() }.boxed_local(),
clone!(systems_weak, exiting, get_ctx; move |n, reqnot| {
clone!(systems_weak, exiting, get_ctx; async move {
move |a, _| clone!(in_send; Box::pin(async move { in_send.send(a.to_vec()).await.unwrap() })),
clone!(systems_weak, exit_send, get_ctx; move |n, _| {
clone!(systems_weak, exit_send, get_ctx; async move {
match n {
api::HostExtNotif::Exit => exiting.store(true, Ordering::Relaxed),
api::HostExtNotif::Exit => exit_send.send(()).await.unwrap(),
api::HostExtNotif::SystemDrop(api::SystemDrop(sys_id)) =>
if let Some(rc) = systems_weak.upgrade() {
mem::drop(rc.lock().await.remove(&sys_id))
@@ -188,9 +176,9 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) {
}.boxed_local())
}),
{
clone!(logger, get_ctx, init_ctx, systems_weak, interner_weak, spawner, decls, msg_logger);
clone!(logger, get_ctx, init_ctx, systems_weak, interner_weak, decls, msg_logger);
move |hand, req| {
clone!(logger, get_ctx, init_ctx, systems_weak, interner_weak, spawner, decls, msg_logger);
clone!(logger, get_ctx, init_ctx, systems_weak, interner_weak, decls, msg_logger);
async move {
let interner_cell = interner_weak.upgrade().expect("Interner dropped before request");
let i = interner_cell.borrow().clone().expect("Request arrived before interner set");
@@ -302,11 +290,8 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) {
return hand.handle(&lex, &eopt).await;
},
Ok((s, expr)) => {
let expr = expr
.to_api(&mut |f, r| {
clone!(sys_ctx; async move { do_extra(f, r, sys_ctx).await }).boxed_local()
})
.await;
let expr =
expr.to_api(&mut async |f, r| do_extra(f, r, sys_ctx.clone()).await).await;
let pos = (text.len() - s.len()) as u32;
return hand.handle(&lex, &Some(Ok(api::LexedExpr { pos, expr }))).await;
},
@@ -329,12 +314,9 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) {
let o_line = match parser.parse(*exported, comments, tail) {
Err(e) => Err(e.to_api()),
Ok(t) => Ok(
ttv_to_api(t, &mut |f, range| {
clone!(ctx);
async move {
api::TokenTree { range, token: api::Token::Atom(f.clone().build(ctx).await) }
}
.boxed_local()
ttv_to_api(t, &mut async move |f, range| api::TokenTree {
range,
token: api::Token::Atom(f.clone().build(ctx.clone()).await),
})
.await,
),
@@ -344,52 +326,49 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) {
api::HostExtReq::AtomReq(atom_req) => {
let atom = atom_req.get_atom();
let atom_req = atom_req.clone();
with_atom_record(&get_ctx, atom, move |nfo, ctx, id, buf| {
async move {
let actx = AtomCtx(buf, atom.drop, ctx.clone());
match &atom_req {
api::AtomReq::SerializeAtom(ser) => {
let mut buf = enc_vec(&id).await;
let refs_opt = nfo.serialize(actx, Pin::<&mut Vec<_>>::new(&mut buf)).await;
hand.handle(ser, &refs_opt.map(|refs| (buf, refs))).await
},
api::AtomReq::AtomPrint(print @ api::AtomPrint(_)) =>
hand.handle(print, &nfo.print(actx).await.to_api()).await,
api::AtomReq::Fwded(fwded) => {
let api::Fwded(_, key, payload) = &fwded;
let mut reply = Vec::new();
let key = Sym::from_api(*key, &i).await;
let some = nfo
.handle_req(
actx,
key,
Pin::<&mut &[u8]>::new(&mut &payload[..]),
Pin::<&mut Vec<_>>::new(&mut reply),
)
.await;
hand.handle(fwded, &some.then_some(reply)).await
},
api::AtomReq::CallRef(call @ api::CallRef(_, arg)) => {
let ret = nfo.call_ref(actx, *arg).await;
hand.handle(call, &ret.api_return(ctx.clone(), &hand).await).await
},
api::AtomReq::FinalCall(call @ api::FinalCall(_, arg)) => {
let ret = nfo.call(actx, *arg).await;
hand.handle(call, &ret.api_return(ctx.clone(), &hand).await).await
},
api::AtomReq::Command(cmd @ api::Command(_)) => match nfo.command(actx).await {
Err(e) => hand.handle(cmd, &Err(e.to_api())).await,
Ok(opt) => match opt {
None => hand.handle(cmd, &Ok(api::NextStep::Halt)).await,
Some(cont) => {
let cont = cont.api_return(ctx.clone(), &hand).await;
hand.handle(cmd, &Ok(api::NextStep::Continue(cont))).await
},
with_atom_record(&get_ctx, atom, async move |nfo, ctx, id, buf| {
let actx = AtomCtx(buf, atom.drop, ctx.clone());
match &atom_req {
api::AtomReq::SerializeAtom(ser) => {
let mut buf = enc_vec(&id).await;
let refs_opt = nfo.serialize(actx, Pin::<&mut Vec<_>>::new(&mut buf)).await;
hand.handle(ser, &refs_opt.map(|refs| (buf, refs))).await
},
api::AtomReq::AtomPrint(print @ api::AtomPrint(_)) =>
hand.handle(print, &nfo.print(actx).await.to_api()).await,
api::AtomReq::Fwded(fwded) => {
let api::Fwded(_, key, payload) = &fwded;
let mut reply = Vec::new();
let key = Sym::from_api(*key, &i).await;
let some = nfo
.handle_req(
actx,
key,
Pin::<&mut &[u8]>::new(&mut &payload[..]),
Pin::<&mut Vec<_>>::new(&mut reply),
)
.await;
hand.handle(fwded, &some.then_some(reply)).await
},
api::AtomReq::CallRef(call @ api::CallRef(_, arg)) => {
let ret = nfo.call_ref(actx, *arg).await;
hand.handle(call, &ret.api_return(ctx.clone(), &hand).await).await
},
api::AtomReq::FinalCall(call @ api::FinalCall(_, arg)) => {
let ret = nfo.call(actx, *arg).await;
hand.handle(call, &ret.api_return(ctx.clone(), &hand).await).await
},
api::AtomReq::Command(cmd @ api::Command(_)) => match nfo.command(actx).await {
Err(e) => hand.handle(cmd, &Err(e.to_api())).await,
Ok(opt) => match opt {
None => hand.handle(cmd, &Ok(api::NextStep::Halt)).await,
Some(cont) => {
let cont = cont.api_return(ctx.clone(), &hand).await;
hand.handle(cmd, &Ok(api::NextStep::Continue(cont))).await
},
},
}
},
}
.boxed_local()
})
.await
},
@@ -411,7 +390,7 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) {
for (k, v) in params {
ctx.args.insert(
Tok::from_api(k, &i).await,
mtreev_from_api(&v, &i, &mut |_| panic!("No atom in macro prompt!")).await,
mtreev_from_api(&v, &i, &mut async |_| panic!("No atom in macro prompt!")).await,
);
}
let err_cascade = err_cascade(&i).await;
@@ -424,10 +403,9 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) {
hand.handle_as(tok, &new_errors.map(|e| Err(e.to_api()))).await
},
Ok(t) => {
let result = mtreev_to_api(&t, &mut |a| {
clone!(sys_ctx; async move {
api::MacroToken::Atom(a.clone().build(sys_ctx.clone()).await)
}.boxed_local())
clone!(sys_ctx);
let result = mtreev_to_api(&t, &mut async |a| {
api::MacroToken::Atom(a.clone().build(sys_ctx.clone()).await)
})
.await;
hand.handle_as(tok, &Some(Ok(result))).await
@@ -441,8 +419,22 @@ pub async fn extension_main_logic(data: ExtensionData, spawner: Spawner) {
},
);
*interner_cell.borrow_mut() = Some(Interner::new_replica(rn.clone().map()));
while !exiting.load(Ordering::Relaxed) {
let rcvd = recv_parent_msg().await.unwrap();
spawner(Box::pin(clone!(rn; async move { rn.receive(&rcvd).await })))
spawner(Box::pin(clone!(spawner; async move {
let mut streams = stream_select! { in_recv.map(Some), exit_recv.map(|_| None) };
while let Some(item) = streams.next().await {
match item {
Some(rcvd) => spawner(Box::pin(clone!(rn; async move { rn.receive(&rcvd[..]).await }))),
None => break,
}
}
})));
ExtInit {
header: ext_header,
port: Box::new(ExtensionOwner {
out_recv,
out_send,
_interner_cell: interner_cell,
_systems_lock: systems_lock,
}),
}
}

View File

@@ -133,8 +133,6 @@ impl OwnedAtom for Lambda {
}
mod expr_func_derives {
use std::future::Future;
use orchid_base::error::OrcRes;
use super::ExprFunc;
@@ -147,9 +145,9 @@ mod expr_func_derives {
paste::paste!{
impl<
$($t: TryFromExpr, )*
Fut: Future<Output: ToExpr>,
Func: Fn($($t,)*) -> Fut + Clone + Send + Sync + 'static
> ExprFunc<($($t,)*), Fut::Output> for Func {
Out: ToExpr,
Func: AsyncFn($($t,)*) -> Out + Clone + Send + Sync + 'static
> ExprFunc<($($t,)*), Out> for Func {
const ARITY: u8 = $arity;
async fn apply(&self, v: Vec<Expr>) -> OrcRes<GExpr> {
assert_eq!(v.len(), Self::ARITY.into(), "Arity mismatch");

View File

@@ -100,9 +100,9 @@ pub fn bot(ev: impl IntoIterator<Item = OrcErr>) -> GExpr {
inherit(GExprKind::Bottom(OrcErrv::new(ev).unwrap()))
}
pub fn with<I: TryFromExpr, Fut: Future<Output: ToExpr>>(
pub fn with<I: TryFromExpr, O: ToExpr>(
expr: GExpr,
cont: impl Fn(I) -> Fut + Clone + Send + Sync + 'static,
cont: impl AsyncFn(I) -> O + Clone + Send + Sync + 'static,
) -> GExpr {
call([lambda(0, [seq([arg(0), call([Lambda::new(cont).to_expr(), arg(0)])])]), expr])
}

View File

@@ -16,4 +16,5 @@ pub mod other_system;
pub mod parser;
pub mod system;
pub mod system_ctor;
pub mod tokio;
pub mod tree;

View File

@@ -43,13 +43,13 @@ impl<'a> RuleCtx<'a> {
pub async fn recurse(&mut self, tree: &[MTree<'a, Never>]) -> OrcRes<Vec<MTree<'a, Never>>> {
let req = api::RunMacros {
run_id: self.run_id,
query: mtreev_to_api(tree, &mut |b| match *b {}).await,
query: mtreev_to_api(tree, &mut async |b| match *b {}).await,
};
let Some(treev) = self.sys.get::<ReqNot<ExtMsgSet>>().request(req).await else {
return Err(err_cascade(self.sys.i()).await.into());
};
static ATOM_MSG: &str = "Returned atom from Rule recursion";
Ok(mtreev_from_api(&treev, self.sys.i(), &mut |_| panic!("{ATOM_MSG}")).await)
Ok(mtreev_from_api(&treev, self.sys.i(), &mut async |_| panic!("{ATOM_MSG}")).await)
}
pub fn getv(&mut self, key: &Tok<String>) -> Vec<MTree<'a, Never>> {
self.args.remove(key).expect("Key not found")
@@ -83,7 +83,7 @@ impl Rule {
}))
.await,
location: api::Location::Inherit,
pattern: mtreev_to_api(&self.pattern, &mut |b| match *b {}).await,
pattern: mtreev_to_api(&self.pattern, &mut async |b| match *b {}).await,
id: ctx.with_rule(Rc::new(self)),
}
}

View File

@@ -0,0 +1,54 @@
use crate::entrypoint::ExtensionData;
#[cfg(feature = "tokio")]
pub async fn tokio_main(data: ExtensionData) {
use std::future::Future;
use std::io::Write;
use std::mem;
use std::pin::{Pin, pin};
use std::rc::Rc;
use async_std::io;
use async_stream::stream;
use futures::future::LocalBoxFuture;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, stream, stream_select};
use orchid_api_traits::{Decode, Encode};
use orchid_base::clone;
use tokio::task::{LocalSet, spawn_local};
use crate::api;
use crate::entrypoint::extension_init;
use crate::msg::{recv_parent_msg, send_parent_msg};
let local_set = LocalSet::new();
local_set.spawn_local(async {
let host_header = api::HostHeader::decode(Pin::new(&mut async_std::io::stdin())).await;
let init =
Rc::new(extension_init(data, host_header, Rc::new(|fut| mem::drop(spawn_local(fut)))));
let mut buf = Vec::new();
init.header.encode(Pin::new(&mut buf)).await;
std::io::stdout().write_all(&buf).unwrap();
std::io::stdout().flush().unwrap();
// These are concurrent processes that never exit, so if the FuturesUnordered
// produces any result the extension should exit
let mut io = FuturesUnordered::<LocalBoxFuture<()>>::new();
io.push(Box::pin(async {
loop {
match recv_parent_msg().await {
Ok(msg) => init.send(&msg[..]).await,
Err(e) if e.kind() == io::ErrorKind::BrokenPipe => break,
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
Err(e) => panic!("{e}"),
}
}
}));
io.push(Box::pin(async {
while let Some(msg) = init.recv().await {
send_parent_msg(&msg[..]).await.unwrap();
}
}));
io.next().await;
});
local_set.await;
}

View File

@@ -1,4 +1,3 @@
use std::future::Future;
use std::num::NonZero;
use std::ops::Range;
use std::rc::Rc;
@@ -133,9 +132,7 @@ trait_set! {
}
pub struct LazyMemberFactory(Box<dyn LazyMemberCallback>);
impl LazyMemberFactory {
pub fn new<F: Future<Output = MemKind> + 'static>(
cb: impl FnOnce(Sym, SysCtx) -> F + Clone + 'static,
) -> Self {
pub fn new(cb: impl AsyncFnOnce(Sym, SysCtx) -> MemKind + Clone + 'static) -> Self {
Self(Box::new(|s, ctx| cb(s, ctx).boxed_local()))
}
pub async fn build(self, path: Sym, ctx: SysCtx) -> MemKind { (self.0)(path, ctx).await }