#![forbid(unsafe_code, future_incompatible)]
#![deny(
missing_docs,
missing_debug_implementations,
missing_copy_implementations,
nonstandard_style,
unused_qualifications,
unused_import_braces,
unused_extern_crates,
trivial_casts,
trivial_numeric_casts
)]
#![allow(clippy::doc_lazy_continuation)]
#![cfg_attr(docsrs, feature(doc_cfg))]
mod body;
mod error;
mod managers;
#[cfg(feature = "streaming")]
mod runtime;
use std::{
collections::HashMap,
convert::TryFrom,
fmt::{self, Debug},
str::FromStr,
sync::Arc,
time::SystemTime,
};
use http::{header::CACHE_CONTROL, request, response, Response, StatusCode};
use http_cache_semantics::{AfterResponse, BeforeRequest, CachePolicy};
use serde::{Deserialize, Serialize};
use url::Url;
pub use body::StreamingBody;
pub use error::{BadHeader, BadVersion, BoxError, Result, StreamingError};
#[cfg(feature = "manager-cacache")]
pub use managers::cacache::CACacheManager;
#[cfg(feature = "streaming")]
pub use managers::streaming_cache::StreamingManager;
#[cfg(feature = "manager-moka")]
pub use managers::moka::MokaManager;
#[cfg(feature = "manager-moka")]
#[cfg_attr(docsrs, doc(cfg(feature = "manager-moka")))]
pub use moka::future::{Cache as MokaCache, CacheBuilder as MokaCacheBuilder};
/// Name of the response header that [`HttpResponse::cache_status`] sets to `HIT` or `MISS`.
pub const XCACHE: &str = "x-cache";
/// Name of the response header that [`HttpResponse::cache_lookup_status`] sets to `HIT` or `MISS`.
pub const XCACHELOOKUP: &str = "x-cache-lookup";
// Name of the `warning` header read and written by the private warning helpers on `HttpResponse`.
const WARNING: &str = "warning";
/// Value written into the cache status headers: either a cache hit or a cache miss.
#[derive(Debug, Copy, Clone)]
pub enum HitOrMiss {
    /// Rendered as the literal text `HIT`.
    HIT,
    /// Rendered as the literal text `MISS`.
    MISS,
}

impl fmt::Display for HitOrMiss {
    /// Writes `HIT` or `MISS`; formatter flags such as width are not applied.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            Self::HIT => "HIT",
            Self::MISS => "MISS",
        };
        f.write_str(label)
    }
}
/// HTTP protocol version recorded on an [`HttpResponse`].
///
/// Each variant is (de)serialized under the string given in its `serde(rename)`
/// attribute, which is also the text produced by its `Display` implementation.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[non_exhaustive]
pub enum HttpVersion {
/// `HTTP/0.9`
#[serde(rename = "HTTP/0.9")]
Http09,
/// `HTTP/1.0`
#[serde(rename = "HTTP/1.0")]
Http10,
/// `HTTP/1.1`
#[serde(rename = "HTTP/1.1")]
Http11,
/// `HTTP/2.0`
#[serde(rename = "HTTP/2.0")]
H2,
/// `HTTP/3.0`
#[serde(rename = "HTTP/3.0")]
H3,
}
impl fmt::Display for HttpVersion {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
HttpVersion::Http09 => write!(f, "HTTP/0.9"),
HttpVersion::Http10 => write!(f, "HTTP/1.0"),
HttpVersion::Http11 => write!(f, "HTTP/1.1"),
HttpVersion::H2 => write!(f, "HTTP/2.0"),
HttpVersion::H3 => write!(f, "HTTP/3.0"),
}
}
}
/// Returns `true` when `authority` (a `Host` header value, optionally with a
/// `:port` suffix) names exactly `localhost` or `127.0.0.1`.
///
/// The comparison is on the whole host name, so look-alikes such as
/// `localhost.example.com` or `127.0.0.10` are not treated as loopback.
fn is_loopback_host(authority: &str) -> bool {
    // Drop a trailing numeric `:port` so only the host name itself is compared.
    let host = match authority.rsplit_once(':') {
        Some((name, port)) if port.bytes().all(|b| b.is_ascii_digit()) => name,
        _ => authority,
    };
    host.eq_ignore_ascii_case("localhost") || host == "127.0.0.1"
}

/// Reconstructs the absolute URL of a request from its parts.
///
/// When the request URI already carries a scheme it is parsed as-is.
/// Otherwise the URL is assembled from the `host` header and the URI's path
/// and query (`/` when absent), with the scheme chosen as follows:
///
/// * `http` when the host is `localhost` or `127.0.0.1`,
/// * else the value of the `x-forwarded-proto` header when present,
/// * else `https`.
///
/// # Errors
/// Returns `BadHeader` when the `host` header is missing, when a consulted
/// header is not valid visible ASCII, or when the resulting string does not
/// parse as a URL.
fn extract_url_from_request_parts(parts: &request::Parts) -> Result<Url> {
    if parts.uri.scheme().is_some() {
        return Url::parse(&parts.uri.to_string())
            .map_err(|_| BadHeader.into());
    }

    // Without a scheme in the URI the host header is mandatory.
    let host = parts
        .headers
        .get("host")
        .ok_or(BadHeader)?
        .to_str()
        .map_err(|_| BadHeader)?;

    let scheme = if is_loopback_host(host) {
        "http"
    } else if let Some(forwarded_proto) = parts.headers.get("x-forwarded-proto")
    {
        forwarded_proto.to_str().map_err(|_| BadHeader)?
    } else {
        "https"
    };

    let path_and_query =
        parts.uri.path_and_query().map_or("/", |pq| pq.as_str());
    let url_string = format!("{scheme}://{host}{path_and_query}");
    Url::parse(&url_string).map_err(|_| BadHeader.into())
}
/// A buffered HTTP response in the serializable form handled by [`CacheManager`].
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct HttpResponse {
/// Response body bytes.
pub body: Vec<u8>,
/// Response headers as a name-to-value map (one value per name).
pub headers: HashMap<String, String>,
/// Numeric HTTP status code.
pub status: u16,
/// URL associated with the response.
pub url: Url,
/// HTTP protocol version of the response.
pub version: HttpVersion,
}
impl HttpResponse {
    /// Builds [`http::response::Parts`] carrying this response's status and headers.
    ///
    /// Only the status and the headers are copied; the version is left at the
    /// builder's default.
    ///
    /// # Errors
    /// Fails when the status code, a header name or a header value is rejected
    /// by the `http` crate.
    pub fn parts(&self) -> Result<response::Parts> {
        let mut converted =
            response::Builder::new().status(self.status).body(())?;
        {
            let headers = converted.headers_mut();
            for (name, value) in &self.headers {
                headers.insert(
                    http::header::HeaderName::from_str(name.as_str())?,
                    http::HeaderValue::from_str(value.as_str())?,
                );
            }
        }
        Ok(converted.into_parts().0)
    }

