/**
* Copyright (C) 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/db/exec/group.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/client_basic.h"
#include "mongo/db/exec/scoped_timer.h"
#include "mongo/db/exec/working_set_common.h"
namespace mongo {
using std::auto_ptr;
using std::vector;
namespace {
// Computes the group key for 'obj' and writes it to '*key'.
//
// If 'func' is non-zero it is invoked in scope 's', receiving 'obj' wrapped in an argument
// object under the field name "0"; the scope's "__returnValue" then becomes the key and must
// be of type Object. If 'func' is zero, the key is built by extracting the fields named in
// 'keyPattern' from 'obj'.
//
// Returns Status::OK() on success, or a BadValue status if the invocation reports an error or
// its return value is not an object.
Status getKey(const BSONObj& obj,
              const BSONObj& keyPattern,
              ScriptingFunction func,
              Scope* s,
              BSONObj* key) {
    if (!func) {
        // No key function supplied: take the key straight from the document.
        *key = obj.extractFields(keyPattern, true).getOwned();
        return Status::OK();
    }

    // Package the document as the single argument of the key function.
    BSONObjBuilder argsBuilder(obj.objsize() + 32);
    argsBuilder.append("0", obj);
    const BSONObj& args = argsBuilder.obj();

    if (s->invoke(func, &args, 0) != 0) {
        return Status(ErrorCodes::BadValue,
                      str::stream() << "invoke failed in $keyf: " << s->getError());
    }

    if (s->type("__returnValue") != Object) {
        return Status(ErrorCodes::BadValue, "return of $key has to be an object");
    }

    *key = s->getObject("__returnValue");
    return Status::OK();
}
} // namespace
// static
// Name identifying this stage type; the GroupStage constructor passes it to _commonStats.
const char* GroupStage::kStageType = "GROUP";
// Constructs a group stage that reads its input from 'child' and allocates its output in
// 'workingSet'. The constructor only records its arguments; no scripting state is created here.
// The stage starts in GroupState_Initializing with both script function handles set to 0, and
// work() calls initGroupScripting() while the stage is still in that state.
// NOTE(review): _child is later accessed through .get(), so it presumably owns 'child' --
// confirm against the class declaration.
GroupStage::GroupStage(OperationContext* txn,
const GroupRequest& request,
WorkingSet* workingSet,
PlanStage* child)
: _txn(txn),
_request(request),
_ws(workingSet),
_commonStats(kStageType),
_specificStats(),
_child(child),
_groupState(GroupState_Initializing),
_reduceFunction(0),
_keyFunction(0) {}
// Sets up the scripting state used by the group: obtains a pooled scope for the request's
// database, installs the request's reduce scope (if any), "$initial", "$reduce" and an empty
// "$arr" in it, and creates the reduce wrapper function and, when the request carries key
// function code, the key function.
void GroupStage::initGroupScripting() {
    // Initialize _scope. The scope type string is "group" followed by the authenticated
    // user names token of the current client.
    const std::string scopeType =
        "group" + ClientBasic::getCurrent()->getAuthorizationSession()
                      ->getAuthenticatedUserNamesToken();
    const NamespaceString nss(_request.ns);
    _scope = globalScriptEngine->getPooledScope(_txn, nss.db().toString(), scopeType);

    const bool hasReduceScope = !_request.reduceScope.isEmpty();
    if (hasReduceScope) {
        _scope->init(&_request.reduceScope);
    }

    _scope->setObject("$initial", _request.initial, true);

    const std::string defineReduce = "$reduce = " + _request.reduceCode;
    _scope->exec(defineReduce, "$group reduce setup", false, true, true, 2 * 1000);
    _scope->exec("$arr = [];", "$group reduce setup 2", false, true, true, 2 * 1000);

    // Initialize _reduceFunction. The wrapper lazily creates the accumulator for slot 'n'
    // from $key and $initial, then applies $reduce to the current 'obj'.
    const char* const reduceWrapperSource =
        "function(){ "
        " if ( $arr[n] == null ){ "
        " next = {}; "
        " Object.extend( next , $key ); "
        " Object.extend( next , $initial , true ); "
        " $arr[n] = next; "
        " next = null; "
        " } "
        " $reduce( obj , $arr[n] ); "
        "}";
    _reduceFunction = _scope->createFunction(reduceWrapperSource);

    // Initialize _keyFunction, if a key function was provided.
    if (!_request.keyFunctionCode.empty()) {
        _keyFunction = _scope->createFunction(_request.keyFunctionCode.c_str());
    }
}
// Folds one input document into the running group results.
//
// Computes the document's group key, assigns the key a slot in _groupMap the first time it
// is seen (failing once more than 20000 distinct keys exist), and then invokes the reduce
// wrapper with "obj" set to the document and "n" set to the key's slot.
//
// Returns Status::OK() on success, or a BadValue status if key extraction fails, the
// distinct-key limit is exceeded, or the reduce invocation reports an error.
Status GroupStage::processObject(const BSONObj& obj) {
    BSONObj groupKey;
    Status keyStatus =
        getKey(obj, _request.keyPattern, _keyFunction, _scope.get(), &groupKey);
    if (!keyStatus.isOK()) {
        return keyStatus;
    }

    // Slots stored in _groupMap are 1-based; a value of 0 marks a key seen for the first time.
    int& slot = _groupMap[groupKey];
    const bool isNewGroup = (0 == slot);
    if (isNewGroup) {
        slot = _groupMap.size();
        _scope->setObject("$key", groupKey, true);
        if (slot > 20000) {
            return Status(ErrorCodes::BadValue,
                          "group() can't handle more than 20000 unique keys");
        }
    }

    // The script-side "$arr" is indexed from 0, hence the slot is shifted down by one.
    _scope->setObject("obj", obj, true);
    _scope->setNumber("n", slot - 1);

    const int reduceResult = _scope->invoke(_reduceFunction, 0, 0, 0, true);
    if (reduceResult != 0) {
        return Status(ErrorCodes::BadValue,
                      str::stream() << "reduce invoke failed: " << _scope->getError());
    }

    return Status::OK();
}
// Produces the final result object for the group.
//
// If the request carries a finalize function, it is defined in the scope as "$finalize" and
// applied to every element of "$arr", replacing an element whenever the function returns
// something other than undefined. The number of groups is recorded in _specificStats, an
// owned copy of "$arr" is taken, and the scope's "$arr" is reset and garbage-collected
// before the copy is returned.
BSONObj GroupStage::finalizeResults() {
    const bool hasFinalizer = !_request.finalize.empty();
    if (hasFinalizer) {
        const std::string defineFinalize = "$finalize = " + _request.finalize;
        _scope->exec(defineFinalize, "$group finalize define", false, true, true, 2 * 1000);

        const char* const finalizeLoopSource =
            "function(){ "
            " for(var i=0; i < $arr.length; i++){ "
            " var ret = $finalize($arr[i]); "
            " if (ret !== undefined) "
            " $arr[i] = ret; "
            " } "
            "}";
        ScriptingFunction applyFinalize = _scope->createFunction(finalizeLoopSource);
        _scope->invoke(applyFinalize, 0, 0, 0, true);
    }

    _specificStats.nGroups = _groupMap.size();

    // Copy the results out before the script-side array is cleared.
    BSONObj groupedResults = _scope->getObject("$arr").getOwned();

    _scope->exec("$arr = [];", "$group reduce setup 2", false, true, true, 2 * 1000);
    _scope->gc();

    return groupedResults;
}
// Performs one unit of work for the group stage.
//
// The stage moves through three states:
//   - GroupState_Initializing: the first call sets up scripting and returns NEED_TIME.
//   - GroupState_ReadingFromChild: each call pulls one result from the child. NEED_TIME,
//     NEED_YIELD, FAILURE and DEAD are passed up (NEED_YIELD and FAILURE also hand back a
//     working set id through 'out'); ADVANCED results are folded into the group state and
//     reported as NEED_TIME.
//   - GroupState_Done: entered when the child reports IS_EOF. That call finalizes the results,
//     returns them through 'out' as a single owned object and reports ADVANCED; every later
//     call returns IS_EOF.
//
// 'out' is only written for NEED_YIELD, FAILURE (of the child or of processObject) and the
// final ADVANCED.
PlanStage::StageState GroupStage::work(WorkingSetID* out) {
    ++_commonStats.works;

    ScopedTimer timer(&_commonStats.executionTimeMillis);

    if (isEOF()) {
        return PlanStage::IS_EOF;
    }

    // On the first call to work(), call initGroupScripting().
    if (_groupState == GroupState_Initializing) {
        initGroupScripting();
        _groupState = GroupState_ReadingFromChild;
        ++_commonStats.needTime;
        return PlanStage::NEED_TIME;
    }

    // Otherwise, read from our child.
    invariant(_groupState == GroupState_ReadingFromChild);
    WorkingSetID id = WorkingSet::INVALID_ID;
    const StageState state = _child->work(&id);

    switch (state) {
        case PlanStage::NEED_TIME:
            ++_commonStats.needTime;
            return state;

        case PlanStage::NEED_YIELD:
            ++_commonStats.needYield;
            *out = id;
            return state;

        case PlanStage::FAILURE:
            *out = id;
            // If a stage fails, it may create a status WSM to indicate why it failed, in which
            // case 'id' is valid. If ID is invalid, we create our own error message.
            if (WorkingSet::INVALID_ID == id) {
                const std::string errmsg = "group stage failed to read in results from child";
                *out = WorkingSetCommon::allocateStatusMember(
                    _ws, Status(ErrorCodes::InternalError, errmsg));
            }
            return state;

        case PlanStage::DEAD:
            return state;

        case PlanStage::ADVANCED: {
            WorkingSetMember* member = _ws->get(id);
            // Group queries can't have projections. This means that covering analysis will
            // always add a fetch. We should always get fetched data, and never just key data.
            invariant(member->hasObj());

            Status status = processObject(member->obj.value());
            if (!status.isOK()) {
                *out = WorkingSetCommon::allocateStatusMember(_ws, status);
                return PlanStage::FAILURE;
            }

            // The document has been folded into the group state; release its slot.
            _ws->free(id);

            ++_commonStats.needTime;
            return PlanStage::NEED_TIME;
        }

        default:
            break;
    }

    // We're done reading from our child.
    invariant(PlanStage::IS_EOF == state);

    // Transition to state "done." Future calls to work() will return IS_EOF.
    _groupState = GroupState_Done;

    BSONObj results = finalizeResults();

    *out = _ws->allocate();
    WorkingSetMember* member = _ws->get(*out);
    // The template argument is spelled out explicitly; the bare class template name is not a
    // valid type here.
    member->obj = Snapshotted<BSONObj>(SnapshotId(), results);
    member->state = WorkingSetMember::OWNED_OBJ;

    ++_commonStats.advanced;
    return PlanStage::ADVANCED;
}
// Returns true once the stage has reached GroupState_Done.
bool GroupStage::isEOF() {
    const bool finished = (GroupState_Done == _groupState);
    return finished;
}
// Prepares the stage for a yield: counts the yield, drops the operation context pointer
// (restoreState() supplies a new one) and forwards the call to the child.
void GroupStage::saveState() {
    ++_commonStats.yields;
    _txn = NULL;
    _child->saveState();
}
// Resumes the stage after a yield: adopts 'opCtx' as the operation context, counts the unyield
// and forwards the call to the child. The stage must currently hold no operation context,
// which is the condition saveState() establishes.
void GroupStage::restoreState(OperationContext* opCtx) {
    invariant(NULL == _txn);
    ++_commonStats.unyields;
    _txn = opCtx;
    _child->restoreState(opCtx);
}
// Counts the invalidation and forwards it, unchanged, to the child stage.
void GroupStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
    _commonStats.invalidates += 1;
    _child->invalidate(txn, dl, type);
}
// Returns the stage's children: a vector holding a non-owning pointer to the single child.
vector<PlanStage*> GroupStage::getChildren() const {
    return vector<PlanStage*>(1, _child.get());
}
// Builds a statistics tree for this stage: a snapshot of the common stats (with isEOF
// refreshed first), a copy of the group-specific stats, and the child's stats as the only
// child entry. The returned object is heap-allocated and released to the caller, who becomes
// responsible for it.
PlanStageStats* GroupStage::getStats() {
    _commonStats.isEOF = isEOF();

    // Held in an auto_ptr while it is filled in so it is not leaked if a later step throws.
    auto_ptr<PlanStageStats> stats(new PlanStageStats(_commonStats, STAGE_GROUP));
    stats->specific.reset(new GroupStats(_specificStats));
    stats->children.push_back(_child->getStats());

    return stats.release();
}
// Returns a pointer to the stage's own common statistics; no copy is made.
const CommonStats* GroupStage::getCommonStats() {
    const CommonStats* const stats = &_commonStats;
    return stats;
}
// Returns a pointer to the stage's own group-specific statistics; no copy is made.
const SpecificStats* GroupStage::getSpecificStats() {
    const SpecificStats* const stats = &_specificStats;
    return stats;
}
} // namespace mongo