Initial release

This commit is contained in:
Bethlenfalvi, Lorinc (ext)
2025-01-29 11:42:38 +01:00
parent 0c962c336b
commit aa6426fb0f
3 changed files with 97 additions and 15 deletions

View File

@@ -2,33 +2,109 @@
title: Async Iterator::map
author: lbfalvy
tags: [programming, rust, langdev]
pubDate: 2025-01-27T10:47Z[UTC]
pubDate: 2025-01-30T17:00Z[UTC]
summary: On the state of async Rust, limitations of the type system, and Iterator::map
unlisted: false
---
import Graphic from "../../components/Graphic.astro"
The async equivalent to iterators are streams, which are exactly the same as AsyncIterators in JS or C#; the consumer pulls on the stream but the stream is allowed to defer responding, so both sides must be able to pause. Simple stuff.
In synchronous rust, `Iterator::map` takes an `FnMut`, a function which can only be called if you can prove that it's not already running. This is good because it's pretty common to want to either use mutable context for a transformation or equivalently perform a sequence of effectful operations and then collect their results into a datastructure, and both of these are obviously expressed as `sequence.map(|item| /* some mutation */).collect()` which as a bonus propagates size hints. The Orchid codebase is FULL of this pattern.
<Graphic>
```rust
pub trait Iterator {
type Item;
fn map<B, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> B;
}
```
<p slot="label">
source (modified): https://doc.rust-lang.org/src/core/iter/traits/iterator.rs.html#745
</p>
</Graphic>
The async equivalent however has to take a function that returns some type that implements `Future` because that's how you statically type an asynchronous function, you parameterize on the state machine the compiler will eventually generate for its paused data. This is again perfectly normal, C++ coroutines do the same as I'm pretty sure every other language that supports any kind of stack-allocated coroutine has to. The problem emerges from lifetimes, because in order for that Future to hold onto a mutable reference, the async equivalent of map (which happens to be called `StreamExt::then` for reference) has to not only guarantee that the callback will not be running when its next called, but that its return value (the `Future` instance) will not exist (either because it's finished or because it's been freed) by the time the function is called again.
The type of the callback then has to be _some function which for any lifetime `'a` returns some type is valid for the same lifetime `'a`_. The type of the return value is parametric!
<Graphic>
```rust
// don't mind the extension trait
pub trait Stream {
type Item;
}
pub trait StreamExt: Stream {
fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
where
F: FnMut(Self::Item) -> Fut,
Fut: Future;
}
```
<p slot="label">
source (modified): https://docs.rs/futures-util/0.3.31/src/futures_util/stream/stream/mod.rs.html#488
</p>
</Graphic>
Actually this isn't completely impossible to represent in Rust because it has a weird bastard type system where generic parameters to traits (types that pick an implementation such as `u32` in `impl From<u32> for u64 {}`) must be concrete types, but associated types (types that are picked by the implementation such as the `str` in `impl Deref for String { type Target = str }`) may themselves be generic. So it's imaginable for callbacks to be mapped into a bound for a trait with an associated return type that's generic on a lifetime, like this:
The type of the callback then _should be_ **some function which for any lifetime `'a` returns some type is valid for the same lifetime `'a`**. The type of the return value is parametric!
```rust
trait FnMutMap {
type Args<'a>;
type Output<'a>;
fn call<'a>(&'a mut self, args: Self::Args<'a>) -> Self::Output<'a>;
}
```
Since structs and functions can only be parametric on concrete types, not generics, a callback whose return type has a different contract depending on how you called the function is illegal on general. So if you want to access mutable data in an async stream, you have to make an ad-hoc `Mutex<&mut T>` right there on the stack which the closure and its return value can capture by shared reference and then immediately lock for its entire runtime. Streams are lazy and a new value will not be pulled until the current one is finished so this mutex can never ever be contested, but there is no way at all to explain this to the type system.
Since structs and functions can only be parametric on concrete types, a callback whose return type has a different contract depending on how you called the function is illegal, so if you want to access mutable data in an async stream, you have to make an ad-hoc `Mutex<&mut T>` right there on the stack which the closure and its return value can capture by shared reference and then immediately lock for its entire runtime. Streams are lazy and a new value will not be pulled until the current one is finished so this mutex can never ever be contested, but there is no way at all to explain this to the type system.
<Graphic>
```rust
pub trait StreamExt: Stream {
fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
where
F: FnMut<'a>(Self::Item) -> Fut<'a>,
Fut<'a>: Future + 'a;
}
```
<p slot="label">This would not work</p>
</Graphic>
There's also another trick, `for<'a> Fn(&'a [u8]) -> &'a u8` is a valid type, but this `for<'a>` syntax is again a late addition to the borrow checker which isn't integrated in the type system at all, so `for<'a> Fn(T) -> Box<dyn Future<Output = U> + 'a>` is valid but only because the higher-kinded lifetime parametrism has been localized to a single type constraint. Streams don't do this because it requires a heap allocation and subsequent virtual function calls to interact with the Future, and an uncontested mutex lock-release cycle is vastly faster.
There's an escape hatch, `for<'a> Fn(&'a [u8]) -> &'a u8` is a valid type, but this `for<'a>` syntax is a late addition to the borrow checker which isn't integrated in the type system at all, so `for<'a> Fn(T) -> Box<dyn Future<Output = U> + 'a>` is valid but only because the higher-kinded lifetime parametrism has been localized to a single type constraint. Streams don't do this because it requires a heap allocation and subsequent virtual function calls to interact with the Future, and an uncontested mutex lock-release cycle is vastly faster.
So what are people doing? Well, for the most part they're using `async_stream::stream!` with a `for` loop from what I gather in a fashion a lot like the below example. This uses an MPSC to force the stream to be lazy.
<Graphic>
```rust
pub trait StreamExt: Stream {
fn then<C, F, R>(self, ctx: &mut C, f: F)
-> Then<Self, Box<dyn Future<Output = R> + '_>, F>
where F: for<'a> FnMut(&'a mut C, Self::Item)
-> Box<dyn Future<Output = R> + 'a>
}
```
<p slot="label">This would work, but it would be slow</p>
</Graphic>
Actually, this kind of contract isn't inherently impossible to represent in Rust because it has a weird bastard type system where generic parameters to traits (types that pick an implementation such as `u32` in `impl From<u32> for u64 {}`) must be concrete types, but associated types (types that are picked by the implementation such as the `str` in `impl Deref for String { type Target = str }`) may themselves be generic. So it's imaginable for callbacks to be mapped into a bound for a trait with an associated return type that's generic on a lifetime. Incidentally, this is exactly what [the async_closure proposal](https://rust-lang.github.io/rfcs/3668-async-closures.html#asyncfn) does.
<Graphic>
```rust
pub trait FnOnce<Args: Tuple> {
type Output;
}
pub trait FnMut<Args: Tuple>: FnOnce<Args> {
fn call_mut(&mut self, args: Args) -> Self::Output;
}
```
```rust
pub trait AsyncFnOnce<Args> {
type Output;
}
pub trait AsyncFnMut<Args>: AsyncFnOnce<Args> {
type CallRefFuture<'a>: Future<Output = Self::Output>
where
Self: 'a;
fn async_call_mut(&mut self, args: Args) -> Self::CallRefFuture<'_>;
}
```
<p slot="label">
FnMut from the standard library and AsyncFnMut from the RFC, edited for brevity.
</p>
</Graphic>
As a reminder, in ref and mut member functions `'_` always refers to the self argument's lifetime, so The lifetime bound on `async_call_mut` basically declares that once it's called, its output is valid for as long as the function is in scope, and the function may not be called again until the return value is freed because for as long as it's live, so is the mutable borrow. Perfect!
But this is unstable, so what to do? Well, for the specific case of streams we can use `async_stream::stream!` with a `for` loop in a fashion a lot like the below example.
```rust
stream! {
@@ -40,5 +116,4 @@ stream! {
.await
```
In principle this should cost a constant number of virtual calls per step when the MPSC wakes either thread, which might be faster, but I haven't measured as this is already fast enough and I'm a staunch believer in benchmarking only what needs to be faster. It's nice to keep track though.
This macro is crafted very well; the resulting Stream implementor uses a thread local but no heap allocation or virtual calls. I think it is a solid solution, and just like async/await itself, a fine candidate for eventual inclusion in the language itself.