Fixed a tricky type erasure bug in the scheduler

This commit is contained in:
2023-10-12 14:36:59 +01:00
parent af3e9f67fa
commit cb395da894
32 changed files with 147 additions and 125 deletions

View File

@@ -8,8 +8,8 @@ use hashbrown::HashMap;
use itertools::Itertools;
use trait_set::trait_set;
use super::busy::{BusyState, NextItemReportKind};
use super::Canceller;
use super::busy::{BusyState, NextItemReportKind, SyncOperation};
use super::{Canceller, HandlerRes};
use crate::error::AssertionError;
use crate::facade::{IntoSystem, System};
use crate::foreign::cps_box::{init_cps, CPSBox};
@@ -80,13 +80,10 @@ impl<T> SharedHandle<T> {
/// is to return values synchronously after they have been removed with
/// [SyncHandle::untake].
pub fn untake(&self, value: T) -> Result<(), T> {
take_with_output(
&mut *self.0.lock().unwrap(),
|state| match state {
SharedResource::Taken => (SharedResource::Free(value), Ok(())),
_ => (state, Err(value)),
},
)
take_with_output(&mut *self.0.lock().unwrap(), |state| match state {
SharedResource::Taken => (SharedResource::Free(value), Ok(())),
_ => (state, Err(value)),
})
}
}
impl<T> Clone for SharedHandle<T> {
@@ -144,7 +141,7 @@ pub fn is_taken_error(x: ExprInst) -> XfnResult<bool> {
trait_set! {
/// The part of processing a blocking I/O task that cannot be done on a remote
/// thread, eg. because it accesses other systems or Orchid code.
trait NonSendFn = FnOnce(Box<dyn Any>, SeqScheduler) -> Vec<ExprInst>;
trait NonSendFn = FnOnce(Box<dyn Any + Send>, SeqScheduler) -> Vec<ExprInst>;
}
struct SyncReply {
@@ -198,9 +195,9 @@ impl SeqScheduler {
pub fn schedule<T: Send + 'static, U: Send + 'static>(
&self,
handle: SharedHandle<T>,
operation: impl FnOnce(T, Canceller) -> (T, U) + Sync + Send + 'static,
handler: impl FnOnce(T, U, Canceller) -> (T, Vec<ExprInst>) + Sync + Send + 'static,
early_cancel: impl FnOnce(T) -> (T, Vec<ExprInst>) + Sync + Send + 'static,
operation: impl FnOnce(T, Canceller) -> (T, U) + Send + 'static,
handler: impl FnOnce(T, U, Canceller) -> HandlerRes<T> + Send + 'static,
early_cancel: impl FnOnce(T) -> HandlerRes<T> + Send + 'static,
) -> Result<Canceller, SealedOrTaken> {
take_with_output(&mut *handle.0.lock().unwrap(), {
let handle = handle.clone();
@@ -216,7 +213,11 @@ impl SeqScheduler {
SharedResource::Free(t) => {
let cancelled = Canceller::new();
drop(early_cancel); // cannot possibly be useful
self.submit(t, handle, cancelled.clone(), operation);
let op_erased: SyncOperation<T> = Box::new(|t, c| {
let (t, u) = operation(t, c);
(t, Box::new(u))
});
self.submit(t, handle, cancelled.clone(), op_erased);
(SharedResource::Busy(BusyState::new(handler)), Ok(cancelled))
},
}
@@ -233,9 +234,11 @@ impl SeqScheduler {
) -> Canceller {
let cancelled = Canceller::new();
let canc1 = cancelled.clone();
let opid = self.0.pending.borrow_mut().insert(Box::new(|data, _| {
handler(*data.downcast().expect("This is associated by ID"), canc1)
}));
let opid = self.0.pending.borrow_mut().insert(Box::new(
|data: Box<dyn Any + Send>, _| {
handler(*data.downcast().expect("This is associated by ID"), canc1)
},
));
let canc1 = cancelled.clone();
let mut port = self.0.port.clone();
self.0.pool.submit(Box::new(move || {
@@ -251,18 +254,15 @@ impl SeqScheduler {
handle: SharedHandle<T>,
seal: impl FnOnce(T) -> Vec<ExprInst> + Sync + Send + 'static,
) -> Result<Vec<ExprInst>, SealedOrTaken> {
take_with_output(
&mut *handle.0.lock().unwrap(),
|state| match state {
SharedResource::Busy(mut b) if !b.is_sealed() => {
b.seal(seal);
(SharedResource::Busy(b), Ok(Vec::new()))
},
SharedResource::Busy(_) => (state, Err(SealedOrTaken)),
SharedResource::Taken => (SharedResource::Taken, Err(SealedOrTaken)),
SharedResource::Free(t) => (SharedResource::Taken, Ok(seal(t))),
take_with_output(&mut *handle.0.lock().unwrap(), |state| match state {
SharedResource::Busy(mut b) if !b.is_sealed() => {
b.seal(seal);
(SharedResource::Busy(b), Ok(Vec::new()))
},
)
SharedResource::Busy(_) => (state, Err(SealedOrTaken)),
SharedResource::Taken => (SharedResource::Taken, Err(SealedOrTaken)),
SharedResource::Free(t) => (SharedResource::Taken, Ok(seal(t))),
})
}
/// Asynchronously recursive function to schedule a new task for execution and
@@ -270,18 +270,18 @@ impl SeqScheduler {
/// from the callback passed to the [AsynchSystem] so that if the task is
/// never resolved but the [AsynchSystem] through which the resolving event
/// would arrive is dropped this [SeqScheduler] is also dropped.
fn submit<T: Send + 'static, U: Send + 'static>(
fn submit<T: Send + 'static>(
&self,
t: T,
handle: SharedHandle<T>,
cancelled: Canceller,
operation: impl FnOnce(T, Canceller) -> (T, U) + Send + 'static,
operation: SyncOperation<T>,
) {
// referenced by self until run, references handle
let opid = self.0.pending.borrow_mut().insert(Box::new({
let cancelled = cancelled.clone();
move |data, this| {
let (t, u): (T, U) =
move |data: Box<dyn Any + Send>, this: SeqScheduler| {
let (t, u): (T, Box<dyn Any + Send>) =
*data.downcast().expect("This is associated by ID");
let handle2 = handle.clone();
take_with_output(&mut *handle.0.lock().unwrap(), |state| {