diff options
22 files changed, 171 insertions, 150 deletions
diff --git a/src/mongo/db/catalog/capped_collection_maintenance.cpp b/src/mongo/db/catalog/capped_collection_maintenance.cpp index fbace659eb6..688531a3d93 100644 --- a/src/mongo/db/catalog/capped_collection_maintenance.cpp +++ b/src/mongo/db/catalog/capped_collection_maintenance.cpp @@ -47,7 +47,6 @@ public: } ~CappedDeleteSideTxn() { - _opCtx->releaseRecoveryUnit(); _opCtx->setRecoveryUnit(std::unique_ptr<RecoveryUnit>(_originalRecoveryUnit), _originalRecoveryUnitState); } diff --git a/src/mongo/db/catalog/collection_catalog.cpp b/src/mongo/db/catalog/collection_catalog.cpp index dca8d9dfc2b..26c630537a8 100644 --- a/src/mongo/db/catalog/collection_catalog.cpp +++ b/src/mongo/db/catalog/collection_catalog.cpp @@ -127,7 +127,7 @@ public: PublishCatalogUpdates(OperationContext* opCtx, UncommittedCatalogUpdates& uncommittedCatalogUpdates) - : _opCtx(opCtx), _uncommittedCatalogUpdates(uncommittedCatalogUpdates) {} + : _uncommittedCatalogUpdates(uncommittedCatalogUpdates) {} static void ensureRegisteredWithRecoveryUnit( OperationContext* opCtx, UncommittedCatalogUpdates& UncommittedCatalogUpdates) { @@ -138,7 +138,7 @@ public: std::make_unique<PublishCatalogUpdates>(opCtx, UncommittedCatalogUpdates)); } - void commit(boost::optional<Timestamp> commitTime) override { + void commit(OperationContext* opCtx, boost::optional<Timestamp> commitTime) override { boost::container::small_vector<CollectionCatalog::CatalogWriteFn, kNumStaticActions> writeJobs; @@ -154,7 +154,7 @@ public: break; } case UncommittedCatalogUpdates::Entry::Action::kRenamedCollection: { - writeJobs.push_back([opCtx = _opCtx, &from = entry.nss, &to = entry.renameTo]( + writeJobs.push_back([opCtx, &from = entry.nss, &to = entry.renameTo]( CollectionCatalog& catalog) { catalog._collections.erase(from); @@ -166,15 +166,14 @@ public: } case UncommittedCatalogUpdates::Entry::Action::kDroppedCollection: { writeJobs.push_back( - [opCtx = _opCtx, - uuid = *entry.uuid(), - isDropPending = *entry.isDropPending](CollectionCatalog& catalog) { + [opCtx, uuid = *entry.uuid(), isDropPending = *entry.isDropPending]( + CollectionCatalog& catalog) { catalog.deregisterCollection(opCtx, uuid, isDropPending); }); break; } case UncommittedCatalogUpdates::Entry::Action::kRecreatedCollection: { - writeJobs.push_back([opCtx = _opCtx, + writeJobs.push_back([opCtx, collection = entry.collection, uuid = *entry.externalUUID, commitTime](CollectionCatalog& catalog) { @@ -214,25 +213,23 @@ public: break; } case UncommittedCatalogUpdates::Entry::Action::kAddViewResource: { - writeJobs.push_back( - [opCtx = _opCtx, &viewName = entry.nss](CollectionCatalog& catalog) { - ResourceCatalog::get(opCtx->getServiceContext()) - .add({RESOURCE_COLLECTION, viewName}, viewName); - catalog.deregisterUncommittedView(viewName); - }); + writeJobs.push_back([opCtx, &viewName = entry.nss](CollectionCatalog& catalog) { + ResourceCatalog::get(opCtx->getServiceContext()) + .add({RESOURCE_COLLECTION, viewName}, viewName); + catalog.deregisterUncommittedView(viewName); + }); break; } case UncommittedCatalogUpdates::Entry::Action::kRemoveViewResource: { - writeJobs.push_back( - [opCtx = _opCtx, &viewName = entry.nss](CollectionCatalog& catalog) { - ResourceCatalog::get(opCtx->getServiceContext()) - .remove({RESOURCE_COLLECTION, viewName}, viewName); - }); + writeJobs.push_back([opCtx, &viewName = entry.nss](CollectionCatalog& catalog) { + ResourceCatalog::get(opCtx->getServiceContext()) + .remove({RESOURCE_COLLECTION, viewName}, viewName); + }); break; } case UncommittedCatalogUpdates::Entry::Action::kDroppedIndex: { writeJobs.push_back( - [opCtx = _opCtx, + [opCtx, indexEntry = entry.indexEntry, isDropPending = *entry.isDropPending](CollectionCatalog& catalog) { catalog.deregisterIndex(opCtx, std::move(indexEntry), isDropPending); @@ -244,7 +241,7 @@ public: // Write all catalog updates to the catalog in the same write to ensure atomicity. if (!writeJobs.empty()) { - CollectionCatalog::write(_opCtx, [&writeJobs](CollectionCatalog& catalog) { + CollectionCatalog::write(opCtx, [&writeJobs](CollectionCatalog& catalog) { for (auto&& job : writeJobs) { job(catalog); } @@ -252,12 +249,11 @@ public: } } - void rollback() override { + void rollback(OperationContext* opCtx) override { _uncommittedCatalogUpdates.releaseEntries(); } private: - OperationContext* _opCtx; UncommittedCatalogUpdates& _uncommittedCatalogUpdates; }; diff --git a/src/mongo/db/catalog/index_catalog_impl.cpp b/src/mongo/db/catalog/index_catalog_impl.cpp index 59f7924b27e..990f0dad60b 100644 --- a/src/mongo/db/catalog/index_catalog_impl.cpp +++ b/src/mongo/db/catalog/index_catalog_impl.cpp @@ -1345,27 +1345,25 @@ Status IndexCatalogImpl::dropUnfinishedIndex(OperationContext* opCtx, namespace { class IndexRemoveChange final : public RecoveryUnit::Change { public: - IndexRemoveChange(OperationContext* opCtx, - const NamespaceString& nss, + IndexRemoveChange(const NamespaceString& nss, const UUID& uuid, std::shared_ptr<IndexCatalogEntry> entry, SharedCollectionDecorations* collectionDecorations) - : _opCtx(opCtx), - _nss(nss), + : _nss(nss), _uuid(uuid), _entry(std::move(entry)), _collectionDecorations(collectionDecorations) {} - void commit(boost::optional<Timestamp> commitTime) final { + void commit(OperationContext* opCtx, boost::optional<Timestamp> commitTime) final { if (commitTime) { - HistoricalIdentTracker::get(_opCtx).recordDrop( + HistoricalIdentTracker::get(opCtx).recordDrop( _entry->getIdent(), _nss, _uuid, commitTime.value()); } _entry->setDropped(); } - void rollback() final { + void rollback(OperationContext* opCtx) final { auto indexDescriptor = _entry->descriptor(); // Refresh the CollectionIndexUsageTrackerDecoration's knowledge of what indices are @@ -1377,7 +1375,6 @@ public: } private: - OperationContext* _opCtx; const NamespaceString _nss; const UUID _uuid; std::shared_ptr<IndexCatalogEntry> _entry; @@ -1410,7 +1407,7 @@ Status IndexCatalogImpl::dropIndexEntry(OperationContext* opCtx, invariant(released.get() == entry); opCtx->recoveryUnit()->registerChange(std::make_unique<IndexRemoveChange>( - opCtx, collection->ns(), collection->uuid(), released, collection->getSharedDecorations())); + collection->ns(), collection->uuid(), released, collection->getSharedDecorations())); CollectionQueryInfo::get(collection).rebuildIndexData(opCtx, collection); CollectionIndexUsageTrackerDecoration::get(collection->getSharedDecorations()) @@ -1584,8 +1581,7 @@ const IndexDescriptor* IndexCatalogImpl::refreshEntry(OperationContext* opCtx, auto oldEntry = _readyIndexes.release(oldDesc); invariant(oldEntry); opCtx->recoveryUnit()->registerChange( - std::make_unique<IndexRemoveChange>(opCtx, - collection->ns(), + std::make_unique<IndexRemoveChange>(collection->ns(), collection->uuid(), std::move(oldEntry), collection->getSharedDecorations())); diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp index 9c463b9688f..d1de60217d6 100644 --- a/src/mongo/db/operation_context.cpp +++ b/src/mongo/db/operation_context.cpp @@ -428,6 +428,10 @@ void OperationContext::setTxnRetryCounter(TxnRetryCounter txnRetryCounter) { } std::unique_ptr<RecoveryUnit> OperationContext::releaseRecoveryUnit() { + if (_recoveryUnit) { + _recoveryUnit->setOperationContext(nullptr); + } + return std::move(_recoveryUnit); } @@ -439,9 +443,19 @@ std::unique_ptr<RecoveryUnit> OperationContext::releaseAndReplaceRecoveryUnit() return ru; } +void OperationContext::replaceRecoveryUnit() { + setRecoveryUnit( + std::unique_ptr<RecoveryUnit>(getServiceContext()->getStorageEngine()->newRecoveryUnit()), + WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork); +} + WriteUnitOfWork::RecoveryUnitState OperationContext::setRecoveryUnit( std::unique_ptr<RecoveryUnit> unit, WriteUnitOfWork::RecoveryUnitState state) { _recoveryUnit = std::move(unit); + if (_recoveryUnit) { + _recoveryUnit->setOperationContext(this); + } + WriteUnitOfWork::RecoveryUnitState oldState = _ruState; _ruState = state; return oldState; diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index da78cac229f..2c04c0aaeae 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -129,11 +129,18 @@ public: std::unique_ptr<RecoveryUnit> releaseRecoveryUnit(); /* - * Similar to releaseRecoveryUnit(), but sets up a new, inactive RecoveryUnit after releasing - * the existing one. + * Sets up a new, inactive RecoveryUnit in the OperationContext. Destroys any previous recovery + * unit and executes its rollback handlers. + */ + void replaceRecoveryUnit(); + + /* + * Similar to replaceRecoveryUnit(), but returns the previous recovery unit like + * releaseRecoveryUnit(). */ std::unique_ptr<RecoveryUnit> releaseAndReplaceRecoveryUnit(); + /** * Associates the OperatingContext with a different RecoveryUnit for getMore or * subtransactions, see RecoveryUnitSwap. The new state is passed and the old state is diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp index 264099e24cf..fa083def99f 100644 --- a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp @@ -150,14 +150,13 @@ void onTransitionToAborted(OperationContext* opCtx, */ class TenantMigrationDonorCommitOrAbortHandler final : public RecoveryUnit::Change { public: - TenantMigrationDonorCommitOrAbortHandler(OperationContext* opCtx, - const TenantMigrationDonorDocument donorStateDoc) - : _opCtx(opCtx), _donorStateDoc(std::move(donorStateDoc)) {} + TenantMigrationDonorCommitOrAbortHandler(const TenantMigrationDonorDocument donorStateDoc) + : _donorStateDoc(std::move(donorStateDoc)) {} - void commit(boost::optional<Timestamp>) override { + void commit(OperationContext* opCtx, boost::optional<Timestamp>) override { if (_donorStateDoc.getExpireAt()) { auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker( - _opCtx->getServiceContext(), _donorStateDoc.getTenantId()); + opCtx->getServiceContext(), _donorStateDoc.getTenantId()); if (!mtab) { // The state doc and TenantMigrationDonorAccessBlocker for this migration were @@ -166,7 +165,7 @@ public: return; } - if (!_opCtx->writesAreReplicated()) { + if (!opCtx->writesAreReplicated()) { // Setting expireAt implies that the TenantMigrationDonorAccessBlocker for this // migration will be removed shortly after this. However, a lagged secondary // might not manage to advance its majority commit point past the migration @@ -188,14 +187,14 @@ public: if (_donorStateDoc.getProtocol().value_or( MigrationProtocolEnum::kMultitenantMigrations) == MigrationProtocolEnum::kMultitenantMigrations) { - TenantMigrationAccessBlockerRegistry::get(_opCtx->getServiceContext()) + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .remove(_donorStateDoc.getTenantId(), TenantMigrationAccessBlocker::BlockerType::kDonor); } else { tassert(6448701, "Bad protocol", _donorStateDoc.getProtocol() == MigrationProtocolEnum::kShardMerge); - TenantMigrationAccessBlockerRegistry::get(_opCtx->getServiceContext()) + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .removeShardMergeDonorAccessBlocker(_donorStateDoc.getId()); } } @@ -204,20 +203,19 @@ public: switch (_donorStateDoc.getState()) { case TenantMigrationDonorStateEnum::kCommitted: - onTransitionToCommitted(_opCtx, _donorStateDoc); + onTransitionToCommitted(opCtx, _donorStateDoc); break; case TenantMigrationDonorStateEnum::kAborted: - onTransitionToAborted(_opCtx, _donorStateDoc); + onTransitionToAborted(opCtx, _donorStateDoc); break; default: MONGO_UNREACHABLE; } } - void rollback() override {} + void rollback(OperationContext* opCtx) override {} private: - OperationContext* _opCtx; const TenantMigrationDonorDocument _donorStateDoc; }; @@ -270,8 +268,7 @@ void TenantMigrationDonorOpObserver::onUpdate(OperationContext* opCtx, case TenantMigrationDonorStateEnum::kCommitted: case TenantMigrationDonorStateEnum::kAborted: opCtx->recoveryUnit()->registerChange( - std::make_unique<TenantMigrationDonorCommitOrAbortHandler>(opCtx, - donorStateDoc)); + std::make_unique<TenantMigrationDonorCommitOrAbortHandler>(donorStateDoc)); break; default: MONGO_UNREACHABLE; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 8d6b5050f1e..09b16381cb1 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -143,12 +143,12 @@ public: const repl::OpTime& opTime) : _cloner(cloner), _idObj(idObj.getOwned()), _op(op), _opTime(opTime) {} - void commit(boost::optional<Timestamp>) override { + void commit(OperationContext* opCtx, boost::optional<Timestamp>) override { _cloner->_addToTransferModsQueue(_idObj, _op, _opTime); _cloner->_decrementOutstandingOperationTrackRequests(); } - void rollback() override { + void rollback(OperationContext* opCtx) override { _cloner->_decrementOutstandingOperationTrackRequests(); } @@ -159,7 +159,8 @@ private: const repl::OpTime _opTime; }; -void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestamp>) { +void LogTransactionOperationsForShardingHandler::commit(OperationContext* opCtx, + boost::optional<Timestamp>) { std::set<NamespaceString> namespacesTouchedByTransaction; // Inform the session migration subsystem that a transaction has committed for the given diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index ab76c1f0132..02a9a04c266 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -73,9 +73,9 @@ public: const repl::OpTime& prepareOrCommitOpTime) : _lsid(lsid), _stmts(stmts), _prepareOrCommitOpTime(prepareOrCommitOpTime) {} - void commit(boost::optional<Timestamp>) override; + void commit(OperationContext* opCtx, boost::optional<Timestamp>) override; - void rollback() override{}; + void rollback(OperationContext* opCtx) override{}; private: const LogicalSessionId _lsid; diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index d2e10bd630e..0278b1487ca 100644 --- a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -84,30 +84,27 @@ bool isStandaloneOrPrimary(OperationContext* opCtx) { */ class CollectionVersionLogOpHandler final : public RecoveryUnit::Change { public: - CollectionVersionLogOpHandler(OperationContext* opCtx, - const NamespaceString& nss, - bool droppingCollection) - : _opCtx(opCtx), _nss(nss), _droppingCollection(droppingCollection) {} + CollectionVersionLogOpHandler(const NamespaceString& nss, bool droppingCollection) + : _nss(nss), _droppingCollection(droppingCollection) {} - void commit(boost::optional<Timestamp>) override { - invariant(_opCtx->lockState()->isCollectionLockedForMode(_nss, MODE_IX)); + void commit(OperationContext* opCtx, boost::optional<Timestamp>) override { + invariant(opCtx->lockState()->isCollectionLockedForMode(_nss, MODE_IX)); - CatalogCacheLoader::get(_opCtx).notifyOfCollectionVersionUpdate(_nss); + CatalogCacheLoader::get(opCtx).notifyOfCollectionVersionUpdate(_nss); // Force subsequent uses of the namespace to refresh the filtering metadata so they can // synchronize with any work happening on the primary (e.g., migration critical section). - UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); if (_droppingCollection) - CollectionShardingRuntime::get(_opCtx, _nss) - ->clearFilteringMetadataForDroppedCollection(_opCtx); + CollectionShardingRuntime::get(opCtx, _nss) + ->clearFilteringMetadataForDroppedCollection(opCtx); else - CollectionShardingRuntime::get(_opCtx, _nss)->clearFilteringMetadata(_opCtx); + CollectionShardingRuntime::get(opCtx, _nss)->clearFilteringMetadata(opCtx); } - void rollback() override {} + void rollback(OperationContext* opCtx) override {} private: - OperationContext* _opCtx; const NamespaceString _nss; const bool _droppingCollection; }; @@ -121,13 +118,13 @@ public: SubmitRangeDeletionHandler(OperationContext* opCtx, RangeDeletionTask task) : _opCtx(opCtx), _task(std::move(task)) {} - void commit(boost::optional<Timestamp>) override { + void commit(OperationContext* opCtx, boost::optional<Timestamp>) override { if (!feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) { migrationutil::submitRangeDeletionTask(_opCtx, _task).getAsync([](auto) {}); } } - void rollback() override {} + void rollback(OperationContext* opCtx) override {} private: OperationContext* _opCtx; @@ -163,8 +160,8 @@ void onConfigDeleteInvalidateCachedCollectionMetadataAndNotify(OperationContext* AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState()); AutoGetCollection autoColl(opCtx, deletedNss, MODE_IX); - opCtx->recoveryUnit()->registerChange(std::make_unique<CollectionVersionLogOpHandler>( - opCtx, deletedNss, /* droppingCollection */ true)); + opCtx->recoveryUnit()->registerChange( + std::make_unique<CollectionVersionLogOpHandler>(deletedNss, /* droppingCollection */ true)); } /** @@ -373,7 +370,7 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE AutoGetCollection autoColl(opCtx, updatedNss, MODE_IX); if (refreshingFieldNewVal.isBoolean() && !refreshingFieldNewVal.boolean()) { opCtx->recoveryUnit()->registerChange(std::make_unique<CollectionVersionLogOpHandler>( - opCtx, updatedNss, /* droppingCollection */ false)); + updatedNss, /* droppingCollection */ false)); } if (enterCriticalSectionFieldNewVal.ok()) { diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp index 0e25639559f..2d67495c431 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp +++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp @@ -245,18 +245,17 @@ void onTransitionToAborted(OperationContext* opCtx, const ShardSplitDonorDocumen */ class TenantMigrationDonorCommitOrAbortHandler final : public RecoveryUnit::Change { public: - TenantMigrationDonorCommitOrAbortHandler(OperationContext* opCtx, - ShardSplitDonorDocument donorStateDoc) - : _opCtx(opCtx), _donorStateDoc(std::move(donorStateDoc)) {} + TenantMigrationDonorCommitOrAbortHandler(ShardSplitDonorDocument donorStateDoc) + : _donorStateDoc(std::move(donorStateDoc)) {} - void commit(boost::optional<Timestamp>) override { + void commit(OperationContext* opCtx, boost::optional<Timestamp>) override { if (_donorStateDoc.getExpireAt()) { if (_donorStateDoc.getTenantIds()) { auto tenantIds = _donorStateDoc.getTenantIds().value(); for (auto&& tenantId : tenantIds) { auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker( - _opCtx->getServiceContext(), tenantId); + opCtx->getServiceContext(), tenantId); if (!mtab) { // The state doc and TenantMigrationDonorAccessBlocker for this @@ -266,7 +265,7 @@ public: continue; } - if (isSecondary(_opCtx)) { + if (isSecondary(opCtx)) { // Setting expireAt implies that the TenantMigrationDonorAccessBlocker // for this migration will be removed shortly after this. However, a // lagged secondary might not manage to advance its majority commit @@ -288,7 +287,7 @@ public: // The migration durably aborted and is now marked as garbage // collectable, remove its TenantMigrationDonorAccessBlocker right away // to allow back-to-back migration retries. - TenantMigrationAccessBlockerRegistry::get(_opCtx->getServiceContext()) + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor); } } @@ -298,20 +297,19 @@ public: switch (_donorStateDoc.getState()) { case ShardSplitDonorStateEnum::kCommitted: - onTransitionToCommitted(_opCtx, _donorStateDoc); + onTransitionToCommitted(opCtx, _donorStateDoc); break; case ShardSplitDonorStateEnum::kAborted: - onTransitionToAborted(_opCtx, _donorStateDoc); + onTransitionToAborted(opCtx, _donorStateDoc); break; default: MONGO_UNREACHABLE; } } - void rollback() override {} + void rollback(OperationContext* opCtx) override {} private: - OperationContext* _opCtx; const ShardSplitDonorDocument _donorStateDoc; }; @@ -359,7 +357,7 @@ void ShardSplitDonorOpObserver::onUpdate(OperationContext* opCtx, case ShardSplitDonorStateEnum::kCommitted: case ShardSplitDonorStateEnum::kAborted: opCtx->recoveryUnit()->registerChange( - std::make_unique<TenantMigrationDonorCommitOrAbortHandler>(opCtx, donorStateDoc)); + std::make_unique<TenantMigrationDonorCommitOrAbortHandler>(donorStateDoc)); break; default: uasserted(ErrorCodes::IllegalOperation, diff --git a/src/mongo/db/session/session_catalog_mongod.cpp b/src/mongo/db/session/session_catalog_mongod.cpp index 15a764b6f16..34a830d7c9c 100644 --- a/src/mongo/db/session/session_catalog_mongod.cpp +++ b/src/mongo/db/session/session_catalog_mongod.cpp @@ -564,23 +564,21 @@ void MongoDSessionCatalog::observeDirectWriteToConfigTransactions(OperationConte class KillSessionTokenOnCommit : public RecoveryUnit::Change { public: - KillSessionTokenOnCommit(OperationContext* opCtx, - MongoDSessionCatalogTransactionInterface* ti, + KillSessionTokenOnCommit(MongoDSessionCatalogTransactionInterface* ti, SessionCatalog::KillToken sessionKillToken) - : _opCtx(opCtx), _ti(ti), _sessionKillToken(std::move(sessionKillToken)) {} + : _ti(ti), _sessionKillToken(std::move(sessionKillToken)) {} - void commit(boost::optional<Timestamp>) override { - rollback(); + void commit(OperationContext* opCtx, boost::optional<Timestamp>) override { + rollback(opCtx); } - void rollback() override { + void rollback(OperationContext* opCtx) override { std::vector<SessionCatalog::KillToken> sessionKillTokenVec; sessionKillTokenVec.emplace_back(std::move(_sessionKillToken)); - killSessionTokens(_opCtx, _ti, std::move(sessionKillTokenVec)); + killSessionTokens(opCtx, _ti, std::move(sessionKillTokenVec)); } private: - OperationContext* _opCtx; MongoDSessionCatalogTransactionInterface* _ti; SessionCatalog::KillToken _sessionKillToken; }; @@ -597,7 +595,7 @@ void MongoDSessionCatalog::observeDirectWriteToConfigTransactions(OperationConte !ti->isTransactionPrepared(session)); opCtx->recoveryUnit()->registerChange( - std::make_unique<KillSessionTokenOnCommit>(opCtx, ti, session.kill())); + std::make_unique<KillSessionTokenOnCommit>(ti, session.kill())); }); } diff --git a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp index 49488be4550..de4befd7f02 100644 --- a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp +++ b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp @@ -50,14 +50,11 @@ using std::shared_ptr; // Works for both removes and updates class EphemeralForTestRecordStore::RemoveChange : public RecoveryUnit::Change { public: - RemoveChange(OperationContext* opCtx, - Data* data, - RecordId loc, - const EphemeralForTestRecord& rec) - : _opCtx(opCtx), _data(data), _loc(loc), _rec(rec) {} - - virtual void commit(boost::optional<Timestamp>) {} - virtual void rollback() { + RemoveChange(Data* data, RecordId loc, const EphemeralForTestRecord& rec) + : _data(data), _loc(loc), _rec(rec) {} + + virtual void commit(OperationContext* opCtx, boost::optional<Timestamp>) {} + virtual void rollback(OperationContext* opCtx) { stdx::lock_guard<stdx::recursive_mutex> lock(_data->recordsMutex); Records::iterator it = _data->records.find(_loc); @@ -70,7 +67,6 @@ public: } private: - OperationContext* _opCtx; Data* const _data; const RecordId _loc; const EphemeralForTestRecord _rec; @@ -86,8 +82,8 @@ public: swap(_records, _data->records); } - virtual void commit(boost::optional<Timestamp>) {} - virtual void rollback() { + virtual void commit(OperationContext* opCtx, boost::optional<Timestamp>) {} + virtual void rollback(OperationContext* opCtx) { using std::swap; stdx::lock_guard<stdx::recursive_mutex> lock(_data->recordsMutex); @@ -330,7 +326,7 @@ void EphemeralForTestRecordStore::deleteRecord(WithLock lk, OperationContext* opCtx, const RecordId& loc) { EphemeralForTestRecord* rec = recordFor(lk, loc); - opCtx->recoveryUnit()->registerChange(std::make_unique<RemoveChange>(opCtx, _data, loc, *rec)); + opCtx->recoveryUnit()->registerChange(std::make_unique<RemoveChange>(_data, loc, *rec)); _data->dataSize -= rec->size; invariant(_data->records.erase(loc) == 1); } @@ -407,8 +403,7 @@ Status EphemeralForTestRecordStore::doUpdateRecord(OperationContext* opCtx, EphemeralForTestRecord newRecord(len); memcpy(newRecord.data.get(), data, len); - opCtx->recoveryUnit()->registerChange( - std::make_unique<RemoveChange>(opCtx, _data, loc, *oldRecord)); + opCtx->recoveryUnit()->registerChange(std::make_unique<RemoveChange>(_data, loc, *oldRecord)); _data->dataSize += len - oldLen; *oldRecord = newRecord; return Status::OK(); @@ -435,8 +430,7 @@ StatusWith<RecordData> EphemeralForTestRecordStore::doUpdateWithDamages( EphemeralForTestRecord newRecord(len); - opCtx->recoveryUnit()->registerChange( - std::make_unique<RemoveChange>(opCtx, _data, loc, *oldRecord)); + opCtx->recoveryUnit()->registerChange(std::make_unique<RemoveChange>(_data, loc, *oldRecord)); char* root = newRecord.data.get(); char* old = oldRecord->data.get(); @@ -502,8 +496,7 @@ void EphemeralForTestRecordStore::doCappedTruncateAfter( aboutToDelete(opCtx, id, record.toRecordData()); } - opCtx->recoveryUnit()->registerChange( - std::make_unique<RemoveChange>(opCtx, _data, id, record)); + opCtx->recoveryUnit()->registerChange(std::make_unique<RemoveChange>(_data, id, record)); _data->dataSize -= record.size; _data->records.erase(it++); } diff --git a/src/mongo/db/storage/durable_catalog_impl.cpp b/src/mongo/db/storage/durable_catalog_impl.cpp index 7d4f23d88b5..d2113359295 100644 --- a/src/mongo/db/storage/durable_catalog_impl.cpp +++ b/src/mongo/db/storage/durable_catalog_impl.cpp @@ -147,8 +147,8 @@ public: AddIdentChange(DurableCatalogImpl* catalog, RecordId catalogId) : _catalog(catalog), _catalogId(std::move(catalogId)) {} - virtual void commit(boost::optional<Timestamp>) {} - virtual void rollback() { + virtual void commit(OperationContext* opCtx, boost::optional<Timestamp>) {} + virtual void rollback(OperationContext* opCtx) { stdx::lock_guard<Latch> lk(_catalog->_catalogIdToEntryMapLock); _catalog->_catalogIdToEntryMap.erase(_catalogId); } diff --git a/src/mongo/db/storage/kv/kv_engine_timestamps_test.cpp b/src/mongo/db/storage/kv/kv_engine_timestamps_test.cpp index 83f9b251ddd..f2e5b0b3641 100644 --- a/src/mongo/db/storage/kv/kv_engine_timestamps_test.cpp +++ b/src/mongo/db/storage/kv/kv_engine_timestamps_test.cpp @@ -57,7 +57,6 @@ public: Operation() = default; Operation(ServiceContext::UniqueClient client, RecoveryUnit* ru) : _client(std::move(client)), _opCtx(_client->makeOperationContext()) { - _opCtx->releaseRecoveryUnit(); _opCtx->setRecoveryUnit(std::unique_ptr<RecoveryUnit>(ru), WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork); } diff --git a/src/mongo/db/storage/recovery_unit.cpp b/src/mongo/db/storage/recovery_unit.cpp index ad98239448e..cee7ad831af 100644 --- a/src/mongo/db/storage/recovery_unit.cpp +++ b/src/mongo/db/storage/recovery_unit.cpp @@ -117,7 +117,12 @@ void RecoveryUnit::endReadOnlyUnitOfWork() { _readOnly = false; } +void RecoveryUnit::setOperationContext(OperationContext* opCtx) { + _opCtx = opCtx; +} + void RecoveryUnit::_executeCommitHandlers(boost::optional<Timestamp> commitTimestamp) { + invariant(_opCtx); for (auto& change : _changes) { try { // Log at higher level because commits occur far more frequently than rollbacks. @@ -125,7 +130,7 @@ void RecoveryUnit::_executeCommitHandlers(boost::optional<Timestamp> commitTimes 3, "CUSTOM COMMIT {demangleName_typeid_change}", "demangleName_typeid_change"_attr = redact(demangleName(typeid(*change)))); - change->commit(commitTimestamp); + change->commit(_opCtx, commitTimestamp); } catch (...) { std::terminate(); } @@ -138,7 +143,7 @@ void RecoveryUnit::_executeCommitHandlers(boost::optional<Timestamp> commitTimes "CUSTOM COMMIT {demangleName_typeid_change}", "demangleName_typeid_change"_attr = redact(demangleName(typeid(*_changeForCatalogVisibility)))); - _changeForCatalogVisibility->commit(commitTimestamp); + _changeForCatalogVisibility->commit(_opCtx, commitTimestamp); } } catch (...) { std::terminate(); @@ -155,6 +160,9 @@ void RecoveryUnit::abortRegisteredChanges() { _executeRollbackHandlers(); } void RecoveryUnit::_executeRollbackHandlers() { + // Make sure we have an OperationContext when executing rollback handlers. Unless we have no + // handlers to run, which might be the case in unit tests. + invariant(_opCtx || (_changes.empty() && !_changeForCatalogVisibility)); try { if (_changeForCatalogVisibility) { @@ -163,7 +171,7 @@ void RecoveryUnit::_executeRollbackHandlers() { "CUSTOM ROLLBACK {demangleName_typeid_change}", "demangleName_typeid_change"_attr = redact(demangleName(typeid(*_changeForCatalogVisibility)))); - _changeForCatalogVisibility->rollback(); + _changeForCatalogVisibility->rollback(_opCtx); } for (Changes::const_reverse_iterator it = _changes.rbegin(), end = _changes.rend(); it != end; @@ -173,7 +181,7 @@ void RecoveryUnit::_executeRollbackHandlers() { 2, "CUSTOM ROLLBACK {demangleName_typeid_change}", "demangleName_typeid_change"_attr = redact(demangleName(typeid(*change)))); - change->rollback(); + change->rollback(_opCtx); } _changeForCatalogVisibility.reset(); _changes.clear(); diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h index 8135aea2d22..ef739b0129d 100644 --- a/src/mongo/db/storage/recovery_unit.h +++ b/src/mongo/db/storage/recovery_unit.h @@ -245,6 +245,12 @@ public: } /** + * Sets the OperationContext that currently owns this RecoveryUnit. Should only be called by the + * OperationContext. + */ + void setOperationContext(OperationContext* opCtx); + + /** * Informs the RecoveryUnit that a snapshot will be needed soon, if one was not already * established. This specifically allows the storage engine to preallocate any required * transaction resources while minimizing the critical section between generating a new @@ -559,23 +565,29 @@ public: * A Change is an action that is registerChange()'d while a WriteUnitOfWork exists. The * change is either rollback()'d or commit()'d when the WriteUnitOfWork goes out of scope. * - * Neither rollback() nor commit() may fail or throw exceptions. + * Neither rollback() nor commit() may fail or throw exceptions. Acquiring locks or blocking + * operations should not be performed in these handlers, as it may lead to deadlocks. + * LockManager locks are still held due to 2PL. * - * Change implementors are responsible for handling their own locking, and must be aware - * that rollback() and commit() may be called after resources with a shorter lifetime than - * the WriteUnitOfWork have been freed. Each registered change will be committed or rolled - * back once. + * Change implementors are responsible for handling their own synchronization, and must be aware + * that rollback() and commit() may be called out of line and after the WriteUnitOfWork have + * been freed. Pointers or references to stack variables should not be bound to the definitions + * of rollback() or commit(). Each registered change will be committed or rolled back once. * * commit() handlers are passed the timestamp at which the transaction is committed. If the * transaction is not committed at a particular timestamp, or if the storage engine does not * support timestamps, then boost::none will be supplied for this parameter. + * + * The OperationContext provided in commit() and rollback() handlers is the current + * OperationContext and may not be the same as when the Change was registered on the + * RecoveryUnit. See above for usage restrictions. */ class Change { public: virtual ~Change() {} - virtual void rollback() = 0; - virtual void commit(boost::optional<Timestamp> commitTime) = 0; + virtual void rollback(OperationContext* opCtx) = 0; + virtual void commit(OperationContext* opCtx, boost::optional<Timestamp> commitTime) = 0; }; /** @@ -600,10 +612,10 @@ public: public: CallbackChange(CommitCallback&& commit, RollbackCallback&& rollback) : _rollback(std::move(rollback)), _commit(std::move(commit)) {} - void rollback() final { + void rollback(OperationContext* opCtx) final { _rollback(); } - void commit(boost::optional<Timestamp> ts) final { + void commit(OperationContext* opCtx, boost::optional<Timestamp> ts) final { _commit(ts); } @@ -643,10 +655,10 @@ public: class OnRollbackChange final : public Change { public: OnRollbackChange(Callback&& callback) : _callback(std::move(callback)) {} - void rollback() final { + void rollback(OperationContext* opCtx) final { _callback(); } - void commit(boost::optional<Timestamp>) final {} + void commit(OperationContext* opCtx, boost::optional<Timestamp>) final {} private: Callback _callback; @@ -665,8 +677,8 @@ public: class OnCommitChange final : public Change { public: OnCommitChange(Callback&& callback) : _callback(std::move(callback)) {} - void rollback() final {} - void commit(boost::optional<Timestamp> commitTime) final { + void rollback(OperationContext* opCtx) final {} + void commit(OperationContext* opCtx, boost::optional<Timestamp> commitTime) final { _callback(commitTime); } @@ -834,6 +846,7 @@ private: Changes _changes; std::unique_ptr<Change> _changeForCatalogVisibility; State _state = State::kInactive; + OperationContext* _opCtx = nullptr; uint64_t _mySnapshotId; bool _readOnly = false; }; diff --git a/src/mongo/db/storage/recovery_unit_test_harness.cpp b/src/mongo/db/storage/recovery_unit_test_harness.cpp index 4b7b2b38b0e..f68550aa675 100644 --- a/src/mongo/db/storage/recovery_unit_test_harness.cpp +++ b/src/mongo/db/storage/recovery_unit_test_harness.cpp @@ -72,11 +72,11 @@ class TestChange final : public RecoveryUnit::Change { public: TestChange(int* count) : _count(count) {} - void commit(boost::optional<Timestamp>) override { + void commit(OperationContext* opCtx, boost::optional<Timestamp>) override { *_count = *_count + 1; } - void rollback() override { + void rollback(OperationContext* opCtx) override { *_count = *_count - 1; } diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp index db326f05e4f..66a4e36183b 100644 --- a/src/mongo/db/storage/storage_engine_impl.cpp +++ b/src/mongo/db/storage/storage_engine_impl.cpp @@ -1108,8 +1108,7 @@ StatusWith<Timestamp> StorageEngineImpl::recoverToStableTimestamp(OperationConte // SERVER-58311: Reset the recovery unit to unposition storage engine cursors. This allows WT to // assert it has sole access when performing rollback_to_stable(). - auto recovUnit = opCtx->releaseAndReplaceRecoveryUnit(); - recovUnit.reset(); + opCtx->replaceRecoveryUnit(); StatusWith<Timestamp> swTimestamp = _engine->recoverToStableTimestamp(opCtx); if (!swTimestamp.isOK()) { diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_import.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_import.cpp index b82832ba78d..a6db7338e06 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_import.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_import.cpp @@ -111,11 +111,11 @@ class CountsChange : public RecoveryUnit::Change { public: CountsChange(WiredTigerRecordStore* rs, long long numRecords, long long dataSize) : _rs(rs), _numRecords(numRecords), _dataSize(dataSize) {} - void commit(boost::optional<Timestamp>) { + void commit(OperationContext* opCtx, boost::optional<Timestamp>) { _rs->setNumRecords(_numRecords); _rs->setDataSize(_dataSize); } - void rollback() {} + void rollback(OperationContext* opCtx) {} private: WiredTigerRecordStore* _rs; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 2465ad45922..7dbc44d7845 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -148,13 +148,11 @@ public: InsertChange(OplogStones* oplogStones, int64_t bytesInserted, const Record& highestInsertedRecord, - int64_t countInserted, - OperationContext* opCtx) + int64_t countInserted) : _oplogStones(oplogStones), _bytesInserted(bytesInserted), _recordId(highestInsertedRecord.id), - _countInserted(countInserted), - _opCtx(opCtx) { + _countInserted(countInserted) { // We only want to initialize _wall by parsing BSONObj when we expect to need it in // OplogStone::createNewStoneIfNeeded. int64_t currBytes = _oplogStones->_currentBytes.load() + _bytesInserted; @@ -172,7 +170,7 @@ public: } } - void commit(boost::optional<Timestamp>) final { + void commit(OperationContext* opCtx, boost::optional<Timestamp>) final { invariant(_bytesInserted >= 0); invariant(_recordId.isValid()); @@ -182,18 +180,17 @@ public: // When other InsertChanges commit concurrently, an uninitialized wallTime may delay the // creation of a new stone. This delay is limited to the number of concurrently running // transactions, so the size difference should be inconsequential. - _oplogStones->createNewStoneIfNeeded(_opCtx, _recordId, _wall); + _oplogStones->createNewStoneIfNeeded(opCtx, _recordId, _wall); } } - void rollback() final {} + void rollback(OperationContext* opCtx) final {} private: OplogStones* _oplogStones; int64_t _bytesInserted; RecordId _recordId; int64_t _countInserted; - OperationContext* _opCtx; Date_t _wall; }; @@ -369,8 +366,8 @@ void WiredTigerRecordStore::OplogStones::updateCurrentStoneAfterInsertOnCommit( int64_t bytesInserted, const Record& highestInsertedRecord, int64_t countInserted) { - opCtx->recoveryUnit()->registerChange(std::make_unique<InsertChange>( - this, bytesInserted, highestInsertedRecord, countInserted, opCtx)); + opCtx->recoveryUnit()->registerChange( + std::make_unique<InsertChange>(this, bytesInserted, highestInsertedRecord, countInserted)); } void WiredTigerRecordStore::OplogStones::clearStonesOnCommit(OperationContext* opCtx) { diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp index 73dd77aac99..87ab79ae50e 100644 --- a/src/mongo/db/transaction/transaction_participant.cpp +++ b/src/mongo/db/transaction/transaction_participant.cpp @@ -1144,6 +1144,11 @@ TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* o stdx::lock_guard<Client> lk(*opCtx->getClient()); // Save the RecoveryUnit from the new transaction and replace it with an empty one. _recoveryUnit = opCtx->releaseAndReplaceRecoveryUnit(); + // The recovery unit is detached from the OperationContext, but keep the OperationContext in the + // case we need to run rollback handlers. + if (_recoveryUnit) { + _recoveryUnit->setOperationContext(opCtx); + } // End two-phase locking on locker manually since the WUOW has been released. _opCtx->lockState()->endWriteUnitOfWork(); @@ -1202,6 +1207,11 @@ TransactionParticipant::TxnResources::TxnResources(WithLock wl, invariant(!(stashStyle == StashStyle::kSecondary && opCtx->lockState()->hasMaxLockTimeout())); _recoveryUnit = opCtx->releaseAndReplaceRecoveryUnit(); + // The recovery unit is detached from the OperationContext, but keep the OperationContext in the + // case we need to run rollback handlers. + if (_recoveryUnit) { + _recoveryUnit->setOperationContext(opCtx); + } _apiParameters = APIParameters::get(opCtx); _readConcernArgs = repl::ReadConcernArgs::get(opCtx); diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp index 6fa18476b43..d552e4e4ad1 100644 --- a/src/mongo/db/write_concern.cpp +++ b/src/mongo/db/write_concern.cpp @@ -269,8 +269,7 @@ Status waitForWriteConcern(OperationContext* opCtx, // This fail point pauses with an open snapshot on the oplog. Some tests pause on this fail // point prior to running replication rollback. This prevents the operation from being // killed and the snapshot being released. Hence, we release the snapshot here. - auto recoveryUnit = opCtx->releaseAndReplaceRecoveryUnit(); - recoveryUnit.reset(); + opCtx->replaceRecoveryUnit(); hangBeforeWaitingForWriteConcern.pauseWhileSet(); } |