summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Neupauer <martin.neupauer@10gen.com>2018-01-09 10:57:15 -0500
committerMartin Neupauer <martin.neupauer@mongodb.com>2018-01-19 17:42:54 -0500
commit194ec4857fa0db8085da88e22eaae96687902d66 (patch)
tree7973e4852e590ebda38f6e39823fff50a97b466d
parenteaa820e25fed6b52ea3e9b9eade337e85aa91386 (diff)
downloadmongo-194ec4857fa0db8085da88e22eaae96687902d66.tar.gz
SERVER-32349 Change streams over sharded collections may produce merged op log entries
with the same timestamps if the operations are coming from multiple shards. When we resume the change stream we have to position to the right place - the position is determined both by the timestamp and the document id. Previously we checked the timestamp only, now we loop over the equal timestamps and find the right document.
-rw-r--r--jstests/sharding/resume_change_stream.js55
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.cpp66
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.h2
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp21
4 files changed, 125 insertions, 19 deletions
diff --git a/jstests/sharding/resume_change_stream.js b/jstests/sharding/resume_change_stream.js
index edd127e81d5..41a2426796a 100644
--- a/jstests/sharding/resume_change_stream.js
+++ b/jstests/sharding/resume_change_stream.js
@@ -59,8 +59,6 @@
// We awaited the replication of the first writes, so the change stream shouldn't return them.
assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}}));
assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}}));
- st.rs0.awaitReplication();
- st.rs1.awaitReplication();
// Test that we see the two writes, and remember their resume tokens.
assert.soon(() => changeStream.hasNext());
@@ -134,5 +132,58 @@
expectedCode: 40576
});
+ // Drop the collection.
+ assert(mongosColl.drop());
+
+ // Shard the test collection on shardKey.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 50), [50, MaxKey].
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {shardKey: 50}}));
+
+ // Move the [50, MaxKey] chunk to shard0001.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {shardKey: 51}, to: st.rs1.getURL()}));
+
+ const numberOfDocs = 100;
+
+ // Insert test documents.
+ for (let counter = 0; counter < numberOfDocs / 5; ++counter) {
+ assert.writeOK(mongosColl.insert({_id: "abcd" + counter, shardKey: counter * 5 + 0},
+ {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: "Abcd" + counter, shardKey: counter * 5 + 1},
+ {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: "aBcd" + counter, shardKey: counter * 5 + 2},
+ {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: "abCd" + counter, shardKey: counter * 5 + 3},
+ {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: "abcD" + counter, shardKey: counter * 5 + 4},
+ {writeConcern: {w: "majority"}}));
+ }
+
+ let allChangesCursor = mongosColl.aggregate([{$changeStream: {}}]);
+
+ // Perform the multi-update that will induce timestamp collisions
+ assert.writeOK(mongosColl.update({}, {$set: {updated: true}}, {multi: true}));
+
+ // Loop over documents and open inner change streams resuming from a specified position.
+ // Note we skip the last document as it does not have the next document so we would
+ // hang indefinitely.
+ for (let counter = 0; counter < numberOfDocs - 1; ++counter) {
+ assert.soon(() => allChangesCursor.hasNext());
+ let next = allChangesCursor.next();
+
+ const resumeToken = next._id;
+ const caseInsensitive = {locale: "en_US", strength: 2};
+ let resumedCaseInsensitiveCursor = mongosColl.aggregate(
+ [{$changeStream: {resumeAfter: resumeToken}}], {collation: caseInsensitive});
+ assert.soon(() => resumedCaseInsensitiveCursor.hasNext());
+ resumedCaseInsensitiveCursor.close();
+ }
+
+ allChangesCursor.close();
+
st.stop();
})();
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index 6fc95e0cec0..173eb1ae871 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -51,24 +51,66 @@ DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionCon
DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent(
const intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token)
- : DocumentSource(expCtx), _token(std::move(token)), _seenDoc(false) {}
+ : DocumentSource(expCtx), _token(std::move(token)), _haveSeenResumeToken(false) {}
DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() {
pExpCtx->checkForInterrupt();
- auto nextInput = pSource->getNext();
- if (_seenDoc || !nextInput.isAdvanced())
- return nextInput;
+ if (_haveSeenResumeToken) {
+ // We've already verified the resume token is present.
+ return pSource->getNext();
+ }
+
+ bool cannotResume = false;
+ const auto tokenDataWeAreSearchingFor = _token.getData();
+ ResumeToken tokenFromSource;
+
+ // Keep iterating the stream until we see either the resume token we're looking for,
+ // or a change with a higher timestamp than our resume token.
+ while (!cannotResume && !_haveSeenResumeToken) {
+ auto nextInput = pSource->getNext();
+
+ if (!nextInput.isAdvanced())
+ return nextInput;
- _seenDoc = true;
- auto doc = nextInput.getDocument();
+ auto doc = nextInput.getDocument();
+
+ tokenFromSource = ResumeToken::parse(doc["_id"].getDocument());
+ auto tokenDataFromSource = tokenFromSource.getData();
+
+ // We start the resume with a $gte query on the timestamp, so we never expect it to be
+ // lower than our resume token's timestamp.
+ invariant(tokenDataFromSource.clusterTime >= tokenDataWeAreSearchingFor.clusterTime);
+
+ // The incoming documents are sorted on clusterTime, uuid, documentKey. We examine a range
+ // of documents that have the same prefix (i.e. clusterTime and uuid). If the user provided
+ // token would sort before this received document we cannot resume the change stream.
+ // Use the simple collation to compare the resume tokens.
+ // Note this is purposefully avoiding the user's requested collation.
+ if (tokenDataWeAreSearchingFor.clusterTime == tokenDataFromSource.clusterTime) {
+ if (tokenDataWeAreSearchingFor.uuid != tokenDataFromSource.uuid) {
+ cannotResume = true;
+ } else if (ValueComparator::kInstance.evaluate(tokenDataWeAreSearchingFor.documentKey ==
+ tokenDataFromSource.documentKey)) {
+ _haveSeenResumeToken = true;
+ } else if (ValueComparator::kInstance.evaluate(tokenDataWeAreSearchingFor.documentKey <
+ tokenDataFromSource.documentKey)) {
+ // This means we will never see the resume token because it would have come before
+ // this one.
+ cannotResume = true;
+ }
+ } else {
+ cannotResume = true;
+ }
+ }
+
+ if (cannotResume) {
+ uasserted(40585,
+ str::stream() << "resume of change stream was not possible, as the resume "
+ "token was not found. "
+ << tokenFromSource.toDocument().toString());
+ }
- auto receivedToken = ResumeToken::parse(doc["_id"].getDocument());
- uassert(40585,
- str::stream()
- << "resume of change stream was not possible, as the resume token was not found. "
- << receivedToken.toDocument().toString(),
- receivedToken == _token);
// Don't return the document which has the token; the user has already seen it.
return pSource->getNext();
}
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index b06bae41da1..1f0244d4127 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -146,7 +146,7 @@ private:
ResumeToken token);
ResumeToken _token;
- bool _seenDoc;
+ bool _haveSeenResumeToken;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index cf1267eeb03..5d65c88044e 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/resume_token.h"
#include "mongo/db/pipeline/stub_mongo_process_interface.h"
+#include "mongo/db/query/collation/collator_interface_mock.h"
#include "mongo/db/service_context.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/death_test.h"
@@ -182,12 +183,12 @@ TEST_F(CheckResumeTokenTest, ShouldFailIfFirstDocHasWrongResumeToken) {
ASSERT_THROWS_CODE(checkResumeToken->getNext(), AssertionException, 40585);
}
-TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongDocumentId) {
+TEST_F(CheckResumeTokenTest, ShouldIgnoreChangeWithEarlierTimestamp) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0");
- addDocument(resumeTimestamp, "1");
- ASSERT_THROWS_CODE(checkResumeToken->getNext(), AssertionException, 40585);
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
+ addDocument(resumeTimestamp, "0");
+ ASSERT_TRUE(checkResumeToken->getNext().isEOF());
}
TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongNamespace) {
@@ -200,6 +201,18 @@ TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongNamespace) {
ASSERT_THROWS_CODE(checkResumeToken->getNext(), AssertionException, 40585);
}
+TEST_F(CheckResumeTokenTest, ShouldSucceedWithBinaryCollation) {
+ CollatorInterfaceMock collatorCompareLower(CollatorInterfaceMock::MockType::kToLowerString);
+ getExpCtx()->setCollator(&collatorCompareLower);
+
+ Timestamp resumeTimestamp(100, 1);
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "abc");
+ // We must not see the following document.
+ addDocument(resumeTimestamp, "ABC");
+ ASSERT_TRUE(checkResumeToken->getNext().isEOF());
+}
+
/**
* We should _error_ on the no-document case, because that means the resume token was not found.
*/