use crate::tokio_queue;
use st3;
/// Reason a steal attempt through [`GenericStealer::steal_batch_and_pop`]
/// returned no item.
///
/// Each backend adapter in this file translates its own error/status type
/// into one of these two variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GenericStealError {
    /// The backend reported its `Empty` condition.
    Empty,
    /// The backend reported `Busy` (or, for `crossbeam_deque`, `Retry`).
    Busy,
}

impl std::fmt::Display for GenericStealError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Empty => "no item available to steal",
            Self::Busy => "steal attempt should be retried",
        };
        f.write_str(message)
    }
}

impl std::error::Error for GenericStealError {}
/// Common interface over the worker (owner) side of the queue implementations
/// adapted in this file.
///
/// Implementors must be `Send`.
pub trait GenericWorker<T>: Send {
/// Stealer type paired with this worker; its `W` must be this worker type.
type S: GenericStealer<T, W = Self>;
/// Creates a new queue. No arguments are taken, so any sizing is chosen by
/// the implementation.
fn new() -> Self;
/// Tries to add `item` to the queue.
///
/// The `Err` variant carries a `T` — presumably the rejected item handed
/// back to the caller; confirm against the concrete backend.
fn push(&self, item: T) -> Result<(), T>;
/// Takes an item out of the queue, or returns `None` when the backend
/// yields nothing.
fn pop(&self) -> Option<T>;
/// Returns a stealer handle of type `Self::S` for this queue.
fn stealer(&self) -> Self::S;
}
/// Common interface over the stealing side of the queue implementations
/// adapted in this file.
///
/// Implementors must be `Clone + Send + Sync`.
pub trait GenericStealer<T>: Clone + Send + Sync {
/// Worker type that is passed to `steal_batch_and_pop`.
type W: GenericWorker<T>;
/// Attempts a steal that involves `worker` and returns one item on success.
///
/// Going by the name and the backend calls used by the implementations in
/// this file, a batch of items is moved into `worker` and one item is
/// returned directly — confirm details against the concrete backend.
///
/// # Errors
///
/// Returns a [`GenericStealError`] when the backend reports that nothing
/// was stolen.
fn steal_batch_and_pop(&self, worker: &Self::W) -> Result<T, GenericStealError>;
}
/// [`GenericWorker`] adapter for the LIFO `st3` worker.
///
/// Calls are written in path form to make it explicit that each one forwards
/// to the inherent `st3::lifo::Worker` function of the same name rather than
/// to the trait method being defined.
impl<T: Send> GenericWorker<T> for st3::lifo::Worker<T> {
    type S = st3::lifo::Stealer<T>;

    /// Builds a worker by passing a fixed value of 256 to the `st3` constructor.
    fn new() -> Self {
        const MIN_CAPACITY: usize = 256;
        st3::lifo::Worker::new(MIN_CAPACITY)
    }

    /// Forwards `item` to the underlying `push` and returns its result as is.
    fn push(&self, item: T) -> Result<(), T> {
        st3::lifo::Worker::push(self, item)
    }

    /// Forwards to the underlying `pop`.
    fn pop(&self) -> Option<T> {
        st3::lifo::Worker::pop(self)
    }

    /// Forwards to the underlying `stealer`.
    fn stealer(&self) -> Self::S {
        st3::lifo::Worker::stealer(self)
    }
}
/// [`GenericStealer`] adapter for the LIFO `st3` stealer.
impl<T: Send> GenericStealer<T> for st3::lifo::Stealer<T> {
    type W = st3::lifo::Worker<T>;

