diff options
Diffstat (limited to 'src/mongo/db/repl/oplog.cpp')
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 268 |
1 files changed, 135 insertions, 133 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index e394df05efd..9db957bb00c 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -145,13 +145,13 @@ struct OplogSlot { * function registers the new optime with the storage system and the replication coordinator, * and provides no facility to revert those registrations on rollback. */ -void getNextOpTime(OperationContext* txn, +void getNextOpTime(OperationContext* opCtx, Collection* oplog, ReplicationCoordinator* replCoord, ReplicationCoordinator::Mode replicationMode, unsigned count, OplogSlot* slotsOut) { - synchronizeOnCappedInFlightResource(txn->lockState(), oplog->ns()); + synchronizeOnCappedInFlightResource(opCtx->lockState(), oplog->ns()); long long term = OpTime::kUninitializedTerm; // Fetch term out of the newOpMutex. @@ -163,10 +163,10 @@ void getNextOpTime(OperationContext* txn, stdx::lock_guard<stdx::mutex> lk(newOpMutex); - auto ts = LogicalClock::get(txn)->reserveTicks(count).asTimestamp(); + auto ts = LogicalClock::get(opCtx)->reserveTicks(count).asTimestamp(); newTimestampNotifier.notify_all(); - fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(txn, ts)); + fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts)); // Set hash if we're in replset mode, otherwise it remains 0 in master/slave. const bool needHash = (replicationMode == ReplicationCoordinator::modeReplSet); @@ -229,11 +229,12 @@ void setOplogCollectionName() { namespace { -Collection* getLocalOplogCollection(OperationContext* txn, const std::string& oplogCollectionName) { +Collection* getLocalOplogCollection(OperationContext* opCtx, + const std::string& oplogCollectionName) { if (_localOplogCollection) return _localOplogCollection; - AutoGetCollection autoColl(txn, NamespaceString(oplogCollectionName), MODE_IX); + AutoGetCollection autoColl(opCtx, NamespaceString(oplogCollectionName), MODE_IX); _localOplogCollection = autoColl.getCollection(); massert(13347, "the oplog collection " + oplogCollectionName + @@ -243,7 +244,7 @@ Collection* getLocalOplogCollection(OperationContext* txn, const std::string& op return _localOplogCollection; } -bool oplogDisabled(OperationContext* txn, +bool oplogDisabled(OperationContext* opCtx, ReplicationCoordinator::Mode replicationMode, const NamespaceString& nss) { if (replicationMode == ReplicationCoordinator::modeNone) @@ -255,15 +256,15 @@ bool oplogDisabled(OperationContext* txn, if (nss.isSystemDotProfile()) return true; - if (!txn->writesAreReplicated()) + if (!opCtx->writesAreReplicated()) return true; - fassert(28626, txn->recoveryUnit()); + fassert(28626, opCtx->recoveryUnit()); return false; } -OplogDocWriter _logOpWriter(OperationContext* txn, +OplogDocWriter _logOpWriter(OperationContext* opCtx, const char* opstr, const NamespaceString& nss, const BSONObj& obj, @@ -290,11 +291,11 @@ OplogDocWriter _logOpWriter(OperationContext* txn, } // end anon namespace // Truncates the oplog after and including the "truncateTimestamp" entry. -void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) { +void truncateOplogTo(OperationContext* opCtx, Timestamp truncateTimestamp) { const NamespaceString oplogNss(rsOplogName); - ScopedTransaction transaction(txn, MODE_IX); - AutoGetDb autoDb(txn, oplogNss.db(), MODE_IX); - Lock::CollectionLock oplogCollectionLoc(txn->lockState(), oplogNss.ns(), MODE_X); + ScopedTransaction transaction(opCtx, MODE_IX); + AutoGetDb autoDb(opCtx, oplogNss.db(), MODE_IX); + Lock::CollectionLock oplogCollectionLoc(opCtx->lockState(), oplogNss.ns(), MODE_X); Collection* oplogCollection = autoDb.getDb()->getCollection(oplogNss); if (!oplogCollection) { fassertFailedWithStatusNoTrace( @@ -305,7 +306,7 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) { // Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp. RecordId oldestIDToDelete; // Non-null if there is something to delete. auto oplogRs = oplogCollection->getRecordStore(); - auto oplogReverseCursor = oplogRs->getCursor(txn, /*forward=*/false); + auto oplogReverseCursor = oplogRs->getCursor(opCtx, /*forward=*/false); size_t count = 0; while (auto next = oplogReverseCursor->next()) { const BSONObj entry = next->data.releaseToBson(); @@ -325,7 +326,7 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) { // oplog is < truncateTimestamp. if (count != 1) { invariant(!oldestIDToDelete.isNull()); - oplogCollection->cappedTruncateAfter(txn, oldestIDToDelete, /*inclusive=*/true); + oplogCollection->cappedTruncateAfter(opCtx, oldestIDToDelete, /*inclusive=*/true); } return; } @@ -356,7 +357,7 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) { if not null, specifies a boolean to pass along to the other side as b: param. used for "justOne" or "upsert" flags on 'd', 'u' */ -void _logOpsInner(OperationContext* txn, +void _logOpsInner(OperationContext* opCtx, const NamespaceString& nss, const DocWriter* const* writers, size_t nWriters, @@ -366,79 +367,80 @@ void _logOpsInner(OperationContext* txn, ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); if (nss.size() && replicationMode == ReplicationCoordinator::modeReplSet && - !replCoord->canAcceptWritesFor(txn, nss)) { + !replCoord->canAcceptWritesFor(opCtx, nss)) { severe() << "logOp() but can't accept write to collection " << nss.ns(); fassertFailed(17405); } // we jump through a bunch of hoops here to avoid copying the obj buffer twice -- // instead we do a single copy to the destination in the record store. - checkOplogInsert(oplogCollection->insertDocumentsForOplog(txn, writers, nWriters)); + checkOplogInsert(oplogCollection->insertDocumentsForOplog(opCtx, writers, nWriters)); // Set replCoord last optime only after we're sure the WUOW didn't abort and roll back. - txn->recoveryUnit()->onCommit([txn, replCoord, finalOpTime] { + opCtx->recoveryUnit()->onCommit([opCtx, replCoord, finalOpTime] { replCoord->setMyLastAppliedOpTimeForward(finalOpTime); - ReplClientInfo::forClient(txn->getClient()).setLastOp(finalOpTime); + ReplClientInfo::forClient(opCtx->getClient()).setLastOp(finalOpTime); }); } -void logOp(OperationContext* txn, +void logOp(OperationContext* opCtx, const char* opstr, const char* ns, const BSONObj& obj, const BSONObj* o2, bool fromMigrate) { - ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode(); + ReplicationCoordinator::Mode replMode = + ReplicationCoordinator::get(opCtx)->getReplicationMode(); NamespaceString nss(ns); - if (oplogDisabled(txn, replMode, nss)) + if (oplogDisabled(opCtx, replMode, nss)) return; ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - Collection* oplog = getLocalOplogCollection(txn, _oplogCollectionName); - Lock::DBLock lk(txn->lockState(), "local", MODE_IX); - Lock::CollectionLock lock(txn->lockState(), _oplogCollectionName, MODE_IX); + Collection* oplog = getLocalOplogCollection(opCtx, _oplogCollectionName); + Lock::DBLock lk(opCtx->lockState(), "local", MODE_IX); + Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX); OplogSlot slot; - getNextOpTime(txn, oplog, replCoord, replMode, 1, &slot); - auto writer = _logOpWriter(txn, opstr, nss, obj, o2, fromMigrate, slot.opTime, slot.hash); + getNextOpTime(opCtx, oplog, replCoord, replMode, 1, &slot); + auto writer = _logOpWriter(opCtx, opstr, nss, obj, o2, fromMigrate, slot.opTime, slot.hash); const DocWriter* basePtr = &writer; - _logOpsInner(txn, nss, &basePtr, 1, oplog, replMode, slot.opTime); + _logOpsInner(opCtx, nss, &basePtr, 1, oplog, replMode, slot.opTime); } -void logOps(OperationContext* txn, +void logOps(OperationContext* opCtx, const char* opstr, const NamespaceString& nss, std::vector<BSONObj>::const_iterator begin, std::vector<BSONObj>::const_iterator end, bool fromMigrate) { - ReplicationCoordinator* replCoord = ReplicationCoordinator::get(txn); + ReplicationCoordinator* replCoord = ReplicationCoordinator::get(opCtx); ReplicationCoordinator::Mode replMode = replCoord->getReplicationMode(); invariant(begin != end); - if (oplogDisabled(txn, replMode, nss)) + if (oplogDisabled(opCtx, replMode, nss)) return; const size_t count = end - begin; std::vector<OplogDocWriter> writers; writers.reserve(count); - Collection* oplog = getLocalOplogCollection(txn, _oplogCollectionName); - Lock::DBLock lk(txn->lockState(), "local", MODE_IX); - Lock::CollectionLock lock(txn->lockState(), _oplogCollectionName, MODE_IX); + Collection* oplog = getLocalOplogCollection(opCtx, _oplogCollectionName); + Lock::DBLock lk(opCtx->lockState(), "local", MODE_IX); + Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX); std::unique_ptr<OplogSlot[]> slots(new OplogSlot[count]); - getNextOpTime(txn, oplog, replCoord, replMode, count, slots.get()); + getNextOpTime(opCtx, oplog, replCoord, replMode, count, slots.get()); for (size_t i = 0; i < count; i++) { writers.emplace_back(_logOpWriter( - txn, opstr, nss, begin[i], NULL, fromMigrate, slots[i].opTime, slots[i].hash)); + opCtx, opstr, nss, begin[i], NULL, fromMigrate, slots[i].opTime, slots[i].hash)); } std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const*[count]); for (size_t i = 0; i < count; i++) { basePtrs[i] = &writers[i]; } - _logOpsInner(txn, nss, basePtrs.get(), count, oplog, replMode, slots[count - 1].opTime); + _logOpsInner(opCtx, nss, basePtrs.get(), count, oplog, replMode, slots[count - 1].opTime); } namespace { -long long getNewOplogSizeBytes(OperationContext* txn, const ReplSettings& replSettings) { +long long getNewOplogSizeBytes(OperationContext* opCtx, const ReplSettings& replSettings) { if (replSettings.getOplogSizeBytes() != 0) { return replSettings.getOplogSizeBytes(); } @@ -459,7 +461,7 @@ long long getNewOplogSizeBytes(OperationContext* txn, const ReplSettings& replSe #else long long lowerBound = 0; double bytes = 0; - if (txn->getClient()->getServiceContext()->getGlobalStorageEngine()->isEphemeral()) { + if (opCtx->getClient()->getServiceContext()->getGlobalStorageEngine()->isEphemeral()) { // in memory: 50MB minimum size lowerBound = 50LL * 1024 * 1024; bytes = pi.getMemSizeMB() * 1024 * 1024; @@ -482,19 +484,19 @@ long long getNewOplogSizeBytes(OperationContext* txn, const ReplSettings& replSe } } // namespace -void createOplog(OperationContext* txn, const std::string& oplogCollectionName, bool isReplSet) { - ScopedTransaction transaction(txn, MODE_X); - Lock::GlobalWrite lk(txn->lockState()); +void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName, bool isReplSet) { + ScopedTransaction transaction(opCtx, MODE_X); + Lock::GlobalWrite lk(opCtx->lockState()); - const ReplSettings& replSettings = ReplicationCoordinator::get(txn)->getSettings(); + const ReplSettings& replSettings = ReplicationCoordinator::get(opCtx)->getSettings(); - OldClientContext ctx(txn, oplogCollectionName); + OldClientContext ctx(opCtx, oplogCollectionName); Collection* collection = ctx.db()->getCollection(oplogCollectionName); if (collection) { if (replSettings.getOplogSizeBytes() != 0) { const CollectionOptions oplogOpts = - collection->getCatalogEntry()->getCollectionOptions(txn); + collection->getCatalogEntry()->getCollectionOptions(opCtx); int o = (int)(oplogOpts.cappedSize / (1024 * 1024)); int n = (int)(replSettings.getOplogSizeBytes() / (1024 * 1024)); @@ -508,12 +510,12 @@ void createOplog(OperationContext* txn, const std::string& oplogCollectionName, } if (!isReplSet) - initTimestampFromOplog(txn, oplogCollectionName); + initTimestampFromOplog(opCtx, oplogCollectionName); return; } /* create an oplog collection, if it doesn't yet exist. */ - const auto sz = getNewOplogSizeBytes(txn, replSettings); + const auto sz = getNewOplogSizeBytes(opCtx, replSettings); log() << "******" << endl; log() << "creating replication oplog of size: " << (int)(sz / (1024 * 1024)) << "MB..." << endl; @@ -524,24 +526,24 @@ void createOplog(OperationContext* txn, const std::string& oplogCollectionName, options.autoIndexId = CollectionOptions::NO; MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - WriteUnitOfWork uow(txn); - invariant(ctx.db()->createCollection(txn, oplogCollectionName, options)); + WriteUnitOfWork uow(opCtx); + invariant(ctx.db()->createCollection(opCtx, oplogCollectionName, options)); if (!isReplSet) - getGlobalServiceContext()->getOpObserver()->onOpMessage(txn, BSONObj()); + getGlobalServiceContext()->getOpObserver()->onOpMessage(opCtx, BSONObj()); uow.commit(); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", oplogCollectionName); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "createCollection", oplogCollectionName); /* sync here so we don't get any surprising lag later when we try to sync */ StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); - storageEngine->flushAllFiles(txn, true); + storageEngine->flushAllFiles(opCtx, true); log() << "******" << endl; } -void createOplog(OperationContext* txn) { - const auto isReplSet = ReplicationCoordinator::get(txn)->getReplicationMode() == +void createOplog(OperationContext* opCtx) { + const auto isReplSet = ReplicationCoordinator::get(opCtx)->getReplicationMode() == ReplicationCoordinator::modeReplSet; - createOplog(txn, _oplogCollectionName, isReplSet); + createOplog(opCtx, _oplogCollectionName, isReplSet); } // ------------------------------------- @@ -575,13 +577,13 @@ struct ApplyOpMetadata { std::map<std::string, ApplyOpMetadata> opsMap = { {"create", - {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { + {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status { const NamespaceString nss(parseNs(ns, cmd)); if (auto idIndexElem = cmd["idIndex"]) { // Remove "idIndex" field from command. auto cmdWithoutIdIndex = cmd.removeField("idIndex"); return createCollection( - txn, nss.db().toString(), cmdWithoutIdIndex, idIndexElem.Obj()); + opCtx, nss.db().toString(), cmdWithoutIdIndex, idIndexElem.Obj()); } // No _id index spec was provided, so we should build a v:1 _id index. @@ -591,55 +593,55 @@ std::map<std::string, ApplyOpMetadata> opsMap = { idIndexSpecBuilder.append(IndexDescriptor::kIndexNameFieldName, "_id_"); idIndexSpecBuilder.append(IndexDescriptor::kNamespaceFieldName, nss.ns()); idIndexSpecBuilder.append(IndexDescriptor::kKeyPatternFieldName, BSON("_id" << 1)); - return createCollection(txn, nss.db().toString(), cmd, idIndexSpecBuilder.done()); + return createCollection(opCtx, nss.db().toString(), cmd, idIndexSpecBuilder.done()); }, {ErrorCodes::NamespaceExists}}}, {"collMod", - {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { + {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status { BSONObjBuilder resultWeDontCareAbout; - return collMod(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); + return collMod(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); }, {ErrorCodes::IndexNotFound, ErrorCodes::NamespaceNotFound}}}, {"dropDatabase", - {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { - return dropDatabase(txn, NamespaceString(ns).db().toString()); + {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status { + return dropDatabase(opCtx, NamespaceString(ns).db().toString()); }, {ErrorCodes::NamespaceNotFound}}}, {"drop", - {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { + {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status { BSONObjBuilder resultWeDontCareAbout; - return dropCollection(txn, parseNs(ns, cmd), resultWeDontCareAbout); + return dropCollection(opCtx, parseNs(ns, cmd), resultWeDontCareAbout); }, // IllegalOperation is necessary because in 3.0 we replicate drops of system.profile // TODO(dannenberg) remove IllegalOperation once we no longer need 3.0 compatibility {ErrorCodes::NamespaceNotFound, ErrorCodes::IllegalOperation}}}, // deleteIndex(es) is deprecated but still works as of April 10, 2015 {"deleteIndex", - {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { + {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status { BSONObjBuilder resultWeDontCareAbout; - return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); + return dropIndexes(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); }, {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}}, {"deleteIndexes", - {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { + {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status { BSONObjBuilder resultWeDontCareAbout; - return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); + return dropIndexes(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); }, {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}}, {"dropIndex", - {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { + {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status { BSONObjBuilder resultWeDontCareAbout; - return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); + return dropIndexes(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); }, {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}}, {"dropIndexes", - {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { + {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status { BSONObjBuilder resultWeDontCareAbout; - return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); + return dropIndexes(opCtx, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); }, {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}}, {"renameCollection", - {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { + {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status { const auto sourceNsElt = cmd.firstElement(); const auto targetNsElt = cmd["to"]; uassert(ErrorCodes::TypeMismatch, @@ -648,7 +650,7 @@ std::map<std::string, ApplyOpMetadata> opsMap = { uassert(ErrorCodes::TypeMismatch, "'to' must be of type String", targetNsElt.type() == BSONType::String); - return renameCollection(txn, + return renameCollection(opCtx, NamespaceString(sourceNsElt.valueStringData()), NamespaceString(targetNsElt.valueStringData()), cmd["dropTarget"].trueValue(), @@ -656,16 +658,16 @@ std::map<std::string, ApplyOpMetadata> opsMap = { }, {ErrorCodes::NamespaceNotFound, ErrorCodes::NamespaceExists}}}, {"applyOps", - {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { + {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status { BSONObjBuilder resultWeDontCareAbout; - return applyOps(txn, nsToDatabase(ns), cmd, &resultWeDontCareAbout); + return applyOps(opCtx, nsToDatabase(ns), cmd, &resultWeDontCareAbout); }, {ErrorCodes::UnknownError}}}, - {"convertToCapped", {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { - return convertToCapped(txn, parseNs(ns, cmd), cmd["size"].number()); + {"convertToCapped", {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status { + return convertToCapped(opCtx, parseNs(ns, cmd), cmd["size"].number()); }}}, - {"emptycapped", {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { - return emptyCapped(txn, parseNs(ns, cmd)); + {"emptycapped", {[](OperationContext* opCtx, const char* ns, BSONObj& cmd) -> Status { + return emptyCapped(opCtx, parseNs(ns, cmd)); }}}, }; @@ -673,14 +675,14 @@ std::map<std::string, ApplyOpMetadata> opsMap = { // @return failure status if an update should have happened and the document DNE. // See replset initial sync code. -Status applyOperation_inlock(OperationContext* txn, +Status applyOperation_inlock(OperationContext* opCtx, Database* db, const BSONObj& op, bool inSteadyStateReplication, IncrementOpsAppliedStatsFn incrementOpsAppliedStats) { LOG(3) << "applying op: " << redact(op); - OpCounters* opCounters = txn->writesAreReplicated() ? &globalOpCounters : &replOpCounters; + OpCounters* opCounters = opCtx->writesAreReplicated() ? &globalOpCounters : &replOpCounters; const char* names[] = {"o", "ns", "op", "b", "o2"}; BSONElement fields[5]; @@ -710,19 +712,19 @@ Status applyOperation_inlock(OperationContext* txn, if (supportsDocLocking()) { // WiredTiger, and others requires MODE_IX since the applier threads driving // this allow writes to the same collection on any thread. - dassert(txn->lockState()->isCollectionLockedForMode(ns, MODE_IX)); + dassert(opCtx->lockState()->isCollectionLockedForMode(ns, MODE_IX)); } else { // mmapV1 ensures that all operations to the same collection are executed from // the same worker thread, so it takes an exclusive lock (MODE_X) - dassert(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); + dassert(opCtx->lockState()->isCollectionLockedForMode(ns, MODE_X)); } } Collection* collection = db->getCollection(ns); IndexCatalog* indexCatalog = collection == nullptr ? nullptr : collection->getIndexCatalog(); - const bool haveWrappingWriteUnitOfWork = txn->lockState()->inAWriteUnitOfWork(); + const bool haveWrappingWriteUnitOfWork = opCtx->lockState()->inAWriteUnitOfWork(); uassert(ErrorCodes::CommandNotSupportedOnView, str::stream() << "applyOps not supported on view: " << ns, - collection || !db->getViewCatalog()->lookup(txn, ns)); + collection || !db->getViewCatalog()->lookup(opCtx, ns)); // operation type -- see logOp() comments for types const char* opType = fieldOp.valuestrsafe(); @@ -768,15 +770,15 @@ Status applyOperation_inlock(OperationContext* txn, } bool relaxIndexConstraints = - ReplicationCoordinator::get(txn)->shouldRelaxIndexConstraints(txn, indexNss); + ReplicationCoordinator::get(opCtx)->shouldRelaxIndexConstraints(opCtx, indexNss); if (indexSpec["background"].trueValue()) { - Lock::TempRelease release(txn->lockState()); - if (txn->lockState()->isLocked()) { + Lock::TempRelease release(opCtx->lockState()); + if (opCtx->lockState()->isLocked()) { // If TempRelease fails, background index build will deadlock. LOG(3) << "apply op: building background index " << indexSpec << " in the foreground because temp release failed"; IndexBuilder builder(indexSpec, relaxIndexConstraints); - Status status = builder.buildInForeground(txn, db); + Status status = builder.buildInForeground(opCtx, db); uassertStatusOK(status); } else { IndexBuilder* builder = new IndexBuilder(indexSpec, relaxIndexConstraints); @@ -785,10 +787,10 @@ Status applyOperation_inlock(OperationContext* txn, // Wait for thread to start and register itself IndexBuilder::waitForBgIndexStarting(); } - txn->recoveryUnit()->abandonSnapshot(); + opCtx->recoveryUnit()->abandonSnapshot(); } else { IndexBuilder builder(indexSpec, relaxIndexConstraints); - Status status = builder.buildInForeground(txn, db); + Status status = builder.buildInForeground(opCtx, db); uassertStatusOK(status); } // Since this is an index operation we can return without falling through. @@ -812,10 +814,10 @@ Status applyOperation_inlock(OperationContext* txn, str::stream() << "Failed to apply insert due to empty array element: " << op.toString(), !insertObjs.empty()); - WriteUnitOfWork wuow(txn); + WriteUnitOfWork wuow(opCtx); OpDebug* const nullOpDebug = nullptr; Status status = collection->insertDocuments( - txn, insertObjs.begin(), insertObjs.end(), nullOpDebug, true); + opCtx, insertObjs.begin(), insertObjs.end(), nullOpDebug, true); if (!status.isOK()) { return status; } @@ -849,10 +851,10 @@ Status applyOperation_inlock(OperationContext* txn, bool needToDoUpsert = haveWrappingWriteUnitOfWork; if (!needToDoUpsert) { - WriteUnitOfWork wuow(txn); + WriteUnitOfWork wuow(opCtx); try { OpDebug* const nullOpDebug = nullptr; - status = collection->insertDocument(txn, o, nullOpDebug, true); + status = collection->insertDocument(opCtx, o, nullOpDebug, true); } catch (DBException dbe) { status = dbe.toStatus(); } @@ -881,7 +883,7 @@ Status applyOperation_inlock(OperationContext* txn, UpdateLifecycleImpl updateLifecycle(requestNs); request.setLifecycle(&updateLifecycle); - UpdateResult res = update(txn, db, request); + UpdateResult res = update(opCtx, db, request); if (res.numMatched == 0 && res.upserted.isEmpty()) { error() << "No document was updated even though we got a DuplicateKey " "error when inserting"; @@ -912,7 +914,7 @@ Status applyOperation_inlock(OperationContext* txn, UpdateLifecycleImpl updateLifecycle(requestNs); request.setLifecycle(&updateLifecycle); - UpdateResult ur = update(txn, db, request); + UpdateResult ur = update(opCtx, db, request); if (ur.numMatched == 0 && ur.upserted.isEmpty()) { if (ur.modifiers) { @@ -929,11 +931,11 @@ Status applyOperation_inlock(OperationContext* txn, // { _id:..., { x : {$size:...} } // thus this is not ideal. if (collection == NULL || - (indexCatalog->haveIdIndex(txn) && - Helpers::findById(txn, collection, updateCriteria).isNull()) || + (indexCatalog->haveIdIndex(opCtx) && + Helpers::findById(opCtx, collection, updateCriteria).isNull()) || // capped collections won't have an _id index - (!indexCatalog->haveIdIndex(txn) && - Helpers::findOne(txn, collection, updateCriteria, false).isNull())) { + (!indexCatalog->haveIdIndex(opCtx) && + Helpers::findOne(opCtx, collection, updateCriteria, false).isNull())) { string msg = str::stream() << "couldn't find doc: " << redact(op); error() << msg; return Status(ErrorCodes::OperationFailed, msg); @@ -963,7 +965,7 @@ Status applyOperation_inlock(OperationContext* txn, o.hasField("_id")); if (opType[1] == 0) { - deleteObjects(txn, collection, ns, o, PlanExecutor::YIELD_MANUAL, /*justOne*/ valueB); + deleteObjects(opCtx, collection, ns, o, PlanExecutor::YIELD_MANUAL, /*justOne*/ valueB); } else verify(opType[1] == 'b'); // "db" advertisement if (incrementOpsAppliedStats) { @@ -984,15 +986,15 @@ Status applyOperation_inlock(OperationContext* txn, // have a wrapping WUOW, the extra nexting is harmless. The logOp really should have been // done in the WUOW that did the write, but this won't happen because applyOps turns off // observers. - WriteUnitOfWork wuow(txn); + WriteUnitOfWork wuow(opCtx); getGlobalAuthorizationManager()->logOp( - txn, opType, ns.toString().c_str(), o, fieldO2.isABSONObj() ? &o2 : NULL); + opCtx, opType, ns.toString().c_str(), o, fieldO2.isABSONObj() ? &o2 : NULL); wuow.commit(); return Status::OK(); } -Status applyCommand_inlock(OperationContext* txn, +Status applyCommand_inlock(OperationContext* opCtx, const BSONObj& op, bool inSteadyStateReplication) { const char* names[] = {"o", "ns", "op"}; @@ -1023,8 +1025,8 @@ Status applyCommand_inlock(OperationContext* txn, return {ErrorCodes::InvalidNamespace, "invalid ns: " + std::string(nss.ns())}; } { - Database* db = dbHolder().get(txn, nss.ns()); - if (db && !db->getCollection(nss.ns()) && db->getViewCatalog()->lookup(txn, nss.ns())) { + Database* db = dbHolder().get(opCtx, nss.ns()); + if (db && !db->getCollection(nss.ns()) && db->getViewCatalog()->lookup(opCtx, nss.ns())) { return {ErrorCodes::CommandNotSupportedOnView, str::stream() << "applyOps not supported on view:" << nss.ns()}; } @@ -1040,7 +1042,7 @@ Status applyCommand_inlock(OperationContext* txn, // Applying commands in repl is done under Global W-lock, so it is safe to not // perform the current DB checks after reacquiring the lock. - invariant(txn->lockState()->isW()); + invariant(opCtx->lockState()->isW()); bool done = false; @@ -1054,7 +1056,7 @@ Status applyCommand_inlock(OperationContext* txn, ApplyOpMetadata curOpToApply = op->second; Status status = Status::OK(); try { - status = curOpToApply.applyFunc(txn, nss.ns().c_str(), o); + status = curOpToApply.applyFunc(opCtx, nss.ns().c_str(), o); } catch (...) { status = exceptionToStatus(); } @@ -1065,21 +1067,21 @@ Status applyCommand_inlock(OperationContext* txn, throw WriteConflictException(); } case ErrorCodes::BackgroundOperationInProgressForDatabase: { - Lock::TempRelease release(txn->lockState()); + Lock::TempRelease release(opCtx->lockState()); BackgroundOperation::awaitNoBgOpInProgForDb(nss.db()); - txn->recoveryUnit()->abandonSnapshot(); - txn->checkForInterrupt(); + opCtx->recoveryUnit()->abandonSnapshot(); + opCtx->checkForInterrupt(); break; } case ErrorCodes::BackgroundOperationInProgressForNamespace: { - Lock::TempRelease release(txn->lockState()); + Lock::TempRelease release(opCtx->lockState()); Command* cmd = Command::findCommand(o.firstElement().fieldName()); invariant(cmd); BackgroundOperation::awaitNoBgOpInProgForNs(cmd->parseNs(nss.db().toString(), o)); - txn->recoveryUnit()->abandonSnapshot(); - txn->checkForInterrupt(); + opCtx->recoveryUnit()->abandonSnapshot(); + opCtx->checkForInterrupt(); break; } default: @@ -1101,8 +1103,8 @@ Status applyCommand_inlock(OperationContext* txn, // AuthorizationManager's logOp method registers a RecoveryUnit::Change // and to do so we need to have begun a UnitOfWork - WriteUnitOfWork wuow(txn); - getGlobalAuthorizationManager()->logOp(txn, opType, nss.ns().c_str(), o, nullptr); + WriteUnitOfWork wuow(opCtx); + getGlobalAuthorizationManager()->logOp(opCtx, opType, nss.ns().c_str(), o, nullptr); wuow.commit(); return Status::OK(); @@ -1114,19 +1116,19 @@ void setNewTimestamp(ServiceContext* service, const Timestamp& newTime) { newTimestampNotifier.notify_all(); } -void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS) { - DBDirectClient c(txn); +void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS) { + DBDirectClient c(opCtx); BSONObj lastOp = c.findOne(oplogNS, Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk); if (!lastOp.isEmpty()) { LOG(1) << "replSet setting last Timestamp"; const OpTime opTime = fassertStatusOK(28696, OpTime::parseFromOplogEntry(lastOp)); - setNewTimestamp(txn->getServiceContext(), opTime.getTimestamp()); + setNewTimestamp(opCtx->getServiceContext(), opTime.getTimestamp()); } } -void oplogCheckCloseDatabase(OperationContext* txn, Database* db) { - invariant(txn->lockState()->isW()); +void oplogCheckCloseDatabase(OperationContext* opCtx, Database* db) { + invariant(opCtx->lockState()->isW()); _localOplogCollection = nullptr; } @@ -1213,8 +1215,8 @@ void SnapshotThread::run() { } try { - auto txn = client.makeOperationContext(); - Lock::GlobalLock globalLock(txn->lockState(), MODE_IS, UINT_MAX); + auto opCtx = client.makeOperationContext(); + Lock::GlobalLock globalLock(opCtx->lockState(), MODE_IS, UINT_MAX); if (!replCoord->getMemberState().readable()) { // If our MemberState isn't readable, we may not be in a consistent state so don't @@ -1231,9 +1233,9 @@ void SnapshotThread::run() { // Make sure there are no in-flight capped inserts while we create our snapshot. // This lock cannot be aquired until all writes holding the resource commit/abort. Lock::ResourceLock cappedInsertLockForOtherDb( - txn->lockState(), resourceCappedInFlightForOtherDb, MODE_X); + opCtx->lockState(), resourceCappedInFlightForOtherDb, MODE_X); Lock::ResourceLock cappedInsertLockForLocalDb( - txn->lockState(), resourceCappedInFlightForLocalDb, MODE_X); + opCtx->lockState(), resourceCappedInFlightForLocalDb, MODE_X); // Reserve the name immediately before we take our snapshot. This ensures that all // names that compare lower must be from points in time visible to this named @@ -1241,15 +1243,15 @@ void SnapshotThread::run() { name = replCoord->reserveSnapshotName(nullptr); // This establishes the view that we will name. - _manager->prepareForCreateSnapshot(txn.get()); + _manager->prepareForCreateSnapshot(opCtx.get()); } auto opTimeOfSnapshot = OpTime(); { - AutoGetCollectionForRead oplog(txn.get(), NamespaceString(rsOplogName)); + AutoGetCollectionForRead oplog(opCtx.get(), NamespaceString(rsOplogName)); invariant(oplog.getCollection()); // Read the latest op from the oplog. - auto cursor = oplog.getCollection()->getCursor(txn.get(), /*forward*/ false); + auto cursor = oplog.getCollection()->getCursor(opCtx.get(), /*forward*/ false); auto record = cursor->next(); if (!record) continue; // oplog is completely empty. @@ -1259,7 +1261,7 @@ void SnapshotThread::run() { invariant(!opTimeOfSnapshot.isNull()); } - replCoord->createSnapshot(txn.get(), opTimeOfSnapshot, name); + replCoord->createSnapshot(opCtx.get(), opTimeOfSnapshot, name); } catch (const WriteConflictException& wce) { log() << "skipping storage snapshot pass due to write conflict"; continue; |