//! Multiple-listener-single-delivery event system.
//!
//! Any number of listeners may wait on an [`Event`], but each dispatched
//! value is delivered to at most one of them: the first listener, in
//! registration order, whose filter accepts it.

use std::mem;
use std::sync::Mutex;
use std::sync::mpsc::{self, sync_channel};

/// Answer a listener sends back to the dispatcher after inspecting an event.
struct Reply<T, U> {
    /// Whether the listener wants to stay registered for later dispatches.
    resub: bool,
    /// `Ok(result)` if the listener consumed the event, `Err(event)` handing
    /// the untouched event back so it can be offered to the next listener.
    outcome: Result<U, T>,
}

/// Dispatcher-side ends of the two rendezvous channels shared with one
/// listener blocked in [`Event::get_one`].
struct Listener<T, U> {
    /// Carries an event to the listener.
    sink: mpsc::SyncSender<T>,
    /// Carries the listener's [`Reply`] back.
    source: mpsc::Receiver<Reply<T, U>>,
}

/// An event source whose values of type `T` are each handed to at most one
/// listener, which answers with a value of type `U`.
pub struct Event<T, U> {
    /// Currently registered listeners, in registration order.
    listeners: Mutex<Vec<Listener<T, U>>>,
}

impl<T, U> Event<T, U> {
    /// Creates an event with no registered listeners.
    pub const fn new() -> Self {
        Self {
            listeners: Mutex::new(Vec::new()),
        }
    }

    /// Offers `ev` to the registered listeners in order and returns the
    /// result of the first one that accepts it, or `None` if none does.
    ///
    /// Listeners that reject the event stay registered. The listener list
    /// stays locked for the whole delivery, so a concurrent
    /// [`get_one`](Self::get_one) cannot register until this call returns.
    ///
    /// # Panics
    ///
    /// Panics if the listener mutex is poisoned or if a listener's channel
    /// is disconnected, i.e. the listener went away without replying.
    pub fn dispatch(&self, mut ev: T) -> Option<U> {
        let mut listeners = self
            .listeners
            .lock()
            .expect("event listener mutex poisoned");

        // Take the current listeners out, leaving an empty list of the same
        // capacity that is refilled with every listener that stays subscribed.
        let capacity = listeners.len();
        let mut pending =
            mem::replace(&mut *listeners, Vec::with_capacity(capacity)).into_iter();

        // `while let` rather than `for`: the iterator itself is consumed by
        // `extend` below when a listener accepts the event.
        while let Some(listener) = pending.next() {
            listener
                .sink
                .send(ev)
                .expect("listener disconnected before receiving the event");
            let Reply { resub, outcome } = listener
                .source
                .recv()
                .expect("listener disconnected before replying");

            if resub {
                listeners.push(listener);
            }

            match outcome {
                Ok(result) => {
                    // Delivered: listeners not yet asked remain registered.
                    listeners.extend(pending);
                    return Some(result);
                }
                // Rejected: offer the returned event to the next listener.
                Err(returned) => ev = returned,
            }
        }

        None
    }

    /// Registers the calling thread as a listener and blocks until an event
    /// accepted by `filter` is dispatched.
    ///
    /// The accepted event is passed to `f`, which returns a pair `(u, v)`:
    /// `u` is sent to the dispatcher (becoming the return value of
    /// [`dispatch`](Self::dispatch)) and `v` is returned to the caller.
    /// Events rejected by `filter` are handed back to the dispatcher and the
    /// listener stays registered.
    ///
    /// # Panics
    ///
    /// Panics if the listener mutex is poisoned or if the dispatcher side of
    /// either channel is dropped while this listener is waiting.
    pub fn get_one<V>(
        &self,
        mut filter: impl FnMut(&T) -> bool,
        f: impl FnOnce(T) -> (U, V),
    ) -> V {
        // Zero-capacity channels: each send blocks until the peer receives.
        let (sink, request) = sync_channel(0);
        let (response, source) = sync_channel(0);

        {
            let mut listeners = self
                .listeners
                .lock()
                .expect("event listener mutex poisoned");
            listeners.push(Listener { sink, source });
            // Guard dropped here so the dispatcher can take the lock while
            // this thread waits for an event.
        }

        loop {
            let candidate = request
                .recv()
                .expect("event dispatcher dropped the listener");

            if !filter(&candidate) {
                // Not ours: hand the event back and remain subscribed.
                response
                    .send(Reply {
                        resub: true,
                        outcome: Err(candidate),
                    })
                    .expect("event dispatcher stopped waiting for a reply");
                continue;
            }

            let (u, v) = f(candidate);
            response
                .send(Reply {
                    resub: false,
                    outcome: Ok(u),
                })
                .expect("event dispatcher stopped waiting for a reply");
            return v;
        }
    }
}

impl<T, U> Default for Event<T, U> {
    fn default() -> Self {
        Self::new()
    }
}