in midst of refactor
This commit is contained in:
7
orchidlang/src/libs/asynch/async.orc
Normal file
7
orchidlang/src/libs/asynch/async.orc
Normal file
@@ -0,0 +1,7 @@
|
||||
import std::panic
|
||||
|
||||
export const block_on := \action. \cont. (
|
||||
action cont
|
||||
(\e.panic "unwrapped asynch call")
|
||||
\c.yield
|
||||
)
|
||||
13
orchidlang/src/libs/asynch/delete_cell.rs
Normal file
13
orchidlang/src/libs/asynch/delete_cell.rs
Normal file
@@ -0,0 +1,13 @@
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
pub struct DeleteCell<T>(pub Arc<Mutex<Option<T>>>);
|
||||
impl<T> DeleteCell<T> {
|
||||
pub fn new(t: T) -> Self { Self(Arc::new(Mutex::new(Some(t)))) }
|
||||
pub fn take(&self) -> Option<T> { self.0.lock().unwrap().take() }
|
||||
}
|
||||
impl<T: Clone> DeleteCell<T> {
|
||||
pub fn clone_out(&self) -> Option<T> { self.0.lock().unwrap().clone() }
|
||||
}
|
||||
impl<T> Clone for DeleteCell<T> {
|
||||
fn clone(&self) -> Self { Self(self.0.clone()) }
|
||||
}
|
||||
9
orchidlang/src/libs/asynch/mod.rs
Normal file
9
orchidlang/src/libs/asynch/mod.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
//! An event queue other systems can use to trigger events on the main
|
||||
//! interpreter thread. These events are handled when the Orchid code returns
|
||||
//! `system::async::yield`, and may cause additional Orchid code to be executed
|
||||
//! beyond being general Rust functions.
|
||||
//! It also exposes timers.
|
||||
|
||||
mod delete_cell;
|
||||
pub mod poller;
|
||||
pub mod system;
|
||||
151
orchidlang/src/libs/asynch/poller.rs
Normal file
151
orchidlang/src/libs/asynch/poller.rs
Normal file
@@ -0,0 +1,151 @@
|
||||
//! Abstract implementation of the poller
|
||||
|
||||
use std::collections::BinaryHeap;
|
||||
use std::mem;
|
||||
use std::sync::mpsc::{channel, Receiver, RecvError, RecvTimeoutError, Sender};
|
||||
use std::thread::sleep;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use super::delete_cell::DeleteCell;
|
||||
|
||||
enum TimerKind<TOnce, TRec> {
|
||||
Once(DeleteCell<TOnce>),
|
||||
Recurring { period: Duration, data_cell: DeleteCell<TRec> },
|
||||
}
|
||||
impl<TOnce, TRec> Clone for TimerKind<TOnce, TRec> {
|
||||
fn clone(&self) -> Self {
|
||||
match self {
|
||||
Self::Once(c) => Self::Once(c.clone()),
|
||||
Self::Recurring { period, data_cell: data } =>
|
||||
Self::Recurring { period: *period, data_cell: data.clone() },
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Indicates a bit of code which is to be executed at a
|
||||
/// specific point in time
|
||||
///
|
||||
/// In order to work with Rust's builtin [BinaryHeap] which is a max heap, the
|
||||
/// [Ord] implemenetation of this struct is reversed; it can be intuitively
|
||||
/// thought of as ordering by urgency.
|
||||
struct Timer<TOnce, TRec> {
|
||||
expires: Instant,
|
||||
kind: TimerKind<TOnce, TRec>,
|
||||
}
|
||||
impl<TOnce, TRec> Clone for Timer<TOnce, TRec> {
|
||||
fn clone(&self) -> Self { Self { expires: self.expires, kind: self.kind.clone() } }
|
||||
}
|
||||
impl<TOnce, TRec> Eq for Timer<TOnce, TRec> {}
|
||||
impl<TOnce, TRec> PartialEq for Timer<TOnce, TRec> {
|
||||
fn eq(&self, other: &Self) -> bool { self.expires.eq(&other.expires) }
|
||||
}
|
||||
impl<TOnce, TRec> PartialOrd for Timer<TOnce, TRec> {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { Some(other.cmp(self)) }
|
||||
}
|
||||
impl<TOnce, TRec> Ord for Timer<TOnce, TRec> {
|
||||
fn cmp(&self, other: &Self) -> std::cmp::Ordering { other.expires.cmp(&self.expires) }
|
||||
}
|
||||
|
||||
/// Representation of a scheduled timer
|
||||
#[derive(Clone)]
|
||||
pub struct TimerHandle<T>(DeleteCell<T>);
|
||||
impl<T> TimerHandle<T> {
|
||||
/// Cancel the timer
|
||||
pub fn cancel(self) { mem::drop(self.0.take()) }
|
||||
}
|
||||
|
||||
/// The abstract event poller implementation used by the standard asynch
|
||||
/// subsystem.
|
||||
pub struct Poller<TEv, TOnce, TRec: Clone> {
|
||||
timers: BinaryHeap<Timer<TOnce, TRec>>,
|
||||
receiver: Receiver<TEv>,
|
||||
}
|
||||
|
||||
impl<TEv, TOnce, TRec: Clone> Poller<TEv, TOnce, TRec> {
|
||||
/// Create an event poller and a [Sender] that can produce events on it.
|
||||
pub fn new() -> (Sender<TEv>, Self) {
|
||||
let (sender, receiver) = channel();
|
||||
let this = Self { receiver, timers: BinaryHeap::new() };
|
||||
(sender, this)
|
||||
}
|
||||
|
||||
/// Set a single-fire timer
|
||||
pub fn set_timeout(&mut self, duration: Duration, data: TOnce) -> TimerHandle<TOnce> {
|
||||
let data_cell = DeleteCell::new(data);
|
||||
self
|
||||
.timers
|
||||
.push(Timer { kind: TimerKind::Once(data_cell.clone()), expires: Instant::now() + duration });
|
||||
TimerHandle(data_cell)
|
||||
}
|
||||
|
||||
/// Set a recurring timer
|
||||
pub fn set_interval(&mut self, period: Duration, data: TRec) -> TimerHandle<TRec> {
|
||||
let data_cell = DeleteCell::new(data);
|
||||
self.timers.push(Timer {
|
||||
expires: Instant::now() + period,
|
||||
kind: TimerKind::Recurring { period, data_cell: data_cell.clone() },
|
||||
});
|
||||
TimerHandle(data_cell)
|
||||
}
|
||||
|
||||
/// Process a timer popped from the timers heap of this event loop.
|
||||
/// This function returns [None] if the timer had been cancelled. **This
|
||||
/// behaviour is different from [EventLoop::run] which is returns None if
|
||||
/// the event loop is empty, even though the types are compatible.**
|
||||
fn process_next_timer(
|
||||
&mut self,
|
||||
Timer { expires, kind }: Timer<TOnce, TRec>,
|
||||
) -> Option<PollEvent<TEv, TOnce, TRec>> {
|
||||
Some(match kind {
|
||||
TimerKind::Once(data) => PollEvent::Once(data.take()?),
|
||||
TimerKind::Recurring { period, data_cell } => {
|
||||
let data = data_cell.clone_out()?;
|
||||
self.timers.push(Timer {
|
||||
expires: expires + period,
|
||||
kind: TimerKind::Recurring { period, data_cell },
|
||||
});
|
||||
PollEvent::Recurring(data)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
/// Block until a message is received or the first timer expires
|
||||
pub fn run(&mut self) -> Option<PollEvent<TEv, TOnce, TRec>> {
|
||||
loop {
|
||||
if let Some(expires) = self.timers.peek().map(|t| t.expires) {
|
||||
return match self.receiver.recv_timeout(expires - Instant::now()) {
|
||||
Ok(t) => Some(PollEvent::Event(t)),
|
||||
Err(e) => {
|
||||
if e == RecvTimeoutError::Disconnected {
|
||||
// The receiver is now inert, but the timer must finish
|
||||
sleep(expires - Instant::now());
|
||||
}
|
||||
// pop and process the timer we've been waiting on
|
||||
let timer = self.timers.pop().expect("checked before wait");
|
||||
let result = self.process_next_timer(timer);
|
||||
// if the timer had been cancelled, repeat
|
||||
if result.is_none() {
|
||||
continue;
|
||||
}
|
||||
result
|
||||
},
|
||||
};
|
||||
} else {
|
||||
return match self.receiver.recv() {
|
||||
Ok(t) => Some(PollEvent::Event(t)),
|
||||
Err(RecvError) => None,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Events produced by [Poller].
|
||||
pub enum PollEvent<TEv, TOnce, TRec> {
|
||||
/// An event was sent to the [Sender] associated with the [Poller].
|
||||
Event(TEv),
|
||||
/// A single-fire timer expired
|
||||
Once(TOnce),
|
||||
/// A recurring event fired
|
||||
Recurring(TRec),
|
||||
}
|
||||
210
orchidlang/src/libs/asynch/system.rs
Normal file
210
orchidlang/src/libs/asynch/system.rs
Normal file
@@ -0,0 +1,210 @@
|
||||
//! Object to pass to [crate::facade::loader::Loader::add_system] to enable the
|
||||
//! I/O subsystem. Also many other systems depend on it, these take a mut ref to
|
||||
//! register themselves.
|
||||
|
||||
use std::any::{type_name, Any, TypeId};
|
||||
use std::cell::RefCell;
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
use std::rc::Rc;
|
||||
use std::sync::mpsc::Sender;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use hashbrown::HashMap;
|
||||
use ordered_float::NotNan;
|
||||
use rust_embed::RustEmbed;
|
||||
|
||||
use super::poller::{PollEvent, Poller, TimerHandle};
|
||||
use crate::facade::system::{IntoSystem, System};
|
||||
use crate::foreign::atom::Atomic;
|
||||
use crate::foreign::cps_box::CPSBox;
|
||||
use crate::foreign::error::RTError;
|
||||
use crate::foreign::inert::{Inert, InertPayload};
|
||||
use crate::gen::tpl;
|
||||
use crate::gen::traits::Gen;
|
||||
use crate::gen::tree::{atom_ent, xfn_ent, ConstTree};
|
||||
use crate::interpreter::gen_nort::nort_gen;
|
||||
use crate::interpreter::handler::HandlerTable;
|
||||
use crate::interpreter::nort::Expr;
|
||||
use crate::libs::std::number::Numeric;
|
||||
use crate::location::{CodeGenInfo, CodeLocation};
|
||||
use crate::sym;
|
||||
use crate::utils::unwrap_or::unwrap_or;
|
||||
use crate::virt_fs::{DeclTree, EmbeddedFS, PrefixFS, VirtFS};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct Timer {
|
||||
recurring: bool,
|
||||
delay: NotNan<f64>,
|
||||
}
|
||||
|
||||
fn set_timer(rec: Inert<bool>, delay: Numeric) -> CPSBox<Timer> {
|
||||
CPSBox::new(2, Timer { recurring: rec.0, delay: delay.as_float() })
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct CancelTimer(Arc<Mutex<dyn Fn() + Send>>);
|
||||
impl CancelTimer {
|
||||
pub fn new<T: Send + Clone + 'static>(canceller: TimerHandle<T>) -> Self {
|
||||
Self(Arc::new(Mutex::new(move || canceller.clone().cancel())))
|
||||
}
|
||||
pub fn cancel(&self) { self.0.lock().unwrap()() }
|
||||
}
|
||||
impl fmt::Debug for CancelTimer {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("CancelTimer").finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct Yield;
|
||||
impl InertPayload for Yield {
|
||||
const TYPE_STR: &'static str = "asynch::yield";
|
||||
}
|
||||
|
||||
/// Error indicating a yield command when all event producers and timers had
|
||||
/// exited
|
||||
#[derive(Clone)]
|
||||
pub struct InfiniteBlock;
|
||||
impl RTError for InfiniteBlock {}
|
||||
impl fmt::Display for InfiniteBlock {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
static MSG: &str = "User code yielded, but there are no timers or event \
|
||||
producers to wake it up in the future";
|
||||
write!(f, "{}", MSG)
|
||||
}
|
||||
}
|
||||
|
||||
/// A thread-safe handle that can be used to send events of any type
|
||||
#[derive(Clone)]
|
||||
pub struct MessagePort(Sender<Box<dyn Any + Send>>);
|
||||
impl MessagePort {
|
||||
/// Send an event. Any type is accepted, handlers are dispatched by type ID
|
||||
pub fn send<T: Send + 'static>(&mut self, message: T) { let _ = self.0.send(Box::new(message)); }
|
||||
}
|
||||
|
||||
fn gen() -> CodeGenInfo { CodeGenInfo::no_details(sym!(asynch)) }
|
||||
|
||||
#[derive(RustEmbed)]
|
||||
#[folder = "src/libs/asynch"]
|
||||
#[include = "*.orc"]
|
||||
struct AsynchEmbed;
|
||||
|
||||
fn code() -> DeclTree {
|
||||
DeclTree::ns("system::async", [DeclTree::leaf(
|
||||
PrefixFS::new(EmbeddedFS::new::<AsynchEmbed>(".orc", gen()), "", "async").rc(),
|
||||
)])
|
||||
}
|
||||
|
||||
type AnyHandler<'a> = Box<dyn FnMut(Box<dyn Any>) -> Vec<Expr> + 'a>;
|
||||
|
||||
/// Datastructures the asynch system will eventually be constructed from.
|
||||
pub struct AsynchSystem<'a> {
|
||||
poller: Poller<Box<dyn Any + Send>, Expr, Expr>,
|
||||
sender: Sender<Box<dyn Any + Send>>,
|
||||
handlers: HashMap<TypeId, AnyHandler<'a>>,
|
||||
}
|
||||
|
||||
impl<'a> AsynchSystem<'a> {
|
||||
/// Create a new async event loop that allows registering handlers and taking
|
||||
/// references to the port before it's converted into a [System]
|
||||
#[must_use]
|
||||
pub fn new() -> Self {
|
||||
let (sender, poller) = Poller::new();
|
||||
Self { poller, sender, handlers: HashMap::new() }
|
||||
}
|
||||
|
||||
/// Register a callback to be called on the owning thread when an object of
|
||||
/// the given type is found on the queue. Each type should signify a single
|
||||
/// command so each type should have exactly one handler.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// if the given type is already handled.
|
||||
pub fn register<T: 'static>(&mut self, mut f: impl FnMut(Box<T>) -> Vec<Expr> + 'a) {
|
||||
let cb = move |a: Box<dyn Any>| f(a.downcast().expect("keyed by TypeId"));
|
||||
let prev = self.handlers.insert(TypeId::of::<T>(), Box::new(cb));
|
||||
assert!(prev.is_none(), "Duplicate handlers for async event {}", type_name::<T>())
|
||||
}
|
||||
|
||||
/// Obtain a message port for sending messages to the main thread. If an
|
||||
/// object is passed to the MessagePort that does not have a handler, the
|
||||
/// main thread panics.
|
||||
#[must_use]
|
||||
pub fn get_port(&self) -> MessagePort { MessagePort(self.sender.clone()) }
|
||||
}
|
||||
|
||||
impl<'a> Default for AsynchSystem<'a> {
|
||||
fn default() -> Self { Self::new() }
|
||||
}
|
||||
|
||||
impl<'a> IntoSystem<'a> for AsynchSystem<'a> {
|
||||
fn into_system(self) -> System<'a> {
|
||||
let Self { mut handlers, poller, .. } = self;
|
||||
let mut handler_table = HandlerTable::new();
|
||||
let polly = Rc::new(RefCell::new(poller));
|
||||
handler_table.register({
|
||||
let polly = polly.clone();
|
||||
move |t: &CPSBox<Timer>| {
|
||||
let mut polly = polly.borrow_mut();
|
||||
let (Timer { delay, recurring }, action, cont) = t.unpack2();
|
||||
let duration = Duration::from_secs_f64(**delay);
|
||||
let cancel_timer = match *recurring {
|
||||
true => CancelTimer::new(polly.set_interval(duration, action)),
|
||||
false => CancelTimer::new(polly.set_timeout(duration, action)),
|
||||
};
|
||||
let tpl = tpl::A(tpl::Slot, tpl::V(CPSBox::new(1, cancel_timer)));
|
||||
tpl.template(nort_gen(cont.location()), [cont])
|
||||
}
|
||||
});
|
||||
handler_table.register(move |t: &CPSBox<CancelTimer>| {
|
||||
let (command, cont) = t.unpack1();
|
||||
command.cancel();
|
||||
cont
|
||||
});
|
||||
handler_table.register({
|
||||
let polly = polly.clone();
|
||||
let mut microtasks = VecDeque::new();
|
||||
move |_: &Inert<Yield>| {
|
||||
if let Some(expr) = microtasks.pop_front() {
|
||||
return Ok(expr);
|
||||
}
|
||||
let mut polly = polly.borrow_mut();
|
||||
loop {
|
||||
let next = unwrap_or!(polly.run();
|
||||
return Err(InfiniteBlock.pack())
|
||||
);
|
||||
match next {
|
||||
PollEvent::Once(expr) => return Ok(expr),
|
||||
PollEvent::Recurring(expr) => return Ok(expr),
|
||||
PollEvent::Event(ev) => {
|
||||
let handler = (handlers.get_mut(&ev.as_ref().type_id()))
|
||||
.unwrap_or_else(|| panic!("Unhandled messgae type: {:?}", (*ev).type_id()));
|
||||
let events = handler(ev);
|
||||
// we got new microtasks
|
||||
if !events.is_empty() {
|
||||
microtasks = VecDeque::from(events);
|
||||
// trampoline
|
||||
let loc = CodeLocation::new_gen(CodeGenInfo::no_details(sym!(system::asynch)));
|
||||
return Ok(Inert(Yield).atom_expr(loc));
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
System {
|
||||
name: "system::asynch",
|
||||
lexer_plugins: vec![],
|
||||
line_parsers: vec![],
|
||||
constants: ConstTree::ns("system::async", [ConstTree::tree([
|
||||
xfn_ent("set_timer", [set_timer]),
|
||||
atom_ent("yield", [Inert(Yield)]),
|
||||
])]),
|
||||
code: code(),
|
||||
prelude: Vec::new(),
|
||||
handlers: handler_table,
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user