#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
use crate::sync::batch_semaphore as semaphore;
#[cfg(all(tokio_unstable, feature = "tracing"))]
use crate::util::trace;
use std::cell::UnsafeCell;
use std::error::Error;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, mem, ptr};
/// An asynchronous mutual-exclusion lock protecting a value of type `T`.
///
/// Exclusive access is arbitrated by a semaphore that is created with a
/// single permit; the protected value lives in an `UnsafeCell` and is only
/// reached through the guard types or through `&mut self` / by-value
/// accessors.
pub struct Mutex<T: ?Sized> {
/// Tracing span describing this mutex as a runtime resource.
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
/// Semaphore whose single permit represents ownership of the lock.
s: semaphore::Semaphore,
/// The protected value.
c: UnsafeCell<T>,
}
/// A handle to a held `Mutex` that borrows the mutex for `'a`.
///
/// It dereferences to the protected value. Dropping it releases the
/// semaphore permit, which unlocks the mutex.
#[clippy::has_significant_drop]
#[must_use = "if unused the Mutex will immediately unlock"]
pub struct MutexGuard<'a, T: ?Sized> {
/// Clone of the owning mutex's resource span, used to report the unlock.
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
/// The mutex this guard has locked.
lock: &'a Mutex<T>,
}
/// An owned handle to a held `Mutex`.
///
/// Unlike `MutexGuard`, this guard keeps the mutex alive through an `Arc`
/// instead of borrowing it, so it carries no lifetime parameter. It
/// dereferences to the protected value, and dropping it releases the
/// semaphore permit, which unlocks the mutex.
#[clippy::has_significant_drop]
// Same rationale (and message) as the other three guard types: discarding the
// guard unlocks the mutex right away.
#[must_use = "if unused the Mutex will immediately unlock"]
pub struct OwnedMutexGuard<T: ?Sized> {
    /// Clone of the owning mutex's resource span, used to report the unlock.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    resource_span: tracing::Span,
    /// Shared handle to the mutex this guard has locked.
    lock: Arc<Mutex<T>>,
}
/// A handle to a held `Mutex` that has been narrowed by a mapping function
/// to point at a value of type `T`.
///
/// It is produced by `MutexGuard::map` / `try_map` (and the same methods on
/// this type). Dropping it releases the semaphore permit.
#[clippy::has_significant_drop]
#[must_use = "if unused the Mutex will immediately unlock"]
pub struct MappedMutexGuard<'a, T: ?Sized> {
/// Resource span taken over from the guard this one was mapped from.
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
/// Semaphore of the originating mutex; the permit is released on drop.
s: &'a semaphore::Semaphore,
/// Pointer to the mapped value, obtained from the mapping closure's result.
data: *mut T,
/// Makes the guard act, for borrow-checking purposes, as if it held `&'a mut T`.
marker: PhantomData<&'a mut T>,
}
/// An owned handle to a held `Mutex<T>` that has been narrowed by a mapping
/// function to point at a value of type `U`.
///
/// It is produced by `OwnedMutexGuard::map` / `try_map` (and the same methods
/// on this type). Dropping it releases the semaphore permit.
#[clippy::has_significant_drop]
#[must_use = "if unused the Mutex will immediately unlock"]
pub struct OwnedMappedMutexGuard<T: ?Sized, U: ?Sized = T> {
/// Resource span taken over from the guard this one was mapped from.
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
/// Pointer to the mapped value, obtained from the mapping closure's result.
data: *mut U,
/// Shared handle to the locked mutex; its semaphore permit is released on drop.
lock: Arc<Mutex<T>>,
}
/// The fields of a `MutexGuard`, returned by `MutexGuard::skip_drop` so the
/// guard can be taken apart without running its `Drop` impl.
#[allow(dead_code)] struct MutexGuardInner<'a, T: ?Sized> {
/// Resource span moved out of the original guard.
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
/// The mutex the original guard had locked.
lock: &'a Mutex<T>,
}
/// The fields of an `OwnedMutexGuard`, returned by
/// `OwnedMutexGuard::skip_drop` so the guard can be taken apart without
/// running its `Drop` impl.
struct OwnedMutexGuardInner<T: ?Sized> {
/// Resource span moved out of the original guard.
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
/// Shared handle to the mutex the original guard had locked.
lock: Arc<Mutex<T>>,
}
/// The fields of a `MappedMutexGuard`, returned by
/// `MappedMutexGuard::skip_drop` so the guard can be taken apart without
/// running its `Drop` impl.
#[allow(dead_code)] struct MappedMutexGuardInner<'a, T: ?Sized> {
/// Resource span moved out of the original guard.
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
/// Semaphore of the originating mutex.
s: &'a semaphore::Semaphore,
/// Pointer the original guard was mapped to; the callers in this file
/// replace it with a freshly mapped pointer and do not read it back.
data: *mut T,
}
/// The fields of an `OwnedMappedMutexGuard`, returned by
/// `OwnedMappedMutexGuard::skip_drop` so the guard can be taken apart without
/// running its `Drop` impl.
#[allow(dead_code)] struct OwnedMappedMutexGuardInner<T: ?Sized, U: ?Sized> {
/// Resource span moved out of the original guard.
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
/// Pointer the original guard was mapped to; the callers in this file
/// replace it with a freshly mapped pointer and do not read it back.
data: *mut U,
/// Shared handle to the mutex the original guard had locked.
lock: Arc<Mutex<T>>,
}
// Manual `Send` / `Sync` impls. `Mutex` contains an `UnsafeCell` and the
// mapped guards contain raw pointers, so the compiler does not derive these
// auto traits for them.

