diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2021-02-22 16:25:02 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-03-08 23:31:09 +0000 |
commit | 421ddfa11b6eb4b38f676a0c4da3560fcf63713d (patch) | |
tree | 4cfba4354f916911bf219d51d88064cafe6b729e /src | |
parent | d729c92c37bb1679f03fab1547f6e5b102b8594c (diff) | |
download | mongo-421ddfa11b6eb4b38f676a0c4da3560fcf63713d.tar.gz |
SERVER-54203 Tenant migration donor should wait for external keys to replicate to all its nodes
Diffstat (limited to 'src')
12 files changed, 74 insertions, 6 deletions
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 2cbd7966051..6ced4b8648e 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -240,6 +240,14 @@ public: const WriteConcernOptions& writeConcern) = 0; /** + * Returns a future that will be set when the given writeConcern has been satisfied or the node + * is not a writable primary, is interrupted, or shuts down. Notably this will ignore the + * wTimeout in the given write concern. + */ + virtual SharedSemiFuture<void> awaitReplicationAsyncNoWTimeout( + const OpTime& opTime, const WriteConcernOptions& writeConcern) = 0; + + /** * Causes this node to relinquish being primary for at least 'stepdownTime'. If 'force' is * false, before doing so it will wait for 'waitTime' for one other electable node to be caught * up before stepping down. Throws on error. diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index fd8c15bbd20..a7edca785fa 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1957,6 +1957,19 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitRepli return {std::move(status), duration_cast<Milliseconds>(timer.elapsed())}; } +SharedSemiFuture<void> ReplicationCoordinatorImpl::awaitReplicationAsyncNoWTimeout( + const OpTime& opTime, const WriteConcernOptions& writeConcern) { + WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern); + + // The returned future won't account for wTimeout or wDeadline, so reject any write concerns + // with either option to avoid misuse. + invariant(fixedWriteConcern.wDeadline == Date_t::max()); + invariant(fixedWriteConcern.wTimeout == WriteConcernOptions::kNoTimeout); + + stdx::lock_guard lg(_mutex); + return _startWaitingForReplication(lg, opTime, fixedWriteConcern); +} + BSONObj ReplicationCoordinatorImpl::_getReplicationProgress(WithLock wl) const { BSONObjBuilder progress; diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index f48f5032212..2c6531528c5 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -138,6 +138,9 @@ public: virtual ReplicationCoordinator::StatusAndDuration awaitReplication( OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern); + virtual SharedSemiFuture<void> awaitReplicationAsyncNoWTimeout( + const OpTime& opTime, const WriteConcernOptions& writeConcern); + void stepDown(OperationContext* opCtx, bool force, const Milliseconds& waitTime, diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index bb83d17424a..6ac3d119814 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -170,6 +170,11 @@ void ReplicationCoordinatorMock::setAwaitReplicationReturnValueFunction( _awaitReplicationReturnValueFunction = std::move(returnValueFunction); } +SharedSemiFuture<void> ReplicationCoordinatorMock::awaitReplicationAsyncNoWTimeout( + const OpTime& opTime, const WriteConcernOptions& writeConcern) { + MONGO_UNREACHABLE; +} + void ReplicationCoordinatorMock::stepDown(OperationContext* opCtx, bool force, const Milliseconds& waitTime, diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index ad7c4a7855b..0deec65a503 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -104,6 +104,9 @@ public: virtual ReplicationCoordinator::StatusAndDuration awaitReplication( OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern); + virtual SharedSemiFuture<void> awaitReplicationAsyncNoWTimeout( + const OpTime& opTime, const WriteConcernOptions& writeConcern); + void stepDown(OperationContext* opCtx, bool force, const Milliseconds& waitTime, diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp index a0ac32b2312..c48fdd81964 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.cpp +++ b/src/mongo/db/repl/replication_coordinator_noop.cpp @@ -257,6 +257,11 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorNoOp::awaitRepli MONGO_UNREACHABLE; } +SharedSemiFuture<void> ReplicationCoordinatorNoOp::awaitReplicationAsyncNoWTimeout( + const OpTime& opTime, const WriteConcernOptions& writeConcern) { + MONGO_UNREACHABLE; +} + void ReplicationCoordinatorNoOp::stepDown(OperationContext*, const bool, const Milliseconds&, diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h index c92474c9c3e..19c3001df06 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.h +++ b/src/mongo/db/repl/replication_coordinator_noop.h @@ -112,6 +112,9 @@ public: const OpTime&, const WriteConcernOptions&) final; + SharedSemiFuture<void> awaitReplicationAsyncNoWTimeout( + const OpTime& opTime, const WriteConcernOptions& writeConcern) final; + void stepDown(OperationContext*, bool, const Milliseconds&, const Milliseconds&) final; Status checkIfWriteConcernCanBeSatisfied(const WriteConcernOptions&) const final; diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 4b6760de73a..1acaf24f635 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -42,6 +42,7 @@ #include "mongo/db/persistent_task_store.h" #include "mongo/db/query/find_command_gen.h" #include "mongo/db/repl/repl_server_parameters_gen.h" +#include "mongo/db/repl/repl_set_config.h" #include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/repl/tenant_migration_donor_access_blocker.h" #include "mongo/db/repl/tenant_migration_state_machine_gen.h" @@ -63,6 +64,7 @@ MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationAfterPersistingInitialDonorStateDoc) MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationBeforeLeavingAbortingIndexBuildsState); MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationBeforeLeavingBlockingState); MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationBeforeLeavingDataSyncState); +MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationDonorBeforeWaitingForKeysToReplicate); MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationDonorBeforeMarkingStateGarbageCollectable); MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationBeforeEnteringFutureChain); @@ -469,8 +471,24 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs auto keyDocs) { checkIfReceivedDonorAbortMigration(serviceToken, instanceToken); - tenant_migration_util::storeExternalClusterTimeKeyDocs(executor, - std::move(keyDocs)); + return tenant_migration_util::storeExternalClusterTimeKeyDocs( + executor, std::move(keyDocs)); + }) + .then([this, self = shared_from_this(), serviceToken, instanceToken]( + repl::OpTime lastKeyOpTime) { + checkIfReceivedDonorAbortMigration(serviceToken, instanceToken); + + pauseTenantMigrationDonorBeforeWaitingForKeysToReplicate.pauseWhileSet(); + + auto votingMembersWriteConcern = + WriteConcernOptions(repl::ReplSetConfig::kConfigAllWriteConcernName, + WriteConcernOptions::SyncMode::NONE, + WriteConcernOptions::kNoTimeout); + auto writeConcernFuture = repl::ReplicationCoordinator::get(_serviceContext) + ->awaitReplicationAsyncNoWTimeout( + lastKeyOpTime, votingMembersWriteConcern); + return future_util::withCancelation(std::move(writeConcernFuture), + instanceToken); }); }) .until([instanceToken](Status status) { diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp index 3bd29b05a7f..e7cbd783fd2 100644 --- a/src/mongo/db/repl/tenant_migration_util.cpp +++ b/src/mongo/db/repl/tenant_migration_util.cpp @@ -66,8 +66,8 @@ ExternalKeysCollectionDocument makeExternalClusterTimeKeyDoc(UUID migrationId, B return externalKeyDoc; } -void storeExternalClusterTimeKeyDocs(std::shared_ptr<executor::ScopedTaskExecutor> executor, - std::vector<ExternalKeysCollectionDocument> keyDocs) { +repl::OpTime storeExternalClusterTimeKeyDocs(std::shared_ptr<executor::ScopedTaskExecutor> executor, + std::vector<ExternalKeysCollectionDocument> keyDocs) { auto opCtxHolder = cc().makeOperationContext(); auto opCtx = opCtxHolder.get(); auto nss = NamespaceString::kExternalKeysCollectionNamespace; @@ -89,6 +89,8 @@ void storeExternalClusterTimeKeyDocs(std::shared_ptr<executor::ScopedTaskExecuto /*fromMigrate=*/false); }); } + + return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); } void createOplogViewForTenantMigrations(OperationContext* opCtx, Database* db) { diff --git a/src/mongo/db/repl/tenant_migration_util.h b/src/mongo/db/repl/tenant_migration_util.h index c7e129423a1..dfa288d913c 100644 --- a/src/mongo/db/repl/tenant_migration_util.h +++ b/src/mongo/db/repl/tenant_migration_util.h @@ -140,8 +140,8 @@ ExternalKeysCollectionDocument makeExternalClusterTimeKeyDoc(UUID migrationId, B * config.external_validation_keys for it with the same keyId and replicaSetName. Otherwise, * updates the ttlExpiresAt of the existing document if it is less than the new ttlExpiresAt. */ -void storeExternalClusterTimeKeyDocs(std::shared_ptr<executor::ScopedTaskExecutor> executor, - std::vector<ExternalKeysCollectionDocument> keyDocs); +repl::OpTime storeExternalClusterTimeKeyDocs(std::shared_ptr<executor::ScopedTaskExecutor> executor, + std::vector<ExternalKeysCollectionDocument> keyDocs); /** * Sets the "ttlExpiresAt" field for the external keys so they can be garbage collected by the ttl diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp index 74a544833fc..9a5ffd03fd8 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.cpp +++ b/src/mongo/embedded/replication_coordinator_embedded.cpp @@ -281,6 +281,11 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorEmbedded::awaitR UASSERT_NOT_IMPLEMENTED; } +SharedSemiFuture<void> ReplicationCoordinatorEmbedded::awaitReplicationAsyncNoWTimeout( + const OpTime&, const WriteConcernOptions&) { + UASSERT_NOT_IMPLEMENTED; +} + void ReplicationCoordinatorEmbedded::stepDown(OperationContext*, const bool, const Milliseconds&, diff --git a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h index 7b965ca830e..42aa8cd554f 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.h +++ b/src/mongo/embedded/replication_coordinator_embedded.h @@ -114,6 +114,9 @@ public: repl::ReplicationCoordinator::StatusAndDuration awaitReplication( OperationContext*, const repl::OpTime&, const WriteConcernOptions&) override; + SharedSemiFuture<void> awaitReplicationAsyncNoWTimeout(const repl::OpTime&, + const WriteConcernOptions&) override; + void stepDown(OperationContext*, bool, const Milliseconds&, const Milliseconds&) override; Status checkIfWriteConcernCanBeSatisfied(const WriteConcernOptions&) const override; |