mod internal_node;
mod leaf;
mod leaf_node;
mod node;
use crate::async_yield;
use crate::ebr::{Arc, AtomicArc, Barrier, Ptr, Tag};
use leaf::{InsertResult, Leaf, RemoveResult, Scanner};
use node::Node;
use std::borrow::Borrow;
use std::cmp::Ordering;
use std::iter::FusedIterator;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::RangeBounds;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed};
pub struct TreeIndex<K, V>
where
K: 'static + Clone + Ord + Send + Sync,
V: 'static + Clone + Send + Sync,
{
root: AtomicArc<Node<K, V>>,
}
impl<K, V> TreeIndex<K, V>
where
K: 'static + Clone + Ord + Send + Sync,
V: 'static + Clone + Send + Sync,
{
#[must_use]
pub fn new() -> TreeIndex<K, V> {
TreeIndex {
root: AtomicArc::null(),
}
}
#[inline]
pub fn insert(&self, mut key: K, mut value: V) -> Result<(), (K, V)> {
loop {
let barrier = Barrier::new();
if let Some(root_ref) = self.root.load(Acquire, &barrier).as_ref() {
match root_ref.insert::<false>(key, value, &barrier) {
Ok(r) => match r {
InsertResult::Success => return Ok(()),
InsertResult::Frozen(k, v) | InsertResult::Retry(k, v) => {
key = k;
value = v;
root_ref.cleanup_link(key.borrow(), false, &barrier);
}
InsertResult::Duplicate(k, v) => return Err((k, v)),
InsertResult::Full(k, v) => {
let (k, v) = Node::split_root::<false>(k, v, &self.root, &barrier);
key = k;
value = v;
continue;
}
InsertResult::Retired(k, v) => {
key = k;
value = v;
let _result = Node::remove_root::<false>(&self.root, &barrier);
}
},
Err((k, v)) => {
key = k;
value = v;
}
}
}
let new_root = Arc::new(Node::new_leaf_node());
let _result = self.root.compare_exchange(
Ptr::null(),
(Some(new_root), Tag::None),
AcqRel,
Acquire,
&barrier,
);
}
}
#[inline]
pub async fn insert_async(&self, mut key: K, mut value: V) -> Result<(), (K, V)> {
loop {
let need_await = {
let barrier = Barrier::new();
if let Some(root_ref) = self.root.load(Acquire, &barrier).as_ref() {
match root_ref.insert::<true>(key, value, &barrier) {
Ok(r) => match r {
InsertResult::Success => return Ok(()),
InsertResult::Frozen(k, v) | InsertResult::Retry(k, v) => {
key = k;
value = v;
root_ref.cleanup_link(key.borrow(), false, &barrier);
true
}
InsertResult::Duplicate(k, v) => return Err((k, v)),
InsertResult::Full(k, v) => {
let (k, v) = Node::split_root::<true>(k, v, &self.root, &barrier);
key = k;
value = v;
continue;
}
InsertResult::Retired(k, v) => {
key = k;
value = v;
!matches!(Node::remove_root::<true>(&self.root, &barrier), Ok(true))
}
},
Err((k, v)) => {
key = k;
value = v;
true
}
}
} else {
false
}
};
if need_await {
async_yield::async_yield().await;
}
let new_root = Arc::new(Node::new_leaf_node());
let _result = self.root.compare_exchange(
Ptr::null(),
(Some(new_root), Tag::None),
AcqRel,
Acquire,
&Barrier::new(),
);
}
}
#[inline]
pub fn remove<Q>(&self, key_ref: &Q) -> bool
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
self.remove_if(key_ref, |_| true)
}
#[inline]
pub async fn remove_async<Q>(&self, key_ref: &Q) -> bool
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
self.remove_if_async(key_ref, |_| true).await
}
#[inline]
pub fn remove_if<Q, F: FnMut(&V) -> bool>(&self, key_ref: &Q, mut condition: F) -> bool
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
let mut has_been_removed = false;
loop {
let barrier = Barrier::new();
if let Some(root_ref) = self.root.load(Acquire, &barrier).as_ref() {
match root_ref.remove_if::<_, _, false>(key_ref, &mut condition, &barrier) {
Ok(r) => match r {
RemoveResult::Success => return true,
RemoveResult::Cleanup => {
root_ref.cleanup_link(key_ref, false, &barrier);
return true;
}
RemoveResult::Retired => {
if matches!(Node::remove_root::<false>(&self.root, &barrier), Ok(true))
{
return true;
}
has_been_removed = true;
}
RemoveResult::Fail => return has_been_removed,
RemoveResult::Frozen => (),
},
Err(removed) => {
if removed {
has_been_removed = true;
}
}
}
} else {
return has_been_removed;
}
}
}
#[inline]
pub async fn remove_if_async<Q, F: FnMut(&V) -> bool>(
&self,
key_ref: &Q,
mut condition: F,
) -> bool
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
let mut has_been_removed = false;
loop {
{
let barrier = Barrier::new();
if let Some(root_ref) = self.root.load(Acquire, &barrier).as_ref() {
match root_ref.remove_if::<_, _, true>(key_ref, &mut condition, &barrier) {
Ok(r) => match r {
RemoveResult::Success => return true,
RemoveResult::Cleanup => {
root_ref.cleanup_link(key_ref, false, &barrier);
return true;
}
RemoveResult::Retired => {
if matches!(
Node::remove_root::<true>(&self.root, &barrier),
Ok(true)
) {
return true;
}
has_been_removed = true;
}
RemoveResult::Fail => return has_been_removed,
RemoveResult::Frozen => (),
},
Err(removed) => {
if removed {
has_been_removed = true;
}
}
}
} else {
return has_been_removed;
}
}
async_yield::async_yield().await;
}
}
#[inline]
pub fn read<Q, R, F: FnOnce(&Q, &V) -> R>(&self, key_ref: &Q, reader: F) -> Option<R>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
let barrier = Barrier::new();
self.read_with(key_ref, reader, &barrier)
}
#[inline]
pub fn read_with<'b, Q, R, F: FnOnce(&Q, &'b V) -> R>(
&self,
key_ref: &Q,
reader: F,
barrier: &'b Barrier,
) -> Option<R>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
if let Some(root_ref) = self.root.load(Acquire, barrier).as_ref() {
if let Some(value) = root_ref.search(key_ref, barrier) {
return Some(reader(key_ref, value));
}
}
None
}
#[inline]
pub fn clear(&self) {
self.root.swap((None, Tag::None), Relaxed);
}
#[inline]
pub fn len(&self) -> usize {
let barrier = Barrier::new();
self.iter(&barrier).count()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[inline]
pub fn depth(&self) -> usize {
let barrier = Barrier::new();
self.root
.load(Acquire, &barrier)
.as_ref()
.map_or(0, |root_ref| root_ref.depth(1, &barrier))
}
#[inline]
pub fn iter<'t, 'b>(&'t self, barrier: &'b Barrier) -> Visitor<'t, 'b, K, V> {
Visitor::new(&self.root, barrier)
}
#[inline]
pub fn range<'t, 'b, R: RangeBounds<K>>(
&'t self,
range: R,
barrier: &'b Barrier,
) -> Range<'t, 'b, K, V, R> {
Range::new(&self.root, range, barrier)
}
}
impl<K, V> Default for TreeIndex<K, V>
where
K: 'static + Clone + Ord + Send + Sync,
V: 'static + Clone + Send + Sync,
{
#[inline]
fn default() -> Self {
TreeIndex::new()
}
}
pub struct Visitor<'t, 'b, K, V>
where
K: 'static + Clone + Ord + Send + Sync,
V: 'static + Clone + Send + Sync,
{
root: &'t AtomicArc<Node<K, V>>,
leaf_scanner: Option<Scanner<'b, K, V>>,
barrier: &'b Barrier,
}
impl<'t, 'b, K, V> Visitor<'t, 'b, K, V>
where
K: 'static + Clone + Ord + Send + Sync,
V: 'static + Clone + Send + Sync,
{
fn new(root: &'t AtomicArc<Node<K, V>>, barrier: &'b Barrier) -> Visitor<'t, 'b, K, V> {
Visitor::<'t, 'b, K, V> {
root,
leaf_scanner: None,
barrier,
}
}
}
impl<'t, 'b, K, V> Iterator for Visitor<'t, 'b, K, V>
where
K: 'static + Clone + Ord + Send + Sync,
V: 'static + Clone + Send + Sync,
{
type Item = (&'b K, &'b V);
fn next(&mut self) -> Option<Self::Item> {
if self.leaf_scanner.is_none() {
let root_ptr = self.root.load(Acquire, self.barrier);
if let Some(root_ref) = root_ptr.as_ref() {
if let Some(scanner) = root_ref.min(self.barrier) {
self.leaf_scanner.replace(scanner);
}
} else {
return None;
}
}
if let Some(mut scanner) = self.leaf_scanner.take() {
let min_allowed_key = scanner.get().map(|(key, _)| key);
if let Some(result) = scanner.next() {
self.leaf_scanner.replace(scanner);
return Some(result);
}
if let Some(new_scanner) = scanner.jump(min_allowed_key, self.barrier) {
if let Some(entry) = new_scanner.get() {
self.leaf_scanner.replace(new_scanner);
return Some(entry);
}
}
}
None
}
}
impl<'t, 'b, K, V> FusedIterator for Visitor<'t, 'b, K, V>
where
K: 'static + Clone + Ord + Send + Sync,
V: 'static + Clone + Send + Sync,
{
}
pub struct Range<'t, 'b, K, V, R>
where
K: 'static + Clone + Ord + Send + Sync,
V: 'static + Clone + Send + Sync,
R: 'static + RangeBounds<K>,
{
root: &'t AtomicArc<Node<K, V>>,
leaf_scanner: Option<Scanner<'b, K, V>>,
range: R,
check_lower_bound: bool,
check_upper_bound: bool,
barrier: &'b Barrier,
}
impl<'t, 'b, K, V, R> Range<'t, 'b, K, V, R>
where
K: 'static + Clone + Ord + Send + Sync,
V: 'static + Clone + Send + Sync,
R: RangeBounds<K>,
{
fn new(
root: &'t AtomicArc<Node<K, V>>,
range: R,
barrier: &'b Barrier,
) -> Range<'t, 'b, K, V, R> {
Range::<'t, 'b, K, V, R> {
root,
leaf_scanner: None,
range,
check_lower_bound: true,
check_upper_bound: false,
barrier,
}
}
fn next_unbounded(&mut self) -> Option<(&'b K, &'b V)> {
if self.leaf_scanner.is_none() {
let root_ptr = self.root.load(Acquire, self.barrier);
if let Some(root_ref) = root_ptr.as_ref() {
let min_allowed_key = match self.range.start_bound() {
Excluded(key) | Included(key) => Some(key),
Unbounded => {
self.check_lower_bound = false;
None
}
};
if let Some(leaf_scanner) = min_allowed_key.map_or_else(
|| {
if let Some(mut min_scanner) = root_ref.min(self.barrier) {
min_scanner.next();
Some(min_scanner)
} else {
None
}
},
|min_allowed_key| {
root_ref.max_le_appr(min_allowed_key, self.barrier)
},
) {
self.check_upper_bound = match self.range.end_bound() {
Excluded(key) => leaf_scanner
.max_entry()
.map_or(false, |max_entry| max_entry.0.cmp(key) != Ordering::Less),
Included(key) => leaf_scanner
.max_entry()
.map_or(false, |max_entry| max_entry.0.cmp(key) == Ordering::Greater),
Unbounded => false,
};
if let Some(result) = leaf_scanner.get() {
self.leaf_scanner.replace(leaf_scanner);
return Some(result);
}
}
} else {
return None;
}
}
if let Some(mut scanner) = self.leaf_scanner.take() {
let min_allowed_key = scanner.get().map(|(key, _)| key);
if let Some(result) = scanner.next() {
self.leaf_scanner.replace(scanner);
return Some(result);
}
if let Some(new_scanner) = scanner.jump(min_allowed_key, self.barrier).take() {
if let Some(entry) = new_scanner.get() {
self.check_upper_bound = match self.range.end_bound() {
Excluded(key) => new_scanner
.max_entry()
.map_or(false, |max_entry| max_entry.0.cmp(key) != Ordering::Less),
Included(key) => new_scanner
.max_entry()
.map_or(false, |max_entry| max_entry.0.cmp(key) == Ordering::Greater),
Unbounded => false,
};
self.leaf_scanner.replace(new_scanner);
return Some(entry);
}
}
}
None
}
}
impl<'t, 'b, K, V, R> Iterator for Range<'t, 'b, K, V, R>
where
K: 'static + Clone + Ord + Send + Sync,
V: 'static + Clone + Send + Sync,
R: RangeBounds<K>,
{
type Item = (&'b K, &'b V);
fn next(&mut self) -> Option<Self::Item> {
while let Some((key_ref, value_ref)) = self.next_unbounded() {
if self.check_lower_bound {
match self.range.start_bound() {
Excluded(key) => {
if key_ref.cmp(key) != Ordering::Greater {
continue;
}
}
Included(key) => {
if key_ref.cmp(key) == Ordering::Less {
continue;
}
}
Unbounded => (),
}
}
self.check_lower_bound = false;
if self.check_upper_bound {
match self.range.end_bound() {
Excluded(key) => {
if key_ref.cmp(key) == Ordering::Less {
return Some((key_ref, value_ref));
}
}
Included(key) => {
if key_ref.cmp(key) != Ordering::Greater {
return Some((key_ref, value_ref));
}
}
Unbounded => {
return Some((key_ref, value_ref));
}
}
break;
}
return Some((key_ref, value_ref));
}
None
}
}
impl<'t, 'b, K, V, R> FusedIterator for Range<'t, 'b, K, V, R>
where
K: 'static + Clone + Ord + Send + Sync,
V: 'static + Clone + Send + Sync,
R: RangeBounds<K>,
{
}