From 546af4aa74cd24d59272b41878f8af14519ad433 Mon Sep 17 00:00:00 2001 From: Antonio Fuschetto Date: Mon, 3 Oct 2022 07:05:49 +0000 Subject: SERVER-66972 Database critical section does not serialize with ongoing refreshes --- src/mongo/base/error_codes.yml | 2 + src/mongo/db/s/SConscript | 1 + src/mongo/db/s/database_sharding_state.cpp | 27 +- src/mongo/db/s/database_sharding_state.h | 49 ++++ src/mongo/db/s/database_sharding_state_test.cpp | 2 +- .../resharding_destined_recipient_test.cpp | 2 +- .../db/s/shard_filtering_metadata_refresh.cpp | 304 ++++++++++++++++----- src/mongo/db/s/shard_filtering_metadata_refresh.h | 2 - src/mongo/db/s/shard_server_op_observer.cpp | 8 + 9 files changed, 321 insertions(+), 76 deletions(-) (limited to 'src/mongo') diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 0b3150042bd..01bd75aca3e 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -492,6 +492,8 @@ error_codes: - {code: 375, name: TransactionAPIMustRetryCommit, categories: [InternalOnly]} - {code: 377, name: FLEMaxTagLimitExceeded } + - {code: 379, name: DatabaseMetadataRefreshCanceled, categories: [InternalOnly]} + # Error codes 4000-8999 are reserved. 
# Non-sequential error codes for compatibility only) diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index fa4b7a7f1d6..3f07944ad03 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -156,6 +156,7 @@ env.Library( '$BUILD_DIR/mongo/s/query/cluster_aggregate', '$BUILD_DIR/mongo/s/sharding_api', '$BUILD_DIR/mongo/s/sharding_initialization', + 'forwardable_operation_metadata', 'sharding_api_d', 'sharding_catalog_manager', 'sharding_logging', diff --git a/src/mongo/db/s/database_sharding_state.cpp b/src/mongo/db/s/database_sharding_state.cpp index a4c0dff2667..aed788ee1fb 100644 --- a/src/mongo/db/s/database_sharding_state.cpp +++ b/src/mongo/db/s/database_sharding_state.cpp @@ -121,10 +121,12 @@ std::shared_ptr<DatabaseShardingState> DatabaseShardingState::getSharedForLockFr } void DatabaseShardingState::enterCriticalSectionCatchUpPhase(OperationContext* opCtx, - DSSLock&, + DSSLock& dssLock, const BSONObj& reason) { invariant(opCtx->lockState()->isDbLockedForMode(_dbName, MODE_X)); _critSec.enterCriticalSectionCatchUpPhase(reason); + + cancelDbMetadataRefresh(dssLock); } void DatabaseShardingState::enterCriticalSectionCommitPhase(OperationContext* opCtx, @@ -220,4 +222,27 @@ void DatabaseShardingState::clearMovePrimarySourceManager(OperationContext* opCt _sourceMgr = nullptr; } +void DatabaseShardingState::setDbMetadataRefreshFuture(SharedSemiFuture<void> future, + CancellationSource cancellationSource, + const DSSLock&) { + invariant(!_dbMetadataRefresh); + _dbMetadataRefresh.emplace(std::move(future), std::move(cancellationSource)); +} + +boost::optional<SharedSemiFuture<void>> DatabaseShardingState::getDbMetadataRefreshFuture( + const DSSLock&) const { + return _dbMetadataRefresh ?
boost::optional<SharedSemiFuture<void>>(_dbMetadataRefresh->future) + : boost::none; +} + +void DatabaseShardingState::resetDbMetadataRefreshFuture(const DSSLock&) { + _dbMetadataRefresh = boost::none; +} + +void DatabaseShardingState::cancelDbMetadataRefresh(const DSSLock&) { + if (_dbMetadataRefresh) { + _dbMetadataRefresh->cancellationSource.cancel(); + } +} + } // namespace mongo diff --git a/src/mongo/db/s/database_sharding_state.h b/src/mongo/db/s/database_sharding_state.h index 4a263405c2b..0431e801df8 100644 --- a/src/mongo/db/s/database_sharding_state.h +++ b/src/mongo/db/s/database_sharding_state.h @@ -73,6 +73,13 @@ public: static std::shared_ptr<DatabaseShardingState> getSharedForLockFreeReads(OperationContext* opCtx, StringData dbName); + /** + * Returns the name of the database related to the current sharding state. + */ + std::string getDbName() const { + return _dbName; + } + /** * Checks if this shard is the primary shard for the given DB. * @@ -95,6 +102,10 @@ public: return _critSec.getSignal(op); } + auto getCriticalSectionReason(DSSLock&) const { + return _critSec.getReason() ? _critSec.getReason()->toString() : "Unknown"; + } + /** * Returns this shard server's cached dbVersion, if one is cached. * @@ -149,9 +160,43 @@ public: */ void clearMovePrimarySourceManager(OperationContext* opCtx); + /** + * Sets the database metadata refresh future for other threads to wait on it. + */ + void setDbMetadataRefreshFuture(SharedSemiFuture<void> future, + CancellationSource cancellationSource, + const DSSLock&); + + /** + * If there is an ongoing database metadata refresh, returns the future to wait on it, otherwise + * `boost::none`. + */ + boost::optional<SharedSemiFuture<void>> getDbMetadataRefreshFuture(const DSSLock&) const; + + /** + * Resets the database metadata refresh future to `boost::none`. + */ + void resetDbMetadataRefreshFuture(const DSSLock&); + + /** + * Cancel any ongoing database metadata refresh.
+ */ + void cancelDbMetadataRefresh(const DSSLock&); + private: friend DSSLock; + struct DbMetadataRefresh { + DbMetadataRefresh(SharedSemiFuture<void> future, CancellationSource cancellationSource) + : future(std::move(future)), cancellationSource(std::move(cancellationSource)){}; + + // Tracks the ongoing database metadata refresh. + SharedSemiFuture<void> future; + + // Cancellation source to cancel the ongoing database metadata refresh. + CancellationSource cancellationSource; + }; + // Object-wide ResourceMutex to protect changes to the DatabaseShardingState or objects held // within. Lock::ResourceMutex _stateChangeMutex{"DatabaseShardingState"}; @@ -173,6 +218,10 @@ private: // // NOTE: The source manager is not owned by this class. MovePrimarySourceManager* _sourceMgr{nullptr}; + + // Tracks the ongoing database metadata refresh. Possibly keeps a future for other threads to + // wait on it, and a cancellation source to cancel the ongoing database metadata refresh. + boost::optional<DbMetadataRefresh> _dbMetadataRefresh; }; } // namespace mongo diff --git a/src/mongo/db/s/database_sharding_state_test.cpp b/src/mongo/db/s/database_sharding_state_test.cpp index 81d5218dbd4..ca077519ccc 100644 --- a/src/mongo/db/s/database_sharding_state_test.cpp +++ b/src/mongo/db/s/database_sharding_state_test.cpp @@ -168,7 +168,7 @@ TEST_F(DatabaseShardingStateTestWithMockedLoader, ForceDatabaseRefresh) { auto opCtx = operationContext(); _mockCatalogCacheLoader->setDatabaseRefreshReturnValue(newDb); - forceDatabaseRefresh(opCtx, kDbName); + ASSERT_OK(onDbVersionMismatchNoExcept(opCtx, kDbName, boost::none)); boost::optional<DatabaseVersion> activeDbVersion = [&] { AutoGetDb autoDb(opCtx, kDbName, MODE_IS); diff --git a/src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp index e942dcd139f..96569f5dd9d 100644 --- a/src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp +++
b/src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp @@ -237,7 +237,7 @@ protected: createChunks(env.version.epoch(), env.sourceUuid, env.version.getTimestamp(), "y"), boost::none); - forceDatabaseRefresh(opCtx, kNss.db()); + ASSERT_OK(onDbVersionMismatchNoExcept(opCtx, kNss.db(), boost::none)); forceShardFilteringMetadataRefresh(opCtx, kNss); if (refreshTempNss) diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp index 84ee9b1e84a..98890b4cb00 100644 --- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp +++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp @@ -37,6 +37,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/database_sharding_state.h" +#include "mongo/db/s/forwardable_operation_metadata.h" #include "mongo/db/s/migration_source_manager.h" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/operation_sharding_state.h" @@ -55,9 +56,165 @@ MONGO_FAIL_POINT_DEFINE(skipDatabaseVersionMetadataRefresh); MONGO_FAIL_POINT_DEFINE(skipShardFilteringMetadataRefresh); MONGO_FAIL_POINT_DEFINE(hangInRecoverRefreshThread); +/** + * If the critical section associated with the database is entered by another thread (e.g., a move + * primary or a drop database is in progress), it releases the acquired locks and waits for the + * latter to exit. In this case the function returns true, otherwise false.
+ */ +bool checkAndWaitIfCriticalSectionIsEntered( + OperationContext* opCtx, + DatabaseShardingState* dss, + boost::optional<Lock::DBLock>& dbLock, + boost::optional<DatabaseShardingState::DSSLock>& dssLock) { + invariant(dbLock); + invariant(dssLock); + + if (auto critSect = + dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kRead, *dssLock)) { + LOGV2_DEBUG(6697201, + 2, + "Waiting for exit from the critical section", + "db"_attr = dss->getDbName(), + "reason"_attr = dss->getCriticalSectionReason(*dssLock)); + + dbLock = boost::none; + dssLock = boost::none; + + uassertStatusOK(OperationShardingState::waitForCriticalSectionToComplete(opCtx, *critSect)); + + return true; + } + + return false; +} + +/** + * If another thread is refreshing the database metadata, it releases the acquired locks and waits + * for that refresh to complete. In this case the function returns true, otherwise false. + */ +bool checkAndWaitIfAnotherRefreshIsRunning( + OperationContext* opCtx, + DatabaseShardingState* dss, + boost::optional<Lock::DBLock>& dbLock, + boost::optional<DatabaseShardingState::DSSLock>& dssLock) { + invariant(dbLock); + invariant(dssLock); + + if (auto refreshVersionFuture = dss->getDbMetadataRefreshFuture(*dssLock)) { + LOGV2_DEBUG(6697202, + 2, + "Waiting for completion of another database metadata refresh", + "db"_attr = dss->getDbName()); + + dbLock = boost::none; + dssLock = boost::none; + + try { + refreshVersionFuture->get(opCtx); + } catch (const ExceptionFor<ErrorCodes::DatabaseMetadataRefreshCanceled>&) { + // The refresh was canceled by another thread that entered the critical section. + } + + return true; + } + + return false; +} + +/** + * Unconditionally refreshes the database metadata from the config server. + * + * NOTE: Does network I/O and acquires the database lock in X mode.
+ */ +Status refreshDbMetadata(OperationContext* opCtx, + const StringData& dbName, + const CancellationToken& cancellationToken) { + invariant(!opCtx->lockState()->isLocked()); + invariant(!opCtx->getClient()->isInDirectClient()); + invariant(ShardingState::get(opCtx)->canAcceptShardedCommands()); + + ScopeGuard resetRefreshFutureOnError([&] { + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + + Lock::DBLock dbLock(opCtx, dbName, MODE_IS); + auto* dss = DatabaseShardingState::get(opCtx, dbName); + const auto dssLock = DatabaseShardingState::DSSLock::lockExclusive(opCtx, dss); + + dss->resetDbMetadataRefreshFuture(dssLock); + }); + + // Force a refresh of the cached database metadata from the config server. + const auto swDbMetadata = + Grid::get(opCtx)->catalogCache()->getDatabaseWithRefresh(opCtx, dbName); + + Lock::DBLock dbLock(opCtx, dbName, MODE_X); + auto* dss = DatabaseShardingState::get(opCtx, dbName); + auto dssLock = DatabaseShardingState::DSSLock::lockExclusive(opCtx, dss); + + if (!cancellationToken.isCanceled()) { + if (swDbMetadata.isOK()) { + // Set the refreshed database metadata. + dss->setDatabaseInfo(opCtx, DatabaseType(*swDbMetadata.getValue()), dssLock); + } else if (swDbMetadata == ErrorCodes::NamespaceNotFound) { + // The database has been dropped, so clear its metadata. + dss->clearDatabaseInfo(opCtx); + } + } + + // Reset the future reference to allow any other thread to refresh the database metadata. 
+ dss->resetDbMetadataRefreshFuture(dssLock); + resetRefreshFutureOnError.dismiss(); + + return swDbMetadata.getStatus(); +} + +SharedSemiFuture<void> asyncDbMetadataRefresh(OperationContext* opCtx, + const StringData& dbName, + const CancellationToken& cancellationToken) { + const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + return ExecutorFuture<void>(executor) + .then([=, + serviceCtx = opCtx->getServiceContext(), + forwardableOpMetadata = ForwardableOperationMetadata(opCtx), + dbNameStr = dbName.toString()] { + ThreadClient tc("DbMetadataRefreshThread", serviceCtx); + { + stdx::lock_guard<Client> lk(*tc.get()); + tc->setSystemOperationKillableByStepdown(lk); + } + + const auto opCtxHolder = + CancelableOperationContext(tc->makeOperationContext(), cancellationToken, executor); + auto opCtx = opCtxHolder.get(); + + // Forward `users` and `roles` attributes from the original request. + forwardableOpMetadata.setOn(opCtx); + + LOGV2_DEBUG(6697203, 2, "Started database metadata refresh", "db"_attr = dbNameStr); + + return refreshDbMetadata(opCtx, dbNameStr, cancellationToken); + }) + .onCompletion([=, dbNameStr = dbName.toString()](Status status) { + uassert(ErrorCodes::DatabaseMetadataRefreshCanceled, + str::stream() << "Canceled metadata refresh for database " << dbNameStr, + !cancellationToken.isCanceled()); + + if (status.isOK()) { + LOGV2(6697204, "Refreshed database metadata", "db"_attr = dbNameStr); + } else { + LOGV2_ERROR(6697205, + "Failed database metadata refresh", + "db"_attr = dbNameStr, + "error"_attr = redact(status)); + } + }) + .semi() + .share(); +} + void onDbVersionMismatch(OperationContext* opCtx, const StringData dbName, - boost::optional<DatabaseVersion> clientDbVersion) { + const boost::optional<DatabaseVersion> receivedVersion) { invariant(!opCtx->lockState()->isLocked()); invariant(!opCtx->getClient()->isInDirectClient()); invariant(ShardingState::get(opCtx)->canAcceptShardedCommands()); @@ -66,29 +223,86 @@ void onDbVersionMismatch(OperationContext* opCtx, "Can't
check version of {} database"_format(dbName), dbName != NamespaceString::kAdminDb && dbName != NamespaceString::kConfigDb); - { - // Take the DBLock directly rather than using AutoGetDb, to prevent a recursive call into - // checkDbVersion(). - // - // TODO: It is not safe here to read the DB version without checking for critical section - // - if (clientDbVersion) { - Lock::DBLock dbLock(opCtx, dbName, MODE_IS); - auto dss = DatabaseShardingState::get(opCtx, dbName); - auto dssLock = DatabaseShardingState::DSSLock::lockShared(opCtx, dss); - const auto serverDbVersion = dss->getDbVersion(opCtx, dssLock); - if (clientDbVersion <= serverDbVersion) { - // The client was stale + LOGV2_DEBUG(6697200, + 2, + "Handle database version mismatch", + "db"_attr = dbName, + "receivedVersion"_attr = receivedVersion); + + while (true) { + boost::optional<SharedSemiFuture<void>> dbMetadataRefreshFuture; + + // Set the database metadata refresh future after waiting for another thread to exit from + // the critical section or to complete an ongoing refresh. + { + auto dbLock = boost::make_optional(Lock::DBLock(opCtx, dbName, MODE_IS)); + auto* dss = DatabaseShardingState::get(opCtx, dbName); + + if (receivedVersion) { + auto dssLock = + boost::make_optional(DatabaseShardingState::DSSLock::lockShared(opCtx, dss)); + + if (checkAndWaitIfCriticalSectionIsEntered(opCtx, dss, dbLock, dssLock) || + checkAndWaitIfAnotherRefreshIsRunning(opCtx, dss, dbLock, dssLock)) { + // Waited for another thread to exit from the critical section or to complete an + // ongoing refresh, so reacquire the locks. + continue; + } + + // From now until the end of this block [1] no thread is in the critical section or + // can enter it (would require to X-lock the database) and [2] no metadata refresh + // is in progress or can start (would require to exclusive lock the DSS). + // Therefore, the database version can be accessed safely.
+ + const auto wantedVersion = dss->getDbVersion(opCtx, *dssLock); + if (receivedVersion <= wantedVersion) { + // No need to refresh the database metadata as the wanted version is newer + // than the one received. + return; + } + } + + if (MONGO_unlikely(skipDatabaseVersionMetadataRefresh.shouldFail())) { return; } + + auto dssLock = + boost::make_optional(DatabaseShardingState::DSSLock::lockExclusive(opCtx, dss)); + + if (checkAndWaitIfCriticalSectionIsEntered(opCtx, dss, dbLock, dssLock) || + checkAndWaitIfAnotherRefreshIsRunning(opCtx, dss, dbLock, dssLock)) { + // Waited for another thread to exit from the critical section or to complete an + // ongoing refresh, so reacquire the locks. + continue; + } + + // From now until the end of this block [1] no thread is in the critical section or can + // enter it (would require to X-lock the database) and [2] this is the only metadata + // refresh in progress (holding the exclusive lock on the DSS). + // Therefore, the future to refresh the database metadata can be set. + + CancellationSource cancellationSource; + CancellationToken cancellationToken = cancellationSource.token(); + dss->setDbMetadataRefreshFuture( + asyncDbMetadataRefresh(opCtx, dbName, cancellationToken), + std::move(cancellationSource), + *dssLock); + dbMetadataRefreshFuture = dss->getDbMetadataRefreshFuture(*dssLock); } - } - if (MONGO_unlikely(skipDatabaseVersionMetadataRefresh.shouldFail())) { - return; - } + // No other metadata refresh for this database can run in parallel. If another thread enters + // the critical section, the ongoing refresh would be interrupted and subsequently + // re-queued. - forceDatabaseRefresh(opCtx, dbName); + try { + dbMetadataRefreshFuture->get(opCtx); + } catch (const ExceptionFor<ErrorCodes::DatabaseMetadataRefreshCanceled>&) { + // The refresh was canceled by another thread that entered the critical section.
+ continue; + } + + break; + } } // Return true if joins a shard version update/recover/refresh (in that case, all locks are dropped) @@ -475,56 +689,4 @@ Status onDbVersionMismatchNoExcept(OperationContext* opCtx, } } -void forceDatabaseRefresh(OperationContext* opCtx, const StringData dbName) { - invariant(!opCtx->lockState()->isLocked()); - invariant(!opCtx->getClient()->isInDirectClient()); - - auto const shardingState = ShardingState::get(opCtx); - invariant(shardingState->canAcceptShardedCommands()); - - const auto swRefreshedDbInfo = - Grid::get(opCtx)->catalogCache()->getDatabaseWithRefresh(opCtx, dbName); - - if (swRefreshedDbInfo == ErrorCodes::NamespaceNotFound) { - // db has been dropped, set the db version to boost::none - Lock::DBLock dbLock(opCtx, dbName, MODE_X); - auto dss = DatabaseShardingState::get(opCtx, dbName); - dss->clearDatabaseInfo(opCtx); - return; - } - - const auto refreshedDbInfo = uassertStatusOK(std::move(swRefreshedDbInfo)); - const auto& refreshedDBVersion = refreshedDbInfo->getVersion(); - - // First, check under a shared lock if another thread already updated the cached version. - // This is a best-effort optimization to make as few threads as possible to convoy on the - // exclusive lock below. - { - // Take the DBLock directly rather than using AutoGetDb, to prevent a recursive call - // into checkDbVersion(). 
- Lock::DBLock dbLock(opCtx, dbName, MODE_IS); - auto dss = DatabaseShardingState::get(opCtx, dbName); - auto dssLock = DatabaseShardingState::DSSLock::lockShared(opCtx, dss); - - const auto cachedDbVersion = dss->getDbVersion(opCtx, dssLock); - if (cachedDbVersion && *cachedDbVersion >= refreshedDBVersion) { - LOGV2_DEBUG(5369130, - 2, - "Skipping updating cached database info from refreshed version " - "because the one currently cached is more recent", - "db"_attr = dbName, - "refreshedDbVersion"_attr = refreshedDBVersion, - "cachedDbVersion"_attr = *cachedDbVersion); - return; - } - } - - // The cached version is older than the refreshed version; update the cached version. - Lock::DBLock dbLock(opCtx, dbName, MODE_X); - auto dss = DatabaseShardingState::get(opCtx, dbName); - auto dssLock = DatabaseShardingState::DSSLock::lockExclusive(opCtx, dss); - - dss->setDatabaseInfo(opCtx, DatabaseType(*refreshedDbInfo), dssLock); -} - } // namespace mongo diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.h b/src/mongo/db/s/shard_filtering_metadata_refresh.h index 724409b4621..0a714ee2b61 100644 --- a/src/mongo/db/s/shard_filtering_metadata_refresh.h +++ b/src/mongo/db/s/shard_filtering_metadata_refresh.h @@ -91,6 +91,4 @@ Status onDbVersionMismatchNoExcept(OperationContext* opCtx, StringData dbName, boost::optional<DatabaseVersion> clientDbVersion) noexcept; -void forceDatabaseRefresh(OperationContext* opCtx, StringData dbName); - } // namespace mongo diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index 228ef5ffbb9..90bf2e74db9 100644 --- a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -390,9 +390,13 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE // TODO SERVER-58223: evaluate whether this is safe or whether acquiring the lock can // block.
AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState()); + AutoGetDb autoDb(opCtx, db, MODE_X); auto dss = DatabaseShardingState::get(opCtx, db); dss->clearDatabaseInfo(opCtx); + + const auto dssLock = DatabaseShardingState::DSSLock::lockExclusive(opCtx, dss); + dss->cancelDbMetadataRefresh(dssLock); } } @@ -487,9 +491,13 @@ void ShardServerOpObserver::onDelete(OperationContext* opCtx, // TODO SERVER-58223: evaluate whether this is safe or whether acquiring the lock can block. AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState()); + AutoGetDb autoDb(opCtx, deletedDatabase, MODE_X); auto dss = DatabaseShardingState::get(opCtx, deletedDatabase); dss->clearDatabaseInfo(opCtx); + + const auto dssLock = DatabaseShardingState::DSSLock::lockExclusive(opCtx, dss); + dss->cancelDbMetadataRefresh(dssLock); } if (nss == NamespaceString::kServerConfigurationNamespace) { -- cgit v1.2.1