use super::ebr::{Arc, AtomicArc, Barrier, Tag};
use super::hash_table::bucket::{EntryPtr, Locker, Reader, SEQUENTIAL};
use super::hash_table::bucket_array::BucketArray;
use super::hash_table::{HashTable, LockedEntry};
use super::wait_queue::AsyncWait;
use std::borrow::Borrow;
use std::collections::hash_map::RandomState;
use std::fmt::{self, Debug};
use std::hash::{BuildHasher, Hash};
use std::mem::replace;
use std::ops::{Deref, RangeInclusive};
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, Relaxed};
/// Scalable concurrent hash map keyed by `K`, storing `V`, hashed with `H`.
///
/// The bucket array is held behind an [`AtomicArc`] so it can be swapped
/// during resizing while readers hold EBR barriers.
pub struct HashMap<K, V, H = RandomState>
where
    K: Eq + Hash,
    H: BuildHasher,
{
    // Current bucket array; null until the first allocation.
    array: AtomicArc<BucketArray<K, V, SEQUENTIAL>>,
    // Capacity floor maintained by `reserve`/`with_capacity_and_hasher`.
    minimum_capacity: AtomicUsize,
    // Factory producing hashers for key hashing.
    build_hasher: H,
}
/// A view into a single slot of a [`HashMap`], holding the bucket lock.
///
/// Either [`Occupied`](Self::Occupied) (an entry exists for the key) or
/// [`Vacant`](Self::Vacant) (no entry yet; the key is retained for insertion).
pub enum Entry<'h, K, V, H = RandomState>
where
    K: Eq + Hash,
    H: BuildHasher,
{
    Occupied(OccupiedEntry<'h, K, V, H>),
    Vacant(VacantEntry<'h, K, V, H>),
}
/// A locked view of an existing entry in a [`HashMap`].
///
/// Keeps the owning bucket locked (via `locked_entry`) for its lifetime.
pub struct OccupiedEntry<'h, K, V, H = RandomState>
where
    K: Eq + Hash,
    H: BuildHasher,
{
    // Back-reference to the owning map.
    hashmap: &'h HashMap<K, V, H>,
    // Bucket locker + entry pointer pinning the entry in place.
    locked_entry: LockedEntry<'h, K, V, SEQUENTIAL>,
}
/// A locked view of a slot with no entry for the key yet.
///
/// Retains the key and its precomputed hash so `insert_entry` does not
/// need to rehash.
pub struct VacantEntry<'h, K, V, H = RandomState>
where
    K: Eq + Hash,
    H: BuildHasher,
{
    // Back-reference to the owning map.
    hashmap: &'h HashMap<K, V, H>,
    // The key to insert under.
    key: K,
    // Hash of `key`, computed once at lookup time.
    hash: u64,
    // Locked bucket where the entry would be inserted.
    locked_entry: LockedEntry<'h, K, V, SEQUENTIAL>,
}
/// RAII guard for capacity reserved by [`HashMap::reserve`].
///
/// Dropping it returns the `additional` capacity to the map (see the
/// `Drop` implementation at the bottom of this file).
pub struct Reserve<'h, K, V, H = RandomState>
where
    K: Eq + Hash,
    H: BuildHasher,
{
    hashmap: &'h HashMap<K, V, H>,
    // Amount added to `minimum_capacity`; subtracted again on drop.
    additional: usize,
}
impl<K, V, H> HashMap<K, V, H>
where
K: Eq + Hash,
H: BuildHasher,
{
/// Creates an empty `HashMap` using the supplied hasher factory.
///
/// No bucket array is allocated until it is first needed.
#[inline]
pub fn with_hasher(build_hasher: H) -> Self {
    Self {
        array: AtomicArc::null(),
        minimum_capacity: AtomicUsize::new(0),
        build_hasher,
    }
}

/// Creates an empty `HashMap` with at least `capacity` slots pre-allocated.
///
/// With `capacity == 0` nothing is allocated; otherwise the actually
/// allocated capacity (`array.num_entries()`) becomes the capacity floor.
#[inline]
pub fn with_capacity_and_hasher(capacity: usize, build_hasher: H) -> Self {
    let (array, minimum_capacity) = if capacity == 0 {
        (AtomicArc::null(), AtomicUsize::new(0))
    } else {
        // SAFETY: relies on the `Arc::new_unchecked` contract; the array is
        // freshly allocated and uniquely owned at this point.
        let array = unsafe {
            Arc::new_unchecked(BucketArray::<K, V, SEQUENTIAL>::new(
                capacity,
                AtomicArc::null(),
            ))
        };
        let minimum_capacity = array.num_entries();
        (AtomicArc::from(array), AtomicUsize::new(minimum_capacity))
    };
    Self {
        array,
        minimum_capacity,
        build_hasher,
    }
}

/// Reserves `additional_capacity` extra slots, returning a [`Reserve`]
/// guard that relinquishes the reservation when dropped.
///
/// Returns `None` when no additional capacity could be reserved.
#[inline]
pub fn reserve(&self, additional_capacity: usize) -> Option<Reserve<K, V, H>> {
    let additional = self.reserve_capacity(additional_capacity);
    if additional == 0 {
        None
    } else {
        Some(Reserve {
            hashmap: self,
            additional,
        })
    }
}
/// Gets the [`Entry`] for `key`, locking the corresponding bucket.
#[inline]
pub fn entry(&self, key: K) -> Entry<K, V, H> {
    let barrier = Barrier::new();
    let hash = self.hash(&key);
    // SAFETY: with the synchronous `()` wait primitive `reserve_entry` is
    // assumed never to return `Err`, hence the unchecked unwrap.
    let locked_entry = unsafe {
        self.reserve_entry(&key, hash, &mut (), self.prolonged_barrier_ref(&barrier))
            .ok()
            .unwrap_unchecked()
    };
    if locked_entry.entry_ptr.is_valid() {
        Entry::Occupied(OccupiedEntry {
            hashmap: self,
            locked_entry,
        })
    } else {
        Entry::Vacant(VacantEntry {
            hashmap: self,
            key,
            hash,
            locked_entry,
        })
    }
}

/// Asynchronous variant of [`entry`](Self::entry): awaits on a contended
/// bucket lock instead of blocking the thread, then retries.
#[inline]
pub async fn entry_async(&self, key: K) -> Entry<K, V, H> {
    let hash = self.hash(&key);
    loop {
        let mut async_wait = AsyncWait::default();
        let mut async_wait_pinned = Pin::new(&mut async_wait);
        {
            let barrier = Barrier::new();
            if let Ok(locked_entry) = self.reserve_entry(
                &key,
                hash,
                &mut async_wait_pinned,
                self.prolonged_barrier_ref(&barrier),
            ) {
                if locked_entry.entry_ptr.is_valid() {
                    return Entry::Occupied(OccupiedEntry {
                        hashmap: self,
                        locked_entry,
                    });
                }
                return Entry::Vacant(VacantEntry {
                    hashmap: self,
                    key,
                    hash,
                    locked_entry,
                });
            }
        }
        // Lock acquisition failed; wait for a notification, then retry.
        async_wait_pinned.await;
    }
}
/// Returns the first occupied entry in bucket order, or `None` if empty.
#[inline]
pub fn first_occupied_entry(&self) -> Option<OccupiedEntry<K, V, H>> {
    let barrier = Barrier::new();
    let prolonged_barrier = self.prolonged_barrier_ref(&barrier);
    if let Some(locked_entry) = self.lock_first_occupied_entry(prolonged_barrier) {
        return Some(OccupiedEntry {
            hashmap: self,
            locked_entry,
        });
    }
    None
}
/// Asynchronous variant of [`first_occupied_entry`](Self::first_occupied_entry).
///
/// Scans buckets in index order, awaiting on contended bucket locks instead
/// of blocking, and restarts the scan if the bucket array was replaced.
#[inline]
pub async fn first_occupied_entry_async(&self) -> Option<OccupiedEntry<K, V, H>> {
    let mut current_array_holder = self.array.get_arc(Acquire, &Barrier::new());
    while let Some(current_array) = current_array_holder.take() {
        // Finish any in-flight incremental rehash from an old array first.
        // (Fixed: `&current_array` had been mangled into an HTML-entity
        // artifact `¤t_array`, which does not compile.)
        self.cleanse_old_array_async(&current_array).await;
        for index in 0..current_array.num_buckets() {
            loop {
                let mut async_wait = AsyncWait::default();
                let mut async_wait_pinned = Pin::new(&mut async_wait);
                {
                    let barrier = Barrier::new();
                    let prolonged_barrier = self.prolonged_barrier_ref(&barrier);
                    let prolonged_current_array = current_array.get_ref_with(prolonged_barrier);
                    let bucket = prolonged_current_array.bucket_mut(index);
                    if let Ok(locker) = Locker::try_lock_or_wait(
                        bucket,
                        &mut async_wait_pinned,
                        prolonged_barrier,
                    ) {
                        if let Some(locker) = locker {
                            let data_block_mut = prolonged_current_array.data_block_mut(index);
                            let mut entry_ptr = EntryPtr::new(prolonged_barrier);
                            if entry_ptr.next(&locker, prolonged_barrier) {
                                return Some(OccupiedEntry {
                                    hashmap: self,
                                    locked_entry: LockedEntry {
                                        locker,
                                        data_block_mut,
                                        entry_ptr,
                                        index,
                                    },
                                });
                            }
                        }
                        break;
                    };
                }
                // Lock contended: wait until notified, then retry the bucket.
                async_wait_pinned.await;
            }
        }
        // Restart the scan if the bucket array was swapped meanwhile.
        if let Some(new_current_array) = self.array.get_arc(Acquire, &Barrier::new()) {
            if new_current_array.as_ptr() == current_array.as_ptr() {
                break;
            }
            current_array_holder.replace(new_current_array);
            continue;
        }
        break;
    }
    None
}
/// Inserts `key`/`val`; returns them back in `Err` if the key already exists.
#[inline]
pub fn insert(&self, key: K, val: V) -> Result<(), (K, V)> {
    let barrier = Barrier::new();
    let hash = self.hash(&key);
    if let Ok(Some((k, v))) = self.insert_entry(key, val, hash, &mut (), &barrier) {
        Err((k, v))
    } else {
        Ok(())
    }
}

/// Asynchronous variant of [`insert`](Self::insert): retries with an
/// [`AsyncWait`] when the target bucket lock is contended.
#[inline]
pub async fn insert_async(&self, mut key: K, mut val: V) -> Result<(), (K, V)> {
    let hash = self.hash(&key);
    loop {
        let mut async_wait = AsyncWait::default();
        let mut async_wait_pinned = Pin::new(&mut async_wait);
        match self.insert_entry(key, val, hash, &mut async_wait_pinned, &Barrier::new()) {
            // The key already existed: hand the pair back to the caller.
            Ok(Some(returned)) => return Err(returned),
            Ok(None) => return Ok(()),
            // Contended: take the pair back and retry after waiting.
            Err(returned) => {
                key = returned.0;
                val = returned.1;
            }
        }
        async_wait_pinned.await;
    }
}

/// Applies `updater` to the value for `key`, returning its result, or
/// `None` if the key is absent.
#[inline]
pub fn update<Q, U, R>(&self, key: &Q, updater: U) -> Option<R>
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
    U: FnOnce(&K, &mut V) -> R,
{
    let barrier = Barrier::new();
    let LockedEntry {
        mut locker,
        data_block_mut,
        mut entry_ptr,
        index: _,
    } = self
        .get_entry(key, self.hash(key.borrow()), &mut (), &barrier)
        .ok()
        .flatten()?;
    let (k, v) = entry_ptr.get_mut(data_block_mut, &mut locker);
    Some(updater(k, v))
}
/// Asynchronous variant of [`update`](Self::update): awaits on a contended
/// bucket lock, then retries.
#[inline]
pub async fn update_async<Q, U, R>(&self, key: &Q, updater: U) -> Option<R>
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
    U: FnOnce(&K, &mut V) -> R,
{
    let hash = self.hash(key);
    loop {
        let mut async_wait = AsyncWait::default();
        let mut async_wait_pinned = Pin::new(&mut async_wait);
        if let Ok(result) = self.get_entry(key, hash, &mut async_wait_pinned, &Barrier::new()) {
            if let Some(LockedEntry {
                mut locker,
                data_block_mut,
                mut entry_ptr,
                index: _,
            }) = result
            {
                let (k, v) = entry_ptr.get_mut(data_block_mut, &mut locker);
                return Some(updater(k, v));
            }
            return None;
        }
        async_wait_pinned.await;
    }
}

/// Updates the value for `key` with `updater` if present, otherwise inserts
/// the value produced by `constructor`.
#[inline]
pub fn upsert<C: FnOnce() -> V, U: FnOnce(&K, &mut V)>(
    &self,
    key: K,
    constructor: C,
    updater: U,
) {
    let barrier = Barrier::new();
    let hash = self.hash(&key);
    if let Ok(LockedEntry {
        mut locker,
        data_block_mut,
        mut entry_ptr,
        index: _,
    }) = self.reserve_entry(&key, hash, &mut (), &barrier)
    {
        // A valid entry pointer means the key already exists: update it.
        if entry_ptr.is_valid() {
            let (k, v) = entry_ptr.get_mut(data_block_mut, &mut locker);
            updater(k, v);
            return;
        }
        // Vacant: construct the value and insert under the held lock.
        let val = constructor();
        locker.insert_with(
            data_block_mut,
            BucketArray::<K, V, SEQUENTIAL>::partial_hash(hash),
            || (key, val),
            &barrier,
        );
    };
}
/// Asynchronous variant of [`upsert`](Self::upsert): awaits on a contended
/// bucket lock, then retries.
#[inline]
pub async fn upsert_async<C: FnOnce() -> V, U: FnOnce(&K, &mut V)>(
    &self,
    key: K,
    constructor: C,
    updater: U,
) {
    let hash = self.hash(&key);
    loop {
        let mut async_wait = AsyncWait::default();
        let mut async_wait_pinned = Pin::new(&mut async_wait);
        {
            let barrier = Barrier::new();
            if let Ok(LockedEntry {
                mut locker,
                data_block_mut,
                mut entry_ptr,
                index: _,
            }) = self.reserve_entry(&key, hash, &mut async_wait_pinned, &barrier)
            {
                if entry_ptr.is_valid() {
                    // Existing entry: update in place.
                    let (k, v) = entry_ptr.get_mut(data_block_mut, &mut locker);
                    updater(k, v);
                } else {
                    // Vacant: construct and insert under the held lock.
                    let val = constructor();
                    locker.insert_with(
                        data_block_mut,
                        BucketArray::<K, V, SEQUENTIAL>::partial_hash(hash),
                        || (key, val),
                        &barrier,
                    );
                }
                return;
            };
        }
        async_wait_pinned.await;
    }
}
/// Removes the entry for `key`, returning the key-value pair if present.
#[inline]
pub fn remove<Q>(&self, key: &Q) -> Option<(K, V)>
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
{
    self.remove_if(key, |_| true)
}

/// Asynchronous variant of [`remove`](Self::remove).
#[inline]
pub async fn remove_async<Q>(&self, key: &Q) -> Option<(K, V)>
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
{
    self.remove_if_async(key, |_| true).await
}

/// Removes the entry for `key` only if `condition` returns `true` for its
/// value; returns the removed pair.
#[inline]
pub fn remove_if<Q, F: FnOnce(&mut V) -> bool>(&self, key: &Q, condition: F) -> Option<(K, V)>
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
{
    self.remove_entry(
        key,
        self.hash(key.borrow()),
        condition,
        Option::flatten,
        &mut (),
        &Barrier::new(),
    )
    .ok()
    .flatten()
}

/// Asynchronous variant of [`remove_if`](Self::remove_if): awaits on a
/// contended bucket lock, reclaiming `condition` from the `Err` arm so it
/// can be retried.
#[inline]
pub async fn remove_if_async<Q, F: FnOnce(&mut V) -> bool>(
    &self,
    key: &Q,
    mut condition: F,
) -> Option<(K, V)>
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
{
    let hash = self.hash(key);
    loop {
        let mut async_wait = AsyncWait::default();
        let mut async_wait_pinned = Pin::new(&mut async_wait);
        match self.remove_entry(
            key,
            hash,
            condition,
            Option::flatten,
            &mut async_wait_pinned,
            &Barrier::new(),
        ) {
            Ok(r) => return r,
            // Contended: the closure is handed back for the retry.
            Err(c) => condition = c,
        };
        async_wait_pinned.await;
    }
}
/// Returns an [`OccupiedEntry`] for `key`, keeping its bucket locked, or
/// `None` if the key is absent.
#[inline]
pub fn get<Q>(&self, key: &Q) -> Option<OccupiedEntry<K, V, H>>
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
{
    let barrier = Barrier::new();
    let locked_entry = self
        .get_entry(
            key,
            self.hash(key.borrow()),
            &mut (),
            self.prolonged_barrier_ref(&barrier),
        )
        .ok()
        .flatten()?;
    Some(OccupiedEntry {
        hashmap: self,
        locked_entry,
    })
}

/// Asynchronous variant of [`get`](Self::get): awaits on a contended
/// bucket lock, then retries.
#[inline]
pub async fn get_async<Q>(&self, key: &Q) -> Option<OccupiedEntry<K, V, H>>
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
{
    let hash = self.hash(key);
    loop {
        let mut async_wait = AsyncWait::default();
        let mut async_wait_pinned = Pin::new(&mut async_wait);
        if let Ok(result) = self.get_entry(
            key,
            hash,
            &mut async_wait_pinned,
            self.prolonged_barrier_ref(&Barrier::new()),
        ) {
            if let Some(locked_entry) = result {
                return Some(OccupiedEntry {
                    hashmap: self,
                    locked_entry,
                });
            }
            return None;
        }
        async_wait_pinned.await;
    }
}

/// Applies `reader` to the key-value pair for `key` without taking an
/// exclusive lock; returns its result, or `None` if the key is absent.
#[inline]
pub fn read<Q, R, F: FnOnce(&K, &V) -> R>(&self, key: &Q, reader: F) -> Option<R>
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
{
    self.read_entry(key, self.hash(key), &mut (), &Barrier::new())
        .ok()
        .flatten()
        .map(|(k, v)| reader(k, v))
}

/// Asynchronous variant of [`read`](Self::read).
#[inline]
pub async fn read_async<Q, R, F: FnOnce(&K, &V) -> R>(&self, key: &Q, reader: F) -> Option<R>
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
{
    let hash = self.hash(key);
    loop {
        let mut async_wait = AsyncWait::default();
        let mut async_wait_pinned = Pin::new(&mut async_wait);
        if let Ok(result) = self.read_entry(key, hash, &mut async_wait_pinned, &Barrier::new())
        {
            return result.map(|(k, v)| reader(k, v));
        }
        async_wait_pinned.await;
    }
}
/// Returns `true` if the map contains an entry for `key`.
#[inline]
pub fn contains<Q>(&self, key: &Q) -> bool
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
{
    self.read(key, |_, _| ()).is_some()
}

/// Asynchronous variant of [`contains`](Self::contains).
#[inline]
pub async fn contains_async<Q>(&self, key: &Q) -> bool
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
{
    self.read_async(key, |_, _| ()).await.is_some()
}

/// Visits every key-value pair with `scanner`; implemented on top of
/// [`any`](Self::any) with an always-`false` predicate.
#[inline]
pub fn scan<F: FnMut(&K, &V)>(&self, mut scanner: F) {
    self.any(|k, v| {
        scanner(k, v);
        false
    });
}

/// Asynchronous variant of [`scan`](Self::scan).
#[inline]
pub async fn scan_async<F: FnMut(&K, &V)>(&self, mut scanner: F) {
    self.any_async(|k, v| {
        scanner(k, v);
        false
    })
    .await;
}

/// Returns `true` if `pred` returns `true` for any key-value pair.
#[inline]
pub fn any<P: FnMut(&K, &V) -> bool>(&self, pred: P) -> bool {
    self.any_entry(pred)
}
/// Asynchronous variant of [`any`](Self::any): returns `true` as soon as
/// `pred` matches, awaiting on contended bucket read locks.
#[inline]
pub async fn any_async<P: FnMut(&K, &V) -> bool>(&self, mut pred: P) -> bool {
    let mut current_array_holder = self.array.get_arc(Acquire, &Barrier::new());
    while let Some(current_array) = current_array_holder.take() {
        // Finish any in-flight incremental rehash first.
        // (Fixed: `&current_array` had been mangled into an HTML-entity
        // artifact `¤t_array`, which does not compile.)
        self.cleanse_old_array_async(&current_array).await;
        for index in 0..current_array.num_buckets() {
            loop {
                let mut async_wait = AsyncWait::default();
                let mut async_wait_pinned = Pin::new(&mut async_wait);
                {
                    let barrier = Barrier::new();
                    let bucket = current_array.bucket(index);
                    if let Ok(reader) =
                        Reader::try_lock_or_wait(bucket, &mut async_wait_pinned, &barrier)
                    {
                        if let Some(reader) = reader {
                            let data_block = current_array.data_block(index);
                            let mut entry_ptr = EntryPtr::new(&barrier);
                            while entry_ptr.next(*reader, &barrier) {
                                let (k, v) = entry_ptr.get(data_block);
                                if pred(k, v) {
                                    return true;
                                }
                            }
                        }
                        break;
                    };
                }
                // Lock contended: wait until notified, then retry the bucket.
                async_wait_pinned.await;
            }
        }
        // Restart the scan if the bucket array was swapped meanwhile.
        if let Some(new_current_array) = self.array.get_arc(Acquire, &Barrier::new()) {
            if new_current_array.as_ptr() == current_array.as_ptr() {
                break;
            }
            current_array_holder.replace(new_current_array);
            continue;
        }
        break;
    }
    false
}
/// Visits every key-value pair mutably; implemented on top of
/// [`retain`](Self::retain) with an always-`true` predicate.
#[inline]
pub fn for_each<F: FnMut(&K, &mut V)>(&self, mut f: F) {
    self.retain(|k, v| {
        f(k, v);
        true
    });
}

/// Asynchronous variant of [`for_each`](Self::for_each).
#[inline]
pub async fn for_each_async<F: FnMut(&K, &mut V)>(&self, mut f: F) {
    self.retain_async(|k, v| {
        f(k, v);
        true
    })
    .await;
}

/// Retains only the entries for which `pred` returns `true`; returns the
/// `(retained, removed)` counts.
#[inline]
pub fn retain<F: FnMut(&K, &mut V) -> bool>(&self, pred: F) -> (usize, usize) {
    self.retain_entries(pred)
}
/// Asynchronous variant of [`retain`](Self::retain): awaits on contended
/// bucket locks and returns `(retained, removed)` counts.
///
/// The retained count is reset to zero if the bucket array is replaced
/// mid-scan, since already-visited entries would be counted again.
#[inline]
pub async fn retain_async<F: FnMut(&K, &mut V) -> bool>(&self, mut pred: F) -> (usize, usize) {
    let mut num_retained: usize = 0;
    let mut num_removed: usize = 0;
    let mut current_array_holder = self.array.get_arc(Acquire, &Barrier::new());
    while let Some(current_array) = current_array_holder.take() {
        // Finish any in-flight incremental rehash first.
        // (Fixed: `&current_array` had been mangled into an HTML-entity
        // artifact `¤t_array`, which does not compile.)
        self.cleanse_old_array_async(&current_array).await;
        for index in 0..current_array.num_buckets() {
            loop {
                let mut async_wait = AsyncWait::default();
                let mut async_wait_pinned = Pin::new(&mut async_wait);
                {
                    let barrier = Barrier::new();
                    let bucket = current_array.bucket_mut(index);
                    if let Ok(locker) =
                        Locker::try_lock_or_wait(bucket, &mut async_wait_pinned, &barrier)
                    {
                        if let Some(mut locker) = locker {
                            let data_block_mut = current_array.data_block_mut(index);
                            let mut entry_ptr = EntryPtr::new(&barrier);
                            while entry_ptr.next(&locker, &barrier) {
                                let (k, v) = entry_ptr.get_mut(data_block_mut, &mut locker);
                                if pred(k, v) {
                                    num_retained = num_retained.saturating_add(1);
                                } else {
                                    locker.erase(data_block_mut, &mut entry_ptr);
                                    num_removed = num_removed.saturating_add(1);
                                }
                            }
                        }
                        break;
                    };
                }
                // Lock contended: wait until notified, then retry the bucket.
                async_wait_pinned.await;
            }
        }
        if let Some(new_current_array) = self.array.get_arc(Acquire, &Barrier::new()) {
            if new_current_array.as_ptr() == current_array.as_ptr() {
                break;
            }
            // The array was swapped: restart and discard the retained count.
            num_retained = 0;
            current_array_holder.replace(new_current_array);
            continue;
        }
        break;
    }
    // Removals may have left the table sparse enough to shrink.
    if num_removed != 0 {
        self.try_resize(0, &Barrier::new());
    }
    (num_retained, num_removed)
}
/// Consumes entries for which `pred` returns `None`; returns the number of
/// entries consumed.
#[inline]
pub fn prune<F: FnMut(&K, V) -> Option<V>>(&self, pred: F) -> usize {
    self.prune_entries(pred)
}
/// Asynchronous variant of [`prune`](Self::prune): awaits on contended
/// bucket locks and returns the number of entries consumed.
#[inline]
pub async fn prune_async<F: FnMut(&K, V) -> Option<V>>(&self, mut pred: F) -> usize {
    let mut num_consumed: usize = 0;
    let mut current_array_holder = self.array.get_arc(Acquire, &Barrier::new());
    while let Some(current_array) = current_array_holder.take() {
        // Finish any in-flight incremental rehash first.
        // (Fixed: `&current_array` had been mangled into an HTML-entity
        // artifact `¤t_array`, which does not compile.)
        self.cleanse_old_array_async(&current_array).await;
        for index in 0..current_array.num_buckets() {
            loop {
                let mut async_wait = AsyncWait::default();
                let mut async_wait_pinned = Pin::new(&mut async_wait);
                {
                    let barrier = Barrier::new();
                    let bucket = current_array.bucket_mut(index);
                    if let Ok(locker) =
                        Locker::try_lock_or_wait(bucket, &mut async_wait_pinned, &barrier)
                    {
                        if let Some(mut locker) = locker {
                            let data_block_mut = current_array.data_block_mut(index);
                            let mut entry_ptr = EntryPtr::new(&barrier);
                            while entry_ptr.next(&locker, &barrier) {
                                if locker.keep_or_consume(
                                    data_block_mut,
                                    &mut entry_ptr,
                                    &mut pred,
                                ) {
                                    num_consumed += 1;
                                }
                            }
                        }
                        break;
                    };
                }
                // Lock contended: wait until notified, then retry the bucket.
                async_wait_pinned.await;
            }
        }
        // Restart the scan if the bucket array was swapped meanwhile.
        if let Some(new_current_array) = self.array.get_arc(Acquire, &Barrier::new()) {
            if new_current_array.as_ptr() == current_array.as_ptr() {
                break;
            }
            current_array_holder.replace(new_current_array);
            continue;
        }
        break;
    }
    // Removals may have left the table sparse enough to shrink.
    if num_consumed != 0 {
        self.try_resize(0, &Barrier::new());
    }
    num_consumed
}
/// Removes every entry, returning the number of entries removed.
#[inline]
pub fn clear(&self) -> usize {
    self.retain(|_, _| false).1
}

/// Asynchronous variant of [`clear`](Self::clear).
#[inline]
pub async fn clear_async(&self) -> usize {
    self.retain_async(|_, _| false).await.1
}

/// Returns the number of entries in the map.
#[inline]
pub fn len(&self) -> usize {
    self.num_entries(&Barrier::new())
}

/// Returns `true` if the map holds no entries.
#[inline]
pub fn is_empty(&self) -> bool {
    !self.has_entry(&Barrier::new())
}

/// Returns the current number of slots in the bucket array.
#[inline]
pub fn capacity(&self) -> usize {
    self.num_slots(&Barrier::new())
}

/// Returns the inclusive range from the reserved minimum capacity to the
/// maximum capacity the map can grow to.
#[inline]
pub fn capacity_range(&self) -> RangeInclusive<usize> {
    self.minimum_capacity.load(Relaxed)..=self.maximum_capacity()
}

/// Drives incremental rehashing until no old bucket array remains,
/// awaiting whenever a rehash step cannot make progress.
async fn cleanse_old_array_async(&self, current_array: &BucketArray<K, V, SEQUENTIAL>) {
    while !current_array.old_array(&Barrier::new()).is_null() {
        let mut async_wait = AsyncWait::default();
        let mut async_wait_pinned = Pin::new(&mut async_wait);
        if self.incremental_rehash::<_, _, false>(
            current_array,
            &mut async_wait_pinned,
            &Barrier::new(),
        ) == Ok(true)
        {
            break;
        }
        async_wait_pinned.await;
    }
}
}
impl<K, V> HashMap<K, V, RandomState>
where
    K: Eq + Hash,
{
    /// Creates an empty `HashMap` with a `RandomState` hasher.
    #[inline]
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an empty `HashMap` with at least `capacity` slots and a
    /// `RandomState` hasher.
    #[inline]
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        Self::with_capacity_and_hasher(capacity, RandomState::new())
    }
}
impl<K, V, H> Clone for HashMap<K, V, H>
where
    K: Clone + Eq + Hash,
    V: Clone,
    H: BuildHasher + Clone,
{
    /// Clones the map by scanning every entry and inserting a copy into a
    /// new map pre-sized to the current capacity, using a cloned hasher.
    #[inline]
    fn clone(&self) -> Self {
        let self_clone = Self::with_capacity_and_hasher(self.capacity(), self.hasher().clone());
        self.scan(|k, v| {
            // Keys in the source map are unique, so the insertion result is
            // deliberately ignored. (Fixed typo: `_reuslt` -> `_result`.)
            let _result = self_clone.insert(k.clone(), v.clone());
        });
        self_clone
    }
}
impl<K, V, H> Debug for HashMap<K, V, H>
where
    K: Debug + Eq + Hash,
    V: Debug,
    H: BuildHasher,
{
    /// Formats the map as a debug map by scanning every entry.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut d = f.debug_map();
        self.scan(|k, v| {
            d.entry(k, v);
        });
        d.finish()
    }
}
impl<K, V, H> Default for HashMap<K, V, H>
where
    K: Eq + Hash,
    H: BuildHasher + Default,
{
    /// Creates an empty `HashMap` with a default-constructed hasher.
    #[inline]
    fn default() -> Self {
        Self::with_hasher(H::default())
    }
}
impl<K, V, H> Drop for HashMap<K, V, H>
where
    K: Eq + Hash,
    H: BuildHasher,
{
    /// Detaches the bucket array and drops it in place.
    #[inline]
    fn drop(&mut self) {
        // `if let` instead of `Option::map` for a unit-returning side
        // effect (clippy::option_map_unit_fn); behavior is unchanged.
        if let Some(array) = self.array.swap((None, Tag::None), Relaxed).0 {
            // SAFETY: the map is being dropped, so no other thread can
            // reach the array through `self.array` any longer.
            unsafe {
                array.release_drop_in_place();
            }
        }
    }
}
impl<K, V, H> HashTable<K, V, H, SEQUENTIAL> for HashMap<K, V, H>
where
    K: Eq + Hash,
    H: BuildHasher,
{
    /// Returns the hasher factory.
    #[inline]
    fn hasher(&self) -> &H {
        &self.build_hasher
    }
    /// Entries are never cloned for this container.
    #[inline]
    fn try_clone(_: &(K, V)) -> Option<(K, V)> {
        None
    }
    /// Values are never reset for this container.
    #[inline]
    fn try_reset(_: &mut V) {}
    /// Returns the atomic pointer to the bucket array.
    #[inline]
    fn bucket_array(&self) -> &AtomicArc<BucketArray<K, V, SEQUENTIAL>> {
        &self.array
    }
    /// Returns the capacity floor counter.
    #[inline]
    fn minimum_capacity(&self) -> &AtomicUsize {
        &self.minimum_capacity
    }
    /// The maximum capacity is fixed at `2^(usize::BITS - 1)`.
    #[inline]
    fn maximum_capacity(&self) -> usize {
        1_usize << (usize::BITS - 1)
    }
}
impl<K, V, H> PartialEq for HashMap<K, V, H>
where
    K: Eq + Hash,
    V: PartialEq,
    H: BuildHasher,
{
    /// Two maps are equal when every entry of each is present with an equal
    /// value in the other (checked in both directions).
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        // `any(...)` finds a counterexample: an entry missing or unequal in
        // the other map. Equality holds when neither direction finds one.
        if !self.any(|k, v| other.read(k, |_, ov| v == ov) != Some(true)) {
            return !other.any(|k, v| self.read(k, |_, sv| v == sv) != Some(true));
        }
        false
    }
}
impl<'h, K, V, H> Entry<'h, K, V, H>
where
    K: Eq + Hash,
    H: BuildHasher,
{
    /// Inserts `val` if vacant; returns the occupied entry either way.
    #[inline]
    pub fn or_insert(self, val: V) -> OccupiedEntry<'h, K, V, H> {
        self.or_insert_with(|| val)
    }
    /// Inserts the constructor's value if vacant.
    #[inline]
    pub fn or_insert_with<F: FnOnce() -> V>(self, constructor: F) -> OccupiedEntry<'h, K, V, H> {
        self.or_insert_with_key(|_| constructor())
    }
    /// Inserts a value built from a reference to the key if vacant.
    #[inline]
    pub fn or_insert_with_key<F: FnOnce(&K) -> V>(
        self,
        constructor: F,
    ) -> OccupiedEntry<'h, K, V, H> {
        match self {
            Self::Occupied(o) => o,
            Self::Vacant(v) => {
                let val = constructor(v.key());
                v.insert_entry(val)
            }
        }
    }
    /// Returns a reference to the entry's key.
    #[inline]
    pub fn key(&self) -> &K {
        match self {
            Self::Occupied(o) => o.key(),
            Self::Vacant(v) => v.key(),
        }
    }
    /// Applies `f` to the value if occupied; returns the entry unchanged
    /// when vacant.
    #[inline]
    #[must_use]
    pub fn and_modify<F>(self, f: F) -> Self
    where
        F: FnOnce(&mut V),
    {
        match self {
            Self::Occupied(mut o) => {
                f(o.get_mut());
                Self::Occupied(o)
            }
            Self::Vacant(_) => self,
        }
    }
    /// Sets the value, inserting if vacant or replacing if occupied, and
    /// returns the occupied entry.
    #[inline]
    pub fn insert_entry(self, val: V) -> OccupiedEntry<'h, K, V, H> {
        match self {
            Self::Occupied(mut o) => {
                o.insert(val);
                o
            }
            Self::Vacant(v) => v.insert_entry(val),
        }
    }
}
impl<'h, K, V, H> Entry<'h, K, V, H>
where
    K: Eq + Hash,
    V: Default,
    H: BuildHasher,
{
    /// Inserts `V::default()` if vacant; returns the occupied entry.
    #[inline]
    pub fn or_default(self) -> OccupiedEntry<'h, K, V, H> {
        match self {
            Self::Occupied(o) => o,
            Self::Vacant(v) => v.insert_entry(Default::default()),
        }
    }
}
impl<'h, K, V, H> Debug for Entry<'h, K, V, H>
where
    K: Debug + Eq + Hash,
    V: Debug,
    H: BuildHasher,
{
    /// Formats the entry as `Entry(<variant>)`.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Vacant(v) => f.debug_tuple("Entry").field(v).finish(),
            Self::Occupied(o) => f.debug_tuple("Entry").field(o).finish(),
        }
    }
}
impl<'h, K, V, H> OccupiedEntry<'h, K, V, H>
where
    K: Eq + Hash,
    H: BuildHasher,
{
    /// Returns a reference to the entry's key.
    #[inline]
    #[must_use]
    pub fn key(&self) -> &K {
        &self
            .locked_entry
            .entry_ptr
            .get(self.locked_entry.data_block_mut)
            .0
    }
    /// Removes the entry, returning the key-value pair.
    ///
    /// May trigger a shrink/rebuild attempt when the bucket becomes nearly
    /// empty or needs rebuilding and lies within the sampling range.
    #[inline]
    #[must_use]
    pub fn remove_entry(mut self) -> (K, V) {
        // SAFETY: an `OccupiedEntry` points at a live entry, so `erase` is
        // assumed to return `Some` here.
        let entry = unsafe {
            self.locked_entry
                .locker
                .erase(
                    self.locked_entry.data_block_mut,
                    &mut self.locked_entry.entry_ptr,
                )
                .unwrap_unchecked()
        };
        if self.locked_entry.locker.num_entries() <= 1 || self.locked_entry.locker.need_rebuild() {
            let barrier = Barrier::new();
            let hashmap = self.hashmap;
            if let Some(current_array) = hashmap.bucket_array().load(Acquire, &barrier).as_ref() {
                if current_array.old_array(&barrier).is_null() {
                    let index = self.locked_entry.index;
                    if current_array.within_sampling_range(index) {
                        // Release the bucket lock before attempting to resize.
                        drop(self);
                        hashmap.try_shrink_or_rebuild(current_array, index, &barrier);
                    }
                }
            }
        }
        entry
    }
    /// Returns a reference to the entry's value.
    #[inline]
    #[must_use]
    pub fn get(&self) -> &V {
        &self
            .locked_entry
            .entry_ptr
            .get(self.locked_entry.data_block_mut)
            .1
    }
    /// Returns a mutable reference to the entry's value.
    #[inline]
    pub fn get_mut(&mut self) -> &mut V {
        &mut self
            .locked_entry
            .entry_ptr
            .get_mut(
                self.locked_entry.data_block_mut,
                &mut self.locked_entry.locker,
            )
            .1
    }
    /// Replaces the value, returning the previous one.
    #[inline]
    pub fn insert(&mut self, val: V) -> V {
        replace(self.get_mut(), val)
    }
    /// Removes the entry, returning only the value.
    #[inline]
    #[must_use]
    pub fn remove(self) -> V {
        self.remove_entry().1
    }
    /// Moves to the next occupied entry, consuming `self`.
    ///
    /// Restarts from the first occupied entry when the bucket array is
    /// being, or has been, replaced; returns `None` when exhausted.
    #[inline]
    #[must_use]
    pub fn next(mut self) -> Option<Self> {
        let barrier = Barrier::new();
        let prolonged_barrier = self.hashmap.prolonged_barrier_ref(&barrier);
        if self.locked_entry.entry_ptr.next(
            &self.locked_entry.locker,
            // NOTE(review): `prolonged_barrier` is prolonged again here;
            // this looks redundant but appears harmless — confirm.
            self.hashmap.prolonged_barrier_ref(prolonged_barrier),
        ) {
            return Some(self);
        }
        let hashmap = self.hashmap;
        let current_array_ptr = hashmap.array.load(Acquire, prolonged_barrier);
        if let Some(current_array) = current_array_ptr.as_ref() {
            if !current_array.old_array(prolonged_barrier).is_null() {
                // An old array still exists: restart from the beginning.
                drop(self);
                return hashmap.first_occupied_entry();
            }
            let prev_index = self.locked_entry.index;
            // Release the current bucket lock before scanning onwards.
            drop(self);
            for index in (prev_index + 1)..current_array.num_buckets() {
                let bucket = current_array.bucket_mut(index);
                if let Some(locker) = Locker::lock(bucket, prolonged_barrier) {
                    let data_block_mut = current_array.data_block_mut(index);
                    let mut entry_ptr = EntryPtr::new(prolonged_barrier);
                    if entry_ptr.next(&locker, prolonged_barrier) {
                        return Some(OccupiedEntry {
                            hashmap,
                            locked_entry: LockedEntry {
                                locker,
                                data_block_mut,
                                entry_ptr,
                                index,
                            },
                        });
                    }
                }
            }
            // The array was replaced while scanning: restart.
            let new_current_array_ptr = hashmap.array.load(Relaxed, prolonged_barrier);
            if current_array_ptr.without_tag() != new_current_array_ptr.without_tag() {
                return hashmap.first_occupied_entry();
            }
        }
        None
    }
    /// Asynchronous variant of [`next`](Self::next): awaits on contended
    /// bucket locks instead of blocking.
    #[inline]
    pub async fn next_async(mut self) -> Option<OccupiedEntry<'h, K, V, H>> {
        if self.locked_entry.entry_ptr.next(
            &self.locked_entry.locker,
            self.hashmap.prolonged_barrier_ref(&Barrier::new()),
        ) {
            return Some(self);
        }
        let hashmap = self.hashmap;
        let mut current_array_holder = hashmap.array.get_arc(Acquire, &Barrier::new());
        if let Some(current_array) = current_array_holder {
            if !current_array.old_array(&Barrier::new()).is_null() {
                // An old array still exists: restart from the beginning.
                drop(self);
                return hashmap.first_occupied_entry_async().await;
            }
            let prev_index = self.locked_entry.index;
            // Release the current bucket lock before scanning onwards.
            drop(self);
            for index in (prev_index + 1)..current_array.num_buckets() {
                loop {
                    let mut async_wait = AsyncWait::default();
                    let mut async_wait_pinned = Pin::new(&mut async_wait);
                    {
                        let barrier = Barrier::new();
                        let prolonged_barrier = hashmap.prolonged_barrier_ref(&barrier);
                        let prolonged_current_array = current_array.get_ref_with(prolonged_barrier);
                        let bucket = prolonged_current_array.bucket_mut(index);
                        if let Ok(locker) = Locker::try_lock_or_wait(
                            bucket,
                            &mut async_wait_pinned,
                            prolonged_barrier,
                        ) {
                            if let Some(locker) = locker {
                                let data_block_mut = prolonged_current_array.data_block_mut(index);
                                let mut entry_ptr = EntryPtr::new(prolonged_barrier);
                                if entry_ptr.next(&locker, prolonged_barrier) {
                                    return Some(OccupiedEntry {
                                        hashmap,
                                        locked_entry: LockedEntry {
                                            locker,
                                            data_block_mut,
                                            entry_ptr,
                                            index,
                                        },
                                    });
                                }
                            }
                            break;
                        };
                    }
                    // Lock contended: wait until notified, then retry.
                    async_wait_pinned.await;
                }
            }
            // The array was replaced while scanning: restart.
            current_array_holder = hashmap.array.get_arc(Relaxed, &Barrier::new());
            if let Some(new_current_array) = current_array_holder {
                if new_current_array.as_ptr() != current_array.as_ptr() {
                    return hashmap.first_occupied_entry_async().await;
                }
            }
        }
        None
    }
}
impl<'h, K, V, H> Debug for OccupiedEntry<'h, K, V, H>
where
    K: Debug + Eq + Hash,
    V: Debug,
    H: BuildHasher,
{
    /// Formats the entry with its key and value.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("OccupiedEntry")
            .field("key", self.key())
            .field("value", self.get())
            .finish_non_exhaustive()
    }
}
impl<'h, K, V, H> VacantEntry<'h, K, V, H>
where
    K: Eq + Hash,
    H: BuildHasher,
{
    /// Returns a reference to the key that would be inserted.
    #[inline]
    pub fn key(&self) -> &K {
        &self.key
    }
    /// Consumes the entry, returning the key.
    #[inline]
    pub fn into_key(self) -> K {
        self.key
    }
    /// Inserts `val` under the already-locked bucket using the precomputed
    /// hash, returning an [`OccupiedEntry`] for the new entry.
    #[inline]
    pub fn insert_entry(mut self, val: V) -> OccupiedEntry<'h, K, V, H> {
        let barrier = Barrier::new();
        let entry_ptr = self.locked_entry.locker.insert_with(
            self.locked_entry.data_block_mut,
            BucketArray::<K, V, SEQUENTIAL>::partial_hash(self.hash),
            || (self.key, val),
            self.hashmap.prolonged_barrier_ref(&barrier),
        );
        OccupiedEntry {
            hashmap: self.hashmap,
            locked_entry: LockedEntry {
                index: self.locked_entry.index,
                data_block_mut: self.locked_entry.data_block_mut,
                locker: self.locked_entry.locker,
                entry_ptr,
            },
        }
    }
}
impl<'h, K, V, H> Debug for VacantEntry<'h, K, V, H>
where
    K: Debug + Eq + Hash,
    V: Debug,
    H: BuildHasher,
{
    /// Formats the entry with the key pending insertion.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("VacantEntry").field(self.key()).finish()
    }
}
impl<'h, K, V, H> Reserve<'h, K, V, H>
where
    K: Eq + Hash,
    H: BuildHasher,
{
    /// Returns the amount of capacity this reservation holds.
    #[inline]
    #[must_use]
    pub fn additional_capacity(&self) -> usize {
        self.additional
    }
}
impl<'h, K, V, H> AsRef<HashMap<K, V, H>> for Reserve<'h, K, V, H>
where
    K: Eq + Hash,
    H: BuildHasher,
{
    /// Borrows the underlying [`HashMap`].
    #[inline]
    fn as_ref(&self) -> &HashMap<K, V, H> {
        self.hashmap
    }
}
impl<'h, K, V, H> Debug for Reserve<'h, K, V, H>
where
    K: Eq + Hash,
    H: BuildHasher,
{
    /// Formats the reservation with its additional capacity.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("Reserve").field(&self.additional).finish()
    }
}
impl<'h, K, V, H> Deref for Reserve<'h, K, V, H>
where
    K: Eq + Hash,
    H: BuildHasher,
{
    type Target = HashMap<K, V, H>;
    /// Dereferences to the underlying [`HashMap`].
    #[inline]
    fn deref(&self) -> &Self::Target {
        self.hashmap
    }
}
impl<'h, K, V, H> Drop for Reserve<'h, K, V, H>
where
    K: Eq + Hash,
    H: BuildHasher,
{
    /// Returns the reserved capacity to the map and lets the map resize.
    #[inline]
    fn drop(&mut self) {
        // `fetch_sub` returns the previous value, which must be at least
        // the amount this reservation contributed.
        let result = self
            .hashmap
            .minimum_capacity
            .fetch_sub(self.additional, Relaxed);
        // The lowered capacity floor may allow the table to shrink.
        self.hashmap.try_resize(0, &Barrier::new());
        debug_assert!(result >= self.additional);
    }
}