-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml  2
-rw-r--r--  jstests/aggregation/sources/out/batch_writes.js  14
-rw-r--r--  jstests/aggregation/sources/out/mode_insert_documents.js  4
-rw-r--r--  jstests/aggregation/sources/out/mode_replace_collection.js  2
-rw-r--r--  jstests/aggregation/sources/out/mode_replace_documents.js  2
-rw-r--r--  jstests/sharding/out_to_existing.js  96
-rw-r--r--  jstests/sharding/out_to_non_existing.js  50
-rw-r--r--  src/mongo/db/commands/run_aggregate.cpp  2
-rw-r--r--  src/mongo/db/pipeline/SConscript  1
-rw-r--r--  src/mongo/db/pipeline/document_source_graph_lookup.cpp  5
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup.h  7
-rw-r--r--  src/mongo/db/pipeline/document_source_out.cpp  6
-rw-r--r--  src/mongo/db/pipeline/lite_parsed_document_source.h  28
-rw-r--r--  src/mongo/db/pipeline/lite_parsed_pipeline.h  10
-rw-r--r--  src/mongo/db/pipeline/mongod_process_interface.cpp  138
-rw-r--r--  src/mongo/db/pipeline/mongod_process_interface.h  52
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp  4
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.h  20
-rw-r--r--  src/mongo/s/commands/cluster_aggregate.cpp  17
20 files changed, 375 insertions, 87 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 2b79b3c5739..e29cb846462 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -33,6 +33,8 @@ selector:
- jstests/sharding/collation_targeting_inherited.js
- jstests/sharding/geo_near_random1.js
- jstests/sharding/geo_near_random2.js
+ - jstests/sharding/out_to_existing.js
+ - jstests/sharding/out_to_non_existing.js
- jstests/sharding/shard7.js
- jstests/sharding/shard_collection_existing_zones.js
- jstests/sharding/snapshot_aggregate_mongos.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
index 661efbfa354..512e97581b5 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
@@ -349,6 +349,8 @@ selector:
- jstests/sharding/collation_targeting_inherited.js
- jstests/sharding/geo_near_random1.js
- jstests/sharding/geo_near_random2.js
+ - jstests/sharding/out_to_existing.js
+ - jstests/sharding/out_to_non_existing.js
- jstests/sharding/shard7.js
- jstests/sharding/shard_collection_existing_zones.js
- jstests/sharding/snapshot_aggregate_mongos.js
diff --git a/jstests/aggregation/sources/out/batch_writes.js b/jstests/aggregation/sources/out/batch_writes.js
index 6bbaac7737a..941b795d27d 100644
--- a/jstests/aggregation/sources/out/batch_writes.js
+++ b/jstests/aggregation/sources/out/batch_writes.js
@@ -41,14 +41,12 @@
// Test that both batched updates and inserts will successfully write the first document but
// fail on the second. We don't guarantee any particular behavior in this case, but this test is
// meant to characterize the current behavior.
- assertErrorCode(coll, [{$out: {to: outColl.getName(), mode: "insertDocuments"}}], 16996);
- assert.soon(() => {
- return outColl.find().itcount() == 2;
- });
-
- assertErrorCode(coll, [{$out: {to: outColl.getName(), mode: "replaceDocuments"}}], 50904);
- assert.soon(() => {
- return outColl.find().itcount() == 2;
+ ["insertDocuments", "replaceDocuments"].forEach(mode => {
+ assertErrorCode(
+ coll, [{$out: {to: outColl.getName(), mode: mode}}], ErrorCodes.DuplicateKey);
+ assert.soon(() => {
+ return outColl.find().itcount() == 2;
+ });
});
// Mode "replaceCollection" will drop the contents of the output collection, so there is no
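
[Editor's note: a minimal, hypothetical shell sketch of why both modes above can report ErrorCodes.DuplicateKey; collection and index names are illustrative, not from the test.]

    const target = db.example_out;
    target.drop();
    assert.commandWorked(target.createIndex({a: 1}, {unique: true}));
    assert.commandWorked(target.insert([{_id: 10, a: 0}, {_id: 11, a: 1}]));
    // "insertDocuments" conflicts when an incoming _id (or other unique-index
    // value) already exists in the target; "replaceDocuments" replaces by the
    // unique key but can still collide with a secondary unique index such as
    // the one on 'a' above, surfacing the same DuplicateKey error.
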
diff --git a/jstests/aggregation/sources/out/mode_insert_documents.js b/jstests/aggregation/sources/out/mode_insert_documents.js
index b6918424a9c..c15818d8011 100644
--- a/jstests/aggregation/sources/out/mode_insert_documents.js
+++ b/jstests/aggregation/sources/out/mode_insert_documents.js
@@ -32,7 +32,7 @@
//
// Test that $out fails if there's a duplicate key error.
//
- assertErrorCode(coll, pipeline, 16996);
+ assertErrorCode(coll, pipeline, ErrorCodes.DuplicateKey);
//
// Test that $out will preserve the indexes and options of the output collection.
@@ -61,6 +61,6 @@
targetColl.drop();
assert.commandWorked(targetColl.createIndex({a: 1}, {unique: true}));
- assertErrorCode(coll, pipeline, 16996);
+ assertErrorCode(coll, pipeline, ErrorCodes.DuplicateKey);
}());
diff --git a/jstests/aggregation/sources/out/mode_replace_collection.js b/jstests/aggregation/sources/out/mode_replace_collection.js
index e7e770c6d5b..af41753f3cd 100644
--- a/jstests/aggregation/sources/out/mode_replace_collection.js
+++ b/jstests/aggregation/sources/out/mode_replace_collection.js
@@ -52,7 +52,7 @@
targetColl.drop();
assert.commandWorked(targetColl.createIndex({a: 1}, {unique: true}));
- assertErrorCode(coll, pipeline, 16996);
+ assertErrorCode(coll, pipeline, ErrorCodes.DuplicateKey);
// Rerun a similar test, except populate the target collection with a document that conflicts
// with one produced by the pipeline. In this case, there is no unique key violation since the target
diff --git a/jstests/aggregation/sources/out/mode_replace_documents.js b/jstests/aggregation/sources/out/mode_replace_documents.js
index 8335981a1ff..a147688adda 100644
--- a/jstests/aggregation/sources/out/mode_replace_documents.js
+++ b/jstests/aggregation/sources/out/mode_replace_documents.js
@@ -68,7 +68,7 @@
assertErrorCode(
coll,
[{$addFields: {a: 0}}, {$out: {to: outColl.getName(), mode: "replaceDocuments"}}],
- 50904);
+ ErrorCodes.DuplicateKey);
// Test that $out fails if the unique key contains an array.
coll.drop();
diff --git a/jstests/sharding/out_to_existing.js b/jstests/sharding/out_to_existing.js
new file mode 100644
index 00000000000..fe3e17fc9e5
--- /dev/null
+++ b/jstests/sharding/out_to_existing.js
@@ -0,0 +1,96 @@
+// Tests for $out with an existing target collection.
+(function() {
+ "use strict";
+
+ load("jstests/aggregation/extras/utils.js"); // For assertErrorCode.
+
+ const st = new ShardingTest({shards: 2, rs: {nodes: 1}});
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ const mongosColl = mongosDB[jsTestName()];
+ const mongosTargetColl = mongosDB[jsTestName() + "_out"];
+
+ function testOut(sourceColl, targetColl, shardedSource, shardedTarget) {
+ jsTestLog("Testing $out from source collection (sharded: " + shardedSource +
+ ") to target collection (sharded: " + shardedTarget + ")");
+ sourceColl.drop();
+ targetColl.drop();
+
+ if (shardedSource) {
+ st.shardColl(sourceColl, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName());
+ }
+
+ if (shardedTarget) {
+ st.shardColl(targetColl, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName());
+ }
+
+ for (let i = 0; i < 10; i++) {
+ assert.commandWorked(sourceColl.insert({_id: i}));
+ }
+
+ // Test mode "insertDocuments" to an existing collection with no documents.
+ sourceColl.aggregate([{$out: {to: targetColl.getName(), mode: "insertDocuments"}}]);
+ assert.eq(10, targetColl.find().itcount());
+
+ // Test mode "insertDocuments" to an existing collection with unique key conflicts.
+ assertErrorCode(sourceColl,
+ [{$out: {to: targetColl.getName(), mode: "insertDocuments"}}],
+ ErrorCodes.DuplicateKey);
+
+ // Test mode "replaceDocuments" to an existing collection with no documents.
+ targetColl.remove({});
+ sourceColl.aggregate([{$out: {to: targetColl.getName(), mode: "replaceDocuments"}}]);
+ assert.eq(10, targetColl.find().itcount());
+
+ // Test mode "replaceDocuments" to an existing collection with documents that match the
+ // unique key. Re-run the previous aggregation, expecting it to succeed and overwrite the
+ // existing documents because the mode is "replaceDocuments".
+ sourceColl.aggregate([{$out: {to: targetColl.getName(), mode: "replaceDocuments"}}]);
+ assert.eq(10, targetColl.find().itcount());
+
+ // Replace all documents in the target collection with documents that don't conflict with
+ // the insert operations.
+ targetColl.remove({});
+ for (let i = 10; i < 20; i++) {
+ assert.commandWorked(targetColl.insert({_id: i}));
+ }
+
+ sourceColl.aggregate([{$out: {to: targetColl.getName(), mode: "insertDocuments"}}]);
+ assert.eq(20, targetColl.find().itcount());
+
+ // Test that mode "replaceCollection" will drop the target collection and replace with the
+ // contents of the $out.
+ // TODO SERVER-36123: Mode "replaceCollection" should fail (gracefully) if the target exists
+ // and is sharded.
+ if (!shardedTarget) {
+ sourceColl.aggregate([{$out: {to: targetColl.getName(), mode: "replaceCollection"}}]);
+ assert.eq(10, targetColl.find().itcount());
+
+ // Legacy syntax should behave identically to mode "replaceCollection".
+ sourceColl.aggregate([{$out: targetColl.getName()}]);
+ assert.eq(10, targetColl.find().itcount());
+ }
+ }
+
+ //
+ // Test with unsharded source and sharded target collection.
+ //
+ testOut(mongosColl, mongosTargetColl, false, true);
+
+ //
+ // Test with sharded source and sharded target collection.
+ //
+ testOut(mongosColl, mongosTargetColl, true, true);
+
+ //
+ // Test with sharded source and unsharded target collection.
+ //
+ testOut(mongosColl, mongosTargetColl, true, false);
+
+ //
+ // Test with unsharded source and unsharded target collection.
+ //
+ testOut(mongosColl, mongosTargetColl, false, false);
+
+ st.stop();
+}());
diff --git a/jstests/sharding/out_to_non_existing.js b/jstests/sharding/out_to_non_existing.js
new file mode 100644
index 00000000000..d3aab05e112
--- /dev/null
+++ b/jstests/sharding/out_to_non_existing.js
@@ -0,0 +1,50 @@
+// Tests for $out with a non-existing target collection.
+(function() {
+ "use strict";
+
+ const st = new ShardingTest({shards: 2, rs: {nodes: 1}});
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ const mongosColl = mongosDB[jsTestName()];
+ const mongosTargetColl = mongosDB[jsTestName() + "_out"];
+
+ function testOut(sourceColl, targetColl, shardedSource) {
+ jsTestLog("Testing $out to non-existent target collection (source collection sharded : " +
+ shardedSource + ").");
+ sourceColl.drop();
+ targetColl.drop();
+
+ if (shardedSource) {
+ st.shardColl(sourceColl, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName());
+ }
+
+ for (let i = 0; i < 10; i++) {
+ assert.commandWorked(sourceColl.insert({_id: i}));
+ }
+
+ // Test the behavior for each of the $out modes. Since the target collection does not exist,
+ // the behavior should be identical.
+ ["insertDocuments", "replaceDocuments", "replaceCollection"].forEach(mode => {
+ targetColl.drop();
+ sourceColl.aggregate([{$out: {to: targetColl.getName(), mode: mode}}]);
+ assert.eq(10, targetColl.find().itcount());
+ });
+
+ // Test with legacy syntax, which should behave identically to mode "replaceCollection".
+ targetColl.drop();
+ sourceColl.aggregate([{$out: targetColl.getName()}]);
+ assert.eq(10, targetColl.find().itcount());
+ }
+
+ //
+ // Test with unsharded source collection to a non-existent target collection.
+ //
+ testOut(mongosColl, mongosTargetColl, false);
+
+ //
+ // Test with sharded source collection to a non-existent target collection.
+ //
+ testOut(mongosColl, mongosTargetColl, true);
+
+ st.stop();
+}());
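
[Editor's note: the legacy and new $out spellings exercised above are equivalent, per the test. A one-line sketch with an illustrative collection name:]

    // Legacy syntax, implicitly mode "replaceCollection":
    db.source.aggregate([{$out: "target"}]);
    // Equivalent explicit form in the new syntax:
    db.source.aggregate([{$out: {to: "target", mode: "replaceCollection"}}]);
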
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 3fa7c3ffa93..358028ac6b0 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -491,7 +491,7 @@ Status runAggregate(OperationContext* opCtx,
new ExpressionContext(opCtx,
request,
std::move(*collatorToUse),
- std::make_shared<MongoDInterface>(opCtx),
+ MongoDInterface::create(opCtx),
uassertStatusOK(resolveInvolvedNamespaces(opCtx, request)),
uuid));
expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 2b53ed9c181..944a908ac8c 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -280,6 +280,7 @@ env.Library(
'$BUILD_DIR/mongo/db/query_exec',
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
'$BUILD_DIR/mongo/db/stats/top',
+ '$BUILD_DIR/mongo/s/sharding_api',
'mongo_process_common',
]
)
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index 46eec709cd3..cabdd753cae 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -74,8 +74,9 @@ std::unique_ptr<LiteParsedDocumentSourceForeignCollections> DocumentSourceGraphL
PrivilegeVector privileges{
Privilege(ResourcePattern::forExactNamespace(nss), ActionType::find)};
- return stdx::make_unique<LiteParsedDocumentSourceForeignCollections>(std::move(nss),
- std::move(privileges));
+ constexpr bool allowSharded = false;
+ return stdx::make_unique<LiteParsedDocumentSourceForeignCollections>(
+ std::move(nss), std::move(privileges), allowSharded);
}
REGISTER_DOCUMENT_SOURCE(graphLookup,
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index 5bce4026649..f9597592b69 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -79,6 +79,13 @@ public:
return requiredPrivileges;
}
+ /**
+ * Lookup from a sharded collection is not allowed.
+ */
+ bool allowShardedForeignCollections() const final {
+ return false;
+ }
+
private:
const NamespaceString _fromNss;
const stdx::unordered_set<NamespaceString> _foreignNssSet;
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index 071c9bcb37f..02d2caf3374 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -74,8 +74,9 @@ std::unique_ptr<LiteParsedDocumentSourceForeignCollections> DocumentSourceOut::l
PrivilegeVector privileges{Privilege(ResourcePattern::forExactNamespace(targetNss), actions)};
- return stdx::make_unique<LiteParsedDocumentSourceForeignCollections>(std::move(targetNss),
- std::move(privileges));
+ constexpr bool allowSharded = true;
+ return stdx::make_unique<LiteParsedDocumentSourceForeignCollections>(
+ std::move(targetNss), std::move(privileges), allowSharded);
}
REGISTER_DOCUMENT_SOURCE(out, DocumentSourceOut::liteParse, DocumentSourceOut::createFromBson);
@@ -96,7 +97,6 @@ DocumentSource::GetNextResult DocumentSourceOut::getNext() {
_initialized = true;
}
- // Insert all documents into temp collection, batching to perform vectored inserts.
BatchedObjects batch;
int bufferedBytes = 0;
diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h
index 0efc4c5b623..4d439ad2f29 100644
--- a/src/mongo/db/pipeline/lite_parsed_document_source.h
+++ b/src/mongo/db/pipeline/lite_parsed_document_source.h
@@ -128,6 +128,13 @@ public:
}
/**
+ * Returns true if the involved namespaces for this aggregation are allowed to be sharded.
+ */
+ virtual bool allowShardedForeignCollections() const {
+ return false;
+ }
+
+ /**
* Verifies that this stage is allowed to run with the specified read concern. Throws a
* UserException if not compatible.
*/
@@ -158,17 +165,23 @@ public:
};
/**
- * Helper class for DocumentSources which which reference one or more foreign collections.
+ * Helper class for DocumentSources which reference one or more foreign collections.
*/
class LiteParsedDocumentSourceForeignCollections : public LiteParsedDocumentSource {
public:
LiteParsedDocumentSourceForeignCollections(NamespaceString foreignNss,
- PrivilegeVector privileges)
- : _foreignNssSet{std::move(foreignNss)}, _requiredPrivileges(std::move(privileges)) {}
+ PrivilegeVector privileges,
+ bool allowSharded)
+ : _foreignNssSet{std::move(foreignNss)},
+ _requiredPrivileges(std::move(privileges)),
+ _allowSharded(allowSharded) {}
LiteParsedDocumentSourceForeignCollections(stdx::unordered_set<NamespaceString> foreignNssSet,
- PrivilegeVector privileges)
- : _foreignNssSet(std::move(foreignNssSet)), _requiredPrivileges(std::move(privileges)) {}
+ PrivilegeVector privileges,
+ bool allowSharded)
+ : _foreignNssSet(std::move(foreignNssSet)),
+ _requiredPrivileges(std::move(privileges)),
+ _allowSharded(allowSharded) {}
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
return {_foreignNssSet};
@@ -178,8 +191,13 @@ public:
return _requiredPrivileges;
}
+ bool allowShardedForeignCollections() const final {
+ return _allowSharded;
+ }
+
private:
stdx::unordered_set<NamespaceString> _foreignNssSet;
PrivilegeVector _requiredPrivileges;
+ bool _allowSharded;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h
index 9f8325d1e33..303754df4b0 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.h
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h
@@ -119,6 +119,16 @@ public:
}
/**
+ * Returns true only if every stage in the pipeline allows its involved namespaces to be
+ * sharded.
+ */
+ bool allowShardedForeignCollections() const {
+ return std::all_of(_stageSpecs.begin(), _stageSpecs.end(), [](auto&& spec) {
+ return spec->allowShardedForeignCollections();
+ });
+ }
+
+ /**
* Verifies that this pipeline is allowed to run with the specified read concern. This ensures
* that each stage is compatible, and throws a UserException if not.
*/
diff --git a/src/mongo/db/pipeline/mongod_process_interface.cpp b/src/mongo/db/pipeline/mongod_process_interface.cpp
index e80f65da3df..e54ceb1dc17 100644
--- a/src/mongo/db/pipeline/mongod_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongod_process_interface.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/stats/storage_stats.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/grid.h"
+#include "mongo/s/write_ops/cluster_write.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -57,7 +58,72 @@ using boost::intrusive_ptr;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
+using write_ops::Insert;
using write_ops::Update;
+using write_ops::UpdateOpEntry;
+
+namespace {
+
+/**
+ * Builds an ordered insert op on namespace 'nss' for the documents 'objs' to be written.
+ */
+Insert buildInsertOp(const NamespaceString& nss,
+ const std::vector<BSONObj>& objs,
+ bool bypassDocValidation) {
+ Insert insertOp(nss);
+ insertOp.setDocuments(objs);
+ insertOp.setWriteCommandBase([&] {
+ write_ops::WriteCommandBase wcb;
+ wcb.setOrdered(true);
+ wcb.setBypassDocumentValidation(bypassDocValidation);
+ return wcb;
+ }());
+ return insertOp;
+}
+
+/**
+ * Builds an ordered update op on namespace 'nss' with update entries {q: <queries>, u: <updates>}.
+ *
+ * Note that 'queries' and 'updates' must be the same length.
+ */
+Update buildUpdateOp(const NamespaceString& nss,
+ const std::vector<BSONObj>& queries,
+ const std::vector<BSONObj>& updates,
+ bool upsert,
+ bool multi,
+ bool bypassDocValidation) {
+ Update updateOp(nss);
+ updateOp.setUpdates([&] {
+ std::vector<UpdateOpEntry> updateEntries;
+ for (size_t index = 0; index < queries.size(); ++index) {
+ updateEntries.push_back([&] {
+ UpdateOpEntry entry;
+ entry.setQ(queries[index]);
+ entry.setU(updates[index]);
+ entry.setUpsert(upsert);
+ entry.setMulti(multi);
+ return entry;
+ }());
+ }
+ return updateEntries;
+ }());
+ updateOp.setWriteCommandBase([&] {
+ write_ops::WriteCommandBase wcb;
+ wcb.setOrdered(true);
+ wcb.setBypassDocumentValidation(bypassDocValidation);
+ return wcb;
+ }());
+ return updateOp;
+}
+
+} // namespace
+
+// static
+std::shared_ptr<MongoProcessInterface> MongoDInterface::create(OperationContext* opCtx) {
+ return ShardingState::get(opCtx)->enabled()
+ ? std::make_shared<MongoDInterfaceShardServer>(opCtx)
+ : std::make_shared<MongoDInterface>(opCtx);
+}
MongoDInterface::MongoDInterface(OperationContext* opCtx) : _client(opCtx) {}
@@ -78,14 +144,12 @@ bool MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString&
void MongoDInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
const std::vector<BSONObj>& objs) {
- boost::optional<DisableDocumentValidation> maybeDisableValidation;
- if (expCtx->bypassDocumentValidation)
- maybeDisableValidation.emplace(expCtx->opCtx);
-
- _client.insert(ns.ns(), objs);
- uassert(16996,
- str::stream() << "Insert failed: " << _client.getLastError(),
- _client.getLastError().empty());
+ auto insertOp = buildInsertOp(ns, objs, expCtx->bypassDocumentValidation);
+ auto writeResults = performInserts(expCtx->opCtx, insertOp);
+
+ // Only need to check that the final result passed because the inserts are ordered and the batch
+ // will stop on the first failure.
+ uassertStatusOKWithContext(writeResults.results.back().getStatus(), "Insert failed: ");
}
void MongoDInterface::update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
@@ -94,31 +158,13 @@ void MongoDInterface::update(const boost::intrusive_ptr<ExpressionContext>& expC
const std::vector<BSONObj>& updates,
bool upsert,
bool multi) {
- BSONObjBuilder updateBob;
- updateBob.append(Update::kCommandName, ns.coll());
- updateBob.append(Update::kDbNameFieldName, ns.db());
- updateBob.append(Update::kBypassDocumentValidationFieldName, expCtx->bypassDocumentValidation);
-
- // Build the array of UpdateOp entries.
- invariant(queries.size() == updates.size());
- BSONArrayBuilder updatesArray;
- for (size_t index = 0; index < queries.size(); ++index) {
- updatesArray.append(BSON("q" << queries[index] << "u" << updates[index] << "multi" << multi
- << "upsert"
- << upsert));
- }
- updateBob.append(Update::kUpdatesFieldName, updatesArray.arr());
-
- auto updateObj = updateBob.done();
- auto writeResults =
- performUpdates(expCtx->opCtx, Update::parse(IDLParserErrorContext("update"), updateObj));
+ auto updateOp =
+ buildUpdateOp(ns, queries, updates, upsert, multi, expCtx->bypassDocumentValidation);
+ auto writeResults = performUpdates(expCtx->opCtx, updateOp);
- // Verify that each of the update results is successful.
- for (const auto& result : writeResults.results) {
- uassert(50904,
- str::stream() << "Update failed: " << result.getStatus(),
- result.getStatus() == Status::OK());
- }
+ // Only need to check that the final result passed because the updates are ordered and the batch
+ // will stop on the first failure.
+ uassertStatusOKWithContext(writeResults.results.back().getStatus(), "Update failed: ");
}
CollectionIndexUsageMap MongoDInterface::getIndexStats(OperationContext* opCtx,
@@ -421,4 +467,32 @@ std::unique_ptr<CollatorInterface> MongoDInterface::_getCollectionDefaultCollato
return collator ? collator->clone() : nullptr;
}
+void MongoDInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& objs) {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+
+ auto insertOp = buildInsertOp(ns, objs, expCtx->bypassDocumentValidation);
+ ClusterWriter::write(expCtx->opCtx, BatchedCommandRequest(insertOp), &stats, &response);
+ // TODO SERVER-35403: Add more context for which shard produced the error.
+ uassertStatusOKWithContext(response.toStatus(), "Insert failed: ");
+}
+
+void MongoDInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& queries,
+ const std::vector<BSONObj>& updates,
+ bool upsert,
+ bool multi) {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+
+ auto updateOp =
+ buildUpdateOp(ns, queries, updates, upsert, multi, expCtx->bypassDocumentValidation);
+ ClusterWriter::write(expCtx->opCtx, BatchedCommandRequest(updateOp), &stats, &response);
+ // TODO SERVER-35403: Add more context for which shard produced the error.
+ uassertStatusOKWithContext(response.toStatus(), "Update failed: ");
+}
+
} // namespace mongo
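
[Editor's note: the "only need to check the final result" comments in the hunks above rely on ordered write semantics: an ordered batch stops at the first error, so the last result reflects any failure. A shell-level illustration, with a hypothetical collection name:]

    const coll = db.ordered_example;
    coll.drop();
    const res = coll.insert([{_id: 1}, {_id: 1}, {_id: 2}], {ordered: true});
    // The batch stops at the duplicate _id: 1; {_id: 2} is never attempted.
    assert.eq(1, res.nInserted);
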
diff --git a/src/mongo/db/pipeline/mongod_process_interface.h b/src/mongo/db/pipeline/mongod_process_interface.h
index 2b9ab8d8f19..8d452e54e62 100644
--- a/src/mongo/db/pipeline/mongod_process_interface.h
+++ b/src/mongo/db/pipeline/mongod_process_interface.h
@@ -38,23 +38,26 @@ namespace mongo {
* Class to provide access to mongod-specific implementations of methods required by some
* document sources.
*/
-class MongoDInterface final : public MongoProcessCommon {
+class MongoDInterface : public MongoProcessCommon {
public:
+ static std::shared_ptr<MongoProcessInterface> create(OperationContext* opCtx);
+
MongoDInterface(OperationContext* opCtx);
+ virtual ~MongoDInterface() = default;
+
void setOperationContext(OperationContext* opCtx) final;
DBClientBase* directClient() final;
bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final;
- void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- const std::vector<BSONObj>& objs) final;
- void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- const std::vector<BSONObj>& queries,
- const std::vector<BSONObj>& updates,
- bool upsert,
- bool multi) final;
-
+ virtual void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& objs);
+ virtual void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& queries,
+ const std::vector<BSONObj>& updates,
+ bool upsert,
+ bool multi);
CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final;
void appendLatencyStats(OperationContext* opCtx,
const NamespaceString& nss,
@@ -116,4 +119,31 @@ private:
std::map<UUID, std::unique_ptr<const CollatorInterface>> _collatorCache;
};
+/**
+ * Specialized version of the MongoDInterface when this node is a shard server.
+ */
+class MongoDInterfaceShardServer final : public MongoDInterface {
+public:
+ using MongoDInterface::MongoDInterface;
+
+ /**
+ * Inserts the documents 'objs' into the namespace 'ns' using the ClusterWriter for locking,
+ * routing, stale config handling, etc.
+ */
+ void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& objs) final;
+
+ /**
+ * Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking,
+ * routing, stale config handling, etc.
+ */
+ void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& queries,
+ const std::vector<BSONObj>& updates,
+ bool upsert,
+ bool multi) final;
+};
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index f3317c81b3b..e7db3deeae7 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -47,6 +47,8 @@
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/matcher/extensions_callback_real.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/ops/write_ops_exec.h"
+#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_cursor.h"
@@ -75,6 +77,7 @@
#include "mongo/s/chunk_manager.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/grid.h"
+#include "mongo/s/write_ops/cluster_write.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
#include "mongo/util/time_support.h"
@@ -85,6 +88,7 @@ using boost::intrusive_ptr;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
+using write_ops::Insert;
namespace {
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index 5c3d0464462..0851578ddec 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -49,14 +49,13 @@ struct PlanSummaryStats;
class BSONObj;
struct DepsTracker;
-/*
- PipelineD is an extension of the Pipeline class, but with additional
- material that references symbols that are not available in mongos,
- where the remainder of the Pipeline class also functions. PipelineD
- is a friend of Pipeline so that it can have equal access to Pipeline's
- members.
-
- See the friend declaration in Pipeline.
+/**
+ * PipelineD is an extension of the Pipeline class, but with additional material that references
+ * symbols that are not available in mongos, where the remainder of the Pipeline class also
+ * functions. PipelineD is a friend of Pipeline so that it can have equal access to Pipeline's
+ * members.
+ *
+ * See the friend declaration in Pipeline.
*/
class PipelineD {
public:
@@ -97,11 +96,6 @@ public:
const AggregationRequest* aggRequest,
Pipeline* pipeline);
- /**
- * Injects a MongodInterface into stages which require access to mongod-specific functionality.
- */
- static void injectMongodInterface(Pipeline* pipeline);
-
static std::string getPlanSummaryStr(const Pipeline* pipeline);
static void getPlanSummaryStats(const Pipeline* pipeline, PlanSummaryStats* statsOut);
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 867c6932d21..d06c5426555 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -739,12 +739,12 @@ ShardId pickMergingShard(OperationContext* opCtx,
.toString();
}
-// "Resolve" involved namespaces and verify that none of them are sharded. We won't try to execute
-// anything on a mongos, but we still have to populate this map so that any $lookups, etc. will be
-// able to have a resolved view definition. It's okay that this is incorrect, we will repopulate the
-// real namespace map on the mongod. Note that this function must be called before forwarding an
-// aggregation command on an unsharded collection, in order to validate that none of the involved
-// collections are sharded.
+// "Resolve" involved namespaces and verify that none of them are sharded unless allowed by the
+// pipeline. We won't try to execute anything on a mongos, but we still have to populate this map so
+// that any $lookups, etc. will be able to have a resolved view definition. It's okay that this is
+// incorrect; we will repopulate the real namespace map on the mongod. Note that this function must
+// be called before forwarding an aggregation command on an unsharded collection, in order to verify
+// that the involved namespaces are allowed to be sharded.
StringMap<ExpressionContext::ResolvedNamespace> resolveInvolvedNamespaces(
OperationContext* opCtx, const LiteParsedPipeline& litePipe) {
@@ -752,8 +752,9 @@ StringMap<ExpressionContext::ResolvedNamespace> resolveInvolvedNamespaces(
for (auto&& nss : litePipe.getInvolvedNamespaces()) {
const auto resolvedNsRoutingInfo =
uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
- uassert(
- 28769, str::stream() << nss.ns() << " cannot be sharded", !resolvedNsRoutingInfo.cm());
+ uassert(28769,
+ str::stream() << nss.ns() << " cannot be sharded",
+ !resolvedNsRoutingInfo.cm() || litePipe.allowShardedForeignCollections());
resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
}
return resolvedNamespaces;
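
[Editor's note: taken together, the mongos-side check above now permits a sharded involved namespace only when every stage allows it. A rough shell sketch of the resulting behavior, with illustrative collection names:]

    // $out may now target a sharded collection through mongos...
    db.unshardedSource.aggregate([{$out: {to: "shardedTarget", mode: "insertDocuments"}}]);
    // ...while $lookup from a sharded foreign collection is still rejected with 28769.
    assert.commandFailedWithCode(db.runCommand({
        aggregate: "unshardedSource",
        pipeline: [{$lookup: {from: "shardedForeign", localField: "x", foreignField: "x", as: "joined"}}],
        cursor: {}
    }), 28769);
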