Cut down on macro nonsense

- InertAtomic replaced atomic_inert! for improved tooling support
- atomic_defaults! is easier to type out than to explain in a docstring
- Changed rustfmt config to better support tiny functions such as as_any
This commit is contained in:
2023-09-15 12:37:10 +01:00
parent 3c0056c2db
commit 0bcf10659b
73 changed files with 418 additions and 654 deletions

View File

@@ -8,17 +8,20 @@ use itertools::Itertools;
use trait_set::trait_set;
use super::busy::{BusyState, NextItemReportKind};
use super::take_and_drop::{request, TakeAndDrop, TakeCmd};
use super::Canceller;
use crate::facade::{IntoSystem, System};
use crate::foreign::cps_box::CPSBox;
use crate::foreign::cps_box::{init_cps, CPSBox};
use crate::foreign::InertAtomic;
use crate::interpreted::ExprInst;
use crate::interpreter::HandlerTable;
use crate::systems::asynch::{AsynchSystem, MessagePort};
use crate::systems::cast_exprinst::with_atom;
use crate::systems::stl::Boolean;
use crate::systems::AssertionError;
use crate::utils::ddispatch::{request, Request};
use crate::utils::thread_pool::ThreadPool;
use crate::utils::{take_with_output, unwrap_or, IdMap};
use crate::{atomic_inert, define_fn, ConstTree};
use crate::{define_fn, ConstTree};
enum SharedResource<T> {
Free(T),
@@ -45,18 +48,17 @@ pub enum SharedState {
/// A shared handle for a resource of type `T` that can be used with a
/// [SeqScheduler] to execute mutating operations one by one in worker threads.
pub struct SharedHandle<T> {
state: Rc<RefCell<SharedResource<T>>>,
}
pub struct SharedHandle<T>(Rc<RefCell<SharedResource<T>>>);
impl<T> SharedHandle<T> {
/// Wrap a value to be accessible to a [SeqScheduler].
pub fn wrap(t: T) -> Self {
Self { state: Rc::new(RefCell::new(SharedResource::Free(t))) }
Self(Rc::new(RefCell::new(SharedResource::Free(t))))
}
/// Check the state of the handle
pub fn state(&self) -> SharedState {
match &*self.state.as_ref().borrow() {
match &*self.0.as_ref().borrow() {
SharedResource::Busy(b) if b.is_sealed() => SharedState::Sealed,
SharedResource::Busy(_) => SharedState::Busy,
SharedResource::Free(_) => SharedState::Free,
@@ -69,32 +71,24 @@ impl<T> SharedHandle<T> {
/// sense as eg. an optimization. You can return the value after processing
/// via [SyncHandle::untake].
pub fn take(&self) -> Option<T> {
take_with_output(
&mut *self.state.as_ref().borrow_mut(),
|state| match state {
SharedResource::Free(t) => (SharedResource::Taken, Some(t)),
_ => (state, None),
},
)
take_with_output(&mut *self.0.as_ref().borrow_mut(), |state| match state {
SharedResource::Free(t) => (SharedResource::Taken, Some(t)),
_ => (state, None),
})
}
/// Return the value to a handle that doesn't have one. The intended use case
/// is to return values synchronously after they have been removed with
/// [SyncHandle::untake].
pub fn untake(&self, value: T) -> Result<(), T> {
take_with_output(
&mut *self.state.as_ref().borrow_mut(),
|state| match state {
SharedResource::Taken => (SharedResource::Free(value), Ok(())),
_ => (state, Err(value)),
},
)
take_with_output(&mut *self.0.as_ref().borrow_mut(), |state| match state {
SharedResource::Taken => (SharedResource::Free(value), Ok(())),
_ => (state, Err(value)),
})
}
}
impl<T> Clone for SharedHandle<T> {
fn clone(&self) -> Self {
Self { state: self.state.clone() }
}
fn clone(&self) -> Self { Self(self.0.clone()) }
}
impl<T> Debug for SharedHandle<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -104,23 +98,46 @@ impl<T> Debug for SharedHandle<T> {
.finish()
}
}
impl<T: 'static> InertAtomic for SharedHandle<T> {
fn type_str() -> &'static str { "a SharedHandle" }
fn respond(&self, mut request: Request) {
request.serve_with(|| {
let this = self.clone();
TakeCmd(Rc::new(move |sch| {
let _ = sch.seal(this.clone(), |_| Vec::new());
}))
})
}
}
atomic_inert! {
SharedHandle(T),
typestr = "a shared handle",
request = |req: Box<dyn Any>, this: &SharedHandle<T>| request(this, req)
#[derive(Clone)]
pub struct TakeCmd(pub Rc<dyn Fn(SeqScheduler)>);
impl Debug for TakeCmd {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "A command to drop a shared resource")
}
}
define_fn! {
pub TakeAndDrop = |x| with_atom(x, |a| match request(a.0.as_ref()) {
Some(t) => Ok(init_cps::<TakeCmd>(1, t)),
None => AssertionError::fail(x.clone(), "a SharedHandle"),
})
}
/// Error produced when an operation is scheduled or a seal placed on a resource
/// which is either already sealed or taken.
#[derive(Debug, Clone)]
pub struct SealedOrTaken;
atomic_inert!(
SealedOrTaken,
typestr = "a sealed-or-taken error for a shared resource"
);
impl InertAtomic for SealedOrTaken {
fn type_str() -> &'static str {
"a sealed-or-taken error for a shared resource"
}
}
define_fn! {
IsTakenError = |x| Ok(Boolean(SealedOrTaken::try_from(x).is_ok()).atom_cls())
IsTakenError = |x| {
Ok(Boolean(x.downcast::<SealedOrTaken>().is_ok()).atom_cls())
}
}
trait_set! {
@@ -184,16 +201,17 @@ impl SeqScheduler {
handler: impl FnOnce(T, U, Canceller) -> (T, Vec<ExprInst>) + 'static,
early_cancel: impl FnOnce(T) -> (T, Vec<ExprInst>) + 'static,
) -> Result<Canceller, SealedOrTaken> {
take_with_output(&mut *handle.state.as_ref().borrow_mut(), {
take_with_output(&mut *handle.0.as_ref().borrow_mut(), {
let handle = handle.clone();
|state| {
match state {
SharedResource::Taken => (SharedResource::Taken, Err(SealedOrTaken)),
SharedResource::Busy(mut b) =>
SharedResource::Busy(mut b) => {
match b.enqueue(operation, handler, early_cancel) {
Some(cancelled) => (SharedResource::Busy(b), Ok(cancelled)),
None => (SharedResource::Busy(b), Err(SealedOrTaken)),
},
}
},
SharedResource::Free(t) => {
let cancelled = Canceller::new();
drop(early_cancel); // cannot possibly be useful
@@ -212,8 +230,9 @@ impl SeqScheduler {
handle: SharedHandle<T>,
seal: impl FnOnce(T) -> Vec<ExprInst> + 'static,
) -> Result<Vec<ExprInst>, SealedOrTaken> {
take_with_output(&mut *handle.state.as_ref().borrow_mut(), |state| {
match state {
take_with_output(
&mut *handle.0.as_ref().borrow_mut(),
|state| match state {
SharedResource::Busy(mut b) if !b.is_sealed() => {
b.seal(seal);
(SharedResource::Busy(b), Ok(Vec::new()))
@@ -221,8 +240,8 @@ impl SeqScheduler {
SharedResource::Busy(_) => (state, Err(SealedOrTaken)),
SharedResource::Taken => (SharedResource::Taken, Err(SealedOrTaken)),
SharedResource::Free(t) => (SharedResource::Taken, Ok(seal(t))),
}
})
},
)
}
/// Asynchronously recursive function to schedule a new task for execution and
@@ -244,7 +263,7 @@ impl SeqScheduler {
let (t, u): (T, U) =
*data.downcast().expect("This is associated by ID");
let handle2 = handle.clone();
take_with_output(&mut *handle.state.as_ref().borrow_mut(), |state| {
take_with_output(&mut *handle.0.as_ref().borrow_mut(), |state| {
let busy = unwrap_or! { state => SharedResource::Busy;
panic!("Handle with outstanding invocation must be busy")
};