partway through fixes, macro system needs resdesign
Some checks failed
Rust / build (push) Has been cancelled

This commit is contained in:
2026-04-08 18:02:20 +02:00
parent 0909524dee
commit 9b4c7fa7d7
76 changed files with 1391 additions and 1065 deletions

View File

@@ -16,3 +16,4 @@ test_executors = "0.4.1"
[dependencies]
futures-io = "0.3.31"
itertools = "0.14.0"

View File

@@ -12,6 +12,7 @@ use std::task::{Context, Poll, Waker};
use std::{io, mem};
use futures_io::{AsyncRead, AsyncWrite};
use itertools::Itertools;
fn pipe_layout(bs: usize) -> Layout { Layout::from_size_align(bs, 1).expect("1-align is trivial") }
@@ -130,7 +131,15 @@ impl AsyncRingbuffer {
}
fn writer_wait<T>(&mut self, waker: &Waker) -> Poll<io::Result<T>> {
if self.reader_dropped {
return Poll::Ready(Err(broken_pipe_error()));
let mut buf = vec![0; self.size];
let count = self.wrapping_read(&mut buf);
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::BrokenPipe,
format!(
"Pipe already closed from reader end with {count}b remaining in the buffer: [{}]",
buf[..count].chunks(4).map(|c| c.iter().map(|b| format!("{b:02x}")).join(" ")).join(" ")
),
)));
}
self.write_waker.drop();
self.write_waker = Trigger::new(waker.clone());
@@ -138,7 +147,10 @@ impl AsyncRingbuffer {
}
fn reader_wait(&mut self, waker: &Waker) -> Poll<io::Result<usize>> {
if self.writer_dropped {
return Poll::Ready(Err(broken_pipe_error()));
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::BrokenPipe,
"Pipe already closed from writer end.",
)));
}
self.read_waker.drop();
self.read_waker = Trigger::new(waker.clone());
@@ -150,6 +162,14 @@ impl AsyncRingbuffer {
unsafe { &mut *slc }.copy_from_slice(buf);
self.write_idx = (self.write_idx + buf.len()) % self.size;
}
/// Read a number of bytes from the reader head, then reset the head if
/// necessary.
///
/// # Safety
///
/// This function does not check for obstacles such as the writer head or the
/// end of the buffer. It is up to the caller to ensure that the requested
/// number of consecutive bytes is available.
unsafe fn non_wrapping_read_unchecked(&mut self, buf: &mut [u8]) {
let read_ptr = unsafe { self.start.add(self.read_idx) };
let slc = slice_from_raw_parts(read_ptr, buf.len()).cast_mut();
@@ -188,14 +208,33 @@ impl AsyncRingbuffer {
}
}
}
/// Read as many bytes as possible into the buffer, and return how many bytes
/// were written
fn wrapping_read(&mut self, buf: &mut [u8]) -> usize {
let AsyncRingbuffer { read_idx, write_idx, size, .. } = *self;
if read_idx < write_idx {
// Frontside non-wrapping read
let count = buf.len().min(write_idx - read_idx);
unsafe { self.non_wrapping_read_unchecked(&mut buf[0..count]) };
count
} else if read_idx + buf.len() < size {
// Backside non-wrapping read
unsafe { self.non_wrapping_read_unchecked(buf) };
buf.len()
} else {
// Wrapping read
let (end, start) = buf.split_at_mut(size - read_idx);
unsafe { self.non_wrapping_read_unchecked(end) };
let start_count = start.len().min(write_idx);
unsafe { self.non_wrapping_read_unchecked(&mut start[0..start_count]) };
end.len() + start_count
}
}
}
fn already_closed_error() -> io::Error {
io::Error::new(io::ErrorKind::BrokenPipe, "Pipe already closed from this end")
}
fn broken_pipe_error() -> io::Error {
io::Error::new(io::ErrorKind::BrokenPipe, "Pipe already closed from other end")
}
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub enum SyncWriteError {
@@ -286,37 +325,20 @@ impl AsyncRead for Reader {
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
unsafe {
let data = self.0.as_mut().expect("Cannot be null");
let AsyncRingbuffer { read_idx, write_idx, size, .. } = *data;
if !buf.is_empty() && data.is_full() {
data.wake_writer();
}
let poll = if !buf.is_empty() && data.is_empty() {
// Nothing to read, waiting...
data.reader_wait(cx.waker())
} else if read_idx < write_idx {
// Frontside non-wrapping read
let count = buf.len().min(write_idx - read_idx);
data.non_wrapping_read_unchecked(&mut buf[0..count]);
Poll::Ready(Ok(count))
} else if read_idx + buf.len() < size {
// Backside non-wrapping read
data.non_wrapping_read_unchecked(buf);
Poll::Ready(Ok(buf.len()))
} else {
// Wrapping read
let (end, start) = buf.split_at_mut(size - read_idx);
data.non_wrapping_read_unchecked(end);
let start_count = start.len().min(write_idx);
data.non_wrapping_read_unchecked(&mut start[0..start_count]);
Poll::Ready(Ok(end.len() + start_count))
};
if data.is_empty() {
data.wake_writer();
}
poll
let data = unsafe { self.0.as_mut().expect("Cannot be null") };
if !buf.is_empty() && data.is_full() {
data.wake_writer();
}
let poll = if !buf.is_empty() && data.is_empty() {
// Nothing to read, waiting...
data.reader_wait(cx.waker())
} else {
Poll::Ready(Ok(data.wrapping_read(buf)))
};
if data.is_empty() {
data.wake_writer();
}
poll
}
}
impl Drop for Reader {