summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2022-03-10 09:14:47 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-10 10:00:45 +0000
commitd03d37af8b5ef60d828579e537ae0aab1f290719 (patch)
tree7d1ca1cf2e4db324be8a5ee2655c12f76e253023 /src
parentdb137621456c0db1e8e2362c525e347433a0f43f (diff)
downloadmongo-d03d37af8b5ef60d828579e537ae0aab1f290719.tar.gz
SERVER-63327 Make the Stale* exception handling uniform in all code paths
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/SConscript3
-rw-r--r--src/mongo/db/s/collection_sharding_runtime.cpp7
-rw-r--r--src/mongo/db/s/database_sharding_state.cpp25
-rw-r--r--src/mongo/db/s/database_sharding_state_test.cpp6
-rw-r--r--src/mongo/db/s/drop_database_coordinator.cpp8
-rw-r--r--src/mongo/db/s/flush_database_cache_updates_command.cpp20
-rw-r--r--src/mongo/db/s/flush_routing_table_cache_updates_command.cpp10
-rw-r--r--src/mongo/db/s/operation_sharding_state.cpp78
-rw-r--r--src/mongo/db/s/operation_sharding_state.h45
-rw-r--r--src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp41
-rw-r--r--src/mongo/db/s/shard_filtering_metadata_refresh.cpp75
-rw-r--r--src/mongo/db/s/shard_filtering_metadata_refresh.h16
-rw-r--r--src/mongo/db/s/sharding_api_d_params.idl41
-rw-r--r--src/mongo/db/s/sharding_runtime_d_params.idl11
-rw-r--r--src/mongo/db/service_entry_point_common.cpp47
-rw-r--r--src/mongo/db/service_entry_point_mongod.cpp4
-rw-r--r--src/mongo/s/request_types/flush_database_cache_updates.idl2
-rw-r--r--src/mongo/s/stale_exception.h25
18 files changed, 229 insertions, 235 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 906836c0b7c..952f4de9b9a 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -17,8 +17,10 @@ env.Library(
'database_sharding_state.cpp',
'global_user_write_block_state.cpp',
'operation_sharding_state.cpp',
+ 'sharding_api_d_params.idl',
'sharding_migration_critical_section.cpp',
'sharding_state.cpp',
+ 'sharding_statistics.cpp',
'sharding_write_router.cpp',
'transaction_coordinator_curop.cpp',
'transaction_coordinator_factory.cpp',
@@ -122,7 +124,6 @@ env.Library(
'sharding_initialization_mongod.cpp',
'sharding_runtime_d_params.idl',
'sharding_state_recovery.cpp',
- 'sharding_statistics.cpp',
'split_chunk.cpp',
'split_vector.cpp',
'start_chunk_clone_request.cpp',
diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp
index 8d4fba61de8..1f40b068e86 100644
--- a/src/mongo/db/s/collection_sharding_runtime.cpp
+++ b/src/mongo/db/s/collection_sharding_runtime.cpp
@@ -356,22 +356,21 @@ CollectionShardingRuntime::_getMetadataWithVersionCheckAt(
const auto& currentMetadata = optCurrentMetadata->get();
- auto wantedShardVersion = currentMetadata.getShardVersion();
-
{
auto criticalSectionSignal = _critSec.getSignal(
opCtx->lockState()->isWriteLocked() ? ShardingMigrationCriticalSection::kWrite
: ShardingMigrationCriticalSection::kRead);
-
uassert(StaleConfigInfo(_nss,
receivedShardVersion,
- wantedShardVersion,
+ boost::none,
ShardingState::get(opCtx)->shardId(),
std::move(criticalSectionSignal)),
str::stream() << "migration commit in progress for " << _nss.ns(),
!criticalSectionSignal);
}
+ auto wantedShardVersion = currentMetadata.getShardVersion();
+
if (wantedShardVersion.isWriteCompatibleWith(receivedShardVersion) ||
ChunkVersion::isIgnoredVersion(receivedShardVersion))
return optCurrentMetadata;
diff --git a/src/mongo/db/s/database_sharding_state.cpp b/src/mongo/db/s/database_sharding_state.cpp
index d1d1a3f8746..40b1fcebf25 100644
--- a/src/mongo/db/s/database_sharding_state.cpp
+++ b/src/mongo/db/s/database_sharding_state.cpp
@@ -179,20 +179,21 @@ void DatabaseShardingState::checkDbVersion(OperationContext* opCtx, DSSLock&) co
if (!clientDbVersion)
return;
- auto criticalSectionSignal = _critSec.getSignal(opCtx->lockState()->isWriteLocked()
- ? ShardingMigrationCriticalSection::kWrite
- : ShardingMigrationCriticalSection::kRead);
- if (criticalSectionSignal) {
- OperationShardingState::get(opCtx).setMovePrimaryCriticalSectionSignal(
- criticalSectionSignal);
-
- uasserted(StaleDbRoutingVersion(_dbName, *clientDbVersion, boost::none),
- "database critical section active");
- }
-
uassert(StaleDbRoutingVersion(_dbName, *clientDbVersion, boost::none),
- str::stream() << "don't know dbVersion for database " << _dbName,
+ str::stream() << "sharding status of database " << _dbName
+ << " is not currently known and needs to be recovered",
_optDatabaseInfo);
+
+ {
+ auto criticalSectionSignal = _critSec.getSignal(
+ opCtx->lockState()->isWriteLocked() ? ShardingMigrationCriticalSection::kWrite
+ : ShardingMigrationCriticalSection::kRead);
+ uassert(
+ StaleDbRoutingVersion(_dbName, *clientDbVersion, boost::none, criticalSectionSignal),
+ str::stream() << "movePrimary commit in progress for " << _dbName,
+ !criticalSectionSignal);
+ }
+
const auto& dbVersion = _optDatabaseInfo->getVersion();
uassert(StaleDbRoutingVersion(_dbName, *clientDbVersion, dbVersion),
str::stream() << "dbVersion mismatch for database " << _dbName,
diff --git a/src/mongo/db/s/database_sharding_state_test.cpp b/src/mongo/db/s/database_sharding_state_test.cpp
index f4516bb8d13..81d5218dbd4 100644
--- a/src/mongo/db/s/database_sharding_state_test.cpp
+++ b/src/mongo/db/s/database_sharding_state_test.cpp
@@ -142,12 +142,10 @@ TEST_F(DatabaseShardingStateTestWithMockedLoader, OnDbVersionMismatch) {
return dss->getDbVersion(opCtx, dssLock);
};
- boost::optional<DatabaseVersion> activeDbVersion = getActiveDbVersion();
-
_mockCatalogCacheLoader->setDatabaseRefreshReturnValue(newDb);
- ASSERT_OK(onDbVersionMismatchNoExcept(opCtx, kDbName, newDbVersion, activeDbVersion));
+ ASSERT_OK(onDbVersionMismatchNoExcept(opCtx, kDbName, newDbVersion));
- activeDbVersion = getActiveDbVersion();
+ auto activeDbVersion = getActiveDbVersion();
ASSERT_TRUE(activeDbVersion);
if (expectRefresh) {
ASSERT_EQUALS(newDbVersion.getTimestamp(), activeDbVersion->getTimestamp());
diff --git a/src/mongo/db/s/drop_database_coordinator.cpp b/src/mongo/db/s/drop_database_coordinator.cpp
index 9b896ad317e..35d66821203 100644
--- a/src/mongo/db/s/drop_database_coordinator.cpp
+++ b/src/mongo/db/s/drop_database_coordinator.cpp
@@ -247,10 +247,10 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl(
const auto db = catalogClient->getDatabase(
opCtx, _dbName, repl::ReadConcernLevel::kMajorityReadConcern);
if (_doc.getDatabaseVersion()->getUuid() != db.getVersion().getUuid()) {
- return; // skip to _flushDatabaseCacheUpdates
+ return; // skip to FlushDatabaseCacheUpdates
}
} catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
- return; // skip to _flushDatabaseCacheUpdates
+ return; // skip to FlushDatabaseCacheUpdates
}
}
@@ -349,8 +349,8 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl(
std::remove(participants.begin(), participants.end(), primaryShardId),
participants.end());
// Send _flushDatabaseCacheUpdates to all shards
- auto flushDbCacheUpdatesCmd =
- _flushDatabaseCacheUpdatesWithWriteConcern(_dbName.toString());
+ FlushDatabaseCacheUpdatesWithWriteConcern flushDbCacheUpdatesCmd(
+ _dbName.toString());
flushDbCacheUpdatesCmd.setSyncFromConfig(true);
flushDbCacheUpdatesCmd.setDbName(_dbName);
diff --git a/src/mongo/db/s/flush_database_cache_updates_command.cpp b/src/mongo/db/s/flush_database_cache_updates_command.cpp
index a80192e7dd3..de95a293333 100644
--- a/src/mongo/db/s/flush_database_cache_updates_command.cpp
+++ b/src/mongo/db/s/flush_database_cache_updates_command.cpp
@@ -117,7 +117,7 @@ public:
"Can't call _flushDatabaseCacheUpdates if in read-only mode",
!storageGlobalParams.readOnly);
- auto& oss = OperationShardingState::get(opCtx);
+ boost::optional<SharedSemiFuture<void>> criticalSectionSignal;
{
AutoGetDb autoDb(opCtx, _dbName(), MODE_IS);
@@ -129,14 +129,12 @@ public:
// consistency guarantee.
const auto dss = DatabaseShardingState::get(opCtx, _dbName());
auto dssLock = DatabaseShardingState::DSSLock::lockShared(opCtx, dss);
-
- if (auto criticalSectionSignal = dss->getCriticalSectionSignal(
- ShardingMigrationCriticalSection::kRead, dssLock)) {
- oss.setMigrationCriticalSectionSignal(criticalSectionSignal);
- }
+ criticalSectionSignal =
+ dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kRead, dssLock);
}
- oss.waitForMigrationCriticalSectionSignal(opCtx);
+ if (criticalSectionSignal)
+ criticalSectionSignal->get(opCtx);
if (Base::request().getSyncFromConfig()) {
LOGV2_DEBUG(21981,
@@ -144,7 +142,7 @@ public:
"Forcing remote routing table refresh for {db}",
"Forcing remote routing table refresh",
"db"_attr = _dbName());
- forceDatabaseRefresh(opCtx, _dbName());
+ uassertStatusOK(onDbVersionMismatchNoExcept(opCtx, _dbName(), boost::none));
}
CatalogCacheLoader::get(opCtx).waitForDatabaseFlush(opCtx, _dbName());
@@ -162,21 +160,23 @@ public:
class FlushDatabaseCacheUpdatesCmd final
: public FlushDatabaseCacheUpdatesCmdBase<FlushDatabaseCacheUpdatesCmd> {
public:
- using Request = _flushDatabaseCacheUpdates;
+ using Request = FlushDatabaseCacheUpdates;
static bool supportsWriteConcern() {
return false;
}
+
} _flushDatabaseCacheUpdates;
class FlushDatabaseCacheUpdatesWithWriteConcernCmd final
: public FlushDatabaseCacheUpdatesCmdBase<FlushDatabaseCacheUpdatesWithWriteConcernCmd> {
public:
- using Request = _flushDatabaseCacheUpdatesWithWriteConcern;
+ using Request = FlushDatabaseCacheUpdatesWithWriteConcern;
static bool supportsWriteConcern() {
return true;
}
+
} _flushDatabaseCacheUpdatesWithWriteConcern;
} // namespace
diff --git a/src/mongo/db/s/flush_routing_table_cache_updates_command.cpp b/src/mongo/db/s/flush_routing_table_cache_updates_command.cpp
index a387f7fae82..943b78ece6d 100644
--- a/src/mongo/db/s/flush_routing_table_cache_updates_command.cpp
+++ b/src/mongo/db/s/flush_routing_table_cache_updates_command.cpp
@@ -112,7 +112,7 @@ public:
<< " if in read-only mode",
!storageGlobalParams.readOnly);
- auto& oss = OperationShardingState::get(opCtx);
+ boost::optional<SharedSemiFuture<void>> criticalSectionSignal;
{
AutoGetCollection autoColl(opCtx, ns(), MODE_IS);
@@ -123,14 +123,12 @@ public:
// propagated back to this shard. This ensures the read your own writes causal
// consistency guarantee.
auto const csr = CollectionShardingRuntime::get(opCtx, ns());
- auto criticalSectionSignal =
+ criticalSectionSignal =
csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite);
- if (criticalSectionSignal) {
- oss.setMigrationCriticalSectionSignal(criticalSectionSignal);
- }
}
- oss.waitForMigrationCriticalSectionSignal(opCtx);
+ if (criticalSectionSignal)
+ criticalSectionSignal->get(opCtx);
if (Base::request().getSyncFromConfig()) {
LOGV2_DEBUG(21982,
diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp
index 74d71ef6594..256924b4665 100644
--- a/src/mongo/db/s/operation_sharding_state.cpp
+++ b/src/mongo/db/s/operation_sharding_state.cpp
@@ -29,7 +29,7 @@
#include "mongo/db/s/operation_sharding_state.h"
-#include "mongo/logv2/log_debug.h"
+#include "mongo/db/s/sharding_api_d_params_gen.h"
namespace mongo {
namespace {
@@ -37,12 +37,6 @@ namespace {
const OperationContext::Decoration<OperationShardingState> shardingMetadataDecoration =
OperationContext::declareDecoration<OperationShardingState>();
-// Max time to wait for the migration critical section to complete
-const Milliseconds kMaxWaitForMigrationCriticalSection = Minutes(5);
-
-// Max time to wait for the movePrimary critical section to complete
-const Milliseconds kMaxWaitForMovePrimaryCriticalSection = Minutes(5);
-
} // namespace
OperationShardingState::OperationShardingState() = default;
@@ -121,54 +115,36 @@ boost::optional<DatabaseVersion> OperationShardingState::getDbVersion(StringData
return boost::none;
}
-bool OperationShardingState::waitForMigrationCriticalSectionSignal(OperationContext* opCtx) {
+Status OperationShardingState::waitForCriticalSectionToComplete(
+ OperationContext* opCtx, SharedSemiFuture<void> critSecSignal) noexcept {
// Must not block while holding a lock
invariant(!opCtx->lockState()->isLocked());
- if (_migrationCriticalSectionSignal) {
- auto deadline = opCtx->getServiceContext()->getFastClockSource()->now() +
- std::min(opCtx->getRemainingMaxTimeMillis(), kMaxWaitForMigrationCriticalSection);
-
- opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] {
- _migrationCriticalSectionSignal->wait(opCtx);
- });
-
- _migrationCriticalSectionSignal = boost::none;
- return true;
- }
-
- return false;
-}
-
-void OperationShardingState::setMigrationCriticalSectionSignal(
- boost::optional<SharedSemiFuture<void>> critSecSignal) {
- invariant(critSecSignal);
- _migrationCriticalSectionSignal = std::move(critSecSignal);
-}
-
-bool OperationShardingState::waitForMovePrimaryCriticalSectionSignal(OperationContext* opCtx) {
- // Must not block while holding a lock
- invariant(!opCtx->lockState()->isLocked());
-
- if (_movePrimaryCriticalSectionSignal) {
- auto deadline = opCtx->getServiceContext()->getFastClockSource()->now() +
- std::min(opCtx->getRemainingMaxTimeMillis(), kMaxWaitForMovePrimaryCriticalSection);
-
- opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] {
- _movePrimaryCriticalSectionSignal->wait(opCtx);
- });
-
- _movePrimaryCriticalSectionSignal = boost::none;
- return true;
+ // If we are in a transaction, limit the time we can wait behind the critical section. This is
+ // needed in order to prevent distributed deadlocks in situations where a DDL operation needs to
+ // acquire the critical section on several shards.
+ //
+ // In such cases, shard running a transaction could be waiting for the critical section to be
+ // exited, while on another shard the transaction has already executed some statement and
+ // stashed locks which prevent the critical section from being acquired in that node. Limiting
+ // the wait behind the critical section will ensure that the transaction will eventually get
+ // aborted.
+ if (opCtx->inMultiDocumentTransaction()) {
+ try {
+ opCtx->runWithDeadline(
+ opCtx->getServiceContext()->getFastClockSource()->now() +
+ Milliseconds(metadataRefreshInTransactionMaxWaitBehindCritSecMS.load()),
+ ErrorCodes::ExceededTimeLimit,
+ [&] { critSecSignal.wait(opCtx); });
+ return Status::OK();
+ } catch (const DBException& ex) {
+ // This is a best-effort attempt to wait for the critical section to complete, so no
+ // need to handle any exceptions
+ return ex.toStatus();
+ }
+ } else {
+ return critSecSignal.waitNoThrow(opCtx);
}
-
- return false;
-}
-
-void OperationShardingState::setMovePrimaryCriticalSectionSignal(
- boost::optional<SharedSemiFuture<void>> critSecSignal) {
- invariant(critSecSignal);
- _movePrimaryCriticalSectionSignal = std::move(critSecSignal);
}
void OperationShardingState::setShardingOperationFailedStatus(const Status& status) {
diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h
index 63c726fcc0b..7a9a20012bb 100644
--- a/src/mongo/db/s/operation_sharding_state.h
+++ b/src/mongo/db/s/operation_sharding_state.h
@@ -146,37 +146,15 @@ public:
boost::optional<DatabaseVersion> getDbVersion(StringData dbName) const;
/**
- * This call is a no op if there isn't a currently active migration critical section. Otherwise
- * it will wait for the critical section to complete up to the remaining operation time.
+ * This method implements a best-effort attempt to wait for the critical section to complete
+ * before returning to the router at the previous step in order to prevent it from busy spinning
+ * while the critical section is in progress.
*
- * Returns true if the call actually waited because of migration critical section (regardless if
- * whether it timed out or not), false if there was no active migration critical section.
+ * All waits for migration critical section should go through this code path, because it also
+ * accounts for transactions and locking.
*/
- bool waitForMigrationCriticalSectionSignal(OperationContext* opCtx);
-
- /**
- * Setting this value indicates that when the version check failed, there was an active
- * migration for the namespace and that it would be prudent to wait for the critical section to
- * complete before retrying so the router doesn't make wasteful requests.
- */
- void setMigrationCriticalSectionSignal(boost::optional<SharedSemiFuture<void>> critSecSignal);
-
- /**
- * This call is a no op if there isn't a currently active movePrimary critical section.
- * Otherwise it will wait for the critical section to complete up to the remaining operation
- * time.
- *
- * Returns true if the call actually waited because of movePrimary critical section (regardless
- * whether it timed out or not), false if there was no active movePrimary critical section.
- */
- bool waitForMovePrimaryCriticalSectionSignal(OperationContext* opCtx);
-
- /**
- * Setting this value indicates that when the version check failed, there was an active
- * movePrimary for the namespace and that it would be prudent to wait for the critical section
- * to complete before retrying so the router doesn't make wasteful requests.
- */
- void setMovePrimaryCriticalSectionSignal(boost::optional<SharedSemiFuture<void>> critSecSignal);
+ static Status waitForCriticalSectionToComplete(OperationContext* opCtx,
+ SharedSemiFuture<void> critSecSignal) noexcept;
/**
* Stores the failed status in _shardingOperationFailedStatus.
@@ -221,15 +199,6 @@ private:
};
StringMap<DatabaseVersionTracker> _databaseVersions;
- // This value will only be non-null if version check during the operation execution failed due
- // to stale version and there was a migration for that namespace, which was in critical section.
- boost::optional<SharedSemiFuture<void>> _migrationCriticalSectionSignal;
-
- // This value will only be non-null if version check during the operation execution failed due
- // to stale version and there was a movePrimary for that namespace, which was in critical
- // section.
- boost::optional<SharedSemiFuture<void>> _movePrimaryCriticalSectionSignal;
-
// This value can only be set when a rerouting exception occurs during a write operation, and
// must be handled before this object gets destructed.
boost::optional<Status> _shardingOperationFailedStatus;
diff --git a/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp b/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp
index 0c88cc6efb0..d7253272b62 100644
--- a/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp
+++ b/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp
@@ -29,19 +29,17 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
-#include "mongo/platform/basic.h"
-
#include "mongo/db/s/scoped_operation_completion_sharding_actions.h"
#include "mongo/db/curop.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/sharding_statistics.h"
#include "mongo/logv2/log.h"
#include "mongo/s/stale_exception.h"
namespace mongo {
-
namespace {
const auto shardingOperationCompletionActionsRegistered =
@@ -71,11 +69,20 @@ ScopedOperationCompletionShardingActions::~ScopedOperationCompletionShardingActi
}
if (auto staleInfo = status->extraInfo<StaleConfigInfo>()) {
+ ShardingStatistics::get(_opCtx).countStaleConfigErrors.addAndFetch(1);
+
if (staleInfo->getCriticalSectionSignal()) {
- // Set migration critical section on operation sharding state: operation will wait for
- // the migration to finish before returning.
- auto& oss = OperationShardingState::get(_opCtx);
- oss.setMigrationCriticalSectionSignal(staleInfo->getCriticalSectionSignal());
+ // The shard is in a critical section
+ OperationShardingState::waitForCriticalSectionToComplete(
+ _opCtx, *staleInfo->getCriticalSectionSignal())
+ .ignore();
+ return;
+ }
+
+ if (staleInfo->getVersionWanted() &&
+ staleInfo->getVersionReceived().isOlderThan(*staleInfo->getVersionWanted())) {
+ // Shard is recovered and the router is staler than the shard
+ return;
}
auto handleMismatchStatus = onShardVersionMismatchNoExcept(
@@ -87,10 +94,22 @@ ScopedOperationCompletionShardingActions::~ScopedOperationCompletionShardingActi
"Failed to handle stale version exception as part of the current operation",
"error"_attr = redact(handleMismatchStatus));
} else if (auto staleInfo = status->extraInfo<StaleDbRoutingVersion>()) {
- auto handleMismatchStatus = onDbVersionMismatchNoExcept(_opCtx,
- staleInfo->getDb(),
- staleInfo->getVersionReceived(),
- staleInfo->getVersionWanted());
+ if (staleInfo->getCriticalSectionSignal()) {
+ // The shard is in a critical section
+ OperationShardingState::waitForCriticalSectionToComplete(
+ _opCtx, *staleInfo->getCriticalSectionSignal())
+ .ignore();
+ return;
+ }
+
+ if (staleInfo->getVersionWanted() &&
+ staleInfo->getVersionReceived() < staleInfo->getVersionWanted()) {
+ // Shard is recovered and the router is staler than the shard
+ return;
+ }
+
+ auto handleMismatchStatus = onDbVersionMismatchNoExcept(
+ _opCtx, staleInfo->getDb(), staleInfo->getVersionReceived());
if (!handleMismatchStatus.isOK())
LOGV2(22054,
"Failed to handle database version exception as part of the current operation: "
diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
index 0cfd766ebdb..0a7cecfe632 100644
--- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
+++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
@@ -29,8 +29,6 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
-#include "mongo/platform/basic.h"
-
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/db/catalog/database_holder.h"
@@ -43,9 +41,7 @@
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
-#include "mongo/db/s/sharding_runtime_d_params_gen.h"
#include "mongo/db/s/sharding_state.h"
-#include "mongo/db/s/sharding_statistics.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/database_version.h"
@@ -61,23 +57,29 @@ MONGO_FAIL_POINT_DEFINE(hangInRecoverRefreshThread);
void onDbVersionMismatch(OperationContext* opCtx,
const StringData dbName,
- const DatabaseVersion& clientDbVersion,
- const boost::optional<DatabaseVersion>& serverDbVersion) {
+ boost::optional<DatabaseVersion> clientDbVersion) {
invariant(!opCtx->lockState()->isLocked());
invariant(!opCtx->getClient()->isInDirectClient());
-
invariant(ShardingState::get(opCtx)->canAcceptShardedCommands());
- if (clientDbVersion <= serverDbVersion) {
- // The client was stale; do not trigger server-side refresh.
- return;
+ {
+ // Take the DBLock directly rather than using AutoGetDb, to prevent a recursive call into
+ // checkDbVersion().
+ //
+ // TODO: It is not safe here to read the DB version without checking for critical section
+ //
+ if (clientDbVersion) {
+ Lock::DBLock dbLock(opCtx, dbName, MODE_IS);
+ auto dss = DatabaseShardingState::get(opCtx, dbName);
+ auto dssLock = DatabaseShardingState::DSSLock::lockShared(opCtx, dss);
+ const auto serverDbVersion = dss->getDbVersion(opCtx, dssLock);
+ if (clientDbVersion <= serverDbVersion) {
+ // The client was stale
+ return;
+ }
+ }
}
- // Ensure any ongoing movePrimary's have completed before trying to do the refresh. This wait is
- // just an optimization so that mongos does not exhaust its maximum number of
- // StaleDatabaseVersion retry attempts while the movePrimary is being committed.
- OperationShardingState::get(opCtx).waitForMovePrimaryCriticalSectionSignal(opCtx);
-
if (MONGO_unlikely(skipDatabaseVersionMetadataRefresh.shouldFail())) {
return;
}
@@ -90,8 +92,7 @@ bool joinShardVersionOperation(OperationContext* opCtx,
CollectionShardingRuntime* csr,
boost::optional<Lock::DBLock>* dbLock,
boost::optional<Lock::CollectionLock>* collLock,
- boost::optional<CollectionShardingRuntime::CSRLock>* csrLock,
- Milliseconds criticalSectionMaxWait) {
+ boost::optional<CollectionShardingRuntime::CSRLock>* csrLock) {
invariant(collLock->has_value());
invariant(csrLock->has_value());
@@ -108,11 +109,8 @@ bool joinShardVersionOperation(OperationContext* opCtx,
dbLock->reset();
if (critSecSignal) {
- const auto deadline = criticalSectionMaxWait == Milliseconds::max()
- ? Date_t::max()
- : opCtx->getServiceContext()->getFastClockSource()->now() + criticalSectionMaxWait;
- opCtx->runWithDeadline(
- deadline, ErrorCodes::ExceededTimeLimit, [&] { critSecSignal->get(opCtx); });
+ uassertStatusOK(
+ OperationShardingState::waitForCriticalSectionToComplete(opCtx, *critSecSignal));
} else {
try {
inRecoverOrRefresh->get(opCtx);
@@ -128,8 +126,6 @@ bool joinShardVersionOperation(OperationContext* opCtx,
return false;
}
-} // namespace
-
SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext,
const NamespaceString nss,
bool runRecover,
@@ -235,6 +231,8 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext
.share();
}
+} // namespace
+
void onShardVersionMismatch(OperationContext* opCtx,
const NamespaceString& nss,
boost::optional<ChunkVersion> shardVersionReceived) {
@@ -246,8 +244,6 @@ void onShardVersionMismatch(OperationContext* opCtx,
return;
}
- ShardingStatistics::get(opCtx).countStaleConfigErrors.addAndFetch(1);
-
LOGV2_DEBUG(22061,
2,
"Metadata refresh requested for {namespace} at shard version "
@@ -256,17 +252,6 @@ void onShardVersionMismatch(OperationContext* opCtx,
"namespace"_attr = nss,
"shardVersionReceived"_attr = shardVersionReceived);
- // If we are in a transaction, limit the time we can wait behind the critical section. This
- // is needed in order to prevent distributed deadlocks in situations where a DDL operation
- // needs to acquire the critical section on several shards. In that case, a shard running a
- // transaction could be waiting for the critical section to be exited, while on another
- // shard the transaction has already executed some statement and stashed locks which prevent
- // the critical section from being acquired in that node. Limiting the wait behind the
- // critical section will ensure that the transaction will eventually get aborted.
- const auto criticalSectionMaxWait = opCtx->inMultiDocumentTransaction()
- ? Milliseconds(metadataRefreshInTransactionMaxWaitBehindCritSecMS.load())
- : Milliseconds::max();
-
while (true) {
boost::optional<SharedSemiFuture<void>> inRecoverOrRefresh;
{
@@ -279,8 +264,7 @@ void onShardVersionMismatch(OperationContext* opCtx,
boost::optional<CollectionShardingRuntime::CSRLock> csrLock =
CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr);
- if (joinShardVersionOperation(
- opCtx, csr, &dbLock, &collLock, &csrLock, criticalSectionMaxWait)) {
+ if (joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) {
continue;
}
@@ -302,8 +286,7 @@ void onShardVersionMismatch(OperationContext* opCtx,
// If there is no ongoing shard version operation, initialize the RecoverRefreshThread
// thread and associate it to the CSR.
- if (!joinShardVersionOperation(
- opCtx, csr, &dbLock, &collLock, &csrLock, criticalSectionMaxWait)) {
+ if (!joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) {
// If the shard doesn't yet know its filtering metadata, recovery needs to be run
const bool runRecover = metadata ? false : true;
CancellationSource cancellationSource;
@@ -472,13 +455,11 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx,
return newShardVersion;
}
-Status onDbVersionMismatchNoExcept(
- OperationContext* opCtx,
- const StringData dbName,
- const DatabaseVersion& clientDbVersion,
- const boost::optional<DatabaseVersion>& serverDbVersion) noexcept {
+Status onDbVersionMismatchNoExcept(OperationContext* opCtx,
+ const StringData dbName,
+ boost::optional<DatabaseVersion> clientDbVersion) noexcept {
try {
- onDbVersionMismatch(opCtx, dbName, clientDbVersion, serverDbVersion);
+ onDbVersionMismatch(opCtx, dbName, clientDbVersion);
return Status::OK();
} catch (const DBException& ex) {
LOGV2(22065,
diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.h b/src/mongo/db/s/shard_filtering_metadata_refresh.h
index 84fd1dc7c0e..724409b4621 100644
--- a/src/mongo/db/s/shard_filtering_metadata_refresh.h
+++ b/src/mongo/db/s/shard_filtering_metadata_refresh.h
@@ -64,14 +64,6 @@ void onShardVersionMismatch(OperationContext* opCtx,
boost::optional<ChunkVersion> shardVersionReceived);
/**
- * Starts the RecoverRefreshThread to set the current metadata on the CSR. This function will also
- * recover any ongoing migrations if runRecover is true.
- */
-SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext,
- NamespaceString nss,
- bool runRecover);
-
-/**
* Unconditionally get the shard's filtering metadata from the config server on the calling thread.
* Returns the metadata if the nss is sharded, otherwise default unsharded metadata.
*
@@ -95,11 +87,9 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx,
* Invalidates the cached database version, schedules a refresh of the database info, waits for the
* refresh to complete, and updates the cached database version.
*/
-Status onDbVersionMismatchNoExcept(
- OperationContext* opCtx,
- StringData dbName,
- const DatabaseVersion& clientDbVersion,
- const boost::optional<DatabaseVersion>& serverDbVersion) noexcept;
+Status onDbVersionMismatchNoExcept(OperationContext* opCtx,
+ StringData dbName,
+ boost::optional<DatabaseVersion> clientDbVersion) noexcept;
void forceDatabaseRefresh(OperationContext* opCtx, StringData dbName);
diff --git a/src/mongo/db/s/sharding_api_d_params.idl b/src/mongo/db/s/sharding_api_d_params.idl
new file mode 100644
index 00000000000..507687becb1
--- /dev/null
+++ b/src/mongo/db/s/sharding_api_d_params.idl
@@ -0,0 +1,41 @@
+# Copyright (C) 2022-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+
+global:
+ cpp_namespace: mongo
+
+server_parameters:
+ metadataRefreshInTransactionMaxWaitBehindCritSecMS:
+ description: >-
+ Maximum time in milliseconds to wait behind the critical section when refreshing the
+ filtering metadata within a transaction.
+ set_at: [startup, runtime]
+ cpp_vartype: AtomicWord<int>
+ cpp_varname: metadataRefreshInTransactionMaxWaitBehindCritSecMS
+ validator:
+ gte: 0
+ default: 500
diff --git a/src/mongo/db/s/sharding_runtime_d_params.idl b/src/mongo/db/s/sharding_runtime_d_params.idl
index 70f9466365c..23d298f0d6b 100644
--- a/src/mongo/db/s/sharding_runtime_d_params.idl
+++ b/src/mongo/db/s/sharding_runtime_d_params.idl
@@ -142,14 +142,3 @@ server_parameters:
cpp_vartype: int
cpp_varname: shardedIndexConsistencyCheckIntervalMS
default: 600000
-
- metadataRefreshInTransactionMaxWaitBehindCritSecMS:
- description: >-
- Maximum time in milliseconds to wait behind the critical section when refreshing the
- filtering metadata within a transaction.
- set_at: [startup, runtime]
- cpp_vartype: AtomicWord<int>
- cpp_varname: metadataRefreshInTransactionMaxWaitBehindCritSecMS
- validator:
- gte: 0
- default: 500
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 2cfb3f2286e..7dd97367749 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -29,8 +29,6 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
-#include "mongo/platform/basic.h"
-
#include "mongo/db/service_entry_point_common.h"
#include <fmt/format.h>
@@ -80,6 +78,7 @@
#include "mongo/db/request_execution_context.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/sharding_statistics.h"
#include "mongo/db/s/transaction_coordinator_factory.h"
#include "mongo/db/service_entry_point_common.h"
#include "mongo/db/session_catalog_mongod.h"
@@ -1654,14 +1653,28 @@ Future<void> ExecCommandDatabase::_commandExec() {
!_refreshedDatabase) {
auto sce = s.extraInfo<StaleDbRoutingVersion>();
invariant(sce);
- if (sce->getVersionWanted() < sce->getVersionReceived()) {
- const auto refreshed = _execContext->behaviors->refreshDatabase(opCtx, *sce);
- if (refreshed) {
- _refreshedDatabase = true;
- if (!opCtx->isContinuingMultiDocumentTransaction()) {
- _resetLockerStateAfterShardingUpdate(opCtx);
- return _commandExec();
- }
+
+ if (sce->getCriticalSectionSignal()) {
+ // The shard is in a critical section, so we cannot retry locally
+ OperationShardingState::waitForCriticalSectionToComplete(
+ opCtx, *sce->getCriticalSectionSignal())
+ .ignore();
+ return s;
+ }
+
+ if (sce->getVersionWanted() &&
+ sce->getVersionReceived() < sce->getVersionWanted()) {
+ // The shard is recovered and the router is staler than the shard, so we cannot
+ // retry locally
+ return s;
+ }
+
+ const auto refreshed = _execContext->behaviors->refreshDatabase(opCtx, *sce);
+ if (refreshed) {
+ _refreshedDatabase = true;
+ if (!opCtx->isContinuingMultiDocumentTransaction()) {
+ _resetLockerStateAfterShardingUpdate(opCtx);
+ return _commandExec();
}
}
}
@@ -1670,15 +1683,23 @@ Future<void> ExecCommandDatabase::_commandExec() {
})
.onErrorCategory<ErrorCategory::StaleShardVersionError>([this](Status s) -> Future<void> {
auto opCtx = _execContext->getOpCtx();
+ ShardingStatistics::get(opCtx).countStaleConfigErrors.addAndFetch(1);
if (!opCtx->getClient()->isInDirectClient() &&
serverGlobalParams.clusterRole != ClusterRole::ConfigServer &&
!_refreshedCollection) {
if (auto sce = s.extraInfo<StaleConfigInfo>()) {
+ if (sce->getCriticalSectionSignal()) {
+ // The shard is in a critical section, so we cannot retry locally
+ OperationShardingState::waitForCriticalSectionToComplete(
+ opCtx, *sce->getCriticalSectionSignal())
+ .ignore();
+ return s;
+ }
+
if (sce->getVersionWanted() &&
- sce->getVersionReceived().isOlderThan(sce->getVersionWanted().get())) {
- // If the local shard version is newer than the received one return the
- // error to the router without retrying locally.
+ sce->getVersionReceived().isOlderThan(*sce->getVersionWanted())) {
+ // Shard is recovered and the router is staler than the shard
return s;
}
diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp
index 7d96332a67e..991a099b18b 100644
--- a/src/mongo/db/service_entry_point_mongod.cpp
+++ b/src/mongo/db/service_entry_point_mongod.cpp
@@ -237,9 +237,7 @@ public:
bool refreshDatabase(OperationContext* opCtx, const StaleDbRoutingVersion& se) const
noexcept override {
- return onDbVersionMismatchNoExcept(
- opCtx, se.getDb(), se.getVersionReceived(), se.getVersionWanted())
- .isOK();
+ return onDbVersionMismatchNoExcept(opCtx, se.getDb(), se.getVersionReceived()).isOK();
}
bool refreshCollection(OperationContext* opCtx, const StaleConfigInfo& se) const
diff --git a/src/mongo/s/request_types/flush_database_cache_updates.idl b/src/mongo/s/request_types/flush_database_cache_updates.idl
index a4d331ff788..fb77d853623 100644
--- a/src/mongo/s/request_types/flush_database_cache_updates.idl
+++ b/src/mongo/s/request_types/flush_database_cache_updates.idl
@@ -38,6 +38,7 @@ commands:
_flushDatabaseCacheUpdates:
description: "An internal command to wait for the last routing table cache refresh for a particular database to be persisted to disk"
command_name: _flushDatabaseCacheUpdates
+ cpp_name: FlushDatabaseCacheUpdates
strict: true
namespace: type
api_version: ""
@@ -51,6 +52,7 @@ commands:
_flushDatabaseCacheUpdatesWithWriteConcern:
description: "The same behavior as _flushDatabaseCacheUpdates but accepts writeConcern"
command_name: _flushDatabaseCacheUpdatesWithWriteConcern
+ cpp_name: FlushDatabaseCacheUpdatesWithWriteConcern
strict: true
namespace: type
api_version: ""
diff --git a/src/mongo/s/stale_exception.h b/src/mongo/s/stale_exception.h
index 4aa22837a3a..70d7a0dc42e 100644
--- a/src/mongo/s/stale_exception.h
+++ b/src/mongo/s/stale_exception.h
@@ -50,7 +50,7 @@ public:
_received(received),
_wanted(wanted),
_shardId(shardId),
- _criticalSectionSignal(criticalSectionSignal) {}
+ _criticalSectionSignal(std::move(criticalSectionSignal)) {}
const auto& getNss() const {
return _nss;
@@ -117,8 +117,7 @@ protected:
boost::optional<ChunkVersion> _wanted;
ShardId _shardId;
- // This signal does not get serialized and therefore does not get propagated
- // to the router.
+ // This signal does not get serialized and therefore does not get propagated to the router
boost::optional<SharedSemiFuture<void>> _criticalSectionSignal;
};
@@ -154,10 +153,15 @@ class StaleDbRoutingVersion final : public ErrorExtraInfo {
public:
static constexpr auto code = ErrorCodes::StaleDbVersion;
- StaleDbRoutingVersion(std::string db,
- DatabaseVersion received,
- boost::optional<DatabaseVersion> wanted)
- : _db(std::move(db)), _received(received), _wanted(wanted) {}
+ StaleDbRoutingVersion(
+ std::string db,
+ DatabaseVersion received,
+ boost::optional<DatabaseVersion> wanted,
+ boost::optional<SharedSemiFuture<void>> criticalSectionSignal = boost::none)
+ : _db(std::move(db)),
+ _received(received),
+ _wanted(wanted),
+ _criticalSectionSignal(std::move(criticalSectionSignal)) {}
const auto& getDb() const {
return _db;
@@ -171,6 +175,10 @@ public:
return _wanted;
}
+ auto getCriticalSectionSignal() const {
+ return _criticalSectionSignal;
+ }
+
void serialize(BSONObjBuilder* bob) const override;
static std::shared_ptr<const ErrorExtraInfo> parse(const BSONObj&);
static StaleDbRoutingVersion parseFromCommandError(const BSONObj& commandError);
@@ -179,6 +187,9 @@ private:
std::string _db;
DatabaseVersion _received;
boost::optional<DatabaseVersion> _wanted;
+
+ // This signal does not get serialized and therefore does not get propagated to the router
+ boost::optional<SharedSemiFuture<void>> _criticalSectionSignal;
};
} // namespace mongo