use std::any::{type_name, Any}; use std::cell::RefCell; use std::fmt::Debug; use std::rc::Rc; use hashbrown::HashMap; use itertools::Itertools; use trait_set::trait_set; use super::busy::{BusyState, NextItemReportKind}; use super::Canceller; use crate::facade::{IntoSystem, System}; use crate::foreign::cps_box::{init_cps, CPSBox}; use crate::foreign::InertAtomic; use crate::interpreted::ExprInst; use crate::interpreter::HandlerTable; use crate::systems::asynch::{AsynchSystem, MessagePort}; use crate::systems::stl::Boolean; use crate::systems::AssertionError; use crate::utils::ddispatch::Request; use crate::utils::thread_pool::ThreadPool; use crate::utils::{take_with_output, unwrap_or, IdMap}; use crate::{define_fn, ConstTree}; enum SharedResource { Free(T), Busy(BusyState), Taken, } /// Possible states of a shared resource #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub enum SharedState { /// The resource is ready to be used or taken Free, /// The resource is currently in use but operations can be asynchronously /// scheduled on it Busy, /// The resource is currently in use and a consuming seal has already been /// scheduled, therefore further operations cannot access it and it will /// transition to [SharedState::Taken] as soon as the currently pending /// operations finish or are cancelled. Sealed, /// The resource has been removed from this location. Taken, } /// A shared handle for a resource of type `T` that can be used with a /// [SeqScheduler] to execute mutating operations one by one in worker threads. pub struct SharedHandle(Rc>>); impl SharedHandle { /// Wrap a value to be accessible to a [SeqScheduler]. pub fn wrap(t: T) -> Self { Self(Rc::new(RefCell::new(SharedResource::Free(t)))) } /// Check the state of the handle pub fn state(&self) -> SharedState { match &*self.0.as_ref().borrow() { SharedResource::Busy(b) if b.is_sealed() => SharedState::Sealed, SharedResource::Busy(_) => SharedState::Busy, SharedResource::Free(_) => SharedState::Free, SharedResource::Taken => SharedState::Taken, } } /// Remove the value from the handle if it's free. To interact with a handle /// you probably want to use a [SeqScheduler], but sometimes this makes /// sense as eg. an optimization. You can return the value after processing /// via [SyncHandle::untake]. pub fn take(&self) -> Option { take_with_output(&mut *self.0.as_ref().borrow_mut(), |state| match state { SharedResource::Free(t) => (SharedResource::Taken, Some(t)), _ => (state, None), }) } /// Return the value to a handle that doesn't have one. The intended use case /// is to return values synchronously after they have been removed with /// [SyncHandle::untake]. pub fn untake(&self, value: T) -> Result<(), T> { take_with_output(&mut *self.0.as_ref().borrow_mut(), |state| match state { SharedResource::Taken => (SharedResource::Free(value), Ok(())), _ => (state, Err(value)), }) } } impl Clone for SharedHandle { fn clone(&self) -> Self { Self(self.0.clone()) } } impl Debug for SharedHandle { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("SharedHandle") .field("state", &self.state()) .field("type", &type_name::()) .finish() } } impl InertAtomic for SharedHandle { fn type_str() -> &'static str { "a SharedHandle" } fn respond(&self, mut request: Request) { request.serve_with(|| { let this = self.clone(); TakeCmd(Rc::new(move |sch| { let _ = sch.seal(this.clone(), |_| Vec::new()); })) }) } } #[derive(Clone)] pub struct TakeCmd(pub Rc); impl Debug for TakeCmd { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "A command to drop a shared resource") } } /// Error produced when an operation is scheduled or a seal placed on a resource /// which is either already sealed or taken. #[derive(Debug, Clone)] pub struct SealedOrTaken; impl InertAtomic for SealedOrTaken { fn type_str() -> &'static str { "a sealed-or-taken error for a shared resource" } } define_fn! { pub TakeAndDrop = |x| { let location = x.location(); match x.request() { Some(t) => Ok(init_cps::(1, t)), None => AssertionError::fail(location, "SharedHandle"), } }; IsTakenError = |x| { Ok(Boolean(x.downcast::().is_ok()).atom_cls()) } } trait_set! { /// The part of processing a blocking I/O task that cannot be done on a remote /// thread, eg. because it accesses other systems or Orchid code. trait NonSendFn = FnOnce(Box, SeqScheduler) -> Vec; } struct SyncReply { opid: u64, data: Box, } struct CheshireCat { pool: ThreadPool>, pending: RefCell>>, port: MessagePort, } /// A task scheduler that executes long blocking operations that have mutable /// access to a shared one by one on a worker thread. The resources are /// held in [SharedHandle]s #[derive(Clone)] pub struct SeqScheduler(Rc); impl SeqScheduler { /// Creates a new [SeqScheduler]. The new object is also kept alive by a /// callback in the provided [AsynchSystem]. There should be at most one pub fn new(asynch: &mut AsynchSystem) -> Self { let this = Self(Rc::new(CheshireCat { pending: RefCell::new(IdMap::new()), pool: ThreadPool::new(), port: asynch.get_port(), })); let this1 = this.clone(); // referenced by asynch, references this asynch.register(move |res: Box| { let callback = this1.0.pending.borrow_mut().remove(res.opid).expect( "Received reply for task we didn't start. This likely means that \ there are multiple SequencingContexts attached to the same \ AsynchSystem.", ); callback(res.data, this1.clone()) }); this } /// Submit an action to be executed on a worker thread which can own the data /// in the handle. /// /// * handle - data to be transformed /// * operation - long blocking mutation to execute off-thread. /// * handler - process the results, talk to other systems, generate and run /// Orchid code. /// * early_cancel - clean up in case the task got cancelled before it was /// scheduled. This is an optimization so that threads aren't spawned if a /// large batch of tasks is scheduled and then cancelled. pub fn schedule( &self, handle: SharedHandle, operation: impl FnOnce(T, Canceller) -> (T, U) + Send + 'static, handler: impl FnOnce(T, U, Canceller) -> (T, Vec) + 'static, early_cancel: impl FnOnce(T) -> (T, Vec) + 'static, ) -> Result { take_with_output(&mut *handle.0.as_ref().borrow_mut(), { let handle = handle.clone(); |state| { match state { SharedResource::Taken => (SharedResource::Taken, Err(SealedOrTaken)), SharedResource::Busy(mut b) => { match b.enqueue(operation, handler, early_cancel) { Some(cancelled) => (SharedResource::Busy(b), Ok(cancelled)), None => (SharedResource::Busy(b), Err(SealedOrTaken)), } }, SharedResource::Free(t) => { let cancelled = Canceller::new(); drop(early_cancel); // cannot possibly be useful self.submit(t, handle, cancelled.clone(), operation); (SharedResource::Busy(BusyState::new(handler)), Ok(cancelled)) }, } } }) } /// Run an operation asynchronously and then process its result in thread, /// without queuing on any particular data. pub fn run_orphan( &self, operation: impl FnOnce(Canceller) -> T + Send + 'static, handler: impl FnOnce(T, Canceller) -> Vec + 'static, ) -> Canceller { let cancelled = Canceller::new(); let canc1 = cancelled.clone(); let opid = self.0.pending.borrow_mut().insert(Box::new(|data, _| { handler(*data.downcast().expect("This is associated by ID"), canc1) })); let canc1 = cancelled.clone(); let mut port = self.0.port.clone(); self.0.pool.submit(Box::new(move || { port.send(SyncReply { opid, data: Box::new(operation(canc1)) }); })); cancelled } /// Schedule a function that will consume the value. After this the handle is /// considered sealed and all [SeqScheduler::schedule] calls will fail. pub fn seal( &self, handle: SharedHandle, seal: impl FnOnce(T) -> Vec + 'static, ) -> Result, SealedOrTaken> { take_with_output( &mut *handle.0.as_ref().borrow_mut(), |state| match state { SharedResource::Busy(mut b) if !b.is_sealed() => { b.seal(seal); (SharedResource::Busy(b), Ok(Vec::new())) }, SharedResource::Busy(_) => (state, Err(SealedOrTaken)), SharedResource::Taken => (SharedResource::Taken, Err(SealedOrTaken)), SharedResource::Free(t) => (SharedResource::Taken, Ok(seal(t))), }, ) } /// Asynchronously recursive function to schedule a new task for execution and /// act upon its completion. The self-reference is passed into the callback /// from the callback passed to the [AsynchSystem] so that if the task is /// never resolved but the [AsynchSystem] through which the resolving event /// would arrive is dropped this [SeqScheduler] is also dropped. fn submit( &self, t: T, handle: SharedHandle, cancelled: Canceller, operation: impl FnOnce(T, Canceller) -> (T, U) + Send + 'static, ) { // referenced by self until run, references handle let opid = self.0.pending.borrow_mut().insert(Box::new({ let cancelled = cancelled.clone(); move |data, this| { let (t, u): (T, U) = *data.downcast().expect("This is associated by ID"); let handle2 = handle.clone(); take_with_output(&mut *handle.0.as_ref().borrow_mut(), |state| { let busy = unwrap_or! { state => SharedResource::Busy; panic!("Handle with outstanding invocation must be busy") }; let report = busy.rotate(t, u, cancelled); match report.kind { NextItemReportKind::Free(t) => (SharedResource::Free(t), report.events), NextItemReportKind::Taken => (SharedResource::Taken, report.events), NextItemReportKind::Next { instance, cancelled, operation, rest, } => { this.submit(instance, handle2, cancelled, operation); (SharedResource::Busy(rest), report.events) }, } }) } })); let mut port = self.0.port.clone(); // referenced by thread until run, references port self.0.pool.submit(Box::new(move || { port.send(SyncReply { opid, data: Box::new(operation(t, cancelled)) }) })) } } impl IntoSystem<'static> for SeqScheduler { fn into_system(self, i: &crate::Interner) -> crate::facade::System<'static> { let mut handlers = HandlerTable::new(); handlers.register(|cmd: Box>| { let (canceller, cont) = cmd.unpack1(); canceller.cancel(); Ok(cont) }); handlers.register(move |cmd: Box>| { let (TakeCmd(cb), cont) = cmd.unpack1(); cb(self.clone()); Ok(cont) }); System { name: ["system", "scheduler"].into_iter().map_into().collect(), prelude: Vec::new(), code: HashMap::new(), handlers, constants: ConstTree::namespace( [i.i("system"), i.i("scheduler")], ConstTree::tree([ (i.i("is_taken_error"), ConstTree::xfn(IsTakenError)), (i.i("take_and_drop"), ConstTree::xfn(TakeAndDrop)), ]), ) .unwrap_tree(), } } }