Generic mutation scheduling system

IO adapted to use it
Also, Atoms can now dispatch type-erased requests
This commit is contained in:
2023-09-14 22:54:42 +01:00
parent 8c866967a9
commit 3c0056c2db
51 changed files with 991 additions and 379 deletions

View File

@@ -0,0 +1,305 @@
use std::any::{type_name, Any};
use std::cell::RefCell;
use std::fmt::Debug;
use std::rc::Rc;
use hashbrown::HashMap;
use itertools::Itertools;
use trait_set::trait_set;
use super::busy::{BusyState, NextItemReportKind};
use super::take_and_drop::{request, TakeAndDrop, TakeCmd};
use super::Canceller;
use crate::facade::{IntoSystem, System};
use crate::foreign::cps_box::CPSBox;
use crate::interpreted::ExprInst;
use crate::interpreter::HandlerTable;
use crate::systems::asynch::{AsynchSystem, MessagePort};
use crate::systems::stl::Boolean;
use crate::utils::thread_pool::ThreadPool;
use crate::utils::{take_with_output, unwrap_or, IdMap};
use crate::{atomic_inert, define_fn, ConstTree};
enum SharedResource<T> {
Free(T),
Busy(BusyState<T>),
Taken,
}
/// Possible states of a shared resource
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum SharedState {
/// The resource is ready to be used or taken
Free,
/// The resource is currently in use but operations can be asynchronously
/// scheduled on it
Busy,
/// The resource is currently in use and a consuming seal has already been
/// scheduled, therefore further operations cannot access it and it will
/// transition to [SharedState::Taken] as soon as the currently pending
/// operations finish or are cancelled.
Sealed,
/// The resource has been removed from this location.
Taken,
}
/// A shared handle for a resource of type `T` that can be used with a
/// [SeqScheduler] to execute mutating operations one by one in worker threads.
pub struct SharedHandle<T> {
state: Rc<RefCell<SharedResource<T>>>,
}
impl<T> SharedHandle<T> {
/// Wrap a value to be accessible to a [SeqScheduler].
pub fn wrap(t: T) -> Self {
Self { state: Rc::new(RefCell::new(SharedResource::Free(t))) }
}
/// Check the state of the handle
pub fn state(&self) -> SharedState {
match &*self.state.as_ref().borrow() {
SharedResource::Busy(b) if b.is_sealed() => SharedState::Sealed,
SharedResource::Busy(_) => SharedState::Busy,
SharedResource::Free(_) => SharedState::Free,
SharedResource::Taken => SharedState::Taken,
}
}
/// Remove the value from the handle if it's free. To interact with a handle
/// you probably want to use a [SeqScheduler], but sometimes this makes
/// sense as eg. an optimization. You can return the value after processing
/// via [SyncHandle::untake].
pub fn take(&self) -> Option<T> {
take_with_output(
&mut *self.state.as_ref().borrow_mut(),
|state| match state {
SharedResource::Free(t) => (SharedResource::Taken, Some(t)),
_ => (state, None),
},
)
}
/// Return the value to a handle that doesn't have one. The intended use case
/// is to return values synchronously after they have been removed with
/// [SyncHandle::untake].
pub fn untake(&self, value: T) -> Result<(), T> {
take_with_output(
&mut *self.state.as_ref().borrow_mut(),
|state| match state {
SharedResource::Taken => (SharedResource::Free(value), Ok(())),
_ => (state, Err(value)),
},
)
}
}
impl<T> Clone for SharedHandle<T> {
fn clone(&self) -> Self {
Self { state: self.state.clone() }
}
}
impl<T> Debug for SharedHandle<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SharedHandle")
.field("state", &self.state())
.field("type", &type_name::<T>())
.finish()
}
}
atomic_inert! {
SharedHandle(T),
typestr = "a shared handle",
request = |req: Box<dyn Any>, this: &SharedHandle<T>| request(this, req)
}
/// Error produced when an operation is scheduled or a seal placed on a resource
/// which is either already sealed or taken.
#[derive(Debug, Clone)]
pub struct SealedOrTaken;
atomic_inert!(
SealedOrTaken,
typestr = "a sealed-or-taken error for a shared resource"
);
define_fn! {
IsTakenError = |x| Ok(Boolean(SealedOrTaken::try_from(x).is_ok()).atom_cls())
}
trait_set! {
/// The part of processing a blocking I/O task that cannot be done on a remote
/// thread, eg. because it accesses other systems or Orchid code.
trait NonSendFn = FnOnce(Box<dyn Any>, SeqScheduler) -> Vec<ExprInst>;
}
struct SyncReply {
opid: u64,
data: Box<dyn Any + Send>,
}
struct CheshireCat {
pool: ThreadPool<Box<dyn FnOnce() + Send>>,
pending: RefCell<IdMap<Box<dyn NonSendFn>>>,
port: MessagePort,
}
/// A task scheduler that executes long blocking operations that have mutable
/// access to a shared one by one on a worker thread. The resources are
/// held in [SharedHandle]s
#[derive(Clone)]
pub struct SeqScheduler(Rc<CheshireCat>);
impl SeqScheduler {
/// Creates a new [SeqScheduler]. The new object is also kept alive by a
/// callback in the provided [AsynchSystem]. There should be at most one
pub fn new(asynch: &mut AsynchSystem) -> Self {
let this = Self(Rc::new(CheshireCat {
pending: RefCell::new(IdMap::new()),
pool: ThreadPool::new(),
port: asynch.get_port(),
}));
let this1 = this.clone();
// referenced by asynch, references this
asynch.register(move |res: Box<SyncReply>| {
let callback = this1.0.pending.borrow_mut().remove(res.opid).expect(
"Received reply for task we didn't start. This likely means that \
there are multiple SequencingContexts attached to the same \
AsynchSystem.",
);
callback(res.data, this1.clone())
});
this
}
/// Submit an action to be executed on a worker thread which can own the data
/// in the handle.
///
/// * handle - data to be transformed
/// * operation - long blocking mutation to execute off-thread.
/// * handler - process the results, talk to other systems, generate and run
/// Orchid code.
/// * early_cancel - clean up in case the task got cancelled before it was
/// scheduled. This is an optimization so that threads aren't spawned if a
/// large batch of tasks is scheduled and then cancelled.
pub fn schedule<T: Send + 'static, U: Send + 'static>(
&self,
handle: SharedHandle<T>,
operation: impl FnOnce(T, Canceller) -> (T, U) + Send + 'static,
handler: impl FnOnce(T, U, Canceller) -> (T, Vec<ExprInst>) + 'static,
early_cancel: impl FnOnce(T) -> (T, Vec<ExprInst>) + 'static,
) -> Result<Canceller, SealedOrTaken> {
take_with_output(&mut *handle.state.as_ref().borrow_mut(), {
let handle = handle.clone();
|state| {
match state {
SharedResource::Taken => (SharedResource::Taken, Err(SealedOrTaken)),
SharedResource::Busy(mut b) =>
match b.enqueue(operation, handler, early_cancel) {
Some(cancelled) => (SharedResource::Busy(b), Ok(cancelled)),
None => (SharedResource::Busy(b), Err(SealedOrTaken)),
},
SharedResource::Free(t) => {
let cancelled = Canceller::new();
drop(early_cancel); // cannot possibly be useful
self.submit(t, handle, cancelled.clone(), operation);
(SharedResource::Busy(BusyState::new(handler)), Ok(cancelled))
},
}
}
})
}
/// Schedule a function that will consume the value. After this the handle is
/// considered sealed and all [SeqScheduler::schedule] calls will fail.
pub fn seal<T>(
&self,
handle: SharedHandle<T>,
seal: impl FnOnce(T) -> Vec<ExprInst> + 'static,
) -> Result<Vec<ExprInst>, SealedOrTaken> {
take_with_output(&mut *handle.state.as_ref().borrow_mut(), |state| {
match state {
SharedResource::Busy(mut b) if !b.is_sealed() => {
b.seal(seal);
(SharedResource::Busy(b), Ok(Vec::new()))
},
SharedResource::Busy(_) => (state, Err(SealedOrTaken)),
SharedResource::Taken => (SharedResource::Taken, Err(SealedOrTaken)),
SharedResource::Free(t) => (SharedResource::Taken, Ok(seal(t))),
}
})
}
/// Asynchronously recursive function to schedule a new task for execution and
/// act upon its completion. The self-reference is passed into the callback
/// from the callback passed to the [AsynchSystem] so that if the task is
/// never resolved but the [AsynchSystem] through which the resolving event
/// would arrive is dropped this [SeqScheduler] is also dropped.
fn submit<T: Send + 'static, U: Send + 'static>(
&self,
t: T,
handle: SharedHandle<T>,
cancelled: Canceller,
operation: impl FnOnce(T, Canceller) -> (T, U) + Send + 'static,
) {
// referenced by self until run, references handle
let opid = self.0.pending.borrow_mut().insert(Box::new({
let cancelled = cancelled.clone();
move |data, this| {
let (t, u): (T, U) =
*data.downcast().expect("This is associated by ID");
let handle2 = handle.clone();
take_with_output(&mut *handle.state.as_ref().borrow_mut(), |state| {
let busy = unwrap_or! { state => SharedResource::Busy;
panic!("Handle with outstanding invocation must be busy")
};
let report = busy.rotate(t, u, cancelled);
match report.kind {
NextItemReportKind::Free(t) =>
(SharedResource::Free(t), report.events),
NextItemReportKind::Taken => (SharedResource::Taken, report.events),
NextItemReportKind::Next {
instance,
cancelled,
operation,
rest,
} => {
this.submit(instance, handle2, cancelled, operation);
(SharedResource::Busy(rest), report.events)
},
}
})
}
}));
let mut port = self.0.port.clone();
// referenced by thread until run, references port
self.0.pool.submit(Box::new(move || {
port.send(SyncReply { opid, data: Box::new(operation(t, cancelled)) })
}))
}
}
impl IntoSystem<'static> for SeqScheduler {
fn into_system(self, i: &crate::Interner) -> crate::facade::System<'static> {
let mut handlers = HandlerTable::new();
handlers.register(|cmd: &CPSBox<Canceller>| {
let (canceller, cont) = cmd.unpack1();
canceller.cancel();
Ok(cont.clone())
});
handlers.register(move |cmd: &CPSBox<TakeCmd>| {
let (TakeCmd(cb), cont) = cmd.unpack1();
cb(self.clone());
Ok(cont.clone())
});
System {
name: ["system", "scheduler"].into_iter().map_into().collect(),
prelude: Vec::new(),
code: HashMap::new(),
handlers,
constants: ConstTree::namespace(
[i.i("system"), i.i("scheduler")],
ConstTree::tree([
(i.i("is_taken_error"), ConstTree::xfn(IsTakenError)),
(i.i("take_and_drop"), ConstTree::xfn(TakeAndDrop)),
]),
)
.unwrap_tree(),
}
}
}