summary | refs | log | tree | commit | diff
path: root/src/mongo
diff options
context:
space:
mode:
author Vishnu Kaushik <vishnu.kaushik@mongodb.com> 2021-02-16 19:34:01 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-02-18 01:48:19 +0000
commit 1ff65f085af47596a2048baa2a1ccf5d9c9a2755 (patch)
tree a7d137399f95a4d67979cde9d64e96e0f2ed5835 /src/mongo
parent b2d42972c374aeea62ed203ea652631fe26d53bb (diff)
download mongo-1ff65f085af47596a2048baa2a1ccf5d9c9a2755.tar.gz
SERVER-53926 replace recipientSyncData (with returnAfterTimestamp) errors with interrupt status when appropriate
Diffstat (limited to 'src/mongo')
-rw-r--r-- src/mongo/db/repl/tenant_migration_recipient_service.cpp 46
-rw-r--r-- src/mongo/db/repl/tenant_migration_recipient_service.h 17
-rw-r--r-- src/mongo/db/repl/tenant_oplog_applier.cpp 2
3 files changed, 46 insertions, 19 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index bf964c2e834..c8571ed76c8 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -95,12 +95,14 @@ MONGO_FAIL_POINT_DEFINE(setTenantMigrationRecipientInstanceHostTimeout);
MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingLastTxnMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(fpAfterCollectionClonerDone);
MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogApplierMigrationRecipientInstance);
+MONGO_FAIL_POINT_DEFINE(fpBeforeFulfillingDataConsistentPromise);
MONGO_FAIL_POINT_DEFINE(fpAfterDataConsistentMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(hangBeforeTaskCompletion);
MONGO_FAIL_POINT_DEFINE(fpAfterReceivingRecipientForgetMigration);
MONGO_FAIL_POINT_DEFINE(hangAfterCreatingRSM);
MONGO_FAIL_POINT_DEFINE(skipRetriesWhenConnectingToDonorHost);
MONGO_FAIL_POINT_DEFINE(fpBeforeDroppingOplogBufferCollection);
+MONGO_FAIL_POINT_DEFINE(fpWaitUntilTimestampMajorityCommitted);
namespace {
// We never restart just the oplog fetcher. If a failure occurs, we restart the whole state machine
@@ -320,8 +322,10 @@ OpTime TenantMigrationRecipientService::Instance::waitUntilMigrationReachesConsi
OpTime TenantMigrationRecipientService::Instance::waitUntilTimestampIsMajorityCommitted(
OperationContext* opCtx, const Timestamp& donorTs) const {
- // This gives assurance that _tenantOplogApplier pointer won't be empty.
- _dataSyncStartedPromise.getFuture().get(opCtx);
+ // This gives assurance that _tenantOplogApplier pointer won't be empty, and that it has been
+ // started. Additionally, we must have finished processing the recipientSyncData command that
+ // waits on _dataConsistentPromise.
+ _dataConsistentPromise.getFuture().get(opCtx);
auto getWaitOpTimeFuture = [&]() {
stdx::lock_guard lk(_mutex);
@@ -350,7 +354,23 @@ OpTime TenantMigrationRecipientService::Instance::waitUntilTimestampIsMajorityCo
return _tenantOplogApplier->getNotificationForOpTime(
OpTime(donorTs, OpTime::kUninitializedTerm));
};
- auto donorRecipientOpTimePair = getWaitOpTimeFuture().get(opCtx);
+
+ auto waitOpTimeFuture = getWaitOpTimeFuture();
+ fpWaitUntilTimestampMajorityCommitted.pauseWhileSet();
+ auto swDonorRecipientOpTimePair = waitOpTimeFuture.getNoThrow();
+
+ auto status = swDonorRecipientOpTimePair.getStatus();
+
+ // A cancelation error may occur due to an interrupt. If that is the case, replace the error
+ // code with the interrupt code, the true reason for interruption.
+ if (ErrorCodes::isCancelationError(status)) {
+ stdx::lock_guard lk(_mutex);
+ if (!_taskState.getInterruptStatus().isOK()) {
+ status = _taskState.getInterruptStatus();
+ }
+ }
+
+ uassertStatusOK(status);
// We want to guarantee that the recipient logical clock has advanced to at least the donor
// timestamp before returning success for recipientSyncData by doing a majority committed noop
@@ -371,7 +391,7 @@ OpTime TenantMigrationRecipientService::Instance::waitUntilTimestampIsMajorityCo
WaitForMajorityService::get(opCtx->getServiceContext())
.waitUntilMajority(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp())
.get(opCtx);
- return donorRecipientOpTimePair.donorOpTime;
+ return swDonorRecipientOpTimePair.getValue().donorOpTime;
}
std::unique_ptr<DBClientConnection> TenantMigrationRecipientService::Instance::_connectAndAuth(
@@ -1524,6 +1544,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
return _getDataConsistentFuture();
})
.then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(&fpBeforeFulfillingDataConsistentPromise);
stdx::lock_guard lk(_mutex);
LOGV2_DEBUG(4881101,
1,
@@ -1553,16 +1574,15 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
// normally stop by itself on success. It completes only on errors or on external
// interruption (e.g. by shutDown/stepDown or by recipientForgetMigration command).
Status status = applierStatus.getStatus();
- {
- // If we were interrupted during oplog application, replace oplog application
- // status with error state.
+
+ // If we were interrupted during oplog application, replace oplog application
+ // status with error state.
+ // Network and cancellation errors can be caused due to interrupt() (which shuts
+ // down the cloner/fetcher dbClientConnection & oplog applier), so replace those
+ // error status with interrupt status, if set.
+ if (ErrorCodes::isCancelationError(status) || ErrorCodes::isNetworkError(status)) {
stdx::lock_guard lk(_mutex);
- // Network and cancellation errors can be caused due to interrupt() (which shuts
- // down the cloner/fetcher dbClientConnection & oplog applier), so replace those
- // error status with interrupt status, if set.
- if ((ErrorCodes::isCancelationError(status) ||
- ErrorCodes::isNetworkError(status)) &&
- _taskState.isInterrupted()) {
+ if (_taskState.isInterrupted()) {
LOGV2(4881207,
"Migration completed with both error and interrupt",
"tenantId"_attr = getTenantId(),
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 70f542eea1e..dae86b0dd74 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -217,11 +217,15 @@ public:
str::stream() << "current state: " << toString(_state)
<< ", new state: " << toString(state));
+ // The interruptStatus can exist (and should be non-OK) if and only if the state is
+ // kInterrupted.
+ invariant((state == kInterrupted && interruptStatus && !interruptStatus->isOK()) ||
+ (state != kInterrupted && !interruptStatus),
+ str::stream() << "new state: " << toString(state)
+ << ", interruptStatus: " << interruptStatus);
+
_state = state;
- if (interruptStatus) {
- invariant(_state == kInterrupted && !interruptStatus->isOK());
- _interruptStatus = interruptStatus.get();
- }
+ _interruptStatus = (interruptStatus) ? interruptStatus.get() : _interruptStatus;
}
bool isNotStarted() const {
@@ -265,8 +269,9 @@ public:
private:
// task state.
StateFlag _state = kNotStarted;
- // task interrupt status.
- Status _interruptStatus = Status{ErrorCodes::InternalError, "Uninitialized value"};
+ // task interrupt status. Set to Status::OK() only when the recipient service has not
+ // been interrupted so far, and is used to remember the initial interrupt error.
+ Status _interruptStatus = Status::OK();
};
/*
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index 846a30533fd..a2143a95b87 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -56,6 +56,7 @@ namespace mongo {
namespace repl {
MONGO_FAIL_POINT_DEFINE(hangInTenantOplogApplication);
+MONGO_FAIL_POINT_DEFINE(fpBeforeTenantOplogApplyingBatch);
TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid,
const std::string& tenantId,
@@ -277,6 +278,7 @@ void TenantOplogApplier::_applyOplogBatch(TenantOplogBatch* batch) {
uassertStatusOK(status);
}
+ fpBeforeTenantOplogApplyingBatch.pauseWhileSet();
LOGV2_DEBUG(4886011,
1,