diff options
author | Antonio Fuschetto <antonio.fuschetto@mongodb.com> | 2022-10-06 13:43:22 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-10-06 14:15:08 +0000 |
commit | 3a3021eedcf2ddcee5dfab712fe226fbc35a70c1 (patch) | |
tree | 64de64e9aea7939a218cc8e4ca8f8556ac22baa4 | |
parent | 8062f490260eac573ad48915b5d8ccc478bb5b9a (diff) | |
download | mongo-3a3021eedcf2ddcee5dfab712fe226fbc35a70c1.tar.gz |
SERVER-69444 Make the joining of concurrent critical section and refresh look the same between DSS and CSS
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/shard_filtering_metadata_refresh.cpp | 186 |
2 files changed, 87 insertions, 101 deletions
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 990f07fea47..09d0a97b118 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -1018,7 +1018,7 @@ void recoverMigrationCoordinations(OperationContext* opCtx, hangInRefreshFilteringMetadataUntilSuccessThenSimulateErrorUninterruptible .pauseWhileSet(); uasserted(ErrorCodes::InternalError, - "simulate an error response for forceShardFilteringMetadataRefresh"); + "simulate an error response for forceGetCurrentMetadata"); } auto setFilteringMetadata = [&opCtx, ¤tMetadata, &doc, &cancellationToken]() { diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp index 86ab7a41866..df1ccd9d49d 100644 --- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp +++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp @@ -27,7 +27,6 @@ * it in the license file. */ - #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/catalog/database_holder.h" @@ -51,7 +50,6 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding - namespace mongo { namespace { @@ -60,57 +58,42 @@ MONGO_FAIL_POINT_DEFINE(skipShardFilteringMetadataRefresh); MONGO_FAIL_POINT_DEFINE(hangInRecoverRefreshThread); /** - * If the critical section associate with the database is entered by another thread (e.g., a move - * primary or a drop database is in progress), it releases the acquired locks and waits for the - * latter to exit. In this case the function returns true, otherwise false. + * Blocking method, which will wait for any concurrent operations that could change the database + * version to complete (namely critical section and concurrent onDbVersionMismatch invocations). + * + * Returns 'true' if there were concurrent operations that had to be joined (in which case all locks + * will be dropped). If there were none, returns false and the locks continue to be held. */ -bool checkAndWaitIfCriticalSectionIsEntered( - OperationContext* opCtx, - DatabaseShardingState* dss, - boost::optional<Lock::DBLock>& dbLock, - boost::optional<DatabaseShardingState::DSSLock>& dssLock) { - invariant(dbLock); - invariant(dssLock); +bool joinDbVersionOperation(OperationContext* opCtx, + DatabaseShardingState* dss, + boost::optional<Lock::DBLock>* dbLock, + boost::optional<DatabaseShardingState::DSSLock>* dssLock) { + invariant(dbLock->has_value()); + invariant(dssLock->has_value()); if (auto critSect = - dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kRead, *dssLock)) { + dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kRead, **dssLock)) { LOGV2_DEBUG(6697201, 2, "Waiting for exit from the critical section", "db"_attr = dss->getDbName(), - "reason"_attr = dss->getCriticalSectionReason(*dssLock)); + "reason"_attr = dss->getCriticalSectionReason(**dssLock)); - dbLock = boost::none; - dssLock = boost::none; + dbLock->reset(); + dssLock->reset(); uassertStatusOK(OperationShardingState::waitForCriticalSectionToComplete(opCtx, *critSect)); - return true; } - return false; -} - -/** - * If another thread is refreshing the database metadata, it releases the acquired locks and waits - * for that refresh to complete. In this case the function returns true, otherwise false. - */ -bool checkAndWaitIfAnotherRefreshIsRunning( - OperationContext* opCtx, - DatabaseShardingState* dss, - boost::optional<Lock::DBLock>& dbLock, - boost::optional<DatabaseShardingState::DSSLock>& dssLock) { - invariant(dbLock); - invariant(dssLock); - - if (auto refreshVersionFuture = dss->getDbMetadataRefreshFuture(*dssLock)) { + if (auto refreshVersionFuture = dss->getDbMetadataRefreshFuture(**dssLock)) { LOGV2_DEBUG(6697202, 2, "Waiting for completion of another database metadata refresh", "db"_attr = dss->getDbName()); - dbLock = boost::none; - dssLock = boost::none; + dbLock->reset(); + dssLock->reset(); try { refreshVersionFuture->get(opCtx); @@ -131,7 +114,7 @@ bool checkAndWaitIfAnotherRefreshIsRunning( */ Status refreshDbMetadata(OperationContext* opCtx, const DatabaseName& dbName, - CancellationToken cancellationToken) { + const CancellationToken& cancellationToken) { invariant(!opCtx->lockState()->isLocked()); invariant(!opCtx->getClient()->isInDirectClient()); invariant(ShardingState::get(opCtx)->canAcceptShardedCommands()); @@ -139,7 +122,7 @@ Status refreshDbMetadata(OperationContext* opCtx, ScopeGuard resetRefreshFutureOnError([&] { UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - Lock::DBLock dbLock(opCtx, dbName, MODE_IS); + Lock::DBLock dbLock(opCtx, dbName, MODE_IX); auto* dss = DatabaseShardingState::get(opCtx, dbName.db()); const auto dssLock = DatabaseShardingState::DSSLock::lockExclusive(opCtx, dss); @@ -172,9 +155,9 @@ Status refreshDbMetadata(OperationContext* opCtx, return swDbMetadata.getStatus(); } -SharedSemiFuture<void> asyncDbMetadataRefresh(OperationContext* opCtx, - const DatabaseName& dbName, - const CancellationToken& cancellationToken) { +SharedSemiFuture<void> recoverRefreshDbVersion(OperationContext* opCtx, + const DatabaseName& dbName, + const CancellationToken& cancellationToken) { const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); return ExecutorFuture<void>(executor) .then([=, @@ -217,7 +200,7 @@ SharedSemiFuture<void> asyncDbMetadataRefresh(OperationContext* opCtx, void onDbVersionMismatch(OperationContext* opCtx, const StringData dbName, - boost::optional<DatabaseVersion> receivedVersion) { + const boost::optional<DatabaseVersion> receivedDbVersion) { invariant(!opCtx->lockState()->isLocked()); invariant(!opCtx->getClient()->isInDirectClient()); invariant(ShardingState::get(opCtx)->canAcceptShardedCommands()); @@ -235,23 +218,20 @@ void onDbVersionMismatch(OperationContext* opCtx, 2, "Handle database version mismatch", "db"_attr = dbName, - "receivedVersion"_attr = receivedVersion); + "receivedDbVersion"_attr = receivedDbVersion); while (true) { boost::optional<SharedSemiFuture<void>> dbMetadataRefreshFuture; - // Set the database metadata refresh future after waiting for another thread to exit from - // the critical section or to complete an ongoing refresh. { auto dbLock = boost::make_optional(Lock::DBLock(opCtx, dbName, MODE_IS)); auto* dss = DatabaseShardingState::get(opCtx, dbName); - if (receivedVersion) { + if (receivedDbVersion) { auto dssLock = boost::make_optional(DatabaseShardingState::DSSLock::lockShared(opCtx, dss)); - if (checkAndWaitIfCriticalSectionIsEntered(opCtx, dss, dbLock, dssLock) || - checkAndWaitIfAnotherRefreshIsRunning(opCtx, dss, dbLock, dssLock)) { + if (joinDbVersionOperation(opCtx, dss, &dbLock, &dssLock)) { // Waited for another thread to exit from the critical section or to complete an // ongoing refresh, so reacquire the locks. continue; @@ -262,8 +242,9 @@ void onDbVersionMismatch(OperationContext* opCtx, // is in progress or can start (would require to exclusive lock the DSS). // Therefore, the database version can be accessed safely. - const auto wantedVersion = DatabaseHolder::get(opCtx)->getDbVersion(opCtx, dbName); - if (receivedVersion <= wantedVersion) { + const auto wantedDbVersion = + DatabaseHolder::get(opCtx)->getDbVersion(opCtx, dbName); + if (receivedDbVersion <= wantedDbVersion) { // No need to refresh the database metadata as the wanted version is newer // than the one received. return; @@ -277,8 +258,7 @@ void onDbVersionMismatch(OperationContext* opCtx, auto dssLock = boost::make_optional(DatabaseShardingState::DSSLock::lockExclusive(opCtx, dss)); - if (checkAndWaitIfCriticalSectionIsEntered(opCtx, dss, dbLock, dssLock) || - checkAndWaitIfAnotherRefreshIsRunning(opCtx, dss, dbLock, dssLock)) { + if (joinDbVersionOperation(opCtx, dss, &dbLock, &dssLock)) { // Waited for another thread to exit from the critical section or to complete an // ongoing refresh, so reacquire the locks. continue; @@ -292,7 +272,7 @@ void onDbVersionMismatch(OperationContext* opCtx, CancellationSource cancellationSource; CancellationToken cancellationToken = cancellationSource.token(); dss->setDbMetadataRefreshFuture( - asyncDbMetadataRefresh(opCtx, dbName, cancellationToken), + recoverRefreshDbVersion(opCtx, dbName, cancellationToken), std::move(cancellationSource), *dssLock); dbMetadataRefreshFuture = dss->getDbMetadataRefreshFuture(*dssLock); @@ -313,37 +293,43 @@ void onDbVersionMismatch(OperationContext* opCtx, } } -// Return true if joins a shard version update/recover/refresh (in that case, all locks are dropped) +/** + * Blocking method, which will wait for any concurrent operations that could change the shard + * version to complete (namely critical section and concurrent onShardVersionMismatch invocations). + * + * Returns 'true' if there were concurrent operations that had to be joined (in which case all locks + * will be dropped). If there were none, returns false and the locks continue to be held. + */ bool joinShardVersionOperation(OperationContext* opCtx, CollectionShardingRuntime* csr, boost::optional<Lock::DBLock>* dbLock, boost::optional<Lock::CollectionLock>* collLock, boost::optional<CollectionShardingRuntime::CSRLock>* csrLock) { + invariant(dbLock->has_value()); invariant(collLock->has_value()); invariant(csrLock->has_value()); - // If another thread is currently holding the critical section or the shard version future, it - // will be necessary to wait on one of the two variables to finish the update/recover/refresh. - auto inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(opCtx); - auto critSecSignal = - csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite); + if (auto critSecSignal = + csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite)) { + csrLock->reset(); + collLock->reset(); + dbLock->reset(); + + uassertStatusOK( + OperationShardingState::waitForCriticalSectionToComplete(opCtx, *critSecSignal)); + + return true; + } - if (inRecoverOrRefresh || critSecSignal) { - // Drop the locks and wait for an ongoing shard version's recovery/refresh/update + if (auto inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(opCtx)) { csrLock->reset(); collLock->reset(); dbLock->reset(); - if (critSecSignal) { - uassertStatusOK( - OperationShardingState::waitForCriticalSectionToComplete(opCtx, *critSecSignal)); - } else { - try { - inRecoverOrRefresh->get(opCtx); - } catch (const ExceptionFor<ErrorCodes::ShardVersionRefreshCanceled>&) { - // The ongoing refresh has finished, although it was canceled by a - // 'clearFilteringMetadata'. - } + try { + inRecoverOrRefresh->get(opCtx); + } catch (const ExceptionFor<ErrorCodes::ShardVersionRefreshCanceled>&) { + // The ongoing refresh has finished, although it was interrupted. } return true; @@ -353,7 +339,7 @@ bool joinShardVersionOperation(OperationContext* opCtx, } SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext, - const NamespaceString nss, + const NamespaceString& nss, bool runRecover, CancellationToken cancellationToken) { auto executor = Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor(); @@ -412,8 +398,6 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext if (!currentMetadata.allowMigrations()) { boost::optional<SharedSemiFuture<void>> waitForMigrationAbort; { - // DBLock and CollectionLock must be used in order to avoid shard version - // checks Lock::DBLock dbLock(opCtx, nss.dbName(), MODE_IX); Lock::CollectionLock collLock(opCtx, nss, MODE_IX); @@ -449,7 +433,8 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext if (cancellationToken.isCanceled() && (status.isOK() || status == ErrorCodes::Interrupted)) { uasserted(ErrorCodes::ShardVersionRefreshCanceled, - "Shard version refresh canceled by a 'clearFilteringMetadata'"); + "Shard version refresh canceled by an interruption, probably due to a " + "'clearFilteringMetadata'"); } return status; }) @@ -484,6 +469,7 @@ void onShardVersionMismatch(OperationContext* opCtx, while (true) { boost::optional<SharedSemiFuture<void>> inRecoverOrRefresh; + { boost::optional<Lock::DBLock> dbLock; boost::optional<Lock::CollectionLock> collLock; @@ -491,17 +477,16 @@ void onShardVersionMismatch(OperationContext* opCtx, collLock.emplace(opCtx, nss, MODE_IS); auto* const csr = CollectionShardingRuntime::get(opCtx, nss); - boost::optional<CollectionShardingRuntime::CSRLock> csrLock = - CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - if (joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) { - continue; - } + if (shardVersionReceived) { + boost::optional<CollectionShardingRuntime::CSRLock> csrLock = + CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - auto metadata = csr->getCurrentMetadataIfKnown(); - if (metadata) { - // Check if the current shard version is fresh enough - if (shardVersionReceived) { + if (joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) { + continue; + } + + if (auto metadata = csr->getCurrentMetadataIfKnown()) { const auto currentShardVersion = metadata->getShardVersion(); // Don't need to remotely reload if the requested version is smaller than the // known one. This means that the remote side is behind. @@ -511,25 +496,26 @@ void onShardVersionMismatch(OperationContext* opCtx, } } - csrLock.reset(); - csrLock.emplace(CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr)); - - // If there is no ongoing shard version operation, initialize the RecoverRefreshThread - // thread and associate it to the CSR. - if (!joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) { - // If the shard doesn't yet know its filtering metadata, recovery needs to be run - const bool runRecover = metadata ? false : true; - CancellationSource cancellationSource; - CancellationToken cancellationToken = cancellationSource.token(); - csr->setShardVersionRecoverRefreshFuture( - recoverRefreshShardVersion( - opCtx->getServiceContext(), nss, runRecover, std::move(cancellationToken)), - std::move(cancellationSource), - *csrLock); - inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(opCtx); - } else { + boost::optional<CollectionShardingRuntime::CSRLock> csrLock = + CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); + + if (joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) { continue; } + + // If we reached here, there were no ongoing critical sections or recoverRefresh running + // and we are holding the exclusive CSR lock. + + // If the shard doesn't yet know its filtering metadata, recovery needs to be run + const bool runRecover = csr->getCurrentMetadataIfKnown() ? false : true; + CancellationSource cancellationSource; + CancellationToken cancellationToken = cancellationSource.token(); + csr->setShardVersionRecoverRefreshFuture( + recoverRefreshShardVersion( + opCtx->getServiceContext(), nss, runRecover, std::move(cancellationToken)), + std::move(cancellationSource), + *csrLock); + inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(opCtx); } try { |