// The mutex only ever hands access to `T` to one lock holder at a time (or to
// the exclusive/by-value owner of the mutex), so `T: Send` is the only bound
// required both for sending the mutex and for sharing it across threads.
unsafe impl<T> Send for Mutex<T> where T: ?Sized + Send {}
unsafe impl<T> Sync for Mutex<T> where T: ?Sized + Send {}
// Sharing a guard by reference exposes `&T` through `Deref`; the impls below
// additionally require `T: Send`.
unsafe impl<T> Sync for MutexGuard<'_, T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Sync for OwnedMutexGuard<T> where T: ?Sized + Send + Sync {}
// A mapped guard stores a raw pointer to the mapped value; these bounds are
// the ones `&'a mut T` itself would have.
unsafe impl<'a, T> Sync for MappedMutexGuard<'a, T> where T: ?Sized + Sync + 'a {}
unsafe impl<'a, T> Send for MappedMutexGuard<'a, T> where T: ?Sized + Send + 'a {}
// The owned mapped guard holds both the `Arc<Mutex<T>>` and a pointer to `U`,
// so the bounds are placed on both type parameters.
unsafe impl<T, U> Sync for OwnedMappedMutexGuard<T, U>
where
T: ?Sized + Send + Sync,
U: ?Sized + Send + Sync,
{
}
unsafe impl<T, U> Send for OwnedMappedMutexGuard<T, U>
where
T: ?Sized + Send,
U: ?Sized + Send,
{
}
/// Error returned by `Mutex::try_lock` and `Mutex::try_lock_owned` when the
/// semaphore permit could not be acquired without waiting.
///
/// The unit field is only visible to the parent module, so code outside it
/// cannot construct this error.
#[derive(Debug)]
pub struct TryLockError(pub(super) ());
impl fmt::Display for TryLockError {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("operation would block")
    }
}

