Skip to content

Commit 611b793

Browse files
authored
coop: add cooperative and poll_proceed (#7405)
1 parent 888ee60 commit 611b793

File tree

2 files changed

+137
-26
lines changed

2 files changed

+137
-26
lines changed

tokio/src/task/coop/mod.rs

Lines changed: 136 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -250,14 +250,27 @@ cfg_coop! {
250250
use pin_project_lite::pin_project;
251251
use std::cell::Cell;
252252
use std::future::Future;
253+
use std::marker::PhantomData;
253254
use std::pin::Pin;
254255
use std::task::{ready, Context, Poll};
255256

257+
/// Value returned by the [`poll_proceed`] method.
258+
#[derive(Debug)]
256259
#[must_use]
257-
pub(crate) struct RestoreOnPending(Cell<Budget>);
260+
pub struct RestoreOnPending(Cell<Budget>, PhantomData<*mut ()>);
258261

259262
impl RestoreOnPending {
260-
pub(crate) fn made_progress(&self) {
263+
fn new(budget: Budget) -> Self {
264+
RestoreOnPending(
265+
Cell::new(budget),
266+
PhantomData,
267+
)
268+
}
269+
270+
/// Signals that the task that obtained this `RestoreOnPending` was able to make
271+
/// progress. This prevents the task budget from being restored to the value
272+
/// it had prior to obtaining this instance when it is dropped.
273+
pub fn made_progress(&self) {
261274
self.0.set(Budget::unconstrained());
262275
}
263276
}
@@ -275,27 +288,102 @@ cfg_coop! {
275288
}
276289
}
277290

278-
/// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
291+
/// Decrements the task budget and returns [`Poll::Pending`] if the budget is depleted.
292+
/// This indicates that the task should yield to the scheduler. Otherwise, returns
293+
/// [`RestoreOnPending`] which can be used to commit the budget consumption.
279294
///
280-
/// When you call this method, the current budget is decremented. However, to ensure that
281-
/// progress is made every time a task is polled, the budget is automatically restored to its
282-
/// former value if the returned `RestoreOnPending` is dropped. It is the caller's
283-
/// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure
284-
/// that the budget empties appropriately.
295+
/// The returned [`RestoreOnPending`] will revert the budget to its former
296+
/// value when dropped unless [`RestoreOnPending::made_progress`]
297+
/// is called. It is the caller's responsibility to do so when it _was_ able to
298+
/// make progress after the call to [`poll_proceed`].
299+
/// Restoring the budget automatically ensures the task can try to make progress in some other
300+
/// way.
285301
///
286-
/// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**.
287-
/// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and
288-
/// `RestRestoreOnPending` is dropped, those adjustments are erased unless the caller indicates
302+
/// Note that [`RestoreOnPending`] restores the budget **as it was before [`poll_proceed`]**.
303+
/// Therefore, if the budget is _further_ adjusted between when [`poll_proceed`] returns and
304+
/// [`RestoreOnPending`] is dropped, those adjustments are erased unless the caller indicates
289305
/// that progress was made.
306+
///
307+
/// # Examples
308+
///
309+
/// This example shows a simple countdown latch that uses [`poll_proceed`] to participate in
310+
/// cooperative scheduling.
311+
///
312+
/// ```
313+
/// use std::future::{Future};
314+
/// use std::pin::Pin;
315+
/// use std::task::{ready, Context, Poll, Waker};
316+
/// use tokio::task::coop;
317+
///
318+
/// struct CountdownLatch<T> {
319+
/// counter: usize,
320+
/// value: Option<T>,
321+
/// waker: Option<Waker>
322+
/// }
323+
///
324+
/// impl<T> CountdownLatch<T> {
325+
/// fn new(value: T, count: usize) -> Self {
326+
/// CountdownLatch {
327+
/// counter: count,
328+
/// value: Some(value),
329+
/// waker: None
330+
/// }
331+
/// }
332+
/// fn count_down(&mut self) {
333+
/// if self.counter <= 0 {
334+
/// return;
335+
/// }
336+
///
337+
/// self.counter -= 1;
338+
/// if self.counter == 0 {
339+
/// if let Some(w) = self.waker.take() {
340+
/// w.wake();
341+
/// }
342+
/// }
343+
/// }
344+
/// }
345+
///
346+
/// impl<T> Future for CountdownLatch<T> {
347+
/// type Output = T;
348+
///
349+
/// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
350+
/// // `poll_proceed` checks with the runtime if this task is still allowed to proceed
351+
/// // with performing work.
352+
/// // If not, `Pending` is returned and `ready!` ensures this function returns.
353+
/// // If we are allowed to proceed, coop now represents the budget consumption
354+
/// let coop = ready!(coop::poll_proceed(cx));
355+
///
356+
/// // Get a mutable reference to the CountdownLatch
357+
/// let this = Pin::get_mut(self);
358+
///
359+
/// // Next we check if the latch is ready to release its value
360+
/// if this.counter == 0 {
361+
/// let t = this.value.take();
362+
/// // The latch made progress so call `made_progress` to ensure the budget
363+
/// // is not reverted.
364+
/// coop.made_progress();
365+
/// Poll::Ready(t.unwrap())
366+
/// } else {
367+
/// // If the latch is not ready so return pending and simply drop `coop`.
368+
/// // This will restore the budget making it available again to perform any
369+
/// // other work.
370+
/// this.waker = Some(cx.waker().clone());
371+
/// Poll::Pending
372+
/// }
373+
/// }
374+
/// }
375+
///
376+
/// impl<T> Unpin for CountdownLatch<T> {}
377+
/// ```
290378
#[inline]
291-
pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
379+
pub fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
292380
context::budget(|cell| {
293381
let mut budget = cell.get();
294382

295383
let decrement = budget.decrement();
296384

297385
if decrement.success {
298-
let restore = RestoreOnPending(Cell::new(cell.get()));
386+
let restore = RestoreOnPending::new(cell.get());
299387
cell.set(budget);
300388

301389
// avoid double counting
@@ -308,7 +396,7 @@ cfg_coop! {
308396
register_waker(cx);
309397
Poll::Pending
310398
}
311-
}).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained()))))
399+
}).unwrap_or(Poll::Ready(RestoreOnPending::new(Budget::unconstrained())))
312400
}
313401

