summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2018-11-24 16:56:20 +0000
committerBernard Gorman <bernard.gorman@gmail.com>2018-12-22 04:27:20 +0000
commitfc5bc0947ceedee3b61b2d922cabd3e5df7ec07c (patch)
tree75749d0e4ff2d9db2001252018cc91e78801bc44 /src/mongo/db/pipeline
parentbdac7ced24f9ad8f9afac9c57e7184b1f2bf61b2 (diff)
downloadmongo-fc5bc0947ceedee3b61b2d922cabd3e5df7ec07c.tar.gz
SERVER-38408 Return postBatchResumeToken with each mongoD change stream batch
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp6
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp3
-rw-r--r--src/mongo/db/pipeline/resume_token.cpp18
-rw-r--r--src/mongo/db/pipeline/resume_token.h12
6 files changed, 42 insertions, 7 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 00b41e38e71..96c917aacf6 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -1458,9 +1458,9 @@ TEST_F(ChangeStreamStageDBTest, TransformRename) {
TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, false);
- // Drop database entry doesn't have a UUID.
+ // Drop database entry has a nil UUID.
Document expectedDropDatabase{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, UUID::nil())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropDatabaseOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}}},
@@ -1468,7 +1468,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
Document expectedInvalidate{
{DSChangeStream::kIdField,
makeResumeToken(
- kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
+ kDefaultTs, UUID::nil(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
};
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index db600fb9e5b..df345d7eaf2 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -347,6 +347,10 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
if (operationType != DocumentSourceChangeStream::kInvalidateOpType &&
operationType != DocumentSourceChangeStream::kDropDatabaseOpType) {
invariant(!uuid.missing(), "Saw a CRUD op without a UUID");
+ } else {
+ // Fill in a dummy UUID for invalidate and dropDatabase, to ensure that they sort after
+ // high-water-mark tokens. Their sorting relative to other events remains unchanged.
+ uuid = Value(UUID::nil());
}
// Note that 'documentKey' and/or 'uuid' might be missing, in which case they will not appear
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index cd0783eb5ac..b019bbf8e35 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -118,9 +118,9 @@ void DocumentSourceCursor::loadBatch() {
// As long as we're waiting for inserts, we shouldn't do any batching at this level
// we need the whole pipeline to see each document to see if we should stop waiting.
// Furthermore, if we need to return the latest oplog time (in the tailable and
- // needs-merge case), batching will result in a wrong time.
- if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
- (pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) ||
+ // awaitData case), batching will result in a wrong time.
+ if (pExpCtx->isTailableAwaitData() ||
+ awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
// End this batch and prepare PlanExecutor for yielding.
_exec->saveState();
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 6e706c1e1f2..62ca426172f 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -574,7 +574,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
plannerOpts |= QueryPlannerParams::IS_COUNT;
}
- if (expCtx->needsMerge && expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
+ if (pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage()) {
+ invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData);
plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
}
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index c7a40f3388c..70b3a1cf72d 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -46,6 +46,16 @@ namespace mongo {
constexpr StringData ResumeToken::kDataFieldName;
constexpr StringData ResumeToken::kTypeBitsFieldName;
+namespace {
+// Helper function for makeHighWaterMarkResumeToken and isHighWaterMarkResumeToken.
+ResumeTokenData makeHighWaterMarkResumeTokenData(Timestamp clusterTime) {
+ invariant(!clusterTime.isNull());
+ ResumeTokenData tokenData;
+ tokenData.clusterTime = clusterTime;
+ return tokenData;
+}
+} // namespace
+
bool ResumeTokenData::operator==(const ResumeTokenData& other) const {
return clusterTime == other.clusterTime && version == other.version &&
fromInvalidate == other.fromInvalidate &&
@@ -194,4 +204,12 @@ ResumeToken ResumeToken::parse(const Document& resumeDoc) {
return ResumeToken(resumeDoc);
}
+ResumeToken ResumeToken::makeHighWaterMarkResumeToken(Timestamp clusterTime) {
+ return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime));
+}
+
+bool ResumeToken::isHighWaterMarkResumeToken(const ResumeTokenData& tokenData) {
+ return tokenData == makeHighWaterMarkResumeTokenData(tokenData.clusterTime);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index 16a22311373..a50ac20d8b9 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -109,6 +109,18 @@ public:
static ResumeToken parse(const Document& document);
/**
+ * Generate a high-water-mark pseudo-token for 'clusterTime', with no UUID or documentKey.
+ */
+ static ResumeToken makeHighWaterMarkResumeToken(Timestamp clusterTime);
+
+ /**
+ * Returns true if the given token data represents a valid high-water-mark resume token; that
+ * is, it does not refer to a specific operation, but instead specifies a clusterTime after
+ * which the stream should resume.
+ */
+ static bool isHighWaterMarkResumeToken(const ResumeTokenData& tokenData);
+
+ /**
* The default no-argument constructor is required by the IDL for types used as non-optional
* fields.
*/