summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2021-02-22 16:25:02 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-03-08 23:31:09 +0000
commit421ddfa11b6eb4b38f676a0c4da3560fcf63713d (patch)
tree4cfba4354f916911bf219d51d88064cafe6b729e /src
parentd729c92c37bb1679f03fab1547f6e5b102b8594c (diff)
downloadmongo-421ddfa11b6eb4b38f676a0c4da3560fcf63713d.tar.gz
SERVER-54203 Tenant migration donor should wait for external keys to replicate to all its nodes
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/replication_coordinator.h8
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp13
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h3
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp5
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h3
-rw-r--r--src/mongo/db/repl/replication_coordinator_noop.cpp5
-rw-r--r--src/mongo/db/repl/replication_coordinator_noop.h3
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.cpp22
-rw-r--r--src/mongo/db/repl/tenant_migration_util.cpp6
-rw-r--r--src/mongo/db/repl/tenant_migration_util.h4
-rw-r--r--src/mongo/embedded/replication_coordinator_embedded.cpp5
-rw-r--r--src/mongo/embedded/replication_coordinator_embedded.h3
12 files changed, 74 insertions, 6 deletions
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 2cbd7966051..6ced4b8648e 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -240,6 +240,14 @@ public:
const WriteConcernOptions& writeConcern) = 0;
/**
+ * Returns a future that will be set when the given writeConcern has been satisfied or the node
+ * is not a writable primary, is interrupted, or shuts down. Notably, the future does not account
+ * for any wTimeout (or wDeadline), so pass a write concern that sets neither.
+ */
+ virtual SharedSemiFuture<void> awaitReplicationAsyncNoWTimeout(
+ const OpTime& opTime, const WriteConcernOptions& writeConcern) = 0;
+
+ /**
* Causes this node to relinquish being primary for at least 'stepdownTime'. If 'force' is
* false, before doing so it will wait for 'waitTime' for one other electable node to be caught
* up before stepping down. Throws on error.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index fd8c15bbd20..a7edca785fa 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1957,6 +1957,19 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitRepli
return {std::move(status), duration_cast<Milliseconds>(timer.elapsed())};
}
+SharedSemiFuture<void> ReplicationCoordinatorImpl::awaitReplicationAsyncNoWTimeout(
+ const OpTime& opTime, const WriteConcernOptions& writeConcern) {
+ WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern);
+
+ // The future returned below won't account for wTimeout or wDeadline, so invariant that the
+ // populated write concern sets neither option, to catch misuse by callers.
+ invariant(fixedWriteConcern.wDeadline == Date_t::max());
+ invariant(fixedWriteConcern.wTimeout == WriteConcernOptions::kNoTimeout);
+
+ stdx::lock_guard lg(_mutex);  // Held for the call below, which receives it as 'lg'.
+ return _startWaitingForReplication(lg, opTime, fixedWriteConcern);
+}
+
BSONObj ReplicationCoordinatorImpl::_getReplicationProgress(WithLock wl) const {
BSONObjBuilder progress;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index f48f5032212..2c6531528c5 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -138,6 +138,9 @@ public:
virtual ReplicationCoordinator::StatusAndDuration awaitReplication(
OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern);
+ virtual SharedSemiFuture<void> awaitReplicationAsyncNoWTimeout(
+ const OpTime& opTime, const WriteConcernOptions& writeConcern);
+
void stepDown(OperationContext* opCtx,
bool force,
const Milliseconds& waitTime,
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index bb83d17424a..6ac3d119814 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -170,6 +170,11 @@ void ReplicationCoordinatorMock::setAwaitReplicationReturnValueFunction(
_awaitReplicationReturnValueFunction = std::move(returnValueFunction);
}
+SharedSemiFuture<void> ReplicationCoordinatorMock::awaitReplicationAsyncNoWTimeout(
+ const OpTime& opTime, const WriteConcernOptions& writeConcern) {
+ MONGO_UNREACHABLE;  // Stub: no mock behavior is provided; both arguments are unused.
+}
+
void ReplicationCoordinatorMock::stepDown(OperationContext* opCtx,
bool force,
const Milliseconds& waitTime,
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index ad7c4a7855b..0deec65a503 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -104,6 +104,9 @@ public:
virtual ReplicationCoordinator::StatusAndDuration awaitReplication(
OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern);
+ virtual SharedSemiFuture<void> awaitReplicationAsyncNoWTimeout(
+ const OpTime& opTime, const WriteConcernOptions& writeConcern);
+
void stepDown(OperationContext* opCtx,
bool force,
const Milliseconds& waitTime,
diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp
index a0ac32b2312..c48fdd81964 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.cpp
+++ b/src/mongo/db/repl/replication_coordinator_noop.cpp
@@ -257,6 +257,11 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorNoOp::awaitRepli
MONGO_UNREACHABLE;
}
+SharedSemiFuture<void> ReplicationCoordinatorNoOp::awaitReplicationAsyncNoWTimeout(
+ const OpTime& opTime, const WriteConcernOptions& writeConcern) {
+ MONGO_UNREACHABLE;  // Stub: the body is only this macro; both arguments are unused.
+}
+
void ReplicationCoordinatorNoOp::stepDown(OperationContext*,
const bool,
const Milliseconds&,
diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h
index c92474c9c3e..19c3001df06 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.h
+++ b/src/mongo/db/repl/replication_coordinator_noop.h
@@ -112,6 +112,9 @@ public:
const OpTime&,
const WriteConcernOptions&) final;
+ SharedSemiFuture<void> awaitReplicationAsyncNoWTimeout(
+ const OpTime& opTime, const WriteConcernOptions& writeConcern) final;
+
void stepDown(OperationContext*, bool, const Milliseconds&, const Milliseconds&) final;
Status checkIfWriteConcernCanBeSatisfied(const WriteConcernOptions&) const final;
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 4b6760de73a..1acaf24f635 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/persistent_task_store.h"
#include "mongo/db/query/find_command_gen.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
+#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/repl/tenant_migration_donor_access_blocker.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
@@ -63,6 +64,7 @@ MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationAfterPersistingInitialDonorStateDoc)
MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationBeforeLeavingAbortingIndexBuildsState);
MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationBeforeLeavingBlockingState);
MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationBeforeLeavingDataSyncState);
+MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationDonorBeforeWaitingForKeysToReplicate);
MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationDonorBeforeMarkingStateGarbageCollectable);
MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationBeforeEnteringFutureChain);
@@ -469,8 +471,24 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs
auto keyDocs) {
checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
- tenant_migration_util::storeExternalClusterTimeKeyDocs(executor,
- std::move(keyDocs));
+ return tenant_migration_util::storeExternalClusterTimeKeyDocs(
+ executor, std::move(keyDocs));
+ })
+ .then([this, self = shared_from_this(), serviceToken, instanceToken](
+ repl::OpTime lastKeyOpTime) {
+ checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
+
+ pauseTenantMigrationDonorBeforeWaitingForKeysToReplicate.pauseWhileSet();
+
+ auto votingMembersWriteConcern =
+ WriteConcernOptions(repl::ReplSetConfig::kConfigAllWriteConcernName,
+ WriteConcernOptions::SyncMode::NONE,
+ WriteConcernOptions::kNoTimeout);
+ auto writeConcernFuture = repl::ReplicationCoordinator::get(_serviceContext)
+ ->awaitReplicationAsyncNoWTimeout(
+ lastKeyOpTime, votingMembersWriteConcern);
+ return future_util::withCancelation(std::move(writeConcernFuture),
+ instanceToken);
});
})
.until([instanceToken](Status status) {
diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp
index 3bd29b05a7f..e7cbd783fd2 100644
--- a/src/mongo/db/repl/tenant_migration_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_util.cpp
@@ -66,8 +66,8 @@ ExternalKeysCollectionDocument makeExternalClusterTimeKeyDoc(UUID migrationId, B
return externalKeyDoc;
}
-void storeExternalClusterTimeKeyDocs(std::shared_ptr<executor::ScopedTaskExecutor> executor,
- std::vector<ExternalKeysCollectionDocument> keyDocs) {
+repl::OpTime storeExternalClusterTimeKeyDocs(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ std::vector<ExternalKeysCollectionDocument> keyDocs) {
auto opCtxHolder = cc().makeOperationContext();
auto opCtx = opCtxHolder.get();
auto nss = NamespaceString::kExternalKeysCollectionNamespace;
@@ -89,6 +89,8 @@ void storeExternalClusterTimeKeyDocs(std::shared_ptr<executor::ScopedTaskExecuto
/*fromMigrate=*/false);
});
}
+
+ return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
}
void createOplogViewForTenantMigrations(OperationContext* opCtx, Database* db) {
diff --git a/src/mongo/db/repl/tenant_migration_util.h b/src/mongo/db/repl/tenant_migration_util.h
index c7e129423a1..dfa288d913c 100644
--- a/src/mongo/db/repl/tenant_migration_util.h
+++ b/src/mongo/db/repl/tenant_migration_util.h
@@ -140,8 +140,8 @@ ExternalKeysCollectionDocument makeExternalClusterTimeKeyDoc(UUID migrationId, B
* config.external_validation_keys for it with the same keyId and replicaSetName. Otherwise,
* updates the ttlExpiresAt of the existing document if it is less than the new ttlExpiresAt.
*/
-void storeExternalClusterTimeKeyDocs(std::shared_ptr<executor::ScopedTaskExecutor> executor,
- std::vector<ExternalKeysCollectionDocument> keyDocs);
+repl::OpTime storeExternalClusterTimeKeyDocs(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ std::vector<ExternalKeysCollectionDocument> keyDocs);
/**
* Sets the "ttlExpiresAt" field for the external keys so they can be garbage collected by the ttl
diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp
index 74a544833fc..9a5ffd03fd8 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.cpp
+++ b/src/mongo/embedded/replication_coordinator_embedded.cpp
@@ -281,6 +281,11 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorEmbedded::awaitR
UASSERT_NOT_IMPLEMENTED;
}
+SharedSemiFuture<void> ReplicationCoordinatorEmbedded::awaitReplicationAsyncNoWTimeout(
+ const OpTime&, const WriteConcernOptions&) {
+ UASSERT_NOT_IMPLEMENTED;  // Stub: the body is only this macro; the unnamed parameters are unused.
+}
+
void ReplicationCoordinatorEmbedded::stepDown(OperationContext*,
const bool,
const Milliseconds&,
diff --git a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h
index 7b965ca830e..42aa8cd554f 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.h
+++ b/src/mongo/embedded/replication_coordinator_embedded.h
@@ -114,6 +114,9 @@ public:
repl::ReplicationCoordinator::StatusAndDuration awaitReplication(
OperationContext*, const repl::OpTime&, const WriteConcernOptions&) override;
+ SharedSemiFuture<void> awaitReplicationAsyncNoWTimeout(const repl::OpTime&,
+ const WriteConcernOptions&) override;
+
void stepDown(OperationContext*, bool, const Milliseconds&, const Milliseconds&) override;
Status checkIfWriteConcernCanBeSatisfied(const WriteConcernOptions&) const override;