summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/mr.cpp
diff options
context:
space:
mode:
authorAmalia Hawkins <amalia.hawkins@mongodb.com>2013-12-03 17:39:23 -0500
committerMatt Kangas <matt.kangas@mongodb.com>2013-12-13 17:40:44 -0500
commit8416afb7c5724076b1231626f27f5198a5a2cce7 (patch)
tree8e87bd2489c2240b361a3a7bf0be70add9d94ef7 /src/mongo/db/commands/mr.cpp
parentc3e4cd6c722e78b1da5caa3f72bc654d2719a764 (diff)
downloadmongo-8416afb7c5724076b1231626f27f5198a5a2cce7.tar.gz
SERVER-11611: convert mapreduce to new collection encapsulation
Signed-off-by: Matt Kangas <matt.kangas@mongodb.com>
Diffstat (limited to 'src/mongo/db/commands/mr.cpp')
-rw-r--r--src/mongo/db/commands/mr.cpp150
1 files changed, 107 insertions, 43 deletions
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index bd0328ffd51..e28af212b0d 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -35,6 +35,7 @@
#include "mongo/client/connpool.h"
#include "mongo/client/parallel.h"
#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/commands.h"
#include "mongo/db/db.h"
@@ -44,6 +45,7 @@
#include "mongo/db/query/get_runner.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/repl/is_master.h"
+#include "mongo/db/repl/oplog.h"
#include "mongo/db/range_preserver.h"
#include "mongo/db/storage_options.h"
#include "mongo/scripting/engine.h"
@@ -335,52 +337,86 @@ namespace mongo {
dropTempCollections();
if (_useIncremental) {
- // create the inc collection and make sure we have index on "0" key
- {
- Client::WriteContext ctx( _config.incLong );
- string err;
- if ( ! userCreateNS( _config.incLong.c_str() , BSON( "autoIndexId" << 0 << "temp" << true ) , err , false ) ) {
- uasserted( 13631 , str::stream() << "userCreateNS failed for mr incLong ns: " << _config.incLong << " err: " << err );
- }
+ // Create the inc collection and make sure we have index on "0" key.
+ Client::WriteContext incCtx( _config.incLong );
+ Collection* incColl = incCtx.ctx().db()->getCollection( _config.incLong );
+ if ( !incColl ) {
+ const BSONObj options = BSON( "autoIndexId" << false << "temp" << true );
+ incColl = incCtx.ctx().db()->createCollection( _config.incLong, false,
+ &options, true );
+ // Log the createCollection operation.
+ BSONObjBuilder b;
+ b.append( "create", nsToCollectionSubstring( _config.incLong ));
+ b.appendElements( options );
+ string logNs = nsToDatabase( _config.incLong ) + ".$cmd";
+ logOp( "c", logNs.c_str(), b.obj() );
}
- BSONObj sortKey = BSON( "0" << 1 );
- _db.ensureIndex( _config.incLong , sortKey );
+ BSONObj indexSpec = BSON( "key" << BSON( "0" << 1 ) << "ns" << _config.incLong
+ << "name" << "_temp_0" );
+ Status status = incColl->getIndexCatalog()->createIndex( indexSpec, false );
+ // Log the createIndex operation.
+ string logNs = nsToDatabase( _config.incLong ) + ".system.indexes";
+ logOp( "i", logNs.c_str(), indexSpec );
+ if ( !status.isOK() ) {
+ uasserted( 17305 , str::stream() << "createIndex failed for mr incLong ns: " <<
+ _config.incLong << " err: " << status.code() );
+ }
}
- // create temp collection
+ vector<BSONObj> indexesToInsert;
+
{
- Client::WriteContext ctx( _config.tempNamespace.c_str() );
- string errmsg;
- if ( ! userCreateNS( _config.tempNamespace.c_str() , BSON("temp" << true) , errmsg , true ) ) {
- uasserted(13630, str::stream() << "userCreateNS failed for mr tempLong ns: "
- << _config.tempNamespace << " err: " << errmsg );
+ // copy indexes into temporary storage
+ Client::WriteContext finalCtx( _config.outputOptions.finalNamespace );
+ Collection* finalColl =
+ finalCtx.ctx().db()->getCollection( _config.outputOptions.finalNamespace );
+ if ( finalColl ) {
+ IndexCatalog::IndexIterator ii =
+ finalColl->getIndexCatalog()->getIndexIterator( true );
+ // Iterate over finalColl's indexes.
+ while ( ii.more() ) {
+ IndexDescriptor* currIndex = ii.next();
+ BSONObjBuilder b;
+ b.append( "ns" , _config.tempNamespace );
+
+ // Copy over contents of the index descriptor's infoObj.
+ BSONObjIterator j( currIndex->infoObj() );
+ while ( j.more() ) {
+ BSONElement e = j.next();
+ if ( str::equals( e.fieldName() , "_id" ) ||
+ str::equals( e.fieldName() , "ns" ) )
+ continue;
+ b.append( e );
+ }
+ indexesToInsert.push_back( b.obj() );
+ }
}
}
{
- // copy indexes
- auto_ptr<DBClientCursor> idx = _db.getIndexes(_config.outputOptions.finalNamespace);
- while ( idx->more() ) {
- BSONObj i = idx->nextSafe();
-
- BSONObjBuilder b( i.objsize() + 16 );
- b.append( "ns" , _config.tempNamespace );
- BSONObjIterator j( i );
- while ( j.more() ) {
- BSONElement e = j.next();
- if ( str::equals( e.fieldName() , "_id" ) ||
- str::equals( e.fieldName() , "ns" ) )
- continue;
-
- b.append( e );
- }
-
- BSONObj indexToInsert = b.obj();
- NamespaceString tempNamespace(_config.tempNamespace);
- insert(tempNamespace.getSystemIndexesCollection(), indexToInsert);
+ // create temp collection and insert the indexes from temporary storage
+ Client::WriteContext tempCtx( _config.tempNamespace );
+ Collection* tempColl = tempCtx.ctx().db()->getCollection( _config.tempNamespace );
+ if ( !tempColl ) {
+ const BSONObj options = BSON( "temp" << true );
+ tempColl = tempCtx.ctx().db()->createCollection( _config.tempNamespace, false,
+ &options, true );
+ // Log the createCollection operation.
+ BSONObjBuilder b;
+ b.append( "create", nsToCollectionSubstring( _config.tempNamespace ));
+ b.appendElements( options );
+ string logNs = nsToDatabase( _config.tempNamespace ) + ".$cmd";
+ logOp( "c", logNs.c_str(), b.obj() );
}
+ for ( vector<BSONObj>::iterator it = indexesToInsert.begin();
+ it != indexesToInsert.end(); ++it ) {
+ tempColl->getIndexCatalog()->createIndex( *it, false );
+ // Log the createIndex operation.
+ string logNs = nsToDatabase( _config.tempNamespace ) + ".system.indexes";
+ logOp( "i", logNs.c_str(), *it );
+ }
}
}
@@ -585,8 +621,23 @@ namespace mongo {
verify( _onDisk );
Client::WriteContext ctx( ns );
+ Collection* coll = ctx.ctx().db()->getCollection( ns );
+ if ( !coll )
+ uasserted(13630, str::stream() << "attempted to insert into nonexistent" <<
+ " collection during a mr operation." <<
+ " collection expected: " << ns );
+
+ class BSONObjBuilder b;
+ if ( !o.hasField( "_id" ) ) {
+ OID id;
+ id.init();
+ b.appendOID( "_id", NULL, true );
+ }
+ b.appendElements(o);
+ BSONObj bo = b.obj();
- theDataFileMgr.insertAndLog( ns.c_str() , o , false );
+ coll->insertDocument( bo, true );
+ logOp( "i", ns.c_str(), bo );
}
/**
@@ -594,7 +645,16 @@ namespace mongo {
*/
void State::_insertToInc( BSONObj& o ) {
verify( _onDisk );
- theDataFileMgr.insertWithObjMod( _config.incLong.c_str(), o, false, true );
+
+ Client::WriteContext ctx( _config.incLong );
+ Collection* coll = ctx.ctx().db()->getCollection( _config.incLong );
+ if ( !coll )
+ uasserted(13631, str::stream() << "attempted to insert into nonexistent"
+ " collection during a mr operation." <<
+ " collection expected: " << _config.incLong );
+
+ coll->insertDocument( o, true );
+ logOp( "i", _config.incLong.c_str(), o );
getDur().commitIfNeeded();
}
@@ -860,12 +920,18 @@ namespace mongo {
// use index on "0" to pull sorted data
verify( _temp->size() == 0 );
BSONObj sortKey = BSON( "0" << 1 );
+
{
- bool foundIndex = false;
+ Client::WriteContext incCtx( _config.incLong );
+ Collection* incColl = incCtx.ctx().db()->getCollection( _config.incLong );
- auto_ptr<DBClientCursor> idx = _db.getIndexes( _config.incLong );
- while ( idx.get() && idx->more() ) {
- BSONObj x = idx->nextSafe();
+ bool foundIndex = false;
+ IndexCatalog::IndexIterator ii =
+ incColl->getIndexCatalog()->getIndexIterator( true );
+ // Iterate over incColl's indexes.
+ while ( ii.more() ) {
+ IndexDescriptor* currIndex = ii.next();
+ BSONObj x = currIndex->infoObj();
if ( sortKey.woCompare( x["key"].embeddedObject() ) == 0 ) {
foundIndex = true;
break;
@@ -971,7 +1037,6 @@ namespace mongo {
// only 1 value for this key
if ( _onDisk ) {
// this key has low cardinality, so just write to collection
- Client::WriteContext ctx(_config.incLong.c_str());
_insertToInc( *(all.begin()) );
}
else {
@@ -999,7 +1064,6 @@ namespace mongo {
return;
Lock::DBWrite kl(_config.incLong);
- Client::Context ctx(_config.incLong);
for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ) {
BSONList& all = i->second;