diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2019-01-15 23:54:23 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2019-02-02 20:44:19 +0000 |
commit | fbb9daeb791d16c49b861d82c097cf9bd9daf07e (patch) | |
tree | 660bb996be48530e2e1eb5c1c9fc303912c1812c /src/mongo/db/pipeline/document_source_change_stream.h | |
parent | 603a1d610e9ebfa7b43d4e5df0df2a5477622303 (diff) | |
download | mongo-fbb9daeb791d16c49b861d82c097cf9bd9daf07e.tar.gz |
SERVER-38975 Include UUID in high water marks from shards where the collection does not exist
Diffstat (limited to 'src/mongo/db/pipeline/document_source_change_stream.h')
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.h | 36 |
1 files changed, 34 insertions, 2 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 7d922a3ec7f..5f529f77a9c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -49,15 +49,46 @@ public: public: static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, const BSONElement& spec) { - return stdx::make_unique<LiteParsed>(request.getNamespaceString()); + return stdx::make_unique<LiteParsed>(request.getNamespaceString(), spec); } - explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {} + explicit LiteParsed(NamespaceString nss, BSONElement spec) : _nss(std::move(nss)) { + // We don't do any validation here, just a minimal check for the resume token. We also + // do not need to extract the token unless the stream is running on a single namespace. + if (_nss.isCollectionlessAggregateNS() || spec.type() != BSONType::Object) { + return; + } + // Check the 'resumeAfter' field first; if empty, check the 'startAfter' field. + auto specObj = spec.embeddedObject(); + _resumeToken = + specObj.getObjectField(DocumentSourceChangeStreamSpec::kResumeAfterFieldName); + if (_resumeToken.isEmpty()) { + _resumeToken = + specObj.getObjectField(DocumentSourceChangeStreamSpec::kStartAfterFieldName); + } + } bool isChangeStream() const final { return true; } + bool shouldResolveUUIDAndCollation() const final { + // If this is a whole-db or whole-cluster stream, never resolve the UUID and collation. + if (_nss.isCollectionlessAggregateNS()) { + return false; + } + // If we are not resuming, always resolve the UUID and collation. + if (_resumeToken.isEmpty()) { + return true; + } + // If we are resuming a single-collection stream from a high water mark that does not + // have a UUID, then the token was generated before the collection was created. Do not + // attempt to resolve the collection's current UUID or collation, so that the stream + // resumes in exactly the same condition as it was in when the token was generated. + auto tokenData = ResumeToken::parse(_resumeToken).getData(); + return !(ResumeToken::isHighWaterMarkToken(tokenData) && !tokenData.uuid); + } + bool allowedToForwardFromMongos() const final { return false; } @@ -98,6 +129,7 @@ public: private: const NamespaceString _nss; + BSONObj _resumeToken; }; // The name of the field where the document key (_id and shard key, if present) will be found |