summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorCharlie Swanson <cswanson310@gmail.com>2015-06-24 10:03:42 -0400
committerCharlie Swanson <cswanson310@gmail.com>2015-06-24 18:18:18 -0400
commit0e9371dbed499f6f2ab5066fffdc746747351c6d (patch)
tree8a15fd87d8004fbcf324e02fe52586f8f509deab /src/mongo/db/pipeline
parent92750368e7a65031f4545d7c52ba268033b94a74 (diff)
downloadmongo-0e9371dbed499f6f2ab5066fffdc746747351c6d.tar.gz
SERVER-19105 Define macro for registering DocumentSources
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/document_source.cpp32
-rw-r--r--src/mongo/db/pipeline/document_source.h31
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_limit.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_match.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.cpp12
-rw-r--r--src/mongo/db/pipeline/document_source_out.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_project.cpp10
-rw-r--r--src/mongo/db/pipeline/document_source_redact.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_skip.cpp14
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp13
-rw-r--r--src/mongo/db/pipeline/document_source_unwind.cpp12
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp52
14 files changed, 106 insertions, 101 deletions
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp
index 57d12a7c85c..718fa618300 100644
--- a/src/mongo/db/pipeline/document_source.cpp
+++ b/src/mongo/db/pipeline/document_source.cpp
@@ -31,15 +31,47 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/value.h"
+#include "mongo/util/string_map.h"
namespace mongo {
+using Parser = DocumentSource::Parser;
using boost::intrusive_ptr;
+using std::string;
using std::vector;
DocumentSource::DocumentSource(const intrusive_ptr<ExpressionContext>& pCtx)
: pSource(NULL), pExpCtx(pCtx) {}
+namespace {
+// Used to keep track of which DocumentSources are registered under which name.
+static StringMap<Parser> parserMap;
+} // namespace
+
+void DocumentSource::registerParser(string name, Parser parser) {
+ auto it = parserMap.find(name);
+ massert(28707,
+ str::stream() << "Duplicate document source (" << name << ") registered.",
+ it == parserMap.end());
+ parserMap[name] = parser;
+}
+
+intrusive_ptr<DocumentSource> DocumentSource::parse(const intrusive_ptr<ExpressionContext> expCtx,
+ BSONObj stageObj) {
+ uassert(16435,
+ "A pipeline stage specification object must contain exactly one field.",
+ stageObj.nFields() == 1);
+ BSONElement stageSpec = stageObj.firstElement();
+ auto stageName = stageSpec.fieldNameStringData();
+
+ // Get the registered parser and call that.
+ auto it = parserMap.find(stageName);
+ uassert(16436,
+ str::stream() << "Unrecognized pipeline stage name: '" << stageName << "'",
+ it != parserMap.end());
+ return it->second(stageSpec, expCtx);
+}
+
const char* DocumentSource::getSourceName() const {
static const char unknown[] = "[UNKNOWN]";
return unknown;
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 4902f8b4a40..eec85b22c07 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -35,6 +35,7 @@
#include <boost/unordered_map.hpp>
#include <deque>
+#include "mongo/base/init.h"
#include "mongo/client/connpool.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/jsobj.h"
@@ -46,6 +47,7 @@
#include "mongo/db/pipeline/value.h"
#include "mongo/db/sorter/sorter.h"
#include "mongo/s/strategy.h"
+#include "mongo/stdx/functional.h"
#include "mongo/util/intrusive_counter.h"
@@ -59,8 +61,22 @@ class ExpressionObject;
class DocumentSourceLimit;
class PlanExecutor;
+/**
+ * Registers a DocumentSource to have the name 'key'. When a stage with name '$key' is found,
+ * 'parser' will be called to construct a DocumentSource.
+ */
+#define REGISTER_DOCUMENT_SOURCE(key, parser) \
+ MONGO_INITIALIZER(addToDocSourceParserMap_##key)(InitializerContext*) { \
+ /* Prevent duplicate document sources with the same name. */ \
+ DocumentSource::registerParser("$" #key, (parser)); \
+ return Status::OK(); \
+ }
+
class DocumentSource : public IntrusiveCounterUnsigned {
public:
+ using Parser = stdx::function<boost::intrusive_ptr<DocumentSource>(
+ BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>;
+
virtual ~DocumentSource() {}
/** Returns the next Document if there is one or boost::none if at EOF.
@@ -160,6 +176,21 @@ public:
return false;
}
+ /**
+ * Create a DocumentSource pipeline stage from 'stageObj'.
+ */
+ static boost::intrusive_ptr<DocumentSource> parse(
+ const boost::intrusive_ptr<ExpressionContext> expCtx, BSONObj stageObj);
+
+ /**
+ * Registers a DocumentSource with a parsing function, so that when a stage with the given name
+ * is encountered, it will call 'parser' to construct that stage.
+ *
+ * DO NOT call this method directly. Instead, use the REGISTER_DOCUMENT_SOURCE macro defined in
+ * this file.
+ */
+ static void registerParser(std::string name, Parser parser);
+
protected:
/**
Base constructor.
diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp
index c66702480fc..a8ec65a775b 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.cpp
+++ b/src/mongo/db/pipeline/document_source_geo_near.cpp
@@ -39,9 +39,10 @@ namespace mongo {
using boost::intrusive_ptr;
using std::min;
-char DocumentSourceGeoNear::geoNearName[] = "$geoNear";
+REGISTER_DOCUMENT_SOURCE(geoNear, DocumentSourceGeoNear::createFromBson);
+
const char* DocumentSourceGeoNear::getSourceName() const {
- return geoNearName;
+ return "$geoNear";
}
boost::optional<Document> DocumentSourceGeoNear::getNext() {
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 487f0809ed0..53f8dc3f8e3 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -44,10 +44,10 @@ using std::shared_ptr;
using std::pair;
using std::vector;
-const char DocumentSourceGroup::groupName[] = "$group";
+REGISTER_DOCUMENT_SOURCE(group, DocumentSourceGroup::createFromBson);
const char* DocumentSourceGroup::getSourceName() const {
- return groupName;
+ return "$group";
}
boost::optional<Document> DocumentSourceGroup::getNext() {
diff --git a/src/mongo/db/pipeline/document_source_limit.cpp b/src/mongo/db/pipeline/document_source_limit.cpp
index 9729c6bae8f..b7e09785734 100644
--- a/src/mongo/db/pipeline/document_source_limit.cpp
+++ b/src/mongo/db/pipeline/document_source_limit.cpp
@@ -39,14 +39,14 @@ namespace mongo {
using boost::intrusive_ptr;
-const char DocumentSourceLimit::limitName[] = "$limit";
-
DocumentSourceLimit::DocumentSourceLimit(const intrusive_ptr<ExpressionContext>& pExpCtx,
long long limit)
: DocumentSource(pExpCtx), limit(limit), count(0) {}
+REGISTER_DOCUMENT_SOURCE(limit, DocumentSourceLimit::createFromBson);
+
const char* DocumentSourceLimit::getSourceName() const {
- return limitName;
+ return "$limit";
}
bool DocumentSourceLimit::coalesce(const intrusive_ptr<DocumentSource>& pNextSource) {
diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp
index 50ef5e95eb5..5000a1c7542 100644
--- a/src/mongo/db/pipeline/document_source_match.cpp
+++ b/src/mongo/db/pipeline/document_source_match.cpp
@@ -43,10 +43,10 @@ using boost::intrusive_ptr;
using std::string;
using std::vector;
-const char DocumentSourceMatch::matchName[] = "$match";
+REGISTER_DOCUMENT_SOURCE(match, DocumentSourceMatch::createFromBson);
const char* DocumentSourceMatch::getSourceName() const {
- return matchName;
+ return "$match";
}
Value DocumentSourceMatch::serialize(bool explain) const {
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
index d1e618f35bf..81df6ceb2bb 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
@@ -38,10 +38,14 @@ using std::make_pair;
using std::string;
using std::vector;
-const char DocumentSourceMergeCursors::name[] = "$mergeCursors";
+DocumentSourceMergeCursors::DocumentSourceMergeCursors(
+ const CursorIds& cursorIds, const intrusive_ptr<ExpressionContext>& pExpCtx)
+ : DocumentSource(pExpCtx), _cursorIds(cursorIds), _unstarted(true) {}
+
+REGISTER_DOCUMENT_SOURCE(mergeCursors, DocumentSourceMergeCursors::createFromBson);
const char* DocumentSourceMergeCursors::getSourceName() const {
- return name;
+ return "$mergeCursors";
}
void DocumentSourceMergeCursors::setSource(DocumentSource* pSource) {
@@ -49,10 +53,6 @@ void DocumentSourceMergeCursors::setSource(DocumentSource* pSource) {
verify(false);
}
-DocumentSourceMergeCursors::DocumentSourceMergeCursors(
- const CursorIds& cursorIds, const intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSource(pExpCtx), _cursorIds(cursorIds), _unstarted(true) {}
-
intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create(
const CursorIds& cursorIds, const intrusive_ptr<ExpressionContext>& pExpCtx) {
return new DocumentSourceMergeCursors(cursorIds, pExpCtx);
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index 858df389be8..cb702ef9cf7 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -35,8 +35,6 @@ namespace mongo {
using boost::intrusive_ptr;
using std::vector;
-const char DocumentSourceOut::outName[] = "$out";
-
DocumentSourceOut::~DocumentSourceOut() {
DESTRUCTOR_GUARD(
// Make sure we drop the temp collection if anything goes wrong. Errors are ignored
@@ -45,8 +43,10 @@ DocumentSourceOut::~DocumentSourceOut() {
if (_mongod && _tempNs.size()) _mongod->directClient()->dropCollection(_tempNs.ns());)
}
+REGISTER_DOCUMENT_SOURCE(out, DocumentSourceOut::createFromBson);
+
const char* DocumentSourceOut::getSourceName() const {
- return outName;
+ return "$out";
}
static AtomicUInt32 aggOutCounter;
diff --git a/src/mongo/db/pipeline/document_source_project.cpp b/src/mongo/db/pipeline/document_source_project.cpp
index c0bc9aa68a2..a5522fd2cf4 100644
--- a/src/mongo/db/pipeline/document_source_project.cpp
+++ b/src/mongo/db/pipeline/document_source_project.cpp
@@ -42,14 +42,14 @@ using boost::intrusive_ptr;
using std::string;
using std::vector;
-const char DocumentSourceProject::projectName[] = "$project";
-
DocumentSourceProject::DocumentSourceProject(const intrusive_ptr<ExpressionContext>& pExpCtx,
const intrusive_ptr<ExpressionObject>& exprObj)
: DocumentSource(pExpCtx), pEO(exprObj) {}
+REGISTER_DOCUMENT_SOURCE(project, DocumentSourceProject::createFromBson);
+
const char* DocumentSourceProject::getSourceName() const {
- return projectName;
+ return "$project";
}
boost::optional<Document> DocumentSourceProject::getNext() {
@@ -90,9 +90,7 @@ Value DocumentSourceProject::serialize(bool explain) const {
intrusive_ptr<DocumentSource> DocumentSourceProject::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
/* validate */
- uassert(15969,
- str::stream() << projectName << " specification must be an object",
- elem.type() == Object);
+ uassert(15969, "$project specification must be an object", elem.type() == Object);
Expression::ObjectCtx objectCtx(Expression::ObjectCtx::DOCUMENT_OK |
Expression::ObjectCtx::TOP_LEVEL |
diff --git a/src/mongo/db/pipeline/document_source_redact.cpp b/src/mongo/db/pipeline/document_source_redact.cpp
index 860dd3a8f73..7067d27930a 100644
--- a/src/mongo/db/pipeline/document_source_redact.cpp
+++ b/src/mongo/db/pipeline/document_source_redact.cpp
@@ -42,14 +42,14 @@ namespace mongo {
using boost::intrusive_ptr;
using std::vector;
-const char DocumentSourceRedact::redactName[] = "$redact";
-
DocumentSourceRedact::DocumentSourceRedact(const intrusive_ptr<ExpressionContext>& expCtx,
const intrusive_ptr<Expression>& expression)
: DocumentSource(expCtx), _expression(expression) {}
+REGISTER_DOCUMENT_SOURCE(redact, DocumentSourceRedact::createFromBson);
+
const char* DocumentSourceRedact::getSourceName() const {
- return redactName;
+ return "$redact";
}
static const Value descendVal = Value("descend");
diff --git a/src/mongo/db/pipeline/document_source_skip.cpp b/src/mongo/db/pipeline/document_source_skip.cpp
index 3a1430b3b98..ff2e5d37161 100644
--- a/src/mongo/db/pipeline/document_source_skip.cpp
+++ b/src/mongo/db/pipeline/document_source_skip.cpp
@@ -39,13 +39,13 @@ namespace mongo {
using boost::intrusive_ptr;
-const char DocumentSourceSkip::skipName[] = "$skip";
-
DocumentSourceSkip::DocumentSourceSkip(const intrusive_ptr<ExpressionContext>& pExpCtx)
: DocumentSource(pExpCtx), _skip(0), _needToSkip(true) {}
+REGISTER_DOCUMENT_SOURCE(skip, DocumentSourceSkip::createFromBson);
+
const char* DocumentSourceSkip::getSourceName() const {
- return skipName;
+ return "$skip";
}
bool DocumentSourceSkip::coalesce(const intrusive_ptr<DocumentSource>& pNextSource) {
@@ -91,17 +91,13 @@ intrusive_ptr<DocumentSourceSkip> DocumentSourceSkip::create(
intrusive_ptr<DocumentSource> DocumentSourceSkip::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
uassert(15972,
- str::stream() << DocumentSourceSkip::skipName
- << ": the value to skip must be a number",
+ str::stream() << "Argument to $skip must be a number not a " << typeName(elem.type()),
elem.isNumber());
intrusive_ptr<DocumentSourceSkip> pSkip(DocumentSourceSkip::create(pExpCtx));
pSkip->_skip = elem.numberLong();
- uassert(15956,
- str::stream() << DocumentSourceSkip::skipName
- << ": the number to skip cannot be negative",
- pSkip->_skip >= 0);
+ uassert(15956, "Argument to $skip cannot be negative", pSkip->_skip >= 0);
return pSkip;
}
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index 1b7396b8513..3efe891acf9 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -45,10 +45,10 @@ using std::make_pair;
using std::string;
using std::vector;
-const char DocumentSourceSort::sortName[] = "$sort";
+REGISTER_DOCUMENT_SOURCE(sort, DocumentSourceSort::createFromBson);
const char* DocumentSourceSort::getSourceName() const {
- return sortName;
+ return "$sort";
}
boost::optional<Document> DocumentSourceSort::getNext() {
@@ -143,10 +143,7 @@ DocumentSource::GetDepsReturn DocumentSourceSort::getDependencies(DepsTracker* d
intrusive_ptr<DocumentSource> DocumentSourceSort::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
- uassert(15973,
- str::stream() << " the " << sortName << " key specification must be an object",
- elem.type() == Object);
-
+ uassert(15973, "the $sort key specification must be an object", elem.type() == Object);
return create(pExpCtx, elem.embeddedObject());
}
@@ -189,9 +186,7 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create(
pSort->addKey(fieldName, (sortOrder > 0));
}
- uassert(15976,
- str::stream() << sortName << " must have at least one sort key",
- !pSort->vSortKey.empty());
+ uassert(15976, "$sort stage must have at least one sort key", !pSort->vSortKey.empty());
if (limit > 0) {
bool coalesced = pSort->coalesce(DocumentSourceLimit::create(pExpCtx, limit));
diff --git a/src/mongo/db/pipeline/document_source_unwind.cpp b/src/mongo/db/pipeline/document_source_unwind.cpp
index 8ec126b967c..efc6aaa13ee 100644
--- a/src/mongo/db/pipeline/document_source_unwind.cpp
+++ b/src/mongo/db/pipeline/document_source_unwind.cpp
@@ -111,13 +111,13 @@ boost::optional<Document> DocumentSourceUnwind::Unwinder::getNext() {
return _output.peek();
}
-const char DocumentSourceUnwind::unwindName[] = "$unwind";
-
DocumentSourceUnwind::DocumentSourceUnwind(const intrusive_ptr<ExpressionContext>& pExpCtx)
: DocumentSource(pExpCtx) {}
+REGISTER_DOCUMENT_SOURCE(unwind, DocumentSourceUnwind::createFromBson);
+
const char* DocumentSourceUnwind::getSourceName() const {
- return unwindName;
+ return "$unwind";
}
boost::optional<Document> DocumentSourceUnwind::getNext() {
@@ -151,7 +151,7 @@ DocumentSource::GetDepsReturn DocumentSourceUnwind::getDependencies(DepsTracker*
void DocumentSourceUnwind::unwindPath(const FieldPath& fieldPath) {
// Can't set more than one unwind path.
- uassert(15979, str::stream() << unwindName << "can't unwind more than one path", !_unwindPath);
+ uassert(15979, "$unwind can't unwind more than one path", !_unwindPath);
// Record the unwind path.
_unwindPath.reset(new FieldPath(fieldPath));
_unwinder.reset(new Unwinder(fieldPath));
@@ -162,9 +162,7 @@ intrusive_ptr<DocumentSource> DocumentSourceUnwind::createFromBson(
/*
The value of $unwind should just be a field path.
*/
- uassert(15981,
- str::stream() << "the " << unwindName << " field path must be specified as a string",
- elem.type() == String);
+ uassert(15981, "the $unwind field path must be specified as a string", elem.type() == String);
string prefixedPathString(elem.str());
string pathString(Expression::removeFieldPrefix(prefixedPathString));
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 9e9427190f1..9103a69407a 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -62,33 +62,6 @@ const char Pipeline::mongosPipelineName[] = "mongosPipeline";
Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx)
: explain(false), pCtx(pTheCtx) {}
-
-/* this structure is used to make a lookup table of operators */
-struct StageDesc {
- const char* pName;
- intrusive_ptr<DocumentSource>(*pFactory)(BSONElement, const intrusive_ptr<ExpressionContext>&);
-};
-
-/* this table must be in alphabetical order by name for bsearch() */
-static const StageDesc stageDesc[] = {
- {DocumentSourceGeoNear::geoNearName, DocumentSourceGeoNear::createFromBson},
- {DocumentSourceGroup::groupName, DocumentSourceGroup::createFromBson},
- {DocumentSourceLimit::limitName, DocumentSourceLimit::createFromBson},
- {DocumentSourceMatch::matchName, DocumentSourceMatch::createFromBson},
- {DocumentSourceMergeCursors::name, DocumentSourceMergeCursors::createFromBson},
- {DocumentSourceOut::outName, DocumentSourceOut::createFromBson},
- {DocumentSourceProject::projectName, DocumentSourceProject::createFromBson},
- {DocumentSourceRedact::redactName, DocumentSourceRedact::createFromBson},
- {DocumentSourceSkip::skipName, DocumentSourceSkip::createFromBson},
- {DocumentSourceSort::sortName, DocumentSourceSort::createFromBson},
- {DocumentSourceUnwind::unwindName, DocumentSourceUnwind::createFromBson},
-};
-static const size_t nStageDesc = sizeof(stageDesc) / sizeof(StageDesc);
-
-static int stageDescCmp(const void* pL, const void* pR) {
- return strcmp(((const StageDesc*)pL)->pName, ((const StageDesc*)pR)->pName);
-}
-
intrusive_ptr<Pipeline> Pipeline::parseCommand(string& errmsg,
const BSONObj& cmdObj,
const intrusive_ptr<ExpressionContext>& pCtx) {
@@ -174,31 +147,12 @@ intrusive_ptr<Pipeline> Pipeline::parseCommand(string& errmsg,
uassert(15942,
str::stream() << "pipeline element " << iStep << " is not an object",
pipeElement.type() == Object);
- BSONObj bsonObj(pipeElement.Obj());
-
- // Parse a pipeline stage from 'bsonObj'.
- uassert(16435,
- "A pipeline stage specification object must contain exactly one field.",
- bsonObj.nFields() == 1);
- BSONElement stageSpec = bsonObj.firstElement();
- const char* stageName = stageSpec.fieldName();
-
- // Create a DocumentSource pipeline stage from 'stageSpec'.
- StageDesc key;
- key.pName = stageName;
- const StageDesc* pDesc =
- (const StageDesc*)bsearch(&key, stageDesc, nStageDesc, sizeof(StageDesc), stageDescCmp);
-
- uassert(16436,
- str::stream() << "Unrecognized pipeline stage name: '" << stageName << "'",
- pDesc);
- intrusive_ptr<DocumentSource> stage = pDesc->pFactory(stageSpec, pCtx);
- verify(stage);
- sources.push_back(stage);
+
+ sources.push_back(DocumentSource::parse(pCtx, pipeElement.Obj()));
// TODO find a good general way to check stages that must be first syntactically
- if (dynamic_cast<DocumentSourceOut*>(stage.get())) {
+ if (dynamic_cast<DocumentSourceOut*>(sources.back().get())) {
uassert(16991, "$out can only be the final stage in the pipeline", iStep == nSteps - 1);
}
}