Added docs to unsync-pipe

This commit is contained in:
2025-12-14 01:32:24 +01:00
parent 224c4ecca2
commit 8753d4c751
3 changed files with 34 additions and 48 deletions

2
Cargo.lock generated
View File

@@ -1946,7 +1946,7 @@ checksum = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc"
[[package]] [[package]]
name = "unsync-pipe" name = "unsync-pipe"
version = "0.1.0" version = "0.2.0"
dependencies = [ dependencies = [
"futures", "futures",
"itertools", "itertools",

View File

@@ -1,7 +1,11 @@
[package] [package]
name = "unsync-pipe" name = "unsync-pipe"
version = "0.1.0" description = "Ringbuffer-backed !Send !Sync binary safe repr(C) AsyncWrite/AsyncRead pair"
version = "0.2.0"
edition = "2024" edition = "2024"
license = "MIT"
repository = "https://git.lbfalvy.com/Orchid/orchid"
homepage = "https://git.lbfalvy.com/Orchid/orchid"
[dev-dependencies] [dev-dependencies]
itertools = "0.14.0" itertools = "0.14.0"

View File

@@ -1,3 +1,9 @@
//! Single-threaded binary safe AsyncWrite/AsyncRead pair
//!
//! The main entry point is [pipe]. [Writer] and [Reader] can just be sent
//! across binary boundaries. A change to the ABI constitutes a major version
//! break.
use std::alloc::{Layout, alloc, dealloc}; use std::alloc::{Layout, alloc, dealloc};
use std::pin::Pin; use std::pin::Pin;
use std::process::abort; use std::process::abort;
@@ -9,6 +15,8 @@ use futures::{AsyncRead, AsyncWrite};
fn pipe_layout(bs: usize) -> Layout { Layout::from_size_align(bs, 1).expect("1-align is trivial") } fn pipe_layout(bs: usize) -> Layout { Layout::from_size_align(bs, 1).expect("1-align is trivial") }
/// Create a ringbuffer with the specified byte capacity. Once the buffer is
/// exhausted, the writer will block.
pub fn pipe(size: usize) -> (Writer, Reader) { pub fn pipe(size: usize) -> (Writer, Reader) {
assert!(0 < size, "cannot create async pipe without buffer"); assert!(0 < size, "cannot create async pipe without buffer");
// SAFETY: the // SAFETY: the
@@ -158,6 +166,8 @@ fn broken_pipe_error() -> io::Error {
io::Error::new(io::ErrorKind::BrokenPipe, "Pipe already closed from other end") io::Error::new(io::ErrorKind::BrokenPipe, "Pipe already closed from other end")
} }
/// A binary safe [AsyncWrite] implementor writing to a ringbuffer created by
/// [pipe].
#[repr(C)] #[repr(C)]
pub struct Writer(*mut AsyncRingbuffer); pub struct Writer(*mut AsyncRingbuffer);
impl Writer { impl Writer {
@@ -192,50 +202,41 @@ impl AsyncWrite for Writer {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
let w = unsafe { unsafe {
let data = self.as_mut().get_state()?; let data = self.as_mut().get_state()?;
let AsyncRingbuffer { write_idx, read_idx, size, .. } = *data; let AsyncRingbuffer { write_idx, read_idx, size, .. } = *data;
if !buf.is_empty() && data.is_empty() { if !buf.is_empty() && data.is_empty() {
eprintln!("Wake reader");
data.read_waker.invoke(); data.read_waker.invoke();
} }
if !buf.is_empty() && data.is_full() { if !buf.is_empty() && data.is_full() {
eprintln!("Writer is blocked, waiting"); // Writer is blocked
data.writer_wait(cx.waker()) data.writer_wait(cx.waker())
} else if write_idx < read_idx { } else if write_idx < read_idx {
eprintln!("Non-wrapping backside write w={write_idx} < r={read_idx} <= s={size}"); // Non-wrapping backside write w < r <= s
let count = buf.len().min(read_idx - write_idx - 1); let count = buf.len().min(read_idx - write_idx - 1);
data.non_wrapping_write_unchecked(&buf[0..count]); data.non_wrapping_write_unchecked(&buf[0..count]);
Poll::Ready(Ok(count)) Poll::Ready(Ok(count))
} else if data.write_idx + buf.len() < size { } else if data.write_idx + buf.len() < size {
eprintln!( // Non-wrapping frontside write r <= w + b < s
"Non-wrapping frontside write r={read_idx} <= w={write_idx} + b={} < s={size}",
buf.len()
);
data.non_wrapping_write_unchecked(&buf[0..buf.len()]); data.non_wrapping_write_unchecked(&buf[0..buf.len()]);
Poll::Ready(Ok(buf.len())) Poll::Ready(Ok(buf.len()))
} else if read_idx == 0 { } else if read_idx == 0 {
eprintln!("Frontside write up to origin r=0 < s={size} < w={write_idx} + b={}", buf.len()); // Frontside write up to origin r=0 < s < w + b
data.non_wrapping_write_unchecked(&buf[0..size - write_idx - 1]); data.non_wrapping_write_unchecked(&buf[0..size - write_idx - 1]);
Poll::Ready(Ok(size - write_idx - 1)) Poll::Ready(Ok(size - write_idx - 1))
} else { } else {
let (end, start) = buf.split_at(size - write_idx); let (end, start) = buf.split_at(size - write_idx);
eprintln!("Wrapping write r={read_idx} < s={size} < w={write_idx} + b={}", buf.len()); // Wrapping write r < s < w + b
data.non_wrapping_write_unchecked(end); data.non_wrapping_write_unchecked(end);
let start_count = start.len().min(read_idx - 1); let start_count = start.len().min(read_idx - 1);
data.non_wrapping_write_unchecked(&start[0..start_count]); data.non_wrapping_write_unchecked(&start[0..start_count]);
Poll::Ready(Ok(end.len() + start_count)) Poll::Ready(Ok(end.len() + start_count))
} }
};
if let Poll::Ready(Ok(w)) = &w {
eprintln!("Wrote {w}")
} }
w
} }
} }
impl Drop for Writer { impl Drop for Writer {
fn drop(&mut self) { fn drop(&mut self) {
eprintln!("Dropping writer");
unsafe { unsafe {
if let Some(data) = self.0.as_mut() { if let Some(data) = self.0.as_mut() {
data.drop_writer(); data.drop_writer();
@@ -244,6 +245,8 @@ impl Drop for Writer {
} }
} }
/// A binary safe [AsyncRead] implementor reading from a ringbuffer created by
/// [pipe]
#[repr(C)] #[repr(C)]
pub struct Reader(*mut AsyncRingbuffer); pub struct Reader(*mut AsyncRingbuffer);
impl AsyncRead for Reader { impl AsyncRead for Reader {
@@ -252,44 +255,37 @@ impl AsyncRead for Reader {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut [u8], buf: &mut [u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
eprintln!("Beginning read of {}", buf.len()); unsafe {
let r = unsafe {
let data = self.0.as_mut().expect("Cannot be null"); let data = self.0.as_mut().expect("Cannot be null");
let AsyncRingbuffer { read_idx, write_idx, size, .. } = *data; let AsyncRingbuffer { read_idx, write_idx, size, .. } = *data;
if !buf.is_empty() && data.is_full() { if !buf.is_empty() && data.is_full() {
eprintln!("Wake writer");
data.write_waker.invoke(); data.write_waker.invoke();
} }
if !buf.is_empty() && data.is_empty() { if !buf.is_empty() && data.is_empty() {
eprintln!("Nothing to read, waiting..."); // Nothing to read, waiting...
data.reader_wait(cx.waker()) data.reader_wait(cx.waker())
} else if read_idx < write_idx { } else if read_idx < write_idx {
eprintln!("frontside non-wrapping read"); // Frontside non-wrapping read
let count = buf.len().min(write_idx - read_idx); let count = buf.len().min(write_idx - read_idx);
data.non_wrapping_read_unchecked(&mut buf[0..count]); data.non_wrapping_read_unchecked(&mut buf[0..count]);
Poll::Ready(Ok(count)) Poll::Ready(Ok(count))
} else if read_idx + buf.len() < size { } else if read_idx + buf.len() < size {
eprintln!("backside non-wrapping read"); // Backside non-wrapping read
data.non_wrapping_read_unchecked(buf); data.non_wrapping_read_unchecked(buf);
Poll::Ready(Ok(buf.len())) Poll::Ready(Ok(buf.len()))
} else { } else {
eprintln!("wrapping read"); // Wrapping read
let (end, start) = buf.split_at_mut(size - read_idx); let (end, start) = buf.split_at_mut(size - read_idx);
data.non_wrapping_read_unchecked(end); data.non_wrapping_read_unchecked(end);
let start_count = start.len().min(write_idx); let start_count = start.len().min(write_idx);
data.non_wrapping_read_unchecked(&mut start[0..start_count]); data.non_wrapping_read_unchecked(&mut start[0..start_count]);
Poll::Ready(Ok(end.len() + start_count)) Poll::Ready(Ok(end.len() + start_count))
} }
};
if let Poll::Ready(Ok(r)) = &r {
eprintln!("Read {r}")
} }
r
} }
} }
impl Drop for Reader { impl Drop for Reader {
fn drop(&mut self) { fn drop(&mut self) {
eprintln!("Dropping reader");
unsafe { unsafe {
if let Some(data) = self.0.as_mut() { if let Some(data) = self.0.as_mut() {
data.drop_reader(); data.drop_reader();
@@ -300,7 +296,6 @@ impl Drop for Reader {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::io::Write;
use std::pin::pin; use std::pin::pin;
use futures::future::join; use futures::future::join;
@@ -316,16 +311,15 @@ mod tests {
fn basic_io() { fn basic_io() {
let mut w_rng = ChaCha8Rng::seed_from_u64(2); let mut w_rng = ChaCha8Rng::seed_from_u64(2);
let mut r_rng = ChaCha8Rng::seed_from_u64(1); let mut r_rng = ChaCha8Rng::seed_from_u64(1);
println!("Output check");
spin_on(async { spin_on(async {
let (w, r) = pipe(1024); let (w, r) = pipe(1024);
let test_length = 100_000; let test_length = 10_000_000;
let data = (0u32..test_length).flat_map(|num| num.to_be_bytes()); let data = (0u32..test_length).flat_map(|num| num.to_be_bytes());
let write_fut = async { let write_fut = async {
let mut w = pin!(w); let mut w = pin!(w);
let mut source = data.clone(); let mut source = data.clone();
let mut tally = 0; let mut tally = 0;
while tally < test_length * 8 { while tally < test_length * 4 {
let values = source.by_ref().take(w_rng.random_range(0..200)).collect::<Vec<_>>(); let values = source.by_ref().take(w_rng.random_range(0..200)).collect::<Vec<_>>();
tally += values.len() as u32; tally += values.len() as u32;
w.write_all(&values).await.unwrap(); w.write_all(&values).await.unwrap();
@@ -336,23 +330,12 @@ mod tests {
let mut r = pin!(r); let mut r = pin!(r);
let mut expected = data.clone(); let mut expected = data.clone();
let mut tally = 0; let mut tally = 0;
let mut aggregate = Vec::new(); while tally < test_length * 4 {
let mut expected_aggregate = Vec::new();
let mut percentage = 0;
while tally < test_length * 8 {
let next_percentage = tally * 100 / (test_length * 8);
if percentage < next_percentage {
percentage = next_percentage;
println!("{percentage}%");
io::stdout().flush().unwrap();
}
let expected_values = let expected_values =
expected.by_ref().take(r_rng.random_range(0..200)).collect::<Vec<_>>(); expected.by_ref().take(r_rng.random_range(0..200)).collect::<Vec<_>>();
tally += expected_values.len() as u32; tally += expected_values.len() as u32;
let mut values = vec![0; expected_values.len()]; let mut values = vec![0; expected_values.len()];
r.read_exact(&mut values[..]).await.unwrap_or_else(|e| panic!("At {tally} bytes: {e}")); r.read_exact(&mut values[..]).await.unwrap_or_else(|e| panic!("At {tally} bytes: {e}"));
aggregate.extend_from_slice(&values);
expected_aggregate.extend_from_slice(&expected_values);
if values != expected_values { if values != expected_values {
fn print_bytes(bytes: &[u8]) -> String { fn print_bytes(bytes: &[u8]) -> String {
(bytes.iter().map(|s| format!("{s:>2x}")).chunks(32).into_iter()) (bytes.iter().map(|s| format!("{s:>2x}")).chunks(32).into_iter())
@@ -361,12 +344,11 @@ mod tests {
} }
panic!( panic!(
"Difference in generated numbers\n{}\n{}", "Difference in generated numbers\n{}\n{}",
print_bytes(&aggregate), print_bytes(&values),
print_bytes(&expected_aggregate), print_bytes(&expected_values),
) )
} }
} }
eprintln!("Read {tally} correct bytes")
}; };
join(write_fut, read_fut).await; join(write_fut, read_fut).await;
}) })