use std::cell::UnsafeCell;
use std::fmt::{self, Debug};
use std::mem::{MaybeUninit, forget, needs_drop};
use std::ops::{Deref, Index};
use std::ptr::{NonNull, from_ref};
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::sync::atomic::{AtomicPtr, AtomicU32};
use saa::Lock;
use sdd::{AtomicShared, Epoch, Guard, Ptr, Shared, Tag};
use crate::Equivalent;
use crate::async_helper::AsyncGuard;
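/// `Bucket` is a 64-byte-aligned cell of a hash table that tracks up to
/// `BUCKET_LEN` entries stored in an associated `DataBlock`, spilling any
/// additional entries into a chain of `LinkedBucket` instances.
///
/// The `TYPE` parameter selects map, index, or cache semantics, and `L`
/// supplies the LRU bookkeeping used by the cache variant.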
#[repr(align(64))]
pub struct Bucket<K, V, L: LruList, const TYPE: char> {
len: AtomicU32,
epoch: UnsafeCell<Option<Epoch>>,
metadata: Metadata<K, V, BUCKET_LEN>,
rw_lock: Lock,
lru_list: L,
}
/// Tag of buckets used by the map variant.
pub const MAP: char = 'S';
/// Tag of buckets used by the index variant.
pub const INDEX: char = 'O';
/// Tag of buckets used by the cache variant.
pub const CACHE: char = 'C';
/// The number of entry slots in a `Bucket`: one per bit of the occupancy bitmap.
pub const BUCKET_LEN: usize = u32::BITS as usize;
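/// `DataBlock` is the raw storage for `LEN` possibly uninitialized entries;
/// which slots are initialized is tracked by the occupancy bitmap in the
/// owning `Metadata`.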
pub struct DataBlock<K, V, const LEN: usize>([UnsafeCell<MaybeUninit<(K, V)>>; LEN]);
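/// `Writer` holds the exclusive lock on a `Bucket` and releases it when
/// dropped, unless the bucket is killed via `Writer::kill`.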
#[derive(Debug)]
pub struct Writer<K, V, L: LruList, const TYPE: char> {
bucket: NonNull<Bucket<K, V, L, TYPE>>,
}
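/// `Reader` holds a shared lock on a `Bucket` and releases it when dropped.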
#[derive(Debug)]
pub struct Reader<K, V, L: LruList, const TYPE: char> {
bucket: NonNull<Bucket<K, V, L, TYPE>>,
}
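/// `EntryPtr` is a cursor over the occupied entries of a `Bucket` and its
/// linked buckets; `current_index == BUCKET_LEN` marks an unpositioned
/// pointer, and `usize::MAX` marks the end of iteration.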
pub struct EntryPtr<'g, K, V, const TYPE: char> {
current_link_ptr: Ptr<'g, LinkedBucket<K, V>>,
current_index: usize,
}
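/// `LruList` abstracts the least-recently-used bookkeeping of a cache bucket;
/// the default no-op methods are used by bucket types that do not track
/// recency. `tail` is the value stored in
/// `Metadata::removed_bitmap_or_lru_tail` for the cache variant.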
pub trait LruList: 'static + Default {
#[inline]
fn evict(&self, _tail: u32) -> Option<(u8, u32)> {
None
}
#[inline]
fn remove(&self, _tail: u32, _entry: u8) -> Option<u32> {
None
}
#[inline]
fn promote(&self, _tail: u32, _entry: u8) -> Option<u32> {
None
}
}
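/// `DoublyLinkedList` is an intrusive doubly linked list over the
/// `BUCKET_LEN` entry slots of a bucket, used by the cache variant to order
/// entries by recency.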
#[derive(Debug, Default)]
pub struct DoublyLinkedList([UnsafeCell<(u8, u8)>; BUCKET_LEN]);
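/// `Metadata` stores the per-bucket entry bookkeeping: the link to the
/// overflow chain, the occupancy bitmap, a second word that serves as the
/// removed-entry bitmap for the index variant or the LRU tail for the cache
/// variant, and one cached hash byte per slot used to prune key comparisons.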
struct Metadata<K, V, const LEN: usize> {
link: AtomicShared<LinkedBucket<K, V>>,
occupied_bitmap: AtomicU32,
removed_bitmap_or_lru_tail: AtomicU32,
partial_hash_array: [UnsafeCell<u8>; LEN],
}
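/// The number of entry slots in a `LinkedBucket`.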
const LINKED_BUCKET_LEN: usize = BUCKET_LEN / 4;
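/// `LinkedBucket` is an overflow bucket holding `LINKED_BUCKET_LEN` entries;
/// overflow buckets form a doubly linked chain headed by `Metadata::link`,
/// with `prev_link` pointing back toward the head.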
#[repr(align(128))]
struct LinkedBucket<K, V> {
metadata: Metadata<K, V, LINKED_BUCKET_LEN>,
data_block: DataBlock<K, V, LINKED_BUCKET_LEN>,
prev_link: AtomicPtr<LinkedBucket<K, V>>,
}
impl<K, V, L: LruList, const TYPE: char> Bucket<K, V, L, TYPE> {
#[cfg(any(test, feature = "loom"))]
pub fn new() -> Self {
Self {
len: AtomicU32::new(0),
epoch: UnsafeCell::new(None),
rw_lock: Lock::default(),
metadata: Metadata {
link: AtomicShared::default(),
occupied_bitmap: AtomicU32::default(),
removed_bitmap_or_lru_tail: AtomicU32::default(),
partial_hash_array: Default::default(),
},
lru_list: L::default(),
}
}
#[inline]
pub(crate) fn len(&self) -> usize {
self.len.load(Relaxed) as usize
}
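/// Returns `true` when every slot of an index bucket has been marked removed,
/// meaning the bucket only contains dead entries and is worth rebuilding.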
#[inline]
pub(crate) fn need_rebuild(&self) -> bool {
TYPE == INDEX
&& self.metadata.removed_bitmap_or_lru_tail.load(Relaxed)
== (u32::MAX >> (32 - BUCKET_LEN))
}
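/// Inserts the entry into a free slot of the main data block, or returns it
/// in `Err` if all `BUCKET_LEN` slots are occupied.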
#[inline]
pub(crate) fn insert<'g>(
&self,
data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
hash: u64,
entry: (K, V),
_guard: &'g Guard,
) -> Result<EntryPtr<'g, K, V, TYPE>, (K, V)> {
let len = self.len.load(Relaxed);
assert_ne!(len, u32::MAX, "bucket overflow");
let occupied_bitmap = self.metadata.occupied_bitmap.load(Relaxed);
let free_index = occupied_bitmap.trailing_ones() as usize;
if free_index == BUCKET_LEN {
Err(entry)
} else {
self.insert_entry(
&self.metadata,
unsafe { data_block.as_ref() },
free_index,
occupied_bitmap,
hash,
entry,
);
self.len.store(len + 1, Relaxed);
Ok(EntryPtr {
current_link_ptr: Ptr::null(),
current_index: free_index,
})
}
}
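/// Inserts the entry into the linked overflow chain, reusing a free slot in
/// an existing `LinkedBucket` or pushing a new one at the head of the chain.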
pub(crate) fn insert_overflow<'g>(
&self,
hash: u64,
entry: (K, V),
guard: &'g Guard,
) -> EntryPtr<'g, K, V, TYPE> {
let mut link_ptr = self.metadata.link.load(Acquire, guard);
while let Some(link) = link_ptr.as_ref() {
let occupied_bitmap = link.metadata.occupied_bitmap.load(Relaxed);
let free_index = occupied_bitmap.trailing_ones() as usize;
if free_index != LINKED_BUCKET_LEN {
debug_assert!(free_index < LINKED_BUCKET_LEN);
self.insert_entry(
&link.metadata,
&link.data_block,
free_index,
occupied_bitmap,
hash,
entry,
);
self.len.store(self.len.load(Relaxed) + 1, Relaxed);
return EntryPtr {
current_link_ptr: link_ptr,
current_index: free_index,
};
}
link_ptr = link.metadata.link.load(Acquire, guard);
}
let head = self.metadata.link.get_shared(Relaxed, guard);
let link = unsafe { Shared::new_unchecked(LinkedBucket::new(head)) };
let link_ptr = link.get_guarded_ptr(guard);
if let Some(link) = link_ptr.as_ref() {
self.write_cell(&link.data_block[0], |block| unsafe {
block.as_mut_ptr().write(entry);
});
self.write_cell(&link.metadata.partial_hash_array[0], |h| {
*h = Self::partial_hash(hash);
});
link.metadata.occupied_bitmap.store(1, Relaxed);
}
if let Some(head) = link.metadata.link.load(Relaxed, guard).as_ref() {
head.prev_link.store(link.as_ptr().cast_mut(), Relaxed);
}
self.metadata.link.swap((Some(link), Tag::None), Release);
self.len.store(self.len.load(Relaxed) + 1, Relaxed);
EntryPtr {
current_link_ptr: link_ptr,
current_index: 0,
}
}
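/// Removes and returns the entry pointed to by `entry_ptr`, unlinking the
/// containing `LinkedBucket` if it becomes empty; not used by the index
/// variant, which only marks entries as removed.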
#[inline]
pub(crate) fn remove<'g>(
&self,
data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
entry_ptr: &mut EntryPtr<'g, K, V, TYPE>,
guard: &'g Guard,
) -> (K, V) {
debug_assert_ne!(TYPE, INDEX);
debug_assert_ne!(entry_ptr.current_index, usize::MAX);
debug_assert_ne!(entry_ptr.current_index, BUCKET_LEN);
self.len.store(self.len.load(Relaxed) - 1, Relaxed);
if let Some(link) = entry_ptr.current_link_ptr.as_ref() {
let mut occupied_bitmap = link.metadata.occupied_bitmap.load(Relaxed);
debug_assert_ne!(occupied_bitmap & (1_u32 << entry_ptr.current_index), 0);
occupied_bitmap &= !(1_u32 << entry_ptr.current_index);
link.metadata
.occupied_bitmap
.store(occupied_bitmap, Relaxed);
let removed = Self::read_data_block(&link.data_block, entry_ptr.current_index);
if occupied_bitmap == 0 && TYPE != INDEX {
entry_ptr.unlink(&self.metadata.link, link, guard);
}
removed
} else {
let occupied_bitmap = self.metadata.occupied_bitmap.load(Relaxed);
debug_assert_ne!(occupied_bitmap & (1_u32 << entry_ptr.current_index), 0);
if TYPE == CACHE {
self.remove_from_lru_list(entry_ptr);
}
self.metadata.occupied_bitmap.store(
occupied_bitmap & !(1_u32 << entry_ptr.current_index),
Relaxed,
);
Self::read_data_block(unsafe { data_block.as_ref() }, entry_ptr.current_index)
}
}
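/// Marks the entry pointed to by `entry_ptr` as removed (index variant only)
/// and records the current epoch so the slot can be reclaimed once readers
/// from older epochs are gone.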
#[inline]
pub(crate) fn mark_removed<'g>(
&self,
entry_ptr: &mut EntryPtr<'g, K, V, TYPE>,
guard: &'g Guard,
) {
debug_assert_eq!(TYPE, INDEX);
debug_assert_ne!(entry_ptr.current_index, usize::MAX);
debug_assert_ne!(entry_ptr.current_index, BUCKET_LEN);
self.len.store(self.len.load(Relaxed) - 1, Relaxed);
if let Some(link) = entry_ptr.current_link_ptr.as_ref() {
let mut removed_bitmap = link.metadata.removed_bitmap_or_lru_tail.load(Relaxed);
debug_assert_eq!(removed_bitmap & (1_u32 << entry_ptr.current_index), 0);
removed_bitmap |= 1_u32 << entry_ptr.current_index;
link.metadata
.removed_bitmap_or_lru_tail
.store(removed_bitmap, Release);
} else {
let mut removed_bitmap = self.metadata.removed_bitmap_or_lru_tail.load(Relaxed);
debug_assert_eq!(removed_bitmap & (1_u32 << entry_ptr.current_index), 0);
removed_bitmap |= 1_u32 << entry_ptr.current_index;
self.metadata
.removed_bitmap_or_lru_tail
.store(removed_bitmap, Release);
}
self.update_epoch(Some(guard));
}
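/// Evicts and returns the least recently used entry when the main data block
/// is full (cache variant only); slot `0` is evicted if the LRU list has no
/// record of the entries.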
#[inline]
pub(crate) fn evict_lru_head(
&self,
data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
) -> Option<(K, V)> {
debug_assert_eq!(TYPE, CACHE);
let occupied_bitmap = self.metadata.occupied_bitmap.load(Relaxed);
if occupied_bitmap == 0b1111_1111_1111_1111_1111_1111_1111_1111 {
self.len.store(self.len.load(Relaxed) - 1, Relaxed);
let tail = self.metadata.removed_bitmap_or_lru_tail.load(Relaxed);
let evicted = if let Some((evicted, new_tail)) = self.lru_list.evict(tail) {
self.metadata
.removed_bitmap_or_lru_tail
.store(new_tail, Relaxed);
evicted as usize
} else {
0
};
debug_assert_ne!(occupied_bitmap & (1_u32 << evicted), 0);
self.metadata
.occupied_bitmap
.store(occupied_bitmap & !(1_u32 << evicted), Relaxed);
return Some(Self::read_data_block(
unsafe { data_block.as_ref() },
evicted,
));
}
None
}
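/// Marks the entry as the most recently used one (cache variant only);
/// entries stored in linked buckets are not tracked.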
#[inline]
pub(crate) fn update_lru_tail(&self, entry_ptr: &EntryPtr<K, V, TYPE>) {
debug_assert_eq!(TYPE, CACHE);
debug_assert_ne!(entry_ptr.current_index, usize::MAX);
debug_assert_ne!(entry_ptr.current_index, BUCKET_LEN);
if entry_ptr.current_link_ptr.is_null() {
#[allow(clippy::cast_possible_truncation)]
let entry = entry_ptr.current_index as u8;
let tail = self.metadata.removed_bitmap_or_lru_tail.load(Relaxed);
if let Some(new_tail) = self.lru_list.promote(tail, entry) {
self.metadata
.removed_bitmap_or_lru_tail
.store(new_tail, Relaxed);
}
}
}
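/// Extends the linked overflow chain until at least `additional` entries can
/// be inserted without further allocation.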
#[inline]
pub(crate) fn reserve_slots(&self, additional: usize, guard: &Guard) {
debug_assert!(self.rw_lock.is_locked(Relaxed));
let mut capacity =
BUCKET_LEN - self.metadata.occupied_bitmap.load(Relaxed).count_ones() as usize;
if capacity < additional {
let mut link_ptr = self.metadata.link.load(Acquire, guard);
while let Some(link) = link_ptr.as_ref() {
capacity += LINKED_BUCKET_LEN
- link.metadata.occupied_bitmap.load(Relaxed).count_ones() as usize;
if capacity >= additional {
return;
}
let mut next_link_ptr = link.metadata.link.load(Acquire, guard);
if next_link_ptr.is_null() {
let new_link = unsafe { Shared::new_unchecked(LinkedBucket::new(None)) };
new_link
.prev_link
.store(link_ptr.as_ptr().cast_mut(), Relaxed);
next_link_ptr = new_link.get_guarded_ptr(guard);
link.metadata
.link
.swap((Some(new_link), Tag::None), Release);
}
link_ptr = next_link_ptr;
}
}
}
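/// Moves the entry pointed to by `from_entry_ptr` out of another locked
/// bucket and inserts it into this bucket, keeping both length counters
/// consistent.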
#[inline]
pub(crate) fn extract_from<'g>(
&self,
data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
hash: u64,
from_writer: &Writer<K, V, L, TYPE>,
from_data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
from_entry_ptr: &mut EntryPtr<'g, K, V, TYPE>,
guard: &'g Guard,
) {
debug_assert!(self.rw_lock.is_locked(Relaxed));
let from_len = from_writer.len.load(Relaxed);
from_writer.len.store(from_len - 1, Relaxed);
let entry = if let Some(link) = from_entry_ptr.current_link_ptr.as_ref() {
let occupied_bitmap = link.metadata.occupied_bitmap.load(Relaxed);
debug_assert_ne!(occupied_bitmap & (1_u32 << from_entry_ptr.current_index), 0);
link.metadata.occupied_bitmap.store(
occupied_bitmap & !(1_u32 << from_entry_ptr.current_index),
Relaxed,
);
Self::read_data_block(&link.data_block, from_entry_ptr.current_index)
} else {
let occupied_bitmap = from_writer.metadata.occupied_bitmap.load(Relaxed);
debug_assert_ne!(occupied_bitmap & (1_u32 << from_entry_ptr.current_index), 0);
from_writer.metadata.occupied_bitmap.store(
occupied_bitmap & !(1_u32 << from_entry_ptr.current_index),
Relaxed,
);
Self::read_data_block(
unsafe { from_data_block.as_ref() },
from_entry_ptr.current_index,
)
};
if let Err(entry) = self.insert(data_block, hash, entry, guard) {
self.insert_overflow(hash, entry, guard);
}
}
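/// Drops every entry and linked bucket in place, assuming no other thread can
/// reach them anymore.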
pub(super) fn drop_entries(&self, data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>) {
if !self.metadata.link.is_null(Relaxed) {
let mut next = self.metadata.link.swap((None, Tag::None), Acquire);
while let Some(current) = next.0 {
next = current.metadata.link.swap((None, Tag::None), Acquire);
let dropped = unsafe { current.drop_in_place() };
debug_assert!(dropped);
}
}
if needs_drop::<(K, V)>() {
let mut occupied_bitmap = self.metadata.occupied_bitmap.load(Relaxed);
if occupied_bitmap != 0 {
let mut index = occupied_bitmap.trailing_zeros();
while index != 32 {
Self::drop_entry(unsafe { data_block.as_ref() }, index as usize);
occupied_bitmap -= 1_u32 << index;
index = occupied_bitmap.trailing_zeros();
}
}
}
}
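/// Reclaims the storage of removed entries (index variant only) once the
/// epoch recorded when they were marked removed is at least one generation
/// behind the current one.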
#[inline]
pub(super) fn try_drop_unreachable_entries(
&self,
data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
guard: &Guard,
) {
debug_assert_eq!(TYPE, INDEX);
let prev_epoch = *Self::read_cell(&self.epoch);
if prev_epoch.is_some_and(|e| !e.in_same_generation(guard.epoch())) {
self.drop_unreachable_entries(data_block, guard);
}
}
fn drop_unreachable_entries(
&self,
data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
guard: &Guard,
) {
debug_assert_eq!(TYPE, INDEX);
let mut empty =
Self::cleanup_removed_entries(&self.metadata, unsafe { data_block.as_ref() });
let mut link_ptr = self.metadata.link.load(Acquire, guard);
while let Some(link) = link_ptr.as_ref() {
empty &= Self::cleanup_removed_entries(&link.metadata, &link.data_block);
let next_link_ptr = link.metadata.link.load(Acquire, guard);
if next_link_ptr.is_null() {
self.cleanup_empty_link(link_ptr.as_ptr());
}
link_ptr = next_link_ptr;
}
if empty {
self.update_epoch(None);
}
}
fn cleanup_removed_entries<const LEN: usize>(
metadata: &Metadata<K, V, LEN>,
data_block: &DataBlock<K, V, LEN>,
) -> bool {
debug_assert_eq!(TYPE, INDEX);
let mut removed_bitmap = metadata.removed_bitmap_or_lru_tail.load(Relaxed);
if removed_bitmap == 0 {
return true;
}
let mut occupied_bitmap = metadata.occupied_bitmap.load(Relaxed);
let mut index = removed_bitmap.trailing_zeros();
while index != 32 {
let bit = 1_u32 << index;
debug_assert_eq!(occupied_bitmap & bit, bit);
occupied_bitmap -= bit;
removed_bitmap -= bit;
Self::drop_entry(data_block, index as usize);
index = removed_bitmap.trailing_zeros();
}
metadata.occupied_bitmap.store(occupied_bitmap, Release);
metadata
.removed_bitmap_or_lru_tail
.store(removed_bitmap, Release);
removed_bitmap == 0
}
fn cleanup_empty_link(&self, mut link_ptr: *const LinkedBucket<K, V>) {
debug_assert_eq!(TYPE, INDEX);
while let Some(link) = unsafe { link_ptr.as_ref() } {
if link.metadata.occupied_bitmap.load(Relaxed) != 0 {
break;
}
link_ptr = link.prev_link.load(Relaxed);
if let Some(prev) = unsafe { link_ptr.as_ref() } {
let unlinked = prev.metadata.link.swap((None, Tag::None), Relaxed).0;
let released = unlinked.is_some_and(Shared::release);
debug_assert!(released);
} else {
let unlinked = self.metadata.link.swap((None, Tag::None), Relaxed).0;
let released = unlinked.is_some_and(Shared::release);
debug_assert!(released);
}
}
}
#[inline]
fn insert_entry<const LEN: usize>(
&self,
metadata: &Metadata<K, V, LEN>,
data_block: &DataBlock<K, V, LEN>,
index: usize,
occupied_bitmap: u32,
hash: u64,
entry: (K, V),
) {
debug_assert!(index < LEN);
debug_assert_eq!(metadata.occupied_bitmap.load(Relaxed) & (1_u32 << index), 0);
self.write_cell(&data_block[index], |block| unsafe {
block.as_mut_ptr().write(entry);
});
self.write_cell(&metadata.partial_hash_array[index], |h| {
*h = Self::partial_hash(hash);
});
metadata.occupied_bitmap.store(
occupied_bitmap | (1_u32 << index),
if TYPE == INDEX { Release } else { Relaxed },
);
}
#[inline]
fn drop_entry<const LEN: usize>(data_block: &DataBlock<K, V, LEN>, index: usize) {
unsafe {
(*data_block[index].get()).as_mut_ptr().drop_in_place();
}
}
#[inline]
fn remove_from_lru_list(&self, entry_ptr: &EntryPtr<K, V, TYPE>) {
debug_assert_eq!(TYPE, CACHE);
debug_assert_ne!(entry_ptr.current_index, usize::MAX);
debug_assert_ne!(entry_ptr.current_index, BUCKET_LEN);
if entry_ptr.current_link_ptr.is_null() {
#[allow(clippy::cast_possible_truncation)]
let entry = entry_ptr.current_index as u8;
let tail = self.metadata.removed_bitmap_or_lru_tail.load(Relaxed);
if let Some(new_tail) = self.lru_list.remove(tail, entry) {
self.metadata
.removed_bitmap_or_lru_tail
.store(new_tail, Relaxed);
}
}
}
#[inline]
fn update_epoch(&self, guard: Option<&Guard>) {
debug_assert_eq!(TYPE, INDEX);
self.write_cell(&self.epoch, |e| {
if let Some(guard) = guard {
e.replace(guard.epoch());
} else {
e.take();
}
});
}
#[allow(clippy::cast_possible_truncation)]
#[inline]
const fn partial_hash(hash: u64) -> u8 {
hash as u8
}
#[inline]
const fn read_data_block<const LEN: usize>(
data_block: &DataBlock<K, V, LEN>,
index: usize,
) -> (K, V) {
unsafe { (*data_block.0[index].get()).as_ptr().read() }
}
#[inline]
const fn entry_ptr<const LEN: usize>(
data_block: &DataBlock<K, V, LEN>,
index: usize,
) -> *const (K, V) {
Self::read_cell(&data_block.0[index]).as_ptr()
}
#[inline]
const fn entry_mut_ptr<const LEN: usize>(
data_block: &DataBlock<K, V, LEN>,
index: usize,
) -> *mut (K, V) {
unsafe { (*data_block.0[index].get()).as_mut_ptr() }
}
#[inline]
const fn read_cell<T>(cell: &UnsafeCell<T>) -> &T {
unsafe { &*cell.get() }
}
#[inline]
fn write_cell<T, R, F: FnOnce(&mut T) -> R>(&self, cell: &UnsafeCell<T>, f: F) -> R {
debug_assert!(self.rw_lock.is_locked(Relaxed));
unsafe { f(&mut *cell.get()) }
}
}
impl<K: Eq, V, L: LruList, const TYPE: char> Bucket<K, V, L, TYPE> {
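/// Searches the bucket and its linked overflow chain for an entry matching
/// the key, returning a reference that lives as long as the supplied guard.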
#[inline]
pub(super) fn search_entry<'g, Q>(
&self,
data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
key: &Q,
hash: u64,
guard: &'g Guard,
) -> Option<&'g (K, V)>
where
Q: Equivalent<K> + ?Sized,
{
if self.len() == 0 {
return None;
}
if let Some((entry, _)) =
Self::search_data_block(&self.metadata, unsafe { data_block.as_ref() }, key, hash)
{
return Some(entry);
}
let mut link_ptr = self.metadata.link.load(Acquire, guard);
while let Some(link) = link_ptr.as_ref() {
if let Some((entry, _)) =
Self::search_data_block(&link.metadata, &link.data_block, key, hash)
{
return Some(entry);
}
link_ptr = link.metadata.link.load(Acquire, guard);
}
None
}
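/// Returns an `EntryPtr` positioned on the entry matching the key, or an
/// unpositioned `EntryPtr` if no such entry exists.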
#[inline]
pub(crate) fn get_entry_ptr<'g, Q>(
&self,
data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
key: &Q,
hash: u64,
guard: &'g Guard,
) -> EntryPtr<'g, K, V, TYPE>
where
Q: Equivalent<K> + ?Sized,
{
if self.len() != 0 {
if let Some((_, index)) =
Self::search_data_block(&self.metadata, unsafe { data_block.as_ref() }, key, hash)
{
return EntryPtr {
current_link_ptr: Ptr::null(),
current_index: index,
};
}
let mut current_link_ptr = self.metadata.link.load(Acquire, guard);
while let Some(link) = current_link_ptr.as_ref() {
if let Some((_, index)) =
Self::search_data_block(&link.metadata, &link.data_block, key, hash)
{
return EntryPtr {
current_link_ptr,
current_index: index,
};
}
current_link_ptr = link.metadata.link.load(Acquire, guard);
}
}
EntryPtr::new(guard)
}
#[inline]
fn search_data_block<'g, Q, const LEN: usize>(
metadata: &Metadata<K, V, LEN>,
data_block: &'g DataBlock<K, V, LEN>,
key: &Q,
hash: u64,
) -> Option<(&'g (K, V), usize)>
where
Q: Equivalent<K> + ?Sized,
{
let mut bitmap = if TYPE == INDEX {
(!metadata.removed_bitmap_or_lru_tail.load(Acquire))
& metadata.occupied_bitmap.load(Acquire)
} else {
metadata.occupied_bitmap.load(Relaxed)
};
for i in 0..LEN {
if *Self::read_cell(&metadata.partial_hash_array[i]) != Self::partial_hash(hash) {
bitmap &= !(1_u32 << i);
}
}
let mut offset = bitmap.trailing_zeros();
while offset != u32::BITS {
let entry = unsafe { &*Self::entry_ptr(data_block, offset as usize) };
if key.equivalent(&entry.0) {
return Some((entry, offset as usize));
}
bitmap &= !(1_u32 << offset);
offset = bitmap.trailing_zeros();
}
None
}
}
unsafe impl<K: Eq + Send, V: Send, L: LruList, const TYPE: char> Send for Bucket<K, V, L, TYPE> {}
unsafe impl<K: Eq + Send + Sync, V: Send + Sync, L: LruList, const TYPE: char> Sync
for Bucket<K, V, L, TYPE>
{
}
impl<K, V, const LEN: usize> Index<usize> for DataBlock<K, V, LEN> {
type Output = UnsafeCell<MaybeUninit<(K, V)>>;
#[inline]
fn index(&self, index: usize) -> &Self::Output {
&self.0[index]
}
}
unsafe impl<K: Send, V: Send, const LEN: usize> Send for DataBlock<K, V, LEN> {}
unsafe impl<K: Send + Sync, V: Send + Sync, const LEN: usize> Sync for DataBlock<K, V, LEN> {}
impl<K, V, L: LruList, const TYPE: char> Writer<K, V, L, TYPE> {
#[inline]
pub(crate) async fn lock_async<'g>(
bucket: &'g Bucket<K, V, L, TYPE>,
async_guard: &'g AsyncGuard,
) -> Option<Writer<K, V, L, TYPE>> {
if bucket.rw_lock.lock_async_with(|| async_guard.reset()).await {
Some(Self::from_bucket(bucket))
} else {
None
}
}
#[inline]
pub(crate) fn lock_sync(bucket: &Bucket<K, V, L, TYPE>) -> Option<Writer<K, V, L, TYPE>> {
if bucket.rw_lock.lock_sync() {
Some(Self::from_bucket(bucket))
} else {
None
}
}
#[inline]
pub(crate) fn try_lock(
bucket: &Bucket<K, V, L, TYPE>,
) -> Result<Option<Writer<K, V, L, TYPE>>, ()> {
if bucket.rw_lock.try_lock() {
Ok(Some(Self::from_bucket(bucket)))
} else if bucket.rw_lock.is_poisoned(Relaxed) {
Ok(None)
} else {
Err(())
}
}
#[inline]
pub(crate) const fn from_bucket(bucket: &Bucket<K, V, L, TYPE>) -> Writer<K, V, L, TYPE> {
Writer {
bucket: unsafe { NonNull::new_unchecked(from_ref(bucket).cast_mut()) },
}
}
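/// Kills the empty bucket: drops any remaining linked buckets, poisons the
/// lock so it can never be acquired again, and consumes the `Writer` without
/// releasing the lock.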
#[inline]
pub(super) fn kill(self) {
debug_assert_eq!(self.len(), 0);
debug_assert!(self.rw_lock.is_locked(Relaxed));
debug_assert!(
TYPE != INDEX
|| self.metadata.removed_bitmap_or_lru_tail.load(Relaxed)
== self.metadata.occupied_bitmap.load(Relaxed)
);
if TYPE != INDEX && !self.metadata.link.is_null(Relaxed) {
let mut link = self.metadata.link.swap((None, Tag::None), Acquire).0;
while let Some(current) = link {
link = current.metadata.link.swap((None, Tag::None), Acquire).0;
let dropped = unsafe { current.drop_in_place() };
debug_assert!(dropped);
}
}
let poisoned = self.rw_lock.poison_lock();
debug_assert!(poisoned);
forget(self);
}
}
impl<K, V, L: LruList, const TYPE: char> Deref for Writer<K, V, L, TYPE> {
type Target = Bucket<K, V, L, TYPE>;
#[inline]
fn deref(&self) -> &Self::Target {
unsafe { self.bucket.as_ref() }
}
}
impl<K, V, L: LruList, const TYPE: char> Drop for Writer<K, V, L, TYPE> {
#[inline]
fn drop(&mut self) {
self.rw_lock.release_lock();
}
}
unsafe impl<K: Send, V: Send, L: LruList, const TYPE: char> Send for Writer<K, V, L, TYPE> {}
unsafe impl<K: Send + Sync, V: Send + Sync, L: LruList, const TYPE: char> Sync
for Writer<K, V, L, TYPE>
{
}
impl<'g, K, V, L: LruList, const TYPE: char> Reader<K, V, L, TYPE> {
#[inline]
pub(crate) async fn lock_async(
bucket: &'g Bucket<K, V, L, TYPE>,
async_guard: &AsyncGuard,
) -> Option<Reader<K, V, L, TYPE>> {
if bucket
.rw_lock
.share_async_with(|| async_guard.reset())
.await
{
Some(Reader {
bucket: unsafe { NonNull::new_unchecked(from_ref(bucket).cast_mut()) },
})
} else {
None
}
}
#[inline]
pub(crate) fn lock_sync(bucket: &Bucket<K, V, L, TYPE>) -> Option<Reader<K, V, L, TYPE>> {
if bucket.rw_lock.share_sync() {
Some(Reader {
bucket: unsafe { NonNull::new_unchecked(from_ref(bucket).cast_mut()) },
})
} else {
None
}
}
#[inline]
pub(crate) fn try_lock(bucket: &Bucket<K, V, L, TYPE>) -> Option<Reader<K, V, L, TYPE>> {
if bucket.rw_lock.try_share() {
Some(Reader {
bucket: unsafe { NonNull::new_unchecked(from_ref(bucket).cast_mut()) },
})
} else {
None
}
}
}
impl<K, V, L: LruList, const TYPE: char> Deref for Reader<K, V, L, TYPE> {
type Target = Bucket<K, V, L, TYPE>;
#[inline]
fn deref(&self) -> &Self::Target {
unsafe { self.bucket.as_ref() }
}
}
impl<K, V, L: LruList, const TYPE: char> Drop for Reader<K, V, L, TYPE> {
#[inline]
fn drop(&mut self) {
self.rw_lock.release_share();
}
}
unsafe impl<K: Send, V: Send, L: LruList, const TYPE: char> Send for Reader<K, V, L, TYPE> {}
unsafe impl<K: Send + Sync, V: Send + Sync, L: LruList, const TYPE: char> Sync
for Reader<K, V, L, TYPE>
{
}
impl<'g, K, V, const TYPE: char> EntryPtr<'g, K, V, TYPE> {
#[inline]
pub(crate) const fn new(_guard: &'g Guard) -> Self {
Self {
current_link_ptr: Ptr::null(),
current_index: BUCKET_LEN,
}
}
#[inline]
pub(crate) const fn is_valid(&self) -> bool {
self.current_index != BUCKET_LEN
}
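/// Advances the pointer to the next occupied entry, following the linked
/// overflow chain; returns `false` once all entries have been visited.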
#[inline]
pub(crate) fn move_to_next<L: LruList>(
&mut self,
bucket: &Bucket<K, V, L, TYPE>,
guard: &'g Guard,
) -> bool {
if self.current_index != usize::MAX {
if self.current_link_ptr.is_null()
&& self.next_entry::<L, BUCKET_LEN>(&bucket.metadata, guard)
{
return true;
}
while let Some(link) = self.current_link_ptr.as_ref() {
if self.next_entry::<L, LINKED_BUCKET_LEN>(&link.metadata, guard) {
return true;
}
}
self.current_index = usize::MAX;
}
false
}
#[inline]
pub(crate) fn get(&self, data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>) -> &'g (K, V) {
debug_assert_ne!(self.current_index, usize::MAX);
let entry_ptr = if let Some(link) = self.current_link_ptr.as_ref() {
Bucket::<K, V, (), TYPE>::entry_ptr(&link.data_block, self.current_index)
} else {
Bucket::<K, V, (), TYPE>::entry_ptr(unsafe { data_block.as_ref() }, self.current_index)
};
unsafe { &(*entry_ptr) }
}
#[inline]
pub(crate) fn get_mut<L: LruList>(
&mut self,
data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
_writer: &Writer<K, V, L, TYPE>,
) -> &mut (K, V) {
debug_assert_ne!(self.current_index, usize::MAX);
let entry_ptr = if let Some(link) = self.current_link_ptr.as_ref() {
Bucket::<K, V, L, TYPE>::entry_mut_ptr(&link.data_block, self.current_index)
} else {
Bucket::<K, V, L, TYPE>::entry_mut_ptr(
unsafe { data_block.as_ref() },
self.current_index,
)
};
unsafe { &mut (*entry_ptr) }
}
#[inline]
pub(crate) fn partial_hash<L: LruList>(&self, bucket: &Bucket<K, V, L, TYPE>) -> u8 {
debug_assert_ne!(self.current_index, usize::MAX);
if let Some(link) = self.current_link_ptr.as_ref() {
*Bucket::<K, V, L, TYPE>::read_cell(
&link.metadata.partial_hash_array[self.current_index],
)
} else {
*Bucket::<K, V, L, TYPE>::read_cell(
&bucket.metadata.partial_hash_array[self.current_index],
)
}
}
fn unlink(
&mut self,
link_head: &AtomicShared<LinkedBucket<K, V>>,
link: &LinkedBucket<K, V>,
guard: &'g Guard,
) {
debug_assert_ne!(TYPE, INDEX);
let prev_link_ptr = link.prev_link.load(Relaxed);
let next = link.metadata.link.swap((None, Tag::None), Relaxed).0;
if let Some(next) = next.as_ref() {
next.prev_link.store(prev_link_ptr, Relaxed);
self.current_link_ptr = next.get_guarded_ptr(guard);
self.current_index = LINKED_BUCKET_LEN;
} else {
self.current_link_ptr = Ptr::null();
self.current_index = usize::MAX;
}
let unlinked = if let Some(prev) = unsafe { prev_link_ptr.as_ref() } {
prev.metadata.link.swap((next, Tag::None), Acquire).0
} else {
link_head.swap((next, Tag::None), Acquire).0
};
if let Some(link) = unlinked {
let dropped = unsafe { link.drop_in_place() };
debug_assert!(dropped);
}
}
#[inline]
fn next_entry<L: LruList, const LEN: usize>(
&mut self,
metadata: &Metadata<K, V, LEN>,
guard: &'g Guard,
) -> bool {
let current_index = if self.current_index == LEN {
0
} else {
self.current_index + 1
};
if current_index < LEN {
let bitmap = if TYPE == INDEX {
(!metadata.removed_bitmap_or_lru_tail.load(Acquire)
& metadata.occupied_bitmap.load(Acquire))
& (!((1_u32 << current_index) - 1))
} else {
metadata.occupied_bitmap.load(Relaxed) & (!((1_u32 << current_index) - 1))
};
let next_index = bitmap.trailing_zeros() as usize;
if next_index < LEN {
self.current_index = next_index;
return true;
}
}
self.current_link_ptr = metadata.link.load(Acquire, guard);
self.current_index = LINKED_BUCKET_LEN;
false
}
}
impl<K, V, const TYPE: char> Clone for EntryPtr<'_, K, V, TYPE> {
#[inline]
fn clone(&self) -> Self {
Self {
current_link_ptr: self.current_link_ptr,
current_index: self.current_index,
}
}
}
impl<K, V, const TYPE: char> Debug for EntryPtr<'_, K, V, TYPE> {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EntryPtr")
.field("current_link_ptr", &self.current_link_ptr)
.field("current_index", &self.current_index)
.finish()
}
}
unsafe impl<K: Send, V: Send, const TYPE: char> Send for EntryPtr<'_, K, V, TYPE> {}
unsafe impl<K: Send + Sync, V: Send + Sync, const TYPE: char> Sync for EntryPtr<'_, K, V, TYPE> {}
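/// The unit type provides the no-op `LruList` used by bucket types that do
/// not evict entries.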
impl LruList for () {}
impl DoublyLinkedList {
#[inline]
const fn read(&self, index: usize) -> (u8, u8) {
unsafe { *self.0[index].get() }
}
#[inline]
fn write<R, F: FnOnce(&mut (u8, u8)) -> R>(&self, index: usize, f: F) -> R {
unsafe { f(&mut *self.0[index].get()) }
}
}
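/// Each occupied slot stores the indices of its more recent and less recent
/// neighbors in a circular recency list; the `tail` word is the index of the
/// most recently used slot plus one, with `0` meaning the list is empty.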
impl LruList for DoublyLinkedList {
#[inline]
fn evict(&self, tail: u32) -> Option<(u8, u32)> {
if tail == 0 {
None
} else {
let lru = self.read(tail as usize - 1).0;
let new_tail = if tail - 1 == u32::from(lru) {
0
} else {
let new_lru = self.read(lru as usize).0;
{
#![allow(clippy::cast_possible_truncation)]
self.write(new_lru as usize, |v| {
v.1 = tail as u8 - 1;
});
}
self.write(tail as usize - 1, |v| {
v.0 = new_lru;
});
tail
};
self.write(lru as usize, |v| {
*v = (0, 0);
});
Some((lru, new_tail))
}
}
#[inline]
fn remove(&self, tail: u32, entry: u8) -> Option<u32> {
if tail == 0
|| (self.read(entry as usize) == (0, 0)
&& (self.read(0) != (entry, entry) || (tail != 1 && tail != u32::from(entry) + 1)))
{
return None;
}
if self.read(entry as usize).0 == entry {
debug_assert_eq!(tail, u32::from(entry) + 1);
self.write(entry as usize, |v| {
*v = (0, 0);
});
return Some(0);
}
let (prev, next) = self.read(entry as usize);
debug_assert_eq!(self.read(prev as usize).1, entry);
self.write(prev as usize, |v| {
v.1 = next;
});
debug_assert_eq!(self.read(next as usize).0, entry);
self.write(next as usize, |v| {
v.0 = prev;
});
let new_tail = if tail == u32::from(entry) + 1 {
Some(u32::from(next) + 1)
} else {
None
};
self.write(entry as usize, |v| {
*v = (0, 0);
});
new_tail
}
#[inline]
fn promote(&self, tail: u32, entry: u8) -> Option<u32> {
if tail == u32::from(entry) + 1 {
return None;
} else if tail == 0 {
self.write(entry as usize, |v| {
*v = (entry, entry);
});
return Some(u32::from(entry) + 1);
}
if self.read(entry as usize) != (0, 0) || (self.read(0) == (entry, entry) && tail == 1) {
let (prev, next) = self.read(entry as usize);
debug_assert_eq!(self.read(prev as usize).1, entry);
self.write(prev as usize, |v| {
v.1 = next;
});
debug_assert_eq!(self.read(next as usize).0, entry);
self.write(next as usize, |v| {
v.0 = prev;
});
}
let oldest = self.read(tail as usize - 1).0;
debug_assert_eq!(u32::from(self.read(oldest as usize).1) + 1, tail);
self.write(oldest as usize, |v| {
v.1 = entry;
});
self.write(entry as usize, |v| {
v.0 = oldest;
});
self.write(tail as usize - 1, |v| {
v.0 = entry;
});
{
#![allow(clippy::cast_possible_truncation)]
self.write(entry as usize, |v| {
v.1 = tail as u8 - 1;
});
}
Some(u32::from(entry) + 1)
}
}
unsafe impl Send for DoublyLinkedList {}
unsafe impl Sync for DoublyLinkedList {}
unsafe impl<K: Send, V: Send, const LEN: usize> Send for Metadata<K, V, LEN> {}
unsafe impl<K: Send + Sync, V: Send + Sync, const LEN: usize> Sync for Metadata<K, V, LEN> {}
impl<K, V> LinkedBucket<K, V> {
#[inline]
fn new(next: Option<Shared<LinkedBucket<K, V>>>) -> Self {
let mut bucket = Self {
metadata: Metadata {
link: AtomicShared::default(),
occupied_bitmap: AtomicU32::default(),
removed_bitmap_or_lru_tail: AtomicU32::default(),
partial_hash_array: Default::default(),
},
data_block: unsafe {
#[allow(clippy::uninit_assumed_init)]
MaybeUninit::uninit().assume_init()
},
prev_link: AtomicPtr::default(),
};
if let Some(next) = next {
bucket.metadata.link = AtomicShared::from(next);
}
bucket
}
}
impl<K, V> Debug for LinkedBucket<K, V> {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LinkedBucket").finish()
}
}
impl<K, V> Drop for LinkedBucket<K, V> {
#[inline]
fn drop(&mut self) {
if needs_drop::<(K, V)>() {
let mut occupied_bitmap = self.metadata.occupied_bitmap.load(Relaxed);
let mut index = occupied_bitmap.trailing_zeros();
while index != 32 {
Bucket::<K, V, (), MAP>::drop_entry(&self.data_block, index as usize);
occupied_bitmap -= 1_u32 << index;
index = occupied_bitmap.trailing_zeros();
}
}
}
}
#[cfg(not(feature = "loom"))]
#[cfg(test)]
mod test {
use super::*;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::Relaxed;
use proptest::prelude::*;
use sdd::{Guard, Shared};
use tokio::sync::Barrier;
#[cfg(not(miri))]
static_assertions::assert_eq_size!(Bucket<String, String, (), MAP>, [u8; BUCKET_LEN * 2]);
#[cfg(not(miri))]
static_assertions::assert_eq_size!(Bucket<String, String, DoublyLinkedList, CACHE>, [u8; BUCKET_LEN * 4]);
proptest! {
#[cfg_attr(miri, ignore)]
#[test]
fn evict_untracked(xs in 0..BUCKET_LEN * 2) {
let data_block: DataBlock<usize, usize, BUCKET_LEN> =
unsafe { MaybeUninit::uninit().assume_init() };
let data_block_ptr =
unsafe { NonNull::new_unchecked(from_ref(&data_block).cast_mut()) };
let bucket: Bucket<usize, usize, DoublyLinkedList, CACHE> = Bucket::new();
for v in 0..xs {
let guard = Guard::new();
let writer = Writer::lock_sync(&bucket).unwrap();
let evicted = writer.evict_lru_head(data_block_ptr);
assert_eq!(v >= BUCKET_LEN, evicted.is_some());
if let Err(entry) = writer.insert(data_block_ptr, 0, (v, v), &guard) {
writer.insert_overflow(0, entry, &guard);
}
assert_eq!(writer.metadata.removed_bitmap_or_lru_tail.load(Relaxed), 0);
}
}
#[cfg_attr(miri, ignore)]
#[test]
fn evict_overflowed(xs in 1..BUCKET_LEN * 2) {
let data_block: DataBlock<usize, usize, BUCKET_LEN> =
unsafe { MaybeUninit::uninit().assume_init() };
let data_block_ptr =
unsafe { NonNull::new_unchecked(from_ref(&data_block).cast_mut()) };
let bucket: Bucket<usize, usize, DoublyLinkedList, CACHE> = Bucket::new();
let guard = Guard::new();
let writer = Writer::lock_sync(&bucket).unwrap();
for _ in 0..3 {
for v in 0..xs {
let entry_ptr = match writer.insert(data_block_ptr, 0, (v, v), &guard) {
Ok(entry_ptr) => entry_ptr,
Err(entry) => writer.insert_overflow(0, entry, &guard),
};
writer.update_lru_tail(&entry_ptr);
if v < BUCKET_LEN {
assert_eq!(
writer.metadata.removed_bitmap_or_lru_tail.load(Relaxed) as usize,
v + 1
);
}
assert_eq!(
writer
.lru_list
.read(writer.metadata.removed_bitmap_or_lru_tail.load(Relaxed) as usize - 1)
.0,
0
);
}
let mut evicted_key = None;
if xs >= BUCKET_LEN {
let evicted = writer.evict_lru_head(data_block_ptr);
assert!(evicted.is_some());
evicted_key = evicted.map(|(k, _)| k);
}
assert_ne!(writer.metadata.removed_bitmap_or_lru_tail.load(Relaxed), 0);
for v in 0..xs {
let mut entry_ptr = writer.get_entry_ptr(data_block_ptr, &v, 0, &guard);
if entry_ptr.is_valid() {
let _erased = writer.remove(data_block_ptr, &mut entry_ptr, &guard);
} else {
assert_eq!(v, evicted_key.unwrap());
}
}
assert_eq!(writer.metadata.removed_bitmap_or_lru_tail.load(Relaxed), 0);
}
}
#[cfg_attr(miri, ignore)]
#[test]
fn evict_tracked(xs in 0..BUCKET_LEN * 2) {
let data_block: DataBlock<usize, usize, BUCKET_LEN> =
unsafe { MaybeUninit::uninit().assume_init() };
let data_block_ptr =
unsafe { NonNull::new_unchecked(from_ref(&data_block).cast_mut()) };
let bucket: Bucket<usize, usize, DoublyLinkedList, CACHE> = Bucket::new();
for v in 0..xs {
let guard = Guard::new();
let writer = Writer::lock_sync(&bucket).unwrap();
let evicted = writer.evict_lru_head(data_block_ptr);
assert_eq!(v >= BUCKET_LEN, evicted.is_some());
let mut entry_ptr = match writer.insert(data_block_ptr, 0, (v, v), &guard) {
Ok(entry_ptr) => entry_ptr,
Err(entry) => writer.insert_overflow(0, entry, &guard),
};
writer.update_lru_tail(&entry_ptr);
assert_eq!(
writer.metadata.removed_bitmap_or_lru_tail.load(Relaxed) as usize,
entry_ptr.current_index + 1
);
if v >= BUCKET_LEN {
entry_ptr.current_index = xs % BUCKET_LEN;
writer.update_lru_tail(&entry_ptr);
assert_eq!(
writer.metadata.removed_bitmap_or_lru_tail.load(Relaxed) as usize,
entry_ptr.current_index + 1
);
let mut iterated = 1;
let mut i = writer.lru_list.read(entry_ptr.current_index).1 as usize;
while i != entry_ptr.current_index {
iterated += 1;
i = writer.lru_list.read(i).1 as usize;
}
assert_eq!(iterated, BUCKET_LEN);
iterated = 1;
i = writer.lru_list.read(entry_ptr.current_index).0 as usize;
while i != entry_ptr.current_index {
iterated += 1;
i = writer.lru_list.read(i).0 as usize;
}
assert_eq!(iterated, BUCKET_LEN);
}
}
}
#[cfg_attr(miri, ignore)]
#[test]
fn removed(xs in 0..BUCKET_LEN) {
let data_block: DataBlock<usize, usize, BUCKET_LEN> =
unsafe { MaybeUninit::uninit().assume_init() };
let data_block_ptr =
unsafe { NonNull::new_unchecked(from_ref(&data_block).cast_mut()) };
let bucket: Bucket<usize, usize, DoublyLinkedList, CACHE> = Bucket::new();
for v in 0..xs {
let guard = Guard::new();
let writer = Writer::lock_sync(&bucket).unwrap();
let entry_ptr = match writer.insert(data_block_ptr, 0, (v, v), &guard) {
Ok(entry_ptr) => entry_ptr,
Err(entry) => writer.insert_overflow(0, entry, &guard),
};
writer.update_lru_tail(&entry_ptr);
let mut iterated = 1;
let mut i = writer.lru_list.read(entry_ptr.current_index).1 as usize;
while i != entry_ptr.current_index {
iterated += 1;
i = writer.lru_list.read(i).1 as usize;
}
assert_eq!(iterated, v + 1);
}
for v in 0..xs {
let guard = Guard::new();
let writer = Writer::lock_sync(&bucket).unwrap();
let data_block_ptr =
unsafe { NonNull::new_unchecked(from_ref(&data_block).cast_mut()) };
let entry_ptr = writer.get_entry_ptr(data_block_ptr, &v, 0, &guard);
let mut iterated = 1;
let mut i = writer.lru_list.read(entry_ptr.current_index).1 as usize;
while i != entry_ptr.current_index {
iterated += 1;
i = writer.lru_list.read(i).1 as usize;
}
assert_eq!(iterated, xs - v);
writer.remove_from_lru_list(&entry_ptr);
}
assert_eq!(bucket.metadata.removed_bitmap_or_lru_tail.load(Relaxed), 0);
}
}
#[cfg_attr(miri, ignore)]
#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
async fn bucket_lock_sync() {
let num_tasks = BUCKET_LEN + 2;
let barrier = Shared::new(Barrier::new(num_tasks));
let data_block: Shared<DataBlock<usize, usize, BUCKET_LEN>> =
Shared::new(unsafe { MaybeUninit::uninit().assume_init() });
let mut bucket: Shared<Bucket<usize, usize, (), MAP>> = Shared::new(Bucket::new());
let mut data: [u64; 128] = [0; 128];
let mut task_handles = Vec::with_capacity(num_tasks);
for task_id in 0..num_tasks {
let barrier_clone = barrier.clone();
let data_block_clone = data_block.clone();
let bucket_clone = bucket.clone();
let data_ptr = AtomicPtr::new(&raw mut data);
task_handles.push(tokio::spawn(async move {
barrier_clone.wait().await;
let partial_hash = (task_id % BUCKET_LEN).try_into().unwrap();
let guard = Guard::new();
for i in 0..2048 {
let writer = Writer::lock_sync(&bucket_clone).unwrap();
let mut sum: u64 = 0;
for j in 0..128 {
unsafe {
sum += (*data_ptr.load(Relaxed))[j];
(*data_ptr.load(Relaxed))[j] = if i % 4 == 0 { 2 } else { 4 }
};
}
assert_eq!(sum % 256, 0);
let data_block_ptr =
unsafe { NonNull::new_unchecked(data_block_clone.as_ptr().cast_mut()) };
if i == 0 {
if let Err(entry) =
writer.insert(data_block_ptr, partial_hash, (task_id, 0), &guard)
{
writer.insert_overflow(partial_hash, entry, &guard);
}
} else {
assert_eq!(
writer
.search_entry(data_block_ptr, &task_id, partial_hash, &guard)
.unwrap(),
&(task_id, 0_usize)
);
}
drop(writer);
let reader = Reader::lock_sync(&*bucket_clone).unwrap();
assert_eq!(
reader
.search_entry(data_block_ptr, &task_id, partial_hash, &guard)
.unwrap(),
&(task_id, 0_usize)
);
}
}));
}
for r in futures::future::join_all(task_handles).await {
assert!(r.is_ok());
}
let sum: u64 = data.iter().sum();
assert_eq!(sum % 256, 0);
assert_eq!(bucket.len(), num_tasks);
let epoch_guard = Guard::new();
let data_block_ptr = unsafe { NonNull::new_unchecked(data_block.as_ptr().cast_mut()) };
for task_id in 0..num_tasks {
assert_eq!(
bucket.search_entry(
data_block_ptr,
&task_id,
(task_id % BUCKET_LEN).try_into().unwrap(),
&epoch_guard
),
Some(&(task_id, 0))
);
}
let mut count = 0;
let mut entry_ptr = EntryPtr::new(&epoch_guard);
while entry_ptr.move_to_next(&bucket, &epoch_guard) {
count += 1;
}
assert_eq!(bucket.len(), count);
entry_ptr = EntryPtr::new(&epoch_guard);
let writer = Writer::lock_sync(&bucket).unwrap();
while entry_ptr.move_to_next(&writer, &epoch_guard) {
writer.remove(
unsafe { NonNull::new_unchecked(data_block.as_ptr().cast_mut()) },
&mut entry_ptr,
&epoch_guard,
);
}
assert_eq!(writer.len(), 0);
writer.kill();
assert_eq!(bucket.len(), 0);
assert!(Writer::lock_sync(unsafe { bucket.get_mut().unwrap() }).is_none());
}
}