Initial networking module.

This commit is contained in:
2025-11-21 20:17:00 +01:00
parent 219033be0d
commit bad32fdef2
10 changed files with 624 additions and 0 deletions

View File

@@ -3,6 +3,7 @@ pub mod option;
pub mod protocol;
pub mod record;
pub mod reflection;
pub mod socket;
pub mod std_system;
pub mod string;
pub mod tuple;

View File

@@ -0,0 +1,2 @@
pub mod socket_atom;
pub mod socket_lib;

View File

@@ -0,0 +1,110 @@
use std::borrow::Cow;
use std::rc::Rc;
use std::cell::RefCell;
use futures::lock::Mutex;
use never::Never;
use orchid_base::format::{FmtCtx, FmtUnit};
use orchid_extension::atom::{Atomic, MethodSetBuilder};
use orchid_extension::atom_owned::{OwnedAtom, OwnedVariant};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
pub struct TcpListenerAtom {
pub listener: Rc<Mutex<TcpListener>>,
pub addr: String,
}
impl Clone for TcpListenerAtom {
fn clone(&self) -> Self {
Self {
listener: self.listener.clone(),
addr: self.addr.clone(),
}
}
}
impl Atomic for TcpListenerAtom {
type Variant = OwnedVariant;
type Data = ();
fn reg_reqs() -> MethodSetBuilder<Self> { MethodSetBuilder::new() }
}
impl OwnedAtom for TcpListenerAtom {
type Refs = Never;
async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(()) }
async fn print_atom<'a>(&'a self, _: &'a (impl FmtCtx + ?Sized + 'a)) -> FmtUnit {
format!("<TcpListener {}>", self.addr).into()
}
}
pub struct TcpStreamAtom {
pub stream: Rc<RefCell<Option<TcpStream>>>,
pub peer_addr: String,
pub local_addr: String,
}
impl Clone for TcpStreamAtom {
fn clone(&self) -> Self {
Self {
stream: self.stream.clone(),
peer_addr: self.peer_addr.clone(),
local_addr: self.local_addr.clone(),
}
}
}
impl Atomic for TcpStreamAtom {
type Variant = OwnedVariant;
type Data = ();
fn reg_reqs() -> MethodSetBuilder<Self> { MethodSetBuilder::new() }
}
impl OwnedAtom for TcpStreamAtom {
type Refs = Never;
async fn val(&self) -> Cow<'_, Self::Data> { Cow::Owned(()) }
async fn print_atom<'a>(&'a self, _: &'a (impl FmtCtx + ?Sized + 'a)) -> FmtUnit {
format!("<TcpStream {} -> {}>", self.local_addr, self.peer_addr).into()
}
}
impl TcpStreamAtom {
pub async fn read(&self, max_bytes: usize) -> std::io::Result<Vec<u8>> {
let mut stream_opt = self.stream.borrow_mut();
let stream = stream_opt.as_mut().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotConnected, "Stream closed")
})?;
let mut buf = vec![0u8; max_bytes];
let n = stream.read(&mut buf).await?;
buf.truncate(n);
Ok(buf)
}
pub async fn write(&self, data: &[u8]) -> std::io::Result<usize> {
let mut stream_opt = self.stream.borrow_mut();
let stream = stream_opt.as_mut().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotConnected, "Stream closed")
})?;
stream.write(data).await
}
pub async fn write_all(&self, data: &[u8]) -> std::io::Result<()> {
let mut stream_opt = self.stream.borrow_mut();
let stream = stream_opt.as_mut().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotConnected, "Stream closed")
})?;
stream.write_all(data).await
}
pub async fn flush(&self) -> std::io::Result<()> {
let mut stream_opt = self.stream.borrow_mut();
let stream = stream_opt.as_mut().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotConnected, "Stream closed")
})?;
stream.flush().await
}
pub fn close(&self) {
let _ = self.stream.borrow_mut().take();
}
}

View File

