use std::cell::RefCell; use std::future::Future; use std::num::NonZero; use std::pin::Pin; use std::rc::Rc; use std::{io, mem}; use futures::future::{LocalBoxFuture, join_all}; use futures::lock::Mutex; use futures::{AsyncRead, AsyncWrite, AsyncWriteExt, StreamExt, stream}; use hashbrown::HashMap; use itertools::Itertools; use orchid_api::{ExtHostNotif, ExtHostReq}; use orchid_api_traits::{Decode, Encode, Request, UnderRoot, enc_vec}; use orchid_base::char_filter::{char_filter_match, char_filter_union, mk_char_filter}; use orchid_base::error::try_with_reporter; use orchid_base::interner::{es, is, with_interner}; use orchid_base::logging::{log, with_logger}; use orchid_base::name::Sym; use orchid_base::parse::{Comment, Snippet}; use orchid_base::reqnot::{ Client, ClientExt, CommCtx, MsgReader, MsgReaderExt, Receipt, RepWriter, ReqHandle, ReqHandleExt, ReqReader, ReqReaderExt, Witness, io_comm, }; use orchid_base::stash::with_stash; use orchid_base::tree::{TokenVariant, ttv_from_api}; use substack::Substack; use task_local::task_local; use crate::api; use crate::atom::{AtomCtx, AtomTypeId, resolve_atom_type}; use crate::atom_owned::{take_atom, with_obj_store}; use crate::expr::{BorrowedExprStore, Expr, ExprHandle}; use crate::ext_port::ExtPort; use crate::func_atom::with_funs_ctx; use crate::interner::new_interner; use crate::lexer::{LexContext, ekey_cascade, ekey_not_applicable}; use crate::logger::LoggerImpl; use crate::parser::{PTokTree, ParsCtx, get_const, linev_into_api, with_parsed_const_ctx}; use crate::reflection::with_refl_roots; use crate::system::{SysCtx, atom_by_idx, cted, with_sys}; use crate::system_ctor::{CtedObj, DynSystemCtor, SystemCtor}; use crate::tree::{TreeIntoApiCtxImpl, get_lazy, with_lazy_member_store}; task_local::task_local! { static CLIENT: Rc; static CTX: Rc>>; } fn get_client() -> Rc { CLIENT.get() } pub async fn exit() { let cx = CTX.get().borrow_mut().take(); cx.unwrap().exit().await } /// Sent the client used for global [request] and [notify] functions within the /// runtime of this future pub async fn with_comm(c: Rc, ctx: CommCtx, fut: F) -> F::Output { CLIENT.scope(c, CTX.scope(Rc::new(RefCell::new(Some(ctx))), fut)).await } task_local! { pub static MUTE_REPLY: (); } /// Send a request through the global client's [ClientExt::request] pub async fn request>(t: T) -> T::Response { let response = get_client().request(t).await.unwrap(); if MUTE_REPLY.try_with(|b| *b).is_err() { writeln!(log("msg"), "Got response {response:?}").await; } response } /// Send a notification through the global client's [ClientExt::notify] pub async fn notify>(t: T) { get_client().notify(t).await.unwrap() } pub struct SystemRecord { cted: CtedObj, } type SystemTable = RefCell>>; task_local! { static SYSTEM_TABLE: SystemTable; } async fn with_sys_record(id: api::SysId, fut: F) -> F::Output { let cted = SYSTEM_TABLE.with(|tbl| tbl.borrow().get(&id).expect("Invalid sys ID").cted.clone()); with_sys(SysCtx(id, cted), fut).await } pub trait ContextModifier: 'static { fn apply<'a>(self: Box, fut: LocalBoxFuture<'a, ()>) -> LocalBoxFuture<'a, ()>; } impl) + 'static> ContextModifier for F { fn apply<'a>(self: Box, fut: LocalBoxFuture<'a, ()>) -> LocalBoxFuture<'a, ()> { Box::pin((self)(fut)) } } pub struct ExtensionBuilder { pub name: &'static str, pub systems: Vec>, pub context: Vec>, } impl ExtensionBuilder { pub fn new(name: &'static str) -> Self { Self { name, systems: Vec::new(), context: Vec::new() } } pub fn system(mut self, ctor: impl SystemCtor) -> Self { self.systems.push(Box::new(ctor) as Box<_>); self } pub fn add_context(&mut self, fun: impl ContextModifier) { self.context.push(Box::new(fun) as Box<_>); } pub fn context(mut self, fun: impl ContextModifier) -> Self { self.add_context(fun); self } pub fn build(mut self, mut ctx: ExtPort) { self.add_context(with_funs_ctx); self.add_context(with_parsed_const_ctx); self.add_context(with_obj_store); self.add_context(with_lazy_member_store); self.add_context(with_refl_roots); (ctx.spawn)(Box::pin(async move { let host_header = api::HostHeader::decode(ctx.input.as_mut()).await.unwrap(); let decls = (self.systems.iter().enumerate()) .map(|(id, sys)| (u16::try_from(id).expect("more than u16max system ctors"), sys)) .map(|(id, sys)| sys.decl(api::SysDeclId(NonZero::new(id + 1).unwrap()))) .collect_vec(); api::ExtensionHeader { name: self.name.to_string(), systems: decls.clone() } .encode(ctx.output.as_mut()) .await .unwrap(); ctx.output.as_mut().flush().await.unwrap(); let logger1 = LoggerImpl::from_api(&host_header.logger); let logger2 = logger1.clone(); let (client, comm_ctx, extension_srv) = io_comm(Rc::new(Mutex::new(ctx.output)), Mutex::new(ctx.input)); let extension_fut = extension_srv.listen( async |n: Box>| { let notif = n.read().await.unwrap(); match notif { api::HostExtNotif::Exit => exit().await, } Ok(()) }, async |mut reader| { with_stash(async { let req = reader.read_req().await.unwrap(); let handle = reader.finish().await; // Atom printing is never reported because it generates too much // noise if !matches!(req, api::HostExtReq::AtomReq(api::AtomReq::AtomPrint(_))) { writeln!(log("msg"), "{} extension received request {req:?}", self.name).await; } match req { api::HostExtReq::SystemDrop(sys_drop) => { SYSTEM_TABLE.with(|l| l.borrow_mut().remove(&sys_drop.0)); handle.reply(&sys_drop, &()).await }, api::HostExtReq::AtomDrop(atom_drop @ api::AtomDrop(sys_id, atom)) => with_sys_record(sys_id, async { take_atom(atom).await.dyn_free().await; handle.reply(&atom_drop, &()).await }) .await, api::HostExtReq::Ping(ping @ api::Ping) => handle.reply(&ping, &()).await, api::HostExtReq::Sweep(api::Sweep) => todo!(), api::HostExtReq::SysReq(api::SysReq::NewSystem(new_sys)) => { let (ctor_idx, _) = (decls.iter().enumerate().find(|(_, s)| s.id == new_sys.system)) .expect("NewSystem call received for invalid system"); let cted = self.systems[ctor_idx].new_system(&new_sys); let record = Rc::new(SystemRecord { cted: cted.clone() }); SYSTEM_TABLE.with(|tbl| { let mut g = tbl.borrow_mut(); g.insert(new_sys.id, record); }); with_sys_record(new_sys.id, async { let lex_filter = cted.inst().dyn_lexers().iter().fold(api::CharFilter(vec![]), |cf, lx| { char_filter_union(&cf, &mk_char_filter(lx.char_filter().iter().cloned())) }); let const_root = stream::iter(cted.inst().dyn_env().await) .then(async |mem| { let name = is(&mem.name).await; let mut tia_ctx = TreeIntoApiCtxImpl { basepath: &[], path: Substack::Bottom.push(name.clone()), }; (name.to_api(), mem.kind.into_api(&mut tia_ctx).await) }) .collect() .await; let prelude = cted.inst().dyn_prelude().await.iter().map(|sym| sym.to_api()).collect(); let line_types = join_all( (cted.inst().dyn_parsers().iter()) .map(async |p| is(p.line_head()).await.to_api()), ) .await; let response = api::NewSystemResponse { lex_filter, const_root, line_types, prelude }; handle.reply(&new_sys, &response).await }) .await }, api::HostExtReq::GetMember(get_tree @ api::GetMember(sys_id, tree_id)) => with_sys_record(sys_id, async { let (path, tree) = get_lazy(tree_id).await; let mut tia_ctx = TreeIntoApiCtxImpl { path: Substack::Bottom, basepath: &path[..] }; handle.reply(&get_tree, &tree.into_api(&mut tia_ctx).await).await }) .await, api::HostExtReq::SysReq(api::SysReq::SysFwded(fwd)) => { let fwd_tok = Witness::of(&fwd); let api::SysFwded(sys_id, payload) = fwd; with_sys_record(sys_id, async { struct TrivialReqCycle<'a> { req: &'a [u8], rep: &'a mut Vec, } impl<'a> ReqReader<'a> for TrivialReqCycle<'a> { fn reader(&mut self) -> Pin<&mut dyn AsyncRead> { Pin::new(&mut self.req) as Pin<&mut _> } fn finish(self: Box) -> LocalBoxFuture<'a, Box + 'a>> { Box::pin(async { self as Box<_> }) } } impl<'a> ReqHandle<'a> for TrivialReqCycle<'a> { fn start_reply( self: Box, ) -> LocalBoxFuture<'a, io::Result + 'a>>> { Box::pin(async { Ok(self as Box<_>) }) } } impl<'a> RepWriter<'a> for TrivialReqCycle<'a> { fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { Pin::new(&mut self.rep) as Pin<&mut _> } fn finish( self: Box, ) -> LocalBoxFuture<'a, io::Result>> { Box::pin(async { Ok(Receipt::_new()) }) } } let mut reply = Vec::new(); let req = TrivialReqCycle { req: &payload, rep: &mut reply }; let _ = cted().inst().dyn_request(Box::new(req)).await; handle.reply(fwd_tok, &reply).await }) .await }, api::HostExtReq::LexExpr(lex @ api::LexExpr { sys, src, text, pos, id }) => with_sys_record(sys, async { let text = es(text).await; let src = Sym::from_api(src).await; let expr_store = BorrowedExprStore::new(); let trigger_char = text.chars().nth(pos as usize).unwrap(); let ekey_na = ekey_not_applicable().await; let ekey_cascade = ekey_cascade().await; let lexers = cted().inst().dyn_lexers(); for lx in lexers.iter().filter(|l| char_filter_match(l.char_filter(), trigger_char)) { let ctx = LexContext::new(&expr_store, &text, id, pos, src.clone()); match try_with_reporter(lx.lex(&text[pos as usize..], &ctx)).await { Err(e) if e.any(|e| *e == ekey_na) => continue, Err(e) => { let eopt = e.keep_only(|e| *e != ekey_cascade).map(|e| Err(e.to_api())); expr_store.dispose().await; return handle.reply(&lex, &eopt).await; }, Ok((s, expr)) => { let expr = expr.into_api(&mut (), &mut ()).await; let pos = (text.len() - s.len()) as u32; expr_store.dispose().await; return handle.reply(&lex, &Some(Ok(api::LexedExpr { pos, expr }))).await; }, } } writeln!(log("warn"), "Got notified about n/a character '{trigger_char}'").await; expr_store.dispose().await; handle.reply(&lex, &None).await }) .await, api::HostExtReq::ParseLine(pline) => { let api::ParseLine { module, src, exported, comments, sys, line, idx } = &pline; with_sys_record(*sys, async { let parsers = cted().inst().dyn_parsers(); let src = Sym::from_api(*src).await; let comments = join_all(comments.iter().map(|c| Comment::from_api(c, src.clone()))).await; let expr_store = BorrowedExprStore::new(); let line: Vec = ttv_from_api(line, &mut &expr_store, &mut (), &src).await; let snip = Snippet::new(line.first().expect("Empty line"), &line); let parser = parsers[*idx as usize]; let module = Sym::from_api(*module).await; let pctx = ParsCtx::new(module); let o_line = match try_with_reporter(parser.parse(pctx, *exported, comments, snip)).await { Err(e) => Err(e.to_api()), Ok(t) => Ok(linev_into_api(t).await), }; mem::drop(line); expr_store.dispose().await; handle.reply(&pline, &o_line).await }) .await }, api::HostExtReq::FetchParsedConst(ref fpc @ api::FetchParsedConst(sys, id)) => with_sys_record(sys, async { let cnst = get_const(id).await; handle.reply(fpc, &cnst.serialize().await).await }) .await, api::HostExtReq::AtomReq(atom_req) => { let atom = atom_req.get_atom(); with_sys_record(atom.owner, async { let (nfo, id, buf) = resolve_atom_type(atom); let actx = AtomCtx(buf, atom.drop); match &atom_req { api::AtomReq::SerializeAtom(ser) => { let mut buf = enc_vec(&id); match nfo.serialize(actx, Pin::<&mut Vec<_>>::new(&mut buf)).await { None => handle.reply(ser, &None).await, Some(refs) => { let refs = join_all(refs.into_iter().map(async |ex| ex.into_api(&mut ()).await)) .await; handle.reply(ser, &Some((buf, refs))).await }, } }, api::AtomReq::AtomPrint(print @ api::AtomPrint(_)) => handle.reply(print, &nfo.print(actx).await.to_api()).await, api::AtomReq::Fwded(fwded) => { let api::Fwded(_, key, payload) = &fwded; let mut reply = Vec::new(); let key = Sym::from_api(*key).await; let some = nfo .handle_req( actx, key, Pin::<&mut &[u8]>::new(&mut &payload[..]), Pin::<&mut Vec<_>>::new(&mut reply), ) .await; handle.reply(fwded, &some.then_some(reply)).await }, api::AtomReq::CallRef(call @ api::CallRef(_, arg)) => { let expr_store = BorrowedExprStore::new(); let expr_handle = ExprHandle::borrowed(*arg, &expr_store); let ret = nfo.call_ref(actx, Expr::from_handle(expr_handle.clone())).await; let api_expr = ret.serialize().await; mem::drop(expr_handle); expr_store.dispose().await; handle.reply(call, &api_expr).await }, api::AtomReq::FinalCall(call @ api::FinalCall(_, arg)) => { let expr_store = BorrowedExprStore::new(); let expr_handle = ExprHandle::borrowed(*arg, &expr_store); let ret = nfo.call(actx, Expr::from_handle(expr_handle.clone())).await; let api_expr = ret.serialize().await; mem::drop(expr_handle); expr_store.dispose().await; handle.reply(call, &api_expr).await }, api::AtomReq::Command(cmd @ api::Command(_)) => match nfo.command(actx).await { Err(e) => handle.reply(cmd, &Err(e.to_api())).await, Ok(opt) => match opt { None => handle.reply(cmd, &Ok(api::NextStep::Halt)).await, Some(cont) => { let cont = cont.serialize().await; handle.reply(cmd, &Ok(api::NextStep::Continue(cont))).await }, }, }, } }) .await }, api::HostExtReq::DeserAtom(deser) => { let api::DeserAtom(sys, buf, refs) = &deser; let read = &mut &buf[..]; with_sys_record(*sys, async { // SAFETY: deserialization implicitly grants ownership to previously owned exprs let refs = (refs.iter()) .map(|tk| Expr::from_handle(ExprHandle::deserialize(*tk))) .collect_vec(); let id = AtomTypeId::decode_slice(read); let nfo = atom_by_idx(cted().inst().card(), id) .expect("Deserializing atom with invalid ID"); handle.reply(&deser, &nfo.deserialize(read, &refs).await).await }) .await }, } }) .await }, ); // add essential services to the very tail, then fold all context into the run // future SYSTEM_TABLE .scope( RefCell::default(), with_interner( new_interner(), with_logger( logger2, with_comm( Rc::new(client), comm_ctx, (self.context.into_iter()).fold( Box::pin(async { extension_fut.await.unwrap() }) as LocalBoxFuture<()>, |fut, cx| cx.apply(fut), ), ), ), ), ) .await; }) as Pin>); } }