author     Lingzhi Deng <lingzhi.deng@mongodb.com>  2020-12-10 09:14:18 -0500
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-12-10 18:20:53 +0000
commit     4b8f297ca8da46eab9fd668a2c3f4df70724c2ac (patch)
tree       e56e62604af22240851cb85625edc35aa5905c40
parent     dfb857953579304273b2737d7c8a094d19d13853 (diff)
download   mongo-4b8f297ca8da46eab9fd668a2c3f4df70724c2ac.tar.gz
SERVER-48814: Implement recipientForgetMigration command
-rw-r--r--  buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml  2
-rw-r--r--  jstests/replsets/tenant_migration_donor_state_machine.js  4
-rw-r--r--  src/mongo/base/error_codes.yml  2
-rw-r--r--  src/mongo/db/commands/tenant_migration_recipient_cmds.cpp  21
-rw-r--r--  src/mongo/db/commands/tenant_migration_recipient_cmds.idl  57
-rw-r--r--  src/mongo/db/repl/SConscript  1
-rw-r--r--  src/mongo/db/repl/tenant_migration_donor_service.cpp  27
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp  300
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.h  54
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service_test.cpp  455
-rw-r--r--  src/mongo/util/uuid.h  1
11 files changed, 783 insertions, 141 deletions
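
This commit implements the recipientForgetMigration command on the tenant migration recipient and has the donor send it with the same common fields as recipientSyncData. As orientation before the per-file diffs, the sketch below shows how such a command could be issued by hand from a jstest against a recipient primary. It is illustrative only: it assumes a recipientPrimary connection and uses placeholder donor and tenant values; in practice the donor's TenantMigrationDonorService sends the command.

    // Illustrative sketch, not part of the commit. Field names follow
    // tenant_migration_recipient_cmds.idl as changed below; the connection
    // string and tenant id are placeholders.
    const migrationId = UUID();
    assert.commandWorked(recipientPrimary.adminCommand({
        recipientForgetMigration: 1,
        migrationId: migrationId,
        donorConnectionString: "donorSet/donor0:27017,donor1:27017,donor2:27017",
        tenantId: "tenantA",
        readPreference: {mode: "primary"},
    }));

On the recipient, the handler creates (or looks up) the PrimaryOnlyService instance for the migration and blocks until the state document has been marked garbage collectable, as shown in tenant_migration_recipient_cmds.cpp below.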
diff --git a/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml
index 4913806a87d..7e5cd979b0f 100644
--- a/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml
@@ -121,7 +121,7 @@ executor:
mode: alwaysOn
data:
blockTimeMS: 250
- # TODO SERVER-48814: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'.
+ # TODO SERVER-53312: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'.
failpoint.returnResponseOkForRecipientSyncDataCmd:
mode: alwaysOn
# Set the delay before migration state machine is garbage collected to be short to avoid
diff --git a/jstests/replsets/tenant_migration_donor_state_machine.js b/jstests/replsets/tenant_migration_donor_state_machine.js
index 72a37ecb75b..4f172b7dc1b 100644
--- a/jstests/replsets/tenant_migration_donor_state_machine.js
+++ b/jstests/replsets/tenant_migration_donor_state_machine.js
@@ -74,9 +74,7 @@ const donorRst = new ReplSetTest({
donorRst.startSet();
donorRst.initiate();
-// TODO SERVER-48814: Remove 'enableRecipientTesting: false'.
-const tenantMigrationTest =
- new TenantMigrationTest({name: jsTestName(), enableRecipientTesting: false, donorRst});
+const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst});
if (!tenantMigrationTest.isFeatureFlagEnabled()) {
jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
donorRst.stopSet();
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 9772d960b8f..c1d19fb1877 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -411,6 +411,8 @@ error_codes:
- {code: 333, name: ServiceExecutorInShutdown, categories: [ShutdownError,CancelationError,InternalOnly]}
- {code: 334, name: MechanismUnavailable}
+ - {code: 335, name: TenantMigrationForgotten}
+
# Error codes 4000-8999 are reserved.
# Non-sequential error codes for compatibility only)
diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp b/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp
index 26c47c4328b..01728458942 100644
--- a/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp
+++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp
@@ -146,6 +146,27 @@ public:
"recipientForgetMigration command not enabled",
repl::feature_flags::gTenantMigrations.isEnabled(
serverGlobalParams.featureCompatibility));
+ const auto& cmd = request();
+
+ auto recipientService =
+ repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext())
+ ->lookupServiceByName(repl::TenantMigrationRecipientService::
+ kTenantMigrationRecipientServiceName);
+
+ // We may not have a document if recipientForgetMigration is received before
+ // recipientSyncData. But even if that's the case, we still need to create an instance
+ // and persist a state document that's marked garbage collectable (which is done by the
+ // main chain).
+ TenantMigrationRecipientDocument stateDoc(cmd.getMigrationId(),
+ cmd.getDonorConnectionString().toString(),
+ cmd.getTenantId().toString(),
+ cmd.getReadPreference());
+ auto recipientInstance = repl::TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx, recipientService, stateDoc.toBSON());
+
+ // Instruct the instance run() function to mark this migration garbage collectable.
+ recipientInstance->onReceiveRecipientForgetMigration(opCtx);
+ recipientInstance->getCompletionFuture().get(opCtx);
}
void doCheckAuthorization(OperationContext* opCtx) const {}
diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
index 35160bcf881..0d3d53c7c04 100644
--- a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
+++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
@@ -48,34 +48,42 @@ structs:
type: optime
description: "Majority applied donor optime by the recipient"
+ MigrationRecipientCommonData:
+ description: "Command data for tenant migration recipient commands."
+ strict: true
+ fields:
+ migrationId:
+ description: "Unique identifier for the tenant migration."
+ type: uuid
+ donorConnectionString:
+ description: >-
+ The URI string that the recipient will utilize to create a connection with the
+ donor.
+ type: string
+ validator:
+ callback: "validateConnectionString"
+ tenantId:
+ description: >-
+ The prefix from which the migrating database will be matched. The prefixes 'admin',
+ 'local', 'config', the empty string, are not allowed.
+ type: string
+ validator:
+ callback: "validateDatabasePrefix"
+ readPreference:
+ description: >-
+ The read preference settings that the donor will pass on to the recipient.
+ type: readPreference
+
commands:
recipientSyncData:
description: "Parser for the 'recipientSyncData' command."
command_name: recipientSyncData
strict: true
namespace: ignored
+ inline_chained_structs: true
+ chained_structs:
+ MigrationRecipientCommonData: MigrationRecipientCommonData
fields:
- migrationId:
- description: "Unique identifier for the tenant migration."
- type: uuid
- donorConnectionString:
- description: >-
- The URI string that the recipient will utilize to create a connection with the
- donor.
- type: string
- validator:
- callback: "validateConnectionString"
- tenantId:
- description: >-
- The prefix from which the migrating database will be matched. The prefixes 'admin',
- 'local', 'config', the empty string, are not allowed.
- type: string
- validator:
- callback: "validateDatabasePrefix"
- readPreference:
- description: >-
- The read preference settings that the donor will pass on to the recipient.
- type: readPreference
returnAfterReachingDonorTimestamp:
description: >-
If provided, the recipient should return after syncing up to this donor timestamp.
@@ -90,7 +98,6 @@ commands:
command_name: recipientForgetMigration
strict: true
namespace: ignored
- fields:
- migrationId:
- description: "Unique identifier for the tenant migration."
- type: uuid
+ inline_chained_structs: true
+ chained_structs:
+ MigrationRecipientCommonData: MigrationRecipientCommonData
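
The shared fields (migrationId, donorConnectionString, tenantId, readPreference) now live in the MigrationRecipientCommonData struct and are chained into both commands with inline_chained_structs, so they are still parsed and serialized at the top level of the command body rather than under a nested field. A recipientSyncData request built by the donor (see tenant_migration_donor_service.cpp below) would therefore serialize roughly as in this sketch; the concrete values are placeholders, not taken from the commit.

    // Rough shape of the recipientSyncData body after this refactor.
    const recipientSyncDataCmd = {
        recipientSyncData: 1,
        migrationId: UUID(),
        donorConnectionString: "donorSet/donor0:27017,donor1:27017,donor2:27017",
        tenantId: "tenantA",
        readPreference: {mode: "primary"},
        // Optional: the donor sets this from the block timestamp once one exists.
        returnAfterReachingDonorTimestamp: Timestamp(1607616858, 1),
    };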
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 3d2a8244b29..6459f563fb7 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1304,6 +1304,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/client/read_preference',
+ '$BUILD_DIR/mongo/util/future_util',
'primary_only_service',
'tenant_migration_recipient_utils',
'wait_for_majority_service',
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 4acfbc350a5..b55f7b0f7be 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -460,10 +460,12 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa
BSONObj cmdObj = BSONObj([&]() {
auto donorConnString =
repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString();
- RecipientSyncData request(_stateDoc.getId(),
- donorConnString.toString(),
- _stateDoc.getTenantId().toString(),
- _stateDoc.getReadPreference());
+ RecipientSyncData request;
+ request.setDbName(NamespaceString::kAdminDb);
+ request.setMigrationRecipientCommonData({_stateDoc.getId(),
+ donorConnString.toString(),
+ _stateDoc.getTenantId().toString(),
+ _stateDoc.getReadPreference()});
request.setReturnAfterReachingDonorTimestamp(_stateDoc.getBlockTimestamp());
return request.toBSON(BSONObj());
}());
@@ -475,10 +477,19 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget
std::shared_ptr<executor::ScopedTaskExecutor> executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
const CancelationToken& token) {
- return _sendCommandToRecipient(executor,
- recipientTargeterRS,
- RecipientForgetMigration(_stateDoc.getId()).toBSON(BSONObj()),
- token);
+
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+
+ auto donorConnString =
+ repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString();
+ RecipientForgetMigration request;
+ request.setDbName(NamespaceString::kAdminDb);
+ request.setMigrationRecipientCommonData({_stateDoc.getId(),
+ donorConnString.toString(),
+ _stateDoc.getTenantId().toString(),
+ _stateDoc.getReadPreference()});
+ return _sendCommandToRecipient(executor, recipientTargeterRS, request.toBSON(BSONObj()), token);
}
SemiFuture<void> TenantMigrationDonorService::Instance::run(
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 580d52a916b..30309958a6a 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -56,6 +56,7 @@
#include "mongo/db/write_concern_options.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/util/future_util.h"
namespace mongo {
namespace repl {
@@ -65,6 +66,8 @@ constexpr StringData kOplogBufferPrefix = "repl.migration.oplog_"_sd;
// A convenient place to set test-specific parameters.
MONGO_FAIL_POINT_DEFINE(pauseBeforeRunTenantMigrationRecipientInstance);
+MONGO_FAIL_POINT_DEFINE(pauseAfterRunTenantMigrationRecipientInstance);
+MONGO_FAIL_POINT_DEFINE(autoRecipientForgetMigration);
// Fails before waiting for the state doc to be majority replicated.
MONGO_FAIL_POINT_DEFINE(failWhilePersistingTenantMigrationRecipientInstanceStateDoc);
@@ -78,6 +81,7 @@ MONGO_FAIL_POINT_DEFINE(fpAfterCollectionClonerDone);
MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogApplierMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(fpAfterDataConsistentMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(hangBeforeTaskCompletion);
+MONGO_FAIL_POINT_DEFINE(fpAfterReceivingRecipientForgetMigration);
namespace {
// We never restart just the oplog fetcher. If a failure occurs, we restart the whole state machine
@@ -186,7 +190,7 @@ boost::optional<BSONObj> TenantMigrationRecipientService::Instance::reportForCur
BSONObjBuilder bob;
bob.append("desc", "tenant recipient migration");
- bob.append("migrationCompleted", _completionPromise.getFuture().isReady());
+ bob.append("migrationCompleted", _taskCompletionPromise.getFuture().isReady());
bob.append("instanceID", _stateDoc.getId().toBSON());
bob.append("tenantId", _stateDoc.getTenantId());
bob.append("readPreference", _stateDoc.getReadPreference().toInnerBSON());
@@ -229,11 +233,10 @@ OpTime TenantMigrationRecipientService::Instance::waitUntilTimestampIsMajorityCo
auto getWaitOpTimeFuture = [&]() {
stdx::lock_guard lk(_mutex);
- if (_taskState.isDone()) {
- // When task state is done, we reset _tenantOplogApplier, so just throw the
- // task completion future result.
- invariant(getCompletionFuture().isReady());
- getCompletionFuture().get(opCtx);
+ if (_dataSyncCompletionPromise.getFuture().isReady()) {
+ // When the data sync is done, we reset _tenantOplogApplier, so just throw the data sync
+ // completion future result.
+ _dataSyncCompletionPromise.getFuture().get();
MONGO_UNREACHABLE;
}
@@ -339,7 +342,7 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() {
return _donorReplicaSetMonitor->getHostOrRefresh(_readPreference, getHostCancelSource.token())
.thenRunOn(**_scopedExecutor)
- .then([this](const HostAndPort& serverAddress) {
+ .then([this, self = shared_from_this()](const HostAndPort& serverAddress) {
// Application name is constructed such that it doesn't exceeds
// kMaxApplicationNameByteLength (128 bytes).
// "TenantMigration_" (16 bytes) + <tenantId> (61 bytes) + "_" (1 byte) +
@@ -359,19 +362,20 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() {
auto oplogFetcherClient = _connectAndAuth(serverAddress, applicationName, _authParams);
return ConnectionPair(std::move(client), std::move(oplogFetcherClient));
})
- .onError([this](const Status& status) -> SemiFuture<ConnectionPair> {
- LOGV2_ERROR(4880404,
- "Connecting to donor failed",
- "tenantId"_attr = getTenantId(),
- "migrationId"_attr = getMigrationUUID(),
- "error"_attr = status);
-
- // Make sure we don't end up with a partially initialized set of connections.
- stdx::lock_guard lk(_mutex);
- _client = nullptr;
- _oplogFetcherClient = nullptr;
- return status;
- })
+ .onError(
+ [this, self = shared_from_this()](const Status& status) -> SemiFuture<ConnectionPair> {
+ LOGV2_ERROR(4880404,
+ "Connecting to donor failed",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID(),
+ "error"_attr = status);
+
+ // Make sure we don't end up with a partially initialized set of connections.
+ stdx::lock_guard lk(_mutex);
+ _client = nullptr;
+ _oplogFetcherClient = nullptr;
+ return status;
+ })
.semi();
}
@@ -523,12 +527,12 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
ReplicationProcess::kUninitializedRollbackId,
false /* requireFresherSyncSource */,
_dataReplicatorExternalState.get(),
- [this](OplogFetcher::Documents::const_iterator first,
- OplogFetcher::Documents::const_iterator last,
- const OplogFetcher::DocumentsInfo& info) {
+ [this, self = shared_from_this()](OplogFetcher::Documents::const_iterator first,
+ OplogFetcher::Documents::const_iterator last,
+ const OplogFetcher::DocumentsInfo& info) {
return _enqueueDocuments(first, last, info);
},
- [this](const Status& s, int rbid) { _oplogFetcherCallback(s); },
+ [this, self = shared_from_this()](const Status& s, int rbid) { _oplogFetcherCallback(s); },
tenantMigrationOplogFetcherBatchSize,
OplogFetcher::StartingPoint::kEnqueueFirstDoc,
_getOplogFetcherFilter(),
@@ -597,16 +601,17 @@ void TenantMigrationRecipientService::Instance::_oplogFetcherCallback(Status opl
"Recipient migration service oplog fetcher stopped due to stopReplProducer failpoint",
"tenantId"_attr = getTenantId(),
"migrationId"_attr = getMigrationUUID());
- interrupt({ErrorCodes::Error(4881206),
- "Recipient migration service oplog fetcher stopped due to stopReplProducer "
- "failpoint"});
+ _interrupt({ErrorCodes::Error(4881206),
+ "Recipient migration service oplog fetcher stopped due to stopReplProducer "
+ "failpoint"},
+ /*skipWaitingForForgetMigration=*/false);
} else if (oplogFetcherStatus.code() != ErrorCodes::CallbackCanceled) {
LOGV2_ERROR(4881204,
"Recipient migration service oplog fetcher failed",
"tenantId"_attr = getTenantId(),
"migrationId"_attr = getMigrationUUID(),
"error"_attr = oplogFetcherStatus);
- interrupt(oplogFetcherStatus);
+ _interrupt(oplogFetcherStatus, /*skipWaitingForForgetMigration=*/false);
}
}
@@ -707,18 +712,19 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_getDataConsistentFu
return _tenantOplogApplier
->getNotificationForOpTime(_stateDoc.getDataConsistentStopDonorOpTime().get())
.thenRunOn(**_scopedExecutor)
- .then([this](TenantOplogApplier::OpTimePair donorRecipientOpTime) {
- auto opCtx = cc().makeOperationContext();
+ .then(
+ [this, self = shared_from_this()](TenantOplogApplier::OpTimePair donorRecipientOpTime) {
+ auto opCtx = cc().makeOperationContext();
- stdx::lock_guard lk(_mutex);
- // Persist the state that tenant migration instance has reached
- // consistent state.
- _stateDoc.setState(TenantMigrationRecipientStateEnum::kConsistent);
- uassertStatusOK(
- tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc));
- return WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp());
- })
+ stdx::lock_guard lk(_mutex);
+ // Persist the state that tenant migration instance has reached
+ // consistent state.
+ _stateDoc.setState(TenantMigrationRecipientStateEnum::kConsistent);
+ uassertStatusOK(
+ tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc));
+ return WaitForMajorityService::get(opCtx->getServiceContext())
+ .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp());
+ })
.semi();
}
@@ -752,8 +758,64 @@ void setPromiseErrorifNotReady(WithLock lk, Promise& promise, Status status) {
promise.setError(status);
}
+
+template <class Promise>
+void setPromiseOkifNotReady(WithLock lk, Promise& promise) {
+ if (promise.getFuture().isReady()) {
+ return;
+ }
+
+ promise.emplaceValue();
+}
+
} // namespace
+SemiFuture<void>
+TenantMigrationRecipientService::Instance::_markStateDocumentAsGarbageCollectable() {
+ _stopOrHangOnFailPoint(&fpAfterReceivingRecipientForgetMigration);
+
+    // Throws if we have failed to persist the state doc in the first place. This can only happen in
+ // unittests where we enable the autoRecipientForgetMigration failpoint. Otherwise,
+ // recipientForgetMigration will always wait for the state doc to be persisted first and thus
+ // this will only be called with _stateDocPersistedPromise resolved OK.
+ invariant(_stateDocPersistedPromise.getFuture().isReady());
+ _stateDocPersistedPromise.getFuture().get();
+
+ stdx::lock_guard lk(_mutex);
+
+ if (_stateDoc.getExpireAt()) {
+ // Nothing to do if the state doc already has the expireAt set.
+ return SemiFuture<void>::makeReady();
+ }
+
+ auto uniqueOpCtx = cc().makeOperationContext();
+ auto opCtx = uniqueOpCtx.get();
+ _stateDoc.setState(TenantMigrationRecipientStateEnum::kDone);
+ _stateDoc.setExpireAt(opCtx->getServiceContext()->getFastClockSource()->now() +
+ Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()});
+
+ auto status = [&]() {
+ try {
+ // Update the state doc with the expireAt set.
+ return tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx, _stateDoc);
+ } catch (DBException& ex) {
+ return ex.toStatus();
+ }
+ }();
+ if (!status.isOK()) {
+ // We assume that we only fail with shutDown/stepDown errors (i.e. for failovers).
+ // Otherwise, the whole chain would stop running without marking the state doc garbage
+ // collectable while we are still the primary.
+ invariant(ErrorCodes::isShutdownError(status) || ErrorCodes::isNotPrimaryError(status));
+ uassertStatusOK(status);
+ }
+
+ auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ return WaitForMajorityService::get(opCtx->getServiceContext())
+ .waitUntilMajority(writeOpTime)
+ .semi();
+}
+
void TenantMigrationRecipientService::Instance::_cancelRemainingWork(WithLock lk) {
if (_sharedData) {
stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData);
@@ -777,11 +839,19 @@ void TenantMigrationRecipientService::Instance::_cancelRemainingWork(WithLock lk
shutdownTarget(lk, _writerPool);
}
-void TenantMigrationRecipientService::Instance::interrupt(Status status) {
+void TenantMigrationRecipientService::Instance::_interrupt(Status status,
+ bool skipWaitingForForgetMigration) {
invariant(!status.isOK());
stdx::lock_guard lk(_mutex);
+ if (skipWaitingForForgetMigration) {
+ // We only get here on receiving the recipientForgetMigration command or on
+ // stepDown/shutDown. On receiving the recipientForgetMigration, the promise should have
+ // already been set.
+ setPromiseErrorifNotReady(lk, _receivedRecipientForgetMigrationPromise, status);
+ }
+
if (_taskState.isInterrupted() || _taskState.isDone()) {
// nothing to do.
return;
@@ -792,15 +862,54 @@ void TenantMigrationRecipientService::Instance::interrupt(Status status) {
// If the task is running, then setting promise result will be taken care by the main task
// continuation chain.
if (_taskState.isNotStarted()) {
+ invariant(skipWaitingForForgetMigration);
+ _stateDocPersistedPromise.setError(status);
_dataSyncStartedPromise.setError(status);
_dataConsistentPromise.setError(status);
- _completionPromise.setError(status);
+ _dataSyncCompletionPromise.setError(status);
+
+ // The interrupt() is called before the instance is scheduled to run. If the state doc has
+ // already been marked garbage collectable, resolve the completion promise with OK.
+ if (_stateDoc.getExpireAt()) {
+ _taskCompletionPromise.emplaceValue();
+ } else {
+ _taskCompletionPromise.setError(status);
+ }
}
_taskState.setState(TaskState::kInterrupted, status);
}
-void TenantMigrationRecipientService::Instance::_cleanupOnTaskCompletion(Status status) {
+void TenantMigrationRecipientService::Instance::interrupt(Status status) {
+ _interrupt(status, /*skipWaitingForForgetMigration=*/true);
+}
+
+void TenantMigrationRecipientService::Instance::onReceiveRecipientForgetMigration(
+ OperationContext* opCtx) {
+ LOGV2(4881400,
+ "Forgetting migration due to recipientForgetMigration command",
+ "migrationId"_attr = getMigrationUUID(),
+ "tenantId"_attr = getTenantId());
+
+ // Wait until the state doc is initialized and persisted.
+ _stateDocPersistedPromise.getFuture().get(opCtx);
+
+ {
+ stdx::lock_guard lk(_mutex);
+ if (_receivedRecipientForgetMigrationPromise.getFuture().isReady()) {
+ return;
+ }
+ _receivedRecipientForgetMigrationPromise.emplaceValue();
+ }
+
+ // Interrupt the chain to mark the state doc garbage collectable.
+ _interrupt(Status(ErrorCodes::TenantMigrationForgotten,
+ str::stream() << "recipientForgetMigration received for migration "
+ << getMigrationUUID()),
+ /*skipWaitingForForgetMigration=*/false);
+}
+
+void TenantMigrationRecipientService::Instance::_cleanupOnDataSyncCompletion(Status status) {
auto opCtx = cc().makeOperationContext();
std::unique_ptr<OplogFetcher> savedDonorOplogFetcher;
@@ -814,19 +923,11 @@ void TenantMigrationRecipientService::Instance::_cleanupOnTaskCompletion(Status
shutdownTarget(lk, _donorOplogFetcher);
shutdownTargetWithOpCtx(lk, _donorOplogBuffer, opCtx.get());
- if (status.isOK()) {
- // All intermediary promise should have been fulfilled already.
- invariant(_dataSyncStartedPromise.getFuture().isReady() &&
- _dataConsistentPromise.getFuture().isReady());
- _completionPromise.emplaceValue();
- }
-
invariant(!status.isOK());
+ setPromiseErrorifNotReady(lk, _stateDocPersistedPromise, status);
setPromiseErrorifNotReady(lk, _dataSyncStartedPromise, status);
setPromiseErrorifNotReady(lk, _dataConsistentPromise, status);
- setPromiseErrorifNotReady(lk, _completionPromise, status);
-
- _taskState.setState(TaskState::kDone);
+ setPromiseErrorifNotReady(lk, _dataSyncCompletionPromise, status);
// Save them to join() with it outside of _mutex.
using std::swap;
@@ -837,7 +938,7 @@ void TenantMigrationRecipientService::Instance::_cleanupOnTaskCompletion(Status
// Perform join outside the lock to avoid deadlocks.
joinTarget(savedDonorOplogFetcher);
- joinTarget(savedDonorOplogFetcher);
+ joinTarget(savedTenantOplogApplier);
joinTarget(savedWriterPool);
}
@@ -870,7 +971,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
"readPreference"_attr = _readPreference);
return ExecutorFuture(**executor)
- .then([this] {
+ .then([this, self = shared_from_this()] {
stdx::lock_guard lk(_mutex);
// Instance task can be started only once for the current term on a primary.
invariant(!_taskState.isDone());
@@ -881,20 +982,27 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
_taskState.setState(TaskState::kRunning);
+ pauseAfterRunTenantMigrationRecipientInstance.pauseWhileSet();
+
+ uassert(ErrorCodes::TenantMigrationForgotten,
+ str::stream() << "Migration " << getMigrationUUID()
+ << " already marked for garbage collect",
+ !_stateDoc.getExpireAt());
+
return _initializeStateDoc(lk);
})
- .then([this] {
+ .then([this, self = shared_from_this()] {
+ _stateDocPersistedPromise.emplaceValue();
_stopOrHangOnFailPoint(&fpAfterPersistingTenantMigrationRecipientInstanceStateDoc);
return _createAndConnectClients();
})
- .then([this](ConnectionPair ConnectionPair) {
+ .then([this, self = shared_from_this()](ConnectionPair ConnectionPair) {
stdx::lock_guard lk(_mutex);
if (_taskState.isInterrupted()) {
uassertStatusOK(_taskState.getInterruptStatus());
}
- // interrupt() called after this code block will interrupt the cloner, oplog applier and
- // fetcher.
+ // interrupt() called after this code block will interrupt the cloner and fetcher.
_client = std::move(ConnectionPair.first);
_oplogFetcherClient = std::move(ConnectionPair.second);
@@ -903,17 +1011,9 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
_sharedData = std::make_unique<TenantMigrationSharedData>(
getGlobalServiceContext()->getFastClockSource(), getMigrationUUID());
})
- .then([this] {
+ .then([this, self = shared_from_this()] {
_stopOrHangOnFailPoint(&fpAfterConnectingTenantMigrationRecipientInstance);
stdx::lock_guard lk(_mutex);
- // The instance is marked as garbage collect if the migration is either
- // committed or aborted on donor side. So, don't start the recipient task if the
- // instance state doc is marked for garbage collect.
- uassert(ErrorCodes::IllegalOperation,
- str::stream() << "Can't start the data sync as the state doc is already marked "
- "for garbage collect for migration uuid: "
- << getMigrationUUID(),
- !_stateDoc.getExpireAt());
_getStartOpTimesFromDonor(lk);
auto opCtx = cc().makeOperationContext();
uassertStatusOK(
@@ -921,14 +1021,15 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
return WaitForMajorityService::get(opCtx->getServiceContext())
.waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp());
})
- .then([this] {
+ .then([this, self = shared_from_this()] {
_stopOrHangOnFailPoint(&fpAfterRetrievingStartOpTimesMigrationRecipientInstance);
_startOplogFetcher();
})
- .then([this] {
+ .then([this, self = shared_from_this()] {
_stopOrHangOnFailPoint(&fpAfterStartingOplogFetcherMigrationRecipientInstance);
stdx::lock_guard lk(_mutex);
+
{
// Throwing error when cloner is canceled externally via interrupt(), makes the
// instance to skip the remaining task (i.e., starting oplog applier) in the
@@ -963,8 +1064,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
_dataSyncStartedPromise.emplaceValue();
return clonerFuture;
})
- .then([this] { return _onCloneSuccess(); })
- .then([this] {
+ .then([this, self = shared_from_this()] { return _onCloneSuccess(); })
+ .then([this, self = shared_from_this()] {
_stopOrHangOnFailPoint(&fpAfterCollectionClonerDone);
LOGV2_DEBUG(4881200,
1,
@@ -978,7 +1079,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
_stopOrHangOnFailPoint(&fpAfterStartingOplogApplierMigrationRecipientInstance);
return _getDataConsistentFuture();
})
- .then([this] {
+ .then([this, self = shared_from_this()] {
stdx::lock_guard lk(_mutex);
LOGV2_DEBUG(4881101,
1,
@@ -990,7 +1091,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
_dataConsistentPromise.emplaceValue(_stateDoc.getDataConsistentStopDonorOpTime().get());
})
- .then([this] {
+ .then([this, self = shared_from_this()] {
_stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance);
stdx::lock_guard lk(_mutex);
// wait for oplog applier to complete/stop.
@@ -999,8 +1100,14 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
return _tenantOplogApplier->getNotificationForOpTime(OpTime::max());
})
.thenRunOn(_recipientService->getInstanceCleanupExecutor())
- .onCompletion([this](StatusOrStatusWith<TenantOplogApplier::OpTimePair> applierStatus) {
- // We don't need the final optime from the oplog applier.
+ .onCompletion([this, self = shared_from_this()](
+ StatusOrStatusWith<TenantOplogApplier::OpTimePair> applierStatus) {
+ // On shutDown/stepDown, the _scopedExecutor may have already been shut down. So we need
+ // to schedule the clean up work on the parent executor.
+
+ // We don't need the final optime from the oplog applier. The data sync does not
+ // normally stop by itself on success. It completes only on errors or on external
+ // interruption (e.g. by shutDown/stepDown or by recipientForgetMigration command).
Status status = applierStatus.getStatus();
{
// If we were interrupted during oplog application, replace oplog application
@@ -1035,7 +1142,48 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
hangBeforeTaskCompletion.pauseWhileSet();
}
- _cleanupOnTaskCompletion(status);
+ _cleanupOnDataSyncCompletion(status);
+
+ // Handle recipientForgetMigration.
+ stdx::lock_guard lk(_mutex);
+ if (_stateDoc.getExpireAt() ||
+ MONGO_unlikely(autoRecipientForgetMigration.shouldFail())) {
+ // Skip waiting for the recipientForgetMigration command.
+ setPromiseOkifNotReady(lk, _receivedRecipientForgetMigrationPromise);
+ }
+ })
+ .thenRunOn(**_scopedExecutor)
+ .then([this, self = shared_from_this()] {
+ // Schedule on the _scopedExecutor to make sure we are still the primary when waiting
+ // for the recipientForgetMigration command.
+ return _receivedRecipientForgetMigrationPromise.getFuture();
+ })
+ .then(
+ [this, self = shared_from_this()] { return _markStateDocumentAsGarbageCollectable(); })
+ .thenRunOn(_recipientService->getInstanceCleanupExecutor())
+ .onCompletion([this, self = shared_from_this()](Status status) {
+ // Schedule on the parent executor to mark the completion of the whole chain so this is
+ // safe even on shutDown/stepDown.
+ stdx::lock_guard lk(_mutex);
+ invariant(_dataSyncCompletionPromise.getFuture().isReady());
+ if (status.isOK()) {
+ LOGV2(4881401,
+ "Migration marked to be garbage collectable due to recipientForgetMigration "
+ "command",
+ "migrationId"_attr = getMigrationUUID(),
+ "tenantId"_attr = getTenantId(),
+ "expireAt"_attr = *_stateDoc.getExpireAt());
+ setPromiseOkifNotReady(lk, _taskCompletionPromise);
+ } else {
+ // We should only hit here on a stepDown/shutDown.
+ LOGV2(4881402,
+ "Migration not marked to be garbage collectable",
+ "migrationId"_attr = getMigrationUUID(),
+ "tenantId"_attr = getTenantId(),
+ "status"_attr = status);
+ setPromiseErrorifNotReady(lk, _taskCompletionPromise, status);
+ }
+ _taskState.setState(TaskState::kDone);
})
.semi();
}
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 0036756380e..320eb9b5366 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -89,11 +89,24 @@ public:
void interrupt(Status status) override;
/**
- * Returns a Future that will be resolved when all work associated with this Instance has
+ * Interrupts the migration for garbage collection.
+ */
+ void onReceiveRecipientForgetMigration(OperationContext* opCtx);
+
+ /**
+ * Returns a Future that will be resolved when data sync associated with this Instance has
* completed running.
*/
+ SharedSemiFuture<void> getDataSyncCompletionFuture() const {
+ return _dataSyncCompletionPromise.getFuture();
+ }
+
+ /**
+ * Returns a Future that will be resolved when the work associated with this Instance has
+ * completed to indicate whether the migration is forgotten successfully.
+ */
SharedSemiFuture<void> getCompletionFuture() const {
- return _completionPromise.getFuture();
+ return _taskCompletionPromise.getFuture();
}
/**
@@ -189,7 +202,7 @@ public:
void setState(StateFlag state, boost::optional<Status> interruptStatus = boost::none) {
invariant(checkIfValidTransition(state),
- str::stream() << "current state :" << toString(_state)
+ str::stream() << "current state: " << toString(_state)
<< ", new state: " << toString(state));
_state = state;
@@ -245,6 +258,14 @@ public:
};
/*
+ * Helper for interrupt().
+ * The _receivedForgetMigrationPromise is resolved when skipWaitingForForgetMigration is
+ * set (e.g. stepDown/shutDown). And we use skipWaitingForForgetMigration=false for
+ * interruptions coming from the instance's task chain itself (e.g. _oplogFetcherCallback).
+ */
+ void _interrupt(Status status, bool skipWaitingForForgetMigration);
+
+ /*
* Transitions the instance state to 'kStarted'.
*
* Persists the instance state doc and waits for it to be majority replicated.
@@ -252,6 +273,14 @@ public:
*/
SemiFuture<void> _initializeStateDoc(WithLock);
+ /*
+ * Transitions the instance state to 'kDone' and sets the expireAt field.
+ *
+ * Persists the instance state doc and waits for it to be majority replicated.
+ * Throws on shutdown / notPrimary errors.
+ */
+ SemiFuture<void> _markStateDocumentAsGarbageCollectable();
+
/**
* Creates a client, connects it to the donor, and authenticates it if authParams is
* non-empty. Throws a user assertion on failure.
@@ -285,7 +314,6 @@ public:
/**
* Starts the tenant oplog fetcher.
*/
-
void _startOplogFetcher();
/**
@@ -329,10 +357,10 @@ public:
void _cancelRemainingWork(WithLock lk);
/*
- * Performs some cleanup work on task completion, like, shutting down the components or
- * fulfilling any instance promises.
+ * Performs some cleanup work on sync completion, like, shutting down the components or
+ * fulfilling any data-sync related instance promises.
*/
- void _cleanupOnTaskCompletion(Status status);
+ void _cleanupOnDataSyncCompletion(Status status);
/*
* Makes the failpoint to stop or hang based on failpoint data "action" field.
@@ -386,13 +414,21 @@ public:
// Indicates whether the main task future continuation chain state kicked off by run().
TaskState _taskState; // (M)
- // Promise that is resolved when the chain of work kicked off by run() has completed.
- SharedPromise<void> _completionPromise; // (W)
+ // Promise that is resolved when the state document is initialized and persisted.
+ SharedPromise<void> _stateDocPersistedPromise; // (W)
// Promise that is resolved Signaled when the instance has started tenant database cloner
// and tenant oplog fetcher.
SharedPromise<void> _dataSyncStartedPromise; // (W)
// Promise that is resolved Signaled when the tenant data sync has reached consistent point.
SharedPromise<OpTime> _dataConsistentPromise; // (W)
+ // Promise that is resolved when the data sync has completed.
+ SharedPromise<void> _dataSyncCompletionPromise; // (W)
+ // Promise that is resolved when the recipientForgetMigration command is received or on
+ // stepDown/shutDown with errors.
+ SharedPromise<void> _receivedRecipientForgetMigrationPromise; // (W)
+ // Promise that is resolved when the chain of work kicked off by run() has completed to
+ // indicate whether the state doc is successfully marked as garbage collectable.
+ SharedPromise<void> _taskCompletionPromise; // (W)
};
private:
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index ae59a422d3b..f8d39c19b8c 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -133,11 +133,12 @@ class TenantMigrationRecipientServiceTest : public ServiceContextMongoDTest {
public:
class stopFailPointEnableBlock : public FailPointEnableBlock {
public:
- explicit stopFailPointEnableBlock(StringData failPointName)
+ explicit stopFailPointEnableBlock(StringData failPointName,
+ std::int32_t error = stopFailPointErrorCode)
: FailPointEnableBlock(failPointName,
BSON("action"
<< "stop"
- << "stopErrorCode" << stopFailPointErrorCode)) {}
+ << "stopErrorCode" << error)) {}
};
void setUp() override {
@@ -150,6 +151,11 @@ public:
WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
+ // Automatically mark the state doc garbage collectable after data sync completion.
+ globalFailPointRegistry()
+ .find("autoRecipientForgetMigration")
+ ->setMode(FailPoint::alwaysOn);
+
{
auto opCtx = cc().makeOperationContext();
auto replCoord = std::make_unique<ReplicationCoordinatorMock>(serviceContext);
@@ -313,7 +319,8 @@ TEST_F(TenantMigrationRecipientServiceTest, BasicTenantMigrationRecipientService
ASSERT_EQ(migrationUUID, instance->getMigrationUUID());
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
@@ -336,8 +343,11 @@ TEST_F(TenantMigrationRecipientServiceTest, InstanceReportsErrorOnFailureWhilePe
ASSERT_EQ(migrationUUID, instance->getMigrationUUID());
// Should be able to see the instance task failure error.
- auto status = instance->getCompletionFuture().getNoThrow();
+ auto status = instance->getDataSyncCompletionFuture().getNoThrow();
ASSERT_EQ(ErrorCodes::NotWritablePrimary, status.code());
+ // Should also fail to mark the state doc garbage collectable if we have failed to persist the
+    // state doc in the first place.
+ ASSERT_EQ(ErrorCodes::NotWritablePrimary, instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_Primary) {
@@ -383,7 +393,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P
taskFp->setMode(FailPoint::off);
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_Secondary) {
@@ -429,7 +440,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_S
taskFp->setMode(FailPoint::off);
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_PrimaryFails) {
@@ -488,7 +500,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P
// Wait for task completion failure.
ASSERT_EQUALS(ErrorCodes::FailedToSatisfyReadPreference,
- instance->getCompletionFuture().getNoThrow().code());
+ instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_PrimaryFailsOver) {
@@ -537,7 +550,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P
taskFp->setMode(FailPoint::off);
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_BadConnectString) {
@@ -601,7 +615,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientGetStartOpTi
ASSERT(instance.get());
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartFetchingDonorOpTime());
ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingDonorOpTime());
@@ -640,7 +655,8 @@ TEST_F(TenantMigrationRecipientServiceTest,
pauseFailPoint->setMode(FailPoint::off, 0);
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartFetchingDonorOpTime());
ASSERT_EQ(newTopOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingDonorOpTime());
@@ -676,7 +692,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientGetStartOpTi
ASSERT(instance.get());
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
ASSERT_EQ(txnStartOpTime, getStateDoc(instance.get()).getStartFetchingDonorOpTime());
ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingDonorOpTime());
@@ -722,7 +739,8 @@ TEST_F(TenantMigrationRecipientServiceTest,
pauseFailPoint->setMode(FailPoint::off, 0);
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
ASSERT_EQ(txnStartOpTime, getStateDoc(instance.get()).getStartFetchingDonorOpTime());
ASSERT_EQ(newTopOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingDonorOpTime());
@@ -750,7 +768,8 @@ TEST_F(TenantMigrationRecipientServiceTest,
ASSERT(instance.get());
// Wait for task completion.
- ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow());
+ ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
// Even though we failed, the memory state should still match the on-disk state.
checkStateDocPersisted(opCtx.get(), instance.get());
@@ -796,7 +815,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartOplogFe
taskFp->setMode(FailPoint::off);
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartsCloner) {
@@ -860,7 +880,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartsCloner
taskFp->setMode(FailPoint::off);
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherFailsDuringOplogApplication) {
@@ -908,8 +929,9 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherFailsDuringOplogApplicat
oplogFetcher->shutdownWith({ErrorCodes::Error(4881203), "Injected error"});
// Wait for task completion failure.
- auto status = instance->getCompletionFuture().getNoThrow();
+ auto status = instance->getDataSyncCompletionFuture().getNoThrow();
ASSERT_EQ(4881203, status.code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, OplogApplierFails) {
@@ -964,7 +986,8 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierFails) {
oplogFetcher->receiveBatch(1LL, {oplogEntry.toBSON()}, injectedEntryOpTime.getTimestamp());
// Wait for task completion failure.
- ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow());
+ ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, StoppingApplierAllowsCompletion) {
@@ -1012,7 +1035,8 @@ TEST_F(TenantMigrationRecipientServiceTest, StoppingApplierAllowsCompletion) {
// Wait for task completion. Since we're using a test function to cancel the applier,
// the actual result is not critical.
- ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow());
+ ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientAddResumeTokenNoopsToBuffer) {
@@ -1115,7 +1139,400 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientAddResumeTok
oplogFetcherFP->setMode(FailPoint::off);
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_BeforeRun) {
+ const UUID migrationUUID = UUID::gen();
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ auto fp = globalFailPointRegistry().find("pauseBeforeRunTenantMigrationRecipientInstance");
+ fp->setMode(FailPoint::alwaysOn);
+
+ auto opCtx = makeOperationContext();
+ auto instance = repl::TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+
+    // The task is interrupted before it starts the chain.
+ instance->interrupt({ErrorCodes::InterruptedDueToReplStateChange, "Test stepdown"});
+
+ // Test that receiving recipientForgetMigration command after that should result in the same
+ // error.
+ ASSERT_THROWS_CODE(instance->onReceiveRecipientForgetMigration(opCtx.get()),
+ AssertionException,
+ ErrorCodes::InterruptedDueToReplStateChange);
+
+ fp->setMode(FailPoint::off);
+
+ // We should fail to mark the state doc garbage collectable.
+ ASSERT_EQ(instance->getCompletionFuture().getNoThrow(),
+ ErrorCodes::InterruptedDueToReplStateChange);
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_FailToInitializeStateDoc) {
+ stopFailPointEnableBlock fp("failWhilePersistingTenantMigrationRecipientInstanceStateDoc");
+
+ const UUID migrationUUID = UUID::gen();
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ auto opCtx = makeOperationContext();
+ auto instance = repl::TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+
+ ASSERT_THROWS_CODE(instance->onReceiveRecipientForgetMigration(opCtx.get()),
+ AssertionException,
+ ErrorCodes::NotWritablePrimary);
+ // We should fail to mark the state doc garbage collectable if we have failed to initialize and
+    // persist the state doc in the first place.
+ ASSERT_EQ(instance->getCompletionFuture().getNoThrow(), ErrorCodes::NotWritablePrimary);
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_WaitUntilStateDocInitialized) {
+ // The test fixture forgets the migration automatically, disable the failpoint for this test so
+ // the migration continues to wait for the recipientForgetMigration command after persisting the
+ // state doc.
+ auto autoForgetFp = globalFailPointRegistry().find("autoRecipientForgetMigration");
+ autoForgetFp->setMode(FailPoint::off);
+
+ const UUID migrationUUID = UUID::gen();
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ auto fp = globalFailPointRegistry().find("pauseAfterRunTenantMigrationRecipientInstance");
+ auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn);
+
+ auto opCtx = makeOperationContext();
+ auto instance = repl::TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+
+ fp->waitForTimesEntered(initialTimesEntered + 1);
+
+ // Test that onReceiveRecipientForgetMigration waits until the state doc is initialized.
+ opCtx->setDeadlineAfterNowBy(Seconds(2), opCtx->getTimeoutError());
+ ASSERT_THROWS_CODE(instance->onReceiveRecipientForgetMigration(opCtx.get()),
+ AssertionException,
+ opCtx->getTimeoutError());
+
+ // Unblock the task chain.
+ fp->setMode(FailPoint::off);
+
+ // Make a new opCtx as the old one has expired due to timeout errors.
+ opCtx.reset();
+ opCtx = makeOperationContext();
+
+ // The onReceiveRecipientForgetMigration should eventually go through.
+ instance->onReceiveRecipientForgetMigration(opCtx.get());
+ ASSERT_EQ(instance->getDataSyncCompletionFuture().getNoThrow(),
+ ErrorCodes::TenantMigrationForgotten);
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+ const auto doc = getStateDoc(instance.get());
+ LOGV2(4881411,
+ "Test migration complete",
+ "preStateDoc"_attr = initialStateDocument.toBSON(),
+ "postStateDoc"_attr = doc.toBSON());
+ ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString());
+ ASSERT_EQ(doc.getTenantId(), "tenantA");
+ ASSERT_TRUE(doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly)));
+ ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kDone);
+ ASSERT_TRUE(doc.getExpireAt() != boost::none);
+ ASSERT_TRUE(doc.getExpireAt().get() > opCtx->getServiceContext()->getFastClockSource()->now());
+ ASSERT_TRUE(doc.getStartApplyingDonorOpTime() == boost::none);
+ ASSERT_TRUE(doc.getStartFetchingDonorOpTime() == boost::none);
+ ASSERT_TRUE(doc.getDataConsistentStopDonorOpTime() == boost::none);
+ ASSERT_TRUE(doc.getCloneFinishedRecipientOpTime() == boost::none);
+ checkStateDocPersisted(opCtx.get(), instance.get());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_AfterStartOpTimes) {
+ auto fp =
+ globalFailPointRegistry().find("fpAfterRetrievingStartOpTimesMigrationRecipientInstance");
+ auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+
+
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ // Create and start the instance.
+ auto opCtx = makeOperationContext();
+ auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+
+ fp->waitForTimesEntered(initialTimesEntered + 1);
+ instance->onReceiveRecipientForgetMigration(opCtx.get());
+
+ // Skip the cloners in this test, so we provide an empty list of databases.
+ MockRemoteDBServer* const _donorServer =
+ mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+ _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({}));
+ _donorServer->setCommandReply("find", makeFindResponse());
+
+ fp->setMode(FailPoint::off);
+ ASSERT_EQ(instance->getDataSyncCompletionFuture().getNoThrow(),
+ ErrorCodes::TenantMigrationForgotten);
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+ const auto doc = getStateDoc(instance.get());
+ LOGV2(4881412,
+ "Test migration complete",
+ "preStateDoc"_attr = initialStateDocument.toBSON(),
+ "postStateDoc"_attr = doc.toBSON());
+ ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString());
+ ASSERT_EQ(doc.getTenantId(), "tenantA");
+ ASSERT_TRUE(doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly)));
+ ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kDone);
+ ASSERT_TRUE(doc.getExpireAt() != boost::none);
+ ASSERT_TRUE(doc.getExpireAt().get() > opCtx->getServiceContext()->getFastClockSource()->now());
+ checkStateDocPersisted(opCtx.get(), instance.get());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_AfterConsistent) {
+ // The test fixture forgets the migration automatically, disable the failpoint for this test so
+ // the migration continues to wait for the recipientForgetMigration command after reaching data
+ // consistent state.
+ auto autoForgetFp = globalFailPointRegistry().find("autoRecipientForgetMigration");
+ autoForgetFp->setMode(FailPoint::off);
+
+ auto dataConsistentFp =
+ globalFailPointRegistry().find("fpAfterDataConsistentMigrationRecipientInstance");
+ auto initialTimesEntered = dataConsistentFp->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ // Setting these causes us to skip cloning.
+ initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime);
+ initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime);
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+ dataConsistentFp->waitForTimesEntered(initialTimesEntered + 1);
+
+ {
+ const auto doc = getStateDoc(instance.get());
+ LOGV2(4881413,
+ "Test migration after consistent",
+ "preStateDoc"_attr = initialStateDocument.toBSON(),
+ "postStateDoc"_attr = doc.toBSON());
+ ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString());
+ ASSERT_EQ(doc.getTenantId(), "tenantA");
+ ASSERT_TRUE(
+ doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly)));
+ ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kConsistent);
+ ASSERT_TRUE(doc.getExpireAt() == boost::none);
+ checkStateDocPersisted(opCtx.get(), instance.get());
+ }
+
+ instance->onReceiveRecipientForgetMigration(opCtx.get());
+
+    // Test receiving duplicate recipientForgetMigration requests.
+ instance->onReceiveRecipientForgetMigration(opCtx.get());
+
+ // Continue after data being consistent.
+ dataConsistentFp->setMode(FailPoint::off);
+
+ // The data sync should have completed.
+ ASSERT_EQ(instance->getDataSyncCompletionFuture().getNoThrow(),
+ ErrorCodes::TenantMigrationForgotten);
+
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+ {
+ const auto doc = getStateDoc(instance.get());
+ LOGV2(4881414,
+ "Test migration complete",
+ "preStateDoc"_attr = initialStateDocument.toBSON(),
+ "postStateDoc"_attr = doc.toBSON());
+ ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString());
+ ASSERT_EQ(doc.getTenantId(), "tenantA");
+ ASSERT_TRUE(
+ doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly)));
+ ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kDone);
+ ASSERT_TRUE(doc.getExpireAt() != boost::none);
+ ASSERT_TRUE(doc.getExpireAt().get() >
+ opCtx->getServiceContext()->getFastClockSource()->now());
+ checkStateDocPersisted(opCtx.get(), instance.get());
+ }
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_AfterFail) {
+ // The test fixture forgets the migration automatically, disable the failpoint for this test so
+ // the migration continues to wait for the recipientForgetMigration command after getting an
+ // error from the migration.
+ auto autoForgetFp = globalFailPointRegistry().find("autoRecipientForgetMigration");
+ autoForgetFp->setMode(FailPoint::off);
+
+ stopFailPointEnableBlock fp("fpAfterCollectionClonerDone");
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ // Setting these causes us to skip cloning.
+ initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime);
+ initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime);
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+
+ ASSERT_THROWS_CODE(instance->waitUntilMigrationReachesConsistentState(opCtx.get()),
+ AssertionException,
+ stopFailPointErrorCode);
+
+ {
+ const auto doc = getStateDoc(instance.get());
+ LOGV2(4881415,
+ "Test migration after collection cloner done",
+ "preStateDoc"_attr = initialStateDocument.toBSON(),
+ "postStateDoc"_attr = doc.toBSON());
+ ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString());
+ ASSERT_EQ(doc.getTenantId(), "tenantA");
+ ASSERT_TRUE(
+ doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly)));
+ ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kStarted);
+ ASSERT_TRUE(doc.getExpireAt() == boost::none);
+ checkStateDocPersisted(opCtx.get(), instance.get());
+ }
+
+ // The data sync should have completed.
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+
+ // The instance should still be running and waiting for the recipientForgetMigration command.
+ instance->onReceiveRecipientForgetMigration(opCtx.get());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+ {
+ const auto doc = getStateDoc(instance.get());
+ LOGV2(4881416,
+ "Test migration complete",
+ "preStateDoc"_attr = initialStateDocument.toBSON(),
+ "postStateDoc"_attr = doc.toBSON());
+ ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString());
+ ASSERT_EQ(doc.getTenantId(), "tenantA");
+ ASSERT_TRUE(
+ doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly)));
+ ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kDone);
+ ASSERT_TRUE(doc.getExpireAt() != boost::none);
+ ASSERT_TRUE(doc.getExpireAt().get() >
+ opCtx->getServiceContext()->getFastClockSource()->now());
+ checkStateDocPersisted(opCtx.get(), instance.get());
+ }
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_FailToMarkGarbageCollectable) {
+ // The test fixture forgets the migration automatically, disable the failpoint for this test so
+ // the migration continues to wait for the recipientForgetMigration command after getting an
+ // error from the migration.
+ auto autoForgetFp = globalFailPointRegistry().find("autoRecipientForgetMigration");
+ autoForgetFp->setMode(FailPoint::off);
+
+ stopFailPointEnableBlock fp("fpAfterPersistingTenantMigrationRecipientInstanceStateDoc");
+ const UUID migrationUUID = UUID::gen();
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ // Create and start the instance.
+ auto opCtx = makeOperationContext();
+ auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ ASSERT_EQ(migrationUUID, instance->getMigrationUUID());
+
+ // The data sync should have completed.
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+
+ // Fail marking the state doc garbage collectable with a different error code, simulating a
+ // stepDown.
+ stopFailPointEnableBlock fpFailForget("fpAfterReceivingRecipientForgetMigration",
+ ErrorCodes::NotWritablePrimary);
+
+ // The instance should still be running and waiting for the recipientForgetMigration command.
+ instance->onReceiveRecipientForgetMigration(opCtx.get());
+ // Check that it fails to mark the state doc garbage collectable.
+ ASSERT_EQ(ErrorCodes::NotWritablePrimary, instance->getCompletionFuture().getNoThrow().code());
+
+ {
+ const auto doc = getStateDoc(instance.get());
+ LOGV2(4881417,
+ "Test migration complete",
+ "preStateDoc"_attr = initialStateDocument.toBSON(),
+ "postStateDoc"_attr = doc.toBSON());
+ ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString());
+ ASSERT_EQ(doc.getTenantId(), "tenantA");
+ ASSERT_TRUE(
+ doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly)));
+ ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kStarted);
+ ASSERT_TRUE(doc.getExpireAt() == boost::none);
+ checkStateDocPersisted(opCtx.get(), instance.get());
+ }
}
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/util/uuid.h b/src/mongo/util/uuid.h
index 4f1d9c06364..de3e4930a5d 100644
--- a/src/mongo/util/uuid.h
+++ b/src/mongo/util/uuid.h
@@ -83,6 +83,7 @@ class UUID {
friend class LogicalSessionFromClient;
friend class MigrationCoordinatorDocument;
friend class MigrationDestinationManager;
+ friend class MigrationRecipientCommonData;
friend class RangeDeletionTask;
friend class ResolvedKeyId;
friend class repl::CollectionInfo;