use std::collections::VecDeque; use std::pin::Pin; use std::task::Poll; use futures::StreamExt; use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded}; use futures::future::LocalBoxFuture; pub struct LocalSet<'a, E> { receiver: UnboundedReceiver>>, pending: VecDeque>>, } impl<'a, E> LocalSet<'a, E> { pub fn new() -> (UnboundedSender>>, Self) { let (sender, receiver) = unbounded(); (sender, Self { receiver, pending: VecDeque::new() }) } } impl Future for LocalSet<'_, E> { type Output = Result<(), E>; fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { let this = self.get_mut(); let mut any_pending = false; loop { match this.receiver.poll_next_unpin(cx) { Poll::Ready(Some(fut)) => this.pending.push_back(fut), Poll::Ready(None) => break, Poll::Pending => { any_pending = true; break; }, } } let count = this.pending.len(); for _ in 0..count { let mut req = this.pending.pop_front().unwrap(); match req.as_mut().poll(cx) { Poll::Ready(Ok(())) => (), Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), Poll::Pending => { any_pending = true; this.pending.push_back(req) }, } } if any_pending { Poll::Pending } else { Poll::Ready(Ok(())) } } }