From 8753d4c7513e73f0a636a40a8f50a3c7659e6d4b Mon Sep 17 00:00:00 2001 From: Lawrence Bethlenfalvy Date: Sun, 14 Dec 2025 01:32:24 +0100 Subject: [PATCH] Added docs to unsync-pipe --- Cargo.lock | 2 +- unsync-pipe/Cargo.toml | 6 +++- unsync-pipe/src/lib.rs | 74 ++++++++++++++++-------------------------- 3 files changed, 34 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4671c66..fc633b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1946,7 +1946,7 @@ checksum = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" [[package]] name = "unsync-pipe" -version = "0.1.0" +version = "0.2.0" dependencies = [ "futures", "itertools", diff --git a/unsync-pipe/Cargo.toml b/unsync-pipe/Cargo.toml index 3c005db..5ffce61 100644 --- a/unsync-pipe/Cargo.toml +++ b/unsync-pipe/Cargo.toml @@ -1,7 +1,11 @@ [package] name = "unsync-pipe" -version = "0.1.0" +description = "Ringbuffer-backed !Send !Sync binary safe repr(C) AsyncWrite/AsyncRead pair" +version = "0.2.0" edition = "2024" +license = "MIT" +repository = "https://git.lbfalvy.com/Orchid/orchid" +homepage = "https://git.lbfalvy.com/Orchid/orchid" [dev-dependencies] itertools = "0.14.0" diff --git a/unsync-pipe/src/lib.rs b/unsync-pipe/src/lib.rs index 6a03163..8efab4f 100644 --- a/unsync-pipe/src/lib.rs +++ b/unsync-pipe/src/lib.rs @@ -1,3 +1,9 @@ +//! Single-threaded binary safe AsyncWrite/AsyncRead pair +//! +//! The main entry point is [pipe]. [Writer] and [Reader] can just be sent +//! across binary boundaries. A change to the ABI constitutes a major version +//! break. + use std::alloc::{Layout, alloc, dealloc}; use std::pin::Pin; use std::process::abort; @@ -9,6 +15,8 @@ use futures::{AsyncRead, AsyncWrite}; fn pipe_layout(bs: usize) -> Layout { Layout::from_size_align(bs, 1).expect("1-align is trivial") } +/// Create a ringbuffer with the specified byte capacity. Once the buffer is +/// exhausted, the writer will block. pub fn pipe(size: usize) -> (Writer, Reader) { assert!(0 < size, "cannot create async pipe without buffer"); // SAFETY: the @@ -158,6 +166,8 @@ fn broken_pipe_error() -> io::Error { io::Error::new(io::ErrorKind::BrokenPipe, "Pipe already closed from other end") } +/// A binary safe [AsyncWrite] implementor writing to a ringbuffer created by +/// [pipe]. #[repr(C)] pub struct Writer(*mut AsyncRingbuffer); impl Writer { @@ -192,50 +202,41 @@ impl AsyncWrite for Writer { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - let w = unsafe { + unsafe { let data = self.as_mut().get_state()?; let AsyncRingbuffer { write_idx, read_idx, size, .. } = *data; if !buf.is_empty() && data.is_empty() { - eprintln!("Wake reader"); data.read_waker.invoke(); } if !buf.is_empty() && data.is_full() { - eprintln!("Writer is blocked, waiting"); + // Writer is blocked data.writer_wait(cx.waker()) } else if write_idx < read_idx { - eprintln!("Non-wrapping backside write w={write_idx} < r={read_idx} <= s={size}"); + // Non-wrapping backside write w < r <= s let count = buf.len().min(read_idx - write_idx - 1); data.non_wrapping_write_unchecked(&buf[0..count]); Poll::Ready(Ok(count)) } else if data.write_idx + buf.len() < size { - eprintln!( - "Non-wrapping frontside write r={read_idx} <= w={write_idx} + b={} < s={size}", - buf.len() - ); + // Non-wrapping frontside write r <= w + b < s data.non_wrapping_write_unchecked(&buf[0..buf.len()]); Poll::Ready(Ok(buf.len())) } else if read_idx == 0 { - eprintln!("Frontside write up to origin r=0 < s={size} < w={write_idx} + b={}", buf.len()); + // Frontside write up to origin r=0 < s < w + b data.non_wrapping_write_unchecked(&buf[0..size - write_idx - 1]); Poll::Ready(Ok(size - write_idx - 1)) } else { let (end, start) = buf.split_at(size - write_idx); - eprintln!("Wrapping write r={read_idx} < s={size} < w={write_idx} + b={}", buf.len()); + // Wrapping write r < s < w + b data.non_wrapping_write_unchecked(end); let start_count = start.len().min(read_idx - 1); data.non_wrapping_write_unchecked(&start[0..start_count]); Poll::Ready(Ok(end.len() + start_count)) } - }; - if let Poll::Ready(Ok(w)) = &w { - eprintln!("Wrote {w}") } - w } } impl Drop for Writer { fn drop(&mut self) { - eprintln!("Dropping writer"); unsafe { if let Some(data) = self.0.as_mut() { data.drop_writer(); @@ -244,6 +245,8 @@ impl Drop for Writer { } } +/// A binary safe [AsyncRead] implementor reading from a ringbuffer created by +/// [pipe] #[repr(C)] pub struct Reader(*mut AsyncRingbuffer); impl AsyncRead for Reader { @@ -252,44 +255,37 @@ impl AsyncRead for Reader { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - eprintln!("Beginning read of {}", buf.len()); - let r = unsafe { + unsafe { let data = self.0.as_mut().expect("Cannot be null"); let AsyncRingbuffer { read_idx, write_idx, size, .. } = *data; if !buf.is_empty() && data.is_full() { - eprintln!("Wake writer"); data.write_waker.invoke(); } if !buf.is_empty() && data.is_empty() { - eprintln!("Nothing to read, waiting..."); + // Nothing to read, waiting... data.reader_wait(cx.waker()) } else if read_idx < write_idx { - eprintln!("frontside non-wrapping read"); + // Frontside non-wrapping read let count = buf.len().min(write_idx - read_idx); data.non_wrapping_read_unchecked(&mut buf[0..count]); Poll::Ready(Ok(count)) } else if read_idx + buf.len() < size { - eprintln!("backside non-wrapping read"); + // Backside non-wrapping read data.non_wrapping_read_unchecked(buf); Poll::Ready(Ok(buf.len())) } else { - eprintln!("wrapping read"); + // Wrapping read let (end, start) = buf.split_at_mut(size - read_idx); data.non_wrapping_read_unchecked(end); let start_count = start.len().min(write_idx); data.non_wrapping_read_unchecked(&mut start[0..start_count]); Poll::Ready(Ok(end.len() + start_count)) } - }; - if let Poll::Ready(Ok(r)) = &r { - eprintln!("Read {r}") } - r } } impl Drop for Reader { fn drop(&mut self) { - eprintln!("Dropping reader"); unsafe { if let Some(data) = self.0.as_mut() { data.drop_reader(); @@ -300,7 +296,6 @@ impl Drop for Reader { #[cfg(test)] mod tests { - use std::io::Write; use std::pin::pin; use futures::future::join; @@ -316,16 +311,15 @@ mod tests { fn basic_io() { let mut w_rng = ChaCha8Rng::seed_from_u64(2); let mut r_rng = ChaCha8Rng::seed_from_u64(1); - println!("Output check"); spin_on(async { let (w, r) = pipe(1024); - let test_length = 100_000; + let test_length = 10_000_000; let data = (0u32..test_length).flat_map(|num| num.to_be_bytes()); let write_fut = async { let mut w = pin!(w); let mut source = data.clone(); let mut tally = 0; - while tally < test_length * 8 { + while tally < test_length * 4 { let values = source.by_ref().take(w_rng.random_range(0..200)).collect::>(); tally += values.len() as u32; w.write_all(&values).await.unwrap(); @@ -336,23 +330,12 @@ mod tests { let mut r = pin!(r); let mut expected = data.clone(); let mut tally = 0; - let mut aggregate = Vec::new(); - let mut expected_aggregate = Vec::new(); - let mut percentage = 0; - while tally < test_length * 8 { - let next_percentage = tally * 100 / (test_length * 8); - if percentage < next_percentage { - percentage = next_percentage; - println!("{percentage}%"); - io::stdout().flush().unwrap(); - } + while tally < test_length * 4 { let expected_values = expected.by_ref().take(r_rng.random_range(0..200)).collect::>(); tally += expected_values.len() as u32; let mut values = vec![0; expected_values.len()]; r.read_exact(&mut values[..]).await.unwrap_or_else(|e| panic!("At {tally} bytes: {e}")); - aggregate.extend_from_slice(&values); - expected_aggregate.extend_from_slice(&expected_values); if values != expected_values { fn print_bytes(bytes: &[u8]) -> String { (bytes.iter().map(|s| format!("{s:>2x}")).chunks(32).into_iter()) @@ -361,12 +344,11 @@ mod tests { } panic!( "Difference in generated numbers\n{}\n{}", - print_bytes(&aggregate), - print_bytes(&expected_aggregate), + print_bytes(&values), + print_bytes(&expected_values), ) } } - eprintln!("Read {tally} correct bytes") }; join(write_fut, read_fut).await; })