summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
diff options
context:
space:
mode:
author: Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com> 2020-06-09 11:00:35 +0200
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-06-09 10:20:43 +0000
commit: dd7e51f1a8512802829ac92921fed0b0c74f6548 (patch)
tree: 1df3be65cfcfc3a38db5a36ac36c205c65fe23f0 /src/mongo/db/s/shard_filtering_metadata_refresh.cpp
parent: 5147459e36582cf65a3c5fc315fb41d81f03b170 (diff)
download: mongo-dd7e51f1a8512802829ac92921fed0b0c74f6548.tar.gz
SERVER-45983 Perform the shardVersion recovery and refresh on a separate thread from that of the user request
Diffstat (limited to 'src/mongo/db/s/shard_filtering_metadata_refresh.cpp')
-rw-r--r-- src/mongo/db/s/shard_filtering_metadata_refresh.cpp 134
1 files changed, 98 insertions, 36 deletions
diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
index 9c936a3f859..60095ee0db0 100644
--- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
+++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
@@ -85,6 +85,44 @@ void onDbVersionMismatch(OperationContext* opCtx,
} // namespace
+SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext,
+ const NamespaceString nss,
+ bool runRecover) {
+ return ExecutorFuture<void>(migrationutil::getMigrationUtilExecutor())
+ .then([=] {
+ ThreadClient tc("RecoverRefreshThread", serviceContext);
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillable(lk);
+ }
+ auto opCtx = tc->makeOperationContext();
+
+ ON_BLOCK_EXIT([&] {
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+ // TODO (SERVER-48394): Views must not cause stale shard version
+ Lock::DBLock autoDb(opCtx.get(), nss.db(), MODE_IX);
+ Lock::CollectionLock collLock(opCtx.get(), nss, MODE_IX);
+ auto* const csr = CollectionShardingRuntime::get(opCtx.get(), nss);
+ auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx.get(), csr);
+ csr->resetShardVersionRecoverRefreshFuture(csrLock);
+ if (runRecover) {
+ csr->exitCriticalSection(opCtx.get());
+ }
+ });
+
+ if (runRecover) {
+ auto* const replCoord = repl::ReplicationCoordinator::get(opCtx.get());
+ if (!replCoord->isReplEnabled() || replCoord->getMemberState().primary()) {
+ migrationutil::recoverMigrationCoordinations(opCtx.get(), nss);
+ }
+ }
+
+ forceShardFilteringMetadataRefresh(opCtx.get(), nss, true);
+ })
+ .semi()
+ .share();
+}
+
void onShardVersionMismatch(OperationContext* opCtx,
const NamespaceString& nss,
boost::optional<ChunkVersion> shardVersionReceived) {
@@ -102,9 +140,18 @@ void onShardVersionMismatch(OperationContext* opCtx,
"namespace"_attr = nss,
"shardVersionReceived"_attr = shardVersionReceived);
- bool runRecover;
while (true) {
+ // If another thread is currently holding the critical section or the shard version future,
+ // it will be necessary to wait on one of the following variables to finish the
+ // update/recover/refresh.
std::shared_ptr<Notification<void>> critSecSignal;
+ boost::optional<SharedSemiFuture<void>> inRecoverOrRefresh;
+
+ // Flag set to true if a recovery needs to be eventually performed
+ bool runRecover;
+
+ // Flag indicating whether the current thread has triggered a recover/refresh
+ bool triggeredRecoverRefresh = false;
{
// TODO (SERVER-48394): Views must not cause stale shard version
@@ -112,10 +159,15 @@ void onShardVersionMismatch(OperationContext* opCtx,
opCtx, nss, MODE_IS, AutoGetCollection::ViewMode::kViewsPermitted);
auto* const csr = CollectionShardingRuntime::get(opCtx, nss);
+
+ inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(opCtx);
critSecSignal =
csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite);
- if (!critSecSignal) {
+
+ if (!inRecoverOrRefresh && !critSecSignal) {
const auto collDesc = csr->getCurrentMetadataIfKnown();
+
+ // Check if the current shard version is fresh enough
if (collDesc) {
if (shardVersionReceived) {
const auto currentShardVersion = collDesc->getShardVersion();
@@ -127,45 +179,43 @@ void onShardVersionMismatch(OperationContext* opCtx,
shardVersionReceived->majorVersion())
return;
}
+ }
- runRecover = false;
- break;
- } else {
- auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr);
- critSecSignal = csr->getCriticalSectionSignal(
- opCtx, ShardingMigrationCriticalSection::kWrite);
- if (!critSecSignal) {
+ runRecover = collDesc ? false : true;
+
+ // If the critical section is not busy and no recover/refresh is ongoing,
+ // initialize the RecoverRefreshThread thread and associate it to the CSR.
+ auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr);
+
+ inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(opCtx);
+ critSecSignal =
+ csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite);
+
+ if (!inRecoverOrRefresh && !critSecSignal) {
+ if (runRecover) {
CollectionShardingRuntime::get(opCtx, nss)
->enterCriticalSectionCatchUpPhase(opCtx, csrLock);
- runRecover = true;
- break;
}
+
+ csr->setShardVersionRecoverRefreshFuture(
+ recoverRefreshShardVersion(opCtx->getServiceContext(), nss, runRecover),
+ csrLock);
+ inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(opCtx);
+ triggeredRecoverRefresh = true;
}
}
}
- invariant(critSecSignal);
- critSecSignal->get(opCtx);
- }
-
- ON_BLOCK_EXIT([&] {
- if (runRecover) {
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- // Avoid using AutoGetDB since it can throw exceptions in ON_BLOCK_EXIT's destructor
- Lock::DBLock autoDb(opCtx, nss.db(), MODE_IX);
- Lock::CollectionLock collLock(opCtx, nss, MODE_IX);
- CollectionShardingRuntime::get(opCtx, nss)->exitCriticalSection(opCtx);
- }
- });
-
- if (runRecover) {
- auto* const replCoord = repl::ReplicationCoordinator::get(opCtx);
- if (!replCoord->isReplEnabled() || replCoord->getMemberState().primary()) {
- migrationutil::recoverMigrationCoordinations(opCtx, nss);
+ // Wait for an ongoing shard version's recovery/refresh/update
+ if (critSecSignal) {
+ critSecSignal->get(opCtx);
+ } else {
+ inRecoverOrRefresh->get(opCtx);
+ if (triggeredRecoverRefresh) {
+ return;
+ }
}
}
-
- forceShardFilteringMetadataRefresh(opCtx, nss, !shardVersionReceived);
}
ScopedShardVersionCriticalSection::ScopedShardVersionCriticalSection(OperationContext* opCtx,
@@ -173,7 +223,11 @@ ScopedShardVersionCriticalSection::ScopedShardVersionCriticalSection(OperationCo
: _opCtx(opCtx), _nss(std::move(nss)) {
while (true) {
+ // If another thread is currently holding the critical section or the shard version future,
+ // it will be necessary to wait on one of the following variables to finish the
+ // update/recover/refresh.
std::shared_ptr<Notification<void>> critSecSignal;
+ boost::optional<SharedSemiFuture<void>> inRecoverOrRefresh;
{
// This acquisition is performed with collection lock MODE_S in order to ensure that any
@@ -186,13 +240,15 @@ ScopedShardVersionCriticalSection::ScopedShardVersionCriticalSection(OperationCo
Milliseconds(migrationLockAcquisitionMaxWaitMS.load()));
auto* const csr = CollectionShardingRuntime::get(_opCtx, _nss);
+ inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(_opCtx);
critSecSignal =
csr->getCriticalSectionSignal(_opCtx, ShardingMigrationCriticalSection::kWrite);
- if (!critSecSignal) {
+ if (!inRecoverOrRefresh && !critSecSignal) {
auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
+ inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(_opCtx);
critSecSignal =
csr->getCriticalSectionSignal(_opCtx, ShardingMigrationCriticalSection::kWrite);
- if (!critSecSignal) {
+ if (!inRecoverOrRefresh && !critSecSignal) {
CollectionShardingRuntime::get(_opCtx, _nss)
->enterCriticalSectionCatchUpPhase(_opCtx, csrLock);
break;
@@ -200,11 +256,17 @@ ScopedShardVersionCriticalSection::ScopedShardVersionCriticalSection(OperationCo
}
}
- invariant(critSecSignal);
- critSecSignal->get(_opCtx);
+ // Wait for an ongoing shard version's recovery/refresh/update
+ if (critSecSignal) {
+ critSecSignal->get(opCtx);
+ } else {
+ inRecoverOrRefresh->get(opCtx);
+ }
}
- auto* const replCoord = repl::ReplicationCoordinator::get(opCtx);
+ // Holding the critical section ensures that a shard version recover/refresh will be performed by
+ // just one thread at a time
+ auto* const replCoord = repl::ReplicationCoordinator::get(_opCtx);
if (!replCoord->isReplEnabled() || replCoord->getMemberState().primary()) {
migrationutil::recoverMigrationCoordinations(_opCtx, _nss);
}