// parallel_collection_scan.cpp
/**
* Copyright (C) 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/util/touch_pages.h"
namespace mongo {
class ParallelCollectionScanCmd : public Command {
public:
/**
 * Plain value pair of a DiskLoc and a size.
 *
 * NOTE(review): nothing in the visible code uses this struct; presumably 'diskLoc'
 * locates an extent and 'size' is that extent's size -- confirm before relying on it.
 */
struct ExtentInfo {
    DiskLoc diskLoc;
    size_t size;

    // Stores both arguments as given.
    ExtentInfo( DiskLoc dl, size_t s )
        : diskLoc( dl ),
          size( s ) {
    }
};
// XXX: move this to the exec/ directory.
class MultiIteratorStage : public PlanStage {
public:
MultiIteratorStage(WorkingSet* ws, Collection* collection)
: _collection(collection),
_ws(ws) { }
~MultiIteratorStage() { }
// takes ownership of it
void addIterator(RecordIterator* it) {
_iterators.push_back(it);
}
virtual StageState work(WorkingSetID* out) {
if ( _collection == NULL )
return PlanStage::DEAD;
DiskLoc next = _advance();
if (next.isNull())
return PlanStage::IS_EOF;
*out = _ws->allocate();
WorkingSetMember* member = _ws->get(*out);
member->loc = next;
member->obj = _collection->docFor(next);
member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ;
return PlanStage::ADVANCED;
}
virtual bool isEOF() {
return _collection == NULL || _iterators.empty();
}
void kill() {
_collection = NULL;
_iterators.clear();
}
virtual void prepareToYield() {
for (size_t i = 0; i < _iterators.size(); i++) {
_iterators[i]->prepareToYield();
}
}
virtual void recoverFromYield(OperationContext* opCtx) {
for (size_t i = 0; i < _iterators.size(); i++) {
if (!_iterators[i]->recoverFromYield()) {
kill();
}
}
}
virtual void invalidate(const DiskLoc& dl, InvalidationType type) {
switch ( type ) {
case INVALIDATION_DELETION:
for (size_t i = 0; i < _iterators.size(); i++) {
_iterators[i]->invalidate(dl);
}
break;
case INVALIDATION_MUTATION:
// no-op
break;
}
}
//
// These should not be used.
//
virtual PlanStageStats* getStats() { return NULL; }
virtual CommonStats* getCommonStats() { return NULL; }
virtual SpecificStats* getSpecificStats() { return NULL; }
virtual std::vector getChildren() const {
vector empty;
return empty;
}
virtual StageType stageType() const { return STAGE_MULTI_ITERATOR; }
private:
/**
* @return if more data
*/
DiskLoc _advance() {
while (!_iterators.empty()) {
DiskLoc out = _iterators.back()->getNext();
if (!out.isNull())
return out;
_iterators.popAndDeleteBack();
}
return DiskLoc();
}
Collection* _collection;
OwnedPointerVector _iterators;
// Not owned by us.
WorkingSet* _ws;
};
// ------------------------------------------------
// Passes the command name "parallelCollectionScan" to the Command base class.
ParallelCollectionScanCmd()
    : Command( "parallelCollectionScan" ) {
}
// Always reports false for this command.
virtual bool isWriteCommandForConfigServer() const {
    return false;
}
// Always reports true for this command.
virtual bool slaveOk() const {
    return true;
}
/**
 * Checks that 'client' holds the 'find' action on the resource pattern that
 * parseResourcePattern() derives from 'dbname' and 'cmdObj'.
 *
 * @return Status::OK() when authorized, otherwise an Unauthorized status.
 */
