author  Charlie Swanson <charlie.swanson@mongodb.com>  2017-10-11 11:21:10 -0400
committer  David Storch <david.storch@10gen.com>  2017-10-11 17:12:36 -0400
commit  848eebc5fc29d3aa567495fdb96d489264089ad6 (patch)
tree  c5ecc193ba7d51b394704ed99eb15dd2daad0c58
parent  e7390933e152d8e6f00c90ac341f691780c261fd (diff)
download  mongo-848eebc5fc29d3aa567495fdb96d489264089ad6.tar.gz
SERVER-31475 Always detach from op ctx after cluster getMore.
Signed-off-by: David Storch <david.storch@10gen.com>
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml  1
-rw-r--r--  jstests/sharding/error_during_agg_getmore.js  52
-rw-r--r--  jstests/sharding/resume_change_stream.js  138
-rw-r--r--  src/mongo/s/query/cluster_find.cpp  13
4 files changed, 202 insertions, 2 deletions
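
The cluster_find.cpp change below addresses the following problem: mongos reattaches a pinned cluster cursor to the current OperationContext at the start of a getMore, and previously only detached it on the success path. If an exception escaped while building the batch, the cursor was marked killed and reaped later by a background thread, but it could still hold a pointer to an OperationContext that was destroyed as soon as runGetMore returned. The patch installs a scope guard so the cursor is detached on every exit path, and dismisses the guard once the success path has detached explicitly. What follows is a minimal standalone C++ sketch of that pattern, not the MongoDB sources; the ScopeGuard, OperationContext, and ClusterCursor types are simplified stand-ins for illustration only.

// Minimal illustrative sketch of the detach-on-every-exit-path pattern; these are
// simplified stand-in types, not the real mongo classes.
#include <functional>
#include <iostream>
#include <stdexcept>
#include <utility>

class ScopeGuard {
public:
    explicit ScopeGuard(std::function<void()> onExit) : _onExit(std::move(onExit)) {}
    ~ScopeGuard() {
        if (_active)
            _onExit();
    }
    void dismiss() {
        _active = false;
    }

private:
    std::function<void()> _onExit;
    bool _active = true;
};

struct OperationContext {};  // stand-in for mongo::OperationContext

struct ClusterCursor {  // stand-in for the pinned cluster cursor
    void reattachToOperationContext(OperationContext*) {
        attached = true;
    }
    void detachFromOperationContext() {
        attached = false;
    }
    bool attached = false;
};

// Sketch of the getMore flow: attach, install the guard, build the batch, then detach and
// dismiss on success. If building the batch throws, the guard still detaches the cursor.
void runGetMore(OperationContext* opCtx, ClusterCursor& cursor, bool simulateShardError) {
    cursor.reattachToOperationContext(opCtx);
    ScopeGuard cursorDetach([&cursor] { cursor.detachFromOperationContext(); });

    if (simulateShardError)
        throw std::runtime_error("error received from a shard during getMore");

    // Success path: detach explicitly and hand the cursor back to its manager.
    cursorDetach.dismiss();
    cursor.detachFromOperationContext();
}

int main() {
    OperationContext opCtx;
    ClusterCursor cursor;
    try {
        runGetMore(&opCtx, cursor, /*simulateShardError=*/true);
    } catch (const std::exception& e) {
        std::cout << "caught: " << e.what() << "\n";
    }
    // With the guard in place, the cursor is detached even though the call threw.
    std::cout << "cursor still attached? " << std::boolalpha << cursor.attached << "\n";
    return 0;
}
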
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 1299b98d7bc..67b044b7b04 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -49,6 +49,7 @@ selector:
- jstests/sharding/move_chunk_update_with_write_retryability.js
- jstests/sharding/operation_time_api.js
- jstests/sharding/refresh_sessions.js
+ - jstests/sharding/resume_change_stream.js
- jstests/sharding/retryable_writes.js
- jstests/sharding/safe_secondary_reads_drop_recreate.js
- jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
diff --git a/jstests/sharding/error_during_agg_getmore.js b/jstests/sharding/error_during_agg_getmore.js
new file mode 100644
index 00000000000..adcc6a6172b
--- /dev/null
+++ b/jstests/sharding/error_during_agg_getmore.js
@@ -0,0 +1,52 @@
+// This test was designed to reproduce SERVER-31475. It issues sharded aggregations in which one
+// shard returns an error while the other shard's response is delayed.
+(function() {
+ "use strict";
+
+ const st = new ShardingTest({shards: 2, useBridge: true});
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ const mongosColl = mongosDB[jsTestName()];
+
+ assert.commandWorked(mongosDB.dropDatabase());
+
+ // Enable sharding on the test DB and ensure its primary is shard 0.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName);
+
+ // Shard the test collection on _id.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey].
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+
+ // Move the [0, MaxKey] chunk to shard 1.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.shard1.shardName}));
+
+ // Write a document to each chunk.
+ assert.writeOK(mongosColl.insert({_id: -1}));
+ assert.writeOK(mongosColl.insert({_id: 1}));
+
+ // Delay messages between shard 1 and the mongos, long enough that shard 1's responses will
+ // likely arrive after the response from shard 0, but not so long that the background cluster
+ // client cleanup job will have been given a chance to run.
+ const delayMillis = 100;
+ st.shard1.delayMessagesFrom(st.s, delayMillis);
+
+ const nTrials = 10;
+ for (let i = 0; i < nTrials; ++i) {
+ // This will trigger an error on shard 0, but not on shard 1. Because of the delay we set up,
+ // shard 1's response should arrive after the error has been returned to the client. We use a
+ // batch size of 0 to ensure the error happens during a getMore.
+ assert.throws(
+ () => mongosColl
+ .aggregate([{$project: {_id: 0, x: {$divide: [2, {$add: ["$_id", 1]}]}}}],
+ {cursor: {batchSize: 0}})
+ .itcount());
+ }
+
+ st.stop();
+}());
diff --git a/jstests/sharding/resume_change_stream.js b/jstests/sharding/resume_change_stream.js
new file mode 100644
index 00000000000..edd127e81d5
--- /dev/null
+++ b/jstests/sharding/resume_change_stream.js
@@ -0,0 +1,138 @@
+// Tests resuming change streams on sharded collections.
+// We need to use a readConcern in this test, which requires read commands.
+// @tags: [requires_find_command]
+(function() {
+ "use strict";
+
+ load('jstests/replsets/rslib.js'); // For getLatestOp and getLeastRecentOp.
+ load('jstests/libs/change_stream_util.js'); // For ChangeStreamTest.
+
+ // For supportsMajorityReadConcern.
+ load('jstests/multiVersion/libs/causal_consistency_helpers.js');
+
+ // This test only works on storage engines that support committed reads; skip it if the
+ // configured engine doesn't support them.
+ if (!supportsMajorityReadConcern()) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+ }
+
+ const oplogSize = 1; // size in MB
+ const st = new ShardingTest({
+ shards: 2,
+ rs: {
+ nodes: 1,
+ oplogSize: oplogSize,
+ enableMajorityReadConcern: '',
+ // Enable the periodic noop writer with a higher frequency to speed up the test.
+ setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
+ }
+ });
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ const mongosColl = mongosDB[jsTestName()];
+
+ assert.commandWorked(mongosDB.dropDatabase());
+
+ // Enable sharding on the test DB and ensure its primary is shard 0.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+ // Shard the test collection on _id.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey].
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+
+ // Move the [0, MaxKey] chunk to shard 1.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+
+ // Write a document to each chunk.
+ assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+
+ let changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+
+ // We awaited the replication of the first writes, so the change stream shouldn't return them.
+ assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}}));
+ assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}}));
+ st.rs0.awaitReplication();
+ st.rs1.awaitReplication();
+
+ // Test that we see both updates, and remember their resume tokens.
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ assert.eq(next.documentKey._id, -1);
+ const resumeTokenFromFirstUpdateOnShard0 = next._id;
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ assert.eq(next.documentKey._id, 1);
+ const resumeTokenFromFirstUpdateOnShard1 = next._id;
+
+ changeStream.close();
+
+ // Write some additional documents, then test that it's possible to resume after the first
+ // update.
+ assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}}));
+ changeStream =
+ mongosColl.aggregate([{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}]);
+
+ for (let nextExpectedId of [1, -2, 2]) {
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().documentKey._id, nextExpectedId);
+ }
+
+ changeStream.close();
+
+ // Test that the stream can't resume if the resume token is no longer present in the oplog.
+
+ // Roll over the entire oplog on shard 1, discarding the resume token from its first update.
+ const shardWithResumeToken = st.rs1.getPrimary();
+ const mostRecentOplogEntry = getLatestOp(shardWithResumeToken);
+ assert.neq(mostRecentOplogEntry, null);
+ const largeStr = new Array(4 * 1024 * oplogSize).join('abcdefghi');
+ let i = 0;
+
+ function oplogIsRolledOver() {
+ // The oplog has rolled over if the op that used to be newest is now older than the oplog's
+ // current oldest entry. Said another way, the oplog is rolled over when everything in the
+ // oplog is newer than what used to be the newest entry.
+ return bsonWoCompare(
+ mostRecentOplogEntry.ts,
+ getLeastRecentOp({server: shardWithResumeToken, readConcern: "majority"}).ts) <
+ 0;
+ }
+
+ while (!oplogIsRolledOver()) {
+ let idVal = 100 + (i++);
+ assert.writeOK(
+ mongosColl.insert({_id: idVal, long_str: largeStr}, {writeConcern: {w: "majority"}}));
+ sleep(100);
+ }
+
+ ChangeStreamTest.assertChangeStreamThrowsCode({
+ collection: mongosColl,
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard1}}],
+ expectedCode: 40576
+ });
+
+ // Test that the change stream can't resume if the resume token *is* present in the oplog, but
+ // one of the shards has rolled over its oplog enough that it doesn't have a long enough history
+ // to resume. Since we just rolled over the oplog on shard 1, we know that
+ // 'resumeTokenFromFirstUpdateOnShard0' is still present on shard 0, but shard 1 doesn't have
+ // any changes earlier than that, so it won't be able to resume.
+ ChangeStreamTest.assertChangeStreamThrowsCode({
+ collection: mongosColl,
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}],
+ expectedCode: 40576
+ });
+
+ st.stop();
+})();
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index cbecc76c9bf..4d25910ae65 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -58,6 +58,7 @@
#include "mongo/stdx/memory.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -444,6 +445,13 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
pinnedCursor.getValue().reattachToOperationContext(opCtx);
+ // A pinned cursor will not be destroyed immediately if an exception is thrown. Instead it will
+ // be marked as killed, then reaped by a background thread later. If this happens, we want to be
+ // sure the cursor does not keep a pointer to this OperationContext, since the OperationContext
+ // will be destroyed as soon as we return, while the cursor will live on a bit longer.
+ ScopeGuard cursorDetach =
+ MakeGuard([&pinnedCursor]() { pinnedCursor.getValue().detachFromOperationContext(); });
+
while (!FindCommon::enoughForGetMore(batchSize, batch.size())) {
auto context = batch.empty()
? RouterExecStage::ExecContext::kGetMoreNoResultsYet
@@ -490,9 +498,10 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
batch.push_back(std::move(*next.getValue().getResult()));
}
+ // Upon successful completion, detach from the operation context and transfer ownership of the
+ // cursor back to the cursor manager.
+ cursorDetach.Dismiss();
pinnedCursor.getValue().detachFromOperationContext();
-
- // Transfer ownership of the cursor back to the cursor manager.
pinnedCursor.getValue().returnCursor(cursorState);
CursorId idToReturn = (cursorState == ClusterCursorManager::CursorState::Exhausted)