diff options
author | Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com> | 2020-09-29 10:30:39 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-29 11:19:17 +0000 |
commit | dd3b30ae9b5d4d4a8fc4f51dd835265d4cfd67a9 (patch) | |
tree | a3aba60743f5ec05d30f090c739ab0a0f46761c8 | |
parent | c27100db0125e1792766497a428b28bef2a6135d (diff) | |
download | mongo-dd3b30ae9b5d4d4a8fc4f51dd835265d4cfd67a9.tar.gz |
SERVER-51101 Futurify ShardingMigrationCriticalSection
-rw-r--r-- | src/mongo/db/s/collection_sharding_runtime.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_runtime.h | 2 | ||||
-rw-r--r-- | src/mongo/db/s/operation_sharding_state.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/s/operation_sharding_state.h | 10 | ||||
-rw-r--r-- | src/mongo/db/s/set_shard_version_command.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_migration_critical_section.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_migration_critical_section.h | 12 | ||||
-rw-r--r-- | src/mongo/s/stale_exception.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/stale_exception.h | 4 |
9 files changed, 56 insertions, 37 deletions
diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp index 67e45e9ea11..c5c264af4c7 100644 --- a/src/mongo/db/s/collection_sharding_runtime.cpp +++ b/src/mongo/db/s/collection_sharding_runtime.cpp @@ -171,7 +171,7 @@ void CollectionShardingRuntime::exitCriticalSection(OperationContext* opCtx) { _critSec.exitCriticalSection(); } -std::shared_ptr<Notification<void>> CollectionShardingRuntime::getCriticalSectionSignal( +boost::optional<SharedSemiFuture<void>> CollectionShardingRuntime::getCriticalSectionSignal( OperationContext* opCtx, ShardingMigrationCriticalSection::Operation op) { auto csrLock = CSRLock::lockShared(opCtx, this); return _critSec.getSignal(op); diff --git a/src/mongo/db/s/collection_sharding_runtime.h b/src/mongo/db/s/collection_sharding_runtime.h index d6e16f0f024..9ba49cb282f 100644 --- a/src/mongo/db/s/collection_sharding_runtime.h +++ b/src/mongo/db/s/collection_sharding_runtime.h @@ -137,7 +137,7 @@ public: * * This method internally acquires the CSRLock in IS to wait for eventual ongoing operations. */ - std::shared_ptr<Notification<void>> getCriticalSectionSignal( + boost::optional<SharedSemiFuture<void>> getCriticalSectionSignal( OperationContext* opCtx, ShardingMigrationCriticalSection::Operation op); /** diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp index 21e81d2888d..f8dc5077eae 100644 --- a/src/mongo/db/s/operation_sharding_state.cpp +++ b/src/mongo/db/s/operation_sharding_state.cpp @@ -145,12 +145,14 @@ bool OperationShardingState::waitForMigrationCriticalSectionSignal(OperationCont invariant(!opCtx->lockState()->isLocked()); if (_migrationCriticalSectionSignal) { - _migrationCriticalSectionSignal->waitFor( - opCtx, - opCtx->hasDeadline() - ? std::min(opCtx->getRemainingMaxTimeMillis(), kMaxWaitForMigrationCriticalSection) - : kMaxWaitForMigrationCriticalSection); - _migrationCriticalSectionSignal = nullptr; + auto deadline = opCtx->getServiceContext()->getFastClockSource()->now() + + std::min(opCtx->getRemainingMaxTimeMillis(), kMaxWaitForMigrationCriticalSection); + + opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] { + _migrationCriticalSectionSignal->wait(opCtx); + }); + + _migrationCriticalSectionSignal = boost::none; return true; } @@ -158,7 +160,7 @@ bool OperationShardingState::waitForMigrationCriticalSectionSignal(OperationCont } void OperationShardingState::setMigrationCriticalSectionSignal( - std::shared_ptr<Notification<void>> critSecSignal) { + boost::optional<SharedSemiFuture<void>> critSecSignal) { invariant(critSecSignal); _migrationCriticalSectionSignal = std::move(critSecSignal); } @@ -168,12 +170,14 @@ bool OperationShardingState::waitForMovePrimaryCriticalSectionSignal(OperationCo invariant(!opCtx->lockState()->isLocked()); if (_movePrimaryCriticalSectionSignal) { - _movePrimaryCriticalSectionSignal->waitFor( - opCtx, - opCtx->hasDeadline() ? std::min(opCtx->getRemainingMaxTimeMillis(), - kMaxWaitForMovePrimaryCriticalSection) - : kMaxWaitForMovePrimaryCriticalSection); - _movePrimaryCriticalSectionSignal = nullptr; + auto deadline = opCtx->getServiceContext()->getFastClockSource()->now() + + std::min(opCtx->getRemainingMaxTimeMillis(), kMaxWaitForMovePrimaryCriticalSection); + + opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] { + _movePrimaryCriticalSectionSignal->wait(opCtx); + }); + + _movePrimaryCriticalSectionSignal = boost::none; return true; } @@ -181,7 +185,7 @@ bool OperationShardingState::waitForMovePrimaryCriticalSectionSignal(OperationCo } void OperationShardingState::setMovePrimaryCriticalSectionSignal( - std::shared_ptr<Notification<void>> critSecSignal) { + boost::optional<SharedSemiFuture<void>> critSecSignal) { invariant(critSecSignal); _movePrimaryCriticalSectionSignal = std::move(critSecSignal); } diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h index 3a645419a79..41c158ba8fd 100644 --- a/src/mongo/db/s/operation_sharding_state.h +++ b/src/mongo/db/s/operation_sharding_state.h @@ -34,7 +34,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/s/chunk_version.h" #include "mongo/s/database_version_gen.h" -#include "mongo/util/concurrency/notification.h" +#include "mongo/util/future.h" #include "mongo/util/string_map.h" namespace mongo { @@ -145,7 +145,7 @@ public: * migration for the namespace and that it would be prudent to wait for the critical section to * complete before retrying so the router doesn't make wasteful requests. */ - void setMigrationCriticalSectionSignal(std::shared_ptr<Notification<void>> critSecSignal); + void setMigrationCriticalSectionSignal(boost::optional<SharedSemiFuture<void>> critSecSignal); /** * This call is a no op if there isn't a currently active movePrimary critical section. @@ -162,7 +162,7 @@ public: * movePrimary for the namespace and that it would be prudent to wait for the critical section * to complete before retrying so the router doesn't make wasteful requests. */ - void setMovePrimaryCriticalSectionSignal(std::shared_ptr<Notification<void>> critSecSignal); + void setMovePrimaryCriticalSectionSignal(boost::optional<SharedSemiFuture<void>> critSecSignal); /** * Stores the failed status in _shardingOperationFailedStatus. @@ -195,12 +195,12 @@ private: // This value will only be non-null if version check during the operation execution failed due // to stale version and there was a migration for that namespace, which was in critical section. - std::shared_ptr<Notification<void>> _migrationCriticalSectionSignal; + boost::optional<SharedSemiFuture<void>> _migrationCriticalSectionSignal; // This value will only be non-null if version check during the operation execution failed due // to stale version and there was a movePrimary for that namespace, which was in critical // section. - std::shared_ptr<Notification<void>> _movePrimaryCriticalSectionSignal; + boost::optional<SharedSemiFuture<void>> _movePrimaryCriticalSectionSignal; // This value can only be set when a rerouting exception occurs during a write operation, and // must be handled before this object gets destructed. diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp index aba2cd2f632..68a8ebbb1c8 100644 --- a/src/mongo/db/s/set_shard_version_command.cpp +++ b/src/mongo/db/s/set_shard_version_command.cpp @@ -187,6 +187,8 @@ public: // Step 5 + const auto kTenSeconds = Milliseconds(10000); + // TODO: Refactor all of this if (requestedVersion < collectionShardVersion && requestedVersion.epoch() == collectionShardVersion.epoch()) { @@ -196,7 +198,12 @@ public: collLock.reset(); autoDb.reset(); LOGV2(22056, "waiting till out of critical section"); - critSecSignal->waitFor(opCtx, Seconds(10)); + auto deadline = opCtx->getServiceContext()->getFastClockSource()->now() + + std::min(opCtx->getRemainingMaxTimeMillis(), kTenSeconds); + + opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] { + critSecSignal->wait(opCtx); + }); } errmsg = str::stream() << "shard global version for collection is higher " @@ -217,7 +224,13 @@ public: collLock.reset(); autoDb.reset(); LOGV2(22057, "waiting till out of critical section"); - critSecSignal->waitFor(opCtx, Seconds(10)); + + auto deadline = opCtx->getServiceContext()->getFastClockSource()->now() + + std::min(opCtx->getRemainingMaxTimeMillis(), kTenSeconds); + + opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] { + critSecSignal->wait(opCtx); + }); } // need authoritative for first look diff --git a/src/mongo/db/s/sharding_migration_critical_section.cpp b/src/mongo/db/s/sharding_migration_critical_section.cpp index bf1c8cc944a..10b60790ffd 100644 --- a/src/mongo/db/s/sharding_migration_critical_section.cpp +++ b/src/mongo/db/s/sharding_migration_critical_section.cpp @@ -41,7 +41,7 @@ ShardingMigrationCriticalSection::~ShardingMigrationCriticalSection() { void ShardingMigrationCriticalSection::enterCriticalSectionCatchUpPhase() { invariant(!_critSecSignal); - _critSecSignal = std::make_shared<Notification<void>>(); + _critSecSignal.emplace(); _readsShouldWaitOnCritSec = false; } @@ -52,20 +52,20 @@ void ShardingMigrationCriticalSection::enterCriticalSectionCommitPhase() { void ShardingMigrationCriticalSection::exitCriticalSection() { if (_critSecSignal) { - _critSecSignal->set(); + _critSecSignal->emplaceValue(); _critSecSignal.reset(); } } -std::shared_ptr<Notification<void>> ShardingMigrationCriticalSection::getSignal( +boost::optional<SharedSemiFuture<void>> ShardingMigrationCriticalSection::getSignal( Operation op) const { if (!_critSecSignal) - return nullptr; + return boost::none; if (op == kWrite || _readsShouldWaitOnCritSec) - return _critSecSignal; + return _critSecSignal->getFuture(); - return nullptr; + return boost::none; } } // namespace mongo diff --git a/src/mongo/db/s/sharding_migration_critical_section.h b/src/mongo/db/s/sharding_migration_critical_section.h index 0dbc8fd4df4..21a5ed91d9b 100644 --- a/src/mongo/db/s/sharding_migration_critical_section.h +++ b/src/mongo/db/s/sharding_migration_critical_section.h @@ -29,7 +29,9 @@ #pragma once -#include "mongo/util/concurrency/notification.h" +#include <boost/optional.hpp> + +#include "mongo/util/future.h" namespace mongo { @@ -68,17 +70,17 @@ public: void exitCriticalSection(); /** - * Retrieves a critical section notification to wait on. Will return nullptr if the migration is + * Retrieves a critical section future to wait on. Will return boost::none if the migration is * not yet in the critical section or if the caller is a reader and the migration is not yet in * the commit phase. */ enum Operation { kRead, kWrite }; - std::shared_ptr<Notification<void>> getSignal(Operation op) const; + boost::optional<SharedSemiFuture<void>> getSignal(Operation op) const; private: - // Whether the migration source is in a critical section. Tracked as a shared pointer so that + // Whether the migration source is in a critical section. Tracked as a shared promise so that // callers don't have to hold metadata locks in order to wait on it. - std::shared_ptr<Notification<void>> _critSecSignal; + boost::optional<SharedPromise<void>> _critSecSignal; // Used to delay blocking reads up until the commit of the metadata on the config server needs // to happen. This allows the shard to serve reads up until the config server metadata update diff --git a/src/mongo/s/stale_exception.cpp b/src/mongo/s/stale_exception.cpp index 47993e5eb68..ca834a9aaf6 100644 --- a/src/mongo/s/stale_exception.cpp +++ b/src/mongo/s/stale_exception.cpp @@ -52,7 +52,7 @@ StaleConfigInfo::StaleConfigInfo(NamespaceString nss, ChunkVersion received, boost::optional<ChunkVersion> wanted, ShardId shardId, - std::shared_ptr<Notification<void>> criticalSectionSignal) + boost::optional<SharedSemiFuture<void>> criticalSectionSignal) : _nss(std::move(nss)), _received(received), _wanted(wanted), diff --git a/src/mongo/s/stale_exception.h b/src/mongo/s/stale_exception.h index 9850c40ad11..94150894938 100644 --- a/src/mongo/s/stale_exception.h +++ b/src/mongo/s/stale_exception.h @@ -45,7 +45,7 @@ public: ChunkVersion received, boost::optional<ChunkVersion> wanted, ShardId shardId, - std::shared_ptr<Notification<void>> criticalSectionSignal = nullptr); + boost::optional<SharedSemiFuture<void>> = boost::none); const auto& getNss() const { return _nss; @@ -79,7 +79,7 @@ private: // This signal does not get serialized and therefore does not get propagated // to the router. - std::shared_ptr<Notification<void>> _criticalSectionSignal; + boost::optional<SharedSemiFuture<void>> _criticalSectionSignal; }; using StaleConfigException = ExceptionFor<ErrorCodes::StaleConfig>; |