summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAntonio Fuschetto <antonio.fuschetto@mongodb.com>2022-10-06 13:43:22 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-10-06 14:15:08 +0000
commit3a3021eedcf2ddcee5dfab712fe226fbc35a70c1 (patch)
tree64de64e9aea7939a218cc8e4ca8f8556ac22baa4
parent8062f490260eac573ad48915b5d8ccc478bb5b9a (diff)
downloadmongo-3a3021eedcf2ddcee5dfab712fe226fbc35a70c1.tar.gz
SERVER-69444 Make the joining of concurrent critical section and refresh look the same between DSS and CSS
-rw-r--r--src/mongo/db/s/migration_util.cpp2
-rw-r--r--src/mongo/db/s/shard_filtering_metadata_refresh.cpp186
2 files changed, 87 insertions, 101 deletions
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 990f07fea47..09d0a97b118 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -1018,7 +1018,7 @@ void recoverMigrationCoordinations(OperationContext* opCtx,
hangInRefreshFilteringMetadataUntilSuccessThenSimulateErrorUninterruptible
.pauseWhileSet();
uasserted(ErrorCodes::InternalError,
- "simulate an error response for forceShardFilteringMetadataRefresh");
+ "simulate an error response for forceGetCurrentMetadata");
}
auto setFilteringMetadata = [&opCtx, &currentMetadata, &doc, &cancellationToken]() {
diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
index 86ab7a41866..df1ccd9d49d 100644
--- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
+++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
@@ -27,7 +27,6 @@
* it in the license file.
*/
-
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/db/catalog/database_holder.h"
@@ -51,7 +50,6 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
-
namespace mongo {
namespace {
@@ -60,57 +58,42 @@ MONGO_FAIL_POINT_DEFINE(skipShardFilteringMetadataRefresh);
MONGO_FAIL_POINT_DEFINE(hangInRecoverRefreshThread);
/**
- * If the critical section associate with the database is entered by another thread (e.g., a move
- * primary or a drop database is in progress), it releases the acquired locks and waits for the
- * latter to exit. In this case the function returns true, otherwise false.
+ * Blocking method, which will wait for any concurrent operations that could change the database
+ * version to complete (namely critical section and concurrent onDbVersionMismatch invocations).
+ *
+ * Returns 'true' if there were concurrent operations that had to be joined (in which case all locks
+ * will be dropped). If there were none, returns false and the locks continue to be held.
*/
-bool checkAndWaitIfCriticalSectionIsEntered(
- OperationContext* opCtx,
- DatabaseShardingState* dss,
- boost::optional<Lock::DBLock>& dbLock,
- boost::optional<DatabaseShardingState::DSSLock>& dssLock) {
- invariant(dbLock);
- invariant(dssLock);
+bool joinDbVersionOperation(OperationContext* opCtx,
+ DatabaseShardingState* dss,
+ boost::optional<Lock::DBLock>* dbLock,
+ boost::optional<DatabaseShardingState::DSSLock>* dssLock) {
+ invariant(dbLock->has_value());
+ invariant(dssLock->has_value());
if (auto critSect =
- dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kRead, *dssLock)) {
+ dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kRead, **dssLock)) {
LOGV2_DEBUG(6697201,
2,
"Waiting for exit from the critical section",
"db"_attr = dss->getDbName(),
- "reason"_attr = dss->getCriticalSectionReason(*dssLock));
+ "reason"_attr = dss->getCriticalSectionReason(**dssLock));
- dbLock = boost::none;
- dssLock = boost::none;
+ dbLock->reset();
+ dssLock->reset();
uassertStatusOK(OperationShardingState::waitForCriticalSectionToComplete(opCtx, *critSect));
-
return true;
}
- return false;
-}
-
-/**
- * If another thread is refreshing the database metadata, it releases the acquired locks and waits
- * for that refresh to complete. In this case the function returns true, otherwise false.
- */
-bool checkAndWaitIfAnotherRefreshIsRunning(
- OperationContext* opCtx,
- DatabaseShardingState* dss,
- boost::optional<Lock::DBLock>& dbLock,
- boost::optional<DatabaseShardingState::DSSLock>& dssLock) {
- invariant(dbLock);
- invariant(dssLock);
-
- if (auto refreshVersionFuture = dss->getDbMetadataRefreshFuture(*dssLock)) {
+ if (auto refreshVersionFuture = dss->getDbMetadataRefreshFuture(**dssLock)) {
LOGV2_DEBUG(6697202,
2,
"Waiting for completion of another database metadata refresh",
"db"_attr = dss->getDbName());
- dbLock = boost::none;
- dssLock = boost::none;
+ dbLock->reset();
+ dssLock->reset();
try {
refreshVersionFuture->get(opCtx);
@@ -131,7 +114,7 @@ bool checkAndWaitIfAnotherRefreshIsRunning(
*/
Status refreshDbMetadata(OperationContext* opCtx,
const DatabaseName& dbName,
- CancellationToken cancellationToken) {
+ const CancellationToken& cancellationToken) {
invariant(!opCtx->lockState()->isLocked());
invariant(!opCtx->getClient()->isInDirectClient());
invariant(ShardingState::get(opCtx)->canAcceptShardedCommands());
@@ -139,7 +122,7 @@ Status refreshDbMetadata(OperationContext* opCtx,
ScopeGuard resetRefreshFutureOnError([&] {
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- Lock::DBLock dbLock(opCtx, dbName, MODE_IS);
+ Lock::DBLock dbLock(opCtx, dbName, MODE_IX);
auto* dss = DatabaseShardingState::get(opCtx, dbName.db());
const auto dssLock = DatabaseShardingState::DSSLock::lockExclusive(opCtx, dss);
@@ -172,9 +155,9 @@ Status refreshDbMetadata(OperationContext* opCtx,
return swDbMetadata.getStatus();
}
-SharedSemiFuture<void> asyncDbMetadataRefresh(OperationContext* opCtx,
- const DatabaseName& dbName,
- const CancellationToken& cancellationToken) {
+SharedSemiFuture<void> recoverRefreshDbVersion(OperationContext* opCtx,
+ const DatabaseName& dbName,
+ const CancellationToken& cancellationToken) {
const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
return ExecutorFuture<void>(executor)
.then([=,
@@ -217,7 +200,7 @@ SharedSemiFuture<void> asyncDbMetadataRefresh(OperationContext* opCtx,
void onDbVersionMismatch(OperationContext* opCtx,
const StringData dbName,
- boost::optional<DatabaseVersion> receivedVersion) {
+ const boost::optional<DatabaseVersion> receivedDbVersion) {
invariant(!opCtx->lockState()->isLocked());
invariant(!opCtx->getClient()->isInDirectClient());
invariant(ShardingState::get(opCtx)->canAcceptShardedCommands());
@@ -235,23 +218,20 @@ void onDbVersionMismatch(OperationContext* opCtx,
2,
"Handle database version mismatch",
"db"_attr = dbName,
- "receivedVersion"_attr = receivedVersion);
+ "receivedDbVersion"_attr = receivedDbVersion);
while (true) {
boost::optional<SharedSemiFuture<void>> dbMetadataRefreshFuture;
- // Set the database metadata refresh future after waiting for another thread to exit from
- // the critical section or to complete an ongoing refresh.
{
auto dbLock = boost::make_optional(Lock::DBLock(opCtx, dbName, MODE_IS));
auto* dss = DatabaseShardingState::get(opCtx, dbName);
- if (receivedVersion) {
+ if (receivedDbVersion) {
auto dssLock =
boost::make_optional(DatabaseShardingState::DSSLock::lockShared(opCtx, dss));
- if (checkAndWaitIfCriticalSectionIsEntered(opCtx, dss, dbLock, dssLock) ||
- checkAndWaitIfAnotherRefreshIsRunning(opCtx, dss, dbLock, dssLock)) {
+ if (joinDbVersionOperation(opCtx, dss, &dbLock, &dssLock)) {
// Waited for another thread to exit from the critical section or to complete an
// ongoing refresh, so reacquire the locks.
continue;
@@ -262,8 +242,9 @@ void onDbVersionMismatch(OperationContext* opCtx,
// is in progress or can start (would require to exclusive lock the DSS).
// Therefore, the database version can be accessed safely.
- const auto wantedVersion = DatabaseHolder::get(opCtx)->getDbVersion(opCtx, dbName);
- if (receivedVersion <= wantedVersion) {
+ const auto wantedDbVersion =
+ DatabaseHolder::get(opCtx)->getDbVersion(opCtx, dbName);
+ if (receivedDbVersion <= wantedDbVersion) {
// No need to refresh the database metadata as the wanted version is newer
// than the one received.
return;
@@ -277,8 +258,7 @@ void onDbVersionMismatch(OperationContext* opCtx,
auto dssLock =
boost::make_optional(DatabaseShardingState::DSSLock::lockExclusive(opCtx, dss));
- if (checkAndWaitIfCriticalSectionIsEntered(opCtx, dss, dbLock, dssLock) ||
- checkAndWaitIfAnotherRefreshIsRunning(opCtx, dss, dbLock, dssLock)) {
+ if (joinDbVersionOperation(opCtx, dss, &dbLock, &dssLock)) {
// Waited for another thread to exit from the critical section or to complete an
// ongoing refresh, so reacquire the locks.
continue;
@@ -292,7 +272,7 @@ void onDbVersionMismatch(OperationContext* opCtx,
CancellationSource cancellationSource;
CancellationToken cancellationToken = cancellationSource.token();
dss->setDbMetadataRefreshFuture(
- asyncDbMetadataRefresh(opCtx, dbName, cancellationToken),
+ recoverRefreshDbVersion(opCtx, dbName, cancellationToken),
std::move(cancellationSource),
*dssLock);
dbMetadataRefreshFuture = dss->getDbMetadataRefreshFuture(*dssLock);
@@ -313,37 +293,43 @@ void onDbVersionMismatch(OperationContext* opCtx,
}
}
-// Return true if joins a shard version update/recover/refresh (in that case, all locks are dropped)
+/**
+ * Blocking method, which will wait for any concurrent operations that could change the shard
+ * version to complete (namely critical section and concurrent onShardVersionMismatch invocations).
+ *
+ * Returns 'true' if there were concurrent operations that had to be joined (in which case all locks
+ * will be dropped). If there were none, returns false and the locks continue to be held.
+ */
bool joinShardVersionOperation(OperationContext* opCtx,
CollectionShardingRuntime* csr,
boost::optional<Lock::DBLock>* dbLock,
boost::optional<Lock::CollectionLock>* collLock,
boost::optional<CollectionShardingRuntime::CSRLock>* csrLock) {
+ invariant(dbLock->has_value());
invariant(collLock->has_value());
invariant(csrLock->has_value());
- // If another thread is currently holding the critical section or the shard version future, it
- // will be necessary to wait on one of the two variables to finish the update/recover/refresh.
- auto inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(opCtx);
- auto critSecSignal =
- csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite);
+ if (auto critSecSignal =
+ csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite)) {
+ csrLock->reset();
+ collLock->reset();
+ dbLock->reset();
+
+ uassertStatusOK(
+ OperationShardingState::waitForCriticalSectionToComplete(opCtx, *critSecSignal));
+
+ return true;
+ }
- if (inRecoverOrRefresh || critSecSignal) {
- // Drop the locks and wait for an ongoing shard version's recovery/refresh/update
+ if (auto inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(opCtx)) {
csrLock->reset();
collLock->reset();
dbLock->reset();
- if (critSecSignal) {
- uassertStatusOK(
- OperationShardingState::waitForCriticalSectionToComplete(opCtx, *critSecSignal));
- } else {
- try {
- inRecoverOrRefresh->get(opCtx);
- } catch (const ExceptionFor<ErrorCodes::ShardVersionRefreshCanceled>&) {
- // The ongoing refresh has finished, although it was canceled by a
- // 'clearFilteringMetadata'.
- }
+ try {
+ inRecoverOrRefresh->get(opCtx);
+ } catch (const ExceptionFor<ErrorCodes::ShardVersionRefreshCanceled>&) {
+ // The ongoing refresh has finished, although it was interrupted.
}
return true;
@@ -353,7 +339,7 @@ bool joinShardVersionOperation(OperationContext* opCtx,
}
SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext,
- const NamespaceString nss,
+ const NamespaceString& nss,
bool runRecover,
CancellationToken cancellationToken) {
auto executor = Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor();
@@ -412,8 +398,6 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext
if (!currentMetadata.allowMigrations()) {
boost::optional<SharedSemiFuture<void>> waitForMigrationAbort;
{
- // DBLock and CollectionLock must be used in order to avoid shard version
- // checks
Lock::DBLock dbLock(opCtx, nss.dbName(), MODE_IX);
Lock::CollectionLock collLock(opCtx, nss, MODE_IX);
@@ -449,7 +433,8 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext
if (cancellationToken.isCanceled() &&
(status.isOK() || status == ErrorCodes::Interrupted)) {
uasserted(ErrorCodes::ShardVersionRefreshCanceled,
- "Shard version refresh canceled by a 'clearFilteringMetadata'");
+ "Shard version refresh canceled by an interruption, probably due to a "
+ "'clearFilteringMetadata'");
}
return status;
})
@@ -484,6 +469,7 @@ void onShardVersionMismatch(OperationContext* opCtx,
while (true) {
boost::optional<SharedSemiFuture<void>> inRecoverOrRefresh;
+
{
boost::optional<Lock::DBLock> dbLock;
boost::optional<Lock::CollectionLock> collLock;
@@ -491,17 +477,16 @@ void onShardVersionMismatch(OperationContext* opCtx,
collLock.emplace(opCtx, nss, MODE_IS);
auto* const csr = CollectionShardingRuntime::get(opCtx, nss);
- boost::optional<CollectionShardingRuntime::CSRLock> csrLock =
- CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr);
- if (joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) {
- continue;
- }
+ if (shardVersionReceived) {
+ boost::optional<CollectionShardingRuntime::CSRLock> csrLock =
+ CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr);
- auto metadata = csr->getCurrentMetadataIfKnown();
- if (metadata) {
- // Check if the current shard version is fresh enough
- if (shardVersionReceived) {
+ if (joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) {
+ continue;
+ }
+
+ if (auto metadata = csr->getCurrentMetadataIfKnown()) {
const auto currentShardVersion = metadata->getShardVersion();
// Don't need to remotely reload if the requested version is smaller than the
// known one. This means that the remote side is behind.
@@ -511,25 +496,26 @@ void onShardVersionMismatch(OperationContext* opCtx,
}
}
- csrLock.reset();
- csrLock.emplace(CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr));
-
- // If there is no ongoing shard version operation, initialize the RecoverRefreshThread
- // thread and associate it to the CSR.
- if (!joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) {
- // If the shard doesn't yet know its filtering metadata, recovery needs to be run
- const bool runRecover = metadata ? false : true;
- CancellationSource cancellationSource;
- CancellationToken cancellationToken = cancellationSource.token();
- csr->setShardVersionRecoverRefreshFuture(
- recoverRefreshShardVersion(
- opCtx->getServiceContext(), nss, runRecover, std::move(cancellationToken)),
- std::move(cancellationSource),
- *csrLock);
- inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(opCtx);
- } else {
+ boost::optional<CollectionShardingRuntime::CSRLock> csrLock =
+ CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr);
+
+ if (joinShardVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) {
continue;
}
+
+ // If we reached here, there were no ongoing critical sections or recoverRefresh running
+ // and we are holding the exclusive CSR lock.
+
+ // If the shard doesn't yet know its filtering metadata, recovery needs to be run
+ const bool runRecover = csr->getCurrentMetadataIfKnown() ? false : true;
+ CancellationSource cancellationSource;
+ CancellationToken cancellationToken = cancellationSource.token();
+ csr->setShardVersionRecoverRefreshFuture(
+ recoverRefreshShardVersion(
+ opCtx->getServiceContext(), nss, runRecover, std::move(cancellationToken)),
+ std::move(cancellationSource),
+ *csrLock);
+ inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(opCtx);
}
try {