diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2017-10-09 17:57:02 -0400 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2017-10-11 08:31:16 -0400 |
commit | 767e041931e73ae4f7a0047114becf5be803f3ab (patch) | |
tree | 6810e9485838564a5681bb2d8984e7374ec2eb94 | |
parent | 45d35fe3fcefefe1282b8e0dfc8cd76cb247951d (diff) | |
download | mongo-767e041931e73ae4f7a0047114becf5be803f3ab.tar.gz |
SERVER-29609 Enable updateLookup for sharded change streams.
15 files changed, 859 insertions, 244 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 34363ddd337..1299b98d7bc 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -30,6 +30,9 @@ selector: # New feature in v3.6 mongos and mongod. - jstests/sharding/advance_logical_time_with_valid_signature.js - jstests/sharding/after_cluster_time.js + - jstests/sharding/lookup_change_stream_post_image_id_shard_key.js + - jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js + - jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js - jstests/sharding/change_stream_invalidation.js - jstests/sharding/change_stream_remove_shard.js - jstests/sharding/change_streams.js diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js index a2a1a83664a..3cb35d318bb 100644 --- a/jstests/sharding/change_streams.js +++ b/jstests/sharding/change_streams.js @@ -92,7 +92,6 @@ // Test that using change streams with any stages not allowed to run on mongos results in an // error. - assertErrorCode(mongosColl, [{$changeStream: {fullDocument: "updateLookup"}}], 40470); assertErrorCode( mongosColl, [{$changeStream: {}}, {$out: "shouldntWork"}], ErrorCodes.IllegalOperation); diff --git a/jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js b/jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js new file mode 100644 index 00000000000..022d5dd6172 --- /dev/null +++ b/jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js @@ -0,0 +1,107 @@ +// Tests the behavior of looking up the post image for change streams on collections which are +// sharded with a compound shard key. +(function() { + "use strict"; + + // For supportsMajorityReadConcern(). + load("jstests/multiVersion/libs/causal_consistency_helpers.js"); + + if (!supportsMajorityReadConcern()) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + return; + } + + const st = new ShardingTest({ + shards: 2, + rs: { + nodes: 1, + enableMajorityReadConcern: '', + // Use a higher frequency for periodic noops to speed up the test. + setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1} + } + }); + + const mongosDB = st.s0.getDB(jsTestName()); + const mongosColl = mongosDB[jsTestName()]; + + assert.commandWorked(mongosDB.dropDatabase()); + + // Enable sharding on the test DB and ensure its primary is shard0000. + assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL()); + + // Shard the test collection with a compound shard key: a, b, c. Then split it into two chunks, + // and put one chunk on each shard. + assert.commandWorked(mongosDB.adminCommand( + {shardCollection: mongosColl.getFullName(), key: {a: 1, b: 1, c: 1}})); + + // Split the collection into 2 chunks: + // [{a: MinKey, b: MinKey, c: MinKey}, {a: 1, b: MinKey, c: MinKey}) + // and + // [{a: 1, b: MinKey, c: MinKey}, {a: MaxKey, b: MaxKey, c: MaxKey}). + assert.commandWorked(mongosDB.adminCommand( + {split: mongosColl.getFullName(), middle: {a: 1, b: MinKey, c: MinKey}})); + + // Move the upper chunk to shard 1. + assert.commandWorked(mongosDB.adminCommand({ + moveChunk: mongosColl.getFullName(), + find: {a: 1, b: MinKey, c: MinKey}, + to: st.rs1.getURL() + })); + + const changeStream = mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}]); + + const nDocs = 6; + const bValues = ["one", "two", "three", "four", "five", "six"]; + + // This shard key function results in 1/3rd of documents on shard0 and 2/3rds on shard1. + function shardKeyFromId(id) { + return {a: id % 3, b: bValues[id], c: id % 2}; + } + + // Do some writes. + for (let id = 0; id < nDocs; ++id) { + const documentKey = Object.merge({_id: id}, shardKeyFromId(id)); + assert.writeOK(mongosColl.insert(documentKey)); + assert.writeOK(mongosColl.update(documentKey, {$set: {updatedCount: 1}})); + } + + for (let id = 0; id < nDocs; ++id) { + assert.soon(() => changeStream.hasNext()); + let next = changeStream.next(); + assert.eq(next.operationType, "insert"); + assert.eq(next.documentKey, {_id: id}); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "update"); + assert.eq(next.documentKey, Object.merge(shardKeyFromId(id), {_id: id})); + assert.docEq(next.fullDocument, + Object.merge(shardKeyFromId(id), {_id: id, updatedCount: 1})); + } + + // Test that the change stream can still see the updated post image, even if a chunk is + // migrated. + for (let id = 0; id < nDocs; ++id) { + const documentKey = Object.merge({_id: id}, shardKeyFromId(id)); + assert.writeOK(mongosColl.update(documentKey, {$set: {updatedCount: 2}})); + } + + // Move the upper chunk back to shard 0. + assert.commandWorked(mongosDB.adminCommand({ + moveChunk: mongosColl.getFullName(), + find: {a: 1, b: MinKey, c: MinKey}, + to: st.rs0.getURL() + })); + + for (let id = 0; id < nDocs; ++id) { + assert.soon(() => changeStream.hasNext()); + let next = changeStream.next(); + assert.eq(next.operationType, "update"); + assert.eq(next.documentKey, Object.merge(shardKeyFromId(id), {_id: id})); + assert.docEq(next.fullDocument, + Object.merge(shardKeyFromId(id), {_id: id, updatedCount: 2})); + } + + st.stop(); +})(); diff --git a/jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js b/jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js new file mode 100644 index 00000000000..d2b130abf33 --- /dev/null +++ b/jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js @@ -0,0 +1,79 @@ +// Tests the behavior of looking up the post image for change streams on collections which are +// sharded with a hashed shard key. +(function() { + "use strict"; + + // For supportsMajorityReadConcern(). + load("jstests/multiVersion/libs/causal_consistency_helpers.js"); + + if (!supportsMajorityReadConcern()) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + return; + } + + const st = new ShardingTest({ + shards: 2, + enableBalancer: false, + rs: { + nodes: 1, + enableMajorityReadConcern: '', + // Use a higher frequency for periodic noops to speed up the test. + setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1} + } + }); + + const mongosDB = st.s0.getDB(jsTestName()); + const mongosColl = mongosDB[jsTestName()]; + + assert.commandWorked(mongosDB.dropDatabase()); + + // Enable sharding on the test DB and ensure its primary is shard0000. + assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL()); + + // Shard the test collection on the field "shardKey", and split it into two chunks. + assert.commandWorked(mongosDB.adminCommand({ + shardCollection: mongosColl.getFullName(), + numInitialChunks: 2, + key: {shardKey: "hashed"} + })); + + // Make sure the negative chunk is on shard 0. + assert.commandWorked(mongosDB.adminCommand({ + moveChunk: mongosColl.getFullName(), + bounds: [{shardKey: MinKey}, {shardKey: NumberLong("0")}], + to: st.rs0.getURL() + })); + + // Make sure the positive chunk is on shard 1. + assert.commandWorked(mongosDB.adminCommand({ + moveChunk: mongosColl.getFullName(), + bounds: [{shardKey: NumberLong("0")}, {shardKey: MaxKey}], + to: st.rs1.getURL() + })); + + const changeStream = mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}]); + + // Write enough documents that we likely have some on each shard. + const nDocs = 1000; + for (let id = 0; id < nDocs; ++id) { + assert.writeOK(mongosColl.insert({_id: id, shardKey: id})); + assert.writeOK(mongosColl.update({shardKey: id}, {$set: {updatedCount: 1}})); + } + + for (let id = 0; id < nDocs; ++id) { + assert.soon(() => changeStream.hasNext()); + let next = changeStream.next(); + assert.eq(next.operationType, "insert"); + // TODO SERVER-30599 this documentKey should contain the shard key. + assert.eq(next.documentKey, {_id: id}); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "update"); + assert.eq(next.documentKey, {shardKey: id, _id: id}); + assert.docEq(next.fullDocument, {_id: id, shardKey: id, updatedCount: 1}); + } + + st.stop(); +})(); diff --git a/jstests/sharding/lookup_change_stream_post_image_id_shard_key.js b/jstests/sharding/lookup_change_stream_post_image_id_shard_key.js new file mode 100644 index 00000000000..5cd2a6fa5e0 --- /dev/null +++ b/jstests/sharding/lookup_change_stream_post_image_id_shard_key.js @@ -0,0 +1,94 @@ +// Tests the behavior of looking up the post image for change streams on collections which are +// sharded with a key which is just the "_id" field. +(function() { + "use strict"; + + // For supportsMajorityReadConcern(). + load("jstests/multiVersion/libs/causal_consistency_helpers.js"); + + if (!supportsMajorityReadConcern()) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + return; + } + + const st = new ShardingTest({ + shards: 2, + rs: { + nodes: 1, + enableMajorityReadConcern: '', + // Use a higher frequency for periodic noops to speed up the test. + setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1} + } + }); + + const mongosDB = st.s0.getDB(jsTestName()); + const mongosColl = mongosDB[jsTestName()]; + + assert.commandWorked(mongosDB.dropDatabase()); + + // Enable sharding on the test DB and ensure its primary is shard0000. + assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL()); + + // Shard the test collection on _id. + assert.commandWorked( + mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); + + // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey). + assert.commandWorked( + mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}})); + + // Move the [0, MaxKey) chunk to shard0001. + assert.commandWorked(mongosDB.adminCommand( + {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()})); + + // Write a document to each chunk. + assert.writeOK(mongosColl.insert({_id: -1})); + assert.writeOK(mongosColl.insert({_id: 1})); + + const changeStream = mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}]); + + // Do some writes. + assert.writeOK(mongosColl.insert({_id: 1000})); + assert.writeOK(mongosColl.insert({_id: -1000})); + assert.writeOK(mongosColl.update({_id: 1000}, {$set: {updatedCount: 1}})); + assert.writeOK(mongosColl.update({_id: -1000}, {$set: {updatedCount: 1}})); + + for (let nextId of[1000, -1000]) { + assert.soon(() => changeStream.hasNext()); + let next = changeStream.next(); + assert.eq(next.operationType, "insert"); + assert.eq(next.documentKey, {_id: nextId}); + } + + for (let nextId of[1000, -1000]) { + assert.soon(() => changeStream.hasNext()); + let next = changeStream.next(); + assert.eq(next.operationType, "update"); + // Only the "_id" field is present in next.documentKey because the shard key is the _id. + assert.eq(next.documentKey, {_id: nextId}); + assert.docEq(next.fullDocument, {_id: nextId, updatedCount: 1}); + } + + // Test that the change stream can still see the updated post image, even if a chunk is + // migrated. + assert.writeOK(mongosColl.update({_id: 1000}, {$set: {updatedCount: 2}})); + assert.writeOK(mongosColl.update({_id: -1000}, {$set: {updatedCount: 2}})); + + // Split the [0, MaxKey) chunk into 2: [0, 500), [500, MaxKey). + assert.commandWorked( + mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 500}})); + // Move the [500, MaxKey) chunk back to shard0000. + assert.commandWorked(mongosDB.adminCommand( + {moveChunk: mongosColl.getFullName(), find: {_id: 1000}, to: st.rs0.getURL()})); + + for (let nextId of[1000, -1000]) { + assert.soon(() => changeStream.hasNext()); + let next = changeStream.next(); + assert.eq(next.operationType, "update"); + assert.eq(next.documentKey, {_id: nextId}); + assert.docEq(next.fullDocument, {_id: nextId, updatedCount: 2}); + } + + st.stop(); +})(); diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 64d01baf450..5e385da0420 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -351,14 +351,12 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( // There should only be one close cursor stage. If we're on the shards and producing input // to be merged, do not add a close cursor stage, since the mongos will already have one. stages.push_back(DocumentSourceCloseCursor::create(expCtx)); - } - if (shouldLookupPostImage) { - uassert( - 40470, - str::stream() << "looking up the full document after an update is not yet supported on " - "sharded collections", - !expCtx->inMongos); - stages.push_back(DocumentSourceLookupChangePostImage::create(expCtx)); + + // There should be only one post-image lookup stage. If we're on the shards and producing + // input to be merged, the lookup is done on the mongos. + if (shouldLookupPostImage) { + stages.push_back(DocumentSourceLookupChangePostImage::create(expCtx)); + } } return stages; } diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 3720c6213df..90449e1b073 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -106,7 +106,7 @@ void DocumentSourceCursor::loadBatch() { // Furthermore, if we need to return the latest oplog time (in the tailable and // needs-merge case), batching will result in a wrong time. if (shouldWaitForInserts(pExpCtx->opCtx) || - (pExpCtx->isTailable() && pExpCtx->needsMerge) || + (pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) || memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { // End this batch and prepare PlanExecutor for yielding. _exec->saveState(); @@ -115,7 +115,7 @@ void DocumentSourceCursor::loadBatch() { } // Special case for tailable cursor -- EOF doesn't preclude more results, so keep // the PlanExecutor alive. - if (state == PlanExecutor::IS_EOF && pExpCtx->isTailable()) { + if (state == PlanExecutor::IS_EOF && pExpCtx->isTailableAwaitData()) { _exec->saveState(); return; } diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h index 18d38b52fdb..2a4ccc972fb 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -62,9 +62,12 @@ public: } StageConstraints constraints(Pipeline::SplitState pipeState) const final { + invariant(pipeState != Pipeline::SplitState::kSplitForShards); StageConstraints constraints(StreamType::kStreaming, PositionRequirement::kNone, - HostTypeRequirement::kAnyShard, + pipeState == Pipeline::SplitState::kUnsplit + ? HostTypeRequirement::kNone + : HostTypeRequirement::kMongoS, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage); diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 0e7488aaa55..5bce214c776 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -113,9 +113,10 @@ public: }; /** - * Convenience call that returns true if the tailableMode indicate a tailable query. + * Convenience call that returns true if the tailableMode indicates a tailable and awaitData + * query. */ - bool isTailable() const { + bool isTailableAwaitData() const { return tailableMode == TailableMode::kTailableAndAwaitData; } diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index ca82498abd5..d571603bdf9 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -59,7 +59,11 @@ #include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/cluster_query_knobs.h" +#include "mongo/s/query/document_source_router_adapter.h" #include "mongo/s/query/establish_cursors.h" +#include "mongo/s/query/router_exec_stage.h" +#include "mongo/s/query/router_stage_merge.h" +#include "mongo/s/query/router_stage_pipeline.h" #include "mongo/s/query/store_possible_cursor.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" @@ -67,6 +71,100 @@ namespace mongo { namespace { + +/** + * Class to provide access to mongos-specific implementations of methods required by some document + * sources. + */ +class MongosProcessInterface final + : public DocumentSourceNeedsMongoProcessInterface::MongoProcessInterface { +public: + MongosProcessInterface(OperationContext* opCtx) : _opCtx(opCtx) {} + + virtual ~MongosProcessInterface() = default; + + void setOperationContext(OperationContext* opCtx) override { + _opCtx = opCtx; + } + + DBClientBase* directClient() override { + MONGO_UNREACHABLE; + } + + bool isSharded(const NamespaceString& ns) override { + MONGO_UNREACHABLE; + } + + BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) override { + MONGO_UNREACHABLE; + } + + CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, + const NamespaceString& ns) override { + MONGO_UNREACHABLE; + } + + void appendLatencyStats(const NamespaceString& nss, + bool includeHistograms, + BSONObjBuilder* builder) const override { + MONGO_UNREACHABLE; + } + + Status appendStorageStats(const NamespaceString& nss, + const BSONObj& param, + BSONObjBuilder* builder) const override { + MONGO_UNREACHABLE; + } + + Status appendRecordCount(const NamespaceString& nss, BSONObjBuilder* builder) const override { + MONGO_UNREACHABLE; + } + + BSONObj getCollectionOptions(const NamespaceString& nss) override { + MONGO_UNREACHABLE; + } + + Status renameIfOptionsAndIndexesHaveNotChanged( + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list<BSONObj>& originalIndexes) override { + MONGO_UNREACHABLE; + } + + /** + * Constructs an executable pipeline targeted to a remote shard. Returns + * ErrorCodes::InternalError if 'rawPipeline' specifies a pipeline that does not target a single + * shard. + */ + StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions pipelineOptions = {}) override; + + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) override { + MONGO_UNREACHABLE; + } + + std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode, + CurrentOpUserMode userMode, + CurrentOpTruncateMode truncateMode) const override { + MONGO_UNREACHABLE; + } + + std::string getShardName(OperationContext* opCtx) const override { + MONGO_UNREACHABLE; + } + +private: + StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipelineWithOneRemote( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx); + + OperationContext* _opCtx; +}; + // Given a document representing an aggregation command such as // // {aggregate: "myCollection", pipeline: [], ...}, @@ -244,15 +342,15 @@ BSONObj createCommandForMergingShard( return mergeCmd.freeze().toBson(); } -StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishShardCursors( - OperationContext* opCtx, - const NamespaceString& nss, - const LiteParsedPipeline& litePipe, - CachedCollectionRoutingInfo* routingInfo, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref, - const BSONObj& shardQuery, - const BSONObj& collation) { +StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> +establishShardCursorsWithoutRetrying(OperationContext* opCtx, + const NamespaceString& nss, + const LiteParsedPipeline& litePipe, + CachedCollectionRoutingInfo* routingInfo, + const BSONObj& cmdObj, + const ReadPreferenceSetting& readPref, + const BSONObj& shardQuery, + const BSONObj& collation) { LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; std::set<ShardId> shardIds = @@ -305,6 +403,162 @@ StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishShardC return swCursors; } +struct EstablishShardCursorsResults { + // True if this pipeline was split, and the second half of the pipeline needs to be run on the + // primary shard for the database. + bool needsPrimaryShardMerge; + + // Populated if this *is not* an explain, this vector represents the cursors on the remote + // shards. + std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors; + + // Populated if this *is* an explain, this vector represents the results from each shard. + std::vector<AsyncRequestsSender::Response> remoteExplainOutput; + + // The half of the pipeline that was sent to each shard, or the entire pipeline if there was + // only one shard targeted. + std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForTargetedShards; + + // The merging half of the pipeline if more than one shard was targeted, otherwise nullptr. + std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging; +}; + +/** + * Targets shards for the pipeline and returns a struct with the remote cursors or results, and + * the pipeline that will need to be executed to merge the results from the remotes. If a stale + * shard version is encountered, refreshes the routing table and tries again. + */ +StatusWith<EstablishShardCursorsResults> establishShardCursors( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& executionNss, + BSONObj originalCmdObj, + const AggregationRequest& aggRequest, + const LiteParsedPipeline& liteParsedPipeline, + std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline) { + // The process is as follows: + // - First, determine whether we need to target more than one shard. If so, we split the + // pipeline; if not, we retain the existing pipeline. + // - Call establishShardCursorsWithoutRetrying to dispatch the aggregation to the targeted + // shards. + // - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with + // the refreshed routing table data. + // - If the pipeline is not split and we now need to target multiple shards, split it. If the + // pipeline is already split and we now only need to target a single shard, reassemble the + // original pipeline. + // - After exhausting 10 attempts to establish the cursors, we give up and throw. + auto swCursors = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>(); + auto swShardResults = makeStatusWith<std::vector<AsyncRequestsSender::Response>>(); + auto opCtx = expCtx->opCtx; + + const bool needsPrimaryShardMerge = + (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load()); + + const auto shardQuery = pipeline->getInitialQuery(); + + auto pipelineForTargetedShards = std::move(pipeline); + std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging; + + int numAttempts = 0; + + do { + // We need to grab a new routing table at the start of each iteration, since a stale config + // exception will invalidate the previous one. + auto executionNsRoutingInfo = uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, executionNss)); + + // Determine whether we can run the entire aggregation on a single shard. + std::set<ShardId> shardIds = getTargetedShards(opCtx, + executionNss, + liteParsedPipeline, + executionNsRoutingInfo, + shardQuery, + aggRequest.getCollation()); + + uassert(ErrorCodes::ShardNotFound, + "No targets were found for this aggregation. All shards were removed from the " + "cluster mid-operation", + shardIds.size() > 0); + + // Don't need to split pipeline if we are only targeting a single shard, unless there is a + // stage that needs to be run on the primary shard and the single target shard is not the + // primary. + const bool needsSplit = + (shardIds.size() > 1u || + (needsPrimaryShardMerge && *(shardIds.begin()) != executionNsRoutingInfo.primaryId())); + + const bool isSplit = pipelineForTargetedShards->isSplitForShards(); + + // If we have to run on multiple shards and the pipeline is not yet split, split it. If we + // can run on a single shard and the pipeline is already split, reassemble it. + if (needsSplit && !isSplit) { + pipelineForMerging = std::move(pipelineForTargetedShards); + pipelineForTargetedShards = pipelineForMerging->splitForSharded(); + } else if (!needsSplit && isSplit) { + pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging)); + } + + // Generate the command object for the targeted shards. + BSONObj targetedCommand = + createCommandForTargetedShards(aggRequest, originalCmdObj, pipelineForTargetedShards); + + // Explain does not produce a cursor, so instead we scatter-gather commands to the shards. + if (expCtx->explain) { + if (mustRunOnAllShards(executionNss, liteParsedPipeline)) { + // Some stages (such as $currentOp) need to be broadcast to all shards, and should + // not participate in the shard version protocol. + swShardResults = + scatterGatherUnversionedTargetAllShards(opCtx, + executionNss.db().toString(), + executionNss, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent); + } else { + // Aggregations on a real namespace should use the routing table to target shards, + // and should participate in the shard version protocol. + swShardResults = + scatterGatherVersionedTargetByRoutingTable(opCtx, + executionNss.db().toString(), + executionNss, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent, + shardQuery, + aggRequest.getCollation(), + nullptr /* viewDefinition */); + } + } else { + swCursors = establishShardCursorsWithoutRetrying(opCtx, + executionNss, + liteParsedPipeline, + &executionNsRoutingInfo, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + shardQuery, + aggRequest.getCollation()); + + if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) { + LOG(1) << "got stale shardVersion error " << swCursors.getStatus() + << " while dispatching " << redact(targetedCommand) << " after " + << (numAttempts + 1) << " dispatch attempts"; + } + } + } while (++numAttempts < kMaxNumStaleVersionRetries && + (expCtx->explain ? !swShardResults.isOK() : !swCursors.isOK())); + + if (!swShardResults.isOK()) { + return swShardResults.getStatus(); + } + if (!swCursors.isOK()) { + return swCursors.getStatus(); + } + return EstablishShardCursorsResults{needsPrimaryShardMerge, + std::move(swCursors.getValue()), + std::move(swShardResults.getValue()), + std::move(pipelineForTargetedShards), + std::move(pipelineForMerging)}; +} + StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergingShardCursor( OperationContext* opCtx, const NamespaceString& nss, @@ -335,6 +589,16 @@ BSONObj establishMergingMongosCursor( std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging, std::vector<ClusterClientCursorParams::RemoteCursor> cursors) { + // Inject the MongosProcessInterface for sources which need it. + const auto& sources = pipelineForMerging->getSources(); + for (auto&& source : sources) { + DocumentSourceNeedsMongoProcessInterface* needsMongoProcessInterface = + dynamic_cast<DocumentSourceNeedsMongoProcessInterface*>(source.get()); + if (needsMongoProcessInterface) { + needsMongoProcessInterface->injectMongoProcessInterface( + std::make_shared<MongosProcessInterface>(opCtx)); + } + } ClusterClientCursorParams params( requestedNss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), @@ -415,6 +679,100 @@ BSONObj establishMergingMongosCursor( return cursorResponse.obj(); } +/** + * This is a special type of RouterExecStage that is used to iterate remote cursors that were + * created internally and do not represent a client cursor, such as those used in $changeStream's + * updateLookup functionality. + * + * The purpose of this class is to provide ownership over a ClusterClientCursorParams struct without + * creating a ClusterClientCursor, which would show up in the server stats for this mongos. + */ +class RouterStageInternalCursor final : public RouterExecStage { +public: + RouterStageInternalCursor(OperationContext* opCtx, + std::unique_ptr<ClusterClientCursorParams>&& params, + std::unique_ptr<RouterExecStage> child) + : RouterExecStage(opCtx, std::move(child)), _params(std::move(params)) {} + + StatusWith<ClusterQueryResult> next(ExecContext execContext) { + return getChildStage()->next(execContext); + } + +private: + std::unique_ptr<ClusterClientCursorParams> _params; +}; + +StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> MongosProcessInterface::makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions pipelineOptions) { + // For the time being we don't expect any callers with options other than these. + invariant(pipelineOptions.optimize); + invariant(pipelineOptions.attachCursorSource); + invariant(!pipelineOptions.forceInjectMongoProcessInterface); + + // 'expCtx' may represent the settings for an aggregation pipeline on a different namespace + // than the DocumentSource this MongodImplementation is injected into, but both + // ExpressionContext instances should still have the same OperationContext. + invariant(_opCtx == expCtx->opCtx); + + // Explain is not supported for auxiliary lookups. + invariant(!expCtx->explain); + return makePipelineWithOneRemote(rawPipeline, expCtx); +} + +StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> +MongosProcessInterface::makePipelineWithOneRemote( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + + // Generate the command object for the targeted shards. + AggregationRequest aggRequest(expCtx->ns, rawPipeline); + LiteParsedPipeline liteParsedPipeline(aggRequest); + auto parsedPipeline = Pipeline::parse(rawPipeline, expCtx); + if (!parsedPipeline.isOK()) { + return parsedPipeline.getStatus(); + } + parsedPipeline.getValue()->optimizePipeline(); + + auto targetStatus = establishShardCursors(expCtx, + expCtx->ns, + aggRequest.serializeToCommandObj().toBson(), + aggRequest, + liteParsedPipeline, + std::move(parsedPipeline.getValue())); + + if (!targetStatus.isOK()) { + return targetStatus.getStatus(); + } + auto targetingResults = std::move(targetStatus.getValue()); + if (targetingResults.remoteCursors.size() != 1) { + return {ErrorCodes::InternalError, + str::stream() << "Unable to target pipeline to single shard: " + << Value(rawPipeline).toString()}; + } + invariant(!targetingResults.pipelineForMerging); + + auto params = stdx::make_unique<ClusterClientCursorParams>( + expCtx->ns, + AuthorizationSession::get(expCtx->opCtx->getClient())->getAuthenticatedUserNames(), + ReadPreferenceSetting::get(expCtx->opCtx)); + params->remotes = std::move(targetingResults.remoteCursors); + + // We will transfer ownership of the params to the RouterStageInternalCursor, but need a + // reference to them to construct the RouterStageMerge. + auto* unownedParams = params.get(); + auto mergeStage = stdx::make_unique<RouterStageMerge>( + expCtx->opCtx, + Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), + unownedParams); + auto routerExecutionTree = stdx::make_unique<RouterStageInternalCursor>( + expCtx->opCtx, std::move(params), std::move(mergeStage)); + + return Pipeline::create( + {DocumentSourceRouterAdapter::create(expCtx, std::move(routerExecutionTree))}, expCtx); +} + } // namespace Status ClusterAggregate::runAggregate(OperationContext* opCtx, @@ -502,163 +860,61 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, return getStatusFromCommandResult(result->asTempObj()); } - // Begin shard targeting. The process is as follows: - // - First, determine whether we need to target more than one shard. If so, we split the - // pipeline; if not, we retain the existing pipeline. - // - Call establishShardCursors to dispatch the aggregation to the targeted shards. - // - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with - // the refreshed routing table data. - // - If the pipeline is not split and we now need to target multiple shards, split it. If the - // pipeline is already split and we now only need to target a single shard, reassemble the - // original pipeline. - // - After exhausting 10 attempts to establish the cursors, we give up and throw. - auto swCursors = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>(); - auto swShardResults = makeStatusWith<std::vector<AsyncRequestsSender::Response>>(); - - const bool needsPrimaryShardMerge = - (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load()); - - const auto shardQuery = pipeline->getInitialQuery(); - - auto pipelineForTargetedShards = std::move(pipeline); - std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging; - - int numAttempts = 0; - - do { - // We need to grab a new routing table at the start of each iteration, since a stale config - // exception will invalidate the previous one. - executionNsRoutingInfo = - uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, namespaces.executionNss)); - - // Determine whether we can run the entire aggregation on a single shard. - std::set<ShardId> shardIds = getTargetedShards(opCtx, - namespaces.executionNss, - liteParsedPipeline, - executionNsRoutingInfo, - shardQuery, - request.getCollation()); - - uassert(ErrorCodes::ShardNotFound, - "No targets were found for this aggregation. All shards were removed from the " - "cluster mid-operation", - shardIds.size() > 0); - - // Don't need to split pipeline if we are only targeting a single shard, unless there is a - // stage that needs to be run on the primary shard and the single target shard is not the - // primary. - const bool needsSplit = - (shardIds.size() > 1u || - (needsPrimaryShardMerge && *(shardIds.begin()) != executionNsRoutingInfo.primaryId())); - - const bool isSplit = pipelineForTargetedShards->isSplitForShards(); - - // If we have to run on multiple shards and the pipeline is not yet split, split it. If we - // can run on a single shard and the pipeline is already split, reassemble it. - if (needsSplit && !isSplit) { - pipelineForMerging = std::move(pipelineForTargetedShards); - pipelineForTargetedShards = pipelineForMerging->splitForSharded(); - } else if (!needsSplit && isSplit) { - pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging)); - } - - // Generate the command object for the targeted shards. - BSONObj targetedCommand = - createCommandForTargetedShards(request, cmdObj, pipelineForTargetedShards); - - // Explain does not produce a cursor, so instead we scatter-gather commands to the shards. - if (mergeCtx->explain) { - if (mustRunOnAllShards(namespaces.executionNss, liteParsedPipeline)) { - // Some stages (such as $currentOp) need to be broadcast to all shards, and should - // not participate in the shard version protocol. - swShardResults = - scatterGatherUnversionedTargetAllShards(opCtx, - namespaces.executionNss.db().toString(), - namespaces.executionNss, - targetedCommand, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent); - } else { - // Aggregations on a real namespace should use the routing table to target shards, - // and should participate in the shard version protocol. - swShardResults = scatterGatherVersionedTargetByRoutingTable( - opCtx, - namespaces.executionNss.db().toString(), - namespaces.executionNss, - targetedCommand, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent, - shardQuery, - request.getCollation(), - nullptr /* viewDefinition */); - } - } else { - swCursors = establishShardCursors(opCtx, - namespaces.executionNss, - liteParsedPipeline, - &executionNsRoutingInfo, - targetedCommand, - ReadPreferenceSetting::get(opCtx), - shardQuery, - request.getCollation()); - - if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) { - LOG(1) << "got stale shardVersion error " << swCursors.getStatus() - << " while dispatching " << redact(targetedCommand) << " after " - << (numAttempts + 1) << " dispatch attempts"; - } - } - } while (++numAttempts < kMaxNumStaleVersionRetries && - (mergeCtx->explain ? !swShardResults.isOK() : !swCursors.isOK())); + auto targetingResults = uassertStatusOK(establishShardCursors(mergeCtx, + namespaces.executionNss, + cmdObj, + request, + liteParsedPipeline, + std::move(pipeline))); if (mergeCtx->explain) { // If we reach here, we've either succeeded in running the explain or exhausted all // attempts. In either case, attempt to append the explain results to the output builder. - auto shardResults = uassertStatusOK(std::move(swShardResults)); - uassertAllShardsSupportExplain(shardResults); + uassertAllShardsSupportExplain(targetingResults.remoteExplainOutput); - return appendExplainResults(std::move(shardResults), + return appendExplainResults(std::move(targetingResults.remoteExplainOutput), mergeCtx, - pipelineForTargetedShards, - pipelineForMerging, + targetingResults.pipelineForTargetedShards, + targetingResults.pipelineForMerging, result); } - // Retrieve the shard cursors and check whether or not we dispatched to a single shard. - auto cursors = uassertStatusOK(std::move(swCursors)); - invariant(cursors.size() > 0); + invariant(targetingResults.remoteCursors.size() > 0); // If we dispatched to a single shard, store the remote cursor and return immediately. - if (!pipelineForTargetedShards->isSplitForShards()) { - invariant(cursors.size() == 1); + if (!targetingResults.pipelineForTargetedShards->isSplitForShards()) { + invariant(targetingResults.remoteCursors.size() == 1); + const auto& remoteCursor = targetingResults.remoteCursors[0]; auto executorPool = Grid::get(opCtx)->getExecutorPool(); const BSONObj reply = uassertStatusOK(storePossibleCursor( opCtx, - cursors[0].shardId, - cursors[0].hostAndPort, - cursors[0].cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse), + remoteCursor.shardId, + remoteCursor.hostAndPort, + remoteCursor.cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse), namespaces.requestedNss, executorPool->getArbitraryExecutor(), Grid::get(opCtx)->getCursorManager(), mergeCtx->tailableMode)); - return appendCursorResponseToCommandResult(cursors[0].shardId, reply, result); + return appendCursorResponseToCommandResult(remoteCursor.shardId, reply, result); } // If we reach here, we have a merge pipeline to dispatch. - invariant(pipelineForMerging); + auto mergingPipeline = std::move(targetingResults.pipelineForMerging); + invariant(mergingPipeline); // First, check whether we can merge on the mongoS. If the merge pipeline MUST run on mongoS, // then ignore the internalQueryProhibitMergingOnMongoS parameter. - if (pipelineForMerging->requiredToRunOnMongos() || - (!internalQueryProhibitMergingOnMongoS.load() && pipelineForMerging->canRunOnMongos())) { + if (mergingPipeline->requiredToRunOnMongos() || + (!internalQueryProhibitMergingOnMongoS.load() && mergingPipeline->canRunOnMongos())) { // Register the new mongoS cursor, and retrieve the initial batch of results. - auto cursorResponse = establishMergingMongosCursor(opCtx, - request, - namespaces.requestedNss, - std::move(pipelineForMerging), - std::move(cursors)); + auto cursorResponse = + establishMergingMongosCursor(opCtx, + request, + namespaces.requestedNss, + std::move(mergingPipeline), + std::move(targetingResults.remoteCursors)); // We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline // can never run on mongoS. Filter the command response and return immediately. @@ -667,16 +923,17 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } // If we cannot merge on mongoS, establish the merge cursor on a shard. - pipelineForMerging->addInitialSource( - DocumentSourceMergeCursors::create(parseCursors(cursors), mergeCtx)); - auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, pipelineForMerging); + mergingPipeline->addInitialSource( + DocumentSourceMergeCursors::create(parseCursors(targetingResults.remoteCursors), mergeCtx)); + auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, mergingPipeline); auto mergeResponse = uassertStatusOK(establishMergingShardCursor( opCtx, namespaces.executionNss, - cursors, + targetingResults.remoteCursors, mergeCmdObj, - boost::optional<ShardId>{needsPrimaryShardMerge, executionNsRoutingInfo.primaryId()})); + boost::optional<ShardId>{targetingResults.needsPrimaryShardMerge, + executionNsRoutingInfo.primaryId()})); auto mergingShardId = mergeResponse.first; auto response = mergeResponse.second; diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index d3efbd95923..32359be6143 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -32,6 +32,7 @@ env.Library( env.Library( target="router_exec_stage", source=[ + "document_source_router_adapter.cpp", "router_stage_limit.cpp", "router_stage_merge.cpp", "router_stage_mock.cpp", diff --git a/src/mongo/s/query/document_source_router_adapter.cpp b/src/mongo/s/query/document_source_router_adapter.cpp new file mode 100644 index 00000000000..4e144751dcb --- /dev/null +++ b/src/mongo/s/query/document_source_router_adapter.cpp @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/query/document_source_router_adapter.h" + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/expression_context.h" + +namespace mongo { + +boost::intrusive_ptr<DocumentSourceRouterAdapter> DocumentSourceRouterAdapter::create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + std::unique_ptr<RouterExecStage> childStage) { + return new DocumentSourceRouterAdapter(expCtx, std::move(childStage)); +} + +DocumentSource::GetNextResult DocumentSourceRouterAdapter::getNext() { + auto next = uassertStatusOK(_child->next(_execContext)); + if (auto nextObj = next.getResult()) { + return Document::fromBsonWithMetaData(*nextObj); + } + return GetNextResult::makeEOF(); +} + +void DocumentSourceRouterAdapter::doDispose() { + _child->kill(pExpCtx->opCtx); +} + +void DocumentSourceRouterAdapter::reattachToOperationContext(OperationContext* opCtx) { + _child->reattachToOperationContext(opCtx); +} + +void DocumentSourceRouterAdapter::detachFromOperationContext() { + _child->detachFromOperationContext(); +} + +Value DocumentSourceRouterAdapter::serialize( + boost::optional<ExplainOptions::Verbosity> explain) const { + invariant(explain); // We shouldn't need to serialize this stage to send it anywhere. + return Value(); // Return the empty value to hide this stage from explain output. +} + +bool DocumentSourceRouterAdapter::remotesExhausted() { + return _child->remotesExhausted(); +} + +DocumentSourceRouterAdapter::DocumentSourceRouterAdapter( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + std::unique_ptr<RouterExecStage> childStage) + : DocumentSource(expCtx), _child(std::move(childStage)) {} + +} // namespace mongo diff --git a/src/mongo/s/query/document_source_router_adapter.h b/src/mongo/s/query/document_source_router_adapter.h new file mode 100644 index 00000000000..1520713edd5 --- /dev/null +++ b/src/mongo/s/query/document_source_router_adapter.h @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/s/query/router_exec_stage.h" + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/pipeline.h" + +namespace mongo { +/** + * A class that acts as an adapter between the RouterExecStage and DocumentSource interfaces, + * translating results from an input RouterExecStage into DocumentSource::GetNextResults. + */ +class DocumentSourceRouterAdapter final : public DocumentSource { +public: + static boost::intrusive_ptr<DocumentSourceRouterAdapter> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + std::unique_ptr<RouterExecStage> childStage); + + StageConstraints constraints(Pipeline::SplitState pipeState) const final { + return {StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kMongoS, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed}; + } + + GetNextResult getNext() final; + void doDispose() final; + void reattachToOperationContext(OperationContext* opCtx) final; + void detachFromOperationContext() final; + Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; + bool remotesExhausted(); + + void setExecContext(RouterExecStage::ExecContext execContext) { + _execContext = execContext; + } + + Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) const { + return _child->setAwaitDataTimeout(awaitDataTimeout); + } + +private: + DocumentSourceRouterAdapter(const boost::intrusive_ptr<ExpressionContext>& expCtx, + std::unique_ptr<RouterExecStage> childStage); + + std::unique_ptr<RouterExecStage> _child; + RouterExecStage::ExecContext _execContext; +}; +} // namespace mongo diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp index 159ffcc1f47..45111de6b98 100644 --- a/src/mongo/s/query/router_stage_pipeline.cpp +++ b/src/mongo/s/query/router_stage_pipeline.cpp @@ -35,6 +35,7 @@ #include "mongo/db/pipeline/document_source_list_local_sessions.h" #include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/expression_context.h" +#include "mongo/s/query/document_source_router_adapter.h" namespace mongo { @@ -62,7 +63,7 @@ StatusWith<ClusterQueryResult> RouterStagePipeline::next(RouterExecStage::ExecCo } // If we reach this point, we have hit EOF. - if (!_mergePipeline->getContext()->isTailable()) { + if (!_mergePipeline->getContext()->isTailableAwaitData()) { _mergePipeline.get_deleter().dismissDisposal(); _mergePipeline->dispose(getOpCtx()); } @@ -91,47 +92,4 @@ Status RouterStagePipeline::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) return _routerAdapter->setAwaitDataTimeout(awaitDataTimeout); } -boost::intrusive_ptr<RouterStagePipeline::DocumentSourceRouterAdapter> -RouterStagePipeline::DocumentSourceRouterAdapter::create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::unique_ptr<RouterExecStage> childStage) { - return new DocumentSourceRouterAdapter(expCtx, std::move(childStage)); -} - -DocumentSource::GetNextResult RouterStagePipeline::DocumentSourceRouterAdapter::getNext() { - auto next = uassertStatusOK(_child->next(_execContext)); - if (auto nextObj = next.getResult()) { - return Document::fromBsonWithMetaData(*nextObj); - } - return GetNextResult::makeEOF(); -} - -void RouterStagePipeline::DocumentSourceRouterAdapter::doDispose() { - _child->kill(pExpCtx->opCtx); -} - -void RouterStagePipeline::DocumentSourceRouterAdapter::reattachToOperationContext( - OperationContext* opCtx) { - _child->reattachToOperationContext(opCtx); -} - -void RouterStagePipeline::DocumentSourceRouterAdapter::detachFromOperationContext() { - _child->detachFromOperationContext(); -} - -Value RouterStagePipeline::DocumentSourceRouterAdapter::serialize( - boost::optional<ExplainOptions::Verbosity> explain) const { - invariant(explain); // We shouldn't need to serialize this stage to send it anywhere. - return Value(); // Return the empty value to hide this stage from explain output. -} - -bool RouterStagePipeline::DocumentSourceRouterAdapter::remotesExhausted() { - return _child->remotesExhausted(); -} - -RouterStagePipeline::DocumentSourceRouterAdapter::DocumentSourceRouterAdapter( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::unique_ptr<RouterExecStage> childStage) - : DocumentSource(expCtx), _child(std::move(childStage)) {} - } // namespace mongo diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h index edb9fd2e074..3c54eb9d2bb 100644 --- a/src/mongo/s/query/router_stage_pipeline.h +++ b/src/mongo/s/query/router_stage_pipeline.h @@ -32,6 +32,7 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/s/query/document_source_router_adapter.h" namespace mongo { @@ -58,50 +59,8 @@ protected: void doDetachFromOperationContext() final; private: - /** - * A class that acts as an adapter between the RouterExecStage and DocumentSource interfaces, - * translating results from an input RouterExecStage into DocumentSource::GetNextResults. - */ - class DocumentSourceRouterAdapter final : public DocumentSource { - public: - static boost::intrusive_ptr<DocumentSourceRouterAdapter> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::unique_ptr<RouterExecStage> childStage); - - StageConstraints constraints(Pipeline::SplitState pipeState) const final { - return {StreamType::kStreaming, - PositionRequirement::kFirst, - HostTypeRequirement::kNone, - DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed}; - } - - GetNextResult getNext() final; - void doDispose() final; - void reattachToOperationContext(OperationContext* opCtx) final; - void detachFromOperationContext() final; - Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; - bool remotesExhausted(); - - void setExecContext(RouterExecStage::ExecContext execContext) { - _execContext = execContext; - } - - Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) const { - return _child->setAwaitDataTimeout(awaitDataTimeout); - } - - private: - DocumentSourceRouterAdapter(const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::unique_ptr<RouterExecStage> childStage); - - std::unique_ptr<RouterExecStage> _child; - ExecContext _execContext; - }; - boost::intrusive_ptr<DocumentSourceRouterAdapter> _routerAdapter; std::unique_ptr<Pipeline, Pipeline::Deleter> _mergePipeline; bool _mongosOnlyPipeline; }; - } // namespace mongo |