use super::ebr::{Arc, AtomicArc, Barrier};
use super::hash_table::cell::{Locker, Reader};
use super::hash_table::cell_array::CellArray;
use super::hash_table::HashTable;
use super::wait_queue::AsyncWait;
use std::borrow::Borrow;
use std::collections::hash_map::RandomState;
use std::fmt::{self, Debug};
use std::hash::{BuildHasher, Hash};
use std::pin::Pin;
use std::sync::atomic::Ordering::{Acquire, Relaxed};
use std::sync::atomic::{AtomicU8, AtomicUsize};
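/// Scalable concurrent hash map.
///
/// Entries are stored in a [`CellArray`] that is swapped for a larger or smaller array when the
/// map is resized; `additional_capacity` tracks capacity reserved through [`HashMap::reserve`],
/// and `resize_mutex` serializes resizing.
///
/// The usage sketches in the method documentation below assume that this type is exported as
/// `scc::HashMap`; adjust the import path to the actual crate layout.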
pub struct HashMap<K, V, H = RandomState>
where
K: 'static + Eq + Hash + Sync,
V: 'static + Sync,
H: BuildHasher,
{
array: AtomicArc<CellArray<K, V, false>>,
minimum_capacity: usize,
additional_capacity: AtomicUsize,
resize_mutex: AtomicU8,
build_hasher: H,
}
impl<K, V, H> HashMap<K, V, H>
where
K: 'static + Eq + Hash + Sync,
V: 'static + Sync,
H: BuildHasher,
{
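    /// Creates an empty [`HashMap`] with the given [`BuildHasher`].
    ///
    /// A minimal usage sketch (the `scc::HashMap` export path is assumed):
    ///
    /// ```
    /// use scc::HashMap;
    /// use std::collections::hash_map::RandomState;
    ///
    /// let hashmap: HashMap<u64, u32, RandomState> = HashMap::with_hasher(RandomState::new());
    /// ```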
#[inline]
pub fn with_hasher(build_hasher: H) -> HashMap<K, V, H> {
HashMap {
array: AtomicArc::new(CellArray::<K, V, false>::new(
Self::default_capacity(),
AtomicArc::null(),
)),
minimum_capacity: Self::default_capacity(),
additional_capacity: AtomicUsize::new(0),
resize_mutex: AtomicU8::new(0),
build_hasher,
}
}
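    /// Creates an empty [`HashMap`] with the specified minimum capacity and [`BuildHasher`].
    ///
    /// The effective capacity is at least the default capacity. A minimal usage sketch (crate
    /// path assumed):
    ///
    /// ```
    /// use scc::HashMap;
    /// use std::collections::hash_map::RandomState;
    ///
    /// let hashmap: HashMap<u64, u32, RandomState> =
    ///     HashMap::with_capacity_and_hasher(1000, RandomState::new());
    /// assert!(hashmap.capacity() >= 1000);
    /// ```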
#[inline]
pub fn with_capacity_and_hasher(capacity: usize, build_hasher: H) -> HashMap<K, V, H> {
let initial_capacity = capacity.max(Self::default_capacity());
let array = Arc::new(CellArray::<K, V, false>::new(
initial_capacity,
AtomicArc::null(),
));
let current_capacity = array.num_entries();
HashMap {
array: AtomicArc::from(array),
minimum_capacity: current_capacity,
additional_capacity: AtomicUsize::new(0),
resize_mutex: AtomicU8::new(0),
build_hasher,
}
}
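    /// Temporarily increases the minimum capacity of the [`HashMap`].
    ///
    /// The additional capacity is released when the returned [`Ticket`] is dropped; `None` is
    /// returned if the requested capacity would overflow the internal counters.
    ///
    /// A minimal usage sketch (crate path assumed):
    ///
    /// ```
    /// use scc::HashMap;
    ///
    /// let hashmap: HashMap<u64, u32> = HashMap::default();
    /// let ticket = hashmap.reserve(10000);
    /// assert!(ticket.is_some());
    /// assert!(hashmap.capacity() >= 10000);
    /// drop(ticket);
    /// ```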
#[inline]
pub fn reserve(&self, capacity: usize) -> Option<Ticket<K, V, H>> {
let mut current_additional_capacity = self.additional_capacity.load(Relaxed);
loop {
if usize::MAX - self.minimum_capacity - current_additional_capacity <= capacity {
return None;
}
match self.additional_capacity.compare_exchange(
current_additional_capacity,
current_additional_capacity + capacity,
Relaxed,
Relaxed,
) {
Ok(_) => {
self.resize(&Barrier::new());
return Some(Ticket {
hash_map: self,
increment: capacity,
});
}
Err(current) => current_additional_capacity = current,
}
}
}
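    /// Inserts a key-value pair into the [`HashMap`].
    ///
    /// Returns an error containing the supplied pair if the key already exists.
    ///
    /// A minimal usage sketch (crate path assumed):
    ///
    /// ```
    /// use scc::HashMap;
    ///
    /// let hashmap: HashMap<u64, u32> = HashMap::default();
    /// assert!(hashmap.insert(1, 0).is_ok());
    /// assert_eq!(hashmap.insert(1, 1).unwrap_err(), (1, 1));
    /// ```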
#[inline]
pub fn insert(&self, key: K, val: V) -> Result<(), (K, V)> {
let (hash, partial_hash) = self.hash(&key);
if let Ok(Some((k, v))) =
self.insert_entry(key, val, hash, partial_hash, None, &Barrier::new())
{
Err((k, v))
} else {
Ok(())
}
}
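    /// Inserts a key-value pair into the [`HashMap`], awaiting asynchronously if the target cell
    /// is locked.
    ///
    /// Returns an error containing the supplied pair if the key already exists.
    ///
    /// A minimal usage sketch (crate path assumed; the returned future must be polled by an
    /// executor):
    ///
    /// ```
    /// use scc::HashMap;
    ///
    /// let hashmap: HashMap<u64, u32> = HashMap::default();
    /// let future_insert = hashmap.insert_async(11, 17);
    /// ```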
#[inline]
pub async fn insert_async(&self, mut key: K, mut val: V) -> Result<(), (K, V)> {
let (hash, partial_hash) = self.hash(&key);
loop {
let mut async_wait = AsyncWait::default();
let mut async_wait_pinned = Pin::new(&mut async_wait);
match self.insert_entry(
key,
val,
hash,
partial_hash,
Some(async_wait_pinned.mut_ptr()),
&Barrier::new(),
) {
Ok(Some(returned)) => return Err(returned),
Ok(None) => return Ok(()),
Err(returned) => {
key = returned.0;
val = returned.1;
}
}
async_wait_pinned.await;
}
}
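    /// Updates the value associated with the given key in place.
    ///
    /// Returns `None` if the key is not present; otherwise returns the result of the `updater`
    /// closure.
    ///
    /// A minimal usage sketch (crate path assumed):
    ///
    /// ```
    /// use scc::HashMap;
    ///
    /// let hashmap: HashMap<u64, u32> = HashMap::default();
    /// assert!(hashmap.update(&1, |_, v| *v = 2).is_none());
    /// assert!(hashmap.insert(1, 0).is_ok());
    /// assert_eq!(hashmap.update(&1, |_, v| { *v = 2; *v }), Some(2));
    /// ```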
#[inline]
pub fn update<Q, F, R>(&self, key_ref: &Q, updater: F) -> Option<R>
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
F: FnOnce(&K, &mut V) -> R,
{
let (hash, partial_hash) = self.hash(key_ref);
let barrier = Barrier::new();
let (_, _locker, iterator) = self
.acquire::<Q>(key_ref, hash, partial_hash, None, &barrier)
.ok()?;
if let Some(iterator) = iterator {
let (k, v) = iterator.get();
#[allow(clippy::cast_ref_to_mut)]
return Some(updater(k, unsafe { &mut *(v as *const V as *mut V) }));
}
None
}
#[inline]
pub async fn update_async<Q, F, R>(&self, key_ref: &Q, updater: F) -> Option<R>
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
F: FnOnce(&K, &mut V) -> R,
{
let (hash, partial_hash) = self.hash(key_ref);
loop {
let mut async_wait = AsyncWait::default();
let mut async_wait_pinned = Pin::new(&mut async_wait);
if let Ok((_, _locker, iterator)) = self.acquire::<Q>(
key_ref,
hash,
partial_hash,
Some(async_wait_pinned.mut_ptr()),
&Barrier::new(),
) {
if let Some(iterator) = iterator {
let (k, v) = iterator.get();
#[allow(clippy::cast_ref_to_mut)]
return Some(updater(k, unsafe { &mut *(v as *const V as *mut V) }));
}
return None;
}
async_wait_pinned.await;
}
}
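    /// Inserts the key with a newly constructed value, or updates the existing value in place.
    ///
    /// A minimal usage sketch (crate path assumed):
    ///
    /// ```
    /// use scc::HashMap;
    ///
    /// let hashmap: HashMap<u64, u32> = HashMap::default();
    /// hashmap.upsert(1, || 2, |_, v| *v = 3);
    /// assert_eq!(hashmap.read(&1, |_, v| *v), Some(2));
    /// hashmap.upsert(1, || 2, |_, v| *v = 3);
    /// assert_eq!(hashmap.read(&1, |_, v| *v), Some(3));
    /// ```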
#[inline]
pub fn upsert<FI: FnOnce() -> V, FU: FnOnce(&K, &mut V)>(
&self,
key: K,
constructor: FI,
updater: FU,
) {
let (hash, partial_hash) = self.hash(&key);
let barrier = Barrier::new();
if let Ok((_, locker, iterator)) =
self.acquire::<_>(&key, hash, partial_hash, None, &barrier)
{
if let Some(iterator) = iterator {
let (k, v) = iterator.get();
#[allow(clippy::cast_ref_to_mut)]
updater(k, unsafe { &mut *(v as *const V as *mut V) });
return;
}
locker.insert(key, constructor(), partial_hash, &barrier);
};
}
#[inline]
pub async fn upsert_async<FI: FnOnce() -> V, FU: FnOnce(&K, &mut V)>(
&self,
key: K,
constructor: FI,
updater: FU,
) {
let (hash, partial_hash) = self.hash(&key);
loop {
let mut async_wait = AsyncWait::default();
let mut async_wait_pinned = Pin::new(&mut async_wait);
if let Ok((_, locker, iterator)) = self.acquire::<_>(
&key,
hash,
partial_hash,
Some(async_wait_pinned.mut_ptr()),
&Barrier::new(),
) {
if let Some(iterator) = iterator {
let (k, v) = iterator.get();
#[allow(clippy::cast_ref_to_mut)]
updater(k, unsafe { &mut *(v as *const V as *mut V) });
return;
}
locker.insert(key, constructor(), partial_hash, &Barrier::new());
return;
}
async_wait_pinned.await;
}
}
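    /// Removes the key-value pair associated with the given key.
    ///
    /// A minimal usage sketch (crate path assumed):
    ///
    /// ```
    /// use scc::HashMap;
    ///
    /// let hashmap: HashMap<u64, u32> = HashMap::default();
    /// assert!(hashmap.remove(&1).is_none());
    /// assert!(hashmap.insert(1, 0).is_ok());
    /// assert_eq!(hashmap.remove(&1), Some((1, 0)));
    /// ```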
#[inline]
pub fn remove<Q>(&self, key_ref: &Q) -> Option<(K, V)>
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
self.remove_if(key_ref, |_| true)
}
#[inline]
pub async fn remove_async<Q>(&self, key_ref: &Q) -> Option<(K, V)>
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
self.remove_if_async(key_ref, |_| true).await
}
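    /// Removes the key-value pair associated with the given key if the supplied condition holds
    /// for the current value.
    ///
    /// A minimal usage sketch (crate path assumed):
    ///
    /// ```
    /// use scc::HashMap;
    ///
    /// let hashmap: HashMap<u64, u32> = HashMap::default();
    /// assert!(hashmap.insert(1, 0).is_ok());
    /// assert!(hashmap.remove_if(&1, |v| *v == 1).is_none());
    /// assert_eq!(hashmap.remove_if(&1, |v| *v == 0), Some((1, 0)));
    /// ```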
#[inline]
pub fn remove_if<Q, F: FnMut(&V) -> bool>(
&self,
key_ref: &Q,
mut condition: F,
) -> Option<(K, V)>
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
let (hash, partial_hash) = self.hash(key_ref);
self.remove_entry::<Q, _>(
key_ref,
hash,
partial_hash,
&mut condition,
None,
&Barrier::new(),
)
.ok()
.and_then(|(r, _)| r)
}
#[inline]
pub async fn remove_if_async<Q, F: FnMut(&V) -> bool>(
&self,
key_ref: &Q,
mut condition: F,
) -> Option<(K, V)>
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
let (hash, partial_hash) = self.hash(key_ref);
loop {
let mut async_wait = AsyncWait::default();
let mut async_wait_pinned = Pin::new(&mut async_wait);
if let Ok(result) = self.remove_entry::<Q, F>(
key_ref,
hash,
partial_hash,
&mut condition,
Some(async_wait_pinned.mut_ptr()),
&Barrier::new(),
) {
return result.0;
}
async_wait_pinned.await;
}
}
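    /// Reads the key-value pair associated with the given key.
    ///
    /// Returns `None` if the key is not present; otherwise returns the result of the `reader`
    /// closure.
    ///
    /// A minimal usage sketch (crate path assumed):
    ///
    /// ```
    /// use scc::HashMap;
    ///
    /// let hashmap: HashMap<u64, u32> = HashMap::default();
    /// assert!(hashmap.read(&1, |_, v| *v).is_none());
    /// assert!(hashmap.insert(1, 10).is_ok());
    /// assert_eq!(hashmap.read(&1, |_, v| *v), Some(10));
    /// ```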
#[inline]
pub fn read<Q, R, F: FnMut(&K, &V) -> R>(&self, key_ref: &Q, mut reader: F) -> Option<R>
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
let barrier = Barrier::new();
let (hash, partial_hash) = self.hash(key_ref);
self.read_entry::<Q, R, F>(key_ref, hash, partial_hash, &mut reader, None, &barrier)
.ok()
.and_then(|r| r)
}
#[inline]
pub async fn read_async<Q, R, F: FnMut(&K, &V) -> R>(
&self,
key_ref: &Q,
mut reader: F,
) -> Option<R>
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
let (hash, partial_hash) = self.hash(key_ref);
loop {
let mut async_wait = AsyncWait::default();
let mut async_wait_pinned = Pin::new(&mut async_wait);
if let Ok(result) = self.read_entry::<Q, R, _>(
key_ref,
hash,
partial_hash,
&mut reader,
Some(async_wait_pinned.mut_ptr()),
&Barrier::new(),
) {
return result;
}
async_wait_pinned.await;
}
}
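    /// Checks whether the [`HashMap`] contains an entry for the given key.
    ///
    /// A minimal usage sketch (crate path assumed):
    ///
    /// ```
    /// use scc::HashMap;
    ///
    /// let hashmap: HashMap<u64, u32> = HashMap::default();
    /// assert!(!hashmap.contains(&1));
    /// assert!(hashmap.insert(1, 0).is_ok());
    /// assert!(hashmap.contains(&1));
    /// ```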
#[inline]
pub fn contains<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
self.read(key, |_, _| ()).is_some()
}
#[inline]
pub async fn contains_async<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
self.read_async(key, |_, _| ()).await.is_some()
}
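    /// Applies the supplied closure to every key-value pair, taking a read lock on one cell at a
    /// time; entries inserted or removed concurrently may or may not be observed.
    ///
    /// A minimal usage sketch (crate path assumed):
    ///
    /// ```
    /// use scc::HashMap;
    ///
    /// let hashmap: HashMap<u64, u32> = HashMap::default();
    /// assert!(hashmap.insert(1, 0).is_ok());
    /// assert!(hashmap.insert(2, 1).is_ok());
    /// let mut sum = 0;
    /// hashmap.scan(|k, v| sum += *k + u64::from(*v));
    /// assert_eq!(sum, 4);
    /// ```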
pub fn scan<F: FnMut(&K, &V)>(&self, mut scanner: F) {
let barrier = Barrier::new();
let mut current_array_ptr = self.array.load(Acquire, &barrier);
while let Some(current_array_ref) = current_array_ptr.as_ref() {
while !current_array_ref.old_array(&barrier).is_null() {
if current_array_ref.partial_rehash::<_, _, _>(
|key| self.hash(key),
|_, _| None,
None,
&barrier,
) == Ok(true)
{
break;
}
}
for cell_index in 0..current_array_ref.num_cells() {
if let Some(locker) = Reader::lock(current_array_ref.cell(cell_index), &barrier) {
locker
.cell()
.iter(&barrier)
.for_each(|((k, v), _)| scanner(k, v));
}
}
let new_current_array_ptr = self.array.load(Acquire, &barrier);
if current_array_ptr == new_current_array_ptr {
break;
}
current_array_ptr = new_current_array_ptr;
}
}
pub async fn scan_async<F: FnMut(&K, &V)>(&self, mut scanner: F) {
let mut current_array_holder = self.array.get_arc(Acquire, &Barrier::new());
while let Some(current_array) = current_array_holder.take() {
while !current_array.old_array(&Barrier::new()).is_null() {
let mut async_wait = AsyncWait::default();
let mut async_wait_pinned = Pin::new(&mut async_wait);
if current_array.partial_rehash::<_, _, _>(
|key| self.hash(key),
|_, _| None,
Some(async_wait_pinned.mut_ptr()),
&Barrier::new(),
) == Ok(true)
{
break;
}
async_wait_pinned.await;
}
for cell_index in 0..current_array.num_cells() {
let killed = loop {
let mut async_wait = AsyncWait::default();
let mut async_wait_pinned = Pin::new(&mut async_wait);
{
let barrier = Barrier::new();
if let Ok(result) = Reader::try_lock_or_wait(
current_array.cell(cell_index),
async_wait_pinned.mut_ptr(),
&barrier,
) {
if let Some(locker) = result {
let mut iterator = locker.cell().iter(&barrier);
while iterator.next().is_some() {
let (k, v) = iterator.get();
scanner(k, v);
}
break false;
}
break true;
};
}
async_wait_pinned.await;
};
if killed {
break;
}
}
if let Some(new_current_array) = self.array.get_arc(Acquire, &Barrier::new()) {
if new_current_array.as_ptr() == current_array.as_ptr() {
break;
}
current_array_holder.replace(new_current_array);
continue;
}
break;
}
}
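    /// Applies the supplied closure to every key-value pair, allowing the values to be modified
    /// in place.
    ///
    /// A minimal usage sketch (crate path assumed):
    ///
    /// ```
    /// use scc::HashMap;
    ///
    /// let hashmap: HashMap<u64, u32> = HashMap::default();
    /// assert!(hashmap.insert(1, 0).is_ok());
    /// hashmap.for_each(|_, v| *v += 1);
    /// assert_eq!(hashmap.read(&1, |_, v| *v), Some(1));
    /// ```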
#[inline]
pub fn for_each<F: FnMut(&K, &mut V)>(&self, mut f: F) {
self.retain(|k, v| {
f(k, v);
true
});
}
#[inline]
pub async fn for_each_async<F: FnMut(&K, &mut V)>(&self, mut f: F) {
self.retain_async(|k, v| {
f(k, v);
true
})
.await;
}
pub fn retain<F: FnMut(&K, &mut V) -> bool>(&self, mut filter: F) -> (usize, usize) {
let mut num_retained: usize = 0;
let mut num_removed: usize = 0;
let barrier = Barrier::new();
let mut current_array_ptr = self.array.load(Acquire, &barrier);
while let Some(current_array) = current_array_ptr.as_ref() {
while !current_array.old_array(&barrier).is_null() {
if current_array.partial_rehash::<_, _, _>(
|key| self.hash(key),
|_, _| None,
None,
&barrier,
) == Ok(true)
{
break;
}
}
debug_assert!(current_array.old_array(&barrier).is_null());
for cell_index in 0..current_array.num_cells() {
if let Some(locker) = Locker::lock(current_array.cell(cell_index), &barrier) {
let mut iterator = locker.cell().iter(&barrier);
while iterator.next().is_some() {
let (k, v) = iterator.get();
#[allow(clippy::cast_ref_to_mut)]
let retain = filter(k, unsafe { &mut *(v as *const V as *mut V) });
if retain {
num_retained = num_retained.saturating_add(1);
} else {
locker.erase(&mut iterator);
num_removed = num_removed.saturating_add(1);
}
}
}
}
let new_current_array_ptr = self.array.load(Acquire, &barrier);
if current_array_ptr == new_current_array_ptr {
break;
}
num_retained = 0;
current_array_ptr = new_current_array_ptr;
}
if num_removed >= num_retained {
self.resize(&barrier);
}
(num_retained, num_removed)
}
pub async fn retain_async<F: FnMut(&K, &mut V) -> bool>(
&self,
mut filter: F,
) -> (usize, usize) {
let mut num_retained: usize = 0;
let mut num_removed: usize = 0;
let mut current_array_holder = self.array.get_arc(Acquire, &Barrier::new());
while let Some(current_array) = current_array_holder.take() {
while !current_array.old_array(&Barrier::new()).is_null() {
let mut async_wait = AsyncWait::default();
let mut async_wait_pinned = Pin::new(&mut async_wait);
if current_array.partial_rehash::<_, _, _>(
|key| self.hash(key),
|_, _| None,
Some(async_wait_pinned.mut_ptr()),
&Barrier::new(),
) == Ok(true)
{
break;
}
async_wait_pinned.await;
}
debug_assert!(current_array.old_array(&Barrier::new()).is_null());
for cell_index in 0..current_array.num_cells() {
let killed = loop {
let mut async_wait = AsyncWait::default();
let mut async_wait_pinned = Pin::new(&mut async_wait);
{
let barrier = Barrier::new();
if let Ok(result) = Locker::try_lock_or_wait(
current_array.cell(cell_index),
async_wait_pinned.mut_ptr(),
&barrier,
) {
if let Some(locker) = result {
let mut iterator = locker.cell().iter(&barrier);
while iterator.next().is_some() {
let (k, v) = iterator.get();
#[allow(clippy::cast_ref_to_mut)]
let retain =
filter(k, unsafe { &mut *(v as *const V as *mut V) });
if retain {
num_retained = num_retained.saturating_add(1);
} else {
locker.erase(&mut iterator);
num_removed = num_removed.saturating_add(1);
}
}
break false;
}
break true;
};
}
async_wait_pinned.await;
};
if killed {
break;
}
}
if let Some(new_current_array) = self.array.get_arc(Acquire, &Barrier::new()) {
if new_current_array.as_ptr() == current_array.as_ptr() {
break;
}
num_retained = 0;
current_array_holder.replace(new_current_array);
continue;
}
break;
}
if num_removed >= num_retained {
self.resize(&Barrier::new());
}
(num_retained, num_removed)
}
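    /// Removes every key-value pair, returning the number of removed entries.
    ///
    /// A minimal usage sketch (crate path assumed):
    ///
    /// ```
    /// use scc::HashMap;
    ///
    /// let hashmap: HashMap<u64, u32> = HashMap::default();
    /// assert!(hashmap.insert(1, 0).is_ok());
    /// assert_eq!(hashmap.clear(), 1);
    /// ```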
#[inline]
pub fn clear(&self) -> usize {
self.retain(|_, _| false).1
}
#[inline]
pub async fn clear_async(&self) -> usize {
self.retain_async(|_, _| false).await.1
}
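    /// Returns the number of entries in the [`HashMap`].
    ///
    /// A minimal usage sketch (crate path assumed):
    ///
    /// ```
    /// use scc::HashMap;
    ///
    /// let hashmap: HashMap<u64, u32> = HashMap::default();
    /// assert!(hashmap.insert(1, 0).is_ok());
    /// assert_eq!(hashmap.len(), 1);
    /// ```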
#[inline]
pub fn len(&self) -> usize {
self.num_entries(&Barrier::new())
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
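    /// Returns the current capacity, i.e. the number of entry slots, of the [`HashMap`].
    ///
    /// A minimal usage sketch (crate path assumed; the exact rounding of the requested capacity
    /// is implementation-defined):
    ///
    /// ```
    /// use scc::HashMap;
    ///
    /// let hashmap: HashMap<u64, u32> = HashMap::with_capacity(1000);
    /// assert!(hashmap.capacity() >= 1000);
    /// ```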
#[inline]
pub fn capacity(&self) -> usize {
self.num_slots(&Barrier::new())
}
}
impl<K, V, H> Clone for HashMap<K, V, H>
where
K: 'static + Clone + Eq + Hash + Sync,
V: 'static + Clone + Sync,
H: BuildHasher + Clone,
{
#[inline]
fn clone(&self) -> Self {
let cloned = Self::with_capacity_and_hasher(self.capacity(), self.hasher().clone());
self.scan(|k, v| {
            let _result = cloned.insert(k.clone(), v.clone());
});
cloned
}
}
impl<K, V, H> Debug for HashMap<K, V, H>
where
K: 'static + Debug + Eq + Hash + Sync,
V: 'static + Debug + Sync,
H: BuildHasher,
{
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut d = f.debug_map();
self.scan(|k, v| {
d.entry(k, v);
});
d.finish()
}
}
impl<K, V> HashMap<K, V, RandomState>
where
K: 'static + Eq + Hash + Sync,
V: 'static + Sync,
{
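    /// Creates an empty [`HashMap`] with the default capacity and a [`RandomState`] hasher.
    ///
    /// A minimal usage sketch (crate path assumed):
    ///
    /// ```
    /// use scc::HashMap;
    ///
    /// let hashmap: HashMap<u64, u32> = HashMap::new();
    /// assert!(hashmap.insert(1, 0).is_ok());
    /// ```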
#[inline]
#[must_use]
pub fn new() -> Self {
Self::default()
}
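    /// Creates an empty [`HashMap`] with the specified minimum capacity and a [`RandomState`]
    /// hasher.
    ///
    /// A minimal usage sketch (crate path assumed):
    ///
    /// ```
    /// use scc::HashMap;
    ///
    /// let hashmap: HashMap<u64, u32> = HashMap::with_capacity(1000);
    /// assert!(hashmap.capacity() >= 1000);
    /// ```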
#[inline]
#[must_use]
pub fn with_capacity(capacity: usize) -> HashMap<K, V, RandomState> {
let initial_capacity = capacity.max(Self::default_capacity());
let array = Arc::new(CellArray::<K, V, false>::new(
initial_capacity,
AtomicArc::null(),
));
let current_capacity = array.num_entries();
HashMap {
array: AtomicArc::from(array),
minimum_capacity: current_capacity,
additional_capacity: AtomicUsize::new(0),
resize_mutex: AtomicU8::new(0),
build_hasher: RandomState::new(),
}
}
}
impl<K, V> Default for HashMap<K, V, RandomState>
where
K: 'static + Eq + Hash + Sync,
V: 'static + Sync,
{
#[inline]
fn default() -> Self {
HashMap {
array: AtomicArc::new(CellArray::<K, V, false>::new(
Self::default_capacity(),
AtomicArc::null(),
)),
minimum_capacity: Self::default_capacity(),
additional_capacity: AtomicUsize::new(0),
resize_mutex: AtomicU8::new(0),
build_hasher: RandomState::new(),
}
}
}
impl<K, V, H> HashTable<K, V, H, false> for HashMap<K, V, H>
where
K: 'static + Eq + Hash + Sync,
V: 'static + Sync,
H: BuildHasher,
{
#[inline]
fn hasher(&self) -> &H {
&self.build_hasher
}
#[inline]
fn copier(_: &K, _: &V) -> Option<(K, V)> {
None
}
#[inline]
fn cell_array(&self) -> &AtomicArc<CellArray<K, V, false>> {
&self.array
}
#[inline]
fn minimum_capacity(&self) -> usize {
self.minimum_capacity + self.additional_capacity.load(Relaxed)
}
#[inline]
fn resize_mutex(&self) -> &AtomicU8 {
&self.resize_mutex
}
}
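/// [`Ticket`] keeps the additional minimum capacity of a [`HashMap`] alive.
///
/// The capacity requested through [`HashMap::reserve`] is released when the [`Ticket`] is
/// dropped.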
pub struct Ticket<'h, K, V, H>
where
K: 'static + Eq + Hash + Sync,
V: 'static + Sync,
H: BuildHasher,
{
hash_map: &'h HashMap<K, V, H>,
increment: usize,
}
impl<'h, K, V, H> Drop for Ticket<'h, K, V, H>
where
K: 'static + Eq + Hash + Sync,
V: 'static + Sync,
H: BuildHasher,
{
#[inline]
fn drop(&mut self) {
let result = self
.hash_map
.additional_capacity
.fetch_sub(self.increment, Relaxed);
self.hash_map.resize(&Barrier::new());
debug_assert!(result >= self.increment);
}
}