virtual Status checkAuthForCommand(ClientBasic* client,
                                   const std::string& dbname,
                                   const BSONObj& cmdObj) {
    ActionSet requiredActions;
    requiredActions.addAction(ActionType::find);

    Privilege requiredPrivilege(parseResourcePattern(dbname, cmdObj), requiredActions);

    const bool authorized =
        client->getAuthorizationSession()->isAuthorizedForPrivilege(requiredPrivilege);
    if ( !authorized ) {
        return Status(ErrorCodes::Unauthorized, "Unauthorized");
    }
    return Status::OK();
}
/**
 * Runs the command: opens up to 'numCursors' cursors that together cover the collection.
 *
 * The collection name is read from the string field cmdObj[name] ('name' is defined
 * outside this function; presumably the command name) and the requested cursor count
 * from "numCursors", which must lie in [1, 10000]. The iterators returned by
 * Collection::getManyIterators() are dealt round-robin to at most 'numCursors'
 * executors. Each executor is handed to a ClientCursor, and one entry of the form
 * { cursor: { firstBatch: [], ns: ..., id: ... }, ok: true } per cursor is appended to
 * the "cursors" array of 'result'.
 *
 * 'options', 'errmsg' and 'fromRepl' are not used.
 *
 * @return true on success; otherwise the value of appendCommandStatus() for a
 *         NamespaceNotFound (collection missing) or BadValue (numCursors out of range)
 *         status.
 */
virtual bool run(OperationContext* txn, const string& dbname, BSONObj& cmdObj, int options,
                 string& errmsg, BSONObjBuilder& result,
                 bool fromRepl = false ) {

    NamespaceString ns( dbname, cmdObj[name].String() );

    Client::ReadContext ctx(txn, ns.ns());

    Database* db = ctx.ctx().db();
    Collection* collection = db->getCollection( txn, ns );

    if ( !collection )
        return appendCommandStatus( result,
                                    Status( ErrorCodes::NamespaceNotFound,
                                            str::stream() <<
                                            "ns does not exist: " << ns.ns() ) );

    // Validate the signed value before converting to size_t so that a negative request
    // is reported as given instead of as a huge wrapped-around unsigned number.
    const int requestedCursors = cmdObj["numCursors"].numberInt();
    if ( requestedCursors < 1 || requestedCursors > 10000 )
        return appendCommandStatus( result,
                                    Status( ErrorCodes::BadValue,
                                            str::stream() <<
                                            "numCursors has to be between 1 and 10000" <<
                                            " was: " << requestedCursors ) );

    size_t numCursors = static_cast<size_t>( requestedCursors );

    OwnedPointerVector<RecordIterator> iterators(collection->getManyIterators(txn));

    // Never create more cursors than there are iterators to hand out.
    if (iterators.size() < numCursors) {
        numCursors = iterators.size();
    }

    OwnedPointerVector<PlanExecutor> execs;
    for ( size_t i = 0; i < numCursors; i++ ) {
        WorkingSet* ws = new WorkingSet();
        MultiIteratorStage* mis = new MultiIteratorStage(ws, collection);
        // Takes ownership of 'ws' and 'mis'.
        execs.push_back(new PlanExecutor(ws, mis, collection));
    }

    // transfer iterators to executors using a round-robin distribution.
    // TODO consider using a common work queue once invalidation issues go away.
    // 'execs' can only be empty when 'iterators' is empty too, so the modulo below
    // never divides by zero.
    for (size_t i = 0; i < iterators.size(); i++) {
        PlanExecutor* theExec = execs[i % execs.size()];
        MultiIteratorStage* mis = static_cast<MultiIteratorStage*>(theExec->getStages());
        mis->addIterator(iterators.releaseAt(i));
    }

    {
        BSONArrayBuilder bucketsBuilder;
        for (size_t i = 0; i < execs.size(); i++) {
            // transfer ownership of an executor to the ClientCursor (which manages its own
            // lifetime).
            ClientCursor* cc = new ClientCursor( collection, execs.releaseAt(i) );

            // we are mimicking the aggregation cursor output here
            // that is why there are ns, ok and empty firstBatch
            BSONObjBuilder threadResult;
            {
                BSONObjBuilder cursor;
                cursor.appendArray( "firstBatch", BSONObj() );
                cursor.append( "ns", ns );
                cursor.append( "id", cc->cursorid() );
                threadResult.append( "cursor", cursor.obj() );
            }
            threadResult.appendBool( "ok", 1 );

            bucketsBuilder.append( threadResult.obj() );
        }
        result.appendArray( "cursors", bucketsBuilder.obj() );
    }

    return true;
}
} parallelCollectionScanCmd;
}