[go: up one dir, main page]

tower 0.4.11

Tower is a library of modular and reusable components for building robust clients and servers.
Documentation
#![cfg(feature = "spawn-ready")]
#[path = "../support.rs"]
mod support;

use tokio::time;
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
use tower::spawn_ready::{SpawnReady, SpawnReadyLayer};
use tower::util::ServiceExt;
use tower_test::mock;

#[tokio::test(flavor = "current_thread")]
async fn when_inner_is_not_ready() {
    time::pause();

    let _t = support::trace_init();

    let layer = SpawnReadyLayer::new();
    let (mut service, mut handle) = mock::spawn_layer::<(), (), _>(layer);

    // Make the service NotReady
    handle.allow(0);

    assert_pending!(service.poll_ready());

    // Make the service is Ready
    handle.allow(1);
    time::sleep(time::Duration::from_millis(100)).await;
    assert_ready_ok!(service.poll_ready());
}

#[tokio::test(flavor = "current_thread")]
async fn when_inner_fails() {
    let _t = support::trace_init();

    let layer = SpawnReadyLayer::new();
    let (mut service, mut handle) = mock::spawn_layer::<(), (), _>(layer);

    // Make the service NotReady
    handle.allow(0);
    handle.send_error("foobar");

    assert_eq!(
        assert_ready_err!(service.poll_ready()).to_string(),
        "foobar"
    );
}

#[tokio::test(flavor = "current_thread")]
async fn propagates_trace_spans() {
    use tracing::Instrument;

    let _t = support::trace_init();

    let span = tracing::info_span!("my_span");

    let service = support::AssertSpanSvc::new(span.clone());
    let service = SpawnReady::new(service);
    let result = tokio::spawn(service.oneshot(()).instrument(span));

    result.await.expect("service panicked").expect("failed");
}

#[cfg(test)]
#[tokio::test(flavor = "current_thread")]
async fn abort_on_drop() {
    let (mock, mut handle) = mock::pair::<(), ()>();
    let mut svc = SpawnReady::new(mock);
    handle.allow(0);

    // Drive the service to readiness until we signal a drop.
    let (drop_tx, drop_rx) = tokio::sync::oneshot::channel();
    let mut task = tokio_test::task::spawn(async move {
        tokio::select! {
            _ = drop_rx => {}
            _ = svc.ready() => unreachable!("Service must not become ready"),
        }
    });
    assert_pending!(task.poll());
    assert_pending!(handle.poll_request());

    // End the task and ensure that the inner service has been dropped.
    assert!(drop_tx.send(()).is_ok());
    tokio_test::assert_ready!(task.poll());
    tokio::task::yield_now().await;
    assert!(tokio_test::assert_ready!(handle.poll_request()).is_none());
}