    /// Parses the first three characters of the `warning` header as a number.
    ///
    /// Returns `None` when the header is absent or those characters do not
    /// parse as an integer.
    #[must_use]
    fn warning_code(&self) -> Option<usize> {
        self.headers.get(WARNING).and_then(|hdr| {
            hdr.as_str().chars().take(3).collect::<String>().parse().ok()
        })
    }

    /// Replaces the `warning` header with
    /// `<code> <host of url> "<message>" "<current HTTP date>"`.
    ///
    /// When `url` has no host, the `-` pseudonym is used as the warn-agent
    /// instead of panicking.
    fn add_warning(&mut self, url: &Url, code: usize, message: &str) {
        let agent = url
            .host()
            .map_or_else(|| "-".to_string(), |host| host.to_string());
        self.headers.insert(
            WARNING.to_string(),
            format!(
                // `{:?}` wraps the message in double quotes (escaping as needed).
                "{} {} {:?} \"{}\"",
                code,
                agent,
                message,
                httpdate::fmt_http_date(SystemTime::now())
            ),
        );
    }

    /// Removes the `warning` header, if any.
    fn remove_warning(&mut self) {
        self.headers.remove(WARNING);
    }

    /// Copies every header from `parts` into this response, overwriting
    /// entries with the same name.
    ///
    /// # Errors
    /// Fails when a header value cannot be represented as a string; headers
    /// processed before the failing one have already been applied.
    pub fn update_headers(&mut self, parts: &response::Parts) -> Result<()> {
        for (name, value) in parts.headers.iter() {
            self.headers
                .insert(name.as_str().to_string(), value.to_str()?.to_string());
        }
        Ok(())
    }

    /// Returns `true` when the `cache-control` header contains
    /// `must-revalidate` (compared case-insensitively).
    #[must_use]
    fn must_revalidate(&self) -> bool {
        self.headers.get(CACHE_CONTROL.as_str()).is_some_and(|val| {
            val.as_str().to_lowercase().contains("must-revalidate")
        })
    }

    /// Sets the [`XCACHE`] header to `hit_or_miss`.
    pub fn cache_status(&mut self, hit_or_miss: HitOrMiss) {
        self.headers.insert(XCACHE.to_string(), hit_or_miss.to_string());
    }

