diff options
4 files changed, 155 insertions, 19 deletions
diff --git a/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js b/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js new file mode 100644 index 00000000000..3a13f606055 --- /dev/null +++ b/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js @@ -0,0 +1,109 @@ +/** + * Exercises the code path for the recipientSyncData command that waits until a timestamp provided + * by the donor is majority committed: make sure that in this code path, when the recipient is + * interrupted by a primary step down, the recipient properly swaps the error code to the true code + * (like primary step down) that the donor can retry on. + * + * @tags: [requires_fcv_49, requires_replication, incompatible_with_windows_tls] + */ + +(function() { + +"use strict"; +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/uuid_util.js"); // For extractUUIDFromObject() +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); + +// Use a single node replSet to simplify the process. +const donorRst = new ReplSetTest({ + nodes: 1, + name: jsTestName() + "_donor", + nodeOptions: TenantMigrationUtil.makeX509OptionsForTest().donor +}); + +donorRst.startSet(); +donorRst.initiate(); + +// Make the batch size small so that we can pause before all the batches are applied. +const tenantMigrationTest = new TenantMigrationTest( + {name: jsTestName(), donorRst, sharedOptions: {setParameter: {tenantApplierBatchSizeOps: 2}}}); + +if (!tenantMigrationTest.isFeatureFlagEnabled()) { + donorRst.stopSet(); + jsTestLog("Skipping test because the tenant migrations feature flag is disabled"); + return; +} + +const kMigrationId = UUID(); +const kTenantId = 'testTenantId'; +const kReadPreference = { + mode: "primary" +}; +const migrationOpts = { + migrationIdString: extractUUIDFromObject(kMigrationId), + tenantId: kTenantId, + readPreference: kReadPreference +}; + +const dbName = tenantMigrationTest.tenantDB(kTenantId, "testDB"); +const collName = jsTestName() + "_collection"; + +const recipientRst = tenantMigrationTest.getRecipientRst(); +const recipientPrimary = recipientRst.getPrimary(); + +// FailPoint to pause right before the data consistent promise is fulfilled. +const fpBeforeDataConsistent = configureFailPoint( + recipientPrimary, "fpBeforeFulfillingDataConsistentPromise", {action: "hang"}); +const fpBeforeApplierFutureCalled = + configureFailPoint(recipientPrimary, "fpWaitUntilTimestampMajorityCommitted"); + +tenantMigrationTest.insertDonorDB(dbName, collName); + +jsTestLog("Starting migration."); +// Start the migration, and allow it to progress to the point where the _dataConsistentPromise has +// been fulfilled. +tenantMigrationTest.startMigration(migrationOpts); + +jsTestLog("Waiting for data consistent promise."); +// Pause right before the _dataConsistentPromise is fulfilled. Therefore, the applier has +// finished applying entries at least until dataConsistentStopDonorOpTime. +fpBeforeDataConsistent.wait(); + +jsTestLog("Pausing the tenant oplog applier."); +// Pause the applier now. All the entries that the applier cannot process now are past the +// dataConsistentStopDonorOpTime. +const fpPauseOplogApplier = + configureFailPoint(recipientPrimary, "fpBeforeTenantOplogApplyingBatch"); + +jsTestLog("Writing to donor db."); +// Send writes to the donor. The applier will not be able to process these as it is paused. +const docsToApply = [...Array(10).keys()].map((i) => ({a: i})); +tenantMigrationTest.insertDonorDB(dbName, collName, docsToApply); + +jsTestLog("Waiting to hit failpoint in tenant oplog applier."); +fpPauseOplogApplier.wait(); + +jsTestLog("Allowing recipient to respond."); +// Allow the recipient to respond to the donor for the recipientSyncData command that waits on the +// fulfillment of the _dataConsistentPromise. The donor will then send another recipientSyncData +// command that waits on the provided donor timestamp to be majority committed. +fpBeforeDataConsistent.off(); + +jsTestLog("Reach the point where we are waiting for the tenant oplog applier to catch up."); +fpBeforeApplierFutureCalled.wait(); +fpBeforeApplierFutureCalled.off(); + +jsTestLog("Stepping another node up."); +// Make a new recipient primary step up. This will ask the applier to shutdown. +assert.commandWorked(recipientRst.getSecondaries()[0].adminCommand({replSetStepUp: 1})); + +jsTestLog("Release the tenant oplog applier failpoint."); +fpPauseOplogApplier.off(); + +jsTestLog("Waiting for migration to complete."); +assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + +donorRst.stopSet(); +tenantMigrationTest.stop(); +})();
\ No newline at end of file diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index bf964c2e834..c8571ed76c8 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -95,12 +95,14 @@ MONGO_FAIL_POINT_DEFINE(setTenantMigrationRecipientInstanceHostTimeout); MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingLastTxnMigrationRecipientInstance); MONGO_FAIL_POINT_DEFINE(fpAfterCollectionClonerDone); MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogApplierMigrationRecipientInstance); +MONGO_FAIL_POINT_DEFINE(fpBeforeFulfillingDataConsistentPromise); MONGO_FAIL_POINT_DEFINE(fpAfterDataConsistentMigrationRecipientInstance); MONGO_FAIL_POINT_DEFINE(hangBeforeTaskCompletion); MONGO_FAIL_POINT_DEFINE(fpAfterReceivingRecipientForgetMigration); MONGO_FAIL_POINT_DEFINE(hangAfterCreatingRSM); MONGO_FAIL_POINT_DEFINE(skipRetriesWhenConnectingToDonorHost); MONGO_FAIL_POINT_DEFINE(fpBeforeDroppingOplogBufferCollection); +MONGO_FAIL_POINT_DEFINE(fpWaitUntilTimestampMajorityCommitted); namespace { // We never restart just the oplog fetcher. If a failure occurs, we restart the whole state machine @@ -320,8 +322,10 @@ OpTime TenantMigrationRecipientService::Instance::waitUntilMigrationReachesConsi OpTime TenantMigrationRecipientService::Instance::waitUntilTimestampIsMajorityCommitted( OperationContext* opCtx, const Timestamp& donorTs) const { - // This gives assurance that _tenantOplogApplier pointer won't be empty. - _dataSyncStartedPromise.getFuture().get(opCtx); + // This gives assurance that _tenantOplogApplier pointer won't be empty, and that it has been + // started. Additionally, we must have finished processing the recipientSyncData command that + // waits on _dataConsistentPromise. + _dataConsistentPromise.getFuture().get(opCtx); auto getWaitOpTimeFuture = [&]() { stdx::lock_guard lk(_mutex); @@ -350,7 +354,23 @@ OpTime TenantMigrationRecipientService::Instance::waitUntilTimestampIsMajorityCo return _tenantOplogApplier->getNotificationForOpTime( OpTime(donorTs, OpTime::kUninitializedTerm)); }; - auto donorRecipientOpTimePair = getWaitOpTimeFuture().get(opCtx); + + auto waitOpTimeFuture = getWaitOpTimeFuture(); + fpWaitUntilTimestampMajorityCommitted.pauseWhileSet(); + auto swDonorRecipientOpTimePair = waitOpTimeFuture.getNoThrow(); + + auto status = swDonorRecipientOpTimePair.getStatus(); + + // A cancelation error may occur due to an interrupt. If that is the case, replace the error + // code with the interrupt code, the true reason for interruption. + if (ErrorCodes::isCancelationError(status)) { + stdx::lock_guard lk(_mutex); + if (!_taskState.getInterruptStatus().isOK()) { + status = _taskState.getInterruptStatus(); + } + } + + uassertStatusOK(status); // We want to guarantee that the recipient logical clock has advanced to at least the donor // timestamp before returning success for recipientSyncData by doing a majority committed noop @@ -371,7 +391,7 @@ OpTime TenantMigrationRecipientService::Instance::waitUntilTimestampIsMajorityCo WaitForMajorityService::get(opCtx->getServiceContext()) .waitUntilMajority(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp()) .get(opCtx); - return donorRecipientOpTimePair.donorOpTime; + return swDonorRecipientOpTimePair.getValue().donorOpTime; } std::unique_ptr<DBClientConnection> TenantMigrationRecipientService::Instance::_connectAndAuth( @@ -1524,6 +1544,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( return _getDataConsistentFuture(); }) .then([this, self = shared_from_this()] { + _stopOrHangOnFailPoint(&fpBeforeFulfillingDataConsistentPromise); stdx::lock_guard lk(_mutex); LOGV2_DEBUG(4881101, 1, @@ -1553,16 +1574,15 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( // normally stop by itself on success. It completes only on errors or on external // interruption (e.g. by shutDown/stepDown or by recipientForgetMigration command). Status status = applierStatus.getStatus(); - { - // If we were interrupted during oplog application, replace oplog application - // status with error state. + + // If we were interrupted during oplog application, replace oplog application + // status with error state. + // Network and cancellation errors can be caused due to interrupt() (which shuts + // down the cloner/fetcher dbClientConnection & oplog applier), so replace those + // error status with interrupt status, if set. + if (ErrorCodes::isCancelationError(status) || ErrorCodes::isNetworkError(status)) { stdx::lock_guard lk(_mutex); - // Network and cancellation errors can be caused due to interrupt() (which shuts - // down the cloner/fetcher dbClientConnection & oplog applier), so replace those - // error status with interrupt status, if set. - if ((ErrorCodes::isCancelationError(status) || - ErrorCodes::isNetworkError(status)) && - _taskState.isInterrupted()) { + if (_taskState.isInterrupted()) { LOGV2(4881207, "Migration completed with both error and interrupt", "tenantId"_attr = getTenantId(), diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h index 70f542eea1e..dae86b0dd74 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.h +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -217,11 +217,15 @@ public: str::stream() << "current state: " << toString(_state) << ", new state: " << toString(state)); + // The interruptStatus can exist (and should be non-OK) if and only if the state is + // kInterrupted. + invariant((state == kInterrupted && interruptStatus && !interruptStatus->isOK()) || + (state != kInterrupted && !interruptStatus), + str::stream() << "new state: " << toString(state) + << ", interruptStatus: " << interruptStatus); + _state = state; - if (interruptStatus) { - invariant(_state == kInterrupted && !interruptStatus->isOK()); - _interruptStatus = interruptStatus.get(); - } + _interruptStatus = (interruptStatus) ? interruptStatus.get() : _interruptStatus; } bool isNotStarted() const { @@ -265,8 +269,9 @@ public: private: // task state. StateFlag _state = kNotStarted; - // task interrupt status. - Status _interruptStatus = Status{ErrorCodes::InternalError, "Uninitialized value"}; + // task interrupt status. Set to Status::OK() only when the recipient service has not + // been interrupted so far, and is used to remember the initial interrupt error. + Status _interruptStatus = Status::OK(); }; /* diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index 846a30533fd..a2143a95b87 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -56,6 +56,7 @@ namespace mongo { namespace repl { MONGO_FAIL_POINT_DEFINE(hangInTenantOplogApplication); +MONGO_FAIL_POINT_DEFINE(fpBeforeTenantOplogApplyingBatch); TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid, const std::string& tenantId, @@ -277,6 +278,7 @@ void TenantOplogApplier::_applyOplogBatch(TenantOplogBatch* batch) { uassertStatusOK(status); } + fpBeforeTenantOplogApplyingBatch.pauseWhileSet(); LOGV2_DEBUG(4886011, 1, |