// @file oplog.cpp
/**
* Copyright (C) 2008-2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/repl/oplog.h"
#include
#include
#include
#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_manager_global.h"
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/privilege.h"
#include "mongo/db/background.h"
#include "mongo/db/catalog/apply_ops.h"
#include "mongo/db/catalog/capped_utils.h"
#include "mongo/db/catalog/coll_mod.h"
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog/drop_collection.h"
#include "mongo/db/catalog/drop_database.h"
#include "mongo/db/catalog/drop_indexes.h"
#include "mongo/db/catalog/rename_collection.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/dbhash.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/global_timestamp.h"
#include "mongo/db/index_builder.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/keypattern.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/ops/delete.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/ops/update.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/snapshot_thread.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/platform/random.h"
#include "mongo/s/d_state.h"
#include "mongo/scripting/engine.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/elapsed_tracker.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/file.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/stacktrace.h"
#include "mongo/util/startup_test.h"
namespace mongo {
using std::endl;
using std::string;
using std::stringstream;
using std::unique_ptr;
using std::vector;
namespace repl {
std::string rsOplogName = "local.oplog.rs";
std::string masterSlaveOplogName = "local.oplog.$main";
int OPLOG_VERSION = 2;
MONGO_FP_DECLARE(disableSnapshotting);
namespace {
// cached copies of these...so don't rename them, drop them, etc.!!!
Database* _localDB = nullptr;
Collection* _localOplogCollection = nullptr;
PseudoRandom hashGenerator(std::unique_ptr(SecureRandom::create())->nextInt64());
// Synchronizes the section where a new Timestamp is generated and when it actually
// appears in the oplog.
stdx::mutex newOpMutex;
stdx::condition_variable newTimestampNotifier;
static std::string _oplogCollectionName;
// so we can fail the same way
void checkOplogInsert(Status result) {
massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK());
}
struct OplogSlot {
OpTime opTime;
int64_t hash;
};
/**
* Allocates an optime for a new entry in the oplog, and updates the replication coordinator to
* reflect that new optime. Returns the new optime and the correct value of the "h" field for
* the new oplog entry.
*
* NOTE: From the time this function returns to the time that the new oplog entry is written
* to the storage system, all errors must be considered fatal. This is because the this
* function registers the new optime with the storage system and the replication coordinator,
* and provides no facility to revert those registrations on rollback.
*/
OplogSlot getNextOpTime(OperationContext* txn,
Collection* oplog,
ReplicationCoordinator* replCoord,
ReplicationCoordinator::Mode replicationMode) {
synchronizeOnCappedInFlightResource(txn->lockState(), oplog->ns());
OplogSlot slot;
slot.hash = 0;
long long term = OpTime::kUninitializedTerm;
// Fetch term out of the newOpMutex.
if (replicationMode == ReplicationCoordinator::modeReplSet &&
replCoord->isV1ElectionProtocol()) {
// Current term. If we're not a replset of pv=1, it remains kOldProtocolVersionTerm.
term = replCoord->getTerm();
}
stdx::lock_guard lk(newOpMutex);
Timestamp ts = getNextGlobalTimestamp();
newTimestampNotifier.notify_all();
fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(txn, ts));
// Set hash if we're in replset mode, otherwise it remains 0 in master/slave.
if (replicationMode == ReplicationCoordinator::modeReplSet) {
slot.hash = hashGenerator.nextInt64();
}
slot.opTime = OpTime(ts, term);
return slot;
}
/**
* This allows us to stream the oplog entry directly into data region
* main goal is to avoid copying the o portion
* which can be very large
* TODO: can have this build the entire doc
*/
class OplogDocWriter : public DocWriter {
public:
OplogDocWriter(const BSONObj& frame, const BSONObj& oField) : _frame(frame), _oField(oField) {}
void writeDocument(char* start) const {
char* buf = start;
memcpy(buf, _frame.objdata(), _frame.objsize() - 1); // don't copy final EOO
reinterpret_cast(buf)[0] = documentSize();
buf += (_frame.objsize() - 1);
buf[0] = (char)Object;
buf[1] = 'o';
buf[2] = 0;
memcpy(buf + 3, _oField.objdata(), _oField.objsize());
buf += 3 + _oField.objsize();
buf[0] = EOO;
verify(static_cast((buf + 1) - start) == documentSize()); // DEV?
}
size_t documentSize() const {
return _frame.objsize() + _oField.objsize() + 1 /* type */ + 2 /* "o" */;
}
private:
BSONObj _frame;
BSONObj _oField;
};
class UpdateReplOpTimeChange : public RecoveryUnit::Change {
public:
UpdateReplOpTimeChange(OpTime newOpTime, ReplicationCoordinator* replCoord)
: _newOpTime(newOpTime), _replCoord(replCoord) {}
virtual void commit() {
_replCoord->setMyLastAppliedOpTimeForward(_newOpTime);
}
virtual void rollback() {}
private:
const OpTime _newOpTime;
ReplicationCoordinator* _replCoord;
};
} // namespace
void setOplogCollectionName() {
if (getGlobalReplicationCoordinator()->getReplicationMode() ==
ReplicationCoordinator::modeReplSet) {
_oplogCollectionName = rsOplogName;
} else {
_oplogCollectionName = masterSlaveOplogName;
}
}
namespace {
Collection* getLocalOplogCollection(OperationContext* txn, const std::string& oplogCollectionName) {
if (_localOplogCollection)
return _localOplogCollection;
Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
Lock::CollectionLock lk2(txn->lockState(), oplogCollectionName, MODE_IX);
OldClientContext ctx(txn, oplogCollectionName);
_localDB = ctx.db();
invariant(_localDB);
_localOplogCollection = _localDB->getCollection(oplogCollectionName);
massert(13347,
"the oplog collection " + oplogCollectionName +
" missing. did you drop it? if so, restart the server",
_localOplogCollection);
return _localOplogCollection;
}
bool oplogDisabled(OperationContext* txn,
ReplicationCoordinator::Mode replicationMode,
const NamespaceString& nss) {
if (replicationMode == ReplicationCoordinator::modeNone)
return true;
if (nss.db() == "local")
return true;
if (nss.isSystemDotProfile())
return true;
if (!txn->writesAreReplicated())
return true;
fassert(28626, txn->recoveryUnit());
return false;
}
unique_ptr _logOpWriter(OperationContext* txn,
const char* opstr,
const NamespaceString& nss,
const BSONObj& obj,
BSONObj* o2,
bool fromMigrate,
OpTime optime,
long long hashNew) {
BSONObjBuilder b(256);
b.append("ts", optime.getTimestamp());
if (optime.getTerm() != -1)
b.append("t", optime.getTerm());
b.append("h", hashNew);
b.append("v", OPLOG_VERSION);
b.append("op", opstr);
b.append("ns", nss.ns());
if (fromMigrate)
b.appendBool("fromMigrate", true);
if (o2)
b.append("o2", *o2);
return stdx::make_unique(OplogDocWriter(b.obj(), obj));
}
} // end anon namespace
// Truncates the oplog to and including the "truncateTimestamp" entry.
void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) {
const NamespaceString oplogNss(rsOplogName);
ScopedTransaction transaction(txn, MODE_IX);
AutoGetDb autoDb(txn, oplogNss.db(), MODE_IX);
Lock::CollectionLock oplogCollectionLoc(txn->lockState(), oplogNss.ns(), MODE_X);
Collection* oplogCollection = autoDb.getDb()->getCollection(oplogNss);
if (!oplogCollection) {
fassertFailedWithStatusNoTrace(
28820,
Status(ErrorCodes::NamespaceNotFound, str::stream() << "Can't find " << rsOplogName));
}
// Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp.
bool foundSomethingToTruncate = false;
RecordId lastRecordId;
BSONObj lastOplogEntry;
auto oplogRs = oplogCollection->getRecordStore();
auto oplogReverseCursor = oplogRs->getCursor(txn, false);
bool first = true;
while (auto next = oplogReverseCursor->next()) {
lastOplogEntry = next->data.releaseToBson();
lastRecordId = next->id;
const auto tsElem = lastOplogEntry["ts"];
if (first) {
if (tsElem.eoo())
LOG(2) << "Oplog tail entry: " << lastOplogEntry;
else
LOG(2) << "Oplog tail entry ts field: " << tsElem;
first = false;
}
if (tsElem.timestamp() < truncateTimestamp) {
break;
}
foundSomethingToTruncate = true;
}
if (foundSomethingToTruncate) {
oplogCollection->temp_cappedTruncateAfter(txn, lastRecordId, false);
}
}
/* we write to local.oplog.rs:
{ ts : ..., h: ..., v: ..., op: ..., etc }
ts: an OpTime timestamp
h: hash
v: version
op:
"i" insert
"u" update
"d" delete
"c" db cmd
"db" declares presence of a database (ns is set to the db name + '.')
"n" no op
bb param:
if not null, specifies a boolean to pass along to the other side as b: param.
used for "justOne" or "upsert" flags on 'd', 'u'
*/
void _logOpsInner(OperationContext* txn,
const char* opstr,
const NamespaceString& nss,
const vector>& writers,
bool fromMigrate,
Collection* oplogCollection,
ReplicationCoordinator::Mode replicationMode,
bool updateReplOpTime,
OpTime finalOpTime) {
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
if (nss.size() && replicationMode == ReplicationCoordinator::modeReplSet &&
!replCoord->canAcceptWritesFor(nss)) {
severe() << "logOp() but can't accept write to collection " << nss.ns();
fassertFailed(17405);
}
// we jump through a bunch of hoops here to avoid copying the obj buffer twice --
// instead we do a single copy to the destination in the record store.
for (auto it = writers.begin(); it != writers.end(); it++)
checkOplogInsert(oplogCollection->insertDocument(txn, it->get(), false));
// Set replCoord last optime only after we're sure the WUOW didn't abort and roll back.
if (updateReplOpTime)
txn->recoveryUnit()->registerChange(new UpdateReplOpTimeChange(finalOpTime, replCoord));
ReplClientInfo::forClient(txn->getClient()).setLastOp(finalOpTime);
}
void _logOp(OperationContext* txn,
const char* opstr,
const char* ns,
const BSONObj& obj,
BSONObj* o2,
bool fromMigrate,
const std::string& oplogName,
ReplicationCoordinator::Mode replMode,
bool updateOpTime) {
NamespaceString nss(ns);
if (oplogDisabled(txn, replMode, nss))
return;
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
vector> writers;
Collection* oplog = getLocalOplogCollection(txn, oplogName);
Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
Lock::CollectionLock lock(txn->lockState(), oplogName, MODE_IX);
auto slot = getNextOpTime(txn, oplog, replCoord, replMode);
auto writer = _logOpWriter(txn, opstr, nss, obj, o2, fromMigrate, slot.opTime, slot.hash);
writers.emplace_back(std::move(writer));
_logOpsInner(txn, opstr, nss, writers, fromMigrate, oplog, replMode, updateOpTime, slot.opTime);
}
void logOps(OperationContext* txn,
const char* opstr,
const NamespaceString& nss,
std::vector::const_iterator begin,
std::vector::const_iterator end,
bool fromMigrate) {
ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode();
invariant(begin != end);
if (oplogDisabled(txn, replMode, nss))
return;
vector> writers;
writers.reserve(end - begin);
OpTime finalOpTime;
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
Collection* oplog = getLocalOplogCollection(txn, _oplogCollectionName);
Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
Lock::CollectionLock lock(txn->lockState(), _oplogCollectionName, MODE_IX);
for (auto it = begin; it != end; it++) {
auto slot = getNextOpTime(txn, oplog, replCoord, replMode);
finalOpTime = slot.opTime;
auto writer = _logOpWriter(txn, opstr, nss, *it, NULL, fromMigrate, slot.opTime, slot.hash);
writers.emplace_back(std::move(writer));
}
_logOpsInner(txn, opstr, nss, writers, fromMigrate, oplog, replMode, true, finalOpTime);
}
void logOp(OperationContext* txn,
const char* opstr,
const char* ns,
const BSONObj& obj,
BSONObj* o2,
bool fromMigrate) {
ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode();
_logOp(txn, opstr, ns, obj, o2, fromMigrate, _oplogCollectionName, replMode, true);
}
OpTime writeOpsToOplog(OperationContext* txn, const std::vector& ops) {
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
OpTime lastOptime;
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
lastOptime = replCoord->getMyLastAppliedOpTime();
invariant(!ops.empty());
ScopedTransaction transaction(txn, MODE_IX);
Lock::DBLock lk(txn->lockState(), "local", MODE_X);
if (_localOplogCollection == 0) {
OldClientContext ctx(txn, rsOplogName);
_localDB = ctx.db();
verify(_localDB);
_localOplogCollection = _localDB->getCollection(rsOplogName);
massert(13389,
"local.oplog.rs missing. did you drop it? if so restart server",
_localOplogCollection);
}
OldClientContext ctx(txn, rsOplogName, _localDB);
WriteUnitOfWork wunit(txn);
checkOplogInsert(
_localOplogCollection->insertDocuments(txn, ops.begin(), ops.end(), false));
lastOptime =
fassertStatusOK(ErrorCodes::InvalidBSON, OpTime::parseFromOplogEntry(ops.back()));
wunit.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "writeOps", _localOplogCollection->ns().ns());
return lastOptime;
}
void createOplog(OperationContext* txn, const std::string& oplogCollectionName, bool replEnabled) {
ScopedTransaction transaction(txn, MODE_X);
Lock::GlobalWrite lk(txn->lockState());
const ReplSettings& replSettings = ReplicationCoordinator::get(txn)->getSettings();
OldClientContext ctx(txn, oplogCollectionName);
Collection* collection = ctx.db()->getCollection(oplogCollectionName);
if (collection) {
if (replSettings.getOplogSizeBytes() != 0) {
const CollectionOptions oplogOpts =
collection->getCatalogEntry()->getCollectionOptions(txn);
int o = (int)(oplogOpts.cappedSize / (1024 * 1024));
int n = (int)(replSettings.getOplogSizeBytes() / (1024 * 1024));
if (n != o) {
stringstream ss;
ss << "cmdline oplogsize (" << n << ") different than existing (" << o
<< ") see: http://dochub.mongodb.org/core/increase-oplog";
log() << ss.str() << endl;
throw UserException(13257, ss.str());
}
}
if (!replEnabled)
initTimestampFromOplog(txn, oplogCollectionName);
return;
}
/* create an oplog collection, if it doesn't yet exist. */
long long sz = 0;
if (replSettings.getOplogSizeBytes() != 0) {
sz = replSettings.getOplogSizeBytes();
} else {
/* not specified. pick a default size */
sz = 50LL * 1024LL * 1024LL;
if (sizeof(int*) >= 8) {
#if defined(__APPLE__)
// typically these are desktops (dev machines), so keep it smallish
sz = (256 - 64) * 1024 * 1024;
#else
sz = 990LL * 1024 * 1024;
double free = File::freeSpace(storageGlobalParams.dbpath); //-1 if call not supported.
long long fivePct = static_cast(free * 0.05);
if (fivePct > sz)
sz = fivePct;
// we use 5% of free space up to 50GB (1TB free)
static long long upperBound = 50LL * 1024 * 1024 * 1024;
if (fivePct > upperBound)
sz = upperBound;
#endif
}
}
log() << "******" << endl;
log() << "creating replication oplog of size: " << (int)(sz / (1024 * 1024)) << "MB..." << endl;
CollectionOptions options;
options.capped = true;
options.cappedSize = sz;
options.autoIndexId = CollectionOptions::NO;
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
WriteUnitOfWork uow(txn);
invariant(ctx.db()->createCollection(txn, oplogCollectionName, options));
if (!replEnabled)
getGlobalServiceContext()->getOpObserver()->onOpMessage(txn, BSONObj());
uow.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", oplogCollectionName);
/* sync here so we don't get any surprising lag later when we try to sync */
StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
storageEngine->flushAllFiles(true);
log() << "******" << endl;
}
void createOplog(OperationContext* txn) {
const auto replEnabled = ReplicationCoordinator::get(txn)->getReplicationMode() ==
ReplicationCoordinator::modeReplSet;
createOplog(txn, _oplogCollectionName, replEnabled);
}
// -------------------------------------
namespace {
NamespaceString parseNs(const string& ns, const BSONObj& cmdObj) {
BSONElement first = cmdObj.firstElement();
uassert(28635,
"no collection name specified",
first.canonicalType() == canonicalizeBSONType(mongo::String) &&
first.valuestrsize() > 0);
std::string coll = first.valuestr();
return NamespaceString(NamespaceString(ns).db().toString(), coll);
}
using OpApplyFn = stdx::function;
struct ApplyOpMetadata {
OpApplyFn applyFunc;
std::set acceptableErrors;
ApplyOpMetadata(OpApplyFn fun) {
applyFunc = fun;
}
ApplyOpMetadata(OpApplyFn fun, std::set theAcceptableErrors) {
applyFunc = fun;
acceptableErrors = theAcceptableErrors;
}
};
std::map opsMap = {
{"create",
{[](OperationContext* txn, const char* ns, BSONObj& cmd)
-> Status { return createCollection(txn, NamespaceString(ns).db().toString(), cmd); },
{ErrorCodes::NamespaceExists}}},
{"collMod",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
return collMod(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
}}},
{"dropDatabase",
{[](OperationContext* txn, const char* ns, BSONObj& cmd)
-> Status { return dropDatabase(txn, NamespaceString(ns).db().toString()); },
{ErrorCodes::NamespaceNotFound}}},
{"drop",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
return dropCollection(txn, parseNs(ns, cmd), resultWeDontCareAbout);
},
// IllegalOperation is necessary because in 3.0 we replicate drops of system.profile
// TODO(dannenberg) remove IllegalOperation once we no longer need 3.0 compatibility
{ErrorCodes::NamespaceNotFound, ErrorCodes::IllegalOperation}}},
// deleteIndex(es) is deprecated but still works as of April 10, 2015
{"deleteIndex",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"deleteIndexes",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"dropIndex",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"dropIndexes",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"renameCollection",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
return renameCollection(txn,
NamespaceString(cmd.firstElement().valuestrsafe()),
NamespaceString(cmd["to"].valuestrsafe()),
cmd["stayTemp"].trueValue(),
cmd["dropTarget"].trueValue());
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::NamespaceExists}}},
{"applyOps",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
return applyOps(txn, nsToDatabase(ns), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::UnknownError}}},
{"convertToCapped",
{[](OperationContext* txn, const char* ns, BSONObj& cmd)
-> Status { return convertToCapped(txn, parseNs(ns, cmd), cmd["size"].number()); }}},
{"emptycapped",
{[](OperationContext* txn, const char* ns, BSONObj& cmd)
-> Status { return emptyCapped(txn, parseNs(ns, cmd)); }}},
};
} // namespace
// @return failure status if an update should have happened and the document DNE.
// See replset initial sync code.
Status applyOperation_inlock(OperationContext* txn,
Database* db,
const BSONObj& op,
bool convertUpdateToUpsert) {
LOG(3) << "applying op: " << op << endl;
OpCounters* opCounters = txn->writesAreReplicated() ? &globalOpCounters : &replOpCounters;
const char* names[] = {"o", "ns", "op", "b", "o2"};
BSONElement fields[5];
op.getFields(5, names, fields);
BSONElement& fieldO = fields[0];
BSONElement& fieldNs = fields[1];
BSONElement& fieldOp = fields[2];
BSONElement& fieldB = fields[3];
BSONElement& fieldO2 = fields[4];
BSONObj o;
if (fieldO.isABSONObj())
o = fieldO.embeddedObject();
const StringData ns = fieldNs.valueStringData();
BSONObj o2;
if (fieldO2.isABSONObj())
o2 = fieldO2.Obj();
bool valueB = fieldB.booleanSafe();
if (nsIsFull(ns)) {
if (supportsDocLocking()) {
// WiredTiger, and others requires MODE_IX since the applier threads driving
// this allow writes to the same collection on any thread.
dassert(txn->lockState()->isCollectionLockedForMode(ns, MODE_IX));
} else {
// mmapV1 ensures that all operations to the same collection are executed from
// the same worker thread, so it takes an exclusive lock (MODE_X)
dassert(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
}
}
Collection* collection = db->getCollection(ns);
IndexCatalog* indexCatalog = collection == nullptr ? nullptr : collection->getIndexCatalog();
// operation type -- see logOp() comments for types
const char* opType = fieldOp.valuestrsafe();
invariant(*opType != 'c'); // commands are processed in applyCommand_inlock()
if (*opType == 'i') {
opCounters->gotInsert();
if (nsToCollectionSubstring(ns) == "system.indexes") {
uassert(ErrorCodes::NoSuchKey,
str::stream() << "Missing expected index spec in field 'o': " << op,
!fieldO.eoo());
uassert(ErrorCodes::TypeMismatch,
str::stream() << "Expected object for index spec in field 'o': " << op,
fieldO.isABSONObj());
std::string indexNs;
uassertStatusOK(bsonExtractStringField(o, "ns", &indexNs));
const NamespaceString indexNss(indexNs);
uassert(ErrorCodes::InvalidNamespace,
str::stream() << "Invalid namespace in index spec: " << op,
indexNss.isValid());
uassert(ErrorCodes::InvalidNamespace,
str::stream() << "Database name mismatch for database ("
<< nsToDatabaseSubstring(ns) << ") while creating index: " << op,
nsToDatabaseSubstring(ns) == indexNss.db());
if (o["background"].trueValue()) {
Lock::TempRelease release(txn->lockState());
if (txn->lockState()->isLocked()) {
// If TempRelease fails, background index build will deadlock.
LOG(3) << "apply op: building background index " << o
<< " in the foreground because temp release failed";
IndexBuilder builder(o);
Status status = builder.buildInForeground(txn, db);
uassertStatusOK(status);
} else {
IndexBuilder* builder = new IndexBuilder(o);
// This spawns a new thread and returns immediately.
builder->go();
// Wait for thread to start and register itself
IndexBuilder::waitForBgIndexStarting();
}
} else {
IndexBuilder builder(o);
Status status = builder.buildInForeground(txn, db);
uassertStatusOK(status);
}
// Since this is an index operation we can return without falling through.
return Status::OK();
}
uassert(
ErrorCodes::NamespaceNotFound,
str::stream() << "Failed to apply insert due to missing collection: " << op.toString(),
collection);
// No _id.
// This indicates an issue with the upstream server:
// The oplog entry is corrupted; or
// The version of the upstream server is obsolete.
uassert(ErrorCodes::NoSuchKey,
str::stream() << "Failed to apply insert due to missing _id: " << op.toString(),
o.hasField("_id"));
// 1. Try insert first
// 2. If okay, commit
// 3. If not, do update (and commit)
// 4. If both !Ok, return status
Status status{ErrorCodes::NotYetInitialized, ""};
{
WriteUnitOfWork wuow(txn);
try {
status = collection->insertDocument(txn, o, true);
} catch (DBException dbe) {
status = dbe.toStatus();
}
if (status.isOK()) {
wuow.commit();
}
}
// Now see if we need to do an update, based on duplicate _id index key
if (!status.isOK()) {
if (status.code() != ErrorCodes::DuplicateKey) {
return status;
}
// Do update on DuplicateKey errors.
// This will only be on the _id field in replication,
// since we disable non-_id unique constraint violations.
OpDebug debug;
BSONObjBuilder b;
b.append(o.getField("_id"));
const NamespaceString requestNs(ns);
UpdateRequest request(requestNs);
request.setQuery(b.done());
request.setUpdates(o);
request.setUpsert();
UpdateLifecycleImpl updateLifecycle(true, requestNs);
request.setLifecycle(&updateLifecycle);
UpdateResult res = update(txn, db, request, &debug);
if (res.numMatched == 0) {
error() << "No document was updated even though we got a DuplicateKey error when"
" inserting";
fassertFailedNoTrace(28750);
}
}
} else if (*opType == 'u') {
opCounters->gotUpdate();
OpDebug debug;
BSONObj updateCriteria = o2;
const bool upsert = valueB || convertUpdateToUpsert;
uassert(ErrorCodes::NoSuchKey,
str::stream() << "Failed to apply update due to missing _id: " << op.toString(),
updateCriteria.hasField("_id"));
const NamespaceString requestNs(ns);
UpdateRequest request(requestNs);
request.setQuery(updateCriteria);
request.setUpdates(o);
request.setUpsert(upsert);
UpdateLifecycleImpl updateLifecycle(true, requestNs);
request.setLifecycle(&updateLifecycle);
UpdateResult ur = update(txn, db, request, &debug);
if (ur.numMatched == 0) {
if (ur.modifiers) {
if (updateCriteria.nFields() == 1) {
// was a simple { _id : ... } update criteria
string msg = str::stream() << "failed to apply update: " << op.toString();
error() << msg;
return Status(ErrorCodes::OperationFailed, msg);
}
// Need to check to see if it isn't present so we can exit early with a
// failure. Note that adds some overhead for this extra check in some cases,
// such as an updateCriteria
// of the form
// { _id:..., { x : {$size:...} }
// thus this is not ideal.
if (collection == NULL ||
(indexCatalog->haveIdIndex(txn) &&
Helpers::findById(txn, collection, updateCriteria).isNull()) ||
// capped collections won't have an _id index
(!indexCatalog->haveIdIndex(txn) &&
Helpers::findOne(txn, collection, updateCriteria, false).isNull())) {
string msg = str::stream() << "couldn't find doc: " << op.toString();
error() << msg;
return Status(ErrorCodes::OperationFailed, msg);
}
// Otherwise, it's present; zero objects were updated because of additional
// specifiers in the query for idempotence
} else {
// this could happen benignly on an oplog duplicate replay of an upsert
// (because we are idempotent),
// if an regular non-mod update fails the item is (presumably) missing.
if (!upsert) {
string msg = str::stream() << "update of non-mod failed: " << op.toString();
error() << msg;
return Status(ErrorCodes::OperationFailed, msg);
}
}
}
} else if (*opType == 'd') {
opCounters->gotDelete();
uassert(ErrorCodes::NoSuchKey,
str::stream() << "Failed to apply delete due to missing _id: " << op.toString(),
o.hasField("_id"));
if (opType[1] == 0) {
deleteObjects(txn, collection, ns, o, PlanExecutor::YIELD_MANUAL, /*justOne*/ valueB);
} else
verify(opType[1] == 'b'); // "db" advertisement
} else if (*opType == 'n') {
// no op
} else {
throw MsgAssertionException(
14825, str::stream() << "error in applyOperation : unknown opType " << *opType);
}
// AuthorizationManager's logOp method registers a RecoveryUnit::Change
// and to do so we need to have begun a UnitOfWork
WriteUnitOfWork wuow(txn);
getGlobalAuthorizationManager()->logOp(
txn, opType, ns.toString().c_str(), o, fieldO2.isABSONObj() ? &o2 : NULL);
wuow.commit();
return Status::OK();
}
Status applyCommand_inlock(OperationContext* txn, const BSONObj& op) {
const char* names[] = {"o", "ns", "op"};
BSONElement fields[3];
op.getFields(3, names, fields);
BSONElement& fieldO = fields[0];
BSONElement& fieldNs = fields[1];
BSONElement& fieldOp = fields[2];
const char* opType = fieldOp.valuestrsafe();
invariant(*opType == 'c'); // only commands are processed here
if (fieldO.eoo()) {
return Status(ErrorCodes::NoSuchKey, "Missing expected field 'o'");
}
if (!fieldO.isABSONObj()) {
return Status(ErrorCodes::BadValue, "Expected object for field 'o'");
}
BSONObj o = fieldO.embeddedObject();
const char* ns = fieldNs.valuestrsafe();
// Applying commands in repl is done under Global W-lock, so it is safe to not
// perform the current DB checks after reacquiring the lock.
invariant(txn->lockState()->isW());
bool done = false;
while (!done) {
auto op = opsMap.find(o.firstElementFieldName());
if (op == opsMap.end()) {
return Status(ErrorCodes::BadValue,
mongoutils::str::stream() << "Invalid key '" << o.firstElementFieldName()
<< "' found in field 'o'");
}
ApplyOpMetadata curOpToApply = op->second;
Status status = Status::OK();
try {
status = curOpToApply.applyFunc(txn, ns, o);
} catch (...) {
status = exceptionToStatus();
}
switch (status.code()) {
case ErrorCodes::WriteConflict: {
// Need to throw this up to a higher level where it will be caught and the
// operation retried.
throw WriteConflictException();
}
case ErrorCodes::BackgroundOperationInProgressForDatabase: {
Lock::TempRelease release(txn->lockState());
BackgroundOperation::awaitNoBgOpInProgForDb(nsToDatabaseSubstring(ns));
txn->recoveryUnit()->abandonSnapshot();
break;
}
case ErrorCodes::BackgroundOperationInProgressForNamespace: {
Lock::TempRelease release(txn->lockState());
Command* cmd = Command::findCommand(o.firstElement().fieldName());
invariant(cmd);
BackgroundOperation::awaitNoBgOpInProgForNs(cmd->parseNs(nsToDatabase(ns), o));
txn->recoveryUnit()->abandonSnapshot();
break;
}
default:
if (_oplogCollectionName == masterSlaveOplogName) {
error() << "Failed command " << o << " on " << nsToDatabaseSubstring(ns)
<< " with status " << status << " during oplog application";
} else if (curOpToApply.acceptableErrors.find(status.code()) ==
curOpToApply.acceptableErrors.end()) {
error() << "Failed command " << o << " on " << nsToDatabaseSubstring(ns)
<< " with status " << status << " during oplog application";
return status;
}
// fallthrough
case ErrorCodes::OK:
done = true;
break;
}
}
// AuthorizationManager's logOp method registers a RecoveryUnit::Change
// and to do so we need to have begun a UnitOfWork
WriteUnitOfWork wuow(txn);
getGlobalAuthorizationManager()->logOp(txn, opType, ns, o, nullptr);
wuow.commit();
return Status::OK();
}
void setNewTimestamp(const Timestamp& newTime) {
stdx::lock_guard lk(newOpMutex);
setGlobalTimestamp(newTime);
newTimestampNotifier.notify_all();
}
void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS) {
DBDirectClient c(txn);
BSONObj lastOp = c.findOne(oplogNS, Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk);
if (!lastOp.isEmpty()) {
LOG(1) << "replSet setting last Timestamp";
const OpTime opTime = fassertStatusOK(28696, OpTime::parseFromOplogEntry(lastOp));
setNewTimestamp(opTime.getTimestamp());
}
}
void oplogCheckCloseDatabase(OperationContext* txn, Database* db) {
invariant(txn->lockState()->isW());
_localDB = nullptr;
_localOplogCollection = nullptr;
}
void signalOplogWaiters() {
if (_localOplogCollection) {
_localOplogCollection->notifyCappedWaitersIfNeeded();
}
}
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(replSnapshotThreadThrottleMicros, int, 1000);
SnapshotThread::SnapshotThread(SnapshotManager* manager)
: _manager(manager), _thread([this] { run(); }) {}
bool SnapshotThread::shouldSleepMore(int numSleepsDone, size_t numUncommittedSnapshots) {
const double kThrottleRatio = 1 / 20.0;
const size_t kUncommittedSnapshotLimit = 1000;
const size_t kUncommittedSnapshotRestartPoint = kUncommittedSnapshotLimit / 2;
if (_inShutdown.load())
return false; // Exit the thread quickly without sleeping.
if (numSleepsDone == 0)
return true; // Always sleep at least once.
{
// Enforce a limit on the number of snapshots.
if (numUncommittedSnapshots >= kUncommittedSnapshotLimit)
_hitSnapshotLimit = true; // Don't create new snapshots.
if (numUncommittedSnapshots < kUncommittedSnapshotRestartPoint)
_hitSnapshotLimit = false; // Begin creating new snapshots again.
if (_hitSnapshotLimit)
return true;
}
// Spread out snapshots in time by sleeping as we collect more uncommitted snapshots.
const double numSleepsNeeded = numUncommittedSnapshots * kThrottleRatio;
return numSleepsNeeded > numSleepsDone;
}
void SnapshotThread::run() {
Client::initThread("SnapshotThread");
auto& client = cc();
auto serviceContext = client.getServiceContext();
auto replCoord = ReplicationCoordinator::get(serviceContext);
Timestamp lastTimestamp = {};
while (true) {
// This block logically belongs at the end of the loop, but having it at the top
// simplifies handling of the "continue" cases. It is harmless to do these before the
// first run of the loop.
for (int numSleepsDone = 0;
shouldSleepMore(numSleepsDone, replCoord->getNumUncommittedSnapshots());
numSleepsDone++) {
sleepmicros(replSnapshotThreadThrottleMicros);
_manager->cleanupUnneededSnapshots();
}
{
stdx::unique_lock lock(newOpMutex);
while (true) {
if (_inShutdown.load())
return;
if (_forcedSnapshotPending.load() || lastTimestamp != getLastSetTimestamp()) {
_forcedSnapshotPending.store(false);
lastTimestamp = getLastSetTimestamp();
break;
}
newTimestampNotifier.wait(lock);
}
}
while (MONGO_FAIL_POINT(disableSnapshotting)) {
sleepsecs(1);
if (_inShutdown.load()) {
return;
}
}
try {
auto txn = client.makeOperationContext();
Lock::GlobalLock globalLock(txn->lockState(), MODE_IS, UINT_MAX);
if (!replCoord->getMemberState().readable()) {
// If our MemberState isn't readable, we may not be in a consistent state so don't
// take snapshots. When we transition into a readable state from a non-readable
// state, a snapshot is forced to ensure we don't miss the latest write. This must
// be checked each time we acquire the global IS lock since that prevents the node
// from transitioning to a !readable() state from a readable() one in the cases
// where we shouldn't be creating a snapshot.
continue;
}
SnapshotName name(0); // assigned real value in block.
{
// Make sure there are no in-flight capped inserts while we create our snapshot.
Lock::ResourceLock cappedInsertLockForOtherDb(
txn->lockState(), resourceCappedInFlightForOtherDb, MODE_X);
Lock::ResourceLock cappedInsertLockForLocalDb(
txn->lockState(), resourceCappedInFlightForLocalDb, MODE_X);
// Reserve the name immediately before we take our snapshot. This ensures that all
// names that compare lower must be from points in time visible to this named
// snapshot.
name = replCoord->reserveSnapshotName(nullptr);
// This establishes the view that we will name.
_manager->prepareForCreateSnapshot(txn.get());
}
auto opTimeOfSnapshot = OpTime();
{
AutoGetCollectionForRead oplog(txn.get(), rsOplogName);
invariant(oplog.getCollection());
// Read the latest op from the oplog.
auto cursor = oplog.getCollection()->getCursor(txn.get(), /*forward*/ false);
auto record = cursor->next();
if (!record)
continue; // oplog is completely empty.
const auto op = record->data.releaseToBson();
opTimeOfSnapshot = fassertStatusOK(28780, OpTime::parseFromOplogEntry(op));
invariant(!opTimeOfSnapshot.isNull());
}
_manager->createSnapshot(txn.get(), name);
replCoord->onSnapshotCreate(opTimeOfSnapshot, name);
} catch (const WriteConflictException& wce) {
log() << "skipping storage snapshot pass due to write conflict";
continue;
}
}
}
void SnapshotThread::shutdown() {
invariant(_thread.joinable());
{
stdx::lock_guard lock(newOpMutex);
invariant(!_inShutdown.load());
_inShutdown.store(true);
newTimestampNotifier.notify_all();
}
_thread.join();
}
void SnapshotThread::forceSnapshot() {
stdx::lock_guard lock(newOpMutex);
_forcedSnapshotPending.store(true);
newTimestampNotifier.notify_all();
}
std::unique_ptr SnapshotThread::start(ServiceContext* service) {
if (auto manager = service->getGlobalStorageEngine()->getSnapshotManager()) {
return std::unique_ptr(new SnapshotThread(manager));
}
return {};
}
} // namespace repl
} // namespace mongo