@@ -0,0 +1,131 @@
use std::cell::RefCell;
use std::rc::Rc;
use futures::lock::Mutex;
use orchid_base::error::{OrcRes, OrcErrv, mk_errv_floating};
use orchid_extension::atom::TAtom;
use orchid_extension::atom_owned::own;
use orchid_extension::context::i;
use orchid_extension::conv::TryFromExpr;
use orchid_extension::expr::Expr;
use orchid_extension::tree::{GenMember, fun, prefix};
use tokio::net::{TcpListener, TcpStream};
use super::socket_atom::{TcpListenerAtom, TcpStreamAtom};
use crate::std::string::str_atom::StrAtom;
use crate::{Int, OrcString};
async fn io_err(e: std::io::Error) -> OrcErrv {
mk_errv_floating(i().i(&format!("IO error: {}", e)).await, "")
}
macro_rules! try_io {
($e:expr) => {
match $e {
Ok(v) => v,
Err(e) => return Err(io_err(e).await),
}
};
}
impl TryFromExpr for TcpListenerAtom {
async fn try_from_expr(expr: Expr) -> OrcRes<Self> {
let tatom = TAtom::<TcpListenerAtom>::try_from_expr(expr).await?;
Ok(own(&tatom).await)
}
}
impl TryFromExpr for TcpStreamAtom {
async fn try_from_expr(expr: Expr) -> OrcRes<Self> {
let tatom = TAtom::<TcpStreamAtom>::try_from_expr(expr).await?;
Ok(own(&tatom).await)
}
}
pub fn gen_socket_lib() -> Vec<GenMember> {
prefix("std::socket", [
prefix("tcp", [
fun(true, "bind", async |addr: OrcString| -> OrcRes<TcpListenerAtom> {
let addr_str = addr.get_string().await;
let listener = try_io!(TcpListener::bind(addr_str.as_str()).await);
let local_addr = listener.local_addr().map(|a| a.to_string()).unwrap_or_default();
Ok(TcpListenerAtom {
listener: Rc::new(Mutex::new(listener)),
addr: local_addr,
})
}),
fun(true, "accept", async |listener: TcpListenerAtom| -> OrcRes<TcpStreamAtom> {
let guard = listener.listener.lock().await;
let (stream, peer_addr) = try_io!(guard.accept().await);
let local_addr = stream.local_addr().map(|a| a.to_string()).unwrap_or_default();
Ok(TcpStreamAtom {
stream: Rc::new(RefCell::new(Some(stream))),
peer_addr: peer_addr.to_string(),
local_addr,
})
}),
fun(true, "connect", async |addr: OrcString| -> OrcRes<TcpStreamAtom> {
let addr_str = addr.get_string().await;
let stream = try_io!(TcpStream::connect(addr_str.as_str()).await);
let peer_addr = stream.peer_addr().map(|a| a.to_string()).unwrap_or_default();
let local_addr = stream.local_addr().map(|a| a.to_string()).unwrap_or_default();
Ok(TcpStreamAtom {
stream: Rc::new(RefCell::new(Some(stream))),
peer_addr,
local_addr,
})
}),
fun(true, "read", async |stream: TcpStreamAtom, max_bytes: Int| -> OrcRes<StrAtom> {
let data = try_io!(stream.read(max_bytes.0 as usize).await);
let s = String::from_utf8_lossy(&data).to_string();
Ok(StrAtom::new(Rc::new(s)))
}),
fun(true, "read_bytes", async |stream: TcpStreamAtom, max_bytes: Int| -> OrcRes<StrAtom> {
let data = try_io!(stream.read(max_bytes.0 as usize).await);
let s = data.iter().map(|b| *b as char).collect::<String>();
Ok(StrAtom::new(Rc::new(s)))
}),
fun(true, "read_exact", async |stream: TcpStreamAtom, num_bytes: Int| -> OrcRes<StrAtom> {
let mut result = Vec::new();
let target = num_bytes.0 as usize;
while result.len() < target {
let remaining = target - result.len();
let chunk = try_io!(stream.read(remaining).await);
if chunk.is_empty() {
return Err(mk_errv_floating(i().i("Connection closed before receiving all bytes").await, ""));
}
result.extend(chunk);
}
let s = String::from_utf8_lossy(&result).to_string();
Ok(StrAtom::new(Rc::new(s)))
}),
fun(true, "write", async |stream: TcpStreamAtom, data: OrcString| -> OrcRes<Int> {
let data_str = data.get_string().await;
let n = try_io!(stream.write(data_str.as_bytes()).await);
Ok(Int(n as i64))
}),
fun(true, "write_all", async |stream: TcpStreamAtom, data: OrcString| -> OrcRes<Int> {
let data_str = data.get_string().await;
try_io!(stream.write_all(data_str.as_bytes()).await);
Ok(Int(0))
}),
fun(true, "flush", async |stream: TcpStreamAtom| -> OrcRes<Int> {
try_io!(stream.flush().await);
Ok(Int(0))
}),
fun(true, "close", async |stream: TcpStreamAtom| -> Int {
stream.close();
Int(0)
}),
fun(true, "peer_addr", async |stream: TcpStreamAtom| {
StrAtom::new(Rc::new(stream.peer_addr.clone()))
}),
fun(true, "local_addr", async |stream: TcpStreamAtom| {
StrAtom::new(Rc::new(stream.local_addr.clone()))
}),
fun(true, "listener_addr", async |listener: TcpListenerAtom| {
StrAtom::new(Rc::new(listener.addr.clone()))
}),
]),
])
}

View File

@@ -27,6 +27,8 @@ use crate::std::protocol::types::{Tag, Tagged, gen_protocol_lib};
use crate::std::record::record_atom::Record;
use crate::std::record::record_lib::gen_record_lib;
use crate::std::reflection::sym_atom::{CreateSymAtom, SymAtom, gen_sym_lib};
use crate::std::socket::socket_atom::{TcpListenerAtom, TcpStreamAtom};
use crate::std::socket::socket_lib::gen_socket_lib;
use crate::std::string::str_lexer::StringLexer;
use crate::std::string::to_string::AsStrTag;
use crate::std::tuple::{CreateTuple, Tuple, TupleBuilder, gen_tuple_lib};
@@ -64,6 +66,8 @@ impl SystemCard for StdSystem {
Some(Tag::dynfo()),
Some(Tagged::dynfo()),
Some(AsStrTag::dynfo()),
Some(TcpListenerAtom::dynfo()),
Some(TcpStreamAtom::dynfo()),
]
}
}
@@ -92,6 +96,7 @@ impl System for StdSystem {
gen_tuple_lib(),
gen_protocol_lib(),
gen_sym_lib().await,
gen_socket_lib(),
])
}
async fn prelude() -> Vec<Sym> {