From 6a3c1d591733bda7fbdfee80e9019a0a1fd7e280 Mon Sep 17 00:00:00 2001 From: Lawrence Bethlenfalvy Date: Sat, 17 Jan 2026 00:23:35 +0100 Subject: [PATCH] Introduced dylib extension format, cleared up shutdown sequence --- .cargo/config.toml | 2 +- Cargo.lock | 1 + orchid-api-derive/src/hierarchy.rs | 3 - orchid-api/src/binary.rs | 18 +++--- orchid-base/src/binary.rs | 37 ++++++------ orchid-base/src/reqnot.rs | 57 +++++++++--------- orchid-extension/Cargo.toml | 1 + orchid-extension/src/binary.rs | 21 ++++++- orchid-extension/src/entrypoint.rs | 6 +- orchid-extension/src/tokio.rs | 11 +++- orchid-host/src/dylib.rs | 13 ++++- orchid-host/src/extension.rs | 10 +++- orchid-std/Cargo.toml | 3 +- orchid-std/src/lib.rs | 11 ++-- orchid-std/src/main.rs | 10 ++-- orcx/src/main.rs | 29 +++++++++- rustfmt.toml | 1 - unsync-pipe/src/lib.rs | 93 ++++++++++++++++++++++-------- 18 files changed, 214 insertions(+), 113 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index 5cecad4..063a563 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -5,7 +5,7 @@ orcxdb = "xtask orcxdb" [env] CARGO_WORKSPACE_DIR = { value = "", relative = true } -ORCHID_EXTENSIONS = "target/debug/orchid-std" +ORCHID_EXTENSIONS = "target/debug/orchid-std-dbg" ORCHID_DEFAULT_SYSTEMS = "orchid::std;orchid::macros" ORCHID_LOG_BUFFERS = "true" RUST_BACKTRACE = "1" diff --git a/Cargo.lock b/Cargo.lock index 133e3ac..91c655c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -926,6 +926,7 @@ dependencies = [ "tokio", "tokio-util", "trait-set", + "unsync-pipe", ] [[package]] diff --git a/orchid-api-derive/src/hierarchy.rs b/orchid-api-derive/src/hierarchy.rs index cf7847f..9a5d5e0 100644 --- a/orchid-api-derive/src/hierarchy.rs +++ b/orchid-api-derive/src/hierarchy.rs @@ -120,6 +120,3 @@ fn get_ancestry(input: &DeriveInput) -> Option> { fn is_extendable(input: &DeriveInput) -> bool { input.attrs.iter().any(|a| a.path().get_ident().is_some_and(|i| *i == "extendable")) } - -#[test] -fn test_wtf() { eprintln!("{}", gen_casts(&[quote!(ExtHostReq)], "e!(BogusReq))) } diff --git a/orchid-api/src/binary.rs b/orchid-api/src/binary.rs index 6b3ff8a..9820818 100644 --- a/orchid-api/src/binary.rs +++ b/orchid-api/src/binary.rs @@ -13,7 +13,7 @@ use unsync_pipe::{Reader, Writer}; /// interactions must reflect a single logical owner #[derive(Clone, Copy)] #[repr(C)] -pub struct OwnedWakerVT { +pub struct OwnedWakerBin { pub data: *const (), /// `self` pub drop: extern "C" fn(*const ()), @@ -24,7 +24,7 @@ pub struct OwnedWakerVT { } /// !Send !Sync, equivalent to `&mut Context<'a>`, hence no `drop`. -/// When received in [FutureVT::poll], it must not outlive the call. +/// When received in [FutureBin::poll], it must not outlive the call. /// /// You cannot directly wake using this waker, because such a trampoline would /// pass through the binary interface twice for no reason. An efficient @@ -33,10 +33,10 @@ pub struct OwnedWakerVT { /// it up. #[derive(Clone, Copy)] #[repr(C)] -pub struct FutureContextVT { +pub struct FutureContextBin { pub data: *const (), /// `&self` - pub waker: extern "C" fn(*const ()) -> OwnedWakerVT, + pub waker: extern "C" fn(*const ()) -> OwnedWakerBin, } /// ABI-stable `Poll<()>` @@ -53,24 +53,24 @@ pub enum UnitPoll { /// interactions must reflect a single logical owner #[derive(Clone, Copy)] #[repr(C)] -pub struct FutureVT { +pub struct FutureBin { pub data: *const (), /// `self` pub drop: extern "C" fn(*const ()), /// `&mut self` Equivalent to [Future::poll] - pub poll: extern "C" fn(*const (), FutureContextVT) -> UnitPoll, + pub poll: extern "C" fn(*const (), FutureContextBin) -> UnitPoll, } /// Handle for a runtime that allows its holder to spawn futures across dynamic /// library boundaries #[derive(Clone, Copy)] #[repr(C)] -pub struct Spawner { +pub struct SpawnerBin { pub data: *const (), /// `self` pub drop: extern "C" fn(*const ()), /// `&self` Add a future to this extension's task - pub spawn: extern "C" fn(*const (), FutureVT), + pub spawn: extern "C" fn(*const (), FutureBin), } /// Extension context. @@ -80,7 +80,7 @@ pub struct Spawner { #[repr(C)] pub struct ExtensionContext { /// Spawns tasks associated with this extension - pub spawner: Spawner, + pub spawner: SpawnerBin, /// serialized [crate::HostExtChannel] pub input: Reader, /// serialized [crate::ExtHostChannel] diff --git a/orchid-base/src/binary.rs b/orchid-base/src/binary.rs index 7b57df5..d3e1a62 100644 --- a/orchid-base/src/binary.rs +++ b/orchid-base/src/binary.rs @@ -1,14 +1,15 @@ +use std::mem; use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; -use orchid_api::binary::{FutureContextVT, FutureVT, OwnedWakerVT, UnitPoll}; +use orchid_api::binary::{FutureBin, FutureContextBin, OwnedWakerBin, UnitPoll}; type WideBox = Box>; static OWNED_VTABLE: RawWakerVTable = RawWakerVTable::new( |data| { - let data = unsafe { Rc::::from_raw(data as *const _) }; + let data = unsafe { Rc::::from_raw(data as *const _) }; let val = RawWaker::new(Rc::into_raw(data.clone()) as *const (), &OWNED_VTABLE); // Clone must create a duplicate of the Rc, so it has to be un-leaked, cloned, // then leaked again. @@ -18,30 +19,32 @@ static OWNED_VTABLE: RawWakerVTable = RawWakerVTable::new( |data| { // Wake must awaken the task and then clean up the state, so the waker must be // un-leaked - let data = unsafe { Rc::::from_raw(data as *const _) }; + let data = unsafe { Rc::::from_raw(data as *const _) }; (data.wake)(data.data); + mem::drop(data); }, |data| { // Wake-by-ref must awaken the task while preserving the future, so the Rc is // untouched - let data = unsafe { (data as *const OwnedWakerVT).as_ref() }.unwrap(); + let data = unsafe { (data as *const OwnedWakerBin).as_ref() }.unwrap(); (data.wake_ref)(data.data); }, |data| { // Drop must clean up the state, so the waker must be un-leaked - let data = unsafe { Rc::::from_raw(data as *const _) }; + let data = unsafe { Rc::::from_raw(data as *const _) }; (data.drop)(data.data); + mem::drop(data); }, ); struct BorrowedWakerData<'a> { go_around: &'a mut bool, - cx: FutureContextVT, + cx: FutureContextBin, } static BORROWED_VTABLE: RawWakerVTable = RawWakerVTable::new( |data| { let data = unsafe { (data as *mut BorrowedWakerData).as_mut() }.unwrap(); - let owned_data = Rc::::new((data.cx.waker)(data.cx.data)); + let owned_data = Rc::::new((data.cx.waker)(data.cx.data)); RawWaker::new(Rc::into_raw(owned_data) as *const (), &OWNED_VTABLE) }, |data| *unsafe { (data as *mut BorrowedWakerData).as_mut() }.unwrap().go_around = true, @@ -51,13 +54,13 @@ static BORROWED_VTABLE: RawWakerVTable = RawWakerVTable::new( /// Convert a future to a binary-compatible format that can be sent across /// dynamic library boundaries -pub fn future_to_vt + 'static>(fut: Fut) -> FutureVT { +pub fn future_to_vt + 'static>(fut: Fut) -> FutureBin { let wide_box = Box::new(fut) as WideBox; let data = Box::into_raw(Box::new(wide_box)); extern "C" fn drop(raw: *const ()) { - std::mem::drop(unsafe { Box::::from_raw(raw as *mut _) }) + mem::drop(unsafe { Box::::from_raw(raw as *mut _) }) } - extern "C" fn poll(raw: *const (), cx: FutureContextVT) -> UnitPoll { + extern "C" fn poll(raw: *const (), cx: FutureContextBin) -> UnitPoll { let mut this = unsafe { Pin::new_unchecked(&mut **(raw as *mut WideBox).as_mut().unwrap()) }; loop { let mut go_around = false; @@ -77,22 +80,22 @@ pub fn future_to_vt + 'static>(fut: Fut) -> FutureVT { } } } - FutureVT { data: data as *const _, drop, poll } + FutureBin { data: data as *const _, drop, poll } } struct VirtualFuture { - vt: FutureVT, + vt: FutureBin, } impl Unpin for VirtualFuture {} impl Future for VirtualFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - extern "C" fn waker(raw: *const ()) -> OwnedWakerVT { + extern "C" fn waker(raw: *const ()) -> OwnedWakerBin { let waker = unsafe { (raw as *mut Context).as_mut() }.unwrap().waker().clone(); let data = Box::into_raw(Box::::new(waker)) as *const (); - return OwnedWakerVT { data, drop, wake, wake_ref }; + return OwnedWakerBin { data, drop, wake, wake_ref }; extern "C" fn drop(raw: *const ()) { - std::mem::drop(unsafe { Box::::from_raw(raw as *mut Waker) }) + mem::drop(unsafe { Box::::from_raw(raw as *mut Waker) }) } extern "C" fn wake(raw: *const ()) { unsafe { Box::::from_raw(raw as *mut Waker) }.wake(); @@ -101,7 +104,7 @@ impl Future for VirtualFuture { unsafe { (raw as *mut Waker).as_mut() }.unwrap().wake_by_ref(); } } - let cx = FutureContextVT { data: cx as *mut Context as *const (), waker }; + let cx = FutureContextBin { data: cx as *mut Context as *const (), waker }; let result = (self.vt.poll)(self.vt.data, cx); match result { UnitPoll::Pending => Poll::Pending, @@ -115,4 +118,4 @@ impl Drop for VirtualFuture { /// Receive a future sent across dynamic library boundaries and convert it into /// an owned object -pub fn vt_to_future(vt: FutureVT) -> impl Future { VirtualFuture { vt } } +pub fn vt_to_future(vt: FutureBin) -> impl Future { VirtualFuture { vt } } diff --git a/orchid-base/src/reqnot.rs b/orchid-base/src/reqnot.rs index 53d9768..1c73c71 100644 --- a/orchid-base/src/reqnot.rs +++ b/orchid-base/src/reqnot.rs @@ -299,10 +299,15 @@ impl<'a> MsgWriter<'a> for IoNotifWriter { pub struct CommCtx { exit: Sender<()>, + o: Rc>>>, } impl CommCtx { - pub async fn exit(self) { self.exit.clone().send(()).await.expect("quit channel dropped"); } + pub async fn exit(self) -> io::Result<()> { + self.o.lock().await.as_mut().close().await?; + self.exit.clone().send(()).await.expect("quit channel dropped"); + Ok(()) + } } /// Establish bidirectional request-notification communication over a duplex @@ -313,13 +318,14 @@ impl CommCtx { /// check that the correct message families are sent in the correct directions /// across the channel. pub fn io_comm( - o: Rc>>>, - i: Mutex>>, + o: Pin>, + i: Pin>, ) -> (impl Client + 'static, CommCtx, IoCommServer) { - let i = Rc::new(i); + let i = Rc::new(Mutex::new(i)); + let o = Rc::new(Mutex::new(o)); let (onsub, client) = IoClient::new(o.clone()); let (exit, onexit) = channel(1); - (client, CommCtx { exit }, IoCommServer { o, i, onsub, onexit }) + (client, CommCtx { exit, o: o.clone() }, IoCommServer { o, i, onsub, onexit }) } pub struct IoCommServer { o: Rc>>>, @@ -345,15 +351,12 @@ impl IoCommServer { let mut g = Bound::async_new(i.clone(), async |i| i.lock().await).await; match u64::decode(g.as_mut()).await { Ok(id) => h.emit(Event::Input(id, g)).await, - Err(e) - if matches!( - e.kind(), - io::ErrorKind::BrokenPipe - | io::ErrorKind::ConnectionAborted - | io::ErrorKind::UnexpectedEof - ) => - h.emit(Event::Exit).await, - Err(e) => return Err(e), + Err(e) => match e.kind() { + io::ErrorKind::BrokenPipe + | io::ErrorKind::ConnectionAborted + | io::ErrorKind::UnexpectedEof => h.emit(Event::Exit).await, + _ => return Err(e), + }, } } }); @@ -419,10 +422,8 @@ impl IoCommServer { #[cfg(test)] mod test { use std::cell::RefCell; - use std::rc::Rc; use futures::channel::mpsc; - use futures::lock::Mutex; use futures::{SinkExt, StreamExt, join}; use orchid_api_derive::{Coding, Hierarchy}; use orchid_api_traits::Request; @@ -444,9 +445,8 @@ mod test { let (in1, out2) = pipe(1024); let (in2, out1) = pipe(1024); let (received, mut on_receive) = mpsc::channel(2); - let (_, recv_ctx, recv_srv) = - io_comm(Rc::new(Mutex::new(Box::pin(in2))), Mutex::new(Box::pin(out2))); - let (sender, ..) = io_comm(Rc::new(Mutex::new(Box::pin(in1))), Mutex::new(Box::pin(out1))); + let (_, recv_ctx, recv_srv) = io_comm(Box::pin(in2), Box::pin(out2)); + let (sender, ..) = io_comm(Box::pin(in1), Box::pin(out1)); join!( async { recv_srv @@ -465,7 +465,7 @@ mod test { assert_eq!(on_receive.next().await, Some(TestNotif(3))); sender.notify(TestNotif(4)).await.unwrap(); assert_eq!(on_receive.next().await, Some(TestNotif(4))); - recv_ctx.exit().await; + recv_ctx.exit().await.unwrap(); } ); })) @@ -484,10 +484,8 @@ mod test { spin_on(with_logger(logger, async { let (in1, out2) = pipe(1024); let (in2, out1) = pipe(1024); - let (_, srv_ctx, srv) = - io_comm(Rc::new(Mutex::new(Box::pin(in2))), Mutex::new(Box::pin(out2))); - let (client, client_ctx, client_srv) = - io_comm(Rc::new(Mutex::new(Box::pin(in1))), Mutex::new(Box::pin(out1))); + let (_, srv_ctx, srv) = io_comm(Box::pin(in2), Box::pin(out2)); + let (client, client_ctx, client_srv) = io_comm(Box::pin(in1), Box::pin(out1)); join!( async { srv @@ -513,8 +511,8 @@ mod test { async { let response = client.request(DummyRequest(5)).await.unwrap(); assert_eq!(response, 6); - srv_ctx.exit().await; - client_ctx.exit().await; + srv_ctx.exit().await.unwrap(); + client_ctx.exit().await.unwrap(); } ); })) @@ -527,9 +525,8 @@ mod test { let (input1, output1) = pipe(1024); let (input2, output2) = pipe(1024); let (reply_client, reply_context, reply_server) = - io_comm(Rc::new(Mutex::new(Box::pin(input1))), Mutex::new(Box::pin(output2))); - let (req_client, req_context, req_server) = - io_comm(Rc::new(Mutex::new(Box::pin(input2))), Mutex::new(Box::pin(output1))); + io_comm(Box::pin(input1), Box::pin(output2)); + let (req_client, req_context, req_server) = io_comm(Box::pin(input2), Box::pin(output1)); let reply_context = RefCell::new(Some(reply_context)); let (exit, onexit) = futures::channel::oneshot::channel::<()>(); join!( @@ -539,7 +536,7 @@ mod test { async |hand| { let _notif = hand.read::().await.unwrap(); let context = reply_context.borrow_mut().take().unwrap(); - context.exit().await; + context.exit().await?; Ok(()) }, async |mut hand| { diff --git a/orchid-extension/Cargo.toml b/orchid-extension/Cargo.toml index 7a7147a..af47d84 100644 --- a/orchid-extension/Cargo.toml +++ b/orchid-extension/Cargo.toml @@ -36,6 +36,7 @@ tokio = { version = "1.49.0", optional = true, features = [] } tokio-util = { version = "0.7.17", optional = true, features = ["compat"] } trait-set = "0.3.0" +unsync-pipe = { version = "0.2.0", path = "../unsync-pipe" } [features] tokio = ["dep:tokio", "dep:tokio-util"] diff --git a/orchid-extension/src/binary.rs b/orchid-extension/src/binary.rs index d870f3f..416b1bf 100644 --- a/orchid-extension/src/binary.rs +++ b/orchid-extension/src/binary.rs @@ -9,7 +9,7 @@ use crate::ext_port::ExtPort; pub type ExtCx = api::binary::ExtensionContext; -struct Spawner(api::binary::Spawner); +struct Spawner(api::binary::SpawnerBin); impl Drop for Spawner { fn drop(&mut self) { (self.0.drop)(self.0.data) } } @@ -28,3 +28,22 @@ pub fn orchid_extension_main_body(cx: ExtCx, builder: ExtensionBuilder) { spawn: Rc::new(move |fut| spawner.spawn(fut)), }); } + +/// Generate entrypoint for the dylib extension loader +/// +/// # Usage +/// +/// ``` +/// dylib_main! { +/// ExtensionBuilder::new("orchid-std::main") +/// } +/// ``` +#[macro_export] +macro_rules! dylib_main { + ($builder:expr) => { + #[unsafe(no_mangle)] + pub extern "C" fn orchid_extension_main(cx: ::orchid_api::binary::ExtensionContext) { + $crate::binary::orchid_extension_main_body(cx, $builder); + } + }; +} diff --git a/orchid-extension/src/entrypoint.rs b/orchid-extension/src/entrypoint.rs index 0c52271..ccb5ccf 100644 --- a/orchid-extension/src/entrypoint.rs +++ b/orchid-extension/src/entrypoint.rs @@ -6,7 +6,6 @@ use std::rc::Rc; use std::{io, mem}; use futures::future::{LocalBoxFuture, join_all}; -use futures::lock::Mutex; use futures::{AsyncRead, AsyncWrite, AsyncWriteExt, StreamExt, stream}; use hashbrown::HashMap; use itertools::Itertools; @@ -50,7 +49,7 @@ task_local::task_local! { fn get_client() -> Rc { CLIENT.get() } pub async fn exit() { let cx = CTX.get().borrow_mut().take(); - cx.unwrap().exit().await + cx.unwrap().exit().await.unwrap() } /// Sent the client used for global [request] and [notify] functions within the @@ -139,8 +138,7 @@ impl ExtensionBuilder { ctx.output.as_mut().flush().await.unwrap(); let logger1 = LoggerImpl::from_api(&host_header.logger); let logger2 = logger1.clone(); - let (client, comm_ctx, extension_srv) = - io_comm(Rc::new(Mutex::new(ctx.output)), Mutex::new(ctx.input)); + let (client, comm_ctx, extension_srv) = io_comm(ctx.output, ctx.input); let extension_fut = extension_srv.listen( async |n: Box>| { let notif = n.read().await.unwrap(); diff --git a/orchid-extension/src/tokio.rs b/orchid-extension/src/tokio.rs index 33a856f..7e61135 100644 --- a/orchid-extension/src/tokio.rs +++ b/orchid-extension/src/tokio.rs @@ -10,7 +10,7 @@ use crate::ext_port::ExtPort; /// value returned by [crate::system_ctor::SystemCtor::inst] to initiate /// shutdown. #[cfg(feature = "tokio")] -pub async fn tokio_main(builder: ExtensionBuilder) -> ! { +pub async fn tokio_entrypoint(builder: ExtensionBuilder) { use tokio::io::{stderr, stdin, stdout}; use tokio::task::{LocalSet, spawn_local}; use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; @@ -27,5 +27,12 @@ pub async fn tokio_main(builder: ExtensionBuilder) -> ! { }); }); local_set.await; - std::process::exit(0) +} + +#[macro_export] +macro_rules! tokio_main { + ($builder:expr) => { + #[tokio::main] + pub async fn main() { $crate::tokio::tokio_entrypoint($builder).await } + }; } diff --git a/orchid-host/src/dylib.rs b/orchid-host/src/dylib.rs index 376b9d2..2080da7 100644 --- a/orchid-host/src/dylib.rs +++ b/orchid-host/src/dylib.rs @@ -1,3 +1,4 @@ +use std::io; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; @@ -37,7 +38,13 @@ pub async fn ext_dylib(path: &Path, ctx: Ctx) -> Result {}", line.expect("Readline implies this")).await; + match line { + Ok(line) => writeln!(log("stderr"), "{log_path} err> {line}").await, + Err(e) => match e.kind() { + io::ErrorKind::BrokenPipe | io::ErrorKind::UnexpectedEof => break, + _ => panic!("Error while reading stderr {e}"), + }, + } } }); let library = load_dylib(path)?; @@ -45,10 +52,10 @@ pub async fn ext_dylib(path: &Path, ctx: Ctx) -> Result(R, Sender); pub struct ExtensionData { name: String, ctx: Ctx, + comm_cx: Option, join_ext: Option>, client: Rc, systems: Vec, @@ -58,8 +61,10 @@ impl Drop for ExtensionData { fn drop(&mut self) { let client = self.client.clone(); let join_ext = self.join_ext.take().expect("Only called once in Drop"); + let comm_cx = self.comm_cx.take().expect("Only used here"); stash(async move { client.notify(api::HostExtNotif::Exit).await.unwrap(); + comm_cx.exit().await.unwrap(); join_ext.join().await; }) } @@ -76,7 +81,7 @@ impl Extension { let header2 = header.clone(); Ok(Self(Rc::new_cyclic(|weak: &Weak| { // context not needed because exit is extension-initiated - let (client, _, comm) = io_comm(Rc::new(Mutex::new(init.input)), Mutex::new(init.output)); + let (client, comm_cx, comm) = io_comm(init.input, init.output); let weak2 = weak; let weak = weak.clone(); let ctx2 = ctx.clone(); @@ -274,6 +279,7 @@ impl Extension { ExtensionData { name: header2.name.clone(), ctx: ctx2, + comm_cx: Some(comm_cx), systems: (header.systems.iter().cloned()) .map(|decl| SystemCtor { decl, ext: WeakExtension(weak2.clone()) }) .collect(), diff --git a/orchid-std/Cargo.toml b/orchid-std/Cargo.toml index c9f31e5..2491d81 100644 --- a/orchid-std/Cargo.toml +++ b/orchid-std/Cargo.toml @@ -4,11 +4,12 @@ version = "0.1.0" edition = "2024" [[bin]] -name = "orchid-std" +name = "orchid-std-dbg" path = "src/main.rs" [lib] crate-type = ["cdylib", "lib"] +name = "orchid_std" path = "src/lib.rs" [dependencies] diff --git a/orchid-std/src/lib.rs b/orchid-std/src/lib.rs index 6a840a1..4941a0d 100644 --- a/orchid-std/src/lib.rs +++ b/orchid-std/src/lib.rs @@ -11,12 +11,11 @@ pub use std::tuple::{HomoTpl, Tpl, Tuple, UntypedTuple}; pub use macros::macro_system::MacroSystem; pub use macros::mactree::{MacTok, MacTree}; use orchid_api as api; -use orchid_extension::binary::orchid_extension_main_body; +use orchid_extension::dylib_main; use orchid_extension::entrypoint::ExtensionBuilder; -pub extern "C" fn orchid_extension_main(cx: api::binary::ExtensionContext) { - orchid_extension_main_body( - cx, - ExtensionBuilder::new("orchid-std::main").system(StdSystem).system(MacroSystem), - ); +pub fn builder() -> ExtensionBuilder { + ExtensionBuilder::new("orchid-std::main").system(StdSystem).system(MacroSystem) } + +dylib_main! { builder() } diff --git a/orchid-std/src/main.rs b/orchid-std/src/main.rs index 215ac37..683a16f 100644 --- a/orchid-std/src/main.rs +++ b/orchid-std/src/main.rs @@ -1,8 +1,6 @@ -use orchid_extension::entrypoint::ExtensionBuilder; -use orchid_extension::tokio::tokio_main; -use orchid_std::{MacroSystem, StdSystem}; +use orchid_extension::tokio_main; +use orchid_std::builder; -#[tokio::main(flavor = "current_thread")] -pub async fn main() { - tokio_main(ExtensionBuilder::new("orchid-std::main").system(StdSystem).system(MacroSystem)).await +tokio_main! { + builder() } diff --git a/orcx/src/main.rs b/orcx/src/main.rs index d5973d2..a1287e5 100644 --- a/orcx/src/main.rs +++ b/orcx/src/main.rs @@ -1,4 +1,5 @@ use orchid_base::logging::Logger; +use orchid_host::dylib::ext_dylib; use tokio::time::Instant; pub mod parse_folder; @@ -9,7 +10,7 @@ use std::process::{Command, ExitCode}; use std::rc::Rc; use async_fn_stream::try_stream; -use camino::Utf8PathBuf; +use camino::{Utf8Path, Utf8PathBuf}; use clap::{Parser, Subcommand}; use futures::future::LocalBoxFuture; use futures::{FutureExt, Stream, TryStreamExt, io}; @@ -100,10 +101,32 @@ fn get_all_extensions<'a>( args: &'a Args, ctx: &'a Ctx, ) -> impl Stream> + 'a { + fn not_found_error(ext_path: &Utf8Path) -> io::Error { + io::Error::new( + std::io::ErrorKind::NotFound, + format!("None of the file candidates for {ext_path} were found"), + ) + } try_stream(async |mut cx| { for ext_path in args.extension.iter() { - let exe = if cfg!(windows) { ext_path.with_extension("exe") } else { ext_path.clone() }; - let init = ext_command(Command::new(exe.as_os_str()), ctx.clone()).await?; + let init = if cfg!(windows) { + if ext_path.with_extension("dll").exists() { + let dylib = + ext_dylib(ext_path.with_extension("dll").as_std_path(), ctx.clone()).await.unwrap(); + eprintln!("Loaded DLL {ext_path}.dll"); + dylib + } else if ext_path.with_extension("exe").exists() { + ext_command(Command::new(ext_path.with_extension("exe").as_os_str()), ctx.clone()).await? + } else { + return Err(not_found_error(ext_path)); + } + } else if ext_path.with_extension("so").exists() { + ext_dylib(ext_path.with_extension("so").as_std_path(), ctx.clone()).await.unwrap() + } else if ext_path.exists() { + ext_command(Command::new(ext_path.as_os_str()), ctx.clone()).await? + } else { + return Err(not_found_error(ext_path)); + }; cx.emit(Extension::new(init, ctx.clone()).await?).await; } Ok(cx) diff --git a/rustfmt.toml b/rustfmt.toml index 2ab8844..b28bf34 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,5 +1,4 @@ # meta -format_code_in_doc_comments = true unstable_features = true style_edition = "2024" diff --git a/unsync-pipe/src/lib.rs b/unsync-pipe/src/lib.rs index 227d7d7..9594893 100644 --- a/unsync-pipe/src/lib.rs +++ b/unsync-pipe/src/lib.rs @@ -27,6 +27,7 @@ pub fn pipe(size: usize) -> (Writer, Reader) { size, mut read_waker, mut write_waker, + mut flush_waker, reader_dropped, writer_dropped, // irrelevant if correctly dropped @@ -37,11 +38,11 @@ pub fn pipe(size: usize) -> (Writer, Reader) { state: _, } = *unsafe { Box::from_raw(val as *mut AsyncRingbuffer) }; if !writer_dropped || !reader_dropped { - eprintln!("Pipe dropped in err before reader or writer"); abort() } read_waker.drop(); write_waker.drop(); + flush_waker.drop(); unsafe { dealloc(start, pipe_layout(size)) } } let state = Box::into_raw(Box::new(AsyncRingbuffer { @@ -52,6 +53,7 @@ pub fn pipe(size: usize) -> (Writer, Reader) { write_idx: 0, read_waker: Trigger::empty(), write_waker: Trigger::empty(), + flush_waker: Trigger::empty(), reader_dropped: false, writer_dropped: false, drop, @@ -108,18 +110,21 @@ struct AsyncRingbuffer { write_idx: usize, read_waker: Trigger, write_waker: Trigger, + flush_waker: Trigger, reader_dropped: bool, writer_dropped: bool, drop: extern "C" fn(*const ()), } impl AsyncRingbuffer { fn drop_writer(&mut self) { + self.read_waker.invoke(); self.writer_dropped = true; if self.reader_dropped { (self.drop)(self.state) } } fn drop_reader(&mut self) { + self.write_waker.invoke(); self.reader_dropped = true; if self.writer_dropped { (self.drop)(self.state) @@ -134,6 +139,15 @@ impl AsyncRingbuffer { self.write_waker = Trigger::new(waker.clone()); Poll::Pending } + fn flush_wait(&mut self, waker: &Waker) -> Poll> { + if self.reader_dropped { + return Poll::Ready(Err(broken_pipe_error())); + } + self.read_waker.invoke(); + self.flush_waker.drop(); + self.flush_waker = Trigger::new(waker.clone()); + Poll::Pending + } fn reader_wait(&mut self, waker: &Waker) -> Poll> { if self.writer_dropped { return Poll::Ready(Err(broken_pipe_error())); @@ -157,6 +171,36 @@ impl AsyncRingbuffer { } fn is_full(&self) -> bool { (self.write_idx + 1) % self.size == self.read_idx } fn is_empty(&self) -> bool { self.write_idx == self.read_idx } + fn buf_free(&self) -> usize { + let Self { read_idx, write_idx, size, .. } = self; + if write_idx < read_idx { *read_idx - write_idx - 1 } else { size - write_idx + read_idx } + } + fn wrapping_write_unchecked(&mut self, buf: &[u8]) -> usize { + unsafe { + let Self { read_idx, write_idx, size, .. } = *self; + if write_idx < read_idx { + // Non-wrapping backside write w < r <= s + let count = buf.len().min(read_idx - write_idx - 1); + self.non_wrapping_write_unchecked(&buf[0..count]); + count + } else if write_idx + buf.len() < size { + // Non-wrapping frontside write r <= w + b < s + self.non_wrapping_write_unchecked(&buf[0..buf.len()]); + buf.len() + } else if read_idx == 0 { + // Frontside write up to origin r=0 < s < w + b + self.non_wrapping_write_unchecked(&buf[0..size - write_idx - 1]); + size - write_idx - 1 + } else { + let (end, start) = buf.split_at(size - write_idx); + // Wrapping write r < s < w + b + self.non_wrapping_write_unchecked(end); + let start_count = start.len().min(read_idx - 1); + self.non_wrapping_write_unchecked(&start[0..start_count]); + end.len() + start_count + } + } + } } fn already_closed_error() -> io::Error { @@ -166,6 +210,12 @@ fn broken_pipe_error() -> io::Error { io::Error::new(io::ErrorKind::BrokenPipe, "Pipe already closed from other end") } +#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub enum SyncWriteError { + BufferFull, + AlreadyClosed, +} + /// A binary safe [AsyncWrite] implementor writing to a ringbuffer created by /// [pipe]. #[repr(C)] @@ -177,6 +227,17 @@ impl Writer { None => Err(already_closed_error()), } } + pub fn try_write_all(self: Pin<&mut Self>, data: &[u8]) -> Result<(), SyncWriteError> { + unsafe { + let state = self.get_state().map_err(|_| SyncWriteError::AlreadyClosed)?; + if state.buf_free() <= data.len() { + return Err(SyncWriteError::BufferFull); + } + state.wrapping_write_unchecked(data); + state.write_waker.invoke(); + Ok(()) + } + } } impl AsyncWrite for Writer { fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { @@ -194,7 +255,7 @@ impl AsyncWrite for Writer { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { unsafe { let data = self.as_mut().get_state()?; - if data.is_empty() { Poll::Ready(Ok(())) } else { data.writer_wait(cx.waker()) } + if data.is_empty() { Poll::Ready(Ok(())) } else { data.flush_wait(cx.waker()) } } } fn poll_write( @@ -204,33 +265,13 @@ impl AsyncWrite for Writer { ) -> Poll> { unsafe { let data = self.as_mut().get_state()?; - let AsyncRingbuffer { write_idx, read_idx, size, .. } = *data; if !buf.is_empty() && data.is_empty() { data.read_waker.invoke(); } if !buf.is_empty() && data.is_full() { - // Writer is blocked data.writer_wait(cx.waker()) - } else if write_idx < read_idx { - // Non-wrapping backside write w < r <= s - let count = buf.len().min(read_idx - write_idx - 1); - data.non_wrapping_write_unchecked(&buf[0..count]); - Poll::Ready(Ok(count)) - } else if data.write_idx + buf.len() < size { - // Non-wrapping frontside write r <= w + b < s - data.non_wrapping_write_unchecked(&buf[0..buf.len()]); - Poll::Ready(Ok(buf.len())) - } else if read_idx == 0 { - // Frontside write up to origin r=0 < s < w + b - data.non_wrapping_write_unchecked(&buf[0..size - write_idx - 1]); - Poll::Ready(Ok(size - write_idx - 1)) } else { - let (end, start) = buf.split_at(size - write_idx); - // Wrapping write r < s < w + b - data.non_wrapping_write_unchecked(end); - let start_count = start.len().min(read_idx - 1); - data.non_wrapping_write_unchecked(&start[0..start_count]); - Poll::Ready(Ok(end.len() + start_count)) + Poll::Ready(Ok(data.wrapping_write_unchecked(buf))) } } } @@ -261,7 +302,7 @@ impl AsyncRead for Reader { if !buf.is_empty() && data.is_full() { data.write_waker.invoke(); } - if !buf.is_empty() && data.is_empty() { + let poll = if !buf.is_empty() && data.is_empty() { // Nothing to read, waiting... data.reader_wait(cx.waker()) } else if read_idx < write_idx { @@ -280,7 +321,11 @@ impl AsyncRead for Reader { let start_count = start.len().min(write_idx); data.non_wrapping_read_unchecked(&mut start[0..start_count]); Poll::Ready(Ok(end.len() + start_count)) + }; + if !buf.is_empty() && data.is_empty() { + data.flush_waker.invoke(); } + poll } } }