314402
/// Returns `Poll::Ready` if the current task has budget to consume, and `Poll::Pending` otherwise.
@@ -380,15 +468,9 @@ cfg_coop! {
380468
}
381469

382470
pin_project! {
383-
/// Future wrapper to ensure cooperative scheduling.
384-
///
385-
/// When being polled `poll_proceed` is called before the inner future is polled to check
386-
/// if the inner future has exceeded its budget. If the inner future resolves, this will
387-
/// automatically call `RestoreOnPending::made_progress` before resolving this future with
388-
/// the result of the inner one. If polling the inner future is pending, polling this future
389-
/// type will also return a `Poll::Pending`.
471+
/// Future wrapper to ensure cooperative scheduling created by [`cooperative`].
390472
#[must_use = "futures do nothing unless polled"]
391-
pub(crate) struct Coop<F: Future> {
473+
pub struct Coop<F: Future> {
392474
#[pin]
393475
pub(crate) fut: F,
394476
}
@@ -409,11 +491,39 @@ cfg_coop! {
409491
}
410492
}
411493

412-
/// Run a future with a budget constraint for cooperative scheduling.
413-
/// If the future exceeds its budget while being polled, control is yielded back to the
414-
/// runtime.
494+
/// Creates a wrapper future that makes the inner future cooperate with the Tokio scheduler.
495+
///
496+
/// When polled, the wrapper will first call [`poll_proceed`] to consume task budget, and
497+
/// immediately yield if the budget has been depleted. If budget was available, the inner future
498+
/// is polled. The budget consumption will be made final using [`RestoreOnPending::made_progress`]
499+
/// if the inner future resolves to its final value.
500+
///
501+
/// # Examples
502+
///
503+
/// When you call `recv` on the `Receiver` of a [`tokio::sync::mpsc`](crate::sync::mpsc)
504+
/// channel, task budget will automatically be consumed when the next value is returned.
505+
/// This makes tasks that use Tokio mpsc channels automatically cooperative.
506+
///
507+
/// If you're using [`futures::channel::mpsc`](https://docs.rs/futures/latest/futures/channel/mpsc/index.html)
508+
/// instead, automatic task budget consumption will not happen. This example shows how can use
509+
/// `cooperative` to make `futures::channel::mpsc` channels cooperate with the scheduler in the
510+
/// same way Tokio channels do.
511+
///
512+
/// ```
513+
/// use tokio::task::coop::cooperative;
514+
/// use futures::channel::mpsc::Receiver;
515+
/// use futures::stream::StreamExt;
516+
///
517+
/// async fn receive_next<T>(receiver: &mut Receiver<T>) -> Option<T> {
518+
/// // Use `StreamExt::next` to obtain a `Future` that resolves to the next value
519+
/// let recv_future = receiver.next();
520+
/// // Wrap it a cooperative wrapper
521+
/// let coop_future = cooperative(recv_future);
522+
/// // And await
523+
/// coop_future.await
524+
/// }
415525
#[inline]
416-
pub(crate) fn cooperative<F: Future>(fut: F) -> Coop<F> {
526+
pub fn cooperative<F: Future>(fut: F) -> Coop<F> {
417527
Coop { fut }
418528
}
419529
}

tokio/tests/async_send_sync.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,7 @@ assert_value!(tokio::task::JoinSet<NN>: !Send & !Sync & Unpin);
454454
assert_value!(tokio::task::JoinSet<YN>: Send & Sync & Unpin);
455455
assert_value!(tokio::task::JoinSet<YY>: Send & Sync & Unpin);
456456
assert_value!(tokio::task::LocalSet: !Send & !Sync & Unpin);
457+
assert_value!(tokio::task::coop::RestoreOnPending: !Send & !Sync & Unpin);
457458
async_assert_fn!(tokio::sync::Barrier::wait(_): Send & Sync & !Unpin);
458459
async_assert_fn!(tokio::sync::Mutex<NN>::lock(_): !Send & !Sync & !Unpin);
459460
async_assert_fn!(tokio::sync::Mutex<NN>::lock_owned(_): !Send & !Sync & !Unpin);

0 commit comments

Comments
 (0)