summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2021-01-21 01:25:10 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-01-28 02:11:41 +0000
commite9360bd8e7cb8f1447ffd513149d284c394bb4a0 (patch)
tree7404bfc08bba640c65cbaf1aed551a6428aedf06 /src/mongo
parent96fe72c36d370a4067240738f051021d4daf72ce (diff)
downloadmongo-e9360bd8e7cb8f1447ffd513149d284c394bb4a0.tar.gz
SERVER-53404 Make tenant migration donor copy the recipient's cluster time signing keys before sending recipientSyncData
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/client/fetcher.cpp29
-rw-r--r--src/mongo/client/fetcher.h5
-rw-r--r--src/mongo/db/catalog/database_impl.cpp3
-rw-r--r--src/mongo/db/commands/tenant_migration_donor_cmds.idl4
-rw-r--r--src/mongo/db/commands/tenant_migration_recipient_cmds.idl6
-rw-r--r--src/mongo/db/logical_time_validator.cpp5
-rw-r--r--src/mongo/db/logical_time_validator.h5
-rw-r--r--src/mongo/db/namespace_string.cpp22
-rw-r--r--src/mongo/db/namespace_string.h15
-rw-r--r--src/mongo/db/repl/SConscript18
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.cpp160
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.h20
-rw-r--r--src/mongo/db/repl/tenant_migration_pem_payload.idl4
-rw-r--r--src/mongo/db/repl/tenant_migration_state_machine.idl6
-rw-r--r--src/mongo/db/repl/tenant_migration_util.cpp112
-rw-r--r--src/mongo/db/repl/tenant_migration_util.h28
16 files changed, 360 insertions, 82 deletions
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp
index d8c78fd25c0..cf0f72cb113 100644
--- a/src/mongo/client/fetcher.cpp
+++ b/src/mongo/client/fetcher.cpp
@@ -157,7 +157,8 @@ Fetcher::Fetcher(executor::TaskExecutor* executor,
const BSONObj& metadata,
Milliseconds findNetworkTimeout,
Milliseconds getMoreNetworkTimeout,
- std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy> firstCommandRetryPolicy)
+ std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy> firstCommandRetryPolicy,
+ transport::ConnectSSLMode sslMode)
: _executor(executor),
_source(source),
_dbname(dbname),
@@ -168,9 +169,15 @@ Fetcher::Fetcher(executor::TaskExecutor* executor,
_getMoreNetworkTimeout(getMoreNetworkTimeout),
_firstRemoteCommandScheduler(
_executor,
- RemoteCommandRequest(_source, _dbname, _cmdObj, _metadata, nullptr, _findNetworkTimeout),
+ [&] {
+ RemoteCommandRequest request(
+ _source, _dbname, _cmdObj, _metadata, nullptr, _findNetworkTimeout);
+ request.sslMode = sslMode;
+ return request;
+ }(),
[this](const auto& x) { return this->_callback(x, kFirstBatchFieldName); },
- std::move(firstCommandRetryPolicy)) {
+ std::move(firstCommandRetryPolicy)),
+ _sslMode(sslMode) {
uassert(ErrorCodes::BadValue, "callback function cannot be null", _work);
}
@@ -297,11 +304,14 @@ Status Fetcher::_scheduleGetMore(const BSONObj& cmdObj) {
return Status(ErrorCodes::CallbackCanceled,
"fetcher was shut down after previous batch was processed");
}
+
+ RemoteCommandRequest request(
+ _source, _dbname, cmdObj, _metadata, nullptr, _getMoreNetworkTimeout);
+ request.sslMode = _sslMode;
+
StatusWith<executor::TaskExecutor::CallbackHandle> scheduleResult =
_executor->scheduleRemoteCommand(
- RemoteCommandRequest(
- _source, _dbname, cmdObj, _metadata, nullptr, _getMoreNetworkTimeout),
- [this](const auto& x) { return this->_callback(x, kNextBatchFieldName); });
+ request, [this](const auto& x) { return this->_callback(x, kNextBatchFieldName); });
if (!scheduleResult.isOK()) {
return scheduleResult.getStatus();
@@ -405,9 +415,12 @@ void Fetcher::_sendKillCursors(const CursorId id, const NamespaceString& nss) {
"error"_attr = redact(status));
}
};
+
auto cmdObj = BSON("killCursors" << nss.coll() << "cursors" << BSON_ARRAY(id));
- auto scheduleResult = _executor->scheduleRemoteCommand(
- RemoteCommandRequest(_source, _dbname, cmdObj, nullptr), logKillCursorsResult);
+ RemoteCommandRequest request(_source, _dbname, cmdObj, nullptr);
+ request.sslMode = _sslMode;
+
+ auto scheduleResult = _executor->scheduleRemoteCommand(request, logKillCursorsResult);
if (!scheduleResult.isOK()) {
LOGV2_WARNING(23920,
"Failed to schedule killCursors command: {error}",
diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h
index 3d0d10abec8..4ce57087c20 100644
--- a/src/mongo/client/fetcher.h
+++ b/src/mongo/client/fetcher.h
@@ -131,7 +131,8 @@ public:
Milliseconds findNetworkTimeout = RemoteCommandRequest::kNoTimeout,
Milliseconds getMoreNetworkTimeout = RemoteCommandRequest::kNoTimeout,
std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy> firstCommandRetryPolicy =
- RemoteCommandRetryScheduler::makeNoRetryPolicy());
+ RemoteCommandRetryScheduler::makeNoRetryPolicy(),
+ transport::ConnectSSLMode sslMode = transport::kGlobalSSLMode);
virtual ~Fetcher();
@@ -259,6 +260,8 @@ private:
// First remote command scheduler.
RemoteCommandRetryScheduler _firstRemoteCommandScheduler;
+
+ const transport::ConnectSSLMode _sslMode;
};
/**
diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp
index 87ebf3d6a47..789ea8e882e 100644
--- a/src/mongo/db/catalog/database_impl.cpp
+++ b/src/mongo/db/catalog/database_impl.cpp
@@ -348,7 +348,8 @@ Status DatabaseImpl::dropCollection(OperationContext* opCtx,
"turn off profiling before dropping system.profile collection");
} else if (!(nss.isSystemDotViews() || nss.isHealthlog() ||
nss == NamespaceString::kLogicalSessionsNamespace ||
- nss == NamespaceString::kSystemKeysNamespace ||
+ nss == NamespaceString::kKeysCollectionNamespace ||
+ nss == NamespaceString::kExternalKeysCollectionNamespace ||
nss.isTemporaryReshardingCollection())) {
return Status(ErrorCodes::IllegalOperation,
str::stream() << "can't drop system collection " << nss);
diff --git a/src/mongo/db/commands/tenant_migration_donor_cmds.idl b/src/mongo/db/commands/tenant_migration_donor_cmds.idl
index 43823f99cb2..f81b718cffa 100644
--- a/src/mongo/db/commands/tenant_migration_donor_cmds.idl
+++ b/src/mongo/db/commands/tenant_migration_donor_cmds.idl
@@ -66,12 +66,12 @@ commands:
description: "The URI string that the donor will utilize to create a connection with the recipient."
type: string
validator:
- callback: "validateConnectionString"
+ callback: "tenant_migration_util::validateConnectionString"
tenantId:
description: "The prefix from which the migrating database will be matched. The prefixes 'admin', 'local', 'config', the empty string, are not allowed."
type: string
validator:
- callback: "validateDatabasePrefix"
+ callback: "tenant_migration_util::validateDatabasePrefix"
readPreference:
description: "The read preference settings that the donor will pass on to the recipient."
type: readPreference
diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
index 4c96e219c4e..a3c4fb991d2 100644
--- a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
+++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
@@ -62,14 +62,14 @@ structs:
donor.
type: string
validator:
- callback: "validateConnectionString"
+ callback: "tenant_migration_util::validateConnectionString"
tenantId:
description: >-
The prefix from which the migrating database will be matched. The prefixes 'admin',
'local', 'config', the empty string, are not allowed.
type: string
validator:
- callback: "validateDatabasePrefix"
+ callback: "tenant_migration_util::validateDatabasePrefix"
readPreference:
description: >-
The read preference settings that the donor will pass on to the recipient.
@@ -98,7 +98,7 @@ commands:
type: timestamp
optional: true
validator:
- callback: "validateTimestampNotNull"
+ callback: "tenant_migration_util::validateTimestampNotNull"
recipientForgetMigration:
description: "Parser for the 'recipientForgetMigration' command."
diff --git a/src/mongo/db/logical_time_validator.cpp b/src/mongo/db/logical_time_validator.cpp
index d2b8866a79c..b848b49cc5e 100644
--- a/src/mongo/db/logical_time_validator.cpp
+++ b/src/mongo/db/logical_time_validator.cpp
@@ -220,6 +220,11 @@ bool LogicalTimeValidator::shouldGossipLogicalTime() {
return _getKeyManagerCopy()->hasSeenKeys();
}
+void LogicalTimeValidator::refreshKeyManagerCache(OperationContext* opCtx) {
+ invariant(_keyManager);
+ _keyManager->refreshNow(opCtx);
+}
+
void LogicalTimeValidator::resetKeyManagerCache() {
LOGV2(20716, "Resetting key manager cache");
invariant(_keyManager);
diff --git a/src/mongo/db/logical_time_validator.h b/src/mongo/db/logical_time_validator.h
index 1b78c51d1f5..825b7dc70df 100644
--- a/src/mongo/db/logical_time_validator.h
+++ b/src/mongo/db/logical_time_validator.h
@@ -109,6 +109,11 @@ public:
void stopKeyManager();
/**
+ * Forces the key manager cache to refresh.
+ */
+ void refreshKeyManagerCache(OperationContext* opCtx);
+
+ /**
* Reset the key manager cache of keys.
*/
void resetKeyManagerCache();
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index eea72c9e3f8..11445d700a5 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -79,8 +79,10 @@ const NamespaceString NamespaceString::kShardConfigCollectionsNamespace(Namespac
"cache.collections");
const NamespaceString NamespaceString::kShardConfigDatabasesNamespace(NamespaceString::kConfigDb,
"cache.databases");
-const NamespaceString NamespaceString::kSystemKeysNamespace(NamespaceString::kAdminDb,
- "system.keys");
+const NamespaceString NamespaceString::kKeysCollectionNamespace(NamespaceString::kAdminDb,
+ "system.keys");
+const NamespaceString NamespaceString::kExternalKeysCollectionNamespace(
+ NamespaceString::kAdminDb, "system.external_validation_keys");
const NamespaceString NamespaceString::kRsOplogNamespace(NamespaceString::kLocalDb, "oplog.rs");
const NamespaceString NamespaceString::kSystemReplSetNamespace(NamespaceString::kLocalDb,
"system.replset");
@@ -108,12 +110,6 @@ const NamespaceString NamespaceString::kReshardingApplierProgressNamespace(
const NamespaceString NamespaceString::kReshardingTxnClonerProgressNamespace(
NamespaceString::kConfigDb, "localReshardingOperations.recipient.progress_txn_cloner");
-const NamespaceString NamespaceString::kKeysCollectionNamespace(NamespaceString::kAdminDb,
- "system.keys");
-
-const NamespaceString NamespaceString::kExternalKeysCollectionNamespace(
- NamespaceString::kAdminDb, "system.external_validation_keys");
-
bool NamespaceString::isListCollectionsCursorNS() const {
return coll() == listCollectionsCursorCol;
}
@@ -128,16 +124,12 @@ bool NamespaceString::isLegalClientSystemNS() const {
return true;
if (coll() == kServerConfigurationNamespace.coll())
return true;
- if (coll() == kSystemKeysNamespace.coll())
+ if (coll() == kKeysCollectionNamespace.coll())
return true;
- if (coll() == "system.backup_users")
+ if (coll() == kExternalKeysCollectionNamespace.coll())
return true;
- if (coll() == kExternalKeysCollectionNamespace.coll()) {
- // TODO (SERVER-53404): This was added to allow client in an integration test to
- // manually insert the key document into this system collection. Remove this when the
- // tenant migration donor does the copying by itself.
+ if (coll() == "system.backup_users")
return true;
- }
} else if (db() == kConfigDb) {
if (coll() == "system.sessions")
return true;
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 03accfec6f7..a60adb43d51 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -100,8 +100,12 @@ public:
// of a specific database
static const NamespaceString kShardConfigDatabasesNamespace;
- // Name for causal consistency's key collection.
- static const NamespaceString kSystemKeysNamespace;
+ // Namespace for storing keys for signing and validating cluster times created by the cluster
+ // that this node is in.
+ static const NamespaceString kKeysCollectionNamespace;
+
+ // Namespace for storing keys for validating cluster times created by other clusters.
+ static const NamespaceString kExternalKeysCollectionNamespace;
// Namespace of the the oplog collection.
static const NamespaceString kRsOplogNamespace;
@@ -148,13 +152,6 @@ public:
// Namespace for storing config.transactions cloner progress for resharding.
static const NamespaceString kReshardingTxnClonerProgressNamespace;
- // Namespace for storing keys for signing and validating cluster times created by the cluster
- // that this node is in.
- static const NamespaceString kKeysCollectionNamespace;
-
- // Namespace for storing keys for validating cluster times created by other clusters.
- static const NamespaceString kExternalKeysCollectionNamespace;
-
/**
* Constructs an empty NamespaceString.
*/
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index ca18216794f..f8d16936ad9 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1284,10 +1284,16 @@ env.Library(
)
env.Library(
- target='tenant_migration_recipient_utils',
+ target='tenant_migration_utils',
source=[
+ "tenant_migration_util.cpp",
"tenant_migration_recipient_entry_helpers.cpp",
],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/util/future_util',
+ "repl_server_parameters",
+ 'wait_for_majority_service',
+ ],
LIBDEPS_PRIVATE=[
"$BUILD_DIR/mongo/base",
"$BUILD_DIR/mongo/db/catalog_raii",
@@ -1306,10 +1312,8 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/client/read_preference',
- '$BUILD_DIR/mongo/util/future_util',
'primary_only_service',
- 'tenant_migration_recipient_utils',
- 'wait_for_majority_service',
+ 'tenant_migration_utils',
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/client/clientdriver_network',
@@ -1363,10 +1367,10 @@ env.Library(
'tenant_migration_donor_service.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/client/fetcher',
'primary_only_service',
- 'repl_server_parameters',
'tenant_migration_donor',
- 'wait_for_majority_service',
+ 'tenant_migration_utils',
],
)
@@ -1573,7 +1577,7 @@ env.CppUnitTest(
'task_executor_mock',
'task_runner',
'tenant_migration_recipient_service',
- 'tenant_migration_recipient_utils',
+ 'tenant_migration_utils',
'tenant_oplog_processing',
'wait_for_majority_service',
],
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 6959f19d995..5d10481b025 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/persistent_task_store.h"
+#include "mongo/db/query/find_command_gen.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/tenant_migration_access_blocker.h"
#include "mongo/db/repl/tenant_migration_donor_util.h"
@@ -58,13 +59,18 @@ namespace mongo {
namespace {
MONGO_FAIL_POINT_DEFINE(abortTenantMigrationBeforeLeavingBlockingState);
+MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationAfterPersitingInitialDonorStateDoc);
MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationBeforeLeavingBlockingState);
MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationBeforeLeavingDataSyncState);
const std::string kTTLIndexName = "TenantMigrationDonorTTLIndex";
-const Seconds kRecipientSyncDataTimeout(30);
const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
+const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly);
+
+const Seconds kRecipientSyncDataTimeout(30);
+const int kMaxRecipientKeyDocsFindAttempts = 10;
+
std::shared_ptr<TenantMigrationAccessBlocker> getTenantMigrationAccessBlocker(
ServiceContext* serviceContext, StringData tenantId) {
return TenantMigrationAccessBlockerRegistry::get(serviceContext)
@@ -261,6 +267,11 @@ TenantMigrationDonorService::Instance::getDurableState(OperationContext* opCtx)
void TenantMigrationDonorService::Instance::onReceiveDonorAbortMigration() {
_instanceCancelationSource.cancel();
+
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (auto fetcher = _recipientKeysFetcher.lock()) {
+ fetcher->shutdown();
+ }
}
void TenantMigrationDonorService::Instance::onReceiveDonorForgetMigration() {
@@ -287,7 +298,96 @@ void TenantMigrationDonorService::Instance::interrupt(Status status) {
}
}
-ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertStateDocument(
+ExecutorFuture<void>
+TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
+ const CancelationToken& token) {
+ return recipientTargeterRS
+ ->findHost(kPrimaryOnlyReadPreference, _instanceCancelationSource.token())
+ .thenRunOn(**executor)
+ .then([this, self = shared_from_this(), executor](HostAndPort host) {
+ const auto nss = NamespaceString::kKeysCollectionNamespace;
+
+ const auto cmdObj = [&] {
+ FindCommand request(NamespaceStringOrUUID{nss});
+ request.setReadConcern(
+ repl::ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern)
+ .toBSONInner());
+ return request.toBSON(BSONObj());
+ }();
+
+ std::vector<ExternalKeysCollectionDocument> keyDocs;
+ boost::optional<Status> fetchStatus;
+
+ auto fetcherCallback = [this, self = shared_from_this(), &keyDocs, &fetchStatus](
+ const Fetcher::QueryResponseStatus& dataStatus,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob) {
+ // Throw out any accumulated results on error
+ if (!dataStatus.isOK()) {
+ fetchStatus = dataStatus.getStatus();
+ keyDocs.clear();
+ return;
+ }
+
+ const auto& data = dataStatus.getValue();
+ for (const BSONObj& doc : data.documents) {
+ keyDocs.push_back(tenant_migration_util::makeExternalClusterTimeKeyDoc(
+ _serviceContext, _recipientUri.getSetName(), doc.getOwned()));
+ }
+ fetchStatus = Status::OK();
+
+ if (!getMoreBob) {
+ return;
+ }
+ getMoreBob->append("getMore", data.cursorId);
+ getMoreBob->append("collection", data.nss.coll());
+ };
+
+ auto fetcher = std::make_shared<Fetcher>(
+ _recipientCmdExecutor.get(),
+ host,
+ nss.db().toString(),
+ cmdObj,
+ fetcherCallback,
+ kPrimaryOnlyReadPreference.toContainingBSON(),
+ executor::RemoteCommandRequest::kNoTimeout, /* findNetworkTimeout */
+ executor::RemoteCommandRequest::kNoTimeout, /* getMoreNetworkTimeout */
+ RemoteCommandRetryScheduler::makeRetryPolicy<ErrorCategory::RetriableError>(
+ kMaxRecipientKeyDocsFindAttempts, executor::RemoteCommandRequest::kNoTimeout),
+ transport::kEnableSSL);
+ uassertStatusOK(fetcher->schedule());
+
+ {
+ stdx::lock_guard<Latch> lg(_mutex);
+ _recipientKeysFetcher = fetcher;
+ }
+
+ fetcher->join();
+
+ {
+ stdx::lock_guard<Latch> lg(_mutex);
+ _recipientKeysFetcher.reset();
+ }
+
+ if (!fetchStatus) {
+ // The callback never got invoked.
+ uasserted(5340400, "Internal error running cursor callback in command");
+ }
+ uassertStatusOK(fetchStatus.get());
+
+ return keyDocs;
+ })
+ .then([this, self = shared_from_this(), executor, token](auto keyDocs) {
+ checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token());
+
+ return tenant_migration_util::storeExternalClusterTimeKeyDocsAndRefreshCache(
+ executor, std::move(keyDocs), _instanceCancelationSource.token());
+ });
+}
+
+ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertStateDoc(
std::shared_ptr<executor::ScopedTaskExecutor> executor) {
invariant(_stateDoc.getState() == TenantMigrationDonorStateEnum::kUninitialized);
_stateDoc.setState(TenantMigrationDonorStateEnum::kDataSync);
@@ -299,7 +399,7 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState
AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX);
writeConflictRetry(
- opCtx, "tenantMigrationInsertStateDoc", _stateDocumentsNS.ns(), [&] {
+ opCtx, "TenantMigrationDonorInsertStateDoc", _stateDocumentsNS.ns(), [&] {
const auto filter =
BSON(TenantMigrationDonorDocument::kIdFieldName << _stateDoc.getId());
const auto updateMod = BSON("$setOnInsert" << _stateDoc.toBSON());
@@ -321,7 +421,7 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState
.on(**executor, CancelationToken::uncancelable());
}
-ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateStateDocument(
+ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateStateDoc(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const TenantMigrationDonorStateEnum nextState) {
const auto originalStateDocBson = _stateDoc.toBSON();
@@ -332,15 +432,14 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState
auto opCtxHolder = cc().makeOperationContext();
auto opCtx = opCtxHolder.get();
- uassertStatusOK(writeConflictRetry(
- opCtx, "updateStateDoc", _stateDocumentsNS.ns(), [&]() -> Status {
- AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX);
- if (!collection) {
- return Status(ErrorCodes::NamespaceNotFound,
- str::stream()
- << _stateDocumentsNS.ns() << " does not exist");
- }
+ AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX);
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << _stateDocumentsNS.ns() << " does not exist",
+ collection);
+
+ writeConflictRetry(
+ opCtx, "TenantMigrationDonorUpdateStateDoc", _stateDocumentsNS.ns(), [&] {
WriteUnitOfWork wuow(opCtx);
const auto originalRecordId = Helpers::findOne(opCtx,
@@ -403,8 +502,7 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState
wuow.commit();
updateOpTime = oplogSlot;
- return Status::OK();
- }));
+ });
invariant(updateOpTime);
return updateOpTime.get();
@@ -418,11 +516,10 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState
}
ExecutorFuture<repl::OpTime>
-TenantMigrationDonorService::Instance::_markStateDocumentAsGarbageCollectable(
+TenantMigrationDonorService::Instance::_markStateDocAsGarbageCollectable(
std::shared_ptr<executor::ScopedTaskExecutor> executor) {
_stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()});
-
return AsyncTry([this, self = shared_from_this()] {
auto opCtxHolder = cc().makeOperationContext();
auto opCtx = opCtxHolder.get();
@@ -431,7 +528,7 @@ TenantMigrationDonorService::Instance::_markStateDocumentAsGarbageCollectable(
writeConflictRetry(
opCtx,
- "tenantMigrationDonorMarkStateDocAsGarbageCollectable",
+ "TenantMigrationDonorMarkStateDocAsGarbageCollectable",
_stateDocumentsNS.ns(),
[&] {
const auto filter =
@@ -486,7 +583,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipi
const BSONObj& cmdObj) {
return AsyncTry([this, self = shared_from_this(), executor, recipientTargeterRS, cmdObj] {
return recipientTargeterRS
- ->findHost(ReadPreferenceSetting(), _instanceCancelationSource.token())
+ ->findHost(kPrimaryOnlyReadPreference, _instanceCancelationSource.token())
.thenRunOn(**executor)
.then([this, self = shared_from_this(), executor, cmdObj](auto recipientHost) {
executor::RemoteCommandRequest request(std::move(recipientHost),
@@ -495,7 +592,6 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipi
rpc::makeEmptyMetadata(),
nullptr,
kRecipientSyncDataTimeout);
-
request.sslMode = transport::kEnableSSL;
return (_recipientCmdExecutor)
@@ -526,7 +622,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa
auto opCtxHolder = cc().makeOperationContext();
auto opCtx = opCtxHolder.get();
- BSONObj cmdObj = BSONObj([&]() {
+ const auto cmdObj = [&] {
auto donorConnString =
repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString();
RecipientSyncData request;
@@ -538,7 +634,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa
_stateDoc.getRecipientCertificateForDonor()});
request.setReturnAfterReachingDonorTimestamp(_stateDoc.getBlockTimestamp());
return request.toBSON(BSONObj());
- }());
+ }();
return _sendCommandToRecipient(executor, recipientTargeterRS, cmdObj);
}
@@ -576,15 +672,25 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
}
// Enter "dataSync" state.
- return _insertStateDocument(executor).then(
- [this, self = shared_from_this(), executor](repl::OpTime opTime) {
+ return _insertStateDoc(executor)
+ .then([this, self = shared_from_this(), executor](repl::OpTime opTime) {
// TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should
// use its base PrimaryOnlyService's cancelation source to pass tokens
// in calls to WaitForMajorityService::waitUntilMajority.
return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ })
+ .then([this, self = shared_from_this()] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+ pauseTenantMigrationAfterPersitingInitialDonorStateDoc.pauseWhileSet(opCtx);
});
})
.then([this, self = shared_from_this(), executor, recipientTargeterRS, token] {
+ checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token());
+
+ return _fetchAndStoreRecipientClusterTimeKeyDocs(executor, recipientTargeterRS, token);
+ })
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS, token] {
if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kDataSync) {
return ExecutorFuture<void>(**executor, Status::OK());
}
@@ -601,7 +707,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token());
// Enter "blocking" state.
- return _updateStateDocument(executor, TenantMigrationDonorStateEnum::kBlocking)
+ return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kBlocking)
.then([this, self = shared_from_this(), executor, token](
repl::OpTime opTime) {
// TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should
@@ -691,7 +797,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token());
// Enter "commit" state.
- return _updateStateDocument(executor, TenantMigrationDonorStateEnum::kCommitted)
+ return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kCommitted)
.then([this, self = shared_from_this(), executor, token](
repl::OpTime opTime) {
// TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should
@@ -728,7 +834,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
} else {
// Enter "abort" state.
_abortReason.emplace(status);
- return _updateStateDocument(executor, TenantMigrationDonorStateEnum::kAborted)
+ return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kAborted)
.then([this, self = shared_from_this(), executor](repl::OpTime opTime) {
return _waitForMajorityWriteConcern(executor, std::move(opTime))
.then([this, self = shared_from_this()] {
@@ -766,7 +872,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
return _sendRecipientForgetMigrationCommand(executor, recipientTargeterRS);
})
.then([this, self = shared_from_this(), executor] {
- return _markStateDocumentAsGarbageCollectable(executor);
+ return _markStateDocAsGarbageCollectable(executor);
})
.then([this, self = shared_from_this(), executor](repl::OpTime opTime) {
return _waitForMajorityWriteConcern(executor, std::move(opTime));
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h
index 6bbc131c525..e0f11201602 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.h
+++ b/src/mongo/db/repl/tenant_migration_donor_service.h
@@ -30,6 +30,7 @@
#pragma once
#include "mongo/base/string_data.h"
+#include "mongo/client/fetcher.h"
#include "mongo/client/remote_command_targeter_rs.h"
#include "mongo/db/repl/primary_only_service.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
@@ -149,10 +150,19 @@ public:
const NamespaceString _stateDocumentsNS = NamespaceString::kTenantMigrationDonorsNamespace;
/**
+ * Fetches all key documents from the recipient's admin.system.keys collection, stores
+ * them in admin.system.external_validation_keys, and refreshes the keys cache.
+ */
+ ExecutorFuture<void> _fetchAndStoreRecipientClusterTimeKeyDocs(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
+ const CancelationToken& token);
+
+ /**
* Inserts the state document to _stateDocumentsNS and returns the opTime for the insert
* oplog entry.
*/
- ExecutorFuture<repl::OpTime> _insertStateDocument(
+ ExecutorFuture<repl::OpTime> _insertStateDoc(
std::shared_ptr<executor::ScopedTaskExecutor> executor);
/**
@@ -161,7 +171,7 @@ public:
* commitOrAbortTimestamp depending on the state. Returns the opTime for the update oplog
* entry.
*/
- ExecutorFuture<repl::OpTime> _updateStateDocument(
+ ExecutorFuture<repl::OpTime> _updateStateDoc(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const TenantMigrationDonorStateEnum nextState);
@@ -169,7 +179,7 @@ public:
* Sets the "expireAt" time for the state document to be garbage collected, and returns the
* the opTime for the write.
*/
- ExecutorFuture<repl::OpTime> _markStateDocumentAsGarbageCollectable(
+ ExecutorFuture<repl::OpTime> _markStateDocAsGarbageCollectable(
std::shared_ptr<executor::ScopedTaskExecutor> executor);
/**
@@ -216,6 +226,10 @@ public:
return recipientCmdThreadPoolLimits;
}
+ // Weak pointer to the Fetcher used for fetching admin.system.keys documents from the
+ // recipient. It is only not null when the instance is actively fetching the documents.
+ std::weak_ptr<Fetcher> _recipientKeysFetcher;
+
boost::optional<Status> _abortReason;
// Protects the durable state and the promises below.
diff --git a/src/mongo/db/repl/tenant_migration_pem_payload.idl b/src/mongo/db/repl/tenant_migration_pem_payload.idl
index 2e4aa666c62..6886c4be34a 100644
--- a/src/mongo/db/repl/tenant_migration_pem_payload.idl
+++ b/src/mongo/db/repl/tenant_migration_pem_payload.idl
@@ -42,9 +42,9 @@ structs:
type: string
description: "Certificate PEM blob."
validator:
- callback: "validateCertificatePEMPayload"
+ callback: "tenant_migration_util::validateCertificatePEMPayload"
privateKey:
type: string
description: "Private key PEM blob."
validator:
- callback: "validatePrivateKeyPEMPayload"
+ callback: "tenant_migration_util::validatePrivateKeyPEMPayload"
diff --git a/src/mongo/db/repl/tenant_migration_state_machine.idl b/src/mongo/db/repl/tenant_migration_state_machine.idl
index 02d6d7f42b8..a473cd06a88 100644
--- a/src/mongo/db/repl/tenant_migration_state_machine.idl
+++ b/src/mongo/db/repl/tenant_migration_state_machine.idl
@@ -72,7 +72,7 @@ structs:
The URI string that the donor will utilize to create a connection with
the recipient.
validator:
- callback: "validateConnectionString"
+ callback: "tenant_migration_util::validateConnectionString"
readPreference:
type: readPreference
description: >-
@@ -132,12 +132,12 @@ structs:
The URI string that the recipient will utilize to create a connection
with the donor.
validator:
- callback: "validateConnectionString"
+ callback: "tenant_migration_util::validateConnectionString"
tenantId:
type: string
description: "The tenantId for the migration."
validator:
- callback: "validateDatabasePrefix"
+ callback: "tenant_migration_util::validateDatabasePrefix"
readPreference:
type: readPreference
description: >-
diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp
new file mode 100644
index 00000000000..69e3e935759
--- /dev/null
+++ b/src/mongo/db/repl/tenant_migration_util.cpp
@@ -0,0 +1,112 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/repl/tenant_migration_util.h"
+
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbhelpers.h"
+#include "mongo/db/logical_time_validator.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/repl_server_parameters_gen.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/util/future_util.h"
+
+namespace mongo {
+
+namespace tenant_migration_util {
+
+ExternalKeysCollectionDocument makeExternalClusterTimeKeyDoc(ServiceContext* serviceContext,
+ std::string rsName,
+ BSONObj keyDoc) {
+ auto originalKeyDoc = KeysCollectionDocument::parse(IDLParserErrorContext("keyDoc"), keyDoc);
+
+ ExternalKeysCollectionDocument externalKeyDoc(
+ OID::gen(),
+ originalKeyDoc.getKeyId(),
+ rsName,
+ serviceContext->getFastClockSource()->now() +
+ Seconds{repl::tenantMigrationExternalKeysRemovalDelaySecs.load()});
+ externalKeyDoc.setKeysCollectionDocumentBase(originalKeyDoc.getKeysCollectionDocumentBase());
+
+ return externalKeyDoc;
+}
+
+ExecutorFuture<void> storeExternalClusterTimeKeyDocsAndRefreshCache(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ std::vector<ExternalKeysCollectionDocument> keyDocs,
+ const CancelationToken& token) {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+ auto nss = NamespaceString::kExternalKeysCollectionNamespace;
+
+ for (auto& keyDoc : keyDocs) {
+ AutoGetCollection collection(opCtx, nss, MODE_IX);
+
+ writeConflictRetry(opCtx, "CloneExternalKeyDocs", nss.ns(), [&] {
+ const auto filter = BSON(ExternalKeysCollectionDocument::kKeyIdFieldName
+ << keyDoc.getKeyId()
+ << ExternalKeysCollectionDocument::kReplicaSetNameFieldName
+ << keyDoc.getReplicaSetName()
+ << ExternalKeysCollectionDocument::kTTLExpiresAtFieldName
+ << BSON("$lt" << keyDoc.getTTLExpiresAt()));
+
+ // Remove _id since updating _id is not allowed.
+ const auto updateMod = keyDoc.toBSON().removeField("_id");
+
+ Helpers::upsert(opCtx,
+ nss.ns(),
+ filter,
+ updateMod,
+ /*fromMigrate=*/false);
+ });
+ }
+
+ const auto opTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+
+ return WaitForMajorityService::get(opCtx->getServiceContext())
+ .waitUntilMajority(opTime)
+ .thenRunOn(**executor)
+ .then([] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+
+ auto validator = LogicalTimeValidator::get(opCtx);
+ if (validator) {
+ // Refresh the keys cache to avoid validation errors for external cluster times with
+ // a keyId that matches the keyId of an internal key since the LogicalTimeValidator
+ // only refreshes the cache when it cannot find a matching internal key.
+ validator->refreshKeyManagerCache(opCtx);
+ }
+ });
+}
+
+} // namespace tenant_migration_util
+
+} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_util.h b/src/mongo/db/repl/tenant_migration_util.h
index 009d9649097..e7d253ed6fd 100644
--- a/src/mongo/db/repl/tenant_migration_util.h
+++ b/src/mongo/db/repl/tenant_migration_util.h
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2020-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -26,6 +26,7 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
+
#pragma once
#include <set>
@@ -34,7 +35,9 @@
#include "mongo/bson/timestamp.h"
#include "mongo/client/mongo_uri.h"
#include "mongo/config.h"
+#include "mongo/db/keys_collection_document_gen.h"
#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/executor/scoped_task_executor.h"
#include "mongo/util/net/ssl_util.h"
#include "mongo/util/str.h"
@@ -46,6 +49,8 @@ const std::set<std::string> kUnsupportedTenantIds{"", "admin", "local", "config"
} // namespace
+namespace tenant_migration_util {
+
inline Status validateDatabasePrefix(const std::string& tenantId) {
const bool isPrefixSupported =
kUnsupportedTenantIds.find(tenantId) == kUnsupportedTenantIds.end();
@@ -122,4 +127,25 @@ inline Status validatePrivateKeyPEMPayload(const StringData& payload) {
#endif
}
+/*
+ * Creates an ExternalKeysCollectionDocument representing an admin.system.external_validation_keys
+ * document from the given the admin.system.keys document BSONObj.
+ */
+ExternalKeysCollectionDocument makeExternalClusterTimeKeyDoc(ServiceContext* serviceContext,
+ std::string rsName,
+ BSONObj keyDoc);
+
+/*
+ * For each given ExternalKeysCollectionDocument, inserts it if there is not an existing document in
+ * admin.system.external_validation_keys for it with the same keyId and replicaSetName. Otherwise,
+ * updates the ttlExpiresAt of the existing document if it is less than the new ttlExpiresAt. Waits
+ * for the writes to be majority-committed, and refreshes the logical validator's cache.
+ */
+ExecutorFuture<void> storeExternalClusterTimeKeyDocsAndRefreshCache(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ std::vector<ExternalKeysCollectionDocument> keyDocs,
+ const CancelationToken& token);
+
+} // namespace tenant_migration_util
+
} // namespace mongo