Pattern matching works now
This commit is contained in:
@@ -13,6 +13,7 @@ impl Logger {
|
||||
pub fn new(strat: api::LogStrategy) -> Self { Self(strat) }
|
||||
pub fn log(&self, msg: impl AsRef<str>) { writeln!(self, "{}", msg.as_ref()) }
|
||||
pub fn strat(&self) -> api::LogStrategy { self.0.clone() }
|
||||
pub fn is_active(&self) -> bool { !matches!(self.0, api::LogStrategy::Discard) }
|
||||
pub fn log_buf(&self, event: impl AsRef<str>, buf: &[u8]) {
|
||||
if std::env::var("ORCHID_LOG_BUFFERS").is_ok_and(|v| !v.is_empty()) {
|
||||
writeln!(self, "{}: [{}]", event.as_ref(), buf.iter().map(|b| format!("{b:02x}")).join(" "))
|
||||
|
||||
@@ -5,7 +5,7 @@ use std::mem;
|
||||
use std::ops::{BitAnd, Deref};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::thread::panicking;
|
||||
|
||||
use derive_destructure::destructure;
|
||||
use dyn_clone::{DynClone, clone_box};
|
||||
@@ -14,7 +14,7 @@ use futures::future::LocalBoxFuture;
|
||||
use futures::lock::Mutex;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use hashbrown::HashMap;
|
||||
use orchid_api_traits::{Channel, Coding, Decode, Encode, MsgSet, Request};
|
||||
use orchid_api_traits::{Channel, Coding, Decode, Encode, MsgSet, Request, enc_vec};
|
||||
use trait_set::trait_set;
|
||||
|
||||
use crate::clone;
|
||||
@@ -50,17 +50,23 @@ impl ReqHandlish for &'_ dyn ReqHandlish {
|
||||
fn defer_objsafe(&self, val: Pin<Box<dyn Future<Output = ()>>>) { (**self).defer_objsafe(val) }
|
||||
}
|
||||
|
||||
type LocalAsyncFnOnceBox = Box<dyn FnOnce(Vec<u8>) -> LocalBoxFuture<'static, ()>>;
|
||||
|
||||
#[derive(destructure)]
|
||||
pub struct RequestHandle<'a, MS: MsgSet> {
|
||||
defer: RefCell<Vec<Pin<Box<dyn Future<Output = ()>>>>>,
|
||||
fulfilled: AtomicBool,
|
||||
id: u64,
|
||||
_reqlt: PhantomData<&'a mut ()>,
|
||||
parent: ReqNot<MS>,
|
||||
raw_reply: RefCell<Option<LocalAsyncFnOnceBox>>,
|
||||
}
|
||||
impl<'a, MS: MsgSet + 'static> RequestHandle<'a, MS> {
|
||||
fn new(parent: ReqNot<MS>, id: u64) -> Self {
|
||||
Self { defer: RefCell::default(), fulfilled: false.into(), _reqlt: PhantomData, parent, id }
|
||||
pub fn new(parent: ReqNot<MS>, raw_reply: impl AsyncFnOnce(Vec<u8>) + 'static) -> Self {
|
||||
Self {
|
||||
defer: RefCell::default(),
|
||||
_reqlt: PhantomData,
|
||||
parent,
|
||||
raw_reply: RefCell::new(Some(Box::new(|v| Box::pin(raw_reply(v))))),
|
||||
}
|
||||
}
|
||||
pub fn reqnot(&self) -> ReqNot<MS> { self.parent.clone() }
|
||||
pub async fn handle<U: Request>(&self, _: &U, rep: &U::Response) -> Receipt<'a> {
|
||||
@@ -71,11 +77,9 @@ impl<'a, MS: MsgSet + 'static> RequestHandle<'a, MS> {
|
||||
self.respond(rep).await
|
||||
}
|
||||
pub async fn respond(&self, response: &impl Encode) -> Receipt<'a> {
|
||||
assert!(!self.fulfilled.swap(true, Ordering::Relaxed), "Already responded to {}", self.id);
|
||||
let mut buf = (!self.id).to_be_bytes().to_vec();
|
||||
response.encode(Pin::new(&mut buf)).await;
|
||||
let mut send = clone_box(&*self.reqnot().0.lock().await.send);
|
||||
(send)(&buf, self.parent.clone()).await;
|
||||
let replier = self.raw_reply.borrow_mut().take().expect("Already responded to request");
|
||||
let buf = enc_vec(response).await;
|
||||
(replier)(buf).await;
|
||||
let deferred = mem::take(&mut *self.defer.borrow_mut());
|
||||
for item in deferred {
|
||||
item.await
|
||||
@@ -90,8 +94,9 @@ impl<MS: MsgSet> ReqHandlish for RequestHandle<'_, MS> {
|
||||
}
|
||||
impl<MS: MsgSet> Drop for RequestHandle<'_, MS> {
|
||||
fn drop(&mut self) {
|
||||
let done = self.fulfilled.load(Ordering::Relaxed);
|
||||
debug_assert!(done, "Request {} dropped without response", self.id)
|
||||
if !panicking() {
|
||||
debug_assert!(self.raw_reply.borrow().is_none(), "Request dropped without response")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,7 +156,17 @@ impl<T: MsgSet> ReqNot<T> {
|
||||
let mut req_cb = clone_box(&*g.req);
|
||||
mem::drop(g);
|
||||
let rn = self.clone();
|
||||
req_cb(RequestHandle::new(rn, id), message).await;
|
||||
let rn2 = self.clone();
|
||||
req_cb(
|
||||
RequestHandle::new(rn, async move |vec| {
|
||||
let mut buf = (!id).to_be_bytes().to_vec();
|
||||
buf.extend(vec);
|
||||
let mut send = clone_box(&*rn2.0.lock().await.send);
|
||||
(send)(&buf, rn2.clone()).await;
|
||||
}),
|
||||
message,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user