commit fc5bc0947ceedee3b61b2d922cabd3e5df7ec07c (patch)
tree   75749d0e4ff2d9db2001252018cc91e78801bc44 /src/mongo/db
parent bdac7ced24f9ad8f9afac9c57e7184b1f2bf61b2 (diff)
author    Bernard Gorman <bernard.gorman@gmail.com>  2018-11-24 16:56:20 +0000
committer Bernard Gorman <bernard.gorman@gmail.com>  2018-12-22 04:27:20 +0000
SERVER-38408 Return postBatchResumeToken with each mongoD change stream batch
Diffstat (limited to 'src/mongo/db')
22 files changed, 341 insertions, 40 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 7402004d524..8dd72d0c5ff 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1080,6 +1080,7 @@ env.Library(
         'exec/and_hash.cpp',
         'exec/and_sorted.cpp',
         'exec/cached_plan.cpp',
+        'exec/change_stream_proxy.cpp',
         'exec/collection_scan.cpp',
         'exec/count.cpp',
         'exec/count_scan.cpp',
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index e137a982ee7..ea669c4a209 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -233,8 +233,10 @@ public:
                 // As soon as we get a result, this operation no longer waits.
                 awaitDataState(opCtx).shouldWaitForInserts = false;
 
-                // Add result to output buffer.
+                // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the
+                // postBatchResumeToken until the former is removed in a future release.
                 nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp());
+                nextBatch->setPostBatchResumeToken(exec->getPostBatchResumeToken());
                 nextBatch->append(obj);
                 (*numResults)++;
             }
@@ -259,7 +261,10 @@ public:
             case PlanExecutor::IS_EOF:
                 // This causes the reported latest oplog timestamp to advance even when there
                 // are no results for this particular query.
+                // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the
+                // postBatchResumeToken until the former is removed in a future release.
                 nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp());
+                nextBatch->setPostBatchResumeToken(exec->getPostBatchResumeToken());
             default:
                 return Status::OK();
         }
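The effect of the getMore change above is visible on the wire: each change stream batch now carries a postBatchResumeToken inside the cursor object, alongside the existing internal oplog timestamp. The sketch below is illustrative only; the field names come from cursor_response.cpp later in this patch, while the namespace, cursor id, token payload and timestamp values are invented.

    // Approximate reply shape of a change stream getMore after this patch.
    #include "mongo/bson/bsonobjbuilder.h"

    mongo::BSONObj exampleGetMoreReply() {
        using namespace mongo;
        // The real token payload is a KeyString-encoded document; "8263..." is a stand-in.
        BSONObj pbrt = BSON("_data" << "8263...");
        return BSON("cursor" << BSON("id" << 123LL << "ns"
                                          << "test.coll"
                                          << "nextBatch" << BSONArray()  // may be empty
                                          << "postBatchResumeToken" << pbrt)
                             << "$_internalLatestOplogTimestamp" << Timestamp(1545, 1)
                             << "ok" << 1);
    }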
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 5febd7f14fc..6562dcc6fbc 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -41,7 +41,7 @@
 #include "mongo/db/catalog/database.h"
 #include "mongo/db/curop.h"
 #include "mongo/db/db_raii.h"
-#include "mongo/db/exec/pipeline_proxy.h"
+#include "mongo/db/exec/change_stream_proxy.h"
 #include "mongo/db/exec/working_set_common.h"
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/pipeline/accumulator.h"
@@ -155,8 +155,12 @@ bool handleCursorCommand(OperationContext* opCtx,
         }
 
         if (state == PlanExecutor::IS_EOF) {
+            // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the
+            // postBatchResumeToken until the former is removed in a future release.
             responseBuilder.setLatestOplogTimestamp(
                 cursor->getExecutor()->getLatestOplogTimestamp());
+            responseBuilder.setPostBatchResumeToken(
+                cursor->getExecutor()->getPostBatchResumeToken());
             if (!cursor->isTailable()) {
                 // make it an obvious error to use cursor or executor after this point
                 cursor = nullptr;
@@ -176,7 +180,10 @@ bool handleCursorCommand(OperationContext* opCtx,
             break;
         }
 
+        // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the
+        // postBatchResumeToken until the former is removed in a future release.
         responseBuilder.setLatestOplogTimestamp(cursor->getExecutor()->getLatestOplogTimestamp());
+        responseBuilder.setPostBatchResumeToken(cursor->getExecutor()->getPostBatchResumeToken());
         responseBuilder.append(next);
     }
@@ -578,8 +585,9 @@ Status runAggregate(OperationContext* opCtx,
         for (size_t idx = 0; idx < pipelines.size(); ++idx) {
             // Transfer ownership of the Pipeline to the PipelineProxyStage.
             auto ws = make_unique<WorkingSet>();
-            auto proxy =
-                make_unique<PipelineProxyStage>(opCtx, std::move(pipelines[idx]), ws.get());
+            auto proxy = liteParsedPipeline.hasChangeStream()
+                ? make_unique<ChangeStreamProxyStage>(opCtx, std::move(pipelines[idx]), ws.get())
+                : make_unique<PipelineProxyStage>(opCtx, std::move(pipelines[idx]), ws.get());
 
             // This PlanExecutor will simply forward requests to the Pipeline, so does not need to
             // yield or to be registered with any collection's CursorManager to receive
diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp
new file mode 100644
index 00000000000..5af30b4b5a3
--- /dev/null
+++ b/src/mongo/db/exec/change_stream_proxy.cpp
@@ -0,0 +1,84 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/exec/change_stream_proxy.h"
+
+#include "mongo/db/pipeline/pipeline_d.h"
+#include "mongo/db/pipeline/resume_token.h"
+
+namespace mongo {
+
+const char* ChangeStreamProxyStage::kStageType = "CHANGE_STREAM_PROXY";
+
+ChangeStreamProxyStage::ChangeStreamProxyStage(OperationContext* opCtx,
+                                               std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+                                               WorkingSet* ws)
+    : PipelineProxyStage(opCtx, std::move(pipeline), ws, kStageType) {
+    invariant(std::any_of(
+        _pipeline->getSources().begin(), _pipeline->getSources().end(), [](const auto& stage) {
+            return stage->constraints().isChangeStreamStage();
+        }));
+}
+
+boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() {
+    if (auto next = _pipeline->getNext()) {
+        // While we have more results to return, we track both the timestamp and the resume
+        // token of the latest event observed in the oplog, the latter via its _id field.
+        auto nextBSON = (_includeMetaData ? next->toBsonWithMetaData() : next->toBson());
+        _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
+        if (next->getField("_id").getType() == BSONType::Object) {
+            _postBatchResumeToken = next->getField("_id").getDocument().toBson();
+        }
+        return nextBSON;
+    }
+
+    // We ran out of results to return. Check whether the oplog cursor has moved forward since
+    // the last recorded timestamp. Because we advance _latestOplogTimestamp for every event we
+    // return, if the new time is higher than the last then we are guaranteed not to have
+    // already returned any events at this timestamp. We can set _postBatchResumeToken to a new
+    // high-water-mark token at the current clusterTime.
+    auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get());
+    if (highWaterMark > _latestOplogTimestamp) {
+        auto token = ResumeToken::makeHighWaterMarkResumeToken(highWaterMark);
+        _postBatchResumeToken = token.toDocument().toBson();
+        _latestOplogTimestamp = highWaterMark;
+    }
+    return boost::none;
+}
+
+std::unique_ptr<PlanStageStats> ChangeStreamProxyStage::getStats() {
+    std::unique_ptr<PlanStageStats> ret =
+        std::make_unique<PlanStageStats>(CommonStats(kStageType), STAGE_CHANGE_STREAM_PROXY);
+    ret->specific = std::make_unique<CollectionScanStats>();
+    return ret;
+}
+
+}  // namespace mongo
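The EOF branch of getNextBson() above is what makes an idle stream resumable without replay: even when a batch contains no events, the stage may emit a fresh high-water-mark token at the oplog's current clusterTime. A client that later feeds this token back into $changeStream picks up from that time rather than re-reading history. A hedged sketch of that consumer side follows; resumeAfter is the standard $changeStream option, but the command assembly here is invented for illustration.

    #include "mongo/bson/bsonobjbuilder.h"

    // Build the next aggregate command from the previous reply's
    // cursor.postBatchResumeToken. Works for both event tokens and the
    // high-water-mark tokens generated above.
    mongo::BSONObj makeResumeCommand(const mongo::BSONObj& pbrt) {
        using namespace mongo;
        return BSON("aggregate" << "coll"
                    << "pipeline"
                    << BSON_ARRAY(BSON("$changeStream" << BSON("resumeAfter" << pbrt)))
                    << "cursor" << BSONObj());
    }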
diff --git a/src/mongo/db/exec/change_stream_proxy.h b/src/mongo/db/exec/change_stream_proxy.h
new file mode 100644
index 00000000000..601fe43b582
--- /dev/null
+++ b/src/mongo/db/exec/change_stream_proxy.h
@@ -0,0 +1,86 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/exec/pipeline_proxy.h"
+
+namespace mongo {
+
+/**
+ * ChangeStreamProxyStage is a drop-in replacement for PipelineProxyStage, intended to manage
+ * the serialization of change stream pipeline output from Document to BSON. In particular, it
+ * is additionally responsible for tracking the latestOplogTimestamps and postBatchResumeTokens
+ * that are necessary for correct merging on mongoS and, in the latter case, must also be
+ * provided to mongoD clients.
+ */
+class ChangeStreamProxyStage final : public PipelineProxyStage {
+public:
+    static const char* kStageType;
+
+    /**
+     * The 'pipeline' argument must be a $changeStream pipeline. Passing a non-$changeStream
+     * into the constructor will cause an invariant() to fail.
+     */
+    ChangeStreamProxyStage(OperationContext* opCtx,
+                           std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+                           WorkingSet* ws);
+
+    /**
+     * Returns an empty PlanStageStats object.
+     */
+    std::unique_ptr<PlanStageStats> getStats() final;
+
+    /**
+     * Passes through the latest oplog timestamp from the proxied pipeline. We only expose the
+     * oplog timestamp in the event that we need to merge on mongoS.
+     */
+    Timestamp getLatestOplogTimestamp() const {
+        return _includeMetaData ? _latestOplogTimestamp : Timestamp();
+    }
+
+    /**
+     * Passes through the most recent resume token from the proxied pipeline.
+     */
+    BSONObj getPostBatchResumeToken() const {
+        return _postBatchResumeToken;
+    }
+
+    StageType stageType() const final {
+        return STAGE_CHANGE_STREAM_PROXY;
+    }
+
+protected:
+    boost::optional<BSONObj> getNextBson() final;
+
+private:
+    Timestamp _latestOplogTimestamp;
+    BSONObj _postBatchResumeToken;
+};
+}  // namespace mongo
diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp
index 4267337ab2b..9b0119d1429 100644
--- a/src/mongo/db/exec/pipeline_proxy.cpp
+++ b/src/mongo/db/exec/pipeline_proxy.cpp
@@ -51,7 +51,13 @@ const char* PipelineProxyStage::kStageType = "PIPELINE_PROXY";
 PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx,
                                        std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
                                        WorkingSet* ws)
-    : PlanStage(kStageType, opCtx),
+    : PipelineProxyStage(opCtx, std::move(pipeline), ws, kStageType) {}
+
+PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx,
+                                       std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+                                       WorkingSet* ws,
+                                       const char* stageTypeName)
+    : PlanStage(stageTypeName, opCtx),
       _pipeline(std::move(pipeline)),
       _includeMetaData(_pipeline->getContext()->needsMerge),  // send metadata to merger
       _ws(ws) {
@@ -128,10 +134,6 @@ boost::optional<BSONObj> PipelineProxyStage::getNextBson() {
     return boost::none;
 }
 
-Timestamp PipelineProxyStage::getLatestOplogTimestamp() const {
-    return PipelineD::getLatestOplogTimestamp(_pipeline.get());
-}
-
 std::string PipelineProxyStage::getPlanSummaryStr() const {
     return PipelineD::getPlanSummaryStr(_pipeline.get());
 }
diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h
index 9520b2139eb..4cd36a88aa7 100644
--- a/src/mongo/db/exec/pipeline_proxy.h
+++ b/src/mongo/db/exec/pipeline_proxy.h
@@ -45,12 +45,14 @@ namespace mongo {
 /**
  * Stage for pulling results out from an aggregation pipeline.
  */
-class PipelineProxyStage final : public PlanStage {
+class PipelineProxyStage : public PlanStage {
 public:
     PipelineProxyStage(OperationContext* opCtx,
                        std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
                        WorkingSet* ws);
 
+    virtual ~PipelineProxyStage() = default;
+
     PlanStage::StageState doWork(WorkingSetID* out) final;
 
     bool isEOF() final;
@@ -62,22 +64,17 @@ public:
     void doReattachToOperationContext() final;
 
     // Returns empty PlanStageStats object
-    std::unique_ptr<PlanStageStats> getStats() final;
+    std::unique_ptr<PlanStageStats> getStats() override;
 
     // Not used.
     SpecificStats* getSpecificStats() const final {
         MONGO_UNREACHABLE;
     }
 
-    /**
-     * Pass through the last oplog timestamp from the proxied pipeline.
-     */
-    Timestamp getLatestOplogTimestamp() const;
-
     std::string getPlanSummaryStr() const;
     void getPlanSummaryStats(PlanSummaryStats* statsOut) const;
 
-    StageType stageType() const final {
+    StageType stageType() const override {
         return STAGE_PIPELINE_PROXY;
     }
 
@@ -90,17 +87,20 @@ public:
     static const char* kStageType;
 
 protected:
-    void doDispose() final;
+    PipelineProxyStage(OperationContext* opCtx,
+                       std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+                       WorkingSet* ws,
+                       const char* stageTypeName);
 
-private:
-    boost::optional<BSONObj> getNextBson();
+    virtual boost::optional<BSONObj> getNextBson();
+    void doDispose() final;
 
-    // Things in the _stash should be returned before pulling items from _pipeline.
+    // Items in the _stash should be returned before pulling items from _pipeline.
     std::unique_ptr<Pipeline, PipelineDeleter> _pipeline;
-    std::vector<BSONObj> _stash;
     const bool _includeMetaData;
 
-    // Not owned by us.
+private:
+    std::vector<BSONObj> _stash;
     WorkingSet* _ws;
 };
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 00b41e38e71..96c917aacf6 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -1458,9 +1458,9 @@ TEST_F(ChangeStreamStageDBTest, TransformRename) {
 TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
     OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, false);
 
-    // Drop database entry doesn't have a UUID.
+    // Drop database entry has a nil UUID.
     Document expectedDropDatabase{
-        {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)},
+        {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, UUID::nil())},
         {DSChangeStream::kOperationTypeField, DSChangeStream::kDropDatabaseOpType},
         {DSChangeStream::kClusterTimeField, kDefaultTs},
         {DSChangeStream::kNamespaceField, D{{"db", nss.db()}}},
@@ -1468,7 +1468,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
     Document expectedInvalidate{
         {DSChangeStream::kIdField,
          makeResumeToken(
-             kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
+             kDefaultTs, UUID::nil(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
         {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
         {DSChangeStream::kClusterTimeField, kDefaultTs},
     };
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index db600fb9e5b..df345d7eaf2 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -347,6 +347,10 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
     if (operationType != DocumentSourceChangeStream::kInvalidateOpType &&
         operationType != DocumentSourceChangeStream::kDropDatabaseOpType) {
         invariant(!uuid.missing(), "Saw a CRUD op without a UUID");
+    } else {
+        // Fill in a dummy UUID for invalidate and dropDatabase, to ensure that they sort after
+        // high-water-mark tokens. Their sorting relative to other events remains unchanged.
+        uuid = Value(UUID::nil());
     }
 
     // Note that 'documentKey' and/or 'uuid' might be missing, in which case they will not appear
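The dummy-UUID assignment above is purely about token ordering. High-water-mark tokens contain a clusterTime but no UUID, and a missing UUID sorts before any present one, so giving invalidate and dropDatabase a nil UUID keeps them after high-water-mark tokens at the same clusterTime without disturbing their order relative to real events. The sketch below lays out the three shapes; the clusterTime member is shown in resume_token.cpp later in this patch, while the uuid member and the ordering comments are assumptions that restate the patch's rationale rather than executing a comparison.

    #include "mongo/db/pipeline/resume_token.h"
    #include "mongo/util/uuid.h"

    void tokenOrderingExample() {
        using namespace mongo;
        const Timestamp ts(100, 1);

        ResumeTokenData hwm;     // 1) high-water-mark: clusterTime only
        hwm.clusterTime = ts;

        ResumeTokenData dropDb;  // 2) dropDatabase/invalidate: nil UUID (this patch)
        dropDb.clusterTime = ts;
        dropDb.uuid = UUID::nil();

        // 3) CRUD events additionally carry their collection's real UUID and a
        //    documentKey, so they sort after both of the above at this clusterTime.
    }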
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index cd0783eb5ac..b019bbf8e35 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -118,9 +118,9 @@ void DocumentSourceCursor::loadBatch() {
             // As long as we're waiting for inserts, we shouldn't do any batching at this level
             // we need the whole pipeline to see each document to see if we should stop waiting.
             // Furthermore, if we need to return the latest oplog time (in the tailable and
-            // needs-merge case), batching will result in a wrong time.
-            if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
-                (pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) ||
+            // awaitData case), batching will result in a wrong time.
+            if (pExpCtx->isTailableAwaitData() ||
+                awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
                 memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
                 // End this batch and prepare PlanExecutor for yielding.
                 _exec->saveState();
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 6e706c1e1f2..62ca426172f 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -574,7 +574,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
         plannerOpts |= QueryPlannerParams::IS_COUNT;
     }
 
-    if (expCtx->needsMerge && expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
+    if (pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage()) {
+        invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData);
         plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
     }
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index c7a40f3388c..70b3a1cf72d 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -46,6 +46,16 @@ namespace mongo {
 constexpr StringData ResumeToken::kDataFieldName;
 constexpr StringData ResumeToken::kTypeBitsFieldName;
 
+namespace {
+// Helper function for makeHighWaterMarkResumeToken and isHighWaterMarkResumeToken.
+ResumeTokenData makeHighWaterMarkResumeTokenData(Timestamp clusterTime) {
+    invariant(!clusterTime.isNull());
+    ResumeTokenData tokenData;
+    tokenData.clusterTime = clusterTime;
+    return tokenData;
+}
+}  // namespace
+
 bool ResumeTokenData::operator==(const ResumeTokenData& other) const {
     return clusterTime == other.clusterTime && version == other.version &&
         fromInvalidate == other.fromInvalidate &&
@@ -194,4 +204,12 @@ ResumeToken ResumeToken::parse(const Document& resumeDoc) {
     return ResumeToken(resumeDoc);
 }
 
+ResumeToken ResumeToken::makeHighWaterMarkResumeToken(Timestamp clusterTime) {
+    return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime));
+}
+
+bool ResumeToken::isHighWaterMarkResumeToken(const ResumeTokenData& tokenData) {
+    return tokenData == makeHighWaterMarkResumeTokenData(tokenData.clusterTime);
+}
+
 }  // namespace mongo
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index 16a22311373..a50ac20d8b9 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -109,6 +109,18 @@ public:
     static ResumeToken parse(const Document& document);
 
     /**
+     * Generate a high-water-mark pseudo-token for 'clusterTime', with no UUID or documentKey.
+     */
+    static ResumeToken makeHighWaterMarkResumeToken(Timestamp clusterTime);
+
+    /**
+     * Returns true if the given token data represents a valid high-water-mark resume token;
+     * that is, it does not refer to a specific operation, but instead specifies a clusterTime
+     * after which the stream should resume.
+     */
+    static bool isHighWaterMarkResumeToken(const ResumeTokenData& tokenData);
+
+    /**
      * The default no-argument constructor is required by the IDL for types used as non-optional
      * fields.
      */
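A test-style round trip of the two new helpers, in the spirit of the cursor response test later in this patch. Here getData() is assumed to be ResumeToken's existing accessor for the decoded ResumeTokenData; everything else comes from the declarations above.

    #include "mongo/db/pipeline/resume_token.h"

    void highWaterMarkRoundTrip() {
        using namespace mongo;
        auto token = ResumeToken::makeHighWaterMarkResumeToken(Timestamp(42, 1));

        // Serializing and reparsing the pseudo-token preserves its identity...
        auto reparsed = ResumeToken::parse(token.toDocument());

        // ...and it is still recognizable as a time-only checkpoint.
        invariant(ResumeToken::isHighWaterMarkResumeToken(reparsed.getData()));
        invariant(reparsed.getData().clusterTime == Timestamp(42, 1));
    }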
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index 4828002e401..b246d6d2b5b 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -50,6 +50,7 @@ const char kBatchFieldInitial[] = "firstBatch";
 const char kBatchDocSequenceField[] = "cursor.nextBatch";
 const char kBatchDocSequenceFieldInitial[] = "cursor.firstBatch";
 const char kInternalLatestOplogTimestampField[] = "$_internalLatestOplogTimestamp";
+const char kPostBatchResumeTokenField[] = "postBatchResumeToken";
 
 }  // namespace
 
@@ -76,6 +77,9 @@ void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace)
     } else {
         _batch.reset();
     }
+    if (!_postBatchResumeToken.isEmpty()) {
+        _cursorObject->append(kPostBatchResumeTokenField, _postBatchResumeToken);
+    }
     _cursorObject->append(kIdField, cursorId);
     _cursorObject->append(kNsField, cursorNamespace);
     _cursorObject.reset();
@@ -124,12 +128,14 @@ CursorResponse::CursorResponse(NamespaceString nss,
                                std::vector<BSONObj> batch,
                                boost::optional<long long> numReturnedSoFar,
                                boost::optional<Timestamp> latestOplogTimestamp,
+                               boost::optional<BSONObj> postBatchResumeToken,
                                boost::optional<BSONObj> writeConcernError)
     : _nss(std::move(nss)),
       _cursorId(cursorId),
       _batch(std::move(batch)),
       _numReturnedSoFar(numReturnedSoFar),
       _latestOplogTimestamp(latestOplogTimestamp),
+      _postBatchResumeToken(std::move(postBatchResumeToken)),
       _writeConcernError(std::move(writeConcernError)) {}
 
 std::vector<StatusWith<CursorResponse>> CursorResponse::parseFromBSONMany(
@@ -220,6 +226,14 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
         doc.shareOwnershipWith(cmdResponse);
     }
 
+    auto postBatchResumeTokenElem = cursorObj[kPostBatchResumeTokenField];
+    if (postBatchResumeTokenElem && postBatchResumeTokenElem.type() != BSONType::Object) {
+        return {ErrorCodes::BadValue,
+                str::stream() << kPostBatchResumeTokenField
+                              << " format is invalid; expected Object, but found: "
+                              << postBatchResumeTokenElem.type()};
+    }
+
     auto latestOplogTimestampElem = cmdResponse[kInternalLatestOplogTimestampField];
     if (latestOplogTimestampElem && latestOplogTimestampElem.type() != BSONType::bsonTimestamp) {
         return {
@@ -243,6 +257,8 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
                            boost::none,
                            latestOplogTimestampElem ? latestOplogTimestampElem.timestamp()
                                                     : boost::optional<Timestamp>{},
+                           postBatchResumeTokenElem ? postBatchResumeTokenElem.Obj().getOwned()
+                                                    : boost::optional<BSONObj>{},
                            writeConcernError ? writeConcernError.Obj().getOwned()
                                              : boost::optional<BSONObj>{}}};
 }
@@ -261,6 +277,11 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType,
     }
     batchBuilder.doneFast();
 
+    if (_postBatchResumeToken) {
+        invariant(!_postBatchResumeToken->isEmpty());
+        cursorBuilder.append(kPostBatchResumeTokenField, *_postBatchResumeToken);
+    }
+
     cursorBuilder.doneFast();
 
     if (_latestOplogTimestamp) {
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index 7ed4a5f3f73..af38ce46a6d 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -93,6 +93,10 @@ public:
         _latestOplogTimestamp = ts;
     }
 
+    void setPostBatchResumeToken(BSONObj token) {
+        _postBatchResumeToken = token.getOwned();
+    }
+
     long long numDocs() const {
         return _numDocs;
     }
@@ -122,6 +126,7 @@ private:
     bool _active = true;
     long long _numDocs = 0;
     Timestamp _latestOplogTimestamp;
+    BSONObj _postBatchResumeToken;
 };
 
 /**
@@ -198,6 +203,7 @@ public:
                    std::vector<BSONObj> batch,
                    boost::optional<long long> numReturnedSoFar = boost::none,
                    boost::optional<Timestamp> latestOplogTimestamp = boost::none,
+                   boost::optional<BSONObj> postBatchResumeToken = boost::none,
                    boost::optional<BSONObj> writeConcernError = boost::none);
 
     CursorResponse(CursorResponse&& other) = default;
@@ -240,6 +246,10 @@ public:
         return _latestOplogTimestamp;
     }
 
+    boost::optional<BSONObj> getPostBatchResumeToken() const {
+        return _postBatchResumeToken;
+    }
+
     boost::optional<BSONObj> getWriteConcernError() const {
         return _writeConcernError;
     }
@@ -259,6 +269,7 @@ private:
     std::vector<BSONObj> _batch;
     boost::optional<long long> _numReturnedSoFar;
    boost::optional<Timestamp> _latestOplogTimestamp;
+    boost::optional<BSONObj> _postBatchResumeToken;
     boost::optional<BSONObj> _writeConcernError;
 };
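One call-site hazard from the header change above: the new postBatchResumeToken constructor parameter slots in ahead of writeConcernError, so any caller that was passing a write-concern error positionally needs an extra boost::none. A sketch, with invented placeholder values:

    #include "mongo/db/query/cursor_response.h"

    void updatedCallSite() {
        using namespace mongo;
        std::vector<BSONObj> batch;
        BSONObj wceObj;  // placeholder for a writeConcernError document

        // Before this patch a caller could write:
        //   CursorResponse response(nss, id, batch, boost::none, ts, wceObj);
        // After it, the write-concern error must skip the new slot:
        CursorResponse response(NamespaceString("db.coll"),
                                CursorId(123),
                                std::move(batch),
                                boost::none,  // numReturnedSoFar
                                boost::none,  // latestOplogTimestamp
                                boost::none,  // postBatchResumeToken (new)
                                wceObj);
    }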
diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp
index 2ae05e88a7d..e1adef1fbd7 100644
--- a/src/mongo/db/query/cursor_response_test.cpp
+++ b/src/mongo/db/query/cursor_response_test.cpp
@@ -34,6 +34,7 @@
 
 #include "mongo/rpc/op_msg_rpc_impls.h"
 
+#include "mongo/db/pipeline/resume_token.h"
 #include "mongo/unittest/unittest.h"
 
 namespace mongo {
@@ -334,6 +335,35 @@ TEST(CursorResponseTest, serializeLatestOplogEntry) {
     ASSERT_EQ(*reparsedResponse.getLastOplogTimestamp(), Timestamp(1, 2));
 }
 
+TEST(CursorResponseTest, serializePostBatchResumeToken) {
+    std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)};
+    auto postBatchResumeToken =
+        ResumeToken::makeHighWaterMarkResumeToken(Timestamp(1, 2)).toDocument().toBson();
+    CursorResponse response(NamespaceString("db.coll"),
+                            CursorId(123),
+                            batch,
+                            boost::none,
+                            boost::none,
+                            postBatchResumeToken);
+    auto serialized = response.toBSON(CursorResponse::ResponseType::SubsequentResponse);
+    ASSERT_BSONOBJ_EQ(serialized,
+                      BSON("cursor" << BSON("id" << CursorId(123) << "ns"
+                                                 << "db.coll"
+                                                 << "nextBatch"
+                                                 << BSON_ARRAY(BSON("_id" << 1) << BSON("_id" << 2))
+                                                 << "postBatchResumeToken"
+                                                 << postBatchResumeToken)
+                                    << "ok"
+                                    << 1));
+    auto reparsed = CursorResponse::parseFromBSON(serialized);
+    ASSERT_OK(reparsed.getStatus());
+    CursorResponse reparsedResponse = std::move(reparsed.getValue());
+    ASSERT_EQ(reparsedResponse.getCursorId(), CursorId(123));
+    ASSERT_EQ(reparsedResponse.getNSS().ns(), "db.coll");
+    ASSERT_EQ(reparsedResponse.getBatch().size(), 2U);
+    ASSERT_BSONOBJ_EQ(*reparsedResponse.getPostBatchResumeToken(), postBatchResumeToken);
+}
+
 TEST(CursorResponseTest, cursorReturnDocumentSequences) {
     CursorResponseBuilder::Options options;
     options.isInitialResponse = true;
diff --git a/src/mongo/db/query/explain.cpp b/src/mongo/db/query/explain.cpp
index acff30e9a64..ad299239b82 100644
--- a/src/mongo/db/query/explain.cpp
+++ b/src/mongo/db/query/explain.cpp
@@ -129,7 +129,8 @@ MultiPlanStage* getMultiPlanStage(PlanStage* root) {
  * there is no PPS that is root.
  */
 PipelineProxyStage* getPipelineProxyStage(PlanStage* root) {
-    if (root->stageType() == STAGE_PIPELINE_PROXY) {
+    if (root->stageType() == STAGE_PIPELINE_PROXY ||
+        root->stageType() == STAGE_CHANGE_STREAM_PROXY) {
         return static_cast<PipelineProxyStage*>(root);
     }
 
@@ -894,7 +895,8 @@ std::string Explain::getPlanSummary(const PlanExecutor* exec) {
 
 // static
 std::string Explain::getPlanSummary(const PlanStage* root) {
-    if (root->stageType() == STAGE_PIPELINE_PROXY) {
+    if (root->stageType() == STAGE_PIPELINE_PROXY ||
+        root->stageType() == STAGE_CHANGE_STREAM_PROXY) {
         auto pipelineProxy = static_cast<const PipelineProxyStage*>(root);
         return pipelineProxy->getPlanSummaryStr();
     }
@@ -928,7 +930,8 @@ void Explain::getSummaryStats(const PlanExecutor& exec, PlanSummaryStats* statsO
 
     PlanStage* root = exec.getRootStage();
 
-    if (root->stageType() == STAGE_PIPELINE_PROXY) {
+    if (root->stageType() == STAGE_PIPELINE_PROXY ||
+        root->stageType() == STAGE_CHANGE_STREAM_PROXY) {
         auto pipelineProxy = static_cast<PipelineProxyStage*>(root);
         pipelineProxy->getPlanSummaryStats(statsOut);
         return;
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index ea1eaed31f6..6c8d5defe74 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -452,7 +452,13 @@ public:
      * If the last oplog timestamp is being tracked for this PlanExecutor, return it.
      * Otherwise return a null timestamp.
      */
-    virtual Timestamp getLatestOplogTimestamp() = 0;
+    virtual Timestamp getLatestOplogTimestamp() const = 0;
+
+    /**
+     * If this PlanExecutor is tracking change stream resume tokens, return the most recent
+     * token for the batch that is currently being built. Otherwise, return an empty object.
+     */
+    virtual BSONObj getPostBatchResumeToken() const = 0;
 
     /**
      * Turns a BSONObj representing an error status produced by getNext() into a Status.
diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp
index b8b31dbac62..53d00097fd1 100644
--- a/src/mongo/db/query/plan_executor_impl.cpp
+++ b/src/mongo/db/query/plan_executor_impl.cpp
@@ -41,9 +41,9 @@
 #include "mongo/db/concurrency/write_conflict_exception.h"
 #include "mongo/db/curop.h"
 #include "mongo/db/exec/cached_plan.h"
+#include "mongo/db/exec/change_stream_proxy.h"
 #include "mongo/db/exec/collection_scan.h"
 #include "mongo/db/exec/multi_plan.h"
-#include "mongo/db/exec/pipeline_proxy.h"
 #include "mongo/db/exec/plan_stage.h"
 #include "mongo/db/exec/plan_stats.h"
 #include "mongo/db/exec/subplan.h"
@@ -709,14 +709,20 @@ bool PlanExecutorImpl::isDetached() const {
     return _currentState == kDetached;
 }
 
-Timestamp PlanExecutorImpl::getLatestOplogTimestamp() {
-    if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY))
-        return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp();
+Timestamp PlanExecutorImpl::getLatestOplogTimestamp() const {
+    if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY))
+        return static_cast<ChangeStreamProxyStage*>(changeStreamProxy)->getLatestOplogTimestamp();
     if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN))
         return static_cast<CollectionScan*>(collectionScan)->getLatestOplogTimestamp();
     return Timestamp();
 }
 
+BSONObj PlanExecutorImpl::getPostBatchResumeToken() const {
+    if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY))
+        return static_cast<ChangeStreamProxyStage*>(changeStreamProxy)->getPostBatchResumeToken();
+    return {};
+}
+
 Status PlanExecutorImpl::getMemberObjectStatus(const BSONObj& memberObj) const {
     return WorkingSetCommon::getMemberObjectStatus(memberObj);
 }
diff --git a/src/mongo/db/query/plan_executor_impl.h b/src/mongo/db/query/plan_executor_impl.h
index 9b9a0f4f7df..79a13f1086d 100644
--- a/src/mongo/db/query/plan_executor_impl.h
+++ b/src/mongo/db/query/plan_executor_impl.h
@@ -78,7 +78,8 @@ public:
     Status getKillStatus() final;
     bool isDisposed() const final;
     bool isDetached() const final;
-    Timestamp getLatestOplogTimestamp() final;
+    Timestamp getLatestOplogTimestamp() const final;
+    BSONObj getPostBatchResumeToken() const final;
     Status getMemberObjectStatus(const BSONObj& memberObj) const final;
 
 private:
diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp
index 511c9c32c26..31a7ff7a8ab 100644
--- a/src/mongo/db/query/stage_builder.cpp
+++ b/src/mongo/db/query/stage_builder.cpp
@@ -363,6 +363,7 @@ PlanStage* buildStages(OperationContext* opCtx,
             return new EnsureSortedStage(opCtx, esn->pattern, ws, childStage);
         }
         case STAGE_CACHED_PLAN:
+        case STAGE_CHANGE_STREAM_PROXY:
        case STAGE_COUNT:
         case STAGE_DELETE:
         case STAGE_EOF:
diff --git a/src/mongo/db/query/stage_types.h b/src/mongo/db/query/stage_types.h
index d99d212124d..6fbdba5ea36 100644
--- a/src/mongo/db/query/stage_types.h
+++ b/src/mongo/db/query/stage_types.h
@@ -78,7 +78,8 @@ enum StageType {
     STAGE_OR,
     STAGE_PROJECTION,
 
-    // Stage for running aggregation pipelines.
+    // Stages for running aggregation pipelines.
+    STAGE_CHANGE_STREAM_PROXY,
     STAGE_PIPELINE_PROXY,
 
     STAGE_QUEUED_DATA,
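Taken together, the plumbing runs ChangeStreamProxyStage, through PlanExecutorImpl::getPostBatchResumeToken(), into getmore_cmd/run_aggregate and out via CursorResponse. A hedged sketch of the receiving end, using only the accessors added in this patch; the surrounding function and fallback logic are invented for illustration:

    #include "mongo/db/query/cursor_response.h"

    // 'reply' is the BSON body of an aggregate or getMore response. Track the
    // most recent resume point so the stream can be re-established on failure.
    void updateResumePoint(const mongo::BSONObj& reply, mongo::BSONObj* resumePoint) {
        using namespace mongo;
        auto swResponse = CursorResponse::parseFromBSON(reply);
        if (!swResponse.isOK()) {
            return;
        }
        const auto& response = swResponse.getValue();

        // Prefer the postBatchResumeToken: it advances even for an empty batch,
        // whereas the _id of the last document only moves when events arrive.
        if (auto pbrt = response.getPostBatchResumeToken()) {
            *resumePoint = *pbrt;
        } else if (!response.getBatch().empty()) {
            *resumePoint = response.getBatch().back()["_id"].Obj().getOwned();
        }
    }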