[go: up one dir, main page]

tokio 0.2.10

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
//! A concurrent, lock-free, FIFO list.

use crate::loom::{
    sync::atomic::{AtomicPtr, AtomicUsize},
    thread,
};
use crate::sync::mpsc::block::{self, Block};

use std::fmt;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

/// List queue transmit handle
pub(crate) struct Tx<T> {
    /// Tail in the `Block` mpmc list.
    block_tail: AtomicPtr<Block<T>>,

    /// Position to push the next message. This reference a block and offset
    /// into the block.
    tail_position: AtomicUsize,
}

/// List queue receive handle
pub(crate) struct Rx<T> {
    /// Pointer to the block being processed
    head: NonNull<Block<T>>,

    /// Next slot index to process
    index: usize,

    /// Pointer to the next block pending release
    free_head: NonNull<Block<T>>,
}

pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) {
    // Create the initial block shared between the tx and rx halves.
    let initial_block = Box::new(Block::new(0));
    let initial_block_ptr = Box::into_raw(initial_block);

    let tx = Tx {
        block_tail: AtomicPtr::new(initial_block_ptr),
        tail_position: AtomicUsize::new(0),
    };

    let head = NonNull::new(initial_block_ptr).unwrap();

    let rx = Rx {
        head,
        index: 0,
        free_head: head,
    };

    (tx, rx)
}

impl<T> Tx<T> {
    /// Push a value into the list.
    pub(crate) fn push(&self, value: T) {
        // First, claim a slot for the value. `Acquire` is used here to
        // synchronize with the `fetch_add` in `reclaim_blocks`.
        let slot_index = self.tail_position.fetch_add(1, Acquire);

        // Load the current block and write the value
        let block = self.find_block(slot_index);

        unsafe {
            // Write the value to the block
            block.as_ref().write(slot_index, value);
        }
    }

    /// Close the send half of the list
    ///
    /// Similar process as pushing a value, but instead of writing the value &
    /// setting the ready flag, the TX_CLOSED flag is set on the block.
    pub(crate) fn close(&self) {
        // First, claim a slot for the value. This is the last slot that will be
        // claimed.
        let slot_index = self.tail_position.fetch_add(1, Acquire);

        let block = self.find_block(slot_index);

        unsafe { block.as_ref().tx_close() }
    }

    fn find_block(&self, slot_index: usize) -> NonNull<Block<T>> {
        // The start index of the block that contains `index`.
        let start_index = block::start_index(slot_index);

        // The index offset into the block
        let offset = block::offset(slot_index);

        // Load the current head of the block
        let mut block_ptr = self.block_tail.load(Acquire);

        let block = unsafe { &*block_ptr };

        // Calculate the distance between the tail ptr and the target block
        let distance = block.distance(start_index);

        // Decide if this call to `find_block` should attempt to update the
        // `block_tail` pointer.
        //
        // Updating `block_tail` is not always performed in order to reduce
        // contention.
        //
        // When set, as the routine walks the linked list, it attempts to update
        // `block_tail`. If the update cannot be performed, `try_updating_tail`
        // is unset.
        let mut try_updating_tail = distance > offset;

        // Walk the linked list of blocks until the block with `start_index` is
        // found.
        loop {
            let block = unsafe { &(*block_ptr) };

            if block.is_at_index(start_index) {
                return unsafe { NonNull::new_unchecked(block_ptr) };
            }

            let next_block = block
                .load_next(Acquire)
                // There is no allocated next block, grow the linked list.
                .unwrap_or_else(|| block.grow());

            // If the block is **not** final, then the tail pointer cannot be
            // advanced any more.
            try_updating_tail &= block.is_final();

            if try_updating_tail {
                // Advancing `block_tail` must happen when walking the linked
                // list. `block_tail` may not advance passed any blocks that are
                // not "final". At the point a block is finalized, it is unknown
                // if there are any prior blocks that are unfinalized, which
                // makes it impossible to advance `block_tail`.
                //
                // While walking the linked list, `block_tail` can be advanced
                // as long as finalized blocks are traversed.
                //
                // Release ordering is used to ensure that any subsequent reads
                // are able to see the memory pointed to by `block_tail`.
                //
                // Acquire is not needed as any "actual" value is not accessed.
                // At this point, the linked list is walked to acquire blocks.
                let actual =
                    self.block_tail
                        .compare_and_swap(block_ptr, next_block.as_ptr(), Release);

                if actual == block_ptr {
                    // Synchronize with any senders
                    let tail_position = self.tail_position.fetch_add(0, Release);

                    unsafe {
                        block.tx_release(tail_position);
                    }
                } else {
                    // A concurrent sender is also working on advancing
                    // `block_tail` and this thread is falling behind.
                    //
                    // Stop trying to advance the tail pointer
                    try_updating_tail = false;
                }
            }

            block_ptr = next_block.as_ptr();

            thread::yield_now();
        }
    }

    pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) {
        // The block has been removed from the linked list and ownership
        // is reclaimed.
        //
        // Before dropping the block, see if it can be reused by
        // inserting it back at the end of the linked list.
        //
        // First, reset the data
        block.as_mut().reclaim();

        let mut reused = false;

        // Attempt to insert the block at the end
        //
        // Walk at most three times
        //
        let curr_ptr = self.block_tail.load(Acquire);

        // The pointer can never be null
        debug_assert!(!curr_ptr.is_null());

        let mut curr = NonNull::new_unchecked(curr_ptr);

        // TODO: Unify this logic with Block::grow
        for _ in 0..3 {
            match curr.as_ref().try_push(&mut block, AcqRel) {
                Ok(_) => {
                    reused = true;
                    break;
                }
                Err(next) => {
                    curr = next;
                }
            }
        }

        if !reused {
            let _ = Box::from_raw(block.as_ptr());
        }
    }
}

impl<T> fmt::Debug for Tx<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Tx")
            .field("block_tail", &self.block_tail.load(Relaxed))
            .field("tail_position", &self.tail_position.load(Relaxed))
            .finish()
    }
}

impl<T> Rx<T> {
    /// Pop the next value off the queue
    pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> {
        // Advance `head`, if needed
        if !self.try_advancing_head() {
            return None;
        }

        self.reclaim_blocks(tx);

        unsafe {
            let block = self.head.as_ref();

            let ret = block.read(self.index);

            if let Some(block::Read::Value(..)) = ret {
                self.index = self.index.wrapping_add(1);
            }

            ret
        }
    }

    /// Try advancing the block pointer to the block referenced by `self.index`.
    ///
    /// Returns `true` if successful, `false` if there is no next block to load.
    fn try_advancing_head(&mut self) -> bool {
        let block_index = block::start_index(self.index);

        loop {
            let next_block = {
                let block = unsafe { self.head.as_ref() };

                if block.is_at_index(block_index) {
                    return true;
                }

                block.load_next(Acquire)
            };

            let next_block = match next_block {
                Some(next_block) => next_block,
                None => {
                    return false;
                }
            };

            self.head = next_block;

            thread::yield_now();
        }
    }

    fn reclaim_blocks(&mut self, tx: &Tx<T>) {
        while self.free_head != self.head {
            unsafe {
                // Get a handle to the block that will be freed and update
                // `free_head` to point to the next block.
                let block = self.free_head;

                let observed_tail_position = block.as_ref().observed_tail_position();

                let required_index = match observed_tail_position {
                    Some(i) => i,
                    None => return,
                };

                if required_index > self.index {
                    return;
                }

                // We may read the next pointer with `Relaxed` ordering as it is
                // guaranteed that the `reclaim_blocks` routine trails the `recv`
                // routine. Any memory accessed by `reclaim_blocks` has already
                // been acquired by `recv`.
                let next_block = block.as_ref().load_next(Relaxed);

                // Update the free list head
                self.free_head = next_block.unwrap();

                // Push the emptied block onto the back of the queue, making it
                // available to senders.
                tx.reclaim_block(block);
            }

            thread::yield_now();
        }
    }

    /// Effectively `Drop` all the blocks. Should only be called once, when
    /// the list is dropping.
    pub(super) unsafe fn free_blocks(&mut self) {
        debug_assert_ne!(self.free_head, NonNull::dangling());

        let mut cur = Some(self.free_head);

        #[cfg(debug_assertions)]
        {
            // to trigger the debug assert above so as to catch that we
            // don't call `free_blocks` more than once.
            self.free_head = NonNull::dangling();
            self.head = NonNull::dangling();
        }

        while let Some(block) = cur {
            cur = block.as_ref().load_next(Relaxed);
            drop(Box::from_raw(block.as_ptr()));
        }
    }
}

impl<T> fmt::Debug for Rx<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Rx")
            .field("head", &self.head)
            .field("index", &self.index)
            .field("free_head", &self.free_head)
            .finish()
    }
}