author    David Storch <david.storch@mongodb.com>    2020-02-24 19:20:29 -0500
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2020-02-27 23:54:51 +0000
commit    768e87bbf6213d26f83ad2c526d4aab36e64d185 (patch)
tree      877fb129c081c9813fd10e6e10fb85afbb8815c2
parent    249f411a79fd11559830658d45b1ebca4d6c9865 (diff)
SERVER-45418 Avoid explicitly batching documents in $cursor for count-like aggregates.
(cherry picked from commit 7c4676ef0e8e47cf79e10b81f7661f8fbea82cb0)
-rw-r--r--  src/mongo/db/pipeline/document_source_cursor.cpp | 68
-rw-r--r--  src/mongo/db/pipeline/document_source_cursor.h   | 62
2 files changed, 111 insertions(+), 19 deletions(-)
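
For orientation before the diff: the change teaches the $cursor stage's batch to stop materializing documents when downstream stages only need a count (count-like aggregates). Below is a minimal standalone sketch of that idea in the same shape as the new Batch class; the names mirror the diff, but the Document stand-in and the snippet as a whole are simplified illustrations, not the production code.

#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>

// Simplified stand-in for mongo::Document; only the size matters here.
struct Document {
    std::size_t approxSize = 0;
};

// Sketch of a batch that either stores owned documents or merely counts them.
class Batch {
public:
    // When true, callers only need to know *how many* documents were produced.
    bool shouldProduceEmptyDocs = false;

    void enqueue(Document&& doc) {
        if (shouldProduceEmptyDocs) {
            ++_count;  // Count-like pipelines never read the contents.
        } else {
            _batchOfDocs.push_back(std::move(doc));
            _memUsageBytes += _batchOfDocs.back().approxSize;
        }
    }

    Document dequeue() {
        assert(!isEmpty());
        if (shouldProduceEmptyDocs) {
            --_count;
            return Document{};  // Hand back an empty placeholder.
        }
        Document out = std::move(_batchOfDocs.front());
        _batchOfDocs.pop_front();
        if (_batchOfDocs.empty()) {
            _memUsageBytes = 0;  // Peak usage resets once the batch drains.
        }
        return out;
    }

    bool isEmpty() const {
        return shouldProduceEmptyDocs ? _count == 0 : _batchOfDocs.empty();
    }

    // Approximate peak footprint of the batch; always zero in counting mode.
    std::size_t memUsageBytes() const {
        return _memUsageBytes;
    }

private:
    std::deque<Document> _batchOfDocs;  // Used only when documents are kept.
    std::size_t _count = 0;             // Used only in counting mode.
    std::size_t _memUsageBytes = 0;
};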
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 61bbaecaec2..73b0f85f8a5 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -50,10 +50,50 @@ const char* DocumentSourceCursor::getSourceName() const {
return "$cursor";
}
+bool DocumentSourceCursor::Batch::isEmpty() const {
+ if (shouldProduceEmptyDocs) {
+ return !_count;
+ } else {
+ return _batchOfDocs.empty();
+ }
+ MONGO_UNREACHABLE;
+}
+
+void DocumentSourceCursor::Batch::enqueue(Document&& doc) {
+ if (shouldProduceEmptyDocs) {
+ ++_count;
+ } else {
+ _batchOfDocs.push_back(doc.getOwned());
+ _memUsageBytes += _batchOfDocs.back().getApproximateSize();
+ }
+}
+
+Document DocumentSourceCursor::Batch::dequeue() {
+ invariant(!isEmpty());
+ if (shouldProduceEmptyDocs) {
+ --_count;
+ return Document{};
+ } else {
+ Document out = std::move(_batchOfDocs.front());
+ _batchOfDocs.pop_front();
+ if (_batchOfDocs.empty()) {
+ _memUsageBytes = 0;
+ }
+ return out;
+ }
+ MONGO_UNREACHABLE;
+}
+
+void DocumentSourceCursor::Batch::clear() {
+ _batchOfDocs.clear();
+ _count = 0;
+ _memUsageBytes = 0;
+}
+
DocumentSource::GetNextResult DocumentSourceCursor::getNext() {
pExpCtx->checkForInterrupt();
- if (_currentBatch.empty()) {
+ if (_currentBatch.isEmpty()) {
loadBatch();
}
@@ -61,12 +101,10 @@ DocumentSource::GetNextResult DocumentSourceCursor::getNext() {
if (_trackOplogTS && _exec)
_updateOplogTimestamp();
- if (_currentBatch.empty())
+ if (_currentBatch.isEmpty())
return GetNextResult::makeEOF();
- Document out = std::move(_currentBatch.front());
- _currentBatch.pop_front();
- return std::move(out);
+ return _currentBatch.dequeue();
}
void DocumentSourceCursor::loadBatch() {
@@ -84,17 +122,16 @@ void DocumentSourceCursor::loadBatch() {
uassertStatusOK(_exec->restoreState());
- int memUsageBytes = 0;
{
ON_BLOCK_EXIT([this] { recordPlanSummaryStats(); });
while ((state = _exec->getNext(&resultObj, nullptr)) == PlanExecutor::ADVANCED) {
- if (_shouldProduceEmptyDocs) {
- _currentBatch.push_back(Document());
+ if (_currentBatch.shouldProduceEmptyDocs) {
+ _currentBatch.enqueue(Document());
} else if (_dependencies) {
- _currentBatch.push_back(_dependencies->extractFields(resultObj));
+ _currentBatch.enqueue(_dependencies->extractFields(resultObj));
} else {
- _currentBatch.push_back(Document::fromBsonWithMetaData(resultObj));
+ _currentBatch.enqueue(Document::fromBsonWithMetaData(resultObj));
}
if (_limit) {
@@ -104,12 +141,11 @@ void DocumentSourceCursor::loadBatch() {
verify(_docsAddedToBatches < _limit->getLimit());
}
- memUsageBytes += _currentBatch.back().getApproximateSize();
-
// As long as we're waiting for inserts, we shouldn't do any batching at this level;
// we need the whole pipeline to see each document to see if we should stop waiting.
if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
- memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
+ static_cast<long long>(_currentBatch.memUsageBytes()) >
+ internalDocumentSourceCursorBatchSizeBytes.load()) {
// End this batch and prepare PlanExecutor for yielding.
_exec->saveState();
return;
@@ -149,8 +185,8 @@ void DocumentSourceCursor::loadBatch() {
void DocumentSourceCursor::_updateOplogTimestamp() {
// If we are about to return a result, set our oplog timestamp to the optime of that result.
- if (!_currentBatch.empty()) {
- const auto& ts = _currentBatch.front().getField(repl::OpTime::kTimestampFieldName);
+ if (!_currentBatch.isEmpty()) {
+ const auto& ts = _currentBatch.peekFront().getField(repl::OpTime::kTimestampFieldName);
invariant(ts.getType() == BSONType::bsonTimestamp);
_latestOplogTimestamp = ts.getTimestamp();
return;
@@ -344,4 +380,4 @@ intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create(
new DocumentSourceCursor(collection, std::move(exec), pExpCtx, trackOplogTimestamp));
return source;
}
-}
+} // namespace mongo
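
The .cpp changes above come down to one control-flow property: getNext() refills the batch only when it is exhausted, and loadBatch() stops filling once the batch's reported memory crosses internalDocumentSourceCursorBatchSizeBytes. Because a counting batch always reports zero bytes, count-like aggregates are no longer chopped into many small batches. A hedged sketch of that shape, reusing the simplified Batch and Document above; CursorStageSketch, the 'source' callable, and kBatchSizeLimitBytes are hypothetical stand-ins for the real PlanExecutor plumbing, and the real stage additionally handles interrupts, yielding, oplog timestamps, and $limit.

#include <functional>
#include <optional>

class CursorStageSketch {
public:
    // 'source' is a hypothetical stand-in for PlanExecutor::getNext().
    explicit CursorStageSketch(std::function<std::optional<Document>()> source)
        : _source(std::move(source)) {}

    std::optional<Document> getNext() {
        if (_currentBatch.isEmpty()) {
            loadBatch();
        }
        if (_currentBatch.isEmpty()) {
            return std::nullopt;  // EOF: the source is exhausted.
        }
        return _currentBatch.dequeue();
    }

private:
    void loadBatch() {
        // Stand-in for internalDocumentSourceCursorBatchSizeBytes.
        static constexpr std::size_t kBatchSizeLimitBytes = 4 * 1024 * 1024;
        while (auto doc = _source()) {
            _currentBatch.enqueue(std::move(*doc));
            // Counting batches report zero bytes, so this cutoff never ends
            // their batch early; document batches still respect the limit.
            if (_currentBatch.memUsageBytes() > kBatchSizeLimitBytes) {
                return;
            }
        }
    }

    std::function<std::optional<Document>()> _source;
    Batch _currentBatch;
};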
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index a01605a1bfd..da520836a8c 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -132,9 +132,12 @@ public:
/**
* If subsequent sources need no information from the cursor, the cursor can simply output empty
* documents, avoiding the overhead of converting BSONObjs to Documents.
+ *
+ * Illegal to set if the $cursor stage was constructed with 'trackOplogTimestamp' enabled.
*/
void shouldProduceEmptyDocs() {
- _shouldProduceEmptyDocs = true;
+ invariant(!_trackOplogTS);
+ _currentBatch.shouldProduceEmptyDocs = true;
}
Timestamp getLatestOplogTimestamp() const {
@@ -170,6 +173,60 @@ private:
~DocumentSourceCursor();
/**
+ * A $cursor stage loads documents from the underlying PlanExecutor in batches. An object of
+ * this class represents one such batch. Acts like a queue into which documents can be queued
+ * and dequeued in FIFO order.
+ */
+ class Batch {
+ public:
+ /**
+ * Adds a new document to the batch.
+ */
+ void enqueue(Document&& doc);
+
+ /**
+ * Removes the first document from the batch.
+ */
+ Document dequeue();
+
+ void clear();
+
+ bool isEmpty() const;
+
+ /**
+ * Returns the approximate memory footprint of this batch, measured in bytes. Even after
+ * documents are dequeued from the batch, continues to indicate the batch's peak memory
+ * footprint. Resets to zero once the final document in the batch is dequeued.
+ */
+ size_t memUsageBytes() const {
+ return _memUsageBytes;
+ }
+
+ /**
+ * Illegal to call if 'shouldProduceEmptyDocs' is true.
+ */
+ const Document& peekFront() const {
+ invariant(!shouldProduceEmptyDocs);
+ return _batchOfDocs.front();
+ }
+
+ bool shouldProduceEmptyDocs = false;
+
+ private:
+ // Used only if 'shouldProduceEmptyDocs' is false. A deque of the documents comprising the
+ // batch.
+ std::deque<Document> _batchOfDocs;
+
+ // Used only if 'shouldProduceEmptyDocs' is true. In this case, we don't need to keep the
+ // documents themselves, only a count of the number of documents in the batch.
+ size_t _count = 0;
+
+ // The approximate memory footprint of the batch in bytes. Always kept at zero when
+ // 'shouldProduceEmptyDocs' is true.
+ size_t _memUsageBytes = 0;
+ };
+
+ /**
* Acquires the appropriate locks, then destroys and de-registers '_exec'. '_exec' must be
* non-null.
*/
@@ -194,13 +251,12 @@ private:
void _updateOplogTimestamp();
// Batches results returned from the underlying PlanExecutor.
- std::deque<Document> _currentBatch;
+ Batch _currentBatch;
// BSONObj members must outlive _projection and cursor.
BSONObj _query;
BSONObj _sort;
BSONObj _projection;
- bool _shouldProduceEmptyDocs = false;
boost::optional<ParsedDeps> _dependencies;
boost::intrusive_ptr<DocumentSourceLimit> _limit;
long long _docsAddedToBatches; // for _limit enforcement
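
Finally, a small usage example of the two batch modes described by the new header comments: peak memory tracking versus pure counting. It reuses the simplified Batch sketch from earlier and is illustrative only, not a test from the repository.

#include <cassert>

int main() {
    // Counting mode: nothing is stored, so memory usage stays at zero.
    Batch counting;
    counting.shouldProduceEmptyDocs = true;
    counting.enqueue(Document{128});
    counting.enqueue(Document{256});
    assert(counting.memUsageBytes() == 0);
    counting.dequeue();
    counting.dequeue();
    assert(counting.isEmpty());

    // Document mode: the batch owns documents and tracks their footprint,
    // resetting the counter once the final document is dequeued.
    Batch materializing;
    materializing.enqueue(Document{128});
    assert(materializing.memUsageBytes() == 128);
    materializing.dequeue();
    assert(materializing.memUsageBytes() == 0);
    assert(materializing.isEmpty());
    return 0;
}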