use crate::errors::*;
use crate::{Counter, Histogram};
use std::borrow::Borrow;
use std::borrow::BorrowMut;
use std::marker::PhantomData;
use std::ops::{AddAssign, Deref, DerefMut};
use std::sync::{atomic, Arc, Mutex};
use std::time;
#[derive(Debug)]
pub struct Recorder<C: Counter> {
local: Histogram<C>,
shared: Arc<Shared<C>>,
last_phase: usize,
}
impl<C: Counter> AddAssign<u64> for Recorder<C> {
fn add_assign(&mut self, value: u64) {
self.record(value).unwrap();
}
}
impl<C: Counter> Clone for Recorder<C> {
fn clone(&self) -> Self {
{
let mut truth = self.shared.truth.lock().unwrap();
truth.recorders += 1;
}
Recorder {
local: Histogram::new_from(&self.local),
shared: self.shared.clone(),
last_phase: self.last_phase,
}
}
}
impl<C: Counter> Drop for Recorder<C> {
fn drop(&mut self) {
let mut truth = self.shared.truth.lock().unwrap();
truth.recorders -= 1;
let h = Histogram::new_from(&self.local);
let h = std::mem::replace(&mut self.local, h);
let _ = self.shared.sender.send(h).is_ok();
drop(truth);
}
}
#[derive(Debug)]
struct Critical {
recorders: usize,
}
#[derive(Debug)]
struct Shared<C: Counter> {
truth: Mutex<Critical>,
sender: crossbeam_channel::Sender<Histogram<C>>,
phase: atomic::AtomicUsize,
}
pub type IdleRecorderGuard<'a, C> = IdleRecorder<&'a mut Recorder<C>, C>;
#[derive(Debug)]
pub struct IdleRecorder<T, C: Counter>
where
T: BorrowMut<Recorder<C>>,
{
recorder: Option<T>,
c: PhantomData<C>,
}
impl<T, C: Counter> IdleRecorder<T, C>
where
T: BorrowMut<Recorder<C>>,
{
fn reactivate(&mut self) {
let recorder = if let Some(ref mut r) = self.recorder {
r
} else {
return;
};
let recorder = recorder.borrow_mut();
let mut crit = recorder.shared.truth.lock().unwrap();
crit.recorders += 1;
recorder.last_phase = recorder.shared.phase.load(atomic::Ordering::Acquire);
drop(crit);
}
}
impl<C: Counter> IdleRecorder<Recorder<C>, C> {
pub fn activate(mut self) -> Recorder<C> {
self.reactivate();
self.recorder.take().unwrap()
}
pub fn recorder(&self) -> Recorder<C> {
self.recorder.as_ref().unwrap().clone()
}
}
impl<T, C: Counter> Drop for IdleRecorder<T, C>
where
T: BorrowMut<Recorder<C>>,
{
fn drop(&mut self) {
self.reactivate()
}
}
impl<C: Counter> Recorder<C> {
fn with_hist<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut Histogram<C>) -> R,
{
let r = f(&mut self.local);
let phase = self.shared.phase.load(atomic::Ordering::Acquire);
if phase != self.last_phase {
self.update();
self.last_phase = phase;
}
r
}
fn shed(&mut self) -> Histogram<C> {
let h = Histogram::new_from(&self.local);
std::mem::replace(&mut self.local, h)
}
fn update(&mut self) {
let h = self.shed();
let _ = self.shared.sender.send(h).is_ok(); }
fn deactivate(&mut self) {
let phase;
{
let mut crit = self.shared.truth.lock().unwrap();
crit.recorders -= 1;
phase = self.shared.phase.load(atomic::Ordering::Acquire);
if phase != self.last_phase {
let h = Histogram::new_from(&self.local);
let h = std::mem::replace(&mut self.local, h);
let _ = self.shared.sender.send(h).is_ok(); }
}
self.last_phase = phase;
}
pub fn idle(&mut self) -> IdleRecorderGuard<C> {
self.deactivate();
IdleRecorder {
recorder: Some(self),
c: PhantomData,
}
}
pub fn into_idle(mut self) -> IdleRecorder<Self, C> {
self.deactivate();
IdleRecorder {
recorder: Some(self),
c: PhantomData,
}
}
pub fn add<B: Borrow<Histogram<C>>>(&mut self, source: B) -> Result<(), AdditionError> {
self.with_hist(move |h| h.add(source))
}
pub fn add_correct<B: Borrow<Histogram<C>>>(
&mut self,
source: B,
interval: u64,
) -> Result<(), RecordError> {
self.with_hist(move |h| h.add_correct(source, interval))
}
pub fn subtract<B: Borrow<Histogram<C>>>(
&mut self,
subtrahend: B,
) -> Result<(), SubtractionError> {
self.with_hist(move |h| h.subtract(subtrahend))
}
pub fn record(&mut self, value: u64) -> Result<(), RecordError> {
self.with_hist(move |h| h.record(value))
}
pub fn saturating_record(&mut self, value: u64) {
self.with_hist(move |h| h.saturating_record(value))
}
pub fn record_n(&mut self, value: u64, count: C) -> Result<(), RecordError> {
self.with_hist(move |h| h.record_n(value, count))
}
pub fn saturating_record_n(&mut self, value: u64, count: C) {
self.with_hist(move |h| h.saturating_record_n(value, count))
}
pub fn record_correct(&mut self, value: u64, interval: u64) -> Result<(), RecordError> {
self.with_hist(move |h| h.record_correct(value, interval))
}
pub fn record_n_correct(
&mut self,
value: u64,
count: C,
interval: u64,
) -> Result<(), RecordError> {
self.with_hist(move |h| h.record_n_correct(value, count, interval))
}
}
#[derive(Debug)]
pub struct SyncHistogram<C: Counter> {
merged: Histogram<C>,
shared: Arc<Shared<C>>,
receiver: crossbeam_channel::Receiver<Histogram<C>>,
}
impl<C: Counter> SyncHistogram<C> {
fn refresh_inner(&mut self, timeout: Option<time::Duration>) {
let end = timeout.map(|dur| time::Instant::now() + dur);
while let Ok(h) = self.receiver.try_recv() {
self.merged
.add(&h)
.expect("TODO: failed to merge histogram");
}
let recorders = self.shared.truth.lock().unwrap().recorders;
let _ = self.shared.phase.fetch_add(1, atomic::Ordering::AcqRel);
let mut phased = 0;
while phased < recorders {
let h = if let Some(end) = end {
let now = time::Instant::now();
if now > end {
break;
}
match self.receiver.recv_timeout(end - now) {
Ok(h) => h,
Err(crossbeam_channel::RecvTimeoutError::Timeout) => break,
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => unreachable!(),
}
} else {
self.receiver
.recv()
.expect("SyncHistogram has an Arc<Shared> with a Receiver")
};
self.merged
.add(&h)
.expect("TODO: failed to merge histogram");
phased += 1;
}
while let Ok(h) = self.receiver.try_recv() {
self.merged
.add(&h)
.expect("TODO: failed to merge histogram");
}
}
pub fn refresh(&mut self) {
self.refresh_inner(None)
}
pub fn refresh_timeout(&mut self, timeout: time::Duration) {
self.refresh_inner(Some(timeout))
}
pub fn recorder(&self) -> Recorder<C> {
{
let mut truth = self.shared.truth.lock().unwrap();
truth.recorders += 1;
}
Recorder {
local: Histogram::new_from(&self.merged),
shared: self.shared.clone(),
last_phase: self.shared.phase.load(atomic::Ordering::Acquire),
}
}
}
impl<C: Counter> From<Histogram<C>> for SyncHistogram<C> {
fn from(h: Histogram<C>) -> Self {
let (tx, rx) = crossbeam_channel::unbounded();
SyncHistogram {
merged: h,
receiver: rx,
shared: Arc::new(Shared {
truth: Mutex::new(Critical { recorders: 0 }),
sender: tx,
phase: atomic::AtomicUsize::new(0),
}),
}
}
}
impl<C: Counter> Deref for SyncHistogram<C> {
type Target = Histogram<C>;
fn deref(&self) -> &Self::Target {
&self.merged
}
}
impl<C: Counter> DerefMut for SyncHistogram<C> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.merged
}
}