commit fc5bc0947ceedee3b61b2d922cabd3e5df7ec07c
tree 75749d0e4ff2d9db2001252018cc91e78801bc44
parent bdac7ced24f9ad8f9afac9c57e7184b1f2bf61b2
author Bernard Gorman <bernard.gorman@gmail.com> 2018-11-24 16:56:20 +0000
committer Bernard Gorman <bernard.gorman@gmail.com> 2018-12-22 04:27:20 +0000
SERVER-38408 Return postBatchResumeToken with each mongoD change stream batch
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/SConscript                                              1
-rw-r--r--  src/mongo/db/commands/getmore_cmd.cpp                                7
-rw-r--r--  src/mongo/db/commands/run_aggregate.cpp                             14
-rw-r--r--  src/mongo/db/exec/change_stream_proxy.cpp                           84
-rw-r--r--  src/mongo/db/exec/change_stream_proxy.h                             86
-rw-r--r--  src/mongo/db/exec/pipeline_proxy.cpp                                12
-rw-r--r--  src/mongo/db/exec/pipeline_proxy.h                                  28
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_test.cpp        6
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_transform.cpp   4
-rw-r--r--  src/mongo/db/pipeline/document_source_cursor.cpp                     6
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp                                 3
-rw-r--r--  src/mongo/db/pipeline/resume_token.cpp                              18
-rw-r--r--  src/mongo/db/pipeline/resume_token.h                                12
-rw-r--r--  src/mongo/db/query/cursor_response.cpp                              21
-rw-r--r--  src/mongo/db/query/cursor_response.h                                11
-rw-r--r--  src/mongo/db/query/cursor_response_test.cpp                         30
-rw-r--r--  src/mongo/db/query/explain.cpp                                       9
-rw-r--r--  src/mongo/db/query/plan_executor.h                                   8
-rw-r--r--  src/mongo/db/query/plan_executor_impl.cpp                           14
-rw-r--r--  src/mongo/db/query/plan_executor_impl.h                              3
-rw-r--r--  src/mongo/db/query/stage_builder.cpp                                 1
-rw-r--r--  src/mongo/db/query/stage_types.h                                     3
22 files changed, 341 insertions, 40 deletions
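
In brief: after this change, every change stream batch returned from a mongoD cursor
carries a postBatchResumeToken (PBRT) in its cursor response, alongside the existing
(and, per SERVER-38539, eventually to be removed) $_internalLatestOplogTimestamp. A
minimal consumer-side sketch using only the CursorResponse API added in this commit;
the 'getMoreReply' BSONObj is a hypothetical getMore reply:

    // Parse a getMore reply and read the token that accompanied the batch.
    StatusWith<CursorResponse> swResponse = CursorResponse::parseFromBSON(getMoreReply);
    uassertStatusOK(swResponse.getStatus());
    CursorResponse response = std::move(swResponse.getValue());
    if (boost::optional<BSONObj> pbrt = response.getPostBatchResumeToken()) {
        // The stream can be resumed from '*pbrt' even if getBatch() was empty;
        // in that case the token is a high-water-mark at the current clusterTime.
    }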
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 7402004d524..8dd72d0c5ff 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1080,6 +1080,7 @@ env.Library(
'exec/and_hash.cpp',
'exec/and_sorted.cpp',
'exec/cached_plan.cpp',
+ 'exec/change_stream_proxy.cpp',
'exec/collection_scan.cpp',
'exec/count.cpp',
'exec/count_scan.cpp',
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index e137a982ee7..ea669c4a209 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -233,8 +233,10 @@ public:
// As soon as we get a result, this operation no longer waits.
awaitDataState(opCtx).shouldWaitForInserts = false;
- // Add result to output buffer.
+ // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the
+ // postBatchResumeToken until the former is removed in a future release.
nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp());
+ nextBatch->setPostBatchResumeToken(exec->getPostBatchResumeToken());
nextBatch->append(obj);
(*numResults)++;
}
@@ -259,7 +261,10 @@ public:
case PlanExecutor::IS_EOF:
// This causes the reported latest oplog timestamp to advance even when there
// are no results for this particular query.
+ // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the
+ // postBatchResumeToken until the former is removed in a future release.
nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp());
+ nextBatch->setPostBatchResumeToken(exec->getPostBatchResumeToken());
default:
return Status::OK();
}
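
Note: while SERVER-38539 is outstanding, a change stream getMore reply reports the
resume point twice: once as the new PBRT inside the cursor object, and once as the
legacy top-level internal timestamp. A sketch of the reply shape implied by the
serialization code in this commit (payload values illustrative):

    // { cursor: { id: <CursorId>,
    //             ns: "<db.collection>",
    //             nextBatch: [ <change events...> ],
    //             postBatchResumeToken: { _data: <KeyString data>, ... } },
    //   $_internalLatestOplogTimestamp: <Timestamp>,
    //   ok: 1 }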
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 5febd7f14fc..6562dcc6fbc 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -41,7 +41,7 @@
#include "mongo/db/catalog/database.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
-#include "mongo/db/exec/pipeline_proxy.h"
+#include "mongo/db/exec/change_stream_proxy.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/accumulator.h"
@@ -155,8 +155,12 @@ bool handleCursorCommand(OperationContext* opCtx,
}
if (state == PlanExecutor::IS_EOF) {
+ // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the
+ // postBatchResumeToken until the former is removed in a future release.
responseBuilder.setLatestOplogTimestamp(
cursor->getExecutor()->getLatestOplogTimestamp());
+ responseBuilder.setPostBatchResumeToken(
+ cursor->getExecutor()->getPostBatchResumeToken());
if (!cursor->isTailable()) {
// make it an obvious error to use cursor or executor after this point
cursor = nullptr;
@@ -176,7 +180,10 @@ bool handleCursorCommand(OperationContext* opCtx,
break;
}
+ // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the
+ // postBatchResumeToken until the former is removed in a future release.
responseBuilder.setLatestOplogTimestamp(cursor->getExecutor()->getLatestOplogTimestamp());
+ responseBuilder.setPostBatchResumeToken(cursor->getExecutor()->getPostBatchResumeToken());
responseBuilder.append(next);
}
@@ -578,8 +585,9 @@ Status runAggregate(OperationContext* opCtx,
for (size_t idx = 0; idx < pipelines.size(); ++idx) {
// Transfer ownership of the Pipeline to the PipelineProxyStage.
auto ws = make_unique<WorkingSet>();
- auto proxy =
- make_unique<PipelineProxyStage>(opCtx, std::move(pipelines[idx]), ws.get());
+ auto proxy = liteParsedPipeline.hasChangeStream()
+ ? make_unique<ChangeStreamProxyStage>(opCtx, std::move(pipelines[idx]), ws.get())
+ : make_unique<PipelineProxyStage>(opCtx, std::move(pipelines[idx]), ws.get());
// This PlanExecutor will simply forward requests to the Pipeline, so does not need to
// yield or to be registered with any collection's CursorManager to receive
diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp
new file mode 100644
index 00000000000..5af30b4b5a3
--- /dev/null
+++ b/src/mongo/db/exec/change_stream_proxy.cpp
@@ -0,0 +1,84 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/exec/change_stream_proxy.h"
+
+#include "mongo/db/pipeline/pipeline_d.h"
+#include "mongo/db/pipeline/resume_token.h"
+
+namespace mongo {
+
+const char* ChangeStreamProxyStage::kStageType = "CHANGE_STREAM_PROXY";
+
+ChangeStreamProxyStage::ChangeStreamProxyStage(OperationContext* opCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ WorkingSet* ws)
+ : PipelineProxyStage(opCtx, std::move(pipeline), ws, kStageType) {
+ invariant(std::any_of(
+ _pipeline->getSources().begin(), _pipeline->getSources().end(), [](const auto& stage) {
+ return stage->constraints().isChangeStreamStage();
+ }));
+}
+
+boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() {
+ if (auto next = _pipeline->getNext()) {
+ // While we have more results to return, we track both the timestamp and the resume token of
+ // the latest event observed in the oplog, the latter via its _id field.
+ auto nextBSON = (_includeMetaData ? next->toBsonWithMetaData() : next->toBson());
+ _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
+ if (next->getField("_id").getType() == BSONType::Object) {
+ _postBatchResumeToken = next->getField("_id").getDocument().toBson();
+ }
+ return nextBSON;
+ }
+
+ // We ran out of results to return. Check whether the oplog cursor has moved forward since the
+ // last recorded timestamp. Because we advance _latestOplogTimestamp for every event we return,
+ // if the new time is higher than the last then we are guaranteed not to have already returned
+ // any events at this timestamp. We can set _postBatchResumeToken to a new high-water-mark token
+ // at the current clusterTime.
+ auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get());
+ if (highWaterMark > _latestOplogTimestamp) {
+ auto token = ResumeToken::makeHighWaterMarkResumeToken(highWaterMark);
+ _postBatchResumeToken = token.toDocument().toBson();
+ _latestOplogTimestamp = highWaterMark;
+ }
+ return boost::none;
+}
+
+std::unique_ptr<PlanStageStats> ChangeStreamProxyStage::getStats() {
+ std::unique_ptr<PlanStageStats> ret =
+ std::make_unique<PlanStageStats>(CommonStats(kStageType), STAGE_CHANGE_STREAM_PROXY);
+ ret->specific = std::make_unique<CollectionScanStats>();
+ return ret;
+}
+
+} // namespace mongo
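
The stage above maintains the PBRT along two paths: while events flow, it is the _id
(i.e. the resume token) of the most recent event returned; once the batch is exhausted,
it is promoted to a high-water-mark token if the oplog scan has moved past the last
returned event. A sketch of that EOF promotion in isolation, with illustrative
timestamps:

    // The last event returned was at clusterTime (5, 0), but the underlying
    // oplog scan has since advanced to (9, 0) with no further matching events.
    Timestamp latestOplogTimestamp(5, 0);
    Timestamp highWaterMark(9, 0);
    if (highWaterMark > latestOplogTimestamp) {
        // The token carries only a clusterTime: no UUID and no documentKey.
        ResumeToken token = ResumeToken::makeHighWaterMarkResumeToken(highWaterMark);
        BSONObj postBatchResumeToken = token.toDocument().toBson();
    }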
diff --git a/src/mongo/db/exec/change_stream_proxy.h b/src/mongo/db/exec/change_stream_proxy.h
new file mode 100644
index 00000000000..601fe43b582
--- /dev/null
+++ b/src/mongo/db/exec/change_stream_proxy.h
@@ -0,0 +1,86 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/exec/pipeline_proxy.h"
+
+namespace mongo {
+
+/**
+ * ChangeStreamProxyStage is a drop-in replacement for PipelineProxyStage, intended to manage the
+ * serialization of change stream pipeline output from Document to BSON. In particular, it is
+ * additionally responsible for tracking the latestOplogTimestamps and postBatchResumeTokens that
+ * are necessary for correct merging on mongoS and, in the latter case, must also be provided to
+ * mongoD clients.
+ */
+class ChangeStreamProxyStage final : public PipelineProxyStage {
+public:
+ static const char* kStageType;
+
+ /**
+ * The 'pipeline' argument must be a $changeStream pipeline. Passing a non-$changeStream
+ * pipeline into the constructor will cause an invariant() to fail.
+ */
+ ChangeStreamProxyStage(OperationContext* opCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ WorkingSet* ws);
+
+ /**
+ * Returns an empty PlanStageStats object.
+ */
+ std::unique_ptr<PlanStageStats> getStats() final;
+
+ /**
+ * Passes through the latest oplog timestamp from the proxied pipeline. We only expose the oplog
+ * timestamp in the event that we need to merge on mongoS.
+ */
+ Timestamp getLatestOplogTimestamp() const {
+ return _includeMetaData ? _latestOplogTimestamp : Timestamp();
+ }
+
+ /**
+ * Passes through the most recent resume token from the proxied pipeline.
+ */
+ BSONObj getPostBatchResumeToken() const {
+ return _postBatchResumeToken;
+ }
+
+ StageType stageType() const final {
+ return STAGE_CHANGE_STREAM_PROXY;
+ }
+
+protected:
+ boost::optional<BSONObj> getNextBson() final;
+
+private:
+ Timestamp _latestOplogTimestamp;
+ BSONObj _postBatchResumeToken;
+};
+} // namespace mongo
diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp
index 4267337ab2b..9b0119d1429 100644
--- a/src/mongo/db/exec/pipeline_proxy.cpp
+++ b/src/mongo/db/exec/pipeline_proxy.cpp
@@ -51,7 +51,13 @@ const char* PipelineProxyStage::kStageType = "PIPELINE_PROXY";
PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
WorkingSet* ws)
- : PlanStage(kStageType, opCtx),
+ : PipelineProxyStage(opCtx, std::move(pipeline), ws, kStageType) {}
+
+PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ WorkingSet* ws,
+ const char* stageTypeName)
+ : PlanStage(stageTypeName, opCtx),
_pipeline(std::move(pipeline)),
_includeMetaData(_pipeline->getContext()->needsMerge), // send metadata to merger
_ws(ws) {
@@ -128,10 +134,6 @@ boost::optional<BSONObj> PipelineProxyStage::getNextBson() {
return boost::none;
}
-Timestamp PipelineProxyStage::getLatestOplogTimestamp() const {
- return PipelineD::getLatestOplogTimestamp(_pipeline.get());
-}
-
std::string PipelineProxyStage::getPlanSummaryStr() const {
return PipelineD::getPlanSummaryStr(_pipeline.get());
}
diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h
index 9520b2139eb..4cd36a88aa7 100644
--- a/src/mongo/db/exec/pipeline_proxy.h
+++ b/src/mongo/db/exec/pipeline_proxy.h
@@ -45,12 +45,14 @@ namespace mongo {
/**
* Stage for pulling results out from an aggregation pipeline.
*/
-class PipelineProxyStage final : public PlanStage {
+class PipelineProxyStage : public PlanStage {
public:
PipelineProxyStage(OperationContext* opCtx,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
WorkingSet* ws);
+ virtual ~PipelineProxyStage() = default;
+
PlanStage::StageState doWork(WorkingSetID* out) final;
bool isEOF() final;
@@ -62,22 +64,17 @@ public:
void doReattachToOperationContext() final;
// Returns empty PlanStageStats object
- std::unique_ptr<PlanStageStats> getStats() final;
+ std::unique_ptr<PlanStageStats> getStats() override;
// Not used.
SpecificStats* getSpecificStats() const final {
MONGO_UNREACHABLE;
}
- /**
- * Pass through the last oplog timestamp from the proxied pipeline.
- */
- Timestamp getLatestOplogTimestamp() const;
-
std::string getPlanSummaryStr() const;
void getPlanSummaryStats(PlanSummaryStats* statsOut) const;
- StageType stageType() const final {
+ StageType stageType() const override {
return STAGE_PIPELINE_PROXY;
}
@@ -90,17 +87,20 @@ public:
static const char* kStageType;
protected:
- void doDispose() final;
+ PipelineProxyStage(OperationContext* opCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ WorkingSet* ws,
+ const char* stageTypeName);
-private:
- boost::optional<BSONObj> getNextBson();
+ virtual boost::optional<BSONObj> getNextBson();
+ void doDispose() final;
- // Things in the _stash should be returned before pulling items from _pipeline.
+ // Items in the _stash should be returned before pulling items from _pipeline.
std::unique_ptr<Pipeline, PipelineDeleter> _pipeline;
- std::vector<BSONObj> _stash;
const bool _includeMetaData;
- // Not owned by us.
+private:
+ std::vector<BSONObj> _stash;
WorkingSet* _ws;
};
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 00b41e38e71..96c917aacf6 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -1458,9 +1458,9 @@ TEST_F(ChangeStreamStageDBTest, TransformRename) {
TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, false);
- // Drop database entry doesn't have a UUID.
+ // Drop database entry has a nil UUID.
Document expectedDropDatabase{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, UUID::nil())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropDatabaseOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}}},
@@ -1468,7 +1468,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
Document expectedInvalidate{
{DSChangeStream::kIdField,
makeResumeToken(
- kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
+ kDefaultTs, UUID::nil(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
};
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index db600fb9e5b..df345d7eaf2 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -347,6 +347,10 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
if (operationType != DocumentSourceChangeStream::kInvalidateOpType &&
operationType != DocumentSourceChangeStream::kDropDatabaseOpType) {
invariant(!uuid.missing(), "Saw a CRUD op without a UUID");
+ } else {
+ // Fill in a dummy UUID for invalidate and dropDatabase, to ensure that they sort after
+ // high-water-mark tokens. Their sorting relative to other events remains unchanged.
+ uuid = Value(UUID::nil());
}
// Note that 'documentKey' and/or 'uuid' might be missing, in which case they will not appear
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index cd0783eb5ac..b019bbf8e35 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -118,9 +118,9 @@ void DocumentSourceCursor::loadBatch() {
// As long as we're waiting for inserts, we shouldn't do any batching at this level, since
// we need the whole pipeline to see each document to decide whether we should stop waiting.
// Furthermore, if we need to return the latest oplog time (in the tailable and
- // needs-merge case), batching will result in a wrong time.
- if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
- (pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) ||
+ // awaitData case), batching will result in a wrong time.
+ if (pExpCtx->isTailableAwaitData() ||
+ awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
// End this batch and prepare PlanExecutor for yielding.
_exec->saveState();
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 6e706c1e1f2..62ca426172f 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -574,7 +574,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
plannerOpts |= QueryPlannerParams::IS_COUNT;
}
- if (expCtx->needsMerge && expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
+ if (pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage()) {
+ invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData);
plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
}
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index c7a40f3388c..70b3a1cf72d 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -46,6 +46,16 @@ namespace mongo {
constexpr StringData ResumeToken::kDataFieldName;
constexpr StringData ResumeToken::kTypeBitsFieldName;
+namespace {
+// Helper function for makeHighWaterMarkResumeToken and isHighWaterMarkResumeToken.
+ResumeTokenData makeHighWaterMarkResumeTokenData(Timestamp clusterTime) {
+ invariant(!clusterTime.isNull());
+ ResumeTokenData tokenData;
+ tokenData.clusterTime = clusterTime;
+ return tokenData;
+}
+} // namespace
+
bool ResumeTokenData::operator==(const ResumeTokenData& other) const {
return clusterTime == other.clusterTime && version == other.version &&
fromInvalidate == other.fromInvalidate &&
@@ -194,4 +204,12 @@ ResumeToken ResumeToken::parse(const Document& resumeDoc) {
return ResumeToken(resumeDoc);
}
+ResumeToken ResumeToken::makeHighWaterMarkResumeToken(Timestamp clusterTime) {
+ return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime));
+}
+
+bool ResumeToken::isHighWaterMarkResumeToken(const ResumeTokenData& tokenData) {
+ return tokenData == makeHighWaterMarkResumeTokenData(tokenData.clusterTime);
+}
+
} // namespace mongo
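
A quick round-trip sketch of the two new functions: a high-water-mark token is exactly
a default-constructed ResumeTokenData with only the clusterTime populated, so the
predicate reduces to an equality check against that shape:

    ResumeTokenData data;
    data.clusterTime = Timestamp(42, 1);
    invariant(ResumeToken::isHighWaterMarkResumeToken(data));

    // Any extra identifying state breaks the equality; e.g. marking the data as
    // fromInvalidate means it no longer denotes a pure high-water-mark.
    data.fromInvalidate = ResumeTokenData::FromInvalidate::kFromInvalidate;
    invariant(!ResumeToken::isHighWaterMarkResumeToken(data));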
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index 16a22311373..a50ac20d8b9 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -109,6 +109,18 @@ public:
static ResumeToken parse(const Document& document);
/**
+ * Generate a high-water-mark pseudo-token for 'clusterTime', with no UUID or documentKey.
+ */
+ static ResumeToken makeHighWaterMarkResumeToken(Timestamp clusterTime);
+
+ /**
+ * Returns true if the given token data represents a valid high-water-mark resume token; that
+ * is, it does not refer to a specific operation, but instead specifies a clusterTime after
+ * which the stream should resume.
+ */
+ static bool isHighWaterMarkResumeToken(const ResumeTokenData& tokenData);
+
+ /**
* The default no-argument constructor is required by the IDL for types used as non-optional
* fields.
*/
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index 4828002e401..b246d6d2b5b 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -50,6 +50,7 @@ const char kBatchFieldInitial[] = "firstBatch";
const char kBatchDocSequenceField[] = "cursor.nextBatch";
const char kBatchDocSequenceFieldInitial[] = "cursor.firstBatch";
const char kInternalLatestOplogTimestampField[] = "$_internalLatestOplogTimestamp";
+const char kPostBatchResumeTokenField[] = "postBatchResumeToken";
} // namespace
@@ -76,6 +77,9 @@ void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace)
} else {
_batch.reset();
}
+ if (!_postBatchResumeToken.isEmpty()) {
+ _cursorObject->append(kPostBatchResumeTokenField, _postBatchResumeToken);
+ }
_cursorObject->append(kIdField, cursorId);
_cursorObject->append(kNsField, cursorNamespace);
_cursorObject.reset();
@@ -124,12 +128,14 @@ CursorResponse::CursorResponse(NamespaceString nss,
std::vector<BSONObj> batch,
boost::optional<long long> numReturnedSoFar,
boost::optional<Timestamp> latestOplogTimestamp,
+ boost::optional<BSONObj> postBatchResumeToken,
boost::optional<BSONObj> writeConcernError)
: _nss(std::move(nss)),
_cursorId(cursorId),
_batch(std::move(batch)),
_numReturnedSoFar(numReturnedSoFar),
_latestOplogTimestamp(latestOplogTimestamp),
+ _postBatchResumeToken(std::move(postBatchResumeToken)),
_writeConcernError(std::move(writeConcernError)) {}
std::vector<StatusWith<CursorResponse>> CursorResponse::parseFromBSONMany(
@@ -220,6 +226,14 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
doc.shareOwnershipWith(cmdResponse);
}
+ auto postBatchResumeTokenElem = cursorObj[kPostBatchResumeTokenField];
+ if (postBatchResumeTokenElem && postBatchResumeTokenElem.type() != BSONType::Object) {
+ return {ErrorCodes::BadValue,
+ str::stream() << kPostBatchResumeTokenField
+ << " format is invalid; expected Object, but found: "
+ << postBatchResumeTokenElem.type()};
+ }
+
auto latestOplogTimestampElem = cmdResponse[kInternalLatestOplogTimestampField];
if (latestOplogTimestampElem && latestOplogTimestampElem.type() != BSONType::bsonTimestamp) {
return {
@@ -243,6 +257,8 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
boost::none,
latestOplogTimestampElem ? latestOplogTimestampElem.timestamp()
: boost::optional<Timestamp>{},
+ postBatchResumeTokenElem ? postBatchResumeTokenElem.Obj().getOwned()
+ : boost::optional<BSONObj>{},
writeConcernError ? writeConcernError.Obj().getOwned() : boost::optional<BSONObj>{}}};
}
@@ -261,6 +277,11 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType,
}
batchBuilder.doneFast();
+ if (_postBatchResumeToken) {
+ invariant(!_postBatchResumeToken->isEmpty());
+ cursorBuilder.append(kPostBatchResumeTokenField, *_postBatchResumeToken);
+ }
+
cursorBuilder.doneFast();
if (_latestOplogTimestamp) {
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index 7ed4a5f3f73..af38ce46a6d 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -93,6 +93,10 @@ public:
_latestOplogTimestamp = ts;
}
+ void setPostBatchResumeToken(BSONObj token) {
+ _postBatchResumeToken = token.getOwned();
+ }
+
long long numDocs() const {
return _numDocs;
}
@@ -122,6 +126,7 @@ private:
bool _active = true;
long long _numDocs = 0;
Timestamp _latestOplogTimestamp;
+ BSONObj _postBatchResumeToken;
};
/**
@@ -198,6 +203,7 @@ public:
std::vector<BSONObj> batch,
boost::optional<long long> numReturnedSoFar = boost::none,
boost::optional<Timestamp> latestOplogTimestamp = boost::none,
+ boost::optional<BSONObj> postBatchResumeToken = boost::none,
boost::optional<BSONObj> writeConcernError = boost::none);
CursorResponse(CursorResponse&& other) = default;
@@ -240,6 +246,10 @@ public:
return _latestOplogTimestamp;
}
+ boost::optional<BSONObj> getPostBatchResumeToken() const {
+ return _postBatchResumeToken;
+ }
+
boost::optional<BSONObj> getWriteConcernError() const {
return _writeConcernError;
}
@@ -259,6 +269,7 @@ private:
std::vector<BSONObj> _batch;
boost::optional<long long> _numReturnedSoFar;
boost::optional<Timestamp> _latestOplogTimestamp;
+ boost::optional<BSONObj> _postBatchResumeToken;
boost::optional<BSONObj> _writeConcernError;
};
diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp
index 2ae05e88a7d..e1adef1fbd7 100644
--- a/src/mongo/db/query/cursor_response_test.cpp
+++ b/src/mongo/db/query/cursor_response_test.cpp
@@ -34,6 +34,7 @@
#include "mongo/rpc/op_msg_rpc_impls.h"
+#include "mongo/db/pipeline/resume_token.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -334,6 +335,35 @@ TEST(CursorResponseTest, serializeLatestOplogEntry) {
ASSERT_EQ(*reparsedResponse.getLastOplogTimestamp(), Timestamp(1, 2));
}
+TEST(CursorResponseTest, serializePostBatchResumeToken) {
+ std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)};
+ auto postBatchResumeToken =
+ ResumeToken::makeHighWaterMarkResumeToken(Timestamp(1, 2)).toDocument().toBson();
+ CursorResponse response(NamespaceString("db.coll"),
+ CursorId(123),
+ batch,
+ boost::none,
+ boost::none,
+ postBatchResumeToken);
+ auto serialized = response.toBSON(CursorResponse::ResponseType::SubsequentResponse);
+ ASSERT_BSONOBJ_EQ(serialized,
+ BSON("cursor" << BSON("id" << CursorId(123) << "ns"
+ << "db.coll"
+ << "nextBatch"
+ << BSON_ARRAY(BSON("_id" << 1) << BSON("_id" << 2))
+ << "postBatchResumeToken"
+ << postBatchResumeToken)
+ << "ok"
+ << 1));
+ auto reparsed = CursorResponse::parseFromBSON(serialized);
+ ASSERT_OK(reparsed.getStatus());
+ CursorResponse reparsedResponse = std::move(reparsed.getValue());
+ ASSERT_EQ(reparsedResponse.getCursorId(), CursorId(123));
+ ASSERT_EQ(reparsedResponse.getNSS().ns(), "db.coll");
+ ASSERT_EQ(reparsedResponse.getBatch().size(), 2U);
+ ASSERT_BSONOBJ_EQ(*reparsedResponse.getPostBatchResumeToken(), postBatchResumeToken);
+}
+
TEST(CursorResponseTest, cursorReturnDocumentSequences) {
CursorResponseBuilder::Options options;
options.isInitialResponse = true;
diff --git a/src/mongo/db/query/explain.cpp b/src/mongo/db/query/explain.cpp
index acff30e9a64..ad299239b82 100644
--- a/src/mongo/db/query/explain.cpp
+++ b/src/mongo/db/query/explain.cpp
@@ -129,7 +129,8 @@ MultiPlanStage* getMultiPlanStage(PlanStage* root) {
* there is no PPS that is root.
*/
PipelineProxyStage* getPipelineProxyStage(PlanStage* root) {
- if (root->stageType() == STAGE_PIPELINE_PROXY) {
+ if (root->stageType() == STAGE_PIPELINE_PROXY ||
+ root->stageType() == STAGE_CHANGE_STREAM_PROXY) {
return static_cast<PipelineProxyStage*>(root);
}
@@ -894,7 +895,8 @@ std::string Explain::getPlanSummary(const PlanExecutor* exec) {
// static
std::string Explain::getPlanSummary(const PlanStage* root) {
- if (root->stageType() == STAGE_PIPELINE_PROXY) {
+ if (root->stageType() == STAGE_PIPELINE_PROXY ||
+ root->stageType() == STAGE_CHANGE_STREAM_PROXY) {
auto pipelineProxy = static_cast<const PipelineProxyStage*>(root);
return pipelineProxy->getPlanSummaryStr();
}
@@ -928,7 +930,8 @@ void Explain::getSummaryStats(const PlanExecutor& exec, PlanSummaryStats* statsO
PlanStage* root = exec.getRootStage();
- if (root->stageType() == STAGE_PIPELINE_PROXY) {
+ if (root->stageType() == STAGE_PIPELINE_PROXY ||
+ root->stageType() == STAGE_CHANGE_STREAM_PROXY) {
auto pipelineProxy = static_cast<PipelineProxyStage*>(root);
pipelineProxy->getPlanSummaryStats(statsOut);
return;
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index ea1eaed31f6..6c8d5defe74 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -452,7 +452,13 @@ public:
* If the last oplog timestamp is being tracked for this PlanExecutor, return it.
* Otherwise return a null timestamp.
*/
- virtual Timestamp getLatestOplogTimestamp() = 0;
+ virtual Timestamp getLatestOplogTimestamp() const = 0;
+
+ /**
+ * If this PlanExecutor is tracking change stream resume tokens, return the most recent token
+ * for the batch that is currently being built. Otherwise, return an empty object.
+ */
+ virtual BSONObj getPostBatchResumeToken() const = 0;
/**
* Turns a BSONObj representing an error status produced by getNext() into a Status.
diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp
index b8b31dbac62..53d00097fd1 100644
--- a/src/mongo/db/query/plan_executor_impl.cpp
+++ b/src/mongo/db/query/plan_executor_impl.cpp
@@ -41,9 +41,9 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/exec/cached_plan.h"
+#include "mongo/db/exec/change_stream_proxy.h"
#include "mongo/db/exec/collection_scan.h"
#include "mongo/db/exec/multi_plan.h"
-#include "mongo/db/exec/pipeline_proxy.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/exec/plan_stats.h"
#include "mongo/db/exec/subplan.h"
@@ -709,14 +709,20 @@ bool PlanExecutorImpl::isDetached() const {
return _currentState == kDetached;
}
-Timestamp PlanExecutorImpl::getLatestOplogTimestamp() {
- if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY))
- return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp();
+Timestamp PlanExecutorImpl::getLatestOplogTimestamp() const {
+ if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY))
+ return static_cast<ChangeStreamProxyStage*>(changeStreamProxy)->getLatestOplogTimestamp();
if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN))
return static_cast<CollectionScan*>(collectionScan)->getLatestOplogTimestamp();
return Timestamp();
}
+BSONObj PlanExecutorImpl::getPostBatchResumeToken() const {
+ if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY))
+ return static_cast<ChangeStreamProxyStage*>(changeStreamProxy)->getPostBatchResumeToken();
+ return {};
+}
+
Status PlanExecutorImpl::getMemberObjectStatus(const BSONObj& memberObj) const {
return WorkingSetCommon::getMemberObjectStatus(memberObj);
}
diff --git a/src/mongo/db/query/plan_executor_impl.h b/src/mongo/db/query/plan_executor_impl.h
index 9b9a0f4f7df..79a13f1086d 100644
--- a/src/mongo/db/query/plan_executor_impl.h
+++ b/src/mongo/db/query/plan_executor_impl.h
@@ -78,7 +78,8 @@ public:
Status getKillStatus() final;
bool isDisposed() const final;
bool isDetached() const final;
- Timestamp getLatestOplogTimestamp() final;
+ Timestamp getLatestOplogTimestamp() const final;
+ BSONObj getPostBatchResumeToken() const final;
Status getMemberObjectStatus(const BSONObj& memberObj) const final;
private:
diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp
index 511c9c32c26..31a7ff7a8ab 100644
--- a/src/mongo/db/query/stage_builder.cpp
+++ b/src/mongo/db/query/stage_builder.cpp
@@ -363,6 +363,7 @@ PlanStage* buildStages(OperationContext* opCtx,
return new EnsureSortedStage(opCtx, esn->pattern, ws, childStage);
}
case STAGE_CACHED_PLAN:
+ case STAGE_CHANGE_STREAM_PROXY:
case STAGE_COUNT:
case STAGE_DELETE:
case STAGE_EOF:
diff --git a/src/mongo/db/query/stage_types.h b/src/mongo/db/query/stage_types.h
index d99d212124d..6fbdba5ea36 100644
--- a/src/mongo/db/query/stage_types.h
+++ b/src/mongo/db/query/stage_types.h
@@ -78,7 +78,8 @@ enum StageType {
STAGE_OR,
STAGE_PROJECTION,
- // Stage for running aggregation pipelines.
+ // Stages for running aggregation pipelines.
+ STAGE_CHANGE_STREAM_PROXY,
STAGE_PIPELINE_PROXY,
STAGE_QUEUED_DATA,