[go: up one dir, main page]

mio 0.7.0

Lightweight non-blocking IO
Documentation
#![cfg(all(feature = "os-poll", feature = "tcp"))]

use mio::net::TcpListener;
use mio::{Interest, Token};
use std::io::{self, Read};
use std::net::{self, SocketAddr};
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
use std::sync::{Arc, Barrier};
use std::thread;

mod util;
use util::{
    any_local_address, any_local_ipv6_address, assert_send, assert_socket_close_on_exec,
    assert_socket_non_blocking, assert_sync, assert_would_block, expect_events, expect_no_events,
    init, init_with_poll, ExpectEvent,
};

const ID1: Token = Token(0);
const ID2: Token = Token(1);

#[test]
fn is_send_and_sync() {
    assert_send::<TcpListener>();
    assert_sync::<TcpListener>();
}

#[test]
fn tcp_listener() {
    smoke_test_tcp_listener(any_local_address(), TcpListener::bind);
}

#[test]
fn tcp_listener_ipv6() {
    smoke_test_tcp_listener(any_local_ipv6_address(), TcpListener::bind);
}

#[test]
fn tcp_listener_std() {
    smoke_test_tcp_listener(any_local_address(), |addr| {
        let listener = net::TcpListener::bind(addr).unwrap();
        // `std::net::TcpListener`s are blocking by default, so make sure it is in
        // non-blocking mode before wrapping in a Mio equivalent.
        listener.set_nonblocking(true).unwrap();
        Ok(TcpListener::from_std(listener))
    });
}

fn smoke_test_tcp_listener<F>(addr: SocketAddr, make_listener: F)
where
    F: FnOnce(SocketAddr) -> io::Result<TcpListener>,
{
    let (mut poll, mut events) = init_with_poll();

    let mut listener = make_listener(addr).unwrap();
    let address = listener.local_addr().unwrap();

    assert_socket_non_blocking(&listener);
    assert_socket_close_on_exec(&listener);

    poll.registry()
        .register(&mut listener, ID1, Interest::READABLE)
        .expect("unable to register TCP listener");

    let barrier = Arc::new(Barrier::new(2));
    let thread_handle = start_connections(address, 1, barrier.clone());

    expect_events(
        &mut poll,
        &mut events,
        vec![ExpectEvent::new(ID1, Interest::READABLE)],
    );

    // Expect a single connection.
    let (mut stream, peer_address) = listener.accept().expect("unable to accept connection");
    assert!(peer_address.ip().is_loopback());
    assert_eq!(stream.peer_addr().unwrap(), peer_address);
    assert_eq!(stream.local_addr().unwrap(), address);

    // Expect the stream to be non-blocking.
    let mut buf = [0; 20];
    assert_would_block(stream.read(&mut buf));

    // Expect no more connections.
    assert_would_block(listener.accept());

    assert!(listener.take_error().unwrap().is_none());

    barrier.wait();
    thread_handle.join().expect("unable to join thread");
}

#[test]
fn set_get_ttl() {
    init();

    let listener = TcpListener::bind(any_local_address()).unwrap();

    // set TTL, get TTL, make sure it has the expected value
    const TTL: u32 = 10;
    listener.set_ttl(TTL).unwrap();
    assert_eq!(listener.ttl().unwrap(), TTL);
    assert!(listener.take_error().unwrap().is_none());
}

#[test]
fn get_ttl_without_previous_set() {
    init();

    let listener = TcpListener::bind(any_local_address()).unwrap();

    // expect a get TTL to work w/o any previous set_ttl
    listener.ttl().expect("unable to get TTL for TCP listener");
    assert!(listener.take_error().unwrap().is_none());
}

#[cfg(unix)]
#[test]
fn raw_fd() {
    init();

    let listener = TcpListener::bind(any_local_address()).unwrap();
    let address = listener.local_addr().unwrap();

    let raw_fd1 = listener.as_raw_fd();
    let raw_fd2 = listener.into_raw_fd();
    assert_eq!(raw_fd1, raw_fd2);

    let listener = unsafe { TcpListener::from_raw_fd(raw_fd2) };
    assert_eq!(listener.as_raw_fd(), raw_fd1);
    assert_eq!(listener.local_addr().unwrap(), address);
}

