| author | Bernard Gorman <bernard.gorman@gmail.com> | 2018-11-24 16:56:20 +0000 |
|---|---|---|
| committer | Bernard Gorman <bernard.gorman@gmail.com> | 2018-12-22 04:27:20 +0000 |
| commit | fc5bc0947ceedee3b61b2d922cabd3e5df7ec07c (patch) | |
| tree | 75749d0e4ff2d9db2001252018cc91e78801bc44 /src/mongo/db/pipeline | |
| parent | bdac7ced24f9ad8f9afac9c57e7184b1f2bf61b2 (diff) | |
| download | mongo-fc5bc0947ceedee3b61b2d922cabd3e5df7ec07c.tar.gz | |

SERVER-38408 Return postBatchResumeToken with each mongoD change stream batch
Diffstat (limited to 'src/mongo/db/pipeline')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_test.cpp | 6 |
| -rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_transform.cpp | 4 |
| -rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 6 |
| -rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 3 |
| -rw-r--r-- | src/mongo/db/pipeline/resume_token.cpp | 18 |
| -rw-r--r-- | src/mongo/db/pipeline/resume_token.h | 12 |

6 files changed, 42 insertions, 7 deletions
```diff
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 00b41e38e71..96c917aacf6 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -1458,9 +1458,9 @@ TEST_F(ChangeStreamStageDBTest, TransformRename) {
 TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
     OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, false);
 
-    // Drop database entry doesn't have a UUID.
+    // Drop database entry has a nil UUID.
     Document expectedDropDatabase{
-        {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)},
+        {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, UUID::nil())},
         {DSChangeStream::kOperationTypeField, DSChangeStream::kDropDatabaseOpType},
         {DSChangeStream::kClusterTimeField, kDefaultTs},
         {DSChangeStream::kNamespaceField, D{{"db", nss.db()}}},
@@ -1468,7 +1468,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
     Document expectedInvalidate{
         {DSChangeStream::kIdField,
          makeResumeToken(
-             kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
+             kDefaultTs, UUID::nil(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
         {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
         {DSChangeStream::kClusterTimeField, kDefaultTs},
     };
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index db600fb9e5b..df345d7eaf2 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -347,6 +347,10 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
     if (operationType != DocumentSourceChangeStream::kInvalidateOpType &&
         operationType != DocumentSourceChangeStream::kDropDatabaseOpType) {
         invariant(!uuid.missing(), "Saw a CRUD op without a UUID");
+    } else {
+        // Fill in a dummy UUID for invalidate and dropDatabase, to ensure that they sort after
+        // high-water-mark tokens. Their sorting relative to other events remains unchanged.
+        uuid = Value(UUID::nil());
     }
 
     // Note that 'documentKey' and/or 'uuid' might be missing, in which case they will not appear
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index cd0783eb5ac..b019bbf8e35 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -118,9 +118,9 @@ void DocumentSourceCursor::loadBatch() {
         // As long as we're waiting for inserts, we shouldn't do any batching at this level
         // we need the whole pipeline to see each document to see if we should stop waiting.
         // Furthermore, if we need to return the latest oplog time (in the tailable and
-        // needs-merge case), batching will result in a wrong time.
-        if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
-            (pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) ||
+        // awaitData case), batching will result in a wrong time.
+        if (pExpCtx->isTailableAwaitData() ||
+            awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
             memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
             // End this batch and prepare PlanExecutor for yielding.
             _exec->saveState();
```
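A note on the transform change above: high-water-mark tokens (introduced below in resume_token.h/.cpp) are built from a clusterTime alone, with no UUID, so an invalidate or dropDatabase entry at the same clusterTime needs some UUID in order to sort after them; filling in `UUID::nil()` achieves that without disturbing the ordering relative to other events. The sketch below models that ordering with standard-library types only; `TokenKey`, its `operator<`, and the all-zero array standing in for `UUID::nil()` are illustrative stand-ins, not MongoDB code, and the missing-before-present rule is an assumption about how the encoded tokens compare.

```cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <optional>

// Toy model of the leading resume-token fields: clusterTime first, then the
// optional collection UUID. A high-water-mark token carries no UUID at all;
// invalidate/dropDatabase events now carry the nil (all-zero) UUID instead.
struct TokenKey {
    uint64_t clusterTime;                         // stand-in for Timestamp
    std::optional<std::array<uint8_t, 16>> uuid;  // stand-in for an optional UUID
};

// A missing field sorts before any present value, mirroring how a UUID-less
// token is assumed to order ahead of one holding any UUID, even the nil one.
bool operator<(const TokenKey& a, const TokenKey& b) {
    if (a.clusterTime != b.clusterTime)
        return a.clusterTime < b.clusterTime;
    if (a.uuid.has_value() != b.uuid.has_value())
        return !a.uuid.has_value();  // missing < present
    return a.uuid < b.uuid;
}

int main() {
    const uint64_t ts = 42;
    const TokenKey highWaterMark{ts, std::nullopt};            // no UUID
    const TokenKey invalidate{ts, std::array<uint8_t, 16>{}};  // nil (all-zero) UUID

    // The invalidate event still sorts after the high-water-mark pseudo-token
    // generated at the same clusterTime.
    assert(highWaterMark < invalidate);
    assert(!(invalidate < highWaterMark));
    return 0;
}
```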
```diff
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 6e706c1e1f2..62ca426172f 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -574,7 +574,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
         plannerOpts |= QueryPlannerParams::IS_COUNT;
     }
 
-    if (expCtx->needsMerge && expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
+    if (pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage()) {
+        invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData);
         plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
     }
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index c7a40f3388c..70b3a1cf72d 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -46,6 +46,16 @@ namespace mongo {
 constexpr StringData ResumeToken::kDataFieldName;
 constexpr StringData ResumeToken::kTypeBitsFieldName;
 
+namespace {
+// Helper function for makeHighWaterMarkResumeToken and isHighWaterMarkResumeToken.
+ResumeTokenData makeHighWaterMarkResumeTokenData(Timestamp clusterTime) {
+    invariant(!clusterTime.isNull());
+    ResumeTokenData tokenData;
+    tokenData.clusterTime = clusterTime;
+    return tokenData;
+}
+}  // namespace
+
 bool ResumeTokenData::operator==(const ResumeTokenData& other) const {
     return clusterTime == other.clusterTime && version == other.version &&
         fromInvalidate == other.fromInvalidate &&
@@ -194,4 +204,12 @@ ResumeToken ResumeToken::parse(const Document& resumeDoc) {
     return ResumeToken(resumeDoc);
 }
 
+ResumeToken ResumeToken::makeHighWaterMarkResumeToken(Timestamp clusterTime) {
+    return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime));
+}
+
+bool ResumeToken::isHighWaterMarkResumeToken(const ResumeTokenData& tokenData) {
+    return tokenData == makeHighWaterMarkResumeTokenData(tokenData.clusterTime);
+}
+
 }  // namespace mongo
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index 16a22311373..a50ac20d8b9 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -109,6 +109,18 @@ public:
     static ResumeToken parse(const Document& document);
 
     /**
+     * Generate a high-water-mark pseudo-token for 'clusterTime', with no UUID or documentKey.
+     */
+    static ResumeToken makeHighWaterMarkResumeToken(Timestamp clusterTime);
+
+    /**
+     * Returns true if the given token data represents a valid high-water-mark resume token; that
+     * is, it does not refer to a specific operation, but instead specifies a clusterTime after
+     * which the stream should resume.
+     */
+    static bool isHighWaterMarkResumeToken(const ResumeTokenData& tokenData);
+
+    /**
      * The default no-argument constructor is required by the IDL for types used as non-optional
      * fields.
      */
```
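Taken together, the resume_token changes let a mongoD stamp each change stream batch with a postBatchResumeToken even when the batch contains no events: the token encodes only the latest oplog timestamp the cursor has scanned, and isHighWaterMarkResumeToken() recognizes it because every field other than clusterTime is left at its default. The following is a rough usage sketch, not part of the patch; it assumes `ResumeToken::getData()`, the public `ResumeTokenData` fields, and `UUID::gen()` behave as they do elsewhere in the tree.

```cpp
// Hypothetical usage sketch -- not part of this patch.
#include "mongo/db/pipeline/resume_token.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/uuid.h"

namespace mongo {

void highWaterMarkExample() {
    // The latest oplog timestamp the executor reports having scanned.
    const Timestamp latestOplogTs(1543071380, 1);

    // The pseudo-token a mongoD could attach to an otherwise-empty batch.
    ResumeToken hwmToken = ResumeToken::makeHighWaterMarkResumeToken(latestOplogTs);
    invariant(ResumeToken::isHighWaterMarkResumeToken(hwmToken.getData()));

    // A token that names a collection (or sets any other field) is an ordinary
    // event token, not a high-water-mark.
    ResumeTokenData eventData;
    eventData.clusterTime = latestOplogTs;
    eventData.uuid = UUID::gen();
    invariant(!ResumeToken::isHighWaterMarkResumeToken(eventData));
}

}  // namespace mongo
```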