From 279654ea24cacbf233414188769ba26ba5833f3c Mon Sep 17 00:00:00 2001 From: Bert Peters Date: Fri, 2 Sep 2022 13:57:18 +0200 Subject: [PATCH] Fix hanging when a future wakes itself --- CHANGELOG.md | 4 ++++ src/lib.rs | 33 +++++++++++++++++++++++++-------- tests/futures.rs | 24 ++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1cdaed1..5d83242 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fixed potential hanging when the waker is called while polling. + ## [0.1.0] - Initial release diff --git a/src/lib.rs b/src/lib.rs index 6e0695d..ece529d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,15 +26,36 @@ use std::task::Wake; use std::task::Waker; #[derive(Default)] -struct CondvarWake(Condvar); +struct CondvarWake { + park: Condvar, + awoken: Mutex, +} + +impl CondvarWake { + pub fn park(&self) { + let mut guard = self.awoken.lock().unwrap_or_else(PoisonError::into_inner); + + // Until we are awoken, we can park on the condvar. This also handles the case where we're + // awoken while we're actually polling. + while !*guard { + guard = self + .park + .wait(guard) + .unwrap_or_else(PoisonError::into_inner); + } + + *guard = false; + } +} impl Wake for CondvarWake { fn wake(self: Arc) { - self.0.notify_one() + self.wake_by_ref() } fn wake_by_ref(self: &Arc) { - self.0.notify_one() + *self.awoken.lock().unwrap_or_else(PoisonError::into_inner) = true; + self.park.notify_one(); } } @@ -48,14 +69,10 @@ pub fn execute(f: impl Future) -> T { let mut context = Context::from_waker(&waker); - let mutex = Mutex::new(()); - // Cannot panic but avoids generating the unwrap code - let mut guard = mutex.lock().unwrap_or_else(PoisonError::into_inner); - loop { match pinned.as_mut().poll(&mut context) { Poll::Ready(value) => return value, - Poll::Pending => guard = wake.0.wait(guard).unwrap_or_else(PoisonError::into_inner), + Poll::Pending => wake.park(), } } } diff --git a/tests/futures.rs b/tests/futures.rs index 5124e78..0f51f9b 100644 --- a/tests/futures.rs +++ b/tests/futures.rs @@ -52,3 +52,27 @@ fn test_threaded_future() { // Future should be polled twice, once initially and once after the wake-up assert_eq!(beul::execute(future), 2); } + +#[test] +fn test_self_waking_futures() { + struct SelfWakingFuture(bool); + + impl Future for SelfWakingFuture { + type Output = (); + + fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.0 { + Poll::Ready(()) + } else { + // Next time we complete + self.0 = true; + // Request to be woken up + cx.waker().wake_by_ref(); + + Poll::Pending + } + } + } + + beul::execute(SelfWakingFuture(false)); +}