Hello World works
This commit is contained in:
@@ -12,9 +12,9 @@ async-once-cell = "0.5.4"
|
||||
bound = "0.6.0"
|
||||
chrono = "0.4.44"
|
||||
derive_destructure = "1.0.0"
|
||||
futures = { version = "0.3.31", features = ["std"], default-features = false }
|
||||
futures = { version = "0.3.32", features = ["std"], default-features = false }
|
||||
futures-locks = "0.7.1"
|
||||
hashbrown = "0.16.1"
|
||||
hashbrown = "0.17.0"
|
||||
itertools = "0.14.0"
|
||||
lazy_static = "1.5.0"
|
||||
libloading = { version = "0.9.0", optional = true }
|
||||
@@ -23,13 +23,14 @@ never = "0.1.0"
|
||||
num-traits = "0.2.19"
|
||||
orchid-api = { version = "0.1.0", path = "../orchid-api" }
|
||||
orchid-api-traits = { version = "0.1.0", path = "../orchid-api-traits" }
|
||||
orchid-async-utils = { version = "0.1.0", path = "../orchid-async-utils" }
|
||||
orchid-base = { version = "0.1.0", path = "../orchid-base" }
|
||||
orchid-extension = { version = "0.1.0", path = "../orchid-extension", optional = true }
|
||||
ordered-float = "5.1.0"
|
||||
pastey = "0.2.1"
|
||||
ordered-float = "5.3.0"
|
||||
pastey = "0.2.2"
|
||||
substack = "1.1.1"
|
||||
task-local = "0.1.0"
|
||||
tokio = { version = "1.49.0", features = ["process"], optional = true }
|
||||
task-local = "0.1.1"
|
||||
tokio = { version = "1.52.1", features = ["process"], optional = true }
|
||||
tokio-util = { version = "0.7.18", features = ["compat"], optional = true }
|
||||
trait-set = "0.3.0"
|
||||
unsync-pipe = { version = "0.2.0", path = "../unsync-pipe" }
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::borrow::Cow;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt::Debug;
|
||||
use std::fmt::{self, Debug};
|
||||
use std::pin::pin;
|
||||
use std::rc::Rc;
|
||||
|
||||
@@ -9,10 +9,10 @@ use async_event::Event;
|
||||
use async_fn_stream::stream;
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::channel::mpsc;
|
||||
use futures::future::LocalBoxFuture;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::{SinkExt, StreamExt, select};
|
||||
use futures::stream::select;
|
||||
use futures::{FutureExt, SinkExt, StreamExt};
|
||||
use never::Never;
|
||||
use orchid_async_utils::{LocalSet, LocalSetController, local_set};
|
||||
use orchid_base::{OrcErrv, Receipt, ReqHandle, Sym, fmt, is, log, mk_errv, sym};
|
||||
use orchid_extension::{self as ox, AtomicFeatures as _, get_arg};
|
||||
|
||||
@@ -25,6 +25,7 @@ use crate::system::System;
|
||||
use crate::tree::Root;
|
||||
|
||||
/// Events internally recognized by this system sent through [CommandQueue]
|
||||
#[derive(Debug)]
|
||||
enum Task {
|
||||
RunCommand(Expr),
|
||||
Sleep(DateTime<Utc>, Expr),
|
||||
@@ -72,7 +73,7 @@ impl CommandQueue {
|
||||
}
|
||||
}
|
||||
impl Debug for CommandQueue {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("CommandQueue").finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
@@ -99,12 +100,24 @@ pub enum CmdEvent {
|
||||
Err(OrcErrv),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum TaskOutcome {
|
||||
/// We need this so that we get an opportunity to exit when the last tasks
|
||||
/// exited
|
||||
None,
|
||||
Next(Expr),
|
||||
NonCommand(Expr),
|
||||
}
|
||||
|
||||
pub struct CmdRunner {
|
||||
queue: CommandQueue,
|
||||
gas: Option<u64>,
|
||||
interrupted: Option<ExecCtx>,
|
||||
system: System,
|
||||
futures: FuturesUnordered<LocalBoxFuture<'static, Option<CmdEvent>>>,
|
||||
recv: mpsc::Receiver<TaskOutcome>,
|
||||
send: mpsc::Sender<TaskOutcome>,
|
||||
ctl: LocalSetController<'static, Never>,
|
||||
local_set: LocalSet<'static, Never>,
|
||||
}
|
||||
impl CmdRunner {
|
||||
pub async fn new(root: &mut Root, ctx: Ctx, init: impl IntoIterator<Item = Expr>) -> Self {
|
||||
@@ -116,7 +129,9 @@ impl CmdRunner {
|
||||
.expect("Missing command system ctor");
|
||||
let (cmd_root, system) = system_ctor.run(vec![]).await;
|
||||
*root = root.merge(&cmd_root).await.expect("Could not merge command system into tree");
|
||||
Self { futures: FuturesUnordered::new(), gas: None, interrupted: None, queue, system }
|
||||
let (send, recv) = mpsc::channel(0);
|
||||
let (ctl, local_set) = local_set();
|
||||
Self { send, recv, ctl, local_set, gas: None, interrupted: None, queue, system }
|
||||
}
|
||||
pub fn push(&self, expr: Expr) { self.queue.push(Task::RunCommand(expr)); }
|
||||
#[must_use]
|
||||
@@ -126,110 +141,79 @@ impl CmdRunner {
|
||||
pub fn set_gas(&mut self, gas: u64) { self.gas = Some(gas) }
|
||||
pub fn disable_gas(&mut self) { self.gas = None }
|
||||
pub async fn execute(&mut self, root: &Root) -> CmdEvent {
|
||||
let waiting_on_queue = RefCell::new(false);
|
||||
let (mut spawn, mut on_spawn) = mpsc::channel::<LocalBoxFuture<Option<CmdEvent>>>(1);
|
||||
let mut normalize_stream = pin!(
|
||||
stream(async |mut h| {
|
||||
loop {
|
||||
waiting_on_queue.replace(false);
|
||||
let mut xctx = match self.interrupted.take() {
|
||||
None => match self.queue.get_new().await {
|
||||
Task::RunCommand(expr) => ExecCtx::new(root.clone(), expr).await,
|
||||
Task::Sleep(until, expr) => {
|
||||
let queue = self.queue.clone();
|
||||
let ctx = queue.0.borrow_mut().ctx.clone();
|
||||
spawn
|
||||
.send(Box::pin(async move {
|
||||
let delta = until - Utc::now();
|
||||
match delta.to_std() {
|
||||
Err(_) =>
|
||||
writeln!(
|
||||
log("debug"),
|
||||
"Negative sleep found ({delta}), requeuing as instant"
|
||||
)
|
||||
.await,
|
||||
Ok(delay) => ctx.sleep(delay).await,
|
||||
};
|
||||
queue.push(Task::RunCommand(expr));
|
||||
None
|
||||
}))
|
||||
.await
|
||||
.expect("Receiver stored in parent future");
|
||||
continue;
|
||||
},
|
||||
Task::Exit => {
|
||||
h.emit(CmdEvent::Exit).await;
|
||||
break;
|
||||
},
|
||||
},
|
||||
Some(xctx) => xctx,
|
||||
};
|
||||
waiting_on_queue.replace(true);
|
||||
#[derive(Debug)]
|
||||
enum Event {
|
||||
Finished(TaskOutcome),
|
||||
Request(Task),
|
||||
}
|
||||
let queue_stream = pin!(stream(async |mut h| loop {
|
||||
h.emit(self.queue.get_new().await).await
|
||||
}));
|
||||
let mut event_stream = pin!(select(
|
||||
select(queue_stream.map(Event::Request), self.recv.by_ref().map(Event::Finished)),
|
||||
((&mut self.local_set).into_stream())
|
||||
.map(|_| panic!("self has the ctl so this should never exit cleanly"))
|
||||
));
|
||||
while let Some(ev) = event_stream.next().await {
|
||||
match ev {
|
||||
Event::Finished(TaskOutcome::None) if self.ctl.is_empty() => return CmdEvent::Settled,
|
||||
Event::Finished(TaskOutcome::None) => continue,
|
||||
Event::Finished(TaskOutcome::Next(expr)) => self.queue.push(Task::RunCommand(expr)),
|
||||
Event::Finished(TaskOutcome::NonCommand(val)) => return CmdEvent::NonCommand(val),
|
||||
Event::Request(Task::Exit) => return CmdEvent::Exit,
|
||||
Event::Request(Task::Sleep(until, expr)) => {
|
||||
let queue = self.queue.clone();
|
||||
let ctx = queue.0.borrow_mut().ctx.clone();
|
||||
self.ctl.spawn(async move {
|
||||
let delta = until - Utc::now();
|
||||
match delta.to_std() {
|
||||
Err(_) =>
|
||||
writeln!(log("debug"), "Negative sleep found ({delta}), requeuing as instant").await,
|
||||
Ok(delay) => ctx.sleep(delay).await,
|
||||
};
|
||||
queue.push(Task::RunCommand(expr));
|
||||
Ok(())
|
||||
})
|
||||
},
|
||||
Event::Request(Task::RunCommand(expr)) => {
|
||||
let mut xctx = ExecCtx::new(root.clone(), expr).await;
|
||||
xctx.set_gas(self.gas);
|
||||
let res = xctx.execute().await;
|
||||
match res {
|
||||
ExecResult::Err(e, gas) => {
|
||||
let norm = match xctx.execute().await {
|
||||
ExecResult::Err(err, gas) => {
|
||||
self.gas = gas;
|
||||
h.emit(CmdEvent::Err(e)).await;
|
||||
},
|
||||
ExecResult::Gas(exec) => {
|
||||
self.interrupted = Some(exec);
|
||||
h.emit(CmdEvent::Gas).await;
|
||||
return CmdEvent::Err(err);
|
||||
},
|
||||
ExecResult::Value(val, gas) => {
|
||||
self.gas = gas;
|
||||
let Some(atom) = val.as_atom().await else {
|
||||
h.emit(CmdEvent::NonCommand(val)).await;
|
||||
continue;
|
||||
};
|
||||
let queue = self.queue.clone();
|
||||
let ctx = queue.0.borrow_mut().ctx.clone();
|
||||
spawn
|
||||
.send(Box::pin(async move {
|
||||
match atom.ipc(ox::std_reqs::StartCommand).await {
|
||||
None => Some(CmdEvent::NonCommand(val)),
|
||||
Some(None) => None,
|
||||
Some(Some(expr)) => {
|
||||
let from_api_cx = ExprFromApiCtx { ctx, sys: atom.api_ref().owner };
|
||||
queue.push(Task::RunCommand(
|
||||
Expr::from_api(expr, PathSetBuilder::new(), from_api_cx).await,
|
||||
));
|
||||
None
|
||||
},
|
||||
}
|
||||
}))
|
||||
.await
|
||||
.expect("Receiver is owned by the layer that polls this");
|
||||
val
|
||||
},
|
||||
}
|
||||
}
|
||||
})
|
||||
.fuse()
|
||||
);
|
||||
loop {
|
||||
let task = select!(
|
||||
r_opt = self.futures.by_ref().next() => match r_opt {
|
||||
Some(Some(r)) => {
|
||||
eprintln!("Exiting because ");
|
||||
break r
|
||||
},
|
||||
None if *waiting_on_queue.borrow() => {
|
||||
eprintln!("Exiting because settled");
|
||||
break CmdEvent::Settled
|
||||
},
|
||||
None | Some(None) => continue,
|
||||
ExecResult::Gas(xctx) => {
|
||||
self.interrupted = Some(xctx);
|
||||
return CmdEvent::Gas;
|
||||
},
|
||||
};
|
||||
let Some(atom) = norm.as_atom().await else {
|
||||
return CmdEvent::NonCommand(norm);
|
||||
};
|
||||
let queue = self.queue.clone();
|
||||
let ctx = queue.0.borrow_mut().ctx.clone();
|
||||
let mut send = self.send.clone();
|
||||
self.ctl.spawn(async move {
|
||||
match atom.ipc(ox::std_reqs::StartCommand).await {
|
||||
None => send.send(TaskOutcome::NonCommand(norm)).await.unwrap(),
|
||||
Some(None) => send.send(TaskOutcome::None).await.unwrap(),
|
||||
Some(Some(expr)) => {
|
||||
let from_api_cx = ExprFromApiCtx { ctx, sys: atom.api_ref().owner };
|
||||
let expr = Expr::from_api(expr, PathSetBuilder::new(), from_api_cx).await;
|
||||
send.send(TaskOutcome::Next(expr)).await.unwrap();
|
||||
},
|
||||
};
|
||||
Ok(())
|
||||
});
|
||||
},
|
||||
r = normalize_stream.by_ref().next() => match r {
|
||||
None => break CmdEvent::Exit,
|
||||
Some(r) => break r,
|
||||
},
|
||||
task = on_spawn.by_ref().next() => match task {
|
||||
None => continue,
|
||||
Some(r) => r,
|
||||
},
|
||||
);
|
||||
self.futures.push(task)
|
||||
}
|
||||
}
|
||||
unreachable!("Since the localset cannot exit, the function cannot exit either")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user