From b1014fe1b40a69cd90b27cb336a170317eecc6b7 Mon Sep 17 00:00:00 2001
From: Charlie Swanson
Date: Mon, 29 Aug 2016 10:06:41 -0400
Subject: SERVER-24153 Allow pipelines within $facet stage to process in batches.

This approach removes the need to buffer all documents in memory, thus
removing concerns about spilling intermediate results to disk.
---
 src/mongo/db/pipeline/document_source_facet.cpp | 100 +++++++++++++-----------
 1 file changed, 53 insertions(+), 47 deletions(-)

(limited to 'src/mongo/db/pipeline/document_source_facet.cpp')

diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index 312ed25d1db..3761fb24e81 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -32,6 +32,7 @@
 
 #include <vector>
 
+#include "mongo/base/string_data.h"
 #include "mongo/bson/bsonobj.h"
 #include "mongo/bson/bsonobjbuilder.h"
 #include "mongo/bson/bsontypes.h"
@@ -47,25 +48,25 @@
 namespace mongo {
 
 using boost::intrusive_ptr;
+using std::string;
 using std::vector;
 
-DocumentSourceFacet::DocumentSourceFacet(StringMap<intrusive_ptr<Pipeline>> facetPipelines,
+DocumentSourceFacet::DocumentSourceFacet(std::vector<FacetPipeline> facetPipelines,
                                          const intrusive_ptr<ExpressionContext>& expCtx)
-    : DocumentSourceNeedsMongod(expCtx), _facetPipelines(std::move(facetPipelines)) {
-
-    // Build the tee stage, and the consumers of the tee.
-    _teeBuffer = TeeBuffer::create();
-    for (auto&& facet : _facetPipelines) {
-        auto pipeline = facet.second;
-        pipeline->addInitialSource(DocumentSourceTeeConsumer::create(pExpCtx, _teeBuffer));
+    : DocumentSourceNeedsMongod(expCtx),
+      _teeBuffer(TeeBuffer::create(facetPipelines.size())),
+      _facets(std::move(facetPipelines)) {
+    for (size_t facetId = 0; facetId < _facets.size(); ++facetId) {
+        auto& facet = _facets[facetId];
+        facet.pipeline->addInitialSource(
+            DocumentSourceTeeConsumer::create(pExpCtx, facetId, _teeBuffer));
     }
 }
 
 REGISTER_DOCUMENT_SOURCE(facet, DocumentSourceFacet::createFromBson);
 
 intrusive_ptr<DocumentSourceFacet> DocumentSourceFacet::create(
-    StringMap<intrusive_ptr<Pipeline>> facetPipelines,
-    const intrusive_ptr<ExpressionContext>& expCtx) {
+    std::vector<FacetPipeline> facetPipelines, const intrusive_ptr<ExpressionContext>& expCtx) {
     return new DocumentSourceFacet(std::move(facetPipelines), expCtx);
 }
 
@@ -73,64 +74,69 @@ void DocumentSourceFacet::setSource(DocumentSource* source) {
     _teeBuffer->setSource(source);
 }
 
-boost::optional<Document> DocumentSourceFacet::getNext() {
+DocumentSource::GetNextResult DocumentSourceFacet::getNext() {
     pExpCtx->checkForInterrupt();
 
     if (_done) {
-        return boost::none;
+        return GetNextResult::makeEOF();
    }
-    _done = true;  // We will only ever produce one result.
 
-    // Build the results by executing each pipeline serially, one at a time.
-    MutableDocument results;
-    for (auto&& facet : _facetPipelines) {
-        auto facetName = facet.first;
-        auto facetPipeline = facet.second;
-
-        std::vector<Value> facetResults;
-        while (auto next = facetPipeline->getSources().back()->getNext()) {
-            facetResults.emplace_back(std::move(*next));
+    vector<vector<Value>> results(_facets.size());
+    bool allPipelinesEOF = false;
+    while (!allPipelinesEOF) {
+        allPipelinesEOF = true;  // Set this to false if any pipeline isn't EOF.
+        for (size_t facetId = 0; facetId < _facets.size(); ++facetId) {
+            const auto& pipeline = _facets[facetId].pipeline;
+            auto next = pipeline->getSources().back()->getNext();
+            for (; next.isAdvanced(); next = pipeline->getSources().back()->getNext()) {
+                results[facetId].emplace_back(next.releaseDocument());
+            }
+            allPipelinesEOF = allPipelinesEOF && next.isEOF();
         }
-        results[facetName] = Value(std::move(facetResults));
     }
 
-    _teeBuffer->dispose();  // Clear the buffer since we'll no longer need it.
-    return results.freeze();
+    MutableDocument resultDoc;
+    for (size_t facetId = 0; facetId < _facets.size(); ++facetId) {
+        resultDoc[_facets[facetId].name] = Value(std::move(results[facetId]));
+    }
+
+    _done = true;  // We will only ever produce one result.
+    return resultDoc.freeze();
 }
 
 Value DocumentSourceFacet::serialize(bool explain) const {
     MutableDocument serialized;
-    for (auto&& facet : _facetPipelines) {
-        serialized[facet.first] =
-            Value(explain ? facet.second->writeExplainOps() : facet.second->serialize());
+    for (auto&& facet : _facets) {
+        serialized[facet.name] =
+            Value(explain ? facet.pipeline->writeExplainOps() : facet.pipeline->serialize());
     }
     return Value(Document{{"$facet", serialized.freezeToValue()}});
 }
 
-void DocumentSourceFacet::addInvolvedCollections(std::vector<NamespaceString>* collections) const {
-    for (auto&& facet : _facetPipelines) {
-        for (auto&& source : facet.second->getSources()) {
+void DocumentSourceFacet::addInvolvedCollections(vector<NamespaceString>* collections) const {
+    for (auto&& facet : _facets) {
+        for (auto&& source : facet.pipeline->getSources()) {
             source->addInvolvedCollections(collections);
         }
     }
 }
 
 intrusive_ptr<DocumentSource> DocumentSourceFacet::optimize() {
-    for (auto&& facet : _facetPipelines) {
-        facet.second->optimizePipeline();
+    for (auto&& facet : _facets) {
+        facet.pipeline->optimizePipeline();
     }
     return this;
 }
 
 void DocumentSourceFacet::doInjectExpressionContext() {
-    for (auto&& facet : _facetPipelines) {
-        facet.second->injectExpressionContext(pExpCtx);
+    for (auto&& facet : _facets) {
+        facet.pipeline->injectExpressionContext(pExpCtx);
    }
 }
 
 void DocumentSourceFacet::doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) {
-    for (auto&& facet : _facetPipelines) {
-        for (auto&& stage : facet.second->getSources()) {
+    for (auto&& facet : _facets) {
+        for (auto&& stage : facet.pipeline->getSources()) {
             if (auto stageNeedingMongod = dynamic_cast<DocumentSourceNeedsMongod*>(stage.get())) {
                 stageNeedingMongod->injectMongodInterface(mongod);
             }
@@ -139,14 +145,14 @@ void DocumentSourceFacet::doInjectMongodInterface(std::shared_ptr<MongodInterfa
 }
 
 void DocumentSourceFacet::doDetachFromOperationContext() {
-    for (auto&& facet : _facetPipelines) {
-        facet.second->detachFromOperationContext();
+    for (auto&& facet : _facets) {
+        facet.pipeline->detachFromOperationContext();
     }
 }
 
 void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx) {
-    for (auto&& facet : _facetPipelines) {
-        facet.second->reattachToOperationContext(opCtx);
+    for (auto&& facet : _facets) {
+        facet.pipeline->reattachToOperationContext(opCtx);
     }
 }
 
@@ -154,8 +160,8 @@ bool DocumentSourceFacet::needsPrimaryShard() const {
     // Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154).
     // This means that if any stage in any of the $facet pipelines requires the primary shard, then
     // the entire $facet must happen on the merger, and the merger must be the primary shard.
-    for (auto&& facet : _facetPipelines) {
-        if (facet.second->needsPrimaryShardMerger()) {
+    for (auto&& facet : _facets) {
+        if (facet.pipeline->needsPrimaryShardMerger()) {
            return true;
         }
     }
@@ -163,8 +169,8 @@
 }
 
 DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const {
-    for (auto&& facet : _facetPipelines) {
-        auto subDepsTracker = facet.second->getDependencies(deps->getMetadataAvailable());
+    for (auto&& facet : _facets) {
+        auto subDepsTracker = facet.pipeline->getDependencies(deps->getMetadataAvailable());
 
         deps->fields.insert(subDepsTracker.fields.begin(), subDepsTracker.fields.end());
 
@@ -188,7 +194,7 @@ intrusive_ptr<DocumentSource> DocumentSourceFacet::createFromBson(
                           << elem,
             elem.type() == BSONType::Object && !elem.embeddedObject().isEmpty());
 
-    StringMap<intrusive_ptr<Pipeline>> facetPipelines;
+    std::vector<FacetPipeline> facetPipelines;
     for (auto&& facetElem : elem.embeddedObject()) {
         const auto facetName = facetElem.fieldNameStringData();
         FieldPath::uassertValidFieldName(facetName);
@@ -229,7 +235,7 @@ intrusive_ptr<DocumentSource> DocumentSourceFacet::createFromBson(
             }
         }
 
-        facetPipelines[facetName] = pipeline;
+        facetPipelines.emplace_back(facetName.toString(), std::move(pipeline));
     }
 
    return new DocumentSourceFacet(std::move(facetPipelines), expCtx);
--
cgit v1.2.1
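
The heart of this change is the drain loop in getNext(): rather than running each
facet pipeline to completion against a fully buffered input, every pass of the outer
loop lets each pipeline drain whatever the shared TeeBuffer currently holds, and the
stage finishes only once every pipeline reports EOF. Below is a minimal,
self-contained sketch of that round-robin drain pattern in plain C++. The Result and
FakePipeline types are hypothetical stand-ins for MongoDB's
DocumentSource::GetNextResult and the per-facet pipelines, not the real classes.

// batch_drain_sketch.cpp -- hypothetical stand-ins, not MongoDB source.
#include <cstddef>
#include <iostream>
#include <vector>

// Stand-in for DocumentSource::GetNextResult: a result is either an advanced
// value, a pause (current batch exhausted, more may come), or EOF.
struct Result {
    enum class Status { kAdvanced, kPauseExecution, kEOF };
    Status status;
    int value;
    bool isAdvanced() const { return status == Status::kAdvanced; }
    bool isEOF() const { return status == Status::kEOF; }
};

// Stand-in for one facet pipeline fed by a tee buffer: it serves its input one
// batch per pass, pausing at each batch boundary instead of blocking.
struct FakePipeline {
    std::vector<std::vector<int>> batches;  // pre-staged input batches
    size_t batch;
    size_t pos;

    Result getNext() {
        if (batch == batches.size()) {
            return Result{Result::Status::kEOF};
        }
        if (pos < batches[batch].size()) {
            return Result{Result::Status::kAdvanced, batches[batch][pos++]};
        }
        // Current batch exhausted: advance to the next one, but report a pause
        // so the caller round-robins to the other pipelines first.
        ++batch;
        pos = 0;
        return batch == batches.size() ? Result{Result::Status::kEOF}
                                       : Result{Result::Status::kPauseExecution};
    }
};

int main() {
    // Two "facets" reading the same logical input, staged in two batches.
    std::vector<FakePipeline> pipelines;
    pipelines.push_back({{{1, 2}, {3}}});
    pipelines.push_back({{{10}, {20, 30}}});

    std::vector<std::vector<int>> results(pipelines.size());

    // The drain loop from the patch: keep cycling until every pipeline is EOF;
    // within a pass, drain each pipeline only until it pauses.
    bool allPipelinesEOF = false;
    while (!allPipelinesEOF) {
        allPipelinesEOF = true;  // Set this to false if any pipeline isn't EOF.
        for (size_t facetId = 0; facetId < pipelines.size(); ++facetId) {
            auto next = pipelines[facetId].getNext();
            for (; next.isAdvanced(); next = pipelines[facetId].getNext()) {
                results[facetId].push_back(next.value);
            }
            allPipelinesEOF = allPipelinesEOF && next.isEOF();
        }
    }

    for (size_t facetId = 0; facetId < results.size(); ++facetId) {
        std::cout << "facet " << facetId << ":";
        for (int v : results[facetId]) {
            std::cout << " " << v;
        }
        std::cout << "\n";
    }
    return 0;
}

Each pass of the outer while loop corresponds to one batch made available by the
tee buffer: a pipeline that exhausts the current batch pauses rather than reporting
EOF, which is what bounds memory to roughly one batch at a time instead of the whole
input set.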