use std::alloc::{Layout, alloc, dealloc}; use std::io; use std::pin::Pin; use std::process::abort; use std::ptr::{null, null_mut, slice_from_raw_parts}; use std::task::{Context, Poll, Waker}; use futures::{AsyncRead, AsyncWrite}; fn pipe_layout(bs: usize) -> Layout { Layout::from_size_align(bs, 1).expect("1-align is trivial") } pub fn pipe(size: usize) -> (Writer, Reader) { assert!(0 < size, "cannot create async pipe without buffer"); // SAFETY: the let start = unsafe { alloc(pipe_layout(size)) }; extern "C" fn drop(val: *const ()) { let AsyncRingbuffer { start, size, mut read_waker, mut write_waker, reader_dropped, writer_dropped, // irrelevant if correctly dropped read_idx: _, write_idx: _, // data used to make this call drop: _, state: _, } = *unsafe { Box::from_raw(val as *mut AsyncRingbuffer) }; if !writer_dropped || !reader_dropped { eprintln!("Pipe dropped in err before reader or writer"); abort() } read_waker.drop(); write_waker.drop(); unsafe { dealloc(start, pipe_layout(size)) } } let state = Box::into_raw(Box::new(AsyncRingbuffer { start, size, state: null(), read_idx: 0, write_idx: 0, read_waker: Trigger::empty(), write_waker: Trigger::empty(), reader_dropped: false, writer_dropped: false, drop, })); let state_mut = unsafe { state.as_mut().unwrap() }; state_mut.state = state as *const (); (Writer(state_mut as *mut _), Reader(state_mut as *mut _)) } /// A single-fire empty event, to be distributed by value. Either one of the /// functions can be called exactly once. #[repr(C)] struct Trigger { state: *const (), invoke: extern "C" fn(*const ()), drop: extern "C" fn(*const ()), } impl Trigger { fn new(waker: Waker) -> Self { let state = Box::into_raw(Box::new(waker)) as *const (); extern "C" fn drop(state: *const ()) { unsafe { Box::from_raw(state as *mut Waker) }; } extern "C" fn invoke(state: *const ()) { unsafe { Box::from_raw(state as *mut Waker) }.wake(); } Self { state, invoke, drop } } fn empty() -> Self { extern "C" fn empty_fn_ptr(_: *const ()) { abort() } Self { state: null(), drop: empty_fn_ptr, invoke: empty_fn_ptr } } fn is_empty(&self) -> bool { self.state.is_null() } fn invoke(&mut self) { if let Some(this) = self.take() { (this.invoke)(this.state) } } fn drop(&mut self) { if let Some(this) = self.take() { (this.drop)(this.state) } } fn take(&mut self) -> Option { (!self.is_empty()).then(|| std::mem::replace(self, Self::empty())) } } /// A ringbuffer for single-threaded synchronized communication. #[repr(C)] struct AsyncRingbuffer { state: *const (), start: *mut u8, size: usize, read_idx: usize, write_idx: usize, read_waker: Trigger, write_waker: Trigger, reader_dropped: bool, writer_dropped: bool, drop: extern "C" fn(*const ()), } impl AsyncRingbuffer { fn drop_writer(&mut self) { self.writer_dropped = true; if self.reader_dropped { (self.drop)(self.state) } } fn drop_reader(&mut self) { self.reader_dropped = true; if self.writer_dropped { (self.drop)(self.state) } } fn writer_wait(&mut self, waker: &Waker) -> Poll> { if self.reader_dropped { return Poll::Ready(Err(broken_pipe_error())); } self.read_waker.invoke(); self.write_waker.drop(); self.write_waker = Trigger::new(waker.clone()); Poll::Pending } fn reader_wait(&mut self, waker: &Waker) -> Poll> { if self.writer_dropped { return Poll::Ready(Err(broken_pipe_error())); } self.write_waker.invoke(); self.read_waker.drop(); self.read_waker = Trigger::new(waker.clone()); Poll::Pending } unsafe fn non_wrapping_write_unchecked(&mut self, buf: &[u8]) { let write_ptr = unsafe { self.start.offset(self.write_idx as isize) }; let slc = slice_from_raw_parts(write_ptr, buf.len()).cast_mut(); unsafe { &mut *slc }.copy_from_slice(buf); self.write_idx = (self.write_idx + buf.len()) % self.size; } unsafe fn non_wrapping_read_unchecked(&mut self, buf: &mut [u8]) { let read_ptr = unsafe { self.start.offset(self.read_idx as isize) }; let slc = slice_from_raw_parts(read_ptr, buf.len()).cast_mut(); buf.copy_from_slice(unsafe { &*slc }); self.read_idx = (self.read_idx + buf.len()) % self.size; } fn is_full(&self) -> bool { (self.write_idx + 1) % self.size == self.read_idx } fn is_empty(&self) -> bool { self.write_idx == self.read_idx } } fn already_closed_error() -> io::Error { io::Error::new(io::ErrorKind::BrokenPipe, "Pipe already closed from this end") } fn broken_pipe_error() -> io::Error { io::Error::new(io::ErrorKind::BrokenPipe, "Pipe already closed from other end") } #[repr(C)] pub struct Writer(*mut AsyncRingbuffer); impl Writer { unsafe fn get_state(self: Pin<&mut Self>) -> io::Result<&mut AsyncRingbuffer> { match unsafe { self.0.as_mut() } { Some(data) => Ok(data), None => Err(already_closed_error()), } } } impl AsyncWrite for Writer { fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { unsafe { match self.as_mut().get_state() { Err(e) => return Poll::Ready(Err(e)), Ok(data) => { data.drop_writer(); }, } } self.0 = null_mut(); Poll::Ready(Ok(())) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { unsafe { let data = self.as_mut().get_state()?; if data.read_idx == data.write_idx { Poll::Ready(Ok(())) } else { data.writer_wait(cx.waker()) } } } fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { unsafe { let data = self.as_mut().get_state()?; if !buf.is_empty() && data.is_full() { data.writer_wait(cx.waker()) } else if data.write_idx < data.read_idx { // non-wrapping write on backside let count = buf.len().min(data.read_idx - data.write_idx - 1); data.non_wrapping_write_unchecked(&buf[0..count]); Poll::Ready(Ok(count)) } else if data.write_idx + buf.len() < data.size { // non-wrapping write on frontside data.non_wrapping_write_unchecked(&buf[0..buf.len()]); Poll::Ready(Ok(buf.len())) } else { // wrapping write let (end, start) = buf.split_at(data.size - data.write_idx); data.non_wrapping_write_unchecked(end); let start_count = start.len().min(data.read_idx - 1); data.non_wrapping_write_unchecked(&start[0..start_count]); Poll::Ready(Ok(end.len() + start_count)) } } } } impl Drop for Writer { fn drop(&mut self) { unsafe { if let Some(data) = self.0.as_mut() { data.drop_writer(); } } } } #[repr(C)] pub struct Reader(*mut AsyncRingbuffer); impl AsyncRead for Reader { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { unsafe { let data = self.0.as_mut().expect("Cannot be null"); if buf.len() != 0 && data.is_full() { data.write_waker.invoke(); } if data.is_empty() { data.reader_wait(cx.waker()) } else if data.read_idx < data.write_idx { // frontside non-wrapping read let count = buf.len().min(data.write_idx - data.read_idx); data.non_wrapping_read_unchecked(&mut buf[0..count]); Poll::Ready(Ok(count)) } else if data.read_idx + buf.len() < data.size { // backside non-wrapping read data.non_wrapping_read_unchecked(buf); Poll::Ready(Ok(buf.len())) } else { // wrapping read let (end, start) = buf.split_at_mut(data.size - data.read_idx); data.non_wrapping_read_unchecked(end); let start_count = start.len().min(data.write_idx); data.non_wrapping_read_unchecked(&mut start[0..start_count]); Poll::Ready(Ok(end.len() + start_count)) } } } } impl Drop for Reader { fn drop(&mut self) { unsafe { if let Some(data) = self.0.as_mut() { data.drop_reader(); } } } }