    /// Sets the [`XCACHELOOKUP`] header to `hit_or_miss`.
    pub fn cache_lookup_status(&mut self, hit_or_miss: HitOrMiss) {
        self.headers.insert(XCACHELOOKUP.to_string(), hit_or_miss.to_string());
    }
}
/// Storage backend for buffered responses, addressed by a string cache key.
#[async_trait::async_trait]
pub trait CacheManager: Send + Sync + 'static {
/// Looks up the entry stored under `cache_key`.
///
/// Callers in this crate treat `Ok(None)` as a cache miss.
async fn get(
&self,
cache_key: &str,
) -> Result<Option<(HttpResponse, CachePolicy)>>;
/// Stores `res` together with `policy` under `cache_key` and returns the
/// response that is handed back to the caller.
async fn put(
&self,
cache_key: String,
res: HttpResponse,
policy: CachePolicy,
) -> Result<HttpResponse>;
/// Removes the entry stored under `cache_key`.
async fn delete(&self, cache_key: &str) -> Result<()>;
}
/// Storage backend for responses whose bodies are handled as `http_body::Body`
/// values instead of buffered byte vectors.
#[async_trait::async_trait]
pub trait StreamingCacheManager: Send + Sync + 'static {
/// Body type of the responses this manager returns.
type Body: http_body::Body + Send + 'static;
/// Looks up the entry stored under `cache_key`, returning the response and
/// its cache policy when one exists.
async fn get(
&self,
cache_key: &str,
) -> Result<Option<(Response<Self::Body>, CachePolicy)>>
where
<Self::Body as http_body::Body>::Data: Send,
<Self::Body as http_body::Body>::Error:
Into<StreamingError> + Send + Sync + 'static;
/// Stores `response` with `policy` under `cache_key` and returns a response
/// with this manager's body type.
///
/// `request_url` is the URL of the request that produced the response.
async fn put<B>(
&self,
cache_key: String,
response: Response<B>,
policy: CachePolicy,
request_url: Url,
) -> Result<Response<Self::Body>>
where
B: http_body::Body + Send + 'static,
B::Data: Send,
B::Error: Into<StreamingError>,
<Self::Body as http_body::Body>::Data: Send,
<Self::Body as http_body::Body>::Error:
Into<StreamingError> + Send + Sync + 'static;
/// Converts a response with an arbitrary body type into one with this
/// manager's body type.
///
/// `HttpStreamingCache::process_response` calls this for responses it does
/// not store.
async fn convert_body<B>(
&self,
response: Response<B>,
) -> Result<Response<Self::Body>>
where
B: http_body::Body + Send + 'static,
B::Data: Send,
B::Error: Into<StreamingError>,
<Self::Body as http_body::Body>::Data: Send,
<Self::Body as http_body::Body>::Error:
Into<StreamingError> + Send + Sync + 'static;
/// Removes the entry stored under `cache_key`.
async fn delete(&self, cache_key: &str) -> Result<()>;
/// Turns a body of this manager's type into a stream of `Bytes` chunks,
/// with errors boxed. Only available with the `streaming` feature.
#[cfg(feature = "streaming")]
fn body_to_bytes_stream(
body: Self::Body,
) -> impl futures_util::Stream<
Item = std::result::Result<
bytes::Bytes,
Box<dyn std::error::Error + Send + Sync>,
>,
> + Send
where
<Self::Body as http_body::Body>::Data: Send,
<Self::Body as http_body::Body>::Error: Send + Sync + 'static;
}
/// Abstraction over a single outgoing request that [`HttpCache::run`] drives:
/// it exposes the request's metadata and performs the actual network fetch.
#[async_trait::async_trait]
pub trait Middleware: Send {
/// Cache mode that takes precedence over the cache's configured mode for
/// this request. The default implementation returns `None`.
fn overridden_cache_mode(&self) -> Option<CacheMode> {
None
}
/// Whether the request method is `GET` or `HEAD`.
fn is_method_get_head(&self) -> bool;
/// Builds the cache policy for `response`.
fn policy(&self, response: &HttpResponse) -> Result<CachePolicy>;
/// Builds the cache policy for `response` using the supplied `options`.
fn policy_with_options(
&self,
response: &HttpResponse,
options: CacheOptions,
) -> Result<CachePolicy>;
/// Updates the request's headers from `parts`.
///
/// `HttpCache` calls this with the request parts returned by the cache
/// policy for a stale entry, before revalidating.
fn update_headers(&mut self, parts: &request::Parts) -> Result<()>;
/// Called in [`CacheMode::NoCache`] before the remote fetch when a cached
/// entry exists; how the request is altered is up to the implementation.
fn force_no_cache(&mut self) -> Result<()>;
/// Returns the request's parts (method, URI, headers, ...).
fn parts(&self) -> Result<request::Parts>;
/// Returns the request URL.
fn url(&self) -> Result<Url>;
/// Returns the request method as a string.
fn method(&self) -> Result<String>;
/// Sends the request to the remote server and returns its response.
async fn remote_fetch(&mut self) -> Result<HttpResponse>;
}
/// Step-by-step caching interface for callers that perform the HTTP exchange
/// themselves and work with buffered bodies of type `B`.
pub trait HttpCacheInterface<B = Vec<u8>>: Send + Sync {
/// Analyzes a request and returns the caching decisions for it (cache key,
/// effective mode, keys to invalidate, ...). `mode_override`, when set,
/// takes precedence over any other source of the cache mode.
fn analyze_request(
&self,
parts: &request::Parts,
mode_override: Option<CacheMode>,
) -> Result<CacheAnalysis>;
/// Looks up the cached response and policy stored under `key`, if any.
#[allow(async_fn_in_trait)]
async fn lookup_cached_response(
&self,
key: &str,
) -> Result<Option<(HttpResponse, CachePolicy)>>;
/// Processes a freshly fetched `response` according to `analysis`,
/// storing it when appropriate, and returns the response to pass on.
#[allow(async_fn_in_trait)]
async fn process_response(
&self,
analysis: CacheAnalysis,
response: Response<B>,
) -> Result<Response<B>>;
/// Prepares `parts` for revalidating `cached_response` against the origin,
/// based on `policy`.
fn prepare_conditional_request(
&self,
parts: &mut request::Parts,
cached_response: &HttpResponse,
policy: &CachePolicy,
) -> Result<()>;
/// Produces the response to return after the origin answered a
/// revalidation with the response head `fresh_parts`.
#[allow(async_fn_in_trait)]
async fn handle_not_modified(
&self,
cached_response: HttpResponse,
fresh_parts: &response::Parts,
) -> Result<HttpResponse>;
}
/// Streaming counterpart of [`HttpCacheInterface`]: the same steps, but
/// responses carry an `http_body::Body` instead of a buffered byte vector.
pub trait HttpCacheStreamInterface: Send + Sync {
/// Body type of the responses returned by this interface.
type Body: http_body::Body + Send + 'static;
/// Analyzes a request and returns the caching decisions for it.
/// `mode_override`, when set, takes precedence over any other source of
/// the cache mode.
fn analyze_request(
&self,
parts: &request::Parts,
mode_override: Option<CacheMode>,
) -> Result<CacheAnalysis>;
/// Looks up the cached response and policy stored under `key`, if any.
#[allow(async_fn_in_trait)]
async fn lookup_cached_response(
&self,
key: &str,
) -> Result<Option<(Response<Self::Body>, CachePolicy)>>
where
<Self::Body as http_body::Body>::Data: Send,
<Self::Body as http_body::Body>::Error:
Into<StreamingError> + Send + Sync + 'static;
/// Processes a freshly fetched `response` according to `analysis`,
/// storing it when appropriate, and returns a response with this
/// interface's body type.
#[allow(async_fn_in_trait)]
async fn process_response<B>(
&self,
analysis: CacheAnalysis,
response: Response<B>,
) -> Result<Response<Self::Body>>
where
B: http_body::Body + Send + 'static,
B::Data: Send,
B::Error: Into<StreamingError>,
<Self::Body as http_body::Body>::Data: Send,
<Self::Body as http_body::Body>::Error:
Into<StreamingError> + Send + Sync + 'static;
/// Prepares `parts` for revalidating `cached_response` against the origin,
/// based on `policy`.
fn prepare_conditional_request(
&self,
parts: &mut request::Parts,
cached_response: &Response<Self::Body>,
policy: &CachePolicy,
) -> Result<()>;
/// Produces the response to return after the origin answered a
/// revalidation with the response head `fresh_parts`.
#[allow(async_fn_in_trait)]
async fn handle_not_modified(
&self,
cached_response: Response<Self::Body>,
fresh_parts: &response::Parts,
) -> Result<Response<Self::Body>>
where
<Self::Body as http_body::Body>::Data: Send,
<Self::Body as http_body::Body>::Error:
Into<StreamingError> + Send + Sync + 'static;
}
/// Result of analyzing a request: everything the response-processing step
/// needs to decide whether and where to store the response.
#[derive(Debug, Clone)]
pub struct CacheAnalysis {
/// Key under which the response for this request is looked up and stored.
pub cache_key: String,
/// Whether the request is eligible for caching at all.
pub should_cache: bool,
/// Effective cache mode chosen for this request.
pub cache_mode: CacheMode,
/// Keys of entries to delete, as returned by the `cache_bust` closure
/// (empty when none is configured).
pub cache_bust_keys: Vec<String>,
/// Copy of the analyzed request's parts.
pub request_parts: request::Parts,
/// Whether the request method is `GET` or `HEAD`.
pub is_get_head: bool,
}
/// Strategy that decides how the cache is consulted and populated.
///
/// The per-variant notes describe the behavior of [`HttpCache::run`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum CacheMode {
/// A cached entry is checked for freshness and revalidated with the origin
/// when stale; without an entry the request is fetched remotely.
#[default]
Default,
/// The cache is bypassed: the request is fetched remotely and the response
/// is never stored.
NoStore,
/// The request is always fetched remotely, whether or not an entry exists;
/// the response may then be stored.
Reload,
/// With a cached entry present, `Middleware::force_no_cache` is invoked and
/// the request is fetched remotely; otherwise it is simply fetched remotely.
NoCache,
/// A cached entry is returned without a freshness check; without an entry
/// the request is fetched remotely. `GET`/`HEAD` responses with a cacheable
/// status are stored without consulting the policy's storability.
ForceCache,
/// A cached entry is returned without a freshness check; without an entry
/// a `504` response is synthesized and no remote fetch happens.
OnlyIfCached,
/// Like [`CacheMode::ForceCache`], but caching also applies to methods
/// other than `GET`/`HEAD`: any response with a cacheable status is stored.
IgnoreRules,
}
impl TryFrom<http::Version> for HttpVersion {
    type Error = BoxError;

