summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/tenant_migration_access_blocker.h
diff options
context:
space:
mode:
authorAndrew Shuvalov <andrew.shuvalov@mongodb.com>2020-12-21 17:39:08 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-01-11 23:40:11 +0000
commit1fca5817faaa9067d26dcf5261d4ea85bf7507a6 (patch)
tree78ff21435d2da0f8b14188b061db3a0e1b418a7c /src/mongo/db/repl/tenant_migration_access_blocker.h
parent69ff284997164b653421175dd0d76de8e5e1f310 (diff)
downloadmongo-1fca5817faaa9067d26dcf5261d4ea85bf7507a6.tar.gz
SERVER-52783: Make tenant_migration_donor_util::checkIfCanReadOrBlock return a Future, for now still synchronous
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_access_blocker.h')
-rw-r--r--src/mongo/db/repl/tenant_migration_access_blocker.h85
1 files changed, 75 insertions, 10 deletions
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker.h b/src/mongo/db/repl/tenant_migration_access_blocker.h
index d075079f0b4..a0886958920 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker.h
+++ b/src/mongo/db/repl/tenant_migration_access_blocker.h
@@ -34,9 +34,62 @@
#include "mongo/bson/timestamp.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/executor/task_executor.h"
namespace mongo {
+// Safe wrapper for a SharedPromise that allows setting the promise more than once.
+template <typename Payload>
+class RepeatableSharedPromise {
+ using payload_unless_void =
+ std::conditional_t<std::is_void_v<Payload>, future_details::FakeVoid, Payload>;
+
+public:
+ RepeatableSharedPromise(payload_unless_void valueAtTermination)
+ : _sharedPromise(std::make_unique<SharedPromise<Payload>>()),
+ _valueAtTermination(valueAtTermination) {
+ static_assert(!std::is_void_v<Payload>);
+ }
+
+ RepeatableSharedPromise()
+ : _sharedPromise(std::make_unique<SharedPromise<Payload>>()), _valueAtTermination({}) {
+ static_assert(std::is_void_v<Payload>);
+ }
+
+ inline ~RepeatableSharedPromise();
+
+ SharedSemiFuture<Payload> getFuture() {
+ stdx::unique_lock<Latch> ul(_mutex);
+ return _sharedPromise->getFuture();
+ }
+
+ // Set promise with value for non-void Payload or with Status.
+ // In case of void Payload always use Status with or without error code.
+ void setFrom(StatusOrStatusWith<const Payload> sosw) noexcept {
+ stdx::unique_lock<Latch> ul(_mutex);
+ _sharedPromise->setFrom(std::move(sosw));
+ // Promise can be set only once, replace it with a new one.
+ _sharedPromise = std::make_unique<SharedPromise<Payload>>();
+ }
+
+private:
+ mutable Mutex _mutex;
+ std::unique_ptr<SharedPromise<Payload>> _sharedPromise;
+ // In destructor, set the final payload value to this.
+ const payload_unless_void _valueAtTermination;
+};
+
+template <typename Payload>
+inline RepeatableSharedPromise<Payload>::~RepeatableSharedPromise() {
+ _sharedPromise->setFrom(StatusOrStatusWith<Payload>(_valueAtTermination));
+}
+
+template <>
+inline RepeatableSharedPromise<void>::~RepeatableSharedPromise() {
+ _sharedPromise->setFrom(Status::OK());
+}
+
+
/**
* The TenantMigrationAccessBlocker is used to block and eventually reject reads and writes to a
* database while the Atlas Serverless tenant that owns the database is being migrated from this
@@ -66,7 +119,7 @@ namespace mongo {
* checkIfCanWriteOrBlock returns successfully and the write is retried in the loop). This loop is
* used because writes must not block after being assigned an OpTime but before committing.
*
- * Reads with afterClusterTime or atClusterTime call checkIfCanReadOrBlock at some point after
+ * Reads with afterClusterTime or atClusterTime call getCanReadFuture at some point after
* waiting for readConcern, that is, after waiting to reach their clusterTime, which includes
* waiting for all earlier oplog holes to be filled.
*
@@ -89,10 +142,10 @@ namespace mongo {
* "blockTimestamp".
*
* At this point:
- * - Reads on the node that have already passed checkIfCanReadOrBlock must have a clusterTime before
+ * - Reads on the node that have already passed getCanReadFuture must have a clusterTime before
* the blockTimestamp, since the write at blockTimestamp hasn't committed yet (i.e., there's still
* an oplog hole at blockTimestamp).
- * - Reads on the node that have not yet passed checkIfCanReadOrBlock will end up blocking.
+ * - Reads on the node that have not yet passed getCanReadFuture will end up blocking.
*
* If the "start blocking" write aborts or the write rolls back via replication rollback, the node
* calls rollBackStartBlocking.
@@ -120,10 +173,7 @@ public:
TenantMigrationAccessBlocker(ServiceContext* serviceContext,
std::string tenantId,
- std::string recipientConnString)
- : _serviceContext(serviceContext),
- _tenantId(std::move(tenantId)),
- _recipientConnString(std::move(recipientConnString)) {}
+ std::string recipientConnString);
//
// Called by all writes and reads against the database.
@@ -133,8 +183,7 @@ public:
Status waitUntilCommittedOrAborted(OperationContext* opCtx);
void checkIfLinearizableReadWasAllowedOrThrow(OperationContext* opCtx);
- void checkIfCanDoClusterTimeReadOrBlock(OperationContext* opCtx,
- const Timestamp& readTimestamp);
+ SharedSemiFuture<void> getCanReadFuture(OperationContext* opCtx);
//
// Called while donating this database.
@@ -165,14 +214,27 @@ public:
SharedSemiFuture<void> onCompletion();
+ std::shared_ptr<executor::TaskExecutor> getAsyncBlockingOperationsExecutor() {
+ return _asyncBlockingOperationsExecutor;
+ }
+
void appendInfoForServerStatus(BSONObjBuilder* builder) const;
std::string stateToString(State state) const;
+ // Returns structured info with current tenant ID and connection string.
+ BSONObj getDebugInfo() const;
+
private:
void _onMajorityCommitCommitOpTime(stdx::unique_lock<Latch>& lk);
void _onMajorityCommitAbortOpTime(stdx::unique_lock<Latch>& lk);
+ void _lockAsyncExecutorInstance(ServiceContext* serviceContext);
+
+ // Helper for the method 'getCanReadFuture()'.
+ SharedSemiFuture<void> _getCanDoClusterTimeReadFuture(OperationContext* opCtx,
+ Timestamp readTimestamp);
+
ServiceContext* _serviceContext;
const std::string _tenantId;
const std::string _recipientConnString;
@@ -186,8 +248,11 @@ private:
boost::optional<repl::OpTime> _commitOpTime;
boost::optional<repl::OpTime> _abortOpTime;
- stdx::condition_variable _transitionOutOfBlockingCV;
SharedPromise<void> _completionPromise;
+
+ RepeatableSharedPromise<void> _transitionOutOfBlockingPromise;
+
+ std::shared_ptr<executor::TaskExecutor> _asyncBlockingOperationsExecutor;
};
} // namespace mongo