summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-10-04 13:39:15 -0400
committerNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-10-12 11:58:42 -0400
commit267bbff2d931ab1ddabfbab18da96d018422f7fd (patch)
treec32c8aa3251615c63c885f2cbb0c198785cc9453 /src
parentb04cc1e53b5d4b13d0b1f25303dd6f33b2e730f8 (diff)
downloadmongo-267bbff2d931ab1ddabfbab18da96d018422f7fd.tar.gz
SERVER-37229: Switch to unordered write ops ini $out
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/pipeline/process_interface_shardsvr.cpp89
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.cpp131
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.h25
3 files changed, 93 insertions, 152 deletions
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
index e86b7202d2b..46c1a4cadd6 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -42,15 +42,8 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_gen.h"
-#include "mongo/db/pipeline/document_source_cursor.h"
-#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
-#include "mongo/db/session_catalog.h"
-#include "mongo/db/stats/fill_locker_info.h"
-#include "mongo/db/stats/storage_stats.h"
-#include "mongo/db/storage/backup_cursor_hooks.h"
-#include "mongo/db/transaction_participant.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/grid.h"
#include "mongo/s/write_ops/cluster_write.h"
@@ -66,88 +59,6 @@ using write_ops::Insert;
using write_ops::Update;
using write_ops::UpdateOpEntry;
-namespace {
-
-/**
- * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'.
- */
-Insert buildInsertOp(const NamespaceString& nss,
- std::vector<BSONObj>&& objs,
- bool bypassDocValidation) {
- Insert insertOp(nss);
- insertOp.setDocuments(std::move(objs));
- insertOp.setWriteCommandBase([&] {
- write_ops::WriteCommandBase wcb;
- wcb.setOrdered(true);
- wcb.setBypassDocumentValidation(bypassDocValidation);
- return wcb;
- }());
- return insertOp;
-}
-
-/**
- * Builds an ordered update op on namespace 'nss' with update entries {q: <queries>, u: <updates>}.
- *
- * Note that 'queries' and 'updates' must be the same length.
- */
-Update buildUpdateOp(const NamespaceString& nss,
- std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
- bool upsert,
- bool multi,
- bool bypassDocValidation) {
- Update updateOp(nss);
- updateOp.setUpdates([&] {
- std::vector<UpdateOpEntry> updateEntries;
- for (size_t index = 0; index < queries.size(); ++index) {
- updateEntries.push_back([&] {
- UpdateOpEntry entry;
- entry.setQ(std::move(queries[index]));
- entry.setU(std::move(updates[index]));
- entry.setUpsert(upsert);
- entry.setMulti(multi);
- return entry;
- }());
- }
- return updateEntries;
- }());
- updateOp.setWriteCommandBase([&] {
- write_ops::WriteCommandBase wcb;
- wcb.setOrdered(true);
- wcb.setBypassDocumentValidation(bypassDocValidation);
- return wcb;
- }());
- return updateOp;
-}
-
-// Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and each
-// of the elements of 'keyPattern' is numeric, i.e. not "text", "$**", or any other special type of
-// index.
-bool keyPatternNamesExactPaths(const BSONObj& keyPattern,
- const std::set<FieldPath>& uniqueKeyPaths) {
- size_t nFieldsMatched = 0;
- for (auto&& elem : keyPattern) {
- if (!elem.isNumber()) {
- return false;
- }
- if (uniqueKeyPaths.find(elem.fieldNameStringData()) == uniqueKeyPaths.end()) {
- return false;
- }
- ++nFieldsMatched;
- }
- return nFieldsMatched == uniqueKeyPaths.size();
-}
-
-bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const IndexCatalogEntry* index,
- const std::set<FieldPath>& uniqueKeyPaths) {
- return (index->descriptor()->unique() && !index->descriptor()->isPartial() &&
- keyPatternNamesExactPaths(index->descriptor()->keyPattern(), uniqueKeyPaths) &&
- CollatorInterface::collatorsMatch(index->getCollator(), expCtx->getCollator()));
-}
-
-} // namespace
-
std::pair<std::vector<FieldPath>, bool> MongoInterfaceShardServer::collectDocumentKeyFields(
OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const {
invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index cb1a2a9a803..661ec78b4ca 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -40,8 +40,6 @@
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
-#include "mongo/db/ops/write_ops_exec.h"
-#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/db/s/collection_sharding_state.h"
@@ -51,9 +49,6 @@
#include "mongo/db/stats/storage_stats.h"
#include "mongo/db/storage/backup_cursor_hooks.h"
#include "mongo/db/transaction_participant.h"
-#include "mongo/s/catalog_cache.h"
-#include "mongo/s/grid.h"
-#include "mongo/s/write_ops/cluster_write.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -68,58 +63,6 @@ using write_ops::UpdateOpEntry;
namespace {
-/**
- * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'.
- */
-Insert buildInsertOp(const NamespaceString& nss,
- std::vector<BSONObj>&& objs,
- bool bypassDocValidation) {
- Insert insertOp(nss);
- insertOp.setDocuments(std::move(objs));
- insertOp.setWriteCommandBase([&] {
- write_ops::WriteCommandBase wcb;
- wcb.setOrdered(true);
- wcb.setBypassDocumentValidation(bypassDocValidation);
- return wcb;
- }());
- return insertOp;
-}
-
-/**
- * Builds an ordered update op on namespace 'nss' with update entries {q: <queries>, u: <updates>}.
- *
- * Note that 'queries' and 'updates' must be the same length.
- */
-Update buildUpdateOp(const NamespaceString& nss,
- std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
- bool upsert,
- bool multi,
- bool bypassDocValidation) {
- Update updateOp(nss);
- updateOp.setUpdates([&] {
- std::vector<UpdateOpEntry> updateEntries;
- for (size_t index = 0; index < queries.size(); ++index) {
- updateEntries.push_back([&] {
- UpdateOpEntry entry;
- entry.setQ(std::move(queries[index]));
- entry.setU(std::move(updates[index]));
- entry.setUpsert(upsert);
- entry.setMulti(multi);
- return entry;
- }());
- }
- return updateEntries;
- }());
- updateOp.setWriteCommandBase([&] {
- write_ops::WriteCommandBase wcb;
- wcb.setOrdered(true);
- wcb.setBypassDocumentValidation(bypassDocValidation);
- return wcb;
- }());
- return updateOp;
-}
-
// Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and each
// of the elements of 'keyPattern' is numeric, i.e. not "text", "$**", or any other special type of
// index.
@@ -164,15 +107,69 @@ bool MongoInterfaceStandalone::isSharded(OperationContext* opCtx, const Namespac
return css->getMetadata(opCtx)->isSharded();
}
+Insert MongoInterfaceStandalone::buildInsertOp(const NamespaceString& nss,
+ std::vector<BSONObj>&& objs,
+ bool bypassDocValidation) {
+ Insert insertOp(nss);
+ insertOp.setDocuments(std::move(objs));
+ insertOp.setWriteCommandBase([&] {
+ write_ops::WriteCommandBase wcb;
+ wcb.setOrdered(false);
+ // wcb.setOrdered(true);
+ wcb.setBypassDocumentValidation(bypassDocValidation);
+ return wcb;
+ }());
+ return insertOp;
+}
+
+Update MongoInterfaceStandalone::buildUpdateOp(const NamespaceString& nss,
+ std::vector<BSONObj>&& queries,
+ std::vector<BSONObj>&& updates,
+ bool upsert,
+ bool multi,
+ bool bypassDocValidation) {
+ Update updateOp(nss);
+ updateOp.setUpdates([&] {
+ std::vector<UpdateOpEntry> updateEntries;
+ for (size_t index = 0; index < queries.size(); ++index) {
+ updateEntries.push_back([&] {
+ UpdateOpEntry entry;
+ entry.setQ(std::move(queries[index]));
+ entry.setU(std::move(updates[index]));
+ entry.setUpsert(upsert);
+ entry.setMulti(multi);
+ return entry;
+ }());
+ }
+ return updateEntries;
+ }());
+ updateOp.setWriteCommandBase([&] {
+ write_ops::WriteCommandBase wcb;
+ wcb.setOrdered(false);
+ // wcb.setOrdered(true);
+ wcb.setBypassDocumentValidation(bypassDocValidation);
+ return wcb;
+ }());
+ return updateOp;
+}
+
void MongoInterfaceStandalone::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs) {
auto writeResults = performInserts(
expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
- // Only need to check that the final result passed because the inserts are ordered and the batch
- // will stop on the first failure.
- uassertStatusOKWithContext(writeResults.results.back().getStatus(), "Insert failed: ");
+ // Need to check each result in the batch since the writes are unordered.
+ uassertStatusOKWithContext(
+ [&writeResults]() {
+ for (const auto& result : writeResults.results) {
+ if (result.getStatus() != Status::OK()) {
+ return result.getStatus();
+ }
+ }
+ return Status::OK();
+ }(),
+ "Insert failed: ");
}
void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
@@ -189,9 +186,17 @@ void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionConte
multi,
expCtx->bypassDocumentValidation));
- // Only need to check that the final result passed because the updates are ordered and the batch
- // will stop on the first failure.
- uassertStatusOKWithContext(writeResults.results.back().getStatus(), "Update failed: ");
+ // Need to check each result in the batch since the writes are unordered.
+ uassertStatusOKWithContext(
+ [&writeResults]() {
+ for (const auto& result : writeResults.results) {
+ if (result.getStatus() != Status::OK()) {
+ return result.getStatus();
+ }
+ }
+ return Status::OK();
+ }(),
+ "Update failed: ");
}
CollectionIndexUsageMap MongoInterfaceStandalone::getIndexStats(OperationContext* opCtx,
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index 981ca2b35c3..490c6f6ea3b 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -29,11 +29,16 @@
#pragma once
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/ops/write_ops_exec.h"
+#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/pipeline/mongo_process_common.h"
#include "mongo/db/pipeline/pipeline.h"
namespace mongo {
+using write_ops::Insert;
+using write_ops::Update;
+
/**
* Class to provide access to mongod-specific implementations of methods required by some
* document sources.
@@ -113,6 +118,26 @@ protected:
CurrentOpUserMode userMode,
std::vector<BSONObj>* ops) const final;
+ /**
+ * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'.
+ */
+ Insert buildInsertOp(const NamespaceString& nss,
+ std::vector<BSONObj>&& objs,
+ bool bypassDocValidation);
+
+ /**
+ * Builds an ordered update op on namespace 'nss' with update entries {q: <queries>, u:
+ * <updates>}.
+ *
+ * Note that 'queries' and 'updates' must be the same length.
+ */
+ Update buildUpdateOp(const NamespaceString& nss,
+ std::vector<BSONObj>&& queries,
+ std::vector<BSONObj>&& updates,
+ bool upsert,
+ bool multi,
+ bool bypassDocValidation);
+
private:
/**
* Looks up the collection default collator for the collection given by 'collectionUUID'. A