From fbb9daeb791d16c49b861d82c097cf9bd9daf07e Mon Sep 17 00:00:00 2001 From: Bernard Gorman Date: Tue, 15 Jan 2019 23:54:23 +0000 Subject: SERVER-38975 Include UUID in high water marks from shards where the collection does not exist --- .../db/pipeline/document_source_change_stream.h | 36 ++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) (limited to 'src/mongo/db/pipeline/document_source_change_stream.h') diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 7d922a3ec7f..5f529f77a9c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -49,15 +49,46 @@ public: public: static std::unique_ptr parse(const AggregationRequest& request, const BSONElement& spec) { - return stdx::make_unique(request.getNamespaceString()); + return stdx::make_unique(request.getNamespaceString(), spec); } - explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {} + explicit LiteParsed(NamespaceString nss, BSONElement spec) : _nss(std::move(nss)) { + // We don't do any validation here, just a minimal check for the resume token. We also + // do not need to extract the token unless the stream is running on a single namespace. + if (_nss.isCollectionlessAggregateNS() || spec.type() != BSONType::Object) { + return; + } + // Check the 'resumeAfter' field first; if empty, check the 'startAfter' field. + auto specObj = spec.embeddedObject(); + _resumeToken = + specObj.getObjectField(DocumentSourceChangeStreamSpec::kResumeAfterFieldName); + if (_resumeToken.isEmpty()) { + _resumeToken = + specObj.getObjectField(DocumentSourceChangeStreamSpec::kStartAfterFieldName); + } + } bool isChangeStream() const final { return true; } + bool shouldResolveUUIDAndCollation() const final { + // If this is a whole-db or whole-cluster stream, never resolve the UUID and collation. + if (_nss.isCollectionlessAggregateNS()) { + return false; + } + // If we are not resuming, always resolve the UUID and collation. + if (_resumeToken.isEmpty()) { + return true; + } + // If we are resuming a single-collection stream from a high water mark that does not + // have a UUID, then the token was generated before the collection was created. Do not + // attempt to resolve the collection's current UUID or collation, so that the stream + // resumes in exactly the same condition as it was in when the token was generated. + auto tokenData = ResumeToken::parse(_resumeToken).getData(); + return !(ResumeToken::isHighWaterMarkToken(tokenData) && !tokenData.uuid); + } + bool allowedToForwardFromMongos() const final { return false; } @@ -98,6 +129,7 @@ public: private: const NamespaceString _nss; + BSONObj _resumeToken; }; // The name of the field where the document key (_id and shard key, if present) will be found -- cgit v1.2.1