use std::borrow::Cow;
use std::error;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::time::Duration;
use crate::inner::PoolInner;
use crate::internals::Conn;
/// A generic connection pool, parameterized over the [`ManageConnection`]
/// implementation `M` that creates and checks its connections.
///
/// `Pool` is a thin handle: its only field is the crate-internal `PoolInner<M>`,
/// and the methods on `Pool` delegate to it.
pub struct Pool<M: ManageConnection> {
// Crate-visible so sibling modules can reach the internal pool implementation.
pub(crate) inner: PoolInner<M>,
}
impl<M: ManageConnection> Pool<M> {
    /// Returns a [`Builder`] holding the default settings, ready to be customized.
    pub fn builder() -> Builder<M> {
        Builder::default()
    }

    /// Checks a connection out of the pool.
    ///
    /// The returned [`PooledConnection`] borrows this pool. Dropping it hands the
    /// connection back (see the `Drop` impl of `PooledConnection`).
    ///
    /// # Errors
    ///
    /// Forwards whatever [`RunError`] the inner pool's `get` reports.
    pub async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> {
        self.inner.get().await
    }

    /// Checks a connection out of the pool without borrowing the pool.
    ///
    /// Works like [`Pool::get`], but the resulting guard owns a clone of the inner
    /// pool instead of a reference, so it can be `'static`.
    ///
    /// # Errors
    ///
    /// Same as [`Pool::get`].
    pub async fn get_owned(&self) -> Result<PooledConnection<'static, M>, RunError<M::Error>> {
        // Pull the raw connection out of the borrowed guard first; `take` marks that
        // guard as extracted so dropping it does not return the connection.
        let borrowed = self.get().await?;
        let conn = borrowed.take();
        let pool = Cow::Owned(self.inner.clone());
        Ok(PooledConnection {
            pool,
            conn,
            state: ConnectionState::Present,
        })
    }

    /// Asks the inner pool to `connect()` and returns the bare `M::Connection`.
    ///
    /// The result is not wrapped in a [`PooledConnection`], so dropping it does not
    /// go through the put-back logic of that guard.
    ///
    /// # Errors
    ///
    /// Forwards the manager error reported by the inner `connect()`.
    pub async fn dedicated_connection(&self) -> Result<M::Connection, M::Error> {
        self.inner.connect().await
    }

    /// Offers an externally created connection to the pool via the inner `try_put`.
    ///
    /// # Errors
    ///
    /// Returns an [`AddError`] carrying the rejected connection back to the caller.
    pub fn add(&self, conn: M::Connection) -> Result<(), AddError<M::Connection>> {
        self.inner.try_put(conn)
    }

    /// Returns the [`State`] reported by the inner pool.
    pub fn state(&self) -> State {
        self.inner.state()
    }

    /// Returns a [`Config`] copied from the builder the inner pool holds.
    pub fn config(&self) -> Config {
        let builder = self.inner.builder();
        Config::from(builder)
    }
}
impl<M: ManageConnection> Clone for Pool<M> {
    /// Produces another handle by cloning the inner `PoolInner`.
    // NOTE(review): presumably both handles refer to the same underlying pool;
    // confirm against `PoolInner`'s `Clone` impl.
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        Self { inner }
    }
}
impl<M: ManageConnection> fmt::Debug for Pool<M> {
    /// Formats the pool as `Pool(<inner debug output>)`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Pool({:?})", self.inner)
    }
}
/// Information about the state of a [`Pool`], as returned by [`Pool::state`].
///
/// The values are produced by the inner pool, outside this file; the field
/// descriptions below are inferred from the field names — confirm against the
/// code that fills them in.
#[derive(Debug)]
#[non_exhaustive]
pub struct State {
/// Presumably the number of connections currently managed by the pool.
pub connections: u32,
/// Presumably the number of those connections that are idle.
pub idle_connections: u32,
/// Counters describing pool activity; see [`Statistics`].
pub statistics: Statistics,
}
/// Activity counters for a pool, exposed through [`State::statistics`].
///
/// The counters are filled in outside this file. The per-field descriptions are
/// inferred from the field names and from how [`Statistics::completed_gets`] and
/// [`Statistics::pending_gets`] combine them — confirm against the code that
/// updates them.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct Statistics {
    /// Presumably the total number of `get` calls that have been started.
    pub get_started: u64,
    /// Presumably gets that obtained a connection without having to wait.
    pub get_direct: u64,
    /// Presumably gets that had to wait for a connection.
    pub get_waited: u64,
    /// Presumably gets that timed out while waiting.
    pub get_timed_out: u64,
    /// Presumably the accumulated time spent waiting for connections.
    pub get_wait_time: Duration,
    /// Presumably the total number of connections created.
    pub connections_created: u64,
    /// Presumably connections closed because they were found broken.
    pub connections_closed_broken: u64,
    /// Presumably connections closed because they were considered invalid.
    pub connections_closed_invalid: u64,
    /// Presumably connections closed after exceeding the maximum lifetime.
    pub connections_closed_max_lifetime: u64,
    /// Presumably connections closed after exceeding the idle timeout.
    pub connections_closed_idle_timeout: u64,
}

