pub mod bucket;
pub mod bucket_array;
use std::hash::{BuildHasher, Hash};
use std::mem::forget;
use std::ops::Deref;
use std::pin::pin;
use std::ptr::{self, NonNull, from_ref};
#[cfg(not(feature = "loom"))]
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use bucket::{BUCKET_LEN, CACHE, DataBlock, EntryPtr, INDEX, LruList, Reader, Writer};
use bucket_array::BucketArray;
#[cfg(feature = "loom")]
use loom::sync::atomic::AtomicUsize;
use sdd::{AtomicShared, Guard, Ptr, Shared, Tag};
use super::Equivalent;
use super::async_helper::{AsyncGuard, fake_guard};
use super::exit_guard::ExitGuard;
use super::hash_table::bucket::Bucket;
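/// `HashTable` provides the bucket-array management logic shared by the hash table variants
/// distinguished by the `TYPE` parameter: lookup, bucket locking, incremental rehashing, and resizing.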
pub(super) trait HashTable<K, V, H, L: LruList, const TYPE: char>
where
K: Eq + Hash,
H: BuildHasher,
{
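/// Returns the hash value of the given key.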
#[inline]
fn hash<Q>(&self, key: &Q) -> u64
where
Q: Equivalent<K> + Hash + ?Sized,
{
self.hasher().hash_one(key)
}
fn hasher(&self) -> &H;
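/// Returns a reference to the `AtomicShared` holding the current `BucketArray`.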
fn bucket_array_var(&self) -> &AtomicShared<BucketArray<K, V, L, TYPE>>;
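/// Loads the current `BucketArray`, returning `None` if none has been allocated.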
fn bucket_array<'g>(&self, guard: &'g Guard) -> Option<&'g BucketArray<K, V, L, TYPE>> {
unsafe {
self.bucket_array_var()
.load(Acquire, guard)
.as_ref_unchecked()
}
}
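/// Defers reclamation of the supplied `BucketArray` by dropping the `Shared` handle while a `Guard` is held.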
#[inline]
fn defer_reclaim(&self, bucket_array: Shared<BucketArray<K, V, L, TYPE>>, _guard: &Guard) {
drop(bucket_array);
}
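/// Returns the index of the bucket the key maps to in the current array, or `0` if no array is allocated.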
#[inline]
fn calculate_bucket_index<Q>(&self, key: &Q) -> usize
where
Q: Equivalent<K> + Hash + ?Sized,
{
unsafe {
self.bucket_array_var()
.load(Acquire, &Guard::new())
.as_ref_unchecked()
.map_or(0, |a| a.bucket_index(self.hash(key)))
}
}
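/// Returns a reference to the atomic variable holding the minimum capacity and the `RESIZING` bit.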
fn minimum_capacity_var(&self) -> &AtomicUsize;
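/// Returns the minimum capacity with the `RESIZING` bit masked off.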
fn minimum_capacity(&self) -> usize {
self.minimum_capacity_var().load(Relaxed) & (!RESIZING)
}
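/// Returns the maximum capacity; defaults to `MAXIMUM_CAPACITY_LIMIT`.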
fn maximum_capacity(&self) -> usize {
MAXIMUM_CAPACITY_LIMIT
}
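/// Reserves the specified additional capacity, returning the amount actually reserved,
/// or `0` if the request would exceed the maximum capacity.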
fn reserve_capacity(&self, additional_capacity: usize) -> usize {
let mut current_minimum_capacity = self.minimum_capacity_var().load(Relaxed);
loop {
if additional_capacity
> self.maximum_capacity() - (current_minimum_capacity & (!RESIZING))
{
return 0;
}
match self.minimum_capacity_var().compare_exchange_weak(
current_minimum_capacity,
additional_capacity + current_minimum_capacity,
Relaxed,
Relaxed,
) {
Ok(_) => {
let guard = Guard::new();
if let Some(current_array) = self.bucket_array(&guard) {
if !current_array.has_linked_array() {
self.try_resize(current_array, 0, &guard);
}
}
return additional_capacity;
}
Err(actual) => current_minimum_capacity = actual,
}
}
}
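/// Returns the current `BucketArray`, allocating one if none exists.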
#[inline]
fn get_or_create_bucket_array<'g>(&self, guard: &'g Guard) -> &'g BucketArray<K, V, L, TYPE> {
if let Some(current_array) = self.bucket_array(guard) {
current_array
} else {
self.allocate_bucket_array(guard)
}
}
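/// Allocates a `BucketArray` sized to the minimum capacity, or returns the one installed concurrently by another thread.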
fn allocate_bucket_array<'g>(&self, guard: &'g Guard) -> &'g BucketArray<K, V, L, TYPE> {
unsafe {
let capacity = self.minimum_capacity();
let allocated = Shared::new_unchecked(BucketArray::new(capacity, AtomicShared::null()));
match self.bucket_array_var().compare_exchange(
Ptr::null(),
(Some(allocated), Tag::None),
AcqRel,
Acquire,
guard,
) {
Ok((_, ptr)) | Err((_, ptr)) => ptr.as_ref_unchecked().unwrap_unchecked(),
}
}
}
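/// Returns the number of slots in the current `BucketArray`, or `0` if none is allocated.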
#[inline]
fn num_slots(&self, guard: &Guard) -> usize {
if let Some(current_array) = self.bucket_array(guard) {
current_array.num_slots()
} else {
0
}
}
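/// Returns the number of entries by summing bucket lengths; the result is approximate under concurrent updates.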
fn num_entries(&self, guard: &Guard) -> usize {
let mut num_entries: usize = 0;
if let Some(current_array) = self.bucket_array(guard) {
if let Some(old_array) = current_array.linked_array(guard) {
self.incremental_rehash_sync::<true>(current_array, guard);
for i in 0..old_array.len() {
num_entries = num_entries.saturating_add(old_array.bucket(i).len());
}
}
for i in 0..current_array.len() {
num_entries = num_entries.saturating_add(current_array.bucket(i).len());
}
if num_entries == 0 && self.minimum_capacity() == 0 {
self.try_resize(current_array, 0, guard);
}
}
num_entries
}
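/// Returns `true` if the table contains at least one entry.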
fn has_entry(&self, guard: &Guard) -> bool {
if let Some(current_array) = self.bucket_array(guard) {
if let Some(old_array) = current_array.linked_array(guard) {
self.incremental_rehash_sync::<true>(current_array, guard);
for i in 0..old_array.len() {
if old_array.bucket(i).len() != 0 {
return true;
}
}
}
for i in 0..current_array.len() {
if current_array.bucket(i).len() != 0 {
return true;
}
}
if self.minimum_capacity() == 0 {
self.try_resize(current_array, 0, guard);
}
}
false
}
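/// Estimates the total number of entries by counting two windows of `sample_size` buckets and scaling the sum to the full array.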
#[inline]
fn sample(current_array: &BucketArray<K, V, L, TYPE>, sampling_index: usize) -> usize {
let sample_size = current_array.sample_size();
let sample_1 = sampling_index & (!(sample_size - 1));
let sample_2 = if sample_1 == 0 {
current_array.len() - sample_size
} else {
0
};
let mut num_entries = 0;
for i in (sample_1..sample_1 + sample_size).chain(sample_2..(sample_2 + sample_size)) {
num_entries += current_array.bucket(i).len();
}
num_entries * (current_array.len() / (sample_size * 2))
}
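/// Returns `true` if at least half of the sampled buckets need to be rebuilt.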
#[inline]
fn check_rebuild(current_array: &BucketArray<K, V, L, TYPE>, sampling_index: usize) -> bool {
let sample_size = current_array.sample_size();
let sample_1 = sampling_index & (!(sample_size - 1));
let sample_2 = if sample_1 == 0 {
current_array.len() - sample_size
} else {
0
};
let mut num_buckets_to_rebuild = 0;
for i in (sample_1..sample_1 + sample_size).chain(sample_2..(sample_2 + sample_size)) {
if current_array.bucket(i).need_rebuild() {
num_buckets_to_rebuild += 1;
if num_buckets_to_rebuild >= sample_size {
return true;
}
}
}
false
}
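/// Searches for an entry without locking its bucket; only valid for the `INDEX` table type.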
#[inline]
fn peek_entry<'g, Q>(&self, key: &Q, guard: &'g Guard) -> Option<&'g (K, V)>
where
Q: Equivalent<K> + Hash + ?Sized,
{
debug_assert_eq!(TYPE, INDEX);
let hash = self.hash(key);
let mut current_array_ptr = self.bucket_array_var().load(Acquire, guard);
while let Some(current_array) = unsafe { current_array_ptr.as_ref_unchecked() } {
if let Some(old_array) = current_array.linked_array(guard) {
self.incremental_rehash_sync::<true>(current_array, guard);
let index = old_array.bucket_index(hash);
if let Some(entry) = old_array.bucket(index).search_entry(
old_array.data_block(index),
key,
hash,
guard,
) {
return Some(entry);
}
}
let index = current_array.bucket_index(hash);
if let Some(entry) = current_array.bucket(index).search_entry(
current_array.data_block(index),
key,
hash,
guard,
) {
return Some(entry);
}
let new_current_array_ptr = self.bucket_array_var().load(Acquire, guard);
if current_array_ptr == new_current_array_ptr {
break;
}
current_array_ptr = new_current_array_ptr;
}
None
}
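/// Acquires a `Reader` on the bucket for the key asynchronously and applies `f` to the entry if it is found.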
#[inline]
async fn reader_async<Q, R, F: FnOnce(&K, &V) -> R>(&self, key: &Q, f: F) -> Option<R>
where
Q: Equivalent<K> + Hash + ?Sized,
{
let hash = self.hash(key);
let async_guard = pin!(AsyncGuard::default());
while let Some(current_array) = async_guard.load_unchecked(self.bucket_array_var(), Acquire)
{
if current_array.has_linked_array() {
self.incremental_rehash_async(current_array, &async_guard)
.await;
if !self
.dedup_bucket_async(
current_array,
current_array.bucket_index(hash),
&async_guard,
)
.await
{
continue;
}
}
let bucket_index = current_array.bucket_index(hash);
let bucket = current_array.bucket(bucket_index);
if let Some(reader) = Reader::try_lock(bucket) {
if let Some(entry) = reader.search_entry(
current_array.data_block(bucket_index),
key,
hash,
async_guard.guard(),
) {
return Some(f(&entry.0, &entry.1));
}
break;
} else if let Some(reader) = Reader::lock_async(bucket, &async_guard).await {
if let Some(entry) = reader.search_entry(
current_array.data_block(bucket_index),
key,
hash,
async_guard.guard(),
) {
return Some(f(&entry.0, &entry.1));
}
break;
}
}
None
}
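/// Acquires a `Reader` on the bucket for the key and applies `f` to the entry if it is found.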
#[inline]
fn reader_sync<Q, R, F: FnOnce(&K, &V) -> R>(&self, key: &Q, f: F) -> Option<R>
where
Q: Equivalent<K> + Hash + ?Sized,
{
let hash = self.hash(key);
let guard = Guard::new();
while let Some(current_array) = self.bucket_array(&guard) {
let index = current_array.bucket_index(hash);
if let Some(old_array) = current_array.linked_array(&guard) {
self.incremental_rehash_sync::<false>(current_array, &guard);
self.dedup_bucket_sync::<false>(current_array, old_array, index);
}
let bucket = current_array.bucket(index);
if let Some(reader) = Reader::lock_sync(bucket) {
if let Some(entry) =
reader.search_entry(current_array.data_block(index), key, hash, &guard)
{
return Some(f(&entry.0, &entry.1));
}
break;
}
}
None
}
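/// Acquires a `Writer` on the bucket corresponding to the hash asynchronously, allocating or enlarging the array as needed.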
#[inline]
async fn writer_async(&self, hash: u64) -> LockedBucket<K, V, L, TYPE> {
let async_guard = pin!(AsyncGuard::default());
if let Some(locked_bucket) = self.try_optional_writer::<true>(hash, async_guard.guard()) {
return locked_bucket;
}
loop {
let current_array = self.get_or_create_bucket_array(async_guard.guard());
if current_array.has_linked_array() {
self.incremental_rehash_async(current_array, &async_guard)
.await;
if !self
.dedup_bucket_async(
current_array,
current_array.bucket_index(hash),
&async_guard,
)
.await
{
continue;
}
}
let bucket_index = current_array.bucket_index(hash);
let bucket = current_array.bucket(bucket_index);
if (TYPE != CACHE || current_array.num_slots() < self.maximum_capacity())
&& bucket.len() >= BUCKET_LEN - 1
&& current_array.initiate_sampling(hash)
{
self.try_enlarge(
current_array,
bucket_index,
bucket.len(),
async_guard.guard(),
);
}
if let Some(writer) = Writer::lock_async(bucket, &async_guard).await {
return LockedBucket {
writer,
data_block: current_array.data_block(bucket_index),
bucket_index,
bucket_array: into_non_null(current_array),
};
}
}
}
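/// Acquires a `Writer` on the bucket corresponding to the hash, allocating or enlarging the array as needed.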
#[inline]
fn writer_sync(&self, hash: u64) -> LockedBucket<K, V, L, TYPE> {
let guard = Guard::new();
if let Some(locked_bucket) = self.try_optional_writer::<true>(hash, &guard) {
return locked_bucket;
}
loop {
let current_array = self.get_or_create_bucket_array(&guard);
let bucket_index = current_array.bucket_index(hash);
if let Some(old_array) = current_array.linked_array(&guard) {
self.incremental_rehash_sync::<false>(current_array, &guard);
self.dedup_bucket_sync::<false>(current_array, old_array, bucket_index);
}
let bucket = current_array.bucket(bucket_index);
if (TYPE != CACHE || current_array.num_slots() < self.maximum_capacity())
&& bucket.len() >= BUCKET_LEN - 1
&& current_array.initiate_sampling(hash)
{
self.try_enlarge(current_array, bucket_index, bucket.len(), &guard);
}
if let Some(writer) = Writer::lock_sync(bucket) {
return LockedBucket {
writer,
data_block: current_array.data_block(bucket_index),
bucket_index,
bucket_array: into_non_null(current_array),
};
}
}
}
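/// Acquires a `Writer` on the bucket corresponding to the hash asynchronously, returning `None` if no array is allocated.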
#[inline]
async fn optional_writer_async(&self, hash: u64) -> Option<LockedBucket<K, V, L, TYPE>> {
let async_guard = pin!(AsyncGuard::default());
if let Some(locked_bucket) = self.try_optional_writer::<false>(hash, async_guard.guard()) {
return Some(locked_bucket);
}
while let Some(current_array) = async_guard.load_unchecked(self.bucket_array_var(), Acquire)
{
if current_array.has_linked_array() {
self.incremental_rehash_async(current_array, &async_guard)
.await;
if !self
.dedup_bucket_async(
current_array,
current_array.bucket_index(hash),
&async_guard,
)
.await
{
continue;
}
}
let bucket_index = current_array.bucket_index(hash);
let bucket = current_array.bucket(bucket_index);
if let Some(writer) = Writer::lock_async(bucket, &async_guard).await {
return Some(LockedBucket {
writer,
data_block: current_array.data_block(bucket_index),
bucket_index,
bucket_array: into_non_null(current_array),
});
}
}
None
}
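/// Acquires a `Writer` on the bucket corresponding to the hash, returning `None` if no array is allocated.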
#[inline]
fn optional_writer_sync(&self, hash: u64) -> Option<LockedBucket<K, V, L, TYPE>> {
let guard = Guard::new();
if let Some(locked_bucket) = self.try_optional_writer::<false>(hash, &guard) {
return Some(locked_bucket);
}
while let Some(current_array) = self.bucket_array(&guard) {
let bucket_index = current_array.bucket_index(hash);
if let Some(old_array) = current_array.linked_array(&guard) {
self.incremental_rehash_sync::<false>(current_array, &guard);
self.dedup_bucket_sync::<false>(current_array, old_array, bucket_index);
}
let bucket = current_array.bucket(bucket_index);
if let Some(writer) = Writer::lock_sync(bucket) {
return Some(LockedBucket {
writer,
data_block: current_array.data_block(bucket_index),
bucket_index,
bucket_array: into_non_null(current_array),
});
}
}
None
}
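/// Tries to acquire a `Writer` without waiting; gives up if an old array is still linked,
/// the bucket is full when `CHECK_SIZE` is set, or the lock cannot be taken immediately.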
#[inline]
fn try_optional_writer<const CHECK_SIZE: bool>(
&self,
hash: u64,
guard: &Guard,
) -> Option<LockedBucket<K, V, L, TYPE>> {
if let Some(current_array) = self.bucket_array(guard) {
if current_array.has_linked_array() {
return None;
}
let bucket_index = current_array.bucket_index(hash);
let bucket = current_array.bucket(bucket_index);
if CHECK_SIZE && bucket.len() >= BUCKET_LEN {
return None;
}
if let Ok(Some(writer)) = Writer::try_lock(bucket) {
return Some(LockedBucket {
writer,
data_block: current_array.data_block(bucket_index),
bucket_index,
bucket_array: into_non_null(current_array),
});
}
}
None
}
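/// Iterates over all buckets asynchronously, acquiring a `Reader` on each and passing it to `f`; iteration stops when `f` returns `false`.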
#[inline]
async fn for_each_reader_async<F>(&self, mut f: F)
where
F: FnMut(Reader<K, V, L, TYPE>, NonNull<DataBlock<K, V, BUCKET_LEN>>) -> bool,
{
let async_guard = pin!(AsyncGuard::default());
let mut start_index = 0;
let mut prev_len = 0;
while let Some(current_array) = async_guard.load_unchecked(self.bucket_array_var(), Acquire)
{
start_index = if prev_len == 0 || prev_len == current_array.len() {
start_index
} else {
from_index_to_range(prev_len, current_array.len(), start_index).0
};
prev_len = current_array.len();
while start_index < current_array.len() {
if current_array.has_linked_array() {
self.incremental_rehash_async(current_array, &async_guard)
.await;
if !self
.dedup_bucket_async(current_array, start_index, &async_guard)
.await
{
break;
}
}
let bucket = current_array.bucket(start_index);
if let Some(reader) = Reader::lock_async(bucket, &async_guard).await {
if !async_guard.check_ref(self.bucket_array_var(), current_array, Acquire) {
break;
}
let data_block = current_array.data_block(start_index);
if !f(reader, data_block) {
return;
}
} else {
break;
}
start_index += 1;
}
if start_index == current_array.len() {
break;
}
}
}
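/// Iterates over all buckets, acquiring a `Reader` on each and passing it to `f`; iteration stops when `f` returns `false`.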
#[inline]
fn for_each_reader_sync<F>(&self, guard: &Guard, mut f: F)
where
F: FnMut(Reader<K, V, L, TYPE>, NonNull<DataBlock<K, V, BUCKET_LEN>>) -> bool,
{
let mut start_index = 0;
let mut prev_len = 0;
while let Some(current_array) = self.bucket_array(guard) {
start_index = if prev_len == 0 || prev_len == current_array.len() {
start_index
} else {
from_index_to_range(prev_len, current_array.len(), start_index).0
};
prev_len = current_array.len();
while start_index < current_array.len() {
let index = start_index;
if let Some(old_array) = current_array.linked_array(guard) {
self.incremental_rehash_sync::<false>(current_array, guard);
self.dedup_bucket_sync::<false>(current_array, old_array, index);
}
let bucket = current_array.bucket(index);
if let Some(reader) = Reader::lock_sync(bucket) {
let data_block = current_array.data_block(index);
if !f(reader, data_block) {
return;
}
} else {
break;
}
start_index += 1;
}
if start_index == current_array.len() {
break;
}
}
}
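/// Iterates over buckets from `start_index` asynchronously, acquiring a `Writer` on each and passing it to `f`;
/// `f` returns `true` to stop and sets the flag when it removed entries so that a shrink check runs afterwards.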
#[inline]
async fn for_each_writer_async<F>(
&self,
mut start_index: usize,
expected_array_len: usize,
mut f: F,
) where
F: FnMut(LockedBucket<K, V, L, TYPE>, &mut bool) -> bool,
{
let async_guard = pin!(AsyncGuard::default());
let mut prev_len = expected_array_len;
let mut removed = false;
while let Some(current_array) = async_guard.load_unchecked(self.bucket_array_var(), Acquire)
{
let current_array_len = current_array.len();
start_index = if prev_len == 0 || prev_len == current_array_len {
start_index
} else {
from_index_to_range(prev_len, current_array_len, start_index).0
};
prev_len = current_array_len;
while start_index < current_array_len {
let bucket_index = start_index;
if current_array.has_linked_array() {
self.incremental_rehash_async(current_array, &async_guard)
.await;
if !self
.dedup_bucket_async(current_array, bucket_index, &async_guard)
.await
{
break;
}
}
let bucket = current_array.bucket(bucket_index);
if let Some(writer) = Writer::lock_async(bucket, &async_guard).await {
if !async_guard.check_ref(self.bucket_array_var(), current_array, Acquire) {
break;
}
let locked_bucket = LockedBucket {
writer,
data_block: current_array.data_block(bucket_index),
bucket_index,
bucket_array: into_non_null(current_array),
};
let stop = f(locked_bucket, &mut removed);
if stop {
start_index = current_array_len;
break;
}
} else {
break;
}
start_index += 1;
}
if start_index == current_array_len {
break;
}
}
if removed {
if let Some(current_array) = self.bucket_array(async_guard.guard()) {
self.try_shrink_or_rebuild(current_array, 0, async_guard.guard());
}
}
}
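/// Iterates over buckets from `start_index`, acquiring a `Writer` on each and passing it to `f`;
/// `f` returns `true` to stop and sets the flag when it removed entries so that a shrink check runs afterwards.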
#[inline]
fn for_each_writer_sync<F>(
&self,
mut start_index: usize,
expected_array_len: usize,
guard: &Guard,
mut f: F,
) where
F: FnMut(LockedBucket<K, V, L, TYPE>, &mut bool) -> bool,
{
let mut prev_len = expected_array_len;
let mut removed = false;
while let Some(current_array) = self.bucket_array(guard) {
let current_array_len = current_array.len();
start_index = if prev_len == 0 || prev_len == current_array_len {
start_index
} else {
from_index_to_range(prev_len, current_array_len, start_index).0
};
prev_len = current_array_len;
while start_index < current_array_len {
let bucket_index = start_index;
if let Some(old_array) = current_array.linked_array(guard) {
self.incremental_rehash_sync::<false>(current_array, guard);
self.dedup_bucket_sync::<false>(current_array, old_array, bucket_index);
}
let bucket = current_array.bucket(bucket_index);
if let Some(writer) = Writer::lock_sync(bucket) {
let locked_bucket = LockedBucket {
writer,
data_block: current_array.data_block(bucket_index),
bucket_index,
bucket_array: into_non_null(current_array),
};
let stop = f(locked_bucket, &mut removed);
if stop {
start_index = current_array_len;
break;
}
} else {
break;
}
start_index += 1;
}
if start_index == current_array_len {
break;
}
}
if removed {
if let Some(current_array) = self.bucket_array(guard) {
self.try_shrink_or_rebuild(current_array, 0, guard);
}
}
}
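/// Tries to reserve the bucket for the hash without waiting, returning `None` if any required lock cannot be acquired immediately.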
#[inline]
fn try_reserve_bucket(&self, hash: u64, guard: &Guard) -> Option<LockedBucket<K, V, L, TYPE>> {
loop {
let current_array = self.get_or_create_bucket_array(guard);
let bucket_index = current_array.bucket_index(hash);
if let Some(old_array) = current_array.linked_array(guard) {
self.incremental_rehash_sync::<true>(current_array, guard);
if !self.dedup_bucket_sync::<true>(current_array, old_array, bucket_index) {
return None;
}
}
let mut bucket = current_array.bucket(bucket_index);
if (TYPE != CACHE || current_array.num_slots() < self.maximum_capacity())
&& bucket.len() >= BUCKET_LEN - 1
&& current_array.initiate_sampling(hash)
{
self.try_enlarge(current_array, bucket_index, bucket.len(), guard);
bucket = current_array.bucket(bucket_index);
}
let Ok(writer) = Writer::try_lock(bucket) else {
return None;
};
if let Some(writer) = writer {
return Some(LockedBucket {
writer,
data_block: current_array.data_block(bucket_index),
bucket_index,
bucket_array: into_non_null(current_array),
});
}
}
}
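/// Relocates the old-array buckets that map to `index` into the current array; returns `false` if the operation needs to be retried.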
async fn dedup_bucket_async<'g>(
&self,
current_array: &'g BucketArray<K, V, L, TYPE>,
index: usize,
async_guard: &'g AsyncGuard,
) -> bool {
if !async_guard.check_ref(self.bucket_array_var(), current_array, Acquire) {
return false;
}
if let Some(old_array) =
async_guard.load_unchecked(current_array.linked_array_var(), Acquire)
{
let range = from_index_to_range(current_array.len(), old_array.len(), index);
for old_index in range.0..range.1 {
let bucket = old_array.bucket(old_index);
let writer = Writer::lock_async(bucket, async_guard).await;
if let Some(writer) = writer {
self.relocate_bucket_async(
current_array,
old_array,
old_index,
writer,
async_guard,
)
.await;
} else if !async_guard.has_guard() {
return false;
}
if !current_array.has_linked_array() {
break;
}
}
}
true
}
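/// Relocates the old-array buckets that map to `index` into the current array; with `TRY_LOCK`,
/// returns `false` if a lock cannot be acquired immediately.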
fn dedup_bucket_sync<'g, const TRY_LOCK: bool>(
&self,
current_array: &'g BucketArray<K, V, L, TYPE>,
old_array: &'g BucketArray<K, V, L, TYPE>,
index: usize,
) -> bool {
let range = from_index_to_range(current_array.len(), old_array.len(), index);
for old_index in range.0..range.1 {
let bucket = old_array.bucket(old_index);
let writer = if TRY_LOCK {
let Ok(writer) = Writer::try_lock(bucket) else {
return false;
};
writer
} else {
Writer::lock_sync(bucket)
};
if let Some(writer) = writer {
if !self.relocate_bucket_sync::<TRY_LOCK>(
current_array,
old_array,
old_index,
writer,
) {
return false;
}
}
}
true
}
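/// Relocates every entry in the locked old bucket into the current array asynchronously, then kills the old bucket.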
async fn relocate_bucket_async<'g>(
&self,
current_array: &'g BucketArray<K, V, L, TYPE>,
old_array: &'g BucketArray<K, V, L, TYPE>,
old_index: usize,
old_writer: Writer<K, V, L, TYPE>,
async_guard: &'g AsyncGuard,
) {
if old_writer.len() == 0 {
async_guard.guard();
old_writer.kill();
return;
}
let (target_index, end_target_index) =
from_index_to_range(old_array.len(), current_array.len(), old_index);
for i in target_index..end_target_index {
let writer = unsafe {
Writer::lock_async(current_array.bucket(i), async_guard)
.await
.unwrap_unchecked()
};
forget(writer);
}
let Some(old_array) = current_array.linked_array(async_guard.guard()) else {
return;
};
let (target_index, end_target_index) =
from_index_to_range(old_array.len(), current_array.len(), old_index);
let unlock = ExitGuard::new(
(current_array, target_index, end_target_index),
|(current_array, target_index, end_target_index)| {
for i in target_index..end_target_index {
let writer = Writer::from_bucket(current_array.bucket(i));
drop(writer);
}
},
);
self.relocate_bucket(unlock.0, unlock.1, old_array, old_index, &old_writer);
drop(unlock);
async_guard.guard();
old_writer.kill();
}
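/// Relocates every entry in the locked old bucket into the current array, then kills the old bucket;
/// returns `false` if `TRY_LOCK` is set and a target bucket cannot be locked.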
fn relocate_bucket_sync<'g, const TRY_LOCK: bool>(
&self,
current_array: &'g BucketArray<K, V, L, TYPE>,
old_array: &'g BucketArray<K, V, L, TYPE>,
old_index: usize,
old_writer: Writer<K, V, L, TYPE>,
) -> bool {
if old_writer.len() == 0 {
old_writer.kill();
return true;
}
let (target_index, end_target_index) =
from_index_to_range(old_array.len(), current_array.len(), old_index);
for i in target_index..end_target_index {
let writer = if TRY_LOCK {
let Ok(Some(writer)) = Writer::try_lock(current_array.bucket(i)) else {
for j in target_index..i {
let writer = Writer::from_bucket(current_array.bucket(j));
drop(writer);
}
return false;
};
writer
} else {
unsafe { Writer::lock_sync(current_array.bucket(i)).unwrap_unchecked() }
};
forget(writer);
}
let unlock = ExitGuard::new((), |()| {
for i in target_index..end_target_index {
let writer = Writer::from_bucket(current_array.bucket(i));
drop(writer);
}
});
self.relocate_bucket(
current_array,
target_index,
old_array,
old_index,
&old_writer,
);
drop(unlock);
old_writer.kill();
true
}
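/// Moves each entry in the old bucket into its target bucket in the current array; target slots are
/// pre-allocated when the table is shrinking or the old bucket holds more than `BUCKET_LEN` entries.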
fn relocate_bucket(
&self,
current_array: &BucketArray<K, V, L, TYPE>,
target_index: usize,
old_array: &BucketArray<K, V, L, TYPE>,
old_index: usize,
old_writer: &Writer<K, V, L, TYPE>,
) {
let pre_allocate_slots =
old_array.len() > current_array.len() || old_writer.len() > BUCKET_LEN;
let old_data_block = old_array.data_block(old_index);
let mut entry_ptr = EntryPtr::null();
let mut position = 0;
let mut dist = [0_u32; 8];
let mut extended_dist: Vec<u32> = Vec::new();
let mut hash_data = [0_u64; BUCKET_LEN];
while entry_ptr.move_to_next(old_writer) {
let (offset, hash) = if old_array.len() >= current_array.len() {
(0, u64::from(entry_ptr.partial_hash(&**old_writer)))
} else {
let hash = self.hash(&entry_ptr.get_mut(old_data_block, old_writer).0);
let new_index = current_array.bucket_index(hash);
debug_assert!(new_index - target_index < (current_array.len() / old_array.len()));
(new_index - target_index, hash)
};
if pre_allocate_slots {
if position != BUCKET_LEN {
hash_data[position] = hash;
position += 1;
}
if offset < 8 {
dist[offset] += 1;
} else {
if extended_dist.len() < offset - 7 {
extended_dist.resize(offset - 7, 0);
}
extended_dist[offset - 8] += 1;
}
} else {
current_array.bucket(target_index + offset).extract_from(
current_array.data_block(target_index + offset),
hash,
old_writer,
old_data_block,
&mut entry_ptr,
);
}
}
if !pre_allocate_slots {
return;
}
for (i, d) in dist.iter().chain(extended_dist.iter()).enumerate() {
if *d != 0 {
let bucket = current_array.bucket(target_index + i);
bucket.reserve_slots((*d) as usize);
}
}
entry_ptr = EntryPtr::null();
position = 0;
while entry_ptr.move_to_next(old_writer) {
let hash = if old_array.len() >= current_array.len() {
u64::from(entry_ptr.partial_hash(&**old_writer))
} else if position == BUCKET_LEN {
self.hash(&entry_ptr.get(old_data_block, fake_guard()).0)
} else {
position += 1;
hash_data[position - 1]
};
let index = if old_array.len() >= current_array.len() {
target_index
} else {
current_array.bucket_index(hash)
};
current_array.bucket(index).extract_from(
current_array.data_block(index),
hash,
old_writer,
old_data_block,
&mut entry_ptr,
);
}
}
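/// Claims the next chunk of `BUCKET_LEN` old-array buckets for rehashing; the rehashing metadata tracks
/// the next chunk index in its upper bits and the number of threads currently rehashing in its lower bits.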
#[inline]
fn start_incremental_rehash(old_array: &BucketArray<K, V, L, TYPE>) -> Option<usize> {
let rehashing_metadata = old_array.rehashing_metadata();
let mut current = rehashing_metadata.load(Relaxed);
loop {
if current >= old_array.len() || (current & (BUCKET_LEN - 1)) == BUCKET_LEN - 1 {
return None;
}
match rehashing_metadata.compare_exchange_weak(
current,
current + BUCKET_LEN + 1,
Acquire,
Relaxed,
) {
Ok(_) => {
current &= !(BUCKET_LEN - 1);
return Some(current);
}
Err(result) => current = result,
}
}
}
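/// Finishes a rehashing chunk: on success, returns `true` once the whole old array has been rehashed;
/// on failure, rolls the progress back to `prev`.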
#[inline]
fn end_incremental_rehash(
old_array: &BucketArray<K, V, L, TYPE>,
prev: usize,
success: bool,
) -> bool {
let rehashing_metadata = old_array.rehashing_metadata();
if success {
let metadata = rehashing_metadata.fetch_sub(1, Release) - 1;
(metadata & (BUCKET_LEN - 1) == 0) && metadata >= old_array.len()
} else {
let mut current = rehashing_metadata.load(Relaxed);
loop {
let new = if current <= prev {
current - 1
} else {
let refs = current & (BUCKET_LEN - 1);
prev | (refs - 1)
};
match rehashing_metadata.compare_exchange_weak(current, new, Release, Relaxed) {
Ok(_) => break,
Err(actual) => current = actual,
}
}
false
}
}
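/// Rehashes one chunk of the linked old array asynchronously and unlinks the old array once it is fully rehashed.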
async fn incremental_rehash_async<'g>(
&self,
current_array: &'g BucketArray<K, V, L, TYPE>,
async_guard: &'g AsyncGuard,
) {
if let Some(old_array) =
async_guard.load_unchecked(current_array.linked_array_var(), Acquire)
{
if let Some(current) = Self::start_incremental_rehash(old_array) {
let rehashing_guard = ExitGuard::new((old_array, current), |(old_array, prev)| {
Self::end_incremental_rehash(old_array, prev, false);
});
for bucket_index in
rehashing_guard.1..(rehashing_guard.1 + BUCKET_LEN).min(old_array.len())
{
let old_bucket = rehashing_guard.0.bucket(bucket_index);
let writer = Writer::lock_async(old_bucket, async_guard).await;
if let Some(writer) = writer {
self.relocate_bucket_async(
current_array,
rehashing_guard.0,
bucket_index,
writer,
async_guard,
)
.await;
}
debug_assert!(current_array.has_linked_array());
}
if Self::end_incremental_rehash(rehashing_guard.0, rehashing_guard.1, true) {
if let Some(bucket_array) = current_array
.linked_array_var()
.swap((None, Tag::None), Release)
.0
{
self.defer_reclaim(bucket_array, async_guard.guard());
}
}
rehashing_guard.forget();
}
}
}
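/// Rehashes one chunk of the linked old array and unlinks the old array once it is fully rehashed.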
fn incremental_rehash_sync<'g, const TRY_LOCK: bool>(
&self,
current_array: &'g BucketArray<K, V, L, TYPE>,
guard: &'g Guard,
) {
if let Some(old_array) = current_array.linked_array(guard) {
if let Some(current) = Self::start_incremental_rehash(old_array) {
let rehashing_guard = ExitGuard::new((old_array, current), |(old_array, prev)| {
Self::end_incremental_rehash(old_array, prev, false);
});
for bucket_index in
rehashing_guard.1..(rehashing_guard.1 + BUCKET_LEN).min(old_array.len())
{
let old_bucket = rehashing_guard.0.bucket(bucket_index);
let writer = if TRY_LOCK {
let Ok(writer) = Writer::try_lock(old_bucket) else {
return;
};
writer
} else {
Writer::lock_sync(old_bucket)
};
if let Some(writer) = writer {
if !self.relocate_bucket_sync::<TRY_LOCK>(
current_array,
rehashing_guard.0,
bucket_index,
writer,
) {
return;
}
}
}
if Self::end_incremental_rehash(rehashing_guard.0, rehashing_guard.1, true) {
if let Some(bucket_array) = current_array
.linked_array_var()
.swap((None, Tag::None), Release)
.0
{
self.defer_reclaim(bucket_array, guard);
}
}
rehashing_guard.forget();
}
}
}
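/// Samples buckets around `index` and triggers a resize when the sampled region is nearly full.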
fn try_enlarge(
&self,
current_array: &BucketArray<K, V, L, TYPE>,
index: usize,
mut num_entries: usize,
guard: &Guard,
) {
if !current_array.has_linked_array() {
let threshold = current_array.sample_size() * (BUCKET_LEN / 32) * 25;
if num_entries > threshold
|| (1..current_array.sample_size()).any(|i| {
num_entries += current_array
.bucket((index + i) % current_array.len())
.len();
num_entries > threshold
})
{
self.try_resize(current_array, index, guard);
}
}
}
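/// Samples buckets around `index` and triggers a shrink when they are mostly empty,
/// or a rebuild for the `INDEX` type when enough buckets need rebuilding.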
fn try_shrink_or_rebuild(
&self,
current_array: &BucketArray<K, V, L, TYPE>,
index: usize,
guard: &Guard,
) {
if !current_array.has_linked_array() {
let minimum_capacity = self.minimum_capacity();
if TYPE == INDEX || current_array.num_slots() > minimum_capacity {
let shrink_threshold = current_array.sample_size() * BUCKET_LEN / 8;
let rebuild_threshold = current_array.sample_size() / 2;
let mut num_entries = 0;
let mut num_buckets_to_rebuild = 0;
for i in 0..current_array.sample_size() {
let bucket = current_array.bucket((index + i) % current_array.len());
num_entries += bucket.len();
if num_entries >= shrink_threshold
&& (TYPE != INDEX
|| num_buckets_to_rebuild + (current_array.sample_size() - i)
< rebuild_threshold)
{
return;
}
if TYPE == INDEX && bucket.need_rebuild() {
num_buckets_to_rebuild += 1;
if num_buckets_to_rebuild > rebuild_threshold {
self.try_resize(current_array, index, guard);
return;
}
}
}
if TYPE != INDEX || num_entries <= shrink_threshold {
self.try_resize(current_array, index, guard);
}
}
}
}
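/// Resizes, rebuilds, or drops the bucket array based on a sampled entry count; the `RESIZING` bit in the
/// minimum-capacity variable ensures that only one thread resizes at a time.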
fn try_resize(
&self,
sampled_array: &BucketArray<K, V, L, TYPE>,
sampling_index: usize,
guard: &Guard,
) {
let current_array_ptr = self.bucket_array_var().load(Acquire, guard);
let Some(current_array) = (unsafe { current_array_ptr.as_ref_unchecked() }) else {
return;
};
if !ptr::eq(current_array, sampled_array) || current_array.has_linked_array() {
return;
}
let minimum_capacity = self.minimum_capacity();
let capacity = current_array.num_slots();
let estimated_num_entries = Self::sample(current_array, sampling_index);
let new_capacity =
if capacity < minimum_capacity || estimated_num_entries >= (capacity / 32) * 25 {
if capacity == self.maximum_capacity() {
capacity
} else {
let mut new_capacity = minimum_capacity.next_power_of_two().max(capacity);
while new_capacity / 2 < estimated_num_entries {
if new_capacity >= self.maximum_capacity() {
break;
}
new_capacity *= 2;
}
new_capacity
}
} else if estimated_num_entries <= capacity / 8 {
(estimated_num_entries * 2)
.max(minimum_capacity)
.max(BucketArray::<K, V, L, TYPE>::minimum_capacity())
.next_power_of_two()
} else {
capacity
};
let try_resize = new_capacity != capacity;
let try_drop_table = estimated_num_entries == 0 && minimum_capacity == 0;
let try_rebuild =
TYPE == INDEX && !try_resize && Self::check_rebuild(current_array, sampling_index);
if !try_resize && !try_drop_table && !try_rebuild {
return;
}
if self
.minimum_capacity_var()
.fetch_update(AcqRel, Acquire, |lock_state| {
if lock_state >= RESIZING {
None
} else {
Some(lock_state + RESIZING)
}
})
.is_err()
{
return;
}
let _lock_guard = ExitGuard::new((), |()| {
self.minimum_capacity_var().fetch_sub(RESIZING, Release);
});
if self.bucket_array_var().load(Acquire, guard) != current_array_ptr {
return;
}
if try_drop_table {
let mut writer_guard = ExitGuard::new((0, false), |(len, success): (usize, bool)| {
for i in 0..len {
let writer = Writer::from_bucket(current_array.bucket(i));
if success {
debug_assert_eq!(writer.len(), 0);
writer.kill();
}
}
});
if !(0..current_array.len()).any(|i| {
if let Ok(Some(writer)) = Writer::try_lock(current_array.bucket(i)) {
if writer.len() == 0 {
writer_guard.0 = i + 1;
forget(writer);
return false;
}
}
true
}) {
writer_guard.1 = true;
if let Some(bucket_array) =
self.bucket_array_var().swap((None, Tag::None), Release).0
{
self.defer_reclaim(bucket_array, guard);
}
}
} else if try_resize || try_rebuild {
let new_bucket_array = unsafe {
Shared::new_unchecked(BucketArray::<K, V, L, TYPE>::new(
new_capacity,
(*self.bucket_array_var()).clone(Relaxed, guard),
))
};
self.bucket_array_var()
.swap((Some(new_bucket_array), Tag::None), Release);
}
}
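/// Derives an initial capacity from an iterator size hint, adding 25% headroom; for example, a hint of `(100, None)` yields `125`.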
#[inline]
fn capacity_from_size_hint(size_hint: (usize, Option<usize>)) -> usize {
(size_hint
.1
.unwrap_or(size_hint.0)
.min(1_usize << (usize::BITS - 2))
/ 4)
* 5
}
}
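/// The largest capacity a bucket array may have.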
pub(super) const MAXIMUM_CAPACITY_LIMIT: usize = 1_usize << (usize::BITS - 2);
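/// Bit in the minimum-capacity variable that flags an in-progress resize.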
pub(super) const RESIZING: usize = 1_usize << (usize::BITS - 1);
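/// A `Writer`-locked bucket along with its data block and position in the bucket array.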
#[derive(Debug)]
pub(crate) struct LockedBucket<K, V, L: LruList, const TYPE: char> {
pub writer: Writer<K, V, L, TYPE>,
pub data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
pub bucket_index: usize,
pub bucket_array: NonNull<BucketArray<K, V, L, TYPE>>,
}
impl<K, V, L: LruList, const TYPE: char> LockedBucket<K, V, L, TYPE> {
#[inline]
pub(crate) const fn bucket_array(&self) -> &BucketArray<K, V, L, TYPE> {
unsafe { self.bucket_array.as_ref() }
}
#[inline]
pub(crate) fn entry(&self, entry_ptr: &EntryPtr<K, V, TYPE>) -> &(K, V) {
entry_ptr.get(self.data_block, fake_guard())
}
#[inline]
pub(crate) fn entry_mut<'b>(
&'b mut self,
entry_ptr: &'b mut EntryPtr<K, V, TYPE>,
) -> &'b mut (K, V) {
entry_ptr.get_mut(self.data_block, &self.writer)
}
#[inline]
pub(crate) fn insert(&self, hash: u64, entry: (K, V)) -> EntryPtr<K, V, TYPE> {
self.writer.insert(self.data_block, hash, entry)
}
}
impl<K: Eq + Hash, V, L: LruList, const TYPE: char> LockedBucket<K, V, L, TYPE> {
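/// Searches the locked bucket for an entry matching the key and hash.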
#[inline]
pub(crate) fn search<Q>(&self, key: &Q, hash: u64) -> EntryPtr<K, V, TYPE>
where
Q: Equivalent<K> + ?Sized,
{
(*self.writer).get_entry_ptr(self.data_block, key, hash)
}
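/// Removes the entry pointed to by `entry_ptr`, then checks whether the table should shrink or be rebuilt.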
#[inline]
pub(crate) fn remove<H, T: HashTable<K, V, H, L, TYPE>>(
self,
hash_table: &T,
entry_ptr: &mut EntryPtr<K, V, TYPE>,
) -> (K, V)
where
H: BuildHasher,
{
let removed = self.writer.remove(self.data_block, entry_ptr);
self.try_shrink_or_rebuild(hash_table);
removed
}
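/// Marks the entry as removed, then checks whether the table should shrink or be rebuilt; only valid for the `INDEX` table type.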
#[inline]
pub(crate) fn mark_removed<H, T: HashTable<K, V, H, L, TYPE>>(
self,
hash_table: &T,
entry_ptr: &mut EntryPtr<K, V, TYPE>,
) where
H: BuildHasher,
{
debug_assert_eq!(TYPE, INDEX);
self.writer.mark_removed(entry_ptr);
self.try_shrink_or_rebuild(hash_table);
}
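/// Consumes the locked bucket and, if it is empty or needs rebuilding, asks the table to consider shrinking or rebuilding.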
#[inline]
pub(crate) fn try_shrink_or_rebuild<H, T: HashTable<K, V, H, L, TYPE>>(self, hash_table: &T)
where
H: BuildHasher,
{
if (TYPE == INDEX && self.writer.need_rebuild()) || self.writer.len() == 0 {
let guard = Guard::new();
if let Some(current_array) = hash_table.bucket_array(&guard) {
if ptr::eq(current_array, self.bucket_array()) {
let bucket_index = self.bucket_index;
drop(self);
hash_table.try_shrink_or_rebuild(current_array, bucket_index, &guard);
}
}
}
}
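/// Advances to the next entry asynchronously, moving on to later buckets when the current one is exhausted.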
#[inline]
pub(super) async fn next_async<H, T: HashTable<K, V, H, L, TYPE>>(
self,
hash_table: &T,
entry_ptr: &mut EntryPtr<K, V, TYPE>,
) -> Option<LockedBucket<K, V, L, TYPE>>
where
H: BuildHasher,
{
if entry_ptr.move_to_next(&self.writer) {
return Some(self);
}
let next_index = self.bucket_index + 1;
let len = self.bucket_array().len();
if self.writer.len() == 0 {
self.try_shrink_or_rebuild(hash_table);
} else {
drop(self);
}
if next_index == len {
return None;
}
let mut next_entry = None;
hash_table
.for_each_writer_async(next_index, len, |locked_bucket, _| {
*entry_ptr = EntryPtr::null();
if entry_ptr.move_to_next(&locked_bucket.writer) {
next_entry = Some(locked_bucket);
return true;
}
false
})
.await;
next_entry
}
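/// Advances to the next entry, moving on to later buckets when the current one is exhausted.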
#[inline]
pub(super) fn next_sync<H, T: HashTable<K, V, H, L, TYPE>>(
self,
hash_table: &T,
entry_ptr: &mut EntryPtr<K, V, TYPE>,
) -> Option<Self>
where
H: BuildHasher,
{
if entry_ptr.move_to_next(&self.writer) {
return Some(self);
}
let next_index = self.bucket_index + 1;
let len = self.bucket_array().len();
if self.writer.len() == 0 {
self.try_shrink_or_rebuild(hash_table);
} else {
drop(self);
}
if next_index == len {
return None;
}
let mut next_entry = None;
hash_table.for_each_writer_sync(next_index, len, &Guard::new(), |locked_bucket, _| {
*entry_ptr = EntryPtr::null();
if entry_ptr.move_to_next(&locked_bucket.writer) {
next_entry = Some(locked_bucket);
return true;
}
false
});
next_entry
}
}
impl<K, V, L: LruList, const TYPE: char> Deref for LockedBucket<K, V, L, TYPE> {
type Target = Bucket<K, V, L, TYPE>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.writer
}
}
unsafe impl<K: Send, V: Send, L: LruList, const TYPE: char> Send for LockedBucket<K, V, L, TYPE> {}
unsafe impl<K: Send + Sync, V: Send + Sync, L: LruList, const TYPE: char> Sync
for LockedBucket<K, V, L, TYPE>
{
}
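/// Maps a bucket index in an array of `from_len` buckets to the corresponding index range in an
/// array of `to_len` buckets; e.g., with `from_len == 4` and `to_len == 16`, index `1` maps to
/// `(4, 8)`, and with `from_len == 16` and `to_len == 4`, index `9` maps to `(2, 3)`.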
#[inline]
const fn from_index_to_range(from_len: usize, to_len: usize, from_index: usize) -> (usize, usize) {
debug_assert!(from_len.is_power_of_two() && to_len.is_power_of_two());
if from_len < to_len {
let ratio = to_len / from_len;
let start_index = from_index * ratio;
debug_assert!(start_index + ratio <= to_len);
(start_index, start_index + ratio)
} else {
let ratio = from_len / to_len;
let start_index = from_index / ratio;
debug_assert!(start_index < to_len);
(start_index, start_index + 1)
}
}
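/// Converts a shared reference into a `NonNull` pointer.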
const fn into_non_null<T: Sized>(t: &T) -> NonNull<T> {
unsafe { NonNull::new_unchecked(from_ref(t).cast_mut()) }
}