/// `TryLockError` carries no source error; the default `Error` methods apply.
impl Error for TryLockError {}
/// Compile-time checks of the auto-trait and `'static` bounds of the mutex,
/// its guards and its lock futures. Nothing is awaited; the test passes as
/// soon as it type-checks.
#[test]
#[cfg(not(loom))]
fn bounds() {
    // Checks on nameable types.
    fn assert_send<T: Send>() {}
    fn assert_unpin<T: Unpin>() {}
    fn assert_send_sync<T: Send + Sync>() {}
    fn assert_static<T: 'static>() {}
    // Checks by value, for the anonymous future types returned by `lock*`.
    fn assert_send_sync_val<T: Send + Sync>(_value: T) {}
    fn assert_static_val<T: 'static>(_value: T) {}

    // Guards.
    assert_send::<MutexGuard<'_, u32>>();
    assert_send::<OwnedMutexGuard<u32>>();
    assert_static::<OwnedMutexGuard<u32>>();

    // The mutex itself.
    assert_unpin::<Mutex<u32>>();
    assert_send_sync::<Mutex<u32>>();

    // Future returned by the borrowing lock.
    let borrowed = Mutex::new(1);
    assert_send_sync_val(borrowed.lock());

    // Futures returned by the owning lock.
    let shared = Arc::new(Mutex::new(1));
    assert_send_sync_val(shared.clone().lock_owned());
    assert_static_val(shared.lock_owned());
}
impl<T: ?Sized> Mutex<T> {
    /// Creates a new mutex in the unlocked state, holding `t`.
    ///
    /// With `tokio_unstable` and the `tracing` feature enabled, a
    /// `runtime.resource` span recording the caller's source location is
    /// created and an initial `locked = false` state update is emitted in it.
    #[track_caller]
    pub fn new(t: T) -> Self
    where
        T: Sized,
    {
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let resource_span = {
            let location = std::panic::Location::caller();

            tracing::trace_span!(
                "runtime.resource",
                concrete_type = "Mutex",
                kind = "Sync",
                loc.file = location.file(),
                loc.line = location.line(),
                loc.col = location.column(),
            )
        };

        // The semaphore is built inside the resource span when tracing is on.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let s = resource_span.in_scope(|| {
            tracing::trace!(
                target: "runtime::resource::state_update",
                locked = false,
            );
            semaphore::Semaphore::new(1)
        });

        #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
        let s = semaphore::Semaphore::new(1);

        Self {
            c: UnsafeCell::new(t),
            s,
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span,
        }
    }

    /// Creates a new unlocked mutex holding `t` in a `const` context.
    ///
    /// No resource span is recorded for a mutex built this way: the span
    /// field is set to `tracing::Span::none()`.
    #[cfg(all(feature = "parking_lot", not(all(loom, test)),))]
    #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
    pub const fn const_new(t: T) -> Self
    where
        T: Sized,
    {
        Self {
            c: UnsafeCell::new(t),
            s: semaphore::Semaphore::const_new(1),
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: tracing::Span::none(),
        }
    }

    /// Locks the mutex, waiting asynchronously until the semaphore permit can
    /// be acquired, and returns a guard borrowing the mutex.
    ///
    /// The mutex is unlocked again when the returned guard is dropped.
    pub async fn lock(&self) -> MutexGuard<'_, T> {
        let acquire_fut = async {
            self.acquire().await;

            MutexGuard {
                lock: self,
                #[cfg(all(tokio_unstable, feature = "tracing"))]
                resource_span: self.resource_span.clone(),
            }
        };

        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let acquire_fut = trace::async_op(
            move || acquire_fut,
            self.resource_span.clone(),
            "Mutex::lock",
            "poll",
            false,
        );

        // Without tracing, nothing happens between this `let` and the return.
        #[allow(clippy::let_and_return)]
        let guard = acquire_fut.await;

        #[cfg(all(tokio_unstable, feature = "tracing"))]
        self.resource_span.in_scope(|| {
            tracing::trace!(
                target: "runtime::resource::state_update",
                locked = true,
            );
        });

        guard
    }

    /// Locks the mutex from synchronous code by driving `lock()` to
    /// completion with `crate::future::block_on`.
    ///
    /// NOTE(review): presumably this must not be called from within an
    /// asynchronous execution context; confirm against `block_on`.
    #[track_caller]
    #[cfg(feature = "sync")]
    #[cfg_attr(docsrs, doc(alias = "lock_blocking"))]
    pub fn blocking_lock(&self) -> MutexGuard<'_, T> {
        crate::future::block_on(self.lock())
    }

    /// Owned counterpart of `blocking_lock`: drives `lock_owned()` to
    /// completion with `crate::future::block_on`.
    ///
    /// NOTE(review): presumably this must not be called from within an
    /// asynchronous execution context; confirm against `block_on`.
    #[track_caller]
    #[cfg(feature = "sync")]
    pub fn blocking_lock_owned(self: Arc<Self>) -> OwnedMutexGuard<T> {
        crate::future::block_on(self.lock_owned())
    }

    /// Locks the mutex, waiting asynchronously until the semaphore permit can
    /// be acquired, and returns a guard that owns the `Arc` it was called on.
    ///
    /// The mutex is unlocked again when the returned guard is dropped.
    pub async fn lock_owned(self: Arc<Self>) -> OwnedMutexGuard<T> {
        // Cloned up front because `self` is moved into the guard below.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let resource_span = self.resource_span.clone();

        let acquire_fut = async {
            self.acquire().await;

            OwnedMutexGuard {
                #[cfg(all(tokio_unstable, feature = "tracing"))]
                resource_span: self.resource_span.clone(),
                lock: self,
            }
        };

        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let acquire_fut = trace::async_op(
            move || acquire_fut,
            resource_span,
            "Mutex::lock_owned",
            "poll",
            false,
        );

        // Without tracing, nothing happens between this `let` and the return.
        #[allow(clippy::let_and_return)]
        let guard = acquire_fut.await;

        #[cfg(all(tokio_unstable, feature = "tracing"))]
        guard.resource_span.in_scope(|| {
            tracing::trace!(
                target: "runtime::resource::state_update",
                locked = true,
            );
        });

        guard
    }

    /// Waits for the single semaphore permit.
    ///
    /// The permit is not returned as a value; the guard constructed by the
    /// caller releases it in its `Drop` impl.
    async fn acquire(&self) {
        crate::trace::async_trace_leaf().await;

        self.s.acquire(1).await.unwrap_or_else(|_| {
            // An error here is treated as impossible. Presumably it would mean
            // the semaphore was closed, which nothing in this file ever does.
            unreachable!()
        });
    }

    /// Attempts to lock the mutex without waiting.
    ///
    /// # Errors
    ///
    /// Returns `TryLockError` if the semaphore permit is not immediately
    /// available.
    pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> {
        match self.s.try_acquire(1) {
            Ok(_) => {
                let guard = MutexGuard {
                    lock: self,
                    #[cfg(all(tokio_unstable, feature = "tracing"))]
                    resource_span: self.resource_span.clone(),
                };

                #[cfg(all(tokio_unstable, feature = "tracing"))]
                self.resource_span.in_scope(|| {
                    tracing::trace!(
                        target: "runtime::resource::state_update",
                        locked = true,
                    );
                });

                Ok(guard)
            }
            Err(_) => Err(TryLockError(())),
        }
    }

    /// Returns a mutable reference to the protected value without locking.
    ///
    /// The `&mut self` receiver already proves exclusive access to the mutex,
    /// so no permit is acquired and the safe `UnsafeCell::get_mut` accessor
    /// is sufficient.
    pub fn get_mut(&mut self) -> &mut T {
        self.c.get_mut()
    }

    /// Attempts to lock the mutex without waiting, returning a guard that
    /// owns the `Arc` it was called on.
    ///
    /// # Errors
    ///
    /// Returns `TryLockError` if the semaphore permit is not immediately
    /// available. The `Arc` passed in is dropped in that case.
    pub fn try_lock_owned(self: Arc<Self>) -> Result<OwnedMutexGuard<T>, TryLockError> {
        match self.s.try_acquire(1) {
            Ok(_) => {
                let guard = OwnedMutexGuard {
                    #[cfg(all(tokio_unstable, feature = "tracing"))]
                    resource_span: self.resource_span.clone(),
                    lock: self,
                };

                #[cfg(all(tokio_unstable, feature = "tracing"))]
                guard.resource_span.in_scope(|| {
                    tracing::trace!(
                        target: "runtime::resource::state_update",
                        locked = true,
                    );
                });

                Ok(guard)
            }
            Err(_) => Err(TryLockError(())),
        }
    }

    /// Consumes the mutex and returns the protected value.
    pub fn into_inner(self) -> T
    where
        T: Sized,
    {
        self.c.into_inner()
    }
}
impl<T> From<T> for Mutex<T> {
fn from(s: T) -> Self {
Self::new(s)
}
}
impl<T: Default> Default for Mutex<T> {
    /// Creates a mutex protecting `T::default()`.
    fn default() -> Self {
        let value = T::default();
        Self::new(value)
    }
}
impl<T: ?Sized> std::fmt::Debug for Mutex<T>
where
T: std::fmt::Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut d = f.debug_struct("Mutex");
match self.try_lock() {
Ok(inner) => d.field("data", &&*inner),
Err(_) => d.field("data", &format_args!("<locked>")),
};
d.finish()
}
}
impl<'a, T: ?Sized> MutexGuard<'a, T> {
    /// Takes the guard apart without running its `Drop` impl, so the
    /// semaphore permit stays held and can be handed to another guard.
    fn skip_drop(self) -> MutexGuardInner<'a, T> {
        let guard = mem::ManuallyDrop::new(self);

