diff options
20 files changed, 375 insertions, 87 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 2b79b3c5739..e29cb846462 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -33,6 +33,8 @@ selector: - jstests/sharding/collation_targeting_inherited.js - jstests/sharding/geo_near_random1.js - jstests/sharding/geo_near_random2.js + - jstests/sharding/out_to_existing.js + - jstests/sharding/out_to_non_existing.js - jstests/sharding/shard7.js - jstests/sharding/shard_collection_existing_zones.js - jstests/sharding/snapshot_aggregate_mongos.js diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml index 661efbfa354..512e97581b5 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml @@ -349,6 +349,8 @@ selector: - jstests/sharding/collation_targeting_inherited.js - jstests/sharding/geo_near_random1.js - jstests/sharding/geo_near_random2.js + - jstests/sharding/out_to_existing.js + - jstests/sharding/out_to_non_existing.js - jstests/sharding/shard7.js - jstests/sharding/shard_collection_existing_zones.js - jstests/sharding/snapshot_aggregate_mongos.js diff --git a/jstests/aggregation/sources/out/batch_writes.js b/jstests/aggregation/sources/out/batch_writes.js index 6bbaac7737a..941b795d27d 100644 --- a/jstests/aggregation/sources/out/batch_writes.js +++ b/jstests/aggregation/sources/out/batch_writes.js @@ -41,14 +41,12 @@ // Test that both batched updates and inserts will successfully write the first document but // fail on the second. We don't guarantee any particular behavior in this case, but this test is // meant to characterize the current behavior. - assertErrorCode(coll, [{$out: {to: outColl.getName(), mode: "insertDocuments"}}], 16996); - assert.soon(() => { - return outColl.find().itcount() == 2; - }); - - assertErrorCode(coll, [{$out: {to: outColl.getName(), mode: "replaceDocuments"}}], 50904); - assert.soon(() => { - return outColl.find().itcount() == 2; + ["insertDocuments", "replaceDocuments"].forEach(mode => { + assertErrorCode( + coll, [{$out: {to: outColl.getName(), mode: mode}}], ErrorCodes.DuplicateKey); + assert.soon(() => { + return outColl.find().itcount() == 2; + }); }); // Mode "replaceCollection" will drop the contents of the output collection, so there is no diff --git a/jstests/aggregation/sources/out/mode_insert_documents.js b/jstests/aggregation/sources/out/mode_insert_documents.js index b6918424a9c..c15818d8011 100644 --- a/jstests/aggregation/sources/out/mode_insert_documents.js +++ b/jstests/aggregation/sources/out/mode_insert_documents.js @@ -32,7 +32,7 @@ // // Test that $out fails if there's a duplicate key error. // - assertErrorCode(coll, pipeline, 16996); + assertErrorCode(coll, pipeline, ErrorCodes.DuplicateKey); // // Test that $out will preserve the indexes and options of the output collection. @@ -61,6 +61,6 @@ targetColl.drop(); assert.commandWorked(targetColl.createIndex({a: 1}, {unique: true})); - assertErrorCode(coll, pipeline, 16996); + assertErrorCode(coll, pipeline, ErrorCodes.DuplicateKey); }()); diff --git a/jstests/aggregation/sources/out/mode_replace_collection.js b/jstests/aggregation/sources/out/mode_replace_collection.js index e7e770c6d5b..af41753f3cd 100644 --- a/jstests/aggregation/sources/out/mode_replace_collection.js +++ b/jstests/aggregation/sources/out/mode_replace_collection.js @@ -52,7 +52,7 @@ targetColl.drop(); assert.commandWorked(targetColl.createIndex({a: 1}, {unique: true})); - assertErrorCode(coll, pipeline, 16996); + assertErrorCode(coll, pipeline, ErrorCodes.DuplicateKey); // Rerun a similar test, except populate the target collection with a document that conflics // with one out of the pipeline. In this case, there is no unique key violation since the target diff --git a/jstests/aggregation/sources/out/mode_replace_documents.js b/jstests/aggregation/sources/out/mode_replace_documents.js index 8335981a1ff..a147688adda 100644 --- a/jstests/aggregation/sources/out/mode_replace_documents.js +++ b/jstests/aggregation/sources/out/mode_replace_documents.js @@ -68,7 +68,7 @@ assertErrorCode( coll, [{$addFields: {a: 0}}, {$out: {to: outColl.getName(), mode: "replaceDocuments"}}], - 50904); + ErrorCodes.DuplicateKey); // Test that $out fails if the unique key contains an array. coll.drop(); diff --git a/jstests/sharding/out_to_existing.js b/jstests/sharding/out_to_existing.js new file mode 100644 index 00000000000..fe3e17fc9e5 --- /dev/null +++ b/jstests/sharding/out_to_existing.js @@ -0,0 +1,96 @@ +// Tests for $out with an existing target collection. +(function() { + "use strict"; + + load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. + + const st = new ShardingTest({shards: 2, rs: {nodes: 1}}); + + const mongosDB = st.s0.getDB(jsTestName()); + const mongosColl = mongosDB[jsTestName()]; + const mongosTargetColl = mongosDB[jsTestName() + "_out"]; + + function testOut(sourceColl, targetColl, shardedSource, shardedTarget) { + jsTestLog("Testing $out from source collection (sharded: " + shardedSource + + ") to target collection (sharded: " + shardedTarget + ")"); + sourceColl.drop(); + targetColl.drop(); + + if (shardedSource) { + st.shardColl(sourceColl, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName()); + } + + if (shardedTarget) { + st.shardColl(targetColl, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName()); + } + + for (let i = 0; i < 10; i++) { + assert.commandWorked(sourceColl.insert({_id: i})); + } + + // Test mode "insertDocuments" to an existing collection with no documents. + sourceColl.aggregate([{$out: {to: targetColl.getName(), mode: "insertDocuments"}}]); + assert.eq(10, targetColl.find().itcount()); + + // Test mode "insertDocuments" to an existing collection with unique key conflicts. + assertErrorCode(sourceColl, + [{$out: {to: targetColl.getName(), mode: "insertDocuments"}}], + ErrorCodes.DuplicateKey); + + // Test mode "replaceDocuments" to an existing collection with no documents. + targetColl.remove({}); + sourceColl.aggregate([{$out: {to: targetColl.getName(), mode: "replaceDocuments"}}]); + assert.eq(10, targetColl.find().itcount()); + + // Test mode "replaceDocuments" to an existing collection with documents that match the + // unique key. Re-run the previous aggregation, expecting it to succeed and overwrite the + // existing documents because the mode is "replaceDocuments". + sourceColl.aggregate([{$out: {to: targetColl.getName(), mode: "replaceDocuments"}}]); + assert.eq(10, targetColl.find().itcount()); + + // Replace all documents in the target collection with documents that don't conflict with + // the insert operations. + targetColl.remove({}); + for (let i = 10; i < 20; i++) { + assert.commandWorked(targetColl.insert({_id: i})); + } + + sourceColl.aggregate([{$out: {to: targetColl.getName(), mode: "insertDocuments"}}]); + assert.eq(20, targetColl.find().itcount()); + + // Test that mode "replaceCollection" will drop the target collection and replace with the + // contents of the $out. + // TODO SERVER-36123: Mode "replaceCollection" should fail (gracefully) if the target exists + // and is sharded. + if (!shardedTarget) { + sourceColl.aggregate([{$out: {to: targetColl.getName(), mode: "replaceCollection"}}]); + assert.eq(10, targetColl.find().itcount()); + + // Legacy syntax should behave identical to mode "replaceCollection". + sourceColl.aggregate([{$out: targetColl.getName()}]); + assert.eq(10, targetColl.find().itcount()); + } + } + + // + // Test with unsharded source and sharded target collection. + // + testOut(mongosColl, mongosTargetColl, false, true); + + // + // Test with sharded source and sharded target collection. + // + testOut(mongosColl, mongosTargetColl, true, true); + + // + // Test with sharded source and unsharded target collection. + // + testOut(mongosColl, mongosTargetColl, true, false); + + // + // Test with unsharded source and unsharded target collection. + // + testOut(mongosColl, mongosTargetColl, false, false); + + st.stop(); +}()); diff --git a/jstests/sharding/out_to_non_existing.js b/jstests/sharding/out_to_non_existing.js new file mode 100644 index 00000000000..d3aab05e112 --- /dev/null +++ b/jstests/sharding/out_to_non_existing.js @@ -0,0 +1,50 @@ +// Tests for $out with a non-existing target collection. +(function() { + "use strict"; + + const st = new ShardingTest({shards: 2, rs: {nodes: 1}}); + + const mongosDB = st.s0.getDB(jsTestName()); + const mongosColl = mongosDB[jsTestName()]; + const mongosTargetColl = mongosDB[jsTestName() + "_out"]; + + function testOut(sourceColl, targetColl, shardedSource) { + jsTestLog("Testing $out to non-existent target collection (source collection sharded : " + + shardedSource + ")."); + sourceColl.drop(); + targetColl.drop(); + + if (shardedSource) { + st.shardColl(sourceColl, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName()); + } + + for (let i = 0; i < 10; i++) { + assert.commandWorked(sourceColl.insert({_id: i})); + } + + // Test the behavior for each of the $out modes. Since the target collection does not exist, + // the behavior should be identical. + ["insertDocuments", "replaceDocuments", "replaceCollection"].forEach(mode => { + targetColl.drop(); + sourceColl.aggregate([{$out: {to: targetColl.getName(), mode: mode}}]); + assert.eq(10, targetColl.find().itcount()); + }); + + // Test with legacy syntax, which should behave identical to mode "replaceCollection". + targetColl.drop(); + sourceColl.aggregate([{$out: targetColl.getName()}]); + assert.eq(10, targetColl.find().itcount()); + } + + // + // Test with unsharded source collection to a non-existent target collection. + // + testOut(mongosColl, mongosTargetColl, false); + + // + // Test with sharded source collection to a non-existent target collection. + // + testOut(mongosColl, mongosTargetColl, true); + + st.stop(); +}()); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 3fa7c3ffa93..358028ac6b0 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -491,7 +491,7 @@ Status runAggregate(OperationContext* opCtx, new ExpressionContext(opCtx, request, std::move(*collatorToUse), - std::make_shared<MongoDInterface>(opCtx), + MongoDInterface::create(opCtx), uassertStatusOK(resolveInvolvedNamespaces(opCtx, request)), uuid)); expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp"; diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 2b53ed9c181..944a908ac8c 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -280,6 +280,7 @@ env.Library( '$BUILD_DIR/mongo/db/query_exec', '$BUILD_DIR/mongo/db/ops/write_ops_exec', '$BUILD_DIR/mongo/db/stats/top', + '$BUILD_DIR/mongo/s/sharding_api', 'mongo_process_common', ] ) diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index 46eec709cd3..cabdd753cae 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -74,8 +74,9 @@ std::unique_ptr<LiteParsedDocumentSourceForeignCollections> DocumentSourceGraphL PrivilegeVector privileges{ Privilege(ResourcePattern::forExactNamespace(nss), ActionType::find)}; - return stdx::make_unique<LiteParsedDocumentSourceForeignCollections>(std::move(nss), - std::move(privileges)); + constexpr bool allowedSharded = false; + return std::make_unique<LiteParsedDocumentSourceForeignCollections>( + std::move(nss), std::move(privileges), allowedSharded); } REGISTER_DOCUMENT_SOURCE(graphLookup, diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 5bce4026649..f9597592b69 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -79,6 +79,13 @@ public: return requiredPrivileges; } + /** + * Lookup from a sharded collection is not allowed. + */ + bool allowShardedForeignCollections() const final { + return false; + } + private: const NamespaceString _fromNss; const stdx::unordered_set<NamespaceString> _foreignNssSet; diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index 071c9bcb37f..02d2caf3374 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -74,8 +74,9 @@ std::unique_ptr<LiteParsedDocumentSourceForeignCollections> DocumentSourceOut::l PrivilegeVector privileges{Privilege(ResourcePattern::forExactNamespace(targetNss), actions)}; - return stdx::make_unique<LiteParsedDocumentSourceForeignCollections>(std::move(targetNss), - std::move(privileges)); + constexpr bool allowSharded = true; + return stdx::make_unique<LiteParsedDocumentSourceForeignCollections>( + std::move(targetNss), std::move(privileges), allowSharded); } REGISTER_DOCUMENT_SOURCE(out, DocumentSourceOut::liteParse, DocumentSourceOut::createFromBson); @@ -96,7 +97,6 @@ DocumentSource::GetNextResult DocumentSourceOut::getNext() { _initialized = true; } - // Insert all documents into temp collection, batching to perform vectored inserts. BatchedObjects batch; int bufferedBytes = 0; diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h index 0efc4c5b623..4d439ad2f29 100644 --- a/src/mongo/db/pipeline/lite_parsed_document_source.h +++ b/src/mongo/db/pipeline/lite_parsed_document_source.h @@ -128,6 +128,13 @@ public: } /** + * Returns true if the involved namespaces for this aggregation are allowed to be sharded. + */ + virtual bool allowShardedForeignCollections() const { + return false; + } + + /** * Verifies that this stage is allowed to run with the specified read concern. Throws a * UserException if not compatible. */ @@ -158,17 +165,23 @@ public: }; /** - * Helper class for DocumentSources which which reference one or more foreign collections. + * Helper class for DocumentSources which reference one or more foreign collections. */ class LiteParsedDocumentSourceForeignCollections : public LiteParsedDocumentSource { public: LiteParsedDocumentSourceForeignCollections(NamespaceString foreignNss, - PrivilegeVector privileges) - : _foreignNssSet{std::move(foreignNss)}, _requiredPrivileges(std::move(privileges)) {} + PrivilegeVector privileges, + bool allowSharded) + : _foreignNssSet{std::move(foreignNss)}, + _requiredPrivileges(std::move(privileges)), + _allowSharded(allowSharded) {} LiteParsedDocumentSourceForeignCollections(stdx::unordered_set<NamespaceString> foreignNssSet, - PrivilegeVector privileges) - : _foreignNssSet(std::move(foreignNssSet)), _requiredPrivileges(std::move(privileges)) {} + PrivilegeVector privileges, + bool allowSharded) + : _foreignNssSet(std::move(foreignNssSet)), + _requiredPrivileges(std::move(privileges)), + _allowSharded(allowSharded) {} stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { return {_foreignNssSet}; @@ -178,8 +191,13 @@ public: return _requiredPrivileges; } + bool allowShardedForeignCollections() const final { + return _allowSharded; + } + private: stdx::unordered_set<NamespaceString> _foreignNssSet; PrivilegeVector _requiredPrivileges; + bool _allowSharded; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h index 9f8325d1e33..303754df4b0 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.h +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h @@ -119,6 +119,16 @@ public: } /** + * Returns false if at least one of the stages does not allow an involved namespace to be + * sharded. + */ + bool allowShardedForeignCollections() const { + return std::all_of(_stageSpecs.begin(), _stageSpecs.end(), [](auto&& spec) { + return spec->allowShardedForeignCollections(); + }); + } + + /** * Verifies that this pipeline is allowed to run with the specified read concern. This ensures * that each stage is compatible, and throws a UserException if not. */ diff --git a/src/mongo/db/pipeline/mongod_process_interface.cpp b/src/mongo/db/pipeline/mongod_process_interface.cpp index e80f65da3df..e54ceb1dc17 100644 --- a/src/mongo/db/pipeline/mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/mongod_process_interface.cpp @@ -49,6 +49,7 @@ #include "mongo/db/stats/storage_stats.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/grid.h" +#include "mongo/s/write_ops/cluster_write.h" #include "mongo/util/log.h" namespace mongo { @@ -57,7 +58,72 @@ using boost::intrusive_ptr; using std::shared_ptr; using std::string; using std::unique_ptr; +using write_ops::Insert; using write_ops::Update; +using write_ops::UpdateOpEntry; + +namespace { + +/** + * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'. + */ +Insert buildInsertOp(const NamespaceString& nss, + const std::vector<BSONObj>& objs, + bool bypassDocValidation) { + Insert insertOp(nss); + insertOp.setDocuments(std::move(objs)); + insertOp.setWriteCommandBase([&] { + write_ops::WriteCommandBase wcb; + wcb.setOrdered(true); + wcb.setBypassDocumentValidation(bypassDocValidation); + return wcb; + }()); + return insertOp; +} + +/** + * Builds an ordered update op on namespace 'nss' with update entries {q: <queries>, u: <updates>}. + * + * Note that 'queries' and 'updates' must be the same length. + */ +Update buildUpdateOp(const NamespaceString& nss, + const std::vector<BSONObj>& queries, + const std::vector<BSONObj>& updates, + bool upsert, + bool multi, + bool bypassDocValidation) { + Update updateOp(nss); + updateOp.setUpdates([&] { + std::vector<UpdateOpEntry> updateEntries; + for (size_t index = 0; index < queries.size(); ++index) { + updateEntries.push_back([&] { + UpdateOpEntry entry; + entry.setQ(queries[index]); + entry.setU(updates[index]); + entry.setUpsert(upsert); + entry.setMulti(multi); + return entry; + }()); + } + return updateEntries; + }()); + updateOp.setWriteCommandBase([&] { + write_ops::WriteCommandBase wcb; + wcb.setOrdered(true); + wcb.setBypassDocumentValidation(bypassDocValidation); + return wcb; + }()); + return updateOp; +} + +} // namespace + +// static +std::shared_ptr<MongoProcessInterface> MongoDInterface::create(OperationContext* opCtx) { + return ShardingState::get(opCtx)->enabled() + ? std::make_shared<MongoDInterfaceShardServer>(opCtx) + : std::make_shared<MongoDInterface>(opCtx); +} MongoDInterface::MongoDInterface(OperationContext* opCtx) : _client(opCtx) {} @@ -78,14 +144,12 @@ bool MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& void MongoDInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, const std::vector<BSONObj>& objs) { - boost::optional<DisableDocumentValidation> maybeDisableValidation; - if (expCtx->bypassDocumentValidation) - maybeDisableValidation.emplace(expCtx->opCtx); - - _client.insert(ns.ns(), objs); - uassert(16996, - str::stream() << "Insert failed: " << _client.getLastError(), - _client.getLastError().empty()); + auto insertOp = buildInsertOp(ns, objs, expCtx->bypassDocumentValidation); + auto writeResults = performInserts(expCtx->opCtx, insertOp); + + // Only need to check that the final result passed because the inserts are ordered and the batch + // will stop on the first failure. + uassertStatusOKWithContext(writeResults.results.back().getStatus(), "Insert failed: "); } void MongoDInterface::update(const boost::intrusive_ptr<ExpressionContext>& expCtx, @@ -94,31 +158,13 @@ void MongoDInterface::update(const boost::intrusive_ptr<ExpressionContext>& expC const std::vector<BSONObj>& updates, bool upsert, bool multi) { - BSONObjBuilder updateBob; - updateBob.append(Update::kCommandName, ns.coll()); - updateBob.append(Update::kDbNameFieldName, ns.db()); - updateBob.append(Update::kBypassDocumentValidationFieldName, expCtx->bypassDocumentValidation); - - // Build the array of UpdateOp entries. - invariant(queries.size() == updates.size()); - BSONArrayBuilder updatesArray; - for (size_t index = 0; index < queries.size(); ++index) { - updatesArray.append(BSON("q" << queries[index] << "u" << updates[index] << "multi" << multi - << "upsert" - << upsert)); - } - updateBob.append(Update::kUpdatesFieldName, updatesArray.arr()); - - auto updateObj = updateBob.done(); - auto writeResults = - performUpdates(expCtx->opCtx, Update::parse(IDLParserErrorContext("update"), updateObj)); + auto updateOp = + buildUpdateOp(ns, queries, updates, upsert, multi, expCtx->bypassDocumentValidation); + auto writeResults = performUpdates(expCtx->opCtx, updateOp); - // Verify that each of the update results is successful. - for (const auto& result : writeResults.results) { - uassert(50904, - str::stream() << "Update failed: " << result.getStatus(), - result.getStatus() == Status::OK()); - } + // Only need to check that the final result passed because the updates are ordered and the batch + // will stop on the first failure. + uassertStatusOKWithContext(writeResults.results.back().getStatus(), "Update failed: "); } CollectionIndexUsageMap MongoDInterface::getIndexStats(OperationContext* opCtx, @@ -421,4 +467,32 @@ std::unique_ptr<CollatorInterface> MongoDInterface::_getCollectionDefaultCollato return collator ? collator->clone() : nullptr; } +void MongoDInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& objs) { + BatchedCommandResponse response; + BatchWriteExecStats stats; + + auto insertOp = buildInsertOp(ns, objs, expCtx->bypassDocumentValidation); + ClusterWriter::write(expCtx->opCtx, BatchedCommandRequest(insertOp), &stats, &response); + // TODO SERVER-35403: Add more context for which shard produced the error. + uassertStatusOKWithContext(response.toStatus(), "Insert failed: "); +} + +void MongoDInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& queries, + const std::vector<BSONObj>& updates, + bool upsert, + bool multi) { + BatchedCommandResponse response; + BatchWriteExecStats stats; + + auto updateOp = + buildUpdateOp(ns, queries, updates, upsert, multi, expCtx->bypassDocumentValidation); + ClusterWriter::write(expCtx->opCtx, BatchedCommandRequest(updateOp), &stats, &response); + // TODO SERVER-35403: Add more context for which shard produced the error. + uassertStatusOKWithContext(response.toStatus(), "Update failed: "); +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/mongod_process_interface.h b/src/mongo/db/pipeline/mongod_process_interface.h index 2b9ab8d8f19..8d452e54e62 100644 --- a/src/mongo/db/pipeline/mongod_process_interface.h +++ b/src/mongo/db/pipeline/mongod_process_interface.h @@ -38,23 +38,26 @@ namespace mongo { * Class to provide access to mongod-specific implementations of methods required by some * document sources. */ -class MongoDInterface final : public MongoProcessCommon { +class MongoDInterface : public MongoProcessCommon { public: + static std::shared_ptr<MongoProcessInterface> create(OperationContext* opCtx); + MongoDInterface(OperationContext* opCtx); + virtual ~MongoDInterface() = default; + void setOperationContext(OperationContext* opCtx) final; DBClientBase* directClient() final; bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final; - void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - const std::vector<BSONObj>& objs) final; - void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - const std::vector<BSONObj>& queries, - const std::vector<BSONObj>& updates, - bool upsert, - bool multi) final; - + virtual void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& objs); + virtual void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& queries, + const std::vector<BSONObj>& updates, + bool upsert, + bool multi); CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final; void appendLatencyStats(OperationContext* opCtx, const NamespaceString& nss, @@ -116,4 +119,31 @@ private: std::map<UUID, std::unique_ptr<const CollatorInterface>> _collatorCache; }; +/** + * Specialized version of the MongoDInterface when this node is a shard server. + */ +class MongoDInterfaceShardServer final : public MongoDInterface { +public: + using MongoDInterface::MongoDInterface; + + /** + * Inserts the documents 'objs' into the namespace 'ns' using the ClusterWriter for locking, + * routing, stale config handling, etc. + */ + void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& objs) final; + + /** + * Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking, + * routing, stale config handling, etc. + */ + void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& queries, + const std::vector<BSONObj>& updates, + bool upsert, + bool multi) final; +}; + } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index f3317c81b3b..e7db3deeae7 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -47,6 +47,8 @@ #include "mongo/db/index/index_access_method.h" #include "mongo/db/matcher/extensions_callback_real.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/ops/write_ops_gen.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/document_source_cursor.h" @@ -75,6 +77,7 @@ #include "mongo/s/chunk_manager.h" #include "mongo/s/chunk_version.h" #include "mongo/s/grid.h" +#include "mongo/s/write_ops/cluster_write.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" #include "mongo/util/time_support.h" @@ -85,6 +88,7 @@ using boost::intrusive_ptr; using std::shared_ptr; using std::string; using std::unique_ptr; +using write_ops::Insert; namespace { diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index 5c3d0464462..0851578ddec 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -49,14 +49,13 @@ struct PlanSummaryStats; class BSONObj; struct DepsTracker; -/* - PipelineD is an extension of the Pipeline class, but with additional - material that references symbols that are not available in mongos, - where the remainder of the Pipeline class also functions. PipelineD - is a friend of Pipeline so that it can have equal access to Pipeline's - members. - - See the friend declaration in Pipeline. +/** + * PipelineD is an extension of the Pipeline class, but with additional material that references + * symbols that are not available in mongos, where the remainder of the Pipeline class also + * functions. PipelineD is a friend of Pipeline so that it can have equal access to Pipeline's + * members. + * + * See the friend declaration in Pipeline. */ class PipelineD { public: @@ -97,11 +96,6 @@ public: const AggregationRequest* aggRequest, Pipeline* pipeline); - /** - * Injects a MongodInterface into stages which require access to mongod-specific functionality. - */ - static void injectMongodInterface(Pipeline* pipeline); - static std::string getPlanSummaryStr(const Pipeline* pipeline); static void getPlanSummaryStats(const Pipeline* pipeline, PlanSummaryStats* statsOut); diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 867c6932d21..d06c5426555 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -739,12 +739,12 @@ ShardId pickMergingShard(OperationContext* opCtx, .toString(); } -// "Resolve" involved namespaces and verify that none of them are sharded. We won't try to execute -// anything on a mongos, but we still have to populate this map so that any $lookups, etc. will be -// able to have a resolved view definition. It's okay that this is incorrect, we will repopulate the -// real namespace map on the mongod. Note that this function must be called before forwarding an -// aggregation command on an unsharded collection, in order to validate that none of the involved -// collections are sharded. +// "Resolve" involved namespaces and verify that none of them are sharded unless allowed by the +// pipeline. We won't try to execute anything on a mongos, but we still have to populate this map so +// that any $lookups, etc. will be able to have a resolved view definition. It's okay that this is +// incorrect, we will repopulate the real namespace map on the mongod. Note that this function must +// be called before forwarding an aggregation command on an unsharded collection, in order to verify +// that the involved namespaces are allowed to be sharded. StringMap<ExpressionContext::ResolvedNamespace> resolveInvolvedNamespaces( OperationContext* opCtx, const LiteParsedPipeline& litePipe) { @@ -752,8 +752,9 @@ StringMap<ExpressionContext::ResolvedNamespace> resolveInvolvedNamespaces( for (auto&& nss : litePipe.getInvolvedNamespaces()) { const auto resolvedNsRoutingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - uassert( - 28769, str::stream() << nss.ns() << " cannot be sharded", !resolvedNsRoutingInfo.cm()); + uassert(28769, + str::stream() << nss.ns() << " cannot be sharded", + !resolvedNsRoutingInfo.cm() || litePipe.allowShardedForeignCollections()); resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{}); } return resolvedNamespaces; |