orchid-base moved to async
This commit is contained in:
@@ -2,32 +2,33 @@ use std::any::Any;
|
||||
use std::cell::RefCell;
|
||||
use std::future::Future;
|
||||
use std::marker::PhantomData;
|
||||
use std::mem;
|
||||
use std::ops::{BitAnd, Deref};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::{SyncSender, sync_channel};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::{mem, thread};
|
||||
|
||||
use async_std::channel::{self, Sender};
|
||||
use async_std::future;
|
||||
use async_std::channel;
|
||||
use async_std::sync::Mutex;
|
||||
use derive_destructure::destructure;
|
||||
use dyn_clone::{DynClone, clone_box};
|
||||
use futures::future::LocalBoxFuture;
|
||||
use hashbrown::HashMap;
|
||||
use orchid_api_traits::{Channel, Coding, Decode, Encode, MsgSet, Request};
|
||||
use trait_set::trait_set;
|
||||
|
||||
use crate::clone;
|
||||
|
||||
pub struct Receipt<'a>(PhantomData<&'a mut ()>);
|
||||
|
||||
trait_set! {
|
||||
pub trait SendFn<T: MsgSet> =
|
||||
for<'a> FnMut(&'a [u8], ReqNot<T>) -> Pin<Box<dyn Future<Output = ()> + 'a>>
|
||||
for<'a> FnMut(&'a [u8], ReqNot<T>) -> LocalBoxFuture<'a, ()>
|
||||
+ DynClone + Send + 'static;
|
||||
pub trait ReqFn<T: MsgSet> =
|
||||
for<'a> FnMut(RequestHandle<'a, T>, <T::In as Channel>::Req) -> Pin<Box<dyn Future<Output = Receipt<'a>>>>
|
||||
for<'a> FnMut(RequestHandle<'a, T>, <T::In as Channel>::Req) -> LocalBoxFuture<'a, Receipt<'a>>
|
||||
+ DynClone + Send + Sync + 'static;
|
||||
pub trait NotifFn<T: MsgSet> =
|
||||
for<'a> FnMut(<T::In as Channel>::Notif, ReqNot<T>) -> Pin<Box<dyn Future<Output = ()>>>
|
||||
FnMut(<T::In as Channel>::Notif, ReqNot<T>) -> LocalBoxFuture<'static, ()>
|
||||
+ DynClone + Send + Sync + 'static;
|
||||
}
|
||||
|
||||
@@ -69,7 +70,7 @@ impl<'a, MS: MsgSet + 'static> RequestHandle<'a, MS> {
|
||||
assert!(!self.fulfilled.swap(true, Ordering::Relaxed), "Already responded to {}", self.id);
|
||||
let mut buf = (!self.id).to_be_bytes().to_vec();
|
||||
response.encode(&mut buf);
|
||||
let mut send = clone_box(&*self.reqnot().0.lock().unwrap().send);
|
||||
let mut send = clone_box(&*self.reqnot().0.lock().await.send);
|
||||
(send)(&buf, self.parent.clone()).await;
|
||||
Receipt(PhantomData)
|
||||
}
|
||||
@@ -117,7 +118,7 @@ impl<T: MsgSet> ReqNot<T> {
|
||||
|
||||
/// Can be called from a polling thread or dispatched in any other way
|
||||
pub async fn receive(&self, message: &[u8]) {
|
||||
let mut g = self.0.lock().unwrap();
|
||||
let mut g = self.0.lock().await;
|
||||
let (id, payload) = get_id(message);
|
||||
if id == 0 {
|
||||
let mut notif = clone_box(&*g.notif);
|
||||
@@ -136,7 +137,7 @@ impl<T: MsgSet> ReqNot<T> {
|
||||
}
|
||||
|
||||
pub async fn notify<N: Coding + Into<<T::Out as Channel>::Notif>>(&self, notif: N) {
|
||||
let mut send = clone_box(&*self.0.lock().unwrap().send);
|
||||
let mut send = clone_box(&*self.0.lock().await.send);
|
||||
let mut buf = vec![0; 8];
|
||||
let msg: <T::Out as Channel>::Notif = notif.into();
|
||||
msg.encode(&mut buf);
|
||||
@@ -147,40 +148,41 @@ impl<T: MsgSet> ReqNot<T> {
|
||||
pub trait DynRequester: Send + Sync {
|
||||
type Transfer;
|
||||
/// Encode and send a request, then receive the response buffer.
|
||||
fn raw_request(&self, data: Self::Transfer) -> Pin<Box<dyn Future<Output = RawReply>>>;
|
||||
fn raw_request(&self, data: Self::Transfer) -> LocalBoxFuture<'_, RawReply>;
|
||||
}
|
||||
|
||||
pub struct MappedRequester<'a, T>(
|
||||
Box<dyn Fn(T) -> Pin<Box<dyn Future<Output = RawReply>>> + Send + Sync + 'a>,
|
||||
pub struct MappedRequester<'a, T: 'a>(
|
||||
Box<dyn Fn(T) -> LocalBoxFuture<'a, RawReply> + Send + Sync + 'a>,
|
||||
);
|
||||
impl<'a, T> MappedRequester<'a, T> {
|
||||
fn new<U: DynRequester + 'a>(req: U) -> Self
|
||||
where T: Into<U::Transfer> {
|
||||
MappedRequester(Box::new(move |t| req.raw_request(t.into())))
|
||||
let req_arc = Arc::new(req);
|
||||
MappedRequester(Box::new(move |t| {
|
||||
Box::pin(clone!(req_arc; async move { req_arc.raw_request(t.into()).await}))
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> DynRequester for MappedRequester<'_, T> {
|
||||
type Transfer = T;
|
||||
fn raw_request(&self, data: Self::Transfer) -> Pin<Box<dyn Future<Output = RawReply>>> {
|
||||
self.0(data)
|
||||
}
|
||||
fn raw_request(&self, data: Self::Transfer) -> LocalBoxFuture<'_, RawReply> { self.0(data) }
|
||||
}
|
||||
|
||||
impl<T: MsgSet> DynRequester for ReqNot<T> {
|
||||
type Transfer = <T::Out as Channel>::Req;
|
||||
fn raw_request(&self, req: Self::Transfer) -> Pin<Box<dyn Future<Output = RawReply>>> {
|
||||
let mut g = self.0.lock().unwrap();
|
||||
let id = g.id;
|
||||
g.id += 1;
|
||||
let mut buf = id.to_be_bytes().to_vec();
|
||||
req.encode(&mut buf);
|
||||
let (send, recv) = channel::bounded(1);
|
||||
g.responses.insert(id, send);
|
||||
let mut send = clone_box(&*g.send);
|
||||
mem::drop(g);
|
||||
let rn = self.clone();
|
||||
fn raw_request(&self, req: Self::Transfer) -> LocalBoxFuture<'_, RawReply> {
|
||||
Box::pin(async move {
|
||||
let mut g = self.0.lock().await;
|
||||
let id = g.id;
|
||||
g.id += 1;
|
||||
let mut buf = id.to_be_bytes().to_vec();
|
||||
req.encode(&mut buf);
|
||||
let (send, recv) = channel::bounded(1);
|
||||
g.responses.insert(id, send);
|
||||
let mut send = clone_box(&*g.send);
|
||||
mem::drop(g);
|
||||
let rn = self.clone();
|
||||
send(&buf, rn).await;
|
||||
RawReply(recv.recv().await.unwrap())
|
||||
})
|
||||
@@ -190,7 +192,10 @@ impl<T: MsgSet> DynRequester for ReqNot<T> {
|
||||
pub trait Requester: DynRequester {
|
||||
#[must_use = "These types are subject to change with protocol versions. \
|
||||
If you don't want to use the return value, At a minimum, force the type."]
|
||||
async fn request<R: Request + Into<Self::Transfer>>(&self, data: R) -> R::Response;
|
||||
fn request<R: Request + Into<Self::Transfer>>(
|
||||
&self,
|
||||
data: R,
|
||||
) -> impl Future<Output = R::Response>;
|
||||
fn map<'a, U: Into<Self::Transfer>>(self) -> MappedRequester<'a, U>
|
||||
where Self: Sized + 'a {
|
||||
MappedRequester::new(self)
|
||||
@@ -208,9 +213,10 @@ impl<T: MsgSet> Clone for ReqNot<T> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_std::future;
|
||||
use async_std::sync::Mutex;
|
||||
use futures::FutureExt;
|
||||
use orchid_api_derive::Coding;
|
||||
use orchid_api_traits::{Channel, Request};
|
||||
use test_executors::spin_on;
|
||||
@@ -239,49 +245,52 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn notification() {
|
||||
let received = Arc::new(Mutex::new(None));
|
||||
let receiver = ReqNot::<TestMsgSet>::new(
|
||||
|_, _| panic!("Should not send anything"),
|
||||
clone!(received; move |notif, _| {
|
||||
*received.lock().unwrap() = Some(notif);
|
||||
Box::pin(future::ready(()))
|
||||
}),
|
||||
|_, _| panic!("Not receiving a request"),
|
||||
);
|
||||
let sender = ReqNot::<TestMsgSet>::new(
|
||||
clone!(receiver; move |d, _| clone!(receiver; Box::pin(async move {
|
||||
receiver.receive(d).await
|
||||
}))),
|
||||
|_, _| panic!("Should not receive notif"),
|
||||
|_, _| panic!("Should not receive request"),
|
||||
);
|
||||
sender.notify(3);
|
||||
assert_eq!(*received.lock().unwrap(), Some(3));
|
||||
sender.notify(4);
|
||||
assert_eq!(*received.lock().unwrap(), Some(4));
|
||||
spin_on(async {
|
||||
let received = Arc::new(Mutex::new(None));
|
||||
let receiver = ReqNot::<TestMsgSet>::new(
|
||||
|_, _| panic!("Should not send anything"),
|
||||
clone!(received; move |notif, _| clone!(received; async move {
|
||||
*received.lock().await = Some(notif);
|
||||
}.boxed_local())),
|
||||
|_, _| panic!("Not receiving a request"),
|
||||
);
|
||||
let sender = ReqNot::<TestMsgSet>::new(
|
||||
clone!(receiver; move |d, _| clone!(receiver; Box::pin(async move {
|
||||
receiver.receive(d).await
|
||||
}))),
|
||||
|_, _| panic!("Should not receive notif"),
|
||||
|_, _| panic!("Should not receive request"),
|
||||
);
|
||||
sender.notify(3).await;
|
||||
assert_eq!(*received.lock().await, Some(3));
|
||||
sender.notify(4).await;
|
||||
assert_eq!(*received.lock().await, Some(4));
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn request() {
|
||||
let receiver = Arc::new(Mutex::<Option<ReqNot<TestMsgSet>>>::new(None));
|
||||
let sender = Arc::new(ReqNot::<TestMsgSet>::new(
|
||||
clone!(receiver; move |d, _| clone!(receiver; Box::pin(async move {
|
||||
receiver.lock().unwrap().as_ref().unwrap().receive(d).await
|
||||
}))),
|
||||
|_, _| panic!("Should not receive notif"),
|
||||
|_, _| panic!("Should not receive request"),
|
||||
));
|
||||
*receiver.lock().unwrap() = Some(ReqNot::new(
|
||||
clone!(sender; move |d, _| clone!(sender; Box::pin(async move { sender.receive(d).await }))),
|
||||
|_, _| panic!("Not receiving notifs"),
|
||||
|hand, req| {
|
||||
Box::pin(async move {
|
||||
assert_eq!(req, TestReq(5));
|
||||
hand.respond(&6u8).await
|
||||
})
|
||||
},
|
||||
));
|
||||
let response = spin_on(sender.request(TestReq(5)));
|
||||
assert_eq!(response, 6);
|
||||
spin_on(async {
|
||||
let receiver = Arc::new(Mutex::<Option<ReqNot<TestMsgSet>>>::new(None));
|
||||
let sender = Arc::new(ReqNot::<TestMsgSet>::new(
|
||||
clone!(receiver; move |d, _| clone!(receiver; Box::pin(async move {
|
||||
receiver.lock().await.as_ref().unwrap().receive(d).await
|
||||
}))),
|
||||
|_, _| panic!("Should not receive notif"),
|
||||
|_, _| panic!("Should not receive request"),
|
||||
));
|
||||
*receiver.lock().await = Some(ReqNot::new(
|
||||
clone!(sender; move |d, _| clone!(sender; Box::pin(async move { sender.receive(d).await }))),
|
||||
|_, _| panic!("Not receiving notifs"),
|
||||
|hand, req| {
|
||||
Box::pin(async move {
|
||||
assert_eq!(req, TestReq(5));
|
||||
hand.respond(&6u8).await
|
||||
})
|
||||
},
|
||||
));
|
||||
let response = sender.request(TestReq(5)).await;
|
||||
assert_eq!(response, 6);
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user