pub mod options;
use std::{borrow::Borrow, collections::HashSet, fmt, fmt::Debug, sync::Arc};
use futures_util::{
future,
stream::{StreamExt, TryStreamExt},
};
use serde::{
de::{DeserializeOwned, Error as DeError},
Deserialize,
Deserializer,
Serialize,
};
use self::options::*;
use crate::{
bson::{doc, to_document, Bson, Document},
bson_util,
change_stream::{
event::ChangeStreamEvent,
options::ChangeStreamOptions,
session::SessionChangeStream,
ChangeStream,
},
client::options::ServerAddress,
cmap::conn::PinnedConnectionHandle,
concern::{ReadConcern, WriteConcern},
error::{convert_bulk_errors, BulkWriteError, BulkWriteFailure, Error, ErrorKind, Result},
index::IndexModel,
operation::{
Aggregate,
Count,
CountDocuments,
CreateIndexes,
Delete,
Distinct,
DropCollection,
DropIndexes,
Find,
FindAndModify,
Insert,
ListIndexes,
Update,
},
results::{
CreateIndexResult,
CreateIndexesResult,
DeleteResult,
InsertManyResult,
InsertOneResult,
UpdateResult,
},
selection_criteria::SelectionCriteria,
Client,
ClientSession,
Cursor,
Database,
SessionCursor,
};
#[derive(Debug)]
pub struct Collection<T> {
inner: Arc<CollectionInner>,
_phantom: std::marker::PhantomData<T>,
}
impl<T> Clone for Collection<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
_phantom: Default::default(),
}
}
}
#[derive(Debug)]
struct CollectionInner {
client: Client,
db: Database,
name: String,
selection_criteria: Option<SelectionCriteria>,
read_concern: Option<ReadConcern>,
write_concern: Option<WriteConcern>,
}
impl<T> Collection<T> {
pub(crate) fn new(db: Database, name: &str, options: Option<CollectionOptions>) -> Self {
let options = options.unwrap_or_default();
let selection_criteria = options
.selection_criteria
.or_else(|| db.selection_criteria().cloned());
let read_concern = options.read_concern.or_else(|| db.read_concern().cloned());
let write_concern = options
.write_concern
.or_else(|| db.write_concern().cloned());
Self {
inner: Arc::new(CollectionInner {
client: db.client().clone(),
db,
name: name.to_string(),
selection_criteria,
read_concern,
write_concern,
}),
_phantom: Default::default(),
}
}
pub fn clone_with_type<U>(&self) -> Collection<U> {
Collection {
inner: self.inner.clone(),
_phantom: Default::default(),
}
}
fn client(&self) -> &Client {
&self.inner.client
}
pub fn name(&self) -> &str {
&self.inner.name
}
pub fn namespace(&self) -> Namespace {
Namespace {
db: self.inner.db.name().into(),
coll: self.name().into(),
}
}
pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
self.inner.selection_criteria.as_ref()
}
pub fn read_concern(&self) -> Option<&ReadConcern> {
self.inner.read_concern.as_ref()
}
pub fn write_concern(&self) -> Option<&WriteConcern> {
self.inner.write_concern.as_ref()
}
async fn drop_common(
&self,
options: impl Into<Option<DropCollectionOptions>>,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<()> {
let session = session.into();
let mut options = options.into();
resolve_options!(self, options, [write_concern]);
let drop = DropCollection::new(self.namespace(), options);
self.client().execute_operation(drop, session).await
}
pub async fn drop(&self, options: impl Into<Option<DropCollectionOptions>>) -> Result<()> {
self.drop_common(options, None).await
}
pub async fn drop_with_session(
&self,
options: impl Into<Option<DropCollectionOptions>>,
session: &mut ClientSession,
) -> Result<()> {
self.drop_common(options, session).await
}
pub async fn aggregate(
&self,
pipeline: impl IntoIterator<Item = Document>,
options: impl Into<Option<AggregateOptions>>,
) -> Result<Cursor<Document>> {
let mut options = options.into();
resolve_options!(
self,
options,
[read_concern, write_concern, selection_criteria]
);
let aggregate = Aggregate::new(self.namespace(), pipeline, options);
let client = self.client();
client.execute_cursor_operation(aggregate).await
}
pub async fn aggregate_with_session(
&self,
pipeline: impl IntoIterator<Item = Document>,
options: impl Into<Option<AggregateOptions>>,
session: &mut ClientSession,
) -> Result<SessionCursor<Document>> {
let mut options = options.into();
resolve_read_concern_with_session!(self, options, Some(&mut *session))?;
resolve_write_concern_with_session!(self, options, Some(&mut *session))?;
resolve_selection_criteria_with_session!(self, options, Some(&mut *session))?;
let aggregate = Aggregate::new(self.namespace(), pipeline, options);
let client = self.client();
client
.execute_session_cursor_operation(aggregate, session)
.await
}
pub async fn estimated_document_count(
&self,
options: impl Into<Option<EstimatedDocumentCountOptions>>,
) -> Result<u64> {
let mut options = options.into();
resolve_options!(self, options, [read_concern, selection_criteria]);
let op = Count::new(self.namespace(), options);
self.client().execute_operation(op, None).await
}
async fn count_documents_common(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<CountOptions>>,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<u64> {
let session = session.into();
let mut options = options.into();
resolve_read_concern_with_session!(self, options, session.as_ref())?;
resolve_selection_criteria_with_session!(self, options, session.as_ref())?;
let op = CountDocuments::new(self.namespace(), filter.into(), options)?;
self.client().execute_operation(op, session).await
}
pub async fn count_documents(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<CountOptions>>,
) -> Result<u64> {
self.count_documents_common(filter, options, None).await
}
pub async fn count_documents_with_session(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<CountOptions>>,
session: &mut ClientSession,
) -> Result<u64> {
self.count_documents_common(filter, options, session).await
}
async fn delete_many_common(
&self,
query: Document,
options: impl Into<Option<DeleteOptions>>,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<DeleteResult> {
let session = session.into();
let mut options = options.into();
resolve_write_concern_with_session!(self, options, session.as_ref())?;
let delete = Delete::new(self.namespace(), query, None, options);
self.client().execute_operation(delete, session).await
}
async fn create_indexes_common(
&self,
indexes: impl IntoIterator<Item = IndexModel>,
options: impl Into<Option<CreateIndexOptions>>,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<CreateIndexesResult> {
let session = session.into();
let mut options = options.into();
resolve_write_concern_with_session!(self, options, session.as_ref())?;
let indexes: Vec<IndexModel> = indexes.into_iter().collect();
let create_indexes = CreateIndexes::new(self.namespace(), indexes, options);
self.client()
.execute_operation(create_indexes, session)
.await
}
async fn create_index_common(
&self,
index: IndexModel,
options: impl Into<Option<CreateIndexOptions>>,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<CreateIndexResult> {
let response = self
.create_indexes_common(vec![index], options, session)
.await?;
Ok(response.into_create_index_result())
}
pub async fn create_index(
&self,
index: IndexModel,
options: impl Into<Option<CreateIndexOptions>>,
) -> Result<CreateIndexResult> {
self.create_index_common(index, options, None).await
}
pub async fn create_index_with_session(
&self,
index: IndexModel,
options: impl Into<Option<CreateIndexOptions>>,
session: &mut ClientSession,
) -> Result<CreateIndexResult> {
self.create_index_common(index, options, session).await
}
pub async fn create_indexes(
&self,
indexes: impl IntoIterator<Item = IndexModel>,
options: impl Into<Option<CreateIndexOptions>>,
) -> Result<CreateIndexesResult> {
self.create_indexes_common(indexes, options, None).await
}
pub async fn create_indexes_with_session(
&self,
indexes: impl IntoIterator<Item = IndexModel>,
options: impl Into<Option<CreateIndexOptions>>,
session: &mut ClientSession,
) -> Result<CreateIndexesResult> {
self.create_indexes_common(indexes, options, session).await
}
pub async fn delete_many(
&self,
query: Document,
options: impl Into<Option<DeleteOptions>>,
) -> Result<DeleteResult> {
self.delete_many_common(query, options, None).await
}
pub async fn delete_many_with_session(
&self,
query: Document,
options: impl Into<Option<DeleteOptions>>,
session: &mut ClientSession,
) -> Result<DeleteResult> {
self.delete_many_common(query, options, session).await
}
async fn delete_one_common(
&self,
query: Document,
options: impl Into<Option<DeleteOptions>>,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<DeleteResult> {
let session = session.into();
let mut options = options.into();
resolve_write_concern_with_session!(self, options, session.as_ref())?;
let delete = Delete::new(self.namespace(), query, Some(1), options);
self.client().execute_operation(delete, session).await
}
pub async fn delete_one(
&self,
query: Document,
options: impl Into<Option<DeleteOptions>>,
) -> Result<DeleteResult> {
self.delete_one_common(query, options, None).await
}
pub async fn delete_one_with_session(
&self,
query: Document,
options: impl Into<Option<DeleteOptions>>,
session: &mut ClientSession,
) -> Result<DeleteResult> {
self.delete_one_common(query, options, session).await
}
async fn distinct_common(
&self,
field_name: impl AsRef<str>,
filter: impl Into<Option<Document>>,
options: impl Into<Option<DistinctOptions>>,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<Vec<Bson>> {
let session = session.into();
let mut options = options.into();
resolve_read_concern_with_session!(self, options, session.as_ref())?;
resolve_selection_criteria_with_session!(self, options, session.as_ref())?;
let op = Distinct::new(
self.namespace(),
field_name.as_ref().to_string(),
filter.into(),
options,
);
self.client().execute_operation(op, session).await
}
pub async fn distinct(
&self,
field_name: impl AsRef<str>,
filter: impl Into<Option<Document>>,
options: impl Into<Option<DistinctOptions>>,
) -> Result<Vec<Bson>> {
self.distinct_common(field_name, filter, options, None)
.await
}
pub async fn distinct_with_session(
&self,
field_name: impl AsRef<str>,
filter: impl Into<Option<Document>>,
options: impl Into<Option<DistinctOptions>>,
session: &mut ClientSession,
) -> Result<Vec<Bson>> {
self.distinct_common(field_name, filter, options, session)
.await
}
async fn drop_indexes_common(
&self,
name: impl Into<Option<&str>>,
options: impl Into<Option<DropIndexOptions>>,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<()> {
let session = session.into();
let mut options = options.into();
resolve_write_concern_with_session!(self, options, session.as_ref())?;
let index_name = name.into().unwrap_or("*").to_string();
let drop_index = DropIndexes::new(self.namespace(), index_name, options);
self.client().execute_operation(drop_index, session).await
}
async fn drop_index_common(
&self,
name: impl AsRef<str>,
options: impl Into<Option<DropIndexOptions>>,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<()> {
let name = name.as_ref();
if name == "*" {
return Err(ErrorKind::InvalidArgument {
message: "Cannot pass name \"*\" to drop_index since more than one index would be \
dropped."
.to_string(),
}
.into());
}
self.drop_indexes_common(name, options, session).await
}
pub async fn drop_index(
&self,
name: impl AsRef<str>,
options: impl Into<Option<DropIndexOptions>>,
) -> Result<()> {
self.drop_index_common(name, options, None).await
}
pub async fn drop_index_with_session(
&self,
name: impl AsRef<str>,
options: impl Into<Option<DropIndexOptions>>,
session: &mut ClientSession,
) -> Result<()> {
self.drop_index_common(name, options, session).await
}
pub async fn drop_indexes(&self, options: impl Into<Option<DropIndexOptions>>) -> Result<()> {
self.drop_indexes_common(None, options, None).await
}
pub async fn drop_indexes_with_session(
&self,
options: impl Into<Option<DropIndexOptions>>,
session: &mut ClientSession,
) -> Result<()> {
self.drop_indexes_common(None, options, session).await
}
pub async fn list_indexes(
&self,
options: impl Into<Option<ListIndexesOptions>>,
) -> Result<Cursor<IndexModel>> {
let list_indexes = ListIndexes::new(self.namespace(), options.into());
let client = self.client();
client.execute_cursor_operation(list_indexes).await
}
pub async fn list_indexes_with_session(
&self,
options: impl Into<Option<ListIndexesOptions>>,
session: &mut ClientSession,
) -> Result<SessionCursor<IndexModel>> {
let list_indexes = ListIndexes::new(self.namespace(), options.into());
let client = self.client();
client
.execute_session_cursor_operation(list_indexes, session)
.await
}
async fn list_index_names_common(
&self,
cursor: impl TryStreamExt<Ok = IndexModel, Error = Error>,
) -> Result<Vec<String>> {
cursor
.try_filter_map(|index| future::ok(index.get_name()))
.try_collect()
.await
}
pub async fn list_index_names(&self) -> Result<Vec<String>> {
let cursor = self.list_indexes(None).await?;
self.list_index_names_common(cursor).await
}
pub async fn list_index_names_with_session(
&self,
session: &mut ClientSession,
) -> Result<Vec<String>> {
let mut cursor = self.list_indexes_with_session(None, session).await?;
self.list_index_names_common(cursor.stream(session)).await
}
async fn update_many_common(
&self,
query: Document,
update: impl Into<UpdateModifications>,
options: impl Into<Option<UpdateOptions>>,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<UpdateResult> {
let update = update.into();
if let UpdateModifications::Document(ref d) = update {
bson_util::update_document_check(d)?;
}
let session = session.into();
let mut options = options.into();
resolve_write_concern_with_session!(self, options, session.as_ref())?;
let update = Update::new(self.namespace(), query, update, true, options);
self.client().execute_operation(update, session).await
}
pub async fn update_many(
&self,
query: Document,
update: impl Into<UpdateModifications>,
options: impl Into<Option<UpdateOptions>>,
) -> Result<UpdateResult> {
self.update_many_common(query, update, options, None).await
}
pub async fn update_many_with_session(
&self,
query: Document,
update: impl Into<UpdateModifications>,
options: impl Into<Option<UpdateOptions>>,
session: &mut ClientSession,
) -> Result<UpdateResult> {
self.update_many_common(query, update, options, session)
.await
}
async fn update_one_common(
&self,
query: Document,
update: impl Into<UpdateModifications>,
options: impl Into<Option<UpdateOptions>>,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<UpdateResult> {
let update = update.into();
if let UpdateModifications::Document(ref d) = update {
bson_util::update_document_check(d)?;
}
let session = session.into();
let mut options = options.into();
resolve_write_concern_with_session!(self, options, session.as_ref())?;
let update = Update::new(self.namespace(), query, update, false, options);
self.client().execute_operation(update, session).await
}
pub async fn update_one(
&self,
query: Document,
update: impl Into<UpdateModifications>,
options: impl Into<Option<UpdateOptions>>,
) -> Result<UpdateResult> {
self.update_one_common(query, update, options, None).await
}
pub async fn update_one_with_session(
&self,
query: Document,
update: impl Into<UpdateModifications>,
options: impl Into<Option<UpdateOptions>>,
session: &mut ClientSession,
) -> Result<UpdateResult> {
self.update_one_common(query, update, options, session)
.await
}
pub(super) async fn kill_cursor(
&self,
cursor_id: i64,
pinned_connection: Option<&PinnedConnectionHandle>,
drop_address: Option<ServerAddress>,
) -> Result<()> {
let ns = self.namespace();
self.client()
.database(ns.db.as_str())
.run_command_common(
doc! {
"killCursors": ns.coll.as_str(),
"cursors": [cursor_id]
},
drop_address.map(SelectionCriteria::from_address),
None,
pinned_connection,
)
.await?;
Ok(())
}
pub async fn watch(
&self,
pipeline: impl IntoIterator<Item = Document>,
options: impl Into<Option<ChangeStreamOptions>>,
) -> Result<ChangeStream<ChangeStreamEvent<T>>>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
let mut options = options.into();
resolve_options!(self, options, [read_concern, selection_criteria]);
let target = self.namespace().into();
self.client()
.execute_watch(pipeline, options, target, None)
.await
}
pub async fn watch_with_session(
&self,
pipeline: impl IntoIterator<Item = Document>,
options: impl Into<Option<ChangeStreamOptions>>,
session: &mut ClientSession,
) -> Result<SessionChangeStream<ChangeStreamEvent<T>>>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
let mut options = options.into();
resolve_read_concern_with_session!(self, options, Some(&mut *session))?;
resolve_selection_criteria_with_session!(self, options, Some(&mut *session))?;
let target = self.namespace().into();
self.client()
.execute_watch_with_session(pipeline, options, target, None, session)
.await
}
pub async fn find(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<FindOptions>>,
) -> Result<Cursor<T>> {
let mut options = options.into();
resolve_options!(self, options, [read_concern, selection_criteria]);
let find = Find::new(self.namespace(), filter.into(), options);
let client = self.client();
client.execute_cursor_operation(find).await
}
pub async fn find_with_session(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<FindOptions>>,
session: &mut ClientSession,
) -> Result<SessionCursor<T>> {
let mut options = options.into();
resolve_read_concern_with_session!(self, options, Some(&mut *session))?;
resolve_selection_criteria_with_session!(self, options, Some(&mut *session))?;
let find = Find::new(self.namespace(), filter.into(), options);
let client = self.client();
client.execute_session_cursor_operation(find, session).await
}
}
impl<T> Collection<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
pub async fn find_one(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<FindOneOptions>>,
) -> Result<Option<T>> {
let mut options = options.into();
resolve_options!(self, options, [read_concern, selection_criteria]);
let options: FindOptions = options.map(Into::into).unwrap_or_else(Default::default);
let mut cursor = self.find(filter, Some(options)).await?;
cursor.next().await.transpose()
}
pub async fn find_one_with_session(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<FindOneOptions>>,
session: &mut ClientSession,
) -> Result<Option<T>> {
let mut options = options.into();
resolve_read_concern_with_session!(self, options, Some(&mut *session))?;
resolve_selection_criteria_with_session!(self, options, Some(&mut *session))?;
let options: FindOptions = options.map(Into::into).unwrap_or_else(Default::default);
let mut cursor = self
.find_with_session(filter, Some(options), session)
.await?;
let mut cursor = cursor.stream(session);
cursor.next().await.transpose()
}
}
impl<T> Collection<T>
where
T: DeserializeOwned,
{
async fn find_one_and_delete_common(
&self,
filter: Document,
options: impl Into<Option<FindOneAndDeleteOptions>>,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<Option<T>> {
let session = session.into();
let mut options = options.into();
resolve_write_concern_with_session!(self, options, session.as_ref())?;
let op = FindAndModify::<T>::with_delete(self.namespace(), filter, options);
self.client().execute_operation(op, session).await
}
pub async fn find_one_and_delete(
&self,
filter: Document,
options: impl Into<Option<FindOneAndDeleteOptions>>,
) -> Result<Option<T>> {
self.find_one_and_delete_common(filter, options, None).await
}
pub async fn find_one_and_delete_with_session(
&self,
filter: Document,
options: impl Into<Option<FindOneAndDeleteOptions>>,
session: &mut ClientSession,
) -> Result<Option<T>> {
self.find_one_and_delete_common(filter, options, session)
.await
}
async fn find_one_and_update_common(
&self,
filter: Document,
update: impl Into<UpdateModifications>,
options: impl Into<Option<FindOneAndUpdateOptions>>,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<Option<T>> {
let update = update.into();
let session = session.into();
let mut options = options.into();
resolve_write_concern_with_session!(self, options, session.as_ref())?;
let op = FindAndModify::<T>::with_update(self.namespace(), filter, update, options)?;
self.client().execute_operation(op, session).await
}
pub async fn find_one_and_update(
&self,
filter: Document,
update: impl Into<UpdateModifications>,
options: impl Into<Option<FindOneAndUpdateOptions>>,
) -> Result<Option<T>> {
self.find_one_and_update_common(filter, update, options, None)
.await
}
pub async fn find_one_and_update_with_session(
&self,
filter: Document,
update: impl Into<UpdateModifications>,
options: impl Into<Option<FindOneAndUpdateOptions>>,
session: &mut ClientSession,
) -> Result<Option<T>> {
self.find_one_and_update_common(filter, update, options, session)
.await
}
}
impl<T> Collection<T>
where
T: Serialize + DeserializeOwned,
{
async fn find_one_and_replace_common(
&self,
filter: Document,
replacement: impl Borrow<T>,
options: impl Into<Option<FindOneAndReplaceOptions>>,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<Option<T>> {
let replacement = to_document(replacement.borrow())?;
let session = session.into();
let mut options = options.into();
resolve_write_concern_with_session!(self, options, session.as_ref())?;
let op = FindAndModify::<T>::with_replace(self.namespace(), filter, replacement, options)?;
self.client().execute_operation(op, session).await
}
pub async fn find_one_and_replace(
&self,
filter: Document,
replacement: impl Borrow<T>,
options: impl Into<Option<FindOneAndReplaceOptions>>,
) -> Result<Option<T>> {
self.find_one_and_replace_common(filter, replacement, options, None)
.await
}
pub async fn find_one_and_replace_with_session(
&self,
filter: Document,
replacement: impl Borrow<T>,
options: impl Into<Option<FindOneAndReplaceOptions>>,
session: &mut ClientSession,
) -> Result<Option<T>> {
self.find_one_and_replace_common(filter, replacement, options, session)
.await
}
}
impl<T> Collection<T>
where
T: Serialize,
{
#[allow(clippy::needless_option_as_deref)]
async fn insert_many_common(
&self,
docs: impl IntoIterator<Item = impl Borrow<T>>,
options: impl Into<Option<InsertManyOptions>>,
mut session: Option<&mut ClientSession>,
) -> Result<InsertManyResult> {
let ds: Vec<_> = docs.into_iter().collect();
let mut options = options.into();
resolve_write_concern_with_session!(self, options, session.as_ref())?;
if ds.is_empty() {
return Err(ErrorKind::InvalidArgument {
message: "No documents provided to insert_many".to_string(),
}
.into());
}
let ordered = options.as_ref().and_then(|o| o.ordered).unwrap_or(true);
let mut cumulative_failure: Option<BulkWriteFailure> = None;
let mut error_labels: HashSet<String> = Default::default();
let mut cumulative_result: Option<InsertManyResult> = None;
let mut n_attempted = 0;
while n_attempted < ds.len() {
let docs: Vec<&T> = ds.iter().skip(n_attempted).map(Borrow::borrow).collect();
let insert = Insert::new(self.namespace(), docs, options.clone());
match self
.client()
.execute_operation(insert, session.as_deref_mut())
.await
{
Ok(result) => {
let current_batch_size = result.inserted_ids.len();
let cumulative_result =
cumulative_result.get_or_insert_with(InsertManyResult::new);
for (index, id) in result.inserted_ids {
cumulative_result
.inserted_ids
.insert(index + n_attempted, id);
}
n_attempted += current_batch_size;
}
Err(e) => {
let labels = e.labels().clone();
match *e.kind {
ErrorKind::BulkWrite(bw) => {
let current_batch_size = bw.inserted_ids.len()
+ bw.write_errors.as_ref().map(|we| we.len()).unwrap_or(0);
let failure_ref =
cumulative_failure.get_or_insert_with(BulkWriteFailure::new);
if let Some(write_errors) = bw.write_errors {
for err in write_errors {
let index = n_attempted + err.index;
failure_ref
.write_errors
.get_or_insert_with(Default::default)
.push(BulkWriteError { index, ..err });
}
}
if let Some(wc_error) = bw.write_concern_error {
failure_ref.write_concern_error = Some(wc_error);
}
error_labels.extend(labels);
if ordered {
if let Some(failure) = cumulative_failure {
return Err(Error::new(
ErrorKind::BulkWrite(failure),
Some(error_labels),
));
}
}
n_attempted += current_batch_size;
}
_ => return Err(e),
}
}
}
}
match cumulative_failure {
Some(failure) => Err(Error::new(
ErrorKind::BulkWrite(failure),
Some(error_labels),
)),
None => Ok(cumulative_result.unwrap_or_else(InsertManyResult::new)),
}
}
pub async fn insert_many(
&self,
docs: impl IntoIterator<Item = impl Borrow<T>>,
options: impl Into<Option<InsertManyOptions>>,
) -> Result<InsertManyResult> {
self.insert_many_common(docs, options, None).await
}
pub async fn insert_many_with_session(
&self,
docs: impl IntoIterator<Item = impl Borrow<T>>,
options: impl Into<Option<InsertManyOptions>>,
session: &mut ClientSession,
) -> Result<InsertManyResult> {
self.insert_many_common(docs, options, Some(session)).await
}
async fn insert_one_common(
&self,
doc: &T,
options: impl Into<Option<InsertOneOptions>>,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<InsertOneResult> {
let session = session.into();
let mut options = options.into();
resolve_write_concern_with_session!(self, options, session.as_ref())?;
let insert = Insert::new(
self.namespace(),
vec![doc],
options.map(InsertManyOptions::from_insert_one_options),
);
self.client()
.execute_operation(insert, session)
.await
.map(InsertOneResult::from_insert_many_result)
.map_err(convert_bulk_errors)
}
pub async fn insert_one(
&self,
doc: impl Borrow<T>,
options: impl Into<Option<InsertOneOptions>>,
) -> Result<InsertOneResult> {
self.insert_one_common(doc.borrow(), options, None).await
}
pub async fn insert_one_with_session(
&self,
doc: impl Borrow<T>,
options: impl Into<Option<InsertOneOptions>>,
session: &mut ClientSession,
) -> Result<InsertOneResult> {
self.insert_one_common(doc.borrow(), options, session).await
}
async fn replace_one_common(
&self,
query: Document,
replacement: impl Borrow<T>,
options: impl Into<Option<ReplaceOptions>>,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<UpdateResult> {
let replacement = to_document(replacement.borrow())?;
bson_util::replacement_document_check(&replacement)?;
let session = session.into();
let mut options = options.into();
resolve_write_concern_with_session!(self, options, session.as_ref())?;
let update = Update::new(
self.namespace(),
query,
UpdateModifications::Document(replacement),
false,
options.map(UpdateOptions::from_replace_options),
);
self.client().execute_operation(update, session).await
}
pub async fn replace_one(
&self,
query: Document,
replacement: impl Borrow<T>,
options: impl Into<Option<ReplaceOptions>>,
) -> Result<UpdateResult> {
self.replace_one_common(query, replacement, options, None)
.await
}
pub async fn replace_one_with_session(
&self,
query: Document,
replacement: impl Borrow<T>,
options: impl Into<Option<ReplaceOptions>>,
session: &mut ClientSession,
) -> Result<UpdateResult> {
self.replace_one_common(query, replacement, options, session)
.await
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Namespace {
pub db: String,
pub coll: String,
}
impl Namespace {
#[cfg(test)]
pub(crate) fn empty() -> Self {
Self {
db: String::new(),
coll: String::new(),
}
}
}
impl fmt::Display for Namespace {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}.{}", self.db, self.coll)
}
}
impl<'de> Deserialize<'de> for Namespace {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
let mut parts = s.split('.');
let db = parts.next();
let coll = parts.collect::<Vec<_>>().join(".");
match (db, coll) {
(Some(db), coll) if !coll.is_empty() => Ok(Self {
db: db.to_string(),
coll,
}),
_ => Err(D::Error::custom("Missing one or more fields in namespace")),
}
}
}
impl Serialize for Namespace {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&(self.db.clone() + "." + &self.coll))
}
}