diff options
Diffstat (limited to 'src/mongo/db/repl/rs_rollback.cpp')
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 120 |
1 files changed, 60 insertions, 60 deletions
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index dc5894cafc8..dbfb3a51284 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -335,7 +335,7 @@ namespace { * information from the upstream node. If any information is fetched from the upstream node after we * have written locally, the function must be called again. */ -void checkRbidAndUpdateMinValid(OperationContext* txn, +void checkRbidAndUpdateMinValid(OperationContext* opCtx, const int rbid, const RollbackSource& rollbackSource) { // It is important that the steps are performed in order to avoid racing with upstream rollbacks @@ -357,8 +357,8 @@ void checkRbidAndUpdateMinValid(OperationContext* txn, // online until we get to that point in freshness. OpTime minValid = fassertStatusOK(28774, OpTime::parseFromOplogEntry(newMinValidDoc)); log() << "Setting minvalid to " << minValid; - StorageInterface::get(txn)->setAppliedThrough(txn, {}); // Use top of oplog. - StorageInterface::get(txn)->setMinValid(txn, minValid); + StorageInterface::get(opCtx)->setAppliedThrough(opCtx, {}); // Use top of oplog. + StorageInterface::get(opCtx)->setMinValid(opCtx, minValid); if (MONGO_FAIL_POINT(rollbackHangThenFailAfterWritingMinValid)) { // This log output is used in js tests so please leave it. @@ -373,7 +373,7 @@ void checkRbidAndUpdateMinValid(OperationContext* txn, } } -void syncFixUp(OperationContext* txn, +void syncFixUp(OperationContext* opCtx, const FixUpInfo& fixUpInfo, const RollbackSource& rollbackSource, ReplicationCoordinator* replCoord) { @@ -415,7 +415,7 @@ void syncFixUp(OperationContext* txn, } log() << "rollback 3.5"; - checkRbidAndUpdateMinValid(txn, fixUpInfo.rbid, rollbackSource); + checkRbidAndUpdateMinValid(opCtx, fixUpInfo.rbid, rollbackSource); // update them log() << "rollback 4 n:" << goodVersions.size(); @@ -435,25 +435,25 @@ void syncFixUp(OperationContext* txn, { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_X); - Database* db = dbHolder().openDb(txn, nss.db().toString()); + ScopedTransaction transaction(opCtx, MODE_IX); + Lock::DBLock dbLock(opCtx->lockState(), nss.db(), MODE_X); + Database* db = dbHolder().openDb(opCtx, nss.db().toString()); invariant(db); - WriteUnitOfWork wunit(txn); - fassertStatusOK(40359, db->dropCollectionEvenIfSystem(txn, nss)); + WriteUnitOfWork wunit(opCtx); + fassertStatusOK(40359, db->dropCollectionEvenIfSystem(opCtx, nss)); wunit.commit(); } - rollbackSource.copyCollectionFromRemote(txn, nss); + rollbackSource.copyCollectionFromRemote(opCtx, nss); } for (const string& ns : fixUpInfo.collectionsToResyncMetadata) { log() << "rollback 4.1.2 coll metadata resync " << ns; const NamespaceString nss(ns); - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_X); - auto db = dbHolder().openDb(txn, nss.db().toString()); + ScopedTransaction transaction(opCtx, MODE_IX); + Lock::DBLock dbLock(opCtx->lockState(), nss.db(), MODE_X); + auto db = dbHolder().openDb(opCtx, nss.db().toString()); invariant(db); auto collection = db->getCollection(ns); invariant(collection); @@ -490,23 +490,23 @@ void syncFixUp(OperationContext* txn, // Use default options. } - WriteUnitOfWork wuow(txn); - if (options.flagsSet || cce->getCollectionOptions(txn).flagsSet) { - cce->updateFlags(txn, options.flags); + WriteUnitOfWork wuow(opCtx); + if (options.flagsSet || cce->getCollectionOptions(opCtx).flagsSet) { + cce->updateFlags(opCtx, options.flags); } - auto status = collection->setValidator(txn, options.validator); + auto status = collection->setValidator(opCtx, options.validator); if (!status.isOK()) { throw RSFatalException(str::stream() << "Failed to set validator: " << status.toString()); } - status = collection->setValidationAction(txn, options.validationAction); + status = collection->setValidationAction(opCtx, options.validationAction); if (!status.isOK()) { throw RSFatalException(str::stream() << "Failed to set validationAction: " << status.toString()); } - status = collection->setValidationLevel(txn, options.validationLevel); + status = collection->setValidationLevel(opCtx, options.validationLevel); if (!status.isOK()) { throw RSFatalException(str::stream() << "Failed to set validationLevel: " << status.toString()); @@ -518,7 +518,7 @@ void syncFixUp(OperationContext* txn, // we did more reading from primary, so check it again for a rollback (which would mess // us up), and make minValid newer. log() << "rollback 4.2"; - checkRbidAndUpdateMinValid(txn, fixUpInfo.rbid, rollbackSource); + checkRbidAndUpdateMinValid(opCtx, fixUpInfo.rbid, rollbackSource); } log() << "rollback 4.6"; @@ -530,16 +530,16 @@ void syncFixUp(OperationContext* txn, invariant(!fixUpInfo.indexesToDrop.count(*it)); - ScopedTransaction transaction(txn, MODE_IX); + ScopedTransaction transaction(opCtx, MODE_IX); const NamespaceString nss(*it); - Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_X); - Database* db = dbHolder().get(txn, nsToDatabaseSubstring(*it)); + Lock::DBLock dbLock(opCtx->lockState(), nss.db(), MODE_X); + Database* db = dbHolder().get(opCtx, nsToDatabaseSubstring(*it)); if (db) { Helpers::RemoveSaver removeSaver("rollback", "", *it); // perform a collection scan and write all documents in the collection to disk std::unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan( - txn, *it, db->getCollection(*it), PlanExecutor::YIELD_AUTO)); + opCtx, *it, db->getCollection(*it), PlanExecutor::YIELD_AUTO)); BSONObj curObj; PlanExecutor::ExecState execState; while (PlanExecutor::ADVANCED == (execState = exec->getNext(&curObj, NULL))) { @@ -564,8 +564,8 @@ void syncFixUp(OperationContext* txn, throw RSFatalException(); } - WriteUnitOfWork wunit(txn); - fassertStatusOK(40360, db->dropCollectionEvenIfSystem(txn, nss)); + WriteUnitOfWork wunit(opCtx); + fassertStatusOK(40360, db->dropCollectionEvenIfSystem(opCtx, nss)); wunit.commit(); } } @@ -576,9 +576,9 @@ void syncFixUp(OperationContext* txn, const string& indexName = it->second; log() << "rollback drop index: collection: " << nss.toString() << ". index: " << indexName; - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_X); - auto db = dbHolder().get(txn, nss.db()); + ScopedTransaction transaction(opCtx, MODE_IX); + Lock::DBLock dbLock(opCtx->lockState(), nss.db(), MODE_X); + auto db = dbHolder().get(opCtx, nss.db()); if (!db) { continue; } @@ -592,14 +592,14 @@ void syncFixUp(OperationContext* txn, } bool includeUnfinishedIndexes = false; auto indexDescriptor = - indexCatalog->findIndexByName(txn, indexName, includeUnfinishedIndexes); + indexCatalog->findIndexByName(opCtx, indexName, includeUnfinishedIndexes); if (!indexDescriptor) { warning() << "rollback failed to drop index " << indexName << " in " << nss.toString() << ": index not found"; continue; } - WriteUnitOfWork wunit(txn); - auto status = indexCatalog->dropIndex(txn, indexDescriptor); + WriteUnitOfWork wunit(opCtx); + auto status = indexCatalog->dropIndex(opCtx, indexDescriptor); if (!status.isOK()) { severe() << "rollback failed to drop index " << indexName << " in " << nss.toString() << ": " << status; @@ -637,9 +637,9 @@ void syncFixUp(OperationContext* txn, // TODO: Lots of overhead in context. This can be faster. const NamespaceString docNss(doc.ns); - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock docDbLock(txn->lockState(), docNss.db(), MODE_X); - OldClientContext ctx(txn, doc.ns); + ScopedTransaction transaction(opCtx, MODE_IX); + Lock::DBLock docDbLock(opCtx->lockState(), docNss.db(), MODE_X); + OldClientContext ctx(opCtx, doc.ns); Collection* collection = ctx.db()->getCollection(doc.ns); @@ -651,7 +651,7 @@ void syncFixUp(OperationContext* txn, // createCollection command and regardless, the document no longer exists. if (collection && removeSaver) { BSONObj obj; - bool found = Helpers::findOne(txn, collection, pattern, obj, false); + bool found = Helpers::findOne(opCtx, collection, pattern, obj, false); if (found) { auto status = removeSaver->goingToDelete(obj); if (!status.isOK()) { @@ -680,9 +680,9 @@ void syncFixUp(OperationContext* txn, // TODO: IIRC cappedTruncateAfter does not handle completely // empty. // this will crazy slow if no _id index. - const auto clock = txn->getServiceContext()->getFastClockSource(); + const auto clock = opCtx->getServiceContext()->getFastClockSource(); const auto findOneStart = clock->now(); - RecordId loc = Helpers::findOne(txn, collection, pattern, false); + RecordId loc = Helpers::findOne(opCtx, collection, pattern, false); if (clock->now() - findOneStart > Milliseconds(200)) warning() << "roll back slow no _id index for " << doc.ns << " perhaps?"; @@ -690,17 +690,17 @@ void syncFixUp(OperationContext* txn, // RecordId loc = Helpers::findById(nsd, pattern); if (!loc.isNull()) { try { - collection->cappedTruncateAfter(txn, loc, true); + collection->cappedTruncateAfter(opCtx, loc, true); } catch (const DBException& e) { if (e.getCode() == 13415) { // hack: need to just make cappedTruncate do this... MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - WriteUnitOfWork wunit(txn); - uassertStatusOK(collection->truncate(txn)); + WriteUnitOfWork wunit(opCtx); + uassertStatusOK(collection->truncate(opCtx)); wunit.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, "truncate", collection->ns().ns()); + opCtx, "truncate", collection->ns().ns()); } else { throw e; } @@ -717,7 +717,7 @@ void syncFixUp(OperationContext* txn, << ": " << redact(e); } } else { - deleteObjects(txn, + deleteObjects(opCtx, collection, doc.ns, pattern, @@ -740,7 +740,7 @@ void syncFixUp(OperationContext* txn, UpdateLifecycleImpl updateLifecycle(requestNs); request.setLifecycle(&updateLifecycle); - update(txn, ctx.db(), request); + update(opCtx, ctx.db(), request); } } catch (const DBException& e) { log() << "exception in rollback ns:" << doc.ns << ' ' << pattern.toString() << ' ' @@ -757,10 +757,10 @@ void syncFixUp(OperationContext* txn, LOG(2) << "rollback truncate oplog after " << fixUpInfo.commonPoint.toString(); { const NamespaceString oplogNss(rsOplogName); - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock oplogDbLock(txn->lockState(), oplogNss.db(), MODE_IX); - Lock::CollectionLock oplogCollectionLoc(txn->lockState(), oplogNss.ns(), MODE_X); - OldClientContext ctx(txn, rsOplogName); + ScopedTransaction transaction(opCtx, MODE_IX); + Lock::DBLock oplogDbLock(opCtx->lockState(), oplogNss.db(), MODE_IX); + Lock::CollectionLock oplogCollectionLoc(opCtx->lockState(), oplogNss.ns(), MODE_X); + OldClientContext ctx(opCtx, rsOplogName); Collection* oplogCollection = ctx.db()->getCollection(rsOplogName); if (!oplogCollection) { fassertFailedWithStatusNoTrace(13423, @@ -768,10 +768,10 @@ void syncFixUp(OperationContext* txn, str::stream() << "Can't find " << rsOplogName)); } // TODO: fatal error if this throws? - oplogCollection->cappedTruncateAfter(txn, fixUpInfo.commonPointOurDiskloc, false); + oplogCollection->cappedTruncateAfter(opCtx, fixUpInfo.commonPointOurDiskloc, false); } - Status status = getGlobalAuthorizationManager()->initialize(txn); + Status status = getGlobalAuthorizationManager()->initialize(opCtx); if (!status.isOK()) { severe() << "Failed to reinitialize auth data after rollback: " << status; fassertFailedNoTrace(40366); @@ -779,16 +779,16 @@ void syncFixUp(OperationContext* txn, // Reload the lastAppliedOpTime and lastDurableOpTime value in the replcoord and the // lastAppliedHash value in bgsync to reflect our new last op. - replCoord->resetLastOpTimesFromOplog(txn); + replCoord->resetLastOpTimesFromOplog(opCtx); log() << "rollback done"; } -Status _syncRollback(OperationContext* txn, +Status _syncRollback(OperationContext* opCtx, const OplogInterface& localOplog, const RollbackSource& rollbackSource, boost::optional<int> requiredRBID, ReplicationCoordinator* replCoord) { - invariant(!txn->lockState()->isLocked()); + invariant(!opCtx->lockState()->isLocked()); FixUpInfo how; log() << "rollback 1"; @@ -833,7 +833,7 @@ Status _syncRollback(OperationContext* txn, log() << "rollback 3 fixup"; try { ON_BLOCK_EXIT([&] { replCoord->incrementRollbackID(); }); - syncFixUp(txn, how, rollbackSource, replCoord); + syncFixUp(opCtx, how, rollbackSource, replCoord); } catch (const RSFatalException& e) { return Status(ErrorCodes::UnrecoverableRollbackError, e.what(), 18753); } @@ -853,19 +853,19 @@ Status _syncRollback(OperationContext* txn, } // namespace -Status syncRollback(OperationContext* txn, +Status syncRollback(OperationContext* opCtx, const OplogInterface& localOplog, const RollbackSource& rollbackSource, boost::optional<int> requiredRBID, ReplicationCoordinator* replCoord) { - invariant(txn); + invariant(opCtx); invariant(replCoord); log() << "beginning rollback" << rsLog; - DisableDocumentValidation validationDisabler(txn); - UnreplicatedWritesBlock replicationDisabler(txn); - Status status = _syncRollback(txn, localOplog, rollbackSource, requiredRBID, replCoord); + DisableDocumentValidation validationDisabler(opCtx); + UnreplicatedWritesBlock replicationDisabler(opCtx); + Status status = _syncRollback(opCtx, localOplog, rollbackSource, requiredRBID, replCoord); log() << "rollback finished" << rsLog; return status; |