From 0f89cde24636c7bc5423410296f3a87d7c8b37ce Mon Sep 17 00:00:00 2001 From: Lawrence Bethlenfalvy Date: Fri, 12 Dec 2025 17:32:01 +0100 Subject: [PATCH] added binary-safe (hopefully) pipe for upcoming dylib extension support --- orchid-base/src/lib.rs | 1 + orchid-base/src/pipe.rs | 274 ++++++++++++++++++++++++++++++++++++++ orchid-base/src/reqnot.rs | 2 + 3 files changed, 277 insertions(+) create mode 100644 orchid-base/src/pipe.rs diff --git a/orchid-base/src/lib.rs b/orchid-base/src/lib.rs index b812005..09ba43a 100644 --- a/orchid-base/src/lib.rs +++ b/orchid-base/src/lib.rs @@ -21,6 +21,7 @@ pub mod msg; pub mod name; pub mod number; pub mod parse; +pub mod pipe; pub mod pure_seq; pub mod reqnot; pub mod sequence; diff --git a/orchid-base/src/pipe.rs b/orchid-base/src/pipe.rs new file mode 100644 index 0000000..3f99b9e --- /dev/null +++ b/orchid-base/src/pipe.rs @@ -0,0 +1,274 @@ +use std::alloc::{Layout, alloc, dealloc}; +use std::io; +use std::pin::Pin; +use std::process::abort; +use std::ptr::{null, null_mut, slice_from_raw_parts}; +use std::task::{Context, Poll, Waker}; + +use futures::{AsyncRead, AsyncWrite}; + +fn pipe_layout(bs: usize) -> Layout { Layout::from_size_align(bs, 1).expect("1-align is trivial") } + +pub fn pipe(size: usize) -> (Writer, Reader) { + assert!(0 < size, "cannot create async pipe without buffer"); + // SAFETY: the + let start = unsafe { alloc(pipe_layout(size)) }; + extern "C" fn drop(val: *const ()) { + let AsyncRingbuffer { + start, + size, + mut read_waker, + mut write_waker, + reader_dropped, + writer_dropped, + // irrelevant if correctly dropped + read_idx: _, + write_idx: _, + // data used to make this call + drop: _, + state: _, + } = *unsafe { Box::from_raw(val as *mut AsyncRingbuffer) }; + if !writer_dropped || !reader_dropped { + eprintln!("Pipe dropped in err before reader or writer"); + abort() + } + read_waker.drop(); + write_waker.drop(); + unsafe { dealloc(start, pipe_layout(size)) } + } + let state = Box::into_raw(Box::new(AsyncRingbuffer { + start, + size, + state: null(), + read_idx: 0, + write_idx: 0, + read_waker: Trigger::empty(), + write_waker: Trigger::empty(), + reader_dropped: false, + writer_dropped: false, + drop, + })); + let state_mut = unsafe { state.as_mut().unwrap() }; + state_mut.state = state as *const (); + (Writer(state_mut as *mut _), Reader(state_mut as *mut _)) +} + +/// A single-fire empty event, to be distributed by value. Either one of the +/// functions can be called exactly once. +#[repr(C)] +struct Trigger { + state: *const (), + invoke: extern "C" fn(*const ()), + drop: extern "C" fn(*const ()), +} +impl Trigger { + fn new(waker: Waker) -> Self { + let state = Box::into_raw(Box::new(waker)) as *const (); + extern "C" fn drop(state: *const ()) { unsafe { Box::from_raw(state as *mut Waker) }; } + extern "C" fn invoke(state: *const ()) { unsafe { Box::from_raw(state as *mut Waker) }.wake(); } + Self { state, invoke, drop } + } + fn empty() -> Self { + extern "C" fn empty_fn_ptr(_: *const ()) { abort() } + Self { state: null(), drop: empty_fn_ptr, invoke: empty_fn_ptr } + } + fn is_empty(&self) -> bool { self.state.is_null() } + fn invoke(&mut self) { + if let Some(this) = self.take() { + (this.invoke)(this.state) + } + } + fn drop(&mut self) { + if let Some(this) = self.take() { + (this.drop)(this.state) + } + } + fn take(&mut self) -> Option { + (!self.is_empty()).then(|| std::mem::replace(self, Self::empty())) + } +} + +/// A ringbuffer for single-threaded synchronized communication. +#[repr(C)] +struct AsyncRingbuffer { + state: *const (), + start: *mut u8, + size: usize, + read_idx: usize, + write_idx: usize, + read_waker: Trigger, + write_waker: Trigger, + reader_dropped: bool, + writer_dropped: bool, + drop: extern "C" fn(*const ()), +} +impl AsyncRingbuffer { + fn drop_writer(&mut self) { + self.writer_dropped = true; + if self.reader_dropped { + (self.drop)(self.state) + } + } + fn drop_reader(&mut self) { + self.reader_dropped = true; + if self.writer_dropped { + (self.drop)(self.state) + } + } + fn writer_wait(&mut self, waker: &Waker) -> Poll> { + if self.reader_dropped { + return Poll::Ready(Err(broken_pipe_error())); + } + self.read_waker.invoke(); + self.write_waker.drop(); + self.write_waker = Trigger::new(waker.clone()); + Poll::Pending + } + fn reader_wait(&mut self, waker: &Waker) -> Poll> { + if self.writer_dropped { + return Poll::Ready(Err(broken_pipe_error())); + } + self.write_waker.invoke(); + self.read_waker.drop(); + self.read_waker = Trigger::new(waker.clone()); + Poll::Pending + } + unsafe fn non_wrapping_write_unchecked(&mut self, buf: &[u8]) { + let write_ptr = unsafe { self.start.offset(self.write_idx as isize) }; + let slc = slice_from_raw_parts(write_ptr, buf.len()).cast_mut(); + unsafe { &mut *slc }.copy_from_slice(buf); + self.write_idx = (self.write_idx + buf.len()) % self.size; + } + unsafe fn non_wrapping_read_unchecked(&mut self, buf: &mut [u8]) { + let read_ptr = unsafe { self.start.offset(self.read_idx as isize) }; + let slc = slice_from_raw_parts(read_ptr, buf.len()).cast_mut(); + buf.copy_from_slice(unsafe { &*slc }); + self.read_idx = (self.read_idx + buf.len()) % self.size; + } + fn is_full(&self) -> bool { (self.write_idx + 1) % self.size == self.read_idx } + fn is_empty(&self) -> bool { self.write_idx == self.read_idx } +} + +fn already_closed_error() -> io::Error { + io::Error::new(io::ErrorKind::BrokenPipe, "Pipe already closed from this end") +} +fn broken_pipe_error() -> io::Error { + io::Error::new(io::ErrorKind::BrokenPipe, "Pipe already closed from other end") +} + +#[repr(C)] +pub struct Writer(*mut AsyncRingbuffer); +impl Writer { + unsafe fn get_state(self: Pin<&mut Self>) -> io::Result<&mut AsyncRingbuffer> { + match unsafe { self.0.as_mut() } { + Some(data) => Ok(data), + None => Err(already_closed_error()), + } + } +} +impl AsyncWrite for Writer { + fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + unsafe { + match self.as_mut().get_state() { + Err(e) => return Poll::Ready(Err(e)), + Ok(data) => { + data.drop_writer(); + }, + } + } + self.0 = null_mut(); + Poll::Ready(Ok(())) + } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { + let data = self.as_mut().get_state()?; + if data.read_idx == data.write_idx { + Poll::Ready(Ok(())) + } else { + data.writer_wait(cx.waker()) + } + } + } + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + unsafe { + let data = self.as_mut().get_state()?; + if !buf.is_empty() && data.is_full() { + data.writer_wait(cx.waker()) + } else if data.write_idx < data.read_idx { + // non-wrapping write on backside + let count = buf.len().min(data.read_idx - data.write_idx - 1); + data.non_wrapping_write_unchecked(&buf[0..count]); + Poll::Ready(Ok(count)) + } else if data.write_idx + buf.len() < data.size { + // non-wrapping write on frontside + data.non_wrapping_write_unchecked(&buf[0..buf.len()]); + Poll::Ready(Ok(buf.len())) + } else { + // wrapping write + let (end, start) = buf.split_at(data.size - data.write_idx); + data.non_wrapping_write_unchecked(end); + let start_count = start.len().min(data.read_idx - 1); + data.non_wrapping_write_unchecked(&start[0..start_count]); + Poll::Ready(Ok(end.len() + start_count)) + } + } + } +} +impl Drop for Writer { + fn drop(&mut self) { + unsafe { + if let Some(data) = self.0.as_mut() { + data.drop_writer(); + } + } + } +} + +#[repr(C)] +pub struct Reader(*mut AsyncRingbuffer); +impl AsyncRead for Reader { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + unsafe { + let data = self.0.as_mut().expect("Cannot be null"); + if buf.len() != 0 && data.is_full() { + data.write_waker.invoke(); + } + if data.is_empty() { + data.reader_wait(cx.waker()) + } else if data.read_idx < data.write_idx { + // frontside non-wrapping read + let count = buf.len().min(data.write_idx - data.read_idx); + data.non_wrapping_read_unchecked(&mut buf[0..count]); + Poll::Ready(Ok(count)) + } else if data.read_idx + buf.len() < data.size { + // backside non-wrapping read + data.non_wrapping_read_unchecked(buf); + Poll::Ready(Ok(buf.len())) + } else { + // wrapping read + let (end, start) = buf.split_at_mut(data.size - data.read_idx); + data.non_wrapping_read_unchecked(end); + let start_count = start.len().min(data.write_idx); + data.non_wrapping_read_unchecked(&mut start[0..start_count]); + Poll::Ready(Ok(end.len() + start_count)) + } + } + } +} +impl Drop for Reader { + fn drop(&mut self) { + unsafe { + if let Some(data) = self.0.as_mut() { + data.drop_reader(); + } + } + } +} diff --git a/orchid-base/src/reqnot.rs b/orchid-base/src/reqnot.rs index 9c2c07d..8d843da 100644 --- a/orchid-base/src/reqnot.rs +++ b/orchid-base/src/reqnot.rs @@ -281,6 +281,8 @@ impl CommCtx { pub fn client(&self) -> Rc { self.client.clone() as Rc } } +/// This function will exit only when one of the callbacks calls +/// [CommCtx::quit]. pub async fn io_comm( o: Rc>>>, i: Mutex>>,