diff options
-rw-r--r-- | src/mongo/db/s/collection_sharding_runtime.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_runtime.h | 22 | ||||
-rw-r--r-- | src/mongo/db/s/database_sharding_state.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/s/database_sharding_state.h | 7 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/move_primary_source_manager.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/s/move_primary_source_manager.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/recoverable_critical_section_service.cpp | 60 | ||||
-rw-r--r-- | src/mongo/db/s/shard_collection_legacy.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/s/shard_filtering_metadata_refresh.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/s/shard_filtering_metadata_refresh.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_op_observer.cpp | 59 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_migration_critical_section.cpp | 80 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_migration_critical_section.h | 49 |
15 files changed, 229 insertions, 146 deletions
diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp index 8f5d04410ec..15e73931ec1 100644 --- a/src/mongo/db/s/collection_sharding_runtime.cpp +++ b/src/mongo/db/s/collection_sharding_runtime.cpp @@ -158,20 +158,27 @@ void CollectionShardingRuntime::checkShardVersionOrThrow(OperationContext* opCtx (void)_getMetadataWithVersionCheckAt(opCtx, boost::none); } -void CollectionShardingRuntime::enterCriticalSectionCatchUpPhase(const CSRLock&) { - _critSec.enterCriticalSectionCatchUpPhase(); +void CollectionShardingRuntime::enterCriticalSectionCatchUpPhase(const CSRLock&, + const BSONObj& reason) { + _critSec.enterCriticalSectionCatchUpPhase(reason); } -void CollectionShardingRuntime::enterCriticalSectionCommitPhase(const CSRLock&) { - _critSec.enterCriticalSectionCommitPhase(); +void CollectionShardingRuntime::enterCriticalSectionCommitPhase(const CSRLock&, + const BSONObj& reason) { + _critSec.enterCriticalSectionCommitPhase(reason); } -void CollectionShardingRuntime::rollbackCriticalSectionCommitPhaseToCatchUpPhase(const CSRLock&) { - _critSec.rollbackCriticalSectionCommitPhaseToCatchUpPhase(); +void CollectionShardingRuntime::rollbackCriticalSectionCommitPhaseToCatchUpPhase( + const CSRLock&, const BSONObj& reason) { + _critSec.rollbackCriticalSectionCommitPhaseToCatchUpPhase(reason); } -void CollectionShardingRuntime::exitCriticalSection(const CSRLock&) { - _critSec.exitCriticalSection(); +void CollectionShardingRuntime::exitCriticalSection(const CSRLock&, const BSONObj& reason) { + _critSec.exitCriticalSection(reason); +} + +void CollectionShardingRuntime::exitCriticalSectionNoChecks(const CSRLock&) { + _critSec.exitCriticalSectionNoChecks(); } boost::optional<SharedSemiFuture<void>> CollectionShardingRuntime::getCriticalSectionSignal( @@ -409,8 +416,10 @@ void CollectionShardingRuntime::resetShardVersionRecoverRefreshFuture(const CSRL _shardVersionInRecoverOrRefresh = boost::none; } 
-CollectionCriticalSection::CollectionCriticalSection(OperationContext* opCtx, NamespaceString nss) - : _opCtx(opCtx), _nss(std::move(nss)) { +CollectionCriticalSection::CollectionCriticalSection(OperationContext* opCtx, + NamespaceString nss, + BSONObj reason) + : _opCtx(opCtx), _nss(std::move(nss)), _reason(std::move(reason)) { // This acquisition is performed with collection lock MODE_S in order to ensure that any ongoing // writes have completed and become visible AutoGetCollection autoColl(_opCtx, @@ -422,7 +431,7 @@ CollectionCriticalSection::CollectionCriticalSection(OperationContext* opCtx, Na auto* const csr = CollectionShardingRuntime::get(_opCtx, _nss); auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); invariant(csr->getCurrentMetadataIfKnown()); - csr->enterCriticalSectionCatchUpPhase(csrLock); + csr->enterCriticalSectionCatchUpPhase(csrLock, _reason); } CollectionCriticalSection::~CollectionCriticalSection() { @@ -430,7 +439,7 @@ CollectionCriticalSection::~CollectionCriticalSection() { AutoGetCollection autoColl(_opCtx, _nss, MODE_IX); auto* const csr = CollectionShardingRuntime::get(_opCtx, _nss); auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); - csr->exitCriticalSection(csrLock); + csr->exitCriticalSection(csrLock, _reason); } void CollectionCriticalSection::enterCommitPhase() { @@ -443,7 +452,7 @@ void CollectionCriticalSection::enterCommitPhase() { auto* const csr = CollectionShardingRuntime::get(_opCtx, _nss); auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); invariant(csr->getCurrentMetadataIfKnown()); - csr->enterCriticalSectionCommitPhase(csrLock); + csr->enterCriticalSectionCommitPhase(csrLock, _reason); } } // namespace mongo diff --git a/src/mongo/db/s/collection_sharding_runtime.h b/src/mongo/db/s/collection_sharding_runtime.h index 03622031c12..46bf003c194 100644 --- a/src/mongo/db/s/collection_sharding_runtime.h +++ 
b/src/mongo/db/s/collection_sharding_runtime.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/bson/bsonobj.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/namespace_string.h" #include "mongo/db/s/collection_sharding_state.h" @@ -126,13 +127,13 @@ public: * * In these methods, the CSRLock ensures concurrent access to the critical section. */ - void enterCriticalSectionCatchUpPhase(const CSRLock&); - void enterCriticalSectionCommitPhase(const CSRLock&); + void enterCriticalSectionCatchUpPhase(const CSRLock&, const BSONObj& reason); + void enterCriticalSectionCommitPhase(const CSRLock&, const BSONObj& reason); /** * It transitions the critical section back to the catch up phase. */ - void rollbackCriticalSectionCommitPhaseToCatchUpPhase(const CSRLock&); + void rollbackCriticalSectionCommitPhaseToCatchUpPhase(const CSRLock&, const BSONObj& reason); /** * Method to control the collection's critical secion. Method listed below must be called with @@ -140,13 +141,19 @@ public: * * In this method, the CSRLock ensures concurrent access to the critical section. */ - void exitCriticalSection(const CSRLock&); + void exitCriticalSection(const CSRLock&, const BSONObj& reason); + + /** + * Same semantics than 'exitCriticalSection' but without doing error-checking. Only meant to be + * used when recovering the critical sections in the RecoverableCriticalSectionService. + */ + void exitCriticalSectionNoChecks(const CSRLock&); /** * If the collection is currently in a critical section, returns the critical section signal to * be waited on. Otherwise, returns nullptr. * - * This method internally acquires the CSRLock in IS to wait for eventual ongoing operations. + * This method internally acquires the CSRLock in MODE_IS. 
*/ boost::optional<SharedSemiFuture<void>> getCriticalSectionSignal( OperationContext* opCtx, ShardingMigrationCriticalSection::Operation op); @@ -192,7 +199,7 @@ public: * If there an ongoing shard version recover/refresh, it returns the shared semifuture to be * waited on. Otherwise, returns boost::none. * - * This method internally acquires the CSRLock in IS to wait for eventual ongoing operations. + * This method internally acquires the CSRLock in MODE_IS. */ boost::optional<SharedSemiFuture<void>> getShardVersionRecoverRefreshFuture( OperationContext* opCtx); @@ -275,7 +282,7 @@ class CollectionCriticalSection { CollectionCriticalSection& operator=(const CollectionCriticalSection&) = delete; public: - CollectionCriticalSection(OperationContext* opCtx, NamespaceString nss); + CollectionCriticalSection(OperationContext* opCtx, NamespaceString nss, BSONObj reason); ~CollectionCriticalSection(); /** @@ -287,6 +294,7 @@ private: OperationContext* const _opCtx; NamespaceString _nss; + const BSONObj _reason; }; } // namespace mongo diff --git a/src/mongo/db/s/database_sharding_state.cpp b/src/mongo/db/s/database_sharding_state.cpp index 227a561711d..79b9db6a2cb 100644 --- a/src/mongo/db/s/database_sharding_state.cpp +++ b/src/mongo/db/s/database_sharding_state.cpp @@ -120,19 +120,23 @@ std::shared_ptr<DatabaseShardingState> DatabaseShardingState::getSharedForLockFr return databasesMap.getOrCreate(dbName); } -void DatabaseShardingState::enterCriticalSectionCatchUpPhase(OperationContext* opCtx, DSSLock&) { +void DatabaseShardingState::enterCriticalSectionCatchUpPhase(OperationContext* opCtx, + DSSLock&, + const BSONObj& reason) { invariant(opCtx->lockState()->isDbLockedForMode(_dbName, MODE_X)); - _critSec.enterCriticalSectionCatchUpPhase(); + _critSec.enterCriticalSectionCatchUpPhase(reason); } -void DatabaseShardingState::enterCriticalSectionCommitPhase(OperationContext* opCtx, DSSLock&) { +void DatabaseShardingState::enterCriticalSectionCommitPhase(OperationContext* 
opCtx, + DSSLock&, + const BSONObj& reason) { invariant(opCtx->lockState()->isDbLockedForMode(_dbName, MODE_X)); - _critSec.enterCriticalSectionCommitPhase(); + _critSec.enterCriticalSectionCommitPhase(reason); } -void DatabaseShardingState::exitCriticalSection(OperationContext* opCtx) { +void DatabaseShardingState::exitCriticalSection(OperationContext* opCtx, const BSONObj& reason) { const auto dssLock = DSSLock::lockExclusive(opCtx, this); - _critSec.exitCriticalSection(); + _critSec.exitCriticalSection(reason); } DatabaseType DatabaseShardingState::getDatabaseInfo(OperationContext* opCtx, diff --git a/src/mongo/db/s/database_sharding_state.h b/src/mongo/db/s/database_sharding_state.h index b1624bbd9b8..7fafc0fc495 100644 --- a/src/mongo/db/s/database_sharding_state.h +++ b/src/mongo/db/s/database_sharding_state.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/bson/bsonobj.h" #include "mongo/db/s/sharding_migration_critical_section.h" #include "mongo/db/s/sharding_state_lock.h" #include "mongo/s/catalog/type_database.h" @@ -86,9 +87,9 @@ public: * Methods to control the databases's critical section. Must be called with the database X lock * held. 
*/ - void enterCriticalSectionCatchUpPhase(OperationContext* opCtx, DSSLock&); - void enterCriticalSectionCommitPhase(OperationContext* opCtx, DSSLock&); - void exitCriticalSection(OperationContext* opCtx); + void enterCriticalSectionCatchUpPhase(OperationContext* opCtx, DSSLock&, const BSONObj& reason); + void enterCriticalSectionCommitPhase(OperationContext* opCtx, DSSLock&, const BSONObj& reason); + void exitCriticalSection(OperationContext* opCtx, const BSONObj& reason); auto getCriticalSectionSignal(ShardingMigrationCriticalSection::Operation op, DSSLock&) const { return _critSec.getSignal(op); diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 752a428a52b..62aa6843250 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -125,7 +125,11 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, _args(std::move(request)), _donorConnStr(std::move(donorConnStr)), _recipientHost(std::move(recipientHost)), - _stats(ShardingStatistics::get(_opCtx)) { + _stats(ShardingStatistics::get(_opCtx)), + _critSecReason(BSON("command" + << "moveChunk" + << "fromShard" << _args.getFromShardId() << "toShard" + << _args.getToShardId())) { invariant(!_opCtx->lockState()->isLocked()); LOGV2(22016, @@ -333,7 +337,7 @@ Status MigrationSourceManager::enterCriticalSection() { "Starting critical section", "migrationId"_attr = _coordinator->getMigrationId()); - _critSec.emplace(_opCtx, _args.getNss()); + _critSec.emplace(_opCtx, _args.getNss(), _critSecReason); _state = kCriticalSection; diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index d032047c4fb..17f235bedbe 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -270,6 +270,9 @@ private: // on this node. The future is set when the range deletion completes.
Used if the moveChunk was // sent with waitForDelete. boost::optional<SemiFuture<void>> _cleanupCompleteFuture; + + // Information about the moveChunk to be used in the critical section. + const BSONObj _critSecReason; }; } // namespace mongo diff --git a/src/mongo/db/s/move_primary_source_manager.cpp b/src/mongo/db/s/move_primary_source_manager.cpp index 1b7ed360794..99bf70f5ba7 100644 --- a/src/mongo/db/s/move_primary_source_manager.cpp +++ b/src/mongo/db/s/move_primary_source_manager.cpp @@ -64,7 +64,11 @@ MovePrimarySourceManager::MovePrimarySourceManager(OperationContext* opCtx, : _requestArgs(std::move(requestArgs)), _dbname(dbname), _fromShard(std::move(fromShard)), - _toShard(std::move(toShard)) {} + _toShard(std::move(toShard)), + _critSecReason(BSON("command" + << "movePrimary" + << "dbName" << _dbname << "fromShard" << _fromShard << "toShard" + << _toShard)) {} MovePrimarySourceManager::~MovePrimarySourceManager() {} @@ -175,7 +179,7 @@ Status MovePrimarySourceManager::enterCriticalSection(OperationContext* opCtx) { auto dssLock = DatabaseShardingState::DSSLock::lockExclusive(opCtx, dss); // IMPORTANT: After this line, the critical section is in place and needs to be signaled - dss->enterCriticalSectionCatchUpPhase(opCtx, dssLock); + dss->enterCriticalSectionCatchUpPhase(opCtx, dssLock, _critSecReason); } _state = kCriticalSection; @@ -225,7 +229,7 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) { // Read operations must begin to wait on the critical section just before we send the // commit operation to the config server - dss->enterCriticalSectionCommitPhase(opCtx, dssLock); + dss->enterCriticalSectionCommitPhase(opCtx, dssLock, _critSecReason); } auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); @@ -446,7 +450,7 @@ void MovePrimarySourceManager::_cleanup(OperationContext* opCtx) { dss->clearMovePrimarySourceManager(opCtx); dss->clearDatabaseInfo(opCtx); // Leave the critical section if we're still
registered. - dss->exitCriticalSection(opCtx); + dss->exitCriticalSection(opCtx, _critSecReason); } if (_state == kCriticalSection || _state == kCloneCompleted) { diff --git a/src/mongo/db/s/move_primary_source_manager.h b/src/mongo/db/s/move_primary_source_manager.h index 707c7c82ebe..cd678c95b19 100644 --- a/src/mongo/db/s/move_primary_source_manager.h +++ b/src/mongo/db/s/move_primary_source_manager.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/bson/bsonobj.h" #include "mongo/db/s/database_sharding_state.h" #include "mongo/s/request_types/move_primary_gen.h" #include "mongo/s/shard_id.h" @@ -190,6 +191,9 @@ private: // The current state. Used only for diagnostics and validation. State _state{kCreated}; + + // Information about the movePrimary to be used in the critical section. + const BSONObj _critSecReason; }; } // namespace mongo diff --git a/src/mongo/db/s/recoverable_critical_section_service.cpp b/src/mongo/db/s/recoverable_critical_section_service.cpp index 1a61edc3e5c..371de82a6e7 100644 --- a/src/mongo/db/s/recoverable_critical_section_service.cpp +++ b/src/mongo/db/s/recoverable_critical_section_service.cpp @@ -347,54 +347,32 @@ void RecoverableCriticalSectionService::recoverRecoverableCriticalSections( OperationContext* opCtx) { LOGV2_DEBUG(5604000, 2, "Recovering all recoverable critical sections"); - std::set<NamespaceString> nssPresentOnDisk; - PersistentTaskStore<CollectionCriticalSectionDocument> store( - NamespaceString::kCollectionCriticalSectionsNamespace); - store.forEach( - opCtx, Query{}, [&opCtx, &nssPresentOnDisk](const CollectionCriticalSectionDocument& doc) { - const auto& nss = doc.getNss(); - { - AutoGetCollection collLock(opCtx, nss, MODE_X); - auto* const csr = CollectionShardingRuntime::get(opCtx, nss); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); - - const bool blockingWritesCS = (bool)csr->getCriticalSectionSignal( - opCtx, ShardingMigrationCriticalSection::Operation::kWrite); - - const bool 
blockingReadsAndWritesCS = blockingWritesCS && - csr->getCriticalSectionSignal( - opCtx, ShardingMigrationCriticalSection::Operation::kRead); - - if (!doc.getBlockReads()) { - if (!blockingWritesCS) - csr->enterCriticalSectionCatchUpPhase(csrLock); - else if (blockingReadsAndWritesCS) - csr->rollbackCriticalSectionCommitPhaseToCatchUpPhase(csrLock); - } else { - if (!blockingWritesCS) - csr->enterCriticalSectionCatchUpPhase(csrLock); - if (!blockingReadsAndWritesCS) - csr->enterCriticalSectionCommitPhase(csrLock); - } - - nssPresentOnDisk.insert(nss); - - return true; - } - }); - - // Release in-memory CS that are not present on the disk + // Release all in-memory critical sections const auto collectionNames = CollectionShardingState::getCollectionNames(opCtx); for (const auto& collName : collectionNames) { - if (nssPresentOnDisk.find(collName) != nssPresentOnDisk.end()) - continue; - AutoGetCollection collLock(opCtx, collName, MODE_X); auto* const csr = CollectionShardingRuntime::get(opCtx, collName); auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); - csr->exitCriticalSection(csrLock); + csr->exitCriticalSectionNoChecks(csrLock); } + // Map the critical sections that are on disk to memory + PersistentTaskStore<CollectionCriticalSectionDocument> store( + NamespaceString::kCollectionCriticalSectionsNamespace); + store.forEach(opCtx, Query{}, [&opCtx](const CollectionCriticalSectionDocument& doc) { + const auto& nss = doc.getNss(); + { + AutoGetCollection collLock(opCtx, nss, MODE_X); + auto* const csr = CollectionShardingRuntime::get(opCtx, nss); + auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); + csr->enterCriticalSectionCatchUpPhase(csrLock, doc.getReason()); + if (doc.getBlockReads()) + csr->enterCriticalSectionCommitPhase(csrLock, doc.getReason()); + + return true; + } + }); + LOGV2_DEBUG(5604001, 2, "Recovered all recoverable critical sections"); } diff --git a/src/mongo/db/s/shard_collection_legacy.cpp 
b/src/mongo/db/s/shard_collection_legacy.cpp index 24d514559c8..be939b295ea 100644 --- a/src/mongo/db/s/shard_collection_legacy.cpp +++ b/src/mongo/db/s/shard_collection_legacy.cpp @@ -500,7 +500,11 @@ CreateCollectionResponse shardCollection(OperationContext* opCtx, // From this point onward the collection can only be read, not written to, so it is safe to // construct the prerequisites and generate the target state. - ScopedShardVersionCriticalSection critSec(opCtx, nss); + ScopedShardVersionCriticalSection critSec(opCtx, + nss, + BSON("command" + << "shardCollection" + << "collection" << nss.ns())); pauseShardCollectionReadOnlyCriticalSection.pauseWhileSet(); diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp index 58f811e968e..7f7541a4b79 100644 --- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp +++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp @@ -269,8 +269,9 @@ void onShardVersionMismatch(OperationContext* opCtx, } ScopedShardVersionCriticalSection::ScopedShardVersionCriticalSection(OperationContext* opCtx, - NamespaceString nss) - : _opCtx(opCtx), _nss(std::move(nss)) { + NamespaceString nss, + BSONObj reason) + : _opCtx(opCtx), _nss(std::move(nss)), _reason(std::move(reason)) { while (true) { uassert(ErrorCodes::InvalidNamespace, @@ -312,7 +313,7 @@ ScopedShardVersionCriticalSection::ScopedShardVersionCriticalSection(OperationCo if (!joinShardVersionOperation(_opCtx, csr, &dbLock, &collLock, &csrLock)) { CollectionShardingRuntime::get(_opCtx, _nss) - ->enterCriticalSectionCatchUpPhase(*csrLock); + ->enterCriticalSectionCatchUpPhase(*csrLock, _reason); break; } } @@ -328,7 +329,7 @@ ScopedShardVersionCriticalSection::~ScopedShardVersionCriticalSection() { Lock::CollectionLock collLock(_opCtx, _nss, MODE_IX); auto* const csr = CollectionShardingRuntime::get(_opCtx, _nss); auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); - 
csr->exitCriticalSection(csrLock); + csr->exitCriticalSection(csrLock, _reason); } void ScopedShardVersionCriticalSection::enterCommitPhase() { @@ -340,7 +341,7 @@ void ScopedShardVersionCriticalSection::enterCommitPhase() { Lock::CollectionLock collLock(_opCtx, _nss, MODE_IS, deadline); auto* const csr = CollectionShardingRuntime::get(_opCtx, _nss); auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); - csr->enterCriticalSectionCommitPhase(csrLock); + csr->enterCriticalSectionCommitPhase(csrLock, _reason); } Status onShardVersionMismatchNoExcept(OperationContext* opCtx, diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.h b/src/mongo/db/s/shard_filtering_metadata_refresh.h index 95700f0a042..3801654aace 100644 --- a/src/mongo/db/s/shard_filtering_metadata_refresh.h +++ b/src/mongo/db/s/shard_filtering_metadata_refresh.h @@ -113,7 +113,7 @@ class ScopedShardVersionCriticalSection { ScopedShardVersionCriticalSection& operator=(const ScopedShardVersionCriticalSection&) = delete; public: - ScopedShardVersionCriticalSection(OperationContext* opCtx, NamespaceString nss); + ScopedShardVersionCriticalSection(OperationContext* opCtx, NamespaceString nss, BSONObj reason); ~ScopedShardVersionCriticalSection(); void enterCommitPhase(); @@ -121,6 +121,7 @@ public: private: OperationContext* const _opCtx; const NamespaceString _nss; + const BSONObj _reason; }; } // namespace mongo diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index c1e0e01da88..0c84dabb36a 100644 --- a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -280,9 +280,9 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx, !replCoord->getMemberState().rollback())) { const auto collCSDoc = CollectionCriticalSectionDocument::parse( IDLParserErrorContext("ShardServerOpObserver"), insertedDoc); - - - opCtx->recoveryUnit()->onCommit([opCtx, insertedNss = 
collCSDoc.getNss()]( + opCtx->recoveryUnit()->onCommit([opCtx, + insertedNss = collCSDoc.getNss(), + reason = collCSDoc.getReason().getOwned()]( boost::optional<Timestamp>) { boost::optional<AutoGetCollection> lockCollectionIfNotPrimary; if (!isStandaloneOrPrimary(opCtx)) @@ -291,7 +291,7 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx, UninterruptibleLockGuard noInterrupt(opCtx->lockState()); auto* const csr = CollectionShardingRuntime::get(opCtx, insertedNss); auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr); - csr->enterCriticalSectionCatchUpPhase(csrLock); + csr->enterCriticalSectionCatchUpPhase(csrLock, reason); }); } } @@ -415,12 +415,12 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE if (!replCoord->isReplEnabled() || (!replCoord->getMemberState().recovering() && !replCoord->getMemberState().rollback())) { - const auto collCSDoc = CollectionCriticalSectionDocument::parse( IDLParserErrorContext("ShardServerOpObserver"), args.updateArgs.updatedDoc); opCtx->recoveryUnit()->onCommit( - [opCtx, updatedNss = collCSDoc.getNss()](boost::optional<Timestamp>) { + [opCtx, updatedNss = collCSDoc.getNss(), reason = collCSDoc.getReason().getOwned()]( + boost::optional<Timestamp>) { boost::optional<AutoGetCollection> lockCollectionIfNotPrimary; if (!isStandaloneOrPrimary(opCtx)) lockCollectionIfNotPrimary.emplace(opCtx, updatedNss, MODE_IX); @@ -428,7 +428,7 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE UninterruptibleLockGuard noInterrupt(opCtx->lockState()); auto* const csr = CollectionShardingRuntime::get(opCtx, updatedNss); auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr); - csr->enterCriticalSectionCommitPhase(csrLock); + csr->enterCriticalSectionCommitPhase(csrLock, reason); }); } } @@ -448,9 +448,14 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE void 
ShardServerOpObserver::aboutToDelete(OperationContext* opCtx, NamespaceString const& nss, BSONObj const& doc) { - // Extract the _id field from the document. If it does not have an _id, use the - // document itself as the _id. - documentIdDecoration(opCtx) = doc["_id"] ? doc["_id"].wrap() : doc; + + if (nss == NamespaceString::kCollectionCriticalSectionsNamespace) { + documentIdDecoration(opCtx) = doc; + } else { + // Extract the _id field from the document. If it does not have an _id, use the + // document itself as the _id. + documentIdDecoration(opCtx) = doc["_id"] ? doc["_id"].wrap() : doc; + } } void ShardServerOpObserver::onDelete(OperationContext* opCtx, @@ -503,24 +508,22 @@ void ShardServerOpObserver::onDelete(OperationContext* opCtx, if (!replCoord->isReplEnabled() || (!replCoord->getMemberState().recovering() && !replCoord->getMemberState().rollback())) { - const auto deletedNss([&] { - std::string coll; - fassert(5514801, - bsonExtractStringField( - documentId, CollectionCriticalSectionDocument::kNssFieldName, &coll)); - return NamespaceString(coll); - }()); - - opCtx->recoveryUnit()->onCommit([opCtx, deletedNss](boost::optional<Timestamp>) { - boost::optional<AutoGetCollection> lockCollectionIfNotPrimary; - if (!isStandaloneOrPrimary(opCtx)) - lockCollectionIfNotPrimary.emplace(opCtx, deletedNss, MODE_IX); - - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - auto* const csr = CollectionShardingRuntime::get(opCtx, deletedNss); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); - csr->exitCriticalSection(csrLock); - }); + const auto& deletedDoc = documentId; + const auto collCSDoc = CollectionCriticalSectionDocument::parse( + IDLParserErrorContext("ShardServerOpObserver"), deletedDoc); + + opCtx->recoveryUnit()->onCommit( + [opCtx, deletedNss = collCSDoc.getNss(), reason = collCSDoc.getReason().getOwned()]( + boost::optional<Timestamp>) { + boost::optional<AutoGetCollection> lockCollectionIfNotPrimary; + if 
(!isStandaloneOrPrimary(opCtx)) + lockCollectionIfNotPrimary.emplace(opCtx, deletedNss, MODE_IX); + + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + auto* const csr = CollectionShardingRuntime::get(opCtx, deletedNss); + auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); + csr->exitCriticalSection(csrLock, reason); + }); } } } diff --git a/src/mongo/db/s/sharding_migration_critical_section.cpp b/src/mongo/db/s/sharding_migration_critical_section.cpp index 6f29221e33c..9c415c07452 100644 --- a/src/mongo/db/s/sharding_migration_critical_section.cpp +++ b/src/mongo/db/s/sharding_migration_critical_section.cpp @@ -33,45 +33,85 @@ namespace mongo { +namespace { +std::string getMessageMismatchReason(const std::string& action, + const BSONObj& newReason, + const BSONObj& existingReason) { + return str::stream() << "trying to " << action << " a critical section with reason " + << newReason + << " but it was already taken by another operation with different reason " + << existingReason << "."; +} + +std::string getMessageNotAcquired(const std::string& action, const BSONObj& newReason) { + return str::stream() << "Trying to " << action << " a critical section with reason " + << newReason << " but it was not acquired first."; +} +} // namespace + ShardingMigrationCriticalSection::ShardingMigrationCriticalSection() = default; ShardingMigrationCriticalSection::~ShardingMigrationCriticalSection() { - invariant(!_critSecSignal); + invariant(!_critSecCtx); +} + +void ShardingMigrationCriticalSection::enterCriticalSectionCatchUpPhase(const BSONObj& reason) { + if (_critSecCtx && _critSecCtx->reason.woCompare(reason) == 0) + return; + + invariant(!_critSecCtx, getMessageMismatchReason("acquire", reason, _critSecCtx->reason)); + + _critSecCtx.emplace(reason.getOwned()); } -void ShardingMigrationCriticalSection::enterCriticalSectionCatchUpPhase() { - invariant(!_critSecSignal); - _critSecSignal.emplace(); - _readsShouldWaitOnCritSec = false; 
+void ShardingMigrationCriticalSection::enterCriticalSectionCommitPhase(const BSONObj& reason) { + invariant(_critSecCtx, getMessageNotAcquired("promote", reason)); + invariant(_critSecCtx->reason.woCompare(reason) == 0, + getMessageMismatchReason("promote", reason, _critSecCtx->reason)); + + _critSecCtx->readsShouldWaitOnCritSec = true; } -void ShardingMigrationCriticalSection::enterCriticalSectionCommitPhase() { - invariant(_critSecSignal); - _readsShouldWaitOnCritSec = true; +void ShardingMigrationCriticalSection::exitCriticalSection(const BSONObj& reason) { + invariant(!_critSecCtx || _critSecCtx->reason.woCompare(reason) == 0, + getMessageMismatchReason("release", reason, _critSecCtx->reason)); + + exitCriticalSectionNoChecks(); } -void ShardingMigrationCriticalSection::exitCriticalSection() { - if (_critSecSignal) { - _critSecSignal->emplaceValue(); - _critSecSignal.reset(); - } +void ShardingMigrationCriticalSection::exitCriticalSectionNoChecks() { + if (!_critSecCtx) + return; + + _critSecCtx->critSecSignal.emplaceValue(); + _critSecCtx.reset(); } -void ShardingMigrationCriticalSection::rollbackCriticalSectionCommitPhaseToCatchUpPhase() { - invariant(_critSecSignal); - invariant(_readsShouldWaitOnCritSec); - _readsShouldWaitOnCritSec = false; +void ShardingMigrationCriticalSection::rollbackCriticalSectionCommitPhaseToCatchUpPhase( + const BSONObj& reason) { + invariant(_critSecCtx, getMessageNotAcquired("rollbackToCatchUp", reason)); + invariant(_critSecCtx->reason.woCompare(reason) == 0, + getMessageMismatchReason("rollbackToCatchUp", reason, _critSecCtx->reason)); + + _critSecCtx->readsShouldWaitOnCritSec = false; } boost::optional<SharedSemiFuture<void>> ShardingMigrationCriticalSection::getSignal( Operation op) const { - if (!_critSecSignal) + if (!_critSecCtx) return boost::none; - if (op == kWrite || _readsShouldWaitOnCritSec) - return _critSecSignal->getFuture(); + if (op == kWrite || _critSecCtx->readsShouldWaitOnCritSec) + return 
_critSecCtx->critSecSignal.getFuture(); return boost::none; } +boost::optional<BSONObj> ShardingMigrationCriticalSection::getReason() const { + if (!_critSecCtx) + return boost::none; + + return _critSecCtx->reason; +} + } // namespace mongo diff --git a/src/mongo/db/s/sharding_migration_critical_section.h b/src/mongo/db/s/sharding_migration_critical_section.h index b0f87477607..28aa51511d9 100644 --- a/src/mongo/db/s/sharding_migration_critical_section.h +++ b/src/mongo/db/s/sharding_migration_critical_section.h @@ -31,6 +31,7 @@ #include <boost/optional.hpp> +#include "mongo/bson/bsonobj.h" #include "mongo/util/future.h" namespace mongo { @@ -57,22 +58,28 @@ public: * * NOTE: Must be called under the appropriate X lock (collection or database). */ - void enterCriticalSectionCatchUpPhase(); + void enterCriticalSectionCatchUpPhase(const BSONObj& reason); /** * Sets the critical section in a mode, which disallows reads. */ - void enterCriticalSectionCommitPhase(); + void enterCriticalSectionCommitPhase(const BSONObj& reason); /** * Leaves the critical section. */ - void exitCriticalSection(); + void exitCriticalSection(const BSONObj& reason); + + /** + * Leaves the critical section without doing error-checking. Only meant to be used when + * recovering the critical sections in the RecoverableCriticalSectionService. + */ + void exitCriticalSectionNoChecks(); /** * Sets the critical section back to the catch up phase, which disallows reads. */ - void rollbackCriticalSectionCommitPhaseToCatchUpPhase(); + void rollbackCriticalSectionCommitPhaseToCatchUpPhase(const BSONObj& reason); /** * Retrieves a critical section future to wait on. Will return boost::none if the migration is @@ -82,18 +89,30 @@ public: enum Operation { kRead, kWrite }; boost::optional<SharedSemiFuture<void>> getSignal(Operation op) const; + boost::optional<BSONObj> getReason() const; + private: - // Whether the migration source is in a critical section. 
Tracked as a shared promise so that - // callers don't have to hold metadata locks in order to wait on it. - boost::optional<SharedPromise<void>> _critSecSignal; - - // Used to delay blocking reads up until the commit of the metadata on the config server needs - // to happen. This allows the shard to serve reads up until the config server metadata update - // needs to be committed. - // - // The transition from false to true is protected by the database or collection X-lock, which - // happens just before the config server metadata commit is scheduled. - bool _readsShouldWaitOnCritSec{false}; + struct CriticalSectionContext { + CriticalSectionContext(BSONObj reason_) : reason(std::move(reason_)) {} + // Whether the migration source is in a critical section. Tracked as a shared promise so + // that callers don't have to hold metadata locks in order to wait on it. + SharedPromise<void> critSecSignal; + + // Used to delay blocking reads up until the commit of the metadata on the config server + // needs to happen. This allows the shard to serve reads up until the config server metadata + // update needs to be committed. + // + // The transition from false to true is protected by the database or collection X-lock, + // which happens just before the config server metadata commit is scheduled. + bool readsShouldWaitOnCritSec{false}; + + // Information about the operation that originally acquired the critical section. Used to + // make the operations that modify the state of the CS idempotent and to provide more + // detailed error messages. + BSONObj reason; + }; + + boost::optional<CriticalSectionContext> _critSecCtx; }; } // namespace mongo |