Correctly halts
This commit is contained in:
@@ -40,6 +40,7 @@ pub struct ReqPair<R: Request>(R, Sender<R::Response>);
|
||||
/// upgrading fails.
|
||||
#[derive(destructure)]
|
||||
pub struct ExtensionData {
|
||||
name: String,
|
||||
ctx: Ctx,
|
||||
reqnot: ReqNot<api::HostMsgSet>,
|
||||
systems: Vec<SystemCtor>,
|
||||
@@ -67,21 +68,29 @@ impl Extension {
|
||||
Ok(Self(Rc::new_cyclic(|weak: &Weak<ExtensionData>| {
|
||||
let init = Rc::new(init);
|
||||
let (exiting_snd, exiting_rcv) = channel::<()>(0);
|
||||
(ctx.spawn)(clone!(init, weak, ctx; Box::pin(async move {
|
||||
let rcv_stream = stream(async |mut cx| loop { cx.emit( init.recv().await).await });
|
||||
let mut event_stream = pin!(stream::select(exiting_rcv.map(|()| None), rcv_stream));
|
||||
while let Some(Some(msg)) = event_stream.next().await {
|
||||
if let Some(reqnot) = weak.upgrade().map(|rc| rc.reqnot.clone()) {
|
||||
let reqnot = reqnot.clone();
|
||||
(ctx.spawn)(Box::pin(async move {
|
||||
reqnot.receive(&msg).await;
|
||||
}))
|
||||
(ctx.spawn)({
|
||||
clone!(init, weak, ctx);
|
||||
Box::pin(async move {
|
||||
let rcv_stream = stream(async |mut cx| {
|
||||
loop {
|
||||
cx.emit(init.recv().await).await
|
||||
}
|
||||
});
|
||||
let mut event_stream = pin!(stream::select(exiting_rcv.map(|()| None), rcv_stream));
|
||||
while let Some(Some(msg)) = event_stream.next().await {
|
||||
if let Some(reqnot) = weak.upgrade().map(|rc| rc.reqnot.clone()) {
|
||||
let reqnot = reqnot.clone();
|
||||
(ctx.spawn)(Box::pin(async move {
|
||||
reqnot.receive(&msg).await;
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
})));
|
||||
})
|
||||
});
|
||||
ExtensionData {
|
||||
name: init.name.clone(),
|
||||
exiting_snd,
|
||||
exprs: ctx.common_exprs.derive(),
|
||||
exprs: ctx.common_exprs.derive(false),
|
||||
ctx: ctx.clone(),
|
||||
systems: (init.systems.iter().cloned())
|
||||
.map(|decl| SystemCtor { decl, ext: WeakExtension(weak.clone()) })
|
||||
@@ -95,17 +104,26 @@ impl Extension {
|
||||
clone!(weak; move |notif, _| {
|
||||
clone!(weak; Box::pin(async move {
|
||||
let this = Extension(weak.upgrade().unwrap());
|
||||
if !matches!(notif, api::ExtHostNotif::Log(_)) {
|
||||
writeln!(this.reqnot().logger(), "Host received notif {notif:?}");
|
||||
}
|
||||
match notif {
|
||||
api::ExtHostNotif::ExprNotif(api::ExprNotif::Acquire(acq)) => {
|
||||
let target = this.0.exprs.get_expr(acq.1).expect("Invalid ticket");
|
||||
this.0.exprs.give_expr(target)
|
||||
}
|
||||
api::ExtHostNotif::ExprNotif(api::ExprNotif::Release(rel)) => {
|
||||
this.assert_own_sys(rel.0).await;
|
||||
this.0.exprs.take_expr(rel.1);
|
||||
if this.is_own_sys(rel.0).await {
|
||||
this.0.exprs.take_expr(rel.1);
|
||||
} else {
|
||||
writeln!(this.reqnot().logger(), "Not our system {:?}", rel.0)
|
||||
}
|
||||
}
|
||||
api::ExtHostNotif::ExprNotif(api::ExprNotif::Move(mov)) => {
|
||||
this.assert_own_sys(mov.dec).await;
|
||||
if !this.is_own_sys(mov.dec).await {
|
||||
writeln!(this.reqnot().logger(), "Not our system {:?}", mov.dec);
|
||||
return;
|
||||
}
|
||||
let recp = this.ctx().system_inst(mov.inc).await.expect("invallid recipient sys id");
|
||||
let expr = this.0.exprs.get_expr(mov.expr).expect("invalid ticket");
|
||||
recp.ext().0.exprs.give_expr(expr);
|
||||
@@ -120,7 +138,9 @@ impl Extension {
|
||||
clone!(weak, ctx);
|
||||
Box::pin(async move {
|
||||
let this = Self(weak.upgrade().unwrap());
|
||||
writeln!(this.reqnot().logger(), "Host received request {req:?}");
|
||||
if !matches!(req, api::ExtHostReq::ExtAtomPrint(_)) {
|
||||
writeln!(this.reqnot().logger(), "Host received request {req:?}");
|
||||
}
|
||||
let i = this.ctx().i.clone();
|
||||
match req {
|
||||
api::ExtHostReq::Ping(ping) => hand.handle(&ping, &()).await,
|
||||
@@ -235,8 +255,9 @@ impl Extension {
|
||||
}
|
||||
})))
|
||||
}
|
||||
pub fn name(&self) -> &String { &self.0.name }
|
||||
#[must_use]
|
||||
pub(crate) fn reqnot(&self) -> &ReqNot<api::HostMsgSet> { &self.0.reqnot }
|
||||
pub fn reqnot(&self) -> &ReqNot<api::HostMsgSet> { &self.0.reqnot }
|
||||
#[must_use]
|
||||
pub fn ctx(&self) -> &Ctx { &self.0.ctx }
|
||||
#[must_use]
|
||||
@@ -246,12 +267,12 @@ impl Extension {
|
||||
pub fn exprs(&self) -> &ExprStore { &self.0.exprs }
|
||||
#[must_use]
|
||||
pub async fn is_own_sys(&self, id: api::SysId) -> bool {
|
||||
let sys = self.ctx().system_inst(id).await.expect("invalid sender sys id");
|
||||
let Some(sys) = self.ctx().system_inst(id).await else {
|
||||
writeln!(self.logger(), "Invalid system ID {id:?}");
|
||||
return false;
|
||||
};
|
||||
Rc::ptr_eq(&self.0, &sys.ext().0)
|
||||
}
|
||||
pub async fn assert_own_sys(&self, id: api::SysId) {
|
||||
assert!(self.is_own_sys(id).await, "Incoming message impersonates separate system");
|
||||
}
|
||||
#[must_use]
|
||||
pub fn next_pars(&self) -> NonZeroU64 {
|
||||
let mut next_pars = self.0.next_pars.borrow_mut();
|
||||
@@ -293,7 +314,7 @@ impl Extension {
|
||||
pub fn system_drop(&self, id: api::SysId) {
|
||||
let rc = self.clone();
|
||||
(self.ctx().spawn)(Box::pin(async move {
|
||||
rc.reqnot().notify(api::SystemDrop(id)).await;
|
||||
rc.reqnot().request(api::SystemDrop(id)).await;
|
||||
rc.ctx().systems.write().await.remove(&id);
|
||||
}))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user