author    Matthew Russotto <matthew.russotto@10gen.com>    2017-11-10 10:25:31 -0500
committer Matthew Russotto <matthew.russotto@10gen.com>    2017-11-14 15:50:03 -0500
commit    d4a526fdcfa3f740220940b8bf6767da959d4b3d (patch)
tree      7bf2a5a1985c1388554390bc73eb3f06fa9e1b40
parent    714b2f9c4c0db34ad9a678a2be538f05ba0d9c41 (diff)
download  mongo-d4a526fdcfa3f740220940b8bf6767da959d4b3d.tar.gz
SERVER-30834 Make mongos reload the shard registry and re-establish changeStream cursors when encountering a 'retryNeeded' entry
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml  2
-rw-r--r--  jstests/sharding/change_stream_chunk_migration.js (renamed from jstests/sharding/close_cursor_on_chunk_migration_to_new_shards.js)  63
-rw-r--r--  jstests/sharding/change_streams_unsharded_becomes_sharded.js  9
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.cpp  37
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.h  13
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_test.cpp  40
-rw-r--r--  src/mongo/s/commands/cluster_aggregate.cpp  43
-rw-r--r--  src/mongo/s/query/SConscript  4
-rw-r--r--  src/mongo/s/query/async_results_merger.cpp  10
-rw-r--r--  src/mongo/s/query/async_results_merger.h  8
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl.cpp  20
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_params.h  11
-rw-r--r--  src/mongo/s/query/router_stage_merge.cpp  6
-rw-r--r--  src/mongo/s/query/router_stage_merge.h  5
-rw-r--r--  src/mongo/s/query/router_stage_update_on_add_shard.cpp  138
-rw-r--r--  src/mongo/s/query/router_stage_update_on_add_shard.h  68
16 files changed, 391 insertions(+), 86 deletions(-)
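
In effect, the mongos-side merge plan now swallows the internal 'kNewShardDetected' sentinel (formerly 'retryNeeded') instead of closing the cursor: on seeing it, mongos reloads the shard registry, opens change stream cursors on any newly added shards using the original aggregation command with the sentinel's resume token spliced in, and keeps draining results. A toy model of that control flow (plain C++17, not mongos code; Entry and the addNewShardCursors callback are illustrative stand-ins):

    #include <functional>
    #include <iostream>
    #include <optional>
    #include <queue>
    #include <string>

    // A merged change stream entry, reduced to the fields that matter here.
    struct Entry {
        std::string opType;
        int id;
    };

    // Keep draining merged results; swallow the sentinel and re-establish
    // cursors instead of returning it to the client.
    std::optional<Entry> nextClientResult(std::queue<Entry>& merged,
                                          const std::function<void()>& addNewShardCursors) {
        while (!merged.empty()) {
            Entry e = merged.front();
            merged.pop();
            if (e.opType == "kNewShardDetected") {
                addNewShardCursors();  // reload the registry, open cursors on new shards
                continue;              // the client never sees the sentinel
            }
            return e;  // ordinary entries flow through unchanged
        }
        return std::nullopt;  // stream exhausted
    }

    int main() {
        std::queue<Entry> merged;
        merged.push({"insert", 1});
        merged.push({"kNewShardDetected", 0});
        merged.push({"insert", 2});

        while (auto e = nextClientResult(
                   merged, [] { std::cout << "(re-establishing cursors)\n"; }))
            std::cout << e->opType << " " << e->id << "\n";
    }
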
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index b915f69e0fa..ac13b8f9416 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -36,13 +36,13 @@ selector:
- jstests/sharding/lookup_change_stream_post_image_id_shard_key.js
- jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js
- jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js
+ - jstests/sharding/change_stream_chunk_migration.js
- jstests/sharding/change_stream_invalidation.js
- jstests/sharding/change_stream_lookup_single_shard_cluster.js
- jstests/sharding/change_stream_remove_shard.js
- jstests/sharding/change_streams.js
- jstests/sharding/change_streams_shards_start_in_sync.js
- jstests/sharding/change_streams_unsharded_becomes_sharded.js
- - jstests/sharding/close_cursor_on_chunk_migration_to_new_shards.js
- jstests/sharding/enable_sharding_basic.js
- jstests/sharding/key_rotation.js
- jstests/sharding/kill_sessions.js
diff --git a/jstests/sharding/close_cursor_on_chunk_migration_to_new_shards.js b/jstests/sharding/change_stream_chunk_migration.js
index 523eacf47d9..207c8ab0dee 100644
--- a/jstests/sharding/close_cursor_on_chunk_migration_to_new_shards.js
+++ b/jstests/sharding/change_stream_chunk_migration.js
@@ -1,7 +1,5 @@
-// Tests that change stream returns a special entry and close the cursor when it's migrating
-// a chunk to a new shard.
-// TODO: SERVER-30834 the mongos should internally swallow and automatically retry the 'retryNeeded'
-// entries, so the client shouldn't see any invalidations.
+// Tests that the change stream continues to return results, in the correct order, while a chunk
+// is being migrated to a new shard.
(function() {
'use strict';
@@ -14,7 +12,6 @@
}
const rsNodeOptions = {
- enableMajorityReadConcern: '',
// Use a higher frequency for periodic noops to speed up the test.
setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
};
@@ -35,6 +32,7 @@
const changeStream = mongosColl.aggregate([{$changeStream: {}}]);
assert(!changeStream.hasNext(), "Do not expect any results yet");
+ jsTestLog("Sharding collection");
// Once we have a cursor, actually shard the collection.
assert.commandWorked(
mongos.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
@@ -46,7 +44,7 @@
// Split the collection into two chunks: [MinKey, 10) and [10, MaxKey].
assert.commandWorked(mongos.adminCommand({split: mongosColl.getFullName(), middle: {_id: 10}}));
- // Migrate the [10, MaxKey] chunk to shard1.
+ jsTestLog("Migrating [10, MaxKey] chunk to shard1.");
assert.commandWorked(mongos.adminCommand({
moveChunk: mongosColl.getFullName(),
find: {_id: 20},
@@ -60,14 +58,6 @@
assert.eq(next.operationType, "insert");
assert.eq(next.documentKey, {_id: id});
}
- assert.soon(() => changeStream.hasNext());
- let next = changeStream.next();
- assert.eq(next.operationType, "retryNeeded");
- const retryResumeToken = next._id;
-
- // A change stream only gets closed on the first chunk migration to a new shard. Test that
- // another chunk split and migration does not invalidate the cursor.
- const resumedCursor = mongosColl.aggregate([{$changeStream: {resumeAfter: retryResumeToken}}]);
// Insert into both the chunks.
assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
@@ -75,6 +65,7 @@
// Split again, and move a second chunk to the first shard. The new chunks are:
// [MinKey, 0), [0, 10), and [10, MaxKey].
+ jsTestLog("Moving [MinKey, 0] to shard 1");
assert.commandWorked(mongos.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
assert.commandWorked(mongos.adminCommand({
moveChunk: mongosColl.getFullName(),
@@ -90,23 +81,24 @@
// Make sure we can see all the inserts, without any 'retryNeeded' entries.
for (let nextExpectedId of[1, 21, -2, 2, 22]) {
- assert.soon(() => resumedCursor.hasNext());
- assert.eq(resumedCursor.next().documentKey, {_id: nextExpectedId});
+ assert.soon(() => changeStream.hasNext());
+ let item = changeStream.next();
+ assert.eq(item.documentKey, {_id: nextExpectedId});
}
- // Verify the original cursor has been closed since the first migration, and that it can't see
- // any new inserts.
+ // Make sure we're at the end of the stream.
assert(!changeStream.hasNext());
// Test that migrating the last chunk to shard 1 (meaning all chunks are now on the same shard)
// will not invalidate the change stream.
// Insert into all three chunks.
+ jsTestLog("Insert into all three chunks");
assert.writeOK(mongosColl.insert({_id: -3}, {writeConcern: {w: "majority"}}));
assert.writeOK(mongosColl.insert({_id: 3}, {writeConcern: {w: "majority"}}));
assert.writeOK(mongosColl.insert({_id: 23}, {writeConcern: {w: "majority"}}));
- // Move the last chunk, [MinKey, 0), to shard 1.
+ jsTestLog("Move the [Minkey, 0) chunk to shard 1.");
assert.commandWorked(mongos.adminCommand({
moveChunk: mongosColl.getFullName(),
find: {_id: -5},
@@ -120,31 +112,34 @@
assert.writeOK(mongosColl.insert({_id: 24}, {writeConcern: {w: "majority"}}));
// Make sure we can see all the inserts, without any 'retryNeeded' entries.
- assert.soon(() => resumedCursor.hasNext());
for (let nextExpectedId of[-3, 3, 23, -4, 4, 24]) {
- assert.soon(() => resumedCursor.hasNext());
- assert.eq(resumedCursor.next().documentKey, {_id: nextExpectedId});
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().documentKey, {_id: nextExpectedId});
}
- // Now test that adding a new shard and migrating a chunk to it will again invalidate the
- // cursor.
+ // Now test that adding a new shard and migrating a chunk to it does not disrupt the stream,
+ // which should continue to return the correct results.
const newShard = new ReplSetTest({name: "newShard", nodes: 1, nodeOptions: rsNodeOptions});
newShard.startSet({shardsvr: ''});
newShard.initiate();
assert.commandWorked(mongos.adminCommand({addShard: newShard.getURL(), name: "newShard"}));
- // At this point, there haven't been any migrations to that shard, so we should still be able to
- // use the change stream.
+ // At this point, there haven't been any migrations to that shard; check that the changeStream
+ // works normally.
assert.writeOK(mongosColl.insert({_id: -5}, {writeConcern: {w: "majority"}}));
assert.writeOK(mongosColl.insert({_id: 5}, {writeConcern: {w: "majority"}}));
assert.writeOK(mongosColl.insert({_id: 25}, {writeConcern: {w: "majority"}}));
for (let nextExpectedId of[-5, 5, 25]) {
- assert.soon(() => resumedCursor.hasNext());
- assert.eq(resumedCursor.next().documentKey, {_id: nextExpectedId});
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().documentKey, {_id: nextExpectedId});
}
- // Now migrate a chunk to the new shard and verify the stream is closed.
+ assert.writeOK(mongosColl.insert({_id: 16}, {writeConcern: {w: "majority"}}));
+
+ // Now migrate a chunk to the new shard and verify the stream continues to return results
+ // from both before and after the migration.
+ jsTestLog("Migrating [10, MaxKey] chunk to new shard.");
assert.commandWorked(mongos.adminCommand({
moveChunk: mongosColl.getFullName(),
find: {_id: 20},
@@ -155,11 +150,11 @@
assert.writeOK(mongosColl.insert({_id: 6}, {writeConcern: {w: "majority"}}));
assert.writeOK(mongosColl.insert({_id: 26}, {writeConcern: {w: "majority"}}));
- // We again need to wait for the noop writer on shard 0 to ensure we can return the new results
- // (in this case the 'retryNeeded' entry) from shard 1.
- assert.soon(() => resumedCursor.hasNext());
- assert.eq(resumedCursor.next().operationType, "retryNeeded");
- assert(!resumedCursor.hasNext());
+ for (let nextExpectedId of[16, -6, 6, 26]) {
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().documentKey, {_id: nextExpectedId});
+ }
+ assert(!changeStream.hasNext());
st.stop();
newShard.stopSet();
diff --git a/jstests/sharding/change_streams_unsharded_becomes_sharded.js b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
index 07fd882cc5f..e172b28f463 100644
--- a/jstests/sharding/change_streams_unsharded_becomes_sharded.js
+++ b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
@@ -70,13 +70,14 @@
}));
assert.writeOK(mongosColl.insert({_id: -1}));
- // Since a moveChunk was requested, the cursor results should indicate a retry is needed.
- // TODO: SERVER-30834 this result will get swallowed and the change stream cursor should see
- // the inserted document.
+ // Make sure the change stream cursor sees the inserted document even after the moveChunk.
cst.assertNextChangesEqual({
cursor: cursor,
expectedChanges: [{
- operationType: "retryNeeded",
+ documentKey: {_id: -1},
+ fullDocument: {_id: -1},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
}]
});
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index b9bd8564527..9dbad7ae96d 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -78,7 +78,7 @@ constexpr StringData DocumentSourceChangeStream::kDeleteOpType;
constexpr StringData DocumentSourceChangeStream::kReplaceOpType;
constexpr StringData DocumentSourceChangeStream::kInsertOpType;
constexpr StringData DocumentSourceChangeStream::kInvalidateOpType;
-constexpr StringData DocumentSourceChangeStream::kRetryNeededOpType;
+constexpr StringData DocumentSourceChangeStream::kNewShardDetectedOpType;
const BSONObj DocumentSourceChangeStream::kSortSpec =
BSON("_id.clusterTime.ts" << 1 << "_id.uuid" << 1 << "_id.documentKey" << 1);
@@ -218,8 +218,7 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() {
const auto& kOperationTypeField = DocumentSourceChangeStream::kOperationTypeField;
checkValueType(doc[kOperationTypeField], kOperationTypeField, BSONType::String);
auto operationType = doc[kOperationTypeField].getString();
- if (operationType == DocumentSourceChangeStream::kInvalidateOpType ||
- operationType == DocumentSourceChangeStream::kRetryNeededOpType) {
+ if (operationType == DocumentSourceChangeStream::kInvalidateOpType) {
// Pass the invalidation forward, so that it can be included in the results, or
// filtered/transformed by further stages in the pipeline, then throw an exception
// to close the cursor on the next call to getNext().
@@ -377,6 +376,28 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
return stages;
}
+BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(const BSONObj originalCmdObj,
+ const BSONObj resumeToken) {
+ Document originalCmd(originalCmdObj);
+ auto pipeline = originalCmd[AggregationRequest::kPipelineName].getArray();
+ // A $changeStream must be the first element of the pipeline in order to be able
+ // to replace (or add) a resume token.
+ invariant(!pipeline[0][DocumentSourceChangeStream::kStageName].missing());
+
+ MutableDocument changeStreamStage(
+ pipeline[0][DocumentSourceChangeStream::kStageName].getDocument());
+ changeStreamStage[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] = Value(resumeToken);
+
+ // If the command was initially specified with a resumeAfterClusterTime, we need to remove it
+ // to use the new resume token.
+ changeStreamStage[DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeFieldName] = Value();
+ pipeline[0] =
+ Value(Document{{DocumentSourceChangeStream::kStageName, changeStreamStage.freeze()}});
+ MutableDocument newCmd(originalCmd);
+ newCmd[AggregationRequest::kPipelineName] = Value(pipeline);
+ return newCmd.freeze().toBson();
+}
+
intrusive_ptr<DocumentSource> DocumentSourceChangeStream::createTransformationStage(
BSONObj changeStreamSpec, const intrusive_ptr<ExpressionContext>& expCtx) {
return intrusive_ptr<DocumentSource>(new DocumentSourceSingleDocumentTransformation(
@@ -463,9 +484,9 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D
break;
}
case repl::OpTypeEnum::kNoop: {
- operationType = kRetryNeededOpType;
- // Generate a fake document Id for RetryNeeded operation so that we can resume after
- // this operation.
+ operationType = kNewShardDetectedOpType;
+ // Generate a fake document Id for NewShardDetected operation so that we can resume
+ // after this operation.
documentKey = Value(Document{{kIdField, input[repl::OplogEntry::kObject2FieldName]}});
break;
}
@@ -499,8 +520,8 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D
doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey));
}
- // "invalidate" and "retryNeeded" entries have fewer fields.
- if (operationType == kInvalidateOpType || operationType == kRetryNeededOpType) {
+ // "invalidate" and "newShardDetected" entries have fewer fields.
+ if (operationType == kInvalidateOpType || operationType == kNewShardDetectedOpType) {
return doc.freeze();
}
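
For illustration, this is roughly what the new replaceResumeTokenInCommand() helper does to a command object (a sketch against the mongo tree, using fromjson() from "mongo/db/json.h"; the token value here is a stand-in, not a real resume token):

    BSONObj original = fromjson(
        "{aggregate: 'coll', pipeline: [{$changeStream: "
        "{resumeAfterClusterTime: {ts: Timestamp(1510000000, 1)}}}], cursor: {}}");
    BSONObj token = fromjson("{documentKey: {_id: 5}}");  // stand-in token

    BSONObj rewritten =
        DocumentSourceChangeStream::replaceResumeTokenInCommand(original, token);
    // rewritten's pipeline is now:
    //     [{$changeStream: {resumeAfter: {documentKey: {_id: 5}}}}]
    // resumeAfter: is injected and resumeAfterClusterTime: is dropped, while the
    // rest of the command (cursor options, etc.) is preserved.

Note the invariant above: the $changeStream stage must already be the first pipeline stage, which holds for any command this code path re-dispatches.
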
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 944a50e66b0..b4baa40dddc 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -141,8 +141,8 @@ public:
static constexpr StringData kReplaceOpType = "replace"_sd;
static constexpr StringData kInsertOpType = "insert"_sd;
static constexpr StringData kInvalidateOpType = "invalidate"_sd;
- // Internal op type to close the cursor.
- static constexpr StringData kRetryNeededOpType = "retryNeeded"_sd;
+ // Internal op type to signal mongos to open cursors on new shards.
+ static constexpr StringData kNewShardDetectedOpType = "kNewShardDetected"_sd;
/**
* Produce the BSON object representing the filter for the $match stage to filter oplog entries
@@ -162,6 +162,15 @@ public:
static boost::intrusive_ptr<DocumentSource> createTransformationStage(
BSONObj changeStreamSpec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ /**
+ * Given a BSON object containing an aggregation command with a $changeStream stage, and a
+ * resume token, returns a new BSON object with the same command, except that the $changeStream
+ * stage's resumeAfter: option is set to the given resume token, replacing any existing
+ * resumeAfter: or resumeAfterClusterTime: option.
+ */
+ static BSONObj replaceResumeTokenInCommand(const BSONObj originalCmdObj,
+ const BSONObj resumeToken);
+
private:
// It is illegal to construct a DocumentSourceChangeStream directly, use createFromBson()
// instead.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index e70544085c1..d7ce589deb4 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -560,6 +560,22 @@ TEST_F(ChangeStreamStageTest, TransformInvalidateRenameDropTarget) {
checkTransformation(rename, expectedInvalidate);
}
+TEST_F(ChangeStreamStageTest, TransformNewShardDetected) {
+ auto o2Field = D{{"type", "migrateChunkToNewShard"_sd}};
+ auto newShardDetected = makeOplogEntry(OpTypeEnum::kNoop,
+ nss,
+ testUuid(),
+ boost::none, // fromMigrate
+ BSONObj(),
+ o2Field.toBson());
+
+ Document expectedNewShardDetected{
+ {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << o2Field))},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kNewShardDetectedOpType},
+ };
+ checkTransformation(newShardDetected, expectedNewShardDetected);
+}
+
TEST_F(ChangeStreamStageTest, MatchFiltersCreateCollection) {
auto collSpec =
D{{"create", "foo"_sd},
@@ -671,29 +687,5 @@ TEST_F(ChangeStreamStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut)
ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>);
}
-TEST_F(ChangeStreamStageTest, CloseCursorOnRetryNeededEntries) {
- auto o2Field = D{{"type", "migrateChunkToNewShard"_sd}};
- auto retryNeeded = makeOplogEntry(OpTypeEnum::kNoop, // op type
- nss, // namespace
- testUuid(), // uuid
- boost::none, // fromMigrate
- {}, // o
- o2Field.toBson()); // o2
-
- auto stages = makeStages(retryNeeded);
- auto closeCursor = stages.back();
-
- Document expectedRetryNeeded{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << o2Field))},
- {DSChangeStream::kOperationTypeField, DSChangeStream::kRetryNeededOpType},
- };
-
- auto next = closeCursor->getNext();
- // Transform into RetryNeeded entry.
- ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedRetryNeeded);
- // Then throw an exception on the next call of getNext().
- ASSERT_THROWS(closeCursor->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>);
-}
-
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index efea5e07060..da76c01db97 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
@@ -60,6 +61,7 @@
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/cluster_query_knobs.h"
#include "mongo/s/query/establish_cursors.h"
+#include "mongo/s/query/router_stage_update_on_add_shard.h"
#include "mongo/s/query/store_possible_cursor.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
@@ -323,6 +325,9 @@ struct DispatchShardPipelineResults {
// The merging half of the pipeline if more than one shard was targeted, otherwise nullptr.
std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging;
+
+ // The command object to send to the targeted shards.
+ BSONObj commandForTargetedShards;
};
/**
@@ -360,6 +365,7 @@ StatusWith<DispatchShardPipelineResults> dispatchShardPipeline(
auto pipelineForTargetedShards = std::move(pipeline);
std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging;
+ BSONObj targetedCommand;
int numAttempts = 0;
@@ -402,7 +408,7 @@ StatusWith<DispatchShardPipelineResults> dispatchShardPipeline(
}
// Generate the command object for the targeted shards.
- BSONObj targetedCommand =
+ targetedCommand =
createCommandForTargetedShards(aggRequest, originalCmdObj, pipelineForTargetedShards);
// Refresh the shard registry if we're targeting all shards. We need the shard registry
@@ -470,7 +476,8 @@ StatusWith<DispatchShardPipelineResults> dispatchShardPipeline(
std::move(swCursors.getValue()),
std::move(swShardResults.getValue()),
std::move(pipelineForTargetedShards),
- std::move(pipelineForMerging)};
+ std::move(pipelineForMerging),
+ targetedCommand};
}
StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergingShardCursor(
@@ -500,6 +507,8 @@ BSONObj establishMergingMongosCursor(
OperationContext* opCtx,
const AggregationRequest& request,
const NamespaceString& requestedNss,
+ BSONObj cmdToRunOnNewShards,
+ const LiteParsedPipeline& liteParsedPipeline,
std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging,
std::vector<ClusterClientCursorParams::RemoteCursor> cursors) {
@@ -522,6 +531,16 @@ BSONObj establishMergingMongosCursor(
? boost::none
: boost::optional<long long>(request.getBatchSize());
+ if (liteParsedPipeline.hasChangeStream()) {
+ // For change streams, we need to set up a custom stage to establish cursors on new shards
+ // when they are added.
+ params.createCustomCursorSource = [cmdToRunOnNewShards](OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ ClusterClientCursorParams* params) {
+ return stdx::make_unique<RouterStageUpdateOnAddShard>(
+ opCtx, executor, params, cmdToRunOnNewShards);
+ };
+ }
auto ccc = ClusterClientCursorImpl::make(
opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params));
@@ -625,11 +644,12 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
}
- // If this pipeline is on an unsharded collection, is allowed to be forwarded to shards, is not
- // a collectionless aggregation that needs to run on all shards, and doesn't need transformation
- // via DocumentSoruce::serialize(), then go ahead and pass it through to the owning shard
+ // If this pipeline is on an unsharded collection, is allowed to be forwarded to shards, does
+ // not need to run on all shards, and doesn't need transformation via
+ // DocumentSource::serialize(), then go ahead and pass it through to the owning shard
// unmodified.
- if (!executionNsRoutingInfo.cm() && !namespaces.executionNss.isCollectionlessAggregateNS() &&
+ if (!executionNsRoutingInfo.cm() &&
+ !mustRunOnAllShards(namespaces.executionNss, liteParsedPipeline) &&
liteParsedPipeline.allowedToForwardFromMongos() &&
liteParsedPipeline.allowedToPassthroughFromMongos()) {
return aggPassthrough(opCtx,
@@ -667,8 +687,13 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
<< " is not capable of producing input",
!pipeline->getSources().front()->constraints().requiresInputDocSource);
- auto cursorResponse = establishMergingMongosCursor(
- opCtx, request, namespaces.requestedNss, std::move(pipeline), {});
+ auto cursorResponse = establishMergingMongosCursor(opCtx,
+ request,
+ namespaces.requestedNss,
+ cmdObj,
+ liteParsedPipeline,
+ std::move(pipeline),
+ {});
Command::filterCommandReplyForPassthrough(cursorResponse, result);
return getStatusFromCommandResult(result->asTempObj());
}
@@ -726,6 +751,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
establishMergingMongosCursor(opCtx,
request,
namespaces.requestedNss,
+ dispatchResults.commandForTargetedShards,
+ liteParsedPipeline,
std::move(mergingPipeline),
std::move(dispatchResults.remoteCursors));
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index c6a9572a9c1..342b5e06823 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -39,11 +39,15 @@ env.Library(
"router_stage_pipeline.cpp",
"router_stage_remove_metadata_fields.cpp",
"router_stage_skip.cpp",
+ "router_stage_update_on_add_shard.cpp",
],
LIBDEPS=[
"$BUILD_DIR/mongo/db/query/query_common",
"async_results_merger",
],
+ LIBDEPS_PRIVATE=[
+ "$BUILD_DIR/mongo/db/pipeline/document_source_lookup",
+ ],
)
env.CppUnitTest(
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 45f4cbf350d..8bdc8e69353 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -154,6 +154,16 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) {
_opCtx = opCtx;
}
+void AsyncResultsMerger::addNewShardCursors(
+ const std::vector<ClusterClientCursorParams::RemoteCursor>& newCursors) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ for (auto&& remote : newCursors) {
+ _remotes.emplace_back(remote.hostAndPort,
+ remote.cursorResponse.getNSS(),
+ remote.cursorResponse.getCursorId());
+ }
+}
+
bool AsyncResultsMerger::_ready(WithLock lk) {
if (_lifecycleState != kAlive) {
return true;
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 5068efc8db2..da21593486c 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -179,6 +179,12 @@ public:
StatusWith<executor::TaskExecutor::EventHandle> nextEvent();
/**
+ * Adds the specified shard cursors to the set of cursors to be merged. The results from the
+ * new cursors will be returned as normal through nextReady().
+ */
+ void addNewShardCursors(const std::vector<ClusterClientCursorParams::RemoteCursor>& newCursors);
+
+ /**
* Starts shutting down this ARM by canceling all pending requests. Returns a handle to an event
* that is signaled when this ARM is safe to destroy.
* If there are no pending requests, schedules killCursors and signals the event immediately.
@@ -349,7 +355,7 @@ private:
bool _addBatchToBuffer(WithLock, size_t remoteIndex, const CursorResponse& response);
/**
- * If there is a valid unsignaled event that has been requested via nextReady() and there are
+ * If there is a valid unsignaled event that has been requested via nextEvent() and there are
* buffered results that are ready to return, signals that event.
*
* Invalidates the current event, as we must signal the event exactly once and we only keep a
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index bccdc369705..1cad6ef0830 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -183,6 +183,20 @@ bool isAllLimitsAndSkips(Pipeline* pipeline) {
stages.begin(), stages.end(), [&](const auto& stage) { return isSkipOrLimit(stage); });
}
+/**
+ * Creates the initial stage to feed data into the execution plan: by default a RouterStageMerge
+ * stage, or a custom stage if 'params->createCustomCursorSource' is set.
+ */
+std::unique_ptr<RouterExecStage> createInitialStage(OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ ClusterClientCursorParams* params) {
+ if (params->createCustomCursorSource) {
+ return params->createCustomCursorSource(opCtx, executor, params);
+ } else {
+ return stdx::make_unique<RouterStageMerge>(opCtx, executor, params);
+ }
+}
+
std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* executor,
ClusterClientCursorParams* params) {
invariant(params->mergePipeline);
@@ -191,8 +205,7 @@ std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* execu
auto* pipeline = params->mergePipeline.get();
auto* opCtx = pipeline->getContext()->opCtx;
- std::unique_ptr<RouterExecStage> root =
- stdx::make_unique<RouterStageMerge>(opCtx, executor, params);
+ std::unique_ptr<RouterExecStage> root = createInitialStage(opCtx, executor, params);
if (!isAllLimitsAndSkips(pipeline)) {
return stdx::make_unique<RouterStagePipeline>(std::move(root),
std::move(params->mergePipeline));
@@ -235,8 +248,7 @@ std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
return buildPipelinePlan(executor, params);
}
- std::unique_ptr<RouterExecStage> root =
- stdx::make_unique<RouterStageMerge>(opCtx, executor, params);
+ std::unique_ptr<RouterExecStage> root = createInitialStage(opCtx, executor, params);
if (skip) {
root = stdx::make_unique<RouterStageSkip>(opCtx, std::move(root), *skip);
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index e3a5e4f62cb..b936405e220 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -29,6 +29,7 @@
#pragma once
#include <boost/optional.hpp>
+#include <functional>
#include <memory>
#include <vector>
@@ -44,8 +45,12 @@
#include "mongo/util/net/hostandport.h"
namespace mongo {
+namespace executor {
+class TaskExecutor;
+}
class OperationContext;
+class RouterExecStage;
/**
* The resulting ClusterClientCursor will take ownership of the existing remote cursor, generating
@@ -122,6 +127,12 @@ struct ClusterClientCursorParams {
// Set if a readPreference must be respected throughout the lifetime of the cursor.
boost::optional<ReadPreferenceSetting> readPreference;
+ // If valid, is called to return the RouterExecStage which becomes the initial source in this
+ // cursor's execution plan. Otherwise, a RouterStageMerge is used.
+ stdx::function<std::unique_ptr<RouterExecStage>(
+ OperationContext*, executor::TaskExecutor*, ClusterClientCursorParams*)>
+ createCustomCursorSource;
+
// Whether the client indicated that it is willing to receive partial results in the case of an
// unreachable host.
bool isAllowPartialResults = false;
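
A sketch of how a caller opts into the new hook (mirroring the change-stream wiring in cluster_aggregate.cpp above; MyCustomStage is a hypothetical RouterExecStage subclass, and 'params' an already-populated ClusterClientCursorParams):

    // If left unset, buildMergerPlan() constructs the default RouterStageMerge.
    params.createCustomCursorSource = [](OperationContext* opCtx,
                                         executor::TaskExecutor* executor,
                                         ClusterClientCursorParams* params)
        -> std::unique_ptr<RouterExecStage> {
        return stdx::make_unique<MyCustomStage>(opCtx, executor, params);
    };
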
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 4bb69592bb8..e571ed4ed2e 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -127,4 +127,10 @@ Status RouterStageMerge::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return _arm.setAwaitDataTimeout(awaitDataTimeout);
}
+void RouterStageMerge::addNewShardCursors(
+ std::vector<ClusterClientCursorParams::RemoteCursor>&& newShards) {
+ _arm.addNewShardCursors(newShards);
+ std::move(newShards.begin(), newShards.end(), std::back_inserter(_params->remotes));
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index 96fb4f6a4cf..bb7d6ed81a7 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -57,6 +57,11 @@ public:
bool remotesExhausted() final;
+ /**
+ * Adds the cursors in 'newShards' to those being merged by the ARM.
+ */
+ void addNewShardCursors(std::vector<ClusterClientCursorParams::RemoteCursor>&& newShards);
+
protected:
Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.cpp b/src/mongo/s/query/router_stage_update_on_add_shard.cpp
new file mode 100644
index 00000000000..0cccc6fb062
--- /dev/null
+++ b/src/mongo/s/query/router_stage_update_on_add_shard.cpp
@@ -0,0 +1,138 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+#include "mongo/s/query/router_stage_update_on_add_shard.h"
+
+#include <algorithm>
+
+#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/query/establish_cursors.h"
+#include "mongo/s/query/router_stage_merge.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+// Returns true if the change stream document has an 'operationType' of 'kNewShardDetected'.
+bool needsUpdate(const StatusWith<ClusterQueryResult>& childResult) {
+ if (!childResult.isOK() || childResult.getValue().isEOF()) {
+ return false;
+ }
+ return ((*childResult.getValue().getResult())[DocumentSourceChangeStream::kOperationTypeField]
+ .str() == DocumentSourceChangeStream::kNewShardDetectedOpType);
+}
+}
+
+RouterStageUpdateOnAddShard::RouterStageUpdateOnAddShard(OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ ClusterClientCursorParams* params,
+ BSONObj cmdToRunOnNewShards)
+ : RouterExecStage(opCtx, stdx::make_unique<RouterStageMerge>(opCtx, executor, params)),
+ _params(params),
+ _cmdToRunOnNewShards(cmdToRunOnNewShards) {}
+
+StatusWith<ClusterQueryResult> RouterStageUpdateOnAddShard::next(
+ RouterExecStage::ExecContext execContext) {
+ auto childStage = getChildStage();
+ auto childResult = childStage->next(execContext);
+ while (needsUpdate(childResult)) {
+ auto status = addNewShardCursors(*childResult.getValue().getResult());
+ if (!status.isOK())
+ return status;
+ childResult = childStage->next(execContext);
+ }
+ return childResult;
+}
+
+Status RouterStageUpdateOnAddShard::addNewShardCursors(BSONObj newShardDetectedObj) {
+ std::vector<ShardId> existingShardIds;
+ for (const auto& remote : _params->remotes) {
+ existingShardIds.push_back(remote.shardId);
+ }
+ auto newRemotes =
+ establishShardCursorsOnNewShards(std::move(existingShardIds), newShardDetectedObj);
+ if (!newRemotes.isOK())
+ return newRemotes.getStatus();
+ static_cast<RouterStageMerge*>(getChildStage())
+ ->addNewShardCursors(std::move(newRemotes.getValue()));
+ return Status::OK();
+}
+
+StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>
+RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(std::vector<ShardId> existingShardIds,
+ const BSONObj& newShardDetectedObj) {
+ auto* opCtx = getOpCtx();
+ // Temporarily remove any deadline from this operation to avoid timing out while creating new
+ // cursors.
+ OperationContext::DeadlineStash deadlineStash(opCtx);
+ // Reload the shard registry. We need to ensure that the reload we wait for was initiated
+ // after this method was called; otherwise we aren't guaranteed to see all the new shards.
+ auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
+ if (!shardRegistry->reload(opCtx)) {
+ // A 'false' return from shardRegistry.reload() means a reload was already in progress and
+ // it completed before reload() returned. So another reload(), regardless of return
+ // value, will ensure a reload started after the first call to reload().
+ shardRegistry->reload(opCtx);
+ }
+
+ std::vector<ShardId> shardIds, newShardIds;
+ shardRegistry->getAllShardIds(&shardIds);
+ std::sort(existingShardIds.begin(), existingShardIds.end());
+ std::sort(shardIds.begin(), shardIds.end());
+ std::set_difference(shardIds.begin(),
+ shardIds.end(),
+ existingShardIds.begin(),
+ existingShardIds.end(),
+ std::back_inserter(newShardIds));
+
+ auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand(
+ _cmdToRunOnNewShards,
+ newShardDetectedObj[DocumentSourceChangeStream::kIdField].embeddedObject());
+ std::vector<std::pair<ShardId, BSONObj>> requests;
+ for (const auto& shardId : newShardIds) {
+ requests.emplace_back(shardId, cmdObj);
+ }
+ BSONObj* viewDefinitionOut = nullptr; // Views are not allowed
+ const bool allowPartialResults = false; // partial results are not allowed
+ auto swCursors = establishCursors(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ _params->nsString,
+ _params->readPreference.value_or(ReadPreferenceSetting()),
+ requests,
+ allowPartialResults,
+ viewDefinitionOut);
+ // We aren't using shard routing information, so stale routing info shouldn't be possible and
+ // there is no need to check for stale sharding errors here.
+ return swCursors;
+}
+
+} // namespace mongo
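
The new-shard computation above is just a sorted set difference between the registry's shard ids and those the cursor already tracks. A self-contained illustration of that step (std::string standing in for ShardId):

    #include <algorithm>
    #include <iostream>
    #include <iterator>
    #include <string>
    #include <vector>

    int main() {
        // Shards the cursor already has cursors on.
        std::vector<std::string> existing{"shard01", "shard00"};
        // Every shard in the freshly reloaded shard registry.
        std::vector<std::string> all{"shard00", "shard01", "newShard"};

        // std::set_difference requires both input ranges to be sorted.
        std::sort(existing.begin(), existing.end());
        std::sort(all.begin(), all.end());

        std::vector<std::string> added;
        std::set_difference(all.begin(), all.end(),
                            existing.begin(), existing.end(),
                            std::back_inserter(added));

        for (const auto& s : added)
            std::cout << s << '\n';  // prints: newShard
    }
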
diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.h b/src/mongo/s/query/router_stage_update_on_add_shard.h
new file mode 100644
index 00000000000..0d1e937a01c
--- /dev/null
+++ b/src/mongo/s/query/router_stage_update_on_add_shard.h
@@ -0,0 +1,68 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+#pragma once
+
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/query/cluster_client_cursor_params.h"
+#include "mongo/s/query/router_exec_stage.h"
+
+namespace mongo {
+/**
+ * Uses a RouterStageMerge to merge results, and monitors the merged stream for special
+ * sentinel documents which indicate that the set of cursors needs to be updated. When a
+ * sentinel is detected, removes it from the stream and updates the set of cursors.
+ *
+ * cmdToRunOnNewShards: the command to execute on each new shard to open a cursor there.
+ */
+class RouterStageUpdateOnAddShard final : public RouterExecStage {
+public:
+ RouterStageUpdateOnAddShard(OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ ClusterClientCursorParams* params,
+ BSONObj cmdToRunOnNewShards);
+
+ StatusWith<ClusterQueryResult> next(ExecContext) final;
+
+private:
+ /**
+ * Establish the new cursors and tell the RouterStageMerge about them.
+ * obj: The BSONObj which triggered the establishment of the new cursors
+ */
+ Status addNewShardCursors(BSONObj obj);
+
+ /**
+ * Open the cursors on the new shards.
+ */
+ StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>
+ establishShardCursorsOnNewShards(std::vector<ShardId> existingShardIds,
+ const BSONObj& newShardDetectedObj);
+
+ ClusterClientCursorParams* _params;
+ BSONObj _cmdToRunOnNewShards;
+};
+}  // namespace mongo