        // SAFETY: `guard` is wrapped in `ManuallyDrop` and is not used after
        // this function, so the span is moved out once and never dropped twice.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let resource_span = unsafe { ptr::read(&guard.resource_span) };
        let lock = guard.lock;

        MutexGuardInner {
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span,
            lock,
        }
    }

    /// Narrows the guard to a component of the protected value.
    ///
    /// `f` receives exclusive access to the value and returns the part the
    /// new guard should point at. The lock stays held throughout. This is an
    /// associated function and is called as `MutexGuard::map(guard, f)`.
    #[inline]
    pub fn map<U, F>(mut this: Self, f: F) -> MappedMutexGuard<'a, U>
    where
        F: FnOnce(&mut T) -> &mut U,
    {
        let data: *mut U = f(&mut *this);
        let parts = this.skip_drop();

        MappedMutexGuard {
            s: &parts.lock.s,
            data,
            marker: PhantomData,
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: parts.resource_span,
        }
    }

    /// Like `map`, but the closure may decline by returning `None`.
    ///
    /// # Errors
    ///
    /// Returns the original, still-locked guard if `f` returns `None`.
    #[inline]
    pub fn try_map<U, F>(mut this: Self, f: F) -> Result<MappedMutexGuard<'a, U>, Self>
    where
        F: FnOnce(&mut T) -> Option<&mut U>,
    {
        let data = match f(&mut *this) {
            Some(mapped) => mapped as *mut U,
            None => return Err(this),
        };
        let parts = this.skip_drop();

        let mapped_guard = MappedMutexGuard {
            s: &parts.lock.s,
            data,
            marker: PhantomData,
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: parts.resource_span,
        };
        Ok(mapped_guard)
    }

    /// Returns the mutex this guard has locked.
    #[inline]
    pub fn mutex(this: &Self) -> &'a Mutex<T> {
        this.lock
    }
}
// Unlocks the mutex when the guard is dropped.
impl<T: ?Sized> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
// Hand the single permit back to the semaphore; this is the unlock.
self.lock.s.release(1);
// With tracing enabled, report the transition to the unlocked state in the
// resource span. This happens after the permit has been released.
#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
locked = false,
);
});
}
}
impl<T: ?Sized> Deref for MutexGuard<'_, T> {
    type Target = T;

    /// Borrows the protected value.
    fn deref(&self) -> &Self::Target {
        let value: *const T = self.lock.c.get();
        // SAFETY: this guard exists only while the mutex's single permit is
        // held, so no other guard can be accessing the value.
        unsafe { &*value }
    }
}
impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
    /// Mutably borrows the protected value.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let value: *mut T = self.lock.c.get();
        // SAFETY: this guard exists only while the mutex's single permit is
        // held, and `&mut self` makes this the only borrow through the guard.
        unsafe { &mut *value }
    }
}
impl<T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, T> {
    /// Formats the protected value with its own `Debug` impl.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Deref coercion from the guard to the value it protects.
        let value: &T = self;
        fmt::Debug::fmt(value, f)
    }
}
impl<T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'_, T> {
    /// Formats the protected value with its own `Display` impl.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Deref coercion from the guard to the value it protects.
        let value: &T = self;
        fmt::Display::fmt(value, f)
    }
}
impl<T: ?Sized> OwnedMutexGuard<T> {
    /// Takes the guard apart without running its `Drop` impl, so the
    /// semaphore permit stays held and can be handed to another guard.
    fn skip_drop(self) -> OwnedMutexGuardInner<T> {
        let guard = mem::ManuallyDrop::new(self);

        // SAFETY: `guard` is wrapped in `ManuallyDrop` and is not used after
        // this function, so each field is moved out once and never dropped
        // twice.
        let lock = unsafe { ptr::read(&guard.lock) };
        // SAFETY: as above.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let resource_span = unsafe { ptr::read(&guard.resource_span) };

        OwnedMutexGuardInner {
            lock,
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span,
        }
    }

