added a new test for comm
This commit is contained in:
@@ -354,7 +354,7 @@ struct ReplySub {
|
||||
cb: oneshot::Sender<ReplyRecord>,
|
||||
}
|
||||
|
||||
struct IoClient {
|
||||
pub struct IoClient {
|
||||
output: IoLock<dyn AsyncWrite>,
|
||||
id: Rc<RefCell<u64>>,
|
||||
subscribe: Rc<Sender<ReplySub>>,
|
||||
@@ -491,30 +491,33 @@ impl MsgWriter for IoNotifWriter {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CommCtx {
|
||||
pub struct CommCx {
|
||||
exit: Sender<()>,
|
||||
}
|
||||
|
||||
impl CommCtx {
|
||||
impl CommCx {
|
||||
pub async fn exit(self) -> io::Result<()> {
|
||||
self.exit.clone().send(()).await.expect("quit channel dropped");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IoComm {
|
||||
pub client: IoClient,
|
||||
pub cx: CommCx,
|
||||
pub srv: IoCommServer,
|
||||
}
|
||||
|
||||
/// Establish bidirectional request-notification communication over a duplex
|
||||
/// channel. The returned [IoClient] can be used for notifications immediately,
|
||||
/// but requests can only be received while the future is running. The future
|
||||
/// will only resolve when [CommCtx::exit] is called.
|
||||
pub fn io_comm(
|
||||
o: Pin<Box<dyn AsyncWrite>>,
|
||||
i: Pin<Box<dyn AsyncRead>>,
|
||||
) -> (impl Client + 'static, CommCtx, IoCommServer) {
|
||||
pub fn io_comm(o: Pin<Box<dyn AsyncWrite>>, i: Pin<Box<dyn AsyncRead>>) -> IoComm {
|
||||
let i = Rc::new(Mutex::new(i));
|
||||
let o = Rc::new(Mutex::new(o));
|
||||
let (onsub, client) = IoClient::new(o.clone());
|
||||
let (exit, onexit) = channel(1);
|
||||
(client, CommCtx { exit }, IoCommServer { o, i, onsub, onexit })
|
||||
IoComm { client, cx: CommCx { exit }, srv: IoCommServer { o, i, onsub, onexit } }
|
||||
}
|
||||
pub struct IoCommServer {
|
||||
o: Rc<Mutex<Pin<Box<dyn AsyncWrite>>>>,
|
||||
@@ -657,53 +660,61 @@ impl IoCommServer {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
pub mod test {
|
||||
use std::cell::RefCell;
|
||||
|
||||
use futures::channel::mpsc;
|
||||
use futures::{FutureExt, SinkExt, StreamExt, join, select};
|
||||
use futures::future::{join3, select};
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use orchid_api_derive::{Coding, Hierarchy};
|
||||
use orchid_api_traits::Request;
|
||||
use orchid_async_utils::debug::{spin_on, with_label};
|
||||
use unsync_pipe::pipe;
|
||||
|
||||
use crate::comm::{ClientExt, MsgReaderExt, ReqReaderExt, io_comm};
|
||||
use super::*;
|
||||
use crate::with_stash;
|
||||
|
||||
pub fn test_pair() -> (IoComm, IoComm) {
|
||||
let (right_in, left_out) = pipe(1024);
|
||||
let (left_in, right_out) = pipe(1024);
|
||||
(
|
||||
io_comm(Box::pin(left_in), Box::pin(left_out)),
|
||||
io_comm(Box::pin(right_in), Box::pin(right_out)),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn listen_no_ingress(srv: IoCommServer) {
|
||||
srv
|
||||
.listen(
|
||||
async |_| panic!("Not expecting ingress notif"),
|
||||
async |_| panic!("Not expecting ingress req"),
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Coding, Hierarchy)]
|
||||
#[extendable]
|
||||
struct TestNotif(u64);
|
||||
|
||||
#[test]
|
||||
fn notification() {
|
||||
spin_on(false, async {
|
||||
let (in1, out2) = pipe(1024);
|
||||
let (in2, out1) = pipe(1024);
|
||||
let (received, mut on_receive) = mpsc::channel(2);
|
||||
let (_, recv_ctx, recv_srv) = io_comm(Box::pin(in2), Box::pin(out2));
|
||||
let (sender, ..) = io_comm(Box::pin(in1), Box::pin(out1));
|
||||
join!(
|
||||
async {
|
||||
recv_srv
|
||||
.listen(
|
||||
async |notif| {
|
||||
received.clone().send(notif.read::<TestNotif>().await?).await.unwrap();
|
||||
Ok(())
|
||||
},
|
||||
async |_| panic!("Should receive notif, not request"),
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
},
|
||||
async {
|
||||
sender.notify(TestNotif(3)).await.unwrap();
|
||||
assert_eq!(on_receive.next().await, Some(TestNotif(3)));
|
||||
sender.notify(TestNotif(4)).await.unwrap();
|
||||
assert_eq!(on_receive.next().await, Some(TestNotif(4)));
|
||||
recv_ctx.exit().await.unwrap();
|
||||
}
|
||||
);
|
||||
})
|
||||
let (left, right) = test_pair();
|
||||
let (received, mut on_receive) = mpsc::channel(2);
|
||||
let right_srv_fut = right.srv.listen(
|
||||
async |notif| {
|
||||
received.clone().send(notif.read::<TestNotif>().await?).await.unwrap();
|
||||
Ok(())
|
||||
},
|
||||
async |_| panic!("Should receive notif, not request"),
|
||||
);
|
||||
spin_on(join(async { right_srv_fut.await.unwrap() }, async {
|
||||
left.client.notify(TestNotif(3)).await.unwrap();
|
||||
assert_eq!(on_receive.next().await, Some(TestNotif(3)));
|
||||
left.client.notify(TestNotif(4)).await.unwrap();
|
||||
assert_eq!(on_receive.next().await, Some(TestNotif(4)));
|
||||
right.cx.exit().await.unwrap();
|
||||
}));
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Coding, Hierarchy)]
|
||||
@@ -715,140 +726,144 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn request() {
|
||||
spin_on(false, async {
|
||||
let (in1, out2) = pipe(1024);
|
||||
let (in2, out1) = pipe(1024);
|
||||
let (_, srv_ctx, srv) = io_comm(Box::pin(in2), Box::pin(out2));
|
||||
let (client, client_ctx, client_srv) = io_comm(Box::pin(in1), Box::pin(out1));
|
||||
join!(
|
||||
async {
|
||||
srv
|
||||
.listen(
|
||||
async |_| panic!("No notifs expected"),
|
||||
async |mut req| {
|
||||
let val = req.read_req::<DummyRequest>().await?;
|
||||
req.reply(&val, val.0 + 1).await
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
},
|
||||
async {
|
||||
client_srv
|
||||
.listen(
|
||||
async |_| panic!("Not expecting ingress notif"),
|
||||
async |_| panic!("Not expecting ingress req"),
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
},
|
||||
async {
|
||||
let response = client.request(DummyRequest(5)).await.unwrap();
|
||||
assert_eq!(response, 6);
|
||||
srv_ctx.exit().await.unwrap();
|
||||
client_ctx.exit().await.unwrap();
|
||||
}
|
||||
);
|
||||
})
|
||||
let (left, right) = test_pair();
|
||||
let right_srv_fut = right.srv.listen(
|
||||
async |_| panic!("No notifs expected"),
|
||||
async |mut req| {
|
||||
let val = req.read_req::<DummyRequest>().await?;
|
||||
req.reply(&val, val.0 + 1).await
|
||||
},
|
||||
);
|
||||
let left_srv_fut = left.srv.listen(
|
||||
async |_| panic!("Not expecting ingress notif"),
|
||||
async |_| panic!("Not expecting ingress req"),
|
||||
);
|
||||
spin_on(join3(
|
||||
async { right_srv_fut.await.unwrap() },
|
||||
async { left_srv_fut.await.unwrap() },
|
||||
async {
|
||||
let response = left.client.request(DummyRequest(5)).await.unwrap();
|
||||
assert_eq!(response, 6);
|
||||
right.cx.exit().await.unwrap();
|
||||
left.cx.exit().await.unwrap();
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn exit() {
|
||||
spin_on(false, async {
|
||||
let (input1, output1) = pipe(1024);
|
||||
let (input2, output2) = pipe(1024);
|
||||
let (reply_client, reply_context, reply_server) =
|
||||
io_comm(Box::pin(input1), Box::pin(output2));
|
||||
let (req_client, req_context, req_server) = io_comm(Box::pin(input2), Box::pin(output1));
|
||||
let reply_context = RefCell::new(Some(reply_context));
|
||||
let (exit, onexit) = futures::channel::oneshot::channel::<()>();
|
||||
join!(
|
||||
with_label("reply", async move {
|
||||
reply_server
|
||||
.listen(
|
||||
async |hand| {
|
||||
let _notif = hand.read::<TestNotif>().await.unwrap();
|
||||
let context = reply_context.borrow_mut().take().unwrap();
|
||||
context.exit().await?;
|
||||
Ok(())
|
||||
},
|
||||
async |mut hand| {
|
||||
let req = hand.read_req::<DummyRequest>().await?;
|
||||
hand.reply(&req, req.0 + 1).await
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
exit.send(()).unwrap();
|
||||
let _client = reply_client;
|
||||
}),
|
||||
with_label("client", async move {
|
||||
req_server
|
||||
.listen(
|
||||
async |_| panic!("Only the other server expected notifs"),
|
||||
async |_| panic!("Only the other server expected requests"),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let _ctx = req_context;
|
||||
}),
|
||||
async move {
|
||||
req_client.request(DummyRequest(0)).await.unwrap();
|
||||
req_client.notify(TestNotif(0)).await.unwrap();
|
||||
onexit.await.unwrap();
|
||||
}
|
||||
)
|
||||
});
|
||||
let (left, right) = test_pair();
|
||||
let reply_context = RefCell::new(Some(right.cx));
|
||||
let (exit, onexit) = oneshot::channel::<()>();
|
||||
let right_srv_fut = right.srv.listen(
|
||||
async |hand| {
|
||||
let _notif = hand.read::<TestNotif>().await.unwrap();
|
||||
let context = reply_context.borrow_mut().take().unwrap();
|
||||
context.exit().await?;
|
||||
Ok(())
|
||||
},
|
||||
async |mut hand| {
|
||||
let req = hand.read_req::<DummyRequest>().await?;
|
||||
hand.reply(&req, req.0 + 1).await
|
||||
},
|
||||
);
|
||||
let left_srv_fut = left.srv.listen(
|
||||
async |_| panic!("Only the other server expected notifs"),
|
||||
async |_| panic!("Only the other server expected requests"),
|
||||
);
|
||||
spin_on(join3(
|
||||
with_label("reply", async move {
|
||||
right_srv_fut.await.unwrap();
|
||||
exit.send(()).unwrap();
|
||||
let _client = right.client;
|
||||
}),
|
||||
with_label("client", async move {
|
||||
left_srv_fut.await.unwrap();
|
||||
let _ctx = left.cx;
|
||||
}),
|
||||
async move {
|
||||
left.client.request(DummyRequest(0)).await.unwrap();
|
||||
left.client.notify(TestNotif(0)).await.unwrap();
|
||||
onexit.await.unwrap();
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn timely_cancel() {
|
||||
spin_on(false, async {
|
||||
let (in1, out2) = pipe(1024);
|
||||
let (in2, out1) = pipe(1024);
|
||||
let (wait_in, mut wait_out) = mpsc::channel(0);
|
||||
let (_, srv_ctx, srv) = io_comm(Box::pin(in2), Box::pin(out2));
|
||||
let (client, client_ctx, client_srv) = io_comm(Box::pin(in1), Box::pin(out1));
|
||||
join!(
|
||||
with_label("server", async {
|
||||
srv
|
||||
.listen(
|
||||
async |_| panic!("No notifs expected"),
|
||||
async |mut req| {
|
||||
let _ = req.read_req::<DummyRequest>().await?;
|
||||
let _ = req.finish().await;
|
||||
wait_in.clone().send(()).await.unwrap();
|
||||
// This will never return, so if the cancellation does not work, it would block
|
||||
// the loop
|
||||
futures::future::pending().await
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}),
|
||||
with_label("client", async {
|
||||
client_srv
|
||||
.listen(
|
||||
async |_| panic!("Not expecting ingress notif"),
|
||||
async |_| panic!("Not expecting ingress req"),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}),
|
||||
let (left, right) = test_pair();
|
||||
let (wait_in, mut wait_out) = mpsc::channel(0);
|
||||
let right_srv_fut = right.srv.listen(
|
||||
async |_| panic!("No notifs expected"),
|
||||
async |mut req| {
|
||||
let _ = req.read_req::<DummyRequest>().await?;
|
||||
let _ = req.finish().await;
|
||||
wait_in.clone().send(()).await.unwrap();
|
||||
// This will never return, so if the cancellation does not work, it would block
|
||||
// the loop
|
||||
futures::future::pending().await
|
||||
},
|
||||
);
|
||||
spin_on(join3(
|
||||
with_label("server", async { right_srv_fut.await.unwrap() }),
|
||||
with_label("client", listen_no_ingress(left.srv)),
|
||||
with_label(
|
||||
"outer_stash",
|
||||
with_stash(async {
|
||||
with_stash(async {
|
||||
select! {
|
||||
_ = client.request(DummyRequest(5)).fuse() => {
|
||||
panic!("This one should not run")
|
||||
},
|
||||
rep = wait_out.next() => rep.expect("something?"),
|
||||
}
|
||||
select(
|
||||
Box::pin(async {
|
||||
left.client.request(DummyRequest(5)).await.unwrap();
|
||||
panic!("This one should not run");
|
||||
}),
|
||||
Box::pin(async {
|
||||
wait_out.next().await.expect("something?");
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
})
|
||||
.await;
|
||||
srv_ctx.exit().await.unwrap();
|
||||
client_ctx.exit().await.unwrap();
|
||||
})
|
||||
);
|
||||
})
|
||||
right.cx.exit().await.unwrap();
|
||||
left.cx.exit().await.unwrap();
|
||||
}),
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn late_cancel() {
|
||||
let (left, right) = test_pair();
|
||||
let (send, mut recv) = mpsc::channel(0);
|
||||
let right_srv_fut = right.srv.listen(
|
||||
async |_| panic!("Expected a request"),
|
||||
async |mut req| {
|
||||
req.read_req::<DummyRequest>().await?;
|
||||
let mut reply_writer = req.start_reply().await?;
|
||||
let (stop_wait, wait) = oneshot::channel();
|
||||
send.clone().send(stop_wait).await.unwrap();
|
||||
wait.await.unwrap();
|
||||
(1 as <DummyRequest as Request>::Response).encode(reply_writer.writer()).await?;
|
||||
reply_writer.finish().await
|
||||
},
|
||||
);
|
||||
spin_on(join3(
|
||||
async { right_srv_fut.await.unwrap() },
|
||||
with_label("client", listen_no_ingress(left.srv)),
|
||||
async {
|
||||
with_stash(Box::pin(async {
|
||||
select(
|
||||
Box::pin(async {
|
||||
left.client.request(DummyRequest(5)).await.unwrap();
|
||||
panic!("This one should not run");
|
||||
}),
|
||||
Box::pin(async { recv.next().await.unwrap().send(()) }),
|
||||
)
|
||||
.await;
|
||||
}))
|
||||
.await;
|
||||
right.cx.exit().await.unwrap();
|
||||
left.cx.exit().await.unwrap();
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,41 +148,34 @@ mod test {
|
||||
#[test]
|
||||
fn run_stashed_future() {
|
||||
let (mut send, recv) = mpsc::channel(0);
|
||||
spin_on(
|
||||
false,
|
||||
join(
|
||||
with_stash(async {
|
||||
let mut send1 = send.clone();
|
||||
spin_on(join(
|
||||
with_stash(async {
|
||||
let mut send1 = send.clone();
|
||||
stash(async move {
|
||||
send1.send(1).await.unwrap();
|
||||
});
|
||||
let mut send1 = send.clone();
|
||||
stash(async move {
|
||||
let mut send2 = send1.clone();
|
||||
stash(async move {
|
||||
send1.send(1).await.unwrap();
|
||||
send2.send(2).await.unwrap();
|
||||
});
|
||||
let mut send1 = send.clone();
|
||||
send1.send(3).await.unwrap();
|
||||
stash(async move {
|
||||
let mut send2 = send1.clone();
|
||||
stash(async move {
|
||||
send2.send(2).await.unwrap();
|
||||
});
|
||||
send1.send(3).await.unwrap();
|
||||
stash(async move {
|
||||
send1.send(4).await.unwrap();
|
||||
})
|
||||
});
|
||||
let mut send1 = send.clone();
|
||||
stash(async move {
|
||||
send1.send(5).await.unwrap();
|
||||
});
|
||||
send.send(6).await.unwrap();
|
||||
}),
|
||||
async {
|
||||
let mut results = recv.take(6).collect::<Vec<_>>().await;
|
||||
results.sort();
|
||||
assert_eq!(
|
||||
&results,
|
||||
&[1, 2, 3, 4, 5, 6],
|
||||
"all variations completed in unspecified order"
|
||||
);
|
||||
},
|
||||
),
|
||||
);
|
||||
send1.send(4).await.unwrap();
|
||||
})
|
||||
});
|
||||
let mut send1 = send.clone();
|
||||
stash(async move {
|
||||
send1.send(5).await.unwrap();
|
||||
});
|
||||
send.send(6).await.unwrap();
|
||||
}),
|
||||
async {
|
||||
let mut results = recv.take(6).collect::<Vec<_>>().await;
|
||||
results.sort();
|
||||
assert_eq!(&results, &[1, 2, 3, 4, 5, 6], "all variations completed in unspecified order");
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user