Initial commit
This commit is contained in:
44
src/content/blog/2025-01-27T11_47_async_iterator_map.mdx
Normal file
44
src/content/blog/2025-01-27T11_47_async_iterator_map.mdx
Normal file
@@ -0,0 +1,44 @@
|
||||
---
|
||||
title: Async Iterator::map
|
||||
author: lbfalvy
|
||||
tags: [programming, rust, langdev]
|
||||
pubDate: 2025-01-27T10:47Z[UTC]
|
||||
summary: On the state of async Rust, limitations of the type system, and Iterator::map
|
||||
unlisted: false
|
||||
---
|
||||
The async equivalent to iterators are streams, which are exactly the same as AsyncIterators in JS or C#; the consumer pulls on the stream but the stream is allowed to defer responding, so both sides must be able to pause. Simple stuff.
|
||||
|
||||
In synchronous rust, `Iterator::map` takes an `FnMut`, a function which can only be called if you can prove that it's not already running. This is good because it's pretty common to want to either use mutable context for a transformation or equivalently perform a sequence of effectful operations and then collect their results into a datastructure, and both of these are obviously expressed as `sequence.map(|item| /* some mutation */).collect()` which as a bonus propagates size hints. The Orchid codebase is FULL of this pattern.
|
||||
|
||||
The async equivalent however has to take a function that returns some type that implements `Future` because that's how you statically type an asynchronous function, you parameterize on the state machine the compiler will eventually generate for its paused data. This is again perfectly normal, C++ coroutines do the same as I'm pretty sure every other language that supports any kind of stack-allocated coroutine has to. The problem emerges from lifetimes, because in order for that Future to hold onto a mutable reference, the async equivalent of map (which happens to be called `StreamExt::then` for reference) has to not only guarantee that the callback will not be running when its next called, but that its return value (the `Future` instance) will not exist (either because it's finished or because it's been freed) by the time the function is called again.
|
||||
|
||||
The type of the callback then has to be _some function which for any lifetime `'a` returns some type is valid for the same lifetime `'a`_. The type of the return value is parametric!
|
||||
|
||||
Actually this isn't completely impossible to represent in Rust because it has a weird bastard type system where generic parameters to traits (types that pick an implementation such as `u32` in `impl From<u32> for u64 {}`) must be concrete types, but associated types (types that are picked by the implementation such as the `str` in `impl Deref for String { type Target = str }`) may themselves be generic. So it's imaginable for callbacks to be mapped into a bound for a trait with an associated return type that's generic on a lifetime, like this:
|
||||
|
||||
```rust
|
||||
trait FnMutMap {
|
||||
type Args<'a>;
|
||||
type Output<'a>;
|
||||
fn call<'a>(&'a mut self, args: Self::Args<'a>) -> Self::Output<'a>;
|
||||
}
|
||||
```
|
||||
|
||||
Since structs and functions can only be parametric on concrete types, a callback whose return type has a different contract depending on how you called the function is illegal, so if you want to access mutable data in an async stream, you have to make an ad-hoc `Mutex<&mut T>` right there on the stack which the closure and its return value can capture by shared reference and then immediately lock for its entire runtime. Streams are lazy and a new value will not be pulled until the current one is finished so this mutex can never ever be contested, but there is no way at all to explain this to the type system.
|
||||
|
||||
There's also another trick, `for<'a> Fn(&'a [u8]) -> &'a u8` is a valid type, but this `for<'a>` syntax is again a late addition to the borrow checker which isn't integrated in the type system at all, so `for<'a> Fn(T) -> Box<dyn Future<Output = U> + 'a>` is valid but only because the higher-kinded lifetime parametrism has been localized to a single type constraint. Streams don't do this because it requires a heap allocation and subsequent virtual function calls to interact with the Future, and an uncontested mutex lock-release cycle is vastly faster.
|
||||
|
||||
So what are people doing? Well, for the most part they're using `async_stream::stream!` with a `for` loop from what I gather in a fashion a lot like the below example. This uses an MPSC to force the stream to be lazy.
|
||||
|
||||
```rust
|
||||
stream! {
|
||||
for item in item_seq {
|
||||
yield item.process(&mut mutableCtx).await
|
||||
}
|
||||
}
|
||||
.collect::<Vec<_>>()
|
||||
.await
|
||||
```
|
||||
|
||||
In principle this should cost a constant number of virtual calls per step when the MPSC wakes either thread, which might be faster, but I haven't measured as this is already fast enough and I'm a staunch believer in benchmarking only what needs to be faster. It's nice to keep track though.
|
||||
|
||||
Reference in New Issue
Block a user