    /// Narrows the guard to a component of the protected value.
    ///
    /// `f` receives exclusive access to the value and returns the part the
    /// new guard should point at. The lock stays held throughout. This is an
    /// associated function and is called as `OwnedMutexGuard::map(guard, f)`.
    #[inline]
    pub fn map<U, F>(mut this: Self, f: F) -> OwnedMappedMutexGuard<T, U>
    where
        F: FnOnce(&mut T) -> &mut U,
    {
        let data: *mut U = f(&mut *this);
        let parts = this.skip_drop();

        OwnedMappedMutexGuard {
            data,
            lock: parts.lock,
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: parts.resource_span,
        }
    }

    /// Like `map`, but the closure may decline by returning `None`.
    ///
    /// # Errors
    ///
    /// Returns the original, still-locked guard if `f` returns `None`.
    #[inline]
    pub fn try_map<U, F>(mut this: Self, f: F) -> Result<OwnedMappedMutexGuard<T, U>, Self>
    where
        F: FnOnce(&mut T) -> Option<&mut U>,
    {
        let data = match f(&mut *this) {
            Some(mapped) => mapped as *mut U,
            None => return Err(this),
        };
        let parts = this.skip_drop();

        let mapped_guard = OwnedMappedMutexGuard {
            data,
            lock: parts.lock,
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: parts.resource_span,
        };
        Ok(mapped_guard)
    }

