diff options
author | Charlie Swanson <cswanson310@gmail.com> | 2016-08-29 10:06:41 -0400 |
---|---|---|
committer | Charlie Swanson <cswanson310@gmail.com> | 2016-09-01 14:08:25 -0400 |
commit | b1014fe1b40a69cd90b27cb336a170317eecc6b7 (patch) | |
tree | c189d2d931cdfbdec83359f08cbb1639c1d5e254 /src/mongo/db/pipeline/tee_buffer.cpp | |
parent | d289e240b653e70a7d90be885a3ad6de21b7c6cb (diff) | |
download | mongo-b1014fe1b40a69cd90b27cb336a170317eecc6b7.tar.gz |
SERVER-24153 Allow pipelines within $facet stage to process in batches.
This approach removes the need to buffer all documents in memory, thus
removing concerns about spilling intermediate results to disk.
Diffstat (limited to 'src/mongo/db/pipeline/tee_buffer.cpp')
-rw-r--r-- | src/mongo/db/pipeline/tee_buffer.cpp | 87 |
1 files changed, 58 insertions, 29 deletions
diff --git a/src/mongo/db/pipeline/tee_buffer.cpp b/src/mongo/db/pipeline/tee_buffer.cpp index 9119665748e..51ed90241e9 100644 --- a/src/mongo/db/pipeline/tee_buffer.cpp +++ b/src/mongo/db/pipeline/tee_buffer.cpp @@ -30,49 +30,78 @@ #include "mongo/db/pipeline/tee_buffer.h" +#include <algorithm> + #include "mongo/db/pipeline/document.h" -#include "mongo/db/pipeline/expression_context.h" -#include "mongo/db/pipeline/value.h" namespace mongo { -TeeBuffer::TeeBuffer(uint64_t maxMemoryUsageBytes) - : _maxMemoryUsageBytes(maxMemoryUsageBytes), _buffer() {} +TeeBuffer::TeeBuffer(size_t nConsumers, size_t bufferSizeBytes) + : _bufferSizeBytes(bufferSizeBytes), _consumers(nConsumers) {} -boost::intrusive_ptr<TeeBuffer> TeeBuffer::create(uint64_t maxMemoryUsageBytes) { - return new TeeBuffer(maxMemoryUsageBytes); +boost::intrusive_ptr<TeeBuffer> TeeBuffer::create(size_t nConsumers, int bufferSizeBytes) { + uassert(40309, "need at least one consumer for a TeeBuffer", nConsumers > 0); + uassert(40310, + str::stream() << "TeeBuffer requires a positive buffer size, was given " + << bufferSizeBytes, + bufferSizeBytes > 0); + return new TeeBuffer(nConsumers, bufferSizeBytes); } -TeeBuffer::const_iterator TeeBuffer::begin() const { - invariant(_populated); - return _buffer.begin(); -} +DocumentSource::GetNextResult TeeBuffer::getNext(size_t consumerId) { + size_t nConsumersStillProcessingThisBatch = + std::count_if(_consumers.begin(), _consumers.end(), [](const ConsumerInfo& info) { + return info.nLeftToReturn > 0; + }); + + if (_buffer.empty() || nConsumersStillProcessingThisBatch == 0) { + loadNextBatch(); + } + + if (_buffer.empty()) { + // If we've loaded the next batch and it's still empty, then we've exhausted our input. + return DocumentSource::GetNextResult::makeEOF(); + } + + if (_consumers[consumerId].nLeftToReturn == 0) { + // This consumer has reached the end of this batch, but there are still other consumers that + // haven't seen this whole batch. + return DocumentSource::GetNextResult::makePauseExecution(); + } -TeeBuffer::const_iterator TeeBuffer::end() const { - invariant(_populated); - return _buffer.end(); + const size_t bufferIndex = _buffer.size() - _consumers[consumerId].nLeftToReturn; + --_consumers[consumerId].nLeftToReturn; + + return _buffer[bufferIndex]; } -void TeeBuffer::dispose() { +void TeeBuffer::loadNextBatch() { _buffer.clear(); - _populated = false; // Set this to ensure no one is calling begin() or end(). -} + size_t bytesInBuffer = 0; -void TeeBuffer::populate() { - invariant(_source); - if (_populated) { - return; - } - _populated = true; + auto input = _source->getNext(); + for (; input.isAdvanced(); input = _source->getNext()) { + + // For the following reasons, we invariant that we never get a paused input: + // - TeeBuffer is the only place where a paused GetNextReturn will be returned. + // - The $facet stage is the only stage that uses TeeBuffer. + // - We currently disallow nested $facet stages. + invariant(!input.isPaused()); - size_t estimatedMemoryUsageBytes = 0; - while (auto next = _source->getNext()) { - estimatedMemoryUsageBytes += next->getApproximateSize(); - uassert(40174, - "Exceeded memory limit for $facet", - estimatedMemoryUsageBytes <= _maxMemoryUsageBytes); + bytesInBuffer += input.getDocument().getApproximateSize(); + _buffer.push_back(std::move(input)); - _buffer.emplace_back(std::move(*next)); + if (bytesInBuffer >= _bufferSizeBytes) { + break; // Need to break here so we don't get the next input and accidentally ignore it. + } + } + + // Populate the pending returns. + for (size_t consumerId = 0; consumerId < _consumers.size(); ++consumerId) { + if (_consumers[consumerId].stillInUse) { + _consumers[consumerId].nLeftToReturn = _buffer.size(); + } } } + } // namespace mongo |