impl Statistics {
    /// Number of gets that were started but have not completed yet:
    /// `get_started` minus [`Statistics::completed_gets`].
    ///
    /// The subtraction saturates at zero. The counters are independent public
    /// fields, so a snapshot in which the completed counters run ahead of
    /// `get_started` is representable; a plain `-` would then panic in debug
    /// builds and wrap around to a huge value in release builds.
    pub fn pending_gets(&self) -> u64 {
        self.get_started.saturating_sub(self.completed_gets())
    }

    /// Number of gets that have finished, whichever way they finished:
    /// `get_direct + get_waited + get_timed_out`.
    ///
    /// The sum saturates at `u64::MAX` instead of overflowing.
    pub fn completed_gets(&self) -> u64 {
        self.get_direct
            .saturating_add(self.get_waited)
            .saturating_add(self.get_timed_out)
    }
}
/// A read-only copy of a pool's settings, as returned by [`Pool::config`].
///
/// Every field is copied from the [`Builder`] field of the same name by the
/// `From<&Builder<M>>` impl; the builder's `error_sink` and
/// `connection_customizer` are not included.
#[non_exhaustive]
#[derive(Debug)]
pub struct Config {
/// Value of the builder's `max_size` setting.
pub max_size: u32,
/// Value of the builder's `min_idle` setting.
pub min_idle: Option<u32>,
/// Value of the builder's `test_on_check_out` setting.
pub test_on_check_out: bool,
/// Value of the builder's `max_lifetime` setting.
pub max_lifetime: Option<Duration>,
/// Value of the builder's `idle_timeout` setting.
pub idle_timeout: Option<Duration>,
/// Value of the builder's `connection_timeout` setting.
pub connection_timeout: Duration,
/// Value of the builder's `retry_connection` setting.
pub retry_connection: bool,
/// Value of the builder's `reaper_rate` setting.
pub reaper_rate: Duration,
/// Value of the builder's `queue_strategy` setting.
pub queue_strategy: QueueStrategy,
}
impl<M: ManageConnection> From<&Builder<M>> for Config {
fn from(builder: &Builder<M>) -> Self {
let Builder {
max_size,
min_idle,
test_on_check_out,
max_lifetime,
idle_timeout,
connection_timeout,
retry_connection,
error_sink: _,
reaper_rate,
queue_strategy,
connection_customizer: _,
_p: _,
} = builder;
Self {
max_size: *max_size,
min_idle: *min_idle,
test_on_check_out: *test_on_check_out,
max_lifetime: *max_lifetime,
idle_timeout: *idle_timeout,
connection_timeout: *connection_timeout,
retry_connection: *retry_connection,
reaper_rate: *reaper_rate,
queue_strategy: *queue_strategy,
}
}
}
/// A builder for a connection pool.
///
/// Settings are changed through the consuming setter methods on this type and
/// the pool is created with `build` or `build_unchecked`. How each setting is
/// interpreted is decided by the pool implementation outside this file.
#[derive(Debug)]
pub struct Builder<M: ManageConnection> {
// Upper bound on pool size; the setter rejects zero.
pub(crate) max_size: u32,
// Optional idle-connection target; checked against `max_size` when building.
pub(crate) min_idle: Option<u32>,
// NOTE(review): presumably whether connections are validated on checkout — confirm.
pub(crate) test_on_check_out: bool,
// Optional maximum connection lifetime; the setter rejects a zero duration.
pub(crate) max_lifetime: Option<Duration>,
// Optional idle timeout; the setter rejects a zero duration.
pub(crate) idle_timeout: Option<Duration>,
// Timeout related to obtaining a connection; the setter rejects zero.
pub(crate) connection_timeout: Duration,
// NOTE(review): presumably whether failed connection attempts are retried — confirm.
pub(crate) retry_connection: bool,
// Receiver for errors of the manager's error type; see `ErrorSink`.
pub(crate) error_sink: Box<dyn ErrorSink<M::Error>>,
// NOTE(review): presumably how often expired connections are reaped — confirm.
pub(crate) reaper_rate: Duration,
// Queue ordering choice; see `QueueStrategy`.
pub(crate) queue_strategy: QueueStrategy,
// Optional hook object; see `CustomizeConnection`.
pub(crate) connection_customizer: Option<Box<dyn CustomizeConnection<M::Connection, M::Error>>>,
// Ties the builder to the manager type `M` without storing a value of it.
_p: PhantomData<M>,
}
/// Queue ordering option stored in [`Builder`] and [`Config`].
///
/// NOTE(review): the code that consumes this value is outside this file;
/// presumably it decides in which order queued items are served — confirm.
#[derive(Debug, Default, Clone, Copy)]
pub enum QueueStrategy {
/// First in, first out. This is the default variant.
#[default]
Fifo,
/// Last in, first out.
Lifo,
}
impl<M: ManageConnection> Default for Builder<M> {
    /// Builds the default configuration:
    ///
    /// * `max_size`: 10
    /// * `min_idle`: `None`
    /// * `test_on_check_out`: `true`
    /// * `max_lifetime`: 30 minutes
    /// * `idle_timeout`: 10 minutes
    /// * `connection_timeout`: 30 seconds
    /// * `retry_connection`: `true`
    /// * `error_sink`: [`NopErrorSink`]
    /// * `reaper_rate`: 30 seconds
    /// * `queue_strategy`: `QueueStrategy::default()`
    /// * `connection_customizer`: `None`
    fn default() -> Self {
        let thirty_minutes = Duration::from_secs(30 * 60);
        let ten_minutes = Duration::from_secs(10 * 60);
        let thirty_seconds = Duration::from_secs(30);

        Self {
            max_size: 10,
            min_idle: None,
            test_on_check_out: true,
            max_lifetime: Some(thirty_minutes),
            idle_timeout: Some(ten_minutes),
            connection_timeout: thirty_seconds,
            retry_connection: true,
            error_sink: Box::new(NopErrorSink),
            reaper_rate: thirty_seconds,
            queue_strategy: QueueStrategy::default(),
            connection_customizer: None,
            _p: PhantomData,
        }
    }
}
impl<M: ManageConnection> Builder<M> {
#[must_use]
pub fn new() -> Self {
Builder::default()
}
#[must_use]
pub fn max_size(mut self, max_size: u32) -> Self {
assert!(max_size > 0, "max_size must be greater than zero!");
self.max_size = max_size;
self
}
#[must_use]
pub fn min_idle(mut self, min_idle: impl Into<Option<u32>>) -> Self {
self.min_idle = min_idle.into();
self
}
#[must_use]
pub fn test_on_check_out(mut self, test_on_check_out: bool) -> Self {
self.test_on_check_out = test_on_check_out;
self
}
#[must_use]
pub fn max_lifetime(mut self, max_lifetime: impl Into<Option<Duration>>) -> Self {
let max_lifetime = max_lifetime.into();
assert_ne!(
max_lifetime,
Some(Duration::from_secs(0)),
"max_lifetime must be greater than zero!"
);
self.max_lifetime = max_lifetime;
self
}
#[must_use]
pub fn idle_timeout(mut self, idle_timeout: impl Into<Option<Duration>>) -> Self {
let idle_timeout = idle_timeout.into();
assert_ne!(
idle_timeout,
Some(Duration::from_secs(0)),
"idle_timeout must be greater than zero!"
);
self.idle_timeout = idle_timeout;
self
}
#[must_use]
pub fn connection_timeout(mut self, connection_timeout: Duration) -> Self {
assert!(
connection_timeout > Duration::from_secs(0),
"connection_timeout must be non-zero"
);
self.connection_timeout = connection_timeout;
self
}
#[must_use]
pub fn retry_connection(mut self, retry: bool) -> Self {
self.retry_connection = retry;
self
}
#[must_use]
pub fn error_sink(mut self, error_sink: Box<dyn ErrorSink<M::Error>>) -> Self {
self.error_sink = error_sink;
self
}
#[allow(dead_code)]
#[must_use]
pub fn reaper_rate(mut self, reaper_rate: Duration) -> Self {
self.reaper_rate = reaper_rate;
self
}
#[must_use]
pub fn queue_strategy(mut self, queue_strategy: QueueStrategy) -> Self {
self.queue_strategy = queue_strategy;
self
}
#[must_use]
pub fn connection_customizer(
mut self,
connection_customizer: Box<dyn CustomizeConnection<M::Connection, M::Error>>,
) -> Self {
self.connection_customizer = Some(connection_customizer);
self
}
fn build_inner(self, manager: M) -> Pool<M> {
if let Some(min_idle) = self.min_idle {
assert!(
self.max_size >= min_idle,
"min_idle must be no larger than max_size"
);
}
Pool {
inner: PoolInner::new(self, manager),
}
}
pub async fn build(self, manager: M) -> Result<Pool<M>, M::Error> {
let pool = self.build_inner(manager);
pool.inner.start_connections().await.map(|()| pool)
}
pub fn build_unchecked(self, manager: M) -> Pool<M> {
let p = self.build_inner(manager);
p.inner.spawn_start_connections();
p
}
}
/// A trait which provides connection-specific functionality to the pool.
///
/// Implementors must be `Send + Sync + 'static`, and the futures returned by
/// `connect` and `is_valid` must be `Send`.
pub trait ManageConnection: Sized + Send + Sync + 'static {
/// The connection type this manager produces.
type Connection: Send + 'static;
/// The error type returned by `connect` and `is_valid`.
type Error: fmt::Debug + Send + 'static;
/// Asynchronously creates a new connection, or fails with `Self::Error`.
fn connect(&self) -> impl Future<Output = Result<Self::Connection, Self::Error>> + Send;
/// Asynchronously checks `conn`, resolving to `Ok(())` or to `Self::Error`.
// NOTE(review): presumably used by the pool to validate connections at checkout
// (cf. `test_on_check_out`) — the call site is outside this file.
fn is_valid(
&self,
conn: &mut Self::Connection,
) -> impl Future<Output = Result<(), Self::Error>> + Send;
/// Synchronously reports whether `conn` is broken.
// NOTE(review): presumably consulted when a connection is returned or added to
// the pool — the call site is outside this file.
fn has_broken(&self, conn: &mut Self::Connection) -> bool;
}
/// A hook object that can be installed with `Builder::connection_customizer`.
///
/// `C` is the connection type and `E` the error type the hook may return.
pub trait CustomizeConnection<C: Send + 'static, E: 'static>:
fmt::Debug + Send + Sync + 'static
{
/// Hook that receives mutable access to a connection and returns a boxed,
/// `Send` future resolving to `Ok(())` or an error of type `E`.
///
/// The default implementation does nothing and resolves to `Ok(())`.
// NOTE(review): presumably invoked by the pool when it acquires a new
// connection — the call site is outside this file.
fn on_acquire<'a>(
&'a self,
_connection: &'a mut C,
) -> Pin<Box<dyn Future<Output = Result<(), E>> + Send + 'a>> {
Box::pin(async { Ok(()) })
}
}
/// A guard around a connection checked out of a pool.
///
/// It dereferences to `M::Connection`, and its `Drop` impl hands the connection
/// back to the pool unless the connection was extracted first.
pub struct PooledConnection<'a, M: ManageConnection> {
// Either a borrow of the pool (`PooledConnection::new`) or an owned clone of it
// (`Pool::get_owned`).
pool: Cow<'a, PoolInner<M>>,
// The wrapped connection; `None` once `take` has moved it out.
conn: Option<Conn<M::Connection>>,
// Passed to `put_back` on drop; `Extracted` suppresses the put-back entirely.
pub(crate) state: ConnectionState,
}
impl<'a, M: ManageConnection> PooledConnection<'a, M> {
pub(crate) fn new(pool: &'a PoolInner<M>, conn: Conn<M::Connection>) -> Self {
Self {
pool: Cow::Borrowed(pool),
conn: Some(conn),
state: ConnectionState::Present,
}
}
pub(crate) fn take(mut self) -> Option<Conn<M::Connection>> {
self.state = ConnectionState::Extracted;
self.conn.take()
}
}
impl<M: ManageConnection> Deref for PooledConnection<'_, M> {
    type Target = M::Connection;

    /// Borrows the wrapped connection.
    ///
    /// # Panics
    ///
    /// Panics if the connection has already been taken out of the guard.
    fn deref(&self) -> &M::Connection {
        let slot = self.conn.as_ref().unwrap();
        &slot.conn
    }
}
impl<M: ManageConnection> DerefMut for PooledConnection<'_, M> {
    /// Mutably borrows the wrapped connection.
    ///
    /// # Panics
    ///
    /// Panics if the connection has already been taken out of the guard.
    fn deref_mut(&mut self) -> &mut M::Connection {
        let slot = self.conn.as_mut().unwrap();
        &mut slot.conn
    }
}
impl<M> fmt::Debug for PooledConnection<'_, M>
where
    M: ManageConnection,
    M::Connection: fmt::Debug,
{
    /// Formats the guard exactly like the wrapped connection.
    ///
    /// # Panics
    ///
    /// Panics if the connection has already been taken out of the guard.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let connection = &self.conn.as_ref().unwrap().conn;
        fmt::Debug::fmt(connection, fmt)
    }
}
impl<M: ManageConnection> Drop for PooledConnection<'_, M> {
    /// Hands the connection back to the pool via `put_back`, together with the
    /// guard's current state, unless the connection was extracted.
    fn drop(&mut self) {
        // An extracted connection now belongs to whoever called `take`.
        if matches!(self.state, ConnectionState::Extracted) {
            return;
        }

        // Any other state is expected to still hold a connection.
        debug_assert!(self.conn.is_some(), "incorrect state {:?}", self.state);

        let Some(conn) = self.conn.take() else {
            return;
        };
        self.pool.as_ref().put_back(conn, self.state);
    }
}
/// State of the connection held by a [`PooledConnection`].
///
/// The guard's `Drop` impl passes this value to the pool's `put_back`.
#[derive(Debug, Clone, Copy)]
pub(crate) enum ConnectionState {
/// The guard holds its connection; this is the initial state of every guard.
Present,
/// The connection was moved out with `take`; dropping the guard is a no-op.
Extracted,
/// NOTE(review): never assigned in this file; presumably set elsewhere in the
/// crate when a connection fails validation — confirm.
Invalid,
}
/// The error type returned by [`Pool::get`] and [`Pool::get_owned`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RunError<E> {
/// An error of the underlying type `E`; displayed as that error and exposed
/// as the `source()` of this one.
User(E),
/// A timeout occurred; displayed as "Timed out in bb8".
TimedOut,
}
impl<E: error::Error + 'static> fmt::Display for RunError<E> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
RunError::User(ref err) => write!(f, "{err}"),
RunError::TimedOut => write!(f, "Timed out in bb8"),
}
}
}
impl<E: error::Error + 'static> error::Error for RunError<E> {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match *self {
RunError::User(ref err) => Some(err),
RunError::TimedOut => None,
}
}
}
impl<E: error::Error> From<E> for RunError<E> {
fn from(error: E) -> Self {
Self::User(error)
}
}
/// Error returned by [`Pool::add`] when a connection cannot be added.
///
/// Both variants hand the rejected connection back to the caller.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AddError<C> {
    /// The connection was broken before it could be added.
    Broken(C),
    /// The pool had no capacity left for the connection.
    NoCapacity(C),
}

