summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_change_stream.h
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2019-01-15 23:54:23 +0000
committerBernard Gorman <bernard.gorman@gmail.com>2019-02-02 20:44:19 +0000
commitfbb9daeb791d16c49b861d82c097cf9bd9daf07e (patch)
tree660bb996be48530e2e1eb5c1c9fc303912c1812c /src/mongo/db/pipeline/document_source_change_stream.h
parent603a1d610e9ebfa7b43d4e5df0df2a5477622303 (diff)
downloadmongo-fbb9daeb791d16c49b861d82c097cf9bd9daf07e.tar.gz
SERVER-38975 Include UUID in high water marks from shards where the collection does not exist
Diffstat (limited to 'src/mongo/db/pipeline/document_source_change_stream.h')
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h36
1 files changed, 34 insertions, 2 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 7d922a3ec7f..5f529f77a9c 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -49,15 +49,46 @@ public:
public:
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec) {
- return stdx::make_unique<LiteParsed>(request.getNamespaceString());
+ return stdx::make_unique<LiteParsed>(request.getNamespaceString(), spec);
}
- explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {}
+ explicit LiteParsed(NamespaceString nss, BSONElement spec) : _nss(std::move(nss)) {
+ // We don't do any validation here, just a minimal check for the resume token. We also
+ // do not need to extract the token unless the stream is running on a single namespace.
+ if (_nss.isCollectionlessAggregateNS() || spec.type() != BSONType::Object) {
+ return;
+ }
+ // Check the 'resumeAfter' field first; if empty, check the 'startAfter' field.
+ auto specObj = spec.embeddedObject();
+ _resumeToken =
+ specObj.getObjectField(DocumentSourceChangeStreamSpec::kResumeAfterFieldName);
+ if (_resumeToken.isEmpty()) {
+ _resumeToken =
+ specObj.getObjectField(DocumentSourceChangeStreamSpec::kStartAfterFieldName);
+ }
+ }
bool isChangeStream() const final {
return true;
}
+ bool shouldResolveUUIDAndCollation() const final {
+ // If this is a whole-db or whole-cluster stream, never resolve the UUID and collation.
+ if (_nss.isCollectionlessAggregateNS()) {
+ return false;
+ }
+ // If we are not resuming, always resolve the UUID and collation.
+ if (_resumeToken.isEmpty()) {
+ return true;
+ }
+ // If we are resuming a single-collection stream from a high water mark that does not
+ // have a UUID, then the token was generated before the collection was created. Do not
+ // attempt to resolve the collection's current UUID or collation, so that the stream
+ // resumes in exactly the same condition as it was in when the token was generated.
+ auto tokenData = ResumeToken::parse(_resumeToken).getData();
+ return !(ResumeToken::isHighWaterMarkToken(tokenData) && !tokenData.uuid);
+ }
+
bool allowedToForwardFromMongos() const final {
return false;
}
@@ -98,6 +129,7 @@ public:
private:
const NamespaceString _nss;
+ BSONObj _resumeToken;
};
// The name of the field where the document key (_id and shard key, if present) will be found