use std::collections::{HashMap, VecDeque};
use std::io::{Read, Result as IoResult};
use crate::stream::Stream;
use crate::unit::Unit;
use url::Url;
pub const DEFAULT_HOST: &str = "localhost";
const MAX_IDLE_CONNECTIONS: usize = 100;
#[derive(Default, Debug)]
pub(crate) struct ConnectionPool {
recycle: HashMap<PoolKey, Stream>,
lru: VecDeque<PoolKey>,
}
impl ConnectionPool {
pub fn new() -> Self {
ConnectionPool {
..Default::default()
}
}
pub fn try_get_connection(&mut self, url: &Url) -> Option<Stream> {
let key = PoolKey::new(url);
self.remove(&key)
}
fn remove(&mut self, key: &PoolKey) -> Option<Stream> {
if !self.recycle.contains_key(&key) {
return None;
}
let index = self.lru.iter().position(|k| k == key);
assert!(
index.is_some(),
"invariant failed: key existed in recycle but not lru"
);
self.lru.remove(index.unwrap());
self.recycle.remove(&key)
}
fn add(&mut self, key: PoolKey, stream: Stream) {
self.remove(&key);
if self.recycle.len() + 1 > MAX_IDLE_CONNECTIONS {
self.remove_oldest();
}
self.lru.push_back(key.clone());
self.recycle.insert(key, stream);
}
fn remove_oldest(&mut self) {
if let Some(key) = self.lru.pop_front() {
let removed = self.recycle.remove(&key);
assert!(
removed.is_some(),
"invariant failed: key existed in lru but not in recycle"
);
} else {
panic!("tried to remove oldest but no entries found!");
}
}
#[cfg(test)]
pub fn len(&self) -> usize {
self.recycle.len()
}
}
#[derive(PartialEq, Clone, Eq, Hash)]
struct PoolKey {
scheme: String,
hostname: String,
port: Option<u16>,
}
use std::fmt;
impl fmt::Debug for PoolKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_fmt(format_args!(
"{}|{}|{}",
self.scheme,
self.hostname,
self.port.unwrap_or(0)
))
}
}
impl PoolKey {
fn new(url: &Url) -> Self {
let port = url.port_or_known_default();
PoolKey {
scheme: url.scheme().to_string(),
hostname: url.host_str().unwrap_or("").to_string(),
port,
}
}
}
#[test]
fn poolkey_new() {
PoolKey::new(&Url::parse("zzz:///example.com").unwrap());
}
#[test]
fn pool_size_limit() {
assert_eq!(MAX_IDLE_CONNECTIONS, 100);
let mut pool = ConnectionPool::new();
let hostnames = (0..200).map(|i| format!("{}.example", i));
let poolkeys = hostnames.map(|hostname| PoolKey {
scheme: "https".to_string(),
hostname,
port: Some(999),
});
for key in poolkeys.clone() {
pool.add(key, Stream::Cursor(std::io::Cursor::new(vec![])));
}
assert_eq!(pool.len(), 100);
for key in poolkeys.skip(100) {
let result = pool.remove(&key);
assert!(result.is_some(), "expected key was not in pool");
}
}
#[test]
fn pool_duplicates_limit() {
assert_eq!(MAX_IDLE_CONNECTIONS, 100);
let mut pool = ConnectionPool::new();
let hostnames = (0..100).map(|i| format!("{}.example", i));
let poolkeys = hostnames.map(|hostname| PoolKey {
scheme: "https".to_string(),
hostname,
port: Some(999),
});
for key in poolkeys.clone() {
pool.add(key.clone(), Stream::Cursor(std::io::Cursor::new(vec![])));
pool.add(key, Stream::Cursor(std::io::Cursor::new(vec![])));
}
assert_eq!(pool.len(), 100);
for key in poolkeys {
let result = pool.remove(&key);
assert!(result.is_some(), "expected key was not in pool");
}
}
pub(crate) struct PoolReturnRead<R: Read + Sized + Into<Stream>> {
unit: Option<Unit>,
reader: Option<R>,
}
impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
pub fn new(unit: Option<Unit>, reader: R) -> Self {
PoolReturnRead {
unit,
reader: Some(reader),
}
}
fn return_connection(&mut self) {
if let (Some(unit), Some(reader)) = (self.unit.take(), self.reader.take()) {
let state = &mut unit.agent.lock().unwrap();
let stream = reader.into();
if let Some(agent) = state.as_mut() {
if !stream.is_poolable() {
return;
}
let key = PoolKey::new(&unit.url);
agent.pool().add(key, stream);
}
}
}
fn do_read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
match self.reader.as_mut() {
None => Ok(0),
Some(reader) => reader.read(buf),
}
}
}
impl<R: Read + Sized + Into<Stream>> Read for PoolReturnRead<R> {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
let amount = self.do_read(buf)?;
if amount == 0 {
self.return_connection();
}
Ok(amount)
}
}
impl<R: Read + Sized + Into<Stream>> Drop for PoolReturnRead<R> {
fn drop(&mut self) {
self.return_connection();
}
}