// @file oplog.cpp

/**
 *    Copyright (C) 2008-2014 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication

#include "mongo/platform/basic.h"

#include "mongo/db/repl/oplog.h"

#include <deque>
#include <vector>

#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_manager_global.h"
#include "mongo/db/auth/privilege.h"
#include "mongo/db/background.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/dbhash.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/global_optime.h"
#include "mongo/db/index_builder.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/ops/delete.h"
#include "mongo/db/ops/update.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/storage_options.h"
#include "mongo/s/d_state.h"
#include "mongo/scripting/engine.h"
#include "mongo/util/elapsed_tracker.h"
#include "mongo/util/file.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/stacktrace.h"
#include "mongo/util/startup_test.h"

namespace mongo {

using std::endl;
using std::string;
using std::stringstream;

namespace repl {

std::string rsOplogName = "local.oplog.rs";
std::string masterSlaveOplogName = "local.oplog.$main";
int OPLOG_VERSION = 2;

namespace {
// cached copies of these...so don't rename them, drop them, etc.!!!
Database* _localDB = nullptr; Collection* _localOplogCollection = nullptr; // Synchronizes the section where a new OpTime is generated and when it actually // appears in the oplog. mongo::mutex newOpMutex; boost::condition newOptimeNotifier; static std::string _oplogCollectionName; // so we can fail the same way void checkOplogInsert( StatusWith result ) { massert( 17322, str::stream() << "write to oplog failed: " << result.getStatus().toString(), result.isOK() ); } /** * Allocates an optime for a new entry in the oplog, and updates the replication coordinator to * reflect that new optime. Returns the new optime and the correct value of the "h" field for * the new oplog entry. * * NOTE: From the time this function returns to the time that the new oplog entry is written * to the storage system, all errors must be considered fatal. This is because the this * function registers the new optime with the storage system and the replication coordinator, * and provides no facility to revert those registrations on rollback. */ std::pair getNextOpTime(OperationContext* txn, Collection* oplog, const char* ns, ReplicationCoordinator* replCoord, const char* opstr) { boost::lock_guard lk(newOpMutex); OpTime ts = getNextGlobalOptime(); newOptimeNotifier.notify_all(); fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(txn, ts)); long long hashNew; if (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet) { hashNew = BackgroundSync::get()->getLastAppliedHash(); // Check to make sure logOp() is legal at this point. 
if (*opstr == 'n') { // 'n' operations are always logged invariant(*ns == '\0'); // 'n' operations do not advance the hash, since they are not rolled back } else { // Advance the hash hashNew = (hashNew * 131 + ts.asLL()) * 17 + replCoord->getMyId(); BackgroundSync::get()->setLastAppliedHash(hashNew); } } else { hashNew = 0; } replCoord->setMyLastOptime(ts); return std::pair(ts, hashNew); } /** * This allows us to stream the oplog entry directly into data region * main goal is to avoid copying the o portion * which can be very large * TODO: can have this build the entire doc */ class OplogDocWriter : public DocWriter { public: OplogDocWriter( const BSONObj& frame, const BSONObj& oField ) : _frame( frame ), _oField( oField ) { } ~OplogDocWriter(){} void writeDocument( char* start ) const { char* buf = start; memcpy( buf, _frame.objdata(), _frame.objsize() - 1 ); // don't copy final EOO reinterpret_cast( buf )[0] = documentSize(); buf += ( _frame.objsize() - 1 ); buf[0] = (char)Object; buf[1] = 'o'; buf[2] = 0; memcpy( buf+3, _oField.objdata(), _oField.objsize() ); buf += 3 + _oField.objsize(); buf[0] = EOO; verify( static_cast( ( buf + 1 ) - start ) == documentSize() ); // DEV? } size_t documentSize() const { return _frame.objsize() + _oField.objsize() + 1 /* type */ + 2 /* "o" */; } private: BSONObj _frame; BSONObj _oField; }; } // namespace void setOplogCollectionName() { if (getGlobalReplicationCoordinator()->getReplicationMode() == ReplicationCoordinator::modeReplSet) { _oplogCollectionName = rsOplogName; } else { _oplogCollectionName = masterSlaveOplogName; } } /* we write to local.oplog.rs: { ts : ..., h: ..., v: ..., op: ..., etc } ts: an OpTime timestamp h: hash v: version op: "i" insert "u" update "d" delete "c" db cmd "db" declares presence of a database (ns is set to the db name + '.') "n" no op bb param: if not null, specifies a boolean to pass along to the other side as b: param. 
used for "justOne" or "upsert" flags on 'd', 'u' */ void _logOp(OperationContext* txn, const char *opstr, const char *ns, const BSONObj& obj, BSONObj *o2, bool fromMigrate) { if ( strncmp(ns, "local.", 6) == 0 ) { return; } Lock::DBLock lk(txn->lockState(), "local", MODE_IX); ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); if (ns[0] && replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet && !replCoord->canAcceptWritesForDatabase(nsToDatabaseSubstring(ns))) { severe() << "logOp() but can't accept write to collection " << ns; fassertFailed(17405); } Lock::CollectionLock lk2(txn->lockState(), _oplogCollectionName, MODE_IX); if (_localOplogCollection == nullptr) { OldClientContext ctx(txn, _oplogCollectionName); _localDB = ctx.db(); invariant(_localDB); _localOplogCollection = _localDB->getCollection(_oplogCollectionName); massert(13347, "the oplog collection " + _oplogCollectionName + " missing. did you drop it? if so, restart the server", _localOplogCollection); } std::pair slot = getNextOpTime(txn, _localOplogCollection, ns, replCoord, opstr); /* we jump through a bunch of hoops here to avoid copying the obj buffer twice -- instead we do a single copy to the destination position in the memory mapped file. 
*/ BSONObjBuilder b(256); b.appendTimestamp("ts", slot.first.asDate()); b.append("h", slot.second); b.append("v", OPLOG_VERSION); b.append("op", opstr); b.append("ns", ns); if (fromMigrate) { b.appendBool("fromMigrate", true); } if (txn->getWriteConcern().shouldWaitForOtherNodes() && txn->getWriteConcern().syncMode == WriteConcernOptions::JOURNAL) { b.appendBool("j", true); } if ( o2 ) { b.append("o2", *o2); } BSONObj partial = b.done(); OplogDocWriter writer( partial, obj ); checkOplogInsert( _localOplogCollection->insertDocument( txn, &writer, false ) ); ReplClientInfo::forClient(txn->getClient()).setLastOp( slot.first ); } OpTime writeOpsToOplog(OperationContext* txn, const std::deque& ops) { ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); OpTime lastOptime = replCoord->getMyLastOptime(); invariant(!ops.empty()); MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { ScopedTransaction transaction(txn, MODE_IX); Lock::DBLock lk(txn->lockState(), "local", MODE_X); if ( _localOplogCollection == 0 ) { OldClientContext ctx(txn, rsOplogName); _localDB = ctx.db(); verify( _localDB ); _localOplogCollection = _localDB->getCollection(rsOplogName); massert(13389, "local.oplog.rs missing. did you drop it? if so restart server", _localOplogCollection); } OldClientContext ctx(txn, rsOplogName, _localDB); WriteUnitOfWork wunit(txn); for (std::deque::const_iterator it = ops.begin(); it != ops.end(); ++it) { const BSONObj& op = *it; const OpTime ts = op["ts"]._opTime(); checkOplogInsert(_localOplogCollection->insertDocument(txn, op, false)); if (!(lastOptime < ts)) { severe() << "replication oplog stream went back in time. " "previous timestamp: " << lastOptime << " newest timestamp: " << ts << ". 
Op being applied: " << op; fassertFailedNoTrace(18905); } lastOptime = ts; } wunit.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "writeOps", _localOplogCollection->ns().ns()); BackgroundSync* bgsync = BackgroundSync::get(); // Keep this up-to-date, in case we step up to primary. long long hash = ops.back()["h"].numberLong(); bgsync->setLastAppliedHash(hash); return lastOptime; } void createOplog(OperationContext* txn) { ScopedTransaction transaction(txn, MODE_X); Lock::GlobalWrite lk(txn->lockState()); const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); bool rs = !replSettings.replSet.empty(); OldClientContext ctx(txn, _oplogCollectionName); Collection* collection = ctx.db()->getCollection( _oplogCollectionName ); if ( collection ) { if (replSettings.oplogSize != 0) { const CollectionOptions oplogOpts = collection->getCatalogEntry()->getCollectionOptions(txn); int o = (int)(oplogOpts.cappedSize / ( 1024 * 1024 ) ); int n = (int)(replSettings.oplogSize / (1024 * 1024)); if ( n != o ) { stringstream ss; ss << "cmdline oplogsize (" << n << ") different than existing (" << o << ") see: http://dochub.mongodb.org/core/increase-oplog"; log() << ss.str() << endl; throw UserException( 13257 , ss.str() ); } } if ( !rs ) initOpTimeFromOplog(txn, _oplogCollectionName); return; } /* create an oplog collection, if it doesn't yet exist. */ long long sz = 0; if ( replSettings.oplogSize != 0 ) { sz = replSettings.oplogSize; } else { /* not specified. pick a default size */ sz = 50LL * 1024LL * 1024LL; if ( sizeof(int *) >= 8 ) { #if defined(__APPLE__) // typically these are desktops (dev machines), so keep it smallish sz = (256-64) * 1024 * 1024; #else sz = 990LL * 1024 * 1024; double free = File::freeSpace(storageGlobalParams.dbpath); //-1 if call not supported. 
long long fivePct = static_cast( free * 0.05 ); if ( fivePct > sz ) sz = fivePct; // we use 5% of free space up to 50GB (1TB free) static long long upperBound = 50LL * 1024 * 1024 * 1024; if (fivePct > upperBound) sz = upperBound; #endif } } log() << "******" << endl; log() << "creating replication oplog of size: " << (int)( sz / ( 1024 * 1024 ) ) << "MB..." << endl; CollectionOptions options; options.capped = true; options.cappedSize = sz; options.autoIndexId = CollectionOptions::NO; WriteUnitOfWork uow( txn ); invariant(ctx.db()->createCollection(txn, _oplogCollectionName, options)); if( !rs ) getGlobalServiceContext()->getOpObserver()->onOpMessage(txn, BSONObj()); uow.commit(); /* sync here so we don't get any surprising lag later when we try to sync */ StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); storageEngine->flushAllFiles(true); log() << "******" << endl; } // ------------------------------------- // @param fromRepl false if from ApplyOpsCmd // @return failure status if an update should have happened and the document DNE. // See replset initial sync code. Status applyOperation_inlock(OperationContext* txn, Database* db, const BSONObj& op, bool fromRepl, bool convertUpdateToUpsert) { LOG(3) << "applying op: " << op << endl; OpCounters * opCounters = fromRepl ? 
&replOpCounters : &globalOpCounters; const char *names[] = { "o", "ns", "op", "b", "o2" }; BSONElement fields[5]; op.getFields(5, names, fields); BSONElement& fieldO = fields[0]; BSONElement& fieldNs = fields[1]; BSONElement& fieldOp = fields[2]; BSONElement& fieldB = fields[3]; BSONElement& fieldO2 = fields[4]; BSONObj o; if( fieldO.isABSONObj() ) o = fieldO.embeddedObject(); const char *ns = fieldNs.valuestrsafe(); BSONObj o2; if (fieldO2.isABSONObj()) o2 = fieldO2.Obj(); bool valueB = fieldB.booleanSafe(); if (nsIsFull(ns)) { if (supportsDocLocking()) { // WiredTiger, and others requires MODE_IX since the applier threads driving // this allow writes to the same collection on any thread. invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_IX)); } else { // mmapV1 ensures that all operations to the same collection are executed from // the same worker thread, so it takes an exclusive lock (MODE_X) invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); } } Collection* collection = db->getCollection( ns ); IndexCatalog* indexCatalog = collection == nullptr ? nullptr : collection->getIndexCatalog(); // operation type -- see logOp() comments for types const char *opType = fieldOp.valuestrsafe(); if ( *opType == 'i' ) { opCounters->gotInsert(); const char *p = strchr(ns, '.'); if ( p && nsToCollectionSubstring( p ) == "system.indexes" ) { if (o["background"].trueValue()) { IndexBuilder* builder = new IndexBuilder(o); // This spawns a new thread and returns immediately. 
builder->go(); // Wait for thread to start and register itself Lock::TempRelease release(txn->lockState()); IndexBuilder::waitForBgIndexStarting(); } else { IndexBuilder builder(o); Status status = builder.buildInForeground(txn, db); uassertStatusOK(status); } } else { // do upserts for inserts as we might get replayed more than once OpDebug debug; uassert(ErrorCodes::NamespaceNotFound, str::stream() << "Failed to apply insert due to missing collection: " << op.toString(), collection); // No _id. // This indicates an issue with the upstream server: // The oplog entry is corrupted; or // The version of the upstream server is obsolete. uassert(ErrorCodes::NoSuchKey, str::stream() << "Failed to apply insert due to missing _id: " << op.toString(), o.hasField("_id")); // TODO: It may be better to do an insert here, and then catch the duplicate // key exception and do update then. Very few upserts will not be inserts... BSONObjBuilder b; b.append(o.getField("_id")); const NamespaceString requestNs(ns); UpdateRequest request(requestNs); request.setQuery(b.done()); request.setUpdates(o); request.setUpsert(); request.setFromReplication(); UpdateLifecycleImpl updateLifecycle(true, requestNs); request.setLifecycle(&updateLifecycle); update(txn, db, request, &debug); } } else if ( *opType == 'u' ) { opCounters->gotUpdate(); OpDebug debug; BSONObj updateCriteria = o2; const bool upsert = valueB || convertUpdateToUpsert; uassert(ErrorCodes::NoSuchKey, str::stream() << "Failed to apply update due to missing _id: " << op.toString(), updateCriteria.hasField("_id")); const NamespaceString requestNs(ns); UpdateRequest request(requestNs); request.setQuery(updateCriteria); request.setUpdates(o); request.setUpsert(upsert); request.setFromReplication(); UpdateLifecycleImpl updateLifecycle(true, requestNs); request.setLifecycle(&updateLifecycle); UpdateResult ur = update(txn, db, request, &debug); if( ur.numMatched == 0 ) { if( ur.modifiers ) { if( updateCriteria.nFields() == 1 ) { // was 
a simple { _id : ... } update criteria string msg = str::stream() << "failed to apply update: " << op.toString(); error() << msg; return Status(ErrorCodes::OperationFailed, msg); } // Need to check to see if it isn't present so we can exit early with a // failure. Note that adds some overhead for this extra check in some cases, // such as an updateCriteria // of the form // { _id:..., { x : {$size:...} } // thus this is not ideal. if (collection == NULL || (indexCatalog->haveIdIndex(txn) && Helpers::findById(txn, collection, updateCriteria).isNull()) || // capped collections won't have an _id index (!indexCatalog->haveIdIndex(txn) && Helpers::findOne(txn, collection, updateCriteria, false).isNull())) { string msg = str::stream() << "couldn't find doc: " << op.toString(); error() << msg; return Status(ErrorCodes::OperationFailed, msg); } // Otherwise, it's present; zero objects were updated because of additional specifiers // in the query for idempotence } else { // this could happen benignly on an oplog duplicate replay of an upsert // (because we are idempotent), // if an regular non-mod update fails the item is (presumably) missing. if( !upsert ) { string msg = str::stream() << "update of non-mod failed: " << op.toString(); error() << msg; return Status(ErrorCodes::OperationFailed, msg); } } } } else if ( *opType == 'd' ) { opCounters->gotDelete(); uassert(ErrorCodes::NoSuchKey, str::stream() << "Failed to apply delete due to missing _id: " << op.toString(), o.hasField("_id")); if ( opType[1] == 0 ) deleteObjects(txn, db, ns, o, PlanExecutor::YIELD_MANUAL, /*justOne*/ valueB); else verify( opType[1] == 'b' ); // "db" advertisement } else if ( *opType == 'c' ) { bool done = false; while (!done) { BufBuilder bb; BSONObjBuilder runCommandResult; // Applying commands in repl is done under Global W-lock, so it is safe to not // perform the current DB checks after reacquiring the lock. 
invariant(txn->lockState()->isW()); _runCommands(txn, ns, o, bb, runCommandResult, true, 0); // _runCommands takes care of adjusting opcounters for command counting. Status status = Command::getStatusFromCommandResult(runCommandResult.done()); switch (status.code()) { case ErrorCodes::WriteConflict: { // Need to throw this up to a higher level where it will be caught and the // operation retried. throw WriteConflictException(); } case ErrorCodes::BackgroundOperationInProgressForDatabase: { Lock::TempRelease release(txn->lockState()); BackgroundOperation::awaitNoBgOpInProgForDb(nsToDatabaseSubstring(ns)); break; } case ErrorCodes::BackgroundOperationInProgressForNamespace: { Lock::TempRelease release(txn->lockState()); Command* cmd = Command::findCommand(o.firstElement().fieldName()); invariant(cmd); BackgroundOperation::awaitNoBgOpInProgForNs(cmd->parseNs(nsToDatabase(ns), o)); break; } default: warning() << "Failed command " << o << " on " << nsToDatabaseSubstring(ns) << " with status " << status << " during oplog application"; // fallthrough case ErrorCodes::OK: done = true; break; } } } else if ( *opType == 'n' ) { // no op } else { throw MsgAssertionException( 14825 , ErrorMsg("error in applyOperation : unknown opType ", *opType) ); } // AuthorizationManager's logOp method registers a RecoveryUnit::Change // and to do so we need to have begun a UnitOfWork WriteUnitOfWork wuow(txn); getGlobalAuthorizationManager()->logOp( txn, opType, ns, o, fieldO2.isABSONObj() ? 
&o2 : NULL); wuow.commit(); return Status::OK(); } void waitUpToOneSecondForOptimeChange(const OpTime& referenceTime) { boost::unique_lock lk(newOpMutex); while (referenceTime == getLastSetOptime()) { if (!newOptimeNotifier.timed_wait(lk, boost::posix_time::seconds(1))) return; } } void setNewOptime(const OpTime& newTime) { boost::lock_guard lk(newOpMutex); setGlobalOptime(newTime); newOptimeNotifier.notify_all(); } void initOpTimeFromOplog(OperationContext* txn, const std::string& oplogNS) { DBDirectClient c(txn); BSONObj lastOp = c.findOne(oplogNS, Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk); if (!lastOp.isEmpty()) { LOG(1) << "replSet setting last OpTime"; setNewOptime(lastOp[ "ts" ].date()); } } void oplogCheckCloseDatabase(OperationContext* txn, Database* db) { invariant(txn->lockState()->isW()); _localDB = nullptr; _localOplogCollection = nullptr; } } // namespace repl } // namespace mongo