author     Bernard Gorman <bernard.gorman@gmail.com>  2017-11-06 03:49:57 +0000
committer  Bernard Gorman <bernard.gorman@gmail.com>  2017-11-21 10:10:07 +0000
commit     aacb5b0d26ed1a660d0b4e736bb0095653b54429 (patch)
tree       c63ea602a1531a99f9b35f3eee230fd3e9721014
parent     1d5b243cc283aaee884f1032d068b1e3e05c396f (diff)
download   mongo-aacb5b0d26ed1a660d0b4e736bb0095653b54429.tar.gz
SERVER-31836 Always dispatch sorted tailable awaitdata getMores to the shards with timeouts of at most 1 second
(cherry picked from commit ed83e82fcd2731543724523e7fd2c8563ab717c9)
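
The effect, roughly, is the following. This is an illustrative mongo shell sketch rather than part of the patch; the "test.coll" namespace and the comment string are placeholders. The client-supplied maxTimeMS is enforced on mongoS, while the getMores that mongoS dispatches to the shards for a sorted cursor spanning more than one shard carry a maxTimeMS of at most 1000ms.

    // Illustrative sketch only: assumes a sharded cluster in which "test.coll" is a sharded
    // collection. The namespace and the comment string are placeholders, not part of this patch.
    const testDB = db.getSiblingDB("test");

    // Open a $changeStream through mongoS. The comment tags the cursor so that its getMores can
    // be located in the shards' profiler output.
    const res = assert.commandWorked(testDB.runCommand({
        aggregate: "coll",
        pipeline: [{$changeStream: {}}],
        comment: "maxTimeMS illustration",
        cursor: {}
    }));

    // This getMore may wait on mongoS for up to the full five minutes, but with this change each
    // getMore that mongoS sends to the shards carries a maxTimeMS of at most 1000ms, so every
    // shard reports its latest optime at least once per second even when it has no new data.
    assert.commandWorked(testDB.runCommand(
        {getMore: res.cursor.id, collection: "coll", maxTimeMS: 5 * 60 * 1000}));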
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml |   1
-rw-r--r--  jstests/sharding/change_stream_enforce_max_time_ms_on_mongos.js                    | 137
-rw-r--r--  src/mongo/s/query/async_results_merger.cpp                                         |  10
3 files changed, 147 insertions(+), 1 deletion(-)
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 50cf62a2481..9ed90c5d903 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -34,6 +34,7 @@ selector:
   - jstests/sharding/advance_logical_time_with_valid_signature.js
   - jstests/sharding/after_cluster_time.js
   - jstests/sharding/change_stream_chunk_migration.js
+  - jstests/sharding/change_stream_enforce_max_time_ms_on_mongos.js
   - jstests/sharding/change_stream_invalidation.js
   - jstests/sharding/change_stream_lookup_single_shard_cluster.js
   - jstests/sharding/change_stream_remove_shard.js
diff --git a/jstests/sharding/change_stream_enforce_max_time_ms_on_mongos.js b/jstests/sharding/change_stream_enforce_max_time_ms_on_mongos.js
new file mode 100644
index 00000000000..62ae5e90cf0
--- /dev/null
+++ b/jstests/sharding/change_stream_enforce_max_time_ms_on_mongos.js
@@ -0,0 +1,137 @@
+// Test that a $changeStream pipeline on a sharded cluster always enforces the user-specified
+// maxTimeMS on mongoS, but caps the maxTimeMS of getMores sent to the shards at one second. Doing
+// so allows the shards to regularly report their advancing optimes in the absence of any new data,
+// which in turn allows the ARM to return sorted results retrieved from the other shards.
+(function() {
+    "use strict";
+
+    // For supportsMajorityReadConcern.
+    load('jstests/multiVersion/libs/causal_consistency_helpers.js');
+
+    // This test only works on storage engines that support committed reads; skip it if the
+    // configured engine doesn't support them.
+    if (!supportsMajorityReadConcern()) {
+        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+        return;
+    }
+
+    // Create a 2-shard cluster. Enable 'writePeriodicNoops' and set 'periodicNoopIntervalSecs' to
+    // 1 second so that each shard is continually advancing its optime, allowing the ARM to return
+    // sorted results even if some shards have not yet produced any data.
+    const st = new ShardingTest({
+        shards: 2,
+        rs: {nodes: 1, setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}}
+    });
+
+    const mongosDB = st.s0.getDB(jsTestName());
+    const mongosColl = mongosDB[jsTestName()];
+
+    const shard0DB = st.shard0.getDB(jsTestName());
+    const shard1DB = st.shard1.getDB(jsTestName());
+
+    // Enable sharding on the test DB and ensure its primary is shard0000.
+    assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+    st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+    // Shard the test collection on _id.
+    assert.commandWorked(
+        mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+    // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey].
+    assert.commandWorked(
+        mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+
+    // Move the [0, MaxKey] chunk to shard0001.
+    assert.commandWorked(mongosDB.adminCommand(
+        {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+
+    // Start the profiler on each shard so that we can examine the getMores' maxTimeMS.
+    for (let profileDB of [shard0DB, shard1DB]) {
+        assert.commandWorked(profileDB.setProfilingLevel(0));
+        profileDB.system.profile.drop();
+        assert.commandWorked(profileDB.setProfilingLevel(2));
+    }
+
+    // Returns 'true' if there is at least one getMore profile entry matching the given namespace,
+    // identifying comment and maxTimeMS.
+    function profilerHasAtLeastOneMatchingGetMore(profileDB, nss, comment, timeout) {
+        return profileDB.system.profile.count({
+            "originatingCommand.comment": comment,
+            "command.maxTimeMS": timeout,
+            op: "getmore",
+            ns: nss
+        }) > 0;
+    }
+
+    // Asserts that there is at least one getMore profile entry matching the given namespace and
+    // identifying comment, and that all such entries have the given maxTimeMS.
+    function assertAllGetMoresHaveTimeout(profileDB, nss, comment, timeout) {
+        const getMoreTimeouts =
+            profileDB.system.profile
+                .aggregate([
+                    {$match: {op: "getmore", ns: nss, "originatingCommand.comment": comment}},
+                    {$group: {_id: "$command.maxTimeMS"}}
+                ])
+                .toArray();
+        assert.eq(getMoreTimeouts.length, 1);
+        assert.eq(getMoreTimeouts[0]._id, timeout);
+    }
+
+    // Timeout values used in the subsequent getMore tests.
+    const halfSec = 500;
+    const oneSec = 2 * halfSec;
+    const fiveSecs = 5 * oneSec;
+    const fiveMins = 60 * fiveSecs;
+    const thirtyMins = 6 * fiveMins;
+    const testComment = "change stream sharded maxTimeMS test";
+
+    // Open a $changeStream on the empty, inactive collection.
+    const csCmdRes = assert.commandWorked(mongosDB.runCommand({
+        aggregate: mongosColl.getName(),
+        pipeline: [{$changeStream: {}}],
+        comment: testComment,
+        cursor: {}
+    }));
+    assert.eq(csCmdRes.cursor.firstBatch.length, 0);
+    assert.neq(csCmdRes.cursor.id, 0);
+
+    // Confirm that getMores without an explicit maxTimeMS default to one second on the shards.
+    assert.commandWorked(
+        mongosDB.runCommand({getMore: csCmdRes.cursor.id, collection: mongosColl.getName()}));
+    for (let shardDB of [shard0DB, shard1DB]) {
+        assert.soon(() => profilerHasAtLeastOneMatchingGetMore(
+                        shardDB, mongosColl.getFullName(), testComment, oneSec));
+    }
+
+    // Verify that with no activity on the shards, a $changeStream with maxTimeMS waits for the
+    // full duration on mongoS. Allow some leniency since the server-side wait may wake spuriously.
+    let startTime = (new Date()).getTime();
+    assert.commandWorked(mongosDB.runCommand(
+        {getMore: csCmdRes.cursor.id, collection: mongosColl.getName(), maxTimeMS: fiveSecs}));
+    assert.gte((new Date()).getTime() - startTime, fiveSecs - halfSec);
+
+    // Confirm that each getMore dispatched to the shards during this period had a maxTimeMS of 1s.
+    for (let shardDB of [shard0DB, shard1DB]) {
+        assertAllGetMoresHaveTimeout(shardDB, mongosColl.getFullName(), testComment, oneSec);
+    }
+
+    // Issue a getMore with a sub-second maxTimeMS. This should propagate to the shards as-is.
+    assert.commandWorked(mongosDB.runCommand(
+        {getMore: csCmdRes.cursor.id, collection: mongosColl.getName(), maxTimeMS: halfSec}));
+
+    for (let shardDB of [shard0DB, shard1DB]) {
+        assert.soon(() => profilerHasAtLeastOneMatchingGetMore(
+                        shardDB, mongosColl.getFullName(), testComment, halfSec));
+    }
+
+    // Write a document to shard0, and confirm that, despite the fact that shard1 is still idle, a
+    // getMore with a high maxTimeMS returns the document before this timeout expires.
+    assert.writeOK(mongosColl.insert({_id: -1}));
+    startTime = (new Date()).getTime();
+    const csResult = assert.commandWorked(mongosDB.runCommand(
+        {getMore: csCmdRes.cursor.id, collection: mongosColl.getName(), maxTimeMS: thirtyMins}));
+    assert.lte((new Date()).getTime() - startTime, fiveMins);
+    assert.docEq(csResult.cursor.nextBatch[0].fullDocument, {_id: -1});
+
+    st.stop();
+})();
\ No newline at end of file
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 8bdc8e69353..241f09f209b 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -129,7 +129,15 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
"maxTimeMS can only be used with getMore for tailable, awaitData cursors");
}
- _awaitDataTimeout = awaitDataTimeout;
+ // For sorted tailable awaitData cursors on multiple shards, cap the getMore timeout at 1000ms.
+ // This is to ensure that we get a continuous stream of updates from each shard with their most
+ // recent optimes, which allows us to return sorted $changeStream results even if some shards
+ // are yet to provide a batch of data. If the timeout specified by the client is greater than
+ // 1000ms, then it will be enforced elsewhere.
+ _awaitDataTimeout = (!_params->sort.isEmpty() && _remotes.size() > 1u
+ ? std::min(awaitDataTimeout, Milliseconds{1000})
+ : awaitDataTimeout);
+
return Status::OK();
}