diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2019-01-15 23:54:23 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2019-02-02 20:44:19 +0000 |
commit | fbb9daeb791d16c49b861d82c097cf9bd9daf07e (patch) | |
tree | 660bb996be48530e2e1eb5c1c9fc303912c1812c /src/mongo | |
parent | 603a1d610e9ebfa7b43d4e5df0df2a5477622303 (diff) | |
download | mongo-fbb9daeb791d16c49b861d82c097cf9bd9daf07e.tar.gz |
SERVER-38975 Include UUID in high water marks from shards where the collection does not exist
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.cpp | 33 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.h | 36 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_transform.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_watch_for_uuid.cpp | 59 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_watch_for_uuid.h | 82 | ||||
-rw-r--r-- | src/mongo/db/pipeline/lite_parsed_document_source.h | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/lite_parsed_pipeline.h | 15 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/resume_token.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregate.cpp | 16 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 11 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_pipeline.cpp | 26 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_pipeline.h | 5 |
15 files changed, 275 insertions, 32 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 78463d65c63..09ac63f0349 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -431,7 +431,7 @@ Status runAggregate(OperationContext* opCtx, // Upgrade and wait for read concern if necessary. _adjustChangeStreamReadConcern(opCtx); - if (!origNss.isCollectionlessAggregateNS()) { + if (liteParsedPipeline.shouldResolveUUIDAndCollation()) { // AutoGetCollectionForReadCommand will raise an error if 'origNss' is a view. AutoGetCollectionForReadCommand origNssCtx(opCtx, origNss); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 2480340ea2d..cda26fc3e84 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -403,6 +403,7 @@ pipelineeEnv.Library( 'document_source_sort_by_count.cpp', 'document_source_tee_consumer.cpp', 'document_source_unwind.cpp', + 'document_source_watch_for_uuid.cpp', 'pipeline.cpp', 'sequential_document_cache.cpp', 'stage_constraints.cpp', diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 28f3c1df0c9..74ac3842377 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -45,6 +45,7 @@ #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_lookup_change_post_image.h" #include "mongo/db/pipeline/document_source_sort.h" +#include "mongo/db/pipeline/document_source_watch_for_uuid.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" #include "mongo/db/pipeline/resume_token.h" @@ -306,6 +307,14 @@ void assertResumeAllowed(const intrusive_ptr<ExpressionContext>& expCtx, return; } + if (!tokenData.uuid && ResumeToken::isHighWaterMarkToken(tokenData)) { + // The only time we see a single-collection high water mark with no UUID is when the stream + // was opened on a non-existent collection. We allow this to proceed, as the resumed stream + // will immediately invalidate itself if it observes a createCollection event in the oplog + // with a non-simple collation. + return; + } + const auto cannotResumeErrMsg = "Attempted to resume a stream on a collection which has been dropped. The change stream's " "pipeline may need to make comparisons which should respect the collection's default " @@ -394,19 +403,26 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression // If we haven't already populated the initial PBRT, then we are starting from a specific // timestamp rather than a resume token. Initialize the PBRT to a high water mark token. if (expCtx->initialPostBatchResumeToken.isEmpty()) { + Timestamp startTime{startFrom->getSecs(), startFrom->getInc() + (!startFromInclusive)}; expCtx->initialPostBatchResumeToken = - ResumeToken::makeHighWaterMarkToken(*startFrom, expCtx->uuid).toDocument().toBson(); + ResumeToken::makeHighWaterMarkToken(startTime, expCtx->uuid).toDocument().toBson(); } } const auto fcv = serverGlobalParams.featureCompatibility.getVersion(); stages.push_back( DocumentSourceChangeStreamTransform::create(expCtx, fcv, elem.embeddedObject())); - stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, ignoreFirstInvalidate)); - // The resume stage must come after the check invalidate stage to allow the check invalidate - // stage to determine whether the oplog entry matching the resume token should be followed by an - // "invalidate" entry. + // If this is a single-collection stream but we don't have a UUID set on the expression context, + // then the stream was opened before the collection exists. Add a stage which will populate the + // UUID using the first change stream result observed by the pipeline during execution. + if (!expCtx->uuid && expCtx->isSingleNamespaceAggregation()) { + stages.push_back(DocumentSourceWatchForUUID::create(expCtx)); + } + + // The resume stage must come after the check invalidate stage so that the former can determine + // whether the event that matches the resume token should be followed by an "invalidate" event. + stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, ignoreFirstInvalidate)); if (resumeStage) { stages.push_back(resumeStage); } @@ -448,6 +464,13 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( // to be merged, do not add a close cursor stage, since the mongos will already have one. stages.push_back(DocumentSourceCloseCursor::create(expCtx)); + // If this is a single-collection stream but we do not have a UUID set on the expression + // context, then the stream was opened before the collection exists. Add a stage on mongoS + // which will watch for and populate the UUID using the first result seen by the pipeline. + if (expCtx->inMongos && !expCtx->uuid && expCtx->isSingleNamespaceAggregation()) { + stages.push_back(DocumentSourceWatchForUUID::create(expCtx)); + } + // There should be only one post-image lookup stage. If we're on the shards and producing // input to be merged, the lookup is done on the mongos. if (shouldLookupPostImage) { diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 7d922a3ec7f..5f529f77a9c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -49,15 +49,46 @@ public: public: static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, const BSONElement& spec) { - return stdx::make_unique<LiteParsed>(request.getNamespaceString()); + return stdx::make_unique<LiteParsed>(request.getNamespaceString(), spec); } - explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {} + explicit LiteParsed(NamespaceString nss, BSONElement spec) : _nss(std::move(nss)) { + // We don't do any validation here, just a minimal check for the resume token. We also + // do not need to extract the token unless the stream is running on a single namespace. + if (_nss.isCollectionlessAggregateNS() || spec.type() != BSONType::Object) { + return; + } + // Check the 'resumeAfter' field first; if empty, check the 'startAfter' field. + auto specObj = spec.embeddedObject(); + _resumeToken = + specObj.getObjectField(DocumentSourceChangeStreamSpec::kResumeAfterFieldName); + if (_resumeToken.isEmpty()) { + _resumeToken = + specObj.getObjectField(DocumentSourceChangeStreamSpec::kStartAfterFieldName); + } + } bool isChangeStream() const final { return true; } + bool shouldResolveUUIDAndCollation() const final { + // If this is a whole-db or whole-cluster stream, never resolve the UUID and collation. + if (_nss.isCollectionlessAggregateNS()) { + return false; + } + // If we are not resuming, always resolve the UUID and collation. + if (_resumeToken.isEmpty()) { + return true; + } + // If we are resuming a single-collection stream from a high water mark that does not + // have a UUID, then the token was generated before the collection was created. Do not + // attempt to resolve the collection's current UUID or collation, so that the stream + // resumes in exactly the same condition as it was in when the token was generated. + auto tokenData = ResumeToken::parse(_resumeToken).getData(); + return !(ResumeToken::isHighWaterMarkToken(tokenData) && !tokenData.uuid); + } + bool allowedToForwardFromMongos() const final { return false; } @@ -98,6 +129,7 @@ public: private: const NamespaceString _nss; + BSONObj _resumeToken; }; // The name of the field where the document key (_id and shard key, if present) will be found diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 493b284ba75..e9435d5ced2 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -349,15 +349,6 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document invariant(!uuid.missing(), "Saw a CRUD op without a UUID"); } - // If the collection did not exist when the change stream was opened, then the UUID will not - // have been obtained from the catalog. In this case, we set the UUID on the ExpressionContext - // after obtaining it from the first relevant oplog entry, so that the UUID can be included in - // high water mark tokens for change streams watching a single collection. The UUID is needed - // for resumability against a single collection due to collation semantics. - if (!pExpCtx->uuid && !uuid.missing() && pExpCtx->isSingleNamespaceAggregation()) { - pExpCtx->uuid = uuid.getUuid(); - } - // Note that 'documentKey' and/or 'uuid' might be missing, in which case they will not appear // in the output. auto resumeTokenData = getResumeToken(ts, uuid, documentKey); diff --git a/src/mongo/db/pipeline/document_source_watch_for_uuid.cpp b/src/mongo/db/pipeline/document_source_watch_for_uuid.cpp new file mode 100644 index 00000000000..1c26e4681ba --- /dev/null +++ b/src/mongo/db/pipeline/document_source_watch_for_uuid.cpp @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_watch_for_uuid.h" + +#include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/db/pipeline/resume_token.h" + +namespace mongo { + +DocumentSource::GetNextResult DocumentSourceWatchForUUID::getNext() { + pExpCtx->checkForInterrupt(); + + auto nextInput = pSource->getNext(); + if (!nextInput.isAdvanced()) + return nextInput; + + // This single-collection stream was opened before the collection was created, and the pipeline + // does not know its UUID. When we see the first event, we update our expression context with + // the UUID drawn from that event's resume token. + if (!pExpCtx->uuid) { + auto resumeToken = ResumeToken::parse( + nextInput.getDocument()[DocumentSourceChangeStream::kIdField].getDocument()); + pExpCtx->uuid = resumeToken.getData().uuid; + } + + // Forward the result without modification. + return nextInput; +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_watch_for_uuid.h b/src/mongo/db/pipeline/document_source_watch_for_uuid.h new file mode 100644 index 00000000000..1ff653378c1 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_watch_for_uuid.h @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +/** + * In the event that a single-collection $changeStream is opened on a namespace whose UUID is not + * known, this stage will be added to the pipeline on both mongoD and mongoS. When the first event + * is observed by the pipeline, DSWatchForUUID will extract the collection's UUID from the event's + * resume token, and will use it to populate the pipeline's ExpressionContext::uuid. + */ +class DocumentSourceWatchForUUID final : public DocumentSource { +public: + GetNextResult getNext() final; + + const char* getSourceName() const final { + return "$watchForUUID"; + } + + StageConstraints constraints(Pipeline::SplitState pipeState) const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; + } + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { + // This stage is created by the DocumentSourceChangeStream stage, so serializing it + // here would result in it being created twice. + return Value(); + } + + static boost::intrusive_ptr<DocumentSourceWatchForUUID> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + // Only created for a single-collection stream where the UUID does not exist. + invariant(expCtx->isSingleNamespaceAggregation()); + invariant(!expCtx->uuid); + return new DocumentSourceWatchForUUID(expCtx); + } + +private: + /** + * The static 'create' method must be used to build a DocumentSourceWatchForUUID. + */ + DocumentSourceWatchForUUID(const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSource(expCtx) {} +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h index fc3b88163f3..119740410b0 100644 --- a/src/mongo/db/pipeline/lite_parsed_document_source.h +++ b/src/mongo/db/pipeline/lite_parsed_document_source.h @@ -109,6 +109,14 @@ public: } /** + * Returns true if this pipeline's UUID and collation should be resolved. For the latter, this + * means adopting the collection's default collation, unless a custom collation was specified. + */ + virtual bool shouldResolveUUIDAndCollation() const { + return true; + } + + /** * Returns true if this stage does not require an input source. */ virtual bool isInitialSource() const { diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h index dabffc24c3e..47b17706cf1 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.h +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h @@ -53,7 +53,7 @@ public: * May throw a AssertionException if there is an invalid stage specification, although full * validation happens later, during Pipeline construction. */ - LiteParsedPipeline(const AggregationRequest& request) { + LiteParsedPipeline(const AggregationRequest& request) : _nss(request.getNamespaceString()) { _stageSpecs.reserve(request.getPipeline().size()); for (auto&& rawStage : request.getPipeline()) { @@ -104,6 +104,18 @@ public: } /** + * Returns true if this pipeline's UUID and collation should be resolved. For the latter, this + * means adopting the collection's default collation, unless a custom collation was specified. + */ + bool shouldResolveUUIDAndCollation() const { + // Collectionless aggregations do not have a UUID or default collation. + return !_nss.isCollectionlessAggregateNS() && + std::all_of(_stageSpecs.begin(), _stageSpecs.end(), [](auto&& spec) { + return spec->shouldResolveUUIDAndCollation(); + }); + } + + /** * Returns false if the pipeline has any stage which must be run locally on mongos. */ bool allowedToForwardFromMongos() const { @@ -153,6 +165,7 @@ public: private: std::vector<std::unique_ptr<LiteParsedDocumentSource>> _stageSpecs; + NamespaceString _nss; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 5f42b1a725b..b3b50d2fb46 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -1574,6 +1574,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) { intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss)); expCtx->opCtx = opCtx.get(); + expCtx->uuid = UUID::gen(); setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx); auto spec = BSON("$changeStream" << BSON("fullDocument" @@ -1599,6 +1600,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss)); expCtx->opCtx = opCtx.get(); + expCtx->uuid = UUID::gen(); setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx); auto spec = BSON("$changeStream" << BSON("fullDocument" diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp index 0ccb720b82b..5ddaf5a073a 100644 --- a/src/mongo/db/pipeline/resume_token.cpp +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -71,7 +71,7 @@ std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData) { if (tokenData.version > 0) { out << ", tokenType: " << tokenData.tokenType; } - out << ", applyOpsIndex" << tokenData.applyOpsIndex; + out << ", applyOpsIndex: " << tokenData.applyOpsIndex; if (tokenData.version > 0) { out << ", fromInvalidate: " << static_cast<bool>(tokenData.fromInvalidate); } diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index b3713c6bd9f..637df1c1b3c 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -461,15 +461,17 @@ BSONObj getDefaultCollationForUnshardedCollection(const BSONObj collectionInfo) std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID( const boost::optional<CachedCollectionRoutingInfo>& routingInfo, const NamespaceString& nss, - const AggregationRequest& request) { + const AggregationRequest& request, + const LiteParsedPipeline& litePipe) { const bool collectionIsSharded = (routingInfo && routingInfo->cm()); const bool collectionIsNotSharded = (routingInfo && !routingInfo->cm()); - // Because collectionless aggregations are generally run against the 'admin' database, the - // standard logic will attempt to resolve its non-existent UUID and collation by sending a - // specious 'listCollections' command to the config servers. To prevent this, we immediately - // return the user-defined collation if one exists, or an empty BSONObj otherwise. - if (nss.isCollectionlessAggregateNS()) { + // If the LiteParsedPipeline reports that we should not attempt to resolve the namespace's UUID + // and collation, we immediately return the user-defined collation if one exists, or an empty + // BSONObj otherwise. For instance, because collectionless aggregations generally run against + // the 'admin' database, the standard logic would attempt to resolve its non-existent UUID and + // collation by sending a specious 'listCollections' command to the config servers. + if (!litePipe.shouldResolveUUIDAndCollation()) { return {request.getCollation(), boost::none}; } @@ -741,7 +743,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } // Populate the collection UUID and the appropriate collation to use. - auto collInfo = getCollationAndUUID(routingInfo, namespaces.executionNss, request); + auto collInfo = getCollationAndUUID(routingInfo, namespaces.executionNss, request, litePipe); BSONObj collationObj = collInfo.first; boost::optional<UUID> uuid = collInfo.second; diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 6aca6fa5cb7..e9f55a271e6 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -648,9 +648,14 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, postBatchResumeToken = pinnedCursor.getValue().getPostBatchResumeToken(); } + // If the cursor has been exhausted, we will communicate this by returning a CursorId of zero. + auto idToReturn = + (cursorState == ClusterCursorManager::CursorState::Exhausted ? CursorId(0) + : request.cursorid); + // For empty batches, or in the case where the final result was added to the batch rather than // being stashed, we update the PBRT here to ensure that it is the most recent available. - if (!stashedResult) { + if (idToReturn && !stashedResult) { postBatchResumeToken = pinnedCursor.getValue().getPostBatchResumeToken(); } @@ -660,10 +665,6 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, // the cursor has been exhausted, the cursor manager will clean it up for us. pinnedCursor.getValue().returnCursor(cursorState); - CursorId idToReturn = (cursorState == ClusterCursorManager::CursorState::Exhausted) - ? CursorId(0) - : request.cursorid; - // Set nReturned and whether the cursor has been exhausted. CurOp::get(opCtx)->debug().cursorExhausted = (idToReturn == 0); CurOp::get(opCtx)->debug().nreturned = batch.size(); diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp index f79812092c3..0811c130abf 100644 --- a/src/mongo/s/query/router_stage_pipeline.cpp +++ b/src/mongo/s/query/router_stage_pipeline.cpp @@ -55,6 +55,7 @@ StatusWith<ClusterQueryResult> RouterStagePipeline::next(RouterExecStage::ExecCo // Pipeline::getNext will return a boost::optional<Document> or boost::none if EOF. if (auto result = _mergePipeline->getNext()) { + _validateAndRecordSortKey(*result); return {result->toBson()}; } @@ -88,7 +89,30 @@ std::size_t RouterStagePipeline::getNumRemotes() const { } BSONObj RouterStagePipeline::getPostBatchResumeToken() const { - return _mergeCursorsStage ? _mergeCursorsStage->getHighWaterMark() : BSONObj(); + auto pbrt = _mergeCursorsStage ? _mergeCursorsStage->getHighWaterMark() : BSONObj(); + return pbrt.isEmpty() ? pbrt : _setPostBatchResumeTokenUUID(pbrt); +} + +BSONObj RouterStagePipeline::_setPostBatchResumeTokenUUID(BSONObj pbrt) const { + // If the PBRT does not match the sort key of the latest document, it is a high water mark. + const bool isHighWaterMark = !pbrt.binaryEqual(_latestSortKey); + + // If this stream is on a single collection and the token is a high water mark, then it may have + // come from a shard that does not have the collection. If so, we must fill in the correct UUID. + if (isHighWaterMark && _mergePipeline->getContext()->uuid) { + auto tokenData = ResumeToken::parse(pbrt).getData(); + // Check whether the UUID is missing before regenerating the token. + if (!tokenData.uuid) { + invariant(tokenData.tokenType == ResumeTokenData::kHighWaterMarkToken); + tokenData.uuid = _mergePipeline->getContext()->uuid; + pbrt = ResumeToken(tokenData).toDocument().toBson(); + } + } + return pbrt; +} + +void RouterStagePipeline::_validateAndRecordSortKey(const Document& doc) { + _latestSortKey = doc.getSortKeyMetaField(); } bool RouterStagePipeline::remotesExhausted() { diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h index 7afc6bfd19f..167016ad103 100644 --- a/src/mongo/s/query/router_stage_pipeline.h +++ b/src/mongo/s/query/router_stage_pipeline.h @@ -64,10 +64,15 @@ protected: void doDetachFromOperationContext() final; private: + BSONObj _setPostBatchResumeTokenUUID(BSONObj pbrt) const; + void _validateAndRecordSortKey(const Document& doc); + std::unique_ptr<Pipeline, PipelineDeleter> _mergePipeline; // May be null if this pipeline is executing exclusively on mongos and will not contact the // shards at all. boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursorsStage; + + BSONObj _latestSortKey; }; } // namespace mongo |