summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2018-04-03 16:55:52 +0100
committerBernard Gorman <bernard.gorman@gmail.com>2018-04-06 02:47:55 +0100
commit41084e8f0fa354a9efc28a354321200e94a2fcf6 (patch)
tree309fed55bfb25682117ca7a4c170c1699cadfbd6 /src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
parent87be281e31034f80723d5299b70e7e956a48c494 (diff)
downloadmongo-41084e8f0fa354a9efc28a354321200e94a2fcf6.tar.gz
SERVER-34090 Allow resuming change stream when resume token's document key does not contain the shard key
Diffstat (limited to 'src/mongo/db/pipeline/document_source_check_resume_token_test.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp116
1 files changed, 108 insertions, 8 deletions
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index 1bd6f4a4988..5a65973f5d9 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -61,13 +61,19 @@ public:
protected:
/**
- * Puts an arbitrary document with resume token corresponding to the given timestamp, id, and
- * namespace in the mock queue.
+ * Pushes a document with a resume token corresponding to the given timestamp, docKey, and
+ * namespace into the mock queue.
+ */
+ void addDocument(Timestamp ts, Document docKey, UUID uuid = testUuid()) {
+ _mock->queue.push_back(
+ Document{{"_id", ResumeToken(ResumeTokenData(ts, Value(docKey), uuid)).toDocument()}});
+ }
+ /**
+ * Pushes a document with a resume token corresponding to the given timestamp, _id string, and
+ * namespace into the mock queue.
*/
void addDocument(Timestamp ts, std::string id, UUID uuid = testUuid()) {
- _mock->queue.push_back(Document{
- {"_id",
- ResumeToken(ResumeTokenData(ts, Value(Document{{"_id", id}}), uuid)).toDocument()}});
+ addDocument(ts, Document{{"_id", id}}, uuid);
}
void addPause() {
@@ -75,15 +81,24 @@ protected:
}
/**
- * Convenience method to create the class under test with a given timestamp, id, and namespace.
+ * Convenience method to create the class under test with a given timestamp, docKey, and
+ * namespace.
*/
intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
- Timestamp ts, StringData id, UUID uuid = testUuid()) {
- ResumeToken token(ResumeTokenData(ts, Value(Document{{"_id", id}}), uuid));
+ Timestamp ts, boost::optional<Document> docKey, UUID uuid = testUuid()) {
+ ResumeToken token(ResumeTokenData(ts, docKey ? Value(*docKey) : Value(), uuid));
auto checkResumeToken = DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), token);
checkResumeToken->setSource(_mock.get());
return checkResumeToken;
}
+ /**
+ * Convenience method to create the class under test with a given timestamp, _id string, and
+ * namespace.
+ */
+ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
+ Timestamp ts, StringData id, UUID uuid = testUuid()) {
+ return createCheckResumeToken(ts, Document{{"_id", id}}, uuid);
+ }
/**
* This method is required to avoid a static initialization fiasco resulting from calling
@@ -210,6 +225,91 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithBinaryCollation) {
ASSERT_TRUE(checkResumeToken->getNext().isEOF());
}
+TEST_F(CheckResumeTokenTest, UnshardedTokenSucceedsForShardedResumeOnMongosIfIdMatchesFirstDoc) {
+ // Verify that a resume token whose documentKey only contains _id can be used to resume a stream
+ // on a sharded collection as long as its _id matches the first document. We set 'inMongos'
+ // since this behaviour is only applicable when DocumentSourceEnsureResumeTokenPresent is
+ // running on mongoS.
+ Timestamp resumeTimestamp(100, 1);
+ getExpCtx()->inMongos = true;
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}});
+
+ Timestamp doc1Timestamp(100, 1);
+ addDocument(doc1Timestamp, {{"x"_sd, 0}, {"_id"_sd, 1}});
+ Timestamp doc2Timestamp(100, 2);
+ Document doc2DocKey{{"x"_sd, 0}, {"_id"_sd, 2}};
+ addDocument(doc2Timestamp, doc2DocKey);
+
+ // We should skip doc1 since it satisfies the resume token, and retrieve doc2.
+ const auto firstDocAfterResume = checkResumeToken->getNext();
+ const auto tokenFromFirstDocAfterResume =
+ ResumeToken::parse(firstDocAfterResume.getDocument()["_id"].getDocument()).getData();
+
+ ASSERT_EQ(tokenFromFirstDocAfterResume.clusterTime, doc2Timestamp);
+ ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.documentKey.getDocument(), doc2DocKey);
+}
+
+TEST_F(CheckResumeTokenTest, UnshardedTokenFailsForShardedResumeOnMongosIfIdDoesNotMatchFirstDoc) {
+ Timestamp resumeTimestamp(100, 1);
+ getExpCtx()->inMongos = true;
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}});
+
+ addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 0}});
+ addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}});
+
+ ASSERT_THROWS_CODE(checkResumeToken->getNext(), AssertionException, 40585);
+}
+
+TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfTokenHasSubsetOfDocumentKeyFields) {
+ // Verify that the relaxed _id check only applies if _id is the sole field present in the
+ // client's resume token, even if all the fields that are present match the first doc. We set
+ // 'inMongos' since this is only applicable when DocumentSourceEnsureResumeTokenPresent is
+ // running on mongoS.
+ Timestamp resumeTimestamp(100, 1);
+ getExpCtx()->inMongos = true;
+
+ auto checkResumeToken =
+ createCheckResumeToken(resumeTimestamp, Document{{"x"_sd, 0}, {"_id"_sd, 1}});
+
+ addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 1}});
+ addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 2}});
+
+ ASSERT_THROWS_CODE(checkResumeToken->getNext(), AssertionException, 40585);
+}
+
+TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyIsNonObject) {
+ // Verify that a resume token whose documentKey is not a valid object will neither succeed nor
+ // cause an invariant when we perform the relaxed documentKey._id check when running in a
+ // sharded context.
+ Timestamp resumeTimestamp(100, 1);
+ getExpCtx()->inMongos = true;
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, boost::none);
+
+ addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 1}});
+ addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}});
+
+ ASSERT_THROWS_CODE(checkResumeToken->getNext(), AssertionException, 40585);
+}
+
+TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyOmitsId) {
+ // Verify that a resume token whose documentKey omits the _id field will neither succeed nor
+ // cause an invariant when we perform the relaxed documentKey._id, even when compared against an
+ // artificial stream token whose _id is also missing.
+ Timestamp resumeTimestamp(100, 1);
+ getExpCtx()->inMongos = true;
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"x"_sd, 0}});
+
+ addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id", 1}});
+ addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}});
+ addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}});
+
+ ASSERT_THROWS_CODE(checkResumeToken->getNext(), AssertionException, 40585);
+}
+
/**
* We should _error_ on the no-document case, because that means the resume token was not found.
*/