From 224c4ecca2a0730853e735159ce8153429aed9bc Mon Sep 17 00:00:00 2001 From: Lawrence Bethlenfalvy Date: Sat, 13 Dec 2025 02:28:10 +0100 Subject: [PATCH] Added unsync-pipe with some tests --- Cargo.lock | 194 ++++++++++++++++----- Cargo.toml | 22 +-- orchid-base/src/pipe.rs | 273 ----------------------------- unsync-pipe/Cargo.toml | 13 ++ unsync-pipe/src/lib.rs | 374 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 546 insertions(+), 330 deletions(-) create mode 100644 unsync-pipe/Cargo.toml create mode 100644 unsync-pipe/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 8c16c87..4671c66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,7 +23,7 @@ version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" dependencies = [ - "getrandom", + "getrandom 0.2.15", "once_cell", "version_check", ] @@ -115,7 +115,7 @@ name = "async-fn-stream" version = "0.1.0" dependencies = [ "futures", - "test_executors", + "test_executors 0.3.5", ] [[package]] @@ -269,7 +269,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "88376f98b48312155a0ba2f868ad705f5d5b9a1065514b1f827e31c1d2f3dbb0" dependencies = [ - "logwise 0.3.0", + "logwise 0.4.0", ] [[package]] @@ -692,6 +692,18 @@ dependencies = [ "wasi", ] +[[package]] +name = "getrandom" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasip2", +] + [[package]] name = "gimli" version = "0.31.1" @@ -799,9 +811,9 @@ checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" [[package]] name = "js-sys" -version = "0.3.77" +version = "0.3.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" +checksum = "464a3709c7f55f1f721e5389aa6ea4e3bc6aba669353300af094b29ffbdde1d8" dependencies = [ "once_cell", "wasm-bindgen", @@ -858,12 +870,6 @@ dependencies = [ "scopeguard", ] -[[package]] -name = "log" -version = "0.4.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" - [[package]] name = "logwise" version = "0.2.3" @@ -889,6 +895,20 @@ dependencies = [ "web-time", ] +[[package]] +name = "logwise" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d72d41b7fd35d1314749844b571794e635721b3a78b30d275b5f430dedf0723e" +dependencies = [ + "logwise_proc 0.4.0", + "wasm-bindgen", + "wasm_safe_mutex", + "wasm_thread", + "web-sys", + "web-time", +] + [[package]] name = "logwise_proc" version = "0.1.1" @@ -901,6 +921,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7494645c8b485fcc0acaa8f382ef8cc081110838536a380e3b3048427f628306" +[[package]] +name = "logwise_proc" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc76389d352c1151645790c3cf157a8b2b84b5faf081fd25686a146e253fb0a8" + [[package]] name = "memchr" version = "2.7.4" @@ -984,7 +1010,7 @@ dependencies = [ "orchid-api-derive", "orchid-api-traits", "ordered-float", - "test_executors", + "test_executors 0.3.5", ] [[package]] @@ -1032,7 +1058,7 @@ dependencies = [ "rust-embed", "some_executor", "substack", - "test_executors", + "test_executors 0.3.5", "trait-set", ] @@ -1091,7 +1117,7 @@ dependencies = [ "ordered-float", "pastey", "substack", - "test_executors", + "test_executors 0.3.5", "trait-set", ] @@ -1116,7 +1142,7 @@ dependencies = [ "rust_decimal", "subslice-offset", "substack", - "test_executors", + "test_executors 0.3.5", "tokio", ] @@ -1299,6 +1325,12 @@ dependencies = [ "proc-macro2 1.0.101", ] +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + [[package]] name = "radium" version = "0.7.0" @@ -1312,8 +1344,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", ] [[package]] @@ -1323,7 +1365,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", ] [[package]] @@ -1332,7 +1384,16 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom", + "getrandom 0.2.15", +] + +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.4", ] [[package]] @@ -1455,7 +1516,7 @@ dependencies = [ "borsh", "bytes", "num-traits", - "rand", + "rand 0.8.5", "rkyv", "serde", "serde_json", @@ -1711,6 +1772,20 @@ dependencies = [ "web-time", ] +[[package]] +name = "test_executors" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18edea7d204c604e14fde7c82ce4685b60a8e8bef019ea7b8dc053e7995f7ffc" +dependencies = [ + "logwise 0.4.0", + "some_executor", + "test_executors_proc", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-time", +] + [[package]] name = "test_executors_proc" version = "0.3.0" @@ -1869,6 +1944,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" +[[package]] +name = "unsync-pipe" +version = "0.1.0" +dependencies = [ + "futures", + "itertools", + "rand 0.9.2", + "rand_chacha 0.9.0", + "test_executors 0.4.0", +] + [[package]] name = "utf8parse" version = "0.2.2" @@ -1904,36 +1990,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] -name = "wasm-bindgen" -version = "0.2.100" +name = "wasip2" +version = "1.0.1+wasi-0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" +checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" +dependencies = [ + "wit-bindgen", +] + +[[package]] +name = "wasm-bindgen" +version = "0.2.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d759f433fa64a2d763d1340820e46e111a7a5ab75f993d1852d70b03dbb80fd" dependencies = [ "cfg-if", "once_cell", "rustversion", "wasm-bindgen-macro", -] - -[[package]] -name = "wasm-bindgen-backend" -version = "0.2.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" -dependencies = [ - "bumpalo", - "log", - "proc-macro2 1.0.101", - "quote 1.0.40", - "syn 2.0.106", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-futures" -version = "0.4.50" +version = "0.4.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "555d470ec0bc3bb57890405e5d4322cc9ea83cebb085523ced7be4144dac1e61" +checksum = "836d9622d604feee9e5de25ac10e3ea5f2d65b41eac0d9ce72eb5deae707ce7c" dependencies = [ "cfg-if", "js-sys", @@ -1944,9 +2026,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.100" +version = "0.2.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" +checksum = "48cb0d2638f8baedbc542ed444afc0644a29166f1595371af4fecf8ce1e7eeb3" dependencies = [ "quote 1.0.40", "wasm-bindgen-macro-support", @@ -1954,26 +2036,38 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.100" +version = "0.2.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" +checksum = "cefb59d5cd5f92d9dcf80e4683949f15ca4b511f4ac0a6e14d4e1ac60c6ecd40" dependencies = [ + "bumpalo", "proc-macro2 1.0.101", "quote 1.0.40", "syn 2.0.106", - "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.100" +version = "0.2.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" +checksum = "cbc538057e648b67f72a982e708d485b2efa771e1ac05fec311f9f63e5800db4" dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm_safe_mutex" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f69a2ad6ca6df36405b47a2314f0a28804d349ca98394a83af9598843e916f1" +dependencies = [ + "continue", + "wasm-bindgen", + "wasm_thread", + "web-time", +] + [[package]] name = "wasm_thread" version = "0.3.3" @@ -1988,9 +2082,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.77" +version = "0.3.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2" +checksum = "9b32828d774c412041098d182a8b38b16ea816958e07cf40eec2bc080ae137ac" dependencies = [ "js-sys", "wasm-bindgen", @@ -2106,6 +2200,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "wit-bindgen" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" + [[package]] name = "wyz" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index a63d212..0982d91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,14 +2,16 @@ resolver = "2" members = [ - "orcx", - "orchid-std", - "orchid-host", - "orchid-extension", - "orchid-base", - "orchid-api", - "orchid-api-derive", - "orchid-api-traits", - "stdio-perftest", - "xtask", "async-fn-stream", + "orcx", + "orchid-std", + "orchid-host", + "orchid-extension", + "orchid-base", + "orchid-api", + "orchid-api-derive", + "orchid-api-traits", + "stdio-perftest", + "xtask", + "async-fn-stream", + "unsync-pipe", ] diff --git a/orchid-base/src/pipe.rs b/orchid-base/src/pipe.rs index 3f99b9e..8b13789 100644 --- a/orchid-base/src/pipe.rs +++ b/orchid-base/src/pipe.rs @@ -1,274 +1 @@ -use std::alloc::{Layout, alloc, dealloc}; -use std::io; -use std::pin::Pin; -use std::process::abort; -use std::ptr::{null, null_mut, slice_from_raw_parts}; -use std::task::{Context, Poll, Waker}; -use futures::{AsyncRead, AsyncWrite}; - -fn pipe_layout(bs: usize) -> Layout { Layout::from_size_align(bs, 1).expect("1-align is trivial") } - -pub fn pipe(size: usize) -> (Writer, Reader) { - assert!(0 < size, "cannot create async pipe without buffer"); - // SAFETY: the - let start = unsafe { alloc(pipe_layout(size)) }; - extern "C" fn drop(val: *const ()) { - let AsyncRingbuffer { - start, - size, - mut read_waker, - mut write_waker, - reader_dropped, - writer_dropped, - // irrelevant if correctly dropped - read_idx: _, - write_idx: _, - // data used to make this call - drop: _, - state: _, - } = *unsafe { Box::from_raw(val as *mut AsyncRingbuffer) }; - if !writer_dropped || !reader_dropped { - eprintln!("Pipe dropped in err before reader or writer"); - abort() - } - read_waker.drop(); - write_waker.drop(); - unsafe { dealloc(start, pipe_layout(size)) } - } - let state = Box::into_raw(Box::new(AsyncRingbuffer { - start, - size, - state: null(), - read_idx: 0, - write_idx: 0, - read_waker: Trigger::empty(), - write_waker: Trigger::empty(), - reader_dropped: false, - writer_dropped: false, - drop, - })); - let state_mut = unsafe { state.as_mut().unwrap() }; - state_mut.state = state as *const (); - (Writer(state_mut as *mut _), Reader(state_mut as *mut _)) -} - -/// A single-fire empty event, to be distributed by value. Either one of the -/// functions can be called exactly once. -#[repr(C)] -struct Trigger { - state: *const (), - invoke: extern "C" fn(*const ()), - drop: extern "C" fn(*const ()), -} -impl Trigger { - fn new(waker: Waker) -> Self { - let state = Box::into_raw(Box::new(waker)) as *const (); - extern "C" fn drop(state: *const ()) { unsafe { Box::from_raw(state as *mut Waker) }; } - extern "C" fn invoke(state: *const ()) { unsafe { Box::from_raw(state as *mut Waker) }.wake(); } - Self { state, invoke, drop } - } - fn empty() -> Self { - extern "C" fn empty_fn_ptr(_: *const ()) { abort() } - Self { state: null(), drop: empty_fn_ptr, invoke: empty_fn_ptr } - } - fn is_empty(&self) -> bool { self.state.is_null() } - fn invoke(&mut self) { - if let Some(this) = self.take() { - (this.invoke)(this.state) - } - } - fn drop(&mut self) { - if let Some(this) = self.take() { - (this.drop)(this.state) - } - } - fn take(&mut self) -> Option { - (!self.is_empty()).then(|| std::mem::replace(self, Self::empty())) - } -} - -/// A ringbuffer for single-threaded synchronized communication. -#[repr(C)] -struct AsyncRingbuffer { - state: *const (), - start: *mut u8, - size: usize, - read_idx: usize, - write_idx: usize, - read_waker: Trigger, - write_waker: Trigger, - reader_dropped: bool, - writer_dropped: bool, - drop: extern "C" fn(*const ()), -} -impl AsyncRingbuffer { - fn drop_writer(&mut self) { - self.writer_dropped = true; - if self.reader_dropped { - (self.drop)(self.state) - } - } - fn drop_reader(&mut self) { - self.reader_dropped = true; - if self.writer_dropped { - (self.drop)(self.state) - } - } - fn writer_wait(&mut self, waker: &Waker) -> Poll> { - if self.reader_dropped { - return Poll::Ready(Err(broken_pipe_error())); - } - self.read_waker.invoke(); - self.write_waker.drop(); - self.write_waker = Trigger::new(waker.clone()); - Poll::Pending - } - fn reader_wait(&mut self, waker: &Waker) -> Poll> { - if self.writer_dropped { - return Poll::Ready(Err(broken_pipe_error())); - } - self.write_waker.invoke(); - self.read_waker.drop(); - self.read_waker = Trigger::new(waker.clone()); - Poll::Pending - } - unsafe fn non_wrapping_write_unchecked(&mut self, buf: &[u8]) { - let write_ptr = unsafe { self.start.offset(self.write_idx as isize) }; - let slc = slice_from_raw_parts(write_ptr, buf.len()).cast_mut(); - unsafe { &mut *slc }.copy_from_slice(buf); - self.write_idx = (self.write_idx + buf.len()) % self.size; - } - unsafe fn non_wrapping_read_unchecked(&mut self, buf: &mut [u8]) { - let read_ptr = unsafe { self.start.offset(self.read_idx as isize) }; - let slc = slice_from_raw_parts(read_ptr, buf.len()).cast_mut(); - buf.copy_from_slice(unsafe { &*slc }); - self.read_idx = (self.read_idx + buf.len()) % self.size; - } - fn is_full(&self) -> bool { (self.write_idx + 1) % self.size == self.read_idx } - fn is_empty(&self) -> bool { self.write_idx == self.read_idx } -} - -fn already_closed_error() -> io::Error { - io::Error::new(io::ErrorKind::BrokenPipe, "Pipe already closed from this end") -} -fn broken_pipe_error() -> io::Error { - io::Error::new(io::ErrorKind::BrokenPipe, "Pipe already closed from other end") -} - -#[repr(C)] -pub struct Writer(*mut AsyncRingbuffer); -impl Writer { - unsafe fn get_state(self: Pin<&mut Self>) -> io::Result<&mut AsyncRingbuffer> { - match unsafe { self.0.as_mut() } { - Some(data) => Ok(data), - None => Err(already_closed_error()), - } - } -} -impl AsyncWrite for Writer { - fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - unsafe { - match self.as_mut().get_state() { - Err(e) => return Poll::Ready(Err(e)), - Ok(data) => { - data.drop_writer(); - }, - } - } - self.0 = null_mut(); - Poll::Ready(Ok(())) - } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { - let data = self.as_mut().get_state()?; - if data.read_idx == data.write_idx { - Poll::Ready(Ok(())) - } else { - data.writer_wait(cx.waker()) - } - } - } - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - unsafe { - let data = self.as_mut().get_state()?; - if !buf.is_empty() && data.is_full() { - data.writer_wait(cx.waker()) - } else if data.write_idx < data.read_idx { - // non-wrapping write on backside - let count = buf.len().min(data.read_idx - data.write_idx - 1); - data.non_wrapping_write_unchecked(&buf[0..count]); - Poll::Ready(Ok(count)) - } else if data.write_idx + buf.len() < data.size { - // non-wrapping write on frontside - data.non_wrapping_write_unchecked(&buf[0..buf.len()]); - Poll::Ready(Ok(buf.len())) - } else { - // wrapping write - let (end, start) = buf.split_at(data.size - data.write_idx); - data.non_wrapping_write_unchecked(end); - let start_count = start.len().min(data.read_idx - 1); - data.non_wrapping_write_unchecked(&start[0..start_count]); - Poll::Ready(Ok(end.len() + start_count)) - } - } - } -} -impl Drop for Writer { - fn drop(&mut self) { - unsafe { - if let Some(data) = self.0.as_mut() { - data.drop_writer(); - } - } - } -} - -#[repr(C)] -pub struct Reader(*mut AsyncRingbuffer); -impl AsyncRead for Reader { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - unsafe { - let data = self.0.as_mut().expect("Cannot be null"); - if buf.len() != 0 && data.is_full() { - data.write_waker.invoke(); - } - if data.is_empty() { - data.reader_wait(cx.waker()) - } else if data.read_idx < data.write_idx { - // frontside non-wrapping read - let count = buf.len().min(data.write_idx - data.read_idx); - data.non_wrapping_read_unchecked(&mut buf[0..count]); - Poll::Ready(Ok(count)) - } else if data.read_idx + buf.len() < data.size { - // backside non-wrapping read - data.non_wrapping_read_unchecked(buf); - Poll::Ready(Ok(buf.len())) - } else { - // wrapping read - let (end, start) = buf.split_at_mut(data.size - data.read_idx); - data.non_wrapping_read_unchecked(end); - let start_count = start.len().min(data.write_idx); - data.non_wrapping_read_unchecked(&mut start[0..start_count]); - Poll::Ready(Ok(end.len() + start_count)) - } - } - } -} -impl Drop for Reader { - fn drop(&mut self) { - unsafe { - if let Some(data) = self.0.as_mut() { - data.drop_reader(); - } - } - } -} diff --git a/unsync-pipe/Cargo.toml b/unsync-pipe/Cargo.toml new file mode 100644 index 0000000..3c005db --- /dev/null +++ b/unsync-pipe/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "unsync-pipe" +version = "0.1.0" +edition = "2024" + +[dev-dependencies] +itertools = "0.14.0" +rand = "0.9.2" +rand_chacha = "0.9.0" +test_executors = "0.4.0" + +[dependencies] +futures = "0.3.31" diff --git a/unsync-pipe/src/lib.rs b/unsync-pipe/src/lib.rs new file mode 100644 index 0000000..6a03163 --- /dev/null +++ b/unsync-pipe/src/lib.rs @@ -0,0 +1,374 @@ +use std::alloc::{Layout, alloc, dealloc}; +use std::pin::Pin; +use std::process::abort; +use std::ptr::{null, null_mut, slice_from_raw_parts}; +use std::task::{Context, Poll, Waker}; +use std::{io, mem}; + +use futures::{AsyncRead, AsyncWrite}; + +fn pipe_layout(bs: usize) -> Layout { Layout::from_size_align(bs, 1).expect("1-align is trivial") } + +pub fn pipe(size: usize) -> (Writer, Reader) { + assert!(0 < size, "cannot create async pipe without buffer"); + // SAFETY: the + let start = unsafe { alloc(pipe_layout(size)) }; + extern "C" fn drop(val: *const ()) { + let AsyncRingbuffer { + start, + size, + mut read_waker, + mut write_waker, + reader_dropped, + writer_dropped, + // irrelevant if correctly dropped + read_idx: _, + write_idx: _, + // data used to make this call + drop: _, + state: _, + } = *unsafe { Box::from_raw(val as *mut AsyncRingbuffer) }; + if !writer_dropped || !reader_dropped { + eprintln!("Pipe dropped in err before reader or writer"); + abort() + } + read_waker.drop(); + write_waker.drop(); + unsafe { dealloc(start, pipe_layout(size)) } + } + let state = Box::into_raw(Box::new(AsyncRingbuffer { + start, + size, + state: null(), + read_idx: 0, + write_idx: 0, + read_waker: Trigger::empty(), + write_waker: Trigger::empty(), + reader_dropped: false, + writer_dropped: false, + drop, + })); + let state_mut = unsafe { state.as_mut().unwrap() }; + state_mut.state = state as *const (); + (Writer(state_mut as *mut _), Reader(state_mut as *mut _)) +} + +/// A single-fire empty event, to be distributed by value. Either one of the +/// functions can be called exactly once. +#[repr(C)] +struct Trigger { + state: *const (), + invoke: extern "C" fn(*const ()), + drop: extern "C" fn(*const ()), +} +impl Trigger { + fn new(waker: Waker) -> Self { + let state = Box::into_raw(Box::new(waker)) as *const (); + extern "C" fn drop(state: *const ()) { + unsafe { mem::drop(Box::from_raw(state as *mut Waker)) }; + } + extern "C" fn invoke(state: *const ()) { unsafe { Box::from_raw(state as *mut Waker) }.wake(); } + Self { state, invoke, drop } + } + fn empty() -> Self { + extern "C" fn empty_fn_ptr(_: *const ()) { abort() } + Self { state: null(), drop: empty_fn_ptr, invoke: empty_fn_ptr } + } + fn is_empty(&self) -> bool { self.state.is_null() } + fn invoke(&mut self) { + if let Some(this) = self.take() { + (this.invoke)(this.state) + } + } + fn drop(&mut self) { + if let Some(this) = self.take() { + (this.drop)(this.state) + } + } + fn take(&mut self) -> Option { + (!self.is_empty()).then(|| std::mem::replace(self, Self::empty())) + } +} + +/// A ringbuffer for single-threaded synchronized communication. +#[repr(C)] +struct AsyncRingbuffer { + state: *const (), + start: *mut u8, + size: usize, + read_idx: usize, + write_idx: usize, + read_waker: Trigger, + write_waker: Trigger, + reader_dropped: bool, + writer_dropped: bool, + drop: extern "C" fn(*const ()), +} +impl AsyncRingbuffer { + fn drop_writer(&mut self) { + self.writer_dropped = true; + if self.reader_dropped { + (self.drop)(self.state) + } + } + fn drop_reader(&mut self) { + self.reader_dropped = true; + if self.writer_dropped { + (self.drop)(self.state) + } + } + fn writer_wait(&mut self, waker: &Waker) -> Poll> { + if self.reader_dropped { + return Poll::Ready(Err(broken_pipe_error())); + } + self.read_waker.invoke(); + self.write_waker.drop(); + self.write_waker = Trigger::new(waker.clone()); + Poll::Pending + } + fn reader_wait(&mut self, waker: &Waker) -> Poll> { + if self.writer_dropped { + return Poll::Ready(Err(broken_pipe_error())); + } + self.write_waker.invoke(); + self.read_waker.drop(); + self.read_waker = Trigger::new(waker.clone()); + Poll::Pending + } + unsafe fn non_wrapping_write_unchecked(&mut self, buf: &[u8]) { + let write_ptr = unsafe { self.start.add(self.write_idx) }; + let slc = slice_from_raw_parts(write_ptr, buf.len()).cast_mut(); + unsafe { &mut *slc }.copy_from_slice(buf); + self.write_idx = (self.write_idx + buf.len()) % self.size; + } + unsafe fn non_wrapping_read_unchecked(&mut self, buf: &mut [u8]) { + let read_ptr = unsafe { self.start.add(self.read_idx) }; + let slc = slice_from_raw_parts(read_ptr, buf.len()).cast_mut(); + buf.copy_from_slice(unsafe { &*slc }); + self.read_idx = (self.read_idx + buf.len()) % self.size; + } + fn is_full(&self) -> bool { (self.write_idx + 1) % self.size == self.read_idx } + fn is_empty(&self) -> bool { self.write_idx == self.read_idx } +} + +fn already_closed_error() -> io::Error { + io::Error::new(io::ErrorKind::BrokenPipe, "Pipe already closed from this end") +} +fn broken_pipe_error() -> io::Error { + io::Error::new(io::ErrorKind::BrokenPipe, "Pipe already closed from other end") +} + +#[repr(C)] +pub struct Writer(*mut AsyncRingbuffer); +impl Writer { + unsafe fn get_state(self: Pin<&mut Self>) -> io::Result<&mut AsyncRingbuffer> { + match unsafe { self.0.as_mut() } { + Some(data) => Ok(data), + None => Err(already_closed_error()), + } + } +} +impl AsyncWrite for Writer { + fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + unsafe { + match self.as_mut().get_state() { + Err(e) => return Poll::Ready(Err(e)), + Ok(data) => { + data.drop_writer(); + }, + } + } + self.0 = null_mut(); + Poll::Ready(Ok(())) + } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { + let data = self.as_mut().get_state()?; + if data.is_empty() { Poll::Ready(Ok(())) } else { data.writer_wait(cx.waker()) } + } + } + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let w = unsafe { + let data = self.as_mut().get_state()?; + let AsyncRingbuffer { write_idx, read_idx, size, .. } = *data; + if !buf.is_empty() && data.is_empty() { + eprintln!("Wake reader"); + data.read_waker.invoke(); + } + if !buf.is_empty() && data.is_full() { + eprintln!("Writer is blocked, waiting"); + data.writer_wait(cx.waker()) + } else if write_idx < read_idx { + eprintln!("Non-wrapping backside write w={write_idx} < r={read_idx} <= s={size}"); + let count = buf.len().min(read_idx - write_idx - 1); + data.non_wrapping_write_unchecked(&buf[0..count]); + Poll::Ready(Ok(count)) + } else if data.write_idx + buf.len() < size { + eprintln!( + "Non-wrapping frontside write r={read_idx} <= w={write_idx} + b={} < s={size}", + buf.len() + ); + data.non_wrapping_write_unchecked(&buf[0..buf.len()]); + Poll::Ready(Ok(buf.len())) + } else if read_idx == 0 { + eprintln!("Frontside write up to origin r=0 < s={size} < w={write_idx} + b={}", buf.len()); + data.non_wrapping_write_unchecked(&buf[0..size - write_idx - 1]); + Poll::Ready(Ok(size - write_idx - 1)) + } else { + let (end, start) = buf.split_at(size - write_idx); + eprintln!("Wrapping write r={read_idx} < s={size} < w={write_idx} + b={}", buf.len()); + data.non_wrapping_write_unchecked(end); + let start_count = start.len().min(read_idx - 1); + data.non_wrapping_write_unchecked(&start[0..start_count]); + Poll::Ready(Ok(end.len() + start_count)) + } + }; + if let Poll::Ready(Ok(w)) = &w { + eprintln!("Wrote {w}") + } + w + } +} +impl Drop for Writer { + fn drop(&mut self) { + eprintln!("Dropping writer"); + unsafe { + if let Some(data) = self.0.as_mut() { + data.drop_writer(); + } + } + } +} + +#[repr(C)] +pub struct Reader(*mut AsyncRingbuffer); +impl AsyncRead for Reader { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + eprintln!("Beginning read of {}", buf.len()); + let r = unsafe { + let data = self.0.as_mut().expect("Cannot be null"); + let AsyncRingbuffer { read_idx, write_idx, size, .. } = *data; + if !buf.is_empty() && data.is_full() { + eprintln!("Wake writer"); + data.write_waker.invoke(); + } + if !buf.is_empty() && data.is_empty() { + eprintln!("Nothing to read, waiting..."); + data.reader_wait(cx.waker()) + } else if read_idx < write_idx { + eprintln!("frontside non-wrapping read"); + let count = buf.len().min(write_idx - read_idx); + data.non_wrapping_read_unchecked(&mut buf[0..count]); + Poll::Ready(Ok(count)) + } else if read_idx + buf.len() < size { + eprintln!("backside non-wrapping read"); + data.non_wrapping_read_unchecked(buf); + Poll::Ready(Ok(buf.len())) + } else { + eprintln!("wrapping read"); + let (end, start) = buf.split_at_mut(size - read_idx); + data.non_wrapping_read_unchecked(end); + let start_count = start.len().min(write_idx); + data.non_wrapping_read_unchecked(&mut start[0..start_count]); + Poll::Ready(Ok(end.len() + start_count)) + } + }; + if let Poll::Ready(Ok(r)) = &r { + eprintln!("Read {r}") + } + r + } +} +impl Drop for Reader { + fn drop(&mut self) { + eprintln!("Dropping reader"); + unsafe { + if let Some(data) = self.0.as_mut() { + data.drop_reader(); + } + } + } +} + +#[cfg(test)] +mod tests { + use std::io::Write; + use std::pin::pin; + + use futures::future::join; + use futures::{AsyncReadExt, AsyncWriteExt}; + use itertools::Itertools; + use rand::{Rng, SeedableRng}; + use rand_chacha::ChaCha8Rng; + use test_executors::spin_on; + + use super::*; + + #[test] + fn basic_io() { + let mut w_rng = ChaCha8Rng::seed_from_u64(2); + let mut r_rng = ChaCha8Rng::seed_from_u64(1); + println!("Output check"); + spin_on(async { + let (w, r) = pipe(1024); + let test_length = 100_000; + let data = (0u32..test_length).flat_map(|num| num.to_be_bytes()); + let write_fut = async { + let mut w = pin!(w); + let mut source = data.clone(); + let mut tally = 0; + while tally < test_length * 8 { + let values = source.by_ref().take(w_rng.random_range(0..200)).collect::>(); + tally += values.len() as u32; + w.write_all(&values).await.unwrap(); + } + w.flush().await.unwrap(); + }; + let read_fut = async { + let mut r = pin!(r); + let mut expected = data.clone(); + let mut tally = 0; + let mut aggregate = Vec::new(); + let mut expected_aggregate = Vec::new(); + let mut percentage = 0; + while tally < test_length * 8 { + let next_percentage = tally * 100 / (test_length * 8); + if percentage < next_percentage { + percentage = next_percentage; + println!("{percentage}%"); + io::stdout().flush().unwrap(); + } + let expected_values = + expected.by_ref().take(r_rng.random_range(0..200)).collect::>(); + tally += expected_values.len() as u32; + let mut values = vec![0; expected_values.len()]; + r.read_exact(&mut values[..]).await.unwrap_or_else(|e| panic!("At {tally} bytes: {e}")); + aggregate.extend_from_slice(&values); + expected_aggregate.extend_from_slice(&expected_values); + if values != expected_values { + fn print_bytes(bytes: &[u8]) -> String { + (bytes.iter().map(|s| format!("{s:>2x}")).chunks(32).into_iter()) + .map(|c| c.into_iter().join(" ")) + .join("\n") + } + panic!( + "Difference in generated numbers\n{}\n{}", + print_bytes(&aggregate), + print_bytes(&expected_aggregate), + ) + } + } + eprintln!("Read {tally} correct bytes") + }; + join(write_fut, read_fut).await; + }) + } +}