summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2019-06-22 06:25:30 +0100
committerBernard Gorman <bernard.gorman@gmail.com>2019-06-29 17:12:16 +0100
commit498b7851cfa4d4860fd6d865647c24f680b32bbd (patch)
tree4adaf8b49e60de88f1d2a19db014ee39eb4f2afe /src/mongo/db
parente6513a6b1d45045f735f207ea78f35efaac7967e (diff)
downloadmongo-498b7851cfa4d4860fd6d865647c24f680b32bbd.tar.gz
SERVER-41196 Integrate 'invalidate' tokens into change stream's standard resume logic
(cherry picked from commit 7e4423b458fcefd37a62ebecf168716166b7dc4c)
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp12
-rw-r--r--src/mongo/db/pipeline/document_source_check_invalidate.cpp22
-rw-r--r--src/mongo/db/pipeline/document_source_check_invalidate.h14
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.cpp84
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.h7
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp146
6 files changed, 236 insertions, 49 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index b9225fbe8cf..5e5861f2971 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -341,7 +341,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
list<intrusive_ptr<DocumentSource>> stages;
boost::optional<Timestamp> startFrom;
intrusive_ptr<DocumentSource> resumeStage = nullptr;
- bool ignoreFirstInvalidate = false;
+ boost::optional<ResumeTokenData> startAfterInvalidate;
bool showMigrationEvents = spec.getShowMigrationEvents();
uassert(31123,
"Change streams from mongos may not show migration events.",
@@ -357,9 +357,11 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
ResumeToken token = resumeAfter ? resumeAfter.get() : startAfter.get();
ResumeTokenData tokenData = token.getData();
- // If resuming from an "invalidate" using "startAfter", set this bit to indicate to the
- // DocumentSourceCheckInvalidate stage that a second invalidate should not be generated.
- ignoreFirstInvalidate = startAfter && tokenData.fromInvalidate;
+ // If resuming from an "invalidate" using "startAfter", pass along the resume token data to
+ // DocumentSourceCheckInvalidate to signify that another invalidate should not be generated.
+ if (startAfter && tokenData.fromInvalidate) {
+ startAfterInvalidate = tokenData;
+ }
uassert(ErrorCodes::InvalidResumeToken,
"Attempting to resume a change stream using 'resumeAfter' is not allowed from an "
@@ -432,7 +434,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
// The resume stage must come after the check invalidate stage so that the former can determine
// whether the event that matches the resume token should be followed by an "invalidate" event.
- stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, ignoreFirstInvalidate));
+ stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, startAfterInvalidate));
if (resumeStage) {
stages.push_back(resumeStage);
}
diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_check_invalidate.cpp
index ff15060d44f..bc7d8993045 100644
--- a/src/mongo/db/pipeline/document_source_check_invalidate.cpp
+++ b/src/mongo/db/pipeline/document_source_check_invalidate.cpp
@@ -81,14 +81,22 @@ DocumentSource::GetNextResult DocumentSourceCheckInvalidate::getNext() {
// identical resume token to the notification for the command, except with an extra flag
// indicating that the token is from an invalidate. This flag is necessary to disambiguate
// the two tokens, and thus preserve a total ordering on the stream.
-
- // As a special case, if a client receives an invalidate like this one and then wants to
- // start a new stream after the invalidate, they can use the "startAfter" option, in which
- // case '_ignoreFirstInvalidate' will be set, and we should ignore (AKA not generate) the
- // very first invalidation.
- if (isInvalidatingCommand(pExpCtx, operationType) && !_ignoreFirstInvalidate) {
+ if (isInvalidatingCommand(pExpCtx, operationType)) {
auto resumeTokenData = ResumeToken::parse(doc[DSCS::kIdField].getDocument()).getData();
resumeTokenData.fromInvalidate = ResumeTokenData::FromInvalidate::kFromInvalidate;
+
+ // If a client receives an invalidate and wants to start a new stream after the invalidate,
+ // they can use the 'startAfter' option. In this case, '_startAfterInvalidate' will be set
+ // to the resume token with which the client restarted the stream. We must be sure to avoid
+ // re-invalidating the new stream, and so we will swallow the first invalidate we see on
+ // each shard. The one exception is the invalidate which matches the 'startAfter' resume
+ // token. We must re-generate this invalidate, since DSEnsureResumeTokenPresent needs to see
+ // (and will take care of swallowing) the event which exactly matches the client's token.
+ if (_startAfterInvalidate && resumeTokenData != _startAfterInvalidate) {
+ _startAfterInvalidate.reset();
+ return nextInput;
+ }
+
auto resumeTokenDoc = ResumeToken(resumeTokenData).toDocument();
MutableDocument result(Document{{DSCS::kIdField, resumeTokenDoc},
@@ -113,7 +121,7 @@ DocumentSource::GetNextResult DocumentSourceCheckInvalidate::getNext() {
// Regardless of whether the first document we see is an invalidating command, we only skip the
// first invalidate for streams with the 'startAfter' option, so we should not skip any
// invalidates that come after the first one.
- _ignoreFirstInvalidate = false;
+ _startAfterInvalidate.reset();
return nextInput;
}
diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_check_invalidate.h
index 66b289bb36b..349ad68c589 100644
--- a/src/mongo/db/pipeline/document_source_check_invalidate.h
+++ b/src/mongo/db/pipeline/document_source_check_invalidate.h
@@ -69,8 +69,9 @@ public:
}
static boost::intrusive_ptr<DocumentSourceCheckInvalidate> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, bool ignoreFirstInvalidate) {
- return new DocumentSourceCheckInvalidate(expCtx, ignoreFirstInvalidate);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<ResumeTokenData> startAfterInvalidate) {
+ return new DocumentSourceCheckInvalidate(expCtx, std::move(startAfterInvalidate));
}
private:
@@ -78,11 +79,14 @@ private:
* Use the create static method to create a DocumentSourceCheckInvalidate.
*/
DocumentSourceCheckInvalidate(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- bool ignoreFirstInvalidate)
- : DocumentSource(expCtx), _ignoreFirstInvalidate(ignoreFirstInvalidate) {}
+ boost::optional<ResumeTokenData> startAfterInvalidate)
+ : DocumentSource(expCtx), _startAfterInvalidate(std::move(startAfterInvalidate)) {
+ invariant(!_startAfterInvalidate ||
+ _startAfterInvalidate->fromInvalidate == ResumeTokenData::kFromInvalidate);
+ }
+ boost::optional<ResumeTokenData> _startAfterInvalidate;
boost::optional<Document> _queuedInvalidate;
- bool _ignoreFirstInvalidate;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index c3383dfb9d9..719be15ea2b 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -78,6 +78,7 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
: ResumeStatus::kCheckNextDoc;
}
+ // If the document's 'txnIndex' sorts before that of the client token, we must keep looking.
if (tokenDataFromResumedStream.txnOpIndex < tokenDataFromClient.txnOpIndex) {
return ResumeStatus::kCheckNextDoc;
} else if (tokenDataFromResumedStream.txnOpIndex > tokenDataFromClient.txnOpIndex) {
@@ -90,6 +91,12 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
return ResumeStatus::kSurpassedToken;
}
+ // If 'fromInvalidate' exceeds the client's token value, then we have passed the resume point.
+ if (tokenDataFromResumedStream.fromInvalidate != tokenDataFromClient.fromInvalidate) {
+ return tokenDataFromResumedStream.fromInvalidate ? ResumeStatus::kSurpassedToken
+ : ResumeStatus::kCheckNextDoc;
+ }
+
// It is acceptable for the stream UUID to differ from the client's, if this is a whole-database
// or cluster-wide stream and we are comparing operations from different shards at the same
// clusterTime. If the stream UUID sorts after the client's, however, then the stream is not
@@ -184,39 +191,68 @@ DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent(
DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() {
pExpCtx->checkForInterrupt();
- if (_resumeStatus == ResumeStatus::kFoundToken) {
+ if (_resumeStatus == ResumeStatus::kSurpassedToken) {
// We've already verified the resume token is present.
return pSource->getNext();
}
- Document documentFromResumedStream;
-
- // Keep iterating the stream until we see either the resume token we're looking for,
- // or a change with a higher timestamp than our resume token.
- while (_resumeStatus == ResumeStatus::kCheckNextDoc) {
+ // The incoming documents are sorted by resume token. We examine a range of documents that have
+ // the same clusterTime as the client's resume token, until we either find (and swallow) a match
+ // for the token or pass the point in the stream where it should have been.
+ while (_resumeStatus != ResumeStatus::kSurpassedToken) {
auto nextInput = pSource->getNext();
- if (!nextInput.isAdvanced())
+ // If there are no more results, return EOF. We will continue checking for the client's
+ // resume token the next time the getNext method is called.
+ if (!nextInput.isAdvanced()) {
return nextInput;
-
- // The incoming documents are sorted on clusterTime, uuid, documentKey. We examine a range
- // of documents that have the same prefix (i.e. clusterTime and uuid). If the user provided
- // token would sort before this received document we cannot resume the change stream.
- _resumeStatus = compareAgainstClientResumeToken(
- pExpCtx, (documentFromResumedStream = nextInput.getDocument()), _tokenFromClient);
+ }
+ // Check the current event. If we found and swallowed the resume token, then the result will
+ // be the first event in the stream which should be returned to the user. Otherwise, we keep
+ // iterating the stream until we find an event matching the client's resume token.
+ if (auto nextOutput = _checkNextDocAndSwallowResumeToken(nextInput)) {
+ return *nextOutput;
+ }
}
+ MONGO_UNREACHABLE;
+}
- uassert(40585,
- str::stream()
- << "resume of change stream was not possible, as the resume token was not found. "
- << documentFromResumedStream["_id"].getDocument().toString(),
- _resumeStatus != ResumeStatus::kSurpassedToken);
-
- // If we reach this point, then we've seen the resume token.
- invariant(_resumeStatus == ResumeStatus::kFoundToken);
-
- // Don't return the document which has the token; the user has already seen it.
- return pSource->getNext();
+boost::optional<DocumentSource::GetNextResult>
+DocumentSourceEnsureResumeTokenPresent::_checkNextDocAndSwallowResumeToken(
+ const DocumentSource::GetNextResult& nextInput) {
+ // We should only ever call this method when we have a new event to examine.
+ invariant(nextInput.isAdvanced());
+ auto resumeStatus =
+ compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient);
+ switch (resumeStatus) {
+ case ResumeStatus::kCheckNextDoc:
+ return boost::none;
+ case ResumeStatus::kFoundToken:
+ // We found the resume token. If we are starting after an 'invalidate' token and the
+ // invalidating command (e.g. collection drop) occurred at the same clusterTime on
+ // more than one shard, then we will see multiple identical 'invalidate' events
+ // here. We should continue to swallow all of them to ensure that the new stream
+ // begins after the collection drop, and that it is not immediately re-invalidated.
+ if (pExpCtx->inMongos && _tokenFromClient.fromInvalidate) {
+ _resumeStatus = ResumeStatus::kFoundToken;
+ return boost::none;
+ }
+ // If the token is not an invalidate or if we are not running in a cluster, we mark
+ // the stream as having surpassed the resume token, skip the current event since the
+ // client has already seen it, and return the next event in the stream.
+ _resumeStatus = ResumeStatus::kSurpassedToken;
+ return pSource->getNext();
+ case ResumeStatus::kSurpassedToken:
+ // If we have surpassed the point in the stream where the resume token should have
+ // been and we did not see the token itself, then this stream cannot be resumed.
+ uassert(40585,
+ str::stream() << "cannot resume stream; the resume token was not found. "
+ << nextInput.getDocument()["_id"].getDocument().toString(),
+ _resumeStatus == ResumeStatus::kFoundToken);
+ _resumeStatus = ResumeStatus::kSurpassedToken;
+ return nextInput;
+ }
+ MONGO_UNREACHABLE;
}
const char* DocumentSourceShardCheckResumability::getSourceName() const {
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index c4cd53bb2cf..8c90a88b564 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -154,6 +154,13 @@ private:
DocumentSourceEnsureResumeTokenPresent(const boost::intrusive_ptr<ExpressionContext>& expCtx,
ResumeTokenData token);
+ /**
+ * Check the given event to determine whether it matches the client's resume token. If so, we
+ * swallow this event and return the next event in the stream. Otherwise, return boost::none.
+ */
+ boost::optional<DocumentSource::GetNextResult> _checkNextDocAndSwallowResumeToken(
+ const DocumentSource::GetNextResult& nextInput);
+
ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc;
ResumeTokenData _tokenFromClient;
};
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index 85d424b3bdd..a63579fa9fb 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -62,15 +62,20 @@ public:
protected:
/**
+ * Pushes a document with a resume token corresponding to the given ResumeTokenData into the
+ * mock queue.
+ */
+ void addDocument(ResumeTokenData tokenData) {
+ _mock->push_back(Document{{"_id", ResumeToken(std::move(tokenData)).toDocument()}});
+ }
+
+ /**
* Pushes a document with a resume token corresponding to the given timestamp, version,
* txnOpIndex, docKey, and namespace into the mock queue.
*/
void addDocument(
Timestamp ts, int version, std::size_t txnOpIndex, Document docKey, UUID uuid) {
- _mock->push_back(
- Document{{"_id",
- ResumeToken(ResumeTokenData(ts, version, txnOpIndex, uuid, Value(docKey)))
- .toDocument()}});
+ return addDocument({ts, version, txnOpIndex, uuid, Value(docKey)});
}
/**
@@ -93,6 +98,17 @@ protected:
}
/**
+ * Convenience method to create the class under test with a given ResumeTokenData.
+ */
+ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
+ ResumeTokenData tokenData) {
+ auto checkResumeToken =
+ DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), std::move(tokenData));
+ checkResumeToken->setSource(_mock.get());
+ return checkResumeToken;
+ }
+
+ /**
* Convenience method to create the class under test with a given timestamp, docKey, and
* namespace.
*/
@@ -102,10 +118,8 @@ protected:
std::size_t txnOpIndex,
boost::optional<Document> docKey,
UUID uuid) {
- auto checkResumeToken = DocumentSourceEnsureResumeTokenPresent::create(
- getExpCtx(), {ts, version, txnOpIndex, uuid, docKey ? Value(*docKey) : Value()});
- checkResumeToken->setSource(_mock.get());
- return checkResumeToken;
+ return createCheckResumeToken(
+ {ts, version, txnOpIndex, uuid, docKey ? Value(*docKey) : Value()});
}
/**
@@ -409,6 +423,122 @@ TEST_F(CheckResumeTokenTest,
ASSERT_THROWS_CODE(checkResumeToken->getNext(), AssertionException, 40585);
}
+TEST_F(CheckResumeTokenTest, ShouldSwallowInvalidateFromEachShardForStartAfterInvalidate) {
+ Timestamp resumeTimestamp(100, 1);
+ Timestamp firstEventAfter(100, 2);
+
+ // Create an array of 2 UUIDs. The first represents the UUID of the namespace before it was
+ // dropped. The second is the UUID of the collection after it is recreated.
+ UUID uuids[2] = {UUID::gen(), UUID::gen()};
+
+ // This behaviour is only relevant when DSEnsureResumeTokenPresent is running on mongoS.
+ getExpCtx()->inMongos = true;
+
+ // Create a resume token representing an 'invalidate' event, and use it to seed the stage. A
+ // resume token with {fromInvalidate:true} can only be used with startAfter, to start a new
+ // stream after the old stream is invalidated.
+ ResumeTokenData invalidateToken;
+ invalidateToken.clusterTime = resumeTimestamp;
+ invalidateToken.uuid = uuids[0];
+ invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate;
+ auto checkResumeToken = createCheckResumeToken(invalidateToken);
+
+ // Add three documents which each have the invalidate resume token. We expect to see this in the
+ // event that we are starting after an invalidate and the invalidating event occurred on several
+ // shards at the same clusterTime.
+ addDocument(invalidateToken);
+ addDocument(invalidateToken);
+ addDocument(invalidateToken);
+
+ // Add a document representing an insert which recreated the collection after it was dropped.
+ auto expectedDocKey = Document{{"_id"_sd, 1}};
+ addDocument(Timestamp{100, 2}, expectedDocKey, uuids[1]);
+
+ // DSEnsureResumeTokenPresent should confirm that the invalidate event is present, swallow it
+ // and the next two invalidates, and return the insert event after the collection drop.
+ const auto firstDocAfterResume = checkResumeToken->getNext();
+ const auto tokenFromFirstDocAfterResume =
+ ResumeToken::parse(firstDocAfterResume.getDocument()["_id"].getDocument()).getData();
+
+ ASSERT_EQ(tokenFromFirstDocAfterResume.clusterTime, firstEventAfter);
+ ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.documentKey.getDocument(), expectedDocKey);
+}
+
+TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInvalidate) {
+ Timestamp resumeTimestamp(100, 1);
+
+ // This behaviour is only relevant when DSEnsureResumeTokenPresent is running on mongoS.
+ getExpCtx()->inMongos = true;
+
+ // Create an ordered array of of 2 UUIDs.
+ std::vector<UUID> uuids = {UUID::gen(), UUID::gen()};
+ std::sort(uuids.begin(), uuids.end());
+
+ // Create a resume token representing an 'invalidate' event, and use it to seed the stage. A
+ // resume token with {fromInvalidate:true} can only be used with startAfter, to start a new
+ // stream after the old stream is invalidated.
+ ResumeTokenData invalidateToken;
+ invalidateToken.clusterTime = resumeTimestamp;
+ invalidateToken.uuid = uuids[0];
+ invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate;
+ auto checkResumeToken = createCheckResumeToken(invalidateToken);
+
+ // Create a second invalidate token with the same clusterTime but a different UUID.
+ auto unrelatedInvalidateToken = invalidateToken;
+ unrelatedInvalidateToken.uuid = uuids[1];
+
+ // Add three documents which each have the invalidate resume token. We expect to see this in the
+ // event that we are starting after an invalidate and the invalidating event occurred on several
+ // shards at the same clusterTime.
+ addDocument(invalidateToken);
+ addDocument(invalidateToken);
+ addDocument(invalidateToken);
+
+ // Add a fourth document which has the unrelated invalidate at the same clusterTime.
+ addDocument(unrelatedInvalidateToken);
+
+ // DSEnsureResumeTokenPresent should confirm that the invalidate event is present, swallow it
+ // and the next two invalidates, but decline to swallow the unrelated invalidate.
+ const auto firstDocAfterResume = checkResumeToken->getNext();
+ const auto tokenFromFirstDocAfterResume =
+ ResumeToken::parse(firstDocAfterResume.getDocument()["_id"].getDocument()).getData();
+
+ ASSERT_EQ(tokenFromFirstDocAfterResume, unrelatedInvalidateToken);
+}
+
+TEST_F(CheckResumeTokenTest, ShouldSwallowOnlyFirstInvalidateForStartAfterInvalidateInReplSet) {
+ Timestamp resumeTimestamp(100, 1);
+
+ // We only swallow multiple invalidates when DSEnsureResumeTokenPresent is running on mongoS.
+ // Set {inMongos:false} to verify that we do not swallow additional invalidates on a replica
+ // set, since this should never occur.
+ getExpCtx()->inMongos = false;
+
+ // Create a resume token representing an 'invalidate' event, and use it to seed the stage. A
+ // resume token with {fromInvalidate:true} can only be used with startAfter, to start a new
+ // stream after the old stream is invalidated.
+ ResumeTokenData invalidateToken;
+ invalidateToken.clusterTime = resumeTimestamp;
+ invalidateToken.uuid = testUuid();
+ invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate;
+ auto checkResumeToken = createCheckResumeToken(invalidateToken);
+
+ // Add three documents which each have the invalidate resume token.
+ addDocument(invalidateToken);
+ addDocument(invalidateToken);
+ addDocument(invalidateToken);
+
+ // DSEnsureResumeTokenPresent should confirm that the invalidate event is present and swallow
+ // it. However, it should not swallow the subsequent two invalidates.
+ for (size_t i = 0; i < 2; ++i) {
+ const auto nextDocAfterResume = checkResumeToken->getNext();
+ const auto tokenFromNextDocAfterResume =
+ ResumeToken::parse(nextDocAfterResume.getDocument()["_id"].getDocument()).getData();
+ ASSERT_EQ(tokenFromNextDocAfterResume, invalidateToken);
+ }
+ ASSERT(checkResumeToken->getNext().isEOF());
+}
+
TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) {
Timestamp resumeTimestamp(100, 1);