Diffstat (limited to 'src/mongo/db/commands/mr.cpp')
-rw-r--r--  src/mongo/db/commands/mr.cpp | 2894
1 file changed, 1422 insertions(+), 1472 deletions(-)
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index d88f05f733f..59eca8ae4c4 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -71,882 +71,848 @@
namespace mongo {
- using std::endl;
- using std::set;
- using std::shared_ptr;
- using std::string;
- using std::stringstream;
- using std::unique_ptr;
- using std::vector;
+using std::endl;
+using std::set;
+using std::shared_ptr;
+using std::string;
+using std::stringstream;
+using std::unique_ptr;
+using std::vector;
- namespace mr {
+namespace mr {
- AtomicUInt32 Config::JOB_NUMBER;
+AtomicUInt32 Config::JOB_NUMBER;
- JSFunction::JSFunction( const std::string& type , const BSONElement& e ) {
- _type = type;
- _code = e._asCode();
+JSFunction::JSFunction(const std::string& type, const BSONElement& e) {
+ _type = type;
+ _code = e._asCode();
- if ( e.type() == CodeWScope )
- _wantedScope = e.codeWScopeObject();
- }
+ if (e.type() == CodeWScope)
+ _wantedScope = e.codeWScopeObject();
+}
- void JSFunction::init( State * state ) {
- _scope = state->scope();
- verify( _scope );
- _scope->init( &_wantedScope );
+void JSFunction::init(State* state) {
+ _scope = state->scope();
+ verify(_scope);
+ _scope->init(&_wantedScope);
- _func = _scope->createFunction( _code.c_str() );
- uassert( 13598 , str::stream() << "couldn't compile code for: " << _type , _func );
+ _func = _scope->createFunction(_code.c_str());
+ uassert(13598, str::stream() << "couldn't compile code for: " << _type, _func);
- // install in JS scope so that it can be called in JS mode
- _scope->setFunction(_type.c_str(), _code.c_str());
- }
+ // install in JS scope so that it can be called in JS mode
+ _scope->setFunction(_type.c_str(), _code.c_str());
+}
- void JSMapper::init( State * state ) {
- _func.init( state );
- _params = state->config().mapParams;
- }
+void JSMapper::init(State* state) {
+ _func.init(state);
+ _params = state->config().mapParams;
+}
- /**
- * Applies the map function to an object, which should internally call emit()
- */
- void JSMapper::map( const BSONObj& o ) {
- Scope * s = _func.scope();
- verify( s );
- if (s->invoke(_func.func(), &_params, &o, 0, true))
- uasserted(9014, str::stream() << "map invoke failed: " << s->getError());
- }
+/**
+ * Applies the map function to an object, which should internally call emit()
+ */
+void JSMapper::map(const BSONObj& o) {
+ Scope* s = _func.scope();
+ verify(s);
+ if (s->invoke(_func.func(), &_params, &o, 0, true))
+ uasserted(9014, str::stream() << "map invoke failed: " << s->getError());
+}
- /**
- * Applies the finalize function to a tuple obj (key, val)
- * Returns tuple obj {_id: key, value: newval}
- */
- BSONObj JSFinalizer::finalize( const BSONObj& o ) {
- Scope * s = _func.scope();
-
- Scope::NoDBAccess no = s->disableDBAccess( "can't access db inside finalize" );
- s->invokeSafe( _func.func() , &o, 0 );
-
- // don't want to use o.objsize() to size b
- // since there are many cases where the point of finalize
- // is converting many fields to 1
- BSONObjBuilder b;
- b.append( o.firstElement() );
- s->append( b , "value" , "__returnValue" );
- return b.obj();
- }
+/**
+ * Applies the finalize function to a tuple obj (key, val)
+ * Returns tuple obj {_id: key, value: newval}
+ */
+BSONObj JSFinalizer::finalize(const BSONObj& o) {
+ Scope* s = _func.scope();
+
+ Scope::NoDBAccess no = s->disableDBAccess("can't access db inside finalize");
+ s->invokeSafe(_func.func(), &o, 0);
+
+ // don't want to use o.objsize() to size b
+ // since there are many cases where the point of finalize
+ // is converting many fields to 1
+ BSONObjBuilder b;
+ b.append(o.firstElement());
+ s->append(b, "value", "__returnValue");
+ return b.obj();
+}
- void JSReducer::init( State * state ) {
- _func.init( state );
- }
+void JSReducer::init(State* state) {
+ _func.init(state);
+}
- /**
- * Reduces a list of tuple objects (key, value) to a single tuple {"0": key, "1": value}
- */
- BSONObj JSReducer::reduce( const BSONList& tuples ) {
- if (tuples.size() <= 1)
- return tuples[0];
- BSONObj key;
- int endSizeEstimate = 16;
- _reduce( tuples , key , endSizeEstimate );
-
- BSONObjBuilder b(endSizeEstimate);
- b.appendAs( key.firstElement() , "0" );
- _func.scope()->append( b , "1" , "__returnValue" );
- return b.obj();
- }
+/**
+ * Reduces a list of tuple objects (key, value) to a single tuple {"0": key, "1": value}
+ */
+BSONObj JSReducer::reduce(const BSONList& tuples) {
+ if (tuples.size() <= 1)
+ return tuples[0];
+ BSONObj key;
+ int endSizeEstimate = 16;
+ _reduce(tuples, key, endSizeEstimate);
+
+ BSONObjBuilder b(endSizeEstimate);
+ b.appendAs(key.firstElement(), "0");
+ _func.scope()->append(b, "1", "__returnValue");
+ return b.obj();
+}
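// Illustrative only (hypothetical values and reduce function): given
//   tuples == [ {"0": "a", "1": 1}, {"0": "a", "1": 2} ]
// and a reduce function that sums its values, reduce(tuples) yields
//   {"0": "a", "1": 3}
// i.e. the same {"0": key, "1": value} layout as each input tuple, so the
// result can be fed back into a later reduce pass.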
- /**
- * Reduces a list of tuple object (key, value) to a single tuple {_id: key, value: val}
- * Also applies a finalizer method if present.
- */
- BSONObj JSReducer::finalReduce( const BSONList& tuples , Finalizer * finalizer ) {
+/**
+ * Reduces a list of tuple object (key, value) to a single tuple {_id: key, value: val}
+ * Also applies a finalizer method if present.
+ */
+BSONObj JSReducer::finalReduce(const BSONList& tuples, Finalizer* finalizer) {
+ BSONObj res;
+ BSONObj key;
+
+ if (tuples.size() == 1) {
+ // 1 obj, just use it
+ key = tuples[0];
+ BSONObjBuilder b(key.objsize());
+ BSONObjIterator it(key);
+ b.appendAs(it.next(), "_id");
+ b.appendAs(it.next(), "value");
+ res = b.obj();
+ } else {
+ // need to reduce
+ int endSizeEstimate = 16;
+ _reduce(tuples, key, endSizeEstimate);
+ BSONObjBuilder b(endSizeEstimate);
+ b.appendAs(key.firstElement(), "_id");
+ _func.scope()->append(b, "value", "__returnValue");
+ res = b.obj();
+ }
- BSONObj res;
- BSONObj key;
-
- if (tuples.size() == 1) {
- // 1 obj, just use it
- key = tuples[0];
- BSONObjBuilder b(key.objsize());
- BSONObjIterator it(key);
- b.appendAs( it.next() , "_id" );
- b.appendAs( it.next() , "value" );
- res = b.obj();
- }
- else {
- // need to reduce
- int endSizeEstimate = 16;
- _reduce( tuples , key , endSizeEstimate );
- BSONObjBuilder b(endSizeEstimate);
- b.appendAs( key.firstElement() , "_id" );
- _func.scope()->append( b , "value" , "__returnValue" );
- res = b.obj();
- }
+ if (finalizer) {
+ res = finalizer->finalize(res);
+ }
- if ( finalizer ) {
- res = finalizer->finalize( res );
- }
+ return res;
+}
- return res;
+/**
+ * actually applies a reduce, to a list of tuples (key, value).
+ * After the call, tuples will hold a single tuple {"0": key, "1": value}
+ */
+void JSReducer::_reduce(const BSONList& tuples, BSONObj& key, int& endSizeEstimate) {
+ uassert(10074, "need values", tuples.size());
+
+ int sizeEstimate = (tuples.size() * tuples.begin()->getField("value").size()) + 128;
+
+ // need to build the reduce args: ( key, [values] )
+ BSONObjBuilder reduceArgs(sizeEstimate);
+ std::unique_ptr<BSONArrayBuilder> valueBuilder;
+ unsigned n = 0;
+ for (; n < tuples.size(); n++) {
+ BSONObjIterator j(tuples[n]);
+ BSONElement keyE = j.next();
+ if (n == 0) {
+ reduceArgs.append(keyE);
+ key = keyE.wrap();
+ valueBuilder.reset(new BSONArrayBuilder(reduceArgs.subarrayStart("tuples")));
}
- /**
- * actually applies a reduce, to a list of tuples (key, value).
- * After the call, tuples will hold a single tuple {"0": key, "1": value}
- */
- void JSReducer::_reduce( const BSONList& tuples , BSONObj& key , int& endSizeEstimate ) {
- uassert( 10074 , "need values" , tuples.size() );
-
- int sizeEstimate = ( tuples.size() * tuples.begin()->getField( "value" ).size() ) + 128;
-
- // need to build the reduce args: ( key, [values] )
- BSONObjBuilder reduceArgs( sizeEstimate );
- std::unique_ptr<BSONArrayBuilder> valueBuilder;
- unsigned n = 0;
- for ( ; n<tuples.size(); n++ ) {
- BSONObjIterator j(tuples[n]);
- BSONElement keyE = j.next();
- if ( n == 0 ) {
- reduceArgs.append( keyE );
- key = keyE.wrap();
- valueBuilder.reset(new BSONArrayBuilder( reduceArgs.subarrayStart( "tuples" ) ));
- }
-
- BSONElement ee = j.next();
-
- uassert( 13070 , "value too large to reduce" , ee.size() < ( BSONObjMaxUserSize / 2 ) );
+ BSONElement ee = j.next();
- // If adding this element to the array would cause it to be too large, break. The
- // remainder of the tuples will be processed recursively at the end of this
- // function.
- if ( valueBuilder->len() + ee.size() > BSONObjMaxUserSize ) {
- verify( n > 1 ); // if not, inf. loop
- break;
- }
+ uassert(13070, "value too large to reduce", ee.size() < (BSONObjMaxUserSize / 2));
- valueBuilder->append( ee );
- }
- verify(valueBuilder);
- valueBuilder->done();
- BSONObj args = reduceArgs.obj();
+ // If adding this element to the array would cause it to be too large, break. The
+ // remainder of the tuples will be processed recursively at the end of this
+ // function.
+ if (valueBuilder->len() + ee.size() > BSONObjMaxUserSize) {
+ verify(n > 1); // if not, inf. loop
+ break;
+ }
- Scope * s = _func.scope();
+ valueBuilder->append(ee);
+ }
+ verify(valueBuilder);
+ valueBuilder->done();
+ BSONObj args = reduceArgs.obj();
- s->invokeSafe(_func.func(), &args, 0);
- ++numReduces;
+ Scope* s = _func.scope();
- if ( s->type( "__returnValue" ) == Array ) {
- uasserted( 10075 , "reduce -> multiple not supported yet");
- return;
- }
+ s->invokeSafe(_func.func(), &args, 0);
+ ++numReduces;
- endSizeEstimate = key.objsize() + ( args.objsize() / tuples.size() );
+ if (s->type("__returnValue") == Array) {
+ uasserted(10075, "reduce -> multiple not supported yet");
+ return;
+ }
- if ( n == tuples.size() )
- return;
+ endSizeEstimate = key.objsize() + (args.objsize() / tuples.size());
- // the input list was too large, add the rest of elmts to new tuples and reduce again
- // note: would be better to use loop instead of recursion to avoid stack overflow
- BSONList x;
- for ( ; n < tuples.size(); n++ ) {
- x.push_back( tuples[n] );
- }
- BSONObjBuilder temp( endSizeEstimate );
- temp.append( key.firstElement() );
- s->append( temp , "1" , "__returnValue" );
- x.push_back( temp.obj() );
- _reduce( x , key , endSizeEstimate );
- }
+ if (n == tuples.size())
+ return;
- Config::Config( const string& _dbname , const BSONObj& cmdObj )
- {
- dbname = _dbname;
- ns = dbname + "." + cmdObj.firstElement().valuestrsafe();
+ // the input list was too large, add the rest of elmts to new tuples and reduce again
+ // note: would be better to use loop instead of recursion to avoid stack overflow
+ BSONList x;
+ for (; n < tuples.size(); n++) {
+ x.push_back(tuples[n]);
+ }
+ BSONObjBuilder temp(endSizeEstimate);
+ temp.append(key.firstElement());
+ s->append(temp, "1", "__returnValue");
+ x.push_back(temp.obj());
+ _reduce(x, key, endSizeEstimate);
+}
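// Note: the chunk-and-recurse strategy above is only correct when the reduce
// function is associative and idempotent, since the partial result is pushed
// onto the leftover tuples and reduced again. A standalone sketch of the same
// pattern over plain ints (not mongo code; maxChunk must be >= 2, mirroring
// the verify(n > 1) guard above):
#include <numeric>
#include <vector>

int chunkedSum(std::vector<int> vals, size_t maxChunk) {
    while (vals.size() > maxChunk) {
        // fold the first maxChunk values into one partial result
        int partial = std::accumulate(vals.begin(), vals.begin() + maxChunk, 0);
        vals.erase(vals.begin(), vals.begin() + maxChunk);
        // the partial result is re-reduced on the next pass, exactly like the
        // temp tuple appended before the recursive _reduce() call
        vals.push_back(partial);
    }
    return std::accumulate(vals.begin(), vals.end(), 0);
}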
- verbose = cmdObj["verbose"].trueValue();
- jsMode = cmdObj["jsMode"].trueValue();
- splitInfo = 0;
+Config::Config(const string& _dbname, const BSONObj& cmdObj) {
+ dbname = _dbname;
+ ns = dbname + "." + cmdObj.firstElement().valuestrsafe();
- if (cmdObj.hasField("splitInfo")) {
- splitInfo = cmdObj["splitInfo"].Int();
- }
+ verbose = cmdObj["verbose"].trueValue();
+ jsMode = cmdObj["jsMode"].trueValue();
+ splitInfo = 0;
- jsMaxKeys = 500000;
- reduceTriggerRatio = 10.0;
- maxInMemSize = 500 * 1024;
+ if (cmdObj.hasField("splitInfo")) {
+ splitInfo = cmdObj["splitInfo"].Int();
+ }
- uassert( 13602 , "outType is no longer a valid option" , cmdObj["outType"].eoo() );
+ jsMaxKeys = 500000;
+ reduceTriggerRatio = 10.0;
+ maxInMemSize = 500 * 1024;
- outputOptions = parseOutputOptions(dbname, cmdObj);
+ uassert(13602, "outType is no longer a valid option", cmdObj["outType"].eoo());
- shardedFirstPass = false;
- if (cmdObj.hasField("shardedFirstPass") && cmdObj["shardedFirstPass"].trueValue()){
- massert(16054,
- "shardedFirstPass should only use replace outType",
- outputOptions.outType == REPLACE);
- shardedFirstPass = true;
- }
+ outputOptions = parseOutputOptions(dbname, cmdObj);
- if ( outputOptions.outType != INMEMORY ) { // setup temp collection name
- tempNamespace = str::stream()
- << (outputOptions.outDB.empty() ? dbname : outputOptions.outDB)
- << ".tmp.mr."
- << cmdObj.firstElement().String()
- << "_"
- << JOB_NUMBER.fetchAndAdd(1);
- incLong = tempNamespace + "_inc";
- }
+ shardedFirstPass = false;
+ if (cmdObj.hasField("shardedFirstPass") && cmdObj["shardedFirstPass"].trueValue()) {
+ massert(16054,
+ "shardedFirstPass should only use replace outType",
+ outputOptions.outType == REPLACE);
+ shardedFirstPass = true;
+ }
- {
- // scope and code
+ if (outputOptions.outType != INMEMORY) { // setup temp collection name
+ tempNamespace = str::stream()
+ << (outputOptions.outDB.empty() ? dbname : outputOptions.outDB) << ".tmp.mr."
+ << cmdObj.firstElement().String() << "_" << JOB_NUMBER.fetchAndAdd(1);
+ incLong = tempNamespace + "_inc";
+ }
- if ( cmdObj["scope"].type() == Object )
- scopeSetup = cmdObj["scope"].embeddedObjectUserCheck();
+ {
+ // scope and code
- mapper.reset( new JSMapper( cmdObj["map"] ) );
- reducer.reset( new JSReducer( cmdObj["reduce"] ) );
- if ( cmdObj["finalize"].type() && cmdObj["finalize"].trueValue() )
- finalizer.reset( new JSFinalizer( cmdObj["finalize"] ) );
+ if (cmdObj["scope"].type() == Object)
+ scopeSetup = cmdObj["scope"].embeddedObjectUserCheck();
- if ( cmdObj["mapparams"].type() == Array ) {
- mapParams = cmdObj["mapparams"].embeddedObjectUserCheck();
- }
+ mapper.reset(new JSMapper(cmdObj["map"]));
+ reducer.reset(new JSReducer(cmdObj["reduce"]));
+ if (cmdObj["finalize"].type() && cmdObj["finalize"].trueValue())
+ finalizer.reset(new JSFinalizer(cmdObj["finalize"]));
- }
+ if (cmdObj["mapparams"].type() == Array) {
+ mapParams = cmdObj["mapparams"].embeddedObjectUserCheck();
+ }
+ }
- {
- // query options
- BSONElement q = cmdObj["query"];
- if ( q.type() == Object )
- filter = q.embeddedObjectUserCheck();
- else
- uassert( 13608 , "query has to be blank or an Object" , ! q.trueValue() );
+ {
+ // query options
+ BSONElement q = cmdObj["query"];
+ if (q.type() == Object)
+ filter = q.embeddedObjectUserCheck();
+ else
+ uassert(13608, "query has to be blank or an Object", !q.trueValue());
+
+
+ BSONElement s = cmdObj["sort"];
+ if (s.type() == Object)
+ sort = s.embeddedObjectUserCheck();
+ else
+ uassert(13609, "sort has to be blank or an Object", !s.trueValue());
+
+ if (cmdObj["limit"].isNumber())
+ limit = cmdObj["limit"].numberLong();
+ else
+ limit = 0;
+ }
+}
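// For reference, a command document of the shape this constructor parses,
// built with the codebase's BSON macro (collection name, function bodies and
// output options below are hypothetical):
BSONObj cmd = BSON("mapreduce" << "events"
                   << "map" << "function() { emit(this.type, 1); }"
                   << "reduce" << "function(k, vals) { return Array.sum(vals); }"
                   << "query" << BSON("day" << "2015-05-01")
                   << "limit" << 1000
                   << "out" << "event_counts");
// Config cfg("test", cmd);  // cfg.ns == "test.events";
//                           // cfg.tempNamespace == "test.tmp.mr.events_<N>"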
+/**
+ * Clean up the temporary and incremental collections
+ */
+void State::dropTempCollections() {
+ _db.dropCollection(_config.tempNamespace);
+ // Always forget about temporary namespaces, so we don't cache lots of them
+ ShardConnection::forgetNS(_config.tempNamespace);
+ if (_useIncremental) {
+ // We don't want to log the deletion of incLong as it isn't replicated. While
+ // harmless, this would lead to a scary looking warning on the secondaries.
+ bool shouldReplicateWrites = _txn->writesAreReplicated();
+ _txn->setReplicatedWrites(false);
+ ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites);
+
+ ScopedTransaction scopedXact(_txn, MODE_IX);
+ Lock::DBLock lk(_txn->lockState(), nsToDatabaseSubstring(_config.incLong), MODE_X);
+ if (Database* db = dbHolder().get(_txn, _config.incLong)) {
+ WriteUnitOfWork wunit(_txn);
+ db->dropCollection(_txn, _config.incLong);
+ wunit.commit();
+ }
- BSONElement s = cmdObj["sort"];
- if ( s.type() == Object )
- sort = s.embeddedObjectUserCheck();
- else
- uassert( 13609 , "sort has to be blank or an Object" , ! s.trueValue() );
+ ShardConnection::forgetNS(_config.incLong);
+ }
+}
- if ( cmdObj["limit"].isNumber() )
- limit = cmdObj["limit"].numberLong();
- else
- limit = 0;
- }
+/**
+ * Create temporary collection, set up indexes
+ */
+void State::prepTempCollection() {
+ if (!_onDisk)
+ return;
+
+ dropTempCollections();
+ if (_useIncremental) {
+ // Create the inc collection and make sure we have index on "0" key.
+ // Intentionally not replicating the inc collection to secondaries.
+ bool shouldReplicateWrites = _txn->writesAreReplicated();
+ _txn->setReplicatedWrites(false);
+ ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites);
+
+ OldClientWriteContext incCtx(_txn, _config.incLong);
+ WriteUnitOfWork wuow(_txn);
+ Collection* incColl = incCtx.getCollection();
+ invariant(!incColl);
+
+ CollectionOptions options;
+ options.setNoIdIndex();
+ options.temp = true;
+ incColl = incCtx.db()->createCollection(_txn, _config.incLong, options);
+ invariant(incColl);
+
+ BSONObj indexSpec = BSON("key" << BSON("0" << 1) << "ns" << _config.incLong << "name"
+ << "_temp_0");
+ Status status = incColl->getIndexCatalog()->createIndexOnEmptyCollection(_txn, indexSpec);
+ if (!status.isOK()) {
+ uasserted(17305,
+ str::stream() << "createIndex failed for mr incLong ns: " << _config.incLong
+ << " err: " << status.code());
}
+ wuow.commit();
+ }
- /**
- * Clean up the temporary and incremental collections
- */
- void State::dropTempCollections() {
- _db.dropCollection(_config.tempNamespace);
- // Always forget about temporary namespaces, so we don't cache lots of them
- ShardConnection::forgetNS( _config.tempNamespace );
- if (_useIncremental) {
- // We don't want to log the deletion of incLong as it isn't replicated. While
- // harmless, this would lead to a scary looking warning on the secondaries.
- bool shouldReplicateWrites = _txn->writesAreReplicated();
- _txn->setReplicatedWrites(false);
- ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites);
-
- ScopedTransaction scopedXact(_txn, MODE_IX);
- Lock::DBLock lk(_txn->lockState(),
- nsToDatabaseSubstring(_config.incLong),
- MODE_X);
- if (Database* db = dbHolder().get(_txn, _config.incLong)) {
- WriteUnitOfWork wunit(_txn);
- db->dropCollection(_txn, _config.incLong);
- wunit.commit();
+ CollectionOptions finalOptions;
+ vector<BSONObj> indexesToInsert;
+
+ {
+ // copy indexes and collection options into temporary storage
+ OldClientWriteContext finalCtx(_txn, _config.outputOptions.finalNamespace);
+ Collection* const finalColl = finalCtx.getCollection();
+ if (finalColl) {
+ finalOptions = finalColl->getCatalogEntry()->getCollectionOptions(_txn);
+
+ IndexCatalog::IndexIterator ii =
+ finalColl->getIndexCatalog()->getIndexIterator(_txn, true);
+ // Iterate over finalColl's indexes.
+ while (ii.more()) {
+ IndexDescriptor* currIndex = ii.next();
+ BSONObjBuilder b;
+ b.append("ns", _config.tempNamespace);
+
+ // Copy over contents of the index descriptor's infoObj.
+ BSONObjIterator j(currIndex->infoObj());
+ while (j.more()) {
+ BSONElement e = j.next();
+ if (str::equals(e.fieldName(), "_id") || str::equals(e.fieldName(), "ns"))
+ continue;
+ b.append(e);
}
-
- ShardConnection::forgetNS( _config.incLong );
+ indexesToInsert.push_back(b.obj());
}
-
}
+ }
- /**
- * Create temporary collection, set up indexes
- */
- void State::prepTempCollection() {
- if ( ! _onDisk )
- return;
-
- dropTempCollections();
- if (_useIncremental) {
- // Create the inc collection and make sure we have index on "0" key.
- // Intentionally not replicating the inc collection to secondaries.
- bool shouldReplicateWrites = _txn->writesAreReplicated();
- _txn->setReplicatedWrites(false);
- ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites);
-
- OldClientWriteContext incCtx(_txn, _config.incLong);
- WriteUnitOfWork wuow(_txn);
- Collection* incColl = incCtx.getCollection();
- invariant(!incColl);
-
- CollectionOptions options;
- options.setNoIdIndex();
- options.temp = true;
- incColl = incCtx.db()->createCollection(_txn, _config.incLong, options);
- invariant(incColl);
-
- BSONObj indexSpec = BSON( "key" << BSON( "0" << 1 ) << "ns" << _config.incLong
- << "name" << "_temp_0" );
- Status status = incColl->getIndexCatalog()->createIndexOnEmptyCollection(_txn,
- indexSpec);
- if ( !status.isOK() ) {
- uasserted( 17305 , str::stream() << "createIndex failed for mr incLong ns: " <<
- _config.incLong << " err: " << status.code() );
+ {
+ // create temp collection and insert the indexes from temporary storage
+ OldClientWriteContext tempCtx(_txn, _config.tempNamespace);
+ WriteUnitOfWork wuow(_txn);
+ NamespaceString tempNss(_config.tempNamespace);
+ uassert(ErrorCodes::NotMaster,
+ "no longer master",
+ repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(tempNss));
+ Collection* tempColl = tempCtx.getCollection();
+ invariant(!tempColl);
+
+ CollectionOptions options = finalOptions;
+ options.temp = true;
+ tempColl = tempCtx.db()->createCollection(_txn, _config.tempNamespace, options);
+
+ for (vector<BSONObj>::iterator it = indexesToInsert.begin(); it != indexesToInsert.end();
+ ++it) {
+ Status status = tempColl->getIndexCatalog()->createIndexOnEmptyCollection(_txn, *it);
+ if (!status.isOK()) {
+ if (status.code() == ErrorCodes::IndexAlreadyExists) {
+ continue;
}
- wuow.commit();
+ uassertStatusOK(status);
}
+ // Log the createIndex operation.
+ string logNs = nsToDatabase(_config.tempNamespace) + ".system.indexes";
+ getGlobalServiceContext()->getOpObserver()->onCreateIndex(_txn, logNs, *it);
+ }
+ wuow.commit();
+ }
+}
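// Illustrative only (names hypothetical): if the final collection carries
//   { v: 1, key: { ts: 1 }, name: "ts_1", ns: "test.event_counts" }
// the copy loop above queues it for the temp collection as
//   { ns: "test.tmp.mr.events_0", v: 1, key: { ts: 1 }, name: "ts_1" }
// that is, every infoObj field except "_id" and "ns" is kept, and "ns" is
// rewritten to point at tempNamespace before the index is re-created.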
- CollectionOptions finalOptions;
- vector<BSONObj> indexesToInsert;
-
- {
- // copy indexes and collection options into temporary storage
- OldClientWriteContext finalCtx(_txn, _config.outputOptions.finalNamespace);
- Collection* const finalColl = finalCtx.getCollection();
- if ( finalColl ) {
- finalOptions = finalColl->getCatalogEntry()->getCollectionOptions(_txn);
-
- IndexCatalog::IndexIterator ii =
- finalColl->getIndexCatalog()->getIndexIterator( _txn, true );
- // Iterate over finalColl's indexes.
- while ( ii.more() ) {
- IndexDescriptor* currIndex = ii.next();
- BSONObjBuilder b;
- b.append( "ns" , _config.tempNamespace );
-
- // Copy over contents of the index descriptor's infoObj.
- BSONObjIterator j( currIndex->infoObj() );
- while ( j.more() ) {
- BSONElement e = j.next();
- if ( str::equals( e.fieldName() , "_id" ) ||
- str::equals( e.fieldName() , "ns" ) )
- continue;
- b.append( e );
- }
- indexesToInsert.push_back( b.obj() );
- }
- }
- }
+/**
+ * For inline mode, appends results to output object.
+ * Makes sure (key, value) tuple is formatted as {_id: key, value: val}
+ */
+void State::appendResults(BSONObjBuilder& final) {
+ if (_onDisk) {
+ if (!_config.outputOptions.outDB.empty()) {
+ BSONObjBuilder loc;
+ if (!_config.outputOptions.outDB.empty())
+ loc.append("db", _config.outputOptions.outDB);
+ if (!_config.outputOptions.collectionName.empty())
+ loc.append("collection", _config.outputOptions.collectionName);
+ final.append("result", loc.obj());
+ } else {
+ if (!_config.outputOptions.collectionName.empty())
+ final.append("result", _config.outputOptions.collectionName);
+ }
- {
- // create temp collection and insert the indexes from temporary storage
- OldClientWriteContext tempCtx(_txn, _config.tempNamespace);
- WriteUnitOfWork wuow(_txn);
- NamespaceString tempNss(_config.tempNamespace);
- uassert(ErrorCodes::NotMaster, "no longer master",
- repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(tempNss));
- Collection* tempColl = tempCtx.getCollection();
- invariant(!tempColl);
-
- CollectionOptions options = finalOptions;
- options.temp = true;
- tempColl = tempCtx.db()->createCollection(_txn, _config.tempNamespace, options);
-
- for ( vector<BSONObj>::iterator it = indexesToInsert.begin();
- it != indexesToInsert.end(); ++it ) {
- Status status =
- tempColl->getIndexCatalog()->createIndexOnEmptyCollection(_txn, *it);
- if (!status.isOK()) {
- if (status.code() == ErrorCodes::IndexAlreadyExists) {
- continue;
- }
- uassertStatusOK(status);
- }
- // Log the createIndex operation.
- string logNs = nsToDatabase( _config.tempNamespace ) + ".system.indexes";
- getGlobalServiceContext()->getOpObserver()->onCreateIndex(_txn, logNs, *it);
- }
- wuow.commit();
+ if (_config.splitInfo > 0) {
+ // add split points, used for shard
+ BSONObj res;
+ BSONObj idKey = BSON("_id" << 1);
+ if (!_db.runCommand("admin",
+ BSON("splitVector" << _config.outputOptions.finalNamespace
+ << "keyPattern" << idKey << "maxChunkSizeBytes"
+ << _config.splitInfo),
+ res)) {
+ uasserted(15921, str::stream() << "splitVector failed: " << res);
}
-
+ if (res.hasField("splitKeys"))
+ final.append(res.getField("splitKeys"));
}
+ return;
+ }
- /**
- * For inline mode, appends results to output object.
- * Makes sure (key, value) tuple is formatted as {_id: key, value: val}
- */
- void State::appendResults( BSONObjBuilder& final ) {
- if ( _onDisk ) {
- if (!_config.outputOptions.outDB.empty()) {
- BSONObjBuilder loc;
- if ( !_config.outputOptions.outDB.empty())
- loc.append( "db" , _config.outputOptions.outDB );
- if ( !_config.outputOptions.collectionName.empty() )
- loc.append( "collection" , _config.outputOptions.collectionName );
- final.append("result", loc.obj());
- }
- else {
- if ( !_config.outputOptions.collectionName.empty() )
- final.append( "result" , _config.outputOptions.collectionName );
- }
+ if (_jsMode) {
+ ScriptingFunction getResult = _scope->createFunction(
+ "var map = _mrMap;"
+ "var result = [];"
+ "for (key in map) {"
+ " result.push({_id: key, value: map[key]});"
+ "}"
+ "return result;");
+ _scope->invoke(getResult, 0, 0, 0, false);
+ BSONObj obj = _scope->getObject("__returnValue");
+ final.append("results", BSONArray(obj));
+ return;
+ }
- if ( _config.splitInfo > 0 ) {
- // add split points, used for shard
- BSONObj res;
- BSONObj idKey = BSON( "_id" << 1 );
- if (!_db.runCommand("admin",
- BSON("splitVector" << _config.outputOptions.finalNamespace
- << "keyPattern" << idKey
- << "maxChunkSizeBytes" << _config.splitInfo),
- res)) {
- uasserted( 15921 , str::stream() << "splitVector failed: " << res );
- }
- if ( res.hasField( "splitKeys" ) )
- final.append( res.getField( "splitKeys" ) );
- }
- return;
- }
+ uassert(13604, "too much data for in memory map/reduce", _size < BSONObjMaxUserSize);
- if (_jsMode) {
- ScriptingFunction getResult = _scope->createFunction(
- "var map = _mrMap;"
- "var result = [];"
- "for (key in map) {"
- " result.push({_id: key, value: map[key]});"
- "}"
- "return result;");
- _scope->invoke(getResult, 0, 0, 0, false);
- BSONObj obj = _scope->getObject("__returnValue");
- final.append("results", BSONArray(obj));
- return;
- }
+ BSONArrayBuilder b((int)(_size * 1.2)); // _size is data size, doesn't count overhead and keys
- uassert( 13604 , "too much data for in memory map/reduce" , _size < BSONObjMaxUserSize );
+ for (InMemory::iterator i = _temp->begin(); i != _temp->end(); ++i) {
+ BSONObj key = i->first;
+ BSONList& all = i->second;
- BSONArrayBuilder b( (int)(_size * 1.2) ); // _size is data size, doesn't count overhead and keys
+ verify(all.size() == 1);
- for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); ++i ) {
- BSONObj key = i->first;
- BSONList& all = i->second;
+ BSONObjIterator vi(all[0]);
+ vi.next();
- verify( all.size() == 1 );
+ BSONObjBuilder temp(b.subobjStart());
+ temp.appendAs(key.firstElement(), "_id");
+ temp.appendAs(vi.next(), "value");
+ temp.done();
+ }
- BSONObjIterator vi( all[0] );
- vi.next();
+ BSONArray res = b.arr();
+ final.append("results", res);
+}
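// Illustrative only: in inline mode the array built above makes the command
// reply look like (values hypothetical)
//   { results: [ { _id: "a", value: 3 }, { _id: "b", value: 7 } ], ... }
// whereas the on-disk branches report the output location under "result"
// instead, either as a collection name or as { db: ..., collection: ... }.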
- BSONObjBuilder temp( b.subobjStart() );
- temp.appendAs( key.firstElement() , "_id" );
- temp.appendAs( vi.next() , "value" );
- temp.done();
- }
+/**
+ * Does post processing on output collection.
+ * This may involve replacing, merging or reducing.
+ */
+long long State::postProcessCollection(OperationContext* txn, CurOp* op, ProgressMeterHolder& pm) {
+ if (_onDisk == false || _config.outputOptions.outType == Config::INMEMORY)
+ return numInMemKeys();
- BSONArray res = b.arr();
- final.append( "results" , res );
- }
+ if (_config.outputOptions.outNonAtomic)
+ return postProcessCollectionNonAtomic(txn, op, pm);
- /**
- * Does post processing on output collection.
- * This may involve replacing, merging or reducing.
- */
- long long State::postProcessCollection(
- OperationContext* txn, CurOp* op, ProgressMeterHolder& pm) {
+ invariant(!txn->lockState()->isLocked());
- if ( _onDisk == false || _config.outputOptions.outType == Config::INMEMORY )
- return numInMemKeys();
+ ScopedTransaction transaction(txn, MODE_X);
+ Lock::GlobalWrite lock(
+ txn->lockState()); // TODO(erh): this is how it was, but seems it doesn't need to be global
+ return postProcessCollectionNonAtomic(txn, op, pm);
+}
- if (_config.outputOptions.outNonAtomic)
- return postProcessCollectionNonAtomic(txn, op, pm);
+//
+// For SERVER-6116 - can't handle version errors in count currently
+//
- invariant( !txn->lockState()->isLocked() );
+/**
+ * Runs count and disables version errors.
+ *
+ * TODO: make count work with versioning
+ */
+unsigned long long _safeCount(Client* client,
+ // Can't be const b/c count isn't
+ /* const */ DBDirectClient& db,
+ const string& ns,
+ const BSONObj& query = BSONObj(),
+ int options = 0,
+ int limit = 0,
+ int skip = 0) {
+ ShardForceVersionOkModeBlock ignoreVersion(client); // ignore versioning here
+ return db.count(ns, query, options, limit, skip);
+}
- ScopedTransaction transaction(txn, MODE_X);
- Lock::GlobalWrite lock(txn->lockState()); // TODO(erh): this is how it was, but seems it doesn't need to be global
- return postProcessCollectionNonAtomic(txn, op, pm);
+//
+// End SERVER-6116
+//
+
+long long State::postProcessCollectionNonAtomic(OperationContext* txn,
+ CurOp* op,
+ ProgressMeterHolder& pm) {
+ auto client = txn->getClient();
+
+ if (_config.outputOptions.finalNamespace == _config.tempNamespace)
+ return _safeCount(client, _db, _config.outputOptions.finalNamespace);
+
+ if (_config.outputOptions.outType == Config::REPLACE ||
+ _safeCount(client, _db, _config.outputOptions.finalNamespace) == 0) {
+ ScopedTransaction transaction(txn, MODE_X);
+ Lock::GlobalWrite lock(txn->lockState()); // TODO(erh): why global???
+ // replace: just rename from temp to final collection name, dropping previous collection
+ _db.dropCollection(_config.outputOptions.finalNamespace);
+ BSONObj info;
+
+ if (!_db.runCommand("admin",
+ BSON("renameCollection" << _config.tempNamespace << "to"
+ << _config.outputOptions.finalNamespace
+ << "stayTemp" << _config.shardedFirstPass),
+ info)) {
+ uasserted(10076, str::stream() << "rename failed: " << info);
}
- //
- // For SERVER-6116 - can't handle version errors in count currently
- //
-
- /**
- * Runs count and disables version errors.
- *
- * TODO: make count work with versioning
- */
- unsigned long long _safeCount( Client* client,
- // Can't be const b/c count isn't
- /* const */ DBDirectClient& db,
- const string &ns,
- const BSONObj& query = BSONObj(),
- int options = 0,
- int limit = 0,
- int skip = 0 )
+ _db.dropCollection(_config.tempNamespace);
+ } else if (_config.outputOptions.outType == Config::MERGE) {
+ // merge: upsert new docs into old collection
{
- ShardForceVersionOkModeBlock ignoreVersion(client); // ignore versioning here
- return db.count( ns, query, options, limit, skip );
+ const auto count = _safeCount(client, _db, _config.tempNamespace, BSONObj());
+ stdx::lock_guard<Client> lk(*txn->getClient());
+ op->setMessage_inlock(
+ "m/r: merge post processing", "M/R Merge Post Processing Progress", count);
}
+ unique_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace, BSONObj());
+ while (cursor->more()) {
+ ScopedTransaction scopedXact(_txn, MODE_IX);
+ Lock::DBLock lock(_txn->lockState(),
+ nsToDatabaseSubstring(_config.outputOptions.finalNamespace),
+ MODE_X);
+ BSONObj o = cursor->nextSafe();
+ Helpers::upsert(_txn, _config.outputOptions.finalNamespace, o);
+ pm.hit();
+ }
+ _db.dropCollection(_config.tempNamespace);
+ pm.finished();
+ } else if (_config.outputOptions.outType == Config::REDUCE) {
+ // reduce: apply reduce op on new result and existing one
+ BSONList values;
- //
- // End SERVER-6116
- //
-
- long long State::postProcessCollectionNonAtomic(
- OperationContext* txn, CurOp* op, ProgressMeterHolder& pm) {
-
- auto client = txn->getClient();
-
- if ( _config.outputOptions.finalNamespace == _config.tempNamespace )
- return _safeCount( client, _db, _config.outputOptions.finalNamespace );
-
- if (_config.outputOptions.outType == Config::REPLACE ||
- _safeCount(client, _db, _config.outputOptions.finalNamespace) == 0) {
-
- ScopedTransaction transaction(txn, MODE_X);
- Lock::GlobalWrite lock(txn->lockState()); // TODO(erh): why global???
- // replace: just rename from temp to final collection name, dropping previous collection
- _db.dropCollection( _config.outputOptions.finalNamespace );
- BSONObj info;
+ {
+ const auto count = _safeCount(client, _db, _config.tempNamespace, BSONObj());
+ stdx::lock_guard<Client> lk(*txn->getClient());
+ op->setMessage_inlock(
+ "m/r: reduce post processing", "M/R Reduce Post Processing Progress", count);
+ }
+ unique_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace, BSONObj());
+ while (cursor->more()) {
+ ScopedTransaction transaction(txn, MODE_X);
+ Lock::GlobalWrite lock(txn->lockState()); // TODO(erh) why global?
+ BSONObj temp = cursor->nextSafe();
+ BSONObj old;
- if ( ! _db.runCommand( "admin"
- , BSON( "renameCollection" << _config.tempNamespace <<
- "to" << _config.outputOptions.finalNamespace <<
- "stayTemp" << _config.shardedFirstPass )
- , info ) ) {
- uasserted( 10076 , str::stream() << "rename failed: " << info );
- }
-
- _db.dropCollection( _config.tempNamespace );
- }
- else if ( _config.outputOptions.outType == Config::MERGE ) {
- // merge: upsert new docs into old collection
- {
- const auto count = _safeCount(client, _db, _config.tempNamespace, BSONObj());
- stdx::lock_guard<Client> lk(*txn->getClient());
- op->setMessage_inlock("m/r: merge post processing",
- "M/R Merge Post Processing Progress",
- count);
- }
- unique_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace , BSONObj());
- while (cursor->more()) {
- ScopedTransaction scopedXact(_txn, MODE_IX);
- Lock::DBLock lock(_txn->lockState(),
- nsToDatabaseSubstring(_config.outputOptions.finalNamespace),
- MODE_X);
- BSONObj o = cursor->nextSafe();
- Helpers::upsert( _txn, _config.outputOptions.finalNamespace , o );
- pm.hit();
- }
- _db.dropCollection( _config.tempNamespace );
- pm.finished();
+ bool found;
+ {
+ const std::string& finalNamespace = _config.outputOptions.finalNamespace;
+ OldClientContext tx(txn, finalNamespace);
+ Collection* coll = getCollectionOrUassert(tx.db(), finalNamespace);
+ found = Helpers::findOne(_txn, coll, temp["_id"].wrap(), old, true);
}
- else if ( _config.outputOptions.outType == Config::REDUCE ) {
- // reduce: apply reduce op on new result and existing one
- BSONList values;
-
- {
- const auto count = _safeCount(client, _db, _config.tempNamespace, BSONObj());
- stdx::lock_guard<Client> lk(*txn->getClient());
- op->setMessage_inlock("m/r: reduce post processing",
- "M/R Reduce Post Processing Progress",
- count);
- }
- unique_ptr<DBClientCursor> cursor = _db.query( _config.tempNamespace , BSONObj() );
- while ( cursor->more() ) {
- ScopedTransaction transaction(txn, MODE_X);
- Lock::GlobalWrite lock(txn->lockState()); // TODO(erh) why global?
- BSONObj temp = cursor->nextSafe();
- BSONObj old;
-
- bool found;
- {
- const std::string& finalNamespace = _config.outputOptions.finalNamespace;
- OldClientContext tx(txn, finalNamespace);
- Collection* coll = getCollectionOrUassert(tx.db(), finalNamespace);
- found = Helpers::findOne(_txn,
- coll,
- temp["_id"].wrap(),
- old,
- true);
- }
- if ( found ) {
- // need to reduce
- values.clear();
- values.push_back( temp );
- values.push_back( old );
- Helpers::upsert(_txn,
- _config.outputOptions.finalNamespace,
- _config.reducer->finalReduce(values,
- _config.finalizer.get()));
- }
- else {
- Helpers::upsert( _txn, _config.outputOptions.finalNamespace , temp );
- }
- pm.hit();
- }
- pm.finished();
+ if (found) {
+ // need to reduce
+ values.clear();
+ values.push_back(temp);
+ values.push_back(old);
+ Helpers::upsert(_txn,
+ _config.outputOptions.finalNamespace,
+ _config.reducer->finalReduce(values, _config.finalizer.get()));
+ } else {
+ Helpers::upsert(_txn, _config.outputOptions.finalNamespace, temp);
}
-
- return _safeCount( txn->getClient(), _db, _config.outputOptions.finalNamespace );
+ pm.hit();
}
+ pm.finished();
+ }
- /**
- * Insert doc in collection. This should be replicated.
- */
- void State::insert( const string& ns , const BSONObj& o ) {
- verify( _onDisk );
+ return _safeCount(txn->getClient(), _db, _config.outputOptions.finalNamespace);
+}
+/**
+ * Insert doc in collection. This should be replicated.
+ */
+void State::insert(const string& ns, const BSONObj& o) {
+ verify(_onDisk);
- OldClientWriteContext ctx(_txn, ns );
- WriteUnitOfWork wuow(_txn);
- NamespaceString nss(ns);
- uassert(ErrorCodes::NotMaster, "no longer master",
- repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss));
- Collection* coll = getCollectionOrUassert(ctx.db(), ns);
- BSONObjBuilder b;
- if ( !o.hasField( "_id" ) ) {
- b.appendOID( "_id", NULL, true );
- }
- b.appendElements(o);
- BSONObj bo = b.obj();
+ OldClientWriteContext ctx(_txn, ns);
+ WriteUnitOfWork wuow(_txn);
+ NamespaceString nss(ns);
+ uassert(ErrorCodes::NotMaster,
+ "no longer master",
+ repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss));
+ Collection* coll = getCollectionOrUassert(ctx.db(), ns);
- uassertStatusOK( coll->insertDocument( _txn, bo, true ).getStatus() );
- wuow.commit();
- }
+ BSONObjBuilder b;
+ if (!o.hasField("_id")) {
+ b.appendOID("_id", NULL, true);
+ }
+ b.appendElements(o);
+ BSONObj bo = b.obj();
- /**
- * Insert doc into the inc collection. This should not be replicated.
- */
- void State::_insertToInc( BSONObj& o ) {
- verify( _onDisk );
-
- OldClientWriteContext ctx(_txn, _config.incLong );
- WriteUnitOfWork wuow(_txn);
- Collection* coll = getCollectionOrUassert(ctx.db(), _config.incLong);
- bool shouldReplicateWrites = _txn->writesAreReplicated();
- _txn->setReplicatedWrites(false);
- ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites);
- uassertStatusOK(coll->insertDocument(_txn, o, true, false).getStatus());
- wuow.commit();
- }
+ uassertStatusOK(coll->insertDocument(_txn, bo, true).getStatus());
+ wuow.commit();
+}
- State::State(OperationContext* txn, const Config& c) :
- _config(c),
- _db(txn),
- _useIncremental(true),
- _txn(txn),
- _size(0),
- _dupCount(0),
- _numEmits(0) {
- _temp.reset( new InMemory() );
- _onDisk = _config.outputOptions.outType != Config::INMEMORY;
- }
+/**
+ * Insert doc into the inc collection. This should not be replicated.
+ */
+void State::_insertToInc(BSONObj& o) {
+ verify(_onDisk);
+
+ OldClientWriteContext ctx(_txn, _config.incLong);
+ WriteUnitOfWork wuow(_txn);
+ Collection* coll = getCollectionOrUassert(ctx.db(), _config.incLong);
+ bool shouldReplicateWrites = _txn->writesAreReplicated();
+ _txn->setReplicatedWrites(false);
+ ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites);
+ uassertStatusOK(coll->insertDocument(_txn, o, true, false).getStatus());
+ wuow.commit();
+}
- bool State::sourceExists() {
- return _db.exists( _config.ns );
- }
+State::State(OperationContext* txn, const Config& c)
+ : _config(c), _db(txn), _useIncremental(true), _txn(txn), _size(0), _dupCount(0), _numEmits(0) {
+ _temp.reset(new InMemory());
+ _onDisk = _config.outputOptions.outType != Config::INMEMORY;
+}
- long long State::incomingDocuments() {
- return _safeCount( _txn->getClient(), _db, _config.ns , _config.filter , QueryOption_SlaveOk , (unsigned) _config.limit );
- }
+bool State::sourceExists() {
+ return _db.exists(_config.ns);
+}
- State::~State() {
- if ( _onDisk ) {
- try {
- dropTempCollections();
- }
- catch ( std::exception& e ) {
- error() << "couldn't cleanup after map reduce: " << e.what() << endl;
- }
- }
- if (_scope && !_scope->isKillPending() && _scope->getError().empty()) {
- // cleanup js objects
- try {
- ScriptingFunction cleanup =
- _scope->createFunction("delete _emitCt; delete _keyCt; delete _mrMap;");
- _scope->invoke(cleanup, 0, 0, 0, true);
- }
- catch (const DBException &) {
- // not important because properties will be reset if scope is reused
- LOG(1) << "MapReduce terminated during state destruction" << endl;
- }
- }
- }
+long long State::incomingDocuments() {
+ return _safeCount(_txn->getClient(),
+ _db,
+ _config.ns,
+ _config.filter,
+ QueryOption_SlaveOk,
+ (unsigned)_config.limit);
+}
- /**
- * Initialize the mapreduce operation, creating the inc collection
- */
- void State::init() {
- // setup js
- const string userToken = AuthorizationSession::get(ClientBasic::getCurrent())
- ->getAuthenticatedUserNamesToken();
- _scope.reset(globalScriptEngine->getPooledScope(
- _txn, _config.dbname, "mapreduce" + userToken).release());
-
- if ( ! _config.scopeSetup.isEmpty() )
- _scope->init( &_config.scopeSetup );
-
- _config.mapper->init( this );
- _config.reducer->init( this );
- if ( _config.finalizer )
- _config.finalizer->init( this );
- _scope->setBoolean("_doFinal", _config.finalizer.get() != 0);
-
- switchMode(_config.jsMode); // set up js-mode based on Config
-
- // global JS map/reduce hashmap
- // we use a standard JS object which means keys are only simple types
- // we could also add a real hashmap from a library and object comparison methods
- // for increased performance, we may want to look at v8 Harmony Map support
- // _scope->setObject("_mrMap", BSONObj(), false);
- ScriptingFunction init = _scope->createFunction(
- "_emitCt = 0;"
- "_keyCt = 0;"
- "_dupCt = 0;"
- "_redCt = 0;"
- "if (typeof(_mrMap) === 'undefined') {"
- " _mrMap = {};"
- "}");
- _scope->invoke(init, 0, 0, 0, true);
-
- // js function to run reduce on all keys
- // redfunc = _scope->createFunction("for (var key in hashmap) { print('Key is ' + key); list = hashmap[key]; ret = reduce(key, list); print('Value is ' + ret); };");
- _reduceAll = _scope->createFunction(
- "var map = _mrMap;"
- "var list, ret;"
- "for (var key in map) {"
- " list = map[key];"
- " if (list.length != 1) {"
- " ret = _reduce(key, list);"
- " map[key] = [ret];"
- " ++_redCt;"
- " }"
- "}"
- "_dupCt = 0;");
- massert(16717, "error initializing JavaScript reduceAll function",
- _reduceAll != 0);
-
- _reduceAndEmit = _scope->createFunction(
- "var map = _mrMap;"
- "var list, ret;"
- "for (var key in map) {"
- " list = map[key];"
- " if (list.length == 1)"
- " ret = list[0];"
- " else {"
- " ret = _reduce(key, list);"
- " ++_redCt;"
- " }"
- " emit(key, ret);"
- "}"
- "delete _mrMap;");
- massert(16718, "error initializing JavaScript reduce/emit function",
- _reduceAndEmit != 0);
-
- _reduceAndFinalize = _scope->createFunction(
- "var map = _mrMap;"
- "var list, ret;"
- "for (var key in map) {"
- " list = map[key];"
- " if (list.length == 1) {"
- " if (!_doFinal) { continue; }"
- " ret = list[0];"
- " }"
- " else {"
- " ret = _reduce(key, list);"
- " ++_redCt;"
- " }"
- " if (_doFinal)"
- " ret = _finalize(key, ret);"
- " map[key] = ret;"
- "}");
- massert(16719, "error creating JavaScript reduce/finalize function",
- _reduceAndFinalize != 0);
-
- _reduceAndFinalizeAndInsert = _scope->createFunction(
- "var map = _mrMap;"
- "var list, ret;"
- "for (var key in map) {"
- " list = map[key];"
- " if (list.length == 1)"
- " ret = list[0];"
- " else {"
- " ret = _reduce(key, list);"
- " ++_redCt;"
- " }"
- " if (_doFinal)"
- " ret = _finalize(key, ret);"
- " _nativeToTemp({_id: key, value: ret});"
- "}");
- massert(16720, "error initializing JavaScript functions",
- _reduceAndFinalizeAndInsert != 0);
+State::~State() {
+ if (_onDisk) {
+ try {
+ dropTempCollections();
+ } catch (std::exception& e) {
+ error() << "couldn't cleanup after map reduce: " << e.what() << endl;
}
-
- void State::switchMode(bool jsMode) {
- _jsMode = jsMode;
- if (jsMode) {
- // emit function that stays in JS
- _scope->setFunction("emit",
- "function(key, value) {"
- " if (typeof(key) === 'object') {"
- " _bailFromJS(key, value);"
- " return;"
- " }"
- " ++_emitCt;"
- " var map = _mrMap;"
- " var list = map[key];"
- " if (!list) {"
- " ++_keyCt;"
- " list = [];"
- " map[key] = list;"
- " }"
- " else"
- " ++_dupCt;"
- " list.push(value);"
- "}");
- _scope->injectNative("_bailFromJS", _bailFromJS, this);
- }
- else {
- // emit now populates C++ map
- _scope->injectNative( "emit" , fast_emit, this );
- }
+ }
+ if (_scope && !_scope->isKillPending() && _scope->getError().empty()) {
+ // cleanup js objects
+ try {
+ ScriptingFunction cleanup =
+ _scope->createFunction("delete _emitCt; delete _keyCt; delete _mrMap;");
+ _scope->invoke(cleanup, 0, 0, 0, true);
+ } catch (const DBException&) {
+ // not important because properties will be reset if scope is reused
+ LOG(1) << "MapReduce terminated during state destruction" << endl;
}
+ }
+}
- void State::bailFromJS() {
- LOG(1) << "M/R: Switching from JS mode to mixed mode" << endl;
+/**
+ * Initialize the mapreduce operation, creating the inc collection
+ */
+void State::init() {
+ // setup js
+ const string userToken =
+ AuthorizationSession::get(ClientBasic::getCurrent())->getAuthenticatedUserNamesToken();
+ _scope.reset(globalScriptEngine->getPooledScope(_txn, _config.dbname, "mapreduce" + userToken)
+ .release());
+
+ if (!_config.scopeSetup.isEmpty())
+ _scope->init(&_config.scopeSetup);
+
+ _config.mapper->init(this);
+ _config.reducer->init(this);
+ if (_config.finalizer)
+ _config.finalizer->init(this);
+ _scope->setBoolean("_doFinal", _config.finalizer.get() != 0);
+
+ switchMode(_config.jsMode); // set up js-mode based on Config
+
+ // global JS map/reduce hashmap
+ // we use a standard JS object which means keys are only simple types
+ // we could also add a real hashmap from a library and object comparison methods
+ // for increased performance, we may want to look at v8 Harmony Map support
+ // _scope->setObject("_mrMap", BSONObj(), false);
+ ScriptingFunction init = _scope->createFunction(
+ "_emitCt = 0;"
+ "_keyCt = 0;"
+ "_dupCt = 0;"
+ "_redCt = 0;"
+ "if (typeof(_mrMap) === 'undefined') {"
+ " _mrMap = {};"
+ "}");
+ _scope->invoke(init, 0, 0, 0, true);
+
+ // js function to run reduce on all keys
+ // redfunc = _scope->createFunction("for (var key in hashmap) { print('Key is ' + key); list = hashmap[key]; ret = reduce(key, list); print('Value is ' + ret); };");
+ _reduceAll = _scope->createFunction(
+ "var map = _mrMap;"
+ "var list, ret;"
+ "for (var key in map) {"
+ " list = map[key];"
+ " if (list.length != 1) {"
+ " ret = _reduce(key, list);"
+ " map[key] = [ret];"
+ " ++_redCt;"
+ " }"
+ "}"
+ "_dupCt = 0;");
+ massert(16717, "error initializing JavaScript reduceAll function", _reduceAll != 0);
+
+ _reduceAndEmit = _scope->createFunction(
+ "var map = _mrMap;"
+ "var list, ret;"
+ "for (var key in map) {"
+ " list = map[key];"
+ " if (list.length == 1)"
+ " ret = list[0];"
+ " else {"
+ " ret = _reduce(key, list);"
+ " ++_redCt;"
+ " }"
+ " emit(key, ret);"
+ "}"
+ "delete _mrMap;");
+ massert(16718, "error initializing JavaScript reduce/emit function", _reduceAndEmit != 0);
+
+ _reduceAndFinalize = _scope->createFunction(
+ "var map = _mrMap;"
+ "var list, ret;"
+ "for (var key in map) {"
+ " list = map[key];"
+ " if (list.length == 1) {"
+ " if (!_doFinal) { continue; }"
+ " ret = list[0];"
+ " }"
+ " else {"
+ " ret = _reduce(key, list);"
+ " ++_redCt;"
+ " }"
+ " if (_doFinal)"
+ " ret = _finalize(key, ret);"
+ " map[key] = ret;"
+ "}");
+ massert(16719, "error creating JavaScript reduce/finalize function", _reduceAndFinalize != 0);
+
+ _reduceAndFinalizeAndInsert = _scope->createFunction(
+ "var map = _mrMap;"
+ "var list, ret;"
+ "for (var key in map) {"
+ " list = map[key];"
+ " if (list.length == 1)"
+ " ret = list[0];"
+ " else {"
+ " ret = _reduce(key, list);"
+ " ++_redCt;"
+ " }"
+ " if (_doFinal)"
+ " ret = _finalize(key, ret);"
+ " _nativeToTemp({_id: key, value: ret});"
+ "}");
+ massert(16720, "error initializing JavaScript functions", _reduceAndFinalizeAndInsert != 0);
+}
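// The JS-mode accumulator these functions operate on is a plain JS object
// keyed by the emitted key, each entry holding the values not yet reduced,
// e.g. (hypothetical): _mrMap == { "a": [1, 2], "b": [7] }. _reduceAll then
// collapses each multi-element list in place to [_reduce(key, list)].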
- // reduce and reemit into c++
- switchMode(false);
- _scope->invoke(_reduceAndEmit, 0, 0, 0, true);
- // need to get the real number emitted so far
- _numEmits = _scope->getNumberInt("_emitCt");
- _config.reducer->numReduces = _scope->getNumberInt("_redCt");
- }
+void State::switchMode(bool jsMode) {
+ _jsMode = jsMode;
+ if (jsMode) {
+ // emit function that stays in JS
+ _scope->setFunction("emit",
+ "function(key, value) {"
+ " if (typeof(key) === 'object') {"
+ " _bailFromJS(key, value);"
+ " return;"
+ " }"
+ " ++_emitCt;"
+ " var map = _mrMap;"
+ " var list = map[key];"
+ " if (!list) {"
+ " ++_keyCt;"
+ " list = [];"
+ " map[key] = list;"
+ " }"
+ " else"
+ " ++_dupCt;"
+ " list.push(value);"
+ "}");
+ _scope->injectNative("_bailFromJS", _bailFromJS, this);
+ } else {
+ // emit now populates C++ map
+ _scope->injectNative("emit", fast_emit, this);
+ }
+}
- Collection* State::getCollectionOrUassert(Database* db, StringData ns) {
- Collection* out = db ? db->getCollection(ns) : NULL;
- uassert(18697, "Collection unexpectedly disappeared: " + ns.toString(),
- out);
- return out;
- }
+void State::bailFromJS() {
+ LOG(1) << "M/R: Switching from JS mode to mixed mode" << endl;
- /**
- * Applies last reduce and finalize on a list of tuples (key, val)
- * Inserts single result {_id: key, value: val} into temp collection
- */
- void State::finalReduce( BSONList& values ) {
- if ( !_onDisk || values.size() == 0 )
- return;
+ // reduce and reemit into c++
+ switchMode(false);
+ _scope->invoke(_reduceAndEmit, 0, 0, 0, true);
+ // need to get the real number emitted so far
+ _numEmits = _scope->getNumberInt("_emitCt");
+ _config.reducer->numReduces = _scope->getNumberInt("_redCt");
+}
- BSONObj res = _config.reducer->finalReduce( values , _config.finalizer.get() );
- insert( _config.tempNamespace , res );
- }
+Collection* State::getCollectionOrUassert(Database* db, StringData ns) {
+ Collection* out = db ? db->getCollection(ns) : NULL;
+ uassert(18697, "Collection unexpectedly disappeared: " + ns.toString(), out);
+ return out;
+}
- BSONObj _nativeToTemp( const BSONObj& args, void* data ) {
- State* state = (State*) data;
- BSONObjIterator it(args);
- state->insert(state->_config.tempNamespace, it.next().Obj());
- return BSONObj();
- }
+/**
+ * Applies last reduce and finalize on a list of tuples (key, val)
+ * Inserts single result {_id: key, value: val} into temp collection
+ */
+void State::finalReduce(BSONList& values) {
+ if (!_onDisk || values.size() == 0)
+ return;
+
+ BSONObj res = _config.reducer->finalReduce(values, _config.finalizer.get());
+ insert(_config.tempNamespace, res);
+}
+
+BSONObj _nativeToTemp(const BSONObj& args, void* data) {
+ State* state = (State*)data;
+ BSONObjIterator it(args);
+ state->insert(state->_config.tempNamespace, it.next().Obj());
+ return BSONObj();
+}
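// _nativeToTemp is registered via _scope->injectNative("_nativeToTemp", ...)
// in State::finalReduce below, so the JS insert loop can hand each finalized
// {_id: key, value: ret} document back to C++; "args" is the BSON argument
// list of the JS call, hence the it.next().Obj() on its first element.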
// BSONObj _nativeToInc( const BSONObj& args, void* data ) {
// State* state = (State*) data;
@@ -956,807 +922,791 @@ namespace mongo {
// return BSONObj();
// }
- /**
- * Applies last reduce and finalize.
- * After calling this method, the temp collection will be completed.
- * If inline, the results will be in the in memory map
- */
- void State::finalReduce(CurOp * op , ProgressMeterHolder& pm ) {
-
- if (_jsMode) {
- // apply the reduce within JS
- if (_onDisk) {
- _scope->injectNative("_nativeToTemp", _nativeToTemp, this);
- _scope->invoke(_reduceAndFinalizeAndInsert, 0, 0, 0, true);
- return;
- }
- else {
- _scope->invoke(_reduceAndFinalize, 0, 0, 0, true);
- return;
- }
- }
+/**
+ * Applies last reduce and finalize.
+ * After calling this method, the temp collection will be completed.
+ * If inline, the results will be in the in memory map
+ */
+void State::finalReduce(CurOp* op, ProgressMeterHolder& pm) {
+ if (_jsMode) {
+ // apply the reduce within JS
+ if (_onDisk) {
+ _scope->injectNative("_nativeToTemp", _nativeToTemp, this);
+ _scope->invoke(_reduceAndFinalizeAndInsert, 0, 0, 0, true);
+ return;
+ } else {
+ _scope->invoke(_reduceAndFinalize, 0, 0, 0, true);
+ return;
+ }
+ }
- if ( ! _onDisk ) {
- // all data has already been reduced, just finalize
- if ( _config.finalizer ) {
- long size = 0;
- for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); ++i ) {
- BSONObj key = i->first;
- BSONList& all = i->second;
+ if (!_onDisk) {
+ // all data has already been reduced, just finalize
+ if (_config.finalizer) {
+ long size = 0;
+ for (InMemory::iterator i = _temp->begin(); i != _temp->end(); ++i) {
+ BSONObj key = i->first;
+ BSONList& all = i->second;
- verify( all.size() == 1 );
+ verify(all.size() == 1);
- BSONObj res = _config.finalizer->finalize( all[0] );
+ BSONObj res = _config.finalizer->finalize(all[0]);
- all.clear();
- all.push_back( res );
- size += res.objsize();
- }
- _size = size;
- }
- return;
+ all.clear();
+ all.push_back(res);
+ size += res.objsize();
}
+ _size = size;
+ }
+ return;
+ }
- // use index on "0" to pull sorted data
- verify( _temp->size() == 0 );
- BSONObj sortKey = BSON( "0" << 1 );
+ // use index on "0" to pull sorted data
+ verify(_temp->size() == 0);
+ BSONObj sortKey = BSON("0" << 1);
+
+ {
+ OldClientWriteContext incCtx(_txn, _config.incLong);
+ WriteUnitOfWork wuow(_txn);
+ Collection* incColl = getCollectionOrUassert(incCtx.db(), _config.incLong);
+
+ bool foundIndex = false;
+ IndexCatalog::IndexIterator ii = incColl->getIndexCatalog()->getIndexIterator(_txn, true);
+ // Iterate over incColl's indexes.
+ while (ii.more()) {
+ IndexDescriptor* currIndex = ii.next();
+ BSONObj x = currIndex->infoObj();
+ if (sortKey.woCompare(x["key"].embeddedObject()) == 0) {
+ foundIndex = true;
+ break;
+ }
+ }
- {
- OldClientWriteContext incCtx(_txn, _config.incLong );
- WriteUnitOfWork wuow(_txn);
- Collection* incColl = getCollectionOrUassert(incCtx.db(), _config.incLong );
-
- bool foundIndex = false;
- IndexCatalog::IndexIterator ii =
- incColl->getIndexCatalog()->getIndexIterator( _txn, true );
- // Iterate over incColl's indexes.
- while ( ii.more() ) {
- IndexDescriptor* currIndex = ii.next();
- BSONObj x = currIndex->infoObj();
- if ( sortKey.woCompare( x["key"].embeddedObject() ) == 0 ) {
- foundIndex = true;
- break;
- }
- }
+ verify(foundIndex);
+ wuow.commit();
+ }
- verify( foundIndex );
- wuow.commit();
- }
+ unique_ptr<AutoGetCollectionForRead> ctx(new AutoGetCollectionForRead(_txn, _config.incLong));
- unique_ptr<AutoGetCollectionForRead> ctx(new AutoGetCollectionForRead(_txn, _config.incLong));
+ BSONObj prev;
+ BSONList all;
- BSONObj prev;
- BSONList all;
+ {
+ const auto count = _db.count(_config.incLong, BSONObj(), QueryOption_SlaveOk);
+ stdx::lock_guard<Client> lk(*_txn->getClient());
+ verify(pm ==
+ op->setMessage_inlock("m/r: (3/3) final reduce to collection",
+ "M/R: (3/3) Final Reduce Progress",
+ count));
+ }
- {
- const auto count = _db.count(_config.incLong, BSONObj(), QueryOption_SlaveOk);
- stdx::lock_guard<Client> lk(*_txn->getClient());
- verify(pm == op->setMessage_inlock("m/r: (3/3) final reduce to collection",
- "M/R: (3/3) Final Reduce Progress",
- count));
+ const NamespaceString nss(_config.incLong);
+ const WhereCallbackReal whereCallback(_txn, nss.db());
+
+ CanonicalQuery* cqRaw;
+ verify(CanonicalQuery::canonicalize(
+ _config.incLong, BSONObj(), sortKey, BSONObj(), &cqRaw, whereCallback).isOK());
+ std::unique_ptr<CanonicalQuery> cq(cqRaw);
+
+ Collection* coll = getCollectionOrUassert(ctx->getDb(), _config.incLong);
+ invariant(coll);
+
+ PlanExecutor* rawExec;
+ verify(getExecutor(_txn,
+ coll,
+ cq.release(),
+ PlanExecutor::YIELD_AUTO,
+ &rawExec,
+ QueryPlannerParams::NO_TABLE_SCAN).isOK());
+
+ unique_ptr<PlanExecutor> exec(rawExec);
+
+ // iterate over all sorted objects
+ BSONObj o;
+ PlanExecutor::ExecState state;
+ while (PlanExecutor::ADVANCED == (state = exec->getNext(&o, NULL))) {
+ o = o.getOwned(); // we will be accessing outside of the lock
+ pm.hit();
+
+ if (o.woSortOrder(prev, sortKey) == 0) {
+ // object is same as previous, add to array
+ all.push_back(o);
+ if (pm->hits() % 100 == 0) {
+ _txn->checkForInterrupt();
}
+ continue;
+ }
- const NamespaceString nss(_config.incLong);
- const WhereCallbackReal whereCallback(_txn, nss.db());
-
- CanonicalQuery* cqRaw;
- verify(CanonicalQuery::canonicalize(_config.incLong,
- BSONObj(),
- sortKey,
- BSONObj(),
- &cqRaw,
- whereCallback).isOK());
- std::unique_ptr<CanonicalQuery> cq(cqRaw);
-
- Collection* coll = getCollectionOrUassert(ctx->getDb(), _config.incLong);
- invariant(coll);
-
- PlanExecutor* rawExec;
- verify(getExecutor(_txn,
- coll,
- cq.release(),
- PlanExecutor::YIELD_AUTO,
- &rawExec,
- QueryPlannerParams::NO_TABLE_SCAN).isOK());
-
- unique_ptr<PlanExecutor> exec(rawExec);
-
- // iterate over all sorted objects
- BSONObj o;
- PlanExecutor::ExecState state;
- while (PlanExecutor::ADVANCED == (state = exec->getNext(&o, NULL))) {
- o = o.getOwned(); // we will be accessing outside of the lock
- pm.hit();
-
- if ( o.woSortOrder( prev , sortKey ) == 0 ) {
- // object is same as previous, add to array
- all.push_back( o );
- if ( pm->hits() % 100 == 0 ) {
- _txn->checkForInterrupt();
- }
- continue;
- }
+ exec->saveState();
- exec->saveState();
+ ctx.reset();
- ctx.reset();
+ // reduce a finalize array
+ finalReduce(all);
- // reduce a finalize array
- finalReduce( all );
+ ctx.reset(new AutoGetCollectionForRead(_txn, _config.incLong));
- ctx.reset(new AutoGetCollectionForRead(_txn, _config.incLong));
+ all.clear();
+ prev = o;
+ all.push_back(o);
- all.clear();
- prev = o;
- all.push_back( o );
+ if (!exec->restoreState(_txn)) {
+ break;
+ }
- if (!exec->restoreState(_txn)) {
- break;
- }
+ _txn->checkForInterrupt();
+ }
- _txn->checkForInterrupt();
- }
+ ctx.reset();
+ // reduce and finalize last array
+ finalReduce(all);
+ ctx.reset(new AutoGetCollectionForRead(_txn, _config.incLong));
- ctx.reset();
- // reduce and finalize last array
- finalReduce( all );
- ctx.reset(new AutoGetCollectionForRead(_txn, _config.incLong));
+ pm.finished();
+}
- pm.finished();
- }
+/**
+ * Attempts to reduce objects in the memory map.
+ * A new memory map will be created to hold the results.
+ * If applicable, objects with a unique key may be dumped to the inc collection.
+ * Input and output objects are both {"0": key, "1": val}
+ */
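+// Illustrative only, assuming a summing reduce function: a key whose pending
+// list is [{"0": "a", "1": 1}, {"0": "a", "1": 1}] in the old map collapses
+// to the single tuple {"0": "a", "1": 2} in the new map.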
+void State::reduceInMemory() {
+ if (_jsMode) {
+ // in js mode the reduce is applied when writing to collection
+ return;
+ }
- /**
- * Attempts to reduce objects in the memory map.
- * A new memory map will be created to hold the results.
- * If applicable, objects with unique key may be dumped to inc collection.
- * Input and output objects are both {"0": key, "1": val}
- */
- void State::reduceInMemory() {
-
- if (_jsMode) {
- // in js mode the reduce is applied when writing to collection
- return;
+ unique_ptr<InMemory> n(new InMemory()); // for new data
+ long nSize = 0;
+ _dupCount = 0;
+
+ for (InMemory::iterator i = _temp->begin(); i != _temp->end(); ++i) {
+ BSONList& all = i->second;
+
+ if (all.size() == 1) {
+ // only 1 value for this key
+ if (_onDisk) {
+                // this key has only one value, so just write it to the collection
+ _insertToInc(*(all.begin()));
+ } else {
+ // add to new map
+ nSize += _add(n.get(), all[0]);
}
+ } else if (all.size() > 1) {
+ // several values, reduce and add to map
+ BSONObj res = _config.reducer->reduce(all);
+ nSize += _add(n.get(), res);
+ }
+ }
- unique_ptr<InMemory> n( new InMemory() ); // for new data
- long nSize = 0;
- _dupCount = 0;
+ // swap maps
+ _temp.reset(n.release());
+ _size = nSize;
+}
- for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); ++i ) {
- BSONList& all = i->second;
+/**
+ * Dumps the entire in-memory map to the inc collection.
+ */
+void State::dumpToInc() {
+ if (!_onDisk)
+ return;
- if ( all.size() == 1 ) {
- // only 1 value for this key
- if ( _onDisk ) {
- // this key has low cardinality, so just write to collection
- _insertToInc( *(all.begin()) );
- }
- else {
- // add to new map
- nSize += _add(n.get(), all[0]);
- }
- }
- else if ( all.size() > 1 ) {
- // several values, reduce and add to map
- BSONObj res = _config.reducer->reduce( all );
- nSize += _add(n.get(), res);
- }
- }
+ for (InMemory::iterator i = _temp->begin(); i != _temp->end(); i++) {
+ BSONList& all = i->second;
+ if (all.size() < 1)
+ continue;
- // swap maps
- _temp.reset( n.release() );
- _size = nSize;
- }
+ for (BSONList::iterator j = all.begin(); j != all.end(); j++)
+ _insertToInc(*j);
+ }
+ _temp->clear();
+ _size = 0;
+}
- /**
- * Dumps the entire in memory map to the inc collection.
- */
- void State::dumpToInc() {
- if ( ! _onDisk )
- return;
+/**
+ * Adds an object to the in-memory map.
+ */
+void State::emit(const BSONObj& a) {
+ _numEmits++;
+ _size += _add(_temp.get(), a);
+}
- for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ) {
- BSONList& all = i->second;
- if ( all.size() < 1 )
- continue;
+int State::_add(InMemory* im, const BSONObj& a) {
+ BSONList& all = (*im)[a];
+ all.push_back(a);
+ if (all.size() > 1) {
+ ++_dupCount;
+ }
- for ( BSONList::iterator j=all.begin(); j!=all.end(); j++ )
- _insertToInc( *j );
- }
- _temp->clear();
- _size = 0;
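+    // The return value is a size estimate: the object's BSON size plus a
+    // rough 16-byte allowance (an assumption) for per-entry container overhead.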
+ return a.objsize() + 16;
+}
+
+void State::reduceAndSpillInMemoryStateIfNeeded() {
+ // Make sure no DB locks are held, because this method manages its own locking and
+ // write units of work.
+ invariant(!_txn->lockState()->isLocked());
+
+ if (_jsMode) {
+ // try to reduce if it is beneficial
+ int dupCt = _scope->getNumberInt("_dupCt");
+ int keyCt = _scope->getNumberInt("_keyCt");
+
+ if (keyCt > _config.jsMaxKeys) {
+ // too many keys for JS, switch to mixed
+ _bailFromJS(BSONObj(), this);
+ // then fall through to check map size
+ } else if (dupCt > (keyCt * _config.reduceTriggerRatio)) {
+ // reduce now to lower mem usage
+ Timer t;
+ _scope->invoke(_reduceAll, 0, 0, 0, true);
+ LOG(3) << " MR - did reduceAll: keys=" << keyCt << " dups=" << dupCt
+ << " newKeys=" << _scope->getNumberInt("_keyCt") << " time=" << t.millis()
+ << "ms" << endl;
+ return;
}
+ }
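+    // Illustrative arithmetic: with a reduceTriggerRatio of 10 (an assumed
+    // value) and 1,000 distinct keys, the JS-mode reduce above fires once
+    // more than 10,000 duplicate emits have accumulated.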
- /**
- * Adds object to in memory map
- */
- void State::emit( const BSONObj& a ) {
- _numEmits++;
- _size += _add(_temp.get(), a);
+ if (_jsMode)
+ return;
+
+ if (_size > _config.maxInMemSize || _dupCount > (_temp->size() * _config.reduceTriggerRatio)) {
+        // attempt to reduce the in-memory map if memory use is too high or there are many duplicates
+ long oldSize = _size;
+ Timer t;
+ reduceInMemory();
+ LOG(3) << " MR - did reduceInMemory: size=" << oldSize << " dups=" << _dupCount
+ << " newSize=" << _size << " time=" << t.millis() << "ms" << endl;
+
+ // if size is still high, or values are not reducing well, dump
+ if (_onDisk && (_size > _config.maxInMemSize || _size > oldSize / 2)) {
+ dumpToInc();
+ LOG(3) << " MR - dumping to db" << endl;
}
+ }
+}
- int State::_add(InMemory* im, const BSONObj& a) {
- BSONList& all = (*im)[a];
- all.push_back( a );
- if (all.size() > 1) {
- ++_dupCount;
- }
+/**
+ * The emit() hook that is called from the JS map function.
+ */
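+// Illustrative only: a JS call emit("a", 1) reaches this hook as the BSON
+// args {"0": "a", "1": 1}; an undefined key is normalized to null below.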
+BSONObj fast_emit(const BSONObj& args, void* data) {
+ uassert(10077, "fast_emit takes 2 args", args.nFields() == 2);
+ uassert(13069,
+ "an emit can't be more than half max bson size",
+ args.objsize() < (BSONObjMaxUserSize / 2));
+
+ State* state = (State*)data;
+ if (args.firstElement().type() == Undefined) {
+ BSONObjBuilder b(args.objsize());
+ b.appendNull("");
+ BSONObjIterator i(args);
+ i.next();
+ b.append(i.next());
+ state->emit(b.obj());
+ } else {
+ state->emit(args);
+ }
+ return BSONObj();
+}
- return a.objsize() + 16;
- }
+/**
+ * Called when we realize we can't use JS mode for m/r on the 1st key.
+ */
+BSONObj _bailFromJS(const BSONObj& args, void* data) {
+ State* state = (State*)data;
+ state->bailFromJS();
- void State::reduceAndSpillInMemoryStateIfNeeded() {
- // Make sure no DB locks are held, because this method manages its own locking and
- // write units of work.
- invariant(!_txn->lockState()->isLocked());
+ // emit this particular key if there is one
+ if (!args.isEmpty()) {
+ fast_emit(args, data);
+ }
+ return BSONObj();
+}
- if (_jsMode) {
- // try to reduce if it is beneficial
- int dupCt = _scope->getNumberInt("_dupCt");
- int keyCt = _scope->getNumberInt("_keyCt");
+/**
+ * This class represents a map/reduce command executed on a single server
+ */
+class MapReduceCommand : public Command {
+public:
+ MapReduceCommand() : Command("mapReduce", false, "mapreduce") {}
- if (keyCt > _config.jsMaxKeys) {
- // too many keys for JS, switch to mixed
- _bailFromJS(BSONObj(), this);
- // then fall through to check map size
- }
- else if (dupCt > (keyCt * _config.reduceTriggerRatio)) {
- // reduce now to lower mem usage
- Timer t;
- _scope->invoke(_reduceAll, 0, 0, 0, true);
- LOG(3) << " MR - did reduceAll: keys=" << keyCt << " dups=" << dupCt
- << " newKeys=" << _scope->getNumberInt("_keyCt") << " time="
- << t.millis() << "ms" << endl;
- return;
- }
- }
+ virtual bool slaveOk() const {
+ return repl::getGlobalReplicationCoordinator()->getReplicationMode() !=
+ repl::ReplicationCoordinator::modeReplSet;
+ }
- if (_jsMode)
- return;
-
- if (_size > _config.maxInMemSize || _dupCount > (_temp->size() * _config.reduceTriggerRatio)) {
- // attempt to reduce in memory map, if memory is too high or we have many duplicates
- long oldSize = _size;
- Timer t;
- reduceInMemory();
- LOG(3) << " MR - did reduceInMemory: size=" << oldSize << " dups=" << _dupCount
- << " newSize=" << _size << " time=" << t.millis() << "ms" << endl;
-
- // if size is still high, or values are not reducing well, dump
- if ( _onDisk && (_size > _config.maxInMemSize || _size > oldSize / 2) ) {
- dumpToInc();
- LOG(3) << " MR - dumping to db" << endl;
- }
- }
- }
+ virtual bool slaveOverrideOk() const {
+ return true;
+ }
- /**
- * emit that will be called by js function
- */
- BSONObj fast_emit( const BSONObj& args, void* data ) {
- uassert( 10077 , "fast_emit takes 2 args" , args.nFields() == 2 );
- uassert( 13069 , "an emit can't be more than half max bson size" , args.objsize() < ( BSONObjMaxUserSize / 2 ) );
-
- State* state = (State*) data;
- if ( args.firstElement().type() == Undefined ) {
- BSONObjBuilder b( args.objsize() );
- b.appendNull( "" );
- BSONObjIterator i( args );
- i.next();
- b.append( i.next() );
- state->emit( b.obj() );
- }
- else {
- state->emit( args );
- }
- return BSONObj();
- }
+ virtual void help(stringstream& help) const {
+ help << "Run a map/reduce operation on the server.\n";
+ help << "Note this is used for aggregation, not querying, in MongoDB.\n";
+ help << "http://dochub.mongodb.org/core/mapreduce";
+ }
- /**
- * function is called when we realize we cant use js mode for m/r on the 1st key
- */
- BSONObj _bailFromJS( const BSONObj& args, void* data ) {
- State* state = (State*) data;
- state->bailFromJS();
+ virtual bool isWriteCommandForConfigServer() const {
+ return false;
+ }
- // emit this particular key if there is one
- if (!args.isEmpty()) {
- fast_emit(args, data);
- }
- return BSONObj();
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out);
+ }
+
+ bool run(OperationContext* txn,
+ const string& dbname,
+ BSONObj& cmd,
+ int,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ Timer t;
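+        // Outline, matching the progress messages below: (1/3) run map over
+        // the input and collect emits, (2/3) reduce the in-memory map, then
+        // (3/3) final reduce into the output collection.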
+
+ boost::optional<DisableDocumentValidation> maybeDisableValidation;
+ if (shouldBypassDocumentValidationForCommand(cmd))
+ maybeDisableValidation.emplace(txn);
+
+ auto client = txn->getClient();
+
+ if (client->isInDirectClient()) {
+ return appendCommandStatus(
+ result,
+ Status(ErrorCodes::IllegalOperation, "Cannot run mapReduce command from eval()"));
}
- /**
- * This class represents a map/reduce command executed on a single server
- */
- class MapReduceCommand : public Command {
- public:
- MapReduceCommand() : Command("mapReduce", false, "mapreduce") {}
+ CurOp* op = CurOp::get(txn);
- virtual bool slaveOk() const {
- return repl::getGlobalReplicationCoordinator()->getReplicationMode() !=
- repl::ReplicationCoordinator::modeReplSet;
- }
+ Config config(dbname, cmd);
- virtual bool slaveOverrideOk() const { return true; }
+ LOG(1) << "mr ns: " << config.ns << endl;
- virtual void help( stringstream &help ) const {
- help << "Run a map/reduce operation on the server.\n";
- help << "Note this is used for aggregation, not querying, in MongoDB.\n";
- help << "http://dochub.mongodb.org/core/mapreduce";
- }
+ uassert(16149, "cannot run map reduce without the js engine", globalScriptEngine);
+
+ CollectionMetadataPtr collMetadata;
- virtual bool isWriteCommandForConfigServer() const { return false; }
+ // Prevent sharding state from changing during the MR.
+ unique_ptr<RangePreserver> rangePreserver;
+ {
+ AutoGetCollectionForRead ctx(txn, config.ns);
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out);
+ Collection* collection = ctx.getCollection();
+ if (collection) {
+ rangePreserver.reset(new RangePreserver(collection));
}
- bool run(OperationContext* txn,
- const string& dbname,
- BSONObj& cmd,
- int,
- string& errmsg,
- BSONObjBuilder& result) {
- Timer t;
+ // Get metadata before we check our version, to make sure it doesn't increment
+ // in the meantime. Need to do this in the same lock scope as the block.
+ if (shardingState.needCollectionMetadata(client, config.ns)) {
+ collMetadata = shardingState.getCollectionMetadata(config.ns);
+ }
+ }
- boost::optional<DisableDocumentValidation> maybeDisableValidation;
- if (shouldBypassDocumentValidationForCommand(cmd))
- maybeDisableValidation.emplace(txn);
+ bool shouldHaveData = false;
- auto client = txn->getClient();
+ BSONObjBuilder countsBuilder;
+ BSONObjBuilder timingBuilder;
+ State state(txn, config);
+ if (!state.sourceExists()) {
+ errmsg = "ns doesn't exist";
+ return false;
+ }
+ if (state.isOnDisk()) {
+            // this means it will be doing a write operation; make sure we are on the master.
+            // ideally this check would be in slaveOk(), but the config is not known at that point
+ NamespaceString nss(config.ns);
+ if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
+ errmsg = "not master";
+ return false;
+ }
+ }
- if (client->isInDirectClient()) {
- return appendCommandStatus(result,
- Status(ErrorCodes::IllegalOperation,
- "Cannot run mapReduce command from eval()"));
- }
+ try {
+ state.init();
+ state.prepTempCollection();
+ ON_BLOCK_EXIT_OBJ(state, &State::dropTempCollections);
+
+ int progressTotal = 0;
+ bool showTotal = true;
+ if (state.config().filter.isEmpty()) {
+ progressTotal = state.incomingDocuments();
+ } else {
+ showTotal = false;
+ // Set an arbitrary total > 0 so the meter will be activated.
+ progressTotal = 1;
+ }
- CurOp* op = CurOp::get(txn);
+ stdx::unique_lock<Client> lk(*txn->getClient());
+ ProgressMeter& progress(op->setMessage_inlock(
+ "m/r: (1/3) emit phase", "M/R: (1/3) Emit Progress", progressTotal));
+ lk.unlock();
+ progress.showTotal(showTotal);
+ ProgressMeterHolder pm(progress);
- Config config( dbname , cmd );
+ // See cast on next line to 32 bit unsigned
+ wassert(config.limit < 0x4000000);
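+            // (0x4000000 is 2^26, roughly 67 million documents.)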
- LOG(1) << "mr ns: " << config.ns << endl;
+ long long mapTime = 0;
+ long long reduceTime = 0;
+ long long numInputs = 0;
- uassert( 16149 , "cannot run map reduce without the js engine", globalScriptEngine );
+ {
+            // The range preserver acquired above keeps chunk migrations off;
+            // now re-establish the cursor we will actually use.
- CollectionMetadataPtr collMetadata;
+ const NamespaceString nss(config.ns);
- // Prevent sharding state from changing during the MR.
- unique_ptr<RangePreserver> rangePreserver;
- {
- AutoGetCollectionForRead ctx(txn, config.ns);
+ // Need lock and context to use it
+ unique_ptr<ScopedTransaction> scopedXact(new ScopedTransaction(txn, MODE_IS));
+ unique_ptr<AutoGetDb> scopedAutoDb(new AutoGetDb(txn, nss.db(), MODE_S));
- Collection* collection = ctx.getCollection();
- if (collection) {
- rangePreserver.reset(new RangePreserver(collection));
- }
+ const WhereCallbackReal whereCallback(txn, nss.db());
- // Get metadata before we check our version, to make sure it doesn't increment
- // in the meantime. Need to do this in the same lock scope as the block.
- if (shardingState.needCollectionMetadata(client, config.ns)) {
- collMetadata = shardingState.getCollectionMetadata( config.ns );
- }
+ CanonicalQuery* cqRaw;
+ if (!CanonicalQuery::canonicalize(
+ config.ns, config.filter, config.sort, BSONObj(), &cqRaw, whereCallback)
+ .isOK()) {
+ uasserted(17238, "Can't canonicalize query " + config.filter.toString());
+ return 0;
}
+ std::unique_ptr<CanonicalQuery> cq(cqRaw);
- bool shouldHaveData = false;
+ Database* db = scopedAutoDb->getDb();
+ Collection* coll = state.getCollectionOrUassert(db, config.ns);
+ invariant(coll);
- BSONObjBuilder countsBuilder;
- BSONObjBuilder timingBuilder;
- State state( txn, config );
- if ( ! state.sourceExists() ) {
- errmsg = "ns doesn't exist";
- return false;
- }
- if (state.isOnDisk()) {
- // this means that it will be doing a write operation, make sure we are on Master
- // ideally this check should be in slaveOk(), but at that point config is not known
- NamespaceString nss(config.ns);
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
- errmsg = "not master";
- return false;
- }
+ PlanExecutor* rawExec;
+ if (!getExecutor(txn, coll, cq.release(), PlanExecutor::YIELD_AUTO, &rawExec)
+ .isOK()) {
+ uasserted(17239, "Can't get executor for query " + config.filter.toString());
+ return 0;
}
- try {
- state.init();
- state.prepTempCollection();
- ON_BLOCK_EXIT_OBJ(state, &State::dropTempCollections);
+ unique_ptr<PlanExecutor> exec(rawExec);
- int progressTotal = 0;
- bool showTotal = true;
- if ( state.config().filter.isEmpty() ) {
- progressTotal = state.incomingDocuments();
- }
- else {
- showTotal = false;
- // Set an arbitrary total > 0 so the meter will be activated.
- progressTotal = 1;
- }
+ Timer mt;
- stdx::unique_lock<Client> lk(*txn->getClient());
- ProgressMeter& progress( op->setMessage_inlock("m/r: (1/3) emit phase",
- "M/R: (1/3) Emit Progress",
- progressTotal ));
- lk.unlock();
- progress.showTotal(showTotal);
- ProgressMeterHolder pm(progress);
-
- // See cast on next line to 32 bit unsigned
- wassert(config.limit < 0x4000000);
-
- long long mapTime = 0;
- long long reduceTime = 0;
- long long numInputs = 0;
-
- {
- // We've got a cursor preventing migrations off, now re-establish our
- // useful cursor.
-
- const NamespaceString nss(config.ns);
-
- // Need lock and context to use it
- unique_ptr<ScopedTransaction> scopedXact(
- new ScopedTransaction(txn, MODE_IS));
- unique_ptr<AutoGetDb> scopedAutoDb(new AutoGetDb(txn, nss.db(), MODE_S));
-
- const WhereCallbackReal whereCallback(txn, nss.db());
-
- CanonicalQuery* cqRaw;
- if (!CanonicalQuery::canonicalize(config.ns,
- config.filter,
- config.sort,
- BSONObj(),
- &cqRaw,
- whereCallback).isOK()) {
- uasserted(17238, "Can't canonicalize query " + config.filter.toString());
- return 0;
- }
- std::unique_ptr<CanonicalQuery> cq(cqRaw);
-
- Database* db = scopedAutoDb->getDb();
- Collection* coll = state.getCollectionOrUassert(db, config.ns);
- invariant(coll);
-
- PlanExecutor* rawExec;
- if (!getExecutor(txn,
- coll,
- cq.release(),
- PlanExecutor::YIELD_AUTO,
- &rawExec).isOK()) {
- uasserted(17239, "Can't get executor for query "
- + config.filter.toString());
- return 0;
+ // go through each doc
+ BSONObj o;
+ while (PlanExecutor::ADVANCED == exec->getNext(&o, NULL)) {
+ // check to see if this is a new object we don't own yet
+ // because of a chunk migration
+ if (collMetadata) {
+ ShardKeyPattern kp(collMetadata->getKeyPattern());
+ if (!collMetadata->keyBelongsToMe(kp.extractShardKeyFromDoc(o))) {
+ continue;
}
+ }
- unique_ptr<PlanExecutor> exec(rawExec);
-
- Timer mt;
-
- // go through each doc
- BSONObj o;
- while (PlanExecutor::ADVANCED == exec->getNext(&o, NULL)) {
- // check to see if this is a new object we don't own yet
- // because of a chunk migration
- if ( collMetadata ) {
- ShardKeyPattern kp( collMetadata->getKeyPattern() );
- if (!collMetadata->keyBelongsToMe(kp.extractShardKeyFromDoc(o))) {
- continue;
- }
- }
-
- // do map
- if ( config.verbose ) mt.reset();
- config.mapper->map( o );
- if ( config.verbose ) mapTime += mt.micros();
-
- // Check if the state accumulated so far needs to be written to a
- // collection. This may yield the DB lock temporarily and then
- // acquire it again.
- //
- numInputs++;
- if (numInputs % 100 == 0) {
- Timer t;
-
- // TODO: As an optimization, we might want to do the save/restore
- // state and yield inside the reduceAndSpillInMemoryState method, so
- // it only happens if necessary.
- exec->saveState();
-
- scopedAutoDb.reset();
- scopedXact.reset();
-
- state.reduceAndSpillInMemoryStateIfNeeded();
-
- scopedXact.reset(new ScopedTransaction(txn, MODE_IS));
- scopedAutoDb.reset(new AutoGetDb(txn, nss.db(), MODE_S));
-
- exec->restoreState(txn);
-
- // Need to reload the database, in case it was dropped after we
- // released the lock
- db = scopedAutoDb->getDb();
- if (db == NULL) {
- // Database was deleted after we freed the lock
- StringBuilder sb;
- sb << "Database "
- << nss.db()
- << " was deleted in the middle of the reduce job.";
- uasserted(28523, sb.str());
- }
-
- reduceTime += t.micros();
-
- txn->checkForInterrupt();
- }
-
- pm.hit();
-
- if (config.limit && numInputs >= config.limit)
- break;
+ // do map
+ if (config.verbose)
+ mt.reset();
+ config.mapper->map(o);
+ if (config.verbose)
+ mapTime += mt.micros();
+
+ // Check if the state accumulated so far needs to be written to a
+ // collection. This may yield the DB lock temporarily and then
+ // acquire it again.
+ //
+ numInputs++;
+ if (numInputs % 100 == 0) {
+ Timer t;
+
+ // TODO: As an optimization, we might want to do the save/restore
+ // state and yield inside the reduceAndSpillInMemoryState method, so
+ // it only happens if necessary.
+ exec->saveState();
+
+ scopedAutoDb.reset();
+ scopedXact.reset();
+
+ state.reduceAndSpillInMemoryStateIfNeeded();
+
+ scopedXact.reset(new ScopedTransaction(txn, MODE_IS));
+ scopedAutoDb.reset(new AutoGetDb(txn, nss.db(), MODE_S));
+
+ exec->restoreState(txn);
+
+ // Need to reload the database, in case it was dropped after we
+ // released the lock
+ db = scopedAutoDb->getDb();
+ if (db == NULL) {
+ // Database was deleted after we freed the lock
+ StringBuilder sb;
+ sb << "Database " << nss.db()
+ << " was deleted in the middle of the reduce job.";
+ uasserted(28523, sb.str());
}
- }
- pm.finished();
- txn->checkForInterrupt();
+ reduceTime += t.micros();
- // update counters
- countsBuilder.appendNumber("input", numInputs);
- countsBuilder.appendNumber( "emit" , state.numEmits() );
- if ( state.numEmits() )
- shouldHaveData = true;
+ txn->checkForInterrupt();
+ }
- timingBuilder.appendNumber( "mapTime" , mapTime / 1000 );
- timingBuilder.append( "emitLoop" , t.millis() );
+ pm.hit();
- {
- stdx::lock_guard<Client> lk(*txn->getClient());
- op->setMessage_inlock("m/r: (2/3) final reduce in memory",
- "M/R: (2/3) Final In-Memory Reduce Progress");
- }
- Timer rt;
- // do reduce in memory
- // this will be the last reduce needed for inline mode
- state.reduceInMemory();
- // if not inline: dump the in memory map to inc collection, all data is on disk
- state.dumpToInc();
- // final reduce
- state.finalReduce(op , pm );
- reduceTime += rt.micros();
- countsBuilder.appendNumber( "reduce" , state.numReduces() );
- timingBuilder.appendNumber("reduceTime", reduceTime / 1000);
- timingBuilder.append( "mode" , state.jsMode() ? "js" : "mixed" );
-
- long long finalCount = state.postProcessCollection(txn, op, pm);
- state.appendResults( result );
-
- timingBuilder.appendNumber( "total" , t.millis() );
- result.appendNumber( "timeMillis" , t.millis() );
- countsBuilder.appendNumber( "output" , finalCount );
- if ( config.verbose ) result.append( "timing" , timingBuilder.obj() );
- result.append( "counts" , countsBuilder.obj() );
-
- if ( finalCount == 0 && shouldHaveData ) {
- result.append( "cmd" , cmd );
- errmsg = "there were emits but no data!";
- return false;
- }
- }
- catch( SendStaleConfigException& e ){
- log() << "mr detected stale config, should retry" << causedBy(e) << endl;
- throw e;
- }
- // TODO: The error handling code for queries is v. fragile,
- // *requires* rethrow AssertionExceptions - should probably fix.
- catch ( AssertionException& e ){
- log() << "mr failed, removing collection" << causedBy(e) << endl;
- throw e;
- }
- catch ( std::exception& e ){
- log() << "mr failed, removing collection" << causedBy(e) << endl;
- throw e;
- }
- catch ( ... ) {
- log() << "mr failed for unknown reason, removing collection" << endl;
- throw;
+ if (config.limit && numInputs >= config.limit)
+ break;
}
-
- return true;
}
+ pm.finished();
+
+ txn->checkForInterrupt();
+
+ // update counters
+ countsBuilder.appendNumber("input", numInputs);
+ countsBuilder.appendNumber("emit", state.numEmits());
+ if (state.numEmits())
+ shouldHaveData = true;
- } mapReduceCommand;
-
- /**
- * This class represents a map/reduce command executed on the output server of a sharded env
- */
- class MapReduceFinishCommand : public Command {
- public:
- void help(stringstream& h) const { h << "internal"; }
- MapReduceFinishCommand() : Command( "mapreduce.shardedfinish" ) {}
- virtual bool slaveOk() const {
- return repl::getGlobalReplicationCoordinator()->getReplicationMode() !=
- repl::ReplicationCoordinator::modeReplSet;
+ timingBuilder.appendNumber("mapTime", mapTime / 1000);
+ timingBuilder.append("emitLoop", t.millis());
+
+ {
+ stdx::lock_guard<Client> lk(*txn->getClient());
+ op->setMessage_inlock("m/r: (2/3) final reduce in memory",
+ "M/R: (2/3) Final In-Memory Reduce Progress");
}
- virtual bool slaveOverrideOk() const { return true; }
- virtual bool isWriteCommandForConfigServer() const { return false; }
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::internal);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ Timer rt;
+ // do reduce in memory
+ // this will be the last reduce needed for inline mode
+ state.reduceInMemory();
+ // if not inline: dump the in memory map to inc collection, all data is on disk
+ state.dumpToInc();
+ // final reduce
+ state.finalReduce(op, pm);
+ reduceTime += rt.micros();
+ countsBuilder.appendNumber("reduce", state.numReduces());
+ timingBuilder.appendNumber("reduceTime", reduceTime / 1000);
+ timingBuilder.append("mode", state.jsMode() ? "js" : "mixed");
+
+ long long finalCount = state.postProcessCollection(txn, op, pm);
+ state.appendResults(result);
+
+ timingBuilder.appendNumber("total", t.millis());
+ result.appendNumber("timeMillis", t.millis());
+ countsBuilder.appendNumber("output", finalCount);
+ if (config.verbose)
+ result.append("timing", timingBuilder.obj());
+ result.append("counts", countsBuilder.obj());
+
+ if (finalCount == 0 && shouldHaveData) {
+ result.append("cmd", cmd);
+ errmsg = "there were emits but no data!";
+ return false;
}
- bool run(OperationContext* txn,
- const string& dbname,
- BSONObj& cmdObj,
- int,
- string& errmsg,
- BSONObjBuilder& result) {
- boost::optional<DisableDocumentValidation> maybeDisableValidation;
- if (shouldBypassDocumentValidationForCommand(cmdObj))
- maybeDisableValidation.emplace(txn);
-
- ShardedConnectionInfo::addHook();
- // legacy name
- string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe();
- verify( shardedOutputCollection.size() > 0 );
- string inputNS;
- if ( cmdObj["inputDB"].type() == String ) {
- inputNS = cmdObj["inputDB"].String() + "." + shardedOutputCollection;
- }
- else {
- inputNS = dbname + "." + shardedOutputCollection;
- }
+ } catch (SendStaleConfigException& e) {
+ log() << "mr detected stale config, should retry" << causedBy(e) << endl;
+ throw e;
+ }
+        // TODO: The error handling code for queries is very fragile and
+        // *requires* rethrowing AssertionExceptions; this should probably be fixed.
+ catch (AssertionException& e) {
+ log() << "mr failed, removing collection" << causedBy(e) << endl;
+ throw e;
+ } catch (std::exception& e) {
+ log() << "mr failed, removing collection" << causedBy(e) << endl;
+ throw e;
+ } catch (...) {
+ log() << "mr failed for unknown reason, removing collection" << endl;
+ throw;
+ }
- CurOp * op = CurOp::get(txn);
+ return true;
+ }
- Config config( dbname , cmdObj.firstElement().embeddedObjectUserCheck() );
- State state(txn, config);
- state.init();
+} mapReduceCommand;
- // no need for incremental collection because records are already sorted
- state._useIncremental = false;
- config.incLong = config.tempNamespace;
+/**
+ * This class represents a map/reduce command executed on the output server of a sharded environment.
+ */
+class MapReduceFinishCommand : public Command {
+public:
+ void help(stringstream& h) const {
+ h << "internal";
+ }
+ MapReduceFinishCommand() : Command("mapreduce.shardedfinish") {}
+ virtual bool slaveOk() const {
+ return repl::getGlobalReplicationCoordinator()->getReplicationMode() !=
+ repl::ReplicationCoordinator::modeReplSet;
+ }
+ virtual bool slaveOverrideOk() const {
+ return true;
+ }
+ virtual bool isWriteCommandForConfigServer() const {
+ return false;
+ }
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::internal);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ }
+ bool run(OperationContext* txn,
+ const string& dbname,
+ BSONObj& cmdObj,
+ int,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ boost::optional<DisableDocumentValidation> maybeDisableValidation;
+ if (shouldBypassDocumentValidationForCommand(cmdObj))
+ maybeDisableValidation.emplace(txn);
+
+ ShardedConnectionInfo::addHook();
+ // legacy name
+ string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe();
+ verify(shardedOutputCollection.size() > 0);
+ string inputNS;
+ if (cmdObj["inputDB"].type() == String) {
+ inputNS = cmdObj["inputDB"].String() + "." + shardedOutputCollection;
+ } else {
+ inputNS = dbname + "." + shardedOutputCollection;
+ }
- BSONObj shardCounts = cmdObj["shardCounts"].embeddedObjectUserCheck();
- BSONObj counts = cmdObj["counts"].embeddedObjectUserCheck();
+ CurOp* op = CurOp::get(txn);
- stdx::unique_lock<Client> lk(*txn->getClient());
- ProgressMeterHolder pm(op->setMessage_inlock("m/r: merge sort and reduce",
- "M/R Merge Sort and Reduce Progress"));
- lk.unlock();
- set<string> servers;
+ Config config(dbname, cmdObj.firstElement().embeddedObjectUserCheck());
+ State state(txn, config);
+ state.init();
- {
- // parse per shard results
- BSONObjIterator i(shardCounts);
- while (i.more()) {
- BSONElement e = i.next();
- servers.insert(e.fieldName());
- }
- }
+ // no need for incremental collection because records are already sorted
+ state._useIncremental = false;
+ config.incLong = config.tempNamespace;
- state.prepTempCollection();
- ON_BLOCK_EXIT_OBJ(state, &State::dropTempCollections);
-
- BSONList values;
- if (!config.outputOptions.outDB.empty()) {
- BSONObjBuilder loc;
- if ( !config.outputOptions.outDB.empty())
- loc.append( "db" , config.outputOptions.outDB );
- if ( !config.outputOptions.collectionName.empty() )
- loc.append( "collection" , config.outputOptions.collectionName );
- result.append("result", loc.obj());
- }
- else {
- if ( !config.outputOptions.collectionName.empty() )
- result.append( "result" , config.outputOptions.collectionName );
- }
+ BSONObj shardCounts = cmdObj["shardCounts"].embeddedObjectUserCheck();
+ BSONObj counts = cmdObj["counts"].embeddedObjectUserCheck();
- auto status = grid.catalogCache()->getDatabase(dbname);
- if (!status.isOK()) {
- return appendCommandStatus(result, status.getStatus());
- }
+ stdx::unique_lock<Client> lk(*txn->getClient());
+ ProgressMeterHolder pm(op->setMessage_inlock("m/r: merge sort and reduce",
+ "M/R Merge Sort and Reduce Progress"));
+ lk.unlock();
+ set<string> servers;
+
+ {
+ // parse per shard results
+ BSONObjIterator i(shardCounts);
+ while (i.more()) {
+ BSONElement e = i.next();
+ servers.insert(e.fieldName());
+ }
+ }
- shared_ptr<DBConfig> confOut = status.getValue();
+ state.prepTempCollection();
+ ON_BLOCK_EXIT_OBJ(state, &State::dropTempCollections);
+
+ BSONList values;
+ if (!config.outputOptions.outDB.empty()) {
+ BSONObjBuilder loc;
+ if (!config.outputOptions.outDB.empty())
+ loc.append("db", config.outputOptions.outDB);
+ if (!config.outputOptions.collectionName.empty())
+ loc.append("collection", config.outputOptions.collectionName);
+ result.append("result", loc.obj());
+ } else {
+ if (!config.outputOptions.collectionName.empty())
+ result.append("result", config.outputOptions.collectionName);
+ }
- vector<ChunkPtr> chunks;
- if ( confOut->isSharded(config.outputOptions.finalNamespace) ) {
- ChunkManagerPtr cm = confOut->getChunkManager(
- config.outputOptions.finalNamespace);
+ auto status = grid.catalogCache()->getDatabase(dbname);
+ if (!status.isOK()) {
+ return appendCommandStatus(result, status.getStatus());
+ }
- // Fetch result from other shards 1 chunk at a time. It would be better to do
- // just one big $or query, but then the sorting would not be efficient.
- const string shardName = shardingState.getShardName();
- const ChunkMap& chunkMap = cm->getChunkMap();
+ shared_ptr<DBConfig> confOut = status.getValue();
- for ( ChunkMap::const_iterator it = chunkMap.begin(); it != chunkMap.end(); ++it ) {
- ChunkPtr chunk = it->second;
- if (chunk->getShardId() == shardName) {
- chunks.push_back(chunk);
- }
- }
- }
+ vector<ChunkPtr> chunks;
+ if (confOut->isSharded(config.outputOptions.finalNamespace)) {
+ ChunkManagerPtr cm = confOut->getChunkManager(config.outputOptions.finalNamespace);
- long long inputCount = 0;
- unsigned int index = 0;
- BSONObj query;
- BSONArrayBuilder chunkSizes;
- while (true) {
- ChunkPtr chunk;
- if (chunks.size() > 0) {
- chunk = chunks[index];
- BSONObjBuilder b;
- b.appendAs(chunk->getMin().firstElement(), "$gte");
- b.appendAs(chunk->getMax().firstElement(), "$lt");
- query = BSON("_id" << b.obj());
-// chunkSizes.append(min);
- }
+            // Fetch results from the other shards one chunk at a time. It would be
+            // better to do just one big $or query, but then the sorting would not
+            // be efficient.
+ const string shardName = shardingState.getShardName();
+ const ChunkMap& chunkMap = cm->getChunkMap();
- // reduce from each shard for a chunk
- BSONObj sortKey = BSON( "_id" << 1 );
- ParallelSortClusteredCursor cursor(servers, inputNS,
- Query(query).sort(sortKey), QueryOption_NoCursorTimeout);
- cursor.init();
- int chunkSize = 0;
-
- while ( cursor.more() || !values.empty() ) {
- BSONObj t;
- if (cursor.more()) {
- t = cursor.next().getOwned();
- ++inputCount;
-
- if ( values.size() == 0 ) {
- values.push_back( t );
- continue;
- }
-
- if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ) {
- values.push_back( t );
- continue;
- }
- }
+ for (ChunkMap::const_iterator it = chunkMap.begin(); it != chunkMap.end(); ++it) {
+ ChunkPtr chunk = it->second;
+ if (chunk->getShardId() == shardName) {
+ chunks.push_back(chunk);
+ }
+ }
+ }
- BSONObj res = config.reducer->finalReduce( values , config.finalizer.get());
- chunkSize += res.objsize();
- if (state.isOnDisk())
- state.insert( config.tempNamespace , res );
- else
- state.emit(res);
- values.clear();
- if (!t.isEmpty())
- values.push_back( t );
+ long long inputCount = 0;
+ unsigned int index = 0;
+ BSONObj query;
+ BSONArrayBuilder chunkSizes;
+ while (true) {
+ ChunkPtr chunk;
+ if (chunks.size() > 0) {
+ chunk = chunks[index];
+ BSONObjBuilder b;
+ b.appendAs(chunk->getMin().firstElement(), "$gte");
+ b.appendAs(chunk->getMax().firstElement(), "$lt");
+ query = BSON("_id" << b.obj());
+ // chunkSizes.append(min);
+ }
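+            // Illustrative only: for a chunk covering [{_id: 0}, {_id: 100}),
+            // the builder above yields the query {_id: {$gte: 0, $lt: 100}}.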
+
+ // reduce from each shard for a chunk
+ BSONObj sortKey = BSON("_id" << 1);
+ ParallelSortClusteredCursor cursor(
+ servers, inputNS, Query(query).sort(sortKey), QueryOption_NoCursorTimeout);
+ cursor.init();
+ int chunkSize = 0;
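+            // The merged cursor returns documents in _id order across all
+            // shards, so equal keys arrive adjacently and are batched into
+            // `values` before the final reduce below.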
+
+ while (cursor.more() || !values.empty()) {
+ BSONObj t;
+ if (cursor.more()) {
+ t = cursor.next().getOwned();
+ ++inputCount;
+
+ if (values.size() == 0) {
+ values.push_back(t);
+ continue;
}
- if (chunk) {
- chunkSizes.append(chunk->getMin());
- chunkSizes.append(chunkSize);
+ if (t.woSortOrder(*(values.begin()), sortKey) == 0) {
+ values.push_back(t);
+ continue;
}
- if (++index >= chunks.size())
- break;
}
- // Forget temporary input collection, if output is sharded collection
- ShardConnection::forgetNS( inputNS );
+ BSONObj res = config.reducer->finalReduce(values, config.finalizer.get());
+ chunkSize += res.objsize();
+ if (state.isOnDisk())
+ state.insert(config.tempNamespace, res);
+ else
+ state.emit(res);
+ values.clear();
+ if (!t.isEmpty())
+ values.push_back(t);
+ }
- result.append( "chunkSizes" , chunkSizes.arr() );
+ if (chunk) {
+ chunkSizes.append(chunk->getMin());
+ chunkSizes.append(chunkSize);
+ }
+ if (++index >= chunks.size())
+ break;
+ }
- long long outputCount = state.postProcessCollection(txn, op, pm);
- state.appendResults( result );
+        // Forget the temporary input collection if the output is a sharded collection
+ ShardConnection::forgetNS(inputNS);
- BSONObjBuilder countsB(32);
- countsB.append("input", inputCount);
- countsB.append("reduce", state.numReduces());
- countsB.append("output", outputCount);
- result.append( "counts" , countsB.obj() );
+ result.append("chunkSizes", chunkSizes.arr());
- return 1;
- }
- } mapReduceFinishCommand;
+ long long outputCount = state.postProcessCollection(txn, op, pm);
+ state.appendResults(result);
- }
+ BSONObjBuilder countsB(32);
+ countsB.append("input", inputCount);
+ countsB.append("reduce", state.numReduces());
+ countsB.append("output", outputCount);
+ result.append("counts", countsB.obj());
+ return 1;
+ }
+} mapReduceFinishCommand;
+}
}
-