Protocols and operators mostly

This commit is contained in:
2026-01-21 22:22:58 +01:00
parent 75b05a2965
commit f38193edcc
33 changed files with 578 additions and 147 deletions

View File

@@ -5,6 +5,7 @@ use std::rc::Rc;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::panicking;
use futures::Stream;
use itertools::Itertools;
@@ -155,3 +156,15 @@ pub fn spin_on<Fut: Future>(f: Fut) -> Fut::Output {
}
}
}
/// Create an object that will panic if dropped. [PanicOnDrop::defuse] must be
/// called once the particular constraint preventing a drop has passed
pub fn assert_no_drop(msg: &'static str) -> PanicOnDrop { PanicOnDrop(true, msg) }
pub struct PanicOnDrop(bool, &'static str);
impl PanicOnDrop {
pub fn defuse(mut self) { self.0 = false; }
}
impl Drop for PanicOnDrop {
fn drop(&mut self) { assert!(panicking() || !self.0, "{}", self.1) }
}

View File

@@ -202,6 +202,11 @@ impl Sym {
pub async fn parse(s: &str) -> Result<Self, EmptyNameError> {
Ok(Sym(iv(&VName::parse(s).await?.into_vec()).await))
}
/// Read a `::` separated namespaced name from a static string where.
pub async fn literal(s: &'static str) -> Self {
assert!(!s.is_empty(), "Literal cannot be empty");
Self::parse(s).await.unwrap()
}
/// Assert that a token isn't empty, and wrap it in a [Sym]
pub fn from_tok(t: IStrv) -> Result<Self, EmptyNameError> {
if t.is_empty() { Err(EmptyNameError) } else { Ok(Self(t)) }

View File

@@ -17,6 +17,7 @@ use futures::{
use hashbrown::HashMap;
use orchid_api_traits::{Decode, Encode, Request, UnderRoot};
use crate::future_debug::{PanicOnDrop, assert_no_drop};
use crate::localset::LocalSet;
#[must_use = "Receipts indicate that a required action has been performed within a function. \
@@ -238,9 +239,12 @@ impl IoClient {
impl Client for IoClient {
fn start_notif(&self) -> LocalBoxFuture<'_, io::Result<Box<dyn MsgWriter<'_> + '_>>> {
Box::pin(async {
let drop_g = assert_no_drop("Notif future dropped");
let mut o = self.lock_out().await;
0u64.encode(o.as_mut()).await?;
Ok(Box::new(IoNotifWriter { o }) as Box<dyn MsgWriter>)
drop_g.defuse();
Ok(Box::new(IoNotifWriter { o, drop_g: assert_no_drop("Notif writer dropped") })
as Box<dyn MsgWriter>)
})
}
fn start_request(&self) -> LocalBoxFuture<'_, io::Result<Box<dyn ReqWriter<'_> + '_>>> {
@@ -252,11 +256,17 @@ impl Client for IoClient {
};
let (cb, reply) = oneshot::channel();
let (ack, got_ack) = oneshot::channel();
let drop_g = assert_no_drop("Request future dropped");
self.subscribe.as_ref().clone().send(ReplySub { id, ack, cb }).await.unwrap();
got_ack.await.unwrap();
let mut w = self.lock_out().await;
id.encode(w.as_mut()).await?;
Ok(Box::new(IoReqWriter { reply, w }) as Box<dyn ReqWriter>)
drop_g.defuse();
Ok(Box::new(IoReqWriter {
reply,
w,
drop_g: assert_no_drop("Request reader dropped without reply"),
}) as Box<dyn ReqWriter>)
})
}
}
@@ -264,36 +274,49 @@ impl Client for IoClient {
struct IoReqWriter {
reply: oneshot::Receiver<IoGuard<dyn AsyncRead>>,
w: IoGuard<dyn AsyncWrite>,
drop_g: PanicOnDrop,
}
impl<'a> ReqWriter<'a> for IoReqWriter {
fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { self.w.as_mut() }
fn send(self: Box<Self>) -> LocalBoxFuture<'a, io::Result<Box<dyn RepReader<'a> + 'a>>> {
Box::pin(async {
let Self { reply, mut w } = *self;
let Self { reply, mut w, drop_g } = *self;
w.flush().await?;
mem::drop(w);
let i = reply.await.expect("Client dropped before reply received");
Ok(Box::new(IoRepReader { i }) as Box<dyn RepReader>)
drop_g.defuse();
Ok(Box::new(IoRepReader {
i,
drop_g: assert_no_drop("Reply reader dropped without finishing"),
}) as Box<dyn RepReader>)
})
}
}
struct IoRepReader {
i: IoGuard<dyn AsyncRead>,
drop_g: PanicOnDrop,
}
impl<'a> RepReader<'a> for IoRepReader {
fn reader(&mut self) -> Pin<&mut dyn AsyncRead> { self.i.as_mut() }
fn finish(self: Box<Self>) -> LocalBoxFuture<'static, ()> { Box::pin(async {}) }
fn finish(self: Box<Self>) -> LocalBoxFuture<'static, ()> {
Box::pin(async { self.drop_g.defuse() })
}
}
#[derive(destructure)]
struct IoNotifWriter {
o: IoGuard<dyn AsyncWrite>,
drop_g: PanicOnDrop,
}
impl<'a> MsgWriter<'a> for IoNotifWriter {
fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { self.o.as_mut() }
fn finish(mut self: Box<Self>) -> LocalBoxFuture<'static, io::Result<()>> {
Box::pin(async move { self.o.flush().await })
Box::pin(async move {
self.o.flush().await?;
self.drop_g.defuse();
Ok(())
})
}
}
@@ -375,7 +398,8 @@ impl IoCommServer {
Ok(Event::Exit) => break,
Ok(Event::Sub(ReplySub { id, ack, cb })) => {
pending_replies.insert(id, cb);
ack.send(()).unwrap();
// this is detected and logged on client
let _ = ack.send(());
},
Ok(Event::Input(0, read)) => {
let notif = &notif;