|
|
|
|
@@ -1,20 +1,28 @@
|
|
|
|
|
use std::any::TypeId;
|
|
|
|
|
use std::cell::RefCell;
|
|
|
|
|
use std::collections::VecDeque;
|
|
|
|
|
use std::future::Future;
|
|
|
|
|
use std::marker::PhantomData;
|
|
|
|
|
use std::mem;
|
|
|
|
|
use std::ops::{BitAnd, Deref};
|
|
|
|
|
use std::pin::Pin;
|
|
|
|
|
use std::pin::{Pin, pin};
|
|
|
|
|
use std::rc::Rc;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use std::task::Poll;
|
|
|
|
|
use std::thread::panicking;
|
|
|
|
|
|
|
|
|
|
use async_fn_stream::stream;
|
|
|
|
|
use bound::Bound;
|
|
|
|
|
use derive_destructure::destructure;
|
|
|
|
|
use dyn_clone::{DynClone, clone_box};
|
|
|
|
|
use futures::channel::mpsc;
|
|
|
|
|
use futures::future::LocalBoxFuture;
|
|
|
|
|
use futures::lock::Mutex;
|
|
|
|
|
use futures::{SinkExt, StreamExt};
|
|
|
|
|
use futures::channel::mpsc::{self, Receiver, Sender, channel};
|
|
|
|
|
use futures::channel::oneshot;
|
|
|
|
|
use futures::future::{LocalBoxFuture, select_all};
|
|
|
|
|
use futures::lock::{Mutex, MutexGuard};
|
|
|
|
|
use futures::{AsyncRead, AsyncWrite, SinkExt, Stream, StreamExt, stream, stream_select};
|
|
|
|
|
use hashbrown::HashMap;
|
|
|
|
|
use orchid_api_traits::{Channel, Coding, Decode, Encode, MsgSet, Request, enc_vec};
|
|
|
|
|
use orchid_api_traits::{Channel, Coding, Decode, Encode, MsgSet, Request, UnderRoot, enc_vec};
|
|
|
|
|
use some_executor::task_local;
|
|
|
|
|
use trait_set::trait_set;
|
|
|
|
|
|
|
|
|
|
use crate::clone;
|
|
|
|
|
@@ -22,12 +30,347 @@ use crate::logging::Logger;
|
|
|
|
|
|
|
|
|
|
pub struct Receipt<'a>(PhantomData<&'a mut ()>);
|
|
|
|
|
|
|
|
|
|
/// Write guard to outbound for the purpose of serializing a request. Only one
|
|
|
|
|
/// can exist at a time. Dropping this object should panic.
|
|
|
|
|
pub trait ReqWriter {
|
|
|
|
|
/// Access to the underlying channel. This may be buffered.
|
|
|
|
|
fn writer(&mut self) -> Pin<&mut dyn AsyncWrite>;
|
|
|
|
|
/// Finalize the request, release the outbound channel, then queue for the
|
|
|
|
|
/// reply on the inbound channel.
|
|
|
|
|
fn send(self: Box<Self>) -> LocalBoxFuture<'static, Box<dyn RepReader>>;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Write guard to inbound for the purpose of deserializing a reply. While held,
|
|
|
|
|
/// no inbound requests or other replies can be processed.
|
|
|
|
|
///
|
|
|
|
|
/// Dropping this object should panic even if [RepReader::finish] returns
|
|
|
|
|
/// synchronously, because the API isn't cancellation safe in general so it is a
|
|
|
|
|
/// programmer error in all cases to drop an object related to it without proper
|
|
|
|
|
/// cleanup.
|
|
|
|
|
pub trait RepReader {
|
|
|
|
|
/// Access to the underlying channel. The length of the message is inferred
|
|
|
|
|
/// from the number of bytes read so this must not be buffered.
|
|
|
|
|
fn reader(&mut self) -> Pin<&mut dyn AsyncRead>;
|
|
|
|
|
/// Finish reading the request
|
|
|
|
|
fn finish(self: Box<Self>) -> LocalBoxFuture<'static, ()>;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Write guard to outbound for the purpose of serializing a notification.
|
|
|
|
|
///
|
|
|
|
|
/// Dropping this object should panic for the same reason [RepReader] panics
|
|
|
|
|
pub trait MsgWriter {
|
|
|
|
|
/// Access to the underlying channel. This may be buffered.
|
|
|
|
|
fn writer(&mut self) -> Pin<&mut dyn AsyncWrite>;
|
|
|
|
|
/// Send the notification
|
|
|
|
|
fn finish(self: Box<Self>) -> LocalBoxFuture<'static, ()>;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// For initiating outbound requests and notifications
|
|
|
|
|
pub trait Client {
|
|
|
|
|
fn root_req_tid(&self) -> TypeId;
|
|
|
|
|
fn root_notif_tid(&self) -> TypeId;
|
|
|
|
|
fn start_request(&self) -> LocalBoxFuture<'_, Box<dyn ReqWriter>>;
|
|
|
|
|
fn start_notif(&self) -> LocalBoxFuture<'_, Box<dyn MsgWriter>>;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Extension trait with convenience methods that handle outbound request and
|
|
|
|
|
/// notif lifecycle and typing
|
|
|
|
|
#[allow(async_fn_in_trait)]
|
|
|
|
|
pub trait ClientExt: Client {
|
|
|
|
|
async fn request<T: Request + UnderRoot<Root: Encode>>(&self, t: T) -> T::Response {
|
|
|
|
|
assert_eq!(TypeId::of::<<T as UnderRoot>::Root>(), self.root_req_tid());
|
|
|
|
|
let mut req = self.start_request().await;
|
|
|
|
|
t.into_root().encode(req.writer().as_mut()).await;
|
|
|
|
|
let mut rep = req.send().await;
|
|
|
|
|
let response = T::Response::decode(rep.reader()).await;
|
|
|
|
|
rep.finish().await;
|
|
|
|
|
response
|
|
|
|
|
}
|
|
|
|
|
async fn notify<T: UnderRoot<Root: Encode> + 'static>(&self, t: T) {
|
|
|
|
|
assert_eq!(TypeId::of::<<T as UnderRoot>::Root>(), self.root_notif_tid());
|
|
|
|
|
let mut notif = self.start_notif().await;
|
|
|
|
|
t.into_root().encode(notif.writer().as_mut()).await;
|
|
|
|
|
notif.finish().await;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
impl<T: Client + ?Sized> ClientExt for T {}
|
|
|
|
|
|
|
|
|
|
/// A form of [Evidence] that doesn't require the value to be kept around
|
|
|
|
|
pub struct Witness<T>(PhantomData<T>);
|
|
|
|
|
impl<T> Witness<T> {
|
|
|
|
|
fn of(t: &T) -> Self { Self(PhantomData) }
|
|
|
|
|
}
|
|
|
|
|
impl<T> Copy for Witness<T> {}
|
|
|
|
|
impl<T> Clone for Witness<T> {
|
|
|
|
|
fn clone(&self) -> Self { Self(PhantomData) }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// A proxy for the type of a value either previously saved into a [Witness] or
|
|
|
|
|
/// still available.
|
|
|
|
|
pub trait Evidence<T> {}
|
|
|
|
|
impl<T> Evidence<T> for &'_ T {}
|
|
|
|
|
impl<T> Evidence<T> for Witness<T> {}
|
|
|
|
|
|
|
|
|
|
type IoRef<T> = Pin<Box<T>>;
|
|
|
|
|
type IoLock<T> = Rc<Mutex<Pin<Box<T>>>>;
|
|
|
|
|
type IoGuard<T> = Bound<MutexGuard<'static, Pin<Box<T>>>, IoLock<T>>;
|
|
|
|
|
|
|
|
|
|
/// An incoming request. This holds a lock on the ingress channel.
|
|
|
|
|
pub struct ReqReader<'a> {
|
|
|
|
|
id: u64,
|
|
|
|
|
read: MutexGuard<'a, IoRef<dyn AsyncRead>>,
|
|
|
|
|
write: &'a Mutex<IoRef<dyn AsyncWrite>>,
|
|
|
|
|
}
|
|
|
|
|
impl<'a> ReqReader<'a> {
|
|
|
|
|
/// Access
|
|
|
|
|
pub fn reader(&mut self) -> Pin<&mut dyn AsyncRead> { self.read.as_mut() }
|
|
|
|
|
pub async fn read_req<R: Decode>(&mut self) -> R { R::decode(self.reader()).await }
|
|
|
|
|
pub async fn start_reply(self) -> RepWriter<'a> { self.branch().await.start_reply().await }
|
|
|
|
|
pub async fn reply<R: Request>(self, req: impl Evidence<R>, rep: &R::Response) -> Receipt<'a> {
|
|
|
|
|
self.branch().await.reply(req, rep).await
|
|
|
|
|
}
|
|
|
|
|
pub async fn branch(self) -> ReqHandle<'a> { ReqHandle { id: self.id, write: self.write } }
|
|
|
|
|
}
|
|
|
|
|
pub struct ReqHandle<'a> {
|
|
|
|
|
id: u64,
|
|
|
|
|
write: &'a Mutex<IoRef<dyn AsyncWrite>>,
|
|
|
|
|
}
|
|
|
|
|
impl<'a> ReqHandle<'a> {
|
|
|
|
|
pub async fn reply<Req: Request>(
|
|
|
|
|
self,
|
|
|
|
|
_: impl Evidence<Req>,
|
|
|
|
|
rep: &Req::Response,
|
|
|
|
|
) -> Receipt<'a> {
|
|
|
|
|
let mut reply = self.start_reply().await;
|
|
|
|
|
rep.encode(reply.writer()).await;
|
|
|
|
|
reply.send().await
|
|
|
|
|
}
|
|
|
|
|
pub async fn start_reply(self) -> RepWriter<'a> {
|
|
|
|
|
let mut write = self.write.lock().await;
|
|
|
|
|
(!self.id).encode(write.as_mut()).await;
|
|
|
|
|
RepWriter { write }
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
pub struct RepWriter<'a> {
|
|
|
|
|
write: MutexGuard<'a, IoRef<dyn AsyncWrite>>,
|
|
|
|
|
}
|
|
|
|
|
impl<'a> RepWriter<'a> {
|
|
|
|
|
pub fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { self.write.as_mut() }
|
|
|
|
|
pub async fn send(self) -> Receipt<'a> { Receipt(PhantomData) }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub struct NotifReader<'a> {
|
|
|
|
|
read: MutexGuard<'a, IoRef<dyn AsyncRead>>,
|
|
|
|
|
}
|
|
|
|
|
impl<'a> NotifReader<'a> {
|
|
|
|
|
pub fn reader(&mut self) -> Pin<&mut dyn AsyncRead> { self.read.as_mut() }
|
|
|
|
|
pub async fn read<N: Decode>(mut self) -> N {
|
|
|
|
|
let n = N::decode(self.reader()).await;
|
|
|
|
|
self.release().await;
|
|
|
|
|
n
|
|
|
|
|
}
|
|
|
|
|
pub async fn release(self) {}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
struct IO {
|
|
|
|
|
i: IoLock<dyn AsyncRead>,
|
|
|
|
|
o: IoLock<dyn AsyncWrite>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct ReplySub {
|
|
|
|
|
id: u64,
|
|
|
|
|
ack: oneshot::Sender<()>,
|
|
|
|
|
cb: oneshot::Sender<IoGuard<dyn AsyncRead>>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct IoClient {
|
|
|
|
|
output: IoLock<dyn AsyncWrite>,
|
|
|
|
|
id: Rc<RefCell<u64>>,
|
|
|
|
|
subscribe: Rc<Sender<ReplySub>>,
|
|
|
|
|
req_tid: TypeId,
|
|
|
|
|
notif_tid: TypeId,
|
|
|
|
|
}
|
|
|
|
|
impl IoClient {
|
|
|
|
|
pub async fn new<Req: 'static, Not: 'static>(
|
|
|
|
|
output: IoLock<dyn AsyncWrite>,
|
|
|
|
|
) -> (Receiver<ReplySub>, Self) {
|
|
|
|
|
let (req, rep) = mpsc::channel(0);
|
|
|
|
|
(rep, Self {
|
|
|
|
|
output,
|
|
|
|
|
id: Rc::new(RefCell::new(0)),
|
|
|
|
|
req_tid: TypeId::of::<Req>(),
|
|
|
|
|
notif_tid: TypeId::of::<Not>(),
|
|
|
|
|
subscribe: Rc::new(req),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
async fn lock_out(&self) -> IoGuard<dyn AsyncWrite> {
|
|
|
|
|
Bound::async_new(self.output.clone(), async |o| o.lock().await).await
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
impl Client for IoClient {
|
|
|
|
|
fn root_notif_tid(&self) -> TypeId { self.notif_tid }
|
|
|
|
|
fn root_req_tid(&self) -> TypeId { self.req_tid }
|
|
|
|
|
fn start_notif(&self) -> LocalBoxFuture<'_, Box<dyn MsgWriter>> {
|
|
|
|
|
Box::pin(async {
|
|
|
|
|
let mut o = self.lock_out().await;
|
|
|
|
|
0u64.encode(o.as_mut()).await;
|
|
|
|
|
Box::new(IoNotifWriter { o }) as Box<dyn MsgWriter>
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
fn start_request(&self) -> LocalBoxFuture<'_, Box<dyn ReqWriter>> {
|
|
|
|
|
Box::pin(async {
|
|
|
|
|
let id = {
|
|
|
|
|
let mut id_g = self.id.borrow_mut();
|
|
|
|
|
*id_g += 1;
|
|
|
|
|
*id_g
|
|
|
|
|
};
|
|
|
|
|
let (cb, reply) = oneshot::channel();
|
|
|
|
|
let (ack, got_ack) = oneshot::channel();
|
|
|
|
|
self.subscribe.as_ref().clone().send(ReplySub { id, ack, cb }).await;
|
|
|
|
|
got_ack.await;
|
|
|
|
|
let mut w = self.lock_out().await;
|
|
|
|
|
id.encode(w.as_mut()).await;
|
|
|
|
|
Box::new(IoReqWriter { reply, w }) as Box<dyn ReqWriter>
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct IoReqWriter {
|
|
|
|
|
reply: oneshot::Receiver<IoGuard<dyn AsyncRead>>,
|
|
|
|
|
w: IoGuard<dyn AsyncWrite>,
|
|
|
|
|
}
|
|
|
|
|
impl ReqWriter for IoReqWriter {
|
|
|
|
|
fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { self.w.as_mut() }
|
|
|
|
|
fn send(self: Box<Self>) -> LocalBoxFuture<'static, Box<dyn RepReader>> {
|
|
|
|
|
Box::pin(async {
|
|
|
|
|
let Self { reply, .. } = *self;
|
|
|
|
|
let i = reply.await.expect("Client dropped before reply received");
|
|
|
|
|
Box::new(IoRepReader { i }) as Box<dyn RepReader>
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct IoRepReader {
|
|
|
|
|
i: IoGuard<dyn AsyncRead>,
|
|
|
|
|
}
|
|
|
|
|
impl RepReader for IoRepReader {
|
|
|
|
|
fn reader(&mut self) -> Pin<&mut dyn AsyncRead> { self.i.as_mut() }
|
|
|
|
|
fn finish(self: Box<Self>) -> LocalBoxFuture<'static, ()> { Box::pin(async {}) }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(destructure)]
|
|
|
|
|
struct IoNotifWriter {
|
|
|
|
|
o: IoGuard<dyn AsyncWrite>,
|
|
|
|
|
}
|
|
|
|
|
impl MsgWriter for IoNotifWriter {
|
|
|
|
|
fn writer(&mut self) -> Pin<&mut dyn AsyncWrite> { self.o.as_mut() }
|
|
|
|
|
fn finish(self: Box<Self>) -> LocalBoxFuture<'static, ()> {
|
|
|
|
|
self.destructure();
|
|
|
|
|
Box::pin(async {})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub struct CommCtx {
|
|
|
|
|
quit: Sender<()>,
|
|
|
|
|
client: Rc<IoClient>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl CommCtx {
|
|
|
|
|
pub async fn quit(self) { self.quit.clone().send(()).await.expect("quit channel dropped"); }
|
|
|
|
|
pub fn client(&self) -> Rc<dyn Client> { self.client.clone() as Rc<dyn Client> }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn io_comm<Req: 'static, Not: 'static>(
|
|
|
|
|
o: Rc<Mutex<Pin<Box<dyn AsyncWrite>>>>,
|
|
|
|
|
i: Mutex<Pin<Box<dyn AsyncRead>>>,
|
|
|
|
|
notif: impl for<'a> AsyncFn(&mut CommCtx, NotifReader<'a>),
|
|
|
|
|
req: impl for<'a> AsyncFn(&mut CommCtx, ReqReader<'a>),
|
|
|
|
|
) {
|
|
|
|
|
let i = Rc::new(i);
|
|
|
|
|
let (onsub, client) = IoClient::new::<Req, Not>(o.clone()).await;
|
|
|
|
|
let client = Rc::new(client);
|
|
|
|
|
let (exit, onexit) = channel(1);
|
|
|
|
|
enum Event {
|
|
|
|
|
Input(u64),
|
|
|
|
|
Sub(ReplySub),
|
|
|
|
|
Exit,
|
|
|
|
|
}
|
|
|
|
|
let exiting = RefCell::new(false);
|
|
|
|
|
let input_stream = stream(async |mut h| {
|
|
|
|
|
loop {
|
|
|
|
|
let id = u64::decode(i.lock().await.as_mut()).await;
|
|
|
|
|
h.emit(Event::Input(id)).await;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
let pending_reqs = RefCell::new(VecDeque::<LocalBoxFuture<()>>::new());
|
|
|
|
|
// this stream will never yield a value
|
|
|
|
|
let mut fork_stream = pin!(
|
|
|
|
|
stream::poll_fn(|cx| {
|
|
|
|
|
let mut reqs_g = pending_reqs.borrow_mut();
|
|
|
|
|
reqs_g.retain_mut(|req| match req.as_mut().poll(cx) {
|
|
|
|
|
Poll::Pending => true,
|
|
|
|
|
Poll::Ready(()) => false,
|
|
|
|
|
});
|
|
|
|
|
if *exiting.borrow() { Poll::Ready(None) } else { Poll::Pending }
|
|
|
|
|
})
|
|
|
|
|
.fuse()
|
|
|
|
|
);
|
|
|
|
|
{
|
|
|
|
|
let mut shared = pin!(stream_select!(
|
|
|
|
|
Box::pin(input_stream) as Pin<Box<dyn Stream<Item = Event>>>,
|
|
|
|
|
Box::pin(onsub.map(Event::Sub)) as Pin<Box<dyn Stream<Item = Event>>>,
|
|
|
|
|
Box::pin(fork_stream.as_mut()) as Pin<Box<dyn Stream<Item = Event>>>,
|
|
|
|
|
Box::pin(onexit.map(|()| Event::Exit)) as Pin<Box<dyn Stream<Item = Event>>>,
|
|
|
|
|
));
|
|
|
|
|
let mut pending_replies = HashMap::new();
|
|
|
|
|
while let Some(next) = shared.next().await {
|
|
|
|
|
match next {
|
|
|
|
|
Event::Exit => {
|
|
|
|
|
*exiting.borrow_mut() = true;
|
|
|
|
|
break;
|
|
|
|
|
},
|
|
|
|
|
Event::Sub(ReplySub { id, ack, cb }) => {
|
|
|
|
|
pending_replies.insert(id, cb);
|
|
|
|
|
ack.send(());
|
|
|
|
|
},
|
|
|
|
|
Event::Input(id) if id == 0 => {
|
|
|
|
|
let (i, notif, exit, client) = (i.clone(), ¬if, exit.clone(), client.clone());
|
|
|
|
|
pending_reqs.borrow_mut().push_back(Box::pin(async move {
|
|
|
|
|
let g = i.lock().await;
|
|
|
|
|
notif(&mut CommCtx { client, quit: exit.clone() }, NotifReader { read: g }).await
|
|
|
|
|
}));
|
|
|
|
|
},
|
|
|
|
|
// id.msb == 0 is a request, !id where id.msb == 1 is the equivalent response
|
|
|
|
|
Event::Input(id) =>
|
|
|
|
|
if (id & (1 << (u64::BITS - 1))) == 0 {
|
|
|
|
|
let (i, o, req, exit, client) =
|
|
|
|
|
(i.clone(), o.clone(), &req, exit.clone(), client.clone());
|
|
|
|
|
pending_reqs.borrow_mut().push_back(Box::pin(async move {
|
|
|
|
|
let g = i.lock().await;
|
|
|
|
|
req(&mut CommCtx { client, quit: exit.clone() }, ReqReader {
|
|
|
|
|
id,
|
|
|
|
|
read: g,
|
|
|
|
|
write: &*o,
|
|
|
|
|
})
|
|
|
|
|
.await
|
|
|
|
|
}) as LocalBoxFuture<()>);
|
|
|
|
|
} else {
|
|
|
|
|
let cb = pending_replies.remove(&!id).expect("Reply to unrecognized request");
|
|
|
|
|
let _ = cb.send(Bound::async_new(i.clone(), |i| i.lock()).await);
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
fork_stream.as_mut().count().await;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
trait_set! {
|
|
|
|
|
pub trait SendFn<T: MsgSet> =
|
|
|
|
|
for<'a> FnMut(&'a [u8], ReqNot<T>) -> LocalBoxFuture<'a, ()>
|
|
|
|
|
+ DynClone + 'static;
|
|
|
|
|
pub trait ReqFn<T: MsgSet> =
|
|
|
|
|
for<'a> FnMut(RequestHandle<'a, T>, <T::In as Channel>::Req)
|
|
|
|
|
for<'a> FnMut(ReqReader<'a, T>, <T::In as Channel>::Req)
|
|
|
|
|
-> LocalBoxFuture<'a, Receipt<'a>>
|
|
|
|
|
+ DynClone + 'static;
|
|
|
|
|
pub trait NotifFn<T: MsgSet> =
|
|
|
|
|
@@ -59,7 +402,7 @@ pub struct RequestHandle<'a, MS: MsgSet> {
|
|
|
|
|
parent: ReqNot<MS>,
|
|
|
|
|
raw_reply: RefCell<Option<LocalAsyncFnOnceBox>>,
|
|
|
|
|
}
|
|
|
|
|
impl<'a, MS: MsgSet + 'static> RequestHandle<'a, MS> {
|
|
|
|
|
impl<'a, MS: MsgSet + 'static> ReqReader<'a, MS> {
|
|
|
|
|
pub fn new(parent: ReqNot<MS>, raw_reply: impl AsyncFnOnce(Vec<u8>) + 'static) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
defer: RefCell::default(),
|
|
|
|
|
@@ -87,12 +430,12 @@ impl<'a, MS: MsgSet + 'static> RequestHandle<'a, MS> {
|
|
|
|
|
Receipt(PhantomData)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
impl<MS: MsgSet> ReqHandlish for RequestHandle<'_, MS> {
|
|
|
|
|
impl<MS: MsgSet> ReqHandlish for ReqReader<'_, MS> {
|
|
|
|
|
fn defer_objsafe(&self, val: Pin<Box<dyn Future<Output = ()>>>) {
|
|
|
|
|
self.defer.borrow_mut().push(val)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
impl<MS: MsgSet> Drop for RequestHandle<'_, MS> {
|
|
|
|
|
impl<MS: MsgSet> Drop for ReqReader<'_, MS> {
|
|
|
|
|
fn drop(&mut self) {
|
|
|
|
|
if !panicking() {
|
|
|
|
|
debug_assert!(self.raw_reply.borrow().is_none(), "Request dropped without response")
|
|
|
|
|
@@ -158,7 +501,7 @@ impl<T: MsgSet> ReqNot<T> {
|
|
|
|
|
let rn = self.clone();
|
|
|
|
|
let rn2 = self.clone();
|
|
|
|
|
req_cb(
|
|
|
|
|
RequestHandle::new(rn, async move |vec| {
|
|
|
|
|
ReqReader::new(rn, async move |vec| {
|
|
|
|
|
let mut buf = (!id).to_be_bytes().to_vec();
|
|
|
|
|
buf.extend(vec);
|
|
|
|
|
let mut send = clone_box(&*rn2.0.lock().await.send);
|
|
|
|
|
|