    /// Returns the shared handle to the mutex this guard has locked.
    #[inline]
    pub fn mutex(this: &Self) -> &Arc<Mutex<T>> {
        &this.lock
    }
}
// Unlocks the mutex when the guard is dropped.
impl<T: ?Sized> Drop for OwnedMutexGuard<T> {
fn drop(&mut self) {
// Hand the single permit back to the semaphore; this is the unlock.
self.lock.s.release(1);
// With tracing enabled, report the transition to the unlocked state in the
// resource span. This happens after the permit has been released.
#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
locked = false,
);
});
}
}
impl<T: ?Sized> Deref for OwnedMutexGuard<T> {
    type Target = T;

    /// Borrows the protected value.
    fn deref(&self) -> &Self::Target {
        let value: *const T = self.lock.c.get();
        // SAFETY: this guard exists only while the mutex's single permit is
        // held, so no other guard can be accessing the value.
        unsafe { &*value }
    }
}
impl<T: ?Sized> DerefMut for OwnedMutexGuard<T> {
    /// Mutably borrows the protected value.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let value: *mut T = self.lock.c.get();
        // SAFETY: this guard exists only while the mutex's single permit is
        // held, and `&mut self` makes this the only borrow through the guard.
        unsafe { &mut *value }
    }
}
impl<T: ?Sized + fmt::Debug> fmt::Debug for OwnedMutexGuard<T> {
    /// Formats the protected value with its own `Debug` impl.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Deref coercion from the guard to the value it protects.
        let value: &T = self;
        fmt::Debug::fmt(value, f)
    }
}
impl<T: ?Sized + fmt::Display> fmt::Display for OwnedMutexGuard<T> {
    /// Formats the protected value with its own `Display` impl.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Deref coercion from the guard to the value it protects.
        let value: &T = self;
        fmt::Display::fmt(value, f)
    }
}
impl<'a, T: ?Sized> MappedMutexGuard<'a, T> {
    /// Takes the guard apart without running its `Drop` impl, so the
    /// semaphore permit stays held and can be handed to another guard.
    fn skip_drop(self) -> MappedMutexGuardInner<'a, T> {
        let guard = mem::ManuallyDrop::new(self);