    /// Maps an `http::Version` onto the matching [`HttpVersion`] variant.
    ///
    /// # Errors
    /// Returns a boxed `BadVersion` for any version without a matching variant.
    fn try_from(value: http::Version) -> Result<Self> {
        match value {
            http::Version::HTTP_09 => Ok(Self::Http09),
            http::Version::HTTP_10 => Ok(Self::Http10),
            http::Version::HTTP_11 => Ok(Self::Http11),
            http::Version::HTTP_2 => Ok(Self::H2),
            http::Version::HTTP_3 => Ok(Self::H3),
            _ => Err(Box::new(BadVersion)),
        }
    }
}
impl From<HttpVersion> for http::Version {
fn from(value: HttpVersion) -> Self {
match value {
HttpVersion::Http09 => Self::HTTP_09,
HttpVersion::Http10 => Self::HTTP_10,
HttpVersion::Http11 => Self::HTTP_11,
HttpVersion::H2 => Self::HTTP_2,
HttpVersion::H3 => Self::HTTP_3,
}
}
}
#[cfg(feature = "http-types")]
impl TryFrom<http_types::Version> for HttpVersion {
    type Error = BoxError;

    /// Maps an `http_types::Version` onto the matching [`HttpVersion`] variant.
    ///
    /// # Errors
    /// Returns a boxed `BadVersion` for any version without a matching variant.
    fn try_from(value: http_types::Version) -> Result<Self> {
        match value {
            http_types::Version::Http0_9 => Ok(Self::Http09),
            http_types::Version::Http1_0 => Ok(Self::Http10),
            http_types::Version::Http1_1 => Ok(Self::Http11),
            http_types::Version::Http2_0 => Ok(Self::H2),
            http_types::Version::Http3_0 => Ok(Self::H3),
            _ => Err(Box::new(BadVersion)),
        }
    }
}
#[cfg(feature = "http-types")]
impl From<HttpVersion> for http_types::Version {
    /// Maps each [`HttpVersion`] variant onto the matching `http_types::Version` variant.
    fn from(value: HttpVersion) -> Self {
        use HttpVersion::{Http09, Http10, Http11, H2, H3};
        match value {
            Http09 => Self::Http0_9,
            Http10 => Self::Http1_0,
            Http11 => Self::Http1_1,
            H2 => Self::Http2_0,
            H3 => Self::Http3_0,
        }
    }
}
pub use http_cache_semantics::CacheOptions;
/// Closure that derives the cache key from a request's parts; when configured
/// it replaces the default `"{method}:{uri}"` key.
pub type CacheKey = Arc<dyn Fn(&request::Parts) -> String + Send + Sync>;
/// Closure that picks the [`CacheMode`] for a request from its parts.
pub type CacheModeFn = Arc<dyn Fn(&request::Parts) -> CacheMode + Send + Sync>;
/// Closure that may override the [`CacheMode`] once the response is known;
/// returning `None` keeps the mode chosen for the request.
pub type ResponseCacheModeFn = Arc<
dyn Fn(&request::Parts, &HttpResponse) -> Option<CacheMode> + Send + Sync,
>;
/// Closure returning the cache keys to delete for a request. It receives the
/// request parts, the configured [`CacheKey`] closure (if any) and the cache
/// key computed for the request.
pub type CacheBust = Arc<
dyn Fn(&request::Parts, &Option<CacheKey>, &str) -> Vec<String>
+ Send
+ Sync,
>;
/// Configuration shared by [`HttpCache`] and [`HttpStreamingCache`].
#[derive(Clone)]
pub struct HttpCacheOptions {
/// Options passed to the cache policy; `None` uses the policy's defaults.
pub cache_options: Option<CacheOptions>,
/// Custom cache key generator; `None` uses `"{method}:{uri}"`.
pub cache_key: Option<CacheKey>,
/// Per-request cache mode selector, consulted when no explicit override is given.
pub cache_mode_fn: Option<CacheModeFn>,
/// Per-response cache mode override, consulted after the response is received.
pub response_cache_mode_fn: Option<ResponseCacheModeFn>,
/// Generator of additional cache keys to delete for a request.
pub cache_bust: Option<CacheBust>,
/// Whether the `x-cache` and `x-cache-lookup` headers are added to responses.
pub cache_status_headers: bool,
}
impl Default for HttpCacheOptions {
    /// Default configuration: cache status headers enabled, every optional
    /// hook and the policy options unset.
    fn default() -> Self {
        Self {
            // The only setting that is switched on by default.
            cache_status_headers: true,
            cache_options: None,
            cache_key: None,
            cache_mode_fn: None,
            response_cache_mode_fn: None,
            cache_bust: None,
        }
    }
}
impl Debug for HttpCacheOptions {
    /// Formats the options as a struct; the closure-typed fields are shown as
    /// fixed placeholder strings regardless of whether they are set.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("HttpCacheOptions");
        out.field("cache_options", &self.cache_options);
        out.field("cache_key", &"Fn(&request::Parts) -> String");
        out.field("cache_mode_fn", &"Fn(&request::Parts) -> CacheMode");
        out.field(
            "response_cache_mode_fn",
            &"Fn(&request::Parts, &HttpResponse) -> Option<CacheMode>",
        );
        out.field("cache_bust", &"Fn(&request::Parts) -> Vec<String>");
        out.field("cache_status_headers", &self.cache_status_headers);
        out.finish()
    }
}
impl HttpCacheOptions {
fn create_cache_key(
&self,
parts: &request::Parts,
override_method: Option<&str>,
) -> String {
if let Some(cache_key) = &self.cache_key {
cache_key(parts)
} else {
format!(
"{}:{}",
override_method.unwrap_or_else(|| parts.method.as_str()),
parts.uri
)
}
}
pub fn create_cache_key_for_invalidation(
&self,
parts: &request::Parts,
method_override: &str,
) -> String {
self.create_cache_key(parts, Some(method_override))
}
fn headers_to_hashmap(
headers: &http::HeaderMap,
) -> HashMap<String, String> {
headers
.iter()
.map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
.collect()
}
pub fn http_response_to_response<B>(
http_response: &HttpResponse,
body: B,
) -> Result<Response<B>> {
let mut response_builder = Response::builder()
.status(http_response.status)
.version(http_response.version.into());
for (name, value) in &http_response.headers {
if let (Ok(header_name), Ok(header_value)) = (
name.parse::<http::HeaderName>(),
value.parse::<http::HeaderValue>(),
) {
response_builder =
response_builder.header(header_name, header_value);
}
}
Ok(response_builder.body(body)?)
}
fn parts_to_http_response(
&self,
parts: &response::Parts,
request_parts: &request::Parts,
) -> Result<HttpResponse> {
Ok(HttpResponse {
body: vec![], headers: Self::headers_to_hashmap(&parts.headers),
status: parts.status.as_u16(),
url: extract_url_from_request_parts(request_parts)?,
version: parts.version.try_into()?,
})
}
fn evaluate_response_cache_mode(
&self,
request_parts: &request::Parts,
http_response: &HttpResponse,
original_mode: CacheMode,
) -> CacheMode {
if let Some(response_cache_mode_fn) = &self.response_cache_mode_fn {
if let Some(override_mode) =
response_cache_mode_fn(request_parts, http_response)
{
return override_mode;
}
}
original_mode
}
fn create_cache_policy(
&self,
request_parts: &request::Parts,
response_parts: &response::Parts,
) -> CachePolicy {
match self.cache_options {
Some(options) => CachePolicy::new_options(
request_parts,
response_parts,
SystemTime::now(),
options,
),
None => CachePolicy::new(request_parts, response_parts),
}
}
fn should_cache_response(
&self,
effective_cache_mode: CacheMode,
http_response: &HttpResponse,
is_get_head: bool,
policy: &CachePolicy,
) -> bool {
let is_cacheable_status = matches!(
http_response.status,
200 | 203 | 204 | 206 | 300 | 301 | 404 | 405 | 410 | 414 | 501
);
if is_cacheable_status {
match effective_cache_mode {
CacheMode::ForceCache => is_get_head,
CacheMode::IgnoreRules => true,
CacheMode::NoStore => false,
_ => is_get_head && policy.is_storable(),
}
} else {
false
}
}
fn analyze_request_internal(
&self,
parts: &request::Parts,
mode_override: Option<CacheMode>,
default_mode: CacheMode,
) -> Result<CacheAnalysis> {
let effective_mode = mode_override
.or_else(|| self.cache_mode_fn.as_ref().map(|f| f(parts)))
.unwrap_or(default_mode);
let is_get_head = parts.method == "GET" || parts.method == "HEAD";
let should_cache = effective_mode == CacheMode::IgnoreRules
|| (is_get_head && effective_mode != CacheMode::NoStore);
let cache_key = self.create_cache_key(parts, None);
let cache_bust_keys = if let Some(cache_bust) = &self.cache_bust {
cache_bust(parts, &self.cache_key, &cache_key)
} else {
Vec::new()
};
Ok(CacheAnalysis {
cache_key,
should_cache,
cache_mode: effective_mode,
cache_bust_keys,
request_parts: parts.clone(),
is_get_head,
})
}
}
/// HTTP cache for buffered responses, backed by a [`CacheManager`].
#[derive(Debug, Clone)]
pub struct HttpCache<T: CacheManager> {
/// Cache mode used when neither the middleware nor `cache_mode_fn` picks one.
pub mode: CacheMode,
/// Storage backend.
pub manager: T,
/// Additional cache configuration.
pub options: HttpCacheOptions,
}
/// HTTP cache for streaming responses, backed by a [`StreamingCacheManager`].
#[derive(Debug, Clone)]
pub struct HttpStreamingCache<T: StreamingCacheManager> {
/// Cache mode used when neither an explicit override nor `cache_mode_fn` picks one.
pub mode: CacheMode,
/// Storage backend.
pub manager: T,
/// Additional cache configuration.
pub options: HttpCacheOptions,
}
// NOTE(review): the reason for allowing dead code on this impl is not visible here.
#[allow(dead_code)]
impl<T: CacheManager> HttpCache<T> {
/// Reports whether the request described by `middleware` is eligible for
/// caching, i.e. the `should_cache` flag of its request analysis.
pub fn can_cache_request(
&self,
middleware: &impl Middleware,
) -> Result<bool> {
let analysis = self.analyze_request(
&middleware.parts()?,
middleware.overridden_cache_mode(),
)?;
Ok(analysis.should_cache)
}
/// Invalidates cache entries for the request without performing a fetch.
///
/// Deletes the entry keyed with method `GET` (errors from that delete are
/// ignored), then deletes every key returned by the `cache_bust` closure,
/// propagating the first error.
pub async fn run_no_cache(
&self,
middleware: &mut impl Middleware,
) -> Result<()> {
let parts = middleware.parts()?;
// Best effort: a failed delete of the GET entry is deliberately dropped.
self.manager
.delete(&self.options.create_cache_key(&parts, Some("GET")))
.await
.ok();
let cache_key = self.options.create_cache_key(&parts, None);
if let Some(cache_bust) = &self.options.cache_bust {
for key_to_cache_bust in
cache_bust(&parts, &self.options.cache_key, &cache_key)
{
self.manager.delete(&key_to_cache_bust).await?;
}
}
Ok(())
}
/// Serves the request through the cache.
///
/// Requests that are not eligible for caching are fetched remotely.
/// Otherwise the cache-bust keys are deleted, the cache is consulted, and
/// the effective [`CacheMode`] decides between returning the cached entry,
/// revalidating it, or fetching remotely. In `OnlyIfCached` mode a miss
/// yields a synthesized `504` response.
pub async fn run(
&self,
mut middleware: impl Middleware,
) -> Result<HttpResponse> {
let analysis = self.analyze_request(
&middleware.parts()?,
middleware.overridden_cache_mode(),
)?;
if !analysis.should_cache {
return self.remote_fetch(&mut middleware).await;
}
for key in &analysis.cache_bust_keys {
self.manager.delete(key).await?;
}
if let Some((mut cached_response, policy)) =
self.lookup_cached_response(&analysis.cache_key).await?
{
if self.options.cache_status_headers {
cached_response.cache_lookup_status(HitOrMiss::HIT);
}
// Drop a stored warning whose code is in the 1xx range.
if let Some(warning_code) = cached_response.warning_code() {
if (100..200).contains(&warning_code) {
cached_response.remove_warning();
}
}
match analysis.cache_mode {
CacheMode::Default => {
self.conditional_fetch(middleware, cached_response, policy)
.await
}
CacheMode::NoCache => {
middleware.force_no_cache()?;
let mut res = self.remote_fetch(&mut middleware).await?;
// `remote_fetch` marks the lookup as MISS; an entry did exist, so
// the lookup header is corrected to HIT.
if self.options.cache_status_headers {
res.cache_lookup_status(HitOrMiss::HIT);
}
Ok(res)
}
CacheMode::ForceCache
| CacheMode::OnlyIfCached
| CacheMode::IgnoreRules => {
// The entry is returned without a freshness check, tagged with
// warning 112.
cached_response.add_warning(
&cached_response.url.clone(),
112,
"Disconnected operation",
);
if self.options.cache_status_headers {
cached_response.cache_status(HitOrMiss::HIT);
}
Ok(cached_response)
}
_ => self.remote_fetch(&mut middleware).await,
}
} else {
match analysis.cache_mode {
CacheMode::OnlyIfCached => {
// No entry and no network allowed: synthesize a 504 response.
let mut res = HttpResponse {
body: b"GatewayTimeout".to_vec(),
headers: HashMap::default(),
status: 504,
url: middleware.url()?,
version: HttpVersion::Http11,
};
if self.options.cache_status_headers {
res.cache_status(HitOrMiss::MISS);
res.cache_lookup_status(HitOrMiss::MISS);
}
Ok(res)
}
_ => self.remote_fetch(&mut middleware).await,
}
}
}
/// Resolves the cache mode for `middleware`: its own override first, then
/// `cache_mode_fn`, then this cache's configured `mode`.
fn cache_mode(&self, middleware: &impl Middleware) -> Result<CacheMode> {
Ok(if let Some(mode) = middleware.overridden_cache_mode() {
mode
} else if let Some(cache_mode_fn) = &self.options.cache_mode_fn {
cache_mode_fn(&middleware.parts()?)
} else {
self.mode
})
}
/// Fetches the response remotely and stores it when it is cacheable.
///
/// A response that is not stored and belongs to a non-`GET`/`HEAD` request
/// causes the entry keyed with method `GET` to be deleted (errors ignored).
async fn remote_fetch(
&self,
middleware: &mut impl Middleware,
) -> Result<HttpResponse> {
let mut res = middleware.remote_fetch().await?;
if self.options.cache_status_headers {
res.cache_status(HitOrMiss::MISS);
res.cache_lookup_status(HitOrMiss::MISS);
}
let policy = match self.options.cache_options {
Some(options) => middleware.policy_with_options(&res, options)?,
None => middleware.policy(&res)?,
};
let is_get_head = middleware.is_method_get_head();
let mut mode = self.cache_mode(middleware)?;
let parts = middleware.parts()?;
// The response-based hook may replace the mode chosen for the request.
if let Some(response_cache_mode_fn) =
&self.options.response_cache_mode_fn
{
if let Some(override_mode) = response_cache_mode_fn(&parts, &res) {
mode = override_mode;
}
}
let is_cacheable = self.options.should_cache_response(
mode,
&res,
is_get_head,
&policy,
);
if is_cacheable {
Ok(self
.manager
.put(self.options.create_cache_key(&parts, None), res, policy)
.await?)
} else if !is_get_head {
// Best effort invalidation of the GET entry; errors are dropped.
self.manager
.delete(&self.options.create_cache_key(&parts, Some("GET")))
.await
.ok();
Ok(res)
} else {
Ok(res)
}
}
/// Returns the cached response when the policy reports it fresh; otherwise
/// performs a remote fetch and reconciles the outcome with the cached entry.
///
/// * `304`: the cached entry gets updated headers and policy, is stored
///   again and returned.
/// * `200`: the new response replaces the cached entry.
/// * any other status: the cached entry is returned.
/// * fetch error: propagated when the cached entry is `must-revalidate`,
///   otherwise the cached entry is returned with warning 111.
async fn conditional_fetch(
&self,
mut middleware: impl Middleware,
mut cached_res: HttpResponse,
mut policy: CachePolicy,
) -> Result<HttpResponse> {
let parts = middleware.parts()?;
let before_req = policy.before_request(&parts, SystemTime::now());
match before_req {
BeforeRequest::Fresh(parts) => {
cached_res.update_headers(&parts)?;
if self.options.cache_status_headers {
cached_res.cache_status(HitOrMiss::HIT);
cached_res.cache_lookup_status(HitOrMiss::HIT);
}
return Ok(cached_res);
}
BeforeRequest::Stale { request: parts, matches } => {
// The request parts supplied by the policy are applied only when the
// policy reports that the request matches the cached entry.
if matches {
middleware.update_headers(&parts)?;
}
}
}
let req_url = middleware.url()?;
match middleware.remote_fetch().await {
Ok(mut cond_res) => {
let status = StatusCode::from_u16(cond_res.status)?;
// NOTE(review): a 5xx reply is answered from cache only when the cached
// response carries `must-revalidate`; the error branch below uses the
// opposite condition — confirm this is intended.
if status.is_server_error() && cached_res.must_revalidate() {
cached_res.add_warning(
&req_url,
111,
"Revalidation failed",
);
if self.options.cache_status_headers {
cached_res.cache_status(HitOrMiss::HIT);
}
Ok(cached_res)
} else if cond_res.status == 304 {
let after_res = policy.after_response(
&parts,
&cond_res.parts()?,
SystemTime::now(),
);
// Both outcomes are handled alike: adopt the new policy and headers.
match after_res {
AfterResponse::Modified(new_policy, parts)
| AfterResponse::NotModified(new_policy, parts) => {
policy = new_policy;
cached_res.update_headers(&parts)?;
}
}
if self.options.cache_status_headers {
cached_res.cache_status(HitOrMiss::HIT);
cached_res.cache_lookup_status(HitOrMiss::HIT);
}
let res = self
.manager
.put(
self.options.create_cache_key(&parts, None),
cached_res,
policy,
)
.await?;
Ok(res)
} else if cond_res.status == 200 {
let policy = match self.options.cache_options {
Some(options) => middleware
.policy_with_options(&cond_res, options)?,
None => middleware.policy(&cond_res)?,
};
// An entry was found (lookup HIT) but the body came from the origin (MISS).
if self.options.cache_status_headers {
cond_res.cache_status(HitOrMiss::MISS);
cond_res.cache_lookup_status(HitOrMiss::HIT);
}
let res = self
.manager
.put(
self.options.create_cache_key(&parts, None),
cond_res,
policy,
)
.await?;
Ok(res)
} else {
if self.options.cache_status_headers {
cached_res.cache_status(HitOrMiss::HIT);
}
Ok(cached_res)
}
}
Err(e) => {
if cached_res.must_revalidate() {
Err(e)
} else {
cached_res.add_warning(
&req_url,
111,
"Revalidation failed",
);
if self.options.cache_status_headers {
cached_res.cache_status(HitOrMiss::HIT);
}
Ok(cached_res)
}
}
}
}
}
impl<T: StreamingCacheManager> HttpCacheStreamInterface
    for HttpStreamingCache<T>
