use std::borrow::Cow; use std::marker::PhantomData; use std::rc::Rc; use futures::channel::mpsc::{Sender, channel}; use futures::lock::Mutex; use futures::stream::{self, LocalBoxStream}; use futures::{FutureExt, SinkExt, StreamExt}; use never::Never; use orchid_base::error::OrcRes; use crate::atom::Atomic; use crate::atom_owned::{OwnedAtom, OwnedVariant}; use crate::conv::{ToExpr, TryFromExpr}; use crate::expr::Expr; use crate::gen_expr::{GExpr, arg, call, lambda, seq}; enum Command { Execute(GExpr, Sender), Register(GExpr, Sender), Halt(GExpr), } struct BuilderCoroutineData { receiver: Mutex>, } #[derive(Clone)] struct BuilderCoroutine(Rc); impl BuilderCoroutine { pub async fn run(self) -> GExpr { let cmd = self.0.receiver.lock().await.next().await; match cmd { None => panic!("Before the stream ends, we should have gotten a Halt"), Some(Command::Halt(expr)) => expr, Some(Command::Execute(expr, reply)) => call( lambda(0, [seq( [arg(0)], call(Replier { reply, builder: self }.to_gen().await, [arg(0)]), )]), [expr], ), Some(Command::Register(expr, reply)) => call(Replier { reply, builder: self }.to_gen().await, [expr]), } } } #[derive(Clone)] pub struct Replier { reply: Sender, builder: BuilderCoroutine, } impl Atomic for Replier { type Data = (); type Variant = OwnedVariant; } impl OwnedAtom for Replier { type Refs = Never; async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(()) } async fn call(mut self, arg: Expr) -> GExpr { self.reply.send(arg).await.expect("What the heck"); std::mem::drop(self.reply); self.builder.run().await } } pub async fn exec(f: impl for<'a> AsyncFnOnce(ExecHandle<'a>) -> R + 'static) -> GExpr { let (cmd_snd, cmd_recv) = channel(0); let halt = async { Command::Halt(f(ExecHandle(cmd_snd, PhantomData)).await.to_gen().await) } .into_stream(); let coro = BuilderCoroutine(Rc::new(BuilderCoroutineData { receiver: Mutex::new(stream::select(halt, cmd_recv).boxed_local()), })); coro.run().await } static WEIRD_DROP_ERR: &str = "Coroutine dropped while we are being polled somehow"; pub struct ExecHandle<'a>(Sender, PhantomData<&'a ()>); impl ExecHandle<'_> { pub async fn exec(&mut self, val: impl ToExpr) -> OrcRes { let (reply_snd, mut reply_recv) = channel(1); self.0.send(Command::Execute(val.to_gen().await, reply_snd)).await.expect(WEIRD_DROP_ERR); T::try_from_expr(reply_recv.next().await.expect(WEIRD_DROP_ERR)).await } pub async fn register(&mut self, val: impl ToExpr) -> Expr { let (reply_snd, mut reply_recv) = channel(1); self.0.send(Command::Register(val.to_gen().await, reply_snd)).await.expect(WEIRD_DROP_ERR); reply_recv.next().await.expect(WEIRD_DROP_ERR) } }