Method refactor now compiles

This commit is contained in:
2026-01-29 16:28:57 +01:00
parent 534f08b45c
commit cdcca694c5
22 changed files with 487 additions and 284 deletions

View File

@@ -2,6 +2,8 @@ use std::any::{Any, TypeId, type_name};
use std::collections::HashMap;
use std::fmt::{self, Debug};
use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::num::NonZeroU32;
use std::ops::Deref;
use std::pin::Pin;
@@ -9,14 +11,15 @@ use std::rc::Rc;
use dyn_clone::{DynClone, clone_box};
use futures::future::LocalBoxFuture;
use futures::{AsyncRead, AsyncWrite, FutureExt, StreamExt, stream};
use futures::{AsyncWrite, FutureExt, StreamExt, stream};
use orchid_api_derive::Coding;
use orchid_api_traits::{Coding, Decode, Encode, Request, enc_vec};
use orchid_api_traits::{Coding, Decode, InHierarchy, Request, UnderRoot, enc_vec};
use orchid_base::error::{OrcErrv, OrcRes, mk_errv, mk_errv_floating};
use orchid_base::format::{FmtCtx, FmtUnit, Format, fmt, take_first};
use orchid_base::interner::is;
use orchid_base::location::Pos;
use orchid_base::name::Sym;
use orchid_base::reqnot::{Receipt, ReqHandle, ReqReader, ReqReaderExt};
use trait_set::trait_set;
use crate::api;
@@ -98,14 +101,17 @@ impl ForeignAtom {
pub(crate) fn new(handle: Rc<ExprHandle>, atom: api::Atom, pos: Pos) -> Self {
ForeignAtom { atom, expr: handle, pos }
}
pub async fn request<M: AtomMethod>(&self, m: M) -> Option<M::Response> {
pub async fn request<R: Request + UnderRoot<Root: AtomMethod>>(
&self,
r: R,
) -> Option<R::Response> {
let rep = (request(api::Fwd(
self.atom.clone(),
Sym::parse(M::NAME).await.unwrap().tok().to_api(),
enc_vec(&m),
Sym::parse(<R as UnderRoot>::Root::NAME).await.unwrap().tok().to_api(),
enc_vec(&r.into_root()),
)))
.await?;
Some(M::Response::decode_slice(&mut &rep[..]))
Some(R::Response::decode_slice(&mut &rep[..]))
}
pub async fn downcast<T: AtomicFeatures>(self) -> Result<TAtom<T>, NotTypAtom> {
TAtom::downcast(self.ex().handle()).await
@@ -154,23 +160,40 @@ impl Debug for NotTypAtom {
}
}
pub trait AtomMethod: Request + Coding {
pub trait AtomMethod: Coding + InHierarchy {
const NAME: &str;
}
pub trait Supports<M: AtomMethod>: AtomCard {
fn handle(&self, req: M) -> impl Future<Output = <M as Request>::Response>;
fn handle<'a>(
&self,
hand: Box<dyn ReqHandle<'a> + '_>,
req: M,
) -> impl Future<Output = io::Result<Receipt<'a>>>;
}
trait_set! {
trait AtomReqCb<A> = for<'a> Fn(
&'a A,
Pin<&'a mut dyn AsyncRead>,
Pin<&'a mut dyn AsyncWrite>,
) -> LocalBoxFuture<'a, ()>
trait HandleAtomMethod<A> {
fn handle<'a, 'b: 'a>(
&'a self,
atom: &'a A,
req: Box<dyn ReqReader<'b> + 'a>,
) -> LocalBoxFuture<'a, ()>;
}
struct AtomMethodHandler<M, A>(PhantomData<M>, PhantomData<A>);
impl<M: AtomMethod, A: Supports<M>> HandleAtomMethod<A> for AtomMethodHandler<M, A> {
fn handle<'a, 'b: 'a>(
&'a self,
a: &'a A,
mut reader: Box<dyn ReqReader<'b> + 'a>,
) -> LocalBoxFuture<'a, ()> {
Box::pin(async {
let req = reader.read_req::<M>().await.unwrap();
let _ = Supports::<M>::handle(a, reader.finish().await, req).await.unwrap();
})
}
}
pub struct MethodSetBuilder<A: AtomCard> {
handlers: Vec<(&'static str, Rc<dyn AtomReqCb<A>>)>,
handlers: Vec<(&'static str, Rc<dyn HandleAtomMethod<A>>)>,
}
impl<A: AtomCard> MethodSetBuilder<A> {
pub fn new() -> Self { Self { handlers: vec![] } }
@@ -178,15 +201,7 @@ impl<A: AtomCard> MethodSetBuilder<A> {
pub fn handle<M: AtomMethod>(mut self) -> Self
where A: Supports<M> {
assert!(!M::NAME.is_empty(), "AtomMethod::NAME cannoot be empty");
self.handlers.push((
M::NAME,
Rc::new(move |a: &A, req: Pin<&mut dyn AsyncRead>, rep: Pin<&mut dyn AsyncWrite>| {
async {
Supports::<M>::handle(a, M::decode(req).await.unwrap()).await.encode(rep).await.unwrap()
}
.boxed_local()
}),
));
self.handlers.push((M::NAME, Rc::new(AtomMethodHandler::<M, A>(PhantomData, PhantomData))));
self
}
@@ -201,20 +216,19 @@ impl<A: AtomCard> MethodSetBuilder<A> {
}
pub struct MethodSet<A: AtomCard> {
handlers: HashMap<Sym, Rc<dyn AtomReqCb<A>>>,
handlers: HashMap<Sym, Rc<dyn HandleAtomMethod<A>>>,
}
impl<A: AtomCard> MethodSet<A> {
pub(crate) async fn dispatch<'a>(
&'a self,
atom: &'a A,
&self,
atom: &'_ A,
key: Sym,
req: Pin<&'a mut dyn AsyncRead>,
rep: Pin<&'a mut dyn AsyncWrite>,
req: Box<dyn ReqReader<'a> + 'a>,
) -> bool {
match self.handlers.get(&key) {
None => false,
Some(handler) => {
handler(atom, req, rep).await;
handler.handle(atom, req).await;
true
},
}
@@ -243,13 +257,13 @@ impl<A: AtomicFeatures> TAtom<A> {
},
}
}
pub async fn request<M: AtomMethod>(&self, req: M) -> M::Response
where A: Supports<M> {
M::Response::decode_slice(
pub async fn request<R: Request + UnderRoot<Root: AtomMethod>>(&self, req: R) -> R::Response
where A: Supports<<R as UnderRoot>::Root> {
R::Response::decode_slice(
&mut &(request(api::Fwd(
self.untyped.atom.clone(),
Sym::parse(M::NAME).await.unwrap().tok().to_api(),
enc_vec(&req),
Sym::parse(<R as UnderRoot>::Root::NAME).await.unwrap().tok().to_api(),
enc_vec(&req.into_root()),
)))
.await
.unwrap()[..],
@@ -278,12 +292,11 @@ pub trait AtomDynfo: 'static {
fn call<'a>(&'a self, ctx: AtomCtx<'a>, arg: Expr) -> LocalBoxFuture<'a, GExpr>;
fn call_ref<'a>(&'a self, ctx: AtomCtx<'a>, arg: Expr) -> LocalBoxFuture<'a, GExpr>;
fn print<'a>(&'a self, ctx: AtomCtx<'a>) -> LocalBoxFuture<'a, FmtUnit>;
fn handle_req<'a, 'b: 'a, 'c: 'a>(
fn handle_req<'a>(
&'a self,
ctx: AtomCtx<'a>,
key: Sym,
req: Pin<&'b mut dyn AsyncRead>,
rep: Pin<&'c mut dyn AsyncWrite>,
req: Box<dyn ReqReader<'a> + 'a>,
) -> LocalBoxFuture<'a, bool>;
fn command<'a>(&'a self, ctx: AtomCtx<'a>) -> LocalBoxFuture<'a, OrcRes<Option<GExpr>>>;
fn serialize<'a, 'b: 'a>(

View File

@@ -11,7 +11,7 @@ use std::rc::Rc;
use async_once_cell::OnceCell;
use dyn_clone::{DynClone, clone_box};
use futures::future::{LocalBoxFuture, ready};
use futures::{AsyncRead, AsyncWrite, FutureExt};
use futures::{AsyncWrite, FutureExt};
use futures_locks::{RwLock, RwLockReadGuard};
use itertools::Itertools;
use memo_map::MemoMap;
@@ -119,17 +119,16 @@ impl<T: OwnedAtom> AtomDynfo for OwnedAtomDynfo<T> {
fn print(&self, AtomCtx(_, id): AtomCtx<'_>) -> LocalBoxFuture<'_, FmtUnit> {
Box::pin(async move { AtomReadGuard::new(id.unwrap()).await.dyn_print().await })
}
fn handle_req<'a, 'b: 'a, 'c: 'a>(
fn handle_req<'a>(
&'a self,
AtomCtx(_, id): AtomCtx,
AtomCtx(_, id): AtomCtx<'a>,
key: Sym,
req: Pin<&'b mut dyn AsyncRead>,
rep: Pin<&'c mut dyn AsyncWrite>,
req: Box<dyn orchid_base::reqnot::ReqReader<'a> + 'a>,
) -> LocalBoxFuture<'a, bool> {
Box::pin(async move {
let a = AtomReadGuard::new(id.unwrap()).await;
let ms = self.ms.get_or_init(self.msbuild.pack()).await;
ms.dispatch(a.as_any_ref().downcast_ref().unwrap(), key, req, rep).await
ms.dispatch(a.as_any_ref().downcast_ref().unwrap(), key, req).await
})
}
fn command<'a>(

View File

@@ -4,7 +4,7 @@ use std::pin::Pin;
use async_once_cell::OnceCell;
use futures::future::LocalBoxFuture;
use futures::{AsyncRead, AsyncWrite, FutureExt};
use futures::{AsyncWrite, FutureExt};
use orchid_api_traits::{Coding, enc_vec};
use orchid_base::error::OrcRes;
use orchid_base::format::FmtUnit;
@@ -54,16 +54,15 @@ impl<T: ThinAtom> AtomDynfo for ThinAtomDynfo<T> {
fn call_ref<'a>(&'a self, AtomCtx(buf, ..): AtomCtx<'a>, arg: Expr) -> LocalBoxFuture<'a, GExpr> {
Box::pin(async move { T::decode_slice(&mut &buf[..]).call(arg).await })
}
fn handle_req<'a, 'm1: 'a, 'm2: 'a>(
fn handle_req<'a>(
&'a self,
AtomCtx(buf, _): AtomCtx<'a>,
AtomCtx(buf, ..): AtomCtx<'a>,
key: Sym,
req: Pin<&'m1 mut dyn AsyncRead>,
rep: Pin<&'m2 mut dyn AsyncWrite>,
req: Box<dyn orchid_base::reqnot::ReqReader<'a> + 'a>,
) -> LocalBoxFuture<'a, bool> {
Box::pin(async move {
let ms = self.ms.get_or_init(self.msbuild.pack()).await;
ms.dispatch(&T::decode_slice(&mut &buf[..]), key, req, rep).await
ms.dispatch(&T::decode_slice(&mut &buf[..]), key, req).await
})
}
fn command<'a>(

View File

@@ -1,12 +1,12 @@
use std::cell::RefCell;
use std::future::Future;
use std::mem;
use std::num::NonZero;
use std::pin::Pin;
use std::rc::Rc;
use std::{io, mem};
use futures::future::{LocalBoxFuture, join_all};
use futures::{AsyncRead, AsyncWrite, AsyncWriteExt, StreamExt, stream};
use futures::{AsyncWriteExt, StreamExt, stream};
use hashbrown::HashMap;
use itertools::Itertools;
use orchid_api_traits::{Decode, Encode, Request, UnderRoot, enc_vec};
@@ -17,8 +17,7 @@ use orchid_base::logging::{log, with_logger};
use orchid_base::name::Sym;
use orchid_base::parse::{Comment, Snippet};
use orchid_base::reqnot::{
Client, ClientExt, CommCtx, MsgReader, MsgReaderExt, Receipt, RepWriter, ReqHandle, ReqHandleExt,
ReqReader, ReqReaderExt, Witness, io_comm,
Client, ClientExt, CommCtx, MsgReader, MsgReaderExt, ReqHandleExt, ReqReaderExt, Witness, io_comm,
};
use orchid_base::stash::with_stash;
use orchid_base::tree::{TokenVariant, ttv_from_api};
@@ -39,6 +38,7 @@ use crate::reflection::with_refl_roots;
use crate::system::{SysCtx, atom_by_idx, cted, with_sys};
use crate::system_ctor::{CtedObj, DynSystemCtor, SystemCtor};
use crate::tree::{TreeIntoApiCtxImpl, get_lazy, with_lazy_member_store};
use crate::trivial_req::TrivialReqCycle;
task_local::task_local! {
static CLIENT: Rc<dyn Client>;
@@ -219,36 +219,6 @@ impl ExtensionBuilder {
let fwd_tok = Witness::of(&fwd);
let api::SysFwded(sys_id, payload) = fwd;
with_sys_record(sys_id, async {
struct TrivialReqCycle<'a> {
req: &'a [u8],
rep: &'a mut Vec<u8>,
}
impl<'a> ReqReader<'a> for TrivialReqCycle<'a> {
fn reader(&mut self) -> Pin<&mut dyn AsyncRead> {
Pin::new(&mut self.req) as Pin<&mut _>
}
fn finish(self: Box<Self>) -> LocalBoxFuture<'a, Box<dyn ReqHandle<'a> + 'a>> {
Box::pin(async { self as Box<_> })
}
}
impl<'a> ReqHandle<'a> for TrivialReqCycle<'a> {
fn start_reply(
self: Box<Self>,
) -> LocalBoxFuture<'a, io::Result<Box<dyn RepWriter<'a> + 'a>>> {
Box::pin(async { Ok(self as Box<_>) })
}
}
impl<'a> RepWriter<'a> for TrivialReqCycle<'a> {
fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> {
Pin::new(&mut self.rep) as Pin<&mut _>
}
fn finish(
self: Box<Self>,
) -> LocalBoxFuture<'a, io::Result<orchid_base::reqnot::Receipt<'a>>>
{
Box::pin(async { Ok(Receipt::_new()) })
}
}
let mut reply = Vec::new();
let req = TrivialReqCycle { req: &payload, rep: &mut reply };
let _ = cted().inst().dyn_request(Box::new(req)).await;
@@ -349,14 +319,8 @@ impl ExtensionBuilder {
let api::Fwded(_, key, payload) = &fwded;
let mut reply = Vec::new();
let key = Sym::from_api(*key).await;
let some = nfo
.handle_req(
actx,
key,
Pin::<&mut &[u8]>::new(&mut &payload[..]),
Pin::<&mut Vec<_>>::new(&mut reply),
)
.await;
let req = TrivialReqCycle { req: payload, rep: &mut reply };
let some = nfo.handle_req(actx, key, Box::new(req)).await;
handle.reply(fwded, &some.then_some(reply)).await
},
api::AtomReq::CallRef(call @ api::CallRef(_, arg)) => {

View File

@@ -17,7 +17,9 @@ pub mod logger;
pub mod other_system;
pub mod parser;
pub mod reflection;
pub mod stream_reqs;
pub mod system;
pub mod system_ctor;
pub mod tokio;
pub mod tree;
mod trivial_req;

View File

@@ -0,0 +1,72 @@
use orchid_api_derive::{Coding, Hierarchy};
use orchid_api_traits::Request;
use crate::atom::AtomMethod;
/// Represents [std::io::ErrorKind] values that are produced while operating on
/// already-opened files
#[derive(Clone, Debug, Hash, PartialEq, Eq, Coding)]
pub enum IoErrorKind {
BrokenPipe,
UnexpectedEof,
ConnectionAborted,
Other,
}
/// Represents [std::io::Error] values that are produced while operating on
/// already-opened files
#[derive(Clone, Debug, Coding)]
pub struct IoError {
pub message: String,
pub kind: IoErrorKind,
}
/// Read at most the specified number of bytes, but at least one byte, from a
/// stream. If the returned vector is empty, the stream has reached its end.
#[derive(Clone, Debug, Coding, Hierarchy)]
pub struct ReadReq(pub u64);
impl Request for ReadReq {
type Response = Result<Vec<u8>, IoError>;
}
impl AtomMethod for ReadReq {
const NAME: &str = "orchid::stream::read";
}
/// Write the specified number of bytes into a stream.
#[derive(Clone, Debug, Coding, Hierarchy)]
#[extends(OutputReq)]
pub struct WriteReq {
pub data: Vec<u8>,
}
impl Request for WriteReq {
type Response = Result<(), IoError>;
}
/// Flush a stream, ensuring that all data reached its destination.
#[derive(Clone, Debug, Coding, Hierarchy)]
#[extends(OutputReq)]
pub struct FlushReq;
impl Request for FlushReq {
type Response = Result<(), IoError>;
}
/// Close a stream, indicating that no further data will be sent through it.
#[derive(Clone, Debug, Coding, Hierarchy)]
#[extends(OutputReq)]
pub struct CloseReq;
impl Request for CloseReq {
type Response = Result<(), IoError>;
}
/// Operations on outbound streams across extension boundaries.
#[derive(Clone, Debug, Coding, Hierarchy)]
#[extendable]
#[allow(clippy::enum_variant_names)]
pub enum OutputReq {
WriteReq(WriteReq),
FlushReq(FlushReq),
CloseReq(CloseReq),
}
impl AtomMethod for OutputReq {
const NAME: &str = "orchid::stream::write";
}

View File

@@ -0,0 +1,28 @@
use std::io;
use std::pin::Pin;
use futures::future::LocalBoxFuture;
use futures::{AsyncRead, AsyncWrite};
use orchid_base::reqnot::{Receipt, RepWriter, ReqHandle, ReqReader};
pub struct TrivialReqCycle<'a> {
pub req: &'a [u8],
pub rep: &'a mut Vec<u8>,
}
impl<'a> ReqReader<'a> for TrivialReqCycle<'a> {
fn reader(&mut self) -> Pin<&mut dyn AsyncRead> { Pin::new(&mut self.req) as Pin<&mut _> }
fn finish(self: Box<Self>) -> LocalBoxFuture<'a, Box<dyn ReqHandle<'a> + 'a>> {
Box::pin(async { self as Box<_> })
}
}
impl<'a> ReqHandle<'a> for TrivialReqCycle<'a> {
fn start_reply(self: Box<Self>) -> LocalBoxFuture<'a, io::Result<Box<dyn RepWriter<'a> + 'a>>> {
Box::pin(async { Ok(self as Box<_>) })
}
}
impl<'a> RepWriter<'a> for TrivialReqCycle<'a> {
fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { Pin::new(&mut self.rep) as Pin<&mut _> }
fn finish(self: Box<Self>) -> LocalBoxFuture<'a, io::Result<Receipt<'a>>> {
Box::pin(async { Ok(Receipt::_new()) })
}
}