diff options
author | Cheahuychou Mao <cheahuychou.mao@mongodb.com> | 2020-09-01 19:22:18 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-09 02:11:58 +0000 |
commit | 3626a65b1d1f8ff2d230704146a09595f78bfe51 (patch) | |
tree | c9e6a45831bb3c29ff7d866cb0405c9e73482f78 /src/mongo/db/repl | |
parent | e21416d2889b54830624d812a48b2a5e07f4e47a (diff) | |
download | mongo-3626a65b1d1f8ff2d230704146a09595f78bfe51.tar.gz |
SERVER-49204 Implement donorForgetMigration command
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_server_parameters.idl | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_service.cpp | 127 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_service.h | 38 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_util.cpp | 49 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_util.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_state_machine.idl | 8 |
7 files changed, 183 insertions, 58 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 63a2626d1b3..d1fd2714f33 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1281,6 +1281,7 @@ env.Library( ], LIBDEPS=[ 'primary_only_service', + 'repl_server_parameters', 'tenant_migration_donor', 'wait_for_majority_service', ], diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl index 562d80ad126..4153ad4ef0e 100644 --- a/src/mongo/db/repl/repl_server_parameters.idl +++ b/src/mongo/db/repl/repl_server_parameters.idl @@ -330,4 +330,15 @@ server_parameters: set_at: startup cpp_vartype: bool cpp_varname: enableTenantMigrations - default: false
\ No newline at end of file + default: false + + tenantMigrationGarbageCollectionDelayMS: + description: >- + The amount of time in milliseconds that the donor or recipient should wait before + removing the migration state document after receiving donorForgetMigration or + recipientForgetMigration. + set_at: [ startup, runtime ] + cpp_vartype: AtomicWord<int> + cpp_varname: tenantMigrationGarbageCollectionDelayMS + default: + expr: 48 * 60 * 60 * 1000 diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 8f3fb868e7a..d821c6c3f26 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -38,6 +38,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/persistent_task_store.h" +#include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/tenant_migration_access_blocker.h" #include "mongo/db/repl/tenant_migration_conflict_info.h" #include "mongo/db/repl/tenant_migration_donor_util.h" @@ -173,6 +174,34 @@ repl::OpTime TenantMigrationDonorService::Instance::_updateStateDocument( return updateOpTime.get(); } +repl::OpTime TenantMigrationDonorService::Instance::_markStateDocumentAsGarbageCollectable() { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + DBDirectClient dbClient(opCtx); + + _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() + + Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()}); + + auto commandResponse = dbClient.runCommand([&] { + write_ops::Update updateOp(_stateDocumentsNS); + auto updateModification = + write_ops::UpdateModification::parseFromClassicUpdate(_stateDoc.toBSON()); + write_ops::UpdateOpEntry updateEntry( + BSON(TenantMigrationDonorDocument::kIdFieldName << _stateDoc.getId()), + updateModification); + updateEntry.setMulti(false); + updateEntry.setUpsert(false); + updateOp.setUpdates({updateEntry}); + + return updateOp.serialize({}); + }()); + + const auto commandReply = commandResponse->getCommandReply(); + uassertStatusOK(getStatusFromWriteCommandReply(commandReply)); + + return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); +} + ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWriteConcern( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, repl::OpTime opTime) { return WaitForMajorityService::get(_serviceContext) @@ -180,27 +209,11 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWrit .thenRunOn(**executor); } -ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand( +ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipient( + OperationContext* opCtx, const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - RemoteCommandTargeter* recipientTargeter) { - if (skipSendingRecipientSyncDataCommand.shouldFail()) { - return ExecutorFuture<void>(**executor, Status::OK()); - } - - auto opCtxHolder = cc().makeOperationContext(); - auto opCtx = opCtxHolder.get(); - - BSONObj cmdObj = BSONObj([&]() { - auto donorConnString = - repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString(); - RecipientSyncData request(_stateDoc.getId(), - donorConnString.toString(), - _stateDoc.getDatabasePrefix().toString(), - _stateDoc.getReadPreference()); - request.setReturnAfterReachingOpTime(_stateDoc.getBlockTimestamp()); - return request.toBSON(BSONObj()); - }()); - + RemoteCommandTargeter* recipientTargeter, + const BSONObj& cmdObj) { HostAndPort recipientHost = uassertStatusOK(recipientTargeter->findHost(opCtx, ReadPreferenceSetting())); @@ -238,6 +251,42 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa }); } +ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + RemoteCommandTargeter* recipientTargeter) { + if (skipSendingRecipientSyncDataCommand.shouldFail()) { + return ExecutorFuture<void>(**executor, Status::OK()); + } + + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + + BSONObj cmdObj = BSONObj([&]() { + auto donorConnString = + repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString(); + RecipientSyncData request(_stateDoc.getId(), + donorConnString.toString(), + _stateDoc.getDatabasePrefix().toString(), + _stateDoc.getReadPreference()); + request.setReturnAfterReachingOpTime(_stateDoc.getBlockTimestamp()); + return request.toBSON(BSONObj()); + }()); + + return _sendCommandToRecipient(opCtx, executor, recipientTargeter, cmdObj); +} + +ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForgetMigrationCommand( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + RemoteCommandTargeter* recipientTargeter) { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + + return _sendCommandToRecipient(opCtx, + executor, + recipientTargeter, + RecipientForgetMigration(_stateDoc.getId()).toBSON(BSONObj())); +} + SemiFuture<void> TenantMigrationDonorService::Instance::run( std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept { auto recipientUri = @@ -298,15 +347,6 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( // Wait for the migration to commit or abort. return _mtab->onCompletion(); }) - .onError([this](Status status) { - if (!status.isOK() && _abortReason) { - status.addContext(str::stream() - << "Tenant migration with id \"" << _stateDoc.getId() - << "\" and dbPrefix \"" << _stateDoc.getDatabasePrefix() - << "\" aborted due to " << _abortReason); - } - return status; - }) .onCompletion([this](Status status) { LOGV2(5006601, "Tenant migration completed", @@ -314,6 +354,35 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( "dbPrefix"_attr = _stateDoc.getDatabasePrefix(), "status"_attr = status, "abortReason"_attr = _abortReason); + + if (status.isOK()) { + _decisionPromise.emplaceValue(); + } else { + if (_abortReason) { + status.addContext(str::stream() + << "Tenant migration with id \"" << _stateDoc.getId() + << "\" and dbPrefix \"" << _stateDoc.getDatabasePrefix() + << "\" aborted due to " << _abortReason); + } + _decisionPromise.setError(status); + } + }) + .then([this, executor] { + // Wait for the donorForgetMigration command. + return _receivedDonorForgetMigrationPromise.getFuture(); + }) + .then([this, executor] { + const auto opTime = _markStateDocumentAsGarbageCollectable(); + return _waitForMajorityWriteConcern(executor, std::move(opTime)); + }) + .then([this, executor, recipientTargeter] { + return _sendRecipientForgetMigrationCommand(executor, recipientTargeter.get()); + }) + .onCompletion([this, executor](Status status) { + LOGV2(4920400, + "Marked migration state as garbage collectable", + "migrationId"_attr = _stateDoc.getId(), + "expireAt"_attr = _stateDoc.getExpireAt()); return status; }) .semi(); diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h index 30fe3a41461..ddf178121e4 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.h +++ b/src/mongo/db/repl/tenant_migration_donor_service.h @@ -80,6 +80,17 @@ public: */ Status checkIfOptionsConflict(BSONObj options); + /** + * Returns a Future that will be resolved when the migration has committed or aborted. + */ + SharedSemiFuture<void> getDecisionFuture() const { + return _decisionPromise.getFuture(); + } + + void onReceiveDonorForgetMigration() { + _receivedDonorForgetMigrationPromise.emplaceValue(); + } + private: const NamespaceString _stateDocumentsNS = NamespaceString::kTenantMigrationDonorsNamespace; @@ -98,24 +109,51 @@ public: repl::OpTime _updateStateDocument(const TenantMigrationDonorStateEnum nextState); /** + * Sets the "expireAt" time for the state document to be garbage collected. + */ + repl::OpTime _markStateDocumentAsGarbageCollectable(); + + /** * Waits for given opTime to be majority committed. */ ExecutorFuture<void> _waitForMajorityWriteConcern( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, repl::OpTime opTime); /** + * Sends the given command to the recipient replica set. + */ + ExecutorFuture<void> _sendCommandToRecipient( + OperationContext* opCtx, + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + RemoteCommandTargeter* recipientTargeter, + const BSONObj& cmdObj); + + /** * Sends the recipientSyncData command to the recipient replica set. */ ExecutorFuture<void> _sendRecipientSyncDataCommand( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, RemoteCommandTargeter* recipientTargeter); + /** + * Sends the recipientForgetMigration command to the recipient replica set. + */ + ExecutorFuture<void> _sendRecipientForgetMigrationCommand( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + RemoteCommandTargeter* recipientTargeter); + ServiceContext* _serviceContext; TenantMigrationDonorDocument _stateDoc; std::shared_ptr<TenantMigrationAccessBlocker> _mtab; boost::optional<Status> _abortReason; + + // Promise that is resolved when the donor has majority-committed the migration decision. + SharedPromise<void> _decisionPromise; + + // Promise that is resolved when the donor receives the donorForgetMigration command. + SharedPromise<void> _receivedDonorForgetMigrationPromise; }; private: diff --git a/src/mongo/db/repl/tenant_migration_donor_util.cpp b/src/mongo/db/repl/tenant_migration_donor_util.cpp index 296e041658d..edae74ab4ff 100644 --- a/src/mongo/db/repl/tenant_migration_donor_util.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_util.cpp @@ -58,7 +58,8 @@ const char kNetName[] = "TenantMigrationWorkerNetwork"; * Updates the TenantMigrationAccessBlocker when the tenant migration transitions to the blocking * state. */ -void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocument& donorStateDoc) { +void onTransitionToBlocking(OperationContext* opCtx, + const TenantMigrationDonorDocument& donorStateDoc) { invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kBlocking); invariant(donorStateDoc.getBlockTimestamp()); @@ -91,7 +92,8 @@ void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocumen /** * Transitions the TenantMigrationAccessBlocker to the committed state. */ -void onTransitionToCommitted(OperationContext* opCtx, TenantMigrationDonorDocument& donorStateDoc) { +void onTransitionToCommitted(OperationContext* opCtx, + const TenantMigrationDonorDocument& donorStateDoc) { invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kCommitted); invariant(donorStateDoc.getCommitOrAbortOpTime()); @@ -105,7 +107,8 @@ void onTransitionToCommitted(OperationContext* opCtx, TenantMigrationDonorDocume /** * Transitions the TenantMigrationAccessBlocker to the aborted state. */ -void onTransitionToAborted(OperationContext* opCtx, TenantMigrationDonorDocument& donorStateDoc) { +void onTransitionToAborted(OperationContext* opCtx, + const TenantMigrationDonorDocument& donorStateDoc) { invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kAborted); invariant(donorStateDoc.getCommitOrAbortOpTime()); @@ -130,24 +133,28 @@ std::unique_ptr<executor::TaskExecutor> makeTenantMigrationExecutor( executor::makeNetworkInterface(kNetName, nullptr, nullptr)); } -void onDonorStateTransition(OperationContext* opCtx, const BSONObj& donorStateDoc) { - auto parsedDonorStateDoc = - TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorStateDoc"), donorStateDoc); - - switch (parsedDonorStateDoc.getState()) { - case TenantMigrationDonorStateEnum::kDataSync: - break; - case TenantMigrationDonorStateEnum::kBlocking: - onTransitionToBlocking(opCtx, parsedDonorStateDoc); - break; - case TenantMigrationDonorStateEnum::kCommitted: - onTransitionToCommitted(opCtx, parsedDonorStateDoc); - break; - case TenantMigrationDonorStateEnum::kAborted: - onTransitionToAborted(opCtx, parsedDonorStateDoc); - break; - default: - MONGO_UNREACHABLE; +void onDonorStateDocUpdate(OperationContext* opCtx, const BSONObj& donorStateDocBson) { + auto donorStateDoc = TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorStateDoc"), + donorStateDocBson); + if (donorStateDoc.getExpireAt()) { + TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext()) + .remove(donorStateDoc.getDatabasePrefix()); + } else { + switch (donorStateDoc.getState()) { + case TenantMigrationDonorStateEnum::kDataSync: + break; + case TenantMigrationDonorStateEnum::kBlocking: + onTransitionToBlocking(opCtx, donorStateDoc); + break; + case TenantMigrationDonorStateEnum::kCommitted: + onTransitionToCommitted(opCtx, donorStateDoc); + break; + case TenantMigrationDonorStateEnum::kAborted: + onTransitionToAborted(opCtx, donorStateDoc); + break; + default: + MONGO_UNREACHABLE; + } } } diff --git a/src/mongo/db/repl/tenant_migration_donor_util.h b/src/mongo/db/repl/tenant_migration_donor_util.h index 5b1d9ef4168..4d3376fe170 100644 --- a/src/mongo/db/repl/tenant_migration_donor_util.h +++ b/src/mongo/db/repl/tenant_migration_donor_util.h @@ -49,10 +49,9 @@ namespace tenant_migration_donor { std::unique_ptr<executor::TaskExecutor> makeTenantMigrationExecutor(ServiceContext* serviceContext); /** - * Updates the TenantMigrationAccessBlocker for the tenant migration represented by the given - * config.migrationDonors document. + * Updates the donor's in-memory migration state to reflect the given persisted state. */ -void onDonorStateTransition(OperationContext* opCtx, const BSONObj& doc); +void onDonorStateDocUpdate(OperationContext* opCtx, const BSONObj& donorStateDocBson); /** * If the operation has read concern "snapshot" or includes afterClusterTime, and the database is diff --git a/src/mongo/db/repl/tenant_migration_state_machine.idl b/src/mongo/db/repl/tenant_migration_state_machine.idl index 603c87e1bc7..e29354bac78 100644 --- a/src/mongo/db/repl/tenant_migration_state_machine.idl +++ b/src/mongo/db/repl/tenant_migration_state_machine.idl @@ -78,10 +78,10 @@ structs: description: "The opTime at which the donor's state document was set to 'committed' or 'aborted'." optional: true - garbageCollect: - type: bool - description: "A boolean that determines whether the state machine should be deleted after a delay via the TTL monitor." - default: false + expireAt: + type: date + description: "The wall-clock time at which the state machine document should be removed by the TTL monitor." + optional: true tenantMigrationRecipientDocument: description: "Represents an in-progress tenant migration on the migration recipient." |