diff options
author | Andrew Shuvalov <andrew.shuvalov@mongodb.com> | 2020-12-21 17:39:08 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-01-11 23:40:11 +0000 |
commit | 1fca5817faaa9067d26dcf5261d4ea85bf7507a6 (patch) | |
tree | 78ff21435d2da0f8b14188b061db3a0e1b418a7c /src/mongo/db/repl/tenant_migration_access_blocker.h | |
parent | 69ff284997164b653421175dd0d76de8e5e1f310 (diff) | |
download | mongo-1fca5817faaa9067d26dcf5261d4ea85bf7507a6.tar.gz |
SERVER-52783: Make tenant_migration_donor_util::checkIfCanReadOrBlock return a Future, for now still synchronous
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_access_blocker.h')
-rw-r--r-- | src/mongo/db/repl/tenant_migration_access_blocker.h | 85 |
1 files changed, 75 insertions, 10 deletions
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker.h b/src/mongo/db/repl/tenant_migration_access_blocker.h index d075079f0b4..a0886958920 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker.h +++ b/src/mongo/db/repl/tenant_migration_access_blocker.h @@ -34,9 +34,62 @@ #include "mongo/bson/timestamp.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/optime.h" +#include "mongo/executor/task_executor.h" namespace mongo { +// Safe wrapper for a SharedPromise that allows setting the promise more than once. +template <typename Payload> +class RepeatableSharedPromise { + using payload_unless_void = + std::conditional_t<std::is_void_v<Payload>, future_details::FakeVoid, Payload>; + +public: + RepeatableSharedPromise(payload_unless_void valueAtTermination) + : _sharedPromise(std::make_unique<SharedPromise<Payload>>()), + _valueAtTermination(valueAtTermination) { + static_assert(!std::is_void_v<Payload>); + } + + RepeatableSharedPromise() + : _sharedPromise(std::make_unique<SharedPromise<Payload>>()), _valueAtTermination({}) { + static_assert(std::is_void_v<Payload>); + } + + inline ~RepeatableSharedPromise(); + + SharedSemiFuture<Payload> getFuture() { + stdx::unique_lock<Latch> ul(_mutex); + return _sharedPromise->getFuture(); + } + + // Set promise with value for non-void Payload or with Status. + // In case of void Payload always use Status with or without error code. + void setFrom(StatusOrStatusWith<const Payload> sosw) noexcept { + stdx::unique_lock<Latch> ul(_mutex); + _sharedPromise->setFrom(std::move(sosw)); + // Promise can be set only once, replace it with a new one. + _sharedPromise = std::make_unique<SharedPromise<Payload>>(); + } + +private: + mutable Mutex _mutex; + std::unique_ptr<SharedPromise<Payload>> _sharedPromise; + // In destructor, set the final payload value to this. + const payload_unless_void _valueAtTermination; +}; + +template <typename Payload> +inline RepeatableSharedPromise<Payload>::~RepeatableSharedPromise() { + _sharedPromise->setFrom(StatusOrStatusWith<Payload>(_valueAtTermination)); +} + +template <> +inline RepeatableSharedPromise<void>::~RepeatableSharedPromise() { + _sharedPromise->setFrom(Status::OK()); +} + + /** * The TenantMigrationAccessBlocker is used to block and eventually reject reads and writes to a * database while the Atlas Serverless tenant that owns the database is being migrated from this @@ -66,7 +119,7 @@ namespace mongo { * checkIfCanWriteOrBlock returns successfully and the write is retried in the loop). This loop is * used because writes must not block after being assigned an OpTime but before committing. * - * Reads with afterClusterTime or atClusterTime call checkIfCanReadOrBlock at some point after + * Reads with afterClusterTime or atClusterTime call getCanReadFuture at some point after * waiting for readConcern, that is, after waiting to reach their clusterTime, which includes * waiting for all earlier oplog holes to be filled. * @@ -89,10 +142,10 @@ namespace mongo { * "blockTimestamp". * * At this point: - * - Reads on the node that have already passed checkIfCanReadOrBlock must have a clusterTime before + * - Reads on the node that have already passed getCanReadFuture must have a clusterTime before * the blockTimestamp, since the write at blockTimestamp hasn't committed yet (i.e., there's still * an oplog hole at blockTimestamp). - * - Reads on the node that have not yet passed checkIfCanReadOrBlock will end up blocking. + * - Reads on the node that have not yet passed getCanReadFuture will end up blocking. * * If the "start blocking" write aborts or the write rolls back via replication rollback, the node * calls rollBackStartBlocking. @@ -120,10 +173,7 @@ public: TenantMigrationAccessBlocker(ServiceContext* serviceContext, std::string tenantId, - std::string recipientConnString) - : _serviceContext(serviceContext), - _tenantId(std::move(tenantId)), - _recipientConnString(std::move(recipientConnString)) {} + std::string recipientConnString); // // Called by all writes and reads against the database. @@ -133,8 +183,7 @@ public: Status waitUntilCommittedOrAborted(OperationContext* opCtx); void checkIfLinearizableReadWasAllowedOrThrow(OperationContext* opCtx); - void checkIfCanDoClusterTimeReadOrBlock(OperationContext* opCtx, - const Timestamp& readTimestamp); + SharedSemiFuture<void> getCanReadFuture(OperationContext* opCtx); // // Called while donating this database. @@ -165,14 +214,27 @@ public: SharedSemiFuture<void> onCompletion(); + std::shared_ptr<executor::TaskExecutor> getAsyncBlockingOperationsExecutor() { + return _asyncBlockingOperationsExecutor; + } + void appendInfoForServerStatus(BSONObjBuilder* builder) const; std::string stateToString(State state) const; + // Returns structured info with current tenant ID and connection string. + BSONObj getDebugInfo() const; + private: void _onMajorityCommitCommitOpTime(stdx::unique_lock<Latch>& lk); void _onMajorityCommitAbortOpTime(stdx::unique_lock<Latch>& lk); + void _lockAsyncExecutorInstance(ServiceContext* serviceContext); + + // Helper for the method 'getCanReadFuture()'. + SharedSemiFuture<void> _getCanDoClusterTimeReadFuture(OperationContext* opCtx, + Timestamp readTimestamp); + ServiceContext* _serviceContext; const std::string _tenantId; const std::string _recipientConnString; @@ -186,8 +248,11 @@ private: boost::optional<repl::OpTime> _commitOpTime; boost::optional<repl::OpTime> _abortOpTime; - stdx::condition_variable _transitionOutOfBlockingCV; SharedPromise<void> _completionPromise; + + RepeatableSharedPromise<void> _transitionOutOfBlockingPromise; + + std::shared_ptr<executor::TaskExecutor> _asyncBlockingOperationsExecutor; }; } // namespace mongo |