Lexer test mode works

This commit is contained in:
2024-08-04 23:24:32 +02:00
parent 9d35ba8040
commit 11951ede43
36 changed files with 687 additions and 115 deletions

View File

@@ -1,6 +1,13 @@
use std::io::Write as _;
use orchid_api::logging::Log;
use orchid_base::logging::Logger;
use orchid_base::msg::{recv_msg, send_msg};
use substack::{Stackframe, Substack};
use std::collections::VecDeque;
use std::io::{stderr, BufRead, BufReader, Write as _};
use std::num::NonZero;
use std::ops::Deref;
use std::path::PathBuf;
use std::process::ChildStdin;
use std::sync::atomic::{AtomicU16, AtomicU32, AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
@@ -11,7 +18,7 @@ use hashbrown::hash_map::Entry;
use hashbrown::HashMap;
use itertools::Itertools;
use lazy_static::lazy_static;
use orchid_api::atom::{Atom, AtomDrop, AtomSame, CallRef, FinalCall, Fwd, Fwded};
use orchid_api::atom::{Atom, AtomDrop, AtomPrint, AtomSame, CallRef, FinalCall, Fwd, Fwded};
use orchid_api::error::ProjResult;
use orchid_api::expr::{Acquire, Expr, ExprNotif, ExprTicket, Release, Relocate};
use orchid_api::interner::IntReq;
@@ -83,6 +90,7 @@ impl AtomHand {
self.0.owner.reqnot().request(Fwded(self.0.api_ref(), req))
}
pub fn api_ref(&self) -> Atom { self.0.api_ref() }
pub fn print(&self) -> String { self.0.owner.reqnot().request(AtomPrint(self.0.api_ref())) }
}
/// Data held about an Extension. This is refcounted within [Extension]. It's
@@ -92,11 +100,16 @@ impl AtomHand {
#[derive(destructure)]
pub struct ExtensionData {
child: Mutex<process::Child>,
child_stdin: Mutex<ChildStdin>,
reqnot: ReqNot<HostMsgSet>,
systems: Vec<SystemCtor>,
logger: Logger,
}
impl Drop for ExtensionData {
fn drop(&mut self) { self.reqnot.notify(HostExtNotif::Exit) }
fn drop(&mut self) {
self.reqnot.notify(HostExtNotif::Exit);
self.child.lock().unwrap().wait().expect("Extension exited with error");
}
}
fn acq_expr(sys: SysId, extk: ExprTicket) {
@@ -115,27 +128,46 @@ fn rel_expr(sys: SysId, extk: ExprTicket) {
#[derive(Clone)]
pub struct Extension(Arc<ExtensionData>);
impl Extension {
pub fn new(mut cmd: process::Command) -> io::Result<Self> {
let mut child = cmd.stdin(process::Stdio::piped()).stdout(process::Stdio::piped()).spawn()?;
HostHeader.encode(child.stdin.as_mut().unwrap());
let eh = ExtensionHeader::decode(child.stdout.as_mut().unwrap());
Ok(Self(Arc::new_cyclic(|weak| ExtensionData {
pub fn new(mut cmd: process::Command, logger: Logger) -> io::Result<Self> {
let mut child = cmd
.stdin(process::Stdio::piped())
.stdout(process::Stdio::piped())
.stderr(process::Stdio::piped())
.spawn()?;
let mut child_stdin = child.stdin.take().unwrap();
let mut child_stdout = child.stdout.take().unwrap();
let child_stderr = child.stderr.take().unwrap();
thread::Builder::new().name("stderr forwarder".to_string()).spawn(|| {
let mut reader = BufReader::new(child_stderr);
loop {
let mut buf = String::new();
if 0 == reader.read_line(&mut buf).unwrap() {
break;
}
stderr().write_all(buf.as_bytes()).unwrap();
}
}).unwrap();
HostHeader{ log_strategy: logger.strat() }.encode(&mut child_stdin);
let eh = ExtensionHeader::decode(&mut child_stdout);
let ret = Arc::new_cyclic(|weak: &Weak<ExtensionData>| ExtensionData {
logger,
child: Mutex::new(child),
child_stdin: Mutex::new(child_stdin),
reqnot: ReqNot::new(
clone!(weak; move |sfn, _| {
let arc: Arc<ExtensionData> = weak.upgrade().unwrap();
let mut g = arc.child.lock().unwrap();
g.stdin.as_mut().unwrap().write_all(sfn).unwrap();
eprintln!("Downsending {:?}", sfn);
send_msg(&mut *weak.upgrade().unwrap().child_stdin.lock().unwrap(), sfn).unwrap();
}),
|notif, _| match notif {
clone!(weak; move |notif, _| match notif {
ExtHostNotif::ExprNotif(ExprNotif::Acquire(Acquire(sys, extk))) => acq_expr(sys, extk),
ExtHostNotif::ExprNotif(ExprNotif::Release(Release(sys, extk))) => rel_expr(sys, extk),
ExtHostNotif::ExprNotif(ExprNotif::Relocate(Relocate { dec, inc, expr })) => {
acq_expr(inc, expr);
rel_expr(dec, expr);
},
ExtHostNotif::AdviseSweep(_advice) => eprintln!("Sweep advice is unsupported")
},
ExtHostNotif::AdviseSweep(_advice) => eprintln!("Sweep advice is unsupported"),
ExtHostNotif::Log(Log(str)) => weak.upgrade().unwrap().logger.log(str),
}),
|req| match req.req() {
ExtHostReq::Ping(ping) => req.handle(ping, &()),
ExtHostReq::IntReq(IntReq::InternStr(s)) => req.handle(s, &intern(&**s.0).marker()),
@@ -160,7 +192,19 @@ impl Extension {
},
),
systems: eh.systems.into_iter().map(|decl| SystemCtor { decl, ext: weak.clone() }).collect(),
})))
});
let weak = Arc::downgrade(&ret);
let prog_pbuf = PathBuf::from(cmd.get_program());
let prog = prog_pbuf.file_stem().unwrap_or(cmd.get_program()).to_string_lossy();
thread::Builder::new().name(format!("host-end:{}", prog)).spawn(move || {
loop {
let ingress = recv_msg(&mut child_stdout).expect("could not receive");
if let Some(sys) = weak.upgrade() {
sys.reqnot.receive(ingress);
}
}
}).unwrap();
Ok(Self(ret))
}
pub fn systems(&self) -> impl Iterator<Item = &SystemCtor> { self.0.systems.iter() }
}
@@ -293,3 +337,54 @@ impl Deref for System {
type Target = SystemInstData;
fn deref(&self) -> &Self::Target { self.0.as_ref() }
}
#[derive(Debug, Clone)]
pub enum SysResolvErr {
Loop(Vec<String>),
Missing(String)
}
pub fn init_systems(tgts: &[String], exts: &[Extension]) -> Result<Vec<System>, SysResolvErr> {
let mut to_load = HashMap::<&str, &SystemCtor>::new();
let mut to_find = tgts.iter().map(|s| s.as_str()).collect::<VecDeque::<&str>>();
while let Some(target) = to_find.pop_front() {
if to_load.contains_key(target) {
continue;
}
let ctor = (exts.iter())
.flat_map(|e| e.systems().filter(|c| c.decl.name == target))
.max_by_key(|c| c.decl.priority)
.ok_or_else(|| SysResolvErr::Missing(target.to_string()))?;
to_load.insert(target, ctor);
to_find.extend(ctor.decl.depends.iter().map(|s| s.as_str()));
}
let mut to_load_ordered = Vec::new();
fn walk_deps<'a>(
graph: &mut HashMap::<&str, &'a SystemCtor>,
list: &mut Vec<&'a SystemCtor>,
chain: Stackframe<&str>
) -> Result<(), SysResolvErr> {
if let Some(ctor) = graph.remove(chain.item) {
// if the above is none, the system is already queued. Missing systems are detected above
for dep in ctor.decl.depends.iter() {
if Substack::Frame(chain).iter().any(|c| c == dep) {
let mut circle = vec![dep.to_string()];
circle.extend(Substack::Frame(chain).iter().map(|s| s.to_string()));
return Err(SysResolvErr::Loop(circle))
}
walk_deps(graph, list, Substack::Frame(chain).new_frame(dep))?
}
list.push(ctor);
}
Ok(())
}
for tgt in tgts {
walk_deps(&mut to_load, &mut to_load_ordered, Substack::Bottom.new_frame(tgt))?;
}
let mut systems = HashMap::<&str, System>::new();
for ctor in to_load_ordered.iter() {
let sys = ctor.run(ctor.depends().map(|n| &systems[n]));
systems.insert(ctor.name(), sys);
}
Ok(systems.into_values().collect_vec())
}