#[test]
fn registering() {
    let (mut poll, mut events) = init_with_poll();

    let mut stream = TcpListener::bind(any_local_address()).unwrap();

    poll.registry()
        .register(&mut stream, ID1, Interest::READABLE)
        .expect("unable to register TCP listener");

    expect_no_events(&mut poll, &mut events);

    // NOTE: more tests are done in the smoke tests above.
}

#[test]
fn reregister() {
    let (mut poll, mut events) = init_with_poll();

    let mut listener = TcpListener::bind(any_local_address()).unwrap();
    let address = listener.local_addr().unwrap();

    poll.registry()
        .register(&mut listener, ID1, Interest::READABLE)
        .unwrap();
    poll.registry()
        .reregister(&mut listener, ID2, Interest::READABLE)
        .unwrap();

    let barrier = Arc::new(Barrier::new(2));
    let thread_handle = start_connections(address, 1, barrier.clone());

    expect_events(
        &mut poll,
        &mut events,
        vec![ExpectEvent::new(ID2, Interest::READABLE)],
    );

    let (stream, peer_address) = listener.accept().expect("unable to accept connection");
    assert!(peer_address.ip().is_loopback());
    assert_eq!(stream.peer_addr().unwrap(), peer_address);
    assert_eq!(stream.local_addr().unwrap(), address);

    assert_would_block(listener.accept());

    assert!(listener.take_error().unwrap().is_none());

    barrier.wait();
    thread_handle.join().expect("unable to join thread");
}

#[test]
fn no_events_after_deregister() {
    let (mut poll, mut events) = init_with_poll();

    let mut listener = TcpListener::bind(any_local_address()).unwrap();
    let address = listener.local_addr().unwrap();

    poll.registry()
        .register(&mut listener, ID1, Interest::READABLE)
        .unwrap();

    let barrier = Arc::new(Barrier::new(2));
    let thread_handle = start_connections(address, 1, barrier.clone());

    poll.registry().deregister(&mut listener).unwrap();

    expect_no_events(&mut poll, &mut events);

    // Should still be able to accept the connection.
    let (stream, peer_address) = listener.accept().expect("unable to accept connection");
    assert!(peer_address.ip().is_loopback());
    assert_eq!(stream.peer_addr().unwrap(), peer_address);
    assert_eq!(stream.local_addr().unwrap(), address);

    assert_would_block(listener.accept());

    assert!(listener.take_error().unwrap().is_none());

    barrier.wait();
    thread_handle.join().expect("unable to join thread");
}

/// This tests reregister on successful accept works
#[test]
fn tcp_listener_two_streams() {
    let (mut poll1, mut events) = init_with_poll();

    let mut listener = TcpListener::bind(any_local_address()).unwrap();
    let address = listener.local_addr().unwrap();

    let barrier = Arc::new(Barrier::new(3));
    let thread_handle1 = start_connections(address, 1, barrier.clone());

    poll1
        .registry()
        .register(&mut listener, ID1, Interest::READABLE)
        .unwrap();

    expect_events(
        &mut poll1,
        &mut events,
        vec![ExpectEvent::new(ID1, Interest::READABLE)],
    );

    {
        let (stream, peer_address) = listener.accept().expect("unable to accept connection");
        assert!(peer_address.ip().is_loopback());
        assert_eq!(stream.peer_addr().unwrap(), peer_address);
        assert_eq!(stream.local_addr().unwrap(), address);
    }

    assert_would_block(listener.accept());

    let thread_handle2 = start_connections(address, 1, barrier.clone());

    expect_events(
        &mut poll1,
        &mut events,
        vec![ExpectEvent::new(ID1, Interest::READABLE)],
    );

    {
        let (stream, peer_address) = listener.accept().expect("unable to accept connection");
        assert!(peer_address.ip().is_loopback());
        assert_eq!(stream.peer_addr().unwrap(), peer_address);
        assert_eq!(stream.local_addr().unwrap(), address);
    }

    expect_no_events(&mut poll1, &mut events);

    barrier.wait();
    thread_handle1.join().expect("unable to join thread");
    thread_handle2.join().expect("unable to join thread");
}

/// Start `n_connections` connections to `address`. If a `barrier` is provided
/// it will wait on it after each connection is made before it is dropped.
fn start_connections(
    address: SocketAddr,
    n_connections: usize,
    barrier: Arc<Barrier>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for _ in 0..n_connections {
            let conn = net::TcpStream::connect(address).unwrap();
            barrier.wait();
            drop(conn);
        }
    })
}