diff options
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r-- | src/mongo/db/pipeline/document_source_out.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_out.h | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_out_in_place.h | 10 | ||||
-rw-r--r-- | src/mongo/db/pipeline/mongo_process_interface.h | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/mongod_process_interface.cpp | 56 | ||||
-rw-r--r-- | src/mongo/db/pipeline/mongod_process_interface.h | 12 | ||||
-rw-r--r-- | src/mongo/db/pipeline/mongos_process_interface.h | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/stub_mongo_process_interface.h | 6 |
8 files changed, 64 insertions, 46 deletions
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index cefaaefcf4b..9d830162109 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -202,14 +202,16 @@ DocumentSource::GetNextResult DocumentSourceOut::getNext() { bufferedBytes += insertObj.objsize(); if (!batch.empty() && (bufferedBytes > BSONObjMaxUserSize || batch.size() >= write_ops::kMaxWriteBatchSize)) { - spill(batch); + spill(std::move(batch)); batch.clear(); bufferedBytes = insertObj.objsize(); } batch.emplace(std::move(insertObj), std::move(uniqueKey)); } - if (!batch.empty()) - spill(batch); + if (!batch.empty()) { + spill(std::move(batch)); + batch.clear(); + } switch (nextInput.getStatus()) { case GetNextResult::ReturnStatus::kAdvanced: { diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 853d2b3a9e9..68dcaa838ed 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -128,7 +128,7 @@ public: * */ struct BatchedObjects { - void emplace(BSONObj obj, BSONObj key) { + void emplace(BSONObj&& obj, BSONObj&& key) { objects.emplace_back(std::move(obj)); uniqueKeys.emplace_back(std::move(key)); } @@ -155,8 +155,8 @@ public: /** * Writes the documents in 'batch' to the write namespace. */ - virtual void spill(const BatchedObjects& batch) { - pExpCtx->mongoProcessInterface->insert(pExpCtx, getWriteNs(), batch.objects); + virtual void spill(BatchedObjects&& batch) { + pExpCtx->mongoProcessInterface->insert(pExpCtx, getWriteNs(), std::move(batch.objects)); }; /** diff --git a/src/mongo/db/pipeline/document_source_out_in_place.h b/src/mongo/db/pipeline/document_source_out_in_place.h index fa28ae78cae..345980b5b47 100644 --- a/src/mongo/db/pipeline/document_source_out_in_place.h +++ b/src/mongo/db/pipeline/document_source_out_in_place.h @@ -61,13 +61,17 @@ class DocumentSourceOutInPlaceReplace final : public DocumentSourceOutInPlace { public: using DocumentSourceOutInPlace::DocumentSourceOutInPlace; - void spill(const BatchedObjects& batch) final { + void spill(BatchedObjects&& batch) final { // Set upsert to true and multi to false as there should be at most one document to update // or insert. constexpr auto upsert = true; constexpr auto multi = false; - pExpCtx->mongoProcessInterface->update( - pExpCtx, getWriteNs(), batch.uniqueKeys, batch.objects, upsert, multi); + pExpCtx->mongoProcessInterface->update(pExpCtx, + getWriteNs(), + std::move(batch.uniqueKeys), + std::move(batch.objects), + upsert, + multi); } }; diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index f457c184706..890f5565f87 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -102,7 +102,7 @@ public: */ virtual void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - const std::vector<BSONObj>& objs) = 0; + std::vector<BSONObj>&& objs) = 0; /** * Updates the documents matching 'queries' with the objects 'updates'. Throws a UserException @@ -110,8 +110,8 @@ public: */ virtual void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - const std::vector<BSONObj>& queries, - const std::vector<BSONObj>& updates, + std::vector<BSONObj>&& queries, + std::vector<BSONObj>&& updates, bool upsert, bool multi) = 0; diff --git a/src/mongo/db/pipeline/mongod_process_interface.cpp b/src/mongo/db/pipeline/mongod_process_interface.cpp index 14830712722..719ee010b04 100644 --- a/src/mongo/db/pipeline/mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/mongod_process_interface.cpp @@ -71,7 +71,7 @@ namespace { * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'. */ Insert buildInsertOp(const NamespaceString& nss, - const std::vector<BSONObj>& objs, + std::vector<BSONObj>&& objs, bool bypassDocValidation) { Insert insertOp(nss); insertOp.setDocuments(std::move(objs)); @@ -90,8 +90,8 @@ Insert buildInsertOp(const NamespaceString& nss, * Note that 'queries' and 'updates' must be the same length. */ Update buildUpdateOp(const NamespaceString& nss, - const std::vector<BSONObj>& queries, - const std::vector<BSONObj>& updates, + std::vector<BSONObj>&& queries, + std::vector<BSONObj>&& updates, bool upsert, bool multi, bool bypassDocValidation) { @@ -101,8 +101,8 @@ Update buildUpdateOp(const NamespaceString& nss, for (size_t index = 0; index < queries.size(); ++index) { updateEntries.push_back([&] { UpdateOpEntry entry; - entry.setQ(queries[index]); - entry.setU(updates[index]); + entry.setQ(std::move(queries[index])); + entry.setU(std::move(updates[index])); entry.setUpsert(upsert); entry.setMulti(multi); return entry; @@ -172,9 +172,9 @@ bool MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& void MongoDInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - const std::vector<BSONObj>& objs) { - auto insertOp = buildInsertOp(ns, objs, expCtx->bypassDocumentValidation); - auto writeResults = performInserts(expCtx->opCtx, insertOp); + std::vector<BSONObj>&& objs) { + auto writeResults = performInserts( + expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); // Only need to check that the final result passed because the inserts are ordered and the batch // will stop on the first failure. @@ -183,13 +183,17 @@ void MongoDInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expC void MongoDInterface::update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - const std::vector<BSONObj>& queries, - const std::vector<BSONObj>& updates, + std::vector<BSONObj>&& queries, + std::vector<BSONObj>&& updates, bool upsert, bool multi) { - auto updateOp = - buildUpdateOp(ns, queries, updates, upsert, multi, expCtx->bypassDocumentValidation); - auto writeResults = performUpdates(expCtx->opCtx, updateOp); + auto writeResults = performUpdates(expCtx->opCtx, + buildUpdateOp(ns, + std::move(queries), + std::move(updates), + upsert, + multi, + expCtx->bypassDocumentValidation)); // Only need to check that the final result passed because the updates are ordered and the batch // will stop on the first failure. @@ -585,28 +589,36 @@ std::unique_ptr<CollatorInterface> MongoDInterface::_getCollectionDefaultCollato void MongoDInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - const std::vector<BSONObj>& objs) { + std::vector<BSONObj>&& objs) { BatchedCommandResponse response; BatchWriteExecStats stats; - auto insertOp = buildInsertOp(ns, objs, expCtx->bypassDocumentValidation); - ClusterWriter::write(expCtx->opCtx, BatchedCommandRequest(insertOp), &stats, &response); + ClusterWriter::write( + expCtx->opCtx, + BatchedCommandRequest(buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)), + &stats, + &response); // TODO SERVER-35403: Add more context for which shard produced the error. uassertStatusOKWithContext(response.toStatus(), "Insert failed: "); } void MongoDInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - const std::vector<BSONObj>& queries, - const std::vector<BSONObj>& updates, + std::vector<BSONObj>&& queries, + std::vector<BSONObj>&& updates, bool upsert, bool multi) { BatchedCommandResponse response; BatchWriteExecStats stats; - - auto updateOp = - buildUpdateOp(ns, queries, updates, upsert, multi, expCtx->bypassDocumentValidation); - ClusterWriter::write(expCtx->opCtx, BatchedCommandRequest(updateOp), &stats, &response); + ClusterWriter::write(expCtx->opCtx, + BatchedCommandRequest(buildUpdateOp(ns, + std::move(queries), + std::move(updates), + upsert, + multi, + expCtx->bypassDocumentValidation)), + &stats, + &response); // TODO SERVER-35403: Add more context for which shard produced the error. uassertStatusOKWithContext(response.toStatus(), "Update failed: "); } diff --git a/src/mongo/db/pipeline/mongod_process_interface.h b/src/mongo/db/pipeline/mongod_process_interface.h index 915a8946118..5667c8a5158 100644 --- a/src/mongo/db/pipeline/mongod_process_interface.h +++ b/src/mongo/db/pipeline/mongod_process_interface.h @@ -51,11 +51,11 @@ public: bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final; virtual void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - const std::vector<BSONObj>& objs); + std::vector<BSONObj>&& objs); virtual void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - const std::vector<BSONObj>& queries, - const std::vector<BSONObj>& updates, + std::vector<BSONObj>&& queries, + std::vector<BSONObj>&& updates, bool upsert, bool multi); CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final; @@ -144,7 +144,7 @@ public: */ void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - const std::vector<BSONObj>& objs) final; + std::vector<BSONObj>&& objs) final; /** * Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking, @@ -152,8 +152,8 @@ public: */ void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - const std::vector<BSONObj>& queries, - const std::vector<BSONObj>& updates, + std::vector<BSONObj>&& queries, + std::vector<BSONObj>&& updates, bool upsert, bool multi) final; }; diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index b16f72608bb..a9092028cd5 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -63,14 +63,14 @@ public: void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - const std::vector<BSONObj>& objs) final { + std::vector<BSONObj>&& objs) final { MONGO_UNREACHABLE; } void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - const std::vector<BSONObj>& queries, - const std::vector<BSONObj>& updates, + std::vector<BSONObj>&& queries, + std::vector<BSONObj>&& updates, bool upsert, bool multi) final { MONGO_UNREACHABLE; diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index 638419b2571..a3df25fc49b 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -59,14 +59,14 @@ public: void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - const std::vector<BSONObj>& objs) override { + std::vector<BSONObj>&& objs) override { MONGO_UNREACHABLE; } void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - const std::vector<BSONObj>& queries, - const std::vector<BSONObj>& updates, + std::vector<BSONObj>&& queries, + std::vector<BSONObj>&& updates, bool upsert, bool multi) final { MONGO_UNREACHABLE; |