// The payload `C` is a connection, not an error, and the message never prints
// it, so `Display` is implemented for every `C`. Requiring
// `C: std::error::Error` here would leave `AddError<Conn>` without `Display`
// for ordinary connection types.
impl<C> fmt::Display for AddError<C> {
    /// Writes a fixed, human-readable message for each variant.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            AddError::Broken(_) => write!(f, "The connection was broken before it could be added"),
            AddError::NoCapacity(_) => write!(
                f,
                "Unable to add the connection to the pool due to insufficient capacity"
            ),
        }
    }
}

// `std::error::Error` needs `Debug + Display` on `Self`; the derived `Debug`
// needs `C: Debug`, which is therefore the only bound required. This is a
// superset of the previous `C: Error + 'static` bound.
impl<C: fmt::Debug> error::Error for AddError<C> {
    /// Always `None`: the carried connection is not an underlying error.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        None
    }
}
/// A trait to receive errors of type `E`; installed with `Builder::error_sink`.
// NOTE(review): presumably the pool forwards errors here that it cannot return
// to any caller — the call sites are outside this file.
pub trait ErrorSink<E>: fmt::Debug + Send + Sync + 'static {
/// Receives one error and takes ownership of it.
fn sink(&self, error: E);
/// Returns a boxed sink of the same trait-object type, allowing a
/// `Box<dyn ErrorSink<E>>` to be duplicated.
fn boxed_clone(&self) -> Box<dyn ErrorSink<E>>;
}
/// An [`ErrorSink`] implementation that does nothing with the errors it
/// receives. It is the builder's default sink.
#[derive(Debug, Clone, Copy)]
pub struct NopErrorSink;
impl<E> ErrorSink<E> for NopErrorSink {
fn sink(&self, _: E) {}
fn boxed_clone(&self) -> Box<dyn ErrorSink<E>> {
Box::new(*self)
}
}