author    Charlie Swanson <cswanson310@gmail.com>    2016-08-29 10:06:41 -0400
committer Charlie Swanson <cswanson310@gmail.com>    2016-09-01 14:08:25 -0400
commit    b1014fe1b40a69cd90b27cb336a170317eecc6b7 (patch)
tree      c189d2d931cdfbdec83359f08cbb1639c1d5e254 /src/mongo/db/pipeline/document_source_facet.cpp
parent    d289e240b653e70a7d90be885a3ad6de21b7c6cb (diff)
download  mongo-b1014fe1b40a69cd90b27cb336a170317eecc6b7.tar.gz
SERVER-24153 Allow pipelines within $facet stage to process in batches.
This approach avoids buffering all input documents in memory, which removes the concern about spilling intermediate results to disk.
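
The new getNext() in the diff below drains every facet pipeline against a shared, batch-sized buffer, then refills that buffer and repeats until all pipelines report EOF. The following standalone sketch illustrates that batched, round-robin draining pattern; the names (SharedBatchBuffer, loadNextBatch, the fixed batchSize) are hypothetical stand-ins, not the actual TeeBuffer/DocumentSourceTeeConsumer classes.

#include <algorithm>
#include <cstddef>
#include <functional>
#include <iostream>
#include <vector>

// Illustrative stand-in for the shared buffer that all facet consumers read from.
struct SharedBatchBuffer {
    explicit SharedBatchBuffer(size_t nConsumers) : positions(nConsumers, 0) {}

    // Load the next fixed-size batch from the source; returns false once the
    // source is exhausted and nothing was loaded.
    bool loadNextBatch(const std::vector<int>& source, size_t& cursor, size_t batchSize) {
        batch.clear();
        while (batch.size() < batchSize && cursor < source.size()) {
            batch.push_back(source[cursor++]);
        }
        std::fill(positions.begin(), positions.end(), 0);  // reset each consumer's cursor
        return !batch.empty();
    }

    std::vector<int> batch;
    std::vector<size_t> positions;  // per-consumer read position within the current batch
};

int main() {
    std::vector<int> input{1, 2, 3, 4, 5, 6, 7};
    const size_t batchSize = 3;  // illustrative; the real batch size differs

    // Two "facets": one sums the values, one counts them. Each consumes the
    // shared batch independently, so no facet forces the whole input into memory.
    std::vector<long long> results(2, 0);
    std::vector<std::function<void(int)>> facets{
        [&](int d) { results[0] += d; },  // sum facet
        [&](int) { results[1] += 1; }};   // count facet

    SharedBatchBuffer buffer(facets.size());
    size_t cursor = 0;
    while (buffer.loadNextBatch(input, cursor, batchSize)) {
        // Round-robin: every facet drains the current batch before the buffer is
        // refilled, mirroring the loop over _facets in the new getNext().
        for (size_t facetId = 0; facetId < facets.size(); ++facetId) {
            while (buffer.positions[facetId] < buffer.batch.size()) {
                facets[facetId](buffer.batch[buffer.positions[facetId]++]);
            }
        }
    }

    std::cout << "sum: " << results[0] << ", count: " << results[1] << "\n";
    return 0;
}

Only the per-facet result arrays grow with the input; the buffered batch stays bounded, which is the point of the change.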
Diffstat (limited to 'src/mongo/db/pipeline/document_source_facet.cpp')
-rw-r--r--    src/mongo/db/pipeline/document_source_facet.cpp | 100
1 file changed, 53 insertions(+), 47 deletions(-)
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index 312ed25d1db..3761fb24e81 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -32,6 +32,7 @@
#include <vector>
+#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/bsontypes.h"
@@ -47,25 +48,25 @@
namespace mongo {
using boost::intrusive_ptr;
+using std::string;
using std::vector;
-DocumentSourceFacet::DocumentSourceFacet(StringMap<intrusive_ptr<Pipeline>> facetPipelines,
+DocumentSourceFacet::DocumentSourceFacet(std::vector<FacetPipeline> facetPipelines,
const intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSourceNeedsMongod(expCtx), _facetPipelines(std::move(facetPipelines)) {
-
- // Build the tee stage, and the consumers of the tee.
- _teeBuffer = TeeBuffer::create();
- for (auto&& facet : _facetPipelines) {
- auto pipeline = facet.second;
- pipeline->addInitialSource(DocumentSourceTeeConsumer::create(pExpCtx, _teeBuffer));
+ : DocumentSourceNeedsMongod(expCtx),
+ _teeBuffer(TeeBuffer::create(facetPipelines.size())),
+ _facets(std::move(facetPipelines)) {
+ for (size_t facetId = 0; facetId < _facets.size(); ++facetId) {
+ auto& facet = _facets[facetId];
+ facet.pipeline->addInitialSource(
+ DocumentSourceTeeConsumer::create(pExpCtx, facetId, _teeBuffer));
}
}
REGISTER_DOCUMENT_SOURCE(facet, DocumentSourceFacet::createFromBson);
intrusive_ptr<DocumentSourceFacet> DocumentSourceFacet::create(
- StringMap<intrusive_ptr<Pipeline>> facetPipelines,
- const intrusive_ptr<ExpressionContext>& expCtx) {
+ std::vector<FacetPipeline> facetPipelines, const intrusive_ptr<ExpressionContext>& expCtx) {
return new DocumentSourceFacet(std::move(facetPipelines), expCtx);
}
@@ -73,64 +74,69 @@ void DocumentSourceFacet::setSource(DocumentSource* source) {
_teeBuffer->setSource(source);
}
-boost::optional<Document> DocumentSourceFacet::getNext() {
+DocumentSource::GetNextResult DocumentSourceFacet::getNext() {
pExpCtx->checkForInterrupt();
if (_done) {
- return boost::none;
+ return GetNextResult::makeEOF();
}
- _done = true; // We will only ever produce one result.
- // Build the results by executing each pipeline serially, one at a time.
- MutableDocument results;
- for (auto&& facet : _facetPipelines) {
- auto facetName = facet.first;
- auto facetPipeline = facet.second;
-
- std::vector<Value> facetResults;
- while (auto next = facetPipeline->getSources().back()->getNext()) {
- facetResults.emplace_back(std::move(*next));
+ vector<vector<Value>> results(_facets.size());
+ bool allPipelinesEOF = false;
+ while (!allPipelinesEOF) {
+ allPipelinesEOF = true; // Set this to false if any pipeline isn't EOF.
+ for (size_t facetId = 0; facetId < _facets.size(); ++facetId) {
+ const auto& pipeline = _facets[facetId].pipeline;
+ auto next = pipeline->getSources().back()->getNext();
+ for (; next.isAdvanced(); next = pipeline->getSources().back()->getNext()) {
+ results[facetId].emplace_back(next.releaseDocument());
+ }
+ allPipelinesEOF = allPipelinesEOF && next.isEOF();
}
- results[facetName] = Value(std::move(facetResults));
}
- _teeBuffer->dispose(); // Clear the buffer since we'll no longer need it.
- return results.freeze();
+ MutableDocument resultDoc;
+ for (size_t facetId = 0; facetId < _facets.size(); ++facetId) {
+ resultDoc[_facets[facetId].name] = Value(std::move(results[facetId]));
+ }
+
+ _done = true; // We will only ever produce one result.
+ return resultDoc.freeze();
}
Value DocumentSourceFacet::serialize(bool explain) const {
MutableDocument serialized;
- for (auto&& facet : _facetPipelines) {
- serialized[facet.first] =
- Value(explain ? facet.second->writeExplainOps() : facet.second->serialize());
+ for (auto&& facet : _facets) {
+ serialized[facet.name] =
+ Value(explain ? facet.pipeline->writeExplainOps() : facet.pipeline->serialize());
}
return Value(Document{{"$facet", serialized.freezeToValue()}});
}
-void DocumentSourceFacet::addInvolvedCollections(std::vector<NamespaceString>* collections) const {
- for (auto&& facet : _facetPipelines) {
- for (auto&& source : facet.second->getSources()) {
+void DocumentSourceFacet::addInvolvedCollections(vector<NamespaceString>* collections) const {
+ for (auto&& facet : _facets) {
+ for (auto&& source : facet.pipeline->getSources()) {
source->addInvolvedCollections(collections);
}
}
}
intrusive_ptr<DocumentSource> DocumentSourceFacet::optimize() {
- for (auto&& facet : _facetPipelines) {
- facet.second->optimizePipeline();
+ for (auto&& facet : _facets) {
+ facet.pipeline->optimizePipeline();
}
return this;
}
void DocumentSourceFacet::doInjectExpressionContext() {
- for (auto&& facet : _facetPipelines) {
- facet.second->injectExpressionContext(pExpCtx);
+ for (auto&& facet : _facets) {
+ facet.pipeline->injectExpressionContext(pExpCtx);
}
}
void DocumentSourceFacet::doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) {
- for (auto&& facet : _facetPipelines) {
- for (auto&& stage : facet.second->getSources()) {
+ for (auto&& facet : _facets) {
+ for (auto&& stage : facet.pipeline->getSources()) {
if (auto stageNeedingMongod = dynamic_cast<DocumentSourceNeedsMongod*>(stage.get())) {
stageNeedingMongod->injectMongodInterface(mongod);
}
@@ -139,14 +145,14 @@ void DocumentSourceFacet::doInjectMongodInterface(std::shared_ptr<MongodInterfac
}
void DocumentSourceFacet::doDetachFromOperationContext() {
- for (auto&& facet : _facetPipelines) {
- facet.second->detachFromOperationContext();
+ for (auto&& facet : _facets) {
+ facet.pipeline->detachFromOperationContext();
}
}
void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx) {
- for (auto&& facet : _facetPipelines) {
- facet.second->reattachToOperationContext(opCtx);
+ for (auto&& facet : _facets) {
+ facet.pipeline->reattachToOperationContext(opCtx);
}
}
@@ -154,8 +160,8 @@ bool DocumentSourceFacet::needsPrimaryShard() const {
// Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154).
// This means that if any stage in any of the $facet pipelines requires the primary shard, then
// the entire $facet must happen on the merger, and the merger must be the primary shard.
- for (auto&& facet : _facetPipelines) {
- if (facet.second->needsPrimaryShardMerger()) {
+ for (auto&& facet : _facets) {
+ if (facet.pipeline->needsPrimaryShardMerger()) {
return true;
}
}
@@ -163,8 +169,8 @@ bool DocumentSourceFacet::needsPrimaryShard() const {
}
DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const {
- for (auto&& facet : _facetPipelines) {
- auto subDepsTracker = facet.second->getDependencies(deps->getMetadataAvailable());
+ for (auto&& facet : _facets) {
+ auto subDepsTracker = facet.pipeline->getDependencies(deps->getMetadataAvailable());
deps->fields.insert(subDepsTracker.fields.begin(), subDepsTracker.fields.end());
@@ -188,7 +194,7 @@ intrusive_ptr<DocumentSource> DocumentSourceFacet::createFromBson(
<< elem,
elem.type() == BSONType::Object && !elem.embeddedObject().isEmpty());
- StringMap<intrusive_ptr<Pipeline>> facetPipelines;
+ std::vector<FacetPipeline> facetPipelines;
for (auto&& facetElem : elem.embeddedObject()) {
const auto facetName = facetElem.fieldNameStringData();
FieldPath::uassertValidFieldName(facetName);
@@ -229,7 +235,7 @@ intrusive_ptr<DocumentSource> DocumentSourceFacet::createFromBson(
}
}
- facetPipelines[facetName] = pipeline;
+ facetPipelines.emplace_back(facetName.toString(), std::move(pipeline));
}
return new DocumentSourceFacet(std::move(facetPipelines), expCtx);