Decided to eradicate Send/Sync dependence, broke everything as a result. Should resume from interner

This commit is contained in:
2025-01-20 22:22:33 +01:00
parent 5859b41a7c
commit 1974c69019
25 changed files with 733 additions and 784 deletions

View File

@@ -8,6 +8,7 @@ use async_std::channel::{Receiver, Sender};
use async_std::sync::Mutex;
use futures::FutureExt;
use futures::future::LocalBoxFuture;
use futures::task::LocalSpawn;
use hashbrown::HashMap;
use itertools::Itertools;
use orchid_api_traits::{Decode, Encode, enc_vec};
@@ -25,7 +26,7 @@ use substack::Substack;
use crate::api;
use crate::atom::{AtomCtx, AtomDynfo};
use crate::atom_owned::OBJ_STORE;
use crate::atom_owned::ObjStore;
use crate::fs::VirtFS;
use crate::lexer::{LexContext, err_cascade, err_not_applicable};
use crate::macros::{RuleCtx, apply_rule};
@@ -45,11 +46,11 @@ impl ExtensionData {
pub fn new(name: &'static str, systems: &'static [&'static dyn DynSystemCtor]) -> Self {
Self { name, systems }
}
pub fn main(self) { extension_main(self) }
// pub fn main(self) { extension_main(self) }
}
pub enum MemberRecord {
Gen(Sym, LazyMemberFactory),
Gen(Vec<Tok<String>>, LazyMemberFactory),
Res,
}
@@ -64,12 +65,7 @@ pub async fn with_atom_record<T>(
get_sys_ctx: &impl Fn(api::SysId, ReqNot<api::ExtMsgSet>) -> SysCtx,
reqnot: ReqNot<api::ExtMsgSet>,
atom: &api::Atom,
cb: impl for<'c> FnOnce(
Box<dyn AtomDynfo>,
SysCtx,
api::AtomId,
&'c [u8],
) -> LocalBoxFuture<'c, T>,
cb: impl for<'c> FnOnce(Box<dyn AtomDynfo>, SysCtx, api::AtomId, &'c [u8]) -> LocalBoxFuture<'c, T>,
) -> T {
let mut data = &atom.data[..];
let ctx = get_sys_ctx(atom.owner, reqnot);
@@ -79,17 +75,18 @@ pub async fn with_atom_record<T>(
cb(atom_record, ctx, id, data).await
}
pub fn extension_main(data: ExtensionData) {
if thread::Builder::new()
.name(format!("ext-main:{}", data.name))
.spawn(|| extension_main_logic(data))
.unwrap()
.join()
.is_err()
{
process::exit(-1)
}
}
// pub fn extension_main(data: ExtensionData) {
// if thread::Builder::new()
// .name(format!("ext-main:{}", data.name))
// .spawn(|| extension_main_logic(data))
// .unwrap()
// .join()
// .is_err()
// {
// process::exit(-1)
// }
// }
pub struct ExtensionOwner {
rn: ReqNot<api::ExtMsgSet>,
@@ -109,7 +106,7 @@ impl ExtPort for ExtensionOwner {
}
}
fn extension_main_logic(data: ExtensionData) {
async fn extension_main_logic(data: ExtensionData, spawner: Arc<dyn LocalSpawn>) {
let api::HostHeader { log_strategy } = api::HostHeader::decode(&mut std::io::stdin().lock());
let mut buf = Vec::new();
let decls = (data.systems.iter().enumerate())
@@ -122,11 +119,12 @@ fn extension_main_logic(data: ExtensionData) {
std::io::stdout().flush().unwrap();
let exiting = Arc::new(AtomicBool::new(false));
let logger = Arc::new(Logger::new(log_strategy));
let obj_store = ObjStore::default();
let mk_ctx = clone!(
logger, systems;
move |id: api::SysId, reqnot: ReqNot<api::ExtMsgSet>| async {
let cted = systems.lock().await[&id].cted.clone();
SysCtx { id, cted, logger: logger.clone(), reqnot }
SysCtx { id, cted, logger, reqnot, spawner, obj_store }
}.boxed_local()
);
let rn = ReqNot::<api::ExtMsgSet>::new(
@@ -140,200 +138,209 @@ fn extension_main_logic(data: ExtensionData) {
api::HostExtNotif::SystemDrop(api::SystemDrop(sys_id)) =>
mem::drop(systems.lock().await.remove(&sys_id)),
api::HostExtNotif::AtomDrop(api::AtomDrop(sys_id, atom)) =>
OBJ_STORE.get(atom.0).unwrap().remove().dyn_free(mk_ctx(sys_id, reqnot).await),
obj_store.get(atom.0).unwrap().remove().dyn_free(mk_ctx(sys_id, reqnot).await).await,
}
}.boxed_local()),
{
let systems = systems.clone();
let logger = logger.clone();
(move|hand,req|async {
let receipt:Receipt = match req {
api::HostExtReq::Ping(ping@api::Ping) => hand.handle(&ping, &()).await,
api::HostExtReq::Sweep(sweep@api::Sweep) => hand.handle(&sweep, &sweep_replica()).await,
api::HostExtReq::SysReq(api::SysReq::NewSystem(new_sys)) => {
let i = decls.iter().enumerate().find(|(_,s)|s.id==new_sys.system).unwrap().0;
let cted = data.systems[i].new_system(&new_sys);
let mut vfses = HashMap::new();
let lex_filter = cted.inst().dyn_lexers().iter().fold(api::CharFilter(vec![]), |cf,lx|{
let lxcf = mk_char_filter(lx.char_filter().iter().cloned());
char_filter_union(&cf, &lxcf)
});
let mut lazy_mems = HashMap::new();
let ctx = SysCtx {
cted:cted.clone(),id:new_sys.id,logger:logger.clone(),reqnot:hand.reqnot()
};
let mut tia_ctx = TIACtxImpl {
lazy: &mut lazy_mems,sys:ctx.clone(),basepath: &[],path:Substack::Bottom,
};
let const_root = (cted.inst().dyn_env().into_iter()).map(|(k,v)|(k.to_api(),v.into_api(&mut tia_ctx))).collect();
systems.lock().unwrap().insert(new_sys.id,SystemRecord {
declfs:cted.inst().dyn_vfs().to_api_rec(&mut vfses),vfses,cted,lazy_members:lazy_mems
});
hand.handle(&new_sys, &api::SystemInst {
lex_filter,const_root,line_types:vec![]
}).await
}
api::HostExtReq::GetMember(get_tree@api::GetMember(sys_id,tree_id)) => {
let mut systems_g = systems.lock().unwrap();
let sys = systems_g.get_mut(&sys_id).expect("System not found");
let lazy = &mut sys.lazy_members;
let(path,cb) = match lazy.insert(tree_id,MemberRecord::Res){
None => panic!("Tree for ID not found"),
Some(MemberRecord::Res) => panic!("This tree has already been transmitted"),
Some(MemberRecord::Gen(path,cb)) => (path,cb),
};
let tree = cb.build(path.clone());
hand.handle(&get_tree, &tree.into_api(&mut TIACtxImpl {
sys:SysCtx::new(sys_id, &sys.cted, &logger,hand.reqnot()),path:Substack::Bottom,basepath: &path,lazy,
})).await
}
api::HostExtReq::VfsReq(api::VfsReq::GetVfs(get_vfs@api::GetVfs(sys_id))) => {
let systems_g = systems.lock().unwrap();
hand.handle(&get_vfs, &systems_g[&sys_id].declfs).await
}
api::HostExtReq::SysReq(api::SysReq::SysFwded(fwd)) => {
let api::SysFwded(sys_id,payload) = fwd;
let ctx = mk_ctx(sys_id,hand.reqnot());
let sys = ctx.cted.inst();
sys.dyn_request(hand,payload)
}
api::HostExtReq::VfsReq(api::VfsReq::VfsRead(vfs_read)) => {
let api::VfsRead(sys_id,vfs_id,path) = &vfs_read;
let systems_g = systems.lock().unwrap();
let path = path.iter().map(|t|Tok::from_api(*t)).collect_vec();
hand.handle(&vfs_read, &systems_g[sys_id].vfses[vfs_id].load(PathSlice::new(&path))).await
}
api::HostExtReq::LexExpr(lex@api::LexExpr {
sys,text,pos,id
}) => {
let systems_g = systems.lock().unwrap();
let lexers = systems_g[&sys].cted.inst().dyn_lexers();
mem::drop(systems_g);
let text = Tok::from_api(text);
let ctx = LexContext {
sys,id,pos,reqnot:hand.reqnot(),text: &text
};
let trigger_char = text.await.chars().nth(pos as usize).unwrap();
for lx in lexers.iter().filter(|l|char_filter_match(l.char_filter(),trigger_char)){
match lx.lex(&text[pos as usize..], &ctx){
Err(e)if e.any(|e| *e==err_not_applicable()) => continue,
Err(e) => {
let eopt = e.keep_only(|e| *e!=err_cascade()).map(|e|Err(e.to_api()));
return hand.handle(&lex, &eopt)
},
Ok((s,expr)) => {
let ctx = mk_ctx(sys,hand.reqnot());
let expr = expr.to_api(&mut |f,r|do_extra(f,r,ctx.clone()));
let pos = (text.len()-s.len())as u32;
return hand.handle(&lex, &Some(Ok(api::LexedExpr {
pos,expr
})))
}
}
}writeln!(logger,"Got notified about n/a character '{trigger_char}'");
hand.handle(&lex, &None).await
},
api::HostExtReq::ParseLine(pline) => {
let api::ParseLine {
exported,comments,sys,line
} = &pline;
let mut ctx = mk_ctx(*sys,hand.reqnot());
let parsers = ctx.cted.inst().dyn_parsers();
let comments = comments.iter().map(Comment::from_api).collect();
let line:Vec<GenTokTree> = ttv_from_api(line, &mut ctx);
let snip = Snippet::new(line.first().expect("Empty line"), &line);
let(head,tail) = snip.pop_front().unwrap();
let name = if let GenTok::Name(n) = &head.tok {
n
}else {
panic!("No line head")
};
let parser = parsers.iter().find(|p|p.line_head()== **name).expect("No parser candidate");
let o_line = match parser.parse(*exported,comments,tail){
Err(e) => Err(e.to_api()),
Ok(t) => Ok(ttv_to_api(t, &mut |f,range|{
api::TokenTree {
range,token:api::Token::Atom(f.clone().build(ctx.clone()))
}
})),
};
hand.handle(&pline, &o_line).await
}
api::HostExtReq::AtomReq(atom_req) => {
let atom = atom_req.get_atom();
with_atom_record(&mk_ctx,hand.reqnot(),atom, |nfo,ctx,id,buf|async {
let actx = AtomCtx(buf,atom.drop,ctx.clone());
match&atom_req {
api::AtomReq::SerializeAtom(ser) => {
let mut buf = enc_vec(&id);
let refs_opt = nfo.serialize(actx, &mut buf);
hand.handle(ser, &refs_opt.map(|refs|(buf,refs))),await
}
api::AtomReq::AtomPrint(print@api::AtomPrint(_)) => hand.handle(print, &nfo.print(actx)).await,
api::AtomReq::Fwded(fwded) => {
let api::Fwded(_,key,payload) = &fwded;
let mut reply = Vec::new();
let some = nfo.handle_req(actx,Sym::from_api(*key), &mut &payload[..], &mut reply);
hand.handle(fwded, &some.then_some(reply)).await
}
api::AtomReq::CallRef(call@api::CallRef(_,arg)) => {
let ret = nfo.call_ref(actx, *arg);
hand.handle(call, &ret.api_return(ctx.clone(), &mut |h|hand.defer_drop(h))).await
},
api::AtomReq::FinalCall(call@api::FinalCall(_,arg)) => {
let ret = nfo.call(actx, *arg);
hand.handle(call, &ret.api_return(ctx.clone(), &mut |h|hand.defer_drop(h))).await
}
api::AtomReq::Command(cmd@api::Command(_)) => {
hand.handle(cmd, &match nfo.command(actx){
Err(e) => Err(e.to_api()),
Ok(opt) => Ok(match opt {
None => api::NextStep::Halt,
Some(cont) => api::NextStep::Continue(cont.api_return(ctx.clone(), &mut |h|hand.defer_drop(h))),
})
}).await
}
}
}).await
},
api::HostExtReq::DeserAtom(deser) => {
let api::DeserAtom(sys,buf,refs) = &deser;
let mut read = &mut &buf[..];
let ctx = mk_ctx(*sys,hand.reqnot());
let id = api::AtomId::decode(&mut read);
let inst = ctx.cted.inst();
let nfo = atom_by_idx(inst.card(),id).expect("Deserializing atom with invalid ID");
hand.handle(&deser, &nfo.deserialize(ctx.clone(),read,refs)).await
},
orchid_api::HostExtReq::ApplyMacro(am) => {
let tok = hand.will_handle_as(&am);
let sys_ctx = mk_ctx(am.sys,hand.reqnot());
let ctx = RuleCtx {
args:(am.params.into_iter()).map(|(k,v)|(Tok::from_api(k),mtreev_from_api(&v, &mut |_|panic!("No atom in macro prompt!")))).collect(),run_id:am.run_id,sys:sys_ctx.clone(),
};
hand.handle_as(tok, &match apply_rule(am.id,ctx){
Err(e) => e.keep_only(|e| *e!=err_cascade()).map(|e|Err(e.to_api())),
Ok(t) => Some(Ok(mtreev_to_api(&t, &mut |a|{
api::MacroToken::Atom(a.clone().build(sys_ctx.clone()))
}))),
}).await
}
};
receipt
}.boxed_local())
},
let systems = systems.clone();
let logger = logger.clone();
(move |hand, req| {
async {
let receipt: Receipt = match req {
api::HostExtReq::Ping(ping @ api::Ping) => hand.handle(&ping, &()).await,
_ => panic!(),
// api::HostExtReq::Sweep(sweep@api::Sweep) => hand.handle(&sweep,
// &sweep_replica()).await,
// api::HostExtReq::SysReq(api::SysReq::NewSystem(new_sys)) => {
// let i = decls.iter().enumerate().find(|(_,s)|s.id==new_sys.system).unwrap().0;
// let cted = data.systems[i].new_system(&new_sys);
// let mut vfses = HashMap::new();
// let lex_filter = cted.inst().dyn_lexers().iter().fold(api::CharFilter(vec![]),
// |cf,lx|{ let lxcf = mk_char_filter(lx.char_filter().iter().cloned());
// char_filter_union(&cf, &lxcf)
// });
// let mut lazy_mems = HashMap::new();
// let ctx = SysCtx {
// cted:cted.clone(),id:new_sys.id,logger:logger.clone(),reqnot:hand.reqnot()
// };
// let mut tia_ctx = TIACtxImpl {
// lazy: &mut lazy_mems,sys:ctx.clone(),basepath: &[],path:Substack::Bottom,
// };
// let const_root =
// (cted.inst().dyn_env().into_iter()).map(|(k,v)|(k.to_api(),v.into_api(&mut
// tia_ctx))).collect(); systems.lock().unwrap().insert(new_sys.id,
// SystemRecord { declfs:cted.inst().dyn_vfs().to_api_rec(&mut
// vfses),vfses,cted,lazy_members:lazy_mems });
// hand.handle(&new_sys, &api::SystemInst {
// lex_filter,const_root,line_types:vec![]
// }).await
// }
// api::HostExtReq::GetMember(get_tree@api::GetMember(sys_id,tree_id)) => {
// let mut systems_g = systems.lock().unwrap();
// let sys = systems_g.get_mut(&sys_id).expect("System not found");
// let lazy = &mut sys.lazy_members;
// let(path,cb) = match lazy.insert(tree_id,MemberRecord::Res){
// None => panic!("Tree for ID not found"),
// Some(MemberRecord::Res) => panic!("This tree has already been transmitted"),
// Some(MemberRecord::Gen(path,cb)) => (path,cb),
// };
// let tree = cb.build(path.clone());
// hand.handle(&get_tree, &tree.into_api(&mut TIACtxImpl {
// sys:SysCtx::new(sys_id, &sys.cted,
// &logger,hand.reqnot()),path:Substack::Bottom,basepath: &path,lazy,
// })).await
// }
// api::HostExtReq::VfsReq(api::VfsReq::GetVfs(get_vfs@api::GetVfs(sys_id))) => {
// let systems_g = systems.lock().unwrap();
// hand.handle(&get_vfs, &systems_g[&sys_id].declfs).await
// }
// api::HostExtReq::SysReq(api::SysReq::SysFwded(fwd)) => {
// let api::SysFwded(sys_id,payload) = fwd;
// let ctx = mk_ctx(sys_id,hand.reqnot());
// let sys = ctx.cted.inst();
// sys.dyn_request(hand,payload)
// }
// api::HostExtReq::VfsReq(api::VfsReq::VfsRead(vfs_read)) => {
// let api::VfsRead(sys_id,vfs_id,path) = &vfs_read;
// let systems_g = systems.lock().unwrap();
// let path = path.iter().map(|t|Tok::from_api(*t)).collect_vec();
// hand.handle(&vfs_read,
// &systems_g[sys_id].vfses[vfs_id].load(PathSlice::new(&path))).await }
// api::HostExtReq::LexExpr(lex@api::LexExpr {
// sys,text,pos,id
// }) => {
// let systems_g = systems.lock().unwrap();
// let lexers = systems_g[&sys].cted.inst().dyn_lexers();
// mem::drop(systems_g);
// let text = Tok::from_api(text);
// let ctx = LexContext {
// sys,id,pos,reqnot:hand.reqnot(),text: &text
// };
// let trigger_char = text.await.chars().nth(pos as usize).unwrap();
// for lx in
// lexers.iter().filter(|l|char_filter_match(l.char_filter(),trigger_char)){
// match lx.lex(&text[pos as usize..], &ctx){
// Err(e)if e.any(|e| *e==err_not_applicable()) => continue,
// Err(e) => {
// let eopt = e.keep_only(|e|
// *e!=err_cascade()).map(|e|Err(e.to_api())); return
// hand.handle(&lex, &eopt) },
// Ok((s,expr)) => {
// let ctx = mk_ctx(sys,hand.reqnot());
// let expr = expr.to_api(&mut |f,r|do_extra(f,r,ctx.clone()));
// let pos = (text.len()-s.len())as u32;
// return hand.handle(&lex, &Some(Ok(api::LexedExpr {
// pos,expr
// })))
// }
// }
// }writeln!(logger,"Got notified about n/a character '{trigger_char}'");
// hand.handle(&lex, &None).await
// },
// api::HostExtReq::ParseLine(pline) => {
// let api::ParseLine {
// exported,comments,sys,line
// } = &pline;
// let mut ctx = mk_ctx(*sys,hand.reqnot());
// let parsers = ctx.cted.inst().dyn_parsers();
// let comments = comments.iter().map(Comment::from_api).collect();
// let line:Vec<GenTokTree> = ttv_from_api(line, &mut ctx);
// let snip = Snippet::new(line.first().expect("Empty line"), &line);
// let(head,tail) = snip.pop_front().unwrap();
// let name = if let GenTok::Name(n) = &head.tok {
// n
// }else {
// panic!("No line head")
// };
// let parser = parsers.iter().find(|p|p.line_head()== **name).expect("No parser
// candidate"); let o_line = match parser.parse(*exported,comments,tail){
// Err(e) => Err(e.to_api()),
// Ok(t) => Ok(ttv_to_api(t, &mut |f,range|{
// api::TokenTree {
// range,token:api::Token::Atom(f.clone().build(ctx.clone()))
// }
// })),
// };
// hand.handle(&pline, &o_line).await
// }
// api::HostExtReq::AtomReq(atom_req) => {
// let atom = atom_req.get_atom();
// with_atom_record(&mk_ctx,hand.reqnot(),atom, |nfo,ctx,id,buf|async {
// let actx = AtomCtx(buf,atom.drop,ctx.clone());
// match&atom_req {
// api::AtomReq::SerializeAtom(ser) => {
// let mut buf = enc_vec(&id);
// let refs_opt = nfo.serialize(actx, &mut buf);
// hand.handle(ser, &refs_opt.map(|refs|(buf,refs))),await
// }
// api::AtomReq::AtomPrint(print@api::AtomPrint(_)) => hand.handle(print,
// &nfo.print(actx)).await, api::AtomReq::Fwded(fwded) => {
// let api::Fwded(_,key,payload) = &fwded;
// let mut reply = Vec::new();
// let some = nfo.handle_req(actx,Sym::from_api(*key), &mut
// &payload[..], &mut reply); hand.handle(fwded,
// &some.then_some(reply)).await }
// api::AtomReq::CallRef(call@api::CallRef(_,arg)) => {
// let ret = nfo.call_ref(actx, *arg);
// hand.handle(call, &ret.api_return(ctx.clone(), &mut
// |h|hand.defer_drop(h))).await },
// api::AtomReq::FinalCall(call@api::FinalCall(_,arg)) => {
// let ret = nfo.call(actx, *arg);
// hand.handle(call, &ret.api_return(ctx.clone(), &mut
// |h|hand.defer_drop(h))).await }
// api::AtomReq::Command(cmd@api::Command(_)) => {
// hand.handle(cmd, &match nfo.command(actx){
// Err(e) => Err(e.to_api()),
// Ok(opt) => Ok(match opt {
// None => api::NextStep::Halt,
// Some(cont) =>
// api::NextStep::Continue(cont.api_return(ctx.clone(), &mut |h|hand.defer_drop(h))),
// })
// }).await
// }
// }
// }).await
// },
// api::HostExtReq::DeserAtom(deser) => {
// let api::DeserAtom(sys,buf,refs) = &deser;
// let mut read = &mut &buf[..];
// let ctx = mk_ctx(*sys,hand.reqnot());
// let id = api::AtomId::decode(&mut read);
// let inst = ctx.cted.inst();
// let nfo = atom_by_idx(inst.card(),id).expect("Deserializing atom with invalid
// ID"); hand.handle(&deser, &nfo.deserialize(ctx.clone(),read,refs)).await
// },
// orchid_api::HostExtReq::ApplyMacro(am) => {
// let tok = hand.will_handle_as(&am);
// let sys_ctx = mk_ctx(am.sys,hand.reqnot());
// let ctx = RuleCtx {
// args:(am.params.into_iter()).map(|(k,v)|(Tok::from_api(k),mtreev_from_api(&v,
// &mut |_|panic!("No atom in macro
// prompt!")))).collect(),run_id:am.run_id,sys:sys_ctx.clone(), };
// hand.handle_as(tok, &match apply_rule(am.id,ctx){
// Err(e) => e.keep_only(|e| *e!=err_cascade()).map(|e|Err(e.to_api())),
// Ok(t) => Some(Ok(mtreev_to_api(&t, &mut |a|{
// api::MacroToken::Atom(a.clone().build(sys_ctx.clone()))
// }))),
// }).await
// }
};
receipt
}
.boxed_local()
})
},
);
init_replica(rn.clone().map());
while !exiting.load(Ordering::Relaxed) {
let rcvd = recv_parent_msg().unwrap();
rn.receive(&rcvd)
let rcvd = recv_parent_msg().await.unwrap();
rn.receive(&rcvd).await
}
}