Diffstat (limited to 'src/mongo/db/exec/group.cpp')
-rw-r--r-- | src/mongo/db/exec/group.cpp | 459 |
1 file changed, 227 insertions, 232 deletions
diff --git a/src/mongo/db/exec/group.cpp b/src/mongo/db/exec/group.cpp
index 68b21928b62..433b4802cdf 100644
--- a/src/mongo/db/exec/group.cpp
+++ b/src/mongo/db/exec/group.cpp
@@ -38,266 +38,261 @@ namespace mongo {
-    using std::unique_ptr;
-    using std::vector;
-
-    namespace {
-
-        // Helper function that extracts the group key from a BSONObj.
-        Status getKey(const BSONObj& obj,
-                      const BSONObj& keyPattern,
-                      ScriptingFunction func,
-                      Scope* s,
-                      BSONObj* key) {
-            if (func) {
-                BSONObjBuilder b(obj.objsize() + 32);
-                b.append("0", obj);
-                const BSONObj& k = b.obj();
-                int res = s->invoke(func, &k, 0);
-                if (res != 0) {
-                    return Status(ErrorCodes::BadValue,
-                                  str::stream() << "invoke failed in $keyf: " << s->getError());
-                }
-                int type = s->type("__returnValue");
-                if (type != Object) {
-                    return Status(ErrorCodes::BadValue, "return of $key has to be an object");
-                }
-                *key = s->getObject("__returnValue");
-                return Status::OK();
-            }
-            *key = obj.extractFields(keyPattern, true).getOwned();
-            return Status::OK();
+using std::unique_ptr;
+using std::vector;
+
+namespace {
+
+// Helper function that extracts the group key from a BSONObj.
+Status getKey(
+    const BSONObj& obj, const BSONObj& keyPattern, ScriptingFunction func, Scope* s, BSONObj* key) {
+    if (func) {
+        BSONObjBuilder b(obj.objsize() + 32);
+        b.append("0", obj);
+        const BSONObj& k = b.obj();
+        int res = s->invoke(func, &k, 0);
+        if (res != 0) {
+            return Status(ErrorCodes::BadValue,
+                          str::stream() << "invoke failed in $keyf: " << s->getError());
         }
-
-    }  // namespace
-
-    // static
-    const char* GroupStage::kStageType = "GROUP";
-    GroupStage::GroupStage(OperationContext* txn,
-                           const GroupRequest& request,
-                           WorkingSet* workingSet,
-                           PlanStage* child)
-        : _txn(txn),
-          _request(request),
-          _ws(workingSet),
-          _commonStats(kStageType),
-          _specificStats(),
-          _child(child),
-          _groupState(GroupState_Initializing),
-          _reduceFunction(0),
-          _keyFunction(0) {}
-
-    void GroupStage::initGroupScripting() {
-        // Initialize _scope.
-        const std::string userToken =
-            AuthorizationSession::get(ClientBasic::getCurrent())
-                ->getAuthenticatedUserNamesToken();
-
-        const NamespaceString nss(_request.ns);
-        _scope = globalScriptEngine->getPooledScope(_txn, nss.db().toString(), "group" + userToken);
-        if (!_request.reduceScope.isEmpty()) {
-            _scope->init(&_request.reduceScope);
-        }
-        _scope->setObject("$initial", _request.initial, true);
-        _scope->exec("$reduce = " + _request.reduceCode, "$group reduce setup", false, true, true,
-                     2 * 1000);
-        _scope->exec("$arr = [];", "$group reduce setup 2", false, true, true, 2 * 1000);
-
-        // Initialize _reduceFunction.
-        _reduceFunction = _scope->createFunction("function(){ "
-                                                 " if ( $arr[n] == null ){ "
-                                                 " next = {}; "
-                                                 " Object.extend( next , $key ); "
-                                                 " Object.extend( next , $initial , true ); "
-                                                 " $arr[n] = next; "
-                                                 " next = null; "
-                                                 " } "
-                                                 " $reduce( obj , $arr[n] ); "
-                                                 "}");
-
-        // Initialize _keyFunction, if a key function was provided.
-        if (_request.keyFunctionCode.size()) {
-            _keyFunction = _scope->createFunction(_request.keyFunctionCode.c_str());
+        int type = s->type("__returnValue");
+        if (type != Object) {
+            return Status(ErrorCodes::BadValue, "return of $key has to be an object");
         }
+        *key = s->getObject("__returnValue");
+        return Status::OK();
     }
+    *key = obj.extractFields(keyPattern, true).getOwned();
+    return Status::OK();
+}
+
+}  // namespace
+
+// static
+const char* GroupStage::kStageType = "GROUP";
+
+GroupStage::GroupStage(OperationContext* txn,
+                       const GroupRequest& request,
+                       WorkingSet* workingSet,
+                       PlanStage* child)
+    : _txn(txn),
+      _request(request),
+      _ws(workingSet),
+      _commonStats(kStageType),
+      _specificStats(),
+      _child(child),
+      _groupState(GroupState_Initializing),
+      _reduceFunction(0),
+      _keyFunction(0) {}
+
+void GroupStage::initGroupScripting() {
+    // Initialize _scope.
+    const std::string userToken =
+        AuthorizationSession::get(ClientBasic::getCurrent())->getAuthenticatedUserNamesToken();
+
+    const NamespaceString nss(_request.ns);
+    _scope = globalScriptEngine->getPooledScope(_txn, nss.db().toString(), "group" + userToken);
+    if (!_request.reduceScope.isEmpty()) {
+        _scope->init(&_request.reduceScope);
+    }
+    _scope->setObject("$initial", _request.initial, true);
+    _scope->exec(
+        "$reduce = " + _request.reduceCode, "$group reduce setup", false, true, true, 2 * 1000);
+    _scope->exec("$arr = [];", "$group reduce setup 2", false, true, true, 2 * 1000);
+
+    // Initialize _reduceFunction.
+    _reduceFunction = _scope->createFunction(
+        "function(){ "
+        " if ( $arr[n] == null ){ "
+        " next = {}; "
+        " Object.extend( next , $key ); "
+        " Object.extend( next , $initial , true ); "
+        " $arr[n] = next; "
+        " next = null; "
+        " } "
+        " $reduce( obj , $arr[n] ); "
+        "}");
+
+    // Initialize _keyFunction, if a key function was provided.
+    if (_request.keyFunctionCode.size()) {
+        _keyFunction = _scope->createFunction(_request.keyFunctionCode.c_str());
+    }
+}
 
-    Status GroupStage::processObject(const BSONObj& obj) {
-        BSONObj key;
-        Status getKeyStatus = getKey(obj, _request.keyPattern, _keyFunction, _scope.get(),
-                                     &key);
-        if (!getKeyStatus.isOK()) {
-            return getKeyStatus;
-        }
-
-        int& n = _groupMap[key];
-        if (n == 0) {
-            n = _groupMap.size();
-            _scope->setObject("$key", key, true);
-            if (n > 20000) {
-                return Status(ErrorCodes::BadValue,
-                              "group() can't handle more than 20000 unique keys");
-            }
-        }
-
-        _scope->setObject("obj", obj, true);
-        _scope->setNumber("n", n - 1);
-        if (_scope->invoke(_reduceFunction, 0, 0, 0, true)) {
-            return Status(ErrorCodes::BadValue,
-                          str::stream() << "reduce invoke failed: " << _scope->getError());
-        }
-
-        return Status::OK();
+Status GroupStage::processObject(const BSONObj& obj) {
+    BSONObj key;
+    Status getKeyStatus = getKey(obj, _request.keyPattern, _keyFunction, _scope.get(), &key);
+    if (!getKeyStatus.isOK()) {
+        return getKeyStatus;
     }
 
-    BSONObj GroupStage::finalizeResults() {
-        if (!_request.finalize.empty()) {
-            _scope->exec("$finalize = " + _request.finalize, "$group finalize define", false,
-                         true, true, 2 * 1000);
-            ScriptingFunction finalizeFunction =
-                _scope->createFunction("function(){ "
-                                       " for(var i=0; i < $arr.length; i++){ "
-                                       " var ret = $finalize($arr[i]); "
-                                       " if (ret !== undefined) "
-                                       " $arr[i] = ret; "
-                                       " } "
-                                       "}");
-            _scope->invoke(finalizeFunction, 0, 0, 0, true);
+    int& n = _groupMap[key];
+    if (n == 0) {
+        n = _groupMap.size();
+        _scope->setObject("$key", key, true);
+        if (n > 20000) {
+            return Status(ErrorCodes::BadValue, "group() can't handle more than 20000 unique keys");
         }
-
-        _specificStats.nGroups = _groupMap.size();
-
-        BSONObj results = _scope->getObject("$arr").getOwned();
-
-        _scope->exec("$arr = [];", "$group reduce setup 2", false, true, true, 2 * 1000);
-        _scope->gc();
-
-        return results;
-    }
-
-    PlanStage::StageState GroupStage::work(WorkingSetID* out) {
-        ++_commonStats.works;
+    }
+
+    _scope->setObject("obj", obj, true);
+    _scope->setNumber("n", n - 1);
+    if (_scope->invoke(_reduceFunction, 0, 0, 0, true)) {
+        return Status(ErrorCodes::BadValue,
+                      str::stream() << "reduce invoke failed: " << _scope->getError());
+    }
 
-        ScopedTimer timer(&_commonStats.executionTimeMillis);
+    return Status::OK();
+}
 
-        if (isEOF()) { return PlanStage::IS_EOF; }
+BSONObj GroupStage::finalizeResults() {
+    if (!_request.finalize.empty()) {
+        _scope->exec("$finalize = " + _request.finalize,
+                     "$group finalize define",
+                     false,
+                     true,
+                     true,
+                     2 * 1000);
+        ScriptingFunction finalizeFunction = _scope->createFunction(
+            "function(){ "
+            " for(var i=0; i < $arr.length; i++){ "
+            " var ret = $finalize($arr[i]); "
+            " if (ret !== undefined) "
+            " $arr[i] = ret; "
+            " } "
+            "}");
+        _scope->invoke(finalizeFunction, 0, 0, 0, true);
+    }
 
-        // On the first call to work(), call initGroupScripting().
-        if (_groupState == GroupState_Initializing) {
-            initGroupScripting();
-            _groupState = GroupState_ReadingFromChild;
-            ++_commonStats.needTime;
-            return PlanStage::NEED_TIME;
-        }
-
-        // Otherwise, read from our child.
-        invariant(_groupState == GroupState_ReadingFromChild);
-        WorkingSetID id = WorkingSet::INVALID_ID;
-        StageState state = _child->work(&id);
+    _specificStats.nGroups = _groupMap.size();
 
-        if (PlanStage::NEED_TIME == state) {
-            ++_commonStats.needTime;
-            return state;
-        }
-        else if (PlanStage::NEED_YIELD == state) {
-            ++_commonStats.needYield;
-            *out = id;
-            return state;
-        }
-        else if (PlanStage::FAILURE == state) {
-            *out = id;
-            // If a stage fails, it may create a status WSM to indicate why it failed, in which
-            // case 'id' is valid. If ID is invalid, we create our own error message.
-            if (WorkingSet::INVALID_ID == id) {
-                const std::string errmsg = "group stage failed to read in results from child";
-                *out = WorkingSetCommon::allocateStatusMember(_ws,
-                                                              Status(ErrorCodes::InternalError,
-                                                                     errmsg));
-            }
-            return state;
-        }
-        else if (PlanStage::DEAD == state) {
-            return state;
-        }
-        else if (PlanStage::ADVANCED == state) {
-            WorkingSetMember* member = _ws->get(id);
-            // Group queries can't have projections. This means that covering analysis will always
-            // add a fetch. We should always get fetched data, and never just key data.
-            invariant(member->hasObj());
+    BSONObj results = _scope->getObject("$arr").getOwned();
 
-            Status status = processObject(member->obj.value());
-            if (!status.isOK()) {
-                *out = WorkingSetCommon::allocateStatusMember(_ws, status);
-                return PlanStage::FAILURE;
-            }
+    _scope->exec("$arr = [];", "$group reduce setup 2", false, true, true, 2 * 1000);
+    _scope->gc();
 
-            _ws->free(id);
+    return results;
+}
 
-            ++_commonStats.needTime;
-            return PlanStage::NEED_TIME;
-        }
-        else {
-            // We're done reading from our child.
-            invariant(PlanStage::IS_EOF == state);
+PlanStage::StageState GroupStage::work(WorkingSetID* out) {
+    ++_commonStats.works;
 
-            // Transition to state "done." Future calls to work() will return IS_EOF.
-            _groupState = GroupState_Done;
+    ScopedTimer timer(&_commonStats.executionTimeMillis);
 
-            BSONObj results = finalizeResults();
+    if (isEOF()) {
+        return PlanStage::IS_EOF;
+    }
 
-            *out = _ws->allocate();
-            WorkingSetMember* member = _ws->get(*out);
-            member->obj = Snapshotted<BSONObj>(SnapshotId(), results);
-            member->state = WorkingSetMember::OWNED_OBJ;
+    // On the first call to work(), call initGroupScripting().
+    if (_groupState == GroupState_Initializing) {
+        initGroupScripting();
+        _groupState = GroupState_ReadingFromChild;
+        ++_commonStats.needTime;
+        return PlanStage::NEED_TIME;
+    }
 
-            ++_commonStats.advanced;
-            return PlanStage::ADVANCED;
-        }
-    }
+    // Otherwise, read from our child.
+    invariant(_groupState == GroupState_ReadingFromChild);
+    WorkingSetID id = WorkingSet::INVALID_ID;
+    StageState state = _child->work(&id);
 
-    bool GroupStage::isEOF() {
-        return _groupState == GroupState_Done;
-    }
+    if (PlanStage::NEED_TIME == state) {
+        ++_commonStats.needTime;
+        return state;
+    } else if (PlanStage::NEED_YIELD == state) {
+        ++_commonStats.needYield;
+        *out = id;
+        return state;
+    } else if (PlanStage::FAILURE == state) {
+        *out = id;
+        // If a stage fails, it may create a status WSM to indicate why it failed, in which
+        // case 'id' is valid. If ID is invalid, we create our own error message.
+        if (WorkingSet::INVALID_ID == id) {
+            const std::string errmsg = "group stage failed to read in results from child";
+            *out = WorkingSetCommon::allocateStatusMember(
+                _ws, Status(ErrorCodes::InternalError, errmsg));
+        }
+        return state;
+    } else if (PlanStage::DEAD == state) {
+        return state;
+    } else if (PlanStage::ADVANCED == state) {
+        WorkingSetMember* member = _ws->get(id);
+        // Group queries can't have projections. This means that covering analysis will always
+        // add a fetch. We should always get fetched data, and never just key data.
+        invariant(member->hasObj());
 
-    void GroupStage::saveState() {
-        _txn = NULL;
-        ++_commonStats.yields;
-        _child->saveState();
-    }
+        Status status = processObject(member->obj.value());
+        if (!status.isOK()) {
+            *out = WorkingSetCommon::allocateStatusMember(_ws, status);
+            return PlanStage::FAILURE;
+        }
 
-    void GroupStage::restoreState(OperationContext* opCtx) {
-        invariant(_txn == NULL);
-        _txn = opCtx;
-        ++_commonStats.unyields;
-        _child->restoreState(opCtx);
-    }
+        _ws->free(id);
 
-    void GroupStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
-        ++_commonStats.invalidates;
-        _child->invalidate(txn, dl, type);
-    }
+        ++_commonStats.needTime;
+        return PlanStage::NEED_TIME;
+    } else {
+        // We're done reading from our child.
+        invariant(PlanStage::IS_EOF == state);
 
-    vector<PlanStage*> GroupStage::getChildren() const {
-        vector<PlanStage*> children;
-        children.push_back(_child.get());
-        return children;
-    }
+        // Transition to state "done." Future calls to work() will return IS_EOF.
+        _groupState = GroupState_Done;
 
-    PlanStageStats* GroupStage::getStats() {
-        _commonStats.isEOF = isEOF();
-        unique_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_GROUP));
-        GroupStats* groupStats = new GroupStats(_specificStats);
-        ret->specific.reset(groupStats);
-        ret->children.push_back(_child->getStats());
-        return ret.release();
-    }
+        BSONObj results = finalizeResults();
 
-    const CommonStats* GroupStage::getCommonStats() const {
-        return &_commonStats;
-    }
+        *out = _ws->allocate();
+        WorkingSetMember* member = _ws->get(*out);
+        member->obj = Snapshotted<BSONObj>(SnapshotId(), results);
+        member->state = WorkingSetMember::OWNED_OBJ;
 
-    const SpecificStats* GroupStage::getSpecificStats() const {
-        return &_specificStats;
-    }
+        ++_commonStats.advanced;
+        return PlanStage::ADVANCED;
+    }
+}
+
+bool GroupStage::isEOF() {
+    return _groupState == GroupState_Done;
+}
+
+void GroupStage::saveState() {
+    _txn = NULL;
+    ++_commonStats.yields;
+    _child->saveState();
+}
+
+void GroupStage::restoreState(OperationContext* opCtx) {
+    invariant(_txn == NULL);
+    _txn = opCtx;
+    ++_commonStats.unyields;
+    _child->restoreState(opCtx);
+}
+
+void GroupStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
+    ++_commonStats.invalidates;
+    _child->invalidate(txn, dl, type);
+}
+
+vector<PlanStage*> GroupStage::getChildren() const {
+    vector<PlanStage*> children;
+    children.push_back(_child.get());
+    return children;
+}
+
+PlanStageStats* GroupStage::getStats() {
+    _commonStats.isEOF = isEOF();
+    unique_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_GROUP));
+    GroupStats* groupStats = new GroupStats(_specificStats);
+    ret->specific.reset(groupStats);
+    ret->children.push_back(_child->getStats());
+    return ret.release();
+}
+
+const CommonStats* GroupStage::getCommonStats() const {
+    return &_commonStats;
+}
+
+const SpecificStats* GroupStage::getSpecificStats() const {
+    return &_specificStats;
+}
 }  // namespace mongo