 buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml |  1
 jstests/change_streams/required_as_first_stage.js                                  |  6
 jstests/libs/override_methods/continuous_stepdown.js                               | 34
 jstests/sharding/change_stream_show_migration_events.js                            |  2
 jstests/sharding/change_streams_new_shard_new_database.js                          | 88
 jstests/sharding/change_streams_unsharded_becomes_sharded.js                       |  4
 src/mongo/db/pipeline/document_source_change_stream.cpp                            | 59
 src/mongo/db/pipeline/document_source_change_stream.h                              |  1
 src/mongo/db/pipeline/document_source_change_stream.idl                            |  8
 src/mongo/db/pipeline/document_source_change_stream_transform.cpp                  | 13
 src/mongo/db/pipeline/sharded_agg_helpers.cpp                                      | 58
 src/mongo/s/query/async_results_merger.cpp                                         |  2
 src/mongo/s/query/document_source_update_on_add_shard.cpp                          | 72
 src/mongo/s/query/document_source_update_on_add_shard.h                            |  2
 14 files changed, 259 insertions(+), 91 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index f3317e6811b..11f855555f9 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -24,6 +24,7 @@ selector:
- jstests/sharding/prepared_txn_metadata_refresh.js
# Enable when 4.4 becomes last stable
- jstests/sharding/bulk_insert.js
+ - jstests/sharding/change_streams_new_shard_new_database.js
- jstests/sharding/clear_jumbo.js
- jstests/sharding/comment_field.js
- jstests/sharding/covered_shard_key_indexes.js
diff --git a/jstests/change_streams/required_as_first_stage.js b/jstests/change_streams/required_as_first_stage.js
index 2c5128f4865..3920c9324c7 100644
--- a/jstests/change_streams/required_as_first_stage.js
+++ b/jstests/change_streams/required_as_first_stage.js
@@ -20,14 +20,14 @@ assertErrorCode(
40602);
let error = assert.throws(() => coll.aggregate([{$sort: {x: 1}}, {$changeStream: {}}]));
-assert.contains(error.code, [40602, 50988], "Unexpected error: " + tojson(error));
+assert.contains(error.code, [40602], "Unexpected error: " + tojson(error));
error = assert.throws(
() => coll.aggregate([{$sort: {x: 1}}, {$changeStream: {}}], {allowDiskUse: true}));
-assert.contains(error.code, [40602, 50988], "Unexpected error: " + tojson(error));
+assert.contains(error.code, [40602], "Unexpected error: " + tojson(error));
error = assert.throws(() => coll.aggregate([{$group: {_id: "$x"}}, {$changeStream: {}}]));
-assert.contains(error.code, [40602, 50988], "Unexpected error: " + tojson(error));
+assert.contains(error.code, [40602], "Unexpected error: " + tojson(error));
// This one has a different error code because of conflicting host type requirements: the $group
// needs to merge on a shard, but the $changeStream needs to merge on mongos. This doesn't
diff --git a/jstests/libs/override_methods/continuous_stepdown.js b/jstests/libs/override_methods/continuous_stepdown.js
index 3bd0ba101e2..48e4a79f197 100644
--- a/jstests/libs/override_methods/continuous_stepdown.js
+++ b/jstests/libs/override_methods/continuous_stepdown.js
@@ -176,6 +176,9 @@ ContinuousStepdown.configure = function(stepdownOptions,
* Overrides the ReplSetTest constructor to start the continuous primary stepdown thread.
*/
ReplSetTest = function ReplSetTestWithContinuousPrimaryStepdown() {
+ // Preserve the original set of nodeOptions passed to the constructor.
+ const origNodeOpts = Object.assign({}, (arguments[0] && arguments[0].nodeOptions) || {});
+
// Construct the original object
originalReplSetTest.apply(this, arguments);
@@ -185,24 +188,31 @@ ContinuousStepdown.configure = function(stepdownOptions,
const _originalAwaitLastOpCommitted = this.awaitLastOpCommitted;
/**
- * Overrides startSet call to increase logging verbosity.
+ * Overrides startSet call to increase logging verbosity. Ensure that we only override the
+ * 'logComponentVerbosity' server parameter, but retain any other parameters that were
+ * supplied during ReplSetTest construction.
*/
this.startSet = function() {
- let options = arguments[0] || {};
-
- if (typeof (options.setParameter) === "string") {
- var eqIdx = options.setParameter.indexOf("=");
- if (eqIdx != -1) {
- var param = options.setParameter.substring(0, eqIdx);
- var value = options.setParameter.substring(eqIdx + 1);
- options.setParameter = {};
- options.setParameter[param] = value;
+ // Helper function to convert a string representation of setParameter to object form.
+ function setParamToObj(setParam) {
+ if (typeof (setParam) === "string") {
+ var eqIdx = setParam.indexOf("=");
+ if (eqIdx != -1) {
+ var param = setParam.substring(0, eqIdx);
+ var value = setParam.substring(eqIdx + 1);
+ return {[param]: value};
+ }
}
+ return Object.assign({}, setParam || {});
}
+
+ const options = arguments[0] || {};
+
+ options.setParameter = Object.assign(setParamToObj(origNodeOpts.setParameter),
+ setParamToObj(options.setParameter),
+ {logComponentVerbosity: verbositySetting});
arguments[0] = options;
- options.setParameter = options.setParameter || {};
- options.setParameter.logComponentVerbosity = verbositySetting;
return _originalStartSetFn.apply(this, arguments);
};
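For reference, a minimal mongo-shell sketch of the merge precedence the override above implements; the parameter names and values here are hypothetical, and verbositySetting stands in for the JSON verbosity string defined elsewhere in this file. Constructor-time setParameter values are applied first, startSet() values override them, and logComponentVerbosity is always forced last:

    // Hypothetical inputs: setParamToObj("enableTestCommands=1") from the constructor,
    // and an object-form parameter passed directly to startSet().
    const verbositySetting = {replication: {verbosity: 2}};  // stand-in value
    const merged = Object.assign({enableTestCommands: "1"},
                                 {periodicNoopIntervalSecs: 1},
                                 {logComponentVerbosity: verbositySetting});
    assert.eq(merged.periodicNoopIntervalSecs, 1);  // the startSet() value survives the merge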
diff --git a/jstests/sharding/change_stream_show_migration_events.js b/jstests/sharding/change_stream_show_migration_events.js
index 570a8039a8c..ba73cdd6ab3 100644
--- a/jstests/sharding/change_stream_show_migration_events.js
+++ b/jstests/sharding/change_stream_show_migration_events.js
@@ -1,4 +1,4 @@
-// Tests the undocumented 'showChunkMigrations' option for change streams.
+// Tests the undocumented 'showMigrationEvents' option for change streams.
//
// This test is connecting directly to a shard, and change streams require the getMore command.
// @tags: [requires_find_command, uses_change_streams]
diff --git a/jstests/sharding/change_streams_new_shard_new_database.js b/jstests/sharding/change_streams_new_shard_new_database.js
new file mode 100644
index 00000000000..a188d2e68ee
--- /dev/null
+++ b/jstests/sharding/change_streams_new_shard_new_database.js
@@ -0,0 +1,88 @@
+/**
+ * Tests that existing whole-cluster, whole-db and single-collection $changeStreams correctly pick
+ * up events on a newly-added shard when a new unsharded collection is created on it. Exercises the
+ * fix for SERVER-42723.
+ * Tagging as 'requires_find_command' to ensure that this test is not run in the legacy protocol
+ * passthroughs. Legacy getMore fails in cases where it is run on a database or collection which
+ * does not yet exist.
+ * @tags: [uses_change_streams, requires_sharding, requires_find_command]
+ */
+(function() {
+
+"use strict";
+
+const rsNodeOptions = {
+ setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
+};
+const st =
+ new ShardingTest({shards: 1, mongos: 1, rs: {nodes: 1}, other: {rsOptions: rsNodeOptions}});
+
+// We require one 'test' database and a second 'other' database.
+const oldShardDB = st.s.getDB(jsTestName() + "_other");
+const newShardDB = st.s.getDB(jsTestName());
+
+const configDB = st.s.getDB("config");
+const adminDB = st.s.getDB("admin");
+
+const oldShardColl = oldShardDB.coll;
+const newShardColl = newShardDB.test;
+
+// Helper function to add a new ReplSetTest shard into the cluster.
+function addShardToCluster(shardName) {
+ const replTest = new ReplSetTest({name: shardName, nodes: 1, nodeOptions: rsNodeOptions});
+ replTest.startSet({shardsvr: ""});
+ replTest.initiate();
+ assert.commandWorked(st.s.adminCommand({addShard: replTest.getURL(), name: shardName}));
+ return replTest;
+}
+
+// Helper function to confirm that a stream sees an expected sequence of documents.
+function assertAllEventsObserved(changeStream, expectedDocs) {
+ for (let expectedDoc of expectedDocs) {
+ assert.soon(() => changeStream.hasNext());
+ const nextEvent = changeStream.next();
+ assert.docEq(nextEvent.fullDocument, expectedDoc);
+ }
+}
+
+// Open a whole-db change stream on the as yet non-existent database.
+const wholeDBCS = newShardDB.watch();
+
+// Open a single-collection change stream on a namespace within the non-existent database.
+const singleCollCS = newShardColl.watch();
+
+// Open a whole-cluster stream on the deployment.
+const wholeClusterCS = adminDB.aggregate([{$changeStream: {allChangesForCluster: true}}]);
+
+// Insert some data into the 'other' database on the only existing shard. This should ensure that
+// the primary shard of the test database will be created on the second shard, after it is added.
+const insertedDocs = Array.from({length: 20}, (_, i) => ({_id: i}));
+assert.commandWorked(oldShardColl.insert(insertedDocs));
+
+// Verify that the whole-cluster stream sees all these events.
+assertAllEventsObserved(wholeClusterCS, insertedDocs);
+
+// Verify that the other two streams did not see any of the insertions on the 'other' collection.
+for (let csCursor of [wholeDBCS, singleCollCS]) {
+ assert(!csCursor.hasNext());
+}
+
+// Now add a new shard into the cluster...
+const newShard1 = addShardToCluster("newShard1");
+
+// ... create a new database and collection, and verify that they were placed on the new shard....
+assert.commandWorked(newShardDB.runCommand({create: newShardColl.getName()}));
+assert(configDB.databases.findOne({_id: newShardDB.getName(), primary: "newShard1"}));
+
+// ... insert some documents into the new, unsharded collection on the new shard...
+assert.commandWorked(newShardColl.insert(insertedDocs));
+
+// ... and confirm that all the pre-existing streams see all of these events.
+for (let csCursor of [singleCollCS, wholeDBCS, wholeClusterCS]) {
+ assertAllEventsObserved(csCursor, insertedDocs);
+}
+
+// Stop the new shard manually since the ShardingTest doesn't know anything about it.
+st.stop();
+newShard1.stopSet();
+})();
diff --git a/jstests/sharding/change_streams_unsharded_becomes_sharded.js b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
index 9ab4b1901fa..b5d869af9df 100644
--- a/jstests/sharding/change_streams_unsharded_becomes_sharded.js
+++ b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
@@ -97,9 +97,7 @@ function testUnshardedBecomesSharded(collToWatch) {
];
// Verify that the cursor on the original shard is still valid and sees new inserted
- // documents. The 'documentKey' field should now include the shard key, even before a
- // 'kNewShardDetected' operation has been generated by the migration of a chunk to a new
- // shard.
+ // documents. The 'documentKey' field should now include the shard key.
assert.commandWorked(mongosColl.insert({_id: 1, x: 1}));
assert.commandWorked(mongosCollOther.insert({_id: 1, y: 1}));
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [postShardCollectionChanges[0]]});
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index a9d80a0e2a0..1049a9b04aa 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -244,8 +244,7 @@ std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const Namespac
BSONObj DocumentSourceChangeStream::buildMatchFilter(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Timestamp startFrom,
- bool startFromInclusive,
+ Timestamp startFromInclusive,
bool showMigrationEvents) {
auto nss = expCtx->ns;
@@ -297,6 +296,11 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
// 2.1) Normal CRUD ops.
auto normalOpTypeMatch = BSON("op" << NE << "n");
+ // TODO SERVER-44039: we continue to generate 'kNewShardDetected' events for compatibility
+ // with 4.2, even though we no longer rely on them to detect new shards. We may wish to remove
+ // this mechanism in 4.6, or retain it for future cases where a change stream is targeted to a
+ // subset of shards. See SERVER-44039 for details.
+
// 2.2) A chunk gets migrated to a new shard that doesn't have any chunks.
auto chunkMigratedNewShardMatch = BSON("op"
<< "n"
@@ -326,7 +330,7 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
// Only include CRUD operations tagged "fromMigrate" when the "showMigrationEvents" option is
// set - exempt all other operations and commands with that tag. Include the resume token, if
// resuming, so we can verify it was still present in the oplog.
- return BSON("$and" << BSON_ARRAY(BSON("ts" << (startFromInclusive ? GTE : GT) << startFrom)
+ return BSON("$and" << BSON_ARRAY(BSON("ts" << GTE << startFromInclusive)
<< BSON(OR(opMatch, commandAndApplyOpsMatch))));
}
@@ -388,6 +392,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
}
}
+ // If we do not have a 'resumeAfter' starting point, check for 'startAtOperationTime'.
if (auto startAtOperationTime = spec.getStartAtOperationTime()) {
uassert(40674,
"Only one type of resume option is allowed, but multiple were found.",
@@ -396,8 +401,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
resumeStage = DocumentSourceShardCheckResumability::create(expCtx, *startFrom);
}
- // There might not be a starting point if we're on mongos, otherwise we should either have a
- // 'resumeAfter' starting point, or should start from the latest majority committed operation.
+ // We can only run on a replica set, or through mongoS. Confirm that this is the case.
auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx);
uassert(
40573,
@@ -405,24 +409,29 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
expCtx->inMongos ||
(replCoord &&
replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet));
- if (!startFrom && !expCtx->inMongos) {
- startFrom = replCoord->getMyLastAppliedOpTime().getTimestamp();
+
+ // If we do not have an explicit starting point, we should start from the latest majority
+ // committed operation. If we are on mongoS and do not have a starting point, set it to the
+ // current clusterTime so that all shards start in sync. We always start one tick beyond the
+ // most recent operation, to ensure that the stream does not return it.
+ if (!startFrom) {
+ const auto currentTime = !expCtx->inMongos
+ ? LogicalTime{replCoord->getMyLastAppliedOpTime().getTimestamp()}
+ : LogicalClock::get(expCtx->opCtx)->getClusterTime();
+ startFrom = currentTime.addTicks(1).asTimestamp();
}
- if (startFrom) {
- const bool startFromInclusive = (resumeStage != nullptr);
- stages.push_back(DocumentSourceOplogMatch::create(
- DocumentSourceChangeStream::buildMatchFilter(
- expCtx, *startFrom, startFromInclusive, showMigrationEvents),
- expCtx));
-
- // If we haven't already populated the initial PBRT, then we are starting from a specific
- // timestamp rather than a resume token. Initialize the PBRT to a high water mark token.
- if (expCtx->initialPostBatchResumeToken.isEmpty()) {
- Timestamp startTime{startFrom->getSecs(), startFrom->getInc() + (!startFromInclusive)};
- expCtx->initialPostBatchResumeToken =
- ResumeToken::makeHighWaterMarkToken(startTime).toDocument().toBson();
- }
+ // We must always build the DSOplogMatch stage even on mongoS, since our validation logic relies
+ // upon the fact that it is always the first stage in the pipeline.
+ stages.push_back(DocumentSourceOplogMatch::create(
+ DocumentSourceChangeStream::buildMatchFilter(expCtx, *startFrom, showMigrationEvents),
+ expCtx));
+
+ // If we haven't already populated the initial PBRT, then we are starting from a specific
+ // timestamp rather than a resume token. Initialize the PBRT to a high water mark token.
+ if (expCtx->initialPostBatchResumeToken.isEmpty()) {
+ expCtx->initialPostBatchResumeToken =
+ ResumeToken::makeHighWaterMarkToken(*startFrom).toDocument().toBson();
}
// Obtain the current FCV and use it to create the DocumentSourceChangeStreamTransform stage.
@@ -516,12 +525,14 @@ void DocumentSourceChangeStream::assertIsLegalSpecification(
(expCtx->ns.isAdminDB() && expCtx->ns.isCollectionlessAggregateNS()));
// Prevent $changeStream from running on internal databases. A stream may run against the
- // 'admin' database iff 'allChangesForCluster' is true.
+ // 'admin' database iff 'allChangesForCluster' is true. A stream may run against the 'config'
+ // database iff 'allowToRunOnConfigDB' is true.
+ const bool isNotBannedInternalDB =
+ !expCtx->ns.isLocal() && (!expCtx->ns.isConfigDB() || spec.getAllowToRunOnConfigDB());
uassert(ErrorCodes::InvalidNamespace,
str::stream() << "$changeStream may not be opened on the internal " << expCtx->ns.db()
<< " database",
- expCtx->ns.isAdminDB() ? spec.getAllChangesForCluster()
- : (!expCtx->ns.isLocal() && !expCtx->ns.isConfigDB()));
+ expCtx->ns.isAdminDB() ? spec.getAllChangesForCluster() : isNotBannedInternalDB);
// Prevent $changeStream from running on internal collections in any database.
uassert(ErrorCodes::InvalidNamespace,
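For illustration, the shape of the oplog filter that buildMatchFilter() now produces, per the hunks above: a single inclusive $gte bound on 'ts', with exclusivity instead achieved by starting the stream one tick beyond the current time. A sketch with a hypothetical timestamp; the $or branches are elided:

    const startFrom = Timestamp(1570000000, 1);  // hypothetical starting point
    const oplogFilter = {
        $and: [
            {ts: {$gte: startFrom}},  // always inclusive after this change
            {$or: [/* normal CRUD ops, migration noops, commands and applyOps */]}
        ]
    };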
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 182a9373e12..d3492b0c319 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -166,7 +166,6 @@ public:
*/
static BSONObj buildMatchFilter(const boost::intrusive_ptr<ExpressionContext>& expCtx,
Timestamp startFrom,
- bool startFromInclusive,
bool showMigrationEvents);
/**
diff --git a/src/mongo/db/pipeline/document_source_change_stream.idl b/src/mongo/db/pipeline/document_source_change_stream.idl
index 93a92c25173..410e5ab9f15 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.idl
+++ b/src/mongo/db/pipeline/document_source_change_stream.idl
@@ -109,3 +109,11 @@ structs:
deletes may appear that do not reflect actual deletions or insertions
of data. Instead they reflect this data moving from one shard to
another.
+ allowToRunOnConfigDB:
+ cpp_name: allowToRunOnConfigDB
+ type: bool
+ default: false
+ description: A flag indicating whether the change stream may be opened on the
+ 'config' database, which is usually banned. This flag is used
+ internally to allow mongoS to open a stream on 'config.shards', in
+ order to monitor for the addition of new shards to the cluster.
\ No newline at end of file
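A sketch of how this internal flag is used, mirroring the pipeline built by openChangeStreamNewShardMonitor() in sharded_agg_helpers.cpp below; the timestamp is a placeholder supplied by the dispatch logic, and this is not a supported user-facing option:

    const startMonitoringAtTime = Timestamp(1570000000, 1);  // placeholder
    const newShardMonitorPipeline = [{
        $changeStream: {
            startAtOperationTime: startMonitoringAtTime,
            allowToRunOnConfigDB: true
        }
    }];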
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index a147e5271c3..9d86e364189 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -362,14 +362,11 @@ Value DocumentSourceChangeStreamTransform::serialize(
changeStreamOptions[DocumentSourceChangeStreamSpec::kStartAfterFieldName].missing()) {
MutableDocument newChangeStreamOptions(changeStreamOptions);
- // Use the current cluster time plus 1 tick since the oplog query will include all
- // operations/commands equal to or greater than the 'startAtOperationTime' timestamp. In
- // particular, avoid including the last operation that went through mongos in an attempt to
- // match the behavior of a replica set more closely.
- auto clusterTime = LogicalClock::get(pExpCtx->opCtx)->getClusterTime();
- clusterTime.addTicks(1);
- newChangeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] =
- Value(clusterTime.asTimestamp());
+ // Configure the serialized $changeStream to start from the initial high-watermark
+ // postBatchResumeToken which we generated while parsing the $changeStream pipeline.
+ invariant(!pExpCtx->initialPostBatchResumeToken.isEmpty());
+ newChangeStreamOptions[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] =
+ Value(pExpCtx->initialPostBatchResumeToken);
changeStreamOptions = newChangeStreamOptions.freeze();
}
return Value(Document{{getSourceName(), changeStreamOptions}});
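To summarize the serialization change, a sketch contrasting the old and new specs that mongoS sends to the shards; the timestamp and token value are stand-ins:

    // Old: recompute a fresh start point (clusterTime + 1 tick) at serialization time.
    const before = {$changeStream: {startAtOperationTime: Timestamp(1570000000, 2)}};
    // New: pin every shard to the same initial high-water-mark resume token.
    const after = {$changeStream: {resumeAfter: {_data: "<high-water-mark token>"}}};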
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index a469b6d9807..f451d139d21 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -36,13 +36,17 @@
#include "mongo/client/connpool.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/curop.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/query/find_common.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/op_msg_rpc_impls.h"
+#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/cluster_commands_helpers.h"
+#include "mongo/s/grid.h"
#include "mongo/s/multi_statement_transaction_requests_sender.h"
#include "mongo/s/query/cluster_aggregation_planner.h"
#include "mongo/s/query/cluster_cursor_manager.h"
@@ -109,6 +113,33 @@ Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity v
return explainCommandBuilder.freeze();
}
+/**
+ * Open a $changeStream cursor on the 'config.shards' collection to watch for new shards.
+ */
+RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Timestamp startMonitoringAtTime) {
+ const auto& configShard = Grid::get(expCtx->opCtx)->shardRegistry()->getConfigShard();
+ // Pipeline: {$changeStream: {startAtOperationTime: [now], allowToRunOnConfigDB: true}}
+ AggregationRequest aggReq(
+ ShardType::ConfigNS,
+ {BSON(DocumentSourceChangeStream::kStageName
+ << BSON(DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName
+ << startMonitoringAtTime
+ << DocumentSourceChangeStreamSpec::kAllowToRunOnConfigDBFieldName << true))});
+ aggReq.setFromMongos(true);
+ aggReq.setNeedsMerge(true);
+ aggReq.setBatchSize(0);
+ auto configCursor =
+ establishCursors(expCtx->opCtx,
+ Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ aggReq.getNamespaceString(),
+ ReadPreferenceSetting{ReadPreference::PrimaryPreferred},
+ {{configShard->getId(), aggReq.serializeToCommandObj().toBson()}},
+ false);
+ invariant(configCursor.size() == 1);
+ return std::move(*configCursor.begin());
+}
+
Shard::RetryPolicy getDesiredRetryPolicy(OperationContext* opCtx) {
// The idempotent retry policy will retry even for writeConcern failures, so only set it if the
// pipeline does not support writeConcern.
@@ -575,13 +606,17 @@ DispatchShardPipelineResults dispatchShardPipeline(
pipeline.get(),
expCtx->collation);
- // In order for a $changeStream to work reliably, we need the shard registry to be at least as
- // current as the logical time at which the pipeline was serialized to 'targetedCommand' above.
- // We therefore hard-reload and retarget the shards here. We don't refresh for other pipelines
- // that must run on all shards (e.g. $currentOp) because, unlike $changeStream, those pipelines
- // may not have been forced to split if there was only one shard in the cluster when the command
- // began execution. If a shard was added since the earlier targeting logic ran, then refreshing
- // here may cause us to illegally target an unsplit pipeline to more than one shard.
+ // A $changeStream pipeline must run on all shards, and will also open an extra cursor on the
+ // config server in order to monitor for new shards. To guarantee that we do not miss any
+ // shards, we must ensure that the list of shards to which we initially dispatch the pipeline is
+ // at least as current as the logical time at which the stream begins scanning for new shards.
+ // We therefore set 'shardRegistryReloadTime' to the current clusterTime and then hard-reload
+ // the shard registry. We don't refresh for other pipelines that must run on all shards (e.g.
+ // $currentOp) because, unlike $changeStream, those pipelines may not have been forced to split
+ // if there was only one shard in the cluster when the command began execution. If a shard was
+ // added since the earlier targeting logic ran, then refreshing here may cause us to illegally
+ // target an unsplit pipeline to more than one shard.
+ auto shardRegistryReloadTime = LogicalClock::get(opCtx)->getClusterTime().asTimestamp();
if (hasChangeStream) {
auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
if (!shardRegistry->reload(opCtx)) {
@@ -636,12 +671,19 @@ DispatchShardPipelineResults dispatchShardPipeline(
invariant(cursors.size() % shardIds.size() == 0,
str::stream() << "Number of cursors (" << cursors.size()
<< ") is not a multiple of producers (" << shardIds.size() << ")");
+
+ // For $changeStream, we must open an extra cursor on the 'config.shards' collection, so
+ // that we can monitor for the addition of new shards inline with real events.
+ if (hasChangeStream && expCtx->ns.db() != ShardType::ConfigNS.db()) {
+ cursors.emplace_back(openChangeStreamNewShardMonitor(expCtx, shardRegistryReloadTime));
+ }
}
// Convert remote cursors into a vector of "owned" cursors.
std::vector<OwnedRemoteCursor> ownedCursors;
for (auto&& cursor : cursors) {
- ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), expCtx->ns));
+ auto cursorNss = cursor.getCursorResponse().getNSS();
+ ownedCursors.emplace_back(opCtx, std::move(cursor), std::move(cursorNss));
}
// Record the number of shards involved in the aggregation. If we are required to merge on
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index bc89f6aa19a..ae6aaf092b4 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -396,7 +396,7 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
}
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, _opCtx);
+ remote.getTargetHost(), remote.cursorNss.db().toString(), cmdObj, _opCtx);
auto callbackStatus =
_executor->scheduleRemoteCommand(request, [this, remoteIndex](auto const& cbData) {
diff --git a/src/mongo/s/query/document_source_update_on_add_shard.cpp b/src/mongo/s/query/document_source_update_on_add_shard.cpp
index d94a99518b7..4ae4318c997 100644
--- a/src/mongo/s/query/document_source_update_on_add_shard.cpp
+++ b/src/mongo/s/query/document_source_update_on_add_shard.cpp
@@ -32,6 +32,7 @@
#include <algorithm>
#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/async_results_merger_params_gen.h"
@@ -40,10 +41,20 @@
namespace mongo {
namespace {
-// Returns true if the change stream document has an 'operationType' of 'newShardDetected'.
-bool needsUpdate(const Document& childResult) {
- return childResult[DocumentSourceChangeStream::kOperationTypeField].getStringData() ==
- DocumentSourceChangeStream::kNewShardDetectedOpType;
+// Returns true if the change stream document is an event in 'config.shards'.
+bool isShardConfigEvent(const Document& eventDoc) {
+ // TODO SERVER-44039: we continue to generate 'kNewShardDetected' events for compatibility
+ // with 4.2, even though we no longer rely on them to detect new shards. We swallow the event
+ // here. We may wish to remove this mechanism entirely in 4.6, or retain it for future cases
+ // where a change stream is targeted to a subset of shards. See SERVER-44039 for details.
+ if (eventDoc[DocumentSourceChangeStream::kOperationTypeField].getStringData() ==
+ DocumentSourceChangeStream::kNewShardDetectedOpType) {
+ return true;
+ }
+ auto nsObj = eventDoc[DocumentSourceChangeStream::kNamespaceField];
+ return nsObj.getType() == BSONType::Object &&
+ nsObj["db"_sd].getStringData() == ShardType::ConfigNS.db() &&
+ nsObj["coll"_sd].getStringData() == ShardType::ConfigNS.coll();
}
} // namespace
@@ -69,14 +80,19 @@ DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard(
: DocumentSource(kStageName, expCtx),
_executor(std::move(executor)),
_mergeCursors(mergeCursors),
- _shardsWithCursors(std::move(shardsWithCursors)),
+ _shardsWithCursors(shardsWithCursors.begin(), shardsWithCursors.end()),
_cmdToRunOnNewShards(cmdToRunOnNewShards.getOwned()) {}
DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::doGetNext() {
auto childResult = pSource->getNext();
- while (childResult.isAdvanced() && needsUpdate(childResult.getDocument())) {
- addNewShardCursors(childResult.getDocument());
+ // If this is an insertion into the 'config.shards' collection, open a cursor on the new shard.
+ while (childResult.isAdvanced() && isShardConfigEvent(childResult.getDocument())) {
+ auto opType = childResult.getDocument()[DocumentSourceChangeStream::kOperationTypeField];
+ if (opType.getStringData() == DocumentSourceChangeStream::kInsertOpType) {
+ addNewShardCursors(childResult.getDocument());
+ }
+ // For shard removal or update, we do nothing. We also swallow kNewShardDetectedOpType.
childResult = pSource->getNext();
}
return childResult;
@@ -88,41 +104,39 @@ void DocumentSourceUpdateOnAddShard::addNewShardCursors(const Document& newShard
std::vector<RemoteCursor> DocumentSourceUpdateOnAddShard::establishShardCursorsOnNewShards(
const Document& newShardDetectedObj) {
- auto* opCtx = pExpCtx->opCtx;
// Reload the shard registry. We need to ensure a reload initiated after calling this method
- // caused the reload, otherwise we aren't guaranteed to get all the new shards.
- auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
- if (!shardRegistry->reload(opCtx)) {
+ // caused the reload, otherwise we may not see the new shard, so we perform a "hard" reload.
+ auto* opCtx = pExpCtx->opCtx;
+ if (!Grid::get(opCtx)->shardRegistry()->reload(opCtx)) {
// A 'false' return from shardRegistry.reload() means a reload was already in progress and
// it completed before reload() returned. So another reload(), regardless of return value,
// will ensure a reload started after the first call to reload().
- shardRegistry->reload(opCtx);
+ Grid::get(opCtx)->shardRegistry()->reload(opCtx);
}
- std::vector<ShardId> shardIds, newShardIds;
- shardRegistry->getAllShardIdsNoReload(&shardIds);
- std::sort(_shardsWithCursors.begin(), _shardsWithCursors.end());
- std::sort(shardIds.begin(), shardIds.end());
- std::set_difference(shardIds.begin(),
- shardIds.end(),
- _shardsWithCursors.begin(),
- _shardsWithCursors.end(),
- std::back_inserter(newShardIds));
+ // Parse the new shard's information from the document inserted into 'config.shards'.
+ auto newShardSpec = newShardDetectedObj[DocumentSourceChangeStream::kFullDocumentField];
+ auto newShard = uassertStatusOK(ShardType::fromBSON(newShardSpec.getDocument().toBson()));
- auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand(
- _cmdToRunOnNewShards,
- newShardDetectedObj[DocumentSourceChangeStream::kIdField].getDocument());
- std::vector<std::pair<ShardId, BSONObj>> requests;
- for (const auto& shardId : newShardIds) {
- requests.emplace_back(shardId, cmdObj);
- _shardsWithCursors.push_back(shardId);
+ // Make sure we are not attempting to open a cursor on a shard that already has one.
+ if (!_shardsWithCursors.insert(newShard.getName()).second) {
+ return {};
}
+
+ // We must start the new cursor from the moment at which the shard became visible.
+ const auto newShardAddedTime = LogicalTime{
+ newShardDetectedObj[DocumentSourceChangeStream::kClusterTimeField].getTimestamp()};
+ auto resumeTokenForNewShard =
+ ResumeToken::makeHighWaterMarkToken(newShardAddedTime.addTicks(1).asTimestamp());
+ auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand(
+ _cmdToRunOnNewShards, resumeTokenForNewShard.toDocument());
+
const bool allowPartialResults = false; // partial results are not allowed
return establishCursors(opCtx,
_executor,
pExpCtx->ns,
ReadPreferenceSetting::get(opCtx),
- requests,
+ {{newShard.getName(), cmdObj}},
allowPartialResults);
}
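A sketch of the resume point computed above for the new shard's cursor: one tick past the clusterTime of the config.shards insert event, so the stream returns everything the shard writes after becoming visible without replaying the insert itself. The 'event' document here is hypothetical:

    const event = {clusterTime: Timestamp(1570000000, 5)};  // the config.shards insert event
    const resumeTs = Timestamp(event.clusterTime.t, event.clusterTime.i + 1);
    // The cursor is then opened with a high-water-mark 'resumeAfter' token at resumeTs.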
diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/s/query/document_source_update_on_add_shard.h
index 0b41fde92d1..ff76d2ce90e 100644
--- a/src/mongo/s/query/document_source_update_on_add_shard.h
+++ b/src/mongo/s/query/document_source_update_on_add_shard.h
@@ -102,7 +102,7 @@ private:
std::shared_ptr<executor::TaskExecutor> _executor;
boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursors;
- std::vector<ShardId> _shardsWithCursors;
+ std::set<ShardId> _shardsWithCursors;
BSONObj _cmdToRunOnNewShards;
};
} // namespace mongo