[go: up one dir, main page]

tower 0.4.11

Tower is a library of modular and reusable components for building robust clients and servers.
Documentation
use crate::discover::ServiceList;
use crate::load;
use futures_util::pin_mut;
use std::task::Poll;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
use tower_test::{assert_request_eq, mock};

use super::*;

#[tokio::test]
async fn empty() {
    let empty: Vec<load::Constant<mock::Mock<(), &'static str>, usize>> = vec![];
    let disco = ServiceList::new(empty);
    let mut svc = mock::Spawn::new(Balance::new(disco));
    assert_pending!(svc.poll_ready());
}

#[tokio::test]
async fn single_endpoint() {
    let (mut svc, mut handle) = mock::spawn_with(|s| {
        let mock = load::Constant::new(s, 0);
        let disco = ServiceList::new(vec![mock].into_iter());
        Balance::new(disco)
    });

    handle.allow(0);
    assert_pending!(svc.poll_ready());
    assert_eq!(
        svc.get_ref().len(),
        1,
        "balancer must have discovered endpoint"
    );

    handle.allow(1);
    assert_ready_ok!(svc.poll_ready());

    let mut fut = task::spawn(svc.call(()));

    assert_request_eq!(handle, ()).send_response(1);

    assert_eq!(assert_ready_ok!(fut.poll()), 1);
    handle.allow(1);
    assert_ready_ok!(svc.poll_ready());

    handle.send_error("endpoint lost");
    assert_pending!(svc.poll_ready());
    assert!(
        svc.get_ref().len() == 0,
        "balancer must drop failed endpoints"
    );
}

#[tokio::test]
async fn two_endpoints_with_equal_load() {
    let (mock_a, handle_a) = mock::pair();
    let (mock_b, handle_b) = mock::pair();
    let mock_a = load::Constant::new(mock_a, 1);
    let mock_b = load::Constant::new(mock_b, 1);

    pin_mut!(handle_a);
    pin_mut!(handle_b);

    let disco = ServiceList::new(vec![mock_a, mock_b].into_iter());
    let mut svc = mock::Spawn::new(Balance::new(disco));

    handle_a.allow(0);
    handle_b.allow(0);
    assert_pending!(svc.poll_ready());
    assert_eq!(
        svc.get_ref().len(),
        2,
        "balancer must have discovered both endpoints"
    );

    handle_a.allow(1);
    handle_b.allow(0);
    assert_ready_ok!(
        svc.poll_ready(),
        "must be ready when one of two services is ready"
    );
    {
        let mut fut = task::spawn(svc.call(()));
        assert_request_eq!(handle_a, ()).send_response("a");
        assert_eq!(assert_ready_ok!(fut.poll()), "a");
    }

    handle_a.allow(0);
    handle_b.allow(1);
    assert_ready_ok!(
        svc.poll_ready(),
        "must be ready when both endpoints are ready"
    );
    {
        let mut fut = task::spawn(svc.call(()));
        assert_request_eq!(handle_b, ()).send_response("b");
        assert_eq!(assert_ready_ok!(fut.poll()), "b");
    }

    handle_a.allow(1);
    handle_b.allow(1);
    for _ in 0..2 {
        assert_ready_ok!(
            svc.poll_ready(),
            "must be ready when both endpoints are ready"
        );
        let mut fut = task::spawn(svc.call(()));

        for (ref mut h, c) in &mut [(&mut handle_a, "a"), (&mut handle_b, "b")] {
            if let Poll::Ready(Some((_, tx))) = h.as_mut().poll_request() {
                tracing::info!("using {}", c);
                tx.send_response(c);
                h.allow(0);
            }
        }
        assert_ready_ok!(fut.poll());
    }

    handle_a.send_error("endpoint lost");
    assert_pending!(svc.poll_ready());
    assert_eq!(
        svc.get_ref().len(),
        1,
        "balancer must drop failed endpoints",
    );
}