summary | refs | log | tree | commit | diff
path: root/src/mongo/db
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- src/mongo/db/s/migration_util.cpp | 2
-rw-r--r-- src/mongo/db/s/shard_filtering_metadata_refresh.cpp | 136
2 files changed, 59 insertions, 79 deletions
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 61749bf7f57..bac330eb09b 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -996,7 +996,7 @@ void recoverMigrationCoordinations(OperationContext* opCtx,
hangInRefreshFilteringMetadataUntilSuccessThenSimulateErrorUninterruptible
.pauseWhileSet();
uasserted(ErrorCodes::InternalError,
- "simulate an error response for forceShardFilteringMetadataRefresh");
+ "simulate an error response for forceGetCurrentMetadata");
}
auto setFilteringMetadata = [&opCtx, &currentMetadata, &doc, &cancellationToken]() {
diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
index 81a748c5bef..aa83694fcf2 100644
--- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
+++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
@@ -58,57 +58,42 @@ MONGO_FAIL_POINT_DEFINE(skipShardFilteringMetadataRefresh);
MONGO_FAIL_POINT_DEFINE(hangInRecoverRefreshThread);
/**
- * If the critical section associate with the database is entered by another thread (e.g., a move
- * primary or a drop database is in progress), it releases the acquired locks and waits for the
- * latter to exit. In this case the function returns true, otherwise false.
+ * Blocking method, which will wait for any concurrent operations that could change the database
+ * version to complete (namely critical section and concurrent onDbVersionMismatch invocations).
+ *
+ * Returns 'true' if there were concurrent operations that had to be joined (in which case all locks
+ * will be dropped). If there were none, returns false and the locks continue to be held.
*/
-bool checkAndWaitIfCriticalSectionIsEntered(
- OperationContext* opCtx,
- DatabaseShardingState* dss,
- boost::optional<Lock::DBLock>& dbLock,
- boost::optional<DatabaseShardingState::DSSLock>& dssLock) {
- invariant(dbLock);
- invariant(dssLock);
+bool joinDbVersionOperation(OperationContext* opCtx,
+ DatabaseShardingState* dss,
+ boost::optional<Lock::DBLock>* dbLock,
+ boost::optional<DatabaseShardingState::DSSLock>* dssLock) {
+ invariant(dbLock->has_value());
+ invariant(dssLock->has_value());
if (auto critSect =
- dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kRead, *dssLock)) {
+ dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kRead, **dssLock)) {
LOGV2_DEBUG(6697201,
2,
"Waiting for exit from the critical section",
"db"_attr = dss->getDbName(),
- "reason"_attr = dss->getCriticalSectionReason(*dssLock));
+ "reason"_attr = dss->getCriticalSectionReason(**dssLock));
- dbLock = boost::none;
- dssLock = boost::none;
+ dbLock->reset();
+ dssLock->reset();
uassertStatusOK(OperationShardingState::waitForCriticalSectionToComplete(opCtx, *critSect));
-
return true;
}
- return false;
-}
-
-/**
- * If another thread is refreshing the database metadata, it releases the acquired locks and waits
- * for that refresh to complete. In this case the function returns true, otherwise false.
- */
-bool checkAndWaitIfAnotherRefreshIsRunning(
- OperationContext* opCtx,
- DatabaseShardingState* dss,
- boost::optional<Lock::DBLock>& dbLock,
- boost::optional<DatabaseShardingState::DSSLock>& dssLock) {
- invariant(dbLock);
- invariant(dssLock);
-
- if (auto refreshVersionFuture = dss->getDbMetadataRefreshFuture(*dssLock)) {
+ if (auto refreshVersionFuture = dss->getDbMetadataRefreshFuture(**dssLock)) {
LOGV2_DEBUG(6697202,
2,
"Waiting for completion of another database metadata refresh",
"db"_attr = dss->getDbName());
- dbLock = boost::none;
- dssLock = boost::none;
+ dbLock->reset();
+ dssLock->reset();
try {
refreshVersionFuture->get(opCtx);
@@ -170,9 +155,9 @@ Status refreshDbMetadata(OperationContext* opCtx,
return swDbMetadata.getStatus();
}
-SharedSemiFuture<void> asyncDbMetadataRefresh(OperationContext* opCtx,
- const DatabaseName& dbName,
- const CancellationToken& cancellationToken) {
+SharedSemiFuture<void> recoverRefreshDbVersion(OperationContext* opCtx,
+ const DatabaseName& dbName,
+ const CancellationToken& cancellationToken) {
const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
return ExecutorFuture<void>(executor)
.then([=,
@@ -215,7 +200,7 @@ SharedSemiFuture<void> asyncDbMetadataRefresh(OperationContext* opCtx,
void onDbVersionMismatch(OperationContext* opCtx,
const StringData dbName,
- boost::optional<DatabaseVersion> receivedVersion) {
+ boost::optional<DatabaseVersion> receivedDbVersion) {
invariant(!opCtx->lockState()->isLocked());
invariant(!opCtx->getClient()->isInDirectClient());
invariant(ShardingState::get(opCtx)->canAcceptShardedCommands());
@@ -233,23 +218,20 @@ void onDbVersionMismatch(OperationContext* opCtx,
2,
"Handle database version mismatch",
"db"_attr = dbName,
- "receivedVersion"_attr = receivedVersion);
+ "receivedDbVersion"_attr = receivedDbVersion);
while (true) {
boost::optional<SharedSemiFuture<void>> dbMetadataRefreshFuture;
- // Set the database metadata refresh future after waiting for another thread to exit from
- // the critical section or to complete an ongoing refresh.
{
auto dbLock = boost::make_optional(Lock::DBLock(opCtx, dbName, MODE_IS));
auto* dss = DatabaseShardingState::get(opCtx, dbName);
- if (receivedVersion) {
+ if (receivedDbVersion) {
auto dssLock =
boost::make_optional(DatabaseShardingState::DSSLock::lockShared(opCtx, dss));
- if (checkAndWaitIfCriticalSectionIsEntered(opCtx, dss, dbLock, dssLock) ||
- checkAndWaitIfAnotherRefreshIsRunning(opCtx, dss, dbLock, dssLock)) {
+ if (joinDbVersionOperation(opCtx, dss, &dbLock, &dssLock)) {
// Waited for another thread to exit from the critical section or to complete an
// ongoing refresh, so reacquire the locks.
continue;
@@ -260,8 +242,9 @@ void onDbVersionMismatch(OperationContext* opCtx,
// is in progress or can start (would require to exclusive lock the DSS).
// Therefore, the database version can be accessed safely.
- const auto wantedVersion = DatabaseHolder::get(opCtx)->getDbVersion(opCtx, dbName);
- if (receivedVersion <= wantedVersion) {
+ const auto wantedDbVersion =
+ DatabaseHolder::get(opCtx)->getDbVersion(opCtx, dbName);
+ if (receivedDbVersion <= wantedDbVersion) {
// No need to refresh the database metadata as the wanted version is newer
// than the one received.
return;
@@ -275,8 +258,7 @@ void onDbVersionMismatch(OperationContext* opCtx,
auto dssLock =
boost::make_optional(DatabaseShardingState::DSSLock::lockExclusive(opCtx, dss));
- if (checkAndWaitIfCriticalSectionIsEntered(opCtx, dss, dbLock, dssLock) ||
- checkAndWaitIfAnotherRefreshIsRunning(opCtx, dss, dbLock, dssLock)) {
+ if (joinDbVersionOperation(opCtx, dss, &dbLock, &dssLock)) {
// Waited for another thread to exit from the critical section or to complete an
// ongoing refresh, so reacquire the locks.
continue;
@@ -290,7 +272,7 @@ void onDbVersionMismatch(OperationContext* opCtx,
CancellationSource cancellationSource;
CancellationToken cancellationToken = cancellationSource.token();
dss->setDbMetadataRefreshFuture(
- asyncDbMetadataRefresh(opCtx, dbName, cancellationToken),
+ recoverRefreshDbVersion(opCtx, dbName, cancellationToken),
std::move(cancellationSource),
*dssLock);
dbMetadataRefreshFuture = dss->getDbMetadataRefreshFuture(*dssLock);
@@ -327,28 +309,28 @@ bool joinShardVersionOperation(OperationContext* opCtx,
invariant(collLock->has_value());
invariant(csrLock->has_value());
- // If another thread is currently holding the critical section or the shard version future, it
- // will be necessary to wait on one of the two variables to finish the update/recover/refresh.
- auto inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(opCtx);
- auto critSecSignal =
- csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite);
+ if (auto critSecSignal =
+ csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite)) {
+ csrLock->reset();
+ collLock->reset();
+ dbLock->reset();
+
+ uassertStatusOK(
+ OperationShardingState::waitForCriticalSectionToComplete(opCtx, *critSecSignal));
- if (inRecoverOrRefresh || critSecSignal) {
- // Drop the locks and wait for an ongoing shard version's recovery/refresh/update
+ return true;
+ }
+
+ if (auto inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(opCtx)) {
csrLock->reset();
collLock->reset();
dbLock->reset();
- if (critSecSignal) {
- uassertStatusOK(
- OperationShardingState::waitForCriticalSectionToComplete(opCtx, *critSecSignal));
- } else {
- try {
- inRecoverOrRefresh->get(opCtx);
- } catch (const ExceptionFor<ErrorCodes::ShardVersionRefreshCanceled>&) {
- // The ongoing refresh has finished, although it was canceled by a
- // 'clearFilteringMetadata'.
- }
+ try {
+ inRecoverOrRefresh->get(opCtx);
+ } catch (const ExceptionFor<ErrorCodes::ShardVersionRefreshCanceled>&) {
+ // The ongoing refresh has finished, although it was canceled by a
+ // 'clearFilteringMetadata'.
}
return true;
@@ -358,7 +340,7 @@ bool joinShardVersionOperation(OperationContext* opCtx,
}
SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext,
- const NamespaceString nss,
+ const NamespaceString& nss,
bool runRecover,
CancellationToken cancellationToken) {
auto executor = Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor();
@@ -417,8 +399,6 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext
if (!currentMetadata.allowMigrations()) {
boost::optional<SharedSemiFuture<void>> waitForMigrationAbort;
{
- // DBLock and CollectionLock must be used in order to avoid shard version
- // checks
Lock::DBLock dbLock(opCtx, nss.dbName(), MODE_IX);
Lock::CollectionLock collLock(opCtx, nss, MODE_IX);
@@ -497,16 +477,16 @@ void onShardVersionMismatch(OperationContext* opCtx,
collLock.emplace(opCtx, nss, MODE_IS);
auto* const csr = CollectionShardingRuntime::get(opCtx, nss);
- boost::optional<CollectionShardingRuntime::CSRLock> csrLock =
- CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr);
- if (joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) {
- continue;
- }
+ if (shardVersionReceived) {
+ boost::optional<CollectionShardingRuntime::CSRLock> csrLock =
+ CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr);
- if (auto metadata = csr->getCurrentMetadataIfKnown()) {
- // Check if the current shard version is fresh enough
- if (shardVersionReceived) {
+ if (joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) {
+ continue;
+ }
+
+ if (auto metadata = csr->getCurrentMetadataIfKnown()) {
const auto currentShardVersion = metadata->getShardVersion();
// Don't need to remotely reload if the requested version is smaller than the
// known one. This means that the remote side is behind.
@@ -516,8 +496,8 @@ void onShardVersionMismatch(OperationContext* opCtx,
}
}
- csrLock.reset();
- csrLock.emplace(CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr));
+ boost::optional<CollectionShardingRuntime::CSRLock> csrLock =
+ CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr);
if (joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) {
continue;