Diffstat (limited to 'src/mongo/db/oplog.cpp')
-rw-r--r-- src/mongo/db/oplog.cpp 872
1 file changed, 872 insertions, 0 deletions
diff --git a/src/mongo/db/oplog.cpp b/src/mongo/db/oplog.cpp
new file mode 100644
index 00000000000..342f362a28f
--- /dev/null
+++ b/src/mongo/db/oplog.cpp
@@ -0,0 +1,872 @@
+// @file oplog.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "oplog.h"
+#include "repl_block.h"
+#include "repl.h"
+#include "commands.h"
+#include "repl/rs.h"
+#include "stats/counters.h"
+#include "../util/file.h"
+#include "../util/unittest.h"
+#include "queryoptimizer.h"
+#include "ops/update.h"
+#include "ops/delete.h"
+#include "ops/query.h"
+
+namespace mongo {
+
+ void logOpForSharding( const char * opstr , const char * ns , const BSONObj& obj , BSONObj * patt );
+
+ int __findingStartInitialTimeout = 5; // configurable for testing
+
+ // cached copies of these...so don't rename them, drop them, etc.!!!
+ static NamespaceDetails *localOplogMainDetails = 0;
+ static Database *localDB = 0;
+ static NamespaceDetails *rsOplogDetails = 0;
+ void oplogCheckCloseDatabase( Database * db ) {
+ localDB = 0;
+ localOplogMainDetails = 0;
+ rsOplogDetails = 0;
+ resetSlaveCache();
+ }
+
+ static void _logOpUninitialized(const char *opstr, const char *ns, const char *logNS, const BSONObj& obj, BSONObj *o2, bool *bb ) {
+ uassert(13288, "replSet error write op to db before replSet initialized", str::startsWith(ns, "local.") || *opstr == 'n');
+ }
+
+    /** write an op to the oplog that is already built.
+        todo : make _logOpRS() call this so we don't repeat ourselves?
+ */
+ void _logOpObjRS(const BSONObj& op) {
+ DEV assertInWriteLock();
+
+ const OpTime ts = op["ts"]._opTime();
+ long long h = op["h"].numberLong();
+
+ {
+ const char *logns = rsoplog;
+ if ( rsOplogDetails == 0 ) {
+ Client::Context ctx( logns , dbpath, false);
+ localDB = ctx.db();
+ assert( localDB );
+ rsOplogDetails = nsdetails(logns);
+ massert(13389, "local.oplog.rs missing. did you drop it? if so restart server", rsOplogDetails);
+ }
+ Client::Context ctx( logns , localDB, false );
+ {
+ int len = op.objsize();
+ Record *r = theDataFileMgr.fast_oplog_insert(rsOplogDetails, logns, len);
+ memcpy(getDur().writingPtr(r->data, len), op.objdata(), len);
+ }
+            /* todo: now() has code to handle clock skew. but if the server-to-server skew is large it will get unhappy.
+               this code (or the code in now(), maybe) should be improved.
+ */
+ if( theReplSet ) {
+ if( !(theReplSet->lastOpTimeWritten<ts) ) {
+                    log() << "replSet error possible failover clock skew issue? " << theReplSet->lastOpTimeWritten.toString() << ' ' << ts.toString() << endl;
+ }
+ theReplSet->lastOpTimeWritten = ts;
+ theReplSet->lastH = h;
+ ctx.getClient()->setLastOp( ts );
+ }
+ }
+ }
+
+    /** given a BSON object, create a new one at dst which is the existing (partial) object
+        with a new object element appended at the end with fieldname "o".
+
+       @param partial already built object with everything except the o member. e.g. something like:
+          { ts:..., ns:..., o2:... }
+       @param o a bson object to be added with fieldname "o"
+       @param dst where to put the newly built combined object. e.g. ends up as something like:
+          { ts:..., ns:..., o2:..., o:... }
+ */
+ void append_O_Obj(char *dst, const BSONObj& partial, const BSONObj& o) {
+ const int size1 = partial.objsize() - 1; // less the EOO char
+ const int oOfs = size1+3; // 3 = byte BSONOBJTYPE + byte 'o' + byte \0
+
+ void *p = getDur().writingPtr(dst, oOfs+o.objsize()+1);
+
+ memcpy(p, partial.objdata(), size1);
+
+ // adjust overall bson object size for the o: field
+ *(static_cast<unsigned*>(p)) += o.objsize() + 1/*fieldtype byte*/ + 2/*"o" fieldname*/;
+
+ char *b = static_cast<char *>(p);
+ b += size1;
+ *b++ = (char) Object;
+ *b++ = 'o'; // { o : ... }
+ *b++ = 0; // null terminate "o" fieldname
+ memcpy(b, o.objdata(), o.objsize());
+ b += o.objsize();
+ *b = EOO;
+ }
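+    /* worked size example for the arithmetic above (numbers illustrative only):
+       if partial.objsize() == 45 and o.objsize() == 20, then
+         size1 = 44            // partial less its trailing EOO byte
+         oOfs  = 47            // 44 + type byte + 'o' + NUL
+       and we write oOfs + o.objsize() + 1 == 68 bytes in total, i.e.
+       partial.objsize() + o.objsize() + 3 -- exactly the len that the
+       callers below reserve via fast_oplog_insert().
+    */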
+
+ // global is safe as we are in write lock. we put the static outside the function to avoid the implicit mutex
+ // the compiler would use if inside the function. the reason this is static is to avoid a malloc/free for this
+ // on every logop call.
+ static BufBuilder logopbufbuilder(8*1024);
+ static void _logOpRS(const char *opstr, const char *ns, const char *logNS, const BSONObj& obj, BSONObj *o2, bool *bb ) {
+ DEV assertInWriteLock();
+
+ if ( strncmp(ns, "local.", 6) == 0 ) {
+ if ( strncmp(ns, "local.slaves", 12) == 0 )
+ resetSlaveCache();
+ return;
+ }
+
+ const OpTime ts = OpTime::now();
+ long long hashNew;
+ if( theReplSet ) {
+ massert(13312, "replSet error : logOp() but not primary?", theReplSet->box.getState().primary());
+ hashNew = (theReplSet->lastH * 131 + ts.asLL()) * 17 + theReplSet->selfId();
+ }
+ else {
+ // must be initiation
+ assert( *ns == 0 );
+ hashNew = 0;
+ }
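+        /* h forms a hash chain over the oplog: each entry mixes the previous
+           entry's hash with its own optime and this member's id. for example
+           (illustrative numbers) lastH=7, ts.asLL()=1000, selfId()=2 gives
+           (7*131 + 1000)*17 + 2 == 32591. members can later compare (ts, h)
+           pairs to find a common point when syncing. */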
+
+ /* we jump through a bunch of hoops here to avoid copying the obj buffer twice --
+ instead we do a single copy to the destination position in the memory mapped file.
+ */
+
+ logopbufbuilder.reset();
+ BSONObjBuilder b(logopbufbuilder);
+ b.appendTimestamp("ts", ts.asDate());
+ b.append("h", hashNew);
+ b.append("op", opstr);
+ b.append("ns", ns);
+ if ( bb )
+ b.appendBool("b", *bb);
+ if ( o2 )
+ b.append("o2", *o2);
+ BSONObj partial = b.done();
+ int posz = partial.objsize();
+ int len = posz + obj.objsize() + 1 + 2 /*o:*/;
+
+ Record *r;
+ DEV assert( logNS == 0 );
+ {
+ const char *logns = rsoplog;
+ if ( rsOplogDetails == 0 ) {
+ Client::Context ctx( logns , dbpath, false);
+ localDB = ctx.db();
+ assert( localDB );
+ rsOplogDetails = nsdetails(logns);
+ massert(13347, "local.oplog.rs missing. did you drop it? if so restart server", rsOplogDetails);
+ }
+ Client::Context ctx( logns , localDB, false );
+ r = theDataFileMgr.fast_oplog_insert(rsOplogDetails, logns, len);
+            /* todo: now() has code to handle clock skew. but if the server-to-server skew is large it will get unhappy.
+               this code (or the code in now(), maybe) should be improved.
+ */
+ if( theReplSet ) {
+ if( !(theReplSet->lastOpTimeWritten<ts) ) {
+ log() << "replSet ERROR possible failover clock skew issue? " << theReplSet->lastOpTimeWritten << ' ' << ts << rsLog;
+ log() << "replSet " << theReplSet->isPrimary() << rsLog;
+ }
+ theReplSet->lastOpTimeWritten = ts;
+ theReplSet->lastH = hashNew;
+ ctx.getClient()->setLastOp( ts );
+ }
+ }
+
+ append_O_Obj(r->data, partial, obj);
+
+ if ( logLevel >= 6 ) {
+ BSONObj temp(r);
+ log( 6 ) << "logOp:" << temp << endl;
+ }
+ }
+
+    /* we write to local.oplog.$main:
+ { ts : ..., op: ..., ns: ..., o: ... }
+ ts: an OpTime timestamp
+ op:
+ "i" insert
+ "u" update
+ "d" delete
+ "c" db cmd
+ "db" declares presence of a database (ns is set to the db name + '.')
+ "n" no op
+ logNS - where to log it. 0/null means "local.oplog.$main".
+ bb:
+ if not null, specifies a boolean to pass along to the other side as b: param.
+ used for "justOne" or "upsert" flags on 'd', 'u'
+ first: true
+ when set, indicates this is the first thing we have logged for this database.
+ thus, the slave does not need to copy down all the data when it sees this.
+
+ note this is used for single collection logging even when --replSet is enabled.
+ */
+ static void _logOpOld(const char *opstr, const char *ns, const char *logNS, const BSONObj& obj, BSONObj *o2, bool *bb ) {
+ DEV assertInWriteLock();
+ static BufBuilder bufbuilder(8*1024);
+
+ if ( strncmp(ns, "local.", 6) == 0 ) {
+ if ( strncmp(ns, "local.slaves", 12) == 0 ) {
+ resetSlaveCache();
+ }
+ return;
+ }
+
+ const OpTime ts = OpTime::now();
+ Client::Context context("",0,false);
+
+ /* we jump through a bunch of hoops here to avoid copying the obj buffer twice --
+ instead we do a single copy to the destination position in the memory mapped file.
+ */
+
+ bufbuilder.reset();
+ BSONObjBuilder b(bufbuilder);
+ b.appendTimestamp("ts", ts.asDate());
+ b.append("op", opstr);
+ b.append("ns", ns);
+ if ( bb )
+ b.appendBool("b", *bb);
+ if ( o2 )
+ b.append("o2", *o2);
+ BSONObj partial = b.done(); // partial is everything except the o:... part.
+
+ int po_sz = partial.objsize();
+ int len = po_sz + obj.objsize() + 1 + 2 /*o:*/;
+
+ Record *r;
+ if( logNS == 0 ) {
+ logNS = "local.oplog.$main";
+ if ( localOplogMainDetails == 0 ) {
+ Client::Context ctx( logNS , dbpath, false);
+ localDB = ctx.db();
+ assert( localDB );
+ localOplogMainDetails = nsdetails(logNS);
+ assert( localOplogMainDetails );
+ }
+ Client::Context ctx( logNS , localDB, false );
+ r = theDataFileMgr.fast_oplog_insert(localOplogMainDetails, logNS, len);
+ }
+ else {
+ Client::Context ctx( logNS, dbpath, false );
+ assert( nsdetails( logNS ) );
+ // first we allocate the space, then we fill it below.
+ r = theDataFileMgr.fast_oplog_insert( nsdetails( logNS ), logNS, len);
+ }
+
+ append_O_Obj(r->data, partial, obj);
+
+ context.getClient()->setLastOp( ts );
+
+ if ( logLevel >= 6 ) {
+ BSONObj temp(r);
+ log( 6 ) << "logging op:" << temp << endl;
+ }
+
+ }
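+    /* example entries as written above (field values illustrative):
+         { ts: ..., op: "i", ns: "test.foo", o: { _id: 1, x: 3 } }                    // insert
+         { ts: ..., op: "u", ns: "test.foo", o2: { _id: 1 }, o: { $set: { x: 4 } } }  // update
+         { ts: ..., op: "d", ns: "test.foo", b: true, o: { _id: 1 } }                 // delete; b is justOne
+       with --replSet, _logOpRS() writes the same shape plus the h: hash field.
+       applyOperation_inlock() below consumes documents of exactly this shape.
+    */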
+
+ static void (*_logOp)(const char *opstr, const char *ns, const char *logNS, const BSONObj& obj, BSONObj *o2, bool *bb ) = _logOpOld;
+ void newReplUp() {
+ replSettings.master = true;
+ _logOp = _logOpRS;
+ }
+ void newRepl() {
+ replSettings.master = true;
+ _logOp = _logOpUninitialized;
+ }
+ void oldRepl() { _logOp = _logOpOld; }
+
+ void logKeepalive() {
+ _logOp("n", "", 0, BSONObj(), 0, 0);
+ }
+ void logOpComment(const BSONObj& obj) {
+ _logOp("n", "", 0, obj, 0, 0);
+ }
+ void logOpInitiate(const BSONObj& obj) {
+ _logOpRS("n", "", 0, obj, 0, 0);
+ }
+
+    /** @param opstr:
+ c userCreateNS
+ i insert
+ n no-op / keepalive
+ d delete / remove
+ u update
+ */
+ void logOp(const char *opstr, const char *ns, const BSONObj& obj, BSONObj *patt, bool *b) {
+ if ( replSettings.master ) {
+ _logOp(opstr, ns, 0, obj, patt, b);
+ }
+
+ logOpForSharding( opstr , ns , obj , patt );
+ }
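+    /* typical calls (illustrative; patt and b default to null):
+         logOp("i", "test.foo", doc);                      // doc is the inserted document
+         logOp("u", "test.foo", updateSpec, &criteria);    // criteria is logged as o2:
+         logOp("d", "test.foo", pattern, 0, &justOne);     // justOne is logged as b:
+    */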
+
+ void createOplog() {
+ dblock lk;
+
+ const char * ns = "local.oplog.$main";
+
+ bool rs = !cmdLine._replSet.empty();
+ if( rs )
+ ns = rsoplog;
+
+ Client::Context ctx(ns);
+
+ NamespaceDetails * nsd = nsdetails( ns );
+
+ if ( nsd ) {
+
+ if ( cmdLine.oplogSize != 0 ) {
+ int o = (int)(nsd->storageSize() / ( 1024 * 1024 ) );
+ int n = (int)(cmdLine.oplogSize / ( 1024 * 1024 ) );
+ if ( n != o ) {
+ stringstream ss;
+ ss << "cmdline oplogsize (" << n << ") different than existing (" << o << ") see: http://dochub.mongodb.org/core/increase-oplog";
+ log() << ss.str() << endl;
+ throw UserException( 13257 , ss.str() );
+ }
+ }
+
+ if( rs ) return;
+
+ DBDirectClient c;
+ BSONObj lastOp = c.findOne( ns, Query().sort(reverseNaturalObj) );
+ if ( !lastOp.isEmpty() ) {
+ OpTime::setLast( lastOp[ "ts" ].date() );
+ }
+ return;
+ }
+
+ /* create an oplog collection, if it doesn't yet exist. */
+ BSONObjBuilder b;
+ double sz;
+ if ( cmdLine.oplogSize != 0 )
+ sz = (double)cmdLine.oplogSize;
+ else {
+ /* not specified. pick a default size */
+ sz = 50.0 * 1000 * 1000;
+ if ( sizeof(int *) >= 8 ) {
+#if defined(__APPLE__)
+ // typically these are desktops (dev machines), so keep it smallish
+ sz = (256-64) * 1000 * 1000;
+#else
+ sz = 990.0 * 1000 * 1000;
+ boost::intmax_t free = File::freeSpace(dbpath); //-1 if call not supported.
+ double fivePct = free * 0.05;
+ if ( fivePct > sz )
+ sz = fivePct;
+#endif
+ }
+ }
+
+ log() << "******" << endl;
+ log() << "creating replication oplog of size: " << (int)( sz / ( 1024 * 1024 ) ) << "MB..." << endl;
+
+ b.append("size", sz);
+ b.appendBool("capped", 1);
+ b.appendBool("autoIndexId", false);
+
+ string err;
+ BSONObj o = b.done();
+ userCreateNS(ns, o, err, false);
+ if( !rs )
+ logOp( "n", "", BSONObj() );
+
+ /* sync here so we don't get any surprising lag later when we try to sync */
+ MemoryMappedFile::flushAll(true);
+ log() << "******" << endl;
+ }
+
+ // -------------------------------------
+
+ FindingStartCursor::FindingStartCursor( const QueryPlan & qp ) :
+ _qp( qp ),
+ _findingStart( true ),
+ _findingStartMode()
+ { init(); }
+
+ void FindingStartCursor::next() {
+ if ( !_findingStartCursor || !_findingStartCursor->ok() ) {
+ _findingStart = false;
+ _c = _qp.newCursor(); // on error, start from beginning
+ destroyClientCursor();
+ return;
+ }
+ switch( _findingStartMode ) {
+ // Initial mode: scan backwards from end of collection
+ case Initial: {
+ if ( !_matcher->matchesCurrent( _findingStartCursor->c() ) ) {
+ _findingStart = false; // found first record out of query range, so scan normally
+ _c = _qp.newCursor( _findingStartCursor->currLoc() );
+ destroyClientCursor();
+ return;
+ }
+ _findingStartCursor->advance();
+ RARELY {
+ if ( _findingStartTimer.seconds() >= __findingStartInitialTimeout ) {
+ // If we've scanned enough, switch to find extent mode.
+ createClientCursor( extentFirstLoc( _findingStartCursor->currLoc() ) );
+ _findingStartMode = FindExtent;
+ return;
+ }
+ }
+ return;
+ }
+ // FindExtent mode: moving backwards through extents, check first
+ // document of each extent.
+ case FindExtent: {
+ if ( !_matcher->matchesCurrent( _findingStartCursor->c() ) ) {
+ _findingStartMode = InExtent;
+ return;
+ }
+ DiskLoc prev = prevExtentFirstLoc( _findingStartCursor->currLoc() );
+ if ( prev.isNull() ) { // hit beginning, so start scanning from here
+ createClientCursor();
+ _findingStartMode = InExtent;
+ return;
+ }
+ // There might be a more efficient implementation than creating new cursor & client cursor each time,
+ // not worrying about that for now
+ createClientCursor( prev );
+ return;
+ }
+ // InExtent mode: once an extent is chosen, find starting doc in the extent.
+ case InExtent: {
+ if ( _matcher->matchesCurrent( _findingStartCursor->c() ) ) {
+ _findingStart = false; // found first record in query range, so scan normally
+ _c = _qp.newCursor( _findingStartCursor->currLoc() );
+ destroyClientCursor();
+ return;
+ }
+ _findingStartCursor->advance();
+ return;
+ }
+ default: {
+ massert( 14038, "invalid _findingStartMode", false );
+ }
+ }
+ }
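+    /* summary of the mode progression implemented above:
+         Initial    - walk backwards record by record from the end; a
+                      non-matching record marks the spot to begin a normal
+                      forward scan. after __findingStartInitialTimeout
+                      seconds, fall back to FindExtent.
+         FindExtent - jump backwards an extent at a time, testing only each
+                      extent's first record, until one no longer matches the
+                      ts query (or the beginning of the collection is hit).
+         InExtent   - scan forward within that extent to the first matching
+                      record, then hand off to a normal forward cursor.
+    */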
+
+ DiskLoc FindingStartCursor::extentFirstLoc( const DiskLoc &rec ) {
+ Extent *e = rec.rec()->myExtent( rec );
+ if ( !_qp.nsd()->capLooped() || ( e->myLoc != _qp.nsd()->capExtent ) )
+ return e->firstRecord;
+ // Likely we are on the fresh side of capExtent, so return first fresh record.
+ // If we are on the stale side of capExtent, then the collection is small and it
+ // doesn't matter if we start the extent scan with capFirstNewRecord.
+ return _qp.nsd()->capFirstNewRecord;
+ }
+
+ void wassertExtentNonempty( const Extent *e ) {
+ // TODO ensure this requirement is clearly enforced, or fix.
+ wassert( !e->firstRecord.isNull() );
+ }
+
+ DiskLoc FindingStartCursor::prevExtentFirstLoc( const DiskLoc &rec ) {
+ Extent *e = rec.rec()->myExtent( rec );
+ if ( _qp.nsd()->capLooped() ) {
+ if ( e->xprev.isNull() ) {
+ e = _qp.nsd()->lastExtent.ext();
+ }
+ else {
+ e = e->xprev.ext();
+ }
+ if ( e->myLoc != _qp.nsd()->capExtent ) {
+ wassertExtentNonempty( e );
+ return e->firstRecord;
+ }
+ }
+ else {
+ if ( !e->xprev.isNull() ) {
+ e = e->xprev.ext();
+ wassertExtentNonempty( e );
+ return e->firstRecord;
+ }
+ }
+ return DiskLoc(); // reached beginning of collection
+ }
+
+ void FindingStartCursor::createClientCursor( const DiskLoc &startLoc ) {
+ shared_ptr<Cursor> c = _qp.newCursor( startLoc );
+ _findingStartCursor.reset( new ClientCursor(QueryOption_NoCursorTimeout, c, _qp.ns()) );
+ }
+
+ bool FindingStartCursor::firstDocMatchesOrEmpty() const {
+ shared_ptr<Cursor> c = _qp.newCursor();
+ return !c->ok() || _matcher->matchesCurrent( c.get() );
+ }
+
+ void FindingStartCursor::init() {
+ BSONElement tsElt = _qp.originalQuery()[ "ts" ];
+ massert( 13044, "no ts field in query", !tsElt.eoo() );
+ BSONObjBuilder b;
+ b.append( tsElt );
+ BSONObj tsQuery = b.obj();
+ _matcher.reset(new CoveredIndexMatcher(tsQuery, _qp.indexKey()));
+ if ( firstDocMatchesOrEmpty() ) {
+ _c = _qp.newCursor();
+ _findingStart = false;
+ return;
+ }
+ // Use a ClientCursor here so we can release db mutex while scanning
+ // oplog (can take quite a while with large oplogs).
+ shared_ptr<Cursor> c = _qp.newReverseCursor();
+ _findingStartCursor.reset( new ClientCursor(QueryOption_NoCursorTimeout, c, _qp.ns(), BSONObj()) );
+ _findingStartTimer.reset();
+ _findingStartMode = Initial;
+ }
+
+ // -------------------------------------
+
+ struct TestOpTime : public UnitTest {
+ void run() {
+ OpTime t;
+ for ( int i = 0; i < 10; i++ ) {
+ OpTime s = OpTime::now_inlock();
+ assert( s != t );
+ t = s;
+ }
+ OpTime q = t;
+ assert( q == t );
+ assert( !(q != t) );
+ }
+ } testoptime;
+
+ int _dummy_z;
+
+ void pretouchN(vector<BSONObj>& v, unsigned a, unsigned b) {
+ DEV assert( !d.dbMutex.isWriteLocked() );
+
+ Client *c = currentClient.get();
+ if( c == 0 ) {
+ Client::initThread("pretouchN");
+ c = &cc();
+ }
+
+ readlock lk("");
+ for( unsigned i = a; i <= b; i++ ) {
+ const BSONObj& op = v[i];
+ const char *which = "o";
+ const char *opType = op.getStringField("op");
+ if ( *opType == 'i' )
+ ;
+ else if( *opType == 'u' )
+ which = "o2";
+ else
+ continue;
+ /* todo : other operations */
+
+ try {
+ BSONObj o = op.getObjectField(which);
+ BSONElement _id;
+ if( o.getObjectID(_id) ) {
+ const char *ns = op.getStringField("ns");
+ BSONObjBuilder b;
+ b.append(_id);
+ BSONObj result;
+ Client::Context ctx( ns );
+ if( Helpers::findById(cc(), ns, b.done(), result) )
+ _dummy_z += result.objsize(); // touch
+ }
+ }
+ catch( DBException& e ) {
+ log() << "ignoring assertion in pretouchN() " << a << ' ' << b << ' ' << i << ' ' << e.toString() << endl;
+ }
+ }
+ }
+
+ void pretouchOperation(const BSONObj& op) {
+
+ if( d.dbMutex.isWriteLocked() )
+ return; // no point pretouching if write locked. not sure if this will ever fire, but just in case.
+
+ const char *which = "o";
+ const char *opType = op.getStringField("op");
+ if ( *opType == 'i' )
+ ;
+ else if( *opType == 'u' )
+ which = "o2";
+ else
+ return;
+ /* todo : other operations */
+
+ try {
+ BSONObj o = op.getObjectField(which);
+ BSONElement _id;
+ if( o.getObjectID(_id) ) {
+ const char *ns = op.getStringField("ns");
+ BSONObjBuilder b;
+ b.append(_id);
+ BSONObj result;
+ readlock lk(ns);
+ Client::Context ctx( ns );
+ if( Helpers::findById(cc(), ns, b.done(), result) )
+ _dummy_z += result.objsize(); // touch
+ }
+ }
+ catch( DBException& ) {
+ log() << "ignoring assertion in pretouchOperation()" << endl;
+ }
+ }
+
+ BSONObj Sync::getMissingDoc(const BSONObj& o) {
+ OplogReader missingObjReader;
+
+ uassert(15916, str::stream() << "Can no longer connect to initial sync source: " << hn, missingObjReader.connect(hn));
+
+ const char *ns = o.getStringField("ns");
+ // might be more than just _id in the update criteria
+ BSONObj query = BSONObjBuilder().append(o.getObjectField("o2")["_id"]).obj();
+ BSONObj missingObj;
+ try {
+ missingObj = missingObjReader.findOne(ns, query);
+ } catch(DBException& e) {
+ log() << "replication assertion fetching missing object: " << e.what() << endl;
+ throw;
+ }
+
+ return missingObj;
+ }
+
+ bool Sync::shouldRetry(const BSONObj& o) {
+ // we don't have the object yet, which is possible on initial sync. get it.
+ log() << "replication info adding missing object" << endl; // rare enough we can log
+
+ BSONObj missingObj = getMissingDoc(o);
+
+ if( missingObj.isEmpty() ) {
+ log() << "replication missing object not found on source. presumably deleted later in oplog" << endl;
+ log() << "replication o2: " << o.getObjectField("o2").toString() << endl;
+ log() << "replication o firstfield: " << o.getObjectField("o").firstElementFieldName() << endl;
+
+ return false;
+ }
+ else {
+ const char *ns = o.getStringField("ns");
+ Client::Context ctx(ns);
+ DiskLoc d = theDataFileMgr.insert(ns, (void*) missingObj.objdata(), missingObj.objsize());
+ uassert(15917, "Got bad disk location when attempting to insert", !d.isNull());
+
+ return true;
+ }
+ }
+
+    /** @param fromRepl false if from ApplyOpsCmd
+        @return true if the op was an update that should have happened and the document does not exist (DNE). see replset initial sync code.
+ */
+ bool applyOperation_inlock(const BSONObj& op , bool fromRepl ) {
+ assertInWriteLock();
+ LOG(6) << "applying op: " << op << endl;
+ bool failedUpdate = false;
+
+ OpCounters * opCounters = fromRepl ? &replOpCounters : &globalOpCounters;
+
+ const char *names[] = { "o", "ns", "op", "b" };
+ BSONElement fields[4];
+ op.getFields(4, names, fields);
+
+ BSONObj o;
+ if( fields[0].isABSONObj() )
+ o = fields[0].embeddedObject();
+
+ const char *ns = fields[1].valuestrsafe();
+
+ // operation type -- see logOp() comments for types
+ const char *opType = fields[2].valuestrsafe();
+
+ if ( *opType == 'i' ) {
+ opCounters->gotInsert();
+
+ const char *p = strchr(ns, '.');
+ if ( p && strcmp(p, ".system.indexes") == 0 ) {
+ // updates aren't allowed for indexes -- so we will do a regular insert. if index already
+ // exists, that is ok.
+ theDataFileMgr.insert(ns, (void*) o.objdata(), o.objsize());
+ }
+ else {
+ // do upserts for inserts as we might get replayed more than once
+ OpDebug debug;
+ BSONElement _id;
+ if( !o.getObjectID(_id) ) {
+ /* No _id. This will be very slow. */
+ Timer t;
+ updateObjects(ns, o, o, true, false, false, debug );
+ if( t.millis() >= 2 ) {
+ RARELY OCCASIONALLY log() << "warning, repl doing slow updates (no _id field) for " << ns << endl;
+ }
+ }
+ else {
+                /* erh 10/16/2009 - this is probably not relevant any more since it's auto-created, but not worth removing */
+ RARELY ensureHaveIdIndex(ns); // otherwise updates will be slow
+
+ /* todo : it may be better to do an insert here, and then catch the dup key exception and do update
+ then. very few upserts will not be inserts...
+ */
+ BSONObjBuilder b;
+ b.append(_id);
+ updateObjects(ns, o, b.done(), true, false, false , debug );
+ }
+ }
+ }
+ else if ( *opType == 'u' ) {
+ opCounters->gotUpdate();
+            // dm: do we create this for a capped collection?
+            //  - if not, updates would be slow
+            //    - but if the update were by _id it would be slow on the primary too, so maybe ok
+            //    - if on the primary it was by another key and there are other indexes, this could be very bad without an index
+            //    - if we do create it, it is odd to have it on the secondary but not the primary. it can also cause the
+            //      secondary to block for quite a while on creation.
+ RARELY ensureHaveIdIndex(ns); // otherwise updates will be super slow
+ OpDebug debug;
+ BSONObj updateCriteria = op.getObjectField("o2");
+ bool upsert = fields[3].booleanSafe();
+ UpdateResult ur = updateObjects(ns, o, updateCriteria, upsert, /*multi*/ false, /*logop*/ false , debug );
+ if( ur.num == 0 ) {
+ if( ur.mod ) {
+ if( updateCriteria.nFields() == 1 ) {
+ // was a simple { _id : ... } update criteria
+ failedUpdate = true;
+ // todo: probably should assert in these failedUpdate cases if not in initialSync
+ }
+                    // need to check whether it isn't present, so we can set failedUpdate correctly.
+                    // note this adds some overhead for the extra check in some cases, such as an updateCriteria
+                    // of the form
+                    //   { _id:..., x : { $size:... } }
+                    // thus this is not ideal.
+ else {
+ NamespaceDetails *nsd = nsdetails(ns);
+
+ if (nsd == NULL ||
+ (nsd->findIdIndex() >= 0 && Helpers::findById(nsd, updateCriteria).isNull()) ||
+ // capped collections won't have an _id index
+ (nsd->findIdIndex() < 0 && Helpers::findOne(ns, updateCriteria, false).isNull())) {
+ failedUpdate = true;
+ }
+
+ // Otherwise, it's present; zero objects were updated because of additional specifiers
+ // in the query for idempotence
+ }
+ }
+ else {
+ // this could happen benignly on an oplog duplicate replay of an upsert
+ // (because we are idempotent),
+                    // if a regular non-mod update fails, the item is (presumably) missing.
+ if( !upsert ) {
+ failedUpdate = true;
+ }
+ }
+ }
+ }
+ else if ( *opType == 'd' ) {
+ opCounters->gotDelete();
+ if ( opType[1] == 0 )
+ deleteObjects(ns, o, /*justOne*/ fields[3].booleanSafe());
+ else
+ assert( opType[1] == 'b' ); // "db" advertisement
+ }
+ else if ( *opType == 'c' ) {
+ opCounters->gotCommand();
+ BufBuilder bb;
+ BSONObjBuilder ob;
+ _runCommands(ns, o, bb, ob, true, 0);
+ }
+ else if ( *opType == 'n' ) {
+ // no op
+ }
+ else {
+ throw MsgAssertionException( 14825 , ErrorMsg("error in applyOperation : unknown opType ", *opType) );
+ }
+ return failedUpdate;
+ }
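+    /* illustrative use (field values made up): given an op document such as
+         { ts: ..., op: "u", ns: "test.foo", o2: { _id: 1 }, o: { $set: { x: 4 } } }
+       then, under the write lock,
+         bool missing = applyOperation_inlock( op, true );
+       returns true when the update's target document does not exist locally;
+       during initial sync, Sync::shouldRetry() above fetches the missing
+       document from the source so the op can be reapplied.
+    */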
+
+ class ApplyOpsCmd : public Command {
+ public:
+ virtual bool slaveOk() const { return false; }
+ virtual LockType locktype() const { return WRITE; }
+ ApplyOpsCmd() : Command( "applyOps" ) {}
+ virtual void help( stringstream &help ) const {
+ help << "internal (sharding)\n{ applyOps : [ ] , preCondition : [ { ns : ... , q : ... , res : ... } ] }";
+ }
+ virtual bool run(const string& dbname, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+
+ if ( cmdObj.firstElement().type() != Array ) {
+ errmsg = "ops has to be an array";
+ return false;
+ }
+
+ BSONObj ops = cmdObj.firstElement().Obj();
+
+ {
+ // check input
+ BSONObjIterator i( ops );
+ while ( i.more() ) {
+ BSONElement e = i.next();
+ if ( e.type() == Object )
+ continue;
+ errmsg = "op not an object: ";
+ errmsg += e.fieldName();
+ return false;
+ }
+ }
+
+ if ( cmdObj["preCondition"].type() == Array ) {
+ BSONObjIterator i( cmdObj["preCondition"].Obj() );
+ while ( i.more() ) {
+ BSONObj f = i.next().Obj();
+
+ BSONObj realres = db.findOne( f["ns"].String() , f["q"].Obj() );
+
+ Matcher m( f["res"].Obj() );
+ if ( ! m.matches( realres ) ) {
+ result.append( "got" , realres );
+ result.append( "whatFailed" , f );
+ errmsg = "pre-condition failed";
+ return false;
+ }
+ }
+ }
+
+ // apply
+ int num = 0;
+ BSONObjIterator i( ops );
+ while ( i.more() ) {
+ BSONElement e = i.next();
+ // todo SERVER-4259 ?
+ applyOperation_inlock( e.Obj() , false );
+ num++;
+ }
+
+ result.append( "applied" , num );
+
+ if ( ! fromRepl ) {
+ // We want this applied atomically on slaves
+ // so we re-wrap without the pre-condition for speed
+
+ string tempNS = str::stream() << dbname << ".$cmd";
+
+ logOp( "c" , tempNS.c_str() , cmdObj.firstElement().wrap() );
+ }
+
+ return true;
+ }
+
+ DBDirectClient db;
+
+ } applyOpsCmd;
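+    /* example invocation (illustrative):
+         db.runCommand( { applyOps : [
+                            { op : "u", ns : "test.foo", o2 : { _id : 1 }, o : { $set : { x : 2 } } }
+                          ],
+                          preCondition : [ { ns : "test.foo", q : { _id : 1 }, res : { x : 1 } } ] } )
+       each preCondition is checked by matching res against a findOne() of q
+       before anything is applied; the ops then run under a single write lock,
+       and the command is re-logged as one "c" op so secondaries apply it
+       atomically as well.
+    */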
+
+}