use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant, SystemTime};
use reqwest::header::{Headers, RetryAfter};
use reqwest::{Client, Proxy, StatusCode};
use api::protocol::Event;
use Dsn;
#[derive(Debug)]
struct RealTransportImpl {
sender: Mutex<SyncSender<Option<Event<'static>>>>,
drain_signal: Arc<Condvar>,
queue_size: Arc<Mutex<usize>>,
_handle: Option<JoinHandle<()>>,
}
#[cfg(any(test, feature = "with_test_support"))]
#[derive(Debug)]
struct TestTransportImpl {
collected: Mutex<Vec<Event<'static>>>,
}
#[derive(Debug)]
enum TransportImpl {
Real(RealTransportImpl),
#[cfg(any(test, feature = "with_test_support"))]
Test(TestTransportImpl),
}
#[derive(Debug)]
pub struct Transport {
dsn: Dsn,
inner: TransportImpl,
}
fn spawn_http_sender(
client: Client,
receiver: Receiver<Option<Event<'static>>>,
dsn: Dsn,
signal: Arc<Condvar>,
queue_size: Arc<Mutex<usize>>,
user_agent: String,
) -> JoinHandle<()> {
let mut disabled: Option<(Instant, RetryAfter)> = None;
thread::spawn(move || {
let url = dsn.store_api_url().to_string();
while let Some(event) = receiver.recv().unwrap_or(None) {
match disabled {
Some((disabled_at, RetryAfter::Delay(disabled_for))) => {
if disabled_at.elapsed() > disabled_for {
disabled = None;
} else {
continue;
}
}
Some((_, RetryAfter::DateTime(wait_until))) => {
if SystemTime::from(wait_until) > SystemTime::now() {
disabled = None;
} else {
continue;
}
}
None => {}
}
let auth = dsn.to_auth(Some(&user_agent));
let mut headers = Headers::new();
headers.set_raw("X-Sentry-Auth", auth.to_string());
if let Ok(resp) = client
.post(url.as_str())
.json(&event)
.headers(headers)
.send()
{
if resp.status() == StatusCode::TooManyRequests {
disabled = resp.headers()
.get::<RetryAfter>()
.map(|x| (Instant::now(), *x));
}
}
let mut size = queue_size.lock().unwrap();
*size -= 1;
if *size == 0 {
signal.notify_all();
}
}
})
}
impl Transport {
pub fn new(
dsn: Dsn,
user_agent: String,
http_proxy: Option<&str>,
https_proxy: Option<&str>,
) -> Transport {
let (sender, receiver) = sync_channel(30);
let drain_signal = Arc::new(Condvar::new());
#[cfg_attr(feature = "cargo-clippy", allow(mutex_atomic))]
let queue_size = Arc::new(Mutex::new(0));
let mut client = Client::builder();
if let Some(url) = http_proxy {
client.proxy(Proxy::http(url).unwrap());
};
if let Some(url) = https_proxy {
client.proxy(Proxy::https(url).unwrap());
};
let _handle = Some(spawn_http_sender(
client.build().unwrap(),
receiver,
dsn.clone(),
drain_signal.clone(),
queue_size.clone(),
user_agent,
));
Transport {
dsn,
inner: TransportImpl::Real(RealTransportImpl {
sender: Mutex::new(sender),
drain_signal,
queue_size,
_handle,
}),
}
}
#[cfg(any(test, feature = "with_test_support"))]
pub fn testable(dsn: Dsn) -> Transport {
Transport {
dsn,
inner: TransportImpl::Test(TestTransportImpl {
collected: Mutex::new(vec![]),
}),
}
}
pub fn dsn(&self) -> &Dsn {
&self.dsn
}
pub fn send_event(&self, event: Event<'static>) {
match self.inner {
TransportImpl::Real(ref ti) => {
*ti.queue_size.lock().unwrap() += 1;
if ti.sender.lock().unwrap().try_send(Some(event)).is_err() {
*ti.queue_size.lock().unwrap() -= 1;
}
}
#[cfg(any(test, feature = "with_test_support"))]
TransportImpl::Test(ref ti) => {
ti.collected.lock().unwrap().push(event);
}
}
}
pub fn drain(&self, timeout: Option<Duration>) -> bool {
match self.inner {
TransportImpl::Real(ref ti) => {
let guard = ti.queue_size.lock().unwrap();
if *guard == 0 {
return true;
}
if let Some(timeout) = timeout {
ti.drain_signal.wait_timeout(guard, timeout).is_ok()
} else {
ti.drain_signal.wait(guard).is_ok()
}
}
#[cfg(any(test, feature = "with_test_support"))]
TransportImpl::Test(..) => true,
}
}
#[cfg(any(test, feature = "with_test_support"))]
pub fn fetch_and_clear_events(&self) -> Vec<Event<'static>> {
match self.inner {
TransportImpl::Real(..) => {
panic!("Can only fetch events from testable transports");
}
#[cfg(any(test, feature = "with_test_support"))]
TransportImpl::Test(ref ti) => {
use std::mem;
let mut guard = ti.collected.lock().unwrap();
mem::replace(&mut *guard, vec![])
}
}
}
#[cfg(any(test, feature = "with_test_support"))]
pub fn is_test(&self) -> bool {
match self.inner {
TransportImpl::Real(..) => false,
#[cfg(any(test, feature = "with_test_support"))]
TransportImpl::Test(..) => true,
}
}
}
impl Drop for Transport {
fn drop(&mut self) {
match self.inner {
TransportImpl::Real(ref ti) => {
if let Ok(sender) = ti.sender.lock() {
sender.send(None).ok();
}
}
#[cfg(any(test, feature = "with_test_support"))]
TransportImpl::Test(..) => {}
}
}
}