diff options
author | Amalia Hawkins <amalia.hawkins@mongodb.com> | 2013-12-03 17:39:23 -0500 |
---|---|---|
committer | Matt Kangas <matt.kangas@mongodb.com> | 2013-12-13 17:40:44 -0500 |
commit | 8416afb7c5724076b1231626f27f5198a5a2cce7 (patch) | |
tree | 8e87bd2489c2240b361a3a7bf0be70add9d94ef7 /src/mongo/db/commands/mr.cpp | |
parent | c3e4cd6c722e78b1da5caa3f72bc654d2719a764 (diff) | |
download | mongo-8416afb7c5724076b1231626f27f5198a5a2cce7.tar.gz |
SERVER-11611: convert mapreduce to new collection encapsulation
Signed-off-by: Matt Kangas <matt.kangas@mongodb.com>
Diffstat (limited to 'src/mongo/db/commands/mr.cpp')
-rw-r--r-- | src/mongo/db/commands/mr.cpp | 150 |
1 files changed, 107 insertions, 43 deletions
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index bd0328ffd51..e28af212b0d 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -35,6 +35,7 @@ #include "mongo/client/connpool.h" #include "mongo/client/parallel.h" #include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog/index_catalog.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands.h" #include "mongo/db/db.h" @@ -44,6 +45,7 @@ #include "mongo/db/query/get_runner.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/repl/is_master.h" +#include "mongo/db/repl/oplog.h" #include "mongo/db/range_preserver.h" #include "mongo/db/storage_options.h" #include "mongo/scripting/engine.h" @@ -335,52 +337,86 @@ namespace mongo { dropTempCollections(); if (_useIncremental) { - // create the inc collection and make sure we have index on "0" key - { - Client::WriteContext ctx( _config.incLong ); - string err; - if ( ! userCreateNS( _config.incLong.c_str() , BSON( "autoIndexId" << 0 << "temp" << true ) , err , false ) ) { - uasserted( 13631 , str::stream() << "userCreateNS failed for mr incLong ns: " << _config.incLong << " err: " << err ); - } + // Create the inc collection and make sure we have index on "0" key. + Client::WriteContext incCtx( _config.incLong ); + Collection* incColl = incCtx.ctx().db()->getCollection( _config.incLong ); + if ( !incColl ) { + const BSONObj options = BSON( "autoIndexId" << false << "temp" << true ); + incColl = incCtx.ctx().db()->createCollection( _config.incLong, false, + &options, true ); + // Log the createCollection operation. + BSONObjBuilder b; + b.append( "create", nsToCollectionSubstring( _config.incLong )); + b.appendElements( options ); + string logNs = nsToDatabase( _config.incLong ) + ".$cmd"; + logOp( "c", logNs.c_str(), b.obj() ); } - BSONObj sortKey = BSON( "0" << 1 ); - _db.ensureIndex( _config.incLong , sortKey ); + BSONObj indexSpec = BSON( "key" << BSON( "0" << 1 ) << "ns" << _config.incLong + << "name" << "_temp_0" ); + Status status = incColl->getIndexCatalog()->createIndex( indexSpec, false ); + // Log the createIndex operation. + string logNs = nsToDatabase( _config.incLong ) + ".system.indexes"; + logOp( "i", logNs.c_str(), indexSpec ); + if ( !status.isOK() ) { + uasserted( 17305 , str::stream() << "createIndex failed for mr incLong ns: " << + _config.incLong << " err: " << status.code() ); + } } - // create temp collection + vector<BSONObj> indexesToInsert; + { - Client::WriteContext ctx( _config.tempNamespace.c_str() ); - string errmsg; - if ( ! userCreateNS( _config.tempNamespace.c_str() , BSON("temp" << true) , errmsg , true ) ) { - uasserted(13630, str::stream() << "userCreateNS failed for mr tempLong ns: " - << _config.tempNamespace << " err: " << errmsg ); + // copy indexes into temporary storage + Client::WriteContext finalCtx( _config.outputOptions.finalNamespace ); + Collection* finalColl = + finalCtx.ctx().db()->getCollection( _config.outputOptions.finalNamespace ); + if ( finalColl ) { + IndexCatalog::IndexIterator ii = + finalColl->getIndexCatalog()->getIndexIterator( true ); + // Iterate over finalColl's indexes. + while ( ii.more() ) { + IndexDescriptor* currIndex = ii.next(); + BSONObjBuilder b; + b.append( "ns" , _config.tempNamespace ); + + // Copy over contents of the index descriptor's infoObj. + BSONObjIterator j( currIndex->infoObj() ); + while ( j.more() ) { + BSONElement e = j.next(); + if ( str::equals( e.fieldName() , "_id" ) || + str::equals( e.fieldName() , "ns" ) ) + continue; + b.append( e ); + } + indexesToInsert.push_back( b.obj() ); + } } } { - // copy indexes - auto_ptr<DBClientCursor> idx = _db.getIndexes(_config.outputOptions.finalNamespace); - while ( idx->more() ) { - BSONObj i = idx->nextSafe(); - - BSONObjBuilder b( i.objsize() + 16 ); - b.append( "ns" , _config.tempNamespace ); - BSONObjIterator j( i ); - while ( j.more() ) { - BSONElement e = j.next(); - if ( str::equals( e.fieldName() , "_id" ) || - str::equals( e.fieldName() , "ns" ) ) - continue; - - b.append( e ); - } - - BSONObj indexToInsert = b.obj(); - NamespaceString tempNamespace(_config.tempNamespace); - insert(tempNamespace.getSystemIndexesCollection(), indexToInsert); + // create temp collection and insert the indexes from temporary storage + Client::WriteContext tempCtx( _config.tempNamespace ); + Collection* tempColl = tempCtx.ctx().db()->getCollection( _config.tempNamespace ); + if ( !tempColl ) { + const BSONObj options = BSON( "temp" << true ); + tempColl = tempCtx.ctx().db()->createCollection( _config.tempNamespace, false, + &options, true ); + // Log the createCollection operation. + BSONObjBuilder b; + b.append( "create", nsToCollectionSubstring( _config.tempNamespace )); + b.appendElements( options ); + string logNs = nsToDatabase( _config.tempNamespace ) + ".$cmd"; + logOp( "c", logNs.c_str(), b.obj() ); } + for ( vector<BSONObj>::iterator it = indexesToInsert.begin(); + it != indexesToInsert.end(); ++it ) { + tempColl->getIndexCatalog()->createIndex( *it, false ); + // Log the createIndex operation. + string logNs = nsToDatabase( _config.tempNamespace ) + ".system.indexes"; + logOp( "i", logNs.c_str(), *it ); + } } } @@ -585,8 +621,23 @@ namespace mongo { verify( _onDisk ); Client::WriteContext ctx( ns ); + Collection* coll = ctx.ctx().db()->getCollection( ns ); + if ( !coll ) + uasserted(13630, str::stream() << "attempted to insert into nonexistent" << + " collection during a mr operation." << + " collection expected: " << ns ); + + class BSONObjBuilder b; + if ( !o.hasField( "_id" ) ) { + OID id; + id.init(); + b.appendOID( "_id", NULL, true ); + } + b.appendElements(o); + BSONObj bo = b.obj(); - theDataFileMgr.insertAndLog( ns.c_str() , o , false ); + coll->insertDocument( bo, true ); + logOp( "i", ns.c_str(), bo ); } /** @@ -594,7 +645,16 @@ namespace mongo { */ void State::_insertToInc( BSONObj& o ) { verify( _onDisk ); - theDataFileMgr.insertWithObjMod( _config.incLong.c_str(), o, false, true ); + + Client::WriteContext ctx( _config.incLong ); + Collection* coll = ctx.ctx().db()->getCollection( _config.incLong ); + if ( !coll ) + uasserted(13631, str::stream() << "attempted to insert into nonexistent" + " collection during a mr operation." << + " collection expected: " << _config.incLong ); + + coll->insertDocument( o, true ); + logOp( "i", _config.incLong.c_str(), o ); getDur().commitIfNeeded(); } @@ -860,12 +920,18 @@ namespace mongo { // use index on "0" to pull sorted data verify( _temp->size() == 0 ); BSONObj sortKey = BSON( "0" << 1 ); + { - bool foundIndex = false; + Client::WriteContext incCtx( _config.incLong ); + Collection* incColl = incCtx.ctx().db()->getCollection( _config.incLong ); - auto_ptr<DBClientCursor> idx = _db.getIndexes( _config.incLong ); - while ( idx.get() && idx->more() ) { - BSONObj x = idx->nextSafe(); + bool foundIndex = false; + IndexCatalog::IndexIterator ii = + incColl->getIndexCatalog()->getIndexIterator( true ); + // Iterate over incColl's indexes. + while ( ii.more() ) { + IndexDescriptor* currIndex = ii.next(); + BSONObj x = currIndex->infoObj(); if ( sortKey.woCompare( x["key"].embeddedObject() ) == 0 ) { foundIndex = true; break; @@ -971,7 +1037,6 @@ namespace mongo { // only 1 value for this key if ( _onDisk ) { // this key has low cardinality, so just write to collection - Client::WriteContext ctx(_config.incLong.c_str()); _insertToInc( *(all.begin()) ); } else { @@ -999,7 +1064,6 @@ namespace mongo { return; Lock::DBWrite kl(_config.incLong); - Client::Context ctx(_config.incLong); for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ) { BSONList& all = i->second; |