summary | refs | log | tree | commit | diff
path: root/src/mongo/db/pipeline/tee_buffer.cpp
diff options
context:
space:
mode:
author: Charlie Swanson <cswanson310@gmail.com> 2016-08-29 10:06:41 -0400
committer: Charlie Swanson <cswanson310@gmail.com> 2016-09-01 14:08:25 -0400
commit: b1014fe1b40a69cd90b27cb336a170317eecc6b7 (patch)
tree: c189d2d931cdfbdec83359f08cbb1639c1d5e254 /src/mongo/db/pipeline/tee_buffer.cpp
parent: d289e240b653e70a7d90be885a3ad6de21b7c6cb (diff)
download: mongo-b1014fe1b40a69cd90b27cb336a170317eecc6b7.tar.gz
SERVER-24153 Allow pipelines within $facet stage to process in batches.
This approach removes the need to buffer all documents in memory, thus removing concerns about spilling intermediate results to disk.
Diffstat (limited to 'src/mongo/db/pipeline/tee_buffer.cpp')
-rw-r--r-- src/mongo/db/pipeline/tee_buffer.cpp 87
1 files changed, 58 insertions, 29 deletions
diff --git a/src/mongo/db/pipeline/tee_buffer.cpp b/src/mongo/db/pipeline/tee_buffer.cpp
index 9119665748e..51ed90241e9 100644
--- a/src/mongo/db/pipeline/tee_buffer.cpp
+++ b/src/mongo/db/pipeline/tee_buffer.cpp
@@ -30,49 +30,78 @@
#include "mongo/db/pipeline/tee_buffer.h"
+#include <algorithm>
+
#include "mongo/db/pipeline/document.h"
-#include "mongo/db/pipeline/expression_context.h"
-#include "mongo/db/pipeline/value.h"
namespace mongo {
-TeeBuffer::TeeBuffer(uint64_t maxMemoryUsageBytes)
- : _maxMemoryUsageBytes(maxMemoryUsageBytes), _buffer() {}
+TeeBuffer::TeeBuffer(size_t nConsumers, size_t bufferSizeBytes)
+ : _bufferSizeBytes(bufferSizeBytes), _consumers(nConsumers) {}
-boost::intrusive_ptr<TeeBuffer> TeeBuffer::create(uint64_t maxMemoryUsageBytes) {
- return new TeeBuffer(maxMemoryUsageBytes);
+boost::intrusive_ptr<TeeBuffer> TeeBuffer::create(size_t nConsumers, int bufferSizeBytes) {
+ uassert(40309, "need at least one consumer for a TeeBuffer", nConsumers > 0);
+ uassert(40310,
+ str::stream() << "TeeBuffer requires a positive buffer size, was given "
+ << bufferSizeBytes,
+ bufferSizeBytes > 0);
+ return new TeeBuffer(nConsumers, bufferSizeBytes);
}
-TeeBuffer::const_iterator TeeBuffer::begin() const {
- invariant(_populated);
- return _buffer.begin();
-}
+DocumentSource::GetNextResult TeeBuffer::getNext(size_t consumerId) {
+ size_t nConsumersStillProcessingThisBatch =
+ std::count_if(_consumers.begin(), _consumers.end(), [](const ConsumerInfo& info) {
+ return info.nLeftToReturn > 0;
+ });
+
+ if (_buffer.empty() || nConsumersStillProcessingThisBatch == 0) {
+ loadNextBatch();
+ }
+
+ if (_buffer.empty()) {
+ // If we've loaded the next batch and it's still empty, then we've exhausted our input.
+ return DocumentSource::GetNextResult::makeEOF();
+ }
+
+ if (_consumers[consumerId].nLeftToReturn == 0) {
+ // This consumer has reached the end of this batch, but there are still other consumers that
+ // haven't seen this whole batch.
+ return DocumentSource::GetNextResult::makePauseExecution();
+ }
-TeeBuffer::const_iterator TeeBuffer::end() const {
- invariant(_populated);
- return _buffer.end();
+ const size_t bufferIndex = _buffer.size() - _consumers[consumerId].nLeftToReturn;
+ --_consumers[consumerId].nLeftToReturn;
+
+ return _buffer[bufferIndex];
}
-void TeeBuffer::dispose() {
+// Discards the current batch and refills '_buffer' from '_source'. Loading stops when the source
+// stops advancing or when the approximate size of the buffered documents reaches
+// '_bufferSizeBytes'. Afterwards every consumer that is still in use is set up to return the
+// whole new batch.
+void TeeBuffer::loadNextBatch() {
     _buffer.clear();
-    _populated = false;  // Set this to ensure no one is calling begin() or end().
-}
+    size_t bytesInBuffer = 0;
-void TeeBuffer::populate() {
-    invariant(_source);
-    if (_populated) {
-        return;
-    }
-    _populated = true;
-    size_t estimatedMemoryUsageBytes = 0;
-    while (auto next = _source->getNext()) {
-        estimatedMemoryUsageBytes += next->getApproximateSize();
-        uassert(40174,
-                "Exceeded memory limit for $facet",
-                estimatedMemoryUsageBytes <= _maxMemoryUsageBytes);
-        _buffer.emplace_back(std::move(*next));
+    for (;;) {
+        auto input = _source->getNext();
+
+        // For the following reasons, we invariant that we never get a paused input:
+        //   - TeeBuffer is the only place where a paused GetNextResult will be returned.
+        //   - The $facet stage is the only stage that uses TeeBuffer.
+        //   - We currently disallow nested $facet stages.
+        // This is checked on every result, before the isAdvanced() test: a check that only runs
+        // for advanced results could never fire, and a paused result would just end the batch.
+        invariant(!input.isPaused());
+
+        if (!input.isAdvanced()) {
+            break;  // The source did not produce another document.
+        }
+
+        bytesInBuffer += input.getDocument().getApproximateSize();
+        _buffer.push_back(std::move(input));
+
+        if (bytesInBuffer >= _bufferSizeBytes) {
+            break;  // Need to break here so we don't get the next input and accidentally ignore it.
+        }
+    }
+
+    // Populate the pending returns.
+    for (size_t consumerId = 0; consumerId < _consumers.size(); ++consumerId) {
+        if (_consumers[consumerId].stillInUse) {
+            _consumers[consumerId].nLeftToReturn = _buffer.size();
+        }
     }
 }
+
} // namespace mongo