Made some progress towards effectful programs

This commit is contained in:
2026-04-18 13:25:03 +00:00
parent 286040c3ec
commit 6eed6b9831
7 changed files with 241 additions and 23 deletions

View File

@@ -1,6 +1,8 @@
#![allow(refining_impl_trait, reason = "Has various false-positives around lints")]
use orchid_api as api;
mod streams;
pub use streams::*;
mod atom;
pub use atom::*;
mod cmd_atom;

View File

@@ -1,3 +1,4 @@
use std::io;
use std::num::NonZero;
use chrono::{DateTime, Utc};
@@ -58,6 +59,20 @@ pub struct IoError {
pub message: String,
pub kind: IoErrorKind,
}
impl From<io::Error> for IoError {
fn from(value: io::Error) -> Self {
Self {
message: value.to_string(),
kind: match value.kind() {
io::ErrorKind::Interrupted
| io::ErrorKind::BrokenPipe
| io::ErrorKind::NetworkDown
| io::ErrorKind::ConnectionReset => IoErrorKind::Interrupted,
_ => IoErrorKind::Other,
},
}
}
}
#[derive(Clone, Debug, Coding)]
pub enum ReadLimit {
@@ -69,7 +84,9 @@ pub enum ReadLimit {
/// Read all available data from a stream. If the returned vector is empty, the
/// stream has reached its end.
#[derive(Clone, Debug, Coding, Hierarchy)]
pub struct ReadReq(pub ReadLimit);
pub struct ReadReq {
pub limit: ReadLimit,
}
impl Request for ReadReq {
type Response = Result<Vec<u8>, IoError>;
}

View File

@@ -0,0 +1,103 @@
use std::borrow::Cow;
use std::io::Result;
use std::pin::Pin;
use std::rc::Rc;
use futures::io::BufReader;
use futures::lock::Mutex;
use futures::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use never::Never;
use orchid_base::{Receipt, ReqHandle, ReqHandleExt};
use crate::gen_expr::{GExpr, new_atom};
use crate::std_reqs::{CloseReq, FlushReq, OutputReq, ReadLimit, ReadReq, WriteReq};
use crate::{Atomic, MethodSetBuilder, OwnedAtom, OwnedVariant, Supports, ToExpr};
struct WriterState {
buf: Vec<u8>,
writer: Pin<Box<dyn AsyncWrite>>,
}
pub struct OrcWriter<T: AsyncWrite + 'static>(T);
impl<T: AsyncWrite + 'static> ToExpr for OrcWriter<T> {
async fn to_gen(self) -> GExpr {
new_atom(WriterAtom(Rc::new(Mutex::new(WriterState {
buf: Vec::new(),
writer: Box::pin(self.0),
}))))
}
}
#[derive(Clone)]
pub struct WriterAtom(Rc<Mutex<WriterState>>);
impl Atomic for WriterAtom {
type Variant = OwnedVariant;
type Data = ();
fn reg_methods() -> MethodSetBuilder<Self> { MethodSetBuilder::new().handle::<OutputReq>() }
}
impl OwnedAtom for WriterAtom {
type Refs = Never;
async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(()) }
}
impl Supports<OutputReq> for WriterAtom {
async fn handle<'a>(
&self,
hand: Box<dyn ReqHandle<'a> + '_>,
req: OutputReq,
) -> Result<Receipt<'a>> {
match req {
OutputReq::WriteReq(ref wr @ WriteReq { ref data }) => {
self.0.lock().await.buf.extend(data);
hand.reply(wr, &Ok(())).await
},
OutputReq::FlushReq(ref fr @ FlushReq) => {
let mut g = self.0.lock().await;
let WriterState { buf, writer } = &mut *g;
hand.reply(fr, &writer.write_all(&buf[..]).await.map_err(|e| e.into())).await
},
OutputReq::CloseReq(ref cr @ CloseReq) =>
hand.reply(cr, &self.0.lock().await.writer.close().await.map_err(|e| e.into())).await,
}
}
}
pub struct OrcReader<T: AsyncRead + 'static>(T);
impl<T: AsyncRead + 'static> ToExpr for OrcReader<T> {
async fn to_gen(self) -> GExpr {
new_atom(ReaderAtom(Rc::new(Mutex::new(BufReader::new(Box::pin(self.0))))))
}
}
#[derive(Clone)]
pub struct ReaderAtom(Rc<Mutex<BufReader<Pin<Box<dyn AsyncRead>>>>>);
impl Atomic for ReaderAtom {
type Variant = OwnedVariant;
type Data = ();
fn reg_methods() -> MethodSetBuilder<Self> { MethodSetBuilder::new().handle::<ReadReq>() }
}
impl OwnedAtom for ReaderAtom {
type Refs = Never;
async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(()) }
}
impl Supports<ReadReq> for ReaderAtom {
async fn handle<'a>(
&self,
hand: Box<dyn ReqHandle<'a> + '_>,
req: ReadReq,
) -> Result<Receipt<'a>> {
let mut buf = Vec::new();
let mut reader = self.0.lock().await;
let rep = match match req.limit {
ReadLimit::End => reader.read_to_end(&mut buf).await.map(|_| ()),
ReadLimit::Delimiter(b) => reader.read_until(b, &mut buf).await.map(|_| ()),
ReadLimit::Length(n) => {
buf = vec![0u8; n.get() as usize];
reader.read_exact(&mut buf).await
},
} {
Err(e) => Err(e.into()),
Ok(()) => Ok(buf),
};
hand.reply(&req, &rep).await
}
}

View File

@@ -5,7 +5,10 @@ use std::num::NonZero;
use orchid_api_traits::Coding;
use orchid_base::BoxedIter;
use crate::{AtomOps, AtomTypeId, Atomic, AtomicFeatures, Fun, Lambda, Replier, SystemCtor};
use crate::{
AtomOps, AtomTypeId, Atomic, AtomicFeatures, CmdAtom, Fun, Lambda, ReaderAtom, Replier,
SystemCtor, WriterAtom,
};
/// Description of a system. This is intended to be a ZST storing the static
/// properties of a [SystemCtor] which should be known to foreign systems
@@ -56,5 +59,13 @@ pub(crate) trait DynSystemCardExt: DynSystemCard {
/// The indices of these are bitwise negated, such that the MSB of an atom index
/// marks whether it belongs to this package (0) or the importer (1)
pub(crate) fn general_atoms() -> impl Iterator<Item = Option<Box<dyn AtomOps>>> {
[Some(Fun::ops()), Some(Lambda::ops()), Some(Replier::ops())].into_iter()
[
Some(Fun::ops()),
Some(Lambda::ops()),
Some(Replier::ops()),
Some(CmdAtom::ops()),
Some(WriterAtom::ops()),
Some(ReaderAtom::ops()),
]
.into_iter()
}