diff options
author | Bernard Gorman <bernard.gorman@mongodb.com> | 2019-12-19 20:39:43 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-12-19 20:39:43 +0000 |
commit | 4cd114fb943075fe87879d66bfc764ce75aa565b (patch) | |
tree | 33cda1f1884fd4502f3a41540018362dcaa6adbf | |
parent | 9d74018cd0c058f4fcd5512c3e19d8ca5204306f (diff) | |
download | mongo-4cd114fb943075fe87879d66bfc764ce75aa565b.tar.gz |
SERVER-44733 Change stream should throw ChangeStreamFatalError if a single shard cannot be targeted for updateLookup
(cherry picked from commit ccecd50087d22d90df71ab3c5dd4a58905590307)
(cherry picked from commit 20d508fbf59b05c6728162e85a9abfd06d41d0cd)
8 files changed, 87 insertions, 13 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 63b464c161e..a6de0680202 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -119,6 +119,8 @@ selector: - jstests/sharding/major_version_check.js # Enable after SERVER-38691 gets backported to 4.2 and becomes the last stable. - jstests/sharding/explain_cmd.js + # Enable when SERVER-44733 is backported + - jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js executor: config: diff --git a/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js b/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js new file mode 100644 index 00000000000..1018ba5fab2 --- /dev/null +++ b/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js @@ -0,0 +1,66 @@ +/** + * Tests that an updateLookup change stream throws ChangeStreamFatalError when it encounters an + * oplog entry whose documentKey omits the shard key. + * TODO SERVER-44598: the oplog entry will no longer omit the shard key when SERVER-44598 is fixed, + * and so this test will no longer be relevant. + * @tags: [uses_change_streams] + */ +(function() { + "use strict"; + + // The UUID consistency check can hit NotMasterNoSlaveOk when it attempts to obtain a list of + // collections from the shard Primaries through mongoS at the end of this test. + TestData.skipCheckingUUIDsConsistentAcrossCluster = true; + + // Start a new sharded cluster with 2 nodes and obtain references to the test DB and collection. + const st = new ShardingTest({ + shards: 2, + mongos: 1, + rs: {nodes: 3, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}} + }); + + const mongosDB = st.s.getDB(jsTestName()); + const mongosColl = mongosDB.test; + const shard0 = st.rs0; + + // Enable sharding on the the test database and ensure that the primary is shard0. + assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), shard0.getURL()); + + // Shard the source collection on {a: 1}, split across the shards at {a: 0}. + st.shardColl(mongosColl, {a: 1}, {a: 0}, {a: 1}); + + // Open a change stream on the collection. + const csCursor = mongosColl.watch(); + + // Write one document onto shard0 and obtain its resume token. + assert.writeOK(mongosColl.insert({_id: 0, a: -100})); + assert.soon(() => csCursor.hasNext()); + + const resumeToken = csCursor.next()._id; + + // Step up one of the Secondaries, which will not have any sharding metadata loaded. + assert.commandWorked(shard0.getSecondary().adminCommand({replSetStepUp: 1})); + shard0.awaitNodesAgreeOnPrimary(); + + // Do a {multi:true} update. This will scatter to all shards and update the document on shard0. + // Because no metadata is loaded, this will write the update into the oplog with a documentKey + // containing only the _id field. + assert.soonNoExcept( + () => assert.writeOK(mongosColl.update({_id: 0}, {$set: {updated: true}}, false, true))); + + // Resume the change stream with {fullDocument: 'updateLookup'}. + const cmdRes = assert.commandWorked(mongosColl.runCommand("aggregate", { + pipeline: [{$changeStream: {resumeAfter: resumeToken, fullDocument: "updateLookup"}}], + cursor: {} + })); + + // Begin pulling from the stream. We should hit a ChangeStreamFatalError when the updateLookup + // attempts to read the update entry that is missing the shard key value of the document. + assert.soonNoExcept( + () => assert.commandFailedWithCode( + mongosColl.runCommand({getMore: cmdRes.cursor.id, collection: mongosColl.getName()}), + ErrorCodes.ChangeStreamFatalError)); + + st.stop(); +})();
\ No newline at end of file diff --git a/jstests/sharding/resume_change_stream.js b/jstests/sharding/resume_change_stream.js index 0e885550350..360b54a26dd 100644 --- a/jstests/sharding/resume_change_stream.js +++ b/jstests/sharding/resume_change_stream.js @@ -118,7 +118,7 @@ ChangeStreamTest.assertChangeStreamThrowsCode({ collection: mongosColl, pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}], - expectedCode: 40576 + expectedCode: ErrorCodes.ChangeStreamFatalError }); // Test that the change stream can't resume if the resume token *is* present in the oplog, but @@ -129,7 +129,7 @@ ChangeStreamTest.assertChangeStreamThrowsCode({ collection: mongosColl, pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}], - expectedCode: 40576 + expectedCode: ErrorCodes.ChangeStreamFatalError }); // Drop the collection. diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index ec32feb946f..a04f5826fae 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -239,7 +239,7 @@ error_code("ProducerConsumerQueueBatchTooLarge", 247) error_code("ProducerConsumerQueueEndClosed", 248) error_code("KeyedExecutorRetry", 259); error_code("TooManyLogicalSessions", 261); - +error_code("ChangeStreamFatalError", 280) # Error codes 4000-8999 are reserved. # Non-sequential error codes (for compatibility only) @@ -292,3 +292,5 @@ error_class("ConnectionFatalMessageParseError", ["IllegalOpMsgFlag", "TooManyDocumentSequences"]) error_class("ExceededTimeLimitError", ["ExceededTimeLimit", "NetworkInterfaceExceededTimeLimit"]) + +error_class("NonResumableChangeStreamError", ["ChangeStreamFatalError"]) diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index 79d813237ff..1f969a34666 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -107,7 +107,7 @@ DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() } if (cannotResume) { - uasserted(40585, + uasserted(ErrorCodes::ChangeStreamFatalError, str::stream() << "resume of change stream was not possible, as the resume " "token was not found. " << tokenFromSource.toDocument().toString()); @@ -166,7 +166,7 @@ DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() { uassertStatusOK(_mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx)); if (auto first = pipeline->getNext()) { auto firstOplogEntry = Value(*first); - uassert(40576, + uassert(ErrorCodes::ChangeStreamFatalError, "resume of change notification was not possible, as the resume point may no longer " "be in the oplog. ", firstOplogEntry["ts"].getTimestamp() < _token.getData().clusterTime); diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index b0f858ba933..e83f45e9556 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -182,7 +182,8 @@ TEST_F(CheckResumeTokenTest, ShouldFailIfFirstDocHasWrongResumeToken) { Timestamp doc2Timestamp(101, 1); addDocument(doc1Timestamp, "1"); addDocument(doc2Timestamp, "2"); - ASSERT_THROWS_CODE(checkResumeToken->getNext(), AssertionException, 40585); + ASSERT_THROWS_CODE( + checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); } TEST_F(CheckResumeTokenTest, ShouldIgnoreChangeWithEarlierTimestamp) { @@ -200,7 +201,8 @@ TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongNamespace) { auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1", resumeTokenUUID); auto otherUUID = UUID::gen(); addDocument(resumeTimestamp, "1", otherUUID); - ASSERT_THROWS_CODE(checkResumeToken->getNext(), AssertionException, 40585); + ASSERT_THROWS_CODE( + checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); } TEST_F(CheckResumeTokenTest, ShouldSucceedWithBinaryCollation) { @@ -338,7 +340,8 @@ TEST_F(ShardCheckResumabilityTest, deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); shardCheckResumability->injectMongoProcessInterface( std::make_shared<MockMongoProcessInterface>(mockOplog)); - ASSERT_THROWS_CODE(shardCheckResumability->getNext(), AssertionException, 40576); + ASSERT_THROWS_CODE( + shardCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); } TEST_F(ShardCheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndOplogIsEmpty) { @@ -380,7 +383,8 @@ TEST_F(ShardCheckResumabilityTest, deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); shardCheckResumability->injectMongoProcessInterface( std::make_shared<MockMongoProcessInterface>(mockOplog)); - ASSERT_THROWS_CODE(shardCheckResumability->getNext(), AssertionException, 40576); + ASSERT_THROWS_CODE( + shardCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); } TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) { diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 3dc295727eb..e195b75c091 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -408,7 +408,7 @@ public: auto lookedUpDocument = pipeline->getNext(); if (auto next = pipeline->getNext()) { - uasserted(ErrorCodes::TooManyMatchingDocuments, + uasserted(ErrorCodes::ChangeStreamFatalError, str::stream() << "found more than one document with document key " << documentKey.toString() << " [" diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp index e27a91ee096..0590e33feb7 100644 --- a/src/mongo/s/commands/pipeline_s.cpp +++ b/src/mongo/s/commands/pipeline_s.cpp @@ -58,7 +58,7 @@ std::pair<ShardId, ChunkVersion> getSingleTargetedShardForQuery( if (auto chunkMgr = routingInfo.cm()) { std::set<ShardId> shardIds; chunkMgr->getShardIdsForQuery(opCtx, query, CollationSpec::kSimpleSpec, &shardIds); - uassert(ErrorCodes::InternalError, + uassert(ErrorCodes::ChangeStreamFatalError, str::stream() << "Unable to target lookup query to a single shard: " << query.toString(), shardIds.size() == 1u); @@ -252,13 +252,13 @@ public: auto& batch = cursor.getBatch(); // We should have at most 1 result, and the cursor should be exhausted. - uassert(ErrorCodes::InternalError, + uassert(ErrorCodes::ChangeStreamFatalError, str::stream() << "Shard cursor was unexpectedly open after lookup: " << shardResult.front().hostAndPort << ", id: " << cursor.getCursorId(), cursor.getCursorId() == 0); - uassert(ErrorCodes::TooManyMatchingDocuments, + uassert(ErrorCodes::ChangeStreamFatalError, str::stream() << "found more than one document matching " << filter.toString() << " [" << batch.begin()->toString() |