use std::borrow::Cow; use std::marker::PhantomData; use std::rc::Rc; use futures::channel::mpsc::{Sender, channel}; use futures::lock::Mutex; use futures::stream::{self, LocalBoxStream}; use futures::{FutureExt, SinkExt, StreamExt}; use never::Never; use orchid_base::OrcRes; use crate::gen_expr::{GExpr, arg, call, lam, new_atom, seq}; use crate::{Atomic, Expr, OwnedAtom, OwnedVariant, ToExpr, TryFromExpr}; enum Command { Execute(GExpr, Sender), Halt(GExpr), } struct BuilderCoroutineData { receiver: Mutex>, } #[derive(Clone)] struct BuilderCoroutine(Rc); impl BuilderCoroutine { pub async fn run(self) -> GExpr { let cmd = self.0.receiver.lock().await.next().await; match cmd { None => panic!("Exec handle dropped and coroutine blocked instead of returning"), Some(Command::Halt(expr)) => expr, Some(Command::Execute(expr, reply)) => call(lam::<0>(seq(arg(0), call(new_atom(Replier { reply, builder: self }), arg(0)))), expr) .await, } } } #[derive(Clone)] pub(crate) struct Replier { reply: Sender, builder: BuilderCoroutine, } impl Atomic for Replier { type Data = (); type Variant = OwnedVariant; } impl OwnedAtom for Replier { type Refs = Never; async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(()) } async fn call(mut self, arg: Expr) -> impl ToExpr { self.reply.send(arg).await.expect("Resolution request dropped after sending"); std::mem::drop(self.reply); self.builder.run().await } } /// A long-lived async context that can yield to the executor. The expression /// representing an in-progress exec block is not serializable. pub async fn exec(f: impl for<'a> AsyncFnOnce(ExecHandle<'a>) -> R + 'static) -> GExpr { let (cmd_snd, cmd_recv) = channel(0); let halt = async { Command::Halt(f(ExecHandle(cmd_snd, PhantomData)).await.to_gen().await) }.into_stream(); let coro = BuilderCoroutine(Rc::new(BuilderCoroutineData { receiver: Mutex::new(stream::select(halt, cmd_recv).boxed_local()), })); coro.run().await } static WEIRD_DROP_ERR: &str = "Coroutine dropped while we are being polled somehow"; /// The handle an [exec] callback uses to yield to the executor pub struct ExecHandle<'a>(Sender, PhantomData<&'a ()>); impl ExecHandle<'_> { /// Yield to the executor by resolving to an expression that normalizes the /// value and then calls the continuation of the body with the result. pub async fn exec(&mut self, val: impl ToExpr) -> OrcRes { let (reply_snd, mut reply_recv) = channel(1); self.0.send(Command::Execute(val.to_gen().await, reply_snd)).await.expect(WEIRD_DROP_ERR); T::try_from_expr(reply_recv.next().await.expect(WEIRD_DROP_ERR)).await } }