[go: up one dir, main page]

worker/
delay.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use std::{
    cell::Cell,
    future::Future,
    pin::Pin,
    rc::Rc,
    task::{Context, Poll},
    time::Duration,
};

use wasm_bindgen::{prelude::Closure, JsCast};

/// A [Future] for asynchronously waiting.
///
/// The timer is not started when the value is created; it is scheduled with
/// `setTimeout` the first time the future is polled.
///
/// # Example:
/// ```rust,ignore
/// use std::time::Duration;
/// use worker::Delay;
///
/// let duration = Duration::from_millis(1000);
///
/// // Waits a second
/// Delay::from(duration).await;
/// ```
#[pin_project::pin_project(PinnedDrop)]
pub struct Delay {
    /// How long to wait; converted to milliseconds and handed to `setTimeout` on first poll.
    inner: Duration,
    /// The callback passed to `setTimeout`. `None` until the first poll creates it; kept here
    /// so it stays alive for as long as the `Delay` does.
    closure: Option<Closure<dyn FnMut()>>,
    /// Handle returned by `setTimeout`, used to clear the timeout if the `Delay` is dropped
    /// before the timer fires. `None` until the timer has been scheduled.
    timeout_id: Option<i32>,
    /// Flag shared with the timer callback: the callback sets it to `true` when it runs, and
    /// `poll` reports `Ready` once it is set.
    awoken: Rc<Cell<bool>>,
}

impl Future for Delay {
    type Output = ();

    /// Drives the delay.
    ///
    /// On the first poll this creates the timer callback, schedules it with `setTimeout`
    /// on the worker global scope, records the returned timeout id, and returns
    /// [`Poll::Pending`]. Subsequent polls return [`Poll::Pending`] until the callback has
    /// run, after which they return [`Poll::Ready`].
    ///
    /// # Panics
    /// Panics if the `setTimeout` call reports an error.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        // The timer callback has already run: nothing left to wait for.
        if this.awoken.get() {
            return Poll::Ready(());
        }

        // Only schedule the timer once; later polls just keep waiting on it.
        if this.closure.is_none() {
            let awoken = this.awoken.clone();
            let callback_ref = this.closure.get_or_insert_with(move || {
                // NOTE(review): only the waker from this first poll is captured; if the
                // future is later polled with a different waker, that one is not tracked.
                let waker = cx.waker().clone();
                let wake = Box::new(move || {
                    // Mark completion *before* waking so that a poll triggered by the
                    // wake-up always observes the finished state.
                    awoken.set(true);
                    waker.wake_by_ref();
                });

                Closure::wrap(wake as _)
            });

            // `setTimeout` takes its delay as a 32-bit signed integer. Clamp instead of
            // casting directly: a plain `as i32` on the `u128` millisecond count wraps for
            // long durations and could turn a multi-week delay into an immediate timeout.
            let millis = this.inner.as_millis().min(i32::MAX as u128) as i32;

            // Then get that closure back and pass it to setTimeout so we can get woken up later.
            let global: web_sys::WorkerGlobalScope = js_sys::global().unchecked_into();
            let timeout_id = global
                .set_timeout_with_callback_and_timeout_and_arguments_0(
                    callback_ref.as_ref().unchecked_ref::<js_sys::Function>(),
                    millis,
                )
                .unwrap();
            // Remember the handle so the timeout can be cleared if we are dropped early.
            *this.timeout_id = Some(timeout_id);
        }

        Poll::Pending
    }
}

impl From<Duration> for Delay {
    fn from(inner: Duration) -> Self {
        Self {
            inner,
            closure: None,
            timeout_id: None,
            awoken: Rc::new(Cell::default()),
        }
    }
}

/// Cancels the pending timer when a [`Delay`] is dropped before it has fired.
///
/// The callback handed to `setTimeout` is owned by the `Delay` and goes away with it, so
/// the timeout is cleared here to keep JS from invoking a callback that no longer exists.
#[pin_project::pinned_drop]
impl PinnedDrop for Delay {
    fn drop(self: Pin<&'_ mut Self>) {
        let this = self.project();

        // There is only something to cancel when a timeout was scheduled and its callback
        // has not run yet; in every other case we are done.
        let pending_id = match (this.awoken.get(), *this.timeout_id) {
            (false, Some(id)) => id,
            _ => return,
        };

        crate::console_debug!("{:#?} has been dropped", &this.inner);
        let global: web_sys::WorkerGlobalScope = js_sys::global().unchecked_into();
        global.clear_timeout_with_handle(pending_id);
    }
}