    /// Calls `steal_and_pop` with `worker` as destination and returns the
    /// first element of the pair it yields.
    ///
    /// # Errors
    ///
    /// `st3::StealError::Empty` and `st3::StealError::Busy` are translated to
    /// the [`GenericStealError`] variants of the same name.
    fn steal_batch_and_pop(&self, worker: &Self::W) -> Result<T, GenericStealError> {
        const MAX_BATCH_SIZE: usize = 32;

        // Ask for half of the reported count, rounded up, capped at
        // `MAX_BATCH_SIZE`.
        let batch_size = |available: usize| {
            let rounded_up_half = available - available / 2;
            std::cmp::min(rounded_up_half, MAX_BATCH_SIZE)
        };

        match self.steal_and_pop(worker, batch_size) {
            Ok(stolen) => Ok(stolen.0),
            Err(st3::StealError::Empty) => Err(GenericStealError::Empty),
            Err(st3::StealError::Busy) => Err(GenericStealError::Busy),
        }
    }
}
/// [`GenericWorker`] adapter for the FIFO `st3` worker.
///
/// Calls are written in path form to make it explicit that each one forwards
/// to the inherent `st3::fifo::Worker` function of the same name rather than
/// to the trait method being defined.
impl<T: Send> GenericWorker<T> for st3::fifo::Worker<T> {
    type S = st3::fifo::Stealer<T>;

    /// Builds a worker by passing a fixed value of 256 to the `st3` constructor.
    fn new() -> Self {
        const MIN_CAPACITY: usize = 256;
        st3::fifo::Worker::new(MIN_CAPACITY)
    }

    /// Forwards `item` to the underlying `push` and returns its result as is.
    fn push(&self, item: T) -> Result<(), T> {
        st3::fifo::Worker::push(self, item)
    }

    /// Forwards to the underlying `pop`.
    fn pop(&self) -> Option<T> {
        st3::fifo::Worker::pop(self)
    }

    /// Forwards to the underlying `stealer`.
    fn stealer(&self) -> Self::S {
        st3::fifo::Worker::stealer(self)
    }
}
/// [`GenericStealer`] adapter for the FIFO `st3` stealer.
impl<T: Send> GenericStealer<T> for st3::fifo::Stealer<T> {
    type W = st3::fifo::Worker<T>;

    /// Calls `steal_and_pop` with `worker` as destination and returns the
    /// first element of the pair it yields.
    ///
    /// # Errors
    ///
    /// `st3::StealError::Empty` and `st3::StealError::Busy` are translated to
    /// the [`GenericStealError`] variants of the same name.
    fn steal_batch_and_pop(&self, worker: &Self::W) -> Result<T, GenericStealError> {
        const MAX_BATCH_SIZE: usize = 32;

        // Ask for half of the reported count, rounded up, capped at
        // `MAX_BATCH_SIZE`.
        let batch_size = |available: usize| {
            let rounded_up_half = available - available / 2;
            std::cmp::min(rounded_up_half, MAX_BATCH_SIZE)
        };

        match self.steal_and_pop(worker, batch_size) {
            Ok(stolen) => Ok(stolen.0),
            Err(st3::StealError::Empty) => Err(GenericStealError::Empty),
            Err(st3::StealError::Busy) => Err(GenericStealError::Busy),
        }
    }
}
/// Adapts `tokio_queue::Local` to the [`GenericWorker`] interface by
/// delegating every method to the wrapped queue type.
///
/// NOTE(review): `new`, `pop` and `stealer` call functions with the same name
/// as the trait methods being defined. This only terminates if
/// `tokio_queue::Local` provides inherent functions with those names (which
/// would take precedence) — confirm against `crate::tokio_queue`.
impl<T: Send> GenericWorker<T> for tokio_queue::Local<T> {
type S = tokio_queue::Steal<T>;
/// Delegates to `Self::new()`; no arguments are passed.
fn new() -> Self {
Self::new()
}
/// Delegates to `push_back` and returns its result unchanged.
fn push(&self, item: T) -> Result<(), T> {
self.push_back(item)
}
/// Delegates to `pop` on the queue.
fn pop(&self) -> Option<T> {
self.pop()
}
/// Delegates to `stealer` on the queue.
fn stealer(&self) -> Self::S {
self.stealer()
}
}
/// [`GenericStealer`] adapter for `tokio_queue::Steal`.
impl<T: Send> GenericStealer<T> for tokio_queue::Steal<T> {
    type W = tokio_queue::Local<T>;

