diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/shard_filtering_metadata_refresh.cpp | 136 |
2 files changed, 59 insertions, 79 deletions
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 61749bf7f57..bac330eb09b 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -996,7 +996,7 @@ void recoverMigrationCoordinations(OperationContext* opCtx, hangInRefreshFilteringMetadataUntilSuccessThenSimulateErrorUninterruptible .pauseWhileSet(); uasserted(ErrorCodes::InternalError, - "simulate an error response for forceShardFilteringMetadataRefresh"); + "simulate an error response for forceGetCurrentMetadata"); } auto setFilteringMetadata = [&opCtx, &currentMetadata, &doc, &cancellationToken]() { diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp index 81a748c5bef..aa83694fcf2 100644 --- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp +++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp @@ -58,57 +58,42 @@ MONGO_FAIL_POINT_DEFINE(skipShardFilteringMetadataRefresh); MONGO_FAIL_POINT_DEFINE(hangInRecoverRefreshThread); /** - * If the critical section associate with the database is entered by another thread (e.g., a move - * primary or a drop database is in progress), it releases the acquired locks and waits for the - * latter to exit. In this case the function returns true, otherwise false. + * Blocking method, which will wait for any concurrent operations that could change the database + * version to complete (namely critical section and concurrent onDbVersionMismatch invocations). + * + * Returns 'true' if there were concurrent operations that had to be joined (in which case all locks + * will be dropped). If there were none, returns false and the locks continue to be held. 
*/ -bool checkAndWaitIfCriticalSectionIsEntered( - OperationContext* opCtx, - DatabaseShardingState* dss, - boost::optional<Lock::DBLock>& dbLock, - boost::optional<DatabaseShardingState::DSSLock>& dssLock) { - invariant(dbLock); - invariant(dssLock); +bool joinDbVersionOperation(OperationContext* opCtx, + DatabaseShardingState* dss, + boost::optional<Lock::DBLock>* dbLock, + boost::optional<DatabaseShardingState::DSSLock>* dssLock) { + invariant(dbLock->has_value()); + invariant(dssLock->has_value()); if (auto critSect = - dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kRead, *dssLock)) { + dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kRead, **dssLock)) { LOGV2_DEBUG(6697201, 2, "Waiting for exit from the critical section", "db"_attr = dss->getDbName(), - "reason"_attr = dss->getCriticalSectionReason(*dssLock)); + "reason"_attr = dss->getCriticalSectionReason(**dssLock)); - dbLock = boost::none; - dssLock = boost::none; + dbLock->reset(); + dssLock->reset(); uassertStatusOK(OperationShardingState::waitForCriticalSectionToComplete(opCtx, *critSect)); - return true; } - return false; -} - -/** - * If another thread is refreshing the database metadata, it releases the acquired locks and waits - * for that refresh to complete. In this case the function returns true, otherwise false. 
- */ -bool checkAndWaitIfAnotherRefreshIsRunning( - OperationContext* opCtx, - DatabaseShardingState* dss, - boost::optional<Lock::DBLock>& dbLock, - boost::optional<DatabaseShardingState::DSSLock>& dssLock) { - invariant(dbLock); - invariant(dssLock); - - if (auto refreshVersionFuture = dss->getDbMetadataRefreshFuture(*dssLock)) { + if (auto refreshVersionFuture = dss->getDbMetadataRefreshFuture(**dssLock)) { LOGV2_DEBUG(6697202, 2, "Waiting for completion of another database metadata refresh", "db"_attr = dss->getDbName()); - dbLock = boost::none; - dssLock = boost::none; + dbLock->reset(); + dssLock->reset(); try { refreshVersionFuture->get(opCtx); @@ -170,9 +155,9 @@ Status refreshDbMetadata(OperationContext* opCtx, return swDbMetadata.getStatus(); } -SharedSemiFuture<void> asyncDbMetadataRefresh(OperationContext* opCtx, - const DatabaseName& dbName, - const CancellationToken& cancellationToken) { +SharedSemiFuture<void> recoverRefreshDbVersion(OperationContext* opCtx, + const DatabaseName& dbName, + const CancellationToken& cancellationToken) { const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); return ExecutorFuture<void>(executor) .then([=, @@ -215,7 +200,7 @@ SharedSemiFuture<void> asyncDbMetadataRefresh(OperationContext* opCtx, void onDbVersionMismatch(OperationContext* opCtx, const StringData dbName, - boost::optional<DatabaseVersion> receivedVersion) { + boost::optional<DatabaseVersion> receivedDbVersion) { invariant(!opCtx->lockState()->isLocked()); invariant(!opCtx->getClient()->isInDirectClient()); invariant(ShardingState::get(opCtx)->canAcceptShardedCommands()); @@ -233,23 +218,20 @@ void onDbVersionMismatch(OperationContext* opCtx, 2, "Handle database version mismatch", "db"_attr = dbName, - "receivedVersion"_attr = receivedVersion); + "receivedDbVersion"_attr = receivedDbVersion); while (true) { boost::optional<SharedSemiFuture<void>> dbMetadataRefreshFuture; - // Set the database metadata refresh future after waiting for 
another thread to exit from - // the critical section or to complete an ongoing refresh. { auto dbLock = boost::make_optional(Lock::DBLock(opCtx, dbName, MODE_IS)); auto* dss = DatabaseShardingState::get(opCtx, dbName); - if (receivedVersion) { + if (receivedDbVersion) { auto dssLock = boost::make_optional(DatabaseShardingState::DSSLock::lockShared(opCtx, dss)); - if (checkAndWaitIfCriticalSectionIsEntered(opCtx, dss, dbLock, dssLock) || - checkAndWaitIfAnotherRefreshIsRunning(opCtx, dss, dbLock, dssLock)) { + if (joinDbVersionOperation(opCtx, dss, &dbLock, &dssLock)) { // Waited for another thread to exit from the critical section or to complete an // ongoing refresh, so reacquire the locks. continue; @@ -260,8 +242,9 @@ void onDbVersionMismatch(OperationContext* opCtx, // is in progress or can start (would require to exclusive lock the DSS). // Therefore, the database version can be accessed safely. - const auto wantedVersion = DatabaseHolder::get(opCtx)->getDbVersion(opCtx, dbName); - if (receivedVersion <= wantedVersion) { + const auto wantedDbVersion = + DatabaseHolder::get(opCtx)->getDbVersion(opCtx, dbName); + if (receivedDbVersion <= wantedDbVersion) { // No need to refresh the database metadata as the wanted version is newer // than the one received. return; @@ -275,8 +258,7 @@ void onDbVersionMismatch(OperationContext* opCtx, auto dssLock = boost::make_optional(DatabaseShardingState::DSSLock::lockExclusive(opCtx, dss)); - if (checkAndWaitIfCriticalSectionIsEntered(opCtx, dss, dbLock, dssLock) || - checkAndWaitIfAnotherRefreshIsRunning(opCtx, dss, dbLock, dssLock)) { + if (joinDbVersionOperation(opCtx, dss, &dbLock, &dssLock)) { // Waited for another thread to exit from the critical section or to complete an // ongoing refresh, so reacquire the locks. 
continue; @@ -290,7 +272,7 @@ void onDbVersionMismatch(OperationContext* opCtx, CancellationSource cancellationSource; CancellationToken cancellationToken = cancellationSource.token(); dss->setDbMetadataRefreshFuture( - asyncDbMetadataRefresh(opCtx, dbName, cancellationToken), + recoverRefreshDbVersion(opCtx, dbName, cancellationToken), std::move(cancellationSource), *dssLock); dbMetadataRefreshFuture = dss->getDbMetadataRefreshFuture(*dssLock); @@ -327,28 +309,28 @@ bool joinShardVersionOperation(OperationContext* opCtx, invariant(collLock->has_value()); invariant(csrLock->has_value()); - // If another thread is currently holding the critical section or the shard version future, it - // will be necessary to wait on one of the two variables to finish the update/recover/refresh. - auto inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(opCtx); - auto critSecSignal = - csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite); + if (auto critSecSignal = + csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite)) { + csrLock->reset(); + collLock->reset(); + dbLock->reset(); + + uassertStatusOK( + OperationShardingState::waitForCriticalSectionToComplete(opCtx, *critSecSignal)); - if (inRecoverOrRefresh || critSecSignal) { - // Drop the locks and wait for an ongoing shard version's recovery/refresh/update + return true; + } + + if (auto inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(opCtx)) { csrLock->reset(); collLock->reset(); dbLock->reset(); - if (critSecSignal) { - uassertStatusOK( - OperationShardingState::waitForCriticalSectionToComplete(opCtx, *critSecSignal)); - } else { - try { - inRecoverOrRefresh->get(opCtx); - } catch (const ExceptionFor<ErrorCodes::ShardVersionRefreshCanceled>&) { - // The ongoing refresh has finished, although it was canceled by a - // 'clearFilteringMetadata'. 
- } + try { + inRecoverOrRefresh->get(opCtx); + } catch (const ExceptionFor<ErrorCodes::ShardVersionRefreshCanceled>&) { + // The ongoing refresh has finished, although it was canceled by a + // 'clearFilteringMetadata'. } return true; @@ -358,7 +340,7 @@ bool joinShardVersionOperation(OperationContext* opCtx, } SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext, - const NamespaceString nss, + const NamespaceString& nss, bool runRecover, CancellationToken cancellationToken) { auto executor = Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor(); @@ -417,8 +399,6 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext if (!currentMetadata.allowMigrations()) { boost::optional<SharedSemiFuture<void>> waitForMigrationAbort; { - // DBLock and CollectionLock must be used in order to avoid shard version - // checks Lock::DBLock dbLock(opCtx, nss.dbName(), MODE_IX); Lock::CollectionLock collLock(opCtx, nss, MODE_IX); @@ -497,16 +477,16 @@ void onShardVersionMismatch(OperationContext* opCtx, collLock.emplace(opCtx, nss, MODE_IS); auto* const csr = CollectionShardingRuntime::get(opCtx, nss); - boost::optional<CollectionShardingRuntime::CSRLock> csrLock = - CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - if (joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) { - continue; - } + if (shardVersionReceived) { + boost::optional<CollectionShardingRuntime::CSRLock> csrLock = + CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - if (auto metadata = csr->getCurrentMetadataIfKnown()) { - // Check if the current shard version is fresh enough - if (shardVersionReceived) { + if (joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) { + continue; + } + + if (auto metadata = csr->getCurrentMetadataIfKnown()) { const auto currentShardVersion = metadata->getShardVersion(); // Don't need to remotely reload if the requested version is smaller than the // known one. 
This means that the remote side is behind. @@ -516,8 +496,8 @@ void onShardVersionMismatch(OperationContext* opCtx, } } - csrLock.reset(); - csrLock.emplace(CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr)); + boost::optional<CollectionShardingRuntime::CSRLock> csrLock = + CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); if (joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) { continue; |