summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2017-10-09 17:57:02 -0400
committerMatthew Russotto <matthew.russotto@10gen.com>2017-10-11 08:31:16 -0400
commit767e041931e73ae4f7a0047114becf5be803f3ab (patch)
tree6810e9485838564a5681bb2d8984e7374ec2eb94
parent45d35fe3fcefefe1282b8e0dfc8cd76cb247951d (diff)
downloadmongo-767e041931e73ae4f7a0047114becf5be803f3ab.tar.gz
SERVER-29609 Enable updateLookup for sharded change streams.
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml3
-rw-r--r--jstests/sharding/change_streams.js1
-rw-r--r--jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js107
-rw-r--r--jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js79
-rw-r--r--jstests/sharding/lookup_change_stream_post_image_id_shard_key.js94
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp14
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image.h5
-rw-r--r--src/mongo/db/pipeline/expression_context.h5
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp545
-rw-r--r--src/mongo/s/query/SConscript1
-rw-r--r--src/mongo/s/query/document_source_router_adapter.cpp79
-rw-r--r--src/mongo/s/query/document_source_router_adapter.h77
-rw-r--r--src/mongo/s/query/router_stage_pipeline.cpp46
-rw-r--r--src/mongo/s/query/router_stage_pipeline.h43
15 files changed, 859 insertions, 244 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 34363ddd337..1299b98d7bc 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -30,6 +30,9 @@ selector:
# New feature in v3.6 mongos and mongod.
- jstests/sharding/advance_logical_time_with_valid_signature.js
- jstests/sharding/after_cluster_time.js
+ - jstests/sharding/lookup_change_stream_post_image_id_shard_key.js
+ - jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js
+ - jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js
- jstests/sharding/change_stream_invalidation.js
- jstests/sharding/change_stream_remove_shard.js
- jstests/sharding/change_streams.js
diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js
index a2a1a83664a..3cb35d318bb 100644
--- a/jstests/sharding/change_streams.js
+++ b/jstests/sharding/change_streams.js
@@ -92,7 +92,6 @@
// Test that using change streams with any stages not allowed to run on mongos results in an
// error.
- assertErrorCode(mongosColl, [{$changeStream: {fullDocument: "updateLookup"}}], 40470);
assertErrorCode(
mongosColl, [{$changeStream: {}}, {$out: "shouldntWork"}], ErrorCodes.IllegalOperation);
diff --git a/jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js b/jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js
new file mode 100644
index 00000000000..022d5dd6172
--- /dev/null
+++ b/jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js
@@ -0,0 +1,107 @@
+// Tests the behavior of looking up the post image for change streams on collections which are
+// sharded with a compound shard key.
+(function() {
+ "use strict";
+
+ // For supportsMajorityReadConcern().
+ load("jstests/multiVersion/libs/causal_consistency_helpers.js");
+
+ if (!supportsMajorityReadConcern()) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+ }
+
+ const st = new ShardingTest({
+ shards: 2,
+ rs: {
+ nodes: 1,
+ enableMajorityReadConcern: '',
+ // Use a higher frequency for periodic noops to speed up the test.
+ setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
+ }
+ });
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ const mongosColl = mongosDB[jsTestName()];
+
+ assert.commandWorked(mongosDB.dropDatabase());
+
+ // Enable sharding on the test DB and ensure its primary is shard0000.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+ // Shard the test collection with a compound shard key: a, b, c. Then split it into two chunks,
+ // and put one chunk on each shard.
+ assert.commandWorked(mongosDB.adminCommand(
+ {shardCollection: mongosColl.getFullName(), key: {a: 1, b: 1, c: 1}}));
+
+ // Split the collection into 2 chunks:
+ // [{a: MinKey, b: MinKey, c: MinKey}, {a: 1, b: MinKey, c: MinKey})
+ // and
+ // [{a: 1, b: MinKey, c: MinKey}, {a: MaxKey, b: MaxKey, c: MaxKey}).
+ assert.commandWorked(mongosDB.adminCommand(
+ {split: mongosColl.getFullName(), middle: {a: 1, b: MinKey, c: MinKey}}));
+
+ // Move the upper chunk to shard 1.
+ assert.commandWorked(mongosDB.adminCommand({
+ moveChunk: mongosColl.getFullName(),
+ find: {a: 1, b: MinKey, c: MinKey},
+ to: st.rs1.getURL()
+ }));
+
+ const changeStream = mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}]);
+
+ const nDocs = 6;
+ const bValues = ["one", "two", "three", "four", "five", "six"];
+
+ // This shard key function results in 1/3rd of documents on shard0 and 2/3rds on shard1.
+ function shardKeyFromId(id) {
+ return {a: id % 3, b: bValues[id], c: id % 2};
+ }
+
+ // Do some writes.
+ for (let id = 0; id < nDocs; ++id) {
+ const documentKey = Object.merge({_id: id}, shardKeyFromId(id));
+ assert.writeOK(mongosColl.insert(documentKey));
+ assert.writeOK(mongosColl.update(documentKey, {$set: {updatedCount: 1}}));
+ }
+
+ for (let id = 0; id < nDocs; ++id) {
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "insert");
+ assert.eq(next.documentKey, {_id: id});
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ assert.eq(next.documentKey, Object.merge(shardKeyFromId(id), {_id: id}));
+ assert.docEq(next.fullDocument,
+ Object.merge(shardKeyFromId(id), {_id: id, updatedCount: 1}));
+ }
+
+ // Test that the change stream can still see the updated post image, even if a chunk is
+ // migrated.
+ for (let id = 0; id < nDocs; ++id) {
+ const documentKey = Object.merge({_id: id}, shardKeyFromId(id));
+ assert.writeOK(mongosColl.update(documentKey, {$set: {updatedCount: 2}}));
+ }
+
+ // Move the upper chunk back to shard 0.
+ assert.commandWorked(mongosDB.adminCommand({
+ moveChunk: mongosColl.getFullName(),
+ find: {a: 1, b: MinKey, c: MinKey},
+ to: st.rs0.getURL()
+ }));
+
+ for (let id = 0; id < nDocs; ++id) {
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ assert.eq(next.documentKey, Object.merge(shardKeyFromId(id), {_id: id}));
+ assert.docEq(next.fullDocument,
+ Object.merge(shardKeyFromId(id), {_id: id, updatedCount: 2}));
+ }
+
+ st.stop();
+})();
diff --git a/jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js b/jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js
new file mode 100644
index 00000000000..d2b130abf33
--- /dev/null
+++ b/jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js
@@ -0,0 +1,79 @@
+// Tests the behavior of looking up the post image for change streams on collections which are
+// sharded with a hashed shard key.
+(function() {
+ "use strict";
+
+ // For supportsMajorityReadConcern().
+ load("jstests/multiVersion/libs/causal_consistency_helpers.js");
+
+ if (!supportsMajorityReadConcern()) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+ }
+
+ const st = new ShardingTest({
+ shards: 2,
+ enableBalancer: false,
+ rs: {
+ nodes: 1,
+ enableMajorityReadConcern: '',
+ // Use a higher frequency for periodic noops to speed up the test.
+ setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
+ }
+ });
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ const mongosColl = mongosDB[jsTestName()];
+
+ assert.commandWorked(mongosDB.dropDatabase());
+
+ // Enable sharding on the test DB and ensure its primary is shard0000.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+ // Shard the test collection on the field "shardKey", and split it into two chunks.
+ assert.commandWorked(mongosDB.adminCommand({
+ shardCollection: mongosColl.getFullName(),
+ numInitialChunks: 2,
+ key: {shardKey: "hashed"}
+ }));
+
+ // Make sure the negative chunk is on shard 0.
+ assert.commandWorked(mongosDB.adminCommand({
+ moveChunk: mongosColl.getFullName(),
+ bounds: [{shardKey: MinKey}, {shardKey: NumberLong("0")}],
+ to: st.rs0.getURL()
+ }));
+
+ // Make sure the positive chunk is on shard 1.
+ assert.commandWorked(mongosDB.adminCommand({
+ moveChunk: mongosColl.getFullName(),
+ bounds: [{shardKey: NumberLong("0")}, {shardKey: MaxKey}],
+ to: st.rs1.getURL()
+ }));
+
+ const changeStream = mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}]);
+
+ // Write enough documents that we likely have some on each shard.
+ const nDocs = 1000;
+ for (let id = 0; id < nDocs; ++id) {
+ assert.writeOK(mongosColl.insert({_id: id, shardKey: id}));
+ assert.writeOK(mongosColl.update({shardKey: id}, {$set: {updatedCount: 1}}));
+ }
+
+ for (let id = 0; id < nDocs; ++id) {
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "insert");
+ // TODO SERVER-30599 this documentKey should contain the shard key.
+ assert.eq(next.documentKey, {_id: id});
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ assert.eq(next.documentKey, {shardKey: id, _id: id});
+ assert.docEq(next.fullDocument, {_id: id, shardKey: id, updatedCount: 1});
+ }
+
+ st.stop();
+})();
diff --git a/jstests/sharding/lookup_change_stream_post_image_id_shard_key.js b/jstests/sharding/lookup_change_stream_post_image_id_shard_key.js
new file mode 100644
index 00000000000..5cd2a6fa5e0
--- /dev/null
+++ b/jstests/sharding/lookup_change_stream_post_image_id_shard_key.js
@@ -0,0 +1,94 @@
+// Tests the behavior of looking up the post image for change streams on collections which are
+// sharded with a key which is just the "_id" field.
+(function() {
+ "use strict";
+
+ // For supportsMajorityReadConcern().
+ load("jstests/multiVersion/libs/causal_consistency_helpers.js");
+
+ if (!supportsMajorityReadConcern()) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+ }
+
+ const st = new ShardingTest({
+ shards: 2,
+ rs: {
+ nodes: 1,
+ enableMajorityReadConcern: '',
+ // Use a higher frequency for periodic noops to speed up the test.
+ setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
+ }
+ });
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ const mongosColl = mongosDB[jsTestName()];
+
+ assert.commandWorked(mongosDB.dropDatabase());
+
+ // Enable sharding on the test DB and ensure its primary is shard0000.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+ // Shard the test collection on _id.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey).
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+
+ // Move the [0, MaxKey) chunk to shard0001.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+
+ // Write a document to each chunk.
+ assert.writeOK(mongosColl.insert({_id: -1}));
+ assert.writeOK(mongosColl.insert({_id: 1}));
+
+ const changeStream = mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}]);
+
+ // Do some writes.
+ assert.writeOK(mongosColl.insert({_id: 1000}));
+ assert.writeOK(mongosColl.insert({_id: -1000}));
+ assert.writeOK(mongosColl.update({_id: 1000}, {$set: {updatedCount: 1}}));
+ assert.writeOK(mongosColl.update({_id: -1000}, {$set: {updatedCount: 1}}));
+
+ for (let nextId of[1000, -1000]) {
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "insert");
+ assert.eq(next.documentKey, {_id: nextId});
+ }
+
+ for (let nextId of[1000, -1000]) {
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ // Only the "_id" field is present in next.documentKey because the shard key is the _id.
+ assert.eq(next.documentKey, {_id: nextId});
+ assert.docEq(next.fullDocument, {_id: nextId, updatedCount: 1});
+ }
+
+ // Test that the change stream can still see the updated post image, even if a chunk is
+ // migrated.
+ assert.writeOK(mongosColl.update({_id: 1000}, {$set: {updatedCount: 2}}));
+ assert.writeOK(mongosColl.update({_id: -1000}, {$set: {updatedCount: 2}}));
+
+ // Split the [0, MaxKey) chunk into 2: [0, 500), [500, MaxKey).
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 500}}));
+ // Move the [500, MaxKey) chunk back to shard0000.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {_id: 1000}, to: st.rs0.getURL()}));
+
+ for (let nextId of[1000, -1000]) {
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ assert.eq(next.documentKey, {_id: nextId});
+ assert.docEq(next.fullDocument, {_id: nextId, updatedCount: 2});
+ }
+
+ st.stop();
+})();
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 64d01baf450..5e385da0420 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -351,14 +351,12 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
// There should only be one close cursor stage. If we're on the shards and producing input
// to be merged, do not add a close cursor stage, since the mongos will already have one.
stages.push_back(DocumentSourceCloseCursor::create(expCtx));
- }
- if (shouldLookupPostImage) {
- uassert(
- 40470,
- str::stream() << "looking up the full document after an update is not yet supported on "
- "sharded collections",
- !expCtx->inMongos);
- stages.push_back(DocumentSourceLookupChangePostImage::create(expCtx));
+
+ // There should be only one post-image lookup stage. If we're on the shards and producing
+ // input to be merged, the lookup is done on the mongos.
+ if (shouldLookupPostImage) {
+ stages.push_back(DocumentSourceLookupChangePostImage::create(expCtx));
+ }
}
return stages;
}
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 3720c6213df..90449e1b073 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -106,7 +106,7 @@ void DocumentSourceCursor::loadBatch() {
// Furthermore, if we need to return the latest oplog time (in the tailable and
// needs-merge case), batching will result in a wrong time.
if (shouldWaitForInserts(pExpCtx->opCtx) ||
- (pExpCtx->isTailable() && pExpCtx->needsMerge) ||
+ (pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) ||
memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
// End this batch and prepare PlanExecutor for yielding.
_exec->saveState();
@@ -115,7 +115,7 @@ void DocumentSourceCursor::loadBatch() {
}
// Special case for tailable cursor -- EOF doesn't preclude more results, so keep
// the PlanExecutor alive.
- if (state == PlanExecutor::IS_EOF && pExpCtx->isTailable()) {
+ if (state == PlanExecutor::IS_EOF && pExpCtx->isTailableAwaitData()) {
_exec->saveState();
return;
}
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
index 18d38b52fdb..2a4ccc972fb 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
@@ -62,9 +62,12 @@ public:
}
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ invariant(pipeState != Pipeline::SplitState::kSplitForShards);
StageConstraints constraints(StreamType::kStreaming,
PositionRequirement::kNone,
- HostTypeRequirement::kAnyShard,
+ pipeState == Pipeline::SplitState::kUnsplit
+ ? HostTypeRequirement::kNone
+ : HostTypeRequirement::kMongoS,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
ChangeStreamRequirement::kChangeStreamStage);
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 0e7488aaa55..5bce214c776 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -113,9 +113,10 @@ public:
};
/**
- * Convenience call that returns true if the tailableMode indicate a tailable query.
+ * Convenience call that returns true if the tailableMode indicates a tailable and awaitData
+ * query.
*/
- bool isTailable() const {
+ bool isTailableAwaitData() const {
return tailableMode == TailableMode::kTailableAndAwaitData;
}
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index ca82498abd5..d571603bdf9 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -59,7 +59,11 @@
#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/cluster_query_knobs.h"
+#include "mongo/s/query/document_source_router_adapter.h"
#include "mongo/s/query/establish_cursors.h"
+#include "mongo/s/query/router_exec_stage.h"
+#include "mongo/s/query/router_stage_merge.h"
+#include "mongo/s/query/router_stage_pipeline.h"
#include "mongo/s/query/store_possible_cursor.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
@@ -67,6 +71,100 @@
namespace mongo {
namespace {
+
+/**
+ * Class to provide access to mongos-specific implementations of methods required by some document
+ * sources.
+ */
+class MongosProcessInterface final
+ : public DocumentSourceNeedsMongoProcessInterface::MongoProcessInterface {
+public:
+ MongosProcessInterface(OperationContext* opCtx) : _opCtx(opCtx) {}
+
+ virtual ~MongosProcessInterface() = default;
+
+ void setOperationContext(OperationContext* opCtx) override {
+ _opCtx = opCtx;
+ }
+
+ DBClientBase* directClient() override {
+ MONGO_UNREACHABLE;
+ }
+
+ bool isSharded(const NamespaceString& ns) override {
+ MONGO_UNREACHABLE;
+ }
+
+ BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) override {
+ MONGO_UNREACHABLE;
+ }
+
+ CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
+ const NamespaceString& ns) override {
+ MONGO_UNREACHABLE;
+ }
+
+ void appendLatencyStats(const NamespaceString& nss,
+ bool includeHistograms,
+ BSONObjBuilder* builder) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ Status appendStorageStats(const NamespaceString& nss,
+ const BSONObj& param,
+ BSONObjBuilder* builder) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ Status appendRecordCount(const NamespaceString& nss, BSONObjBuilder* builder) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ BSONObj getCollectionOptions(const NamespaceString& nss) override {
+ MONGO_UNREACHABLE;
+ }
+
+ Status renameIfOptionsAndIndexesHaveNotChanged(
+ const BSONObj& renameCommandObj,
+ const NamespaceString& targetNs,
+ const BSONObj& originalCollectionOptions,
+ const std::list<BSONObj>& originalIndexes) override {
+ MONGO_UNREACHABLE;
+ }
+
+ /**
+ * Constructs an executable pipeline targeted to a remote shard. Returns
+ * ErrorCodes::InternalError if 'rawPipeline' specifies a pipeline that does not target a single
+ * shard.
+ */
+ StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions pipelineOptions = {}) override;
+
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) override {
+ MONGO_UNREACHABLE;
+ }
+
+ std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
+ CurrentOpUserMode userMode,
+ CurrentOpTruncateMode truncateMode) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ std::string getShardName(OperationContext* opCtx) const override {
+ MONGO_UNREACHABLE;
+ }
+
+private:
+ StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipelineWithOneRemote(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+ OperationContext* _opCtx;
+};
+
// Given a document representing an aggregation command such as
//
// {aggregate: "myCollection", pipeline: [], ...},
@@ -244,15 +342,15 @@ BSONObj createCommandForMergingShard(
return mergeCmd.freeze().toBson();
}
-StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishShardCursors(
- OperationContext* opCtx,
- const NamespaceString& nss,
- const LiteParsedPipeline& litePipe,
- CachedCollectionRoutingInfo* routingInfo,
- const BSONObj& cmdObj,
- const ReadPreferenceSetting& readPref,
- const BSONObj& shardQuery,
- const BSONObj& collation) {
+StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>
+establishShardCursorsWithoutRetrying(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe,
+ CachedCollectionRoutingInfo* routingInfo,
+ const BSONObj& cmdObj,
+ const ReadPreferenceSetting& readPref,
+ const BSONObj& shardQuery,
+ const BSONObj& collation) {
LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
std::set<ShardId> shardIds =
@@ -305,6 +403,162 @@ StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishShardC
return swCursors;
}
+struct EstablishShardCursorsResults {
+ // True if this pipeline was split, and the second half of the pipeline needs to be run on the
+ // primary shard for the database.
+ bool needsPrimaryShardMerge;
+
+ // Populated if this *is not* an explain, this vector represents the cursors on the remote
+ // shards.
+ std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
+
+ // Populated if this *is* an explain, this vector represents the results from each shard.
+ std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
+
+ // The half of the pipeline that was sent to each shard, or the entire pipeline if there was
+ // only one shard targeted.
+ std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForTargetedShards;
+
+ // The merging half of the pipeline if more than one shard was targeted, otherwise nullptr.
+ std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging;
+};
+
+/**
+ * Targets shards for the pipeline and returns a struct with the remote cursors or results, and
+ * the pipeline that will need to be executed to merge the results from the remotes. If a stale
+ * shard version is encountered, refreshes the routing table and tries again.
+ */
+StatusWith<EstablishShardCursorsResults> establishShardCursors(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& executionNss,
+ BSONObj originalCmdObj,
+ const AggregationRequest& aggRequest,
+ const LiteParsedPipeline& liteParsedPipeline,
+ std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline) {
+ // The process is as follows:
+ // - First, determine whether we need to target more than one shard. If so, we split the
+ // pipeline; if not, we retain the existing pipeline.
+ // - Call establishShardCursorsWithoutRetrying to dispatch the aggregation to the targeted
+ // shards.
+ // - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with
+ // the refreshed routing table data.
+ // - If the pipeline is not split and we now need to target multiple shards, split it. If the
+ // pipeline is already split and we now only need to target a single shard, reassemble the
+ // original pipeline.
+ // - After exhausting 10 attempts to establish the cursors, we give up and throw.
+ auto swCursors = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>();
+ auto swShardResults = makeStatusWith<std::vector<AsyncRequestsSender::Response>>();
+ auto opCtx = expCtx->opCtx;
+
+ const bool needsPrimaryShardMerge =
+ (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load());
+
+ const auto shardQuery = pipeline->getInitialQuery();
+
+ auto pipelineForTargetedShards = std::move(pipeline);
+ std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging;
+
+ int numAttempts = 0;
+
+ do {
+ // We need to grab a new routing table at the start of each iteration, since a stale config
+ // exception will invalidate the previous one.
+ auto executionNsRoutingInfo = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, executionNss));
+
+ // Determine whether we can run the entire aggregation on a single shard.
+ std::set<ShardId> shardIds = getTargetedShards(opCtx,
+ executionNss,
+ liteParsedPipeline,
+ executionNsRoutingInfo,
+ shardQuery,
+ aggRequest.getCollation());
+
+ uassert(ErrorCodes::ShardNotFound,
+ "No targets were found for this aggregation. All shards were removed from the "
+ "cluster mid-operation",
+ shardIds.size() > 0);
+
+ // Don't need to split pipeline if we are only targeting a single shard, unless there is a
+ // stage that needs to be run on the primary shard and the single target shard is not the
+ // primary.
+ const bool needsSplit =
+ (shardIds.size() > 1u ||
+ (needsPrimaryShardMerge && *(shardIds.begin()) != executionNsRoutingInfo.primaryId()));
+
+ const bool isSplit = pipelineForTargetedShards->isSplitForShards();
+
+ // If we have to run on multiple shards and the pipeline is not yet split, split it. If we
+ // can run on a single shard and the pipeline is already split, reassemble it.
+ if (needsSplit && !isSplit) {
+ pipelineForMerging = std::move(pipelineForTargetedShards);
+ pipelineForTargetedShards = pipelineForMerging->splitForSharded();
+ } else if (!needsSplit && isSplit) {
+ pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging));
+ }
+
+ // Generate the command object for the targeted shards.
+ BSONObj targetedCommand =
+ createCommandForTargetedShards(aggRequest, originalCmdObj, pipelineForTargetedShards);
+
+ // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
+ if (expCtx->explain) {
+ if (mustRunOnAllShards(executionNss, liteParsedPipeline)) {
+ // Some stages (such as $currentOp) need to be broadcast to all shards, and should
+ // not participate in the shard version protocol.
+ swShardResults =
+ scatterGatherUnversionedTargetAllShards(opCtx,
+ executionNss.db().toString(),
+ executionNss,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent);
+ } else {
+ // Aggregations on a real namespace should use the routing table to target shards,
+ // and should participate in the shard version protocol.
+ swShardResults =
+ scatterGatherVersionedTargetByRoutingTable(opCtx,
+ executionNss.db().toString(),
+ executionNss,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent,
+ shardQuery,
+ aggRequest.getCollation(),
+ nullptr /* viewDefinition */);
+ }
+ } else {
+ swCursors = establishShardCursorsWithoutRetrying(opCtx,
+ executionNss,
+ liteParsedPipeline,
+ &executionNsRoutingInfo,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ shardQuery,
+ aggRequest.getCollation());
+
+ if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) {
+ LOG(1) << "got stale shardVersion error " << swCursors.getStatus()
+ << " while dispatching " << redact(targetedCommand) << " after "
+ << (numAttempts + 1) << " dispatch attempts";
+ }
+ }
+ } while (++numAttempts < kMaxNumStaleVersionRetries &&
+ (expCtx->explain ? !swShardResults.isOK() : !swCursors.isOK()));
+
+ if (!swShardResults.isOK()) {
+ return swShardResults.getStatus();
+ }
+ if (!swCursors.isOK()) {
+ return swCursors.getStatus();
+ }
+ return EstablishShardCursorsResults{needsPrimaryShardMerge,
+ std::move(swCursors.getValue()),
+ std::move(swShardResults.getValue()),
+ std::move(pipelineForTargetedShards),
+ std::move(pipelineForMerging)};
+}
+
StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergingShardCursor(
OperationContext* opCtx,
const NamespaceString& nss,
@@ -335,6 +589,16 @@ BSONObj establishMergingMongosCursor(
std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging,
std::vector<ClusterClientCursorParams::RemoteCursor> cursors) {
+ // Inject the MongosProcessInterface for sources which need it.
+ const auto& sources = pipelineForMerging->getSources();
+ for (auto&& source : sources) {
+ DocumentSourceNeedsMongoProcessInterface* needsMongoProcessInterface =
+ dynamic_cast<DocumentSourceNeedsMongoProcessInterface*>(source.get());
+ if (needsMongoProcessInterface) {
+ needsMongoProcessInterface->injectMongoProcessInterface(
+ std::make_shared<MongosProcessInterface>(opCtx));
+ }
+ }
ClusterClientCursorParams params(
requestedNss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
@@ -415,6 +679,100 @@ BSONObj establishMergingMongosCursor(
return cursorResponse.obj();
}
+/**
+ * This is a special type of RouterExecStage that is used to iterate remote cursors that were
+ * created internally and do not represent a client cursor, such as those used in $changeStream's
+ * updateLookup functionality.
+ *
+ * The purpose of this class is to provide ownership over a ClusterClientCursorParams struct without
+ * creating a ClusterClientCursor, which would show up in the server stats for this mongos.
+ */
+class RouterStageInternalCursor final : public RouterExecStage {
+public:
+ RouterStageInternalCursor(OperationContext* opCtx,
+ std::unique_ptr<ClusterClientCursorParams>&& params,
+ std::unique_ptr<RouterExecStage> child)
+ : RouterExecStage(opCtx, std::move(child)), _params(std::move(params)) {}
+
+ StatusWith<ClusterQueryResult> next(ExecContext execContext) {
+ return getChildStage()->next(execContext);
+ }
+
+private:
+ std::unique_ptr<ClusterClientCursorParams> _params;
+};
+
+StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> MongosProcessInterface::makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions pipelineOptions) {
+ // For the time being we don't expect any callers with options other than these.
+ invariant(pipelineOptions.optimize);
+ invariant(pipelineOptions.attachCursorSource);
+ invariant(!pipelineOptions.forceInjectMongoProcessInterface);
+
+ // 'expCtx' may represent the settings for an aggregation pipeline on a different namespace
+ // than the DocumentSource this MongodImplementation is injected into, but both
+ // ExpressionContext instances should still have the same OperationContext.
+ invariant(_opCtx == expCtx->opCtx);
+
+ // Explain is not supported for auxiliary lookups.
+ invariant(!expCtx->explain);
+ return makePipelineWithOneRemote(rawPipeline, expCtx);
+}
+
+StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>>
+MongosProcessInterface::makePipelineWithOneRemote(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+
+ // Generate the command object for the targeted shards.
+ AggregationRequest aggRequest(expCtx->ns, rawPipeline);
+ LiteParsedPipeline liteParsedPipeline(aggRequest);
+ auto parsedPipeline = Pipeline::parse(rawPipeline, expCtx);
+ if (!parsedPipeline.isOK()) {
+ return parsedPipeline.getStatus();
+ }
+ parsedPipeline.getValue()->optimizePipeline();
+
+ auto targetStatus = establishShardCursors(expCtx,
+ expCtx->ns,
+ aggRequest.serializeToCommandObj().toBson(),
+ aggRequest,
+ liteParsedPipeline,
+ std::move(parsedPipeline.getValue()));
+
+ if (!targetStatus.isOK()) {
+ return targetStatus.getStatus();
+ }
+ auto targetingResults = std::move(targetStatus.getValue());
+ if (targetingResults.remoteCursors.size() != 1) {
+ return {ErrorCodes::InternalError,
+ str::stream() << "Unable to target pipeline to single shard: "
+ << Value(rawPipeline).toString()};
+ }
+ invariant(!targetingResults.pipelineForMerging);
+
+ auto params = stdx::make_unique<ClusterClientCursorParams>(
+ expCtx->ns,
+ AuthorizationSession::get(expCtx->opCtx->getClient())->getAuthenticatedUserNames(),
+ ReadPreferenceSetting::get(expCtx->opCtx));
+ params->remotes = std::move(targetingResults.remoteCursors);
+
+ // We will transfer ownership of the params to the RouterStageInternalCursor, but need a
+ // reference to them to construct the RouterStageMerge.
+ auto* unownedParams = params.get();
+ auto mergeStage = stdx::make_unique<RouterStageMerge>(
+ expCtx->opCtx,
+ Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ unownedParams);
+ auto routerExecutionTree = stdx::make_unique<RouterStageInternalCursor>(
+ expCtx->opCtx, std::move(params), std::move(mergeStage));
+
+ return Pipeline::create(
+ {DocumentSourceRouterAdapter::create(expCtx, std::move(routerExecutionTree))}, expCtx);
+}
+
} // namespace
Status ClusterAggregate::runAggregate(OperationContext* opCtx,
@@ -502,163 +860,61 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
return getStatusFromCommandResult(result->asTempObj());
}
- // Begin shard targeting. The process is as follows:
- // - First, determine whether we need to target more than one shard. If so, we split the
- // pipeline; if not, we retain the existing pipeline.
- // - Call establishShardCursors to dispatch the aggregation to the targeted shards.
- // - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with
- // the refreshed routing table data.
- // - If the pipeline is not split and we now need to target multiple shards, split it. If the
- // pipeline is already split and we now only need to target a single shard, reassemble the
- // original pipeline.
- // - After exhausting 10 attempts to establish the cursors, we give up and throw.
- auto swCursors = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>();
- auto swShardResults = makeStatusWith<std::vector<AsyncRequestsSender::Response>>();
-
- const bool needsPrimaryShardMerge =
- (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load());
-
- const auto shardQuery = pipeline->getInitialQuery();
-
- auto pipelineForTargetedShards = std::move(pipeline);
- std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging;
-
- int numAttempts = 0;
-
- do {
- // We need to grab a new routing table at the start of each iteration, since a stale config
- // exception will invalidate the previous one.
- executionNsRoutingInfo =
- uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, namespaces.executionNss));
-
- // Determine whether we can run the entire aggregation on a single shard.
- std::set<ShardId> shardIds = getTargetedShards(opCtx,
- namespaces.executionNss,
- liteParsedPipeline,
- executionNsRoutingInfo,
- shardQuery,
- request.getCollation());
-
- uassert(ErrorCodes::ShardNotFound,
- "No targets were found for this aggregation. All shards were removed from the "
- "cluster mid-operation",
- shardIds.size() > 0);
-
- // Don't need to split pipeline if we are only targeting a single shard, unless there is a
- // stage that needs to be run on the primary shard and the single target shard is not the
- // primary.
- const bool needsSplit =
- (shardIds.size() > 1u ||
- (needsPrimaryShardMerge && *(shardIds.begin()) != executionNsRoutingInfo.primaryId()));
-
- const bool isSplit = pipelineForTargetedShards->isSplitForShards();
-
- // If we have to run on multiple shards and the pipeline is not yet split, split it. If we
- // can run on a single shard and the pipeline is already split, reassemble it.
- if (needsSplit && !isSplit) {
- pipelineForMerging = std::move(pipelineForTargetedShards);
- pipelineForTargetedShards = pipelineForMerging->splitForSharded();
- } else if (!needsSplit && isSplit) {
- pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging));
- }
-
- // Generate the command object for the targeted shards.
- BSONObj targetedCommand =
- createCommandForTargetedShards(request, cmdObj, pipelineForTargetedShards);
-
- // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
- if (mergeCtx->explain) {
- if (mustRunOnAllShards(namespaces.executionNss, liteParsedPipeline)) {
- // Some stages (such as $currentOp) need to be broadcast to all shards, and should
- // not participate in the shard version protocol.
- swShardResults =
- scatterGatherUnversionedTargetAllShards(opCtx,
- namespaces.executionNss.db().toString(),
- namespaces.executionNss,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent);
- } else {
- // Aggregations on a real namespace should use the routing table to target shards,
- // and should participate in the shard version protocol.
- swShardResults = scatterGatherVersionedTargetByRoutingTable(
- opCtx,
- namespaces.executionNss.db().toString(),
- namespaces.executionNss,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent,
- shardQuery,
- request.getCollation(),
- nullptr /* viewDefinition */);
- }
- } else {
- swCursors = establishShardCursors(opCtx,
- namespaces.executionNss,
- liteParsedPipeline,
- &executionNsRoutingInfo,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- shardQuery,
- request.getCollation());
-
- if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) {
- LOG(1) << "got stale shardVersion error " << swCursors.getStatus()
- << " while dispatching " << redact(targetedCommand) << " after "
- << (numAttempts + 1) << " dispatch attempts";
- }
- }
- } while (++numAttempts < kMaxNumStaleVersionRetries &&
- (mergeCtx->explain ? !swShardResults.isOK() : !swCursors.isOK()));
+ auto targetingResults = uassertStatusOK(establishShardCursors(mergeCtx,
+ namespaces.executionNss,
+ cmdObj,
+ request,
+ liteParsedPipeline,
+ std::move(pipeline)));
if (mergeCtx->explain) {
// If we reach here, we've either succeeded in running the explain or exhausted all
// attempts. In either case, attempt to append the explain results to the output builder.
- auto shardResults = uassertStatusOK(std::move(swShardResults));
- uassertAllShardsSupportExplain(shardResults);
+ uassertAllShardsSupportExplain(targetingResults.remoteExplainOutput);
- return appendExplainResults(std::move(shardResults),
+ return appendExplainResults(std::move(targetingResults.remoteExplainOutput),
mergeCtx,
- pipelineForTargetedShards,
- pipelineForMerging,
+ targetingResults.pipelineForTargetedShards,
+ targetingResults.pipelineForMerging,
result);
}
- // Retrieve the shard cursors and check whether or not we dispatched to a single shard.
- auto cursors = uassertStatusOK(std::move(swCursors));
- invariant(cursors.size() > 0);
+ invariant(targetingResults.remoteCursors.size() > 0);
// If we dispatched to a single shard, store the remote cursor and return immediately.
- if (!pipelineForTargetedShards->isSplitForShards()) {
- invariant(cursors.size() == 1);
+ if (!targetingResults.pipelineForTargetedShards->isSplitForShards()) {
+ invariant(targetingResults.remoteCursors.size() == 1);
+ const auto& remoteCursor = targetingResults.remoteCursors[0];
auto executorPool = Grid::get(opCtx)->getExecutorPool();
const BSONObj reply = uassertStatusOK(storePossibleCursor(
opCtx,
- cursors[0].shardId,
- cursors[0].hostAndPort,
- cursors[0].cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse),
+ remoteCursor.shardId,
+ remoteCursor.hostAndPort,
+ remoteCursor.cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse),
namespaces.requestedNss,
executorPool->getArbitraryExecutor(),
Grid::get(opCtx)->getCursorManager(),
mergeCtx->tailableMode));
- return appendCursorResponseToCommandResult(cursors[0].shardId, reply, result);
+ return appendCursorResponseToCommandResult(remoteCursor.shardId, reply, result);
}
// If we reach here, we have a merge pipeline to dispatch.
- invariant(pipelineForMerging);
+ auto mergingPipeline = std::move(targetingResults.pipelineForMerging);
+ invariant(mergingPipeline);
// First, check whether we can merge on the mongoS. If the merge pipeline MUST run on mongoS,
// then ignore the internalQueryProhibitMergingOnMongoS parameter.
- if (pipelineForMerging->requiredToRunOnMongos() ||
- (!internalQueryProhibitMergingOnMongoS.load() && pipelineForMerging->canRunOnMongos())) {
+ if (mergingPipeline->requiredToRunOnMongos() ||
+ (!internalQueryProhibitMergingOnMongoS.load() && mergingPipeline->canRunOnMongos())) {
// Register the new mongoS cursor, and retrieve the initial batch of results.
- auto cursorResponse = establishMergingMongosCursor(opCtx,
- request,
- namespaces.requestedNss,
- std::move(pipelineForMerging),
- std::move(cursors));
+ auto cursorResponse =
+ establishMergingMongosCursor(opCtx,
+ request,
+ namespaces.requestedNss,
+ std::move(mergingPipeline),
+ std::move(targetingResults.remoteCursors));
// We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline
// can never run on mongoS. Filter the command response and return immediately.
@@ -667,16 +923,17 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
// If we cannot merge on mongoS, establish the merge cursor on a shard.
- pipelineForMerging->addInitialSource(
- DocumentSourceMergeCursors::create(parseCursors(cursors), mergeCtx));
- auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, pipelineForMerging);
+ mergingPipeline->addInitialSource(
+ DocumentSourceMergeCursors::create(parseCursors(targetingResults.remoteCursors), mergeCtx));
+ auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, mergingPipeline);
auto mergeResponse = uassertStatusOK(establishMergingShardCursor(
opCtx,
namespaces.executionNss,
- cursors,
+ targetingResults.remoteCursors,
mergeCmdObj,
- boost::optional<ShardId>{needsPrimaryShardMerge, executionNsRoutingInfo.primaryId()}));
+ boost::optional<ShardId>{targetingResults.needsPrimaryShardMerge,
+ executionNsRoutingInfo.primaryId()}));
auto mergingShardId = mergeResponse.first;
auto response = mergeResponse.second;
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index d3efbd95923..32359be6143 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -32,6 +32,7 @@ env.Library(
env.Library(
target="router_exec_stage",
source=[
+ "document_source_router_adapter.cpp",
"router_stage_limit.cpp",
"router_stage_merge.cpp",
"router_stage_mock.cpp",
diff --git a/src/mongo/s/query/document_source_router_adapter.cpp b/src/mongo/s/query/document_source_router_adapter.cpp
new file mode 100644
index 00000000000..4e144751dcb
--- /dev/null
+++ b/src/mongo/s/query/document_source_router_adapter.cpp
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/query/document_source_router_adapter.h"
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/expression_context.h"
+
+namespace mongo {
+
+boost::intrusive_ptr<DocumentSourceRouterAdapter> DocumentSourceRouterAdapter::create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ std::unique_ptr<RouterExecStage> childStage) {
+ return new DocumentSourceRouterAdapter(expCtx, std::move(childStage));
+}
+
+DocumentSource::GetNextResult DocumentSourceRouterAdapter::getNext() {
+ auto next = uassertStatusOK(_child->next(_execContext));
+ if (auto nextObj = next.getResult()) {
+ return Document::fromBsonWithMetaData(*nextObj);
+ }
+ return GetNextResult::makeEOF();
+}
+
+void DocumentSourceRouterAdapter::doDispose() {
+ _child->kill(pExpCtx->opCtx);
+}
+
+void DocumentSourceRouterAdapter::reattachToOperationContext(OperationContext* opCtx) {
+ _child->reattachToOperationContext(opCtx);
+}
+
+void DocumentSourceRouterAdapter::detachFromOperationContext() {
+ _child->detachFromOperationContext();
+}
+
+Value DocumentSourceRouterAdapter::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ invariant(explain); // We shouldn't need to serialize this stage to send it anywhere.
+ return Value(); // Return the empty value to hide this stage from explain output.
+}
+
+bool DocumentSourceRouterAdapter::remotesExhausted() {
+ return _child->remotesExhausted();
+}
+
+DocumentSourceRouterAdapter::DocumentSourceRouterAdapter(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ std::unique_ptr<RouterExecStage> childStage)
+ : DocumentSource(expCtx), _child(std::move(childStage)) {}
+
+} // namespace mongo
diff --git a/src/mongo/s/query/document_source_router_adapter.h b/src/mongo/s/query/document_source_router_adapter.h
new file mode 100644
index 00000000000..1520713edd5
--- /dev/null
+++ b/src/mongo/s/query/document_source_router_adapter.h
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/s/query/router_exec_stage.h"
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/pipeline.h"
+
+namespace mongo {
+/**
+ * A class that acts as an adapter between the RouterExecStage and DocumentSource interfaces,
+ * translating results from an input RouterExecStage into DocumentSource::GetNextResults.
+ */
+class DocumentSourceRouterAdapter final : public DocumentSource {
+public:
+ static boost::intrusive_ptr<DocumentSourceRouterAdapter> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ std::unique_ptr<RouterExecStage> childStage);
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kMongoS,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed};
+ }
+
+ GetNextResult getNext() final;
+ void doDispose() final;
+ void reattachToOperationContext(OperationContext* opCtx) final;
+ void detachFromOperationContext() final;
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
+ bool remotesExhausted();
+
+ void setExecContext(RouterExecStage::ExecContext execContext) {
+ _execContext = execContext;
+ }
+
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) const {
+ return _child->setAwaitDataTimeout(awaitDataTimeout);
+ }
+
+private:
+ DocumentSourceRouterAdapter(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ std::unique_ptr<RouterExecStage> childStage);
+
+ std::unique_ptr<RouterExecStage> _child;
+ RouterExecStage::ExecContext _execContext;
+};
+} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
index 159ffcc1f47..45111de6b98 100644
--- a/src/mongo/s/query/router_stage_pipeline.cpp
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/pipeline/document_source_list_local_sessions.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/s/query/document_source_router_adapter.h"
namespace mongo {
@@ -62,7 +63,7 @@ StatusWith<ClusterQueryResult> RouterStagePipeline::next(RouterExecStage::ExecCo
}
// If we reach this point, we have hit EOF.
- if (!_mergePipeline->getContext()->isTailable()) {
+ if (!_mergePipeline->getContext()->isTailableAwaitData()) {
_mergePipeline.get_deleter().dismissDisposal();
_mergePipeline->dispose(getOpCtx());
}
@@ -91,47 +92,4 @@ Status RouterStagePipeline::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout)
return _routerAdapter->setAwaitDataTimeout(awaitDataTimeout);
}
-boost::intrusive_ptr<RouterStagePipeline::DocumentSourceRouterAdapter>
-RouterStagePipeline::DocumentSourceRouterAdapter::create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::unique_ptr<RouterExecStage> childStage) {
- return new DocumentSourceRouterAdapter(expCtx, std::move(childStage));
-}
-
-DocumentSource::GetNextResult RouterStagePipeline::DocumentSourceRouterAdapter::getNext() {
- auto next = uassertStatusOK(_child->next(_execContext));
- if (auto nextObj = next.getResult()) {
- return Document::fromBsonWithMetaData(*nextObj);
- }
- return GetNextResult::makeEOF();
-}
-
-void RouterStagePipeline::DocumentSourceRouterAdapter::doDispose() {
- _child->kill(pExpCtx->opCtx);
-}
-
-void RouterStagePipeline::DocumentSourceRouterAdapter::reattachToOperationContext(
- OperationContext* opCtx) {
- _child->reattachToOperationContext(opCtx);
-}
-
-void RouterStagePipeline::DocumentSourceRouterAdapter::detachFromOperationContext() {
- _child->detachFromOperationContext();
-}
-
-Value RouterStagePipeline::DocumentSourceRouterAdapter::serialize(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- invariant(explain); // We shouldn't need to serialize this stage to send it anywhere.
- return Value(); // Return the empty value to hide this stage from explain output.
-}
-
-bool RouterStagePipeline::DocumentSourceRouterAdapter::remotesExhausted() {
- return _child->remotesExhausted();
-}
-
-RouterStagePipeline::DocumentSourceRouterAdapter::DocumentSourceRouterAdapter(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::unique_ptr<RouterExecStage> childStage)
- : DocumentSource(expCtx), _child(std::move(childStage)) {}
-
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h
index edb9fd2e074..3c54eb9d2bb 100644
--- a/src/mongo/s/query/router_stage_pipeline.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -32,6 +32,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/s/query/document_source_router_adapter.h"
namespace mongo {
@@ -58,50 +59,8 @@ protected:
void doDetachFromOperationContext() final;
private:
- /**
- * A class that acts as an adapter between the RouterExecStage and DocumentSource interfaces,
- * translating results from an input RouterExecStage into DocumentSource::GetNextResults.
- */
- class DocumentSourceRouterAdapter final : public DocumentSource {
- public:
- static boost::intrusive_ptr<DocumentSourceRouterAdapter> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::unique_ptr<RouterExecStage> childStage);
-
- StageConstraints constraints(Pipeline::SplitState pipeState) const final {
- return {StreamType::kStreaming,
- PositionRequirement::kFirst,
- HostTypeRequirement::kNone,
- DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed};
- }
-
- GetNextResult getNext() final;
- void doDispose() final;
- void reattachToOperationContext(OperationContext* opCtx) final;
- void detachFromOperationContext() final;
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
- bool remotesExhausted();
-
- void setExecContext(RouterExecStage::ExecContext execContext) {
- _execContext = execContext;
- }
-
- Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) const {
- return _child->setAwaitDataTimeout(awaitDataTimeout);
- }
-
- private:
- DocumentSourceRouterAdapter(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::unique_ptr<RouterExecStage> childStage);
-
- std::unique_ptr<RouterExecStage> _child;
- ExecContext _execContext;
- };
-
boost::intrusive_ptr<DocumentSourceRouterAdapter> _routerAdapter;
std::unique_ptr<Pipeline, Pipeline::Deleter> _mergePipeline;
bool _mongosOnlyPipeline;
};
-
} // namespace mongo