    /// Calls `steal_into` with `worker` as destination and passes a
    /// successful result through unchanged.
    ///
    /// # Errors
    ///
    /// `tokio_queue::StealError::Empty` and `tokio_queue::StealError::Busy`
    /// are translated to the [`GenericStealError`] variants of the same name.
    fn steal_batch_and_pop(&self, worker: &Self::W) -> Result<T, GenericStealError> {
        match self.steal_into(worker) {
            Ok(item) => Ok(item),
            Err(tokio_queue::StealError::Empty) => Err(GenericStealError::Empty),
            Err(tokio_queue::StealError::Busy) => Err(GenericStealError::Busy),
        }
    }
}
/// Newtype around `crossbeam_deque::Worker<T>`; its [`GenericWorker`] impl
/// constructs the inner worker with `new_fifo()`.
pub struct CrossbeamFifoWorker<T>(crossbeam_deque::Worker<T>);
/// Newtype around `crossbeam_deque::Stealer<T>`, produced by
/// `CrossbeamFifoWorker::stealer`.
pub struct CrossbeamFifoStealer<T>(crossbeam_deque::Stealer<T>);
/// Newtype around `crossbeam_deque::Worker<T>`; its [`GenericWorker`] impl
/// constructs the inner worker with `new_lifo()`.
pub struct CrossbeamLifoWorker<T>(crossbeam_deque::Worker<T>);
/// Newtype around `crossbeam_deque::Stealer<T>`, produced by
/// `CrossbeamLifoWorker::stealer`.
pub struct CrossbeamLifoStealer<T>(crossbeam_deque::Stealer<T>);
impl<T: Send> GenericWorker<T> for CrossbeamFifoWorker<T> {
type S = CrossbeamFifoStealer<T>;
fn new() -> Self {
Self(crossbeam_deque::Worker::new_fifo())
}
fn push(&self, item: T) -> Result<(), T> {
self.0.push(item);
Ok(())
}
fn pop(&self) -> Option<T> {
self.0.pop()
}
fn stealer(&self) -> Self::S {
CrossbeamFifoStealer(self.0.stealer())
}
}
impl<T> Clone for CrossbeamFifoStealer<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T: Send> GenericStealer<T> for CrossbeamFifoStealer<T> {
type W = CrossbeamFifoWorker<T>;
fn steal_batch_and_pop(&self, worker: &Self::W) -> Result<T, GenericStealError> {
match self.0.steal_batch_and_pop(&worker.0) {
crossbeam_deque::Steal::Empty => Err(GenericStealError::Empty),
crossbeam_deque::Steal::Retry => Err(GenericStealError::Busy),
crossbeam_deque::Steal::Success(item) => Ok(item),
}
}
}
impl<T: Send> GenericWorker<T> for CrossbeamLifoWorker<T> {
type S = CrossbeamLifoStealer<T>;
fn new() -> Self {
Self(crossbeam_deque::Worker::new_lifo())
}
fn push(&self, item: T) -> Result<(), T> {
self.0.push(item);
Ok(())
}
fn pop(&self) -> Option<T> {
self.0.pop()
}
fn stealer(&self) -> Self::S {
CrossbeamLifoStealer(self.0.stealer())
}
}
impl<T> Clone for CrossbeamLifoStealer<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T: Send> GenericStealer<T> for CrossbeamLifoStealer<T> {
type W = CrossbeamLifoWorker<T>;
fn steal_batch_and_pop(&self, worker: &Self::W) -> Result<T, GenericStealError> {
match self.0.steal_batch_and_pop(&worker.0) {
crossbeam_deque::Steal::Empty => Err(GenericStealError::Empty),
crossbeam_deque::Steal::Retry => Err(GenericStealError::Busy),
crossbeam_deque::Steal::Success(item) => Ok(item),
}
}
}