use std::cell::RefCell; use std::io::{self, Write}; use std::pin::Pin; use async_process::{self, Child, ChildStdin, ChildStdout}; use futures::future::LocalBoxFuture; use futures::io::BufReader; use futures::lock::Mutex; use futures::{self, AsyncBufReadExt, AsyncWriteExt}; use orchid_api_traits::{Decode, Encode}; use orchid_base::builtin::{ExtInit, ExtPort}; use orchid_base::logging::Logger; use orchid_base::msg::{recv_msg, send_msg}; use crate::api; use crate::ctx::Ctx; pub async fn ext_command( cmd: std::process::Command, logger: Logger, msg_logs: Logger, ctx: Ctx, ) -> io::Result { let mut child = async_process::Command::from(cmd) .stdin(async_process::Stdio::piped()) .stdout(async_process::Stdio::piped()) .stderr(async_process::Stdio::piped()) .spawn()?; let mut stdin = child.stdin.take().unwrap(); api::HostHeader { log_strategy: logger.strat(), msg_logs: msg_logs.strat() } .encode(Pin::new(&mut stdin)) .await; let mut stdout = child.stdout.take().unwrap(); let header = api::ExtensionHeader::decode(Pin::new(&mut stdout)).await; let child_stderr = child.stderr.take().unwrap(); (ctx.spawn)(Box::pin(async move { let mut reader = BufReader::new(child_stderr); loop { let mut buf = String::new(); if 0 == reader.read_line(&mut buf).await.unwrap() { break; } logger.log(buf.strip_suffix('\n').expect("Readline implies this")); } })); Ok(ExtInit { port: Box::new(Subprocess { name: header.name.clone(), child: RefCell::new(Some(child)), stdin: Some(Mutex::new(Box::pin(stdin))), stdout: Mutex::new(Box::pin(stdout)), ctx, }), header, }) } pub struct Subprocess { name: String, child: RefCell>, stdin: Option>>>, stdout: Mutex>>, ctx: Ctx, } impl Drop for Subprocess { fn drop(&mut self) { let mut child = self.child.borrow_mut().take().unwrap(); let name = self.name.clone(); if std::thread::panicking() { eprintln!("Killing extension {name}"); // we don't really care to handle errors here let _: Result<_, _> = std::io::stderr().flush(); let _: Result<_, _> = child.kill(); return; } let stdin = self.stdin.take().unwrap(); (self.ctx.spawn)(Box::pin(async move { stdin.lock().await.close().await.unwrap(); let status = (child.status().await) .unwrap_or_else(|e| panic!("{e}, extension {name} exited with error")); assert!(status.success(), "Extension {name} exited with error {status}"); })) } } impl ExtPort for Subprocess { fn send<'a>(&'a self, msg: &'a [u8]) -> LocalBoxFuture<'a, ()> { Box::pin(async { send_msg(Pin::new(&mut *self.stdin.as_ref().unwrap().lock().await), msg).await.unwrap() }) } fn recv(&self) -> LocalBoxFuture<'_, Option>> { Box::pin(async { std::io::Write::flush(&mut std::io::stderr()).unwrap(); match recv_msg(self.stdout.lock().await.as_mut()).await { Ok(msg) => Some(msg), Err(e) if e.kind() == io::ErrorKind::BrokenPipe => None, Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => None, Err(e) => panic!("Failed to read from stdout: {}, {e}", e.kind()), } }) } }