-rw-r--r--  jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js  109
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp              46
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.h                17
-rw-r--r--  src/mongo/db/repl/tenant_oplog_applier.cpp                              2
4 files changed, 155 insertions(+), 19 deletions(-)
diff --git a/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js b/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js
new file mode 100644
index 00000000000..3a13f606055
--- /dev/null
+++ b/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js
@@ -0,0 +1,109 @@
+/**
+ * Exercises the code path for the recipientSyncData command that waits until a timestamp provided
+ * by the donor is majority committed. Verifies that when the recipient is interrupted by a primary
+ * step-down while in this code path, it swaps the generic cancellation error code for the true
+ * interruption code (such as the step-down error), which the donor can retry on.
+ *
+ * @tags: [requires_fcv_49, requires_replication, incompatible_with_windows_tls]
+ */
+
+(function() {
+
+"use strict";
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/uuid_util.js"); // For extractUUIDFromObject()
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+
+// Use a single node replSet to simplify the process.
+const donorRst = new ReplSetTest({
+ nodes: 1,
+ name: jsTestName() + "_donor",
+ nodeOptions: TenantMigrationUtil.makeX509OptionsForTest().donor
+});
+
+donorRst.startSet();
+donorRst.initiate();
+
+// Make the batch size small so that we can pause before all the batches are applied.
+const tenantMigrationTest = new TenantMigrationTest(
+ {name: jsTestName(), donorRst, sharedOptions: {setParameter: {tenantApplierBatchSizeOps: 2}}});
+
+if (!tenantMigrationTest.isFeatureFlagEnabled()) {
+ donorRst.stopSet();
+ jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
+ return;
+}
+
+const kMigrationId = UUID();
+const kTenantId = 'testTenantId';
+const kReadPreference = {
+ mode: "primary"
+};
+const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(kMigrationId),
+ tenantId: kTenantId,
+ readPreference: kReadPreference
+};
+
+const dbName = tenantMigrationTest.tenantDB(kTenantId, "testDB");
+const collName = jsTestName() + "_collection";
+
+const recipientRst = tenantMigrationTest.getRecipientRst();
+const recipientPrimary = recipientRst.getPrimary();
+
+// FailPoint to pause right before the data consistent promise is fulfilled.
+const fpBeforeDataConsistent = configureFailPoint(
+ recipientPrimary, "fpBeforeFulfillingDataConsistentPromise", {action: "hang"});
+const fpBeforeApplierFutureCalled =
+ configureFailPoint(recipientPrimary, "fpWaitUntilTimestampMajorityCommitted");
+
+tenantMigrationTest.insertDonorDB(dbName, collName);
+
+jsTestLog("Starting migration.");
+// Start the migration, and allow it to progress to the point where the _dataConsistentPromise has
+// been fulfilled.
+tenantMigrationTest.startMigration(migrationOpts);
+
+jsTestLog("Waiting for data consistent promise.");
+// Pause right before the _dataConsistentPromise is fulfilled. At this point, the applier has
+// finished applying entries at least up to dataConsistentStopDonorOpTime.
+fpBeforeDataConsistent.wait();
+
+jsTestLog("Pausing the tenant oplog applier.");
+// Pause the applier now. Any entries the applier has not yet processed are past the
+// dataConsistentStopDonorOpTime.
+const fpPauseOplogApplier =
+ configureFailPoint(recipientPrimary, "fpBeforeTenantOplogApplyingBatch");
+
+jsTestLog("Writing to donor db.");
+// Send writes to the donor. The applier will not be able to process these as it is paused.
+const docsToApply = [...Array(10).keys()].map((i) => ({a: i}));
+tenantMigrationTest.insertDonorDB(dbName, collName, docsToApply);
+
+jsTestLog("Waiting to hit failpoint in tenant oplog applier.");
+fpPauseOplogApplier.wait();
+
+jsTestLog("Allowing recipient to respond.");
+// Allow the recipient to respond to the donor for the recipientSyncData command that waits on the
+// fulfillment of the _dataConsistentPromise. The donor will then send another recipientSyncData
+// command that waits on the provided donor timestamp to be majority committed.
+fpBeforeDataConsistent.off();
+
+jsTestLog("Reach the point where we are waiting for the tenant oplog applier to catch up.");
+fpBeforeApplierFutureCalled.wait();
+fpBeforeApplierFutureCalled.off();
+
+jsTestLog("Stepping another node up.");
+// Step up a new recipient primary. This will ask the applier to shut down.
+assert.commandWorked(recipientRst.getSecondaries()[0].adminCommand({replSetStepUp: 1}));
+
+jsTestLog("Release the tenant oplog applier failpoint.");
+fpPauseOplogApplier.off();
+
+jsTestLog("Waiting for migration to complete.");
+assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+
+donorRst.stopSet();
+tenantMigrationTest.stop();
+})();
\ No newline at end of file
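
The scenario described in the test's header comment hinges on the recipient swapping a generic cancellation error for the recorded interrupt status before answering the donor; that logic lives in waitUntilTimestampIsMajorityCommitted in the C++ diff below. The following is a rough, self-contained C++ sketch of that error-substitution pattern under simplified assumptions: the Status, ErrorCode, and TaskState types (and the error names) are illustrative stand-ins, not the real mongo classes.

// Stand-alone sketch of the error-substitution pattern exercised by the test above. Types and
// error names are simplified stand-ins for the real mongo classes.
#include <iostream>
#include <mutex>
#include <string>
#include <utility>

enum class ErrorCode { kOK, kCallbackCanceled, kInterruptedDueToReplStateChange };

struct Status {
    ErrorCode code = ErrorCode::kOK;
    std::string reason;
    bool isOK() const { return code == ErrorCode::kOK; }
};

bool isCancelationError(const Status& s) {
    return s.code == ErrorCode::kCallbackCanceled;
}

class TaskState {
public:
    void interrupt(Status why) {
        std::lock_guard lk(_mutex);
        if (_interruptStatus.isOK())  // remember only the first interrupt reason
            _interruptStatus = std::move(why);
    }
    Status getInterruptStatus() const {
        std::lock_guard lk(_mutex);
        return _interruptStatus;
    }

private:
    mutable std::mutex _mutex;
    Status _interruptStatus;  // stays OK until the task is interrupted
};

// Mirrors the idea behind the new waitUntilTimestampIsMajorityCommitted() logic: a generic
// cancellation error observed while waiting on the applier future is swapped for the recorded
// interrupt status, so the donor sees the true (retryable) reason for the failure.
Status resolveWaitError(Status waitStatus, const TaskState& taskState) {
    if (isCancelationError(waitStatus)) {
        auto interruptStatus = taskState.getInterruptStatus();
        if (!interruptStatus.isOK())
            return interruptStatus;
    }
    return waitStatus;
}

int main() {
    TaskState state;
    state.interrupt({ErrorCode::kInterruptedDueToReplStateChange, "node stepped down"});

    Status fromApplier{ErrorCode::kCallbackCanceled, "oplog applier shut down"};
    Status surfaced = resolveWaitError(fromApplier, state);
    std::cout << "surfaced reason: " << surfaced.reason << "\n";  // prints "node stepped down"
    return 0;
}

The design point is that the applier only ever reports a generic cancellation when it is torn down, so the component that knows why the teardown happened (the task state) has to supply the donor-visible error.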
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index bf964c2e834..c8571ed76c8 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -95,12 +95,14 @@ MONGO_FAIL_POINT_DEFINE(setTenantMigrationRecipientInstanceHostTimeout);
MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingLastTxnMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(fpAfterCollectionClonerDone);
MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogApplierMigrationRecipientInstance);
+MONGO_FAIL_POINT_DEFINE(fpBeforeFulfillingDataConsistentPromise);
MONGO_FAIL_POINT_DEFINE(fpAfterDataConsistentMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(hangBeforeTaskCompletion);
MONGO_FAIL_POINT_DEFINE(fpAfterReceivingRecipientForgetMigration);
MONGO_FAIL_POINT_DEFINE(hangAfterCreatingRSM);
MONGO_FAIL_POINT_DEFINE(skipRetriesWhenConnectingToDonorHost);
MONGO_FAIL_POINT_DEFINE(fpBeforeDroppingOplogBufferCollection);
+MONGO_FAIL_POINT_DEFINE(fpWaitUntilTimestampMajorityCommitted);
namespace {
// We never restart just the oplog fetcher. If a failure occurs, we restart the whole state machine
@@ -320,8 +322,10 @@ OpTime TenantMigrationRecipientService::Instance::waitUntilMigrationReachesConsi
OpTime TenantMigrationRecipientService::Instance::waitUntilTimestampIsMajorityCommitted(
OperationContext* opCtx, const Timestamp& donorTs) const {
- // This gives assurance that _tenantOplogApplier pointer won't be empty.
- _dataSyncStartedPromise.getFuture().get(opCtx);
+ // This gives assurance that _tenantOplogApplier pointer won't be empty, and that it has been
+ // started. Additionally, we must have finished processing the recipientSyncData command that
+ // waits on _dataConsistentPromise.
+ _dataConsistentPromise.getFuture().get(opCtx);
auto getWaitOpTimeFuture = [&]() {
stdx::lock_guard lk(_mutex);
@@ -350,7 +354,23 @@ OpTime TenantMigrationRecipientService::Instance::waitUntilTimestampIsMajorityCo
return _tenantOplogApplier->getNotificationForOpTime(
OpTime(donorTs, OpTime::kUninitializedTerm));
};
- auto donorRecipientOpTimePair = getWaitOpTimeFuture().get(opCtx);
+
+ auto waitOpTimeFuture = getWaitOpTimeFuture();
+ fpWaitUntilTimestampMajorityCommitted.pauseWhileSet();
+ auto swDonorRecipientOpTimePair = waitOpTimeFuture.getNoThrow();
+
+ auto status = swDonorRecipientOpTimePair.getStatus();
+
+ // A cancelation error may occur due to an interrupt. If that is the case, replace the error
+ // code with the interrupt code, the true reason for interruption.
+ if (ErrorCodes::isCancelationError(status)) {
+ stdx::lock_guard lk(_mutex);
+ if (!_taskState.getInterruptStatus().isOK()) {
+ status = _taskState.getInterruptStatus();
+ }
+ }
+
+ uassertStatusOK(status);
// We want to guarantee that the recipient logical clock has advanced to at least the donor
// timestamp before returning success for recipientSyncData by doing a majority committed noop
@@ -371,7 +391,7 @@ OpTime TenantMigrationRecipientService::Instance::waitUntilTimestampIsMajorityCo
WaitForMajorityService::get(opCtx->getServiceContext())
.waitUntilMajority(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp())
.get(opCtx);
- return donorRecipientOpTimePair.donorOpTime;
+ return swDonorRecipientOpTimePair.getValue().donorOpTime;
}
std::unique_ptr<DBClientConnection> TenantMigrationRecipientService::Instance::_connectAndAuth(
@@ -1524,6 +1544,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
return _getDataConsistentFuture();
})
.then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(&fpBeforeFulfillingDataConsistentPromise);
stdx::lock_guard lk(_mutex);
LOGV2_DEBUG(4881101,
1,
@@ -1553,16 +1574,15 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
// normally stop by itself on success. It completes only on errors or on external
// interruption (e.g. by shutDown/stepDown or by recipientForgetMigration command).
Status status = applierStatus.getStatus();
- {
- // If we were interrupted during oplog application, replace oplog application
- // status with error state.
+
+ // If we were interrupted during oplog application, replace oplog application
+ // status with error state.
+ // Network and cancellation errors can be caused by interrupt() (which shuts
+ // down the cloner/fetcher dbClientConnection & oplog applier), so replace those
+ // error statuses with the interrupt status, if set.
+ if (ErrorCodes::isCancelationError(status) || ErrorCodes::isNetworkError(status)) {
stdx::lock_guard lk(_mutex);
- // Network and cancellation errors can be caused due to interrupt() (which shuts
- // down the cloner/fetcher dbClientConnection & oplog applier), so replace those
- // error status with interrupt status, if set.
- if ((ErrorCodes::isCancelationError(status) ||
- ErrorCodes::isNetworkError(status)) &&
- _taskState.isInterrupted()) {
+ if (_taskState.isInterrupted()) {
LOGV2(4881207,
"Migration completed with both error and interrupt",
"tenantId"_attr = getTenantId(),
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 70f542eea1e..dae86b0dd74 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -217,11 +217,15 @@ public:
str::stream() << "current state: " << toString(_state)
<< ", new state: " << toString(state));
+ // The interruptStatus must be set (and must be non-OK) if and only if the state is
+ // kInterrupted.
+ invariant((state == kInterrupted && interruptStatus && !interruptStatus->isOK()) ||
+ (state != kInterrupted && !interruptStatus),
+ str::stream() << "new state: " << toString(state)
+ << ", interruptStatus: " << interruptStatus);
+
_state = state;
- if (interruptStatus) {
- invariant(_state == kInterrupted && !interruptStatus->isOK());
- _interruptStatus = interruptStatus.get();
- }
+ _interruptStatus = (interruptStatus) ? interruptStatus.get() : _interruptStatus;
}
bool isNotStarted() const {
@@ -265,8 +269,9 @@ public:
private:
// task state.
StateFlag _state = kNotStarted;
- // task interrupt status.
- Status _interruptStatus = Status{ErrorCodes::InternalError, "Uninitialized value"};
+ // task interrupt status. Remains Status::OK() until the recipient service is interrupted,
+ // and from then on remembers the initial interrupt error.
+ Status _interruptStatus = Status::OK();
};
/*
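
The header change tightens the TaskState contract: an interrupt status may be passed to setState() if and only if the new state is kInterrupted, _interruptStatus now starts out as Status::OK() instead of a placeholder InternalError, and a previously recorded interrupt error is preserved when later transitions do not supply one. A hypothetical, cut-down C++ sketch of that contract follows; the real class also validates state transitions and has more states, and the names here are illustrative.

// Simplified illustration of the tightened TaskState contract (not the real class).
#include <cassert>
#include <optional>
#include <string>

struct Status {
    int code = 0;  // 0 == OK
    std::string reason;
    bool isOK() const { return code == 0; }
};

class TaskState {
public:
    enum StateFlag { kNotStarted, kRunning, kInterrupted, kDone };

    void setState(StateFlag state, std::optional<Status> interruptStatus = {}) {
        // interruptStatus may be supplied (and must be non-OK) if and only if the new state is
        // kInterrupted.
        assert((state == kInterrupted && interruptStatus && !interruptStatus->isOK()) ||
               (state != kInterrupted && !interruptStatus));
        _state = state;
        // Keep the previously recorded interrupt error when none is supplied this time.
        _interruptStatus = interruptStatus ? *interruptStatus : _interruptStatus;
    }

    Status getInterruptStatus() const { return _interruptStatus; }

private:
    StateFlag _state = kNotStarted;
    Status _interruptStatus;  // Status::OK() until the task is first interrupted
};

int main() {
    TaskState task;
    task.setState(TaskState::kRunning);
    task.setState(TaskState::kInterrupted, Status{1, "node stepped down"});
    task.setState(TaskState::kDone);  // the interrupt error survives the transition
    assert(!task.getInterruptStatus().isOK());
    return 0;
}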
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index 846a30533fd..a2143a95b87 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -56,6 +56,7 @@ namespace mongo {
namespace repl {
MONGO_FAIL_POINT_DEFINE(hangInTenantOplogApplication);
+MONGO_FAIL_POINT_DEFINE(fpBeforeTenantOplogApplyingBatch);
TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid,
const std::string& tenantId,
@@ -277,6 +278,7 @@ void TenantOplogApplier::_applyOplogBatch(TenantOplogBatch* batch) {
uassertStatusOK(status);
}
+ fpBeforeTenantOplogApplyingBatch.pauseWhileSet();
LOGV2_DEBUG(4886011,
1,
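
The tenant_oplog_applier.cpp hunk (truncated above) only adds fpBeforeTenantOplogApplyingBatch, a pause point checked before each oplog batch is applied; that is what lets the jstest freeze the applier between batches. Below is a rough stand-in for the pattern, using a plain atomic flag rather than the real MONGO_FAIL_POINT framework.

// Minimal sketch (not the mongo FailPoint implementation) of a cooperative pause point checked
// before each batch, analogous to fpBeforeTenantOplogApplyingBatch.pauseWhileSet().
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

struct SimpleFailPoint {
    std::atomic<bool> enabled{false};
    void pauseWhileSet() const {
        while (enabled.load())
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
};

SimpleFailPoint pauseBeforeApplyingBatch;  // stand-in for fpBeforeTenantOplogApplyingBatch

void applyBatches(const std::vector<std::vector<int>>& batches) {
    for (const auto& batch : batches) {
        pauseBeforeApplyingBatch.pauseWhileSet();  // test hook: hang here while the flag is set
        std::cout << "applied a batch of " << batch.size() << " ops\n";
    }
}

int main() {
    pauseBeforeApplyingBatch.enabled = true;  // ~ configureFailPoint(...) in the jstest
    std::thread applier(applyBatches, std::vector<std::vector<int>>{{1, 2}, {3, 4}});
    std::this_thread::sleep_for(std::chrono::milliseconds(50));  // the applier is frozen here
    pauseBeforeApplyingBatch.enabled = false;  // ~ failPoint.off(): the applier resumes and finishes
    applier.join();
    return 0;
}

In the actual test, configureFailPoint(recipientPrimary, "fpBeforeTenantOplogApplyingBatch") plays the role of raising the flag, and fpPauseOplogApplier.off() releases it after the step-up.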