summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@mongodb.com>2019-12-19 20:39:43 +0000
committerevergreen <evergreen@mongodb.com>2019-12-19 20:39:43 +0000
commit4cd114fb943075fe87879d66bfc764ce75aa565b (patch)
tree33cda1f1884fd4502f3a41540018362dcaa6adbf
parent9d74018cd0c058f4fcd5512c3e19d8ca5204306f (diff)
downloadmongo-4cd114fb943075fe87879d66bfc764ce75aa565b.tar.gz
SERVER-44733 Change stream should throw ChangeStreamFatalError if a single shard cannot be targeted for updateLookup
(cherry picked from commit ccecd50087d22d90df71ab3c5dd4a58905590307) (cherry picked from commit 20d508fbf59b05c6728162e85a9abfd06d41d0cd)
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml2
-rw-r--r--jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js66
-rw-r--r--jstests/sharding/resume_change_stream.js4
-rw-r--r--src/mongo/base/error_codes.err4
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp12
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp2
-rw-r--r--src/mongo/s/commands/pipeline_s.cpp6
8 files changed, 87 insertions, 13 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 63b464c161e..a6de0680202 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -119,6 +119,8 @@ selector:
- jstests/sharding/major_version_check.js
# Enable after SERVER-38691 gets backported to 4.2 and becomes the last stable.
- jstests/sharding/explain_cmd.js
+ # Enable when SERVER-44733 is backported
+ - jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js
executor:
config:
diff --git a/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js b/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js
new file mode 100644
index 00000000000..1018ba5fab2
--- /dev/null
+++ b/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js
@@ -0,0 +1,66 @@
+/**
+ * Tests that an updateLookup change stream throws ChangeStreamFatalError when it encounters an
+ * oplog entry whose documentKey omits the shard key.
+ * TODO SERVER-44598: the oplog entry will no longer omit the shard key when SERVER-44598 is fixed,
+ * and so this test will no longer be relevant.
+ * @tags: [uses_change_streams]
+ */
+(function() {
+ "use strict";
+
+ // The UUID consistency check can hit NotMasterNoSlaveOk when it attempts to obtain a list of
+ // collections from the shard Primaries through mongoS at the end of this test.
+ TestData.skipCheckingUUIDsConsistentAcrossCluster = true;
+
+ // Start a new sharded cluster with 2 shards and obtain references to the test DB and collection.
+ const st = new ShardingTest({
+ shards: 2,
+ mongos: 1,
+ rs: {nodes: 3, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
+ });
+
+ const mongosDB = st.s.getDB(jsTestName());
+ const mongosColl = mongosDB.test;
+ const shard0 = st.rs0;
+
+ // Enable sharding on the test database and ensure that the primary is shard0.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), shard0.getURL());
+
+ // Shard the source collection on {a: 1}, split across the shards at {a: 0}.
+ st.shardColl(mongosColl, {a: 1}, {a: 0}, {a: 1});
+
+ // Open a change stream on the collection.
+ const csCursor = mongosColl.watch();
+
+ // Write one document onto shard0 and obtain its resume token.
+ assert.writeOK(mongosColl.insert({_id: 0, a: -100}));
+ assert.soon(() => csCursor.hasNext());
+
+ const resumeToken = csCursor.next()._id;
+
+ // Step up one of the Secondaries, which will not have any sharding metadata loaded.
+ assert.commandWorked(shard0.getSecondary().adminCommand({replSetStepUp: 1}));
+ shard0.awaitNodesAgreeOnPrimary();
+
+ // Do a {multi:true} update. This will scatter to all shards and update the document on shard0.
+ // Because no metadata is loaded, this will write the update into the oplog with a documentKey
+ // containing only the _id field.
+ assert.soonNoExcept(
+ () => assert.writeOK(mongosColl.update({_id: 0}, {$set: {updated: true}}, false, true)));
+
+ // Resume the change stream with {fullDocument: 'updateLookup'}.
+ const cmdRes = assert.commandWorked(mongosColl.runCommand("aggregate", {
+ pipeline: [{$changeStream: {resumeAfter: resumeToken, fullDocument: "updateLookup"}}],
+ cursor: {}
+ }));
+
+ // Begin pulling from the stream. We should hit a ChangeStreamFatalError when the updateLookup
+ // attempts to read the update entry that is missing the shard key value of the document.
+ assert.soonNoExcept(
+ () => assert.commandFailedWithCode(
+ mongosColl.runCommand({getMore: cmdRes.cursor.id, collection: mongosColl.getName()}),
+ ErrorCodes.ChangeStreamFatalError));
+
+ st.stop();
+})(); \ No newline at end of file
diff --git a/jstests/sharding/resume_change_stream.js b/jstests/sharding/resume_change_stream.js
index 0e885550350..360b54a26dd 100644
--- a/jstests/sharding/resume_change_stream.js
+++ b/jstests/sharding/resume_change_stream.js
@@ -118,7 +118,7 @@
ChangeStreamTest.assertChangeStreamThrowsCode({
collection: mongosColl,
pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}],
- expectedCode: 40576
+ expectedCode: ErrorCodes.ChangeStreamFatalError
});
// Test that the change stream can't resume if the resume token *is* present in the oplog, but
@@ -129,7 +129,7 @@
ChangeStreamTest.assertChangeStreamThrowsCode({
collection: mongosColl,
pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}],
- expectedCode: 40576
+ expectedCode: ErrorCodes.ChangeStreamFatalError
});
// Drop the collection.
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index ec32feb946f..a04f5826fae 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -239,7 +239,7 @@ error_code("ProducerConsumerQueueBatchTooLarge", 247)
error_code("ProducerConsumerQueueEndClosed", 248)
error_code("KeyedExecutorRetry", 259);
error_code("TooManyLogicalSessions", 261);
-
+error_code("ChangeStreamFatalError", 280)
# Error codes 4000-8999 are reserved.
# Non-sequential error codes (for compatibility only)
@@ -292,3 +292,5 @@ error_class("ConnectionFatalMessageParseError", ["IllegalOpMsgFlag",
"TooManyDocumentSequences"])
error_class("ExceededTimeLimitError", ["ExceededTimeLimit", "NetworkInterfaceExceededTimeLimit"])
+
+error_class("NonResumableChangeStreamError", ["ChangeStreamFatalError"])
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index 79d813237ff..1f969a34666 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -107,7 +107,7 @@ DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext()
}
if (cannotResume) {
- uasserted(40585,
+ uasserted(ErrorCodes::ChangeStreamFatalError,
str::stream() << "resume of change stream was not possible, as the resume "
"token was not found. "
<< tokenFromSource.toDocument().toString());
@@ -166,7 +166,7 @@ DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() {
uassertStatusOK(_mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx));
if (auto first = pipeline->getNext()) {
auto firstOplogEntry = Value(*first);
- uassert(40576,
+ uassert(ErrorCodes::ChangeStreamFatalError,
"resume of change notification was not possible, as the resume point may no longer "
"be in the oplog. ",
firstOplogEntry["ts"].getTimestamp() < _token.getData().clusterTime);
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index b0f858ba933..e83f45e9556 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -182,7 +182,8 @@ TEST_F(CheckResumeTokenTest, ShouldFailIfFirstDocHasWrongResumeToken) {
Timestamp doc2Timestamp(101, 1);
addDocument(doc1Timestamp, "1");
addDocument(doc2Timestamp, "2");
- ASSERT_THROWS_CODE(checkResumeToken->getNext(), AssertionException, 40585);
+ ASSERT_THROWS_CODE(
+ checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
}
TEST_F(CheckResumeTokenTest, ShouldIgnoreChangeWithEarlierTimestamp) {
@@ -200,7 +201,8 @@ TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongNamespace) {
auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1", resumeTokenUUID);
auto otherUUID = UUID::gen();
addDocument(resumeTimestamp, "1", otherUUID);
- ASSERT_THROWS_CODE(checkResumeToken->getNext(), AssertionException, 40585);
+ ASSERT_THROWS_CODE(
+ checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
}
TEST_F(CheckResumeTokenTest, ShouldSucceedWithBinaryCollation) {
@@ -338,7 +340,8 @@ TEST_F(ShardCheckResumabilityTest,
deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
shardCheckResumability->injectMongoProcessInterface(
std::make_shared<MockMongoProcessInterface>(mockOplog));
- ASSERT_THROWS_CODE(shardCheckResumability->getNext(), AssertionException, 40576);
+ ASSERT_THROWS_CODE(
+ shardCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
}
TEST_F(ShardCheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndOplogIsEmpty) {
@@ -380,7 +383,8 @@ TEST_F(ShardCheckResumabilityTest,
deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
shardCheckResumability->injectMongoProcessInterface(
std::make_shared<MockMongoProcessInterface>(mockOplog));
- ASSERT_THROWS_CODE(shardCheckResumability->getNext(), AssertionException, 40576);
+ ASSERT_THROWS_CODE(
+ shardCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
}
TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) {
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 3dc295727eb..e195b75c091 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -408,7 +408,7 @@ public:
auto lookedUpDocument = pipeline->getNext();
if (auto next = pipeline->getNext()) {
- uasserted(ErrorCodes::TooManyMatchingDocuments,
+ uasserted(ErrorCodes::ChangeStreamFatalError,
str::stream() << "found more than one document with document key "
<< documentKey.toString()
<< " ["
diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp
index e27a91ee096..0590e33feb7 100644
--- a/src/mongo/s/commands/pipeline_s.cpp
+++ b/src/mongo/s/commands/pipeline_s.cpp
@@ -58,7 +58,7 @@ std::pair<ShardId, ChunkVersion> getSingleTargetedShardForQuery(
if (auto chunkMgr = routingInfo.cm()) {
std::set<ShardId> shardIds;
chunkMgr->getShardIdsForQuery(opCtx, query, CollationSpec::kSimpleSpec, &shardIds);
- uassert(ErrorCodes::InternalError,
+ uassert(ErrorCodes::ChangeStreamFatalError,
str::stream() << "Unable to target lookup query to a single shard: "
<< query.toString(),
shardIds.size() == 1u);
@@ -252,13 +252,13 @@ public:
auto& batch = cursor.getBatch();
// We should have at most 1 result, and the cursor should be exhausted.
- uassert(ErrorCodes::InternalError,
+ uassert(ErrorCodes::ChangeStreamFatalError,
str::stream() << "Shard cursor was unexpectedly open after lookup: "
<< shardResult.front().hostAndPort
<< ", id: "
<< cursor.getCursorId(),
cursor.getCursorId() == 0);
- uassert(ErrorCodes::TooManyMatchingDocuments,
+ uassert(ErrorCodes::ChangeStreamFatalError,
str::stream() << "found more than one document matching " << filter.toString()
<< " ["
<< batch.begin()->toString()