diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2019-02-01 15:17:28 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2019-03-01 04:44:10 +0000 |
commit | 204d63a92d588b9891277caf70a257b42f82ac32 (patch) | |
tree | ef31c1afdb0f14b84d6dacab5f7521872aa4d5e5 | |
parent | b9542a99354c7e9e6a2ec277b504a8ac94d285b7 (diff) | |
download | mongo-204d63a92d588b9891277caf70a257b42f82ac32.tar.gz |
SERVER-39410 Re-enable batching in DSCursor for change stream cursors
(cherry picked from commit 04882fa7f5210cfb14918ecddbbc5acbd88e86b6)
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 39 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.h | 25 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.h | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 6 |
5 files changed, 61 insertions, 18 deletions
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 8871bf07bb7..61bbaecaec2 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -55,11 +55,15 @@ DocumentSource::GetNextResult DocumentSourceCursor::getNext() { if (_currentBatch.empty()) { loadBatch(); - - if (_currentBatch.empty()) - return GetNextResult::makeEOF(); } + // If we are tracking the oplog timestamp, update our cached latest optime. + if (_trackOplogTS && _exec) + _updateOplogTimestamp(); + + if (_currentBatch.empty()) + return GetNextResult::makeEOF(); + Document out = std::move(_currentBatch.front()); _currentBatch.pop_front(); return std::move(out); @@ -104,10 +108,7 @@ void DocumentSourceCursor::loadBatch() { // As long as we're waiting for inserts, we shouldn't do any batching at this level // we need the whole pipeline to see each document to see if we should stop waiting. - // Furthermore, if we need to return the latest oplog time (in the tailable and - // awaitData case), batching will result in a wrong time. - if (pExpCtx->isTailableAwaitData() || - awaitDataState(pExpCtx->opCtx).shouldWaitForInserts || + if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts || memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { // End this batch and prepare PlanExecutor for yielding. _exec->saveState(); @@ -146,6 +147,19 @@ void DocumentSourceCursor::loadBatch() { } } +void DocumentSourceCursor::_updateOplogTimestamp() { + // If we are about to return a result, set our oplog timestamp to the optime of that result. + if (!_currentBatch.empty()) { + const auto& ts = _currentBatch.front().getField(repl::OpTime::kTimestampFieldName); + invariant(ts.getType() == BSONType::bsonTimestamp); + _latestOplogTimestamp = ts.getTimestamp(); + return; + } + + // If we have no more results to return, advance to the latest oplog timestamp. + _latestOplogTimestamp = _exec->getLatestOplogTimestamp(); +} + Pipeline::SourceContainer::iterator DocumentSourceCursor::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this); @@ -299,11 +313,13 @@ DocumentSourceCursor::~DocumentSourceCursor() { DocumentSourceCursor::DocumentSourceCursor( Collection* collection, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, - const intrusive_ptr<ExpressionContext>& pCtx) + const intrusive_ptr<ExpressionContext>& pCtx, + bool trackOplogTimestamp) : DocumentSource(pCtx), _docsAddedToBatches(0), _exec(std::move(exec)), - _outputSorts(_exec->getOutputSorts()) { + _outputSorts(_exec->getOutputSorts()), + _trackOplogTS(trackOplogTimestamp) { _planSummary = Explain::getPlanSummary(_exec.get()); recordPlanSummaryStats(); @@ -322,9 +338,10 @@ DocumentSourceCursor::DocumentSourceCursor( intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create( Collection* collection, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, - const intrusive_ptr<ExpressionContext>& pExpCtx) { + const intrusive_ptr<ExpressionContext>& pExpCtx, + bool trackOplogTimestamp) { intrusive_ptr<DocumentSourceCursor> source( - new DocumentSourceCursor(collection, std::move(exec), pExpCtx)); + new DocumentSourceCursor(collection, std::move(exec), pExpCtx, trackOplogTimestamp)); return source; } } diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index 9b7e80a9f89..a01605a1bfd 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -78,7 +78,8 @@ public: static boost::intrusive_ptr<DocumentSourceCursor> create( Collection* collection, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + bool trackOplogTimestamp = false); /* Record the query that was specified for the cursor this wraps, if @@ -137,10 +138,7 @@ public: } Timestamp getLatestOplogTimestamp() const { - if (_exec) { - return _exec->getLatestOplogTimestamp(); - } - return Timestamp(); + return _latestOplogTimestamp; } const std::string& getPlanSummaryStr() const { @@ -167,7 +165,8 @@ protected: private: DocumentSourceCursor(Collection* collection, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + bool trackOplogTimestamp = false); ~DocumentSourceCursor(); /** @@ -188,6 +187,13 @@ private: void recordPlanSummaryStats(); + /** + * If we are tailing the oplog, this method updates the cached timestamp to that of the latest + * document returned, or the latest timestamp observed in the oplog if we have no more results. + */ + void _updateOplogTimestamp(); + + // Batches results returned from the underlying PlanExecutor. std::deque<Document> _currentBatch; // BSONObj members must outlive _projection and cursor. @@ -216,6 +222,13 @@ private: // stage is a MultiPlanStage. When the query is executed (with exec->executePlan()), it will // wipe out its own copy of the winning plan's statistics, so they need to be saved here. std::unique_ptr<PlanStageStats> _winningPlanTrialStats; + + // True if we are tracking the latest observed oplog timestamp, false otherwise. + bool _trackOplogTS = false; + + // If we are tailing the oplog and tracking the latest observed oplog time, this is the latest + // timestamp seen in the collection. Otherwise, this is a null timestamp. + Timestamp _latestOplogTimestamp; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 17b2a7c6211..8424a150884 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -613,6 +613,10 @@ boost::intrusive_ptr<DocumentSource> Pipeline::popFront() { return targetStage; } +DocumentSource* Pipeline::peekFront() const { + return _sources.empty() ? nullptr : _sources.front().get(); +} + boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithName(StringData targetStageName) { return popFrontWithNameAndCriteria(targetStageName, nullptr); } diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index d7cafb8bf20..798af8ba707 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -270,6 +270,11 @@ public: boost::intrusive_ptr<DocumentSource> popFront(); /** + * Returns a pointer to the first stage of the pipeline, or a nullptr if the pipeline is empty. + */ + DocumentSource* peekFront() const; + + /** * Removes and returns the last stage of the pipeline. Returns nullptr if the pipeline is empty. */ boost::intrusive_ptr<DocumentSource> popBack(); diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index beb9542fcca..466c438c030 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -520,9 +520,13 @@ void PipelineD::addCursorSource(Collection* collection, // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. exec->saveState(); + // If this is a change stream pipeline, make sure that we tell DSCursor to track the oplog time. + const bool trackOplogTS = + (pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage()); + // Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline. intrusive_ptr<DocumentSourceCursor> pSource = - DocumentSourceCursor::create(collection, std::move(exec), expCtx); + DocumentSourceCursor::create(collection, std::move(exec), expCtx, trackOplogTS); // Add the cursor to the pipeline first so that it's correctly disposed of as part of the // pipeline if an exception is thrown during this method. |