forked from Orchid/orchid
Committing for reference
This commit is contained in:
29
orchid-base/src/builtin.rs
Normal file
29
orchid-base/src/builtin.rs
Normal file
@@ -0,0 +1,29 @@
|
||||
use std::future::Future;
|
||||
use std::ops::Deref;
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::api;
|
||||
|
||||
/// The 3 primary contact points with an extension are
|
||||
/// - send a message
|
||||
/// - wait for a message to arrive
|
||||
/// - wait for the extension to stop after exit (this is the implicit Drop)
|
||||
///
|
||||
/// There are no ordering guarantees about these
|
||||
pub trait ExtPort {
|
||||
fn send(&self, msg: &[u8]) -> Pin<Box<dyn Future<Output = ()>>>;
|
||||
fn recv<'a>(&self, cb: Box<dyn FnOnce(&[u8]) + Send + 'a>) -> Pin<Box<dyn Future<Output = ()>>>;
|
||||
}
|
||||
|
||||
pub struct ExtInit {
|
||||
pub header: api::ExtensionHeader,
|
||||
pub port: Box<dyn ExtPort>,
|
||||
}
|
||||
impl ExtInit {
|
||||
pub async fn send(&self, msg: &[u8]) { self.port.send(msg).await }
|
||||
pub async fn recv(&self, cb: impl FnOnce(&[u8]) + Send) { self.port.recv(Box::new(cb)).await }
|
||||
}
|
||||
impl Deref for ExtInit {
|
||||
type Target = api::ExtensionHeader;
|
||||
fn deref(&self) -> &Self::Target { &self.header }
|
||||
}
|
||||
@@ -248,6 +248,7 @@ pub fn interner() -> impl DerefMut<Target = Interner> {
|
||||
G(g)
|
||||
}
|
||||
|
||||
/// Initialize the interner in replica mode. No messages are sent at this point.
|
||||
pub fn init_replica(req: impl DynRequester<Transfer = api::IntReq> + 'static) {
|
||||
let mut g = INTERNER.lock().unwrap();
|
||||
assert!(g.is_none(), "Attempted to initialize replica interner after first use");
|
||||
|
||||
@@ -2,6 +2,7 @@ use orchid_api as api;
|
||||
|
||||
pub mod box_cow;
|
||||
pub mod boxed_iter;
|
||||
pub mod builtin;
|
||||
pub mod char_filter;
|
||||
pub mod clone;
|
||||
pub mod combine;
|
||||
|
||||
@@ -1,16 +1,22 @@
|
||||
use std::io;
|
||||
use std::pin::Pin;
|
||||
|
||||
use async_std::io::{Read, ReadExt, Write, WriteExt};
|
||||
use orchid_api_traits::{Decode, Encode};
|
||||
|
||||
pub fn send_msg(write: &mut impl io::Write, msg: &[u8]) -> io::Result<()> {
|
||||
u32::try_from(msg.len()).unwrap().encode(write);
|
||||
write.write_all(msg)?;
|
||||
write.flush()
|
||||
pub async fn send_msg(mut write: Pin<&mut impl Write>, msg: &[u8]) -> io::Result<()> {
|
||||
let mut len_buf = vec![];
|
||||
u32::try_from(msg.len()).unwrap().encode(&mut len_buf);
|
||||
write.write_all(&len_buf).await?;
|
||||
write.write_all(msg).await?;
|
||||
write.flush().await
|
||||
}
|
||||
|
||||
pub fn recv_msg(read: &mut impl io::Read) -> io::Result<Vec<u8>> {
|
||||
let len = u32::decode(read);
|
||||
pub async fn recv_msg(mut read: Pin<&mut impl Read>) -> io::Result<Vec<u8>> {
|
||||
let mut len_buf = [0u8; (u32::BITS / 8) as usize];
|
||||
read.read_exact(&mut len_buf).await?;
|
||||
let len = u32::decode(&mut &len_buf[..]);
|
||||
let mut msg = vec![0u8; len as usize];
|
||||
read.read_exact(&mut msg)?;
|
||||
read.read_exact(&mut msg).await?;
|
||||
Ok(msg)
|
||||
}
|
||||
|
||||
@@ -1,32 +1,34 @@
|
||||
use std::any::Any;
|
||||
use std::cell::RefCell;
|
||||
use std::future::Future;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::{BitAnd, Deref};
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::{SyncSender, sync_channel};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::{mem, thread};
|
||||
|
||||
use async_std::channel::{self, Sender};
|
||||
use async_std::future;
|
||||
use derive_destructure::destructure;
|
||||
use dyn_clone::{DynClone, clone_box};
|
||||
use hashbrown::HashMap;
|
||||
use orchid_api_traits::{Channel, Coding, Decode, Encode, MsgSet, Request};
|
||||
use trait_set::trait_set;
|
||||
|
||||
pub struct Receipt;
|
||||
impl Receipt {
|
||||
pub fn off_thread(name: String, cb: impl FnOnce() -> Self + Send + 'static) -> Self {
|
||||
thread::Builder::new().name(name).spawn(cb).unwrap();
|
||||
Self
|
||||
}
|
||||
}
|
||||
pub struct Receipt<'a>(PhantomData<&'a mut ()>);
|
||||
|
||||
trait_set! {
|
||||
pub trait SendFn<T: MsgSet> = for<'a> FnMut(&'a [u8], ReqNot<T>) + DynClone + Send + 'static;
|
||||
pub trait SendFn<T: MsgSet> =
|
||||
for<'a> FnMut(&'a [u8], ReqNot<T>) -> Pin<Box<dyn Future<Output = ()> + 'a>>
|
||||
+ DynClone + Send + 'static;
|
||||
pub trait ReqFn<T: MsgSet> =
|
||||
FnMut(RequestHandle<T>, <T::In as Channel>::Req) -> Receipt + DynClone + Send + Sync + 'static;
|
||||
for<'a> FnMut(RequestHandle<T>, <T::In as Channel>::Req) -> Pin<Box<dyn Future<Output = Receipt>>>
|
||||
+ DynClone + Send + Sync + 'static;
|
||||
pub trait NotifFn<T: MsgSet> =
|
||||
for<'a> FnMut(<T::In as Channel>::Notif, ReqNot<T>) + DynClone + Send + Sync + 'static;
|
||||
for<'a> FnMut(<T::In as Channel>::Notif, ReqNot<T>) -> Pin<Box<dyn Future<Output = ()>>>
|
||||
+ DynClone + Send + Sync + 'static;
|
||||
}
|
||||
|
||||
fn get_id(message: &[u8]) -> (u64, &[u8]) {
|
||||
@@ -38,35 +40,44 @@ pub trait ReqHandlish {
|
||||
}
|
||||
|
||||
#[derive(destructure)]
|
||||
pub struct RequestHandle<MS: MsgSet> {
|
||||
pub struct RequestHandle<'a, MS: MsgSet> {
|
||||
defer_drop: RefCell<Vec<Box<dyn Any>>>,
|
||||
fulfilled: AtomicBool,
|
||||
id: u64,
|
||||
_reqlt: PhantomData<&'a mut ()>,
|
||||
parent: ReqNot<MS>,
|
||||
}
|
||||
impl<MS: MsgSet + 'static> RequestHandle<MS> {
|
||||
impl<'a, MS: MsgSet + 'static> RequestHandle<'a, MS> {
|
||||
fn new(parent: ReqNot<MS>, id: u64) -> Self {
|
||||
Self { defer_drop: RefCell::default(), fulfilled: false.into(), parent, id }
|
||||
Self {
|
||||
defer_drop: RefCell::default(),
|
||||
fulfilled: false.into(),
|
||||
_reqlt: PhantomData,
|
||||
parent,
|
||||
id,
|
||||
}
|
||||
}
|
||||
pub fn reqnot(&self) -> ReqNot<MS> { self.parent.clone() }
|
||||
pub fn handle<U: Request>(&self, _: &U, rep: &U::Response) -> Receipt { self.respond(rep) }
|
||||
pub fn will_handle_as<U: Request>(&self, _: &U) -> ReqTypToken<U> { ReqTypToken(PhantomData) }
|
||||
pub fn handle_as<U: Request>(&self, _: ReqTypToken<U>, rep: &U::Response) -> Receipt {
|
||||
self.respond(rep)
|
||||
pub async fn handle<U: Request>(&self, _: &U, rep: &U::Response) -> Receipt<'a> {
|
||||
self.respond(rep).await
|
||||
}
|
||||
pub fn respond(&self, response: &impl Encode) -> Receipt {
|
||||
pub fn will_handle_as<U: Request>(&self, _: &U) -> ReqTypToken<U> { ReqTypToken(PhantomData) }
|
||||
pub async fn handle_as<U: Request>(&self, _: ReqTypToken<U>, rep: &U::Response) -> Receipt<'a> {
|
||||
self.respond(rep).await
|
||||
}
|
||||
pub async fn respond(&self, response: &impl Encode) -> Receipt<'a> {
|
||||
assert!(!self.fulfilled.swap(true, Ordering::Relaxed), "Already responded to {}", self.id);
|
||||
let mut buf = (!self.id).to_be_bytes().to_vec();
|
||||
response.encode(&mut buf);
|
||||
let mut send = clone_box(&*self.reqnot().0.lock().unwrap().send);
|
||||
(send)(&buf, self.parent.clone());
|
||||
Receipt
|
||||
(send)(&buf, self.parent.clone()).await;
|
||||
Receipt(PhantomData)
|
||||
}
|
||||
}
|
||||
impl<MS: MsgSet> ReqHandlish for RequestHandle<MS> {
|
||||
impl<MS: MsgSet> ReqHandlish for RequestHandle<'_, MS> {
|
||||
fn defer_drop(&self, val: impl Any) { self.defer_drop.borrow_mut().push(Box::new(val)) }
|
||||
}
|
||||
impl<MS: MsgSet> Drop for RequestHandle<MS> {
|
||||
impl<MS: MsgSet> Drop for RequestHandle<'_, MS> {
|
||||
fn drop(&mut self) {
|
||||
let done = self.fulfilled.load(Ordering::Relaxed);
|
||||
debug_assert!(done, "Request {} dropped without response", self.id)
|
||||
@@ -80,7 +91,7 @@ pub struct ReqNotData<T: MsgSet> {
|
||||
send: Box<dyn SendFn<T>>,
|
||||
notif: Box<dyn NotifFn<T>>,
|
||||
req: Box<dyn ReqFn<T>>,
|
||||
responses: HashMap<u64, SyncSender<Vec<u8>>>,
|
||||
responses: HashMap<u64, channel::Sender<Vec<u8>>>,
|
||||
}
|
||||
|
||||
/// Wraps a raw message buffer to save on copying.
|
||||
@@ -105,44 +116,43 @@ impl<T: MsgSet> ReqNot<T> {
|
||||
}
|
||||
|
||||
/// Can be called from a polling thread or dispatched in any other way
|
||||
pub fn receive(&self, message: &[u8]) {
|
||||
pub async fn receive(&self, message: &[u8]) {
|
||||
let mut g = self.0.lock().unwrap();
|
||||
let (id, payload) = get_id(message);
|
||||
if id == 0 {
|
||||
let mut notif = clone_box(&*g.notif);
|
||||
mem::drop(g);
|
||||
notif(<T::In as Channel>::Notif::decode(&mut &payload[..]), self.clone())
|
||||
notif(<T::In as Channel>::Notif::decode(&mut &payload[..]), self.clone()).await
|
||||
} else if 0 < id.bitand(1 << 63) {
|
||||
let sender = g.responses.remove(&!id).expect("Received response for invalid message");
|
||||
sender.send(message.to_vec()).unwrap();
|
||||
sender.send(message.to_vec()).await.unwrap();
|
||||
} else {
|
||||
let message = <T::In as Channel>::Req::decode(&mut &payload[..]);
|
||||
let mut req = clone_box(&*g.req);
|
||||
mem::drop(g);
|
||||
let rn = self.clone();
|
||||
thread::Builder::new()
|
||||
.name(format!("request {id}"))
|
||||
.spawn(move || req(RequestHandle::new(rn, id), message))
|
||||
.unwrap();
|
||||
req(RequestHandle::new(rn, id), message).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn notify<N: Coding + Into<<T::Out as Channel>::Notif>>(&self, notif: N) {
|
||||
pub async fn notify<N: Coding + Into<<T::Out as Channel>::Notif>>(&self, notif: N) {
|
||||
let mut send = clone_box(&*self.0.lock().unwrap().send);
|
||||
let mut buf = vec![0; 8];
|
||||
let msg: <T::Out as Channel>::Notif = notif.into();
|
||||
msg.encode(&mut buf);
|
||||
send(&buf, self.clone())
|
||||
send(&buf, self.clone()).await
|
||||
}
|
||||
}
|
||||
|
||||
pub trait DynRequester: Send + Sync {
|
||||
type Transfer;
|
||||
/// Encode and send a request, then receive the response buffer.
|
||||
fn raw_request(&self, data: Self::Transfer) -> RawReply;
|
||||
fn raw_request(&self, data: Self::Transfer) -> Pin<Box<dyn Future<Output = RawReply>>>;
|
||||
}
|
||||
|
||||
pub struct MappedRequester<'a, T>(Box<dyn Fn(T) -> RawReply + Send + Sync + 'a>);
|
||||
pub struct MappedRequester<'a, T>(
|
||||
Box<dyn Fn(T) -> Pin<Box<dyn Future<Output = RawReply>>> + Send + Sync + 'a>,
|
||||
);
|
||||
impl<'a, T> MappedRequester<'a, T> {
|
||||
fn new<U: DynRequester + 'a>(req: U) -> Self
|
||||
where T: Into<U::Transfer> {
|
||||
@@ -152,38 +162,43 @@ impl<'a, T> MappedRequester<'a, T> {
|
||||
|
||||
impl<T> DynRequester for MappedRequester<'_, T> {
|
||||
type Transfer = T;
|
||||
fn raw_request(&self, data: Self::Transfer) -> RawReply { self.0(data) }
|
||||
fn raw_request(&self, data: Self::Transfer) -> Pin<Box<dyn Future<Output = RawReply>>> {
|
||||
self.0(data)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: MsgSet> DynRequester for ReqNot<T> {
|
||||
type Transfer = <T::Out as Channel>::Req;
|
||||
fn raw_request(&self, req: Self::Transfer) -> RawReply {
|
||||
fn raw_request(&self, req: Self::Transfer) -> Pin<Box<dyn Future<Output = RawReply>>> {
|
||||
let mut g = self.0.lock().unwrap();
|
||||
let id = g.id;
|
||||
g.id += 1;
|
||||
let mut buf = id.to_be_bytes().to_vec();
|
||||
req.encode(&mut buf);
|
||||
let (send, recv) = sync_channel(1);
|
||||
let (send, recv) = channel::bounded(1);
|
||||
g.responses.insert(id, send);
|
||||
let mut send = clone_box(&*g.send);
|
||||
mem::drop(g);
|
||||
send(&buf, self.clone());
|
||||
RawReply(recv.recv().unwrap())
|
||||
let rn = self.clone();
|
||||
Box::pin(async move {
|
||||
send(&buf, rn).await;
|
||||
RawReply(recv.recv().await.unwrap())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Requester: DynRequester {
|
||||
#[must_use = "These types are subject to change with protocol versions. \
|
||||
If you don't want to use the return value, At a minimum, force the type."]
|
||||
fn request<R: Request + Into<Self::Transfer>>(&self, data: R) -> R::Response;
|
||||
async fn request<R: Request + Into<Self::Transfer>>(&self, data: R) -> R::Response;
|
||||
fn map<'a, U: Into<Self::Transfer>>(self) -> MappedRequester<'a, U>
|
||||
where Self: Sized + 'a {
|
||||
MappedRequester::new(self)
|
||||
}
|
||||
}
|
||||
impl<This: DynRequester + ?Sized> Requester for This {
|
||||
fn request<R: Request + Into<Self::Transfer>>(&self, data: R) -> R::Response {
|
||||
R::Response::decode(&mut &self.raw_request(data.into())[..])
|
||||
async fn request<R: Request + Into<Self::Transfer>>(&self, data: R) -> R::Response {
|
||||
R::Response::decode(&mut &self.raw_request(data.into()).await[..])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -195,6 +210,7 @@ impl<T: MsgSet> Clone for ReqNot<T> {
|
||||
mod test {
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use async_std::future;
|
||||
use orchid_api_derive::Coding;
|
||||
use orchid_api_traits::{Channel, Request};
|
||||
|
||||
@@ -225,11 +241,16 @@ mod test {
|
||||
let received = Arc::new(Mutex::new(None));
|
||||
let receiver = ReqNot::<TestMsgSet>::new(
|
||||
|_, _| panic!("Should not send anything"),
|
||||
clone!(received; move |notif, _| *received.lock().unwrap() = Some(notif)),
|
||||
clone!(received; move |notif, _| {
|
||||
*received.lock().unwrap() = Some(notif);
|
||||
Box::pin(future::ready(()))
|
||||
}),
|
||||
|_, _| panic!("Not receiving a request"),
|
||||
);
|
||||
let sender = ReqNot::<TestMsgSet>::new(
|
||||
clone!(receiver; move |d, _| receiver.receive(d)),
|
||||
clone!(receiver; move |d, _| Box::pin(async {
|
||||
receiver.receive(d).await
|
||||
})),
|
||||
|_, _| panic!("Should not receive notif"),
|
||||
|_, _| panic!("Should not receive request"),
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user