September-october commit

- manual parser
- stl refinements
- all language constructs are now Send
This commit is contained in:
2023-10-11 18:27:50 +01:00
parent 56679dcc01
commit 86e520e8b8
127 changed files with 1666 additions and 1872 deletions

View File

@@ -2,6 +2,7 @@ use std::any::{type_name, Any};
use std::cell::RefCell;
use std::fmt::Debug;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use hashbrown::HashMap;
use itertools::Itertools;
@@ -9,14 +10,13 @@ use trait_set::trait_set;
use super::busy::{BusyState, NextItemReportKind};
use super::Canceller;
use crate::error::AssertionError;
use crate::facade::{IntoSystem, System};
use crate::foreign::cps_box::{init_cps, CPSBox};
use crate::foreign::{xfn_1ary, InertAtomic, XfnResult};
use crate::interpreted::{Clause, ExprInst};
use crate::interpreter::HandlerTable;
use crate::systems::asynch::{AsynchSystem, MessagePort};
use crate::systems::stl::Boolean;
use crate::systems::AssertionError;
use crate::utils::ddispatch::Request;
use crate::utils::thread_pool::ThreadPool;
use crate::utils::{take_with_output, unwrap_or, IdMap};
@@ -47,17 +47,17 @@ pub enum SharedState {
/// A shared handle for a resource of type `T` that can be used with a
/// [SeqScheduler] to execute mutating operations one by one in worker threads.
pub struct SharedHandle<T>(Rc<RefCell<SharedResource<T>>>);
pub struct SharedHandle<T>(Arc<Mutex<SharedResource<T>>>);
impl<T> SharedHandle<T> {
/// Wrap a value to be accessible to a [SeqScheduler].
pub fn wrap(t: T) -> Self {
Self(Rc::new(RefCell::new(SharedResource::Free(t))))
Self(Arc::new(Mutex::new(SharedResource::Free(t))))
}
/// Check the state of the handle
pub fn state(&self) -> SharedState {
match &*self.0.as_ref().borrow() {
match &*self.0.lock().unwrap() {
SharedResource::Busy(b) if b.is_sealed() => SharedState::Sealed,
SharedResource::Busy(_) => SharedState::Busy,
SharedResource::Free(_) => SharedState::Free,
@@ -70,7 +70,7 @@ impl<T> SharedHandle<T> {
/// sense as eg. an optimization. You can return the value after processing
/// via [SyncHandle::untake].
pub fn take(&self) -> Option<T> {
take_with_output(&mut *self.0.as_ref().borrow_mut(), |state| match state {
take_with_output(&mut *self.0.lock().unwrap(), |state| match state {
SharedResource::Free(t) => (SharedResource::Taken, Some(t)),
_ => (state, None),
})
@@ -80,10 +80,13 @@ impl<T> SharedHandle<T> {
/// is to return values synchronously after they have been removed with
/// [SyncHandle::untake].
pub fn untake(&self, value: T) -> Result<(), T> {
take_with_output(&mut *self.0.as_ref().borrow_mut(), |state| match state {
SharedResource::Taken => (SharedResource::Free(value), Ok(())),
_ => (state, Err(value)),
})
take_with_output(
&mut *self.0.lock().unwrap(),
|state| match state {
SharedResource::Taken => (SharedResource::Free(value), Ok(())),
_ => (state, Err(value)),
},
)
}
}
impl<T> Clone for SharedHandle<T> {
@@ -97,12 +100,12 @@ impl<T> Debug for SharedHandle<T> {
.finish()
}
}
impl<T: 'static> InertAtomic for SharedHandle<T> {
impl<T: Send + 'static> InertAtomic for SharedHandle<T> {
fn type_str() -> &'static str { "a SharedHandle" }
fn respond(&self, mut request: Request) {
request.serve_with(|| {
let this = self.clone();
TakeCmd(Rc::new(move |sch| {
TakeCmd(Arc::new(move |sch| {
let _ = sch.seal(this.clone(), |_| Vec::new());
}))
})
@@ -110,7 +113,7 @@ impl<T: 'static> InertAtomic for SharedHandle<T> {
}
#[derive(Clone)]
pub struct TakeCmd(pub Rc<dyn Fn(SeqScheduler)>);
pub struct TakeCmd(pub Arc<dyn Fn(SeqScheduler) + Send + Sync>);
impl Debug for TakeCmd {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "A command to drop a shared resource")
@@ -134,8 +137,8 @@ pub fn take_and_drop(x: ExprInst) -> XfnResult<Clause> {
}
}
pub fn is_taken_error(x: ExprInst) -> XfnResult<Boolean> {
Ok(Boolean(x.downcast::<SealedOrTaken>().is_ok()))
pub fn is_taken_error(x: ExprInst) -> XfnResult<bool> {
Ok(x.downcast::<SealedOrTaken>().is_ok())
}
trait_set! {
@@ -195,11 +198,11 @@ impl SeqScheduler {
pub fn schedule<T: Send + 'static, U: Send + 'static>(
&self,
handle: SharedHandle<T>,
operation: impl FnOnce(T, Canceller) -> (T, U) + Send + 'static,
handler: impl FnOnce(T, U, Canceller) -> (T, Vec<ExprInst>) + 'static,
early_cancel: impl FnOnce(T) -> (T, Vec<ExprInst>) + 'static,
operation: impl FnOnce(T, Canceller) -> (T, U) + Sync + Send + 'static,
handler: impl FnOnce(T, U, Canceller) -> (T, Vec<ExprInst>) + Sync + Send + 'static,
early_cancel: impl FnOnce(T) -> (T, Vec<ExprInst>) + Sync + Send + 'static,
) -> Result<Canceller, SealedOrTaken> {
take_with_output(&mut *handle.0.as_ref().borrow_mut(), {
take_with_output(&mut *handle.0.lock().unwrap(), {
let handle = handle.clone();
|state| {
match state {
@@ -246,10 +249,10 @@ impl SeqScheduler {
pub fn seal<T>(
&self,
handle: SharedHandle<T>,
seal: impl FnOnce(T) -> Vec<ExprInst> + 'static,
seal: impl FnOnce(T) -> Vec<ExprInst> + Sync + Send + 'static,
) -> Result<Vec<ExprInst>, SealedOrTaken> {
take_with_output(
&mut *handle.0.as_ref().borrow_mut(),
&mut *handle.0.lock().unwrap(),
|state| match state {
SharedResource::Busy(mut b) if !b.is_sealed() => {
b.seal(seal);
@@ -281,7 +284,7 @@ impl SeqScheduler {
let (t, u): (T, U) =
*data.downcast().expect("This is associated by ID");
let handle2 = handle.clone();
take_with_output(&mut *handle.0.as_ref().borrow_mut(), |state| {
take_with_output(&mut *handle.0.lock().unwrap(), |state| {
let busy = unwrap_or! { state => SharedResource::Busy;
panic!("Handle with outstanding invocation must be busy")
};
@@ -329,6 +332,8 @@ impl IntoSystem<'static> for SeqScheduler {
prelude: Vec::new(),
code: HashMap::new(),
handlers,
lexer_plugin: None,
line_parser: None,
constants: ConstTree::namespace(
[i.i("system"), i.i("scheduler")],
ConstTree::tree([