        let s = guard.s;
        let data = guard.data;
        // SAFETY: `guard` is wrapped in `ManuallyDrop` and is not used after
        // this function, so the span is moved out once and never dropped twice.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let resource_span = unsafe { ptr::read(&guard.resource_span) };

        MappedMutexGuardInner {
            s,
            data,
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span,
        }
    }

    /// Narrows the guard further, to a component of the currently mapped
    /// value.
    ///
    /// `f` receives exclusive access to the mapped value and returns the part
    /// the new guard should point at. The lock stays held throughout. This is
    /// an associated function and is called as `MappedMutexGuard::map(guard, f)`.
    #[inline]
    pub fn map<U, F>(mut this: Self, f: F) -> MappedMutexGuard<'a, U>
    where
        F: FnOnce(&mut T) -> &mut U,
    {
        let data: *mut U = f(&mut *this);
        let parts = this.skip_drop();

        MappedMutexGuard {
            s: parts.s,
            data,
            marker: PhantomData,
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: parts.resource_span,
        }
    }

    /// Like `map`, but the closure may decline by returning `None`.
    ///
    /// # Errors
    ///
    /// Returns the original, still-locked guard if `f` returns `None`.
    #[inline]
    pub fn try_map<U, F>(mut this: Self, f: F) -> Result<MappedMutexGuard<'a, U>, Self>
    where
        F: FnOnce(&mut T) -> Option<&mut U>,
    {
        let data = match f(&mut *this) {
            Some(mapped) => mapped as *mut U,
            None => return Err(this),
        };
        let parts = this.skip_drop();

        let mapped_guard = MappedMutexGuard {
            s: parts.s,
            data,
            marker: PhantomData,
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: parts.resource_span,
        };
        Ok(mapped_guard)
    }
}
// Unlocks the originating mutex when the mapped guard is dropped.
impl<'a, T: ?Sized> Drop for MappedMutexGuard<'a, T> {
fn drop(&mut self) {
// Hand the single permit back to the semaphore; this is the unlock.
self.s.release(1);
// With tracing enabled, report the transition to the unlocked state in the
// resource span. This happens after the permit has been released.
#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
locked = false,
);
});
}
}
impl<T: ?Sized> Deref for MappedMutexGuard<'_, T> {
    type Target = T;

    /// Borrows the mapped value.
    fn deref(&self) -> &Self::Target {
        let value: *const T = self.data;
        // SAFETY: `data` came from the `&mut` reference returned by the
        // mapping closure while the lock was held, and the permit stays held
        // until this guard is dropped.
        unsafe { &*value }
    }
}
impl<T: ?Sized> DerefMut for MappedMutexGuard<'_, T> {
    /// Mutably borrows the mapped value.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let value: *mut T = self.data;
        // SAFETY: `data` came from the `&mut` reference returned by the
        // mapping closure while the lock was held, the permit stays held until
        // this guard is dropped, and `&mut self` makes this borrow unique.
        unsafe { &mut *value }
    }
}
impl<T: ?Sized + fmt::Debug> fmt::Debug for MappedMutexGuard<'_, T> {
    /// Formats the mapped value with its own `Debug` impl.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Deref coercion from the guard to the value it points at.
        let value: &T = self;
        fmt::Debug::fmt(value, f)
    }
}
impl<T: ?Sized + fmt::Display> fmt::Display for MappedMutexGuard<'_, T> {
    /// Formats the mapped value with its own `Display` impl.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Deref coercion from the guard to the value it points at.
        let value: &T = self;
        fmt::Display::fmt(value, f)
    }
}
impl<T: ?Sized, U: ?Sized> OwnedMappedMutexGuard<T, U> {
    /// Takes the guard apart without running its `Drop` impl, so the
    /// semaphore permit stays held and can be handed to another guard.
    fn skip_drop(self) -> OwnedMappedMutexGuardInner<T, U> {
        let guard = mem::ManuallyDrop::new(self);

        let data = guard.data;
        // SAFETY: `guard` is wrapped in `ManuallyDrop` and is not used after
        // this function, so each field is moved out once and never dropped
        // twice.
        let lock = unsafe { ptr::read(&guard.lock) };
        // SAFETY: as above.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let resource_span = unsafe { ptr::read(&guard.resource_span) };

        OwnedMappedMutexGuardInner {
            data,
            lock,
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span,
        }
    }

