diff --git a/orchid-base/src/reqnot.rs b/orchid-base/src/reqnot.rs index 6c4a59a..f277b8f 100644 --- a/orchid-base/src/reqnot.rs +++ b/orchid-base/src/reqnot.rs @@ -5,15 +5,18 @@ use std::marker::PhantomData; use std::mem; use std::ops::{BitAnd, Deref}; use std::pin::Pin; +use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; use derive_destructure::destructure; use dyn_clone::{DynClone, clone_box}; -use futures::channel::mpsc; +use futures::channel::mpsc::{self, Sender}; +use futures::channel::oneshot; use futures::future::LocalBoxFuture; use futures::lock::Mutex; -use futures::{SinkExt, StreamExt}; +use futures::{AsyncBufRead, AsyncWrite, SinkExt, Stream, StreamExt}; use hashbrown::HashMap; use orchid_api_traits::{Channel, Coding, Decode, Encode, MsgSet, Request}; use trait_set::trait_set; @@ -23,6 +26,72 @@ use crate::logging::Logger; pub struct Receipt<'a>(PhantomData<&'a mut ()>); +/// This object holds an exclusive lock on the outbound pipe. +pub trait DynRequestWriter { + fn writer(&mut self) -> Pin<&mut dyn AsyncWrite>; + /// Release the outbound pipe and wait for the response to begin. + fn get_response(self: Box) -> Pin>>>; +} +/// This object holds an exclusive lock on the inbound pipe. +pub trait DynResponseHandle { + fn reader(&mut self) -> Pin<&mut dyn AsyncBufRead>; + fn finish(self: Box) -> Pin>>; +} +/// This object holds an exclusive lock on the outbound pipe. +pub trait DynNotifWriter { + fn writer(&mut self) -> Pin<&mut dyn AsyncWrite>; + fn finish(self: Box) -> Pin>>; +} + +pub trait DynClient { + fn request(&self) -> Pin>>>; + fn notif(&self) -> Pin>>>; +} + +pub struct Client(pub(crate) Rc, pub(crate) PhantomData); +impl Client { + pub async fn notify::Notif>>(&self, notif: Notif) { + let mut notif_writer = self.0.notif().await; + notif.into().encode(notif_writer.writer()).await; + notif_writer.finish().await; + } + pub async fn request::Req>>( + &self, + req: Req, + ) -> Req::Response { + let root_req = req.into(); + let mut req_writer = self.0.request().await; + root_req.encode(req_writer.writer()).await; + let mut req_hand = req_writer.get_response().await; + let res = Req::Response::decode(req_hand.reader()).await; + req_hand.finish().await; + res + } +} + +pub struct DuplexServerState { + pending_outbound: HashMap>, + sender: Pin>, + receiver: Pin>, +} +pub enum ServerEvent { + Notif(::Notif), + Req(RequestHandle, ::Req), +} +pub async fn run_duplex_server( + sender: Pin>, + receiver: Pin>, +) -> (impl Stream>, Client) { + let sender = Rc::new(Mutex::new(sender)); + let receiver = Rc::new(Mutex::new(receiver)); + let pending_outbound = Rc::new(Mutex::new(HashMap::new())); + +} +pub struct DuplexServer(Rc>); +impl DuplexServer { + pub fn receive(msg: ) +} + trait_set! { pub trait SendFn = for<'a> FnMut(&'a [u8], ReqNot) -> LocalBoxFuture<'a, ()> @@ -52,11 +121,10 @@ impl ReqHandlish for &'_ dyn ReqHandlish { } #[derive(destructure)] -pub struct RequestHandle<'a, MS: MsgSet> { +pub struct RequestHandle { defer_drop: RefCell>>, fulfilled: AtomicBool, id: u64, - _reqlt: PhantomData<&'a mut ()>, parent: ReqNot, } impl<'a, MS: MsgSet + 'static> RequestHandle<'a, MS> { @@ -89,7 +157,7 @@ impl<'a, MS: MsgSet + 'static> RequestHandle<'a, MS> { impl ReqHandlish for RequestHandle<'_, MS> { fn defer_drop_objsafe(&self, val: Box) { self.defer_drop.borrow_mut().push(val); } } -impl Drop for RequestHandle<'_, MS> { +impl Drop for RequestHandle { fn drop(&mut self) { let done = self.fulfilled.load(Ordering::Relaxed); debug_assert!(done, "Request {} dropped without response", self.id) @@ -123,7 +191,7 @@ impl ReqNot { notif: impl NotifFn, req: impl ReqFn, ) -> Self { - Self( + let this = Self( Arc::new(Mutex::new(ReqNotData { id: 1, send: Box::new(send), @@ -132,7 +200,13 @@ impl ReqNot { responses: HashMap::new(), })), logger, - ) + ); + let (sig_send, sig_recv) = std::sync::mpsc::sync_channel(0); + std::thread::spawn(move || { + std::thread::sleep(Duration::from_secs(10)); + sig_send.send(()).expect("Crash!"); + }); + this } /// Can be called from a polling thread or dispatched in any other way diff --git a/orchid.code-workspace b/orchid.code-workspace index 4d0b844..f8737ba 100644 --- a/orchid.code-workspace +++ b/orchid.code-workspace @@ -55,6 +55,10 @@ ], "rust-analyzer.showUnlinkedFileNotification": false, "swissknife.notesEnabled": false, + "todo-tree.filtering.excludeGlobs": [ + "**/node_modules/*/**", + "orchidlang/**" + ] }, "extensions": { "recommendations": [