| author | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-10-11 11:21:10 -0400 |
|---|---|---|
| committer | David Storch <david.storch@10gen.com> | 2017-10-11 17:12:36 -0400 |
| commit | 848eebc5fc29d3aa567495fdb96d489264089ad6 (patch) | |
| tree | c5ecc193ba7d51b394704ed99eb15dd2daad0c58 | |
| parent | e7390933e152d8e6f00c90ac341f691780c261fd (diff) | |
| download | mongo-848eebc5fc29d3aa567495fdb96d489264089ad6.tar.gz | |
SERVER-31475 Always detach from op ctx after cluster getMore.
Signed-off-by: David Storch <david.storch@10gen.com>
| -rw-r--r-- | buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml | 1 |
|---|---|---|
| -rw-r--r-- | jstests/sharding/error_during_agg_getmore.js | 52 |
| -rw-r--r-- | jstests/sharding/resume_change_stream.js | 138 |
| -rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 13 |

4 files changed, 202 insertions(+), 2 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 1299b98d7bc..67b044b7b04 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -49,6 +49,7 @@ selector:
   - jstests/sharding/move_chunk_update_with_write_retryability.js
   - jstests/sharding/operation_time_api.js
   - jstests/sharding/refresh_sessions.js
+  - jstests/sharding/resume_change_stream.js
   - jstests/sharding/retryable_writes.js
   - jstests/sharding/safe_secondary_reads_drop_recreate.js
   - jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
diff --git a/jstests/sharding/error_during_agg_getmore.js b/jstests/sharding/error_during_agg_getmore.js
new file mode 100644
index 00000000000..adcc6a6172b
--- /dev/null
+++ b/jstests/sharding/error_during_agg_getmore.js
@@ -0,0 +1,52 @@
+// This test was designed to reproduce SERVER-31475. It issues sharded aggregations with an error
+// returned from one shard, and a delayed response from another shard.
+(function() {
+    "use strict";
+
+    const st = new ShardingTest({shards: 2, useBridge: true});
+
+    const mongosDB = st.s0.getDB(jsTestName());
+    const mongosColl = mongosDB[jsTestName()];
+
+    assert.commandWorked(mongosDB.dropDatabase());
+
+    // Enable sharding on the test DB and ensure its primary is shard0000.
+    assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+    st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName);
+
+    // Shard the test collection on _id.
+    assert.commandWorked(
+        mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+    // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey].
+    assert.commandWorked(
+        mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+
+    // Move the [0, MaxKey] chunk to shard0001.
+    assert.commandWorked(mongosDB.adminCommand(
+        {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.shard1.shardName}));
+
+    // Write a document to each chunk.
+    assert.writeOK(mongosColl.insert({_id: -1}));
+    assert.writeOK(mongosColl.insert({_id: 1}));
+
+    // Delay messages between shard 1 and the mongos, long enough that shard 1's responses will
+    // likely arrive after the response from shard 0, but not so long that the background cluster
+    // client cleanup job will have been given a chance to run.
+    const delayMillis = 100;
+    st.shard1.delayMessagesFrom(st.s, delayMillis);
+
+    const nTrials = 10;
+    for (let i = 0; i < nTrials; ++i) {
+        // This will trigger an error on shard 0, but not shard 1. We set up a delay from shard 1,
+        // so the response should get back after the error has been returned to the client. We use
+        // a batch size of 0 to ensure the error happens during a getMore.
+        assert.throws(
+            () => mongosColl
+                      .aggregate([{$project: {_id: 0, x: {$divide: [2, {$add: ["$_id", 1]}]}}}],
+                                 {cursor: {batchSize: 0}})
+                      .itcount());
+    }
+
+    st.stop();
+}());
diff --git a/jstests/sharding/resume_change_stream.js b/jstests/sharding/resume_change_stream.js
new file mode 100644
index 00000000000..edd127e81d5
--- /dev/null
+++ b/jstests/sharding/resume_change_stream.js
@@ -0,0 +1,138 @@
+// Tests resuming change streams on sharded collections.
+// We need to use a readConcern in this test, which requires read commands.
+// @tags: [requires_find_command]
+(function() {
+    "use strict";
+
+    load('jstests/replsets/rslib.js');  // For getLatestOp and getLeastRecentOp.
+    load('jstests/libs/change_stream_util.js');  // For ChangeStreamTest.
+
+    // For supportsMajorityReadConcern.
+    load('jstests/multiVersion/libs/causal_consistency_helpers.js');
+
+    // This test only works on storage engines that support committed reads; skip it if the
+    // configured engine doesn't support them.
+    if (!supportsMajorityReadConcern()) {
+        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+        return;
+    }
+
+    const oplogSize = 1;  // size in MB
+    const st = new ShardingTest({
+        shards: 2,
+        rs: {
+            nodes: 1,
+            oplogSize: oplogSize,
+            enableMajorityReadConcern: '',
+            // Use a higher frequency for periodic noops to speed up the test.
+            setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
+        }
+    });
+
+    const mongosDB = st.s0.getDB(jsTestName());
+    const mongosColl = mongosDB[jsTestName()];
+
+    assert.commandWorked(mongosDB.dropDatabase());
+
+    // Enable sharding on the test DB and ensure its primary is shard0000.
+    assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+    st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+    // Shard the test collection on _id.
+    assert.commandWorked(
+        mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+    // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey].
+    assert.commandWorked(
+        mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+
+    // Move the [0, MaxKey] chunk to shard0001.
+    assert.commandWorked(mongosDB.adminCommand(
+        {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+
+    // Write a document to each chunk.
+    assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+
+    let changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+
+    // We awaited the replication of the first writes, so the change stream shouldn't return them.
+    assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}}));
+    assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}}));
+    st.rs0.awaitReplication();
+    st.rs1.awaitReplication();
+
+    // Test that we see the two writes, and remember their resume tokens.
+    assert.soon(() => changeStream.hasNext());
+    let next = changeStream.next();
+    assert.eq(next.operationType, "update");
+    assert.eq(next.documentKey._id, -1);
+    const resumeTokenFromFirstUpdateOnShard0 = next._id;
+
+    assert.soon(() => changeStream.hasNext());
+    next = changeStream.next();
+    assert.eq(next.operationType, "update");
+    assert.eq(next.documentKey._id, 1);
+    const resumeTokenFromFirstUpdateOnShard1 = next._id;
+
+    changeStream.close();
+
+    // Write some additional documents, then test that it's possible to resume after the first
+    // update.
+    assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}}));
+    changeStream =
+        mongosColl.aggregate([{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}]);
+
+    for (let nextExpectedId of [1, -2, 2]) {
+        assert.soon(() => changeStream.hasNext());
+        assert.eq(changeStream.next().documentKey._id, nextExpectedId);
+    }
+
+    changeStream.close();
+
+    // Test that the stream can't resume if the resume token is no longer present in the oplog.
+
+    // Roll over the entire oplog on the shard with the resume token for the first update.
+    const shardWithResumeToken = st.rs1.getPrimary();  // Resume from shard 1.
+    const mostRecentOplogEntry = getLatestOp(shardWithResumeToken);
+    assert.neq(mostRecentOplogEntry, null);
+    const largeStr = new Array(4 * 1024 * oplogSize).join('abcdefghi');
+    let i = 0;
+
+    function oplogIsRolledOver() {
+        // The oplog has rolled over if the op that used to be newest is now older than the
+        // oplog's current oldest entry. Said another way, the oplog is rolled over when
+        // everything in the oplog is newer than what used to be the newest entry.
+        return bsonWoCompare(
+                   mostRecentOplogEntry.ts,
+                   getLeastRecentOp({server: shardWithResumeToken, readConcern: "majority"}).ts) <
+            0;
+    }
+
+    while (!oplogIsRolledOver()) {
+        let idVal = 100 + (i++);
+        assert.writeOK(
+            mongosColl.insert({_id: idVal, long_str: largeStr}, {writeConcern: {w: "majority"}}));
+        sleep(100);
+    }
+
+    ChangeStreamTest.assertChangeStreamThrowsCode({
+        collection: mongosColl,
+        pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard1}}],
+        expectedCode: 40576
+    });
+
+    // Test that the change stream can't resume if the resume token *is* present in the oplog, but
+    // one of the shards has rolled over its oplog enough that it no longer has a long enough
+    // history to resume. Since we just rolled over the oplog on shard 1, we know that
+    // 'resumeTokenFromFirstUpdateOnShard0' is still present on shard 0, but shard 1 no longer has
+    // any changes earlier than that, so it won't be able to resume.
+    ChangeStreamTest.assertChangeStreamThrowsCode({
+        collection: mongosColl,
+        pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}],
+        expectedCode: 40576
+    });
+
+    st.stop();
+})();
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index cbecc76c9bf..4d25910ae65 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -58,6 +58,7 @@
 #include "mongo/stdx/memory.h"
 #include "mongo/util/fail_point_service.h"
 #include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
 
 namespace mongo {
 
@@ -444,6 +445,13 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
 
     pinnedCursor.getValue().reattachToOperationContext(opCtx);
 
+    // A pinned cursor will not be destroyed immediately if an exception is thrown. Instead it will
+    // be marked as killed, then reaped by a background thread later. If this happens, we want to
+    // be sure the cursor does not have a pointer to this OperationContext, since the op ctx will
+    // be destroyed as soon as we return, but the cursor will live on a bit longer.
+    ScopeGuard cursorDetach =
+        MakeGuard([&pinnedCursor]() { pinnedCursor.getValue().detachFromOperationContext(); });
+
     while (!FindCommon::enoughForGetMore(batchSize, batch.size())) {
         auto context = batch.empty()
             ? RouterExecStage::ExecContext::kGetMoreNoResultsYet
@@ -490,9 +498,10 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
         batch.push_back(std::move(*next.getValue().getResult()));
     }
 
+    // Upon successful completion, detach from the operation context and transfer ownership of the
+    // cursor back to the cursor manager.
+    cursorDetach.Dismiss();
     pinnedCursor.getValue().detachFromOperationContext();
-
-    // Transfer ownership of the cursor back to the cursor manager.
    pinnedCursor.getValue().returnCursor(cursorState);
 
     CursorId idToReturn = (cursorState == ClusterCursorManager::CursorState::Exhausted)
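For context on the pattern the patch relies on, here is a minimal, self-contained sketch of a dismissable scope guard. This is not MongoDB's actual `ScopeGuard`/`MakeGuard` implementation from `mongo/util/scopeguard.h`; the class `ScopeGuardSketch`, the function `runGetMoreSketch`, and the print statements are hypothetical stand-ins for the reattach/detach calls in `ClusterFind::runGetMore`, showing why the detach runs on the exception path but can be dismissed on the success path.

```cpp
#include <functional>
#include <iostream>
#include <stdexcept>

// Minimal guard: runs its callback on scope exit unless dismissed.
class ScopeGuardSketch {
public:
    explicit ScopeGuardSketch(std::function<void()> onExit) : _onExit(std::move(onExit)) {}
    ~ScopeGuardSketch() {
        if (_active) {
            _onExit();  // Fires on normal return *and* during stack unwinding.
        }
    }
    void dismiss() {
        _active = false;  // The caller takes over responsibility for cleanup.
    }

private:
    std::function<void()> _onExit;
    bool _active = true;
};

// Hypothetical stand-in for ClusterFind::runGetMore: attach, maybe throw, detach.
void runGetMoreSketch(bool throwMidway) {
    std::cout << "reattach cursor to op ctx\n";
    ScopeGuardSketch cursorDetach([] { std::cout << "detach cursor from op ctx\n"; });

    if (throwMidway) {
        // Simulates a shard error surfacing while iterating the merged cursor.
        throw std::runtime_error("error during getMore");
    }

    // Success path: detach explicitly, then hand the cursor back, mirroring the
    // Dismiss()/detachFromOperationContext()/returnCursor() sequence in the patch.
    cursorDetach.dismiss();
    std::cout << "detach cursor from op ctx\n";
    std::cout << "return cursor to manager\n";
}

int main() {
    runGetMoreSketch(false);
    try {
        runGetMoreSketch(true);
    } catch (const std::exception& e) {
        std::cout << "caught: " << e.what() << "\n";
    }
    // On both paths, "detach cursor from op ctx" prints before the scope ends, so a
    // cursor that outlives the operation context never holds a dangling pointer to it.
    return 0;
}
```

The guard is what makes the fix total: before this patch, an exception between `reattachToOperationContext()` and the explicit `detachFromOperationContext()` left the killed-but-not-yet-reaped cursor pointing at a destroyed `OperationContext`, which is exactly the failure mode the two new jstests provoke.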