summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorCheahuychou Mao <cheahuychou.mao@mongodb.com>2020-09-01 19:22:18 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-09-09 02:11:58 +0000
commit3626a65b1d1f8ff2d230704146a09595f78bfe51 (patch)
treec9e6a45831bb3c29ff7d866cb0405c9e73482f78 /src/mongo/db/repl
parente21416d2889b54830624d812a48b2a5e07f4e47a (diff)
downloadmongo-3626a65b1d1f8ff2d230704146a09595f78bfe51.tar.gz
SERVER-49204 Implement donorForgetMigration command
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/repl_server_parameters.idl13
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.cpp127
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.h38
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_util.cpp49
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_util.h5
-rw-r--r--src/mongo/db/repl/tenant_migration_state_machine.idl8
7 files changed, 183 insertions, 58 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 63a2626d1b3..d1fd2714f33 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1281,6 +1281,7 @@ env.Library(
],
LIBDEPS=[
'primary_only_service',
+ 'repl_server_parameters',
'tenant_migration_donor',
'wait_for_majority_service',
],
diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl
index 562d80ad126..4153ad4ef0e 100644
--- a/src/mongo/db/repl/repl_server_parameters.idl
+++ b/src/mongo/db/repl/repl_server_parameters.idl
@@ -330,4 +330,15 @@ server_parameters:
set_at: startup
cpp_vartype: bool
cpp_varname: enableTenantMigrations
- default: false \ No newline at end of file
+ default: false
+
+ tenantMigrationGarbageCollectionDelayMS:
+ description: >-
+ The amount of time in milliseconds that the donor or recipient should wait before
+ removing the migration state document after receiving donorForgetMigration or
+ recipientForgetMigration.
+ set_at: [ startup, runtime ]
+ cpp_vartype: AtomicWord<int>
+ cpp_varname: tenantMigrationGarbageCollectionDelayMS
+ default:
+ expr: 48 * 60 * 60 * 1000
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 8f3fb868e7a..d821c6c3f26 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/persistent_task_store.h"
+#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/tenant_migration_access_blocker.h"
#include "mongo/db/repl/tenant_migration_conflict_info.h"
#include "mongo/db/repl/tenant_migration_donor_util.h"
@@ -173,6 +174,34 @@ repl::OpTime TenantMigrationDonorService::Instance::_updateStateDocument(
return updateOpTime.get();
}
+repl::OpTime TenantMigrationDonorService::Instance::_markStateDocumentAsGarbageCollectable() {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+ DBDirectClient dbClient(opCtx);
+
+ _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
+ Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()});
+
+ auto commandResponse = dbClient.runCommand([&] {
+ write_ops::Update updateOp(_stateDocumentsNS);
+ auto updateModification =
+ write_ops::UpdateModification::parseFromClassicUpdate(_stateDoc.toBSON());
+ write_ops::UpdateOpEntry updateEntry(
+ BSON(TenantMigrationDonorDocument::kIdFieldName << _stateDoc.getId()),
+ updateModification);
+ updateEntry.setMulti(false);
+ updateEntry.setUpsert(false);
+ updateOp.setUpdates({updateEntry});
+
+ return updateOp.serialize({});
+ }());
+
+ const auto commandReply = commandResponse->getCommandReply();
+ uassertStatusOK(getStatusFromWriteCommandReply(commandReply));
+
+ return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+}
+
ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWriteConcern(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor, repl::OpTime opTime) {
return WaitForMajorityService::get(_serviceContext)
@@ -180,27 +209,11 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWrit
.thenRunOn(**executor);
}
-ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand(
+ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipient(
+ OperationContext* opCtx,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- RemoteCommandTargeter* recipientTargeter) {
- if (skipSendingRecipientSyncDataCommand.shouldFail()) {
- return ExecutorFuture<void>(**executor, Status::OK());
- }
-
- auto opCtxHolder = cc().makeOperationContext();
- auto opCtx = opCtxHolder.get();
-
- BSONObj cmdObj = BSONObj([&]() {
- auto donorConnString =
- repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString();
- RecipientSyncData request(_stateDoc.getId(),
- donorConnString.toString(),
- _stateDoc.getDatabasePrefix().toString(),
- _stateDoc.getReadPreference());
- request.setReturnAfterReachingOpTime(_stateDoc.getBlockTimestamp());
- return request.toBSON(BSONObj());
- }());
-
+ RemoteCommandTargeter* recipientTargeter,
+ const BSONObj& cmdObj) {
HostAndPort recipientHost =
uassertStatusOK(recipientTargeter->findHost(opCtx, ReadPreferenceSetting()));
@@ -238,6 +251,42 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa
});
}
+ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ RemoteCommandTargeter* recipientTargeter) {
+ if (skipSendingRecipientSyncDataCommand.shouldFail()) {
+ return ExecutorFuture<void>(**executor, Status::OK());
+ }
+
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+
+ BSONObj cmdObj = BSONObj([&]() {
+ auto donorConnString =
+ repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString();
+ RecipientSyncData request(_stateDoc.getId(),
+ donorConnString.toString(),
+ _stateDoc.getDatabasePrefix().toString(),
+ _stateDoc.getReadPreference());
+ request.setReturnAfterReachingOpTime(_stateDoc.getBlockTimestamp());
+ return request.toBSON(BSONObj());
+ }());
+
+ return _sendCommandToRecipient(opCtx, executor, recipientTargeter, cmdObj);
+}
+
+ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForgetMigrationCommand(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ RemoteCommandTargeter* recipientTargeter) {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+
+ return _sendCommandToRecipient(opCtx,
+ executor,
+ recipientTargeter,
+ RecipientForgetMigration(_stateDoc.getId()).toBSON(BSONObj()));
+}
+
SemiFuture<void> TenantMigrationDonorService::Instance::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
auto recipientUri =
@@ -298,15 +347,6 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
// Wait for the migration to commit or abort.
return _mtab->onCompletion();
})
- .onError([this](Status status) {
- if (!status.isOK() && _abortReason) {
- status.addContext(str::stream()
- << "Tenant migration with id \"" << _stateDoc.getId()
- << "\" and dbPrefix \"" << _stateDoc.getDatabasePrefix()
- << "\" aborted due to " << _abortReason);
- }
- return status;
- })
.onCompletion([this](Status status) {
LOGV2(5006601,
"Tenant migration completed",
@@ -314,6 +354,35 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
"dbPrefix"_attr = _stateDoc.getDatabasePrefix(),
"status"_attr = status,
"abortReason"_attr = _abortReason);
+
+ if (status.isOK()) {
+ _decisionPromise.emplaceValue();
+ } else {
+ if (_abortReason) {
+ status.addContext(str::stream()
+ << "Tenant migration with id \"" << _stateDoc.getId()
+ << "\" and dbPrefix \"" << _stateDoc.getDatabasePrefix()
+ << "\" aborted due to " << _abortReason);
+ }
+ _decisionPromise.setError(status);
+ }
+ })
+ .then([this, executor] {
+ // Wait for the donorForgetMigration command.
+ return _receivedDonorForgetMigrationPromise.getFuture();
+ })
+ .then([this, executor] {
+ const auto opTime = _markStateDocumentAsGarbageCollectable();
+ return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ })
+ .then([this, executor, recipientTargeter] {
+ return _sendRecipientForgetMigrationCommand(executor, recipientTargeter.get());
+ })
+ .onCompletion([this, executor](Status status) {
+ LOGV2(4920400,
+ "Marked migration state as garbage collectable",
+ "migrationId"_attr = _stateDoc.getId(),
+ "expireAt"_attr = _stateDoc.getExpireAt());
return status;
})
.semi();
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h
index 30fe3a41461..ddf178121e4 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.h
+++ b/src/mongo/db/repl/tenant_migration_donor_service.h
@@ -80,6 +80,17 @@ public:
*/
Status checkIfOptionsConflict(BSONObj options);
+ /**
+ * Returns a Future that will be resolved when the migration has committed or aborted.
+ */
+ SharedSemiFuture<void> getDecisionFuture() const {
+ return _decisionPromise.getFuture();
+ }
+
+ void onReceiveDonorForgetMigration() {
+ _receivedDonorForgetMigrationPromise.emplaceValue();
+ }
+
private:
const NamespaceString _stateDocumentsNS = NamespaceString::kTenantMigrationDonorsNamespace;
@@ -98,24 +109,51 @@ public:
repl::OpTime _updateStateDocument(const TenantMigrationDonorStateEnum nextState);
/**
+ * Sets the "expireAt" time for the state document to be garbage collected.
+ */
+ repl::OpTime _markStateDocumentAsGarbageCollectable();
+
+ /**
* Waits for given opTime to be majority committed.
*/
ExecutorFuture<void> _waitForMajorityWriteConcern(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor, repl::OpTime opTime);
/**
+ * Sends the given command to the recipient replica set.
+ */
+ ExecutorFuture<void> _sendCommandToRecipient(
+ OperationContext* opCtx,
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ RemoteCommandTargeter* recipientTargeter,
+ const BSONObj& cmdObj);
+
+ /**
* Sends the recipientSyncData command to the recipient replica set.
*/
ExecutorFuture<void> _sendRecipientSyncDataCommand(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
RemoteCommandTargeter* recipientTargeter);
+ /**
+ * Sends the recipientForgetMigration command to the recipient replica set.
+ */
+ ExecutorFuture<void> _sendRecipientForgetMigrationCommand(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ RemoteCommandTargeter* recipientTargeter);
+
ServiceContext* _serviceContext;
TenantMigrationDonorDocument _stateDoc;
std::shared_ptr<TenantMigrationAccessBlocker> _mtab;
boost::optional<Status> _abortReason;
+
+ // Promise that is resolved when the donor has majority-committed the migration decision.
+ SharedPromise<void> _decisionPromise;
+
+ // Promise that is resolved when the donor receives the donorForgetMigration command.
+ SharedPromise<void> _receivedDonorForgetMigrationPromise;
};
private:
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.cpp b/src/mongo/db/repl/tenant_migration_donor_util.cpp
index 296e041658d..edae74ab4ff 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_util.cpp
@@ -58,7 +58,8 @@ const char kNetName[] = "TenantMigrationWorkerNetwork";
* Updates the TenantMigrationAccessBlocker when the tenant migration transitions to the blocking
* state.
*/
-void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocument& donorStateDoc) {
+void onTransitionToBlocking(OperationContext* opCtx,
+ const TenantMigrationDonorDocument& donorStateDoc) {
invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kBlocking);
invariant(donorStateDoc.getBlockTimestamp());
@@ -91,7 +92,8 @@ void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocumen
/**
* Transitions the TenantMigrationAccessBlocker to the committed state.
*/
-void onTransitionToCommitted(OperationContext* opCtx, TenantMigrationDonorDocument& donorStateDoc) {
+void onTransitionToCommitted(OperationContext* opCtx,
+ const TenantMigrationDonorDocument& donorStateDoc) {
invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kCommitted);
invariant(donorStateDoc.getCommitOrAbortOpTime());
@@ -105,7 +107,8 @@ void onTransitionToCommitted(OperationContext* opCtx, TenantMigrationDonorDocume
/**
* Transitions the TenantMigrationAccessBlocker to the aborted state.
*/
-void onTransitionToAborted(OperationContext* opCtx, TenantMigrationDonorDocument& donorStateDoc) {
+void onTransitionToAborted(OperationContext* opCtx,
+ const TenantMigrationDonorDocument& donorStateDoc) {
invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kAborted);
invariant(donorStateDoc.getCommitOrAbortOpTime());
@@ -130,24 +133,28 @@ std::unique_ptr<executor::TaskExecutor> makeTenantMigrationExecutor(
executor::makeNetworkInterface(kNetName, nullptr, nullptr));
}
-void onDonorStateTransition(OperationContext* opCtx, const BSONObj& donorStateDoc) {
- auto parsedDonorStateDoc =
- TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorStateDoc"), donorStateDoc);
-
- switch (parsedDonorStateDoc.getState()) {
- case TenantMigrationDonorStateEnum::kDataSync:
- break;
- case TenantMigrationDonorStateEnum::kBlocking:
- onTransitionToBlocking(opCtx, parsedDonorStateDoc);
- break;
- case TenantMigrationDonorStateEnum::kCommitted:
- onTransitionToCommitted(opCtx, parsedDonorStateDoc);
- break;
- case TenantMigrationDonorStateEnum::kAborted:
- onTransitionToAborted(opCtx, parsedDonorStateDoc);
- break;
- default:
- MONGO_UNREACHABLE;
+void onDonorStateDocUpdate(OperationContext* opCtx, const BSONObj& donorStateDocBson) {
+ auto donorStateDoc = TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorStateDoc"),
+ donorStateDocBson);
+ if (donorStateDoc.getExpireAt()) {
+ TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext())
+ .remove(donorStateDoc.getDatabasePrefix());
+ } else {
+ switch (donorStateDoc.getState()) {
+ case TenantMigrationDonorStateEnum::kDataSync:
+ break;
+ case TenantMigrationDonorStateEnum::kBlocking:
+ onTransitionToBlocking(opCtx, donorStateDoc);
+ break;
+ case TenantMigrationDonorStateEnum::kCommitted:
+ onTransitionToCommitted(opCtx, donorStateDoc);
+ break;
+ case TenantMigrationDonorStateEnum::kAborted:
+ onTransitionToAborted(opCtx, donorStateDoc);
+ break;
+ default:
+ MONGO_UNREACHABLE;
+ }
}
}
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.h b/src/mongo/db/repl/tenant_migration_donor_util.h
index 5b1d9ef4168..4d3376fe170 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.h
+++ b/src/mongo/db/repl/tenant_migration_donor_util.h
@@ -49,10 +49,9 @@ namespace tenant_migration_donor {
std::unique_ptr<executor::TaskExecutor> makeTenantMigrationExecutor(ServiceContext* serviceContext);
/**
- * Updates the TenantMigrationAccessBlocker for the tenant migration represented by the given
- * config.migrationDonors document.
+ * Updates the donor's in-memory migration state to reflect the given persisted state.
*/
-void onDonorStateTransition(OperationContext* opCtx, const BSONObj& doc);
+void onDonorStateDocUpdate(OperationContext* opCtx, const BSONObj& donorStateDocBson);
/**
* If the operation has read concern "snapshot" or includes afterClusterTime, and the database is
diff --git a/src/mongo/db/repl/tenant_migration_state_machine.idl b/src/mongo/db/repl/tenant_migration_state_machine.idl
index 603c87e1bc7..e29354bac78 100644
--- a/src/mongo/db/repl/tenant_migration_state_machine.idl
+++ b/src/mongo/db/repl/tenant_migration_state_machine.idl
@@ -78,10 +78,10 @@ structs:
description:
"The opTime at which the donor's state document was set to 'committed' or 'aborted'."
optional: true
- garbageCollect:
- type: bool
- description: "A boolean that determines whether the state machine should be deleted after a delay via the TTL monitor."
- default: false
+ expireAt:
+ type: date
+ description: "The wall-clock time at which the state machine document should be removed by the TTL monitor."
+ optional: true
tenantMigrationRecipientDocument:
description: "Represents an in-progress tenant migration on the migration recipient."