// @file oplog.cpp
/**
* Copyright (C) 2008-2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/repl/oplog.h"
#include
#include
#include
#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_manager_global.h"
#include "mongo/db/auth/privilege.h"
#include "mongo/db/background.h"
#include "mongo/db/catalog/apply_ops.h"
#include "mongo/db/catalog/capped_utils.h"
#include "mongo/db/catalog/coll_mod.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog/drop_collection.h"
#include "mongo/db/catalog/drop_database.h"
#include "mongo/db/catalog/drop_indexes.h"
#include "mongo/db/catalog/rename_collection.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/dbhash.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/global_timestamp.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/index_builder.h"
#include "mongo/db/keypattern.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer.h"
#include "mongo/db/ops/delete.h"
#include "mongo/db/ops/update.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/snapshot_thread.h"
#include "mongo/db/repl/sync_tail.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/platform/random.h"
#include "mongo/scripting/engine.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/concurrency/idle_thread_block.h"
#include "mongo/util/elapsed_tracker.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/file.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/stacktrace.h"
#include "mongo/util/startup_test.h"
namespace mongo {
using std::endl;
using std::string;
using std::stringstream;
using std::unique_ptr;
using std::vector;
using IndexVersion = IndexDescriptor::IndexVersion;
namespace repl {
std::string rsOplogName = "local.oplog.rs";
std::string masterSlaveOplogName = "local.oplog.$main";
MONGO_FP_DECLARE(disableSnapshotting);
namespace {
// cached copy...so don't rename, drop, etc.!!!
Collection* _localOplogCollection = nullptr;
PseudoRandom hashGenerator(std::unique_ptr(SecureRandom::create())->nextInt64());
// Synchronizes the section where a new Timestamp is generated and when it actually
// appears in the oplog.
stdx::mutex newOpMutex;
stdx::condition_variable newTimestampNotifier;
static std::string _oplogCollectionName;
// so we can fail the same way
void checkOplogInsert(Status result) {
massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK());
}
struct OplogSlot {
OpTime opTime;
int64_t hash = 0;
};
/**
* Allocates an optime for a new entry in the oplog, and updates the replication coordinator to
* reflect that new optime. Returns the new optime and the correct value of the "h" field for
* the new oplog entry.
*
* NOTE: From the time this function returns to the time that the new oplog entry is written
* to the storage system, all errors must be considered fatal. This is because the this
* function registers the new optime with the storage system and the replication coordinator,
* and provides no facility to revert those registrations on rollback.
*/
void getNextOpTime(OperationContext* txn,
Collection* oplog,
ReplicationCoordinator* replCoord,
ReplicationCoordinator::Mode replicationMode,
unsigned count,
OplogSlot* slotsOut) {
synchronizeOnCappedInFlightResource(txn->lockState(), oplog->ns());
long long term = OpTime::kUninitializedTerm;
// Fetch term out of the newOpMutex.
if (replicationMode == ReplicationCoordinator::modeReplSet &&
replCoord->isV1ElectionProtocol()) {
// Current term. If we're not a replset of pv=1, it remains kOldProtocolVersionTerm.
term = replCoord->getTerm();
}
stdx::lock_guard lk(newOpMutex);
Timestamp ts = getNextGlobalTimestamp(count);
newTimestampNotifier.notify_all();
fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(txn, ts));
// Set hash if we're in replset mode, otherwise it remains 0 in master/slave.
const bool needHash = (replicationMode == ReplicationCoordinator::modeReplSet);
for (unsigned i = 0; i < count; i++) {
slotsOut[i].opTime = {Timestamp(ts.asULL() + i), term};
if (needHash) {
slotsOut[i].hash = hashGenerator.nextInt64();
}
}
}
/**
* This allows us to stream the oplog entry directly into data region
* main goal is to avoid copying the o portion
* which can be very large
* TODO: can have this build the entire doc
*/
class OplogDocWriter final : public DocWriter {
public:
OplogDocWriter(BSONObj frame, BSONObj oField)
: _frame(std::move(frame)), _oField(std::move(oField)) {}
void writeDocument(char* start) const {
char* buf = start;
memcpy(buf, _frame.objdata(), _frame.objsize() - 1); // don't copy final EOO
DataView(buf).write>(documentSize());
buf += (_frame.objsize() - 1);
buf[0] = (char)Object;
buf[1] = 'o';
buf[2] = 0;
memcpy(buf + 3, _oField.objdata(), _oField.objsize());
buf += 3 + _oField.objsize();
buf[0] = EOO;
verify(static_cast((buf + 1) - start) == documentSize()); // DEV?
}
size_t documentSize() const {
return _frame.objsize() + _oField.objsize() + 1 /* type */ + 2 /* "o" */;
}
private:
BSONObj _frame;
BSONObj _oField;
};
} // namespace
void setOplogCollectionName() {
if (getGlobalReplicationCoordinator()->getReplicationMode() ==
ReplicationCoordinator::modeReplSet) {
_oplogCollectionName = rsOplogName;
} else {
_oplogCollectionName = masterSlaveOplogName;
}
}
namespace {
Collection* getLocalOplogCollection(OperationContext* txn, const std::string& oplogCollectionName) {
if (_localOplogCollection)
return _localOplogCollection;
AutoGetCollection autoColl(txn, NamespaceString(oplogCollectionName), MODE_IX);
_localOplogCollection = autoColl.getCollection();
massert(13347,
"the oplog collection " + oplogCollectionName +
" missing. did you drop it? if so, restart the server",
_localOplogCollection);
return _localOplogCollection;
}
bool oplogDisabled(OperationContext* txn,
ReplicationCoordinator::Mode replicationMode,
const NamespaceString& nss) {
if (replicationMode == ReplicationCoordinator::modeNone)
return true;
if (nss.db() == "local")
return true;
if (nss.isSystemDotProfile())
return true;
if (!txn->writesAreReplicated())
return true;
fassert(28626, txn->recoveryUnit());
return false;
}
OplogDocWriter _logOpWriter(OperationContext* txn,
const char* opstr,
const NamespaceString& nss,
const BSONObj& obj,
const BSONObj* o2,
bool fromMigrate,
OpTime optime,
long long hashNew) {
BSONObjBuilder b(256);
b.append("ts", optime.getTimestamp());
if (optime.getTerm() != -1)
b.append("t", optime.getTerm());
b.append("h", hashNew);
b.append("v", OplogEntry::kOplogVersion);
b.append("op", opstr);
b.append("ns", nss.ns());
if (fromMigrate)
b.appendBool("fromMigrate", true);
if (o2)
b.append("o2", *o2);
return OplogDocWriter(OplogDocWriter(b.obj(), obj));
}
} // end anon namespace
// Truncates the oplog after and including the "truncateTimestamp" entry.
void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) {
const NamespaceString oplogNss(rsOplogName);
ScopedTransaction transaction(txn, MODE_IX);
AutoGetDb autoDb(txn, oplogNss.db(), MODE_IX);
Lock::CollectionLock oplogCollectionLoc(txn->lockState(), oplogNss.ns(), MODE_X);
Collection* oplogCollection = autoDb.getDb()->getCollection(oplogNss);
if (!oplogCollection) {
fassertFailedWithStatusNoTrace(
34418,
Status(ErrorCodes::NamespaceNotFound, str::stream() << "Can't find " << rsOplogName));
}
// Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp.
RecordId oldestIDToDelete; // Non-null if there is something to delete.
auto oplogRs = oplogCollection->getRecordStore();
auto oplogReverseCursor = oplogRs->getCursor(txn, /*forward=*/false);
size_t count = 0;
while (auto next = oplogReverseCursor->next()) {
const BSONObj entry = next->data.releaseToBson();
const RecordId id = next->id;
count++;
const auto tsElem = entry["ts"];
if (count == 1) {
if (tsElem.eoo())
LOG(2) << "Oplog tail entry: " << redact(entry);
else
LOG(2) << "Oplog tail entry ts field: " << tsElem;
}
if (tsElem.timestamp() < truncateTimestamp) {
// If count == 1, that means that we have nothing to delete because everything in the
// oplog is < truncateTimestamp.
if (count != 1) {
invariant(!oldestIDToDelete.isNull());
oplogCollection->temp_cappedTruncateAfter(
txn, oldestIDToDelete, /*inclusive=*/true);
}
return;
}
oldestIDToDelete = id;
}
severe() << "Reached end of oplog looking for oplog entry before "
<< truncateTimestamp.toStringPretty()
<< " but couldn't find any after looking through " << count << " entries.";
fassertFailedNoTrace(40296);
}
/* we write to local.oplog.rs:
{ ts : ..., h: ..., v: ..., op: ..., etc }
ts: an OpTime timestamp
h: hash
v: version
op:
"i" insert
"u" update
"d" delete
"c" db cmd
"db" declares presence of a database (ns is set to the db name + '.')
"n" no op
bb param:
if not null, specifies a boolean to pass along to the other side as b: param.
used for "justOne" or "upsert" flags on 'd', 'u'
*/
void _logOpsInner(OperationContext* txn,
const NamespaceString& nss,
const DocWriter* const* writers,
size_t nWriters,
Collection* oplogCollection,
ReplicationCoordinator::Mode replicationMode,
OpTime finalOpTime) {
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
if (nss.size() && replicationMode == ReplicationCoordinator::modeReplSet &&
!replCoord->canAcceptWritesFor(nss)) {
severe() << "logOp() but can't accept write to collection " << nss.ns();
fassertFailed(17405);
}
// we jump through a bunch of hoops here to avoid copying the obj buffer twice --
// instead we do a single copy to the destination in the record store.
checkOplogInsert(oplogCollection->insertDocumentsForOplog(txn, writers, nWriters));
// Set replCoord last optime only after we're sure the WUOW didn't abort and roll back.
txn->recoveryUnit()->onCommit([txn, replCoord, finalOpTime] {
replCoord->setMyLastAppliedOpTimeForward(finalOpTime);
ReplClientInfo::forClient(txn->getClient()).setLastOp(finalOpTime);
});
}
void logOp(OperationContext* txn,
const char* opstr,
const char* ns,
const BSONObj& obj,
const BSONObj* o2,
bool fromMigrate) {
ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode();
NamespaceString nss(ns);
if (oplogDisabled(txn, replMode, nss))
return;
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
Collection* oplog = getLocalOplogCollection(txn, _oplogCollectionName);
Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
Lock::CollectionLock lock(txn->lockState(), _oplogCollectionName, MODE_IX);
OplogSlot slot;
getNextOpTime(txn, oplog, replCoord, replMode, 1, &slot);
auto writer = _logOpWriter(txn, opstr, nss, obj, o2, fromMigrate, slot.opTime, slot.hash);
const DocWriter* basePtr = &writer;
_logOpsInner(txn, nss, &basePtr, 1, oplog, replMode, slot.opTime);
}
void logOps(OperationContext* txn,
const char* opstr,
const NamespaceString& nss,
std::vector::const_iterator begin,
std::vector::const_iterator end,
bool fromMigrate) {
ReplicationCoordinator* replCoord = ReplicationCoordinator::get(txn);
ReplicationCoordinator::Mode replMode = replCoord->getReplicationMode();
invariant(begin != end);
if (oplogDisabled(txn, replMode, nss))
return;
const size_t count = end - begin;
std::vector writers;
writers.reserve(count);
Collection* oplog = getLocalOplogCollection(txn, _oplogCollectionName);
Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
Lock::CollectionLock lock(txn->lockState(), _oplogCollectionName, MODE_IX);
std::unique_ptr slots(new OplogSlot[count]);
getNextOpTime(txn, oplog, replCoord, replMode, count, slots.get());
for (size_t i = 0; i < count; i++) {
writers.emplace_back(_logOpWriter(
txn, opstr, nss, begin[i], NULL, fromMigrate, slots[i].opTime, slots[i].hash));
}
std::unique_ptr basePtrs(new DocWriter const*[count]);
for (size_t i = 0; i < count; i++) {
basePtrs[i] = &writers[i];
}
_logOpsInner(txn, nss, basePtrs.get(), count, oplog, replMode, slots[count - 1].opTime);
}
namespace {
long long getNewOplogSizeBytes(OperationContext* txn, const ReplSettings& replSettings) {
if (replSettings.getOplogSizeBytes() != 0) {
return replSettings.getOplogSizeBytes();
}
/* not specified. pick a default size */
ProcessInfo pi;
if (pi.getAddrSize() == 32) {
const auto sz = 50LL * 1024LL * 1024LL;
LOG(3) << "32bit system; choosing " << sz << " bytes oplog";
return sz;
}
// First choose a minimum size.
#if defined(__APPLE__)
// typically these are desktops (dev machines), so keep it smallish
const auto sz = 192 * 1024 * 1024;
LOG(3) << "Apple system; choosing " << sz << " bytes oplog";
return sz;
#else
long long lowerBound = 0;
double bytes = 0;
if (txn->getClient()->getServiceContext()->getGlobalStorageEngine()->isEphemeral()) {
// in memory: 50MB minimum size
lowerBound = 50LL * 1024 * 1024;
bytes = pi.getMemSizeMB() * 1024 * 1024;
LOG(3) << "Ephemeral storage system; lowerBound: " << lowerBound << " bytes, " << bytes
<< " bytes total memory";
} else {
// disk: 990MB minimum size
lowerBound = 990LL * 1024 * 1024;
bytes = File::freeSpace(storageGlobalParams.dbpath); //-1 if call not supported.
LOG(3) << "Disk storage system; lowerBound: " << lowerBound << " bytes, " << bytes
<< " bytes free space on device";
}
long long fivePct = static_cast(bytes * 0.05);
auto sz = std::max(fivePct, lowerBound);
// we use 5% of free [disk] space up to 50GB (1TB free)
const long long upperBound = 50LL * 1024 * 1024 * 1024;
sz = std::min(sz, upperBound);
return sz;
#endif
}
} // namespace
void createOplog(OperationContext* txn, const std::string& oplogCollectionName, bool isReplSet) {
ScopedTransaction transaction(txn, MODE_X);
Lock::GlobalWrite lk(txn->lockState());
const ReplSettings& replSettings = ReplicationCoordinator::get(txn)->getSettings();
OldClientContext ctx(txn, oplogCollectionName);
Collection* collection = ctx.db()->getCollection(oplogCollectionName);
if (collection) {
if (replSettings.getOplogSizeBytes() != 0) {
const CollectionOptions oplogOpts =
collection->getCatalogEntry()->getCollectionOptions(txn);
int o = (int)(oplogOpts.cappedSize / (1024 * 1024));
int n = (int)(replSettings.getOplogSizeBytes() / (1024 * 1024));
if (n != o) {
stringstream ss;
ss << "cmdline oplogsize (" << n << ") different than existing (" << o
<< ") see: http://dochub.mongodb.org/core/increase-oplog";
log() << ss.str() << endl;
throw UserException(13257, ss.str());
}
}
if (!isReplSet)
initTimestampFromOplog(txn, oplogCollectionName);
return;
}
/* create an oplog collection, if it doesn't yet exist. */
const auto sz = getNewOplogSizeBytes(txn, replSettings);
log() << "******" << endl;
log() << "creating replication oplog of size: " << (int)(sz / (1024 * 1024)) << "MB..." << endl;
CollectionOptions options;
options.capped = true;
options.cappedSize = sz;
options.autoIndexId = CollectionOptions::NO;
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
WriteUnitOfWork uow(txn);
invariant(ctx.db()->createCollection(txn, oplogCollectionName, options));
if (!isReplSet)
getGlobalServiceContext()->getOpObserver()->onOpMessage(txn, BSONObj());
uow.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", oplogCollectionName);
/* sync here so we don't get any surprising lag later when we try to sync */
StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
storageEngine->flushAllFiles(true);
log() << "******" << endl;
}
void createOplog(OperationContext* txn) {
const auto isReplSet = ReplicationCoordinator::get(txn)->getReplicationMode() ==
ReplicationCoordinator::modeReplSet;
createOplog(txn, _oplogCollectionName, isReplSet);
}
// -------------------------------------
namespace {
NamespaceString parseNs(const string& ns, const BSONObj& cmdObj) {
BSONElement first = cmdObj.firstElement();
uassert(40073,
str::stream() << "collection name has invalid type " << typeName(first.type()),
first.canonicalType() == canonicalizeBSONType(mongo::String));
std::string coll = first.valuestr();
uassert(28635, "no collection name specified", !coll.empty());
return NamespaceString(NamespaceString(ns).db().toString(), coll);
}
using OpApplyFn = stdx::function;
struct ApplyOpMetadata {
OpApplyFn applyFunc;
std::set acceptableErrors;
ApplyOpMetadata(OpApplyFn fun) {
applyFunc = fun;
}
ApplyOpMetadata(OpApplyFn fun, std::set theAcceptableErrors) {
applyFunc = fun;
acceptableErrors = theAcceptableErrors;
}
};
std::map opsMap = {
{"create",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
const NamespaceString nss(parseNs(ns, cmd));
if (auto idIndexElem = cmd["idIndex"]) {
// Remove "idIndex" field from command.
auto cmdWithoutIdIndex = cmd.removeField("idIndex");
return createCollection(
txn, nss.db().toString(), cmdWithoutIdIndex, idIndexElem.Obj());
}
// No _id index spec was provided, so we should build a v:1 _id index.
BSONObjBuilder idIndexSpecBuilder;
idIndexSpecBuilder.append(IndexDescriptor::kIndexVersionFieldName,
static_cast(IndexVersion::kV1));
idIndexSpecBuilder.append(IndexDescriptor::kIndexNameFieldName, "_id_");
idIndexSpecBuilder.append(IndexDescriptor::kNamespaceFieldName, nss.ns());
idIndexSpecBuilder.append(IndexDescriptor::kKeyPatternFieldName, BSON("_id" << 1));
return createCollection(txn, nss.db().toString(), cmd, idIndexSpecBuilder.done());
},
{ErrorCodes::NamespaceExists}}},
{"collMod",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
return collMod(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::IndexNotFound, ErrorCodes::NamespaceNotFound}}},
{"dropDatabase",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
return dropDatabase(txn, NamespaceString(ns).db().toString());
},
{ErrorCodes::NamespaceNotFound}}},
{"drop",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
return dropCollection(txn, parseNs(ns, cmd), resultWeDontCareAbout);
},
// IllegalOperation is necessary because in 3.0 we replicate drops of system.profile
// TODO(dannenberg) remove IllegalOperation once we no longer need 3.0 compatibility
{ErrorCodes::NamespaceNotFound, ErrorCodes::IllegalOperation}}},
// deleteIndex(es) is deprecated but still works as of April 10, 2015
{"deleteIndex",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"deleteIndexes",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"dropIndex",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"dropIndexes",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"renameCollection",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
return renameCollection(txn,
NamespaceString(cmd.firstElement().valuestrsafe()),
NamespaceString(cmd["to"].valuestrsafe()),
cmd["dropTarget"].trueValue(),
cmd["stayTemp"].trueValue());
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::NamespaceExists}}},
{"applyOps",
{[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
return applyOps(txn, nsToDatabase(ns), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::UnknownError}}},
{"convertToCapped", {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
return convertToCapped(txn, parseNs(ns, cmd), cmd["size"].number());
}}},
{"emptycapped", {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
return emptyCapped(txn, parseNs(ns, cmd));
}}},
};
} // namespace
// @return failure status if an update should have happened and the document DNE.
// See replset initial sync code.
Status applyOperation_inlock(OperationContext* txn,
Database* db,
const BSONObj& op,
bool inSteadyStateReplication,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats) {
LOG(3) << "applying op: " << redact(op);
OpCounters* opCounters = txn->writesAreReplicated() ? &globalOpCounters : &replOpCounters;
const char* names[] = {"o", "ns", "op", "b", "o2"};
BSONElement fields[5];
op.getFields(5, names, fields);
BSONElement& fieldO = fields[0];
BSONElement& fieldNs = fields[1];
BSONElement& fieldOp = fields[2];
BSONElement& fieldB = fields[3];
BSONElement& fieldO2 = fields[4];
BSONObj o;
if (fieldO.isABSONObj())
o = fieldO.embeddedObject();
const StringData ns = fieldNs.valueStringData();
BSONObj o2;
if (fieldO2.isABSONObj())
o2 = fieldO2.Obj();
bool valueB = fieldB.booleanSafe();
if (nsIsFull(ns)) {
if (supportsDocLocking()) {
// WiredTiger, and others requires MODE_IX since the applier threads driving
// this allow writes to the same collection on any thread.
dassert(txn->lockState()->isCollectionLockedForMode(ns, MODE_IX));
} else {
// mmapV1 ensures that all operations to the same collection are executed from
// the same worker thread, so it takes an exclusive lock (MODE_X)
dassert(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
}
}
Collection* collection = db->getCollection(ns);
IndexCatalog* indexCatalog = collection == nullptr ? nullptr : collection->getIndexCatalog();
const bool haveWrappingWriteUnitOfWork = txn->lockState()->inAWriteUnitOfWork();
uassert(ErrorCodes::CommandNotSupportedOnView,
str::stream() << "applyOps not supported on view: " << ns,
collection || !db->getViewCatalog()->lookup(txn, ns));
// operation type -- see logOp() comments for types
const char* opType = fieldOp.valuestrsafe();
invariant(*opType != 'c'); // commands are processed in applyCommand_inlock()
if (*opType == 'i') {
if (nsToCollectionSubstring(ns) == "system.indexes") {
uassert(ErrorCodes::NoSuchKey,
str::stream() << "Missing expected index spec in field 'o': " << op,
!fieldO.eoo());
uassert(ErrorCodes::TypeMismatch,
str::stream() << "Expected object for index spec in field 'o': " << op,
fieldO.isABSONObj());
BSONObj indexSpec = fieldO.embeddedObject();
std::string indexNs;
uassertStatusOK(bsonExtractStringField(indexSpec, "ns", &indexNs));
const NamespaceString indexNss(indexNs);
uassert(ErrorCodes::InvalidNamespace,
str::stream() << "Invalid namespace in index spec: " << op,
indexNss.isValid());
uassert(ErrorCodes::InvalidNamespace,
str::stream() << "Database name mismatch for database ("
<< nsToDatabaseSubstring(ns)
<< ") while creating index: "
<< op,
nsToDatabaseSubstring(ns) == indexNss.db());
// Check if collection exists.
auto indexCollection = db->getCollection(indexNss);
uassert(ErrorCodes::NamespaceNotFound,
str::stream() << "Failed to create index due to missing collection: "
<< op.toString(),
indexCollection);
opCounters->gotInsert();
if (!indexSpec["v"]) {
// If the "v" field isn't present in the index specification, then we assume it is a
// v=1 index from an older version of MongoDB. This is because
// (1) we haven't built v=0 indexes as the default for a long time, and
// (2) the index version has been included in the corresponding oplog entry since
// v=2 indexes were introduced.
BSONObjBuilder bob;
bob.append("v", static_cast(IndexVersion::kV1));
bob.appendElements(indexSpec);
indexSpec = bob.obj();
}
bool relaxIndexConstraints =
ReplicationCoordinator::get(txn)->shouldRelaxIndexConstraints(indexNss);
if (indexSpec["background"].trueValue()) {
Lock::TempRelease release(txn->lockState());
if (txn->lockState()->isLocked()) {
// If TempRelease fails, background index build will deadlock.
LOG(3) << "apply op: building background index " << indexSpec
<< " in the foreground because temp release failed";
IndexBuilder builder(indexSpec, relaxIndexConstraints);
Status status = builder.buildInForeground(txn, db);
uassertStatusOK(status);
} else {
IndexBuilder* builder = new IndexBuilder(indexSpec, relaxIndexConstraints);
// This spawns a new thread and returns immediately.
builder->go();
// Wait for thread to start and register itself
IndexBuilder::waitForBgIndexStarting();
}
txn->recoveryUnit()->abandonSnapshot();
} else {
IndexBuilder builder(indexSpec, relaxIndexConstraints);
Status status = builder.buildInForeground(txn, db);
uassertStatusOK(status);
}
// Since this is an index operation we can return without falling through.
if (incrementOpsAppliedStats) {
incrementOpsAppliedStats();
}
return Status::OK();
}
uassert(ErrorCodes::NamespaceNotFound,
str::stream() << "Failed to apply insert due to missing collection: "
<< op.toString(),
collection);
if (fieldO.type() == Array) {
// Batched inserts.
std::vector insertObjs;
for (auto elem : fieldO.Obj()) {
insertObjs.push_back(elem.Obj());
}
uassert(ErrorCodes::OperationFailed,
str::stream() << "Failed to apply insert due to empty array element: "
<< op.toString(),
!insertObjs.empty());
WriteUnitOfWork wuow(txn);
OpDebug* const nullOpDebug = nullptr;
Status status = collection->insertDocuments(
txn, insertObjs.begin(), insertObjs.end(), nullOpDebug, true);
if (!status.isOK()) {
return status;
}
wuow.commit();
for (auto entry : insertObjs) {
opCounters->gotInsert();
if (incrementOpsAppliedStats) {
incrementOpsAppliedStats();
}
}
} else {
// Single insert.
opCounters->gotInsert();
// No _id.
// This indicates an issue with the upstream server:
// The oplog entry is corrupted; or
// The version of the upstream server is obsolete.
uassert(ErrorCodes::NoSuchKey,
str::stream() << "Failed to apply insert due to missing _id: " << op.toString(),
o.hasField("_id"));
// 1. Try insert first, if we have no wrappingWriteUnitOfWork
// 2. If okay, commit
// 3. If not, do upsert (and commit)
// 4. If both !Ok, return status
Status status{ErrorCodes::NotYetInitialized, ""};
// We cannot rely on a DuplicateKey error if we'repart of a larger transaction, because
// that would require the transaction to abort. So instead, use upsert in that case.
bool needToDoUpsert = haveWrappingWriteUnitOfWork;
if (!needToDoUpsert) {
WriteUnitOfWork wuow(txn);
try {
OpDebug* const nullOpDebug = nullptr;
status = collection->insertDocument(txn, o, nullOpDebug, true);
} catch (DBException dbe) {
status = dbe.toStatus();
}
if (status.isOK()) {
wuow.commit();
} else if (status == ErrorCodes::DuplicateKey) {
needToDoUpsert = true;
} else {
return status;
}
}
// Now see if we need to do an upsert.
if (needToDoUpsert) {
// Do update on DuplicateKey errors.
// This will only be on the _id field in replication,
// since we disable non-_id unique constraint violations.
BSONObjBuilder b;
b.append(o.getField("_id"));
const NamespaceString requestNs(ns);
UpdateRequest request(requestNs);
request.setQuery(b.done());
request.setUpdates(o);
request.setUpsert();
UpdateLifecycleImpl updateLifecycle(requestNs);
request.setLifecycle(&updateLifecycle);
UpdateResult res = update(txn, db, request);
if (res.numMatched == 0 && res.upserted.isEmpty()) {
error() << "No document was updated even though we got a DuplicateKey "
"error when inserting";
fassertFailedNoTrace(28750);
}
}
if (incrementOpsAppliedStats) {
incrementOpsAppliedStats();
}
}
} else if (*opType == 'u') {
opCounters->gotUpdate();
BSONObj updateCriteria = o2;
const bool upsert = valueB || inSteadyStateReplication;
uassert(ErrorCodes::NoSuchKey,
str::stream() << "Failed to apply update due to missing _id: " << op.toString(),
updateCriteria.hasField("_id"));
const NamespaceString requestNs(ns);
UpdateRequest request(requestNs);
request.setQuery(updateCriteria);
request.setUpdates(o);
request.setUpsert(upsert);
UpdateLifecycleImpl updateLifecycle(requestNs);
request.setLifecycle(&updateLifecycle);
UpdateResult ur = update(txn, db, request);
if (ur.numMatched == 0 && ur.upserted.isEmpty()) {
if (ur.modifiers) {
if (updateCriteria.nFields() == 1) {
// was a simple { _id : ... } update criteria
string msg = str::stream() << "failed to apply update: " << redact(op);
error() << msg;
return Status(ErrorCodes::OperationFailed, msg);
}
// Need to check to see if it isn't present so we can exit early with a
// failure. Note that adds some overhead for this extra check in some cases,
// such as an updateCriteria
// of the form
// { _id:..., { x : {$size:...} }
// thus this is not ideal.
if (collection == NULL ||
(indexCatalog->haveIdIndex(txn) &&
Helpers::findById(txn, collection, updateCriteria).isNull()) ||
// capped collections won't have an _id index
(!indexCatalog->haveIdIndex(txn) &&
Helpers::findOne(txn, collection, updateCriteria, false).isNull())) {
string msg = str::stream() << "couldn't find doc: " << redact(op);
error() << msg;
return Status(ErrorCodes::OperationFailed, msg);
}
// Otherwise, it's present; zero objects were updated because of additional
// specifiers in the query for idempotence
} else {
// this could happen benignly on an oplog duplicate replay of an upsert
// (because we are idempotent),
// if an regular non-mod update fails the item is (presumably) missing.
if (!upsert) {
string msg = str::stream() << "update of non-mod failed: " << redact(op);
error() << msg;
return Status(ErrorCodes::OperationFailed, msg);
}
}
}
if (incrementOpsAppliedStats) {
incrementOpsAppliedStats();
}
} else if (*opType == 'd') {
opCounters->gotDelete();
uassert(ErrorCodes::NoSuchKey,
str::stream() << "Failed to apply delete due to missing _id: " << op.toString(),
o.hasField("_id"));
if (opType[1] == 0) {
deleteObjects(txn, collection, ns, o, PlanExecutor::YIELD_MANUAL, /*justOne*/ valueB);
} else
verify(opType[1] == 'b'); // "db" advertisement
if (incrementOpsAppliedStats) {
incrementOpsAppliedStats();
}
} else if (*opType == 'n') {
// no op
if (incrementOpsAppliedStats) {
incrementOpsAppliedStats();
}
} else {
throw MsgAssertionException(
14825, str::stream() << "error in applyOperation : unknown opType " << *opType);
}
// AuthorizationManager's logOp method registers a RecoveryUnit::Change and to do so we need
// to a new WriteUnitOfWork, if we dont have a wrapping unit of work already. If we already
// have a wrapping WUOW, the extra nexting is harmless. The logOp really should have been
// done in the WUOW that did the write, but this won't happen because applyOps turns off
// observers.
WriteUnitOfWork wuow(txn);
getGlobalAuthorizationManager()->logOp(
txn, opType, ns.toString().c_str(), o, fieldO2.isABSONObj() ? &o2 : NULL);
wuow.commit();
return Status::OK();
}
Status applyCommand_inlock(OperationContext* txn,
const BSONObj& op,
bool inSteadyStateReplication) {
const char* names[] = {"o", "ns", "op"};
BSONElement fields[3];
op.getFields(3, names, fields);
BSONElement& fieldO = fields[0];
BSONElement& fieldNs = fields[1];
BSONElement& fieldOp = fields[2];
const char* opType = fieldOp.valuestrsafe();
invariant(*opType == 'c'); // only commands are processed here
if (fieldO.eoo()) {
return Status(ErrorCodes::NoSuchKey, "Missing expected field 'o'");
}
if (!fieldO.isABSONObj()) {
return Status(ErrorCodes::BadValue, "Expected object for field 'o'");
}
BSONObj o = fieldO.embeddedObject();
const NamespaceString nss(fieldNs.valuestrsafe());
if (!nss.isValid()) {
return {ErrorCodes::InvalidNamespace, "invalid ns: " + std::string(nss.ns())};
}
{
Database* db = dbHolder().get(txn, nss.ns());
if (db && !db->getCollection(nss.ns()) && db->getViewCatalog()->lookup(txn, nss.ns())) {
return {ErrorCodes::CommandNotSupportedOnView,
str::stream() << "applyOps not supported on view:" << nss.ns()};
}
}
// Applying renameCollection during initial sync might lead to data corruption, so we restart
// the initial sync.
if (!inSteadyStateReplication && o.firstElementFieldName() == std::string("renameCollection")) {
return Status(ErrorCodes::OplogOperationUnsupported,
str::stream() << "Applying renameCollection not supported in initial sync: "
<< redact(op));
}
// Applying commands in repl is done under Global W-lock, so it is safe to not
// perform the current DB checks after reacquiring the lock.
invariant(txn->lockState()->isW());
bool done = false;
while (!done) {
auto op = opsMap.find(o.firstElementFieldName());
if (op == opsMap.end()) {
return Status(ErrorCodes::BadValue,
mongoutils::str::stream() << "Invalid key '" << o.firstElementFieldName()
<< "' found in field 'o'");
}
ApplyOpMetadata curOpToApply = op->second;
Status status = Status::OK();
try {
status = curOpToApply.applyFunc(txn, nss.ns().c_str(), o);
} catch (...) {
status = exceptionToStatus();
}
switch (status.code()) {
case ErrorCodes::WriteConflict: {
// Need to throw this up to a higher level where it will be caught and the
// operation retried.
throw WriteConflictException();
}
case ErrorCodes::BackgroundOperationInProgressForDatabase: {
Lock::TempRelease release(txn->lockState());
BackgroundOperation::awaitNoBgOpInProgForDb(nss.db());
txn->recoveryUnit()->abandonSnapshot();
txn->checkForInterrupt();
break;
}
case ErrorCodes::BackgroundOperationInProgressForNamespace: {
Lock::TempRelease release(txn->lockState());
Command* cmd = Command::findCommand(o.firstElement().fieldName());
invariant(cmd);
BackgroundOperation::awaitNoBgOpInProgForNs(cmd->parseNs(nss.db().toString(), o));
txn->recoveryUnit()->abandonSnapshot();
txn->checkForInterrupt();
break;
}
default:
if (_oplogCollectionName == masterSlaveOplogName) {
error() << "Failed command " << redact(o) << " on " << nss.db()
<< " with status " << status << " during oplog application";
} else if (curOpToApply.acceptableErrors.find(status.code()) ==
curOpToApply.acceptableErrors.end()) {
error() << "Failed command " << redact(o) << " on " << nss.db()
<< " with status " << status << " during oplog application";
return status;
}
// fallthrough
case ErrorCodes::OK:
done = true;
break;
}
}
// AuthorizationManager's logOp method registers a RecoveryUnit::Change
// and to do so we need to have begun a UnitOfWork
WriteUnitOfWork wuow(txn);
getGlobalAuthorizationManager()->logOp(txn, opType, nss.ns().c_str(), o, nullptr);
wuow.commit();
return Status::OK();
}
void setNewTimestamp(const Timestamp& newTime) {
stdx::lock_guard lk(newOpMutex);
setGlobalTimestamp(newTime);
newTimestampNotifier.notify_all();
}
void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS) {
DBDirectClient c(txn);
BSONObj lastOp = c.findOne(oplogNS, Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk);
if (!lastOp.isEmpty()) {
LOG(1) << "replSet setting last Timestamp";
const OpTime opTime = fassertStatusOK(28696, OpTime::parseFromOplogEntry(lastOp));
setNewTimestamp(opTime.getTimestamp());
}
}
void oplogCheckCloseDatabase(OperationContext* txn, Database* db) {
invariant(txn->lockState()->isW());
_localOplogCollection = nullptr;
}
void signalOplogWaiters() {
if (_localOplogCollection) {
_localOplogCollection->notifyCappedWaitersIfNeeded();
}
}
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(replSnapshotThreadThrottleMicros, int, 1000);
SnapshotThread::SnapshotThread(SnapshotManager* manager)
: _manager(manager), _thread([this] { run(); }) {}
bool SnapshotThread::shouldSleepMore(int numSleepsDone, size_t numUncommittedSnapshots) {
const double kThrottleRatio = 1 / 20.0;
const size_t kUncommittedSnapshotLimit = 1000;
const size_t kUncommittedSnapshotRestartPoint = kUncommittedSnapshotLimit / 2;
if (_inShutdown.load())
return false; // Exit the thread quickly without sleeping.
if (numSleepsDone == 0)
return true; // Always sleep at least once.
{
// Enforce a limit on the number of snapshots.
if (numUncommittedSnapshots >= kUncommittedSnapshotLimit)
_hitSnapshotLimit = true; // Don't create new snapshots.
if (numUncommittedSnapshots < kUncommittedSnapshotRestartPoint)
_hitSnapshotLimit = false; // Begin creating new snapshots again.
if (_hitSnapshotLimit)
return true;
}
// Spread out snapshots in time by sleeping as we collect more uncommitted snapshots.
const double numSleepsNeeded = numUncommittedSnapshots * kThrottleRatio;
return numSleepsNeeded > numSleepsDone;
}
void SnapshotThread::run() {
Client::initThread("SnapshotThread");
auto& client = cc();
auto serviceContext = client.getServiceContext();
auto replCoord = ReplicationCoordinator::get(serviceContext);
Timestamp lastTimestamp = {};
while (true) {
// This block logically belongs at the end of the loop, but having it at the top
// simplifies handling of the "continue" cases. It is harmless to do these before the
// first run of the loop.
for (int numSleepsDone = 0;
shouldSleepMore(numSleepsDone, replCoord->getNumUncommittedSnapshots());
numSleepsDone++) {
sleepmicros(replSnapshotThreadThrottleMicros);
_manager->cleanupUnneededSnapshots();
}
{
stdx::unique_lock lock(newOpMutex);
while (true) {
if (_inShutdown.load())
return;
if (_forcedSnapshotPending.load() || lastTimestamp != getLastSetTimestamp()) {
_forcedSnapshotPending.store(false);
lastTimestamp = getLastSetTimestamp();
break;
}
MONGO_IDLE_THREAD_BLOCK;
newTimestampNotifier.wait(lock);
}
}
while (MONGO_FAIL_POINT(disableSnapshotting)) {
sleepsecs(1);
if (_inShutdown.load()) {
return;
}
}
try {
auto txn = client.makeOperationContext();
Lock::GlobalLock globalLock(txn->lockState(), MODE_IS, UINT_MAX);
if (!replCoord->getMemberState().readable()) {
// If our MemberState isn't readable, we may not be in a consistent state so don't
// take snapshots. When we transition into a readable state from a non-readable
// state, a snapshot is forced to ensure we don't miss the latest write. This must
// be checked each time we acquire the global IS lock since that prevents the node
// from transitioning to a !readable() state from a readable() one in the cases
// where we shouldn't be creating a snapshot.
continue;
}
SnapshotName name(0); // assigned real value in block.
{
// Make sure there are no in-flight capped inserts while we create our snapshot.
// This lock cannot be aquired until all writes holding the resource commit/abort.
Lock::ResourceLock cappedInsertLockForOtherDb(
txn->lockState(), resourceCappedInFlightForOtherDb, MODE_X);
Lock::ResourceLock cappedInsertLockForLocalDb(
txn->lockState(), resourceCappedInFlightForLocalDb, MODE_X);
// Reserve the name immediately before we take our snapshot. This ensures that all
// names that compare lower must be from points in time visible to this named
// snapshot.
name = replCoord->reserveSnapshotName(nullptr);
// This establishes the view that we will name.
_manager->prepareForCreateSnapshot(txn.get());
}
auto opTimeOfSnapshot = OpTime();
{
AutoGetCollectionForRead oplog(txn.get(), rsOplogName);
invariant(oplog.getCollection());
// Read the latest op from the oplog.
auto cursor = oplog.getCollection()->getCursor(txn.get(), /*forward*/ false);
auto record = cursor->next();
if (!record)
continue; // oplog is completely empty.
const auto op = record->data.releaseToBson();
opTimeOfSnapshot = fassertStatusOK(28780, OpTime::parseFromOplogEntry(op));
invariant(!opTimeOfSnapshot.isNull());
}
replCoord->createSnapshot(txn.get(), opTimeOfSnapshot, name);
} catch (const WriteConflictException& wce) {
log() << "skipping storage snapshot pass due to write conflict";
continue;
}
}
}
void SnapshotThread::shutdown() {
invariant(_thread.joinable());
{
stdx::lock_guard lock(newOpMutex);
invariant(!_inShutdown.load());
_inShutdown.store(true);
newTimestampNotifier.notify_all();
}
_thread.join();
}
void SnapshotThread::forceSnapshot() {
stdx::lock_guard lock(newOpMutex);
_forcedSnapshotPending.store(true);
newTimestampNotifier.notify_all();
}
std::unique_ptr SnapshotThread::start(ServiceContext* service) {
if (auto manager = service->getGlobalStorageEngine()->getSnapshotManager()) {
return std::unique_ptr(new SnapshotThread(manager));
}
return {};
}
} // namespace repl
} // namespace mongo