forked from Orchid/orchid
lex_hello worked for a second just now
this is just a backup however
This commit is contained in:
@@ -1,9 +1,13 @@
|
||||
use std::ops::Deref;
|
||||
use std::rc::Rc;
|
||||
|
||||
use futures::future::LocalBoxFuture;
|
||||
|
||||
use crate::api;
|
||||
|
||||
pub type Spawner = Rc<dyn Fn(LocalBoxFuture<'static, ()>)>;
|
||||
pub type RecvCB<'a> = Box<dyn for<'b> FnOnce(&'b [u8]) -> LocalBoxFuture<'b, ()> + 'a>;
|
||||
|
||||
/// The 3 primary contact points with an extension are
|
||||
/// - send a message
|
||||
/// - wait for a message to arrive
|
||||
@@ -12,10 +16,7 @@ use crate::api;
|
||||
/// There are no ordering guarantees about these
|
||||
pub trait ExtPort {
|
||||
fn send<'a>(&'a self, msg: &'a [u8]) -> LocalBoxFuture<'a, ()>;
|
||||
fn recv<'a>(
|
||||
&'a self,
|
||||
cb: Box<dyn FnOnce(&[u8]) -> LocalBoxFuture<'_, ()> + 'a>,
|
||||
) -> LocalBoxFuture<'a, ()>;
|
||||
fn recv<'a>(&'a self, cb: RecvCB<'a>) -> LocalBoxFuture<'a, ()>;
|
||||
}
|
||||
|
||||
pub struct ExtInit {
|
||||
@@ -24,12 +25,7 @@ pub struct ExtInit {
|
||||
}
|
||||
impl ExtInit {
|
||||
pub async fn send(&self, msg: &[u8]) { self.port.send(msg).await }
|
||||
pub async fn recv<'a, 's: 'a>(
|
||||
&'s self,
|
||||
cb: Box<dyn FnOnce(&[u8]) -> LocalBoxFuture<'_, ()> + 'a>,
|
||||
) {
|
||||
self.port.recv(Box::new(cb)).await
|
||||
}
|
||||
pub async fn recv<'a, 's: 'a>(&'s self, cb: RecvCB<'a>) { self.port.recv(Box::new(cb)).await }
|
||||
}
|
||||
impl Deref for ExtInit {
|
||||
type Target = api::ExtensionHeader;
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::cell::RefCell;
|
||||
use std::fmt;
|
||||
use std::ops::Add;
|
||||
use std::sync::Arc;
|
||||
@@ -171,3 +172,17 @@ pub fn mk_errv(
|
||||
pub trait Reporter {
|
||||
fn report(&self, e: impl Into<OrcErrv>);
|
||||
}
|
||||
|
||||
pub struct ReporterImpl {
|
||||
errors: RefCell<Vec<OrcErr>>,
|
||||
}
|
||||
impl ReporterImpl {
|
||||
pub fn new() -> Self { Self { errors: RefCell::new(vec![]) } }
|
||||
pub fn errv(self) -> Option<OrcErrv> { OrcErrv::new(self.errors.into_inner()).ok() }
|
||||
}
|
||||
impl Reporter for ReporterImpl {
|
||||
fn report(&self, e: impl Into<OrcErrv>) { self.errors.borrow_mut().extend(e.into()) }
|
||||
}
|
||||
impl Default for ReporterImpl {
|
||||
fn default() -> Self { Self::new() }
|
||||
}
|
||||
|
||||
@@ -229,8 +229,6 @@ impl Interner {
|
||||
/// Intern some data; query its identifier if not known locally
|
||||
pub async fn i<T: Interned>(&self, t: &(impl Internable<Interned = T> + ?Sized)) -> Tok<T> {
|
||||
let data = t.get_owned();
|
||||
let job = format!("{t:?} in {}", if self.master.is_some() { "replica" } else { "master" });
|
||||
eprintln!("Interning {job}");
|
||||
let mut g = self.interners.lock().await;
|
||||
let typed = T::bimap(&mut g);
|
||||
if let Some(tok) = typed.by_value(&data) {
|
||||
@@ -243,7 +241,6 @@ impl Interner {
|
||||
};
|
||||
let tok = Tok::new(data, marker);
|
||||
T::bimap(&mut g).insert(tok.clone());
|
||||
eprintln!("Interned {job}");
|
||||
tok
|
||||
}
|
||||
/// Extern an identifier; query the data it represents if not known locally
|
||||
|
||||
@@ -20,9 +20,11 @@ impl Logger {
|
||||
}
|
||||
pub fn write_fmt(&self, fmt: Arguments) {
|
||||
match &self.0 {
|
||||
api::LogStrategy::Discard => (),
|
||||
api::LogStrategy::StdErr => stderr().write_fmt(fmt).expect("Could not write to stderr!"),
|
||||
api::LogStrategy::File(f) => {
|
||||
let mut file = File::open(f).expect("Could not open logfile");
|
||||
let mut file = (File::options().write(true).create(true).truncate(true).open(f))
|
||||
.expect("Could not open logfile");
|
||||
file.write_fmt(fmt).expect("Could not write to logfile");
|
||||
},
|
||||
}
|
||||
|
||||
@@ -2,8 +2,7 @@ use std::marker::PhantomData;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_std::stream;
|
||||
use async_std::sync::Mutex;
|
||||
use async_stream::stream;
|
||||
use futures::future::LocalBoxFuture;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use never::Never;
|
||||
@@ -63,7 +62,7 @@ pub enum MTok<'a, A> {
|
||||
Ref(Arc<MTok<'a, Never>>),
|
||||
/// Used in the matcher to skip previous macro output which can only go in
|
||||
/// vectorial placeholders
|
||||
Done(Arc<MTok<'a, A>>),
|
||||
Done(Rc<MTok<'a, A>>),
|
||||
}
|
||||
impl<'a, A> MTok<'a, A> {
|
||||
pub(crate) async fn from_api(
|
||||
@@ -99,17 +98,17 @@ impl<'a, A> MTok<'a, A> {
|
||||
}
|
||||
|
||||
pub async fn mtreev_from_api<'a, 'b, A>(
|
||||
api: impl IntoIterator<Item = &'b api::MacroTree>,
|
||||
apiv: impl IntoIterator<Item = &'b api::MacroTree>,
|
||||
i: &Interner,
|
||||
do_atom: &mut impl MacroAtomFromApi<'a, A>,
|
||||
do_atom: &'b mut (impl MacroAtomFromApi<'a, A> + 'b),
|
||||
) -> Vec<MTree<'a, A>> {
|
||||
let do_atom_lk = Mutex::new(do_atom);
|
||||
stream::from_iter(api)
|
||||
.then(|api| async {
|
||||
MTree::from_api(api, &mut *do_atom_lk.lock().await, i).boxed_local().await
|
||||
})
|
||||
.collect()
|
||||
.await
|
||||
stream! {
|
||||
for api in apiv {
|
||||
yield MTree::from_api(api, do_atom, i).boxed_local().await
|
||||
}
|
||||
}
|
||||
.collect()
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn mtreev_to_api<'a: 'b, 'b, A: 'b>(
|
||||
|
||||
@@ -192,13 +192,13 @@ pub struct Parsed<'a, 'b, T, A: AtomRepr, X: ExtraTok> {
|
||||
pub type ParseRes<'a, 'b, T, A, X> = OrcRes<Parsed<'a, 'b, T, A, X>>;
|
||||
|
||||
pub async fn parse_multiname<'a, 'b, A: AtomRepr, X: ExtraTok>(
|
||||
ctx: &impl Reporter,
|
||||
ctx: &(impl Reporter + ?Sized),
|
||||
tail: Snippet<'a, 'b, A, X>,
|
||||
) -> ParseRes<'a, 'b, Vec<(Import, Pos)>, A, X> {
|
||||
let ret = rec(ctx, tail).await;
|
||||
#[allow(clippy::type_complexity)] // it's an internal function
|
||||
pub async fn rec<'a, 'b, A: AtomRepr, X: ExtraTok>(
|
||||
ctx: &impl Reporter,
|
||||
ctx: &(impl Reporter + ?Sized),
|
||||
tail: Snippet<'a, 'b, A, X>,
|
||||
) -> ParseRes<'a, 'b, Vec<(Vec<Tok<String>>, Option<Tok<String>>, Pos)>, A, X> {
|
||||
let comma = tail.i(",").await;
|
||||
|
||||
@@ -18,6 +18,7 @@ use orchid_api_traits::{Channel, Coding, Decode, Encode, MsgSet, Request};
|
||||
use trait_set::trait_set;
|
||||
|
||||
use crate::clone;
|
||||
use crate::logging::Logger;
|
||||
|
||||
pub struct Receipt<'a>(PhantomData<&'a mut ()>);
|
||||
|
||||
@@ -110,16 +111,24 @@ impl Deref for RawReply {
|
||||
fn deref(&self) -> &Self::Target { get_id(&self.0[..]).1 }
|
||||
}
|
||||
|
||||
pub struct ReqNot<T: MsgSet>(Arc<Mutex<ReqNotData<T>>>);
|
||||
pub struct ReqNot<T: MsgSet>(Arc<Mutex<ReqNotData<T>>>, Logger);
|
||||
impl<T: MsgSet> ReqNot<T> {
|
||||
pub fn new(send: impl SendFn<T>, notif: impl NotifFn<T>, req: impl ReqFn<T>) -> Self {
|
||||
Self(Arc::new(Mutex::new(ReqNotData {
|
||||
id: 1,
|
||||
send: Box::new(send),
|
||||
notif: Box::new(notif),
|
||||
req: Box::new(req),
|
||||
responses: HashMap::new(),
|
||||
})))
|
||||
pub fn new(
|
||||
logger: Logger,
|
||||
send: impl SendFn<T>,
|
||||
notif: impl NotifFn<T>,
|
||||
req: impl ReqFn<T>,
|
||||
) -> Self {
|
||||
Self(
|
||||
Arc::new(Mutex::new(ReqNotData {
|
||||
id: 1,
|
||||
send: Box::new(send),
|
||||
notif: Box::new(notif),
|
||||
req: Box::new(req),
|
||||
responses: HashMap::new(),
|
||||
})),
|
||||
logger,
|
||||
)
|
||||
}
|
||||
|
||||
/// Can be called from a polling thread or dispatched in any other way
|
||||
@@ -133,7 +142,7 @@ impl<T: MsgSet> ReqNot<T> {
|
||||
notif_cb(notif_val, self.clone()).await
|
||||
} else if 0 < id.bitand(1 << 63) {
|
||||
let sender = g.responses.remove(&!id).expect("Received response for invalid message");
|
||||
sender.send(message.to_vec()).await.unwrap();
|
||||
sender.send(message.to_vec()).await.unwrap()
|
||||
} else {
|
||||
let message = <T::In as Channel>::Req::decode(Pin::new(&mut &payload[..])).await;
|
||||
let mut req_cb = clone_box(&*g.req);
|
||||
@@ -154,28 +163,34 @@ impl<T: MsgSet> ReqNot<T> {
|
||||
|
||||
pub trait DynRequester {
|
||||
type Transfer;
|
||||
fn logger(&self) -> &Logger;
|
||||
/// Encode and send a request, then receive the response buffer.
|
||||
fn raw_request(&self, data: Self::Transfer) -> LocalBoxFuture<'_, RawReply>;
|
||||
}
|
||||
|
||||
pub struct MappedRequester<'a, T: 'a>(Box<dyn Fn(T) -> LocalBoxFuture<'a, RawReply> + 'a>);
|
||||
pub struct MappedRequester<'a, T: 'a>(Box<dyn Fn(T) -> LocalBoxFuture<'a, RawReply> + 'a>, Logger);
|
||||
impl<'a, T> MappedRequester<'a, T> {
|
||||
fn new<U: DynRequester + 'a>(req: U) -> Self
|
||||
fn new<U: DynRequester + 'a>(req: U, logger: Logger) -> Self
|
||||
where T: Into<U::Transfer> {
|
||||
let req_arc = Arc::new(req);
|
||||
MappedRequester(Box::new(move |t| {
|
||||
Box::pin(clone!(req_arc; async move { req_arc.raw_request(t.into()).await}))
|
||||
}))
|
||||
MappedRequester(
|
||||
Box::new(move |t| {
|
||||
Box::pin(clone!(req_arc; async move { req_arc.raw_request(t.into()).await}))
|
||||
}),
|
||||
logger,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> DynRequester for MappedRequester<'_, T> {
|
||||
type Transfer = T;
|
||||
fn logger(&self) -> &Logger { &self.1 }
|
||||
fn raw_request(&self, data: Self::Transfer) -> LocalBoxFuture<'_, RawReply> { self.0(data) }
|
||||
}
|
||||
|
||||
impl<T: MsgSet> DynRequester for ReqNot<T> {
|
||||
type Transfer = <T::Out as Channel>::Req;
|
||||
fn logger(&self) -> &Logger { &self.1 }
|
||||
fn raw_request(&self, req: Self::Transfer) -> LocalBoxFuture<'_, RawReply> {
|
||||
Box::pin(async move {
|
||||
let mut g = self.0.lock().await;
|
||||
@@ -203,17 +218,21 @@ pub trait Requester: DynRequester {
|
||||
) -> impl Future<Output = R::Response>;
|
||||
fn map<'a, U: Into<Self::Transfer>>(self) -> MappedRequester<'a, U>
|
||||
where Self: Sized + 'a {
|
||||
MappedRequester::new(self)
|
||||
let logger = self.logger().clone();
|
||||
MappedRequester::new(self, logger)
|
||||
}
|
||||
}
|
||||
impl<This: DynRequester + ?Sized> Requester for This {
|
||||
async fn request<R: Request + Into<Self::Transfer>>(&self, data: R) -> R::Response {
|
||||
R::Response::decode(Pin::new(&mut &self.raw_request(data.into()).await[..])).await
|
||||
let req = format!("{data:?}");
|
||||
let rep = R::Response::decode(Pin::new(&mut &self.raw_request(data.into()).await[..])).await;
|
||||
writeln!(self.logger(), "Request {req} got response {rep:?}");
|
||||
rep
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: MsgSet> Clone for ReqNot<T> {
|
||||
fn clone(&self) -> Self { Self(self.0.clone()) }
|
||||
fn clone(&self) -> Self { Self(self.0.clone(), self.1.clone()) }
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -223,12 +242,14 @@ mod test {
|
||||
|
||||
use async_std::sync::Mutex;
|
||||
use futures::FutureExt;
|
||||
use orchid_api::LogStrategy;
|
||||
use orchid_api_derive::Coding;
|
||||
use orchid_api_traits::{Channel, Request};
|
||||
use test_executors::spin_on;
|
||||
|
||||
use super::{MsgSet, ReqNot};
|
||||
use crate::clone;
|
||||
use crate::logging::Logger;
|
||||
use crate::reqnot::Requester as _;
|
||||
|
||||
#[derive(Clone, Debug, Coding, PartialEq)]
|
||||
@@ -252,8 +273,10 @@ mod test {
|
||||
#[test]
|
||||
fn notification() {
|
||||
spin_on(async {
|
||||
let logger = Logger::new(LogStrategy::StdErr);
|
||||
let received = Arc::new(Mutex::new(None));
|
||||
let receiver = ReqNot::<TestMsgSet>::new(
|
||||
logger.clone(),
|
||||
|_, _| panic!("Should not send anything"),
|
||||
clone!(received; move |notif, _| clone!(received; async move {
|
||||
*received.lock().await = Some(notif);
|
||||
@@ -261,6 +284,7 @@ mod test {
|
||||
|_, _| panic!("Not receiving a request"),
|
||||
);
|
||||
let sender = ReqNot::<TestMsgSet>::new(
|
||||
logger,
|
||||
clone!(receiver; move |d, _| clone!(receiver; Box::pin(async move {
|
||||
receiver.receive(d).await
|
||||
}))),
|
||||
@@ -277,8 +301,10 @@ mod test {
|
||||
#[test]
|
||||
fn request() {
|
||||
spin_on(async {
|
||||
let logger = Logger::new(LogStrategy::StdErr);
|
||||
let receiver = Rc::new(Mutex::<Option<ReqNot<TestMsgSet>>>::new(None));
|
||||
let sender = Rc::new(ReqNot::<TestMsgSet>::new(
|
||||
logger.clone(),
|
||||
clone!(receiver; move |d, _| clone!(receiver; Box::pin(async move {
|
||||
receiver.lock().await.as_ref().unwrap().receive(d).await
|
||||
}))),
|
||||
@@ -286,6 +312,7 @@ mod test {
|
||||
|_, _| panic!("Should not receive request"),
|
||||
));
|
||||
*receiver.lock().await = Some(ReqNot::new(
|
||||
logger,
|
||||
clone!(sender; move |d, _| clone!(sender; Box::pin(async move {
|
||||
sender.receive(d).await
|
||||
}))),
|
||||
|
||||
@@ -7,8 +7,7 @@ use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use api::PhKind;
|
||||
use async_std::stream;
|
||||
use async_std::sync::Mutex;
|
||||
use async_stream::stream;
|
||||
use futures::future::{LocalBoxFuture, join_all};
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use itertools::Itertools;
|
||||
@@ -161,14 +160,13 @@ pub async fn ttv_from_api<A: AtomRepr, X: ExtraTok>(
|
||||
ctx: &mut A::Ctx,
|
||||
i: &Interner,
|
||||
) -> Vec<TokTree<'static, A, X>> {
|
||||
let ctx_lk = Mutex::new(ctx);
|
||||
stream::from_iter(tokv.into_iter())
|
||||
.then(|t| async {
|
||||
let t = t;
|
||||
TokTree::<A, X>::from_api(t.borrow(), *ctx_lk.lock().await, i).boxed_local().await
|
||||
})
|
||||
.collect()
|
||||
.await
|
||||
stream! {
|
||||
for tok in tokv {
|
||||
yield TokTree::<A, X>::from_api(tok.borrow(), ctx, i).boxed_local().await
|
||||
}
|
||||
}
|
||||
.collect()
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn ttv_to_api<'a, A: AtomRepr, X: ExtraTok>(
|
||||
@@ -186,11 +184,13 @@ pub async fn ttv_into_api<'a, A: AtomRepr, X: ExtraTok>(
|
||||
tokv: impl IntoIterator<Item = TokTree<'a, A, X>>,
|
||||
do_extra: &mut impl FnMut(X, Range<u32>) -> api::TokenTree,
|
||||
) -> Vec<api::TokenTree> {
|
||||
let mut new_tokv = Vec::new();
|
||||
for item in tokv {
|
||||
new_tokv.push(item.into_api(do_extra).await)
|
||||
stream! {
|
||||
for tok in tokv {
|
||||
yield tok.into_api(do_extra).await
|
||||
}
|
||||
}
|
||||
new_tokv
|
||||
.collect()
|
||||
.await
|
||||
}
|
||||
|
||||
/// This takes a position and not a range because it assigns the range to
|
||||
|
||||
Reference in New Issue
Block a user