summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: David Storch <david.storch@mongodb.com>, 2020-02-24 19:20:29 -0500
committer: Evergreen Agent <no-reply@evergreen.mongodb.com>, 2020-03-02 19:13:30 +0000
commit: bed0e7366eaeaaca150cd66058f37b643ed1c23f (patch)
tree: 5224087b1c053d79ffa242d42384ab433618e0a1
parent: cdb931fdbf3dcb7dbc24067e83ba7fd350033eb9 (diff)
download: mongo-bed0e7366eaeaaca150cd66058f37b643ed1c23f.tar.gz
SERVER-45418 Avoid explicitly batching documents in $cursor for count-like aggregates.
(cherry picked from commit 768e87bbf6213d26f83ad2c526d4aab36e64d185)
-rw-r--r-- src/mongo/db/pipeline/document_source_cursor.cpp | 64
-rw-r--r-- src/mongo/db/pipeline/document_source_cursor.h | 60
2 files changed, 107 insertions, 17 deletions
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 9602507944d..91c27790cfd 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -50,19 +50,57 @@ const char* DocumentSourceCursor::getSourceName() const {
return "$cursor";
}
+bool DocumentSourceCursor::Batch::isEmpty() const {
+    // Reports whether anything is left to dequeue. When 'shouldProduceEmptyDocs' is set no
+    // documents are stored, so emptiness is decided by '_count'; otherwise it is decided by the
+    // deque of buffered documents.
+    return shouldProduceEmptyDocs ? _count == 0 : _batchOfDocs.empty();
+}
+
+void DocumentSourceCursor::Batch::enqueue(Document&& doc) {
+    if (shouldProduceEmptyDocs) {
+        // Only a tally is kept in this mode: dequeue() hands back a fresh empty Document, so
+        // the contents of 'doc' are never stored.
+        ++_count;
+        return;
+    }
+
+    // Append the result of getOwned() and add its approximate size to the batch's footprint.
+    // NOTE(review): 'doc' is a named rvalue reference and therefore an lvalue here; confirm
+    // whether 'std::move(doc).getOwned()' would avoid a copy.
+    _batchOfDocs.push_back(doc.getOwned());
+    _memUsageBytes += _batchOfDocs.back().getApproximateSize();
+}
+
+Document DocumentSourceCursor::Batch::dequeue() {
+    // Removing from an empty batch is a programming error.
+    invariant(!isEmpty());
+
+    if (shouldProduceEmptyDocs) {
+        // Nothing is stored in this mode; consume one unit of the tally and synthesize the
+        // empty document on the way out.
+        --_count;
+        return Document{};
+    }
+
+    Document front = std::move(_batchOfDocs.front());
+    _batchOfDocs.pop_front();
+
+    // The footprint is not decremented per document; it only drops back to zero once the last
+    // buffered document has been handed out.
+    if (_batchOfDocs.empty()) {
+        _memUsageBytes = 0;
+    }
+    return front;
+}
+
+void DocumentSourceCursor::Batch::clear() {
+    // Reset the tally, the memory accounting and the buffered documents together, so the batch
+    // reads as empty whichever mode it is in.
+    _count = 0;
+    _memUsageBytes = 0;
+    _batchOfDocs.clear();
+}
+
DocumentSource::GetNextResult DocumentSourceCursor::getNext() {
pExpCtx->checkForInterrupt();
- if (_currentBatch.empty()) {
+ if (_currentBatch.isEmpty()) {
loadBatch();
- if (_currentBatch.empty())
+ if (_currentBatch.isEmpty())
return GetNextResult::makeEOF();
}
- Document out = std::move(_currentBatch.front());
- _currentBatch.pop_front();
- return std::move(out);
+ return _currentBatch.dequeue();
}
void DocumentSourceCursor::loadBatch() {
@@ -81,17 +119,16 @@ void DocumentSourceCursor::loadBatch() {
uassertStatusOK(_exec->restoreState());
- int memUsageBytes = 0;
{
ON_BLOCK_EXIT([this] { recordPlanSummaryStats(); });
while ((state = _exec->getNext(&resultObj, nullptr)) == PlanExecutor::ADVANCED) {
- if (_shouldProduceEmptyDocs) {
- _currentBatch.push_back(Document());
+ if (_currentBatch.shouldProduceEmptyDocs) {
+ _currentBatch.enqueue(Document());
} else if (_dependencies) {
- _currentBatch.push_back(_dependencies->extractFields(resultObj));
+ _currentBatch.enqueue(_dependencies->extractFields(resultObj));
} else {
- _currentBatch.push_back(Document::fromBsonWithMetaData(resultObj));
+ _currentBatch.enqueue(Document::fromBsonWithMetaData(resultObj));
}
if (_limit) {
@@ -101,15 +138,14 @@ void DocumentSourceCursor::loadBatch() {
verify(_docsAddedToBatches < _limit->getLimit());
}
- memUsageBytes += _currentBatch.back().getApproximateSize();
-
// As long as we're waiting for inserts, we shouldn't do any batching at this level
// we need the whole pipeline to see each document to see if we should stop waiting.
// Furthermore, if we need to return the latest oplog time (in the tailable and
// needs-merge case), batching will result in a wrong time.
if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
(pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) ||
- memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
+ static_cast<long long>(_currentBatch.memUsageBytes()) >
+ internalDocumentSourceCursorBatchSizeBytes.load()) {
// End this batch and prepare PlanExecutor for yielding.
_exec->saveState();
return;
@@ -302,4 +338,4 @@ intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create(
collection, std::move(exec), pExpCtx, failsForExecutionLevelExplain));
return source;
}
-}
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index 960764dce89..4b623552436 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -132,7 +132,7 @@ public:
* documents, avoiding the overhead of converting BSONObjs to Documents.
*/
void shouldProduceEmptyDocs() {
- _shouldProduceEmptyDocs = true;
+ _currentBatch.shouldProduceEmptyDocs = true;
}
Timestamp getLatestOplogTimestamp() const {
@@ -171,6 +171,60 @@ private:
~DocumentSourceCursor();
/**
+ * A $cursor stage loads documents from the underlying PlanExecutor in batches. An object of
+ * this class represents one such batch. Acts like a queue into which documents can be queued
+ * and dequeued in FIFO order.
+ */
+    class Batch {
+    public:
+        /**
+         * Adds a new document to the batch.
+         */
+        void enqueue(Document&& doc);
+
+        /**
+         * Removes the first document from the batch and returns it. Illegal to call if the
+         * batch is empty.
+         */
+        Document dequeue();
+
+        /**
+         * Discards everything in the batch and resets its memory accounting to zero.
+         */
+        void clear();
+
+        /**
+         * Returns true if there is nothing left to dequeue.
+         */
+        bool isEmpty() const;
+
+        /**
+         * Returns the approximate memory footprint of this batch, measured in bytes. Even after
+         * documents are dequeued from the batch, continues to indicate the batch's peak memory
+         * footprint. Resets to zero once the final document in the batch is dequeued.
+         */
+        size_t memUsageBytes() const {
+            return _memUsageBytes;
+        }
+
+        /**
+         * Returns a reference to the first document without removing it. Illegal to call if
+         * 'shouldProduceEmptyDocs' is true or if the batch is empty.
+         */
+        const Document& peekFront() const {
+            invariant(!shouldProduceEmptyDocs);
+            // std::deque::front() has undefined behavior on an empty container, so check
+            // explicitly, mirroring the non-empty check made by dequeue().
+            invariant(!_batchOfDocs.empty());
+            return _batchOfDocs.front();
+        }
+
+        // When true, the batch keeps only a count of enqueued documents and dequeue() returns
+        // empty documents.
+        bool shouldProduceEmptyDocs = false;
+
+    private:
+        // Used only if 'shouldProduceEmptyDocs' is false. A deque of the documents comprising the
+        // batch.
+        std::deque<Document> _batchOfDocs;
+
+        // Used only if 'shouldProduceEmptyDocs' is true. In this case, we don't need to keep the
+        // documents themselves, only a count of the number of documents in the batch.
+        size_t _count = 0;
+
+        // The approximate memory footprint of the batch in bytes. Always kept at zero when
+        // 'shouldProduceEmptyDocs' is true.
+        size_t _memUsageBytes = 0;
+    };
+
+ /**
* Acquires the appropriate locks, then destroys and de-registers '_exec'. '_exec' must be
* non-null.
*/
@@ -188,13 +242,13 @@ private:
void recordPlanSummaryStats();
- std::deque<Document> _currentBatch;
+ // Batches results returned from the underlying PlanExecutor.
+ Batch _currentBatch;
// BSONObj members must outlive _projection and cursor.
BSONObj _query;
BSONObj _sort;
BSONObj _projection;
- bool _shouldProduceEmptyDocs = false;
boost::optional<ParsedDeps> _dependencies;
boost::intrusive_ptr<DocumentSourceLimit> _limit;
long long _docsAddedToBatches; // for _limit enforcement