#![cfg(target_arch = "wasm32")]
use futures::{prelude::*, sync::oneshot, try_ready};
use std::{error, fmt};
use std::cmp::{Eq, PartialEq, Ord, PartialOrd, Ordering};
use std::ops::{Add, Sub};
use std::time::Duration;
use wasm_bindgen::{prelude::*, JsCast};
pub use self::timeout::Timeout;
#[derive(Debug, Copy, Clone)]
pub struct Instant {
inner: f64,
}
impl PartialEq for Instant {
fn eq(&self, other: &Instant) -> bool {
self.inner == other.inner
}
}
impl Eq for Instant {}
impl PartialOrd for Instant {
fn partial_cmp(&self, other: &Instant) -> Option<Ordering> {
self.inner.partial_cmp(&other.inner)
}
}
impl Ord for Instant {
fn cmp(&self, other: &Self) -> Ordering {
self.inner.partial_cmp(&other.inner).unwrap()
}
}
impl Instant {
pub fn now() -> Instant {
let val = web_sys::window()
.expect("not in a browser")
.performance()
.expect("performance object not available")
.now();
Instant { inner: val }
}
pub fn duration_since(&self, earlier: Instant) -> Duration {
*self - earlier
}
pub fn elapsed(&self) -> Duration {
Instant::now() - *self
}
}
impl Add<Duration> for Instant {
type Output = Instant;
fn add(self, other: Duration) -> Instant {
let new_val = self.inner + other.as_millis() as f64;
Instant { inner: new_val as f64 }
}
}
impl Sub<Duration> for Instant {
type Output = Instant;
fn sub(self, other: Duration) -> Instant {
let new_val = self.inner - other.as_millis() as f64;
Instant { inner: new_val as f64 }
}
}
impl Sub<Instant> for Instant {
type Output = Duration;
fn sub(self, other: Instant) -> Duration {
let ms = self.inner - other.inner;
assert!(ms >= 0.0);
Duration::from_millis(ms as u64)
}
}
#[derive(Debug)]
pub struct Error;
impl error::Error for Error {
}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "Timer error")
}
}
pub struct Delay {
handle: i32,
deadline: Instant,
triggered_rx: oneshot::Receiver<()>,
_cb: send_wrapper::SendWrapper<Closure<FnMut()>>,
}
unsafe impl Sync for Delay {}
impl Delay {
pub fn new(deadline: Instant) -> Delay {
let now = Instant::now();
if deadline > now {
let dur = deadline - now;
Delay::new_timeout(deadline, dur)
} else {
Delay::new_timeout(deadline, Duration::new(0, 0))
}
}
pub fn deadline(&self) -> Instant {
self.deadline
}
fn new_timeout(deadline: Instant, duration: Duration) -> Delay {
let (tx, rx) = oneshot::channel();
let mut tx = Some(tx);
let cb = Closure::wrap(Box::new(move || {
let _ = tx.take().unwrap().send(());
}) as Box<FnMut()>);
let handle = web_sys::window()
.expect("not in a browser")
.set_timeout_with_callback_and_timeout_and_arguments_0(cb.as_ref().unchecked_ref(), duration.as_millis() as i32)
.expect("failed to call set_timeout");
Delay { handle, triggered_rx: rx, deadline, _cb: send_wrapper::SendWrapper::new(cb) }
}
fn reset_timeout(&mut self) {
}
pub fn reset(&mut self, deadline: Instant) {
*self = Delay::new(deadline);
}
}
impl Drop for Delay {
fn drop(&mut self) {
web_sys::window().unwrap().clear_timeout_with_handle(self.handle);
}
}
impl fmt::Debug for Delay {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_tuple("Delay").field(&self.deadline).finish()
}
}
impl Future for Delay {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.triggered_rx.poll().map_err(|_| unreachable!())
}
}
#[derive(Debug)]
pub struct Interval {
delay: Delay,
duration: Duration,
}
impl Interval {
pub fn new(at: Instant, duration: Duration) -> Interval {
assert!(
duration > Duration::new(0, 0),
"`duration` must be non-zero."
);
Interval::new_with_delay(Delay::new(at), duration)
}
pub fn new_interval(duration: Duration) -> Interval {
Interval::new(Instant::now() + duration, duration)
}
pub(crate) fn new_with_delay(delay: Delay, duration: Duration) -> Interval {
Interval { delay, duration }
}
}
impl Stream for Interval {
type Item = Instant;
type Error = crate::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let _ = try_ready!(self.delay.poll());
let now = self.delay.deadline();
self.delay.reset(now + self.duration);
Ok(Some(now).into())
}
}
pub mod timeout {
use super::{Delay, Instant};
use futures::prelude::*;
use std::{error, fmt, time::Duration};
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Timeout<T> {
value: T,
delay: Delay,
}
#[derive(Debug)]
pub struct Error<T>(Kind<T>);
#[derive(Debug)]
enum Kind<T> {
Inner(T),
Elapsed,
Timer(crate::Error),
}
impl<T> Timeout<T> {
pub fn new(value: T, timeout: Duration) -> Timeout<T> {
let delay = Delay::new_timeout(Instant::now() + timeout, timeout);
Timeout {
value,
delay,
}
}
pub fn get_ref(&self) -> &T {
&self.value
}
pub fn get_mut(&mut self) -> &mut T {
&mut self.value
}
pub fn into_inner(self) -> T {
self.value
}
}
impl<T: Future> Timeout<T> {
pub fn new_at(future: T, deadline: Instant) -> Timeout<T> {
let delay = Delay::new(deadline);
Timeout {
value: future,
delay,
}
}
}
impl<T> Future for Timeout<T>
where T: Future,
{
type Item = T::Item;
type Error = Error<T::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.value.poll() {
Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
Ok(Async::NotReady) => {}
Err(e) => return Err(Error::inner(e)),
}
match self.delay.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => {
Err(Error::elapsed())
},
Err(e) => Err(Error::timer(e)),
}
}
}
impl<T> Stream for Timeout<T>
where T: Stream,
{
type Item = T::Item;
type Error = Error<T::Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.value.poll() {
Ok(Async::Ready(v)) => {
if v.is_some() {
self.delay.reset_timeout();
}
return Ok(Async::Ready(v))
}
Ok(Async::NotReady) => {}
Err(e) => return Err(Error::inner(e)),
}
match self.delay.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => {
self.delay.reset_timeout();
Err(Error::elapsed())
},
Err(e) => Err(Error::timer(e)),
}
}
}
impl<T> Error<T> {
pub fn inner(err: T) -> Error<T> {
Error(Kind::Inner(err))
}
pub fn is_inner(&self) -> bool {
match self.0 {
Kind::Inner(_) => true,
_ => false,
}
}
pub fn into_inner(self) -> Option<T> {
match self.0 {
Kind::Inner(err) => Some(err),
_ => None,
}
}
pub fn elapsed() -> Error<T> {
Error(Kind::Elapsed)
}
pub fn is_elapsed(&self) -> bool {
match self.0 {
Kind::Elapsed => true,
_ => false,
}
}
pub fn timer(err: crate::Error) -> Error<T> {
Error(Kind::Timer(err))
}
pub fn is_timer(&self) -> bool {
match self.0 {
Kind::Timer(_) => true,
_ => false,
}
}
pub fn into_timer(self) -> Option<crate::Error> {
match self.0 {
Kind::Timer(err) => Some(err),
_ => None,
}
}
}
impl<T: error::Error> error::Error for Error<T> {
fn description(&self) -> &str {
use self::Kind::*;
match self.0 {
Inner(ref e) => e.description(),
Elapsed => "deadline has elapsed",
Timer(ref e) => e.description(),
}
}
}
impl<T: fmt::Display> fmt::Display for Error<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use self::Kind::*;
match self.0 {
Inner(ref e) => e.fmt(fmt),
Elapsed => "deadline has elapsed".fmt(fmt),
Timer(ref e) => e.fmt(fmt),
}
}
}
}
#[cfg(test)]
mod tests {
use crate::Delay;
#[test]
fn test_send_sync() {
fn req<T: Send + Sync>() {}
req::<Delay>();
}
}