where
    <T::Body as http_body::Body>::Data: Send,
    <T::Body as http_body::Body>::Error:
        Into<StreamingError> + Send + Sync + 'static,
{
    type Body = T::Body;

    /// Analyzes the request using this cache's options, with `self.mode` as
    /// the fallback cache mode.
    fn analyze_request(
        &self,
        parts: &request::Parts,
        mode_override: Option<CacheMode>,
    ) -> Result<CacheAnalysis> {
        self.options.analyze_request_internal(parts, mode_override, self.mode)
    }

    /// Fetches the entry stored under `key` from the manager.
    async fn lookup_cached_response(
        &self,
        key: &str,
    ) -> Result<Option<(Response<Self::Body>, CachePolicy)>> {
        self.manager.get(key).await
    }

    /// Stores `response` when the analysis, the effective cache mode and the
    /// cache policy allow it; otherwise only converts its body.
    ///
    /// Cache-bust keys are deleted first whenever the request is eligible
    /// for caching.
    async fn process_response<B>(
        &self,
        analysis: CacheAnalysis,
        response: Response<B>,
    ) -> Result<Response<Self::Body>>
    where
        B: http_body::Body + Send + 'static,
        B::Data: Send,
        B::Error: Into<StreamingError>,
        <T::Body as http_body::Body>::Data: Send,
        <T::Body as http_body::Body>::Error:
            Into<StreamingError> + Send + Sync + 'static,
    {
        if !analysis.should_cache {
            return self.manager.convert_body(response).await;
        }
        for stale_key in &analysis.cache_bust_keys {
            self.manager.delete(stale_key).await?;
        }

        // Split once: the head drives every decision, the body is untouched.
        let (head, body) = response.into_parts();
        let head_only = self
            .options
            .parts_to_http_response(&head, &analysis.request_parts)?;
        let mode = self.options.evaluate_response_cache_mode(
            &analysis.request_parts,
            &head_only,
            analysis.cache_mode,
        );
        if mode == CacheMode::NoStore {
            return self
                .manager
                .convert_body(Response::from_parts(head, body))
                .await;
        }

        let policy =
            self.options.create_cache_policy(&analysis.request_parts, &head);
        let storable = self.options.should_cache_response(
            mode,
            &head_only,
            analysis.is_get_head,
            &policy,
        );
        let response = Response::from_parts(head, body);
        if !storable {
            return self.manager.convert_body(response).await;
        }

        let request_url =
            extract_url_from_request_parts(&analysis.request_parts)?;
        self.manager
            .put(analysis.cache_key, response, policy, request_url)
            .await
    }

    /// When the policy reports the cached entry stale, merges the headers of
    /// the request it proposes into `parts`; a fresh entry leaves `parts`
    /// untouched.
    fn prepare_conditional_request(
        &self,
        parts: &mut request::Parts,
        _cached_response: &Response<Self::Body>,
        policy: &CachePolicy,
    ) -> Result<()> {
        let verdict = policy.before_request(parts, SystemTime::now());
        match verdict {
            BeforeRequest::Stale { request, .. } => {
                parts.headers.extend(request.headers);
            }
            BeforeRequest::Fresh(_) => {}
        }
        Ok(())
    }

    /// Returns the cached response with the headers of `fresh_parts` merged
    /// into its own.
    async fn handle_not_modified(
        &self,
        cached_response: Response<Self::Body>,
        fresh_parts: &response::Parts,
    ) -> Result<Response<Self::Body>> {
        let (mut head, body) = cached_response.into_parts();
        let fresh_headers = fresh_parts.headers.clone();
        head.headers.extend(fresh_headers);
        Ok(Response::from_parts(head, body))
    }
}
impl<T: CacheManager> HttpCacheInterface for HttpCache<T> {
    /// Analyzes the request using this cache's options, with `self.mode` as
    /// the fallback cache mode.
    fn analyze_request(
        &self,
        parts: &request::Parts,
        mode_override: Option<CacheMode>,
    ) -> Result<CacheAnalysis> {
        self.options.analyze_request_internal(parts, mode_override, self.mode)
    }

