summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec/group.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/exec/group.cpp')
-rw-r--r--src/mongo/db/exec/group.cpp459
1 files changed, 227 insertions, 232 deletions
diff --git a/src/mongo/db/exec/group.cpp b/src/mongo/db/exec/group.cpp
index 68b21928b62..433b4802cdf 100644
--- a/src/mongo/db/exec/group.cpp
+++ b/src/mongo/db/exec/group.cpp
@@ -38,266 +38,261 @@
namespace mongo {
- using std::unique_ptr;
- using std::vector;
-
- namespace {
-
- // Helper function that extracts the group key from a BSONObj.
- Status getKey(const BSONObj& obj,
- const BSONObj& keyPattern,
- ScriptingFunction func,
- Scope* s,
- BSONObj* key) {
- if (func) {
- BSONObjBuilder b(obj.objsize() + 32);
- b.append("0", obj);
- const BSONObj& k = b.obj();
- int res = s->invoke(func, &k, 0);
- if (res != 0) {
- return Status(ErrorCodes::BadValue,
- str::stream() << "invoke failed in $keyf: " << s->getError());
- }
- int type = s->type("__returnValue");
- if (type != Object) {
- return Status(ErrorCodes::BadValue, "return of $key has to be an object");
- }
- *key = s->getObject("__returnValue");
- return Status::OK();
- }
- *key = obj.extractFields(keyPattern, true).getOwned();
- return Status::OK();
+using std::unique_ptr;
+using std::vector;
+
+namespace {
+
+// Helper function that extracts the group key from a BSONObj.
+Status getKey(
+ const BSONObj& obj, const BSONObj& keyPattern, ScriptingFunction func, Scope* s, BSONObj* key) {
+ if (func) {
+ BSONObjBuilder b(obj.objsize() + 32);
+ b.append("0", obj);
+ const BSONObj& k = b.obj();
+ int res = s->invoke(func, &k, 0);
+ if (res != 0) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "invoke failed in $keyf: " << s->getError());
}
-
- } // namespace
-
- // static
- const char* GroupStage::kStageType = "GROUP";
-
- GroupStage::GroupStage(OperationContext* txn,
- const GroupRequest& request,
- WorkingSet* workingSet,
- PlanStage* child)
- : _txn(txn),
- _request(request),
- _ws(workingSet),
- _commonStats(kStageType),
- _specificStats(),
- _child(child),
- _groupState(GroupState_Initializing),
- _reduceFunction(0),
- _keyFunction(0) {}
-
- void GroupStage::initGroupScripting() {
- // Initialize _scope.
- const std::string userToken =
- AuthorizationSession::get(ClientBasic::getCurrent())
- ->getAuthenticatedUserNamesToken();
-
- const NamespaceString nss(_request.ns);
- _scope = globalScriptEngine->getPooledScope(_txn, nss.db().toString(), "group" + userToken);
- if (!_request.reduceScope.isEmpty()) {
- _scope->init(&_request.reduceScope);
- }
- _scope->setObject("$initial", _request.initial, true);
- _scope->exec("$reduce = " + _request.reduceCode, "$group reduce setup", false, true, true,
- 2 * 1000);
- _scope->exec("$arr = [];", "$group reduce setup 2", false, true, true, 2 * 1000);
-
- // Initialize _reduceFunction.
- _reduceFunction = _scope->createFunction("function(){ "
- " if ( $arr[n] == null ){ "
- " next = {}; "
- " Object.extend( next , $key ); "
- " Object.extend( next , $initial , true ); "
- " $arr[n] = next; "
- " next = null; "
- " } "
- " $reduce( obj , $arr[n] ); "
- "}");
-
- // Initialize _keyFunction, if a key function was provided.
- if (_request.keyFunctionCode.size()) {
- _keyFunction = _scope->createFunction(_request.keyFunctionCode.c_str());
+ int type = s->type("__returnValue");
+ if (type != Object) {
+ return Status(ErrorCodes::BadValue, "return of $key has to be an object");
}
+ *key = s->getObject("__returnValue");
+ return Status::OK();
}
+ *key = obj.extractFields(keyPattern, true).getOwned();
+ return Status::OK();
+}
+
+} // namespace
+
+// static
+const char* GroupStage::kStageType = "GROUP";
+
+GroupStage::GroupStage(OperationContext* txn,
+ const GroupRequest& request,
+ WorkingSet* workingSet,
+ PlanStage* child)
+ : _txn(txn),
+ _request(request),
+ _ws(workingSet),
+ _commonStats(kStageType),
+ _specificStats(),
+ _child(child),
+ _groupState(GroupState_Initializing),
+ _reduceFunction(0),
+ _keyFunction(0) {}
+
+void GroupStage::initGroupScripting() {
+ // Initialize _scope.
+ const std::string userToken =
+ AuthorizationSession::get(ClientBasic::getCurrent())->getAuthenticatedUserNamesToken();
+
+ const NamespaceString nss(_request.ns);
+ _scope = globalScriptEngine->getPooledScope(_txn, nss.db().toString(), "group" + userToken);
+ if (!_request.reduceScope.isEmpty()) {
+ _scope->init(&_request.reduceScope);
+ }
+ _scope->setObject("$initial", _request.initial, true);
+ _scope->exec(
+ "$reduce = " + _request.reduceCode, "$group reduce setup", false, true, true, 2 * 1000);
+ _scope->exec("$arr = [];", "$group reduce setup 2", false, true, true, 2 * 1000);
+
+ // Initialize _reduceFunction.
+ _reduceFunction = _scope->createFunction(
+ "function(){ "
+ " if ( $arr[n] == null ){ "
+ " next = {}; "
+ " Object.extend( next , $key ); "
+ " Object.extend( next , $initial , true ); "
+ " $arr[n] = next; "
+ " next = null; "
+ " } "
+ " $reduce( obj , $arr[n] ); "
+ "}");
+
+ // Initialize _keyFunction, if a key function was provided.
+ if (_request.keyFunctionCode.size()) {
+ _keyFunction = _scope->createFunction(_request.keyFunctionCode.c_str());
+ }
+}
- Status GroupStage::processObject(const BSONObj& obj) {
- BSONObj key;
- Status getKeyStatus = getKey(obj, _request.keyPattern, _keyFunction, _scope.get(),
- &key);
- if (!getKeyStatus.isOK()) {
- return getKeyStatus;
- }
-
- int& n = _groupMap[key];
- if (n == 0) {
- n = _groupMap.size();
- _scope->setObject("$key", key, true);
- if (n > 20000) {
- return Status(ErrorCodes::BadValue,
- "group() can't handle more than 20000 unique keys");
- }
- }
-
- _scope->setObject("obj", obj, true);
- _scope->setNumber("n", n - 1);
- if (_scope->invoke(_reduceFunction, 0, 0, 0, true)) {
- return Status(ErrorCodes::BadValue,
- str::stream() << "reduce invoke failed: " << _scope->getError());
- }
-
- return Status::OK();
+Status GroupStage::processObject(const BSONObj& obj) {
+ BSONObj key;
+ Status getKeyStatus = getKey(obj, _request.keyPattern, _keyFunction, _scope.get(), &key);
+ if (!getKeyStatus.isOK()) {
+ return getKeyStatus;
}
- BSONObj GroupStage::finalizeResults() {
- if (!_request.finalize.empty()) {
- _scope->exec("$finalize = " + _request.finalize, "$group finalize define", false,
- true, true, 2 * 1000);
- ScriptingFunction finalizeFunction =
- _scope->createFunction("function(){ "
- " for(var i=0; i < $arr.length; i++){ "
- " var ret = $finalize($arr[i]); "
- " if (ret !== undefined) "
- " $arr[i] = ret; "
- " } "
- "}");
- _scope->invoke(finalizeFunction, 0, 0, 0, true);
+ int& n = _groupMap[key];
+ if (n == 0) {
+ n = _groupMap.size();
+ _scope->setObject("$key", key, true);
+ if (n > 20000) {
+ return Status(ErrorCodes::BadValue, "group() can't handle more than 20000 unique keys");
}
-
- _specificStats.nGroups = _groupMap.size();
-
- BSONObj results = _scope->getObject("$arr").getOwned();
-
- _scope->exec("$arr = [];", "$group reduce setup 2", false, true, true, 2 * 1000);
- _scope->gc();
-
- return results;
}
- PlanStage::StageState GroupStage::work(WorkingSetID* out) {
- ++_commonStats.works;
+ _scope->setObject("obj", obj, true);
+ _scope->setNumber("n", n - 1);
+ if (_scope->invoke(_reduceFunction, 0, 0, 0, true)) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "reduce invoke failed: " << _scope->getError());
+ }
- ScopedTimer timer(&_commonStats.executionTimeMillis);
+ return Status::OK();
+}
- if (isEOF()) { return PlanStage::IS_EOF; }
+BSONObj GroupStage::finalizeResults() {
+ if (!_request.finalize.empty()) {
+ _scope->exec("$finalize = " + _request.finalize,
+ "$group finalize define",
+ false,
+ true,
+ true,
+ 2 * 1000);
+ ScriptingFunction finalizeFunction = _scope->createFunction(
+ "function(){ "
+ " for(var i=0; i < $arr.length; i++){ "
+ " var ret = $finalize($arr[i]); "
+ " if (ret !== undefined) "
+ " $arr[i] = ret; "
+ " } "
+ "}");
+ _scope->invoke(finalizeFunction, 0, 0, 0, true);
+ }
- // On the first call to work(), call initGroupScripting().
- if (_groupState == GroupState_Initializing) {
- initGroupScripting();
- _groupState = GroupState_ReadingFromChild;
- ++_commonStats.needTime;
- return PlanStage::NEED_TIME;
- }
+ _specificStats.nGroups = _groupMap.size();
- // Otherwise, read from our child.
- invariant(_groupState == GroupState_ReadingFromChild);
- WorkingSetID id = WorkingSet::INVALID_ID;
- StageState state = _child->work(&id);
+ BSONObj results = _scope->getObject("$arr").getOwned();
- if (PlanStage::NEED_TIME == state) {
- ++_commonStats.needTime;
- return state;
- }
- else if (PlanStage::NEED_YIELD == state) {
- ++_commonStats.needYield;
- *out = id;
- return state;
- }
- else if (PlanStage::FAILURE == state) {
- *out = id;
- // If a stage fails, it may create a status WSM to indicate why it failed, in which
- // case 'id' is valid. If ID is invalid, we create our own error message.
- if (WorkingSet::INVALID_ID == id) {
- const std::string errmsg = "group stage failed to read in results from child";
- *out = WorkingSetCommon::allocateStatusMember(_ws,
- Status(ErrorCodes::InternalError,
- errmsg));
- }
- return state;
- }
- else if (PlanStage::DEAD == state) {
- return state;
- }
- else if (PlanStage::ADVANCED == state) {
- WorkingSetMember* member = _ws->get(id);
- // Group queries can't have projections. This means that covering analysis will always
- // add a fetch. We should always get fetched data, and never just key data.
- invariant(member->hasObj());
-
- Status status = processObject(member->obj.value());
- if (!status.isOK()) {
- *out = WorkingSetCommon::allocateStatusMember(_ws, status);
- return PlanStage::FAILURE;
- }
-
- _ws->free(id);
-
- ++_commonStats.needTime;
- return PlanStage::NEED_TIME;
- }
- else {
- // We're done reading from our child.
- invariant(PlanStage::IS_EOF == state);
+ _scope->exec("$arr = [];", "$group reduce setup 2", false, true, true, 2 * 1000);
+ _scope->gc();
- // Transition to state "done." Future calls to work() will return IS_EOF.
- _groupState = GroupState_Done;
+ return results;
+}
- BSONObj results = finalizeResults();
+PlanStage::StageState GroupStage::work(WorkingSetID* out) {
+ ++_commonStats.works;
- *out = _ws->allocate();
- WorkingSetMember* member = _ws->get(*out);
- member->obj = Snapshotted<BSONObj>(SnapshotId(), results);
- member->state = WorkingSetMember::OWNED_OBJ;
+ ScopedTimer timer(&_commonStats.executionTimeMillis);
- ++_commonStats.advanced;
- return PlanStage::ADVANCED;
- }
+ if (isEOF()) {
+ return PlanStage::IS_EOF;
}
- bool GroupStage::isEOF() {
- return _groupState == GroupState_Done;
+ // On the first call to work(), call initGroupScripting().
+ if (_groupState == GroupState_Initializing) {
+ initGroupScripting();
+ _groupState = GroupState_ReadingFromChild;
+ ++_commonStats.needTime;
+ return PlanStage::NEED_TIME;
}
- void GroupStage::saveState() {
- _txn = NULL;
- ++_commonStats.yields;
- _child->saveState();
- }
+ // Otherwise, read from our child.
+ invariant(_groupState == GroupState_ReadingFromChild);
+ WorkingSetID id = WorkingSet::INVALID_ID;
+ StageState state = _child->work(&id);
+
+ if (PlanStage::NEED_TIME == state) {
+ ++_commonStats.needTime;
+ return state;
+ } else if (PlanStage::NEED_YIELD == state) {
+ ++_commonStats.needYield;
+ *out = id;
+ return state;
+ } else if (PlanStage::FAILURE == state) {
+ *out = id;
+ // If a stage fails, it may create a status WSM to indicate why it failed, in which
+ // case 'id' is valid. If ID is invalid, we create our own error message.
+ if (WorkingSet::INVALID_ID == id) {
+ const std::string errmsg = "group stage failed to read in results from child";
+ *out = WorkingSetCommon::allocateStatusMember(
+ _ws, Status(ErrorCodes::InternalError, errmsg));
+ }
+ return state;
+ } else if (PlanStage::DEAD == state) {
+ return state;
+ } else if (PlanStage::ADVANCED == state) {
+ WorkingSetMember* member = _ws->get(id);
+ // Group queries can't have projections. This means that covering analysis will always
+ // add a fetch. We should always get fetched data, and never just key data.
+ invariant(member->hasObj());
+
+ Status status = processObject(member->obj.value());
+ if (!status.isOK()) {
+ *out = WorkingSetCommon::allocateStatusMember(_ws, status);
+ return PlanStage::FAILURE;
+ }
- void GroupStage::restoreState(OperationContext* opCtx) {
- invariant(_txn == NULL);
- _txn = opCtx;
- ++_commonStats.unyields;
- _child->restoreState(opCtx);
- }
+ _ws->free(id);
- void GroupStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
- ++_commonStats.invalidates;
- _child->invalidate(txn, dl, type);
- }
+ ++_commonStats.needTime;
+ return PlanStage::NEED_TIME;
+ } else {
+ // We're done reading from our child.
+ invariant(PlanStage::IS_EOF == state);
- vector<PlanStage*> GroupStage::getChildren() const {
- vector<PlanStage*> children;
- children.push_back(_child.get());
- return children;
- }
+ // Transition to state "done." Future calls to work() will return IS_EOF.
+ _groupState = GroupState_Done;
- PlanStageStats* GroupStage::getStats() {
- _commonStats.isEOF = isEOF();
- unique_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_GROUP));
- GroupStats* groupStats = new GroupStats(_specificStats);
- ret->specific.reset(groupStats);
- ret->children.push_back(_child->getStats());
- return ret.release();
- }
+ BSONObj results = finalizeResults();
- const CommonStats* GroupStage::getCommonStats() const {
- return &_commonStats;
- }
+ *out = _ws->allocate();
+ WorkingSetMember* member = _ws->get(*out);
+ member->obj = Snapshotted<BSONObj>(SnapshotId(), results);
+ member->state = WorkingSetMember::OWNED_OBJ;
- const SpecificStats* GroupStage::getSpecificStats() const {
- return &_specificStats;
+ ++_commonStats.advanced;
+ return PlanStage::ADVANCED;
}
+}
+
+bool GroupStage::isEOF() {
+ return _groupState == GroupState_Done;
+}
+
+void GroupStage::saveState() {
+ _txn = NULL;
+ ++_commonStats.yields;
+ _child->saveState();
+}
+
+void GroupStage::restoreState(OperationContext* opCtx) {
+ invariant(_txn == NULL);
+ _txn = opCtx;
+ ++_commonStats.unyields;
+ _child->restoreState(opCtx);
+}
+
+void GroupStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
+ ++_commonStats.invalidates;
+ _child->invalidate(txn, dl, type);
+}
+
+vector<PlanStage*> GroupStage::getChildren() const {
+ vector<PlanStage*> children;
+ children.push_back(_child.get());
+ return children;
+}
+
+PlanStageStats* GroupStage::getStats() {
+ _commonStats.isEOF = isEOF();
+ unique_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_GROUP));
+ GroupStats* groupStats = new GroupStats(_specificStats);
+ ret->specific.reset(groupStats);
+ ret->children.push_back(_child->getStats());
+ return ret.release();
+}
+
+const CommonStats* GroupStage::getCommonStats() const {
+ return &_commonStats;
+}
+
+const SpecificStats* GroupStage::getSpecificStats() const {
+ return &_specificStats;
+}
} // namespace mongo