diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2019-01-15 23:54:23 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2019-02-02 20:44:19 +0000 |
commit | fbb9daeb791d16c49b861d82c097cf9bd9daf07e (patch) | |
tree | 660bb996be48530e2e1eb5c1c9fc303912c1812c /src/mongo/db | |
parent | 603a1d610e9ebfa7b43d4e5df0df2a5477622303 (diff) | |
download | mongo-fbb9daeb791d16c49b861d82c097cf9bd9daf07e.tar.gz |
SERVER-38975 Include UUID in high water marks from shards where the collection does not exist
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.cpp | 33 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.h | 36 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_transform.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_watch_for_uuid.cpp | 59 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_watch_for_uuid.h | 82 | ||||
-rw-r--r-- | src/mongo/db/pipeline/lite_parsed_document_source.h | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/lite_parsed_pipeline.h | 15 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/resume_token.cpp | 2 |
11 files changed, 230 insertions, 19 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 78463d65c63..09ac63f0349 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -431,7 +431,7 @@ Status runAggregate(OperationContext* opCtx, // Upgrade and wait for read concern if necessary. _adjustChangeStreamReadConcern(opCtx); - if (!origNss.isCollectionlessAggregateNS()) { + if (liteParsedPipeline.shouldResolveUUIDAndCollation()) { // AutoGetCollectionForReadCommand will raise an error if 'origNss' is a view. AutoGetCollectionForReadCommand origNssCtx(opCtx, origNss); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 2480340ea2d..cda26fc3e84 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -403,6 +403,7 @@ pipelineeEnv.Library( 'document_source_sort_by_count.cpp', 'document_source_tee_consumer.cpp', 'document_source_unwind.cpp', + 'document_source_watch_for_uuid.cpp', 'pipeline.cpp', 'sequential_document_cache.cpp', 'stage_constraints.cpp', diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 28f3c1df0c9..74ac3842377 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -45,6 +45,7 @@ #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_lookup_change_post_image.h" #include "mongo/db/pipeline/document_source_sort.h" +#include "mongo/db/pipeline/document_source_watch_for_uuid.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" #include "mongo/db/pipeline/resume_token.h" @@ -306,6 +307,14 @@ void assertResumeAllowed(const intrusive_ptr<ExpressionContext>& expCtx, return; } + if (!tokenData.uuid && ResumeToken::isHighWaterMarkToken(tokenData)) { + // The only time we see a single-collection high water mark with no UUID is when the stream + // was opened on a non-existent collection. We allow this to proceed, as the resumed stream + // will immediately invalidate itself if it observes a createCollection event in the oplog + // with a non-simple collation. + return; + } + const auto cannotResumeErrMsg = "Attempted to resume a stream on a collection which has been dropped. The change stream's " "pipeline may need to make comparisons which should respect the collection's default " @@ -394,19 +403,26 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression // If we haven't already populated the initial PBRT, then we are starting from a specific // timestamp rather than a resume token. Initialize the PBRT to a high water mark token. if (expCtx->initialPostBatchResumeToken.isEmpty()) { + Timestamp startTime{startFrom->getSecs(), startFrom->getInc() + (!startFromInclusive)}; expCtx->initialPostBatchResumeToken = - ResumeToken::makeHighWaterMarkToken(*startFrom, expCtx->uuid).toDocument().toBson(); + ResumeToken::makeHighWaterMarkToken(startTime, expCtx->uuid).toDocument().toBson(); } } const auto fcv = serverGlobalParams.featureCompatibility.getVersion(); stages.push_back( DocumentSourceChangeStreamTransform::create(expCtx, fcv, elem.embeddedObject())); - stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, ignoreFirstInvalidate)); - // The resume stage must come after the check invalidate stage to allow the check invalidate - // stage to determine whether the oplog entry matching the resume token should be followed by an - // "invalidate" entry. + // If this is a single-collection stream but we don't have a UUID set on the expression context, + // then the stream was opened before the collection exists. Add a stage which will populate the + // UUID using the first change stream result observed by the pipeline during execution. + if (!expCtx->uuid && expCtx->isSingleNamespaceAggregation()) { + stages.push_back(DocumentSourceWatchForUUID::create(expCtx)); + } + + // The resume stage must come after the check invalidate stage so that the former can determine + // whether the event that matches the resume token should be followed by an "invalidate" event. + stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, ignoreFirstInvalidate)); if (resumeStage) { stages.push_back(resumeStage); } @@ -448,6 +464,13 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( // to be merged, do not add a close cursor stage, since the mongos will already have one. stages.push_back(DocumentSourceCloseCursor::create(expCtx)); + // If this is a single-collection stream but we do not have a UUID set on the expression + // context, then the stream was opened before the collection exists. Add a stage on mongoS + // which will watch for and populate the UUID using the first result seen by the pipeline. + if (expCtx->inMongos && !expCtx->uuid && expCtx->isSingleNamespaceAggregation()) { + stages.push_back(DocumentSourceWatchForUUID::create(expCtx)); + } + // There should be only one post-image lookup stage. If we're on the shards and producing // input to be merged, the lookup is done on the mongos. if (shouldLookupPostImage) { diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 7d922a3ec7f..5f529f77a9c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -49,15 +49,46 @@ public: public: static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, const BSONElement& spec) { - return stdx::make_unique<LiteParsed>(request.getNamespaceString()); + return stdx::make_unique<LiteParsed>(request.getNamespaceString(), spec); } - explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {} + explicit LiteParsed(NamespaceString nss, BSONElement spec) : _nss(std::move(nss)) { + // We don't do any validation here, just a minimal check for the resume token. We also + // do not need to extract the token unless the stream is running on a single namespace. + if (_nss.isCollectionlessAggregateNS() || spec.type() != BSONType::Object) { + return; + } + // Check the 'resumeAfter' field first; if empty, check the 'startAfter' field. + auto specObj = spec.embeddedObject(); + _resumeToken = + specObj.getObjectField(DocumentSourceChangeStreamSpec::kResumeAfterFieldName); + if (_resumeToken.isEmpty()) { + _resumeToken = + specObj.getObjectField(DocumentSourceChangeStreamSpec::kStartAfterFieldName); + } + } bool isChangeStream() const final { return true; } + bool shouldResolveUUIDAndCollation() const final { + // If this is a whole-db or whole-cluster stream, never resolve the UUID and collation. + if (_nss.isCollectionlessAggregateNS()) { + return false; + } + // If we are not resuming, always resolve the UUID and collation. + if (_resumeToken.isEmpty()) { + return true; + } + // If we are resuming a single-collection stream from a high water mark that does not + // have a UUID, then the token was generated before the collection was created. Do not + // attempt to resolve the collection's current UUID or collation, so that the stream + // resumes in exactly the same condition as it was in when the token was generated. + auto tokenData = ResumeToken::parse(_resumeToken).getData(); + return !(ResumeToken::isHighWaterMarkToken(tokenData) && !tokenData.uuid); + } + bool allowedToForwardFromMongos() const final { return false; } @@ -98,6 +129,7 @@ public: private: const NamespaceString _nss; + BSONObj _resumeToken; }; // The name of the field where the document key (_id and shard key, if present) will be found diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 493b284ba75..e9435d5ced2 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -349,15 +349,6 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document invariant(!uuid.missing(), "Saw a CRUD op without a UUID"); } - // If the collection did not exist when the change stream was opened, then the UUID will not - // have been obtained from the catalog. In this case, we set the UUID on the ExpressionContext - // after obtaining it from the first relevant oplog entry, so that the UUID can be included in - // high water mark tokens for change streams watching a single collection. The UUID is needed - // for resumability against a single collection due to collation semantics. - if (!pExpCtx->uuid && !uuid.missing() && pExpCtx->isSingleNamespaceAggregation()) { - pExpCtx->uuid = uuid.getUuid(); - } - // Note that 'documentKey' and/or 'uuid' might be missing, in which case they will not appear // in the output. auto resumeTokenData = getResumeToken(ts, uuid, documentKey); diff --git a/src/mongo/db/pipeline/document_source_watch_for_uuid.cpp b/src/mongo/db/pipeline/document_source_watch_for_uuid.cpp new file mode 100644 index 00000000000..1c26e4681ba --- /dev/null +++ b/src/mongo/db/pipeline/document_source_watch_for_uuid.cpp @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_watch_for_uuid.h" + +#include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/db/pipeline/resume_token.h" + +namespace mongo { + +DocumentSource::GetNextResult DocumentSourceWatchForUUID::getNext() { + pExpCtx->checkForInterrupt(); + + auto nextInput = pSource->getNext(); + if (!nextInput.isAdvanced()) + return nextInput; + + // This single-collection stream was opened before the collection was created, and the pipeline + // does not know its UUID. When we see the first event, we update our expression context with + // the UUID drawn from that event's resume token. + if (!pExpCtx->uuid) { + auto resumeToken = ResumeToken::parse( + nextInput.getDocument()[DocumentSourceChangeStream::kIdField].getDocument()); + pExpCtx->uuid = resumeToken.getData().uuid; + } + + // Forward the result without modification. + return nextInput; +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_watch_for_uuid.h b/src/mongo/db/pipeline/document_source_watch_for_uuid.h new file mode 100644 index 00000000000..1ff653378c1 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_watch_for_uuid.h @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +/** + * In the event that a single-collection $changeStream is opened on a namespace whose UUID is not + * known, this stage will be added to the pipeline on both mongoD and mongoS. When the first event + * is observed by the pipeline, DSWatchForUUID will extract the collection's UUID from the event's + * resume token, and will use it to populate the pipeline's ExpressionContext::uuid. + */ +class DocumentSourceWatchForUUID final : public DocumentSource { +public: + GetNextResult getNext() final; + + const char* getSourceName() const final { + return "$watchForUUID"; + } + + StageConstraints constraints(Pipeline::SplitState pipeState) const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; + } + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { + // This stage is created by the DocumentSourceChangeStream stage, so serializing it + // here would result in it being created twice. + return Value(); + } + + static boost::intrusive_ptr<DocumentSourceWatchForUUID> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + // Only created for a single-collection stream where the UUID does not exist. + invariant(expCtx->isSingleNamespaceAggregation()); + invariant(!expCtx->uuid); + return new DocumentSourceWatchForUUID(expCtx); + } + +private: + /** + * The static 'create' method must be used to build a DocumentSourceWatchForUUID. + */ + DocumentSourceWatchForUUID(const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSource(expCtx) {} +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h index fc3b88163f3..119740410b0 100644 --- a/src/mongo/db/pipeline/lite_parsed_document_source.h +++ b/src/mongo/db/pipeline/lite_parsed_document_source.h @@ -109,6 +109,14 @@ public: } /** + * Returns true if this pipeline's UUID and collation should be resolved. For the latter, this + * means adopting the collection's default collation, unless a custom collation was specified. + */ + virtual bool shouldResolveUUIDAndCollation() const { + return true; + } + + /** * Returns true if this stage does not require an input source. */ virtual bool isInitialSource() const { diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h index dabffc24c3e..47b17706cf1 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.h +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h @@ -53,7 +53,7 @@ public: * May throw a AssertionException if there is an invalid stage specification, although full * validation happens later, during Pipeline construction. */ - LiteParsedPipeline(const AggregationRequest& request) { + LiteParsedPipeline(const AggregationRequest& request) : _nss(request.getNamespaceString()) { _stageSpecs.reserve(request.getPipeline().size()); for (auto&& rawStage : request.getPipeline()) { @@ -104,6 +104,18 @@ public: } /** + * Returns true if this pipeline's UUID and collation should be resolved. For the latter, this + * means adopting the collection's default collation, unless a custom collation was specified. + */ + bool shouldResolveUUIDAndCollation() const { + // Collectionless aggregations do not have a UUID or default collation. + return !_nss.isCollectionlessAggregateNS() && + std::all_of(_stageSpecs.begin(), _stageSpecs.end(), [](auto&& spec) { + return spec->shouldResolveUUIDAndCollation(); + }); + } + + /** * Returns false if the pipeline has any stage which must be run locally on mongos. */ bool allowedToForwardFromMongos() const { @@ -153,6 +165,7 @@ public: private: std::vector<std::unique_ptr<LiteParsedDocumentSource>> _stageSpecs; + NamespaceString _nss; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 5f42b1a725b..b3b50d2fb46 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -1574,6 +1574,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) { intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss)); expCtx->opCtx = opCtx.get(); + expCtx->uuid = UUID::gen(); setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx); auto spec = BSON("$changeStream" << BSON("fullDocument" @@ -1599,6 +1600,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss)); expCtx->opCtx = opCtx.get(); + expCtx->uuid = UUID::gen(); setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx); auto spec = BSON("$changeStream" << BSON("fullDocument" diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp index 0ccb720b82b..5ddaf5a073a 100644 --- a/src/mongo/db/pipeline/resume_token.cpp +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -71,7 +71,7 @@ std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData) { if (tokenData.version > 0) { out << ", tokenType: " << tokenData.tokenType; } - out << ", applyOpsIndex" << tokenData.applyOpsIndex; + out << ", applyOpsIndex: " << tokenData.applyOpsIndex; if (tokenData.version > 0) { out << ", fromInvalidate: " << static_cast<bool>(tokenData.fromInvalidate); } |