Initial extension asynchronization efforts.

This commit is contained in:
2025-01-22 13:02:13 +01:00
parent 1974c69019
commit 7be8716b19
29 changed files with 1077 additions and 882 deletions

View File

@@ -2,8 +2,9 @@ use std::any::{Any, TypeId, type_name};
use std::borrow::Cow;
use std::future::Future;
use std::io::{Read, Write};
use std::sync::Arc;
use std::rc::Rc;
use async_once_cell::OnceCell;
use futures::FutureExt;
use futures::future::{LocalBoxFuture, ready};
use itertools::Itertools;
@@ -17,24 +18,25 @@ use orchid_base::name::Sym;
use crate::api;
use crate::atom::{
AtomCard, AtomCtx, AtomDynfo, AtomFactory, Atomic, AtomicFeaturesImpl, AtomicVariant, MethodSet,
err_not_callable, err_not_command, get_info,
MethodSetBuilder, err_not_callable, err_not_command, get_info,
};
use crate::expr::{Expr, ExprHandle, bot};
use crate::expr::{Expr, ExprHandle};
use crate::gen_expr::{GExpr, bot};
use crate::system::SysCtx;
pub struct OwnedVariant;
impl AtomicVariant for OwnedVariant {}
impl<A: OwnedAtom + Atomic<Variant = OwnedVariant>> AtomicFeaturesImpl<OwnedVariant> for A {
fn _factory(self) -> AtomFactory {
AtomFactory::new(move |ctx| {
AtomFactory::new(move |ctx| async move {
let rec = ctx.obj_store.add(Box::new(self));
let (id, _) = get_info::<A>(ctx.cted.inst().card());
let mut data = enc_vec(&id);
rec.encode(&mut data);
rec.encode(&mut data).await;
api::Atom { drop: Some(api::AtomId(rec.id())), data, owner: ctx.id }
})
}
fn _info() -> Self::_Info { OwnedAtomDynfo(A::reg_reqs()) }
fn _info() -> Self::_Info { OwnedAtomDynfo { msbuild: A::reg_reqs(), ms: OnceCell::new() } }
type _Info = OwnedAtomDynfo<A>;
}
@@ -46,21 +48,24 @@ fn with_atom<'a, U>(
f(ctx.obj_store.get(id.0).unwrap_or_else(|| panic!("Received invalid atom ID: {}", id.0)))
}
pub struct OwnedAtomDynfo<T: OwnedAtom>(MethodSet<T>);
pub struct OwnedAtomDynfo<T: OwnedAtom> {
msbuild: MethodSetBuilder<T>,
ms: OnceCell<MethodSet<T>>,
}
impl<T: OwnedAtom> AtomDynfo for OwnedAtomDynfo<T> {
fn tid(&self) -> TypeId { TypeId::of::<T>() }
fn name(&self) -> &'static str { type_name::<T>() }
fn decode(&self, AtomCtx(data, ..): AtomCtx) -> Box<dyn Any> {
Box::new(<T as AtomCard>::Data::decode(&mut &data[..]))
}
fn call(&self, AtomCtx(_, id, ctx): AtomCtx, arg: api::ExprTicket) -> LocalBoxFuture<'_, Expr> {
fn call(&self, AtomCtx(_, id, ctx): AtomCtx, arg: api::ExprTicket) -> LocalBoxFuture<'_, GExpr> {
with_atom(id.unwrap(), &ctx, |a| a.remove()).dyn_call(ctx.clone(), arg)
}
fn call_ref<'a>(
&'a self,
AtomCtx(_, id, ctx): AtomCtx<'a>,
arg: api::ExprTicket,
) -> LocalBoxFuture<'a, Expr> {
) -> LocalBoxFuture<'a, GExpr> {
async move {
with_atom(id.unwrap(), &ctx, |a| clone!(ctx; async move { a.dyn_call_ref(ctx, arg).await }))
.await
@@ -83,7 +88,8 @@ impl<T: OwnedAtom> AtomDynfo for OwnedAtomDynfo<T> {
async move {
with_atom(id.unwrap(), &ctx, |a| {
clone!(ctx; async move {
self.0.dispatch(a.as_any_ref().downcast_ref().unwrap(), ctx, key, req, rep).await
let ms = self.ms.get_or_init(self.msbuild.pack(ctx.clone())).await;
ms.dispatch(a.as_any_ref().downcast_ref().unwrap(), ctx, key, req, rep).await
})
})
.await
@@ -93,7 +99,7 @@ impl<T: OwnedAtom> AtomDynfo for OwnedAtomDynfo<T> {
fn command<'a>(
&'a self,
AtomCtx(_, id, ctx): AtomCtx<'a>,
) -> LocalBoxFuture<'a, OrcRes<Option<Expr>>> {
) -> LocalBoxFuture<'a, OrcRes<Option<GExpr>>> {
async move { with_atom(id.unwrap(), &ctx, |a| a.remove().dyn_command(ctx.clone())).await }
.boxed_local()
}
@@ -111,7 +117,7 @@ impl<T: OwnedAtom> AtomDynfo for OwnedAtomDynfo<T> {
id.encode(write);
with_atom(id, &ctx, |a| clone!(ctx; async move { a.dyn_serialize(ctx, write).await }))
.await
.map(|v| v.into_iter().map(|t| t.handle().unwrap().tk).collect_vec())
.map(|v| v.into_iter().map(|t| t.handle().tk).collect_vec())
}
.boxed_local()
}
@@ -123,9 +129,9 @@ impl<T: OwnedAtom> AtomDynfo for OwnedAtomDynfo<T> {
) -> LocalBoxFuture<'a, api::Atom> {
async move {
let refs =
refs.iter().map(|tk| Expr::from_handle(Arc::new(ExprHandle::from_args(ctx.clone(), *tk))));
refs.iter().map(|tk| Expr::from_handle(Rc::new(ExprHandle::from_args(ctx.clone(), *tk))));
let obj = T::deserialize(DeserCtxImpl(data, &ctx), T::Refs::from_iter(refs)).await;
obj._factory().build(ctx)
obj._factory().build(ctx).await
}
.boxed_local()
}
@@ -197,20 +203,20 @@ pub trait OwnedAtom: Atomic<Variant = OwnedVariant> + Any + Clone + 'static {
type Refs: RefSet;
fn val(&self) -> impl Future<Output = Cow<'_, Self::Data>>;
#[allow(unused_variables)]
fn call_ref(&self, arg: ExprHandle) -> impl Future<Output = Expr> {
async { bot([err_not_callable().await]) }
fn call_ref(&self, arg: ExprHandle) -> impl Future<Output = GExpr> {
async move { bot([err_not_callable(&arg.ctx.i).await]) }
}
fn call(self, arg: ExprHandle) -> impl Future<Output = Expr> {
fn call(self, arg: ExprHandle) -> impl Future<Output = GExpr> {
async {
let ctx = arg.get_ctx();
let gcl = self.call_ref(arg).await;
self.free(ctx);
self.free(ctx).await;
gcl
}
}
#[allow(unused_variables)]
fn command(self, ctx: SysCtx) -> impl Future<Output = OrcRes<Option<Expr>>> {
async { Err(err_not_command().await.into()) }
fn command(self, ctx: SysCtx) -> impl Future<Output = OrcRes<Option<GExpr>>> {
async move { Err(err_not_command(&ctx.i).await.into()) }
}
#[allow(unused_variables)]
fn free(self, ctx: SysCtx) -> impl Future<Output = ()> { async {} }
@@ -245,9 +251,10 @@ pub trait DynOwnedAtom: 'static {
fn atom_tid(&self) -> TypeId;
fn as_any_ref(&self) -> &dyn Any;
fn encode<'a>(&'a self, buffer: &'a mut dyn Write) -> LocalBoxFuture<'a, ()>;
fn dyn_call_ref(&self, ctx: SysCtx, arg: api::ExprTicket) -> LocalBoxFuture<'_, Expr>;
fn dyn_call(self: Box<Self>, ctx: SysCtx, arg: api::ExprTicket) -> LocalBoxFuture<'static, Expr>;
fn dyn_command(self: Box<Self>, ctx: SysCtx) -> LocalBoxFuture<'static, OrcRes<Option<Expr>>>;
fn dyn_call_ref(&self, ctx: SysCtx, arg: api::ExprTicket) -> LocalBoxFuture<'_, GExpr>;
fn dyn_call(self: Box<Self>, ctx: SysCtx, arg: api::ExprTicket)
-> LocalBoxFuture<'static, GExpr>;
fn dyn_command(self: Box<Self>, ctx: SysCtx) -> LocalBoxFuture<'static, OrcRes<Option<GExpr>>>;
fn dyn_free(self: Box<Self>, ctx: SysCtx) -> LocalBoxFuture<'static, ()>;
fn dyn_print(&self, ctx: SysCtx) -> LocalBoxFuture<'_, String>;
fn dyn_serialize<'a>(
@@ -262,13 +269,17 @@ impl<T: OwnedAtom> DynOwnedAtom for T {
fn encode<'a>(&'a self, buffer: &'a mut dyn Write) -> LocalBoxFuture<'a, ()> {
async { self.val().await.as_ref().encode(buffer) }.boxed_local()
}
fn dyn_call_ref(&self, ctx: SysCtx, arg: api::ExprTicket) -> LocalBoxFuture<'_, Expr> {
fn dyn_call_ref(&self, ctx: SysCtx, arg: api::ExprTicket) -> LocalBoxFuture<'_, GExpr> {
self.call_ref(ExprHandle::from_args(ctx, arg)).boxed_local()
}
fn dyn_call(self: Box<Self>, ctx: SysCtx, arg: api::ExprTicket) -> LocalBoxFuture<'static, Expr> {
fn dyn_call(
self: Box<Self>,
ctx: SysCtx,
arg: api::ExprTicket,
) -> LocalBoxFuture<'static, GExpr> {
self.call(ExprHandle::from_args(ctx, arg)).boxed_local()
}
fn dyn_command(self: Box<Self>, ctx: SysCtx) -> LocalBoxFuture<'static, OrcRes<Option<Expr>>> {
fn dyn_command(self: Box<Self>, ctx: SysCtx) -> LocalBoxFuture<'static, OrcRes<Option<GExpr>>> {
self.command(ctx).boxed_local()
}
fn dyn_free(self: Box<Self>, ctx: SysCtx) -> LocalBoxFuture<'static, ()> {
@@ -287,4 +298,4 @@ impl<T: OwnedAtom> DynOwnedAtom for T {
}
}
pub type ObjStore = Arc<IdStore<Box<dyn DynOwnedAtom>>>;
pub type ObjStore = Rc<IdStore<Box<dyn DynOwnedAtom>>>;