Fix hanging when a future wakes itself

This commit is contained in:
2022-09-02 13:57:18 +02:00
parent 384869bc68
commit 279654ea24
3 changed files with 53 additions and 8 deletions

View File

@@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Fixed
- Fixed potential hanging when the waker is called while polling.
## [0.1.0]
- Initial release

View File

@@ -26,15 +26,36 @@ use std::task::Wake;
use std::task::Waker;
#[derive(Default)]
struct CondvarWake(Condvar);
struct CondvarWake {
park: Condvar,
awoken: Mutex<bool>,
}
impl CondvarWake {
pub fn park(&self) {
let mut guard = self.awoken.lock().unwrap_or_else(PoisonError::into_inner);
// Until we are awoken, we can park on the condvar. This also handles the case where we're
// awoken while we're actually polling.
while !*guard {
guard = self
.park
.wait(guard)
.unwrap_or_else(PoisonError::into_inner);
}
*guard = false;
}
}
impl Wake for CondvarWake {
fn wake(self: Arc<Self>) {
self.0.notify_one()
self.wake_by_ref()
}
fn wake_by_ref(self: &Arc<Self>) {
self.0.notify_one()
*self.awoken.lock().unwrap_or_else(PoisonError::into_inner) = true;
self.park.notify_one();
}
}
@@ -48,14 +69,10 @@ pub fn execute<T>(f: impl Future<Output = T>) -> T {
let mut context = Context::from_waker(&waker);
let mutex = Mutex::new(());
// Cannot panic but avoids generating the unwrap code
let mut guard = mutex.lock().unwrap_or_else(PoisonError::into_inner);
loop {
match pinned.as_mut().poll(&mut context) {
Poll::Ready(value) => return value,
Poll::Pending => guard = wake.0.wait(guard).unwrap_or_else(PoisonError::into_inner),
Poll::Pending => wake.park(),
}
}
}

View File

@@ -52,3 +52,27 @@ fn test_threaded_future() {
// Future should be polled twice, once initially and once after the wake-up
assert_eq!(beul::execute(future), 2);
}
#[test]
fn test_self_waking_futures() {
struct SelfWakingFuture(bool);
impl Future for SelfWakingFuture {
type Output = ();
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.0 {
Poll::Ready(())
} else {
// Next time we complete
self.0 = true;
// Request to be woken up
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
beul::execute(SelfWakingFuture(false));
}