From 3c0056c2db0ba3225d57bc2f6656860ce1cec6a5 Mon Sep 17 00:00:00 2001 From: Lawrence Bethlenfalvy Date: Thu, 14 Sep 2023 22:54:42 +0100 Subject: [PATCH] Generic mutation scheduling system IO adapted to use it Also, Atoms can now dispatch type-erased requests --- Cargo.lock | 78 ++++-- Cargo.toml | 4 +- src/bin/orcx.rs | 15 +- src/error/no_targets.rs | 2 +- src/error/project_error.rs | 2 +- src/facade/system.rs | 4 +- src/foreign/atom.rs | 6 + src/foreign/cps_box.rs | 17 +- src/foreign_macros/atomic_inert.rs | 37 ++- src/lib.rs | 3 +- src/parse/multiname.rs | 2 +- src/pipeline/dealias/resolve_aliases.rs | 2 +- src/pipeline/import_abs_path.rs | 2 +- src/pipeline/project_tree/build_tree.rs | 4 +- src/pipeline/project_tree/import_tree.rs | 4 +- src/pipeline/project_tree/rebuild_tree.rs | 2 +- src/pipeline/source_loader/load_source.rs | 2 +- src/pipeline/source_loader/preparse.rs | 4 +- src/representations/ast.rs | 2 +- src/representations/ast_to_postmacro.rs | 2 +- src/representations/const_tree.rs | 2 +- src/representations/path_set.rs | 3 +- src/representations/project.rs | 2 +- src/representations/sourcefile.rs | 2 +- src/representations/tree.rs | 3 +- src/systems/asynch/mod.rs | 12 +- src/systems/asynch/system.rs | 75 ++--- src/systems/asynch/types.rs | 30 -- src/systems/io/bindings.rs | 12 +- src/systems/io/facade.rs | 35 +-- src/systems/io/flow.rs | 154 +---------- src/systems/io/instances.rs | 78 ++---- src/systems/io/io.orc | 6 +- src/systems/io/mod.rs | 9 +- src/systems/io/service.rs | 119 ++++++++ src/systems/mod.rs | 8 +- src/systems/scheduler/busy.rs | 133 +++++++++ src/systems/scheduler/canceller.rs | 32 +++ src/systems/scheduler/mod.rs | 10 + src/systems/scheduler/system.rs | 305 +++++++++++++++++++++ src/systems/scheduler/take_and_drop.rs | 44 +++ src/systems/stl/bin.rs | 3 +- src/systems/stl/bool.rs | 4 +- src/systems/stl/state.rs | 2 +- src/utils/{iter.rs => boxed_iter.rs} | 2 +- src/utils/{get_or_default.rs => get_or.rs} | 0 src/utils/id_map.rs | 71 +++++ src/utils/mod.rs | 20 +- src/utils/{event_poller.rs => poller.rs} | 0 src/utils/{pushed.rs => pure_push.rs} | 0 src/utils/{rc_to_owned.rs => rc_tools.rs} | 0 51 files changed, 991 insertions(+), 379 deletions(-) delete mode 100644 src/systems/asynch/types.rs create mode 100644 src/systems/io/service.rs create mode 100644 src/systems/scheduler/busy.rs create mode 100644 src/systems/scheduler/canceller.rs create mode 100644 src/systems/scheduler/mod.rs create mode 100644 src/systems/scheduler/system.rs create mode 100644 src/systems/scheduler/take_and_drop.rs rename src/utils/{iter.rs => boxed_iter.rs} (90%) rename src/utils/{get_or_default.rs => get_or.rs} (100%) create mode 100644 src/utils/id_map.rs rename src/utils/{event_poller.rs => poller.rs} (100%) rename src/utils/{pushed.rs => pure_push.rs} (100%) rename src/utils/{rc_to_owned.rs => rc_tools.rs} (100%) diff --git a/Cargo.lock b/Cargo.lock index 3899947..cc78c6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -100,6 +100,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitflags" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" + [[package]] name = "block-buffer" version = "0.10.4" @@ -160,7 +166,7 @@ checksum = "c1458a1df40e1e2afebb7ab60ce55c1fa8f431146205aa5f4887e0b111c27636" dependencies = [ "anstream", "anstyle", - "bitflags", + "bitflags 1.3.2", "clap_lex", "strsim", ] @@ -388,7 +394,7 @@ checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" dependencies = [ "hermit-abi", "io-lifetimes", - "rustix", + "rustix 0.37.19", "windows-sys", ] @@ -403,9 +409,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.142" +version = "0.2.148" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a987beff54b60ffa6d51982e1aa1146bc42f19bd26be28b0586f252fccf5317" +checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" [[package]] name = "linux-raw-sys" @@ -413,6 +419,12 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ece97ea872ece730aed82664c424eb4c8291e1ff2480247ccf7409044bc6479f" +[[package]] +name = "linux-raw-sys" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a9bad9f94746442c783ca431b22403b519cd7fbeed0533fdd6328b2f2212128" + [[package]] name = "log" version = "0.4.17" @@ -487,17 +499,15 @@ checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" [[package]] name = "polling" -version = "2.8.0" +version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +checksum = "51348b98db9d4a18ada4fdf7ff5274666e7e6c5a50c42a7d77c5e5c0cb6b036b" dependencies = [ - "autocfg", - "bitflags", "cfg-if", "concurrent-queue", - "libc", - "log", "pin-project-lite", + "rustix 0.38.13", + "tracing", "windows-sys", ] @@ -571,9 +581,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = "rust-embed" -version = "6.6.1" +version = "8.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b68543d5527e158213414a92832d2aab11a84d2571a5eb021ebe22c43aab066" +checksum = "b1e7d90385b59f0a6bf3d3b757f3ca4ece2048265d70db20a2016043d4509a40" dependencies = [ "rust-embed-impl", "rust-embed-utils", @@ -582,22 +592,22 @@ dependencies = [ [[package]] name = "rust-embed-impl" -version = "6.5.0" +version = "8.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d4e0f0ced47ded9a68374ac145edd65a6c1fa13a96447b873660b2a568a0fd7" +checksum = "3c3d8c6fd84090ae348e63a84336b112b5c3918b3bf0493a581f7bd8ee623c29" dependencies = [ "proc-macro2", "quote", "rust-embed-utils", - "syn 1.0.109", + "syn 2.0.13", "walkdir", ] [[package]] name = "rust-embed-utils" -version = "7.5.0" +version = "8.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "512b0ab6853f7e14e3c8754acb43d6f748bb9ced66aa5915a6553ac8213f7731" +checksum = "873feff8cb7bf86fdf0a71bb21c95159f4e4a37dd7a4bd1855a940909b583ada" dependencies = [ "globset", "sha2", @@ -619,11 +629,24 @@ version = "0.37.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acf8729d8542766f1b2cf77eb034d52f40d375bb8b615d0b147089946e16613d" dependencies = [ - "bitflags", + "bitflags 1.3.2", "errno", "io-lifetimes", "libc", - "linux-raw-sys", + "linux-raw-sys 0.3.7", + "windows-sys", +] + +[[package]] +name = "rustix" +version = "0.38.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7db8590df6dfcd144d22afd1b83b36c21a18d7cbc1dc4bb5295a8712e9eb662" +dependencies = [ + "bitflags 2.4.0", + "errno", + "libc", + "linux-raw-sys 0.4.7", "windows-sys", ] @@ -726,6 +749,23 @@ dependencies = [ "syn 2.0.13", ] +[[package]] +name = "tracing" +version = "0.1.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" + [[package]] name = "trait-set" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 60f3492..3313db4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,9 +31,9 @@ dyn-clone = "1.0" clap = { version = "4.3", features = ["derive"] } trait-set = "0.3" paste = "1.0" -rust-embed = { version = "6.6", features = ["include-exclude"] } +rust-embed = { version = "8.0", features = ["include-exclude"] } duplicate = "1.0.0" take_mut = "0.2.2" unicode-segmentation = "1.10.1" -polling = "2.8.0" +polling = "3.0.0" derive_more = "0.99.17" diff --git a/src/bin/orcx.rs b/src/bin/orcx.rs index caa5718..28810bf 100644 --- a/src/bin/orcx.rs +++ b/src/bin/orcx.rs @@ -8,8 +8,9 @@ use std::process; use clap::Parser; use itertools::Itertools; use orchidlang::facade::{Environment, PreMacro}; +use orchidlang::systems::asynch::AsynchSystem; use orchidlang::systems::stl::StlConfig; -use orchidlang::systems::{io_system, AsynchConfig, IOStream}; +use orchidlang::systems::{io, scheduler}; use orchidlang::{ast, interpreted, interpreter, Interner, Sym, VName}; use crate::cli::cmd_prompt; @@ -134,15 +135,17 @@ pub fn main() { let dir = PathBuf::try_from(args.dir).unwrap(); let i = Interner::new(); let main = to_vname(&args.main, &i); - let mut asynch = AsynchConfig::new(); - let io = io_system(&mut asynch, None, None, [ - ("stdin", IOStream::Source(BufReader::new(Box::new(std::io::stdin())))), - ("stdout", IOStream::Sink(Box::new(std::io::stdout()))), - ("stderr", IOStream::Sink(Box::new(std::io::stderr()))), + let mut asynch = AsynchSystem::new(); + let scheduler = scheduler::SeqScheduler::new(&mut asynch); + let io = io::Service::new(scheduler.clone(), [ + ("stdin", io::Stream::Source(BufReader::new(Box::new(std::io::stdin())))), + ("stdout", io::Stream::Sink(Box::new(std::io::stdout()))), + ("stderr", io::Stream::Sink(Box::new(std::io::stderr()))), ]); let env = Environment::new(&i) .add_system(StlConfig { impure: true }) .add_system(asynch) + .add_system(scheduler) .add_system(io); let premacro = env.load_dir(&dir, &main).unwrap(); if args.dump_repo { diff --git a/src/error/no_targets.rs b/src/error/no_targets.rs index 3b98dca..6942187 100644 --- a/src/error/no_targets.rs +++ b/src/error/no_targets.rs @@ -1,7 +1,7 @@ use super::{ErrorPosition, ProjectError}; #[allow(unused)] // for doc use crate::parse_layer; -use crate::utils::iter::box_empty; +use crate::utils::boxed_iter::box_empty; use crate::utils::BoxedIter; /// Error produced when [parse_layer] is called without targets. This function diff --git a/src/error/project_error.rs b/src/error/project_error.rs index 91edbb6..df9e5e8 100644 --- a/src/error/project_error.rs +++ b/src/error/project_error.rs @@ -2,7 +2,7 @@ use std::fmt::{Debug, Display}; use std::rc::Rc; use crate::representations::location::Location; -use crate::utils::iter::box_once; +use crate::utils::boxed_iter::box_once; use crate::utils::BoxedIter; /// A point of interest in resolving the error, such as the point where diff --git a/src/facade/system.rs b/src/facade/system.rs index df03450..1d07328 100644 --- a/src/facade/system.rs +++ b/src/facade/system.rs @@ -4,7 +4,7 @@ use crate::error::{ErrorPosition, ProjectError}; use crate::interpreter::HandlerTable; use crate::pipeline::file_loader::{IOResult, Loaded}; use crate::sourcefile::FileEntry; -use crate::utils::iter::box_empty; +use crate::utils::boxed_iter::box_empty; use crate::utils::BoxedIter; use crate::{ConstTree, Interner, Tok, VName}; @@ -66,7 +66,7 @@ impl ProjectError for MissingSystemCode { /// Trait for objects that can be converted into a [System] in the presence /// of an [Interner]. -pub trait IntoSystem<'a>: 'a { +pub trait IntoSystem<'a> { /// Convert this object into a system using an interner fn into_system(self, i: &Interner) -> System<'a>; } diff --git a/src/foreign/atom.rs b/src/foreign/atom.rs index 559f54d..7934dd3 100644 --- a/src/foreign/atom.rs +++ b/src/foreign/atom.rs @@ -34,6 +34,12 @@ pub trait Atomic: Any + Debug + DynClone where Self: 'static, { + /// A fully type-erased interface to issue a command to the unknown type + /// and see if it supports it + fn request(&self, _request: Box) -> Option> { + None + } + /// Casts this value to [Any] so that its original value can be salvaged /// during introspection by other external code. There is no other way to /// interact with values of unknown types at the moment. diff --git a/src/foreign/cps_box.rs b/src/foreign/cps_box.rs index 5ac7735..518053d 100644 --- a/src/foreign/cps_box.rs +++ b/src/foreign/cps_box.rs @@ -4,11 +4,11 @@ use std::fmt::Debug; use trait_set::trait_set; -use super::{Atomic, AtomicResult, AtomicReturn, ExternFn, XfnResult}; +use super::{Atomic, ExternFn, XfnResult}; use crate::interpreted::{Clause, ExprInst}; use crate::interpreter::{Context, HandlerRes}; -use crate::utils::pushed::pushed_ref; -use crate::{atomic_defaults, ConstTree}; +use crate::utils::pure_push::pushed_ref; +use crate::{ConstTree, atomic_inert}; trait_set! { /// A "well behaved" type that can be used as payload in a CPS box @@ -93,16 +93,7 @@ impl CPSBox { } } -impl Atomic for CPSBox { - atomic_defaults!(); - fn run(&self, ctx: Context) -> AtomicResult { - Ok(AtomicReturn { - clause: self.clone().atom_cls(), - gas: ctx.gas, - inert: true, - }) - } -} +atomic_inert!(CPSBox(T:(CPSPayload)), typestr = "a CPS box"); /// Like [init_cps] but wrapped in a [ConstTree] for init-time usage pub fn const_cps(argc: usize, payload: T) -> ConstTree { diff --git a/src/foreign_macros/atomic_inert.rs b/src/foreign_macros/atomic_inert.rs index c836a28..acb89bf 100644 --- a/src/foreign_macros/atomic_inert.rs +++ b/src/foreign_macros/atomic_inert.rs @@ -12,10 +12,27 @@ use crate::foreign::Atomic; /// Implement [Atomic] for a structure that cannot be transformed any further. /// This would be optimal for atomics encapsulating raw data. [Atomic] depends /// on [Any], [Debug] and [DynClone]. +/// +/// If the type in question is parametric, the angle brackets must be replaced +/// by parentheses, and the contraints must be parenthesized, for conenient +/// parsing. See the below example: +/// +/// ```ignore +/// use orchidlang::atomic_inert; +/// +/// struct MyContainer() +/// +/// atomic_inert!( MyContainer(T, U:(Clone), V:(Eq + Hash)), "my container" ); +/// ``` #[macro_export] macro_rules! atomic_inert { - ($typ:ident, $typename:expr) => { - impl $crate::foreign::Atomic for $typ { + ( $typ:ident $( ( + $( $typevar:ident $( : ( + $( $constraints:tt )* + ) )? ),+ ) )? + , typestr = $typename:expr $( , request = $reqhandler:expr )?) => { + impl $(< $($typevar : $( $($constraints)* + )? 'static ),+ >)? $crate::foreign::Atomic + for $typ $(< $($typevar),+ >)? { $crate::atomic_defaults! {} fn run( @@ -28,9 +45,21 @@ macro_rules! atomic_inert { inert: true, }) } + + $( + fn request( + &self, + request: Box + ) -> Option> { + let lambda = $reqhandler; + lambda(request, self) + } + )? } - impl TryFrom<&ExprInst> for $typ { + impl $(< $($typevar : $( $($constraints)* + )? 'static ),+ >)? + TryFrom<&$crate::interpreted::ExprInst> + for $typ $(< $($typevar),+ >)? { type Error = std::rc::Rc; fn try_from( @@ -39,7 +68,7 @@ macro_rules! atomic_inert { $crate::systems::cast_exprinst::with_atom( value, $typename, - |a: &$typ| Ok(a.clone()), + |a: &$typ $(< $($typevar),+ >)?| Ok(a.clone()), ) } } diff --git a/src/lib.rs b/src/lib.rs index a4b35db..7cb29ca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,4 +33,5 @@ pub use representations::{ ast, from_const_tree, interpreted, sourcefile, tree, ConstTree, Literal, Location, NameLike, OrcString, PathSet, Primitive, Sym, VName, }; -pub use utils::{thread_pool, Side, Substack}; +pub use utils::substack::Substack; +pub use utils::{IdMap, Side}; diff --git a/src/parse/multiname.rs b/src/parse/multiname.rs index 2beb905..d3fa9cf 100644 --- a/src/parse/multiname.rs +++ b/src/parse/multiname.rs @@ -6,7 +6,7 @@ use super::stream::Stream; use super::Lexeme; use crate::error::{ProjectError, ProjectResult}; use crate::sourcefile::Import; -use crate::utils::iter::{box_chain, box_once}; +use crate::utils::boxed_iter::{box_chain, box_once}; use crate::utils::BoxedIter; use crate::{Location, Tok}; diff --git a/src/pipeline/dealias/resolve_aliases.rs b/src/pipeline/dealias/resolve_aliases.rs index f11294b..2b25568 100644 --- a/src/pipeline/dealias/resolve_aliases.rs +++ b/src/pipeline/dealias/resolve_aliases.rs @@ -8,7 +8,7 @@ use crate::representations::project::{ ItemKind, ProjectExt, ProjectItem, ProjectMod, }; use crate::tree::{ModEntry, ModMember, Module}; -use crate::utils::pushed::pushed; +use crate::utils::pure_push::pushed; use crate::{Interner, ProjectTree, Tok, VName}; fn resolve_aliases_rec( diff --git a/src/pipeline/import_abs_path.rs b/src/pipeline/import_abs_path.rs index 512a273..df1bd2f 100644 --- a/src/pipeline/import_abs_path.rs +++ b/src/pipeline/import_abs_path.rs @@ -1,7 +1,7 @@ use crate::error::ProjectResult; use crate::interner::{Interner, Tok}; use crate::representations::sourcefile::absolute_path; -use crate::utils::Substack; +use crate::utils::substack::Substack; use crate::{Location, VName}; pub fn import_abs_path( diff --git a/src/pipeline/project_tree/build_tree.rs b/src/pipeline/project_tree/build_tree.rs index a8c9efd..516f7ec 100644 --- a/src/pipeline/project_tree/build_tree.rs +++ b/src/pipeline/project_tree/build_tree.rs @@ -12,8 +12,8 @@ use crate::sourcefile::{ FileEntry, FileEntryKind, Member, MemberKind, ModuleBlock, }; use crate::tree::{ModEntry, ModMember, Module}; -use crate::utils::get_or_default; -use crate::utils::pushed::pushed_ref; +use crate::utils::get_or::get_or_default; +use crate::utils::pure_push::pushed_ref; use crate::{Tok, VName}; pub struct TreeReport { diff --git a/src/pipeline/project_tree/import_tree.rs b/src/pipeline/project_tree/import_tree.rs index 1a27462..518de9a 100644 --- a/src/pipeline/project_tree/import_tree.rs +++ b/src/pipeline/project_tree/import_tree.rs @@ -10,8 +10,8 @@ use crate::pipeline::source_loader::{PreMod, Preparsed}; use crate::representations::project::ImpReport; use crate::sourcefile::{absolute_path, Import}; use crate::tree::{ErrKind, ModEntry, ModMember, Module, WalkError}; -use crate::utils::iter::{box_chain, box_once}; -use crate::utils::pushed::pushed_ref; +use crate::utils::boxed_iter::{box_chain, box_once}; +use crate::utils::pure_push::pushed_ref; use crate::utils::{unwrap_or, BoxedIter}; use crate::{Interner, ProjectTree, Tok, VName}; diff --git a/src/pipeline/project_tree/rebuild_tree.rs b/src/pipeline/project_tree/rebuild_tree.rs index 3f30e64..0412c77 100644 --- a/src/pipeline/project_tree/rebuild_tree.rs +++ b/src/pipeline/project_tree/rebuild_tree.rs @@ -13,7 +13,7 @@ use crate::representations::project::{ImpReport, ProjectExt, ProjectMod}; use crate::sourcefile::FileEntry; use crate::tree::{ModEntry, ModMember, Module}; use crate::utils::never::{always, unwrap_always}; -use crate::utils::pushed::pushed_ref; +use crate::utils::pure_push::pushed_ref; use crate::utils::unwrap_or; use crate::{parse, Interner, ProjectTree, Tok, VName}; diff --git a/src/pipeline/source_loader/load_source.rs b/src/pipeline/source_loader/load_source.rs index f376c79..640a8dd 100644 --- a/src/pipeline/source_loader/load_source.rs +++ b/src/pipeline/source_loader/load_source.rs @@ -11,7 +11,7 @@ use crate::pipeline::file_loader::{IOResult, Loaded}; use crate::pipeline::import_abs_path::import_abs_path; use crate::representations::sourcefile::FileEntry; use crate::tree::Module; -use crate::utils::pushed::pushed_ref; +use crate::utils::pure_push::pushed_ref; use crate::utils::{split_max_prefix, unwrap_or}; use crate::Location; diff --git a/src/pipeline/source_loader/preparse.rs b/src/pipeline/source_loader/preparse.rs index dc91e6d..95233a6 100644 --- a/src/pipeline/source_loader/preparse.rs +++ b/src/pipeline/source_loader/preparse.rs @@ -14,8 +14,8 @@ use crate::parse::{self, ParsingContext}; use crate::representations::sourcefile::{FileEntry, MemberKind}; use crate::representations::tree::{ModEntry, ModMember, Module}; use crate::sourcefile::{FileEntryKind, Import, Member, ModuleBlock}; -use crate::utils::pushed::pushed; -use crate::utils::{get_or_default, get_or_make}; +use crate::utils::pure_push::pushed; +use crate::utils::get_or::{get_or_default, get_or_make}; use crate::{Location, Tok, VName}; struct FileReport { diff --git a/src/representations/ast.rs b/src/representations/ast.rs index c117335..830c071 100644 --- a/src/representations/ast.rs +++ b/src/representations/ast.rs @@ -18,7 +18,7 @@ use super::namelike::{NameLike, VName}; use super::primitive::Primitive; use crate::interner::Tok; use crate::parse::print_nat16; -use crate::utils::map_rc; +use crate::utils::rc_tools::map_rc; /// A [Clause] with associated metadata #[derive(Clone, Debug, PartialEq)] diff --git a/src/representations/ast_to_postmacro.rs b/src/representations/ast_to_postmacro.rs index 858a8f9..2857c1c 100644 --- a/src/representations/ast_to_postmacro.rs +++ b/src/representations/ast_to_postmacro.rs @@ -3,7 +3,7 @@ use std::rc::Rc; use super::location::Location; use super::{ast, postmacro}; use crate::error::ProjectError; -use crate::utils::Substack; +use crate::utils::substack::Substack; use crate::Sym; #[derive(Debug, Clone)] diff --git a/src/representations/const_tree.rs b/src/representations/const_tree.rs index 5b5d354..e86d6e1 100644 --- a/src/representations/const_tree.rs +++ b/src/representations/const_tree.rs @@ -10,7 +10,7 @@ use crate::representations::location::Location; use crate::representations::project::{ProjectExt, ProjectMod, ProjectTree}; use crate::representations::tree::{ModEntry, ModMember, Module}; use crate::representations::{Primitive, VName}; -use crate::utils::Substack; +use crate::utils::substack::Substack; /// A lightweight module tree that can be built declaratively by hand to /// describe libraries of external functions in Rust. It implements [Add] for diff --git a/src/representations/path_set.rs b/src/representations/path_set.rs index 5edfcb5..dca7bb4 100644 --- a/src/representations/path_set.rs +++ b/src/representations/path_set.rs @@ -2,7 +2,8 @@ use std::fmt::Debug; use std::ops::Add; use std::rc::Rc; -use crate::utils::{rc_to_owned, Side}; +use crate::utils::rc_tools::rc_to_owned; +use crate::utils::Side; /// A branching path selecting some placeholders (but at least one) in a Lambda /// expression diff --git a/src/representations/project.rs b/src/representations/project.rs index 3b9ee2f..49fb165 100644 --- a/src/representations/project.rs +++ b/src/representations/project.rs @@ -9,7 +9,7 @@ use crate::representations::tree::{ModMember, Module}; use crate::representations::NameLike; use crate::tree::ModEntry; use crate::utils::never::{always, Always}; -use crate::utils::Substack; +use crate::utils::substack::Substack; use crate::{Sym, VName}; #[derive(Debug, Clone)] diff --git a/src/representations/sourcefile.rs b/src/representations/sourcefile.rs index 74b7868..31d64b6 100644 --- a/src/representations/sourcefile.rs +++ b/src/representations/sourcefile.rs @@ -8,7 +8,7 @@ use super::namelike::VName; use crate::ast::{Constant, Rule}; use crate::error::{ProjectError, ProjectResult, TooManySupers}; use crate::interner::{Interner, Tok}; -use crate::utils::pushed::pushed; +use crate::utils::pure_push::pushed; use crate::utils::{unwrap_or, BoxedIter}; use crate::Location; diff --git a/src/representations/tree.rs b/src/representations/tree.rs index 708a8f9..93a8849 100644 --- a/src/representations/tree.rs +++ b/src/representations/tree.rs @@ -10,7 +10,8 @@ use hashbrown::HashMap; use super::Location; use crate::error::ProjectError; use crate::interner::Tok; -use crate::utils::{BoxedIter, Substack}; +use crate::utils::substack::Substack; +use crate::utils::BoxedIter; use crate::{Interner, VName}; /// The member in a [ModEntry] which is associated with a name in a [Module] diff --git a/src/systems/asynch/mod.rs b/src/systems/asynch/mod.rs index 0ebfbe9..d94a886 100644 --- a/src/systems/asynch/mod.rs +++ b/src/systems/asynch/mod.rs @@ -1,5 +1,9 @@ -mod system; -mod types; +//! An event queue other systems can use to trigger events on the main +//! interpreter thread. These events are handled when the Orchid code returns +//! `system::async::yield`, and may cause additional Orchid code to be executed +//! beyond being general Rust functions. +//! It also exposes timers. -pub use system::{AsynchConfig, InfiniteBlock}; -pub use types::{Asynch, MessagePort}; +mod system; + +pub use system::{AsynchSystem, InfiniteBlock, MessagePort}; diff --git a/src/systems/asynch/system.rs b/src/systems/asynch/system.rs index 69e86e4..16c8e94 100644 --- a/src/systems/asynch/system.rs +++ b/src/systems/asynch/system.rs @@ -1,5 +1,6 @@ use std::any::{type_name, Any, TypeId}; use std::cell::RefCell; +use std::collections::VecDeque; use std::fmt::{Debug, Display}; use std::rc::Rc; use std::sync::mpsc::Sender; @@ -8,16 +9,15 @@ use std::time::Duration; use hashbrown::HashMap; use ordered_float::NotNan; -use super::types::MessagePort; -use super::Asynch; use crate::facade::{IntoSystem, System}; use crate::foreign::cps_box::{init_cps, CPSBox}; -use crate::foreign::ExternError; +use crate::foreign::{Atomic, ExternError}; use crate::interpreted::ExprInst; use crate::interpreter::HandlerTable; use crate::systems::codegen::call; use crate::systems::stl::Boolean; -use crate::utils::{unwrap_or, PollEvent, Poller}; +use crate::utils::poller::{PollEvent, Poller}; +use crate::utils::unwrap_or; use crate::{atomic_inert, define_fn, ConstTree, Interner}; #[derive(Debug, Clone)] @@ -45,7 +45,7 @@ impl Debug for CancelTimer { #[derive(Clone, Debug)] struct Yield; -atomic_inert!(Yield, "a yield command"); +atomic_inert!(Yield, typestr = "a yield command"); /// Error indicating a yield command when all event producers and timers had /// exited @@ -59,43 +59,43 @@ impl Display for InfiniteBlock { } } -impl MessagePort for Sender> { - fn send(&mut self, message: T) { - let _ = Self::send(self, Box::new(message)); +/// A thread-safe handle that can be used to send events of any type +#[derive(Clone)] +pub struct MessagePort(Sender>); +impl MessagePort { + /// Send an event. Any type is accepted, handlers are dispatched by type ID + pub fn send(&mut self, message: T) { + let _ = self.0.send(Box::new(message)); } } -impl MessagePort for F -where - F: FnMut(Box) + Send + Clone + 'static, -{ - fn send(&mut self, message: T) { - self(Box::new(message)) - } -} +type AnyHandler<'a> = Box) -> Vec + 'a>; -type AnyHandler<'a> = Box) -> Option + 'a>; - -/// Datastructures the asynch system will eventually be constructed from -pub struct AsynchConfig<'a> { +/// Datastructures the asynch system will eventually be constructed from. +pub struct AsynchSystem<'a> { poller: Poller, ExprInst, ExprInst>, sender: Sender>, handlers: HashMap>, } -impl<'a> AsynchConfig<'a> { + +impl<'a> AsynchSystem<'a> { /// Create a new async event loop that allows registering handlers and taking /// references to the port before it's converted into a [System] pub fn new() -> Self { let (sender, poller) = Poller::new(); Self { poller, sender, handlers: HashMap::new() } } -} -impl<'a> Asynch for AsynchConfig<'a> { - type Port = Sender>; - fn register( + /// Register a callback to be called on the owning thread when an object of + /// the given type is found on the queue. Each type should signify a single + /// command so each type should have exactly one handler. + /// + /// # Panics + /// + /// if the given type is already handled. + pub fn register( &mut self, - mut f: impl FnMut(Box) -> Option + 'a, + mut f: impl FnMut(Box) -> Vec + 'a, ) { let cb = move |a: Box| f(a.downcast().expect("keyed by TypeId")); let prev = self.handlers.insert(TypeId::of::(), Box::new(cb)); @@ -106,18 +106,21 @@ impl<'a> Asynch for AsynchConfig<'a> { ) } - fn get_port(&self) -> Self::Port { - self.sender.clone() + /// Obtain a message port for sending messages to the main thread. If an + /// object is passed to the MessagePort that does not have a handler, the + /// main thread panics. + pub fn get_port(&self) -> MessagePort { + MessagePort(self.sender.clone()) } } -impl<'a> Default for AsynchConfig<'a> { +impl<'a> Default for AsynchSystem<'a> { fn default() -> Self { Self::new() } } -impl<'a> IntoSystem<'a> for AsynchConfig<'a> { +impl<'a> IntoSystem<'a> for AsynchSystem<'a> { fn into_system(self, i: &Interner) -> System<'a> { let Self { mut handlers, poller, .. } = self; let mut handler_table = HandlerTable::new(); @@ -143,7 +146,11 @@ impl<'a> IntoSystem<'a> for AsynchConfig<'a> { }); handler_table.register({ let polly = polly.clone(); + let mut microtasks = VecDeque::new(); move |_: &Yield| { + if let Some(expr) = microtasks.pop_front() { + return Ok(expr); + } let mut polly = polly.borrow_mut(); loop { let next = unwrap_or!(polly.run(); @@ -157,8 +164,12 @@ impl<'a> IntoSystem<'a> for AsynchConfig<'a> { .unwrap_or_else(|| { panic!("Unhandled messgae type: {:?}", ev.type_id()) }); - if let Some(expr) = handler(ev) { - return Ok(expr); + let events = handler(ev); + // we got new microtasks + if !events.is_empty() { + microtasks = VecDeque::from(events); + // trampoline + return Ok(Yield.atom_exi()); } }, } diff --git a/src/systems/asynch/types.rs b/src/systems/asynch/types.rs deleted file mode 100644 index 8a418dc..0000000 --- a/src/systems/asynch/types.rs +++ /dev/null @@ -1,30 +0,0 @@ -use crate::interpreted::ExprInst; - -/// A thread-safe handle that can be used to send events of any type -pub trait MessagePort: Send + Clone + 'static { - /// Send an event. Any type is accepted, handlers are dispatched by type ID - fn send(&mut self, message: T); -} - -pub trait Asynch { - /// A thread-safe handle that can be used to push events into the dispatcher - type Port: MessagePort; - - /// Register a function that will be called synchronously when an event of the - /// accepted type is dispatched. Only one handler may be specified for each - /// event type. The handler may choose to process the event autonomously, or - /// return an Orchid thunk for the interpreter to execute. - /// - /// # Panics - /// - /// When the function is called with an argument type it was previously called - /// with - fn register( - &mut self, - f: impl FnMut(Box) -> Option + 'static, - ); - - /// Return a handle that can be passed to worker threads and used to push - /// events onto the dispatcher - fn get_port(&self) -> Self::Port; -} diff --git a/src/systems/io/bindings.rs b/src/systems/io/bindings.rs index 64890cb..b2fa6f9 100644 --- a/src/systems/io/bindings.rs +++ b/src/systems/io/bindings.rs @@ -33,7 +33,7 @@ define_fn! { n: u64 } => Ok(init_cps(3, IOCmdHandlePack{ cmd: ReadCmd::RBytes(BRead::N((*n).try_into().unwrap())), - handle: *stream + handle: stream.clone() })) } define_fn! { @@ -47,7 +47,7 @@ define_fn! { ))?; Ok(init_cps(3, IOCmdHandlePack{ cmd: ReadCmd::RBytes(BRead::Until(delim)), - handle: *stream + handle: stream.clone() })) } } @@ -57,7 +57,7 @@ define_fn! { string: OrcString } => Ok(init_cps(3, IOCmdHandlePack { cmd: WriteCmd::WStr(string.get_string()), - handle: *stream, + handle: stream.clone(), })) } define_fn! { @@ -66,7 +66,7 @@ define_fn! { bytes: Binary } => Ok(init_cps(3, IOCmdHandlePack { cmd: WriteCmd::WBytes(bytes.clone()), - handle: *stream + handle: stream.clone(), })) } define_fn! { @@ -76,9 +76,9 @@ define_fn! { })) } -pub fn io_bindings( +pub fn io_bindings<'a>( i: &Interner, - std_streams: impl IntoIterator)>, + std_streams: impl IntoIterator)>, ) -> ConstTree { ConstTree::namespace( [i.i("system"), i.i("io")], diff --git a/src/systems/io/facade.rs b/src/systems/io/facade.rs index 4d0ad4c..4d40ae9 100644 --- a/src/systems/io/facade.rs +++ b/src/systems/io/facade.rs @@ -17,7 +17,7 @@ use crate::foreign::{Atomic, ExternError}; use crate::interpreter::HandlerTable; use crate::pipeline::file_loader::embed_to_map; use crate::sourcefile::{FileEntry, FileEntryKind, Import}; -use crate::systems::asynch::{Asynch, MessagePort}; +use crate::systems::asynch::AsynchSystem; use crate::{Interner, Location}; trait_set! { @@ -32,25 +32,25 @@ struct IOEmbed; /// A registry that stores IO streams and executes blocking operations on them /// in a distinct thread pool -pub struct IOSystem { - read_system: Rc>>, - write_system: Rc>>, +pub struct IOSystem { + read_system: Rc>, + write_system: Rc>, global_streams: ST, } -impl IOSystem { +impl IOSystem { fn new( - mut get_port: impl FnMut() -> P, + asynch: &AsynchSystem, on_sink_close: Option>, on_source_close: Option>, global_streams: ST, ) -> Self { Self { read_system: Rc::new(RefCell::new(IOManager::new( - get_port(), + asynch.get_port(), on_source_close, ))), write_system: Rc::new(RefCell::new(IOManager::new( - get_port(), + asynch.get_port(), on_sink_close, ))), global_streams, @@ -93,26 +93,19 @@ pub enum IOStream { /// takes a generic parameter which is initialized from an existential in the /// [AsynchConfig]. pub fn io_system( - asynch: &'_ mut impl Asynch, + asynch: &'_ mut AsynchSystem, on_sink_close: Option>, on_source_close: Option>, std_streams: impl IntoIterator, -) -> IOSystem { - let this = IOSystem::new( - || asynch.get_port(), - on_sink_close, - on_source_close, - std_streams, - ); +) -> IOSystem { + let this = IOSystem::new(asynch, on_sink_close, on_source_close, std_streams); let (r, w) = (this.read_system.clone(), this.write_system.clone()); - asynch.register(move |event| r.borrow_mut().dispatch(*event)); - asynch.register(move |event| w.borrow_mut().dispatch(*event)); + asynch.register(move |event| vec![r.borrow_mut().dispatch(*event)]); + asynch.register(move |event| vec![w.borrow_mut().dispatch(*event)]); this } -impl<'a, P: MessagePort, ST: StreamTable + 'a> IntoSystem<'a> - for IOSystem -{ +impl<'a, ST: StreamTable + 'a> IntoSystem<'a> for IOSystem { fn into_system(self, i: &Interner) -> System<'a> { let (r, w) = (self.read_system.clone(), self.write_system.clone()); let mut handlers = HandlerTable::new(); diff --git a/src/systems/io/flow.rs b/src/systems/io/flow.rs index 2c0ef7a..6a9b0cc 100644 --- a/src/systems/io/flow.rs +++ b/src/systems/io/flow.rs @@ -1,22 +1,13 @@ -use std::collections::VecDeque; use std::fmt::Display; -use hashbrown::HashMap; - use crate::foreign::ExternError; -use crate::systems::asynch::MessagePort; -use crate::thread_pool::{Task, ThreadPool}; -use crate::utils::take_with_output; +use crate::systems::scheduler::Canceller; -pub trait StreamHandle: Clone + Send { - fn new(id: usize) -> Self; - fn id(&self) -> usize; -} - -pub trait IOHandler { +pub trait IOHandler { type Product; - fn handle(self, result: Cmd::Result) -> Self::Product; + fn handle(self, result: T) -> Self::Product; + fn early_cancel(self) -> Self::Product; } pub trait IOResult: Send { @@ -26,33 +17,16 @@ pub trait IOResult: Send { fn handle(self, handler: Self::Handler) -> Self::HandlerProduct; } -pub struct IOEvent { - pub result: Cmd::Result, - pub stream: Cmd::Stream, - pub handle: Cmd::Handle, -} - pub trait IOCmd: Send { type Stream: Send; type Result: Send; - type Handle: StreamHandle; + type Handle; - fn execute(self, stream: &mut Self::Stream) -> Self::Result; -} - -pub struct IOTask { - pub cmd: Cmd, - pub stream: Cmd::Stream, - pub handle: Cmd::Handle, - pub port: P, -} - -impl Task for IOTask { - fn run(self) { - let Self { cmd, handle, mut port, mut stream } = self; - let result = cmd.execute(&mut stream); - port.send(IOEvent:: { handle, result, stream }) - } + fn execute( + self, + stream: &mut Self::Stream, + cancel: Canceller, + ) -> Self::Result; } #[derive(Debug, Clone)] @@ -61,11 +35,6 @@ pub struct IOCmdHandlePack { pub handle: Cmd::Handle, } -enum StreamState> { - Free(Cmd::Stream), - Busy { handler: H, queue: VecDeque<(Cmd, H)>, closing: bool }, -} - #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] pub struct NoActiveStream(usize); impl ExternError for NoActiveStream {} @@ -74,106 +43,3 @@ impl Display for NoActiveStream { write!(f, "The stream {} had already been closed", self.0) } } - -pub struct IOManager> { - next_id: usize, - streams: HashMap>, - on_close: Option>, - thread_pool: ThreadPool>, - port: P, -} - -impl> IOManager { - pub fn new(port: P, on_close: Option>) -> Self { - Self { - next_id: 0, - streams: HashMap::new(), - thread_pool: ThreadPool::new(), - on_close, - port, - } - } - - pub fn add_stream(&mut self, stream: Cmd::Stream) -> Cmd::Handle { - let id = self.next_id; - self.next_id += 1; - self.streams.insert(id, StreamState::Free(stream)); - Cmd::Handle::new(id) - } - - fn dispose_stream(&mut self, stream: Cmd::Stream) { - match &mut self.on_close { - Some(f) => f(stream), - None => drop(stream), - } - } - - pub fn close_stream( - &mut self, - handle: Cmd::Handle, - ) -> Result<(), NoActiveStream> { - let state = - (self.streams.remove(&handle.id())).ok_or(NoActiveStream(handle.id()))?; - match state { - StreamState::Free(stream) => self.dispose_stream(stream), - StreamState::Busy { handler, queue, closing } => { - let new_state = StreamState::Busy { handler, queue, closing: true }; - self.streams.insert(handle.id(), new_state); - if closing { - return Err(NoActiveStream(handle.id())); - } - }, - } - Ok(()) - } - - pub fn command( - &mut self, - handle: Cmd::Handle, - cmd: Cmd, - new_handler: H, - ) -> Result<(), NoActiveStream> { - let state_mut = (self.streams.get_mut(&handle.id())) - .ok_or(NoActiveStream(handle.id()))?; - take_with_output(state_mut, |state| match state { - StreamState::Busy { closing: true, .. } => - (state, Err(NoActiveStream(handle.id()))), - StreamState::Busy { handler, mut queue, closing: false } => { - queue.push_back((cmd, new_handler)); - (StreamState::Busy { handler, queue, closing: false }, Ok(())) - }, - StreamState::Free(stream) => { - let task = IOTask { cmd, stream, handle, port: self.port.clone() }; - self.thread_pool.submit(task); - let new_state = StreamState::Busy { - handler: new_handler, - queue: VecDeque::new(), - closing: false, - }; - (new_state, Ok(())) - }, - }) - } - - pub fn dispatch(&mut self, event: IOEvent) -> Option { - let IOEvent { handle, result, stream } = event; - let id = handle.id(); - let state = - (self.streams.remove(&id)).expect("Event dispatched on unknown stream"); - let (handler, mut queue, closing) = match state { - StreamState::Busy { handler, queue, closing } => - (handler, queue, closing), - _ => panic!("Event dispatched but the source isn't locked"), - }; - if let Some((cmd, handler)) = queue.pop_front() { - let port = self.port.clone(); - self.thread_pool.submit(IOTask { handle, stream, cmd, port }); - self.streams.insert(id, StreamState::Busy { handler, queue, closing }); - } else if closing { - self.dispose_stream(stream) - } else { - self.streams.insert(id, StreamState::Free(stream)); - }; - Some(handler.handle(result)) - } -} diff --git a/src/systems/io/instances.rs b/src/systems/io/instances.rs index 26e2c09..bf549e0 100644 --- a/src/systems/io/instances.rs +++ b/src/systems/io/instances.rs @@ -1,38 +1,19 @@ use std::io::{self, BufRead, BufReader, Read, Write}; use std::sync::Arc; -use super::flow::{IOCmd, IOHandler, IOManager, StreamHandle}; +use super::flow::IOCmd; use crate::foreign::Atomic; use crate::interpreted::ExprInst; use crate::systems::codegen::call; +use crate::systems::scheduler::{Canceller, SharedHandle}; use crate::systems::stl::Binary; -use crate::{atomic_inert, Literal}; +use crate::Literal; pub type Source = BufReader>; pub type Sink = Box; -#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct SourceHandle(usize); -atomic_inert!(SourceHandle, "an input stream handle"); -impl StreamHandle for SourceHandle { - fn new(id: usize) -> Self { - Self(id) - } - fn id(&self) -> usize { - self.0 - } -} -#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct SinkHandle(usize); -atomic_inert!(SinkHandle, "an output stream handle"); -impl StreamHandle for SinkHandle { - fn new(id: usize) -> Self { - Self(id) - } - fn id(&self) -> usize { - self.0 - } -} +pub type SourceHandle = SharedHandle; +pub type SinkHandle = SharedHandle; /// String reading command #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] @@ -62,7 +43,11 @@ impl IOCmd for ReadCmd { // This is a buggy rule, check manually #[allow(clippy::read_zero_byte_vec)] - fn execute(self, stream: &mut Self::Stream) -> Self::Result { + fn execute( + self, + stream: &mut Self::Stream, + _cancel: Canceller, + ) -> Self::Result { match self { Self::RBytes(bread) => { let mut buf = Vec::new(); @@ -93,19 +78,17 @@ pub enum ReadResult { RStr(SRead, io::Result), RBin(BRead, io::Result>), } - -impl IOHandler for (ExprInst, ExprInst) { - type Product = ExprInst; - - fn handle(self, result: ReadResult) -> Self::Product { - let (succ, fail) = self; - match result { +impl ReadResult { + pub fn dispatch(self, succ: ExprInst, fail: ExprInst) -> Vec { + match self { ReadResult::RBin(_, Err(e)) | ReadResult::RStr(_, Err(e)) => - call(fail, vec![wrap_io_error(e)]).wrap(), - ReadResult::RBin(_, Ok(bytes)) => - call(succ, vec![Binary(Arc::new(bytes)).atom_cls().wrap()]).wrap(), + vec![call(fail, vec![wrap_io_error(e)]).wrap()], + ReadResult::RBin(_, Ok(bytes)) => { + let arg = Binary(Arc::new(bytes)).atom_cls().wrap(); + vec![call(succ, vec![arg]).wrap()] + }, ReadResult::RStr(_, Ok(text)) => - call(succ, vec![Literal::Str(text.into()).into()]).wrap(), + vec![call(succ, vec![Literal::Str(text.into()).into()]).wrap()], } } } @@ -116,8 +99,6 @@ fn wrap_io_error(_e: io::Error) -> ExprInst { Literal::Uint(0u64).into() } -pub type ReadManager

= IOManager; - /// Writing command (string or binary) #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum WriteCmd { @@ -131,7 +112,11 @@ impl IOCmd for WriteCmd { type Handle = SinkHandle; type Result = WriteResult; - fn execute(self, stream: &mut Self::Stream) -> Self::Result { + fn execute( + self, + stream: &mut Self::Stream, + _cancel: Canceller, + ) -> Self::Result { let result = match &self { Self::Flush => stream.flush(), Self::WStr(str) => write!(stream, "{}", str).map(|_| ()), @@ -145,16 +130,11 @@ pub struct WriteResult { pub cmd: WriteCmd, pub result: io::Result<()>, } -impl IOHandler for (ExprInst, ExprInst) { - type Product = ExprInst; - - fn handle(self, result: WriteResult) -> Self::Product { - let (succ, fail) = self; - match result.result { - Ok(_) => succ, - Err(e) => call(fail, vec![wrap_io_error(e)]).wrap(), +impl WriteResult { + pub fn dispatch(self, succ: ExprInst, fail: ExprInst) -> Vec { + match self.result { + Ok(_) => vec![succ], + Err(e) => vec![call(fail, vec![wrap_io_error(e)]).wrap()], } } } - -pub type WriteManager

= IOManager; diff --git a/src/systems/io/io.orc b/src/systems/io/io.orc index 07c1fbd..ddc9ba1 100644 --- a/src/systems/io/io.orc +++ b/src/systems/io/io.orc @@ -7,10 +7,10 @@ export const print := \text.\ok. ( (io::flush io::stdout ok (\e. panic "println threw on flush") - yield + \_. yield ) (\e. panic "print threw on write") - yield + \_. yield ) export const println := \line.\ok. ( @@ -21,7 +21,7 @@ export const readln := \ok. ( io::read_line io::stdin ok (\e. panic "readln threw") - yield + \_. yield ) export module prelude ( diff --git a/src/systems/io/mod.rs b/src/systems/io/mod.rs index 86fd421..2c0bbd4 100644 --- a/src/systems/io/mod.rs +++ b/src/systems/io/mod.rs @@ -1,6 +1,11 @@ +//! System that allows Orchid to interact with trait objects of Rust's `Writer` +//! and with `BufReader`s of `Reader` trait objects + mod bindings; -mod facade; +// mod facade; mod flow; mod instances; +mod service; -pub use facade::{io_system, IOStream, IOSystem}; +// pub use facade::{io_system, IOStream, IOSystem}; +pub use service::{Service, Stream, StreamTable}; diff --git a/src/systems/io/service.rs b/src/systems/io/service.rs new file mode 100644 index 0000000..611cf08 --- /dev/null +++ b/src/systems/io/service.rs @@ -0,0 +1,119 @@ +#[allow(unused)] // for doc +use std::io::{BufReader, Read, Write}; + +use rust_embed::RustEmbed; +use trait_set::trait_set; + +use super::bindings::io_bindings; +use super::flow::{IOCmd, IOCmdHandlePack}; +use super::instances::{ReadCmd, Sink, Source, WriteCmd}; +use crate::facade::{IntoSystem, System}; +use crate::foreign::cps_box::{init_cps, CPSBox}; +use crate::foreign::Atomic; +use crate::interpreter::HandlerTable; +use crate::pipeline::file_loader::embed_to_map; +use crate::sourcefile::{FileEntry, FileEntryKind, Import}; +use crate::systems::codegen::call; +use crate::systems::scheduler::{SeqScheduler, SharedHandle}; +use crate::Location; + +/// A shared type for sinks and sources +pub enum Stream { + /// A Source, aka. a BufReader + Source(Source), + /// A Sink, aka. a Writer + Sink(Sink), +} + +trait_set! { + /// The table of default streams to be overlain on the I/O module, typicially + /// stdin, stdout, stderr. + pub trait StreamTable<'a> = IntoIterator +} + +#[derive(RustEmbed)] +#[folder = "src/systems/io"] +#[prefix = "system/"] +#[include = "*.orc"] +struct IOEmbed; + +/// A streaming I/O service for interacting with Rust's [Write] and [Read] +/// traits. +pub struct Service<'a, ST: IntoIterator> { + scheduler: SeqScheduler, + global_streams: ST, +} +impl<'a, ST: IntoIterator> Service<'a, ST> { + /// Construct a new instance of the service + pub fn new(scheduler: SeqScheduler, global_streams: ST) -> Self { + Self { scheduler, global_streams } + } +} + +impl<'a, ST: IntoIterator> IntoSystem<'static> + for Service<'a, ST> +{ + fn into_system(self, i: &crate::Interner) -> crate::facade::System<'static> { + let scheduler = self.scheduler.clone(); + let mut handlers = HandlerTable::new(); + handlers.register(move |cps: &CPSBox>| { + let (IOCmdHandlePack { cmd, handle }, succ, fail, tail) = cps.unpack3(); + let (cmd, succ1, fail1) = (*cmd, succ.clone(), fail.clone()); + let result = scheduler.schedule( + handle.clone(), + move |mut stream, cancel| { + let ret = cmd.execute(&mut stream, cancel); + (stream, ret) + }, + move |stream, res, _cancel| (stream, res.dispatch(succ1, fail1)), + |stream| (stream, Vec::new()), + ); + match result { + Ok(cancel) => + Ok(call(tail.clone(), vec![init_cps(1, cancel).wrap()]).wrap()), + Err(e) => Ok(call(fail.clone(), vec![e.atom_exi()]).wrap()), + } + }); + let scheduler = self.scheduler.clone(); + handlers.register(move |cps: &CPSBox>| { + let (IOCmdHandlePack { cmd, handle }, succ, fail, tail) = cps.unpack3(); + let (cmd, succ1, fail1) = (cmd.clone(), succ.clone(), fail.clone()); + let result = scheduler.schedule( + handle.clone(), + move |mut stream, cancel| { + let ret = cmd.execute(&mut stream, cancel); + (stream, ret) + }, + move |stream, res, _cancel| (stream, res.dispatch(succ1, fail1)), + |stream| (stream, Vec::new()), + ); + match result { + Ok(cancel) => + Ok(call(tail.clone(), vec![init_cps(1, cancel).wrap()]).wrap()), + Err(e) => Ok(call(fail.clone(), vec![e.atom_exi()]).wrap()), + } + }); + let streams = self.global_streams.into_iter().map(|(n, stream)| { + let handle = match stream { + Stream::Sink(sink) => + Box::new(SharedHandle::wrap(sink)) as Box, + Stream::Source(source) => Box::new(SharedHandle::wrap(source)), + }; + (n, handle) + }); + System { + handlers, + name: vec!["system".to_string(), "io".to_string()], + constants: io_bindings(i, streams).unwrap_tree(), + code: embed_to_map::(".orc", i), + prelude: vec![FileEntry { + locations: vec![Location::Unknown], + kind: FileEntryKind::Import(vec![Import { + location: Location::Unknown, + path: vec![i.i("system"), i.i("io"), i.i("prelude")], + name: None, + }]), + }], + } + } +} diff --git a/src/systems/mod.rs b/src/systems/mod.rs index 3e5ca13..23b1f52 100644 --- a/src/systems/mod.rs +++ b/src/systems/mod.rs @@ -1,13 +1,13 @@ //! Constants exposed to usercode by the interpreter mod assertion_error; -mod asynch; +pub mod asynch; pub mod cast_exprinst; pub mod codegen; -mod io; +pub mod io; mod runtime_error; pub mod stl; +mod directfs; +pub mod scheduler; pub use assertion_error::AssertionError; -pub use asynch::{AsynchConfig, InfiniteBlock, MessagePort}; -pub use io::{io_system, IOStream, IOSystem}; pub use runtime_error::RuntimeError; diff --git a/src/systems/scheduler/busy.rs b/src/systems/scheduler/busy.rs new file mode 100644 index 0000000..e377f6b --- /dev/null +++ b/src/systems/scheduler/busy.rs @@ -0,0 +1,133 @@ +use std::any::Any; +use std::collections::VecDeque; + +use crate::interpreted::ExprInst; + +use super::Canceller; + +pub type SyncResult = (T, Box); +pub type SyncOperation = + Box SyncResult + Send>; +pub type SyncOpResultHandler = Box< + dyn FnOnce(T, Box, Canceller) -> (T, Vec), +>; + + +struct SyncQueueItem { + cancelled: Canceller, + operation: SyncOperation, + handler: SyncOpResultHandler, + early_cancel: Box (T, Vec)>, +} + +pub enum NextItemReportKind { + Free(T), + Next { + instance: T, + cancelled: Canceller, + operation: SyncOperation, + rest: BusyState, + }, + Taken, +} + +pub struct NextItemReport { + pub kind: NextItemReportKind, + pub events: Vec, +} + +pub struct BusyState { + handler: SyncOpResultHandler, + queue: VecDeque>, + seal: Option Vec>>, +} +impl BusyState { + pub fn new( + handler: impl FnOnce(T, U, Canceller) -> (T, Vec) + 'static, + ) -> Self { + BusyState { + handler: Box::new(|t, payload, cancel| { + let u = *payload + .downcast() + .expect("mismatched initial handler and operation"); + handler(t, u, cancel) + }), + queue: VecDeque::new(), + seal: None, + } + } + + /// Add a new operation to the queue. Returns Some if the operation was + /// successfully enqueued and None if the queue is already sealed. + pub fn enqueue( + &mut self, + operation: impl FnOnce(T, Canceller) -> (T, U) + Send + 'static, + handler: impl FnOnce(T, U, Canceller) -> (T, Vec) + 'static, + early_cancel: impl FnOnce(T) -> (T, Vec) + 'static, + ) -> Option { + if self.seal.is_some() { + return None; + } + let cancelled = Canceller::new(); + self.queue.push_back(SyncQueueItem { + cancelled: cancelled.clone(), + early_cancel: Box::new(early_cancel), + operation: Box::new(|t, c| { + let (t, r) = operation(t, c); + (t, Box::new(r)) + }), + handler: Box::new(|t, u, c| { + let u = u.downcast().expect("mismatched handler and operation"); + handler(t, *u, c) + }), + }); + Some(cancelled) + } + + pub fn seal(&mut self, recipient: impl FnOnce(T) -> Vec + 'static) { + assert!(self.seal.is_none(), "Already sealed"); + self.seal = Some(Box::new(recipient)) + } + + pub fn is_sealed(&self) -> bool { + self.seal.is_some() + } + + pub fn rotate( + mut self, + instance: T, + result: U, + cancelled: Canceller, + ) -> NextItemReport { + let (mut instance, mut events) = + (self.handler)(instance, Box::new(result), cancelled); + let next_item = loop { + if let Some(candidate) = self.queue.pop_front() { + if candidate.cancelled.is_cancelled() { + let ret = (candidate.early_cancel)(instance); + instance = ret.0; + events.extend(ret.1.into_iter()); + } else { + break candidate; + } + } else if let Some(seal) = self.seal.take() { + seal(instance); + let kind = NextItemReportKind::Taken; + return NextItemReport { events, kind }; + } else { + let kind = NextItemReportKind::Free(instance); + return NextItemReport { events, kind }; + } + }; + self.handler = next_item.handler; + NextItemReport { + events, + kind: NextItemReportKind::Next { + instance, + cancelled: next_item.cancelled, + operation: next_item.operation, + rest: self, + }, + } + } +} diff --git a/src/systems/scheduler/canceller.rs b/src/systems/scheduler/canceller.rs new file mode 100644 index 0000000..f80f496 --- /dev/null +++ b/src/systems/scheduler/canceller.rs @@ -0,0 +1,32 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use crate::atomic_inert; + +/// A single-fire thread-safe boolean flag with relaxed ordering +#[derive(Debug, Clone)] +pub struct Canceller(Arc); +atomic_inert!(Canceller, typestr = "a canceller"); + +impl Canceller { + /// Create a new canceller + pub fn new() -> Self { + Canceller(Arc::new(AtomicBool::new(false))) + } + + /// Check whether the operation has been cancelled + pub fn is_cancelled(&self) -> bool { + self.0.load(Ordering::Relaxed) + } + + /// Cancel the operation + pub fn cancel(&self) { + self.0.store(true, Ordering::Relaxed) + } +} + +impl Default for Canceller { + fn default() -> Self { + Self::new() + } +} diff --git a/src/systems/scheduler/mod.rs b/src/systems/scheduler/mod.rs new file mode 100644 index 0000000..02df4f6 --- /dev/null +++ b/src/systems/scheduler/mod.rs @@ -0,0 +1,10 @@ +//! A generic utility to sequence long blocking mutations that require a mutable +//! reference to a shared resource. + +mod busy; +mod system; +mod canceller; +mod take_and_drop; + +pub use canceller::Canceller; +pub use system::{SealedOrTaken, SeqScheduler, SharedHandle, SharedState}; diff --git a/src/systems/scheduler/system.rs b/src/systems/scheduler/system.rs new file mode 100644 index 0000000..331d24c --- /dev/null +++ b/src/systems/scheduler/system.rs @@ -0,0 +1,305 @@ +use std::any::{type_name, Any}; +use std::cell::RefCell; +use std::fmt::Debug; +use std::rc::Rc; + +use hashbrown::HashMap; +use itertools::Itertools; +use trait_set::trait_set; + +use super::busy::{BusyState, NextItemReportKind}; +use super::take_and_drop::{request, TakeAndDrop, TakeCmd}; +use super::Canceller; +use crate::facade::{IntoSystem, System}; +use crate::foreign::cps_box::CPSBox; +use crate::interpreted::ExprInst; +use crate::interpreter::HandlerTable; +use crate::systems::asynch::{AsynchSystem, MessagePort}; +use crate::systems::stl::Boolean; +use crate::utils::thread_pool::ThreadPool; +use crate::utils::{take_with_output, unwrap_or, IdMap}; +use crate::{atomic_inert, define_fn, ConstTree}; + +enum SharedResource { + Free(T), + Busy(BusyState), + Taken, +} + +/// Possible states of a shared resource +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] +pub enum SharedState { + /// The resource is ready to be used or taken + Free, + /// The resource is currently in use but operations can be asynchronously + /// scheduled on it + Busy, + /// The resource is currently in use and a consuming seal has already been + /// scheduled, therefore further operations cannot access it and it will + /// transition to [SharedState::Taken] as soon as the currently pending + /// operations finish or are cancelled. + Sealed, + /// The resource has been removed from this location. + Taken, +} + +/// A shared handle for a resource of type `T` that can be used with a +/// [SeqScheduler] to execute mutating operations one by one in worker threads. +pub struct SharedHandle { + state: Rc>>, +} +impl SharedHandle { + /// Wrap a value to be accessible to a [SeqScheduler]. + pub fn wrap(t: T) -> Self { + Self { state: Rc::new(RefCell::new(SharedResource::Free(t))) } + } + + /// Check the state of the handle + pub fn state(&self) -> SharedState { + match &*self.state.as_ref().borrow() { + SharedResource::Busy(b) if b.is_sealed() => SharedState::Sealed, + SharedResource::Busy(_) => SharedState::Busy, + SharedResource::Free(_) => SharedState::Free, + SharedResource::Taken => SharedState::Taken, + } + } + + /// Remove the value from the handle if it's free. To interact with a handle + /// you probably want to use a [SeqScheduler], but sometimes this makes + /// sense as eg. an optimization. You can return the value after processing + /// via [SyncHandle::untake]. + pub fn take(&self) -> Option { + take_with_output( + &mut *self.state.as_ref().borrow_mut(), + |state| match state { + SharedResource::Free(t) => (SharedResource::Taken, Some(t)), + _ => (state, None), + }, + ) + } + + /// Return the value to a handle that doesn't have one. The intended use case + /// is to return values synchronously after they have been removed with + /// [SyncHandle::untake]. + pub fn untake(&self, value: T) -> Result<(), T> { + take_with_output( + &mut *self.state.as_ref().borrow_mut(), + |state| match state { + SharedResource::Taken => (SharedResource::Free(value), Ok(())), + _ => (state, Err(value)), + }, + ) + } +} +impl Clone for SharedHandle { + fn clone(&self) -> Self { + Self { state: self.state.clone() } + } +} +impl Debug for SharedHandle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SharedHandle") + .field("state", &self.state()) + .field("type", &type_name::()) + .finish() + } +} + +atomic_inert! { + SharedHandle(T), + typestr = "a shared handle", + request = |req: Box, this: &SharedHandle| request(this, req) +} + +/// Error produced when an operation is scheduled or a seal placed on a resource +/// which is either already sealed or taken. +#[derive(Debug, Clone)] +pub struct SealedOrTaken; +atomic_inert!( + SealedOrTaken, + typestr = "a sealed-or-taken error for a shared resource" +); +define_fn! { + IsTakenError = |x| Ok(Boolean(SealedOrTaken::try_from(x).is_ok()).atom_cls()) +} + +trait_set! { + /// The part of processing a blocking I/O task that cannot be done on a remote + /// thread, eg. because it accesses other systems or Orchid code. + trait NonSendFn = FnOnce(Box, SeqScheduler) -> Vec; +} + +struct SyncReply { + opid: u64, + data: Box, +} + +struct CheshireCat { + pool: ThreadPool>, + pending: RefCell>>, + port: MessagePort, +} + +/// A task scheduler that executes long blocking operations that have mutable +/// access to a shared one by one on a worker thread. The resources are +/// held in [SharedHandle]s +#[derive(Clone)] +pub struct SeqScheduler(Rc); +impl SeqScheduler { + /// Creates a new [SeqScheduler]. The new object is also kept alive by a + /// callback in the provided [AsynchSystem]. There should be at most one + pub fn new(asynch: &mut AsynchSystem) -> Self { + let this = Self(Rc::new(CheshireCat { + pending: RefCell::new(IdMap::new()), + pool: ThreadPool::new(), + port: asynch.get_port(), + })); + let this1 = this.clone(); + // referenced by asynch, references this + asynch.register(move |res: Box| { + let callback = this1.0.pending.borrow_mut().remove(res.opid).expect( + "Received reply for task we didn't start. This likely means that \ + there are multiple SequencingContexts attached to the same \ + AsynchSystem.", + ); + callback(res.data, this1.clone()) + }); + this + } + + /// Submit an action to be executed on a worker thread which can own the data + /// in the handle. + /// + /// * handle - data to be transformed + /// * operation - long blocking mutation to execute off-thread. + /// * handler - process the results, talk to other systems, generate and run + /// Orchid code. + /// * early_cancel - clean up in case the task got cancelled before it was + /// scheduled. This is an optimization so that threads aren't spawned if a + /// large batch of tasks is scheduled and then cancelled. + pub fn schedule( + &self, + handle: SharedHandle, + operation: impl FnOnce(T, Canceller) -> (T, U) + Send + 'static, + handler: impl FnOnce(T, U, Canceller) -> (T, Vec) + 'static, + early_cancel: impl FnOnce(T) -> (T, Vec) + 'static, + ) -> Result { + take_with_output(&mut *handle.state.as_ref().borrow_mut(), { + let handle = handle.clone(); + |state| { + match state { + SharedResource::Taken => (SharedResource::Taken, Err(SealedOrTaken)), + SharedResource::Busy(mut b) => + match b.enqueue(operation, handler, early_cancel) { + Some(cancelled) => (SharedResource::Busy(b), Ok(cancelled)), + None => (SharedResource::Busy(b), Err(SealedOrTaken)), + }, + SharedResource::Free(t) => { + let cancelled = Canceller::new(); + drop(early_cancel); // cannot possibly be useful + self.submit(t, handle, cancelled.clone(), operation); + (SharedResource::Busy(BusyState::new(handler)), Ok(cancelled)) + }, + } + } + }) + } + + /// Schedule a function that will consume the value. After this the handle is + /// considered sealed and all [SeqScheduler::schedule] calls will fail. + pub fn seal( + &self, + handle: SharedHandle, + seal: impl FnOnce(T) -> Vec + 'static, + ) -> Result, SealedOrTaken> { + take_with_output(&mut *handle.state.as_ref().borrow_mut(), |state| { + match state { + SharedResource::Busy(mut b) if !b.is_sealed() => { + b.seal(seal); + (SharedResource::Busy(b), Ok(Vec::new())) + }, + SharedResource::Busy(_) => (state, Err(SealedOrTaken)), + SharedResource::Taken => (SharedResource::Taken, Err(SealedOrTaken)), + SharedResource::Free(t) => (SharedResource::Taken, Ok(seal(t))), + } + }) + } + + /// Asynchronously recursive function to schedule a new task for execution and + /// act upon its completion. The self-reference is passed into the callback + /// from the callback passed to the [AsynchSystem] so that if the task is + /// never resolved but the [AsynchSystem] through which the resolving event + /// would arrive is dropped this [SeqScheduler] is also dropped. + fn submit( + &self, + t: T, + handle: SharedHandle, + cancelled: Canceller, + operation: impl FnOnce(T, Canceller) -> (T, U) + Send + 'static, + ) { + // referenced by self until run, references handle + let opid = self.0.pending.borrow_mut().insert(Box::new({ + let cancelled = cancelled.clone(); + move |data, this| { + let (t, u): (T, U) = + *data.downcast().expect("This is associated by ID"); + let handle2 = handle.clone(); + take_with_output(&mut *handle.state.as_ref().borrow_mut(), |state| { + let busy = unwrap_or! { state => SharedResource::Busy; + panic!("Handle with outstanding invocation must be busy") + }; + let report = busy.rotate(t, u, cancelled); + match report.kind { + NextItemReportKind::Free(t) => + (SharedResource::Free(t), report.events), + NextItemReportKind::Taken => (SharedResource::Taken, report.events), + NextItemReportKind::Next { + instance, + cancelled, + operation, + rest, + } => { + this.submit(instance, handle2, cancelled, operation); + (SharedResource::Busy(rest), report.events) + }, + } + }) + } + })); + let mut port = self.0.port.clone(); + // referenced by thread until run, references port + self.0.pool.submit(Box::new(move || { + port.send(SyncReply { opid, data: Box::new(operation(t, cancelled)) }) + })) + } +} + +impl IntoSystem<'static> for SeqScheduler { + fn into_system(self, i: &crate::Interner) -> crate::facade::System<'static> { + let mut handlers = HandlerTable::new(); + handlers.register(|cmd: &CPSBox| { + let (canceller, cont) = cmd.unpack1(); + canceller.cancel(); + Ok(cont.clone()) + }); + handlers.register(move |cmd: &CPSBox| { + let (TakeCmd(cb), cont) = cmd.unpack1(); + cb(self.clone()); + Ok(cont.clone()) + }); + System { + name: ["system", "scheduler"].into_iter().map_into().collect(), + prelude: Vec::new(), + code: HashMap::new(), + handlers, + constants: ConstTree::namespace( + [i.i("system"), i.i("scheduler")], + ConstTree::tree([ + (i.i("is_taken_error"), ConstTree::xfn(IsTakenError)), + (i.i("take_and_drop"), ConstTree::xfn(TakeAndDrop)), + ]), + ) + .unwrap_tree(), + } + } +} diff --git a/src/systems/scheduler/take_and_drop.rs b/src/systems/scheduler/take_and_drop.rs new file mode 100644 index 0000000..a698c67 --- /dev/null +++ b/src/systems/scheduler/take_and_drop.rs @@ -0,0 +1,44 @@ +use std::any::Any; +use std::fmt::Debug; +use std::rc::Rc; + +use super::{SeqScheduler, SharedHandle}; +use crate::foreign::cps_box::{init_cps, CPSBox}; +use crate::foreign::Atom; +use crate::interpreted::Clause; +use crate::systems::AssertionError; +use crate::{define_fn, Primitive}; + +pub fn request( + handle: &SharedHandle, + request: Box, +) -> Option> { + if request.downcast::().is_ok() { + let handle = handle.clone(); + let cmd = TakeCmd(Rc::new(move |sch| { + let _ = sch.seal(handle.clone(), |_| Vec::new()); + })); + return Some(Box::new(init_cps(1, cmd))) + } + None +} + +pub struct TakerRequest; +#[derive(Clone)] +pub struct TakeCmd(pub Rc); +impl Debug for TakeCmd { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "A command to drop a shared resource") + } +} +define_fn! { + pub TakeAndDrop = |x| x.inspect(|c| match c { + Clause::P(Primitive::Atom(Atom(atomic))) => { + let t = atomic.request(Box::new(TakerRequest)) + .ok_or_else(|| AssertionError::ext(x.clone(), "a SharedHandle"))?; + let data: CPSBox = *t.downcast().expect("implied by request"); + Ok(data.atom_cls()) + }, + _ => AssertionError::fail(x.clone(), "an atom"), + }) +} diff --git a/src/systems/stl/bin.rs b/src/systems/stl/bin.rs index 9104746..6e4f52b 100644 --- a/src/systems/stl/bin.rs +++ b/src/systems/stl/bin.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use itertools::Itertools; use super::Boolean; -use crate::interpreted::ExprInst; use crate::systems::cast_exprinst::with_uint; use crate::systems::codegen::{orchid_opt, tuple}; use crate::systems::RuntimeError; @@ -14,7 +13,7 @@ use crate::{atomic_inert, define_fn, ConstTree, Interner, Literal}; /// A block of binary data #[derive(Clone, Hash, PartialEq, Eq)] pub struct Binary(pub Arc>); -atomic_inert!(Binary, "a binary blob"); +atomic_inert!(Binary, typestr = "a binary blob"); impl Debug for Binary { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { diff --git a/src/systems/stl/bool.rs b/src/systems/stl/bool.rs index f9c28e6..95c0663 100644 --- a/src/systems/stl/bool.rs +++ b/src/systems/stl/bool.rs @@ -1,14 +1,14 @@ use std::rc::Rc; use crate::interner::Interner; -use crate::representations::interpreted::{Clause, ExprInst}; +use crate::representations::interpreted::Clause; use crate::systems::AssertionError; use crate::{atomic_inert, define_fn, ConstTree, Literal, PathSet}; /// Booleans exposed to Orchid #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct Boolean(pub bool); -atomic_inert!(Boolean, "a boolean"); +atomic_inert!(Boolean, typestr = "a boolean"); impl From for Boolean { fn from(value: bool) -> Self { diff --git a/src/systems/stl/state.rs b/src/systems/stl/state.rs index 3fa1238..3e4be27 100644 --- a/src/systems/stl/state.rs +++ b/src/systems/stl/state.rs @@ -10,7 +10,7 @@ use crate::{atomic_inert, define_fn, ConstTree, Interner}; #[derive(Debug, Clone)] pub struct State(Rc>); -atomic_inert!(State, "a state"); +atomic_inert!(State, typestr = "a state"); #[derive(Debug, Clone)] struct NewStateCmd; diff --git a/src/utils/iter.rs b/src/utils/boxed_iter.rs similarity index 90% rename from src/utils/iter.rs rename to src/utils/boxed_iter.rs index cb0a630..4d6cce2 100644 --- a/src/utils/iter.rs +++ b/src/utils/boxed_iter.rs @@ -20,7 +20,7 @@ macro_rules! box_chain { Box::new($curr) as BoxedIter<_> }; ($curr:expr, $($rest:expr),*) => { - Box::new($curr$(.chain($rest))*) as $crate::utils::iter::BoxedIter<_> + Box::new($curr$(.chain($rest))*) as $crate::utils::boxed_iter::BoxedIter<_> }; } diff --git a/src/utils/get_or_default.rs b/src/utils/get_or.rs similarity index 100% rename from src/utils/get_or_default.rs rename to src/utils/get_or.rs diff --git a/src/utils/id_map.rs b/src/utils/id_map.rs new file mode 100644 index 0000000..358aad9 --- /dev/null +++ b/src/utils/id_map.rs @@ -0,0 +1,71 @@ +use hashbrown::HashMap; + +/// A set that automatically assigns a unique ID to every entry. +/// +/// # How unique? +/// +/// If you inserted a new entry every nanosecond, it would take more than +/// 550_000 years to run out of indices. Realistically Orchid might insert a new +/// entry every 10ms, so these 64-bit indices will probably outlast humanity. +#[derive(Clone, Debug)] +pub struct IdMap { + next_id: u64, + data: HashMap, +} +impl IdMap { + /// Create a new empty set + pub fn new() -> Self { + Self { next_id: 0, data: HashMap::new() } + } + + /// Obtain a reference to the underlying map for iteration + pub fn map(&self) -> &HashMap { + &self.data + } + + /// Insert an element with a new ID and return the ID + pub fn insert(&mut self, t: T) -> u64 { + let id = self.next_id; + self.next_id += 1; + (self.data.try_insert(id, t)) + .unwrap_or_else(|_| panic!("IdMap keys should be unique")); + id + } + + /// Obtain a reference to the element with the given ID + pub fn get(&self, id: u64) -> Option<&T> { + self.data.get(&id) + } + + /// Obtain a mutable reference to the element with the given ID + pub fn get_mut(&mut self, id: u64) -> Option<&mut T> { + self.data.get_mut(&id) + } + + /// Remove the element with the given ID from the set. The ID will not be + /// reused. + pub fn remove(&mut self, id: u64) -> Option { + self.data.remove(&id) + } +} + +impl Default for IdMap { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod test { + use super::IdMap; + + #[test] + fn basic_test() { + let mut map = IdMap::new(); + let a = map.insert(1); + let b = map.insert(2); + assert_eq!(map.remove(a), Some(1)); + assert_eq!(map.remove(a), None); + assert_eq!(map.get(b), Some(&2)); + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index ad88a5b..97701cf 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,32 +1,30 @@ mod cache; mod delete_cell; -mod event_poller; -mod get_or_default; +pub mod poller; +pub mod get_or; mod iter_find; pub mod never; -pub mod pushed; -mod rc_to_owned; +pub mod pure_push; +pub mod rc_tools; mod replace_first; mod side; mod split_max_prefix; mod string_from_charset; -mod substack; +pub mod substack; mod take_with_output; pub mod thread_pool; mod unwrap_or; +mod id_map; pub use cache::Cache; -pub use rc_to_owned::{map_rc, rc_to_owned}; pub use replace_first::replace_first; pub use side::Side; pub use split_max_prefix::split_max_prefix; -pub use substack::{Stackframe, Substack, SubstackIterator}; pub(crate) use unwrap_or::unwrap_or; -pub mod iter; +pub mod boxed_iter; pub use delete_cell::DeleteCell; -pub use event_poller::{PollEvent, Poller}; -pub use get_or_default::{get_or_default, get_or_make}; -pub use iter::BoxedIter; +pub use boxed_iter::BoxedIter; pub use iter_find::iter_find; pub use string_from_charset::string_from_charset; pub use take_with_output::take_with_output; +pub use id_map::IdMap; diff --git a/src/utils/event_poller.rs b/src/utils/poller.rs similarity index 100% rename from src/utils/event_poller.rs rename to src/utils/poller.rs diff --git a/src/utils/pushed.rs b/src/utils/pure_push.rs similarity index 100% rename from src/utils/pushed.rs rename to src/utils/pure_push.rs diff --git a/src/utils/rc_to_owned.rs b/src/utils/rc_tools.rs similarity index 100% rename from src/utils/rc_to_owned.rs rename to src/utils/rc_tools.rs