task_local context over context objects
- interner impls logically separate from API in orchid-base (default host interner still in base for testing) - error reporting, logging, and a variety of other features passed down via context in extension, not yet in host to maintain library-ish profile, should consider options - no global spawn mechanic, the host has a spawn function but extensions only get a stash for enqueuing async work in sync callbacks which is then explicitly, manually, and with strict order popped and awaited - still deadlocks nondeterministically for some ungodly reason
This commit is contained in:
@@ -24,9 +24,9 @@ orchid-api-traits = { version = "0.1.0", path = "../orchid-api-traits" }
|
||||
ordered-float = "5.0.0"
|
||||
regex = "1.11.2"
|
||||
rust-embed = "8.7.2"
|
||||
some_executor = "0.6.1"
|
||||
substack = "1.1.1"
|
||||
trait-set = "0.3.0"
|
||||
task-local = "0.1.0"
|
||||
|
||||
[dev-dependencies]
|
||||
futures = "0.3.31"
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
use std::ops::Deref;
|
||||
use std::rc::Rc;
|
||||
|
||||
use futures::future::LocalBoxFuture;
|
||||
|
||||
use crate::api;
|
||||
|
||||
pub type Spawner = Rc<dyn Fn(LocalBoxFuture<'static, ()>)>;
|
||||
|
||||
/// The 3 primary contact points with an extension are
|
||||
/// - send a message
|
||||
/// - wait for a message to arrive
|
||||
/// - wait for the extension to stop after exit (this is the implicit Drop)
|
||||
///
|
||||
/// There are no ordering guarantees about these
|
||||
pub trait ExtPort {
|
||||
#[must_use]
|
||||
fn send<'a>(&'a self, msg: &'a [u8]) -> LocalBoxFuture<'a, ()>;
|
||||
#[must_use]
|
||||
fn recv(&self) -> LocalBoxFuture<'_, Option<Vec<u8>>>;
|
||||
}
|
||||
|
||||
pub struct ExtInit {
|
||||
pub header: api::ExtensionHeader,
|
||||
pub port: Box<dyn ExtPort>,
|
||||
}
|
||||
impl ExtInit {
|
||||
pub async fn send(&self, msg: &[u8]) { self.port.send(msg).await }
|
||||
pub async fn recv(&self) -> Option<Vec<u8>> { self.port.recv().await }
|
||||
}
|
||||
impl Deref for ExtInit {
|
||||
type Target = api::ExtensionHeader;
|
||||
fn deref(&self) -> &Self::Target { &self.header }
|
||||
}
|
||||
@@ -5,9 +5,10 @@ use std::ops::Add;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::FutureExt;
|
||||
use futures::future::join_all;
|
||||
use itertools::Itertools;
|
||||
use some_executor::task_local;
|
||||
use task_local::task_local;
|
||||
|
||||
use crate::api;
|
||||
use crate::interner::{IStr, es, is};
|
||||
@@ -237,7 +238,19 @@ task_local! {
|
||||
static REPORTER: Reporter;
|
||||
}
|
||||
|
||||
pub async fn with_reporter<T>(fut: impl Future<Output = OrcRes<T>>) -> OrcRes<T> {
|
||||
/// Run the future with a new reporter, and return all errors reported within.
|
||||
///
|
||||
/// If your future returns [OrcRes], see [try_with_reporter]
|
||||
pub async fn with_reporter<T>(fut: impl Future<Output = T>) -> OrcRes<T> {
|
||||
try_with_reporter(fut.map(Ok)).await
|
||||
}
|
||||
|
||||
/// Run the future with a new reporter, and return all errors either returned or
|
||||
/// reported by it
|
||||
///
|
||||
/// If your future may report errors but always returns an approximate value,
|
||||
/// see [with_reporter]
|
||||
pub async fn try_with_reporter<T>(fut: impl Future<Output = OrcRes<T>>) -> OrcRes<T> {
|
||||
let rep = Reporter::default();
|
||||
let res = REPORTER.scope(rep.clone(), fut).await;
|
||||
let errors = rep.errors.take();
|
||||
@@ -249,9 +262,8 @@ pub async fn with_reporter<T>(fut: impl Future<Output = OrcRes<T>>) -> OrcRes<T>
|
||||
}
|
||||
|
||||
pub async fn is_erroring() -> bool {
|
||||
REPORTER.with(|r| {
|
||||
!r.expect("Sidechannel errors must be caught by a reporter").errors.borrow().is_empty()
|
||||
})
|
||||
(REPORTER.try_with(|r| !r.errors.borrow().is_empty()))
|
||||
.expect("Sidechannel errors must be caught by a reporter")
|
||||
}
|
||||
|
||||
/// Report an error that is fatal and prevents a correct output, but
|
||||
@@ -259,11 +271,10 @@ pub async fn is_erroring() -> bool {
|
||||
/// This can be used for
|
||||
pub fn report(e: impl Into<OrcErrv>) {
|
||||
let errv = e.into();
|
||||
REPORTER.with(|r| match r {
|
||||
Some(r) => r.errors.borrow_mut().extend(errv),
|
||||
None => panic!(
|
||||
REPORTER.try_with(|r| r.errors.borrow_mut().extend(errv.clone())).unwrap_or_else(|_| {
|
||||
panic!(
|
||||
"Unhandled error! Sidechannel errors must be caught by an enclosing call to with_reporter.\n\
|
||||
Error: {errv}",
|
||||
),
|
||||
Error: {errv}"
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -304,6 +304,7 @@ pub async fn take_first_fmt(v: &(impl Format + ?Sized)) -> String {
|
||||
take_first(&v.print(&FmtCtxImpl { _foo: PhantomData }).await, false)
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct FmtCtxImpl<'a> {
|
||||
_foo: PhantomData<&'a ()>,
|
||||
}
|
||||
@@ -331,8 +332,8 @@ impl Format for Never {
|
||||
/// Format with default strategy. Currently equal to [take_first_fmt]
|
||||
pub async fn fmt(v: &(impl Format + ?Sized)) -> String { take_first_fmt(v).await }
|
||||
/// Format a sequence with default strategy. Currently equal to [take_first_fmt]
|
||||
pub async fn fmt_v<F: Format + ?Sized, R: Borrow<F>>(
|
||||
v: impl IntoIterator<Item = R>,
|
||||
pub async fn fmt_v<F: Format + ?Sized>(
|
||||
v: impl IntoIterator<Item: Borrow<F>>,
|
||||
) -> impl Iterator<Item = String> {
|
||||
join_all(v.into_iter().map(|f| async move { take_first_fmt(f.borrow()).await })).await.into_iter()
|
||||
}
|
||||
|
||||
@@ -6,12 +6,16 @@ use std::rc::Rc;
|
||||
use std::{fmt, hash};
|
||||
|
||||
use futures::future::LocalBoxFuture;
|
||||
use some_executor::task_local;
|
||||
use task_local::task_local;
|
||||
|
||||
use crate::api;
|
||||
|
||||
pub trait IStrHandle: AsRef<str> {}
|
||||
pub trait IStrvHandle: AsRef<[IStr]> {}
|
||||
pub trait IStrHandle: AsRef<str> {
|
||||
fn rc(&self) -> Rc<String>;
|
||||
}
|
||||
pub trait IStrvHandle: AsRef<[IStr]> {
|
||||
fn rc(&self) -> Rc<Vec<IStr>>;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct IStr(pub api::TStr, pub Rc<dyn IStrHandle>);
|
||||
@@ -22,6 +26,7 @@ impl IStr {
|
||||
/// the same value only as long as at least one instance exists. If a value is
|
||||
/// no longer interned, the interner is free to forget about it.
|
||||
pub fn to_api(&self) -> api::TStr { self.0 }
|
||||
pub fn rc(&self) -> Rc<String> { self.1.rc() }
|
||||
}
|
||||
impl Deref for IStr {
|
||||
type Target = str;
|
||||
@@ -49,6 +54,7 @@ impl IStrv {
|
||||
/// the same value only as long as at least one instance exists. If a value is
|
||||
/// no longer interned, the interner is free to forget about it.
|
||||
pub fn to_api(&self) -> api::TStrv { self.0 }
|
||||
pub fn rc(&self) -> Rc<Vec<IStr>> { self.1.rc() }
|
||||
}
|
||||
impl Deref for IStrv {
|
||||
type Target = [IStr];
|
||||
@@ -79,10 +85,10 @@ impl Debug for IStrv {
|
||||
}
|
||||
|
||||
pub trait InternerSrv {
|
||||
fn is(&self, v: &str) -> LocalBoxFuture<'static, IStr>;
|
||||
fn es(&self, t: api::TStr) -> LocalBoxFuture<'static, IStr>;
|
||||
fn iv(&self, v: &[IStr]) -> LocalBoxFuture<'static, IStrv>;
|
||||
fn ev(&self, t: api::TStrv) -> LocalBoxFuture<'static, IStrv>;
|
||||
fn is<'a>(&'a self, v: &'a str) -> LocalBoxFuture<'a, IStr>;
|
||||
fn es(&self, t: api::TStr) -> LocalBoxFuture<'_, IStr>;
|
||||
fn iv<'a>(&'a self, v: &'a [IStr]) -> LocalBoxFuture<'a, IStrv>;
|
||||
fn ev(&self, t: api::TStrv) -> LocalBoxFuture<'_, IStrv>;
|
||||
}
|
||||
|
||||
task_local! {
|
||||
@@ -94,10 +100,283 @@ pub async fn with_interner<F: Future>(val: Rc<dyn InternerSrv>, fut: F) -> F::Ou
|
||||
}
|
||||
|
||||
fn get_interner() -> Rc<dyn InternerSrv> {
|
||||
INTERNER.with(|i| i.expect("Interner not initialized").clone())
|
||||
INTERNER.try_with(|i| i.clone()).expect("Interner not initialized")
|
||||
}
|
||||
|
||||
pub async fn is(v: &str) -> IStr { get_interner().is(v).await }
|
||||
pub async fn iv(v: &[IStr]) -> IStrv { get_interner().iv(v).await }
|
||||
pub async fn es(v: api::TStr) -> IStr { get_interner().es(v).await }
|
||||
pub async fn ev(v: api::TStrv) -> IStrv { get_interner().ev(v).await }
|
||||
|
||||
pub mod local_interner {
|
||||
use std::borrow::Borrow;
|
||||
use std::cell::RefCell;
|
||||
use std::fmt::Debug;
|
||||
use std::future;
|
||||
use std::hash::{BuildHasher, Hash};
|
||||
use std::num::NonZeroU64;
|
||||
use std::rc::{Rc, Weak};
|
||||
|
||||
use futures::future::LocalBoxFuture;
|
||||
use hashbrown::hash_table::{Entry, OccupiedEntry, VacantEntry};
|
||||
use hashbrown::{DefaultHashBuilder, HashTable};
|
||||
use orchid_api_traits::Coding;
|
||||
|
||||
use super::{IStr, IStrHandle, IStrv, IStrvHandle, InternerSrv};
|
||||
use crate::api;
|
||||
|
||||
/// Associated types and methods for parallel concepts between scalar and
|
||||
/// vector interning
|
||||
pub trait InternableCard: 'static + Sized + Default + Debug {
|
||||
/// API representation of an interner key
|
||||
type Token: Clone + Copy + Debug + Hash + Eq + PartialOrd + Ord + Coding + 'static;
|
||||
/// Owned version of interned value physically held by `'static` interner
|
||||
/// and token
|
||||
type Data: 'static + Borrow<Self::Borrow> + Eq + Hash + Debug;
|
||||
/// Borrowed version of interned value placed in intern queries to avoid a
|
||||
/// copy
|
||||
type Borrow: ToOwned<Owned = Self::Data> + ?Sized + Eq + Hash + Debug;
|
||||
/// Smart object handed out by the interner for storage and comparison in
|
||||
/// third party code. [IStr] or [IStrv]
|
||||
type Interned: Clone + Debug;
|
||||
/// Create smart object from token for fast comparison and a handle for
|
||||
/// everything else incl. virtual drop
|
||||
fn new_interned(token: Self::Token, handle: Rc<Handle<Self>>) -> Self::Interned;
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct StrBranch;
|
||||
impl InternableCard for StrBranch {
|
||||
type Data = String;
|
||||
type Token = api::TStr;
|
||||
type Borrow = str;
|
||||
type Interned = IStr;
|
||||
fn new_interned(t: Self::Token, h: Rc<Handle<Self>>) -> Self::Interned { IStr(t, h) }
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct StrvBranch;
|
||||
impl InternableCard for StrvBranch {
|
||||
type Data = Vec<IStr>;
|
||||
type Token = api::TStrv;
|
||||
type Borrow = [IStr];
|
||||
type Interned = IStrv;
|
||||
fn new_interned(t: Self::Token, h: Rc<Handle<Self>>) -> Self::Interned { IStrv(t, h) }
|
||||
}
|
||||
|
||||
/// Pairs interned data with its internment key
|
||||
#[derive(Debug)]
|
||||
struct Data<B: InternableCard> {
|
||||
token: B::Token,
|
||||
data: Rc<B::Data>,
|
||||
}
|
||||
impl<B: InternableCard> Clone for Data<B> {
|
||||
fn clone(&self) -> Self { Self { token: self.token, data: self.data.clone() } }
|
||||
}
|
||||
|
||||
/// Implementor for the trait objects held by [IStr] and [IStrv]
|
||||
pub struct Handle<B: InternableCard> {
|
||||
data: Data<B>,
|
||||
parent: Weak<RefCell<IntData<B>>>,
|
||||
}
|
||||
impl IStrHandle for Handle<StrBranch> {
|
||||
fn rc(&self) -> Rc<String> { self.data.data.clone() }
|
||||
}
|
||||
impl AsRef<str> for Handle<StrBranch> {
|
||||
fn as_ref(&self) -> &str { self.data.data.as_ref().as_ref() }
|
||||
}
|
||||
impl IStrvHandle for Handle<StrvBranch> {
|
||||
fn rc(&self) -> Rc<Vec<IStr>> { self.data.data.clone() }
|
||||
}
|
||||
impl AsRef<[IStr]> for Handle<StrvBranch> {
|
||||
fn as_ref(&self) -> &[IStr] { self.data.data.as_ref().as_ref() }
|
||||
}
|
||||
impl<B: InternableCard> Drop for Handle<B> {
|
||||
fn drop(&mut self) {
|
||||
let Some(parent) = self.parent.upgrade() else { return };
|
||||
if let Entry::Occupied(ent) =
|
||||
parent.borrow_mut().entry_by_data(self.data.data.as_ref().borrow())
|
||||
{
|
||||
ent.remove();
|
||||
}
|
||||
if let Entry::Occupied(ent) = parent.borrow_mut().entry_by_tok(self.data.token) {
|
||||
ent.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Information retained about an interned token indexed both by key and
|
||||
/// value.
|
||||
struct Rec<B: InternableCard> {
|
||||
/// This reference is weak, but the [Drop] handler of [Handle] removes all
|
||||
/// [Rec]s from the interner so it is guaranteed to be live.
|
||||
handle: Weak<Handle<B>>,
|
||||
/// Keys for indexing from either table
|
||||
data: Data<B>,
|
||||
}
|
||||
|
||||
/// Read data from an occupied entry in an interner. The equivalent insert
|
||||
/// command is [insert]
|
||||
fn read<B: InternableCard>(entry: OccupiedEntry<'_, Rec<B>>) -> B::Interned {
|
||||
let hand = entry.get().handle.upgrade().expect("Found entry but handle already dropped");
|
||||
B::new_interned(entry.get().data.token, hand)
|
||||
}
|
||||
|
||||
/// Insert some data into an entry borrowed from this same interner.
|
||||
/// The equivalent read command is [read]
|
||||
fn insert<B: InternableCard>(entry: VacantEntry<'_, Rec<B>>, handle: Rc<Handle<B>>) {
|
||||
entry.insert(Rec { data: handle.data.clone(), handle: Rc::downgrade(&handle) });
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct IntData<B: InternableCard> {
|
||||
by_tok: HashTable<Rec<B>>,
|
||||
by_data: HashTable<Rec<B>>,
|
||||
hasher: DefaultHashBuilder,
|
||||
}
|
||||
impl<B: InternableCard> IntData<B> {
|
||||
fn entry_by_data(&mut self, query: &B::Borrow) -> Entry<'_, Rec<B>> {
|
||||
self.by_data.entry(
|
||||
self.hasher.hash_one(query),
|
||||
|rec| rec.data.data.as_ref().borrow() == query,
|
||||
|rec| self.hasher.hash_one(rec.data.data.as_ref().borrow()),
|
||||
)
|
||||
}
|
||||
fn entry_by_tok(&mut self, token: B::Token) -> Entry<'_, Rec<B>> {
|
||||
self.by_tok.entry(
|
||||
self.hasher.hash_one(token),
|
||||
|rec| rec.data.token == token,
|
||||
|rec| self.hasher.hash_one(rec.data.token),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Failing intern command that can be recovered if the value is found
|
||||
/// elsewhere
|
||||
pub struct InternError<'a, B: InternableCard> {
|
||||
int: &'a Int<B>,
|
||||
query: &'a B::Borrow,
|
||||
}
|
||||
impl<B: InternableCard> InternError<'_, B> {
|
||||
/// If a racing write populates the entry, the continuation returns that
|
||||
/// value and discards its argument
|
||||
pub fn set_if_empty(self, token: B::Token) -> B::Interned {
|
||||
let mut int_data = self.int.0.borrow_mut();
|
||||
match int_data.entry_by_data(self.query) {
|
||||
Entry::Occupied(ent) => read(ent),
|
||||
Entry::Vacant(ent) => {
|
||||
let hand = self.int.mk_handle(Data { token, data: Rc::new(self.query.to_owned()) });
|
||||
insert(ent, hand.clone());
|
||||
let Entry::Vacant(other_ent) = int_data.entry_by_tok(token) else {
|
||||
panic!("Data and key tables out of sync")
|
||||
};
|
||||
insert(other_ent, hand.clone());
|
||||
B::new_interned(token, hand)
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
impl<B: InternableCard> Debug for InternError<'_, B> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_tuple("InternEntry").field(&self.query).finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Failing extern command that can be recovered if the value is found
|
||||
/// elsewhere
|
||||
pub struct ExternError<'a, B: InternableCard> {
|
||||
int: &'a Int<B>,
|
||||
token: B::Token,
|
||||
}
|
||||
impl<B: InternableCard> ExternError<'_, B> {
|
||||
/// If a racing write populates the entry, the continuation returns that
|
||||
/// value and discards its argument
|
||||
pub fn set_if_empty(&self, data: Rc<B::Data>) -> B::Interned {
|
||||
let mut int_data = self.int.0.borrow_mut();
|
||||
match int_data.entry_by_tok(self.token) {
|
||||
Entry::Occupied(ent) => read(ent),
|
||||
Entry::Vacant(ent) => {
|
||||
let hand = self.int.mk_handle(Data { token: self.token, data: data.clone() });
|
||||
insert(ent, hand.clone());
|
||||
let Entry::Vacant(other_ent) = int_data.entry_by_data(data.as_ref().borrow()) else {
|
||||
panic!("Data and key tables out of sync")
|
||||
};
|
||||
insert(other_ent, hand.clone());
|
||||
B::new_interned(self.token, hand)
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
impl<B: InternableCard> Debug for ExternError<'_, B> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_tuple("ExternEntry").field(&self.token).finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Int<B: InternableCard>(Rc<RefCell<IntData<B>>>);
|
||||
impl<B: InternableCard> Int<B> {
|
||||
fn mk_handle(&self, data: Data<B>) -> Rc<Handle<B>> {
|
||||
Rc::new(Handle { data: data.clone(), parent: Rc::downgrade(&self.0.clone()) })
|
||||
}
|
||||
|
||||
/// Look up by value, or yield to figure out its ID from elsewhere
|
||||
pub fn i<'a>(&'a self, query: &'a B::Borrow) -> Result<B::Interned, InternError<'a, B>> {
|
||||
if let Entry::Occupied(val) = self.0.borrow_mut().entry_by_data(query) {
|
||||
return Ok(read(val));
|
||||
}
|
||||
Err(InternError { int: self, query })
|
||||
}
|
||||
|
||||
/// Look up by key or yield to figure out its value from elsewhere
|
||||
pub fn e(&self, token: B::Token) -> Result<B::Interned, ExternError<'_, B>> {
|
||||
if let Entry::Occupied(ent) = self.0.borrow_mut().entry_by_tok(token) {
|
||||
return Ok(read(ent));
|
||||
}
|
||||
Err(ExternError { int: self, token })
|
||||
}
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
static NEXT_ID: RefCell<u64> = 0.into();
|
||||
}
|
||||
|
||||
fn with_new_id<T>(fun: impl FnOnce(NonZeroU64) -> T) -> T {
|
||||
fun(
|
||||
NonZeroU64::new(NEXT_ID.with_borrow_mut(|id| {
|
||||
*id += 1;
|
||||
*id
|
||||
}))
|
||||
.unwrap(),
|
||||
)
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct LocalInterner {
|
||||
str: Int<StrBranch>,
|
||||
strv: Int<StrvBranch>,
|
||||
}
|
||||
impl InternerSrv for LocalInterner {
|
||||
fn is<'a>(&'a self, v: &'a str) -> LocalBoxFuture<'a, IStr> {
|
||||
match self.str.i(v) {
|
||||
Ok(int) => Box::pin(future::ready(int)),
|
||||
Err(e) => with_new_id(|id| Box::pin(future::ready(e.set_if_empty(api::TStr(id))))),
|
||||
}
|
||||
}
|
||||
fn es(&self, t: api::TStr) -> LocalBoxFuture<'_, IStr> {
|
||||
Box::pin(future::ready(self.str.e(t).expect("Unrecognized token cannot be externed")))
|
||||
}
|
||||
fn iv<'a>(&'a self, v: &'a [IStr]) -> LocalBoxFuture<'a, IStrv> {
|
||||
match self.strv.i(v) {
|
||||
Ok(int) => Box::pin(future::ready(int)),
|
||||
Err(e) => with_new_id(|id| Box::pin(future::ready(e.set_if_empty(api::TStrv(id))))),
|
||||
}
|
||||
}
|
||||
fn ev(&self, t: orchid_api::TStrv) -> LocalBoxFuture<'_, IStrv> {
|
||||
Box::pin(future::ready(self.strv.e(t).expect("Unrecognized token cannot be externed")))
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a basic thread-local interner for testing and root role.
|
||||
pub fn local_interner() -> Rc<dyn InternerSrv> { Rc::<LocalInterner>::default() }
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ use orchid_api as api;
|
||||
|
||||
pub mod box_cow;
|
||||
pub mod boxed_iter;
|
||||
pub mod builtin;
|
||||
pub mod char_filter;
|
||||
pub mod clone;
|
||||
pub mod combine;
|
||||
@@ -14,6 +13,7 @@ pub mod id_store;
|
||||
pub mod interner;
|
||||
pub mod iter_utils;
|
||||
pub mod join;
|
||||
mod localset;
|
||||
pub mod location;
|
||||
pub mod logging;
|
||||
mod match_mapping;
|
||||
|
||||
48
orchid-base/src/localset.rs
Normal file
48
orchid-base/src/localset.rs
Normal file
@@ -0,0 +1,48 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::pin::Pin;
|
||||
use std::task::Poll;
|
||||
|
||||
use futures::StreamExt;
|
||||
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded};
|
||||
use futures::future::LocalBoxFuture;
|
||||
|
||||
pub struct LocalSet<'a, E> {
|
||||
receiver: UnboundedReceiver<LocalBoxFuture<'a, Result<(), E>>>,
|
||||
pending: VecDeque<LocalBoxFuture<'a, Result<(), E>>>,
|
||||
}
|
||||
impl<'a, E> LocalSet<'a, E> {
|
||||
pub fn new() -> (UnboundedSender<LocalBoxFuture<'a, Result<(), E>>>, Self) {
|
||||
let (sender, receiver) = unbounded();
|
||||
(sender, Self { receiver, pending: VecDeque::new() })
|
||||
}
|
||||
}
|
||||
impl<E> Future for LocalSet<'_, E> {
|
||||
type Output = Result<(), E>;
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
let mut any_pending = false;
|
||||
loop {
|
||||
match this.receiver.poll_next_unpin(cx) {
|
||||
Poll::Pending => {
|
||||
any_pending = true;
|
||||
break;
|
||||
},
|
||||
Poll::Ready(None) => break,
|
||||
Poll::Ready(Some(fut)) => this.pending.push_back(fut),
|
||||
}
|
||||
}
|
||||
let count = this.pending.len();
|
||||
for _ in 0..count {
|
||||
let mut req = this.pending.pop_front().unwrap();
|
||||
match req.as_mut().poll(cx) {
|
||||
Poll::Ready(Ok(())) => (),
|
||||
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
|
||||
Poll::Pending => {
|
||||
any_pending = true;
|
||||
this.pending.push_back(req)
|
||||
},
|
||||
}
|
||||
}
|
||||
if any_pending { Poll::Pending } else { Poll::Ready(Ok(())) }
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@ use std::io::{Write, stderr};
|
||||
|
||||
pub use api::LogStrategy;
|
||||
use itertools::Itertools;
|
||||
use task_local::task_local;
|
||||
|
||||
use crate::api;
|
||||
|
||||
@@ -34,3 +35,13 @@ impl Logger {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
task_local! {
|
||||
static LOGGER: Logger;
|
||||
}
|
||||
|
||||
pub async fn with_logger<F: Future>(logger: Logger, fut: F) -> F::Output {
|
||||
LOGGER.scope(logger, fut).await
|
||||
}
|
||||
|
||||
pub fn logger() -> Logger { LOGGER.try_with(|l| l.clone()).expect("Logger not set!") }
|
||||
|
||||
@@ -6,7 +6,8 @@ use orchid_api_traits::{Decode, Encode};
|
||||
|
||||
pub async fn send_msg(mut write: Pin<&mut impl AsyncWrite>, msg: &[u8]) -> io::Result<()> {
|
||||
let mut len_buf = vec![];
|
||||
u32::try_from(msg.len()).unwrap().encode(Pin::new(&mut len_buf)).await;
|
||||
let len_prefix = u32::try_from(msg.len()).expect("Message over 4GB not permitted on channel");
|
||||
len_prefix.encode_vec(&mut len_buf);
|
||||
write.write_all(&len_buf).await?;
|
||||
write.write_all(msg).await?;
|
||||
write.flush().await
|
||||
@@ -15,7 +16,7 @@ pub async fn send_msg(mut write: Pin<&mut impl AsyncWrite>, msg: &[u8]) -> io::R
|
||||
pub async fn recv_msg(mut read: Pin<&mut impl AsyncRead>) -> io::Result<Vec<u8>> {
|
||||
let mut len_buf = [0u8; (u32::BITS / 8) as usize];
|
||||
read.read_exact(&mut len_buf).await?;
|
||||
let len = u32::decode(Pin::new(&mut &len_buf[..])).await;
|
||||
let len = u32::decode(Pin::new(&mut &len_buf[..])).await?;
|
||||
let mut msg = vec![0u8; len as usize];
|
||||
read.read_exact(&mut msg).await?;
|
||||
Ok(msg)
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
use std::cell::RefCell;
|
||||
use std::collections::VecDeque;
|
||||
use std::future::Future;
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::{Pin, pin};
|
||||
use std::rc::Rc;
|
||||
use std::task::Poll;
|
||||
use std::{io, mem};
|
||||
|
||||
use async_fn_stream::stream;
|
||||
use async_fn_stream::try_stream;
|
||||
use bound::Bound;
|
||||
use derive_destructure::destructure;
|
||||
use futures::channel::mpsc::{self, Receiver, Sender, channel};
|
||||
@@ -14,23 +13,29 @@ use futures::channel::oneshot;
|
||||
use futures::future::LocalBoxFuture;
|
||||
use futures::lock::{Mutex, MutexGuard};
|
||||
use futures::{
|
||||
AsyncRead, AsyncWrite, AsyncWriteExt, SinkExt, Stream, StreamExt, stream, stream_select,
|
||||
AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, SinkExt, Stream, StreamExt, stream_select,
|
||||
};
|
||||
use hashbrown::HashMap;
|
||||
use orchid_api_traits::{Channel, Decode, Encode, Request, UnderRoot};
|
||||
use orchid_api_traits::{Decode, Encode, Request, UnderRoot};
|
||||
|
||||
use crate::localset::LocalSet;
|
||||
|
||||
#[must_use = "Receipts indicate that a required action has been performed within a function. \
|
||||
Most likely this should be returned somewhere."]
|
||||
pub struct Receipt<'a>(PhantomData<&'a mut ()>);
|
||||
impl Receipt<'_> {
|
||||
/// Only call this function from a custom implementation of [RepWriter]
|
||||
pub fn _new() -> Self { Self(PhantomData) }
|
||||
}
|
||||
|
||||
/// Write guard to outbound for the purpose of serializing a request. Only one
|
||||
/// can exist at a time. Dropping this object should panic.
|
||||
pub trait ReqWriter {
|
||||
pub trait ReqWriter<'a> {
|
||||
/// Access to the underlying channel. This may be buffered.
|
||||
fn writer(&mut self) -> Pin<&mut dyn AsyncWrite>;
|
||||
/// Finalize the request, release the outbound channel, then queue for the
|
||||
/// reply on the inbound channel.
|
||||
fn send(self: Box<Self>) -> LocalBoxFuture<'static, Box<dyn RepReader>>;
|
||||
fn send(self: Box<Self>) -> LocalBoxFuture<'a, io::Result<Box<dyn RepReader<'a> + 'a>>>;
|
||||
}
|
||||
|
||||
/// Write guard to inbound for the purpose of deserializing a reply. While held,
|
||||
@@ -40,49 +45,106 @@ pub trait ReqWriter {
|
||||
/// synchronously, because the API isn't cancellation safe in general so it is a
|
||||
/// programmer error in all cases to drop an object related to it without proper
|
||||
/// cleanup.
|
||||
pub trait RepReader {
|
||||
pub trait RepReader<'a> {
|
||||
/// Access to the underlying channel. The length of the message is inferred
|
||||
/// from the number of bytes read so this must not be buffered.
|
||||
fn reader(&mut self) -> Pin<&mut dyn AsyncRead>;
|
||||
/// Finish reading the request
|
||||
fn finish(self: Box<Self>) -> LocalBoxFuture<'static, ()>;
|
||||
fn finish(self: Box<Self>) -> LocalBoxFuture<'a, ()>;
|
||||
}
|
||||
|
||||
/// Write guard to outbound for the purpose of serializing a notification.
|
||||
///
|
||||
/// Dropping this object should panic for the same reason [RepReader] panics
|
||||
pub trait MsgWriter {
|
||||
pub trait MsgWriter<'a> {
|
||||
/// Access to the underlying channel. This may be buffered.
|
||||
fn writer(&mut self) -> Pin<&mut dyn AsyncWrite>;
|
||||
/// Send the notification
|
||||
fn finish(self: Box<Self>) -> LocalBoxFuture<'static, ()>;
|
||||
fn finish(self: Box<Self>) -> LocalBoxFuture<'a, io::Result<()>>;
|
||||
}
|
||||
|
||||
/// For initiating outbound requests and notifications
|
||||
pub trait Client {
|
||||
fn start_request(&self) -> LocalBoxFuture<'_, Box<dyn ReqWriter>>;
|
||||
fn start_notif(&self) -> LocalBoxFuture<'_, Box<dyn MsgWriter>>;
|
||||
fn start_request(&self) -> LocalBoxFuture<'_, io::Result<Box<dyn ReqWriter<'_> + '_>>>;
|
||||
fn start_notif(&self) -> LocalBoxFuture<'_, io::Result<Box<dyn MsgWriter<'_> + '_>>>;
|
||||
}
|
||||
|
||||
impl<T: Client + ?Sized> ClientExt for T {}
|
||||
/// Extension trait with convenience methods that handle outbound request and
|
||||
/// notif lifecycle and typing
|
||||
#[allow(async_fn_in_trait)]
|
||||
pub trait ClientExt<CH: Channel>: Client {
|
||||
async fn request<T: Request + UnderRoot<Root = CH::Req>>(&self, t: T) -> T::Response {
|
||||
let mut req = self.start_request().await;
|
||||
t.into_root().encode(req.writer().as_mut()).await;
|
||||
let mut rep = req.send().await;
|
||||
pub trait ClientExt: Client {
|
||||
async fn request<T: Request + UnderRoot<Root: Encode>>(&self, t: T) -> io::Result<T::Response> {
|
||||
let mut req = self.start_request().await?;
|
||||
t.into_root().encode(req.writer().as_mut()).await?;
|
||||
let mut rep = req.send().await?;
|
||||
let response = T::Response::decode(rep.reader()).await;
|
||||
rep.finish().await;
|
||||
response
|
||||
}
|
||||
async fn notify<T: UnderRoot<Root = CH::Notif>>(&self, t: T) {
|
||||
let mut notif = self.start_notif().await;
|
||||
t.into_root().encode(notif.writer().as_mut()).await;
|
||||
notif.finish().await;
|
||||
async fn notify<T: UnderRoot<Root: Encode>>(&self, t: T) -> io::Result<()> {
|
||||
let mut notif = self.start_notif().await?;
|
||||
t.into_root().encode(notif.writer().as_mut()).await?;
|
||||
notif.finish().await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub trait ReqReader<'a> {
|
||||
fn reader(&mut self) -> Pin<&mut dyn AsyncRead>;
|
||||
fn finish(self: Box<Self>) -> LocalBoxFuture<'a, Box<dyn ReqHandle<'a> + 'a>>;
|
||||
}
|
||||
impl<'a, T: ReqReader<'a> + ?Sized> ReqReaderExt<'a> for T {}
|
||||
#[allow(async_fn_in_trait)]
|
||||
pub trait ReqReaderExt<'a>: ReqReader<'a> {
|
||||
async fn read_req<R: Decode>(&mut self) -> io::Result<R> { R::decode(self.reader()).await }
|
||||
async fn reply<R: Request>(
|
||||
self: Box<Self>,
|
||||
req: impl Evidence<R>,
|
||||
rep: &R::Response,
|
||||
) -> io::Result<Receipt<'a>> {
|
||||
self.finish().await.reply(req, rep).await
|
||||
}
|
||||
async fn start_reply(self: Box<Self>) -> io::Result<Box<dyn RepWriter<'a> + 'a>> {
|
||||
self.finish().await.start_reply().await
|
||||
}
|
||||
}
|
||||
|
||||
pub trait ReqHandle<'a> {
|
||||
fn start_reply(self: Box<Self>) -> LocalBoxFuture<'a, io::Result<Box<dyn RepWriter<'a> + 'a>>>;
|
||||
}
|
||||
impl<'a, T: ReqHandle<'a> + ?Sized> ReqHandleExt<'a> for T {}
|
||||
#[allow(async_fn_in_trait)]
|
||||
pub trait ReqHandleExt<'a>: ReqHandle<'a> {
|
||||
async fn reply<Req: Request>(
|
||||
self: Box<Self>,
|
||||
_: impl Evidence<Req>,
|
||||
rep: &Req::Response,
|
||||
) -> io::Result<Receipt<'a>> {
|
||||
let mut reply = self.start_reply().await?;
|
||||
rep.encode(reply.writer()).await?;
|
||||
reply.finish().await
|
||||
}
|
||||
}
|
||||
|
||||
pub trait RepWriter<'a> {
|
||||
fn writer(&mut self) -> Pin<&mut dyn AsyncWrite>;
|
||||
fn finish(self: Box<Self>) -> LocalBoxFuture<'a, io::Result<Receipt<'a>>>;
|
||||
}
|
||||
|
||||
pub trait MsgReader<'a> {
|
||||
fn reader(&mut self) -> Pin<&mut dyn AsyncRead>;
|
||||
fn finish(self: Box<Self>) -> LocalBoxFuture<'a, ()>;
|
||||
}
|
||||
impl<'a, T: ?Sized + MsgReader<'a>> MsgReaderExt<'a> for T {}
|
||||
#[allow(async_fn_in_trait)]
|
||||
pub trait MsgReaderExt<'a>: MsgReader<'a> {
|
||||
async fn read<N: Decode>(mut self: Box<Self>) -> io::Result<N> {
|
||||
let n = N::decode(self.reader()).await;
|
||||
self.finish().await;
|
||||
n
|
||||
}
|
||||
}
|
||||
impl<CH: Channel, T: Client + ?Sized> ClientExt<CH> for T {}
|
||||
|
||||
/// A form of [Evidence] that doesn't require the value to be kept around
|
||||
pub struct Witness<T>(PhantomData<T>);
|
||||
@@ -105,64 +167,52 @@ type IoLock<T> = Rc<Mutex<Pin<Box<T>>>>;
|
||||
type IoGuard<T> = Bound<MutexGuard<'static, Pin<Box<T>>>, IoLock<T>>;
|
||||
|
||||
/// An incoming request. This holds a lock on the ingress channel.
|
||||
pub struct ReqReader<'a> {
|
||||
id: u64,
|
||||
pub struct IoReqReader<'a> {
|
||||
prefix: &'a [u8],
|
||||
read: IoGuard<dyn AsyncRead>,
|
||||
write: &'a Mutex<IoRef<dyn AsyncWrite>>,
|
||||
}
|
||||
impl<'a> ReqReader<'a> {
|
||||
/// Access
|
||||
pub fn reader(&mut self) -> Pin<&mut dyn AsyncRead> { self.read.as_mut() }
|
||||
pub async fn read_req<R: Decode>(&mut self) -> R { R::decode(self.reader()).await }
|
||||
pub async fn start_reply(self) -> RepWriter<'a> { self.branch().await.start_reply().await }
|
||||
pub async fn reply<R: Request>(self, req: impl Evidence<R>, rep: &R::Response) -> Receipt<'a> {
|
||||
self.branch().await.reply(req, rep).await
|
||||
impl<'a> ReqReader<'a> for IoReqReader<'a> {
|
||||
fn reader(&mut self) -> Pin<&mut dyn AsyncRead> { self.read.as_mut() }
|
||||
fn finish(self: Box<Self>) -> LocalBoxFuture<'a, Box<dyn ReqHandle<'a> + 'a>> {
|
||||
Box::pin(async {
|
||||
Box::new(IoReqHandle { prefix: self.prefix, write: self.write }) as Box<dyn ReqHandle<'a>>
|
||||
})
|
||||
}
|
||||
pub async fn branch(self) -> ReqHandle<'a> { ReqHandle { id: self.id, write: self.write } }
|
||||
}
|
||||
pub struct ReqHandle<'a> {
|
||||
id: u64,
|
||||
pub struct IoReqHandle<'a> {
|
||||
prefix: &'a [u8],
|
||||
write: &'a Mutex<IoRef<dyn AsyncWrite>>,
|
||||
}
|
||||
impl<'a> ReqHandle<'a> {
|
||||
pub async fn reply<Req: Request>(
|
||||
self,
|
||||
_: impl Evidence<Req>,
|
||||
rep: &Req::Response,
|
||||
) -> Receipt<'a> {
|
||||
let mut reply = self.start_reply().await;
|
||||
rep.encode(reply.writer()).await;
|
||||
reply.send().await
|
||||
}
|
||||
pub async fn start_reply(self) -> RepWriter<'a> {
|
||||
let mut write = self.write.lock().await;
|
||||
(!self.id).encode(write.as_mut()).await;
|
||||
RepWriter { write }
|
||||
impl<'a> ReqHandle<'a> for IoReqHandle<'a> {
|
||||
fn start_reply(self: Box<Self>) -> LocalBoxFuture<'a, io::Result<Box<dyn RepWriter<'a> + 'a>>> {
|
||||
Box::pin(async move {
|
||||
let mut write = self.write.lock().await;
|
||||
write.as_mut().write_all(self.prefix).await?;
|
||||
Ok(Box::new(IoRepWriter { write }) as Box<dyn RepWriter<'a>>)
|
||||
})
|
||||
}
|
||||
}
|
||||
pub struct RepWriter<'a> {
|
||||
pub struct IoRepWriter<'a> {
|
||||
write: MutexGuard<'a, IoRef<dyn AsyncWrite>>,
|
||||
}
|
||||
impl<'a> RepWriter<'a> {
|
||||
pub fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { self.write.as_mut() }
|
||||
pub async fn send(mut self) -> Receipt<'a> {
|
||||
self.writer().flush().await.unwrap();
|
||||
Receipt(PhantomData)
|
||||
impl<'a> RepWriter<'a> for IoRepWriter<'a> {
|
||||
fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { self.write.as_mut() }
|
||||
fn finish(mut self: Box<Self>) -> LocalBoxFuture<'a, io::Result<Receipt<'a>>> {
|
||||
Box::pin(async move {
|
||||
self.writer().flush().await?;
|
||||
Ok(Receipt(PhantomData))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NotifReader<'a> {
|
||||
pub struct IoMsgReader<'a> {
|
||||
_pd: PhantomData<&'a mut ()>,
|
||||
read: IoGuard<dyn AsyncRead>,
|
||||
}
|
||||
impl<'a> NotifReader<'a> {
|
||||
pub fn reader(&mut self) -> Pin<&mut dyn AsyncRead> { self.read.as_mut() }
|
||||
pub async fn read<N: Decode>(mut self) -> N {
|
||||
let n = N::decode(self.reader()).await;
|
||||
self.release().await;
|
||||
n
|
||||
}
|
||||
pub async fn release(self) {}
|
||||
impl<'a> MsgReader<'a> for IoMsgReader<'a> {
|
||||
fn reader(&mut self) -> Pin<&mut dyn AsyncRead> { self.read.as_mut() }
|
||||
fn finish(self: Box<Self>) -> LocalBoxFuture<'static, ()> { Box::pin(async {}) }
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -187,14 +237,14 @@ impl IoClient {
|
||||
}
|
||||
}
|
||||
impl Client for IoClient {
|
||||
fn start_notif(&self) -> LocalBoxFuture<'_, Box<dyn MsgWriter>> {
|
||||
fn start_notif(&self) -> LocalBoxFuture<'_, io::Result<Box<dyn MsgWriter<'_> + '_>>> {
|
||||
Box::pin(async {
|
||||
let mut o = self.lock_out().await;
|
||||
0u64.encode(o.as_mut()).await;
|
||||
Box::new(IoNotifWriter { o }) as Box<dyn MsgWriter>
|
||||
0u64.encode(o.as_mut()).await?;
|
||||
Ok(Box::new(IoNotifWriter { o }) as Box<dyn MsgWriter>)
|
||||
})
|
||||
}
|
||||
fn start_request(&self) -> LocalBoxFuture<'_, Box<dyn ReqWriter>> {
|
||||
fn start_request(&self) -> LocalBoxFuture<'_, io::Result<Box<dyn ReqWriter<'_> + '_>>> {
|
||||
Box::pin(async {
|
||||
let id = {
|
||||
let mut id_g = self.id.borrow_mut();
|
||||
@@ -206,8 +256,8 @@ impl Client for IoClient {
|
||||
self.subscribe.as_ref().clone().send(ReplySub { id, ack, cb }).await.unwrap();
|
||||
got_ack.await.unwrap();
|
||||
let mut w = self.lock_out().await;
|
||||
id.encode(w.as_mut()).await;
|
||||
Box::new(IoReqWriter { reply, w }) as Box<dyn ReqWriter>
|
||||
id.encode(w.as_mut()).await?;
|
||||
Ok(Box::new(IoReqWriter { reply, w }) as Box<dyn ReqWriter>)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -216,13 +266,15 @@ struct IoReqWriter {
|
||||
reply: oneshot::Receiver<IoGuard<dyn AsyncRead>>,
|
||||
w: IoGuard<dyn AsyncWrite>,
|
||||
}
|
||||
impl ReqWriter for IoReqWriter {
|
||||
impl<'a> ReqWriter<'a> for IoReqWriter {
|
||||
fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { self.w.as_mut() }
|
||||
fn send(self: Box<Self>) -> LocalBoxFuture<'static, Box<dyn RepReader>> {
|
||||
fn send(self: Box<Self>) -> LocalBoxFuture<'a, io::Result<Box<dyn RepReader<'a> + 'a>>> {
|
||||
Box::pin(async {
|
||||
let Self { reply, .. } = *self;
|
||||
let Self { reply, mut w } = *self;
|
||||
w.flush().await?;
|
||||
mem::drop(w);
|
||||
let i = reply.await.expect("Client dropped before reply received");
|
||||
Box::new(IoRepReader { i }) as Box<dyn RepReader>
|
||||
Ok(Box::new(IoRepReader { i }) as Box<dyn RepReader>)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -230,7 +282,7 @@ impl ReqWriter for IoReqWriter {
|
||||
struct IoRepReader {
|
||||
i: IoGuard<dyn AsyncRead>,
|
||||
}
|
||||
impl RepReader for IoRepReader {
|
||||
impl<'a> RepReader<'a> for IoRepReader {
|
||||
fn reader(&mut self) -> Pin<&mut dyn AsyncRead> { self.i.as_mut() }
|
||||
fn finish(self: Box<Self>) -> LocalBoxFuture<'static, ()> { Box::pin(async {}) }
|
||||
}
|
||||
@@ -239,11 +291,10 @@ impl RepReader for IoRepReader {
|
||||
struct IoNotifWriter {
|
||||
o: IoGuard<dyn AsyncWrite>,
|
||||
}
|
||||
impl MsgWriter for IoNotifWriter {
|
||||
impl<'a> MsgWriter<'a> for IoNotifWriter {
|
||||
fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { self.o.as_mut() }
|
||||
fn finish(self: Box<Self>) -> LocalBoxFuture<'static, ()> {
|
||||
self.destructure();
|
||||
Box::pin(async {})
|
||||
fn finish(mut self: Box<Self>) -> LocalBoxFuture<'static, io::Result<()>> {
|
||||
Box::pin(async move { self.o.flush().await })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -262,12 +313,12 @@ impl CommCtx {
|
||||
/// parameters are associated with the client and serve to ensure with a runtime
|
||||
/// check that the correct message families are sent in the correct directions
|
||||
/// across the channel.
|
||||
pub fn io_comm<CH: Channel>(
|
||||
pub fn io_comm(
|
||||
o: Rc<Mutex<Pin<Box<dyn AsyncWrite>>>>,
|
||||
i: Mutex<Pin<Box<dyn AsyncRead>>>,
|
||||
notif: impl for<'a> AsyncFn(NotifReader<'a>),
|
||||
req: impl for<'a> AsyncFn(ReqReader<'a>) -> Receipt<'a>,
|
||||
) -> (impl ClientExt<CH>, CommCtx, impl Future<Output = ()>) {
|
||||
notif: impl for<'a> AsyncFn(Box<dyn MsgReader<'a> + 'a>) -> io::Result<()>,
|
||||
req: impl for<'a> AsyncFn(Box<dyn ReqReader<'a> + 'a>) -> io::Result<Receipt<'a>>,
|
||||
) -> (impl Client + 'static, CommCtx, impl Future<Output = io::Result<()>>) {
|
||||
let i = Rc::new(i);
|
||||
let (onsub, client) = IoClient::new(o.clone());
|
||||
let (exit, onexit) = channel(1);
|
||||
@@ -278,65 +329,76 @@ pub fn io_comm<CH: Channel>(
|
||||
Exit,
|
||||
}
|
||||
let exiting = RefCell::new(false);
|
||||
let input_stream = stream(async |mut h| {
|
||||
let input_stream = try_stream(async |mut h| {
|
||||
loop {
|
||||
let mut g = Bound::async_new(i.clone(), async |i| i.lock().await).await;
|
||||
let id = u64::decode(g.as_mut()).await;
|
||||
h.emit(Event::Input(id, g)).await;
|
||||
match u64::decode(g.as_mut()).await {
|
||||
Ok(id) => h.emit(Event::Input(id, g)).await,
|
||||
Err(e)
|
||||
if matches!(
|
||||
e.kind(),
|
||||
io::ErrorKind::BrokenPipe
|
||||
| io::ErrorKind::ConnectionAborted
|
||||
| io::ErrorKind::UnexpectedEof
|
||||
) =>
|
||||
h.emit(Event::Exit).await,
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
});
|
||||
let pending_reqs = RefCell::new(VecDeque::<LocalBoxFuture<()>>::new());
|
||||
// this stream will never yield a value
|
||||
let mut fork_stream = pin!(
|
||||
stream::poll_fn(|cx| {
|
||||
let mut reqs_g = pending_reqs.borrow_mut();
|
||||
reqs_g.retain_mut(|req| match req.as_mut().poll(cx) {
|
||||
Poll::Pending => true,
|
||||
Poll::Ready(()) => false,
|
||||
});
|
||||
if *exiting.borrow() { Poll::Ready(None) } else { Poll::Pending }
|
||||
})
|
||||
.fuse()
|
||||
);
|
||||
let (mut add_pending_req, fork_future) = LocalSet::new();
|
||||
let mut fork_stream = pin!(fork_future.fuse().into_stream());
|
||||
let mut pending_replies = HashMap::new();
|
||||
{
|
||||
'body: {
|
||||
let mut shared = pin!(stream_select!(
|
||||
pin!(input_stream) as Pin<&mut dyn Stream<Item = Event>>,
|
||||
onsub.map(Event::Sub),
|
||||
fork_stream.as_mut(),
|
||||
onexit.map(|()| Event::Exit),
|
||||
pin!(input_stream) as Pin<&mut dyn Stream<Item = io::Result<Event>>>,
|
||||
onsub.map(|sub| Ok(Event::Sub(sub))),
|
||||
fork_stream.as_mut().map(|res| {
|
||||
res.map(|()| panic!("this substream cannot exit while the loop is running"))
|
||||
}),
|
||||
onexit.map(|()| Ok(Event::Exit)),
|
||||
));
|
||||
while let Some(next) = shared.next().await {
|
||||
match next {
|
||||
Event::Exit => {
|
||||
Err(e) => break 'body Err(e),
|
||||
Ok(Event::Exit) => {
|
||||
*exiting.borrow_mut() = true;
|
||||
break;
|
||||
},
|
||||
Event::Sub(ReplySub { id, ack, cb }) => {
|
||||
Ok(Event::Sub(ReplySub { id, ack, cb })) => {
|
||||
pending_replies.insert(id, cb);
|
||||
ack.send(()).unwrap();
|
||||
},
|
||||
Event::Input(0, read) => {
|
||||
Ok(Event::Input(0, read)) => {
|
||||
let notif = ¬if;
|
||||
pending_reqs.borrow_mut().push_back(Box::pin(async move {
|
||||
notif(NotifReader { _pd: PhantomData, read }).await
|
||||
}));
|
||||
let notif_job =
|
||||
async move { notif(Box::new(IoMsgReader { _pd: PhantomData, read })).await };
|
||||
add_pending_req.send(Box::pin(notif_job)).await.unwrap();
|
||||
},
|
||||
// MSB == 0 is a request, !id where MSB == 1 is the corresponding response
|
||||
Ok(Event::Input(id, read)) if (id & (1 << (u64::BITS - 1))) == 0 => {
|
||||
let (o, req) = (o.clone(), &req);
|
||||
let req_job = async move {
|
||||
let mut prefix = Vec::new();
|
||||
(!id).encode_vec(&mut prefix);
|
||||
let _ = req(Box::new(IoReqReader { prefix: &pin!(prefix), read, write: &o })).await;
|
||||
Ok(())
|
||||
};
|
||||
add_pending_req.send(Box::pin(req_job)).await.unwrap();
|
||||
},
|
||||
Ok(Event::Input(id, read)) => {
|
||||
let cb = pending_replies.remove(&!id).expect("Reply to unrecognized request");
|
||||
cb.send(read).unwrap_or_else(|_| panic!("Failed to send reply"));
|
||||
},
|
||||
// id.msb == 0 is a request, !id where id.msb == 1 is the equivalent response
|
||||
Event::Input(id, read) =>
|
||||
if (id & (1 << (u64::BITS - 1))) == 0 {
|
||||
let (o, req) = (o.clone(), &req);
|
||||
pending_reqs.borrow_mut().push_back(Box::pin(async move {
|
||||
let _ = req(ReqReader { id, read, write: &o }).await;
|
||||
}) as LocalBoxFuture<()>);
|
||||
} else {
|
||||
let cb = pending_replies.remove(&!id).expect("Reply to unrecognized request");
|
||||
cb.send(read).unwrap_or_else(|_| panic!("Failed to send reply"));
|
||||
},
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}?;
|
||||
mem::drop(add_pending_req);
|
||||
while let Some(next) = fork_stream.next().await {
|
||||
next?
|
||||
}
|
||||
fork_stream.as_mut().count().await;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
@@ -347,18 +409,48 @@ mod test {
|
||||
use futures::channel::mpsc;
|
||||
use futures::lock::Mutex;
|
||||
use futures::{SinkExt, StreamExt, join};
|
||||
use never::Never;
|
||||
use orchid_api_derive::{Coding, Hierarchy};
|
||||
use orchid_api_traits::{Channel, Request};
|
||||
use orchid_api_traits::Request;
|
||||
use test_executors::spin_on;
|
||||
use unsync_pipe::pipe;
|
||||
|
||||
use crate::reqnot::{ClientExt, NotifReader, io_comm};
|
||||
use crate::reqnot::{ClientExt, MsgReaderExt, ReqReaderExt, io_comm};
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Coding, Hierarchy)]
|
||||
#[extendable]
|
||||
struct TestNotif(u64);
|
||||
|
||||
#[test]
|
||||
fn notification() {
|
||||
spin_on(async {
|
||||
let (in1, out2) = pipe(1024);
|
||||
let (in2, out1) = pipe(1024);
|
||||
let (received, mut on_receive) = mpsc::channel(2);
|
||||
let (_, recv_ctx, run_recv) = io_comm(
|
||||
Rc::new(Mutex::new(Box::pin(in2))),
|
||||
Mutex::new(Box::pin(out2)),
|
||||
async |notif| {
|
||||
received.clone().send(notif.read::<TestNotif>().await?).await.unwrap();
|
||||
Ok(())
|
||||
},
|
||||
async |_| panic!("Should receive notif, not request"),
|
||||
);
|
||||
let (sender, ..) = io_comm(
|
||||
Rc::new(Mutex::new(Box::pin(in1))),
|
||||
Mutex::new(Box::pin(out1)),
|
||||
async |_| panic!("Should not receive notif"),
|
||||
async |_| panic!("Should not receive request"),
|
||||
);
|
||||
join!(async { run_recv.await.unwrap() }, async {
|
||||
sender.notify(TestNotif(3)).await.unwrap();
|
||||
assert_eq!(on_receive.next().await, Some(TestNotif(3)));
|
||||
sender.notify(TestNotif(4)).await.unwrap();
|
||||
assert_eq!(on_receive.next().await, Some(TestNotif(4)));
|
||||
recv_ctx.exit().await;
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Coding, Hierarchy)]
|
||||
#[extendable]
|
||||
struct DummyRequest(u64);
|
||||
@@ -366,64 +458,28 @@ mod test {
|
||||
type Response = u64;
|
||||
}
|
||||
|
||||
struct TestChannel;
|
||||
impl Channel for TestChannel {
|
||||
type Notif = TestNotif;
|
||||
type Req = DummyRequest;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn notification() {
|
||||
spin_on(async {
|
||||
let (in1, out2) = pipe(1024);
|
||||
let (in2, out1) = pipe(1024);
|
||||
let (received, mut on_receive) = mpsc::channel(2);
|
||||
let (_, recv_ctx, run_recv) = io_comm::<Never>(
|
||||
Rc::new(Mutex::new(Box::pin(in2))),
|
||||
Mutex::new(Box::pin(out2)),
|
||||
async |notif: NotifReader| {
|
||||
received.clone().send(notif.read::<TestNotif>().await).await.unwrap();
|
||||
},
|
||||
async |_| panic!("Should receive notif, not request"),
|
||||
);
|
||||
let (sender, ..) = io_comm::<TestChannel>(
|
||||
Rc::new(Mutex::new(Box::pin(in1))),
|
||||
Mutex::new(Box::pin(out1)),
|
||||
async |_| panic!("Should not receive notif"),
|
||||
async |_| panic!("Should not receive request"),
|
||||
);
|
||||
join!(run_recv, async {
|
||||
sender.notify(TestNotif(3)).await;
|
||||
assert_eq!(on_receive.next().await, Some(TestNotif(3)));
|
||||
sender.notify(TestNotif(4)).await;
|
||||
assert_eq!(on_receive.next().await, Some(TestNotif(4)));
|
||||
recv_ctx.exit().await;
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn request() {
|
||||
spin_on(async {
|
||||
let (in1, out2) = pipe(1024);
|
||||
let (in2, out1) = pipe(1024);
|
||||
let (_, srv_ctx, run_srv) = io_comm::<Never>(
|
||||
let (_, srv_ctx, run_srv) = io_comm(
|
||||
Rc::new(Mutex::new(Box::pin(in2))),
|
||||
Mutex::new(Box::pin(out2)),
|
||||
async |_| panic!("No notifs expected"),
|
||||
async |mut req| {
|
||||
let val = req.read_req::<DummyRequest>().await;
|
||||
let val = req.read_req::<DummyRequest>().await?;
|
||||
req.reply(&val, &(val.0 + 1)).await
|
||||
},
|
||||
);
|
||||
let (client, client_ctx, run_client) = io_comm::<TestChannel>(
|
||||
let (client, client_ctx, run_client) = io_comm(
|
||||
Rc::new(Mutex::new(Box::pin(in1))),
|
||||
Mutex::new(Box::pin(out1)),
|
||||
async |_| panic!("Not expecting ingress notif"),
|
||||
async |_| panic!("Not expecting ingress req"),
|
||||
);
|
||||
join!(run_srv, run_client, async {
|
||||
let response = client.request(DummyRequest(5)).await;
|
||||
join!(async { run_srv.await.unwrap() }, async { run_client.await.unwrap() }, async {
|
||||
let response = client.request(DummyRequest(5)).await.unwrap();
|
||||
assert_eq!(response, 6);
|
||||
srv_ctx.exit().await;
|
||||
client_ctx.exit().await;
|
||||
|
||||
@@ -1,16 +1,21 @@
|
||||
//! A pattern for running async code from sync destructors and other
|
||||
//! unfortunately sync callbacks
|
||||
//!
|
||||
//! We create a task_local
|
||||
//! We create a task_local vecdeque which is moved into a thread_local whenever
|
||||
//! the task is being polled. A call to [stash] pushes the future onto this
|
||||
//! deque. Before [with_stash] returns, it pops everything from the deque
|
||||
//! individually and awaits each of them, pushing any additionally stashed
|
||||
//! futures onto the back of the same deque.
|
||||
|
||||
use std::cell::RefCell;
|
||||
use std::collections::VecDeque;
|
||||
use std::pin::Pin;
|
||||
|
||||
use some_executor::task_local;
|
||||
use task_local::task_local;
|
||||
|
||||
#[derive(Default)]
|
||||
struct StashedFutures {
|
||||
queue: VecDeque<Pin<Box<dyn Future<Output = ()>>>>,
|
||||
queue: RefCell<VecDeque<Pin<Box<dyn Future<Output = ()>>>>>,
|
||||
}
|
||||
|
||||
task_local! {
|
||||
@@ -23,7 +28,7 @@ pub async fn with_stash<F: Future>(fut: F) -> F::Output {
|
||||
STASHED_FUTURES
|
||||
.scope(StashedFutures::default(), async {
|
||||
let val = fut.await;
|
||||
while let Some(fut) = STASHED_FUTURES.with_mut(|sf| sf.unwrap().queue.pop_front()) {
|
||||
while let Some(fut) = STASHED_FUTURES.with(|sf| sf.queue.borrow_mut().pop_front()) {
|
||||
fut.await;
|
||||
}
|
||||
val
|
||||
@@ -33,10 +38,7 @@ pub async fn with_stash<F: Future>(fut: F) -> F::Output {
|
||||
|
||||
/// Schedule a future to be run before the next [with_stash] guard ends. This is
|
||||
/// most useful for sending messages from destructors.
|
||||
pub fn stash<F: Future + 'static>(fut: F) {
|
||||
STASHED_FUTURES.with_mut(|sf| {
|
||||
sf.expect("No stash! Timely completion cannot be guaranteed").queue.push_back(Box::pin(async {
|
||||
fut.await;
|
||||
}))
|
||||
})
|
||||
pub fn stash<F: Future<Output = ()> + 'static>(fut: F) {
|
||||
(STASHED_FUTURES.try_with(|sf| sf.queue.borrow_mut().push_back(Box::pin(fut))))
|
||||
.expect("No stash! Timely completion cannot be guaranteed")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user