    /// Narrows the guard further, to a component of the currently mapped
    /// value.
    ///
    /// `f` receives exclusive access to the mapped value and returns the part
    /// the new guard should point at. The lock stays held throughout. This is
    /// an associated function and is called as
    /// `OwnedMappedMutexGuard::map(guard, f)`.
    #[inline]
    pub fn map<S, F>(mut this: Self, f: F) -> OwnedMappedMutexGuard<T, S>
    where
        F: FnOnce(&mut U) -> &mut S,
    {
        let data: *mut S = f(&mut *this);
        let parts = this.skip_drop();

        OwnedMappedMutexGuard {
            data,
            lock: parts.lock,
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: parts.resource_span,
        }
    }

    /// Like `map`, but the closure may decline by returning `None`.
    ///
    /// # Errors
    ///
    /// Returns the original, still-locked guard if `f` returns `None`.
    #[inline]
    pub fn try_map<S, F>(mut this: Self, f: F) -> Result<OwnedMappedMutexGuard<T, S>, Self>
    where
        F: FnOnce(&mut U) -> Option<&mut S>,
    {
        let data = match f(&mut *this) {
            Some(mapped) => mapped as *mut S,
            None => return Err(this),
        };
        let parts = this.skip_drop();

        let mapped_guard = OwnedMappedMutexGuard {
            data,
            lock: parts.lock,
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: parts.resource_span,
        };
        Ok(mapped_guard)
    }
}
// Unlocks the originating mutex when the mapped guard is dropped.
impl<T: ?Sized, U: ?Sized> Drop for OwnedMappedMutexGuard<T, U> {
fn drop(&mut self) {
// Hand the single permit back to the semaphore; this is the unlock.
self.lock.s.release(1);
// With tracing enabled, report the transition to the unlocked state in the
// resource span. This happens after the permit has been released.
#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
locked = false,
);
});
}
}
impl<T: ?Sized, U: ?Sized> Deref for OwnedMappedMutexGuard<T, U> {
    type Target = U;

    /// Borrows the mapped value.
    fn deref(&self) -> &Self::Target {
        let value: *const U = self.data;
        // SAFETY: `data` came from the `&mut` reference returned by the
        // mapping closure while the lock was held, and the permit stays held
        // until this guard is dropped.
        unsafe { &*value }
    }
}
impl<T: ?Sized, U: ?Sized> DerefMut for OwnedMappedMutexGuard<T, U> {
    /// Mutably borrows the mapped value.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let value: *mut U = self.data;
        // SAFETY: `data` came from the `&mut` reference returned by the
        // mapping closure while the lock was held, the permit stays held until
        // this guard is dropped, and `&mut self` makes this borrow unique.
        unsafe { &mut *value }
    }
}
impl<T: ?Sized, U: ?Sized + fmt::Debug> fmt::Debug for OwnedMappedMutexGuard<T, U> {
    /// Formats the mapped value with its own `Debug` impl.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Deref coercion from the guard to the value it points at.
        let value: &U = self;
        fmt::Debug::fmt(value, f)
    }
}
impl<T: ?Sized, U: ?Sized + fmt::Display> fmt::Display for OwnedMappedMutexGuard<T, U> {
    /// Formats the mapped value with its own `Display` impl.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Deref coercion from the guard to the value it points at.
        let value: &U = self;
        fmt::Display::fmt(value, f)
    }
}