diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-10-04 13:39:15 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-10-12 11:58:42 -0400 |
commit | 267bbff2d931ab1ddabfbab18da96d018422f7fd (patch) | |
tree | c32c8aa3251615c63c885f2cbb0c198785cc9453 /src | |
parent | b04cc1e53b5d4b13d0b1f25303dd6f33b2e730f8 (diff) | |
download | mongo-267bbff2d931ab1ddabfbab18da96d018422f7fd.tar.gz |
SERVER-37229: Switch to unordered write ops ini $out
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/pipeline/process_interface_shardsvr.cpp | 89 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface_standalone.cpp | 131 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface_standalone.h | 25 |
3 files changed, 93 insertions, 152 deletions
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp index e86b7202d2b..46c1a4cadd6 100644 --- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp +++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp @@ -42,15 +42,8 @@ #include "mongo/db/db_raii.h" #include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/ops/write_ops_gen.h" -#include "mongo/db/pipeline/document_source_cursor.h" -#include "mongo/db/pipeline/pipeline_d.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" -#include "mongo/db/session_catalog.h" -#include "mongo/db/stats/fill_locker_info.h" -#include "mongo/db/stats/storage_stats.h" -#include "mongo/db/storage/backup_cursor_hooks.h" -#include "mongo/db/transaction_participant.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/grid.h" #include "mongo/s/write_ops/cluster_write.h" @@ -66,88 +59,6 @@ using write_ops::Insert; using write_ops::Update; using write_ops::UpdateOpEntry; -namespace { - -/** - * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'. - */ -Insert buildInsertOp(const NamespaceString& nss, - std::vector<BSONObj>&& objs, - bool bypassDocValidation) { - Insert insertOp(nss); - insertOp.setDocuments(std::move(objs)); - insertOp.setWriteCommandBase([&] { - write_ops::WriteCommandBase wcb; - wcb.setOrdered(true); - wcb.setBypassDocumentValidation(bypassDocValidation); - return wcb; - }()); - return insertOp; -} - -/** - * Builds an ordered update op on namespace 'nss' with update entries {q: <queries>, u: <updates>}. - * - * Note that 'queries' and 'updates' must be the same length. - */ -Update buildUpdateOp(const NamespaceString& nss, - std::vector<BSONObj>&& queries, - std::vector<BSONObj>&& updates, - bool upsert, - bool multi, - bool bypassDocValidation) { - Update updateOp(nss); - updateOp.setUpdates([&] { - std::vector<UpdateOpEntry> updateEntries; - for (size_t index = 0; index < queries.size(); ++index) { - updateEntries.push_back([&] { - UpdateOpEntry entry; - entry.setQ(std::move(queries[index])); - entry.setU(std::move(updates[index])); - entry.setUpsert(upsert); - entry.setMulti(multi); - return entry; - }()); - } - return updateEntries; - }()); - updateOp.setWriteCommandBase([&] { - write_ops::WriteCommandBase wcb; - wcb.setOrdered(true); - wcb.setBypassDocumentValidation(bypassDocValidation); - return wcb; - }()); - return updateOp; -} - -// Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and each -// of the elements of 'keyPattern' is numeric, i.e. not "text", "$**", or any other special type of -// index. -bool keyPatternNamesExactPaths(const BSONObj& keyPattern, - const std::set<FieldPath>& uniqueKeyPaths) { - size_t nFieldsMatched = 0; - for (auto&& elem : keyPattern) { - if (!elem.isNumber()) { - return false; - } - if (uniqueKeyPaths.find(elem.fieldNameStringData()) == uniqueKeyPaths.end()) { - return false; - } - ++nFieldsMatched; - } - return nFieldsMatched == uniqueKeyPaths.size(); -} - -bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const IndexCatalogEntry* index, - const std::set<FieldPath>& uniqueKeyPaths) { - return (index->descriptor()->unique() && !index->descriptor()->isPartial() && - keyPatternNamesExactPaths(index->descriptor()->keyPattern(), uniqueKeyPaths) && - CollatorInterface::collatorsMatch(index->getCollator(), expCtx->getCollator())); -} - -} // namespace - std::pair<std::vector<FieldPath>, bool> MongoInterfaceShardServer::collectDocumentKeyFields( OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const { invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index cb1a2a9a803..661ec78b4ca 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -40,8 +40,6 @@ #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" -#include "mongo/db/ops/write_ops_exec.h" -#include "mongo/db/ops/write_ops_gen.h" #include "mongo/db/pipeline/document_source_cursor.h" #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/db/s/collection_sharding_state.h" @@ -51,9 +49,6 @@ #include "mongo/db/stats/storage_stats.h" #include "mongo/db/storage/backup_cursor_hooks.h" #include "mongo/db/transaction_participant.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/grid.h" -#include "mongo/s/write_ops/cluster_write.h" #include "mongo/util/log.h" namespace mongo { @@ -68,58 +63,6 @@ using write_ops::UpdateOpEntry; namespace { -/** - * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'. - */ -Insert buildInsertOp(const NamespaceString& nss, - std::vector<BSONObj>&& objs, - bool bypassDocValidation) { - Insert insertOp(nss); - insertOp.setDocuments(std::move(objs)); - insertOp.setWriteCommandBase([&] { - write_ops::WriteCommandBase wcb; - wcb.setOrdered(true); - wcb.setBypassDocumentValidation(bypassDocValidation); - return wcb; - }()); - return insertOp; -} - -/** - * Builds an ordered update op on namespace 'nss' with update entries {q: <queries>, u: <updates>}. - * - * Note that 'queries' and 'updates' must be the same length. - */ -Update buildUpdateOp(const NamespaceString& nss, - std::vector<BSONObj>&& queries, - std::vector<BSONObj>&& updates, - bool upsert, - bool multi, - bool bypassDocValidation) { - Update updateOp(nss); - updateOp.setUpdates([&] { - std::vector<UpdateOpEntry> updateEntries; - for (size_t index = 0; index < queries.size(); ++index) { - updateEntries.push_back([&] { - UpdateOpEntry entry; - entry.setQ(std::move(queries[index])); - entry.setU(std::move(updates[index])); - entry.setUpsert(upsert); - entry.setMulti(multi); - return entry; - }()); - } - return updateEntries; - }()); - updateOp.setWriteCommandBase([&] { - write_ops::WriteCommandBase wcb; - wcb.setOrdered(true); - wcb.setBypassDocumentValidation(bypassDocValidation); - return wcb; - }()); - return updateOp; -} - // Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and each // of the elements of 'keyPattern' is numeric, i.e. not "text", "$**", or any other special type of // index. @@ -164,15 +107,69 @@ bool MongoInterfaceStandalone::isSharded(OperationContext* opCtx, const Namespac return css->getMetadata(opCtx)->isSharded(); } +Insert MongoInterfaceStandalone::buildInsertOp(const NamespaceString& nss, + std::vector<BSONObj>&& objs, + bool bypassDocValidation) { + Insert insertOp(nss); + insertOp.setDocuments(std::move(objs)); + insertOp.setWriteCommandBase([&] { + write_ops::WriteCommandBase wcb; + wcb.setOrdered(false); + // wcb.setOrdered(true); + wcb.setBypassDocumentValidation(bypassDocValidation); + return wcb; + }()); + return insertOp; +} + +Update MongoInterfaceStandalone::buildUpdateOp(const NamespaceString& nss, + std::vector<BSONObj>&& queries, + std::vector<BSONObj>&& updates, + bool upsert, + bool multi, + bool bypassDocValidation) { + Update updateOp(nss); + updateOp.setUpdates([&] { + std::vector<UpdateOpEntry> updateEntries; + for (size_t index = 0; index < queries.size(); ++index) { + updateEntries.push_back([&] { + UpdateOpEntry entry; + entry.setQ(std::move(queries[index])); + entry.setU(std::move(updates[index])); + entry.setUpsert(upsert); + entry.setMulti(multi); + return entry; + }()); + } + return updateEntries; + }()); + updateOp.setWriteCommandBase([&] { + write_ops::WriteCommandBase wcb; + wcb.setOrdered(false); + // wcb.setOrdered(true); + wcb.setBypassDocumentValidation(bypassDocValidation); + return wcb; + }()); + return updateOp; +} + void MongoInterfaceStandalone::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, std::vector<BSONObj>&& objs) { auto writeResults = performInserts( expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); - // Only need to check that the final result passed because the inserts are ordered and the batch - // will stop on the first failure. - uassertStatusOKWithContext(writeResults.results.back().getStatus(), "Insert failed: "); + // Need to check each result in the batch since the writes are unordered. + uassertStatusOKWithContext( + [&writeResults]() { + for (const auto& result : writeResults.results) { + if (result.getStatus() != Status::OK()) { + return result.getStatus(); + } + } + return Status::OK(); + }(), + "Insert failed: "); } void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionContext>& expCtx, @@ -189,9 +186,17 @@ void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionConte multi, expCtx->bypassDocumentValidation)); - // Only need to check that the final result passed because the updates are ordered and the batch - // will stop on the first failure. - uassertStatusOKWithContext(writeResults.results.back().getStatus(), "Update failed: "); + // Need to check each result in the batch since the writes are unordered. + uassertStatusOKWithContext( + [&writeResults]() { + for (const auto& result : writeResults.results) { + if (result.getStatus() != Status::OK()) { + return result.getStatus(); + } + } + return Status::OK(); + }(), + "Update failed: "); } CollectionIndexUsageMap MongoInterfaceStandalone::getIndexStats(OperationContext* opCtx, diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h index 981ca2b35c3..490c6f6ea3b 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.h +++ b/src/mongo/db/pipeline/process_interface_standalone.h @@ -29,11 +29,16 @@ #pragma once #include "mongo/db/dbdirectclient.h" +#include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/ops/write_ops_gen.h" #include "mongo/db/pipeline/mongo_process_common.h" #include "mongo/db/pipeline/pipeline.h" namespace mongo { +using write_ops::Insert; +using write_ops::Update; + /** * Class to provide access to mongod-specific implementations of methods required by some * document sources. @@ -113,6 +118,26 @@ protected: CurrentOpUserMode userMode, std::vector<BSONObj>* ops) const final; + /** + * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'. + */ + Insert buildInsertOp(const NamespaceString& nss, + std::vector<BSONObj>&& objs, + bool bypassDocValidation); + + /** + * Builds an ordered update op on namespace 'nss' with update entries {q: <queries>, u: + * <updates>}. + * + * Note that 'queries' and 'updates' must be the same length. + */ + Update buildUpdateOp(const NamespaceString& nss, + std::vector<BSONObj>&& queries, + std::vector<BSONObj>&& updates, + bool upsert, + bool multi, + bool bypassDocValidation); + private: /** * Looks up the collection default collator for the collection given by 'collectionUUID'. A |