summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplog.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/oplog.cpp')
-rw-r--r--src/mongo/db/repl/oplog.cpp268
1 files changed, 135 insertions, 133 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index e394df05efd..9db957bb00c 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -145,13 +145,13 @@ struct OplogSlot {
* function registers the new optime with the storage system and the replication coordinator,
* and provides no facility to revert those registrations on rollback.
*/
-void getNextOpTime(OperationContext* txn,
+void getNextOpTime(OperationContext* opCtx,
Collection* oplog,
ReplicationCoordinator* replCoord,
ReplicationCoordinator::Mode replicationMode,
unsigned count,
OplogSlot* slotsOut) {
- synchronizeOnCappedInFlightResource(txn->lockState(), oplog->ns());
+ synchronizeOnCappedInFlightResource(opCtx->lockState(), oplog->ns());
long long term = OpTime::kUninitializedTerm;
// Fetch term out of the newOpMutex.
@@ -163,10 +163,10 @@ void getNextOpTime(OperationContext* txn,
stdx::lock_guard<stdx::mutex> lk(newOpMutex);
- auto ts = LogicalClock::get(txn)->reserveTicks(count).asTimestamp();
+ auto ts = LogicalClock::get(opCtx)->reserveTicks(count).asTimestamp();
newTimestampNotifier.notify_all();
- fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(txn, ts));
+ fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts));
// Set hash if we're in replset mode, otherwise it remains 0 in master/slave.
const bool needHash = (replicationMode == ReplicationCoordinator::modeReplSet);
@@ -229,11 +229,12 @@ void setOplogCollectionName() {
namespace {
-Collection* getLocalOplogCollection(OperationContext* txn, const std::string& oplogCollectionName) {
+Collection* getLocalOplogCollection(OperationContext* opCtx,
+ const std::string& oplogCollectionName) {
if (_localOplogCollection)
return _localOplogCollection;
- AutoGetCollection autoColl(txn, NamespaceString(oplogCollectionName), MODE_IX);
+ AutoGetCollection autoColl(opCtx, NamespaceString(oplogCollectionName), MODE_IX);
_localOplogCollection = autoColl.getCollection();
massert(13347,
"the oplog collection " + oplogCollectionName +
@@ -243,7 +244,7 @@ Collection* getLocalOplogCollection(OperationContext* txn, const std::string& op
return _localOplogCollection;
}
-bool oplogDisabled(OperationContext* txn,
+bool oplogDisabled(OperationContext* opCtx,
ReplicationCoordinator::Mode replicationMode,
const NamespaceString& nss) {
if (replicationMode == ReplicationCoordinator::modeNone)
@@ -255,15 +256,15 @@ bool oplogDisabled(OperationContext* txn,
if (nss.isSystemDotProfile())
return true;
- if (!txn->writesAreReplicated())
+ if (!opCtx->writesAreReplicated())
return true;
- fassert(28626, txn->recoveryUnit());
+ fassert(28626, opCtx->recoveryUnit());
return false;
}
-OplogDocWriter _logOpWriter(OperationContext* txn,
+OplogDocWriter _logOpWriter(OperationContext* opCtx,
const char* opstr,
const NamespaceString& nss,
const BSONObj& obj,
@@ -290,11 +291,11 @@ OplogDocWriter _logOpWriter(OperationContext* txn,
} // end anon namespace
// Truncates the oplog after and including the "truncateTimestamp" entry.
-void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) {
+void truncateOplogTo(OperationContext* opCtx, Timestamp truncateTimestamp) {
const NamespaceString oplogNss(rsOplogName);
- ScopedTransaction transaction(txn, MODE_IX);
- AutoGetDb autoDb(txn, oplogNss.db(), MODE_IX);
- Lock::CollectionLock oplogCollectionLoc(txn->lockState(), oplogNss.ns(), MODE_X);
+ ScopedTransaction transaction(opCtx, MODE_IX);
+ AutoGetDb autoDb(opCtx, oplogNss.db(), MODE_IX);
+ Lock::CollectionLock oplogCollectionLoc(opCtx->lockState(), oplogNss.ns(), MODE_X);
Collection* oplogCollection = autoDb.getDb()->getCollection(oplogNss);
if (!oplogCollection) {
fassertFailedWithStatusNoTrace(
@@ -305,7 +306,7 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) {
// Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp.
RecordId oldestIDToDelete; // Non-null if there is something to delete.
auto oplogRs = oplogCollection->getRecordStore();
- auto oplogReverseCursor = oplogRs->getCursor(txn, /*forward=*/false);
+ auto oplogReverseCursor = oplogRs->getCursor(opCtx, /*forward=*/false);
size_t count = 0;
while (auto next = oplogReverseCursor->next()) {
const BSONObj entry = next->data.releaseToBson();
@@ -325,7 +326,7 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) {
// oplog is < truncateTimestamp.
if (count != 1) {
invariant(!oldestIDToDelete.isNull());
- oplogCollection->cappedTruncateAfter(txn, oldestIDToDelete, /*inclusive=*/true);
+ oplogCollection->cappedTruncateAfter(opCtx, oldestIDToDelete, /*inclusive=*/true);
}
return;
}
@@ -356,7 +357,7 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) {
if not null, specifies a boolean to pass along to the other side as b: param.
used for "justOne" or "upsert" flags on 'd', 'u'
*/
-void _logOpsInner(OperationContext* txn,
+void _logOpsInner(OperationContext* opCtx,
const NamespaceString& nss,
const DocWriter* const* writers,
size_t nWriters,
@@ -366,79 +367,80 @@ void _logOpsInner(OperationContext* txn,
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
if (nss.size() && replicationMode == ReplicationCoordinator::modeReplSet &&
- !replCoord->canAcceptWritesFor(txn, nss)) {
+ !replCoord->canAcceptWritesFor(opCtx, nss)) {
severe() << "logOp() but can't accept write to collection " << nss.ns();
fassertFailed(17405);
}
// we jump through a bunch of hoops here to avoid copying the obj buffer twice --
// instead we do a single copy to the destination in the record store.
- checkOplogInsert(oplogCollection->insertDocumentsForOplog(txn, writers, nWriters));
+ checkOplogInsert(oplogCollection->insertDocumentsForOplog(opCtx, writers, nWriters));
// Set replCoord last optime only after we're sure the WUOW didn't abort and roll back.
- txn->recoveryUnit()->onCommit([txn, replCoord, finalOpTime] {
+ opCtx->recoveryUnit()->onCommit([opCtx, replCoord, finalOpTime] {
replCoord->setMyLastAppliedOpTimeForward(finalOpTime);
- ReplClientInfo::forClient(txn->getClient()).setLastOp(finalOpTime);
+ ReplClientInfo::forClient(opCtx->getClient()).setLastOp(finalOpTime);
});
}
-void logOp(OperationContext* txn,
+void logOp(OperationContext* opCtx,
const char* opstr,
const char* ns,
const BSONObj& obj,
const BSONObj* o2,
bool fromMigrate) {
- ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode();
+ ReplicationCoordinator::Mode replMode =
+ ReplicationCoordinator::get(opCtx)->getReplicationMode();
NamespaceString nss(ns);
- if (oplogDisabled(txn, replMode, nss))
+ if (oplogDisabled(opCtx, replMode, nss))
return;
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
- Collection* oplog = getLocalOplogCollection(txn, _oplogCollectionName);
- Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
- Lock::CollectionLock lock(txn->lockState(), _oplogCollectionName, MODE_IX);
+ Collection* oplog = getLocalOplogCollection(opCtx, _oplogCollectionName);
+ Lock::DBLock lk(opCtx->lockState(), "local", MODE_IX);
+ Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX);
OplogSlot slot;
- getNextOpTime(txn, oplog, replCoord, replMode, 1, &slot);
- auto writer = _logOpWriter(txn, opstr, nss, obj, o2, fromMigrate, slot.opTime, slot.hash);
+ getNextOpTime(opCtx, oplog, replCoord, replMode, 1, &slot);
+ auto writer = _logOpWriter(opCtx, opstr, nss, obj, o2, fromMigrate, slot.opTime, slot.hash);
const DocWriter* basePtr = &writer;
- _logOpsInner(txn, nss, &basePtr, 1, oplog, replMode, slot.opTime);
+ _logOpsInner(opCtx, nss, &basePtr, 1, oplog, replMode, slot.opTime);
}
-void logOps(OperationContext* txn,
+void logOps(OperationContext* opCtx,
const char* opstr,
const NamespaceString& nss,
std::vector<BSONObj>::const_iterator begin,
std::vector<BSONObj>::const_iterator end,
bool fromMigrate) {
- ReplicationCoordinator* replCoord = ReplicationCoordinator::get(txn);
+ ReplicationCoordinator* replCoord = ReplicationCoordinator::get(opCtx);
ReplicationCoordinator::Mode replMode = replCoord->getReplicationMode();
invariant(begin != end);
- if (oplogDisabled(txn, replMode, nss))
+ if (oplogDisabled(opCtx, replMode, nss))
return;
const size_t count = end - begin;
std::vector<OplogDocWriter> writers;
writers.reserve(count);
- Collection* oplog = getLocalOplogCollection(txn, _oplogCollectionName);
- Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
- Lock::CollectionLock lock(txn->lockState(), _oplogCollectionName, MODE_IX);
+ Collection* oplog = getLocalOplogCollection(opCtx, _oplogCollectionName);
+ Lock::DBLock lk(opCtx->lockState(), "local", MODE_IX);
+ Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX);
std::unique_ptr<OplogSlot[]> slots(new OplogSlot[count]);
- getNextOpTime(txn, oplog, replCoord, replMode, count, slots.get());
+ getNextOpTime(opCtx, oplog, replCoord, replMode, count, slots.get());
for (size_t i = 0; i < count; i++) {
writers.emplace_back(_logOpWriter(
- txn, opstr, nss, begin[i], NULL, fromMigrate, slots[i].opTime, slots[i].hash));
+ opCtx, opstr, nss, begin[i], NULL, fromMigrate, slots[i].opTime, slots[i].hash));
}
std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const*[count]);
for (size_t i = 0; i < count; i++) {
basePtrs[i] = &writers[i];
}
- _logOpsInner(txn, nss, basePtrs.get(), count, oplog, replMode, slots[count - 1].opTime);
+ _logOpsInner(opCtx, nss, basePtrs.get(), count, oplog, replMode, slots[count - 1].opTime);
}
namespace {
-long long getNewOplogSizeBytes(OperationContext* txn, const ReplSettings& replSettings) {
+long long getNewOplogSizeBytes(OperationContext* opCtx, const ReplSettings& replSettings) {
if (replSettings.getOplogSizeBytes() != 0) {
return replSettings.getOplogSizeBytes();
}
@@ -459,7 +461,7 @@ long long getNewOplogSizeBytes(OperationContext* txn, const ReplSettings& replSe
#else
long long lowerBound = 0;
double bytes = 0;
- if (txn->getClient()->getServiceContext()->getGlobalStorageEngine()->isEphemeral()) {
+ if (opCtx->getClient()->getServiceContext()->getGlobalStorageEngine()->isEphemeral()) {
// in memory: 50MB minimum size
lowerBound = 50LL * 1024 * 1024;
bytes = pi.getMemSizeMB() * 1024 * 1024;
@@ -482,19 +484,19 @@ long long getNewOplogSizeBytes(OperationContext* txn, const ReplSettings& replSe
}
} // namespace
-void createOplog(OperationContext* txn, const std::string& oplogCollectionName, bool isReplSet) {
- ScopedTransaction transaction(txn, MODE_X);
- Lock::GlobalWrite lk(txn->lockState());
+void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName, bool isReplSet) {
+ ScopedTransaction transaction(opCtx, MODE_X);
+ Lock::GlobalWrite lk(opCtx->lockState());
- const ReplSettings& replSettings = ReplicationCoordinator::get(txn)->getSettings();
+ const ReplSettings& replSettings = ReplicationCoordinator::get(opCtx)->getSettings();
- OldClientContext ctx(txn, oplogCollectionName);
+ OldClientContext ctx(opCtx, oplogCollectionName);
Collection* collection = ctx.db()->getCollection(oplogCollectionName);
if (collection) {
if (replSettings.getOplogSizeBytes() != 0) {
const CollectionOptions oplogOpts =
- collection->getCatalogEntry()->getCollectionOptions(txn);
+ collection->getCatalogEntry()->getCollectionOptions(opCtx);
int o = (int)(oplogOpts.cappedSize / (1024 * 1024));
int n = (int)(replSettings.getOplogSizeBytes() / (1024 * 1024));
@@ -508,12 +510,12 @@ void createOplog(OperationContext* txn, const std::string& oplogCollectionName,
}
if (!isReplSet)
- initTimestampFromOplog(txn, oplogCollectionName);
+ initTimestampFromOplog(opCtx, oplogCollectionName);
return;
}
/* create an oplog collection, if it doesn't yet exist. */
- const auto sz = getNewOplogSizeBytes(txn, replSettings);
+ const auto sz = getNewOplogSizeBytes(opCtx, replSettings);
log() << "******" << endl;
log() << "creating replication oplog of size: " << (int)(sz / (1024 * 1024)) << "MB..." << endl;
@@ -524,24 +526,24 @@ void createOplog(OperationContext* txn, const std::string& oplogCollectionName,
options.autoIndexId = CollectionOptions::NO;
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- WriteUnitOfWork uow(txn);
- invariant(ctx.db()->createCollection(txn, oplogCollectionName, options));
+ WriteUnitOfWork uow(opCtx);
+ invariant(ctx.db()->createCollection(opCtx, oplogCollectionName, options));
if (!isReplSet)
- getGlobalServiceContext()->getOpObserver()->onOpMessage(txn, BSONObj());
+ getGlobalServiceContext()->getOpObserver()->onOpMessage(opCtx, BSONObj());
uow.commit();
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", oplogCollectionName);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "createCollection", oplogCollectionName);
/* sync here so we don't get any surprising lag later when we try to sync */
StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
- storageEngine->flushAllFiles(txn, true);
+ storageEngine->flushAllFiles(opCtx, true);
log() << "******" << endl;
}
-void createOplog(OperationContext* txn) {
- const auto isReplSet = ReplicationCoordinator::get(txn)->getReplicationMode() ==
+void createOplog(OperationContext* opCtx) {
+ const auto isReplSet = ReplicationCoordinator::get(opCtx)->getReplicationMode() ==
ReplicationCoordinator::modeReplSet;
- createOplog(txn, _oplogCollectionName, isReplSet);
+ createOplog(opCtx, _oplogCollectionName, isReplSet);
}
// -------------------------------------
@@ -575,13 +577,13 @@ struct ApplyOpMetadata {
std::map<std::string, ApplyOpMetadata> opsMap = {
{"create",
- {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
+ {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status {
const NamespaceString nss(parseNs(ns, cmd));
if (auto idIndexElem = cmd["idIndex"]) {
// Remove "idIndex" field from command.
auto cmdWithoutIdIndex = cmd.removeField("idIndex");
return createCollection(
- txn, nss.db().toString(), cmdWithoutIdIndex, idIndexElem.Obj());
+ opCtx, nss.db().toString(), cmdWithoutIdIndex, idIndexElem.Obj());
}
// No _id index spec was provided, so we should build a v:1 _id index.
@@ -591,55 +593,55 @@ std::map<std::string, ApplyOpMetadata> opsMap = {
idIndexSpecBuilder.append(IndexDescriptor::kIndexNameFieldName, "_id_");
idIndexSpecBuilder.append(IndexDescriptor::kNamespaceFieldName, nss.ns());
idIndexSpecBuilder.append(IndexDescriptor::kKeyPatternFieldName, BSON("_id" << 1));
- return createCollection(txn, nss.db().toString(), cmd, idIndexSpecBuilder.done());
+ return createCollection(opCtx, nss.db().toString(), cmd, idIndexSpecBuilder.done());
},
{ErrorCodes::NamespaceExists}}},
{"collMod",
- {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
+ {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
- return collMod(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
+ return collMod(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::IndexNotFound, ErrorCodes::NamespaceNotFound}}},
{"dropDatabase",
- {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
- return dropDatabase(txn, NamespaceString(ns).db().toString());
+ {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status {
+ return dropDatabase(opCtx, NamespaceString(ns).db().toString());
},
{ErrorCodes::NamespaceNotFound}}},
{"drop",
- {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
+ {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
- return dropCollection(txn, parseNs(ns, cmd), resultWeDontCareAbout);
+ return dropCollection(opCtx, parseNs(ns, cmd), resultWeDontCareAbout);
},
// IllegalOperation is necessary because in 3.0 we replicate drops of system.profile
// TODO(dannenberg) remove IllegalOperation once we no longer need 3.0 compatibility
{ErrorCodes::NamespaceNotFound, ErrorCodes::IllegalOperation}}},
// deleteIndex(es) is deprecated but still works as of April 10, 2015
{"deleteIndex",
- {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
+ {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
- return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
+ return dropIndexes(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"deleteIndexes",
- {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
+ {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
- return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
+ return dropIndexes(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"dropIndex",
- {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
+ {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
- return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
+ return dropIndexes(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"dropIndexes",
- {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
+ {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
- return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
+ return dropIndexes(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"renameCollection",
- {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
+ {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status {
const auto sourceNsElt = cmd.firstElement();
const auto targetNsElt = cmd["to"];
uassert(ErrorCodes::TypeMismatch,
@@ -648,7 +650,7 @@ std::map<std::string, ApplyOpMetadata> opsMap = {
uassert(ErrorCodes::TypeMismatch,
"'to' must be of type String",
targetNsElt.type() == BSONType::String);
- return renameCollection(txn,
+ return renameCollection(opCtx,
NamespaceString(sourceNsElt.valueStringData()),
NamespaceString(targetNsElt.valueStringData()),
cmd["dropTarget"].trueValue(),
@@ -656,16 +658,16 @@ std::map<std::string, ApplyOpMetadata> opsMap = {
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::NamespaceExists}}},
{"applyOps",
- {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
+ {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status {
BSONObjBuilder resultWeDontCareAbout;
- return applyOps(txn, nsToDatabase(ns), cmd, &resultWeDontCareAbout);
+ return applyOps(opCtx, nsToDatabase(ns), cmd, &resultWeDontCareAbout);
},
{ErrorCodes::UnknownError}}},
- {"convertToCapped", {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
- return convertToCapped(txn, parseNs(ns, cmd), cmd["size"].number());
+ {"convertToCapped", {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status {
+ return convertToCapped(opCtx, parseNs(ns, cmd), cmd["size"].number());
}}},
- {"emptycapped", {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status {
- return emptyCapped(txn, parseNs(ns, cmd));
+ {"emptycapped", {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status {
+ return emptyCapped(opCtx, parseNs(ns, cmd));
}}},
};
@@ -673,14 +675,14 @@ std::map<std::string, ApplyOpMetadata> opsMap = {
// @return failure status if an update should have happened and the document DNE.
// See replset initial sync code.
-Status applyOperation_inlock(OperationContext* txn,
+Status applyOperation_inlock(OperationContext* opCtx,
Database* db,
const BSONObj& op,
bool inSteadyStateReplication,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats) {
LOG(3) << "applying op: " << redact(op);
- OpCounters* opCounters = txn->writesAreReplicated() ? &globalOpCounters : &replOpCounters;
+ OpCounters* opCounters = opCtx->writesAreReplicated() ? &globalOpCounters : &replOpCounters;
const char* names[] = {"o", "ns", "op", "b", "o2"};
BSONElement fields[5];
@@ -710,19 +712,19 @@ Status applyOperation_inlock(OperationContext* txn,
if (supportsDocLocking()) {
// WiredTiger, and others requires MODE_IX since the applier threads driving
// this allow writes to the same collection on any thread.
- dassert(txn->lockState()->isCollectionLockedForMode(ns, MODE_IX));
+ dassert(opCtx->lockState()->isCollectionLockedForMode(ns, MODE_IX));
} else {
// mmapV1 ensures that all operations to the same collection are executed from
// the same worker thread, so it takes an exclusive lock (MODE_X)
- dassert(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
+ dassert(opCtx->lockState()->isCollectionLockedForMode(ns, MODE_X));
}
}
Collection* collection = db->getCollection(ns);
IndexCatalog* indexCatalog = collection == nullptr ? nullptr : collection->getIndexCatalog();
- const bool haveWrappingWriteUnitOfWork = txn->lockState()->inAWriteUnitOfWork();
+ const bool haveWrappingWriteUnitOfWork = opCtx->lockState()->inAWriteUnitOfWork();
uassert(ErrorCodes::CommandNotSupportedOnView,
str::stream() << "applyOps not supported on view: " << ns,
- collection || !db->getViewCatalog()->lookup(txn, ns));
+ collection || !db->getViewCatalog()->lookup(opCtx, ns));
// operation type -- see logOp() comments for types
const char* opType = fieldOp.valuestrsafe();
@@ -768,15 +770,15 @@ Status applyOperation_inlock(OperationContext* txn,
}
bool relaxIndexConstraints =
- ReplicationCoordinator::get(txn)->shouldRelaxIndexConstraints(txn, indexNss);
+ ReplicationCoordinator::get(opCtx)->shouldRelaxIndexConstraints(opCtx, indexNss);
if (indexSpec["background"].trueValue()) {
- Lock::TempRelease release(txn->lockState());
- if (txn->lockState()->isLocked()) {
+ Lock::TempRelease release(opCtx->lockState());
+ if (opCtx->lockState()->isLocked()) {
// If TempRelease fails, background index build will deadlock.
LOG(3) << "apply op: building background index " << indexSpec
<< " in the foreground because temp release failed";
IndexBuilder builder(indexSpec, relaxIndexConstraints);
- Status status = builder.buildInForeground(txn, db);
+ Status status = builder.buildInForeground(opCtx, db);
uassertStatusOK(status);
} else {
IndexBuilder* builder = new IndexBuilder(indexSpec, relaxIndexConstraints);
@@ -785,10 +787,10 @@ Status applyOperation_inlock(OperationContext* txn,
// Wait for thread to start and register itself
IndexBuilder::waitForBgIndexStarting();
}
- txn->recoveryUnit()->abandonSnapshot();
+ opCtx->recoveryUnit()->abandonSnapshot();
} else {
IndexBuilder builder(indexSpec, relaxIndexConstraints);
- Status status = builder.buildInForeground(txn, db);
+ Status status = builder.buildInForeground(opCtx, db);
uassertStatusOK(status);
}
// Since this is an index operation we can return without falling through.
@@ -812,10 +814,10 @@ Status applyOperation_inlock(OperationContext* txn,
str::stream() << "Failed to apply insert due to empty array element: "
<< op.toString(),
!insertObjs.empty());
- WriteUnitOfWork wuow(txn);
+ WriteUnitOfWork wuow(opCtx);
OpDebug* const nullOpDebug = nullptr;
Status status = collection->insertDocuments(
- txn, insertObjs.begin(), insertObjs.end(), nullOpDebug, true);
+ opCtx, insertObjs.begin(), insertObjs.end(), nullOpDebug, true);
if (!status.isOK()) {
return status;
}
@@ -849,10 +851,10 @@ Status applyOperation_inlock(OperationContext* txn,
bool needToDoUpsert = haveWrappingWriteUnitOfWork;
if (!needToDoUpsert) {
- WriteUnitOfWork wuow(txn);
+ WriteUnitOfWork wuow(opCtx);
try {
OpDebug* const nullOpDebug = nullptr;
- status = collection->insertDocument(txn, o, nullOpDebug, true);
+ status = collection->insertDocument(opCtx, o, nullOpDebug, true);
} catch (DBException dbe) {
status = dbe.toStatus();
}
@@ -881,7 +883,7 @@ Status applyOperation_inlock(OperationContext* txn,
UpdateLifecycleImpl updateLifecycle(requestNs);
request.setLifecycle(&updateLifecycle);
- UpdateResult res = update(txn, db, request);
+ UpdateResult res = update(opCtx, db, request);
if (res.numMatched == 0 && res.upserted.isEmpty()) {
error() << "No document was updated even though we got a DuplicateKey "
"error when inserting";
@@ -912,7 +914,7 @@ Status applyOperation_inlock(OperationContext* txn,
UpdateLifecycleImpl updateLifecycle(requestNs);
request.setLifecycle(&updateLifecycle);
- UpdateResult ur = update(txn, db, request);
+ UpdateResult ur = update(opCtx, db, request);
if (ur.numMatched == 0 && ur.upserted.isEmpty()) {
if (ur.modifiers) {
@@ -929,11 +931,11 @@ Status applyOperation_inlock(OperationContext* txn,
// { _id:..., { x : {$size:...} }
// thus this is not ideal.
if (collection == NULL ||
- (indexCatalog->haveIdIndex(txn) &&
- Helpers::findById(txn, collection, updateCriteria).isNull()) ||
+ (indexCatalog->haveIdIndex(opCtx) &&
+ Helpers::findById(opCtx, collection, updateCriteria).isNull()) ||
// capped collections won't have an _id index
- (!indexCatalog->haveIdIndex(txn) &&
- Helpers::findOne(txn, collection, updateCriteria, false).isNull())) {
+ (!indexCatalog->haveIdIndex(opCtx) &&
+ Helpers::findOne(opCtx, collection, updateCriteria, false).isNull())) {
string msg = str::stream() << "couldn't find doc: " << redact(op);
error() << msg;
return Status(ErrorCodes::OperationFailed, msg);
@@ -963,7 +965,7 @@ Status applyOperation_inlock(OperationContext* txn,
o.hasField("_id"));
if (opType[1] == 0) {
- deleteObjects(txn, collection, ns, o, PlanExecutor::YIELD_MANUAL, /*justOne*/ valueB);
+ deleteObjects(opCtx, collection, ns, o, PlanExecutor::YIELD_MANUAL, /*justOne*/ valueB);
} else
verify(opType[1] == 'b'); // "db" advertisement
if (incrementOpsAppliedStats) {
@@ -984,15 +986,15 @@ Status applyOperation_inlock(OperationContext* txn,
// have a wrapping WUOW, the extra nexting is harmless. The logOp really should have been
// done in the WUOW that did the write, but this won't happen because applyOps turns off
// observers.
- WriteUnitOfWork wuow(txn);
+ WriteUnitOfWork wuow(opCtx);
getGlobalAuthorizationManager()->logOp(
- txn, opType, ns.toString().c_str(), o, fieldO2.isABSONObj() ? &o2 : NULL);
+ opCtx, opType, ns.toString().c_str(), o, fieldO2.isABSONObj() ? &o2 : NULL);
wuow.commit();
return Status::OK();
}
-Status applyCommand_inlock(OperationContext* txn,
+Status applyCommand_inlock(OperationContext* opCtx,
const BSONObj& op,
bool inSteadyStateReplication) {
const char* names[] = {"o", "ns", "op"};
@@ -1023,8 +1025,8 @@ Status applyCommand_inlock(OperationContext* txn,
return {ErrorCodes::InvalidNamespace, "invalid ns: " + std::string(nss.ns())};
}
{
- Database* db = dbHolder().get(txn, nss.ns());
- if (db && !db->getCollection(nss.ns()) && db->getViewCatalog()->lookup(txn, nss.ns())) {
+ Database* db = dbHolder().get(opCtx, nss.ns());
+ if (db && !db->getCollection(nss.ns()) && db->getViewCatalog()->lookup(opCtx, nss.ns())) {
return {ErrorCodes::CommandNotSupportedOnView,
str::stream() << "applyOps not supported on view:" << nss.ns()};
}
@@ -1040,7 +1042,7 @@ Status applyCommand_inlock(OperationContext* txn,
// Applying commands in repl is done under Global W-lock, so it is safe to not
// perform the current DB checks after reacquiring the lock.
- invariant(txn->lockState()->isW());
+ invariant(opCtx->lockState()->isW());
bool done = false;
@@ -1054,7 +1056,7 @@ Status applyCommand_inlock(OperationContext* txn,
ApplyOpMetadata curOpToApply = op->second;
Status status = Status::OK();
try {
- status = curOpToApply.applyFunc(txn, nss.ns().c_str(), o);
+ status = curOpToApply.applyFunc(opCtx, nss.ns().c_str(), o);
} catch (...) {
status = exceptionToStatus();
}
@@ -1065,21 +1067,21 @@ Status applyCommand_inlock(OperationContext* txn,
throw WriteConflictException();
}
case ErrorCodes::BackgroundOperationInProgressForDatabase: {
- Lock::TempRelease release(txn->lockState());
+ Lock::TempRelease release(opCtx->lockState());
BackgroundOperation::awaitNoBgOpInProgForDb(nss.db());
- txn->recoveryUnit()->abandonSnapshot();
- txn->checkForInterrupt();
+ opCtx->recoveryUnit()->abandonSnapshot();
+ opCtx->checkForInterrupt();
break;
}
case ErrorCodes::BackgroundOperationInProgressForNamespace: {
- Lock::TempRelease release(txn->lockState());
+ Lock::TempRelease release(opCtx->lockState());
Command* cmd = Command::findCommand(o.firstElement().fieldName());
invariant(cmd);
BackgroundOperation::awaitNoBgOpInProgForNs(cmd->parseNs(nss.db().toString(), o));
- txn->recoveryUnit()->abandonSnapshot();
- txn->checkForInterrupt();
+ opCtx->recoveryUnit()->abandonSnapshot();
+ opCtx->checkForInterrupt();
break;
}
default:
@@ -1101,8 +1103,8 @@ Status applyCommand_inlock(OperationContext* txn,
// AuthorizationManager's logOp method registers a RecoveryUnit::Change
// and to do so we need to have begun a UnitOfWork
- WriteUnitOfWork wuow(txn);
- getGlobalAuthorizationManager()->logOp(txn, opType, nss.ns().c_str(), o, nullptr);
+ WriteUnitOfWork wuow(opCtx);
+ getGlobalAuthorizationManager()->logOp(opCtx, opType, nss.ns().c_str(), o, nullptr);
wuow.commit();
return Status::OK();
@@ -1114,19 +1116,19 @@ void setNewTimestamp(ServiceContext* service, const Timestamp& newTime) {
newTimestampNotifier.notify_all();
}
-void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS) {
- DBDirectClient c(txn);
+void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS) {
+ DBDirectClient c(opCtx);
BSONObj lastOp = c.findOne(oplogNS, Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk);
if (!lastOp.isEmpty()) {
LOG(1) << "replSet setting last Timestamp";
const OpTime opTime = fassertStatusOK(28696, OpTime::parseFromOplogEntry(lastOp));
- setNewTimestamp(txn->getServiceContext(), opTime.getTimestamp());
+ setNewTimestamp(opCtx->getServiceContext(), opTime.getTimestamp());
}
}
-void oplogCheckCloseDatabase(OperationContext* txn, Database* db) {
- invariant(txn->lockState()->isW());
+void oplogCheckCloseDatabase(OperationContext* opCtx, Database* db) {
+ invariant(opCtx->lockState()->isW());
_localOplogCollection = nullptr;
}
@@ -1213,8 +1215,8 @@ void SnapshotThread::run() {
}
try {
- auto txn = client.makeOperationContext();
- Lock::GlobalLock globalLock(txn->lockState(), MODE_IS, UINT_MAX);
+ auto opCtx = client.makeOperationContext();
+ Lock::GlobalLock globalLock(opCtx->lockState(), MODE_IS, UINT_MAX);
if (!replCoord->getMemberState().readable()) {
// If our MemberState isn't readable, we may not be in a consistent state so don't
@@ -1231,9 +1233,9 @@ void SnapshotThread::run() {
// Make sure there are no in-flight capped inserts while we create our snapshot.
// This lock cannot be aquired until all writes holding the resource commit/abort.
Lock::ResourceLock cappedInsertLockForOtherDb(
- txn->lockState(), resourceCappedInFlightForOtherDb, MODE_X);
+ opCtx->lockState(), resourceCappedInFlightForOtherDb, MODE_X);
Lock::ResourceLock cappedInsertLockForLocalDb(
- txn->lockState(), resourceCappedInFlightForLocalDb, MODE_X);
+ opCtx->lockState(), resourceCappedInFlightForLocalDb, MODE_X);
// Reserve the name immediately before we take our snapshot. This ensures that all
// names that compare lower must be from points in time visible to this named
@@ -1241,15 +1243,15 @@ void SnapshotThread::run() {
name = replCoord->reserveSnapshotName(nullptr);
// This establishes the view that we will name.
- _manager->prepareForCreateSnapshot(txn.get());
+ _manager->prepareForCreateSnapshot(opCtx.get());
}
auto opTimeOfSnapshot = OpTime();
{
- AutoGetCollectionForRead oplog(txn.get(), NamespaceString(rsOplogName));
+ AutoGetCollectionForRead oplog(opCtx.get(), NamespaceString(rsOplogName));
invariant(oplog.getCollection());
// Read the latest op from the oplog.
- auto cursor = oplog.getCollection()->getCursor(txn.get(), /*forward*/ false);
+ auto cursor = oplog.getCollection()->getCursor(opCtx.get(), /*forward*/ false);
auto record = cursor->next();
if (!record)
continue; // oplog is completely empty.
@@ -1259,7 +1261,7 @@ void SnapshotThread::run() {
invariant(!opTimeOfSnapshot.isNull());
}
- replCoord->createSnapshot(txn.get(), opTimeOfSnapshot, name);
+ replCoord->createSnapshot(opCtx.get(), opTimeOfSnapshot, name);
} catch (const WriteConflictException& wce) {
log() << "skipping storage snapshot pass due to write conflict";
continue;