summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2019-02-01 15:17:28 +0000
committerBernard Gorman <bernard.gorman@gmail.com>2019-03-01 04:44:10 +0000
commit204d63a92d588b9891277caf70a257b42f82ac32 (patch)
treeef31c1afdb0f14b84d6dacab5f7521872aa4d5e5
parentb9542a99354c7e9e6a2ec277b504a8ac94d285b7 (diff)
downloadmongo-204d63a92d588b9891277caf70a257b42f82ac32.tar.gz
SERVER-39410 Re-enable batching in DSCursor for change stream cursors
(cherry picked from commit 04882fa7f5210cfb14918ecddbbc5acbd88e86b6)
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp39
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.h25
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp4
-rw-r--r--src/mongo/db/pipeline/pipeline.h5
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp6
5 files changed, 61 insertions, 18 deletions
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 8871bf07bb7..61bbaecaec2 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -55,11 +55,15 @@ DocumentSource::GetNextResult DocumentSourceCursor::getNext() {
if (_currentBatch.empty()) {
loadBatch();
-
- if (_currentBatch.empty())
- return GetNextResult::makeEOF();
}
+ // If we are tracking the oplog timestamp, update our cached latest optime.
+ if (_trackOplogTS && _exec)
+ _updateOplogTimestamp();
+
+ if (_currentBatch.empty())
+ return GetNextResult::makeEOF();
+
Document out = std::move(_currentBatch.front());
_currentBatch.pop_front();
return std::move(out);
@@ -104,10 +108,7 @@ void DocumentSourceCursor::loadBatch() {
// As long as we're waiting for inserts, we shouldn't do any batching at this level
// we need the whole pipeline to see each document to see if we should stop waiting.
- // Furthermore, if we need to return the latest oplog time (in the tailable and
- // awaitData case), batching will result in a wrong time.
- if (pExpCtx->isTailableAwaitData() ||
- awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
+ if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
// End this batch and prepare PlanExecutor for yielding.
_exec->saveState();
@@ -146,6 +147,19 @@ void DocumentSourceCursor::loadBatch() {
}
}
+void DocumentSourceCursor::_updateOplogTimestamp() {
+ // If we are about to return a result, set our oplog timestamp to the optime of that result.
+ if (!_currentBatch.empty()) {
+ const auto& ts = _currentBatch.front().getField(repl::OpTime::kTimestampFieldName);
+ invariant(ts.getType() == BSONType::bsonTimestamp);
+ _latestOplogTimestamp = ts.getTimestamp();
+ return;
+ }
+
+ // If we have no more results to return, advance to the latest oplog timestamp.
+ _latestOplogTimestamp = _exec->getLatestOplogTimestamp();
+}
+
Pipeline::SourceContainer::iterator DocumentSourceCursor::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this);
@@ -299,11 +313,13 @@ DocumentSourceCursor::~DocumentSourceCursor() {
DocumentSourceCursor::DocumentSourceCursor(
Collection* collection,
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec,
- const intrusive_ptr<ExpressionContext>& pCtx)
+ const intrusive_ptr<ExpressionContext>& pCtx,
+ bool trackOplogTimestamp)
: DocumentSource(pCtx),
_docsAddedToBatches(0),
_exec(std::move(exec)),
- _outputSorts(_exec->getOutputSorts()) {
+ _outputSorts(_exec->getOutputSorts()),
+ _trackOplogTS(trackOplogTimestamp) {
_planSummary = Explain::getPlanSummary(_exec.get());
recordPlanSummaryStats();
@@ -322,9 +338,10 @@ DocumentSourceCursor::DocumentSourceCursor(
intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create(
Collection* collection,
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec,
- const intrusive_ptr<ExpressionContext>& pExpCtx) {
+ const intrusive_ptr<ExpressionContext>& pExpCtx,
+ bool trackOplogTimestamp) {
intrusive_ptr<DocumentSourceCursor> source(
- new DocumentSourceCursor(collection, std::move(exec), pExpCtx));
+ new DocumentSourceCursor(collection, std::move(exec), pExpCtx, trackOplogTimestamp));
return source;
}
}
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index 9b7e80a9f89..a01605a1bfd 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -78,7 +78,8 @@ public:
static boost::intrusive_ptr<DocumentSourceCursor> create(
Collection* collection,
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ bool trackOplogTimestamp = false);
/*
Record the query that was specified for the cursor this wraps, if
@@ -137,10 +138,7 @@ public:
}
Timestamp getLatestOplogTimestamp() const {
- if (_exec) {
- return _exec->getLatestOplogTimestamp();
- }
- return Timestamp();
+ return _latestOplogTimestamp;
}
const std::string& getPlanSummaryStr() const {
@@ -167,7 +165,8 @@ protected:
private:
DocumentSourceCursor(Collection* collection,
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ bool trackOplogTimestamp = false);
~DocumentSourceCursor();
/**
@@ -188,6 +187,13 @@ private:
void recordPlanSummaryStats();
+ /**
+ * If we are tailing the oplog, this method updates the cached timestamp to that of the latest
+ * document returned, or the latest timestamp observed in the oplog if we have no more results.
+ */
+ void _updateOplogTimestamp();
+
+ // Batches results returned from the underlying PlanExecutor.
std::deque<Document> _currentBatch;
// BSONObj members must outlive _projection and cursor.
@@ -216,6 +222,13 @@ private:
// stage is a MultiPlanStage. When the query is executed (with exec->executePlan()), it will
// wipe out its own copy of the winning plan's statistics, so they need to be saved here.
std::unique_ptr<PlanStageStats> _winningPlanTrialStats;
+
+ // True if we are tracking the latest observed oplog timestamp, false otherwise.
+ bool _trackOplogTS = false;
+
+ // If we are tailing the oplog and tracking the latest observed oplog time, this is the latest
+ // timestamp seen in the collection. Otherwise, this is a null timestamp.
+ Timestamp _latestOplogTimestamp;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 17b2a7c6211..8424a150884 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -613,6 +613,10 @@ boost::intrusive_ptr<DocumentSource> Pipeline::popFront() {
return targetStage;
}
+DocumentSource* Pipeline::peekFront() const {
+ return _sources.empty() ? nullptr : _sources.front().get();
+}
+
boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithName(StringData targetStageName) {
return popFrontWithNameAndCriteria(targetStageName, nullptr);
}
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index d7cafb8bf20..798af8ba707 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -270,6 +270,11 @@ public:
boost::intrusive_ptr<DocumentSource> popFront();
/**
+ * Returns a pointer to the first stage of the pipeline, or a nullptr if the pipeline is empty.
+ */
+ DocumentSource* peekFront() const;
+
+ /**
* Removes and returns the last stage of the pipeline. Returns nullptr if the pipeline is empty.
*/
boost::intrusive_ptr<DocumentSource> popBack();
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index beb9542fcca..466c438c030 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -520,9 +520,13 @@ void PipelineD::addCursorSource(Collection* collection,
// DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved.
exec->saveState();
+ // If this is a change stream pipeline, make sure that we tell DSCursor to track the oplog time.
+ const bool trackOplogTS =
+ (pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage());
+
// Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline.
intrusive_ptr<DocumentSourceCursor> pSource =
- DocumentSourceCursor::create(collection, std::move(exec), expCtx);
+ DocumentSourceCursor::create(collection, std::move(exec), expCtx, trackOplogTS);
// Add the cursor to the pipeline first so that it's correctly disposed of as part of the
// pipeline if an exception is thrown during this method.