diff options
author | David Storch <david.storch@mongodb.com> | 2020-02-24 19:20:29 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-25 15:22:45 +0000 |
commit | 7c4676ef0e8e47cf79e10b81f7661f8fbea82cb0 (patch) | |
tree | f405b7c008d97e839a9940004af4a934ab2db475 /src/mongo/db/pipeline | |
parent | bbdb6a411e614fcf048a8fe1ee363fac2ff0020c (diff) | |
download | mongo-7c4676ef0e8e47cf79e10b81f7661f8fbea82cb0.tar.gz |
SERVER-45418 Avoid explicitly batching documents in $cursor for count-like aggregates.
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 64 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.h | 67 |
2 files changed, 112 insertions, 19 deletions
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 491fa942be3..f80860c7406 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -55,10 +55,50 @@ const char* DocumentSourceCursor::getSourceName() const { return "$cursor"; } +bool DocumentSourceCursor::Batch::isEmpty() const { + if (shouldProduceEmptyDocs) { + return !_count; + } else { + return _batchOfDocs.empty(); + } + MONGO_UNREACHABLE; +} + +void DocumentSourceCursor::Batch::enqueue(Document&& doc) { + if (shouldProduceEmptyDocs) { + ++_count; + } else { + _batchOfDocs.push_back(doc.getOwned()); + _memUsageBytes += _batchOfDocs.back().getApproximateSize(); + } +} + +Document DocumentSourceCursor::Batch::dequeue() { + invariant(!isEmpty()); + if (shouldProduceEmptyDocs) { + --_count; + return Document{}; + } else { + Document out = std::move(_batchOfDocs.front()); + _batchOfDocs.pop_front(); + if (_batchOfDocs.empty()) { + _memUsageBytes = 0; + } + return out; + } + MONGO_UNREACHABLE; +} + +void DocumentSourceCursor::Batch::clear() { + _batchOfDocs.clear(); + _count = 0; + _memUsageBytes = 0; +} + DocumentSource::GetNextResult DocumentSourceCursor::getNext() { pExpCtx->checkForInterrupt(); - if (_currentBatch.empty()) { + if (_currentBatch.isEmpty()) { loadBatch(); } @@ -66,12 +106,10 @@ DocumentSource::GetNextResult DocumentSourceCursor::getNext() { if (_trackOplogTS && _exec) _updateOplogTimestamp(); - if (_currentBatch.empty()) + if (_currentBatch.isEmpty()) return GetNextResult::makeEOF(); - Document out = std::move(_currentBatch.front()); - _currentBatch.pop_front(); - return std::move(out); + return _currentBatch.dequeue(); } Document DocumentSourceCursor::transformBSONObjToDocument(const BSONObj& obj) const { @@ -98,15 +136,14 @@ void DocumentSourceCursor::loadBatch() { _exec->restoreState(); - int memUsageBytes = 0; { ON_BLOCK_EXIT([this] { recordPlanSummaryStats(); }); while ((state = _exec->getNext(&resultObj, nullptr)) == PlanExecutor::ADVANCED) { - if (_shouldProduceEmptyDocs) { - _currentBatch.push_back(Document()); + if (_currentBatch.shouldProduceEmptyDocs) { + _currentBatch.enqueue(Document()); } else { - _currentBatch.push_back(transformBSONObjToDocument(resultObj)); + _currentBatch.enqueue(transformBSONObjToDocument(resultObj)); } if (_limit) { @@ -116,12 +153,11 @@ void DocumentSourceCursor::loadBatch() { verify(_docsAddedToBatches < _limit->getLimit()); } - memUsageBytes += _currentBatch.back().getApproximateSize(); - // As long as we're waiting for inserts, we shouldn't do any batching at this level // we need the whole pipeline to see each document to see if we should stop waiting. if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts || - memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { + static_cast<long long>(_currentBatch.memUsageBytes()) > + internalDocumentSourceCursorBatchSizeBytes.load()) { // End this batch and prepare PlanExecutor for yielding. _exec->saveState(); return; @@ -160,8 +196,8 @@ void DocumentSourceCursor::loadBatch() { void DocumentSourceCursor::_updateOplogTimestamp() { // If we are about to return a result, set our oplog timestamp to the optime of that result. - if (!_currentBatch.empty()) { - const auto& ts = _currentBatch.front().getField(repl::OpTime::kTimestampFieldName); + if (!_currentBatch.isEmpty()) { + const auto& ts = _currentBatch.peekFront().getField(repl::OpTime::kTimestampFieldName); invariant(ts.getType() == BSONType::bsonTimestamp); _latestOplogTimestamp = ts.getTimestamp(); return; diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index 215d94c3387..02653242890 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -135,9 +135,12 @@ public: /** * If subsequent sources need no information from the cursor, the cursor can simply output empty * documents, avoiding the overhead of converting BSONObjs to Documents. + * + * Illegal to set if the $cursor stage was constructed with 'trackOplogTimestamp' enabled. */ void shouldProduceEmptyDocs() { - _shouldProduceEmptyDocs = true; + invariant(!_trackOplogTS); + _currentBatch.shouldProduceEmptyDocs = true; } Timestamp getLatestOplogTimestamp() const { @@ -173,8 +176,9 @@ protected: Pipeline::SourceContainer* container) final; /** - * If '_shouldProduceEmptyDocs' is false, this function hook is called on each 'obj' returned by - * '_exec' when loading a batch and returns a Document to be added to '_currentBatch'. + * If '_currentBatch.shouldProduceEmptyDocs' is false, this function hook is called on each + * 'obj' returned by '_exec' when loading a batch and returns a Document to be added to + * '_currentBatch'. * * The default implementation is a dependency-aware BSONObj-to-Document transformation. */ @@ -182,6 +186,60 @@ protected: private: /** + * A $cursor stage loads documents from the underlying PlanExecutor in batches. An object of + * this class represents one such batch. Acts like a queue into which documents can be queued + * and dequeued in FIFO order. + */ + class Batch { + public: + /** + * Adds a new document to the batch. + */ + void enqueue(Document&& doc); + + /** + * Removes the first document from the batch. + */ + Document dequeue(); + + void clear(); + + bool isEmpty() const; + + /** + * Returns the approximate memory footprint of this batch, measured in bytes. Even after + * documents are dequeued from the batch, continues to indicate the batch's peak memory + * footprint. Resets to zero once the final document in the batch is dequeued. + */ + size_t memUsageBytes() const { + return _memUsageBytes; + } + + /** + * Illegal to call if 'shouldProduceEmptyDocs' is true. + */ + const Document& peekFront() const { + invariant(!shouldProduceEmptyDocs); + return _batchOfDocs.front(); + } + + bool shouldProduceEmptyDocs = false; + + private: + // Used only if 'shouldProduceEmptyDocs' is false. A deque of the documents comprising the + // batch. + std::deque<Document> _batchOfDocs; + + // Used only if 'shouldProduceEmptyDocs' is true. In this case, we don't need to keep the + // documents themselves, only a count of the number of documents in the batch. + size_t _count = 0; + + // The approximate memory footprint of the batch in bytes. Always kept at zero when + // 'shouldProduceEmptyDocs' is true. + size_t _memUsageBytes = 0; + }; + + /** * Acquires the appropriate locks, then destroys and de-registers '_exec'. '_exec' must be * non-null. */ @@ -202,13 +260,12 @@ private: void _updateOplogTimestamp(); // Batches results returned from the underlying PlanExecutor. - std::deque<Document> _currentBatch; + Batch _currentBatch; // BSONObj members must outlive _projection and cursor. BSONObj _query; BSONObj _sort; BSONObj _projection; - bool _shouldProduceEmptyDocs = false; boost::optional<ParsedDeps> _dependencies; boost::intrusive_ptr<DocumentSourceLimit> _limit; long long _docsAddedToBatches; // for _limit enforcement |