diff options
author | David Storch <david.storch@mongodb.com> | 2020-02-24 19:20:29 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-27 23:54:51 +0000 |
commit | 768e87bbf6213d26f83ad2c526d4aab36e64d185 (patch) | |
tree | 877fb129c081c9813fd10e6e10fb85afbb8815c2 /src/mongo/db | |
parent | 249f411a79fd11559830658d45b1ebca4d6c9865 (diff) | |
download | mongo-768e87bbf6213d26f83ad2c526d4aab36e64d185.tar.gz |
SERVER-45418 Avoid explicitly batching documents in $cursor for count-like aggregates.
(cherry picked from commit 7c4676ef0e8e47cf79e10b81f7661f8fbea82cb0)
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 68 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.h | 62 |
2 files changed, 111 insertions, 19 deletions
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 61bbaecaec2..73b0f85f8a5 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -50,10 +50,50 @@ const char* DocumentSourceCursor::getSourceName() const { return "$cursor"; } +bool DocumentSourceCursor::Batch::isEmpty() const { + if (shouldProduceEmptyDocs) { + return !_count; + } else { + return _batchOfDocs.empty(); + } + MONGO_UNREACHABLE; +} + +void DocumentSourceCursor::Batch::enqueue(Document&& doc) { + if (shouldProduceEmptyDocs) { + ++_count; + } else { + _batchOfDocs.push_back(doc.getOwned()); + _memUsageBytes += _batchOfDocs.back().getApproximateSize(); + } +} + +Document DocumentSourceCursor::Batch::dequeue() { + invariant(!isEmpty()); + if (shouldProduceEmptyDocs) { + --_count; + return Document{}; + } else { + Document out = std::move(_batchOfDocs.front()); + _batchOfDocs.pop_front(); + if (_batchOfDocs.empty()) { + _memUsageBytes = 0; + } + return out; + } + MONGO_UNREACHABLE; +} + +void DocumentSourceCursor::Batch::clear() { + _batchOfDocs.clear(); + _count = 0; + _memUsageBytes = 0; +} + DocumentSource::GetNextResult DocumentSourceCursor::getNext() { pExpCtx->checkForInterrupt(); - if (_currentBatch.empty()) { + if (_currentBatch.isEmpty()) { loadBatch(); } @@ -61,12 +101,10 @@ DocumentSource::GetNextResult DocumentSourceCursor::getNext() { if (_trackOplogTS && _exec) _updateOplogTimestamp(); - if (_currentBatch.empty()) + if (_currentBatch.isEmpty()) return GetNextResult::makeEOF(); - Document out = std::move(_currentBatch.front()); - _currentBatch.pop_front(); - return std::move(out); + return _currentBatch.dequeue(); } void DocumentSourceCursor::loadBatch() { @@ -84,17 +122,16 @@ void DocumentSourceCursor::loadBatch() { uassertStatusOK(_exec->restoreState()); - int memUsageBytes = 0; { ON_BLOCK_EXIT([this] { recordPlanSummaryStats(); }); while ((state = _exec->getNext(&resultObj, nullptr)) == PlanExecutor::ADVANCED) { - if (_shouldProduceEmptyDocs) { - _currentBatch.push_back(Document()); + if (_currentBatch.shouldProduceEmptyDocs) { + _currentBatch.enqueue(Document()); } else if (_dependencies) { - _currentBatch.push_back(_dependencies->extractFields(resultObj)); + _currentBatch.enqueue(_dependencies->extractFields(resultObj)); } else { - _currentBatch.push_back(Document::fromBsonWithMetaData(resultObj)); + _currentBatch.enqueue(Document::fromBsonWithMetaData(resultObj)); } if (_limit) { @@ -104,12 +141,11 @@ void DocumentSourceCursor::loadBatch() { verify(_docsAddedToBatches < _limit->getLimit()); } - memUsageBytes += _currentBatch.back().getApproximateSize(); - // As long as we're waiting for inserts, we shouldn't do any batching at this level // we need the whole pipeline to see each document to see if we should stop waiting. if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts || - memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { + static_cast<long long>(_currentBatch.memUsageBytes()) > + internalDocumentSourceCursorBatchSizeBytes.load()) { // End this batch and prepare PlanExecutor for yielding. _exec->saveState(); return; @@ -149,8 +185,8 @@ void DocumentSourceCursor::loadBatch() { void DocumentSourceCursor::_updateOplogTimestamp() { // If we are about to return a result, set our oplog timestamp to the optime of that result. - if (!_currentBatch.empty()) { - const auto& ts = _currentBatch.front().getField(repl::OpTime::kTimestampFieldName); + if (!_currentBatch.isEmpty()) { + const auto& ts = _currentBatch.peekFront().getField(repl::OpTime::kTimestampFieldName); invariant(ts.getType() == BSONType::bsonTimestamp); _latestOplogTimestamp = ts.getTimestamp(); return; @@ -344,4 +380,4 @@ intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create( new DocumentSourceCursor(collection, std::move(exec), pExpCtx, trackOplogTimestamp)); return source; } -} +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index a01605a1bfd..da520836a8c 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -132,9 +132,12 @@ public: /** * If subsequent sources need no information from the cursor, the cursor can simply output empty * documents, avoiding the overhead of converting BSONObjs to Documents. + * + * Illegal to set if the $cursor stage was constructed with 'trackOplogTimestamp' enabled. */ void shouldProduceEmptyDocs() { - _shouldProduceEmptyDocs = true; + invariant(!_trackOplogTS); + _currentBatch.shouldProduceEmptyDocs = true; } Timestamp getLatestOplogTimestamp() const { @@ -170,6 +173,60 @@ private: ~DocumentSourceCursor(); /** + * A $cursor stage loads documents from the underlying PlanExecutor in batches. An object of + * this class represents one such batch. Acts like a queue into which documents can be queued + * and dequeued in FIFO order. + */ + class Batch { + public: + /** + * Adds a new document to the batch. + */ + void enqueue(Document&& doc); + + /** + * Removes the first document from the batch. + */ + Document dequeue(); + + void clear(); + + bool isEmpty() const; + + /** + * Returns the approximate memory footprint of this batch, measured in bytes. Even after + * documents are dequeued from the batch, continues to indicate the batch's peak memory + * footprint. Resets to zero once the final document in the batch is dequeued. + */ + size_t memUsageBytes() const { + return _memUsageBytes; + } + + /** + * Illegal to call if 'shouldProduceEmptyDocs' is true. + */ + const Document& peekFront() const { + invariant(!shouldProduceEmptyDocs); + return _batchOfDocs.front(); + } + + bool shouldProduceEmptyDocs = false; + + private: + // Used only if 'shouldProduceEmptyDocs' is false. A deque of the documents comprising the + // batch. + std::deque<Document> _batchOfDocs; + + // Used only if 'shouldProduceEmptyDocs' is true. In this case, we don't need to keep the + // documents themselves, only a count of the number of documents in the batch. + size_t _count = 0; + + // The approximate memory footprint of the batch in bytes. Always kept at zero when + // 'shouldProduceEmptyDocs' is true. + size_t _memUsageBytes = 0; + }; + + /** * Acquires the appropriate locks, then destroys and de-registers '_exec'. '_exec' must be * non-null. */ @@ -194,13 +251,12 @@ private: void _updateOplogTimestamp(); // Batches results returned from the underlying PlanExecutor. - std::deque<Document> _currentBatch; + Batch _currentBatch; // BSONObj members must outlive _projection and cursor. BSONObj _query; BSONObj _sort; BSONObj _projection; - bool _shouldProduceEmptyDocs = false; boost::optional<ParsedDeps> _dependencies; boost::intrusive_ptr<DocumentSourceLimit> _limit; long long _docsAddedToBatches; // for _limit enforcement |