summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPierlauro Sciarelli <pierlauro.sciarelli@mongodb.com>2020-09-29 10:30:39 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-09-29 11:19:17 +0000
commitdd3b30ae9b5d4d4a8fc4f51dd835265d4cfd67a9 (patch)
treea3aba60743f5ec05d30f090c739ab0a0f46761c8
parentc27100db0125e1792766497a428b28bef2a6135d (diff)
downloadmongo-dd3b30ae9b5d4d4a8fc4f51dd835265d4cfd67a9.tar.gz
SERVER-51101 Futurify ShardingMigrationCriticalSection
-rw-r--r--src/mongo/db/s/collection_sharding_runtime.cpp2
-rw-r--r--src/mongo/db/s/collection_sharding_runtime.h2
-rw-r--r--src/mongo/db/s/operation_sharding_state.cpp32
-rw-r--r--src/mongo/db/s/operation_sharding_state.h10
-rw-r--r--src/mongo/db/s/set_shard_version_command.cpp17
-rw-r--r--src/mongo/db/s/sharding_migration_critical_section.cpp12
-rw-r--r--src/mongo/db/s/sharding_migration_critical_section.h12
-rw-r--r--src/mongo/s/stale_exception.cpp2
-rw-r--r--src/mongo/s/stale_exception.h4
9 files changed, 56 insertions, 37 deletions
diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp
index 67e45e9ea11..c5c264af4c7 100644
--- a/src/mongo/db/s/collection_sharding_runtime.cpp
+++ b/src/mongo/db/s/collection_sharding_runtime.cpp
@@ -171,7 +171,7 @@ void CollectionShardingRuntime::exitCriticalSection(OperationContext* opCtx) {
_critSec.exitCriticalSection();
}
-std::shared_ptr<Notification<void>> CollectionShardingRuntime::getCriticalSectionSignal(
+boost::optional<SharedSemiFuture<void>> CollectionShardingRuntime::getCriticalSectionSignal(
OperationContext* opCtx, ShardingMigrationCriticalSection::Operation op) {
auto csrLock = CSRLock::lockShared(opCtx, this);
return _critSec.getSignal(op);
diff --git a/src/mongo/db/s/collection_sharding_runtime.h b/src/mongo/db/s/collection_sharding_runtime.h
index d6e16f0f024..9ba49cb282f 100644
--- a/src/mongo/db/s/collection_sharding_runtime.h
+++ b/src/mongo/db/s/collection_sharding_runtime.h
@@ -137,7 +137,7 @@ public:
*
* This method internally acquires the CSRLock in IS to wait for eventual ongoing operations.
*/
- std::shared_ptr<Notification<void>> getCriticalSectionSignal(
+ boost::optional<SharedSemiFuture<void>> getCriticalSectionSignal(
OperationContext* opCtx, ShardingMigrationCriticalSection::Operation op);
/**
diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp
index 21e81d2888d..f8dc5077eae 100644
--- a/src/mongo/db/s/operation_sharding_state.cpp
+++ b/src/mongo/db/s/operation_sharding_state.cpp
@@ -145,12 +145,14 @@ bool OperationShardingState::waitForMigrationCriticalSectionSignal(OperationCont
invariant(!opCtx->lockState()->isLocked());
if (_migrationCriticalSectionSignal) {
- _migrationCriticalSectionSignal->waitFor(
- opCtx,
- opCtx->hasDeadline()
- ? std::min(opCtx->getRemainingMaxTimeMillis(), kMaxWaitForMigrationCriticalSection)
- : kMaxWaitForMigrationCriticalSection);
- _migrationCriticalSectionSignal = nullptr;
+ auto deadline = opCtx->getServiceContext()->getFastClockSource()->now() +
+ std::min(opCtx->getRemainingMaxTimeMillis(), kMaxWaitForMigrationCriticalSection);
+
+ opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] {
+ _migrationCriticalSectionSignal->wait(opCtx);
+ });
+
+ _migrationCriticalSectionSignal = boost::none;
return true;
}
@@ -158,7 +160,7 @@ bool OperationShardingState::waitForMigrationCriticalSectionSignal(OperationCont
}
void OperationShardingState::setMigrationCriticalSectionSignal(
- std::shared_ptr<Notification<void>> critSecSignal) {
+ boost::optional<SharedSemiFuture<void>> critSecSignal) {
invariant(critSecSignal);
_migrationCriticalSectionSignal = std::move(critSecSignal);
}
@@ -168,12 +170,14 @@ bool OperationShardingState::waitForMovePrimaryCriticalSectionSignal(OperationCo
invariant(!opCtx->lockState()->isLocked());
if (_movePrimaryCriticalSectionSignal) {
- _movePrimaryCriticalSectionSignal->waitFor(
- opCtx,
- opCtx->hasDeadline() ? std::min(opCtx->getRemainingMaxTimeMillis(),
- kMaxWaitForMovePrimaryCriticalSection)
- : kMaxWaitForMovePrimaryCriticalSection);
- _movePrimaryCriticalSectionSignal = nullptr;
+ auto deadline = opCtx->getServiceContext()->getFastClockSource()->now() +
+ std::min(opCtx->getRemainingMaxTimeMillis(), kMaxWaitForMovePrimaryCriticalSection);
+
+ opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] {
+ _movePrimaryCriticalSectionSignal->wait(opCtx);
+ });
+
+ _movePrimaryCriticalSectionSignal = boost::none;
return true;
}
@@ -181,7 +185,7 @@ bool OperationShardingState::waitForMovePrimaryCriticalSectionSignal(OperationCo
}
void OperationShardingState::setMovePrimaryCriticalSectionSignal(
- std::shared_ptr<Notification<void>> critSecSignal) {
+ boost::optional<SharedSemiFuture<void>> critSecSignal) {
invariant(critSecSignal);
_movePrimaryCriticalSectionSignal = std::move(critSecSignal);
}
diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h
index 3a645419a79..41c158ba8fd 100644
--- a/src/mongo/db/s/operation_sharding_state.h
+++ b/src/mongo/db/s/operation_sharding_state.h
@@ -34,7 +34,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/database_version_gen.h"
-#include "mongo/util/concurrency/notification.h"
+#include "mongo/util/future.h"
#include "mongo/util/string_map.h"
namespace mongo {
@@ -145,7 +145,7 @@ public:
* migration for the namespace and that it would be prudent to wait for the critical section to
* complete before retrying so the router doesn't make wasteful requests.
*/
- void setMigrationCriticalSectionSignal(std::shared_ptr<Notification<void>> critSecSignal);
+ void setMigrationCriticalSectionSignal(boost::optional<SharedSemiFuture<void>> critSecSignal);
/**
* This call is a no op if there isn't a currently active movePrimary critical section.
@@ -162,7 +162,7 @@ public:
* movePrimary for the namespace and that it would be prudent to wait for the critical section
* to complete before retrying so the router doesn't make wasteful requests.
*/
- void setMovePrimaryCriticalSectionSignal(std::shared_ptr<Notification<void>> critSecSignal);
+ void setMovePrimaryCriticalSectionSignal(boost::optional<SharedSemiFuture<void>> critSecSignal);
/**
* Stores the failed status in _shardingOperationFailedStatus.
@@ -195,12 +195,12 @@ private:
// This value will only be non-null if version check during the operation execution failed due
// to stale version and there was a migration for that namespace, which was in critical section.
- std::shared_ptr<Notification<void>> _migrationCriticalSectionSignal;
+ boost::optional<SharedSemiFuture<void>> _migrationCriticalSectionSignal;
// This value will only be non-null if version check during the operation execution failed due
// to stale version and there was a movePrimary for that namespace, which was in critical
// section.
- std::shared_ptr<Notification<void>> _movePrimaryCriticalSectionSignal;
+ boost::optional<SharedSemiFuture<void>> _movePrimaryCriticalSectionSignal;
// This value can only be set when a rerouting exception occurs during a write operation, and
// must be handled before this object gets destructed.
diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp
index aba2cd2f632..68a8ebbb1c8 100644
--- a/src/mongo/db/s/set_shard_version_command.cpp
+++ b/src/mongo/db/s/set_shard_version_command.cpp
@@ -187,6 +187,8 @@ public:
// Step 5
+ const auto kTenSeconds = Milliseconds(10000);
+
// TODO: Refactor all of this
if (requestedVersion < collectionShardVersion &&
requestedVersion.epoch() == collectionShardVersion.epoch()) {
@@ -196,7 +198,12 @@ public:
collLock.reset();
autoDb.reset();
LOGV2(22056, "waiting till out of critical section");
- critSecSignal->waitFor(opCtx, Seconds(10));
+ auto deadline = opCtx->getServiceContext()->getFastClockSource()->now() +
+ std::min(opCtx->getRemainingMaxTimeMillis(), kTenSeconds);
+
+ opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] {
+ critSecSignal->wait(opCtx);
+ });
}
errmsg = str::stream() << "shard global version for collection is higher "
@@ -217,7 +224,13 @@ public:
collLock.reset();
autoDb.reset();
LOGV2(22057, "waiting till out of critical section");
- critSecSignal->waitFor(opCtx, Seconds(10));
+
+ auto deadline = opCtx->getServiceContext()->getFastClockSource()->now() +
+ std::min(opCtx->getRemainingMaxTimeMillis(), kTenSeconds);
+
+ opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] {
+ critSecSignal->wait(opCtx);
+ });
}
// need authoritative for first look
diff --git a/src/mongo/db/s/sharding_migration_critical_section.cpp b/src/mongo/db/s/sharding_migration_critical_section.cpp
index bf1c8cc944a..10b60790ffd 100644
--- a/src/mongo/db/s/sharding_migration_critical_section.cpp
+++ b/src/mongo/db/s/sharding_migration_critical_section.cpp
@@ -41,7 +41,7 @@ ShardingMigrationCriticalSection::~ShardingMigrationCriticalSection() {
void ShardingMigrationCriticalSection::enterCriticalSectionCatchUpPhase() {
invariant(!_critSecSignal);
- _critSecSignal = std::make_shared<Notification<void>>();
+ _critSecSignal.emplace();
_readsShouldWaitOnCritSec = false;
}
@@ -52,20 +52,20 @@ void ShardingMigrationCriticalSection::enterCriticalSectionCommitPhase() {
void ShardingMigrationCriticalSection::exitCriticalSection() {
if (_critSecSignal) {
- _critSecSignal->set();
+ _critSecSignal->emplaceValue();
_critSecSignal.reset();
}
}
-std::shared_ptr<Notification<void>> ShardingMigrationCriticalSection::getSignal(
+boost::optional<SharedSemiFuture<void>> ShardingMigrationCriticalSection::getSignal(
Operation op) const {
if (!_critSecSignal)
- return nullptr;
+ return boost::none;
if (op == kWrite || _readsShouldWaitOnCritSec)
- return _critSecSignal;
+ return _critSecSignal->getFuture();
- return nullptr;
+ return boost::none;
}
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_migration_critical_section.h b/src/mongo/db/s/sharding_migration_critical_section.h
index 0dbc8fd4df4..21a5ed91d9b 100644
--- a/src/mongo/db/s/sharding_migration_critical_section.h
+++ b/src/mongo/db/s/sharding_migration_critical_section.h
@@ -29,7 +29,9 @@
#pragma once
-#include "mongo/util/concurrency/notification.h"
+#include <boost/optional.hpp>
+
+#include "mongo/util/future.h"
namespace mongo {
@@ -68,17 +70,17 @@ public:
void exitCriticalSection();
/**
- * Retrieves a critical section notification to wait on. Will return nullptr if the migration is
+ * Retrieves a critical section future to wait on. Will return boost::none if the migration is
* not yet in the critical section or if the caller is a reader and the migration is not yet in
* the commit phase.
*/
enum Operation { kRead, kWrite };
- std::shared_ptr<Notification<void>> getSignal(Operation op) const;
+ boost::optional<SharedSemiFuture<void>> getSignal(Operation op) const;
private:
- // Whether the migration source is in a critical section. Tracked as a shared pointer so that
+ // Whether the migration source is in a critical section. Tracked as a shared promise so that
// callers don't have to hold metadata locks in order to wait on it.
- std::shared_ptr<Notification<void>> _critSecSignal;
+ boost::optional<SharedPromise<void>> _critSecSignal;
// Used to delay blocking reads up until the commit of the metadata on the config server needs
// to happen. This allows the shard to serve reads up until the config server metadata update
diff --git a/src/mongo/s/stale_exception.cpp b/src/mongo/s/stale_exception.cpp
index 47993e5eb68..ca834a9aaf6 100644
--- a/src/mongo/s/stale_exception.cpp
+++ b/src/mongo/s/stale_exception.cpp
@@ -52,7 +52,7 @@ StaleConfigInfo::StaleConfigInfo(NamespaceString nss,
ChunkVersion received,
boost::optional<ChunkVersion> wanted,
ShardId shardId,
- std::shared_ptr<Notification<void>> criticalSectionSignal)
+ boost::optional<SharedSemiFuture<void>> criticalSectionSignal)
: _nss(std::move(nss)),
_received(received),
_wanted(wanted),
diff --git a/src/mongo/s/stale_exception.h b/src/mongo/s/stale_exception.h
index 9850c40ad11..94150894938 100644
--- a/src/mongo/s/stale_exception.h
+++ b/src/mongo/s/stale_exception.h
@@ -45,7 +45,7 @@ public:
ChunkVersion received,
boost::optional<ChunkVersion> wanted,
ShardId shardId,
- std::shared_ptr<Notification<void>> criticalSectionSignal = nullptr);
+ boost::optional<SharedSemiFuture<void>> = boost::none);
const auto& getNss() const {
return _nss;
@@ -79,7 +79,7 @@ private:
// This signal does not get serialized and therefore does not get propagated
// to the router.
- std::shared_ptr<Notification<void>> _criticalSectionSignal;
+ boost::optional<SharedSemiFuture<void>> _criticalSectionSignal;
};
using StaleConfigException = ExceptionFor<ErrorCodes::StaleConfig>;