summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Chan <jason.chan@mongodb.com>2021-04-01 15:42:19 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-12 18:09:32 +0000
commit74492a41157f8035ed658c75c006a784f55592e3 (patch)
tree63fe6ae55fa7e524b0b8ac1d1b13b2fe79cce2d5
parentf19be01c6b586270b3b5d0b867537e9ff1a903b2 (diff)
downloadmongo-74492a41157f8035ed658c75c006a784f55592e3.tar.gz
SERVER-55193 Support back-to-back tenant migrations
(cherry picked from commit 7c4fdf48f8882818e778c3c2931b0e24aa99711d)
-rw-r--r--jstests/replsets/tenant_migration_retryable_write_retry_on_recipient.js4
-rw-r--r--jstests/replsets/tenant_migrations_back_to_back.js153
-rw-r--r--src/mongo/db/repl/tenant_migration_access_blocker.h20
-rw-r--r--src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp101
-rw-r--r--src/mongo/db/repl/tenant_migration_access_blocker_registry.h63
-rw-r--r--src/mongo/db/repl/tenant_migration_access_blocker_util.cpp128
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp3
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_op_observer.cpp9
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp3
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp6
-rw-r--r--src/mongo/db/ttl.cpp4
11 files changed, 396 insertions, 98 deletions
diff --git a/jstests/replsets/tenant_migration_retryable_write_retry_on_recipient.js b/jstests/replsets/tenant_migration_retryable_write_retry_on_recipient.js
index 370da8b9952..940f6132fad 100644
--- a/jstests/replsets/tenant_migration_retryable_write_retry_on_recipient.js
+++ b/jstests/replsets/tenant_migration_retryable_write_retry_on_recipient.js
@@ -270,10 +270,6 @@ const migrationOpts2 = {
tenantId: kTenantId,
};
-// TODO(SERVER-55193): Make back-to-back migration work and remove this 'remove'
-assert.commandWorked(
- recipientRst.getPrimary().getDB("config").tenantMigrationRecipients.remove({}));
-
assert.commandWorked(tenantMigrationTest2.runMigration(migrationOpts2));
// Print the no-op oplog entries for debugging purposes.
diff --git a/jstests/replsets/tenant_migrations_back_to_back.js b/jstests/replsets/tenant_migrations_back_to_back.js
new file mode 100644
index 00000000000..8e8f015fe8d
--- /dev/null
+++ b/jstests/replsets/tenant_migrations_back_to_back.js
@@ -0,0 +1,153 @@
+/**
+ * Tests a back-to-back migration scenario where we migrate immediately from replica sets A->B->C.
+ * Specifically, this tests that when replica set B has both a recipient and donor access blocker,
+ * old reads will continue to be blocked by the recipient access blocker even while it acts as a
+ * donor for a newly initiated migration.
+ *
+ * @tags: [requires_fcv_49, requires_majority_read_concern, incompatible_with_eft,
+ * incompatible_with_windows_tls]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallelTester.js"); // for 'Thread'
+load("jstests/libs/uuid_util.js");
+load("jstests/replsets/rslib.js"); // for 'getLastOpTime'
+
+const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
+if (!tenantMigrationTest.isFeatureFlagEnabled()) {
+ jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
+ return;
+}
+
+const kTenantId = "testTenantId";
+const kDbName = tenantMigrationTest.tenantDB(kTenantId, "testDb");
+const kCollName = "testColl";
+
+const donorPrimary = tenantMigrationTest.getDonorPrimary();
+const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
+const recipientRst = tenantMigrationTest.getRecipientRst();
+
+const migrationId = UUID();
+const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(migrationId),
+ recipientConnString: tenantMigrationTest.getRecipientConnString(),
+ tenantId: kTenantId,
+};
+
+// Select a read timestamp < blockTimestamp.
+const preMigrationTimestamp = getLastOpTime(donorPrimary).ts;
+let waitForRejectReadsBeforeTsFp = configureFailPoint(
+ recipientPrimary, "fpAfterWaitForRejectReadsBeforeTimestamp", {action: "hang"});
+
+const donorRstArgs = TenantMigrationUtil.createRstArgs(tenantMigrationTest.getDonorRst());
+const migrationThread =
+ new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs);
+migrationThread.start();
+waitForRejectReadsBeforeTsFp.wait();
+
+const donorDoc =
+ donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).findOne({tenantId: kTenantId});
+assert.lt(preMigrationTimestamp, donorDoc.blockTimestamp);
+waitForRejectReadsBeforeTsFp.off();
+// Wait for the migration to complete.
+jsTest.log("Waiting for migration to complete");
+const stateRes = assert.commandWorked(migrationThread.returnData());
+assert.eq(stateRes.state, TenantMigrationTest.DonorState.kCommitted);
+
+tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString);
+
+recipientRst.nodes.forEach(node => {
+ const db = node.getDB(kDbName);
+ const res = db.runCommand({
+ find: kCollName,
+ readConcern: {
+ level: "snapshot",
+ atClusterTime: preMigrationTimestamp,
+ }
+ });
+ assert.commandFailedWithCode(res, ErrorCodes.SnapshotTooOld, tojson(res));
+ assert.eq(res.errmsg, "Tenant read is not allowed before migration completes");
+});
+
+jsTestLog("Running a back-to-back migration");
+const tenantMigrationTest2 = new TenantMigrationTest(
+ {name: jsTestName() + "2", donorRst: tenantMigrationTest.getRecipientRst()});
+const donor2Primary = tenantMigrationTest2.getDonorPrimary();
+const donor2RstArgs = TenantMigrationUtil.createRstArgs(tenantMigrationTest2.getDonorRst());
+const migration2Id = UUID();
+const migrationOpts2 = {
+ migrationIdString: extractUUIDFromObject(migration2Id),
+ recipientConnString: tenantMigrationTest2.getRecipientConnString(),
+ tenantId: kTenantId,
+};
+
+const newDonorRst = recipientRst;
+
+let waitAfterCreatingMtab =
+ configureFailPoint(donor2Primary, "pauseTenantMigrationBeforeLeavingBlockingState");
+const migration2Thread =
+ new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts2, donor2RstArgs);
+migration2Thread.start();
+// At this point, 'donor2Primary' should have both a recipient and donor access blocker. The donor
+// access blocker has entered the blocking state, and the recipient access blocker should
+// still be blocking reads with timestamps < blockTimestamp from the previous migration.
+waitAfterCreatingMtab.wait();
+// Check that the current serverStatus reflects the recipient access blocker.
+const mtabStatus = tenantMigrationTest.getTenantMigrationAccessBlocker(donor2Primary, kTenantId);
+assert.eq(mtabStatus.state, TenantMigrationTest.RecipientAccessState.kRejectBefore, mtabStatus);
+assert(mtabStatus.hasOwnProperty("rejectBeforeTimestamp"), mtabStatus);
+assert.eq(mtabStatus["rejectBeforeTimestamp"], donorDoc.blockTimestamp, mtabStatus);
+
+// The server value representation of the donor blocking state.
+// TODO (SERVER-55913): Check that serverStatus returns both recipient and donor states.
+const kBlocking = 3;
+const res = assert.commandWorked(
+ donor2Primary.adminCommand({currentOp: true, desc: "tenant donor migration"}));
+assert.eq(bsonWoCompare(res.inprog[0].instanceID, migration2Id), 0, tojson(res.inprog));
+assert.eq(res.inprog[0].lastDurableState, kBlocking, tojson(res.inprog));
+
+// Get the block timestamp for this new migration.
+const donorDoc2 =
+ donor2Primary.getCollection(TenantMigrationTest.kConfigDonorsNS).findOne({tenantId: kTenantId});
+const blockTimestamp2 = donorDoc2.blockTimestamp;
+
+// The donor access blocker should block reads after the blockTimestamp of the new migration.
+newDonorRst.nodes.forEach(node => {
+ jsTestLog("Test that read times out on node: " + node);
+ const db = node.getDB(kDbName);
+ assert.commandFailedWithCode(db.runCommand({
+ find: kCollName,
+ readConcern: {
+ afterClusterTime: blockTimestamp2,
+ },
+ maxTimeMS: 2 * 1000,
+ }),
+ ErrorCodes.MaxTimeMSExpired);
+});
+
+// The recipient access blocker should fail reads before the blockTimestamp of the old migration.
+newDonorRst.nodes.forEach(node => {
+ jsTestLog("Test that read fails on node: " + node);
+ const db = node.getDB(kDbName);
+ const res = db.runCommand({
+ find: kCollName,
+ readConcern: {
+ level: "snapshot",
+ atClusterTime: preMigrationTimestamp,
+ }
+ });
+ assert.commandFailedWithCode(res, ErrorCodes.SnapshotTooOld, tojson(res));
+ assert.eq(res.errmsg, "Tenant read is not allowed before migration completes");
+});
+
+waitAfterCreatingMtab.off();
+migration2Thread.join();
+
+tenantMigrationTest2.stop();
+tenantMigrationTest.stop();
+})();
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker.h b/src/mongo/db/repl/tenant_migration_access_blocker.h
index a88c835f258..b2498adfe84 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker.h
+++ b/src/mongo/db/repl/tenant_migration_access_blocker.h
@@ -39,11 +39,17 @@
namespace mongo {
/**
- * Tenant access blocking interface used by TenantMigrationDonorAccessBlocker.
+ * Tenant access blocking interface used by TenantMigrationDonorAccessBlocker and
+ * TenantMigrationRecipientAccessBlocker.
*/
class TenantMigrationAccessBlocker {
public:
- TenantMigrationAccessBlocker() = default;
+ /**
+ * The blocker type determines the context in which the access blocker is used.
+ */
+ enum class BlockerType { kDonor, kRecipient };
+
+ TenantMigrationAccessBlocker(BlockerType type) : _type(type) {}
virtual ~TenantMigrationAccessBlocker() = default;
/**
@@ -97,6 +103,16 @@ public:
* thrown based on the given status.
*/
virtual void recordTenantMigrationError(Status status) = 0;
+
+ /**
+ * Returns the type of access blocker.
+ */
+ virtual BlockerType getType() {
+ return _type;
+ }
+
+private:
+ const BlockerType _type;
};
} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp
index 0cb9587bd5d..96738408ebc 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp
+++ b/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp
@@ -26,11 +26,12 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
-
#include "mongo/db/repl/tenant_migration_access_blocker_registry.h"
#include "mongo/db/repl/tenant_migration_access_blocker.h"
namespace mongo {
+using MtabType = TenantMigrationAccessBlocker::BlockerType;
+using MtabPair = TenantMigrationAccessBlockerRegistry::DonorRecipientAccessBlockerPair;
const ServiceContext::Decoration<TenantMigrationAccessBlockerRegistry>
TenantMigrationAccessBlockerRegistry::get =
@@ -39,42 +40,70 @@ const ServiceContext::Decoration<TenantMigrationAccessBlockerRegistry>
void TenantMigrationAccessBlockerRegistry::add(StringData tenantId,
std::shared_ptr<TenantMigrationAccessBlocker> mtab) {
stdx::lock_guard<Latch> lg(_mutex);
-
+ auto mtabType = mtab->getType();
// Assume that all tenant ids (i.e. 'tenantId') have equal length.
auto it = _tenantMigrationAccessBlockers.find(tenantId);
-
if (it != _tenantMigrationAccessBlockers.end()) {
- uasserted(ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Found active migration for tenantId \"" << tenantId << "\"");
+ if (it->second.getAccessBlocker(mtabType)) {
+ uasserted(ErrorCodes::ConflictingOperationInProgress,
+ str::stream()
+ << "Found active migration for tenantId \"" << tenantId << "\"");
+ }
+ // The migration protocol guarantees that the original donor node must be garbage collected
+ // before it can be chosen as a recipient under the same tenant. Therefore, we only expect
+ // to have both recipient and donor access blockers in the case of back-to-back migrations
+ // where the node participates first as a recipient and then as a donor.
+ invariant(mtabType == MtabType::kDonor);
+ it->second.setAccessBlocker(mtab);
+ return;
}
-
- _tenantMigrationAccessBlockers.emplace(tenantId, mtab);
+ MtabPair mtabPair;
+ mtabPair.setAccessBlocker(mtab);
+ _tenantMigrationAccessBlockers.emplace(tenantId, mtabPair);
}
-void TenantMigrationAccessBlockerRegistry::remove(StringData tenantId) {
+void TenantMigrationAccessBlockerRegistry::remove(StringData tenantId, MtabType type) {
stdx::lock_guard<Latch> lg(_mutex);
auto it = _tenantMigrationAccessBlockers.find(tenantId);
invariant(it != _tenantMigrationAccessBlockers.end());
+ auto mtabPair = it->second;
+ mtabPair.clearAccessBlocker(type);
+ if (!mtabPair.getAccessBlocker(MtabType::kDonor) &&
+ !mtabPair.getAccessBlocker(MtabType::kRecipient)) {
+ _tenantMigrationAccessBlockers.erase(it);
+ }
+}
- _tenantMigrationAccessBlockers.erase(it);
+boost::optional<MtabPair>
+TenantMigrationAccessBlockerRegistry::getTenantMigrationAccessBlockerForDbName(StringData dbName) {
+ stdx::lock_guard<Latch> lg(_mutex);
+ return _getTenantMigrationAccessBlockersForDbName(dbName, lg);
}
std::shared_ptr<TenantMigrationAccessBlocker>
-TenantMigrationAccessBlockerRegistry::getTenantMigrationAccessBlockerForDbName(StringData dbName) {
+TenantMigrationAccessBlockerRegistry::getTenantMigrationAccessBlockerForDbName(StringData dbName,
+ MtabType type) {
stdx::lock_guard<Latch> lg(_mutex);
+ auto mtabPair = _getTenantMigrationAccessBlockersForDbName(dbName, lg);
+ if (!mtabPair) {
+ return nullptr;
+ }
+ return mtabPair->getAccessBlocker(type);
+}
- auto it = std::find_if(
- _tenantMigrationAccessBlockers.begin(),
- _tenantMigrationAccessBlockers.end(),
- [dbName](
- const std::pair<std::string, std::shared_ptr<TenantMigrationAccessBlocker>>& blocker) {
- StringData tenantId = blocker.first;
- return dbName.startsWith(tenantId + "_");
- });
+boost::optional<MtabPair>
+TenantMigrationAccessBlockerRegistry::_getTenantMigrationAccessBlockersForDbName(StringData dbName,
+ WithLock) {
+ auto it = std::find_if(_tenantMigrationAccessBlockers.begin(),
+ _tenantMigrationAccessBlockers.end(),
+ [dbName](const std::pair<std::string, MtabPair>& blocker) {
+ StringData tenantId = blocker.first;
+ return dbName.startsWith(tenantId + "_");
+ });
if (it == _tenantMigrationAccessBlockers.end()) {
- return nullptr;
+ return boost::none;
} else {
return it->second;
}
@@ -82,12 +111,12 @@ TenantMigrationAccessBlockerRegistry::getTenantMigrationAccessBlockerForDbName(S
std::shared_ptr<TenantMigrationAccessBlocker>
TenantMigrationAccessBlockerRegistry::getTenantMigrationAccessBlockerForTenantId(
- StringData tenantId) {
+ StringData tenantId, MtabType type) {
stdx::lock_guard<Latch> lg(_mutex);
auto it = _tenantMigrationAccessBlockers.find(tenantId);
if (it != _tenantMigrationAccessBlockers.end()) {
- return it->second;
+ return it->second.getAccessBlocker(type);
} else {
return nullptr;
}
@@ -102,25 +131,27 @@ void TenantMigrationAccessBlockerRegistry::appendInfoForServerStatus(
BSONObjBuilder* builder) const {
stdx::lock_guard<Latch> lg(_mutex);
- std::for_each(
- _tenantMigrationAccessBlockers.begin(),
- _tenantMigrationAccessBlockers.end(),
- [builder](
- const std::pair<std::string, std::shared_ptr<TenantMigrationAccessBlocker>>& blocker) {
- blocker.second->appendInfoForServerStatus(builder);
- });
+ for (auto& [_, mtabPair] : _tenantMigrationAccessBlockers) {
+ if (auto recipientMtab = mtabPair.getAccessBlocker(MtabType::kRecipient)) {
+ recipientMtab->appendInfoForServerStatus(builder);
+ }
+ if (auto donorMtab = mtabPair.getAccessBlocker(MtabType::kDonor)) {
+ donorMtab->appendInfoForServerStatus(builder);
+ }
+ }
}
void TenantMigrationAccessBlockerRegistry::onMajorityCommitPointUpdate(repl::OpTime opTime) {
stdx::lock_guard<Latch> lg(_mutex);
- std::for_each(
- _tenantMigrationAccessBlockers.begin(),
- _tenantMigrationAccessBlockers.end(),
- [opTime](
- const std::pair<std::string, std::shared_ptr<TenantMigrationAccessBlocker>>& blocker) {
- blocker.second->onMajorityCommitPointUpdate(opTime);
- });
+ for (auto& [_, mtabPair] : _tenantMigrationAccessBlockers) {
+ if (auto recipientMtab = mtabPair.getAccessBlocker(MtabType::kRecipient)) {
+ recipientMtab->onMajorityCommitPointUpdate(opTime);
+ }
+ if (auto donorMtab = mtabPair.getAccessBlocker(MtabType::kDonor)) {
+ donorMtab->onMajorityCommitPointUpdate(opTime);
+ }
+ }
}
} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_registry.h b/src/mongo/db/repl/tenant_migration_access_blocker_registry.h
index a198e289826..51f06185868 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker_registry.h
+++ b/src/mongo/db/repl/tenant_migration_access_blocker_registry.h
@@ -31,6 +31,7 @@
#include "mongo/base/string_data.h"
#include "mongo/db/repl/tenant_migration_donor_access_blocker.h"
+#include "mongo/db/repl/tenant_migration_recipient_access_blocker.h"
#include "mongo/util/string_map.h"
namespace mongo {
@@ -41,6 +42,46 @@ class TenantMigrationAccessBlockerRegistry {
delete;
public:
+ struct DonorRecipientAccessBlockerPair {
+ DonorRecipientAccessBlockerPair() = default;
+ DonorRecipientAccessBlockerPair(
+ std::shared_ptr<TenantMigrationDonorAccessBlocker> donor,
+ std::shared_ptr<TenantMigrationRecipientAccessBlocker> recipient)
+ : _donor(donor), _recipient(recipient) {}
+
+ const std::shared_ptr<TenantMigrationAccessBlocker> getAccessBlocker(
+ TenantMigrationAccessBlocker::BlockerType type) const {
+ if (type == TenantMigrationAccessBlocker::BlockerType::kDonor) {
+ return _donor;
+ }
+ return _recipient;
+ }
+
+ void setAccessBlocker(std::shared_ptr<TenantMigrationAccessBlocker> mtab) {
+ invariant(mtab);
+ if (mtab->getType() == TenantMigrationAccessBlocker::BlockerType::kDonor) {
+ invariant(!_donor);
+ _donor = mtab;
+ } else {
+ invariant(!_recipient);
+ _recipient = mtab;
+ }
+ }
+
+ void clearAccessBlocker(TenantMigrationAccessBlocker::BlockerType type) {
+ if (type == TenantMigrationAccessBlocker::BlockerType::kDonor) {
+ invariant(_donor);
+ _donor.reset();
+ } else {
+ invariant(_recipient);
+ _recipient.reset();
+ }
+ }
+
+ private:
+ std::shared_ptr<TenantMigrationAccessBlocker> _donor;
+ std::shared_ptr<TenantMigrationAccessBlocker> _recipient;
+ };
TenantMigrationAccessBlockerRegistry() = default;
static const ServiceContext::Decoration<TenantMigrationAccessBlockerRegistry> get;
@@ -53,22 +94,30 @@ public:
/**
* Invariants that an entry for tenantId exists, and then removes the entry for (tenantId, mtab)
*/
- void remove(StringData tenantId);
+ void remove(StringData tenantId, TenantMigrationAccessBlocker::BlockerType type);
/**
* Iterates through each of the TenantMigrationAccessBlockers and
- * returns the first TenantMigrationAccessBlocker it finds whose tenantId is a prefix for
+ * returns the first 'DonorRecipientAccessBlockerPair' it finds whose tenantId is a prefix for
* dbName.
*/
- std::shared_ptr<TenantMigrationAccessBlocker> getTenantMigrationAccessBlockerForDbName(
+ boost::optional<DonorRecipientAccessBlockerPair> getTenantMigrationAccessBlockerForDbName(
StringData dbName);
/**
+ * Iterates through each of the TenantMigrationAccessBlockers and
+ * returns the first 'TenantMigrationAccessBlocker' it finds whose 'tenantId' is a prefix for
+ * 'dbName' and is of the requested type.
+ */
+ std::shared_ptr<TenantMigrationAccessBlocker> getTenantMigrationAccessBlockerForDbName(
+ StringData dbName, TenantMigrationAccessBlocker::BlockerType type);
+
+ /**
* Searches through TenantMigrationAccessBlockers and
* returns the TenantMigrationAccessBlocker that matches tenantId.
*/
std::shared_ptr<TenantMigrationAccessBlocker> getTenantMigrationAccessBlockerForTenantId(
- StringData tenantId);
+ StringData tenantId, TenantMigrationAccessBlocker::BlockerType type);
/**
* Shuts down each of the TenantMigrationAccessBlockers and releases the shared_ptrs to the
@@ -93,8 +142,10 @@ public:
void onMajorityCommitPointUpdate(repl::OpTime opTime);
private:
- using TenantMigrationAccessBlockersMap =
- StringMap<std::shared_ptr<TenantMigrationAccessBlocker>>;
+ using TenantMigrationAccessBlockersMap = StringMap<DonorRecipientAccessBlockerPair>;
+
+ boost::optional<DonorRecipientAccessBlockerPair> _getTenantMigrationAccessBlockersForDbName(
+ StringData dbName, WithLock);
mutable Mutex _mutex = MONGO_MAKE_LATCH("TenantMigrationAccessBlockerRegistry::_mutex");
TenantMigrationAccessBlockersMap _tenantMigrationAccessBlockers;
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp
index c8bbb3045ee..4d80a4c3fe5 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp
@@ -56,27 +56,27 @@ MONGO_FAIL_POINT_DEFINE(skipRecoverTenantMigrationAccessBlockers);
namespace tenant_migration_access_blocker {
namespace {
+using MtabType = TenantMigrationAccessBlocker::BlockerType;
constexpr char kThreadNamePrefix[] = "TenantMigrationWorker-";
constexpr char kPoolName[] = "TenantMigrationWorkerThreadPool";
constexpr char kNetName[] = "TenantMigrationWorkerNetwork";
const auto donorStateDocToDeleteDecoration = OperationContext::declareDecoration<BSONObj>();
-
} // namespace
std::shared_ptr<TenantMigrationDonorAccessBlocker> getTenantMigrationDonorAccessBlocker(
ServiceContext* const serviceContext, StringData tenantId) {
return checked_pointer_cast<TenantMigrationDonorAccessBlocker>(
TenantMigrationAccessBlockerRegistry::get(serviceContext)
- .getTenantMigrationAccessBlockerForTenantId(tenantId));
+ .getTenantMigrationAccessBlockerForTenantId(tenantId, MtabType::kDonor));
}
std::shared_ptr<TenantMigrationRecipientAccessBlocker> getTenantMigrationRecipientAccessBlocker(
ServiceContext* const serviceContext, StringData tenantId) {
return checked_pointer_cast<TenantMigrationRecipientAccessBlocker>(
TenantMigrationAccessBlockerRegistry::get(serviceContext)
- .getTenantMigrationAccessBlockerForTenantId(tenantId));
+ .getTenantMigrationAccessBlockerForTenantId(tenantId, MtabType::kRecipient));
}
TenantMigrationDonorDocument parseDonorStateDocument(const BSONObj& doc) {
@@ -131,65 +131,104 @@ TenantMigrationDonorDocument parseDonorStateDocument(const BSONObj& doc) {
}
SemiFuture<void> checkIfCanReadOrBlock(OperationContext* opCtx, StringData dbName) {
- auto mtab = TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
- .getTenantMigrationAccessBlockerForDbName(dbName);
+ // We need to check both donor and recipient access blockers in the case where two
+ // migrations happen back-to-back before the old recipient state (from the first
+ // migration) is garbage collected.
+ auto mtabPair = TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
+ .getTenantMigrationAccessBlockerForDbName(dbName);
- if (!mtab) {
+ if (!mtabPair) {
return Status::OK();
}
// Source to cancel the timeout if the operation completed in time.
CancelationSource cancelTimeoutSource;
-
- auto canReadFuture = mtab->getCanReadFuture(opCtx);
-
- // Optimisation: if the future is already ready, we are done.
- if (canReadFuture.isReady()) {
- auto status = canReadFuture.getNoThrow();
- mtab->recordTenantMigrationError(status);
- return status;
- }
-
- auto executor = mtab->getAsyncBlockingOperationsExecutor();
+ // Source to cancel waiting on the 'canReadFutures'.
+ CancelationSource cancelCanReadSource;
+ const auto donorMtab = mtabPair->getAccessBlocker(MtabType::kDonor);
+ const auto recipientMtab = mtabPair->getAccessBlocker(MtabType::kRecipient);
+ // A vector of futures where the donor access blocker's 'getCanReadFuture' will always precede
+ // the recipient's.
std::vector<ExecutorFuture<void>> futures;
- futures.emplace_back(std::move(canReadFuture).semi().thenRunOn(executor));
+ std::shared_ptr<executor::TaskExecutor> executor;
+ if (donorMtab) {
+ auto canReadFuture = donorMtab->getCanReadFuture(opCtx);
+ if (canReadFuture.isReady()) {
+ auto status = canReadFuture.getNoThrow();
+ donorMtab->recordTenantMigrationError(status);
+ if (!recipientMtab) {
+ return status;
+ }
+ }
+ executor = donorMtab->getAsyncBlockingOperationsExecutor();
+ futures.emplace_back(std::move(canReadFuture).semi().thenRunOn(executor));
+ }
+ if (recipientMtab) {
+ auto canReadFuture = recipientMtab->getCanReadFuture(opCtx);
+ if (canReadFuture.isReady()) {
+ auto status = canReadFuture.getNoThrow();
+ recipientMtab->recordTenantMigrationError(status);
+ if (!donorMtab) {
+ return status;
+ }
+ }
+ executor = recipientMtab->getAsyncBlockingOperationsExecutor();
+ futures.emplace_back(std::move(canReadFuture).semi().thenRunOn(executor));
+ }
if (opCtx->hasDeadline()) {
- auto deadlineReachedFuture =
- executor->sleepUntil(opCtx->getDeadline(), cancelTimeoutSource.token());
- // The timeout condition is optional with index #1.
- futures.push_back(std::move(deadlineReachedFuture));
+ // Cancel waiting for operations if we timeout.
+ executor->sleepUntil(opCtx->getDeadline(), cancelTimeoutSource.token())
+ .getAsync([cancelCanReadSource](auto) mutable { cancelCanReadSource.cancel(); });
}
- return whenAny(std::move(futures))
+ return future_util::withCancelation(whenAll(std::move(futures)), cancelCanReadSource.token())
.thenRunOn(executor)
- .then([cancelTimeoutSource, opCtx, mtab, executor](WhenAnyResult<void> result) mutable {
- const auto& [status, idx] = result;
- if (idx == 0) {
- // Read unblock condition finished first.
- cancelTimeoutSource.cancel();
- mtab->recordTenantMigrationError(status);
- return status;
- } else if (idx == 1) {
- // Deadline finished first, throw error.
- return Status(opCtx->getTimeoutError(),
- "Read timed out waiting for tenant migration blocker",
- mtab->getDebugInfo());
- }
- MONGO_UNREACHABLE;
- })
- .onError([cancelTimeoutSource](Status status) mutable {
+ .then([cancelTimeoutSource, donorMtab, recipientMtab](std::vector<Status> results) mutable {
cancelTimeoutSource.cancel();
- return status;
+ auto resultIter = results.begin();
+ const auto donorMtabStatus = donorMtab ? *resultIter++ : Status::OK();
+ const auto recipientMtabStatus = recipientMtab ? *resultIter : Status::OK();
+ if (!donorMtabStatus.isOK()) {
+ donorMtab->recordTenantMigrationError(donorMtabStatus);
+ LOGV2(5519301,
+ "Received error while waiting on donor access blocker",
+ "error"_attr = donorMtabStatus);
+ }
+ if (!recipientMtabStatus.isOK()) {
+ recipientMtab->recordTenantMigrationError(recipientMtabStatus);
+ LOGV2(5519302,
+ "Received error while waiting on recipient access blocker",
+ "error"_attr = recipientMtabStatus);
+ if (donorMtabStatus.isOK()) {
+ return recipientMtabStatus;
+ }
+ }
+ return donorMtabStatus;
})
+ .onError<ErrorCodes::CallbackCanceled>(
+ [cancelCanReadSource, donorMtab, recipientMtab, opCtx](Status status) mutable {
+ cancelCanReadSource.cancel();
+ // At least one of 'donorMtab' or 'recipientMtab' must exist if we timed out here.
+ BSONObj info =
+ donorMtab ? donorMtab->getDebugInfo() : recipientMtab->getDebugInfo();
+ if (recipientMtab) {
+ info = info.addField(
+ recipientMtab->getDebugInfo().getField("donorConnectionString"));
+ }
+ return Status(opCtx->getTimeoutError(),
+ "Read timed out waiting for tenant migration blocker",
+ info);
+ })
.semi(); // To require continuation in the user executor.
}
void checkIfLinearizableReadWasAllowedOrThrow(OperationContext* opCtx, StringData dbName) {
if (repl::ReadConcernArgs::get(opCtx).getLevel() ==
repl::ReadConcernLevel::kLinearizableReadConcern) {
+ // Only the donor access blocker will block linearizable reads.
if (auto mtab = TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
- .getTenantMigrationAccessBlockerForDbName(dbName)) {
+ .getTenantMigrationAccessBlockerForDbName(dbName, MtabType::kDonor)) {
auto status = mtab->checkIfLinearizableReadWasAllowed(opCtx);
mtab->recordTenantMigrationError(status);
uassertStatusOK(status);
@@ -198,8 +237,10 @@ void checkIfLinearizableReadWasAllowedOrThrow(OperationContext* opCtx, StringDat
}
void checkIfCanWriteOrThrow(OperationContext* opCtx, StringData dbName) {
+ // The migration protocol guarantees the recipient will not get writes until the migration
+ // is committed.
auto mtab = TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
- .getTenantMigrationAccessBlockerForDbName(dbName);
+ .getTenantMigrationAccessBlockerForDbName(dbName, MtabType::kDonor);
if (mtab) {
auto status = mtab->checkIfCanWrite();
@@ -209,8 +250,9 @@ void checkIfCanWriteOrThrow(OperationContext* opCtx, StringData dbName) {
}
Status checkIfCanBuildIndex(OperationContext* opCtx, StringData dbName) {
+ // We only block index builds on the donor.
auto mtab = TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
- .getTenantMigrationAccessBlockerForDbName(dbName);
+ .getTenantMigrationAccessBlockerForDbName(dbName, MtabType::kDonor);
if (mtab) {
// This log is included for synchronization of the tenant migration buildindex jstests.
diff --git a/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp b/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp
index bf4c8dc8220..1e6e452a08f 100644
--- a/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp
@@ -53,7 +53,8 @@ const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
TenantMigrationDonorAccessBlocker::TenantMigrationDonorAccessBlocker(
ServiceContext* serviceContext, std::string tenantId, std::string recipientConnString)
- : _serviceContext(serviceContext),
+ : TenantMigrationAccessBlocker(BlockerType::kDonor),
+ _serviceContext(serviceContext),
_tenantId(std::move(tenantId)),
_recipientConnString(std::move(recipientConnString)) {
_asyncBlockingOperationsExecutor = TenantMigrationAccessBlockerExecutor::get(serviceContext)
diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
index 9ddaca648d9..f7c032492da 100644
--- a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
@@ -66,7 +66,8 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
// the write.
opCtx->recoveryUnit()->onRollback([opCtx, donorStateDoc] {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
- .remove(donorStateDoc.getTenantId());
+ .remove(donorStateDoc.getTenantId(),
+ TenantMigrationAccessBlocker::BlockerType::kDonor);
});
}
}
@@ -142,7 +143,8 @@ public:
// been aborted and forgotten.
if (_donorStateDoc.getState() == TenantMigrationDonorStateEnum::kAborted) {
TenantMigrationAccessBlockerRegistry::get(_opCtx->getServiceContext())
- .remove(_donorStateDoc.getTenantId());
+ .remove(_donorStateDoc.getTenantId(),
+ TenantMigrationAccessBlocker::BlockerType::kDonor);
}
return;
}
@@ -262,7 +264,8 @@ void TenantMigrationDonorOpObserver::onDelete(OperationContext* opCtx,
!tenant_migration_access_blocker::inRecoveryMode(opCtx)) {
opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
- .remove(tenantIdToDeleteDecoration(opCtx).get());
+ .remove(tenantIdToDeleteDecoration(opCtx).get(),
+ TenantMigrationAccessBlocker::BlockerType::kDonor);
});
}
}
diff --git a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp
index 4ddf2bb1260..18040a2f596 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp
@@ -56,7 +56,8 @@ TenantMigrationRecipientAccessBlocker::TenantMigrationRecipientAccessBlocker(
UUID migrationId,
std::string tenantId,
std::string donorConnString)
- : _serviceContext(serviceContext),
+ : TenantMigrationAccessBlocker(BlockerType::kRecipient),
+ _serviceContext(serviceContext),
_migrationId(migrationId),
_tenantId(std::move(tenantId)),
_donorConnString(std::move(donorConnString)) {
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
index 6c01ec62935..d84d706e6b0 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
@@ -101,7 +101,8 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx,
// re-allow reads and future migrations with the same tenantId as this migration
// has already been aborted and forgotten.
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
- .remove(recipientStateDoc.getTenantId());
+ .remove(recipientStateDoc.getTenantId(),
+ TenantMigrationAccessBlocker::BlockerType::kRecipient);
return;
}
// Once the state doc is marked garbage collectable the TTL deletions should be
@@ -159,7 +160,8 @@ void TenantMigrationRecipientOpObserver::onDelete(OperationContext* opCtx,
!tenant_migration_access_blocker::inRecoveryMode(opCtx)) {
opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
- .remove(tenantIdToDeleteDecoration(opCtx).get());
+ .remove(tenantIdToDeleteDecoration(opCtx).get(),
+ TenantMigrationAccessBlocker::BlockerType::kRecipient);
});
}
}
diff --git a/src/mongo/db/ttl.cpp b/src/mongo/db/ttl.cpp
index 18f84480343..fca28411500 100644
--- a/src/mongo/db/ttl.cpp
+++ b/src/mongo/db/ttl.cpp
@@ -81,6 +81,7 @@ Counter64 ttlDeletedDocuments;
ServerStatusMetricField<Counter64> ttlPassesDisplay("ttl.passes", &ttlPasses);
ServerStatusMetricField<Counter64> ttlDeletedDocumentsDisplay("ttl.deletedDocuments",
&ttlDeletedDocuments);
+using MtabType = TenantMigrationAccessBlocker::BlockerType;
class TTLMonitor : public BackgroundJob {
public:
@@ -267,7 +268,8 @@ private:
if (coll.getDb() &&
nullptr !=
(mtab = TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
- .getTenantMigrationAccessBlockerForDbName(coll.getDb()->name())) &&
+ .getTenantMigrationAccessBlockerForDbName(coll.getDb()->name(),
+ MtabType::kRecipient)) &&
mtab->checkIfShouldBlockTTL()) {
LOGV2_DEBUG(53768,
1,