diff options
author | David Storch <david.storch@mongodb.com> | 2020-02-24 19:20:29 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-02 19:13:30 +0000 |
commit | bed0e7366eaeaaca150cd66058f37b643ed1c23f (patch) | |
tree | 5224087b1c053d79ffa242d42384ab433618e0a1 | |
parent | cdb931fdbf3dcb7dbc24067e83ba7fd350033eb9 (diff) | |
download | mongo-bed0e7366eaeaaca150cd66058f37b643ed1c23f.tar.gz |
SERVER-45418 Avoid explicitly batching documents in $cursor for count-like aggregates.
(cherry picked from commit 768e87bbf6213d26f83ad2c526d4aab36e64d185)
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 64 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.h | 60 |
2 files changed, 107 insertions, 17 deletions
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 9602507944d..91c27790cfd 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -50,19 +50,57 @@ const char* DocumentSourceCursor::getSourceName() const { return "$cursor"; } +bool DocumentSourceCursor::Batch::isEmpty() const { + if (shouldProduceEmptyDocs) { + return !_count; + } else { + return _batchOfDocs.empty(); + } + MONGO_UNREACHABLE; +} + +void DocumentSourceCursor::Batch::enqueue(Document&& doc) { + if (shouldProduceEmptyDocs) { + ++_count; + } else { + _batchOfDocs.push_back(doc.getOwned()); + _memUsageBytes += _batchOfDocs.back().getApproximateSize(); + } +} + +Document DocumentSourceCursor::Batch::dequeue() { + invariant(!isEmpty()); + if (shouldProduceEmptyDocs) { + --_count; + return Document{}; + } else { + Document out = std::move(_batchOfDocs.front()); + _batchOfDocs.pop_front(); + if (_batchOfDocs.empty()) { + _memUsageBytes = 0; + } + return out; + } + MONGO_UNREACHABLE; +} + +void DocumentSourceCursor::Batch::clear() { + _batchOfDocs.clear(); + _count = 0; + _memUsageBytes = 0; +} + DocumentSource::GetNextResult DocumentSourceCursor::getNext() { pExpCtx->checkForInterrupt(); - if (_currentBatch.empty()) { + if (_currentBatch.isEmpty()) { loadBatch(); - if (_currentBatch.empty()) + if (_currentBatch.isEmpty()) return GetNextResult::makeEOF(); } - Document out = std::move(_currentBatch.front()); - _currentBatch.pop_front(); - return std::move(out); + return _currentBatch.dequeue(); } void DocumentSourceCursor::loadBatch() { @@ -81,17 +119,16 @@ void DocumentSourceCursor::loadBatch() { uassertStatusOK(_exec->restoreState()); - int memUsageBytes = 0; { ON_BLOCK_EXIT([this] { recordPlanSummaryStats(); }); while ((state = _exec->getNext(&resultObj, nullptr)) == PlanExecutor::ADVANCED) { - if (_shouldProduceEmptyDocs) { - _currentBatch.push_back(Document()); + if (_currentBatch.shouldProduceEmptyDocs) { + _currentBatch.enqueue(Document()); } else if (_dependencies) { - _currentBatch.push_back(_dependencies->extractFields(resultObj)); + _currentBatch.enqueue(_dependencies->extractFields(resultObj)); } else { - _currentBatch.push_back(Document::fromBsonWithMetaData(resultObj)); + _currentBatch.enqueue(Document::fromBsonWithMetaData(resultObj)); } if (_limit) { @@ -101,15 +138,14 @@ void DocumentSourceCursor::loadBatch() { verify(_docsAddedToBatches < _limit->getLimit()); } - memUsageBytes += _currentBatch.back().getApproximateSize(); - // As long as we're waiting for inserts, we shouldn't do any batching at this level // we need the whole pipeline to see each document to see if we should stop waiting. // Furthermore, if we need to return the latest oplog time (in the tailable and // needs-merge case), batching will result in a wrong time. if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts || (pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) || - memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { + static_cast<long long>(_currentBatch.memUsageBytes()) > + internalDocumentSourceCursorBatchSizeBytes.load()) { // End this batch and prepare PlanExecutor for yielding. _exec->saveState(); return; @@ -302,4 +338,4 @@ intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create( collection, std::move(exec), pExpCtx, failsForExecutionLevelExplain)); return source; } -} +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index 960764dce89..4b623552436 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -132,7 +132,7 @@ public: * documents, avoiding the overhead of converting BSONObjs to Documents. */ void shouldProduceEmptyDocs() { - _shouldProduceEmptyDocs = true; + _currentBatch.shouldProduceEmptyDocs = true; } Timestamp getLatestOplogTimestamp() const { @@ -171,6 +171,60 @@ private: ~DocumentSourceCursor(); /** + * A $cursor stage loads documents from the underlying PlanExecutor in batches. An object of + * this class represents one such batch. Acts like a queue into which documents can be queued + * and dequeued in FIFO order. + */ + class Batch { + public: + /** + * Adds a new document to the batch. + */ + void enqueue(Document&& doc); + + /** + * Removes the first document from the batch. + */ + Document dequeue(); + + void clear(); + + bool isEmpty() const; + + /** + * Returns the approximate memory footprint of this batch, measured in bytes. Even after + * documents are dequeued from the batch, continues to indicate the batch's peak memory + * footprint. Resets to zero once the final document in the batch is dequeued. + */ + size_t memUsageBytes() const { + return _memUsageBytes; + } + + /** + * Illegal to call if 'shouldProduceEmptyDocs' is true. + */ + const Document& peekFront() const { + invariant(!shouldProduceEmptyDocs); + return _batchOfDocs.front(); + } + + bool shouldProduceEmptyDocs = false; + + private: + // Used only if 'shouldProduceEmptyDocs' is false. A deque of the documents comprising the + // batch. + std::deque<Document> _batchOfDocs; + + // Used only if 'shouldProduceEmptyDocs' is true. In this case, we don't need to keep the + // documents themselves, only a count of the number of documents in the batch. + size_t _count = 0; + + // The approximate memory footprint of the batch in bytes. Always kept at zero when + // 'shouldProduceEmptyDocs' is true. + size_t _memUsageBytes = 0; + }; + + /** * Acquires the appropriate locks, then destroys and de-registers '_exec'. '_exec' must be * non-null. */ @@ -188,13 +242,13 @@ private: void recordPlanSummaryStats(); - std::deque<Document> _currentBatch; + // Batches results returned from the underlying PlanExecutor. + Batch _currentBatch; // BSONObj members must outlive _projection and cursor. BSONObj _query; BSONObj _sort; BSONObj _projection; - bool _shouldProduceEmptyDocs = false; boost::optional<ParsedDeps> _dependencies; boost::intrusive_ptr<DocumentSourceLimit> _limit; long long _docsAddedToBatches; // for _limit enforcement |