|
|
|
|
@@ -1,349 +0,0 @@
|
|
|
|
|
use std::borrow::Cow;
|
|
|
|
|
use std::cell::RefCell;
|
|
|
|
|
use std::cmp::{Ordering, Reverse};
|
|
|
|
|
use std::collections::{BinaryHeap, VecDeque};
|
|
|
|
|
use std::fmt::Debug;
|
|
|
|
|
use std::mem;
|
|
|
|
|
use std::num::NonZeroU64;
|
|
|
|
|
use std::pin::Pin;
|
|
|
|
|
use std::rc::Rc;
|
|
|
|
|
use std::task::{Context, Poll, Waker};
|
|
|
|
|
use std::time::Instant;
|
|
|
|
|
|
|
|
|
|
use async_event::Event;
|
|
|
|
|
use chrono::TimeDelta;
|
|
|
|
|
use futures::channel::{mpsc, oneshot};
|
|
|
|
|
use futures::{FutureExt, select};
|
|
|
|
|
use hashbrown::HashMap;
|
|
|
|
|
use never::Never;
|
|
|
|
|
use orchid_api_derive::{Coding, Hierarchy};
|
|
|
|
|
use orchid_api_traits::Request;
|
|
|
|
|
use orchid_base::{FmtCtxImpl, OrcRes};
|
|
|
|
|
use orchid_extension::ToExpr;
|
|
|
|
|
use orchid_extension::entrypoint::spawn;
|
|
|
|
|
use orchid_extension::Expr;
|
|
|
|
|
use orchid_extension::gen_expr::{GExpr, IntoGExprStream, call, new_atom};
|
|
|
|
|
use orchid_extension::system::cted;
|
|
|
|
|
use orchid_extension::tree::{GenMember, cnst, comments, fun, prefix};
|
|
|
|
|
use orchid_extension::{
|
|
|
|
|
Atomic, ForeignAtom, OwnedAtom, OwnedVariant, err_not_callable, err_not_command,
|
|
|
|
|
};
|
|
|
|
|
use rust_decimal::prelude::Zero;
|
|
|
|
|
use tokio::task::{JoinHandle, spawn_local};
|
|
|
|
|
use tokio::time::sleep;
|
|
|
|
|
|
|
|
|
|
use crate::std::std_system::StdReq;
|
|
|
|
|
use crate::std::time::OrcDT;
|
|
|
|
|
use crate::{StdSystem, api};
|
|
|
|
|
|
|
|
|
|
#[derive(Clone, Copy, Coding, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
|
|
|
|
pub struct AsyncTaskId(NonZeroU64);
|
|
|
|
|
|
|
|
|
|
/// Signals to the scheduler that some async work is in progress, and to take
|
|
|
|
|
/// ownership of this expression representing the progress of that work. This
|
|
|
|
|
/// doesn't have to be called before [FinishAsyncWork] if keeping the work and
|
|
|
|
|
/// thus the requesting system alive is not necessary
|
|
|
|
|
#[derive(Debug, Clone, Coding, Hierarchy)]
|
|
|
|
|
#[extends(FutureReq, StdReq)]
|
|
|
|
|
pub struct AddAsyncWork(pub api::ExprTicket);
|
|
|
|
|
impl Request for AddAsyncWork {
|
|
|
|
|
type Response = AsyncTaskId;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Signals to the scheduler that some async work has been finished, and to
|
|
|
|
|
/// return this expression from a future `std::future::yield` call.
|
|
|
|
|
/// If [AddAsyncWork] was called before this, include the [AsyncTaskId] you
|
|
|
|
|
/// received to unlink the work from the scheduler so that cleanup is not
|
|
|
|
|
/// blocked.
|
|
|
|
|
#[derive(Debug, Clone, Coding, Hierarchy)]
|
|
|
|
|
#[extends(FutureReq, StdReq)]
|
|
|
|
|
pub struct FinishAsyncWork(pub Option<AsyncTaskId>, pub api::ExprTicket);
|
|
|
|
|
impl Request for FinishAsyncWork {
|
|
|
|
|
type Response = Result<(), SchedulerError>;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, Coding)]
|
|
|
|
|
pub struct SchedulerError;
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, Coding, Hierarchy)]
|
|
|
|
|
#[extendable]
|
|
|
|
|
#[extends(StdReq)]
|
|
|
|
|
pub enum FutureReq {
|
|
|
|
|
AddAsyncWork(AddAsyncWork),
|
|
|
|
|
FinishAsyncWork(FinishAsyncWork),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
struct Timer {
|
|
|
|
|
set_at: Instant,
|
|
|
|
|
delay: TimeDelta,
|
|
|
|
|
repetition: Option<u64>,
|
|
|
|
|
cancelled: Rc<Event>,
|
|
|
|
|
action: Expr,
|
|
|
|
|
}
|
|
|
|
|
impl Timer {
|
|
|
|
|
pub fn next_occurrence(&self) -> Instant {
|
|
|
|
|
let delay_mult = i32::try_from(self.repetition.unwrap_or(0) + 1).unwrap();
|
|
|
|
|
self.set_at + (self.delay * delay_mult).to_std().unwrap()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
impl PartialEq for Timer {
|
|
|
|
|
fn eq(&self, other: &Self) -> bool { self.next_occurrence().eq(&other.next_occurrence()) }
|
|
|
|
|
}
|
|
|
|
|
impl Eq for Timer {}
|
|
|
|
|
impl PartialOrd for Timer {
|
|
|
|
|
fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other)) }
|
|
|
|
|
}
|
|
|
|
|
impl Ord for Timer {
|
|
|
|
|
fn cmp(&self, other: &Self) -> Ordering { self.next_occurrence().cmp(&other.next_occurrence()) }
|
|
|
|
|
}
|
|
|
|
|
impl Atomic for Timer {
|
|
|
|
|
type Variant = OwnedVariant;
|
|
|
|
|
type Data = ();
|
|
|
|
|
}
|
|
|
|
|
impl OwnedAtom for Timer {
|
|
|
|
|
type Refs = Never;
|
|
|
|
|
async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(()) }
|
|
|
|
|
async fn command(mut self) -> CmdResult {
|
|
|
|
|
let sleep_until =
|
|
|
|
|
self.set_at + (self.delay * self.repetition.unwrap_or(1) as i32).to_std().unwrap();
|
|
|
|
|
let (timer_ready, on_timer_ready) = oneshot::channel();
|
|
|
|
|
let task = spawn(self.delay.to_std().unwrap(), async move { mem::drop(timer_ready.send(())) });
|
|
|
|
|
let res =
|
|
|
|
|
self.cancelled.wait_until_or_timeout(|| Some(()), on_timer_ready.map(mem::drop)).await;
|
|
|
|
|
task.abort();
|
|
|
|
|
// cancelled
|
|
|
|
|
if let Some(()) = res {
|
|
|
|
|
return Continuation::default().into();
|
|
|
|
|
}
|
|
|
|
|
// TODO: add binary API for sleep and
|
|
|
|
|
let mut ret = Continuation::default().into();
|
|
|
|
|
let mut ret = vec![self.action.to_gen().await];
|
|
|
|
|
if let Some(rep) = self.repetition.as_mut() {
|
|
|
|
|
*rep = *rep + 1;
|
|
|
|
|
ret.push(new_atom(self));
|
|
|
|
|
}
|
|
|
|
|
Ok(ret)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct SchedulerState {
|
|
|
|
|
/// Waker to call when async work finishes
|
|
|
|
|
finish_waker: Waker,
|
|
|
|
|
timer_task: Option<(Instant, JoinHandle<()>)>,
|
|
|
|
|
id: NonZeroU64,
|
|
|
|
|
background: HashMap<AsyncTaskId, Expr>,
|
|
|
|
|
ready: VecDeque<Expr>,
|
|
|
|
|
timers: BinaryHeap<Reverse<Timer>>,
|
|
|
|
|
}
|
|
|
|
|
impl SchedulerState {
|
|
|
|
|
fn activate_timers(&mut self, now: Instant) {
|
|
|
|
|
while let Some(t) = self.timers.peek()
|
|
|
|
|
&& t.0.next_occurrence() < now
|
|
|
|
|
{
|
|
|
|
|
let mut timer = self.timers.pop().unwrap().0;
|
|
|
|
|
let work = timer.action.clone();
|
|
|
|
|
self.ready.push_back(work);
|
|
|
|
|
if let Some(count) = timer.repetition {
|
|
|
|
|
timer.repetition = Some(count + 1);
|
|
|
|
|
self.timers.push(Reverse(timer));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
impl Debug for SchedulerState {
|
|
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
|
|
f.debug_struct("SchedulerState").finish_non_exhaustive()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
impl Default for SchedulerState {
|
|
|
|
|
fn default() -> Self {
|
|
|
|
|
SchedulerState {
|
|
|
|
|
background: HashMap::new(),
|
|
|
|
|
finish_waker: Waker::noop().clone(),
|
|
|
|
|
id: NonZeroU64::MIN,
|
|
|
|
|
timer_task: None,
|
|
|
|
|
ready: VecDeque::new(),
|
|
|
|
|
timers: BinaryHeap::new(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Clone, Debug, Default)]
|
|
|
|
|
pub struct Scheduler(Rc<RefCell<SchedulerState>>);
|
|
|
|
|
impl Scheduler {
|
|
|
|
|
pub(crate) async fn add(&self, req: &AddAsyncWork) -> <AddAsyncWork as Request>::Response {
|
|
|
|
|
let expr = Expr::deserialize(req.0).await;
|
|
|
|
|
let mut this = self.0.borrow_mut();
|
|
|
|
|
let id = AsyncTaskId(this.id);
|
|
|
|
|
this.background.insert(id, expr);
|
|
|
|
|
this.id = this.id.checked_add(1).unwrap();
|
|
|
|
|
id
|
|
|
|
|
}
|
|
|
|
|
pub(crate) async fn finish(
|
|
|
|
|
&self,
|
|
|
|
|
req: &FinishAsyncWork,
|
|
|
|
|
) -> <FinishAsyncWork as Request>::Response {
|
|
|
|
|
let expr = Expr::deserialize(req.1).await;
|
|
|
|
|
let mut g = self.0.borrow_mut();
|
|
|
|
|
if let Some(id) = req.0 {
|
|
|
|
|
g.background.remove(&id);
|
|
|
|
|
}
|
|
|
|
|
g.ready.push_back(expr);
|
|
|
|
|
g.finish_waker.wake_by_ref();
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
struct Yield;
|
|
|
|
|
impl Atomic for Yield {
|
|
|
|
|
type Variant = OwnedVariant;
|
|
|
|
|
type Data = ();
|
|
|
|
|
}
|
|
|
|
|
impl OwnedAtom for Yield {
|
|
|
|
|
type Refs = Never;
|
|
|
|
|
async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(()) }
|
|
|
|
|
async fn command(self) -> OrcRes<()> { Ok(()) }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
struct Spawn(ForeignAtom, ForeignAtom);
|
|
|
|
|
impl Atomic for Spawn {
|
|
|
|
|
type Variant = OwnedVariant;
|
|
|
|
|
type Data = [api::ExprTicket; 2];
|
|
|
|
|
}
|
|
|
|
|
impl OwnedAtom for Spawn {
|
|
|
|
|
type Refs = Never;
|
|
|
|
|
async fn val(&self) -> Cow<'_, Self::Data> {
|
|
|
|
|
Cow::Owned([self.0.clone().ex().handle().ticket(), self.1.clone().ex().handle().ticket()])
|
|
|
|
|
}
|
|
|
|
|
async fn command(self) -> OrcRes<impl IntoGExprStream> { Ok((self.1, self.0)) }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
struct Canceller {
|
|
|
|
|
cont: Option<Expr>,
|
|
|
|
|
cancel: Rc<RefCell<Option<oneshot::Sender<()>>>>,
|
|
|
|
|
}
|
|
|
|
|
impl Atomic for Canceller {
|
|
|
|
|
type Variant = OwnedVariant;
|
|
|
|
|
type Data = ();
|
|
|
|
|
}
|
|
|
|
|
impl OwnedAtom for Canceller {
|
|
|
|
|
type Refs = Never;
|
|
|
|
|
async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(()) }
|
|
|
|
|
async fn call_ref(&self, arg: Expr) -> impl ToExpr {
|
|
|
|
|
match &self.cont {
|
|
|
|
|
Some(_) => Err(err_not_callable(&self.print_atom(&FmtCtxImpl::default()).await).await),
|
|
|
|
|
None => Ok(new_atom(Self { cont: Some(arg), cancel: self.cancel.clone() })),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
async fn command(self) -> OrcRes<impl IntoGExprStream> {
|
|
|
|
|
let Some(cont) = self.cont else {
|
|
|
|
|
return Err(err_not_command(&self.print_atom(&FmtCtxImpl::default()).await).await);
|
|
|
|
|
};
|
|
|
|
|
if let Some(canceller) = self.cancel.borrow_mut().take() {
|
|
|
|
|
canceller.send(());
|
|
|
|
|
}
|
|
|
|
|
Ok(cont)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
struct SetTimer {
|
|
|
|
|
delay: TimeDelta,
|
|
|
|
|
recurring: bool,
|
|
|
|
|
action: Expr,
|
|
|
|
|
cont: Expr,
|
|
|
|
|
}
|
|
|
|
|
impl Atomic for SetTimer {
|
|
|
|
|
type Variant = OwnedVariant;
|
|
|
|
|
type Data = ();
|
|
|
|
|
}
|
|
|
|
|
impl OwnedAtom for SetTimer {
|
|
|
|
|
type Refs = Never;
|
|
|
|
|
async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(()) }
|
|
|
|
|
async fn command(self) -> OrcRes<impl IntoGExprStream> {
|
|
|
|
|
let (send, recv) = oneshot::channel();
|
|
|
|
|
Ok((
|
|
|
|
|
new_atom(Timer {
|
|
|
|
|
set_at: Instant::now(),
|
|
|
|
|
delay: self.delay,
|
|
|
|
|
cancelled: Rc::new(recv),
|
|
|
|
|
repetition: self.recurring.then_some(1),
|
|
|
|
|
action: self.action,
|
|
|
|
|
}),
|
|
|
|
|
call(
|
|
|
|
|
self.cont,
|
|
|
|
|
new_atom(Canceller { cont: None, cancel: Rc::new(RefCell::new(Some(send))) }),
|
|
|
|
|
),
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn gen_future_lib() -> Vec<GenMember> {
|
|
|
|
|
prefix("std", [comments(
|
|
|
|
|
[
|
|
|
|
|
"This library exposes a futures executor, and tools for timing and cooperative multitasking. \
|
|
|
|
|
The use of these tools is only possible in a command trampoline, i.e. a caller that always \
|
|
|
|
|
defers to the command implementation of an atom.",
|
|
|
|
|
"Any command that correctly integrates with this library should return `std::future::yield` \
|
|
|
|
|
as its final value on all codepaths, which is the (re)entry point of the trampoline. \
|
|
|
|
|
Returning any other command, especially the ones in `std::exit_code` causes the program to \
|
|
|
|
|
immediately exit.",
|
|
|
|
|
"Cancellers take a continuation, stop whatever process they are associated with from \
|
|
|
|
|
proceeding, and call the continuation with information about the cancelled work.",
|
|
|
|
|
"|type canceller: \\T ((T -> cmd) -> cmd)|",
|
|
|
|
|
],
|
|
|
|
|
prefix("future", [
|
|
|
|
|
comments(
|
|
|
|
|
[
|
|
|
|
|
"A command without a continuation that defers control to the queued set of commands.",
|
|
|
|
|
"|type: cmd|",
|
|
|
|
|
],
|
|
|
|
|
cnst(true, "yield", new_atom(Yield)),
|
|
|
|
|
),
|
|
|
|
|
comments(
|
|
|
|
|
[
|
|
|
|
|
"Takes two commands and queues both to be executed one after the other.",
|
|
|
|
|
"|type: cmd -> cmd -> cmd|",
|
|
|
|
|
],
|
|
|
|
|
fun(true, "spawn", async |left: ForeignAtom, right: ForeignAtom| {
|
|
|
|
|
new_atom(Spawn(left, right))
|
|
|
|
|
}),
|
|
|
|
|
),
|
|
|
|
|
comments(
|
|
|
|
|
[
|
|
|
|
|
"Takes a time amount to wait, the command to perform after waiting, and a continuation, \
|
|
|
|
|
and returns a command that sets a single-fire timeout. The continuation will be \
|
|
|
|
|
called with a canceller, which reports true if the task has not yet run.",
|
|
|
|
|
"|type: Duration -> cmd -> (canceller bool -> cmd) -> cmd|",
|
|
|
|
|
],
|
|
|
|
|
fun(true, "timeout", async |OrcDT(delay): OrcDT, action: Expr, cont: Expr| {
|
|
|
|
|
new_atom(SetTimer { delay, action, cont, recurring: false })
|
|
|
|
|
}),
|
|
|
|
|
),
|
|
|
|
|
comments(
|
|
|
|
|
[
|
|
|
|
|
"Takes a time amount to wait between repetitions, the command to perform periodically, \
|
|
|
|
|
and a continuation, and returns a command. The continuation will be called with a \
|
|
|
|
|
canceller, which reports how many times the interval has run.",
|
|
|
|
|
"|type: Duration -> cmd -> (canceller Int -> cmd) -> cmd|",
|
|
|
|
|
],
|
|
|
|
|
fun(true, "interval", async |OrcDT(delay): OrcDT, action: Expr, cont: Expr| {
|
|
|
|
|
new_atom(SetTimer { delay, action, cont, recurring: true })
|
|
|
|
|
}),
|
|
|
|
|
),
|
|
|
|
|
]),
|
|
|
|
|
)])
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn get_scheduler() -> Scheduler {
|
|
|
|
|
let cted = cted();
|
|
|
|
|
let std = cted.as_any().downcast_ref::<StdSystem>().unwrap();
|
|
|
|
|
let sched = std.sched.get_or_init(Scheduler::default);
|
|
|
|
|
sched.clone()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub struct AsyncTaskAtom {}
|