    /// Fetches the entry stored under `key` from the manager.
    async fn lookup_cached_response(
        &self,
        key: &str,
    ) -> Result<Option<(HttpResponse, CachePolicy)>> {
        self.manager.get(key).await
    }

    /// Stores `response` when the analysis, the effective cache mode and the
    /// cache policy allow it, and returns the response to pass on.
    ///
    /// Cache-bust keys are deleted first whenever the request is eligible
    /// for caching. When the response is stored, the returned response is
    /// rebuilt from what the manager's `put` hands back; otherwise the
    /// original response is returned unchanged.
    async fn process_response(
        &self,
        analysis: CacheAnalysis,
        response: Response<Vec<u8>>,
    ) -> Result<Response<Vec<u8>>> {
        if !analysis.should_cache {
            return Ok(response);
        }
        for key in &analysis.cache_bust_keys {
            self.manager.delete(key).await?;
        }

        let (parts, body) = response.into_parts();
        let mut http_response = self
            .options
            .parts_to_http_response(&parts, &analysis.request_parts)?;
        // Move the body instead of cloning it; the paths that do not store
        // the response move it back out of `http_response` below.
        http_response.body = body;

        let effective_cache_mode = self.options.evaluate_response_cache_mode(
            &analysis.request_parts,
            &http_response,
            analysis.cache_mode,
        );
        if effective_cache_mode == CacheMode::NoStore {
            return Ok(Response::from_parts(parts, http_response.body));
        }

        let policy = self.options.create_cache_policy(
            &analysis.request_parts,
            &http_response.parts()?,
        );
        let should_cache_response = self.options.should_cache_response(
            effective_cache_mode,
            &http_response,
            analysis.is_get_head,
            &policy,
        );
        if !should_cache_response {
            return Ok(Response::from_parts(parts, http_response.body));
        }

        let cached_response = self
            .manager
            .put(analysis.cache_key, http_response, policy)
            .await?;
        // NOTE(review): `HttpResponse::parts` does not carry the stored
        // version, so the rebuilt response uses the builder's default
        // version — confirm this is intended.
        let response_parts = cached_response.parts()?;
        let mut response = Response::builder()
            .status(response_parts.status)
            .version(response_parts.version)
            .body(cached_response.body)?;
        *response.headers_mut() = response_parts.headers;
        Ok(response)
    }

    /// When the policy reports the cached entry stale, merges the headers of
    /// the request it proposes into `parts`; a fresh entry leaves `parts`
    /// untouched.
    fn prepare_conditional_request(
        &self,
        parts: &mut request::Parts,
        _cached_response: &HttpResponse,
        policy: &CachePolicy,
    ) -> Result<()> {
        let before_req = policy.before_request(parts, SystemTime::now());
        if let BeforeRequest::Stale { request, .. } = before_req {
            parts.headers.extend(request.headers);
        }
        Ok(())
    }

    /// Copies the headers of `fresh_parts` onto the cached response and, when
    /// status headers are enabled, marks it as a cache hit.
    async fn handle_not_modified(
        &self,
        mut cached_response: HttpResponse,
        fresh_parts: &response::Parts,
    ) -> Result<HttpResponse> {
        cached_response.update_headers(fresh_parts)?;
        if self.options.cache_status_headers {
            cached_response.cache_status(HitOrMiss::HIT);
            cached_response.cache_lookup_status(HitOrMiss::HIT);
        }
        Ok(cached_response)
    }
}
#[allow(dead_code)]
#[cfg(test)]
mod test;