// @file oplog.cpp

/**
*    Copyright (C) 2008 10gen Inc.
*
*    This program is free software: you can redistribute it and/or modify
*    it under the terms of the GNU Affero General Public License, version 3,
*    as published by the Free Software Foundation.
*
*    This program is distributed in the hope that it will be useful,
*    but WITHOUT ANY WARRANTY; without even the implied warranty of
*    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
*    GNU Affero General Public License for more details.
*
*    You should have received a copy of the GNU Affero General Public License
*    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*
*    As a special exception, the copyright holders give permission to link the
*    code of portions of this program with the OpenSSL library under certain
*    conditions as described in each individual source file and distribute
*    linked combinations including the program with the OpenSSL library. You
*    must comply with the GNU Affero General Public License in all respects for
*    all of the code used other than as permitted herein. If you modify file(s)
*    with this exception, you may extend this exception to your version of the
*    file(s), but you are not obligated to do so. If you do not wish to do so,
*    delete this exception statement from your version. If you delete this
*    exception statement from all source files in the program, then also delete
*    it in the license file.
*/

#include "mongo/pch.h"

#include "mongo/db/repl/oplog.h"

#include <vector>

#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_manager_global.h"
#include "mongo/db/background.h"
#include "mongo/db/auth/privilege.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/dbhash.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/global_optime.h"
#include "mongo/db/index_builder.h"
#include "mongo/db/instance.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/update.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/ops/delete.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/rs.h"
#include "mongo/db/repl/write_concern.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/storage/mmap_v1/dur_transaction.h"
#include "mongo/db/storage_options.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/s/d_logic.h"
#include "mongo/scripting/engine.h"
#include "mongo/util/elapsed_tracker.h"
#include "mongo/util/file.h"
#include "mongo/util/startup_test.h"

namespace mongo {

    // cached copies of these...so don't rename them, drop them, etc.!!!
    static Database* localDB = NULL;
    static Collection* localOplogMainCollection = 0;
    static Collection* localOplogRSCollection = 0;

    // Synchronizes the section where a new OpTime is generated and when it actually
    // appears in the oplog.
    static mongo::mutex newOpMutex("oplogNewOp");
    static boost::condition newOptimeNotifier;

    static void setNewOptime(const OpTime& newTime) {
        mutex::scoped_lock lk(newOpMutex);
        setGlobalOptime(newTime);
        newOptimeNotifier.notify_all();
    }

    void oplogCheckCloseDatabase( Database* db ) {
        verify( Lock::isW() );
        localDB = NULL;
        localOplogMainCollection = NULL;
        localOplogRSCollection = NULL;
        resetSlaveCache();
    }

    // so we can fail the same way
    void checkOplogInsert( StatusWith<DiskLoc> result ) {
        massert( 17322,
                 str::stream() << "write to oplog failed: " << result.getStatus().toString(),
                 result.isOK() );
    }

    static void _logOpUninitialized(TransactionExperiment* txn,
                                    const char *opstr,
                                    const char *ns,
                                    const char *logNS,
                                    const BSONObj& obj,
                                    BSONObj *o2,
                                    bool *bb,
                                    bool fromMigrate ) {
        uassert(13288, "replSet error write op to db before replSet initialized",
                str::startsWith(ns, "local.") || *opstr == 'n');
    }

    /** write an op to the oplog that is already built.
        todo : make _logOpRS() call this so we don't repeat ourself?
     */
    void _logOpObjRS(const BSONObj& op) {
        Lock::DBWrite lk("local");
        DurTransaction txn;

        const OpTime ts = op["ts"]._opTime();
        long long h = op["h"].numberLong();

        {
            if ( localOplogRSCollection == 0 ) {
                Client::Context ctx(rsoplog, storageGlobalParams.dbpath);
                localDB = ctx.db();
                verify( localDB );
                localOplogRSCollection = localDB->getCollection( &txn, rsoplog );
                massert(13389,
                        "local.oplog.rs missing. did you drop it? if so restart server",
                        localOplogRSCollection);
            }
            Client::Context ctx(rsoplog, localDB);
            checkOplogInsert( localOplogRSCollection->insertDocument( &txn, op, false ) );

            /* todo: now() has code to handle clock skew.  but if the skew server to server is large it will get unhappy.
                     this code (or code in now() maybe) should be improved.
             */
            if( theReplSet ) {
                if( !(theReplSet->lastOpTimeWritten < ts) ) {
                    log() << "replication oplog stream went back in time. previous timestamp: "
                          << theReplSet->lastOpTimeWritten << " newest timestamp: " << ts
                          << ". attempting to sync directly from primary." << endl;
                    std::string errmsg;
                    BSONObjBuilder result;
                    if (!theReplSet->forceSyncFrom(theReplSet->box.getPrimary()->fullName(),
                                                   errmsg, result)) {
                        log() << "Can't sync from primary: " << errmsg << endl;
                    }
                }
                theReplSet->lastOpTimeWritten = ts;
                theReplSet->lastH = h;
                ctx.getClient()->setLastOp( ts );
                replset::BackgroundSync::notify();
            }
        }

        setNewOptime(ts);
    }

    /**
     * This allows us to stream the oplog entry directly into data region
     * main goal is to avoid copying the o portion
     * which can be very large
     * TODO: can have this build the entire doc
     */
    class OplogDocWriter : public DocWriter {
    public:
        OplogDocWriter( const BSONObj& frame, const BSONObj& oField )
            : _frame( frame ), _oField( oField ) {
        }

        ~OplogDocWriter(){}

        void writeDocument( char* start ) const {
            char* buf = start;

            memcpy( buf, _frame.objdata(), _frame.objsize() - 1 ); // don't copy final EOO

            reinterpret_cast<int*>( buf )[0] = documentSize();

            buf += ( _frame.objsize() - 1 );
            buf[0] = (char)Object;
            buf[1] = 'o';
            buf[2] = 0;
            memcpy( buf+3, _oField.objdata(), _oField.objsize() );
            buf += 3 + _oField.objsize();
            buf[0] = EOO;

            verify( static_cast<size_t>( ( buf + 1 ) - start ) == documentSize() ); // DEV?
        }

        size_t documentSize() const {
            return _frame.objsize() + _oField.objsize() + 1 /* type */ + 2 /* "o" */;
        }

    private:
        BSONObj _frame;
        BSONObj _oField;
    };
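    /*
     * Illustrative layout sketch for writeDocument() above (values are made up,
     * not taken from a real oplog): given a frame of
     *   { ts: Timestamp(1400000000, 1), h: 0, v: 2, op: "i", ns: "test.c" }
     * and an o field of { _id: 1 }, the bytes written are the frame minus its
     * trailing EOO, then the type byte Object, the field name "o\0", the o
     * document's bytes, and a final EOO, with the leading int32 length patched
     * to documentSize(). The result is byte-for-byte what BSONObjBuilder would
     * produce for
     *   { ts: Timestamp(1400000000, 1), h: 0, v: 2, op: "i", ns: "test.c", o: { _id: 1 } }
     * without copying the (potentially large) o object a second time.
     */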
    /* we write to local.oplog.rs:
         { ts : ..., h: ..., v: ..., op: ..., etc }
       ts: an OpTime timestamp
       h: hash
       v: version
       op:
        "i" insert
        "u" update
        "d" delete
        "c" db cmd
        "db" declares presence of a database (ns is set to the db name + '.')
        "n" no op

       bb param:
         if not null, specifies a boolean to pass along to the other side as b: param.
         used for "justOne" or "upsert" flags on 'd', 'u'
    */

    // global is safe as we are in write lock. we put the static outside the function to avoid the implicit mutex
    // the compiler would use if inside the function.  the reason this is static is to avoid a malloc/free for this
    // on every logop call.
    static BufBuilder logopbufbuilder(8*1024);
    static const int OPLOG_VERSION = 2;
    static void _logOpRS(TransactionExperiment* txn,
                         const char *opstr,
                         const char *ns,
                         const char *logNS,
                         const BSONObj& obj,
                         BSONObj *o2,
                         bool *bb,
                         bool fromMigrate ) {
        Lock::DBWrite lk1("local");

        if ( strncmp(ns, "local.", 6) == 0 ) {
            if ( strncmp(ns, "local.slaves", 12) == 0 )
                resetSlaveCache();
            return;
        }

        mutex::scoped_lock lk2(newOpMutex);

        OpTime ts(getNextGlobalOptime());
        newOptimeNotifier.notify_all();

        long long hashNew;
        if( theReplSet ) {
            if (!theReplSet->box.getState().primary()) {
                log() << "replSet error : logOp() but not primary";
                fassertFailed(17405);
            }
            hashNew = (theReplSet->lastH * 131 + ts.asLL()) * 17 + theReplSet->selfId();
        }
        else {
            // must be initiation
            verify( *ns == 0 );
            hashNew = 0;
        }

        /* we jump through a bunch of hoops here to avoid copying the obj buffer twice --
           instead we do a single copy to the destination position in the memory mapped file.
        */

        logopbufbuilder.reset();
        BSONObjBuilder b(logopbufbuilder);
        b.appendTimestamp("ts", ts.asDate());
        b.append("h", hashNew);
        b.append("v", OPLOG_VERSION);
        b.append("op", opstr);
        b.append("ns", ns);
        if (fromMigrate)
            b.appendBool("fromMigrate", true);
        if ( bb )
            b.appendBool("b", *bb);
        if ( o2 )
            b.append("o2", *o2);
        BSONObj partial = b.done();

        DEV verify( logNS == 0 ); // check this was never a master/slave master

        if ( localOplogRSCollection == 0 ) {
            Client::Context ctx(rsoplog, storageGlobalParams.dbpath);
            localDB = ctx.db();
            verify( localDB );
            localOplogRSCollection = localDB->getCollection( txn, rsoplog );
            massert(13347,
                    "local.oplog.rs missing. did you drop it? if so restart server",
                    localOplogRSCollection);
        }

        Client::Context ctx(rsoplog, localDB);
        OplogDocWriter writer( partial, obj );
        checkOplogInsert( localOplogRSCollection->insertDocument( txn, &writer, false ) );

        /* todo: now() has code to handle clock skew.  but if the skew server to server is large it will get unhappy.
                 this code (or code in now() maybe) should be improved.
         */
        if( theReplSet ) {
            if( !(theReplSet->lastOpTimeWritten < ts) ) {
                log() << "replication oplog stream went back in time. previous timestamp: "
                      << theReplSet->lastOpTimeWritten << " newest timestamp: " << ts
                      << ". attempting to sync directly from primary." << endl;
                std::string errmsg;
                BSONObjBuilder result;
                if (!theReplSet->forceSyncFrom(theReplSet->box.getPrimary()->fullName(),
                                               errmsg, result)) {
                    log() << "Can't sync from primary: " << errmsg << endl;
                }
            }
            theReplSet->lastOpTimeWritten = ts;
            theReplSet->lastH = hashNew;
            ctx.getClient()->setLastOp( ts );
        }
    }
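    /*
     * Example entries as _logOpRS() above would emit them (illustrative values
     * only; timestamps and hashes are made up):
     *   insert:  { ts: Timestamp(1400000000, 1), h: NumberLong(3423), v: 2,
     *              op: "i", ns: "test.coll", o: { _id: 1, x: "hello" } }
     *   update:  { ts: ..., h: ..., v: 2, op: "u", ns: "test.coll", b: true,
     *              o2: { _id: 1 }, o: { $set: { x: 2 } } }
     *   delete:  { ts: ..., h: ..., v: 2, op: "d", ns: "test.coll", b: true,
     *              o: { _id: 1 } }
     */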
    static void _logOpOld(TransactionExperiment* txn,
                          const char *opstr,
                          const char *ns,
                          const char *logNS,
                          const BSONObj& obj,
                          BSONObj *o2,
                          bool *bb,
                          bool fromMigrate ) {
        Lock::DBWrite lk("local");
        static BufBuilder bufbuilder(8*1024); // todo there is likely a mutex on this constructor

        if ( strncmp(ns, "local.", 6) == 0 ) {
            if ( strncmp(ns, "local.slaves", 12) == 0 ) {
                resetSlaveCache();
            }
            return;
        }

        mutex::scoped_lock lk2(newOpMutex);

        OpTime ts(getNextGlobalOptime());
        newOptimeNotifier.notify_all();

        Client::Context context("", 0);

        /* we jump through a bunch of hoops here to avoid copying the obj buffer twice --
           instead we do a single copy to the destination position in the memory mapped file.
        */

        bufbuilder.reset();
        BSONObjBuilder b(bufbuilder);
        b.appendTimestamp("ts", ts.asDate());
        b.append("op", opstr);
        b.append("ns", ns);
        if (fromMigrate)
            b.appendBool("fromMigrate", true);
        if ( bb )
            b.appendBool("b", *bb);
        if ( o2 )
            b.append("o2", *o2);
        BSONObj partial = b.done(); // partial is everything except the o:... part.

        if( logNS == 0 ) {
            logNS = "local.oplog.$main";
        }

        if ( localOplogMainCollection == 0 ) {
            Client::Context ctx(logNS, storageGlobalParams.dbpath);
            localDB = ctx.db();
            verify( localDB );
            localOplogMainCollection = localDB->getCollection(txn, logNS);
            verify( localOplogMainCollection );
        }

        Client::Context ctx(logNS , localDB);
        OplogDocWriter writer( partial, obj );
        checkOplogInsert( localOplogMainCollection->insertDocument( txn, &writer, false ) );

        context.getClient()->setLastOp( ts );
    }

    static void (*_logOp)(TransactionExperiment* txn,
                          const char *opstr,
                          const char *ns,
                          const char *logNS,
                          const BSONObj& obj,
                          BSONObj *o2,
                          bool *bb,
                          bool fromMigrate ) = _logOpOld;

    void newReplUp() {
        replSettings.master = true;
        _logOp = _logOpRS;
    }
    void newRepl() {
        replSettings.master = true;
        _logOp = _logOpUninitialized;
    }
    void oldRepl() { _logOp = _logOpOld; }

    void logKeepalive() {
        DurTransaction txn;
        _logOp(&txn, "n", "", 0, BSONObj(), 0, 0, false);
    }
    void logOpComment(const BSONObj& obj) {
        DurTransaction txn;
        _logOp(&txn, "n", "", 0, obj, 0, 0, false);
    }
    void logOpInitiate(TransactionExperiment* txn, const BSONObj& obj) {
        _logOpRS(txn, "n", "", 0, obj, 0, 0, false);
    }

    /*@ @param opstr:
          c userCreateNS
          i insert
          n no-op / keepalive
          d delete / remove
          u update
    */
    void logOp(TransactionExperiment* txn,
               const char* opstr,
               const char* ns,
               const BSONObj& obj,
               BSONObj* patt,
               bool* b,
               bool fromMigrate,
               const BSONObj* fullObj) {
        if ( replSettings.master ) {
            _logOp(txn, opstr, ns, 0, obj, patt, b, fromMigrate);
        }

        logOpForSharding(opstr, ns, obj, patt, fullObj, fromMigrate);
        logOpForDbHash(opstr, ns, obj, patt, fullObj, fromMigrate);
        getGlobalAuthorizationManager()->logOp(opstr, ns, obj, patt, b);

        if ( strstr( ns, ".system.js" ) ) {
            Scope::storedFuncMod(); // this is terrible
        }
    }
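    /*
     * Hypothetical call sites for logOp() above (names are illustrative, not
     * taken from this codebase). An insert logs the new document; a delete logs
     * the match pattern and passes the justOne flag through the b parameter:
     *
     *   logOp(txn, "i", "test.coll", insertedDoc);
     *   bool justOne = true;
     *   logOp(txn, "d", "test.coll", deletedDocCriteria, NULL, &justOne);
     */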
    void createOplog() {
        Lock::GlobalWrite lk;

        const char * ns = "local.oplog.$main";

        bool rs = !replSettings.replSet.empty();
        if( rs )
            ns = rsoplog;

        Client::Context ctx(ns);
        DurTransaction txn;
        Collection* collection = ctx.db()->getCollection( &txn, ns );

        if ( collection ) {

            if (replSettings.oplogSize != 0) {
                int o = (int)(collection->storageSize() / ( 1024 * 1024 ) );
                int n = (int)(replSettings.oplogSize / (1024 * 1024));
                if ( n != o ) {
                    stringstream ss;
                    ss << "cmdline oplogsize (" << n << ") different than existing (" << o
                       << ") see: http://dochub.mongodb.org/core/increase-oplog";
                    log() << ss.str() << endl;
                    throw UserException( 13257 , ss.str() );
                }
            }

            if( rs ) return;

            initOpTimeFromOplog(ns);
            return;
        }

        /* create an oplog collection, if it doesn't yet exist. */
        long long sz = 0;
        if ( replSettings.oplogSize != 0 ) {
            sz = replSettings.oplogSize;
        }
        else {
            /* not specified. pick a default size */
            sz = 50LL * 1024LL * 1024LL;
            if ( sizeof(int *) >= 8 ) {
#if defined(__APPLE__)
                // typically these are desktops (dev machines), so keep it smallish
                sz = (256-64) * 1024 * 1024;
#else
                sz = 990LL * 1024 * 1024;
                double free = File::freeSpace(storageGlobalParams.dbpath); //-1 if call not supported.
                long long fivePct = static_cast<long long>( free * 0.05 );
                if ( fivePct > sz )
                    sz = fivePct;
                // we use 5% of free space up to 50GB (1TB free)
                static long long upperBound = 50LL * 1024 * 1024 * 1024;
                if (fivePct > upperBound)
                    sz = upperBound;
#endif
            }
        }

        log() << "******" << endl;
        log() << "creating replication oplog of size: " << (int)( sz / ( 1024 * 1024 ) ) << "MB..." << endl;

        CollectionOptions options;
        options.capped = true;
        options.cappedSize = sz;
        options.autoIndexId = CollectionOptions::NO;

        invariant( ctx.db()->createCollection( &txn, ns, options ) );

        if( !rs )
            logOp( &txn, "n", "", BSONObj() );

        /* sync here so we don't get any surprising lag later when we try to sync */
        MemoryMappedFile::flushAll(true);
        log() << "******" << endl;
    }
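    /*
     * Worked example of the default sizing above (illustrative numbers): on a
     * 64-bit non-Apple host with 400GB free in dbpath, fivePct is 20GB, which
     * beats the 990MB floor, so the oplog is created with a 20GB capped size;
     * with 2TB free, fivePct (100GB) is clamped to the 50GB upperBound.
     */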
    // -------------------------------------

    /** @param fromRepl false if from ApplyOpsCmd
        @return true if was an update that should have happened and the document DNE.
        see replset initial sync code.
     */
    bool applyOperation_inlock(TransactionExperiment* txn,
                               Database* db,
                               const BSONObj& op,
                               bool fromRepl,
                               bool convertUpdateToUpsert) {
        LOG(3) << "applying op: " << op << endl;
        bool failedUpdate = false;

        OpCounters * opCounters = fromRepl ? &replOpCounters : &globalOpCounters;

        const char *names[] = { "o", "ns", "op", "b", "o2" };
        BSONElement fields[5];
        op.getFields(5, names, fields);
        BSONElement& fieldO = fields[0];
        BSONElement& fieldNs = fields[1];
        BSONElement& fieldOp = fields[2];
        BSONElement& fieldB = fields[3];
        BSONElement& fieldO2 = fields[4];

        BSONObj o;
        if( fieldO.isABSONObj() )
            o = fieldO.embeddedObject();

        const char *ns = fieldNs.valuestrsafe();

        BSONObj o2;
        if (fieldO2.isABSONObj())
            o2 = fieldO2.Obj();

        bool valueB = fieldB.booleanSafe();

        Lock::assertWriteLocked(ns);

        Collection* collection = db->getCollection( txn, ns );
        IndexCatalog* indexCatalog = collection == NULL ? NULL : collection->getIndexCatalog();

        // operation type -- see logOp() comments for types
        const char *opType = fieldOp.valuestrsafe();

        if ( *opType == 'i' ) {
            opCounters->gotInsert();

            const char *p = strchr(ns, '.');
            if ( p && nsToCollectionSubstring( p ) == "system.indexes" ) {
                if (o["background"].trueValue()) {
                    IndexBuilder* builder = new IndexBuilder(o);
                    // This spawns a new thread and returns immediately.
                    builder->go();
                }
                else {
                    IndexBuilder builder(o);
                    Status status = builder.build(txn, db);
                    if ( status.isOK() ) {
                        // yay
                    }
                    else if ( status.code() == ErrorCodes::IndexOptionsConflict ||
                              status.code() == ErrorCodes::IndexKeySpecsConflict ) {
                        // SERVER-13206, SERVER-13496
                        // 2.4 (and earlier) will add an ensureIndex to an oplog if its ok or not
                        // so in 2.6+ where we do stricter validation, it will fail
                        // but we shouldn't care as the primary is responsible
                        warning() << "index creation attempted on secondary that conflicts, "
                                  << "skipping: " << status;
                    }
                    else {
                        uassertStatusOK( status );
                    }
                }
            }
            else {
                // do upserts for inserts as we might get replayed more than once
                OpDebug debug;
                BSONElement _id;
                if( !o.getObjectID(_id) ) {
                    /* No _id.  This will be very slow. */
                    Timer t;

                    const NamespaceString requestNs(ns);
                    UpdateRequest request(requestNs);

                    request.setQuery(o);
                    request.setUpdates(o);
                    request.setUpsert();
                    request.setFromReplication();
                    UpdateLifecycleImpl updateLifecycle(true, requestNs);
                    request.setLifecycle(&updateLifecycle);

                    update(txn, db, request, &debug);

                    if( t.millis() >= 2 ) {
                        RARELY OCCASIONALLY log() << "warning, repl doing slow updates (no _id field) for " << ns << endl;
                    }
                }
                else {
                    // probably don't need this since all replicated colls have _id indexes now
                    // but keep it just in case
                    RARELY if ( indexCatalog && !collection->isCapped() ) {
                        indexCatalog->ensureHaveIdIndex(txn);
                    }

                    /* todo : it may be better to do an insert here, and then catch the dup key exception and do update
                              then.  very few upserts will not be inserts...
                     */
                    BSONObjBuilder b;
                    b.append(_id);

                    const NamespaceString requestNs(ns);
                    UpdateRequest request(requestNs);

                    request.setQuery(b.done());
                    request.setUpdates(o);
                    request.setUpsert();
                    request.setFromReplication();
                    UpdateLifecycleImpl updateLifecycle(true, requestNs);
                    request.setLifecycle(&updateLifecycle);

                    update(txn, db, request, &debug);
                }
            }
        }
        else if ( *opType == 'u' ) {
            opCounters->gotUpdate();

            // probably don't need this since all replicated colls have _id indexes now
            // but keep it just in case
            RARELY if ( indexCatalog && !collection->isCapped() ) {
                indexCatalog->ensureHaveIdIndex(txn);
            }

            OpDebug debug;
            BSONObj updateCriteria = o2;
            const bool upsert = valueB || convertUpdateToUpsert;

            const NamespaceString requestNs(ns);
            UpdateRequest request(requestNs);

            request.setQuery(updateCriteria);
            request.setUpdates(o);
            request.setUpsert(upsert);
            request.setFromReplication();
            UpdateLifecycleImpl updateLifecycle(true, requestNs);
            request.setLifecycle(&updateLifecycle);

            UpdateResult ur = update(txn, db, request, &debug);

            if( ur.numMatched == 0 ) {
                if( ur.modifiers ) {
                    if( updateCriteria.nFields() == 1 ) {
                        // was a simple { _id : ... } update criteria
                        failedUpdate = true;
                        log() << "replication failed to apply update: " << op.toString() << endl;
                    }
                    // need to check to see if it isn't present so we can set failedUpdate correctly.
                    // note that adds some overhead for this extra check in some cases, such as an updateCriteria
                    // of the form
                    //   { _id:..., { x : {$size:...} }
                    // thus this is not ideal.
                    else {
                        if (collection == NULL ||
                            (indexCatalog->haveIdIndex() && Helpers::findById(collection, updateCriteria).isNull()) ||
                            // capped collections won't have an _id index
                            (!indexCatalog->haveIdIndex() && Helpers::findOne(collection, updateCriteria, false).isNull())) {
                            failedUpdate = true;
                            log() << "replication couldn't find doc: " << op.toString() << endl;
                        }

                        // Otherwise, it's present; zero objects were updated because of additional specifiers
                        // in the query for idempotence
                    }
                }
                else {
                    // this could happen benignly on an oplog duplicate replay of an upsert
                    // (because we are idempotent),
                    // if an regular non-mod update fails the item is (presumably) missing.
                    if( !upsert ) {
                        failedUpdate = true;
                        log() << "replication update of non-mod failed: " << op.toString() << endl;
                    }
                }
            }
        }
        else if ( *opType == 'd' ) {
            opCounters->gotDelete();
            if ( opType[1] == 0 )
                deleteObjects(txn, db, ns, o, /*justOne*/ valueB);
            else
                verify( opType[1] == 'b' ); // "db" advertisement
        }
        else if ( *opType == 'c' ) {
            bool done = false;
            while (!done) {
                BufBuilder bb;
                BSONObjBuilder ob;
                _runCommands(ns, o, bb, ob, true, 0);
                // _runCommands takes care of adjusting opcounters for command counting.
                Status status = Command::getStatusFromCommandResult(ob.done());
                switch (status.code()) {
                case ErrorCodes::BackgroundOperationInProgressForDatabase: {
                    dbtemprelease release;
                    BackgroundOperation::awaitNoBgOpInProgForDb(nsToDatabaseSubstring(ns));
                    break;
                }
                case ErrorCodes::BackgroundOperationInProgressForNamespace: {
                    dbtemprelease release;
                    BackgroundOperation::awaitNoBgOpInProgForNs(
                        Command::findCommand(o.firstElement().fieldName())->parseNs(
                            nsToDatabase(ns), o));
                    break;
                }
                default:
                    warning() << "repl Failed command " << o << " on "
                              << nsToDatabaseSubstring(ns) << " with status " << status
                              << " during oplog application";
                    // fallthrough
                case ErrorCodes::OK:
                    done = true;
                    break;
                }
            }
        }
        else if ( *opType == 'n' ) {
            // no op
        }
        else {
            throw MsgAssertionException( 14825 ,
                                         ErrorMsg("error in applyOperation : unknown opType ", *opType) );
        }

        getGlobalAuthorizationManager()->logOp(
                opType,
                ns,
                o,
                fieldO2.isABSONObj() ? &o2 : NULL,
                !fieldB.eoo() ? &valueB : NULL );
        return failedUpdate;
    }

    bool waitForOptimeChange(const OpTime& referenceTime, unsigned timeoutMillis) {
        mutex::scoped_lock lk(newOpMutex);

        while (referenceTime == getLastSetOptime()) {
            if (!newOptimeNotifier.timed_wait(lk.boost(),
                                              boost::posix_time::milliseconds(timeoutMillis)))
                return false;
        }

        return true;
    }

    void initOpTimeFromOplog(const std::string& oplogNS) {
        DBDirectClient c;
        BSONObj lastOp = c.findOne(oplogNS,
                                   Query().sort(reverseNaturalObj),
                                   NULL,
                                   QueryOption_SlaveOk);

        if (!lastOp.isEmpty()) {
            LOG(1) << "replSet setting last OpTime";
            setNewOptime(lastOp[ "ts" ].date());
        }
    }
}