summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/document_source_out.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_out.h6
-rw-r--r--src/mongo/db/pipeline/document_source_out_in_place.h10
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h6
-rw-r--r--src/mongo/db/pipeline/mongod_process_interface.cpp56
-rw-r--r--src/mongo/db/pipeline/mongod_process_interface.h12
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.h6
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h6
8 files changed, 64 insertions, 46 deletions
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index cefaaefcf4b..9d830162109 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -202,14 +202,16 @@ DocumentSource::GetNextResult DocumentSourceOut::getNext() {
bufferedBytes += insertObj.objsize();
if (!batch.empty() &&
(bufferedBytes > BSONObjMaxUserSize || batch.size() >= write_ops::kMaxWriteBatchSize)) {
- spill(batch);
+ spill(std::move(batch));
batch.clear();
bufferedBytes = insertObj.objsize();
}
batch.emplace(std::move(insertObj), std::move(uniqueKey));
}
- if (!batch.empty())
- spill(batch);
+ if (!batch.empty()) {
+ spill(std::move(batch));
+ batch.clear();
+ }
switch (nextInput.getStatus()) {
case GetNextResult::ReturnStatus::kAdvanced: {
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 853d2b3a9e9..68dcaa838ed 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -128,7 +128,7 @@ public:
*
*/
struct BatchedObjects {
- void emplace(BSONObj obj, BSONObj key) {
+ void emplace(BSONObj&& obj, BSONObj&& key) {
objects.emplace_back(std::move(obj));
uniqueKeys.emplace_back(std::move(key));
}
@@ -155,8 +155,8 @@ public:
/**
* Writes the documents in 'batch' to the write namespace.
*/
- virtual void spill(const BatchedObjects& batch) {
- pExpCtx->mongoProcessInterface->insert(pExpCtx, getWriteNs(), batch.objects);
+ virtual void spill(BatchedObjects&& batch) {
+ pExpCtx->mongoProcessInterface->insert(pExpCtx, getWriteNs(), std::move(batch.objects));
};
/**
diff --git a/src/mongo/db/pipeline/document_source_out_in_place.h b/src/mongo/db/pipeline/document_source_out_in_place.h
index fa28ae78cae..345980b5b47 100644
--- a/src/mongo/db/pipeline/document_source_out_in_place.h
+++ b/src/mongo/db/pipeline/document_source_out_in_place.h
@@ -61,13 +61,17 @@ class DocumentSourceOutInPlaceReplace final : public DocumentSourceOutInPlace {
public:
using DocumentSourceOutInPlace::DocumentSourceOutInPlace;
- void spill(const BatchedObjects& batch) final {
+ void spill(BatchedObjects&& batch) final {
// Set upsert to true and multi to false as there should be at most one document to update
// or insert.
constexpr auto upsert = true;
constexpr auto multi = false;
- pExpCtx->mongoProcessInterface->update(
- pExpCtx, getWriteNs(), batch.uniqueKeys, batch.objects, upsert, multi);
+ pExpCtx->mongoProcessInterface->update(pExpCtx,
+ getWriteNs(),
+ std::move(batch.uniqueKeys),
+ std::move(batch.objects),
+ upsert,
+ multi);
}
};
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index f457c184706..890f5565f87 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -102,7 +102,7 @@ public:
*/
virtual void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- const std::vector<BSONObj>& objs) = 0;
+ std::vector<BSONObj>&& objs) = 0;
/**
* Updates the documents matching 'queries' with the objects 'updates'. Throws a UserException
@@ -110,8 +110,8 @@ public:
*/
virtual void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- const std::vector<BSONObj>& queries,
- const std::vector<BSONObj>& updates,
+ std::vector<BSONObj>&& queries,
+ std::vector<BSONObj>&& updates,
bool upsert,
bool multi) = 0;
diff --git a/src/mongo/db/pipeline/mongod_process_interface.cpp b/src/mongo/db/pipeline/mongod_process_interface.cpp
index 14830712722..719ee010b04 100644
--- a/src/mongo/db/pipeline/mongod_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongod_process_interface.cpp
@@ -71,7 +71,7 @@ namespace {
* Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'.
*/
Insert buildInsertOp(const NamespaceString& nss,
- const std::vector<BSONObj>& objs,
+ std::vector<BSONObj>&& objs,
bool bypassDocValidation) {
Insert insertOp(nss);
insertOp.setDocuments(std::move(objs));
@@ -90,8 +90,8 @@ Insert buildInsertOp(const NamespaceString& nss,
* Note that 'queries' and 'updates' must be the same length.
*/
Update buildUpdateOp(const NamespaceString& nss,
- const std::vector<BSONObj>& queries,
- const std::vector<BSONObj>& updates,
+ std::vector<BSONObj>&& queries,
+ std::vector<BSONObj>&& updates,
bool upsert,
bool multi,
bool bypassDocValidation) {
@@ -101,8 +101,8 @@ Update buildUpdateOp(const NamespaceString& nss,
for (size_t index = 0; index < queries.size(); ++index) {
updateEntries.push_back([&] {
UpdateOpEntry entry;
- entry.setQ(queries[index]);
- entry.setU(updates[index]);
+ entry.setQ(std::move(queries[index]));
+ entry.setU(std::move(updates[index]));
entry.setUpsert(upsert);
entry.setMulti(multi);
return entry;
@@ -172,9 +172,9 @@ bool MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString&
void MongoDInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- const std::vector<BSONObj>& objs) {
- auto insertOp = buildInsertOp(ns, objs, expCtx->bypassDocumentValidation);
- auto writeResults = performInserts(expCtx->opCtx, insertOp);
+ std::vector<BSONObj>&& objs) {
+ auto writeResults = performInserts(
+ expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
// Only need to check that the final result passed because the inserts are ordered and the batch
// will stop on the first failure.
@@ -183,13 +183,17 @@ void MongoDInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expC
void MongoDInterface::update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- const std::vector<BSONObj>& queries,
- const std::vector<BSONObj>& updates,
+ std::vector<BSONObj>&& queries,
+ std::vector<BSONObj>&& updates,
bool upsert,
bool multi) {
- auto updateOp =
- buildUpdateOp(ns, queries, updates, upsert, multi, expCtx->bypassDocumentValidation);
- auto writeResults = performUpdates(expCtx->opCtx, updateOp);
+ auto writeResults = performUpdates(expCtx->opCtx,
+ buildUpdateOp(ns,
+ std::move(queries),
+ std::move(updates),
+ upsert,
+ multi,
+ expCtx->bypassDocumentValidation));
// Only need to check that the final result passed because the updates are ordered and the batch
// will stop on the first failure.
@@ -585,28 +589,36 @@ std::unique_ptr<CollatorInterface> MongoDInterface::_getCollectionDefaultCollato
void MongoDInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- const std::vector<BSONObj>& objs) {
+ std::vector<BSONObj>&& objs) {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- auto insertOp = buildInsertOp(ns, objs, expCtx->bypassDocumentValidation);
- ClusterWriter::write(expCtx->opCtx, BatchedCommandRequest(insertOp), &stats, &response);
+ ClusterWriter::write(
+ expCtx->opCtx,
+ BatchedCommandRequest(buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)),
+ &stats,
+ &response);
// TODO SERVER-35403: Add more context for which shard produced the error.
uassertStatusOKWithContext(response.toStatus(), "Insert failed: ");
}
void MongoDInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- const std::vector<BSONObj>& queries,
- const std::vector<BSONObj>& updates,
+ std::vector<BSONObj>&& queries,
+ std::vector<BSONObj>&& updates,
bool upsert,
bool multi) {
BatchedCommandResponse response;
BatchWriteExecStats stats;
-
- auto updateOp =
- buildUpdateOp(ns, queries, updates, upsert, multi, expCtx->bypassDocumentValidation);
- ClusterWriter::write(expCtx->opCtx, BatchedCommandRequest(updateOp), &stats, &response);
+ ClusterWriter::write(expCtx->opCtx,
+ BatchedCommandRequest(buildUpdateOp(ns,
+ std::move(queries),
+ std::move(updates),
+ upsert,
+ multi,
+ expCtx->bypassDocumentValidation)),
+ &stats,
+ &response);
// TODO SERVER-35403: Add more context for which shard produced the error.
uassertStatusOKWithContext(response.toStatus(), "Update failed: ");
}
diff --git a/src/mongo/db/pipeline/mongod_process_interface.h b/src/mongo/db/pipeline/mongod_process_interface.h
index 915a8946118..5667c8a5158 100644
--- a/src/mongo/db/pipeline/mongod_process_interface.h
+++ b/src/mongo/db/pipeline/mongod_process_interface.h
@@ -51,11 +51,11 @@ public:
bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final;
virtual void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- const std::vector<BSONObj>& objs);
+ std::vector<BSONObj>&& objs);
virtual void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- const std::vector<BSONObj>& queries,
- const std::vector<BSONObj>& updates,
+ std::vector<BSONObj>&& queries,
+ std::vector<BSONObj>&& updates,
bool upsert,
bool multi);
CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final;
@@ -144,7 +144,7 @@ public:
*/
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- const std::vector<BSONObj>& objs) final;
+ std::vector<BSONObj>&& objs) final;
/**
* Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking,
@@ -152,8 +152,8 @@ public:
*/
void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- const std::vector<BSONObj>& queries,
- const std::vector<BSONObj>& updates,
+ std::vector<BSONObj>&& queries,
+ std::vector<BSONObj>&& updates,
bool upsert,
bool multi) final;
};
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index b16f72608bb..a9092028cd5 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -63,14 +63,14 @@ public:
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- const std::vector<BSONObj>& objs) final {
+ std::vector<BSONObj>&& objs) final {
MONGO_UNREACHABLE;
}
void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- const std::vector<BSONObj>& queries,
- const std::vector<BSONObj>& updates,
+ std::vector<BSONObj>&& queries,
+ std::vector<BSONObj>&& updates,
bool upsert,
bool multi) final {
MONGO_UNREACHABLE;
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 638419b2571..a3df25fc49b 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -59,14 +59,14 @@ public:
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- const std::vector<BSONObj>& objs) override {
+ std::vector<BSONObj>&& objs) override {
MONGO_UNREACHABLE;
}
void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- const std::vector<BSONObj>& queries,
- const std::vector<BSONObj>& updates,
+ std::vector<BSONObj>&& queries,
+ std::vector<BSONObj>&& updates,
bool upsert,
bool multi) final {
MONGO_UNREACHABLE;