for a moment, everything works

This commit is contained in:
2025-02-21 23:53:31 +01:00
parent 9e7648bc72
commit cfa8b6ee52
23 changed files with 491 additions and 371 deletions

View File

@@ -35,7 +35,7 @@ pub struct ReqPair<R: Request>(R, Sender<R::Response>);
#[derive(destructure)]
pub struct ExtensionData {
ctx: Ctx,
init: ExtInit,
init: Rc<ExtInit>,
reqnot: ReqNot<api::HostMsgSet>,
systems: Vec<SystemCtor>,
logger: Logger,
@@ -55,114 +55,140 @@ impl Drop for ExtensionData {
pub struct Extension(Rc<ExtensionData>);
impl Extension {
pub fn new(init: ExtInit, logger: Logger, msg_logger: Logger, ctx: Ctx) -> io::Result<Self> {
Ok(Self(Rc::new_cyclic(|weak: &Weak<ExtensionData>| ExtensionData {
exprs: ExprStore::default(),
ctx: ctx.clone(),
systems: (init.systems.iter().cloned())
.map(|decl| SystemCtor { decl, ext: WeakExtension(weak.clone()) })
.collect(),
logger: logger.clone(),
init,
next_pars: RefCell::new(NonZeroU64::new(1).unwrap()),
lex_recur: Mutex::default(),
mac_recur: Mutex::default(),
reqnot: ReqNot::new(
msg_logger,
clone!(weak; move |sfn, _| clone!(weak; async move {
Ok(Self(Rc::new_cyclic(|weak: &Weak<ExtensionData>| {
let init = Rc::new(init);
(ctx.spawn)(clone!(init, weak, ctx; Box::pin(async move {
let reqnot_opt = weak.upgrade().map(|rc| rc.reqnot.clone());
if let Some(reqnot) = reqnot_opt {
let mut repeat = true;
while repeat {
repeat = false;
(init.recv(Box::new(|msg| {
repeat = true;
Box::pin(clone!(reqnot, ctx; async move {
let msg = msg.to_vec();
let reqnot = reqnot.clone();
(ctx.spawn)(Box::pin(async move {
reqnot.receive(&msg).await;
}))
}))
})))
.await;
}
}
})));
ExtensionData {
exprs: ExprStore::default(),
ctx: ctx.clone(),
systems: (init.systems.iter().cloned())
.map(|decl| SystemCtor { decl, ext: WeakExtension(weak.clone()) })
.collect(),
logger: logger.clone(),
init,
next_pars: RefCell::new(NonZeroU64::new(1).unwrap()),
lex_recur: Mutex::default(),
mac_recur: Mutex::default(),
reqnot: ReqNot::new(
msg_logger,
clone!(weak; move |sfn, _| clone!(weak; async move {
let data = weak.upgrade().unwrap();
data.init.send(sfn).await
}.boxed_local())),
clone!(weak; move |notif, _| {
clone!(weak; Box::pin(async move {
let this = Extension(weak.upgrade().unwrap());
match notif {
api::ExtHostNotif::ExprNotif(api::ExprNotif::Acquire(acq)) => {
let target = this.0.exprs.get_expr(acq.1).expect("Invalid ticket");
this.0.exprs.give_expr(target)
}
api::ExtHostNotif::ExprNotif(api::ExprNotif::Release(rel)) => {
this.assert_own_sys(rel.0).await;
this.0.exprs.take_expr(rel.1)
}
api::ExtHostNotif::ExprNotif(api::ExprNotif::Move(mov)) => {
this.assert_own_sys(mov.dec).await;
let recp = this.ctx().system_inst(mov.inc).await.expect("invallid recipient sys id");
let expr = this.0.exprs.get_expr(mov.expr).expect("invalid ticket");
recp.ext().0.exprs.give_expr(expr);
this.0.exprs.take_expr(mov.expr);
},
api::ExtHostNotif::Log(api::Log(str)) => this.logger().log(str),
}
}))}),
{
clone!(weak, ctx);
move |hand, req| {
clone!(weak, ctx);
Box::pin(async move {
let this = Self(weak.upgrade().unwrap());
writeln!(this.reqnot().logger(), "Host received request {req:?}");
let i = this.ctx().i.clone();
match req {
api::ExtHostReq::Ping(ping) => hand.handle(&ping, &()).await,
api::ExtHostReq::IntReq(intreq) => match intreq {
api::IntReq::InternStr(s) => hand.handle(&s, &i.i(&*s.0).await.to_api()).await,
api::IntReq::InternStrv(v) => {
let tokens = join_all(v.0.iter().map(|m| i.ex(*m))).await;
hand.handle(&v, &i.i(&tokens).await.to_api()).await
},
api::IntReq::ExternStr(si) =>
hand.handle(&si, &Tok::<String>::from_api(si.0, &i).await.rc()).await,
api::IntReq::ExternStrv(vi) => {
let markerv = (i.ex(vi.0).await.iter()).map(|t| t.to_api()).collect_vec();
hand.handle(&vi, &markerv).await
},
},
api::ExtHostReq::Fwd(ref fw @ api::Fwd(ref atom, ref key, ref body)) => {
let sys = ctx.system_inst(atom.owner).await.expect("owner of live atom dropped");
let reply =
sys.reqnot().request(api::Fwded(fw.0.clone(), *key, body.clone())).await;
hand.handle(fw, &reply).await
},
api::ExtHostReq::SysFwd(ref fw @ api::SysFwd(id, ref body)) => {
let sys = ctx.system_inst(id).await.unwrap();
hand.handle(fw, &sys.request(body.clone()).await).await
},
api::ExtHostReq::SubLex(sl) => {
let (rep_in, rep_out) = channel::bounded(1);
{
let lex_g = this.0.lex_recur.lock().await;
let req_in = lex_g.get(&sl.id).expect("Sublex for nonexistent lexid");
req_in.send(ReqPair(sl.clone(), rep_in)).await.unwrap();
}
hand.handle(&sl, &rep_out.recv().await.unwrap()).await
},
api::ExtHostReq::ExprReq(api::ExprReq::Inspect(ins @ api::Inspect { target })) => {
let expr = this.exprs().get_expr(target).expect("Invalid ticket");
hand
.handle(&ins, &api::Inspected {
refcount: expr.strong_count() as u32,
location: expr.pos().to_api(),
kind: expr.to_api().await,
})
.await
},
api::ExtHostReq::RunMacros(rm) => {
let (rep_in, rep_out) = channel::bounded(1);
let lex_g = this.0.mac_recur.lock().await;
let req_in = lex_g.get(&rm.run_id).expect("Sublex for nonexistent lexid");
req_in.send(ReqPair(rm.clone(), rep_in)).await.unwrap();
hand.handle(&rm, &rep_out.recv().await.unwrap()).await
},
api::ExtHostReq::ExtAtomPrint(ref eap @ api::ExtAtomPrint(ref atom)) => {
let atom = AtomHand::new(atom.clone(), &ctx).await;
let unit = atom.print(&FmtCtxImpl { i: &this.ctx().i }).await;
hand.handle(eap, &unit.to_api()).await
},
clone!(weak; move |notif, _| {
clone!(weak; Box::pin(async move {
let this = Extension(weak.upgrade().unwrap());
match notif {
api::ExtHostNotif::ExprNotif(api::ExprNotif::Acquire(acq)) => {
let target = this.0.exprs.get_expr(acq.1).expect("Invalid ticket");
this.0.exprs.give_expr(target)
}
})
}
},
),
api::ExtHostNotif::ExprNotif(api::ExprNotif::Release(rel)) => {
this.assert_own_sys(rel.0).await;
this.0.exprs.take_expr(rel.1)
}
api::ExtHostNotif::ExprNotif(api::ExprNotif::Move(mov)) => {
this.assert_own_sys(mov.dec).await;
let recp = this.ctx().system_inst(mov.inc).await.expect("invallid recipient sys id");
let expr = this.0.exprs.get_expr(mov.expr).expect("invalid ticket");
recp.ext().0.exprs.give_expr(expr);
this.0.exprs.take_expr(mov.expr);
},
api::ExtHostNotif::Log(api::Log(str)) => this.logger().log(str),
}
}))}),
{
clone!(weak, ctx);
move |hand, req| {
clone!(weak, ctx);
Box::pin(async move {
let this = Self(weak.upgrade().unwrap());
writeln!(this.reqnot().logger(), "Host received request {req:?}");
let i = this.ctx().i.clone();
match req {
api::ExtHostReq::Ping(ping) => hand.handle(&ping, &()).await,
api::ExtHostReq::IntReq(intreq) => match intreq {
api::IntReq::InternStr(s) => hand.handle(&s, &i.i(&*s.0).await.to_api()).await,
api::IntReq::InternStrv(v) => {
let tokens = join_all(v.0.iter().map(|m| i.ex(*m))).await;
hand.handle(&v, &i.i(&tokens).await.to_api()).await
},
api::IntReq::ExternStr(si) =>
hand.handle(&si, &Tok::<String>::from_api(si.0, &i).await.rc()).await,
api::IntReq::ExternStrv(vi) => {
let markerv = (i.ex(vi.0).await.iter()).map(|t| t.to_api()).collect_vec();
hand.handle(&vi, &markerv).await
},
},
api::ExtHostReq::Fwd(ref fw @ api::Fwd(ref atom, ref key, ref body)) => {
let sys =
ctx.system_inst(atom.owner).await.expect("owner of live atom dropped");
let reply =
sys.reqnot().request(api::Fwded(fw.0.clone(), *key, body.clone())).await;
hand.handle(fw, &reply).await
},
api::ExtHostReq::SysFwd(ref fw @ api::SysFwd(id, ref body)) => {
let sys = ctx.system_inst(id).await.unwrap();
hand.handle(fw, &sys.request(body.clone()).await).await
},
api::ExtHostReq::SubLex(sl) => {
let (rep_in, rep_out) = channel::bounded(1);
{
let lex_g = this.0.lex_recur.lock().await;
let req_in = lex_g.get(&sl.id).expect("Sublex for nonexistent lexid");
req_in.send(ReqPair(sl.clone(), rep_in)).await.unwrap();
}
hand.handle(&sl, &rep_out.recv().await.unwrap()).await
},
api::ExtHostReq::ExprReq(api::ExprReq::Inspect(
ins @ api::Inspect { target },
)) => {
let expr = this.exprs().get_expr(target).expect("Invalid ticket");
hand
.handle(&ins, &api::Inspected {
refcount: expr.strong_count() as u32,
location: expr.pos().to_api(),
kind: expr.to_api().await,
})
.await
},
api::ExtHostReq::RunMacros(rm) => {
let (rep_in, rep_out) = channel::bounded(1);
let lex_g = this.0.mac_recur.lock().await;
let req_in = lex_g.get(&rm.run_id).expect("Sublex for nonexistent lexid");
req_in.send(ReqPair(rm.clone(), rep_in)).await.unwrap();
hand.handle(&rm, &rep_out.recv().await.unwrap()).await
},
api::ExtHostReq::ExtAtomPrint(ref eap @ api::ExtAtomPrint(ref atom)) => {
let atom = AtomHand::new(atom.clone(), &ctx).await;
let unit = atom.print(&FmtCtxImpl { i: &this.ctx().i }).await;
hand.handle(eap, &unit.to_api()).await
},
}
})
}
},
),
}
})))
}
pub(crate) fn reqnot(&self) -> &ReqNot<HostMsgSet> { &self.0.reqnot }
@@ -212,20 +238,6 @@ impl Extension {
.await;
ret.transpose()
}
pub async fn recv_one(&self) {
let reqnot = self.0.reqnot.clone();
let ctx = self.ctx().clone();
(self.0.init.recv(Box::new(move |msg| {
Box::pin(async move {
let msg = msg.to_vec();
let reqnot = reqnot.clone();
(ctx.spawn)(Box::pin(async move {
reqnot.receive(&msg).await;
}))
})
})))
.await;
}
pub fn system_drop(&self, id: api::SysId) {
let rc = self.clone();
(self.ctx().spawn)(Box::pin(async move {