diff options
Diffstat (limited to 'src/mongo')
18 files changed, 229 insertions, 235 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 906836c0b7c..952f4de9b9a 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -17,8 +17,10 @@ env.Library( 'database_sharding_state.cpp', 'global_user_write_block_state.cpp', 'operation_sharding_state.cpp', + 'sharding_api_d_params.idl', 'sharding_migration_critical_section.cpp', 'sharding_state.cpp', + 'sharding_statistics.cpp', 'sharding_write_router.cpp', 'transaction_coordinator_curop.cpp', 'transaction_coordinator_factory.cpp', @@ -122,7 +124,6 @@ env.Library( 'sharding_initialization_mongod.cpp', 'sharding_runtime_d_params.idl', 'sharding_state_recovery.cpp', - 'sharding_statistics.cpp', 'split_chunk.cpp', 'split_vector.cpp', 'start_chunk_clone_request.cpp', diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp index 8d4fba61de8..1f40b068e86 100644 --- a/src/mongo/db/s/collection_sharding_runtime.cpp +++ b/src/mongo/db/s/collection_sharding_runtime.cpp @@ -356,22 +356,21 @@ CollectionShardingRuntime::_getMetadataWithVersionCheckAt( const auto& currentMetadata = optCurrentMetadata->get(); - auto wantedShardVersion = currentMetadata.getShardVersion(); - { auto criticalSectionSignal = _critSec.getSignal( opCtx->lockState()->isWriteLocked() ? ShardingMigrationCriticalSection::kWrite : ShardingMigrationCriticalSection::kRead); - uassert(StaleConfigInfo(_nss, receivedShardVersion, - wantedShardVersion, + boost::none, ShardingState::get(opCtx)->shardId(), std::move(criticalSectionSignal)), str::stream() << "migration commit in progress for " << _nss.ns(), !criticalSectionSignal); } + auto wantedShardVersion = currentMetadata.getShardVersion(); + if (wantedShardVersion.isWriteCompatibleWith(receivedShardVersion) || ChunkVersion::isIgnoredVersion(receivedShardVersion)) return optCurrentMetadata; diff --git a/src/mongo/db/s/database_sharding_state.cpp b/src/mongo/db/s/database_sharding_state.cpp index d1d1a3f8746..40b1fcebf25 100644 --- a/src/mongo/db/s/database_sharding_state.cpp +++ b/src/mongo/db/s/database_sharding_state.cpp @@ -179,20 +179,21 @@ void DatabaseShardingState::checkDbVersion(OperationContext* opCtx, DSSLock&) co if (!clientDbVersion) return; - auto criticalSectionSignal = _critSec.getSignal(opCtx->lockState()->isWriteLocked() - ? ShardingMigrationCriticalSection::kWrite - : ShardingMigrationCriticalSection::kRead); - if (criticalSectionSignal) { - OperationShardingState::get(opCtx).setMovePrimaryCriticalSectionSignal( - criticalSectionSignal); - - uasserted(StaleDbRoutingVersion(_dbName, *clientDbVersion, boost::none), - "database critical section active"); - } - uassert(StaleDbRoutingVersion(_dbName, *clientDbVersion, boost::none), - str::stream() << "don't know dbVersion for database " << _dbName, + str::stream() << "sharding status of database " << _dbName + << " is not currently known and needs to be recovered", _optDatabaseInfo); + + { + auto criticalSectionSignal = _critSec.getSignal( + opCtx->lockState()->isWriteLocked() ? ShardingMigrationCriticalSection::kWrite + : ShardingMigrationCriticalSection::kRead); + uassert( + StaleDbRoutingVersion(_dbName, *clientDbVersion, boost::none, criticalSectionSignal), + str::stream() << "movePrimary commit in progress for " << _dbName, + !criticalSectionSignal); + } + const auto& dbVersion = _optDatabaseInfo->getVersion(); uassert(StaleDbRoutingVersion(_dbName, *clientDbVersion, dbVersion), str::stream() << "dbVersion mismatch for database " << _dbName, diff --git a/src/mongo/db/s/database_sharding_state_test.cpp b/src/mongo/db/s/database_sharding_state_test.cpp index f4516bb8d13..81d5218dbd4 100644 --- a/src/mongo/db/s/database_sharding_state_test.cpp +++ b/src/mongo/db/s/database_sharding_state_test.cpp @@ -142,12 +142,10 @@ TEST_F(DatabaseShardingStateTestWithMockedLoader, OnDbVersionMismatch) { return dss->getDbVersion(opCtx, dssLock); }; - boost::optional<DatabaseVersion> activeDbVersion = getActiveDbVersion(); - _mockCatalogCacheLoader->setDatabaseRefreshReturnValue(newDb); - ASSERT_OK(onDbVersionMismatchNoExcept(opCtx, kDbName, newDbVersion, activeDbVersion)); + ASSERT_OK(onDbVersionMismatchNoExcept(opCtx, kDbName, newDbVersion)); - activeDbVersion = getActiveDbVersion(); + auto activeDbVersion = getActiveDbVersion(); ASSERT_TRUE(activeDbVersion); if (expectRefresh) { ASSERT_EQUALS(newDbVersion.getTimestamp(), activeDbVersion->getTimestamp()); diff --git a/src/mongo/db/s/drop_database_coordinator.cpp b/src/mongo/db/s/drop_database_coordinator.cpp index 9b896ad317e..35d66821203 100644 --- a/src/mongo/db/s/drop_database_coordinator.cpp +++ b/src/mongo/db/s/drop_database_coordinator.cpp @@ -247,10 +247,10 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl( const auto db = catalogClient->getDatabase( opCtx, _dbName, repl::ReadConcernLevel::kMajorityReadConcern); if (_doc.getDatabaseVersion()->getUuid() != db.getVersion().getUuid()) { - return; // skip to _flushDatabaseCacheUpdates + return; // skip to FlushDatabaseCacheUpdates } } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) { - return; // skip to _flushDatabaseCacheUpdates + return; // skip to FlushDatabaseCacheUpdates } } @@ -349,8 +349,8 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl( std::remove(participants.begin(), participants.end(), primaryShardId), participants.end()); // Send _flushDatabaseCacheUpdates to all shards - auto flushDbCacheUpdatesCmd = - _flushDatabaseCacheUpdatesWithWriteConcern(_dbName.toString()); + FlushDatabaseCacheUpdatesWithWriteConcern flushDbCacheUpdatesCmd( + _dbName.toString()); flushDbCacheUpdatesCmd.setSyncFromConfig(true); flushDbCacheUpdatesCmd.setDbName(_dbName); diff --git a/src/mongo/db/s/flush_database_cache_updates_command.cpp b/src/mongo/db/s/flush_database_cache_updates_command.cpp index a80192e7dd3..de95a293333 100644 --- a/src/mongo/db/s/flush_database_cache_updates_command.cpp +++ b/src/mongo/db/s/flush_database_cache_updates_command.cpp @@ -117,7 +117,7 @@ public: "Can't call _flushDatabaseCacheUpdates if in read-only mode", !storageGlobalParams.readOnly); - auto& oss = OperationShardingState::get(opCtx); + boost::optional<SharedSemiFuture<void>> criticalSectionSignal; { AutoGetDb autoDb(opCtx, _dbName(), MODE_IS); @@ -129,14 +129,12 @@ public: // consistency guarantee. const auto dss = DatabaseShardingState::get(opCtx, _dbName()); auto dssLock = DatabaseShardingState::DSSLock::lockShared(opCtx, dss); - - if (auto criticalSectionSignal = dss->getCriticalSectionSignal( - ShardingMigrationCriticalSection::kRead, dssLock)) { - oss.setMigrationCriticalSectionSignal(criticalSectionSignal); - } + criticalSectionSignal = + dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kRead, dssLock); } - oss.waitForMigrationCriticalSectionSignal(opCtx); + if (criticalSectionSignal) + criticalSectionSignal->get(opCtx); if (Base::request().getSyncFromConfig()) { LOGV2_DEBUG(21981, @@ -144,7 +142,7 @@ public: "Forcing remote routing table refresh for {db}", "Forcing remote routing table refresh", "db"_attr = _dbName()); - forceDatabaseRefresh(opCtx, _dbName()); + uassertStatusOK(onDbVersionMismatchNoExcept(opCtx, _dbName(), boost::none)); } CatalogCacheLoader::get(opCtx).waitForDatabaseFlush(opCtx, _dbName()); @@ -162,21 +160,23 @@ public: class FlushDatabaseCacheUpdatesCmd final : public FlushDatabaseCacheUpdatesCmdBase<FlushDatabaseCacheUpdatesCmd> { public: - using Request = _flushDatabaseCacheUpdates; + using Request = FlushDatabaseCacheUpdates; static bool supportsWriteConcern() { return false; } + } _flushDatabaseCacheUpdates; class FlushDatabaseCacheUpdatesWithWriteConcernCmd final : public FlushDatabaseCacheUpdatesCmdBase<FlushDatabaseCacheUpdatesWithWriteConcernCmd> { public: - using Request = _flushDatabaseCacheUpdatesWithWriteConcern; + using Request = FlushDatabaseCacheUpdatesWithWriteConcern; static bool supportsWriteConcern() { return true; } + } _flushDatabaseCacheUpdatesWithWriteConcern; } // namespace diff --git a/src/mongo/db/s/flush_routing_table_cache_updates_command.cpp b/src/mongo/db/s/flush_routing_table_cache_updates_command.cpp index a387f7fae82..943b78ece6d 100644 --- a/src/mongo/db/s/flush_routing_table_cache_updates_command.cpp +++ b/src/mongo/db/s/flush_routing_table_cache_updates_command.cpp @@ -112,7 +112,7 @@ public: << " if in read-only mode", !storageGlobalParams.readOnly); - auto& oss = OperationShardingState::get(opCtx); + boost::optional<SharedSemiFuture<void>> criticalSectionSignal; { AutoGetCollection autoColl(opCtx, ns(), MODE_IS); @@ -123,14 +123,12 @@ public: // propagated back to this shard. This ensures the read your own writes causal // consistency guarantee. auto const csr = CollectionShardingRuntime::get(opCtx, ns()); - auto criticalSectionSignal = + criticalSectionSignal = csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite); - if (criticalSectionSignal) { - oss.setMigrationCriticalSectionSignal(criticalSectionSignal); - } } - oss.waitForMigrationCriticalSectionSignal(opCtx); + if (criticalSectionSignal) + criticalSectionSignal->get(opCtx); if (Base::request().getSyncFromConfig()) { LOGV2_DEBUG(21982, diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp index 74d71ef6594..256924b4665 100644 --- a/src/mongo/db/s/operation_sharding_state.cpp +++ b/src/mongo/db/s/operation_sharding_state.cpp @@ -29,7 +29,7 @@ #include "mongo/db/s/operation_sharding_state.h" -#include "mongo/logv2/log_debug.h" +#include "mongo/db/s/sharding_api_d_params_gen.h" namespace mongo { namespace { @@ -37,12 +37,6 @@ namespace { const OperationContext::Decoration<OperationShardingState> shardingMetadataDecoration = OperationContext::declareDecoration<OperationShardingState>(); -// Max time to wait for the migration critical section to complete -const Milliseconds kMaxWaitForMigrationCriticalSection = Minutes(5); - -// Max time to wait for the movePrimary critical section to complete -const Milliseconds kMaxWaitForMovePrimaryCriticalSection = Minutes(5); - } // namespace OperationShardingState::OperationShardingState() = default; @@ -121,54 +115,36 @@ boost::optional<DatabaseVersion> OperationShardingState::getDbVersion(StringData return boost::none; } -bool OperationShardingState::waitForMigrationCriticalSectionSignal(OperationContext* opCtx) { +Status OperationShardingState::waitForCriticalSectionToComplete( + OperationContext* opCtx, SharedSemiFuture<void> critSecSignal) noexcept { // Must not block while holding a lock invariant(!opCtx->lockState()->isLocked()); - if (_migrationCriticalSectionSignal) { - auto deadline = opCtx->getServiceContext()->getFastClockSource()->now() + - std::min(opCtx->getRemainingMaxTimeMillis(), kMaxWaitForMigrationCriticalSection); - - opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] { - _migrationCriticalSectionSignal->wait(opCtx); - }); - - _migrationCriticalSectionSignal = boost::none; - return true; - } - - return false; -} - -void OperationShardingState::setMigrationCriticalSectionSignal( - boost::optional<SharedSemiFuture<void>> critSecSignal) { - invariant(critSecSignal); - _migrationCriticalSectionSignal = std::move(critSecSignal); -} - -bool OperationShardingState::waitForMovePrimaryCriticalSectionSignal(OperationContext* opCtx) { - // Must not block while holding a lock - invariant(!opCtx->lockState()->isLocked()); - - if (_movePrimaryCriticalSectionSignal) { - auto deadline = opCtx->getServiceContext()->getFastClockSource()->now() + - std::min(opCtx->getRemainingMaxTimeMillis(), kMaxWaitForMovePrimaryCriticalSection); - - opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] { - _movePrimaryCriticalSectionSignal->wait(opCtx); - }); - - _movePrimaryCriticalSectionSignal = boost::none; - return true; + // If we are in a transaction, limit the time we can wait behind the critical section. This is + // needed in order to prevent distributed deadlocks in situations where a DDL operation needs to + // acquire the critical section on several shards. + // + // In such cases, shard running a transaction could be waiting for the critical section to be + // exited, while on another shard the transaction has already executed some statement and + // stashed locks which prevent the critical section from being acquired in that node. Limiting + // the wait behind the critical section will ensure that the transaction will eventually get + // aborted. + if (opCtx->inMultiDocumentTransaction()) { + try { + opCtx->runWithDeadline( + opCtx->getServiceContext()->getFastClockSource()->now() + + Milliseconds(metadataRefreshInTransactionMaxWaitBehindCritSecMS.load()), + ErrorCodes::ExceededTimeLimit, + [&] { critSecSignal.wait(opCtx); }); + return Status::OK(); + } catch (const DBException& ex) { + // This is a best-effort attempt to wait for the critical section to complete, so no + // need to handle any exceptions + return ex.toStatus(); + } + } else { + return critSecSignal.waitNoThrow(opCtx); } - - return false; -} - -void OperationShardingState::setMovePrimaryCriticalSectionSignal( - boost::optional<SharedSemiFuture<void>> critSecSignal) { - invariant(critSecSignal); - _movePrimaryCriticalSectionSignal = std::move(critSecSignal); } void OperationShardingState::setShardingOperationFailedStatus(const Status& status) { diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h index 63c726fcc0b..7a9a20012bb 100644 --- a/src/mongo/db/s/operation_sharding_state.h +++ b/src/mongo/db/s/operation_sharding_state.h @@ -146,37 +146,15 @@ public: boost::optional<DatabaseVersion> getDbVersion(StringData dbName) const; /** - * This call is a no op if there isn't a currently active migration critical section. Otherwise - * it will wait for the critical section to complete up to the remaining operation time. + * This method implements a best-effort attempt to wait for the critical section to complete + * before returning to the router at the previous step in order to prevent it from busy spinning + * while the critical section is in progress. * - * Returns true if the call actually waited because of migration critical section (regardless if - * whether it timed out or not), false if there was no active migration critical section. + * All waits for migration critical section should go through this code path, because it also + * accounts for transactions and locking. */ - bool waitForMigrationCriticalSectionSignal(OperationContext* opCtx); - - /** - * Setting this value indicates that when the version check failed, there was an active - * migration for the namespace and that it would be prudent to wait for the critical section to - * complete before retrying so the router doesn't make wasteful requests. - */ - void setMigrationCriticalSectionSignal(boost::optional<SharedSemiFuture<void>> critSecSignal); - - /** - * This call is a no op if there isn't a currently active movePrimary critical section. - * Otherwise it will wait for the critical section to complete up to the remaining operation - * time. - * - * Returns true if the call actually waited because of movePrimary critical section (regardless - * whether it timed out or not), false if there was no active movePrimary critical section. - */ - bool waitForMovePrimaryCriticalSectionSignal(OperationContext* opCtx); - - /** - * Setting this value indicates that when the version check failed, there was an active - * movePrimary for the namespace and that it would be prudent to wait for the critical section - * to complete before retrying so the router doesn't make wasteful requests. - */ - void setMovePrimaryCriticalSectionSignal(boost::optional<SharedSemiFuture<void>> critSecSignal); + static Status waitForCriticalSectionToComplete(OperationContext* opCtx, + SharedSemiFuture<void> critSecSignal) noexcept; /** * Stores the failed status in _shardingOperationFailedStatus. @@ -221,15 +199,6 @@ private: }; StringMap<DatabaseVersionTracker> _databaseVersions; - // This value will only be non-null if version check during the operation execution failed due - // to stale version and there was a migration for that namespace, which was in critical section. - boost::optional<SharedSemiFuture<void>> _migrationCriticalSectionSignal; - - // This value will only be non-null if version check during the operation execution failed due - // to stale version and there was a movePrimary for that namespace, which was in critical - // section. - boost::optional<SharedSemiFuture<void>> _movePrimaryCriticalSectionSignal; - // This value can only be set when a rerouting exception occurs during a write operation, and // must be handled before this object gets destructed. boost::optional<Status> _shardingOperationFailedStatus; diff --git a/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp b/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp index 0c88cc6efb0..d7253272b62 100644 --- a/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp +++ b/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp @@ -29,19 +29,17 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding -#include "mongo/platform/basic.h" - #include "mongo/db/s/scoped_operation_completion_sharding_actions.h" #include "mongo/db/curop.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/sharding_statistics.h" #include "mongo/logv2/log.h" #include "mongo/s/stale_exception.h" namespace mongo { - namespace { const auto shardingOperationCompletionActionsRegistered = @@ -71,11 +69,20 @@ ScopedOperationCompletionShardingActions::~ScopedOperationCompletionShardingActi } if (auto staleInfo = status->extraInfo<StaleConfigInfo>()) { + ShardingStatistics::get(_opCtx).countStaleConfigErrors.addAndFetch(1); + if (staleInfo->getCriticalSectionSignal()) { - // Set migration critical section on operation sharding state: operation will wait for - // the migration to finish before returning. - auto& oss = OperationShardingState::get(_opCtx); - oss.setMigrationCriticalSectionSignal(staleInfo->getCriticalSectionSignal()); + // The shard is in a critical section + OperationShardingState::waitForCriticalSectionToComplete( + _opCtx, *staleInfo->getCriticalSectionSignal()) + .ignore(); + return; + } + + if (staleInfo->getVersionWanted() && + staleInfo->getVersionReceived().isOlderThan(*staleInfo->getVersionWanted())) { + // Shard is recovered and the router is staler than the shard + return; } auto handleMismatchStatus = onShardVersionMismatchNoExcept( @@ -87,10 +94,22 @@ ScopedOperationCompletionShardingActions::~ScopedOperationCompletionShardingActi "Failed to handle stale version exception as part of the current operation", "error"_attr = redact(handleMismatchStatus)); } else if (auto staleInfo = status->extraInfo<StaleDbRoutingVersion>()) { - auto handleMismatchStatus = onDbVersionMismatchNoExcept(_opCtx, - staleInfo->getDb(), - staleInfo->getVersionReceived(), - staleInfo->getVersionWanted()); + if (staleInfo->getCriticalSectionSignal()) { + // The shard is in a critical section + OperationShardingState::waitForCriticalSectionToComplete( + _opCtx, *staleInfo->getCriticalSectionSignal()) + .ignore(); + return; + } + + if (staleInfo->getVersionWanted() && + staleInfo->getVersionReceived() < staleInfo->getVersionWanted()) { + // Shard is recovered and the router is staler than the shard + return; + } + + auto handleMismatchStatus = onDbVersionMismatchNoExcept( + _opCtx, staleInfo->getDb(), staleInfo->getVersionReceived()); if (!handleMismatchStatus.isOK()) LOGV2(22054, "Failed to handle database version exception as part of the current operation: " diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp index 0cfd766ebdb..0a7cecfe632 100644 --- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp +++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp @@ -29,8 +29,6 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding -#include "mongo/platform/basic.h" - #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/catalog/database_holder.h" @@ -43,9 +41,7 @@ #include "mongo/db/s/migration_util.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/resharding/resharding_donor_recipient_common.h" -#include "mongo/db/s/sharding_runtime_d_params_gen.h" #include "mongo/db/s/sharding_state.h" -#include "mongo/db/s/sharding_statistics.h" #include "mongo/logv2/log.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/database_version.h" @@ -61,23 +57,29 @@ MONGO_FAIL_POINT_DEFINE(hangInRecoverRefreshThread); void onDbVersionMismatch(OperationContext* opCtx, const StringData dbName, - const DatabaseVersion& clientDbVersion, - const boost::optional<DatabaseVersion>& serverDbVersion) { + boost::optional<DatabaseVersion> clientDbVersion) { invariant(!opCtx->lockState()->isLocked()); invariant(!opCtx->getClient()->isInDirectClient()); - invariant(ShardingState::get(opCtx)->canAcceptShardedCommands()); - if (clientDbVersion <= serverDbVersion) { - // The client was stale; do not trigger server-side refresh. - return; + { + // Take the DBLock directly rather than using AutoGetDb, to prevent a recursive call into + // checkDbVersion(). + // + // TODO: It is not safe here to read the DB version without checking for critical section + // + if (clientDbVersion) { + Lock::DBLock dbLock(opCtx, dbName, MODE_IS); + auto dss = DatabaseShardingState::get(opCtx, dbName); + auto dssLock = DatabaseShardingState::DSSLock::lockShared(opCtx, dss); + const auto serverDbVersion = dss->getDbVersion(opCtx, dssLock); + if (clientDbVersion <= serverDbVersion) { + // The client was stale + return; + } + } } - // Ensure any ongoing movePrimary's have completed before trying to do the refresh. This wait is - // just an optimization so that mongos does not exhaust its maximum number of - // StaleDatabaseVersion retry attempts while the movePrimary is being committed. - OperationShardingState::get(opCtx).waitForMovePrimaryCriticalSectionSignal(opCtx); - if (MONGO_unlikely(skipDatabaseVersionMetadataRefresh.shouldFail())) { return; } @@ -90,8 +92,7 @@ bool joinShardVersionOperation(OperationContext* opCtx, CollectionShardingRuntime* csr, boost::optional<Lock::DBLock>* dbLock, boost::optional<Lock::CollectionLock>* collLock, - boost::optional<CollectionShardingRuntime::CSRLock>* csrLock, - Milliseconds criticalSectionMaxWait) { + boost::optional<CollectionShardingRuntime::CSRLock>* csrLock) { invariant(collLock->has_value()); invariant(csrLock->has_value()); @@ -108,11 +109,8 @@ bool joinShardVersionOperation(OperationContext* opCtx, dbLock->reset(); if (critSecSignal) { - const auto deadline = criticalSectionMaxWait == Milliseconds::max() - ? Date_t::max() - : opCtx->getServiceContext()->getFastClockSource()->now() + criticalSectionMaxWait; - opCtx->runWithDeadline( - deadline, ErrorCodes::ExceededTimeLimit, [&] { critSecSignal->get(opCtx); }); + uassertStatusOK( + OperationShardingState::waitForCriticalSectionToComplete(opCtx, *critSecSignal)); } else { try { inRecoverOrRefresh->get(opCtx); @@ -128,8 +126,6 @@ bool joinShardVersionOperation(OperationContext* opCtx, return false; } -} // namespace - SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext, const NamespaceString nss, bool runRecover, @@ -235,6 +231,8 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext .share(); } +} // namespace + void onShardVersionMismatch(OperationContext* opCtx, const NamespaceString& nss, boost::optional<ChunkVersion> shardVersionReceived) { @@ -246,8 +244,6 @@ void onShardVersionMismatch(OperationContext* opCtx, return; } - ShardingStatistics::get(opCtx).countStaleConfigErrors.addAndFetch(1); - LOGV2_DEBUG(22061, 2, "Metadata refresh requested for {namespace} at shard version " @@ -256,17 +252,6 @@ void onShardVersionMismatch(OperationContext* opCtx, "namespace"_attr = nss, "shardVersionReceived"_attr = shardVersionReceived); - // If we are in a transaction, limit the time we can wait behind the critical section. This - // is needed in order to prevent distributed deadlocks in situations where a DDL operation - // needs to acquire the critical section on several shards. In that case, a shard running a - // transaction could be waiting for the critical section to be exited, while on another - // shard the transaction has already executed some statement and stashed locks which prevent - // the critical section from being acquired in that node. Limiting the wait behind the - // critical section will ensure that the transaction will eventually get aborted. - const auto criticalSectionMaxWait = opCtx->inMultiDocumentTransaction() - ? Milliseconds(metadataRefreshInTransactionMaxWaitBehindCritSecMS.load()) - : Milliseconds::max(); - while (true) { boost::optional<SharedSemiFuture<void>> inRecoverOrRefresh; { @@ -279,8 +264,7 @@ void onShardVersionMismatch(OperationContext* opCtx, boost::optional<CollectionShardingRuntime::CSRLock> csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - if (joinShardVersionOperation( - opCtx, csr, &dbLock, &collLock, &csrLock, criticalSectionMaxWait)) { + if (joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) { continue; } @@ -302,8 +286,7 @@ void onShardVersionMismatch(OperationContext* opCtx, // If there is no ongoing shard version operation, initialize the RecoverRefreshThread // thread and associate it to the CSR. - if (!joinShardVersionOperation( - opCtx, csr, &dbLock, &collLock, &csrLock, criticalSectionMaxWait)) { + if (!joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) { // If the shard doesn't yet know its filtering metadata, recovery needs to be run const bool runRecover = metadata ? false : true; CancellationSource cancellationSource; @@ -472,13 +455,11 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, return newShardVersion; } -Status onDbVersionMismatchNoExcept( - OperationContext* opCtx, - const StringData dbName, - const DatabaseVersion& clientDbVersion, - const boost::optional<DatabaseVersion>& serverDbVersion) noexcept { +Status onDbVersionMismatchNoExcept(OperationContext* opCtx, + const StringData dbName, + boost::optional<DatabaseVersion> clientDbVersion) noexcept { try { - onDbVersionMismatch(opCtx, dbName, clientDbVersion, serverDbVersion); + onDbVersionMismatch(opCtx, dbName, clientDbVersion); return Status::OK(); } catch (const DBException& ex) { LOGV2(22065, diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.h b/src/mongo/db/s/shard_filtering_metadata_refresh.h index 84fd1dc7c0e..724409b4621 100644 --- a/src/mongo/db/s/shard_filtering_metadata_refresh.h +++ b/src/mongo/db/s/shard_filtering_metadata_refresh.h @@ -64,14 +64,6 @@ void onShardVersionMismatch(OperationContext* opCtx, boost::optional<ChunkVersion> shardVersionReceived); /** - * Starts the RecoverRefreshThread to set the current metadata on the CSR. This function will also - * recover any ongoing migrations if runRecover is true. - */ -SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext, - NamespaceString nss, - bool runRecover); - -/** * Unconditionally get the shard's filtering metadata from the config server on the calling thread. * Returns the metadata if the nss is sharded, otherwise default unsharded metadata. * @@ -95,11 +87,9 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, * Invalidates the cached database version, schedules a refresh of the database info, waits for the * refresh to complete, and updates the cached database version. */ -Status onDbVersionMismatchNoExcept( - OperationContext* opCtx, - StringData dbName, - const DatabaseVersion& clientDbVersion, - const boost::optional<DatabaseVersion>& serverDbVersion) noexcept; +Status onDbVersionMismatchNoExcept(OperationContext* opCtx, + StringData dbName, + boost::optional<DatabaseVersion> clientDbVersion) noexcept; void forceDatabaseRefresh(OperationContext* opCtx, StringData dbName); diff --git a/src/mongo/db/s/sharding_api_d_params.idl b/src/mongo/db/s/sharding_api_d_params.idl new file mode 100644 index 00000000000..507687becb1 --- /dev/null +++ b/src/mongo/db/s/sharding_api_d_params.idl @@ -0,0 +1,41 @@ +# Copyright (C) 2022-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. + +global: + cpp_namespace: mongo + +server_parameters: + metadataRefreshInTransactionMaxWaitBehindCritSecMS: + description: >- + Maximum time in milliseconds to wait behind the critical section when refreshing the + filtering metadata within a transaction. + set_at: [startup, runtime] + cpp_vartype: AtomicWord<int> + cpp_varname: metadataRefreshInTransactionMaxWaitBehindCritSecMS + validator: + gte: 0 + default: 500 diff --git a/src/mongo/db/s/sharding_runtime_d_params.idl b/src/mongo/db/s/sharding_runtime_d_params.idl index 70f9466365c..23d298f0d6b 100644 --- a/src/mongo/db/s/sharding_runtime_d_params.idl +++ b/src/mongo/db/s/sharding_runtime_d_params.idl @@ -142,14 +142,3 @@ server_parameters: cpp_vartype: int cpp_varname: shardedIndexConsistencyCheckIntervalMS default: 600000 - - metadataRefreshInTransactionMaxWaitBehindCritSecMS: - description: >- - Maximum time in milliseconds to wait behind the critical section when refreshing the - filtering metadata within a transaction. - set_at: [startup, runtime] - cpp_vartype: AtomicWord<int> - cpp_varname: metadataRefreshInTransactionMaxWaitBehindCritSecMS - validator: - gte: 0 - default: 500 diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 2cfb3f2286e..7dd97367749 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -29,8 +29,6 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand -#include "mongo/platform/basic.h" - #include "mongo/db/service_entry_point_common.h" #include <fmt/format.h> @@ -80,6 +78,7 @@ #include "mongo/db/request_execution_context.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/sharding_statistics.h" #include "mongo/db/s/transaction_coordinator_factory.h" #include "mongo/db/service_entry_point_common.h" #include "mongo/db/session_catalog_mongod.h" @@ -1654,14 +1653,28 @@ Future<void> ExecCommandDatabase::_commandExec() { !_refreshedDatabase) { auto sce = s.extraInfo<StaleDbRoutingVersion>(); invariant(sce); - if (sce->getVersionWanted() < sce->getVersionReceived()) { - const auto refreshed = _execContext->behaviors->refreshDatabase(opCtx, *sce); - if (refreshed) { - _refreshedDatabase = true; - if (!opCtx->isContinuingMultiDocumentTransaction()) { - _resetLockerStateAfterShardingUpdate(opCtx); - return _commandExec(); - } + + if (sce->getCriticalSectionSignal()) { + // The shard is in a critical section, so we cannot retry locally + OperationShardingState::waitForCriticalSectionToComplete( + opCtx, *sce->getCriticalSectionSignal()) + .ignore(); + return s; + } + + if (sce->getVersionWanted() && + sce->getVersionReceived() < sce->getVersionWanted()) { + // The shard is recovered and the router is staler than the shard, so we cannot + // retry locally + return s; + } + + const auto refreshed = _execContext->behaviors->refreshDatabase(opCtx, *sce); + if (refreshed) { + _refreshedDatabase = true; + if (!opCtx->isContinuingMultiDocumentTransaction()) { + _resetLockerStateAfterShardingUpdate(opCtx); + return _commandExec(); } } } @@ -1670,15 +1683,23 @@ Future<void> ExecCommandDatabase::_commandExec() { }) .onErrorCategory<ErrorCategory::StaleShardVersionError>([this](Status s) -> Future<void> { auto opCtx = _execContext->getOpCtx(); + ShardingStatistics::get(opCtx).countStaleConfigErrors.addAndFetch(1); if (!opCtx->getClient()->isInDirectClient() && serverGlobalParams.clusterRole != ClusterRole::ConfigServer && !_refreshedCollection) { if (auto sce = s.extraInfo<StaleConfigInfo>()) { + if (sce->getCriticalSectionSignal()) { + // The shard is in a critical section, so we cannot retry locally + OperationShardingState::waitForCriticalSectionToComplete( + opCtx, *sce->getCriticalSectionSignal()) + .ignore(); + return s; + } + if (sce->getVersionWanted() && - sce->getVersionReceived().isOlderThan(sce->getVersionWanted().get())) { - // If the local shard version is newer than the received one return the - // error to the router without retrying locally. + sce->getVersionReceived().isOlderThan(*sce->getVersionWanted())) { + // Shard is recovered and the router is staler than the shard return s; } diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp index 7d96332a67e..991a099b18b 100644 --- a/src/mongo/db/service_entry_point_mongod.cpp +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -237,9 +237,7 @@ public: bool refreshDatabase(OperationContext* opCtx, const StaleDbRoutingVersion& se) const noexcept override { - return onDbVersionMismatchNoExcept( - opCtx, se.getDb(), se.getVersionReceived(), se.getVersionWanted()) - .isOK(); + return onDbVersionMismatchNoExcept(opCtx, se.getDb(), se.getVersionReceived()).isOK(); } bool refreshCollection(OperationContext* opCtx, const StaleConfigInfo& se) const diff --git a/src/mongo/s/request_types/flush_database_cache_updates.idl b/src/mongo/s/request_types/flush_database_cache_updates.idl index a4d331ff788..fb77d853623 100644 --- a/src/mongo/s/request_types/flush_database_cache_updates.idl +++ b/src/mongo/s/request_types/flush_database_cache_updates.idl @@ -38,6 +38,7 @@ commands: _flushDatabaseCacheUpdates: description: "An internal command to wait for the last routing table cache refresh for a particular database to be persisted to disk" command_name: _flushDatabaseCacheUpdates + cpp_name: FlushDatabaseCacheUpdates strict: true namespace: type api_version: "" @@ -51,6 +52,7 @@ commands: _flushDatabaseCacheUpdatesWithWriteConcern: description: "The same behavior as _flushDatabaseCacheUpdates but accepts writeConcern" command_name: _flushDatabaseCacheUpdatesWithWriteConcern + cpp_name: FlushDatabaseCacheUpdatesWithWriteConcern strict: true namespace: type api_version: "" diff --git a/src/mongo/s/stale_exception.h b/src/mongo/s/stale_exception.h index 4aa22837a3a..70d7a0dc42e 100644 --- a/src/mongo/s/stale_exception.h +++ b/src/mongo/s/stale_exception.h @@ -50,7 +50,7 @@ public: _received(received), _wanted(wanted), _shardId(shardId), - _criticalSectionSignal(criticalSectionSignal) {} + _criticalSectionSignal(std::move(criticalSectionSignal)) {} const auto& getNss() const { return _nss; @@ -117,8 +117,7 @@ protected: boost::optional<ChunkVersion> _wanted; ShardId _shardId; - // This signal does not get serialized and therefore does not get propagated - // to the router. + // This signal does not get serialized and therefore does not get propagated to the router boost::optional<SharedSemiFuture<void>> _criticalSectionSignal; }; @@ -154,10 +153,15 @@ class StaleDbRoutingVersion final : public ErrorExtraInfo { public: static constexpr auto code = ErrorCodes::StaleDbVersion; - StaleDbRoutingVersion(std::string db, - DatabaseVersion received, - boost::optional<DatabaseVersion> wanted) - : _db(std::move(db)), _received(received), _wanted(wanted) {} + StaleDbRoutingVersion( + std::string db, + DatabaseVersion received, + boost::optional<DatabaseVersion> wanted, + boost::optional<SharedSemiFuture<void>> criticalSectionSignal = boost::none) + : _db(std::move(db)), + _received(received), + _wanted(wanted), + _criticalSectionSignal(std::move(criticalSectionSignal)) {} const auto& getDb() const { return _db; @@ -171,6 +175,10 @@ public: return _wanted; } + auto getCriticalSectionSignal() const { + return _criticalSectionSignal; + } + void serialize(BSONObjBuilder* bob) const override; static std::shared_ptr<const ErrorExtraInfo> parse(const BSONObj&); static StaleDbRoutingVersion parseFromCommandError(const BSONObj& commandError); @@ -179,6 +187,9 @@ private: std::string _db; DatabaseVersion _received; boost::optional<DatabaseVersion> _wanted; + + // This signal does not get serialized and therefore does not get propagated to the router + boost::optional<SharedSemiFuture<void>> _criticalSectionSignal; }; } // namespace mongo |