diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2021-01-21 01:25:10 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-01-28 02:11:41 +0000 |
commit | e9360bd8e7cb8f1447ffd513149d284c394bb4a0 (patch) | |
tree | 7404bfc08bba640c65cbaf1aed551a6428aedf06 /src/mongo | |
parent | 96fe72c36d370a4067240738f051021d4daf72ce (diff) | |
download | mongo-e9360bd8e7cb8f1447ffd513149d284c394bb4a0.tar.gz |
SERVER-53404 Make tenant migration donor copy the recipient's cluster time signing keys before sending recipientSyncData
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/client/fetcher.cpp | 29 | ||||
-rw-r--r-- | src/mongo/client/fetcher.h | 5 | ||||
-rw-r--r-- | src/mongo/db/catalog/database_impl.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/commands/tenant_migration_donor_cmds.idl | 4 | ||||
-rw-r--r-- | src/mongo/db/commands/tenant_migration_recipient_cmds.idl | 6 | ||||
-rw-r--r-- | src/mongo/db/logical_time_validator.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/logical_time_validator.h | 5 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.h | 15 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 18 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_service.cpp | 160 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_service.h | 20 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_pem_payload.idl | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_state_machine.idl | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_util.cpp | 112 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_util.h | 28 |
16 files changed, 360 insertions, 82 deletions
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp index d8c78fd25c0..cf0f72cb113 100644 --- a/src/mongo/client/fetcher.cpp +++ b/src/mongo/client/fetcher.cpp @@ -157,7 +157,8 @@ Fetcher::Fetcher(executor::TaskExecutor* executor, const BSONObj& metadata, Milliseconds findNetworkTimeout, Milliseconds getMoreNetworkTimeout, - std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy> firstCommandRetryPolicy) + std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy> firstCommandRetryPolicy, + transport::ConnectSSLMode sslMode) : _executor(executor), _source(source), _dbname(dbname), @@ -168,9 +169,15 @@ Fetcher::Fetcher(executor::TaskExecutor* executor, _getMoreNetworkTimeout(getMoreNetworkTimeout), _firstRemoteCommandScheduler( _executor, - RemoteCommandRequest(_source, _dbname, _cmdObj, _metadata, nullptr, _findNetworkTimeout), + [&] { + RemoteCommandRequest request( + _source, _dbname, _cmdObj, _metadata, nullptr, _findNetworkTimeout); + request.sslMode = sslMode; + return request; + }(), [this](const auto& x) { return this->_callback(x, kFirstBatchFieldName); }, - std::move(firstCommandRetryPolicy)) { + std::move(firstCommandRetryPolicy)), + _sslMode(sslMode) { uassert(ErrorCodes::BadValue, "callback function cannot be null", _work); } @@ -297,11 +304,14 @@ Status Fetcher::_scheduleGetMore(const BSONObj& cmdObj) { return Status(ErrorCodes::CallbackCanceled, "fetcher was shut down after previous batch was processed"); } + + RemoteCommandRequest request( + _source, _dbname, cmdObj, _metadata, nullptr, _getMoreNetworkTimeout); + request.sslMode = _sslMode; + StatusWith<executor::TaskExecutor::CallbackHandle> scheduleResult = _executor->scheduleRemoteCommand( - RemoteCommandRequest( - _source, _dbname, cmdObj, _metadata, nullptr, _getMoreNetworkTimeout), - [this](const auto& x) { return this->_callback(x, kNextBatchFieldName); }); + request, [this](const auto& x) { return this->_callback(x, kNextBatchFieldName); }); if (!scheduleResult.isOK()) { return scheduleResult.getStatus(); @@ -405,9 +415,12 @@ void Fetcher::_sendKillCursors(const CursorId id, const NamespaceString& nss) { "error"_attr = redact(status)); } }; + auto cmdObj = BSON("killCursors" << nss.coll() << "cursors" << BSON_ARRAY(id)); - auto scheduleResult = _executor->scheduleRemoteCommand( - RemoteCommandRequest(_source, _dbname, cmdObj, nullptr), logKillCursorsResult); + RemoteCommandRequest request(_source, _dbname, cmdObj, nullptr); + request.sslMode = _sslMode; + + auto scheduleResult = _executor->scheduleRemoteCommand(request, logKillCursorsResult); if (!scheduleResult.isOK()) { LOGV2_WARNING(23920, "Failed to schedule killCursors command: {error}", diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h index 3d0d10abec8..4ce57087c20 100644 --- a/src/mongo/client/fetcher.h +++ b/src/mongo/client/fetcher.h @@ -131,7 +131,8 @@ public: Milliseconds findNetworkTimeout = RemoteCommandRequest::kNoTimeout, Milliseconds getMoreNetworkTimeout = RemoteCommandRequest::kNoTimeout, std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy> firstCommandRetryPolicy = - RemoteCommandRetryScheduler::makeNoRetryPolicy()); + RemoteCommandRetryScheduler::makeNoRetryPolicy(), + transport::ConnectSSLMode sslMode = transport::kGlobalSSLMode); virtual ~Fetcher(); @@ -259,6 +260,8 @@ private: // First remote command scheduler. RemoteCommandRetryScheduler _firstRemoteCommandScheduler; + + const transport::ConnectSSLMode _sslMode; }; /** diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp index 87ebf3d6a47..789ea8e882e 100644 --- a/src/mongo/db/catalog/database_impl.cpp +++ b/src/mongo/db/catalog/database_impl.cpp @@ -348,7 +348,8 @@ Status DatabaseImpl::dropCollection(OperationContext* opCtx, "turn off profiling before dropping system.profile collection"); } else if (!(nss.isSystemDotViews() || nss.isHealthlog() || nss == NamespaceString::kLogicalSessionsNamespace || - nss == NamespaceString::kSystemKeysNamespace || + nss == NamespaceString::kKeysCollectionNamespace || + nss == NamespaceString::kExternalKeysCollectionNamespace || nss.isTemporaryReshardingCollection())) { return Status(ErrorCodes::IllegalOperation, str::stream() << "can't drop system collection " << nss); diff --git a/src/mongo/db/commands/tenant_migration_donor_cmds.idl b/src/mongo/db/commands/tenant_migration_donor_cmds.idl index 43823f99cb2..f81b718cffa 100644 --- a/src/mongo/db/commands/tenant_migration_donor_cmds.idl +++ b/src/mongo/db/commands/tenant_migration_donor_cmds.idl @@ -66,12 +66,12 @@ commands: description: "The URI string that the donor will utilize to create a connection with the recipient." type: string validator: - callback: "validateConnectionString" + callback: "tenant_migration_util::validateConnectionString" tenantId: description: "The prefix from which the migrating database will be matched. The prefixes 'admin', 'local', 'config', the empty string, are not allowed." type: string validator: - callback: "validateDatabasePrefix" + callback: "tenant_migration_util::validateDatabasePrefix" readPreference: description: "The read preference settings that the donor will pass on to the recipient." type: readPreference diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl index 4c96e219c4e..a3c4fb991d2 100644 --- a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl +++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl @@ -62,14 +62,14 @@ structs: donor. type: string validator: - callback: "validateConnectionString" + callback: "tenant_migration_util::validateConnectionString" tenantId: description: >- The prefix from which the migrating database will be matched. The prefixes 'admin', 'local', 'config', the empty string, are not allowed. type: string validator: - callback: "validateDatabasePrefix" + callback: "tenant_migration_util::validateDatabasePrefix" readPreference: description: >- The read preference settings that the donor will pass on to the recipient. @@ -98,7 +98,7 @@ commands: type: timestamp optional: true validator: - callback: "validateTimestampNotNull" + callback: "tenant_migration_util::validateTimestampNotNull" recipientForgetMigration: description: "Parser for the 'recipientForgetMigration' command." diff --git a/src/mongo/db/logical_time_validator.cpp b/src/mongo/db/logical_time_validator.cpp index d2b8866a79c..b848b49cc5e 100644 --- a/src/mongo/db/logical_time_validator.cpp +++ b/src/mongo/db/logical_time_validator.cpp @@ -220,6 +220,11 @@ bool LogicalTimeValidator::shouldGossipLogicalTime() { return _getKeyManagerCopy()->hasSeenKeys(); } +void LogicalTimeValidator::refreshKeyManagerCache(OperationContext* opCtx) { + invariant(_keyManager); + _keyManager->refreshNow(opCtx); +} + void LogicalTimeValidator::resetKeyManagerCache() { LOGV2(20716, "Resetting key manager cache"); invariant(_keyManager); diff --git a/src/mongo/db/logical_time_validator.h b/src/mongo/db/logical_time_validator.h index 1b78c51d1f5..825b7dc70df 100644 --- a/src/mongo/db/logical_time_validator.h +++ b/src/mongo/db/logical_time_validator.h @@ -109,6 +109,11 @@ public: void stopKeyManager(); /** + * Forces the key manager cache to refresh. + */ + void refreshKeyManagerCache(OperationContext* opCtx); + + /** * Reset the key manager cache of keys. */ void resetKeyManagerCache(); diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index eea72c9e3f8..11445d700a5 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -79,8 +79,10 @@ const NamespaceString NamespaceString::kShardConfigCollectionsNamespace(Namespac "cache.collections"); const NamespaceString NamespaceString::kShardConfigDatabasesNamespace(NamespaceString::kConfigDb, "cache.databases"); -const NamespaceString NamespaceString::kSystemKeysNamespace(NamespaceString::kAdminDb, - "system.keys"); +const NamespaceString NamespaceString::kKeysCollectionNamespace(NamespaceString::kAdminDb, + "system.keys"); +const NamespaceString NamespaceString::kExternalKeysCollectionNamespace( + NamespaceString::kAdminDb, "system.external_validation_keys"); const NamespaceString NamespaceString::kRsOplogNamespace(NamespaceString::kLocalDb, "oplog.rs"); const NamespaceString NamespaceString::kSystemReplSetNamespace(NamespaceString::kLocalDb, "system.replset"); @@ -108,12 +110,6 @@ const NamespaceString NamespaceString::kReshardingApplierProgressNamespace( const NamespaceString NamespaceString::kReshardingTxnClonerProgressNamespace( NamespaceString::kConfigDb, "localReshardingOperations.recipient.progress_txn_cloner"); -const NamespaceString NamespaceString::kKeysCollectionNamespace(NamespaceString::kAdminDb, - "system.keys"); - -const NamespaceString NamespaceString::kExternalKeysCollectionNamespace( - NamespaceString::kAdminDb, "system.external_validation_keys"); - bool NamespaceString::isListCollectionsCursorNS() const { return coll() == listCollectionsCursorCol; } @@ -128,16 +124,12 @@ bool NamespaceString::isLegalClientSystemNS() const { return true; if (coll() == kServerConfigurationNamespace.coll()) return true; - if (coll() == kSystemKeysNamespace.coll()) + if (coll() == kKeysCollectionNamespace.coll()) return true; - if (coll() == "system.backup_users") + if (coll() == kExternalKeysCollectionNamespace.coll()) return true; - if (coll() == kExternalKeysCollectionNamespace.coll()) { - // TODO (SERVER-53404): This was added to allow client in an integration test to - // manually insert the key document into this system collection. Remove this when the - // tenant migration donor does the copying by itself. + if (coll() == "system.backup_users") return true; - } } else if (db() == kConfigDb) { if (coll() == "system.sessions") return true; diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index 03accfec6f7..a60adb43d51 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -100,8 +100,12 @@ public: // of a specific database static const NamespaceString kShardConfigDatabasesNamespace; - // Name for causal consistency's key collection. - static const NamespaceString kSystemKeysNamespace; + // Namespace for storing keys for signing and validating cluster times created by the cluster + // that this node is in. + static const NamespaceString kKeysCollectionNamespace; + + // Namespace for storing keys for validating cluster times created by other clusters. + static const NamespaceString kExternalKeysCollectionNamespace; // Namespace of the the oplog collection. static const NamespaceString kRsOplogNamespace; @@ -148,13 +152,6 @@ public: // Namespace for storing config.transactions cloner progress for resharding. static const NamespaceString kReshardingTxnClonerProgressNamespace; - // Namespace for storing keys for signing and validating cluster times created by the cluster - // that this node is in. - static const NamespaceString kKeysCollectionNamespace; - - // Namespace for storing keys for validating cluster times created by other clusters. - static const NamespaceString kExternalKeysCollectionNamespace; - /** * Constructs an empty NamespaceString. */ diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index ca18216794f..f8d16936ad9 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1284,10 +1284,16 @@ env.Library( ) env.Library( - target='tenant_migration_recipient_utils', + target='tenant_migration_utils', source=[ + "tenant_migration_util.cpp", "tenant_migration_recipient_entry_helpers.cpp", ], + LIBDEPS=[ + '$BUILD_DIR/mongo/util/future_util', + "repl_server_parameters", + 'wait_for_majority_service', + ], LIBDEPS_PRIVATE=[ "$BUILD_DIR/mongo/base", "$BUILD_DIR/mongo/db/catalog_raii", @@ -1306,10 +1312,8 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/client/read_preference', - '$BUILD_DIR/mongo/util/future_util', 'primary_only_service', - 'tenant_migration_recipient_utils', - 'wait_for_majority_service', + 'tenant_migration_utils', ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/client/clientdriver_network', @@ -1363,10 +1367,10 @@ env.Library( 'tenant_migration_donor_service.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/client/fetcher', 'primary_only_service', - 'repl_server_parameters', 'tenant_migration_donor', - 'wait_for_majority_service', + 'tenant_migration_utils', ], ) @@ -1573,7 +1577,7 @@ env.CppUnitTest( 'task_executor_mock', 'task_runner', 'tenant_migration_recipient_service', - 'tenant_migration_recipient_utils', + 'tenant_migration_utils', 'tenant_oplog_processing', 'wait_for_majority_service', ], diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 6959f19d995..5d10481b025 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -39,6 +39,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/persistent_task_store.h" +#include "mongo/db/query/find_command_gen.h" #include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/tenant_migration_access_blocker.h" #include "mongo/db/repl/tenant_migration_donor_util.h" @@ -58,13 +59,18 @@ namespace mongo { namespace { MONGO_FAIL_POINT_DEFINE(abortTenantMigrationBeforeLeavingBlockingState); +MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationAfterPersitingInitialDonorStateDoc); MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationBeforeLeavingBlockingState); MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationBeforeLeavingDataSyncState); const std::string kTTLIndexName = "TenantMigrationDonorTTLIndex"; -const Seconds kRecipientSyncDataTimeout(30); const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); +const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly); + +const Seconds kRecipientSyncDataTimeout(30); +const int kMaxRecipientKeyDocsFindAttempts = 10; + std::shared_ptr<TenantMigrationAccessBlocker> getTenantMigrationAccessBlocker( ServiceContext* serviceContext, StringData tenantId) { return TenantMigrationAccessBlockerRegistry::get(serviceContext) @@ -261,6 +267,11 @@ TenantMigrationDonorService::Instance::getDurableState(OperationContext* opCtx) void TenantMigrationDonorService::Instance::onReceiveDonorAbortMigration() { _instanceCancelationSource.cancel(); + + stdx::lock_guard<Latch> lg(_mutex); + if (auto fetcher = _recipientKeysFetcher.lock()) { + fetcher->shutdown(); + } } void TenantMigrationDonorService::Instance::onReceiveDonorForgetMigration() { @@ -287,7 +298,96 @@ void TenantMigrationDonorService::Instance::interrupt(Status status) { } } -ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertStateDocument( +ExecutorFuture<void> +TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, + const CancelationToken& token) { + return recipientTargeterRS + ->findHost(kPrimaryOnlyReadPreference, _instanceCancelationSource.token()) + .thenRunOn(**executor) + .then([this, self = shared_from_this(), executor](HostAndPort host) { + const auto nss = NamespaceString::kKeysCollectionNamespace; + + const auto cmdObj = [&] { + FindCommand request(NamespaceStringOrUUID{nss}); + request.setReadConcern( + repl::ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern) + .toBSONInner()); + return request.toBSON(BSONObj()); + }(); + + std::vector<ExternalKeysCollectionDocument> keyDocs; + boost::optional<Status> fetchStatus; + + auto fetcherCallback = [this, self = shared_from_this(), &keyDocs, &fetchStatus]( + const Fetcher::QueryResponseStatus& dataStatus, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) { + // Throw out any accumulated results on error + if (!dataStatus.isOK()) { + fetchStatus = dataStatus.getStatus(); + keyDocs.clear(); + return; + } + + const auto& data = dataStatus.getValue(); + for (const BSONObj& doc : data.documents) { + keyDocs.push_back(tenant_migration_util::makeExternalClusterTimeKeyDoc( + _serviceContext, _recipientUri.getSetName(), doc.getOwned())); + } + fetchStatus = Status::OK(); + + if (!getMoreBob) { + return; + } + getMoreBob->append("getMore", data.cursorId); + getMoreBob->append("collection", data.nss.coll()); + }; + + auto fetcher = std::make_shared<Fetcher>( + _recipientCmdExecutor.get(), + host, + nss.db().toString(), + cmdObj, + fetcherCallback, + kPrimaryOnlyReadPreference.toContainingBSON(), + executor::RemoteCommandRequest::kNoTimeout, /* findNetworkTimeout */ + executor::RemoteCommandRequest::kNoTimeout, /* getMoreNetworkTimeout */ + RemoteCommandRetryScheduler::makeRetryPolicy<ErrorCategory::RetriableError>( + kMaxRecipientKeyDocsFindAttempts, executor::RemoteCommandRequest::kNoTimeout), + transport::kEnableSSL); + uassertStatusOK(fetcher->schedule()); + + { + stdx::lock_guard<Latch> lg(_mutex); + _recipientKeysFetcher = fetcher; + } + + fetcher->join(); + + { + stdx::lock_guard<Latch> lg(_mutex); + _recipientKeysFetcher.reset(); + } + + if (!fetchStatus) { + // The callback never got invoked. + uasserted(5340400, "Internal error running cursor callback in command"); + } + uassertStatusOK(fetchStatus.get()); + + return keyDocs; + }) + .then([this, self = shared_from_this(), executor, token](auto keyDocs) { + checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token()); + + return tenant_migration_util::storeExternalClusterTimeKeyDocsAndRefreshCache( + executor, std::move(keyDocs), _instanceCancelationSource.token()); + }); +} + +ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertStateDoc( std::shared_ptr<executor::ScopedTaskExecutor> executor) { invariant(_stateDoc.getState() == TenantMigrationDonorStateEnum::kUninitialized); _stateDoc.setState(TenantMigrationDonorStateEnum::kDataSync); @@ -299,7 +399,7 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX); writeConflictRetry( - opCtx, "tenantMigrationInsertStateDoc", _stateDocumentsNS.ns(), [&] { + opCtx, "TenantMigrationDonorInsertStateDoc", _stateDocumentsNS.ns(), [&] { const auto filter = BSON(TenantMigrationDonorDocument::kIdFieldName << _stateDoc.getId()); const auto updateMod = BSON("$setOnInsert" << _stateDoc.toBSON()); @@ -321,7 +421,7 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState .on(**executor, CancelationToken::uncancelable()); } -ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateStateDocument( +ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateStateDoc( std::shared_ptr<executor::ScopedTaskExecutor> executor, const TenantMigrationDonorStateEnum nextState) { const auto originalStateDocBson = _stateDoc.toBSON(); @@ -332,15 +432,14 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState auto opCtxHolder = cc().makeOperationContext(); auto opCtx = opCtxHolder.get(); - uassertStatusOK(writeConflictRetry( - opCtx, "updateStateDoc", _stateDocumentsNS.ns(), [&]() -> Status { - AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX); - if (!collection) { - return Status(ErrorCodes::NamespaceNotFound, - str::stream() - << _stateDocumentsNS.ns() << " does not exist"); - } + AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX); + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << _stateDocumentsNS.ns() << " does not exist", + collection); + + writeConflictRetry( + opCtx, "TenantMigrationDonorUpdateStateDoc", _stateDocumentsNS.ns(), [&] { WriteUnitOfWork wuow(opCtx); const auto originalRecordId = Helpers::findOne(opCtx, @@ -403,8 +502,7 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState wuow.commit(); updateOpTime = oplogSlot; - return Status::OK(); - })); + }); invariant(updateOpTime); return updateOpTime.get(); @@ -418,11 +516,10 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState } ExecutorFuture<repl::OpTime> -TenantMigrationDonorService::Instance::_markStateDocumentAsGarbageCollectable( +TenantMigrationDonorService::Instance::_markStateDocAsGarbageCollectable( std::shared_ptr<executor::ScopedTaskExecutor> executor) { _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() + Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()}); - return AsyncTry([this, self = shared_from_this()] { auto opCtxHolder = cc().makeOperationContext(); auto opCtx = opCtxHolder.get(); @@ -431,7 +528,7 @@ TenantMigrationDonorService::Instance::_markStateDocumentAsGarbageCollectable( writeConflictRetry( opCtx, - "tenantMigrationDonorMarkStateDocAsGarbageCollectable", + "TenantMigrationDonorMarkStateDocAsGarbageCollectable", _stateDocumentsNS.ns(), [&] { const auto filter = @@ -486,7 +583,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipi const BSONObj& cmdObj) { return AsyncTry([this, self = shared_from_this(), executor, recipientTargeterRS, cmdObj] { return recipientTargeterRS - ->findHost(ReadPreferenceSetting(), _instanceCancelationSource.token()) + ->findHost(kPrimaryOnlyReadPreference, _instanceCancelationSource.token()) .thenRunOn(**executor) .then([this, self = shared_from_this(), executor, cmdObj](auto recipientHost) { executor::RemoteCommandRequest request(std::move(recipientHost), @@ -495,7 +592,6 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipi rpc::makeEmptyMetadata(), nullptr, kRecipientSyncDataTimeout); - request.sslMode = transport::kEnableSSL; return (_recipientCmdExecutor) @@ -526,7 +622,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa auto opCtxHolder = cc().makeOperationContext(); auto opCtx = opCtxHolder.get(); - BSONObj cmdObj = BSONObj([&]() { + const auto cmdObj = [&] { auto donorConnString = repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString(); RecipientSyncData request; @@ -538,7 +634,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa _stateDoc.getRecipientCertificateForDonor()}); request.setReturnAfterReachingDonorTimestamp(_stateDoc.getBlockTimestamp()); return request.toBSON(BSONObj()); - }()); + }(); return _sendCommandToRecipient(executor, recipientTargeterRS, cmdObj); } @@ -576,15 +672,25 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( } // Enter "dataSync" state. - return _insertStateDocument(executor).then( - [this, self = shared_from_this(), executor](repl::OpTime opTime) { + return _insertStateDoc(executor) + .then([this, self = shared_from_this(), executor](repl::OpTime opTime) { // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should // use its base PrimaryOnlyService's cancelation source to pass tokens // in calls to WaitForMajorityService::waitUntilMajority. return _waitForMajorityWriteConcern(executor, std::move(opTime)); + }) + .then([this, self = shared_from_this()] { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + pauseTenantMigrationAfterPersitingInitialDonorStateDoc.pauseWhileSet(opCtx); }); }) .then([this, self = shared_from_this(), executor, recipientTargeterRS, token] { + checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token()); + + return _fetchAndStoreRecipientClusterTimeKeyDocs(executor, recipientTargeterRS, token); + }) + .then([this, self = shared_from_this(), executor, recipientTargeterRS, token] { if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kDataSync) { return ExecutorFuture<void>(**executor, Status::OK()); } @@ -601,7 +707,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token()); // Enter "blocking" state. - return _updateStateDocument(executor, TenantMigrationDonorStateEnum::kBlocking) + return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kBlocking) .then([this, self = shared_from_this(), executor, token]( repl::OpTime opTime) { // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should @@ -691,7 +797,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token()); // Enter "commit" state. - return _updateStateDocument(executor, TenantMigrationDonorStateEnum::kCommitted) + return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kCommitted) .then([this, self = shared_from_this(), executor, token]( repl::OpTime opTime) { // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should @@ -728,7 +834,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( } else { // Enter "abort" state. _abortReason.emplace(status); - return _updateStateDocument(executor, TenantMigrationDonorStateEnum::kAborted) + return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kAborted) .then([this, self = shared_from_this(), executor](repl::OpTime opTime) { return _waitForMajorityWriteConcern(executor, std::move(opTime)) .then([this, self = shared_from_this()] { @@ -766,7 +872,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( return _sendRecipientForgetMigrationCommand(executor, recipientTargeterRS); }) .then([this, self = shared_from_this(), executor] { - return _markStateDocumentAsGarbageCollectable(executor); + return _markStateDocAsGarbageCollectable(executor); }) .then([this, self = shared_from_this(), executor](repl::OpTime opTime) { return _waitForMajorityWriteConcern(executor, std::move(opTime)); diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h index 6bbc131c525..e0f11201602 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.h +++ b/src/mongo/db/repl/tenant_migration_donor_service.h @@ -30,6 +30,7 @@ #pragma once #include "mongo/base/string_data.h" +#include "mongo/client/fetcher.h" #include "mongo/client/remote_command_targeter_rs.h" #include "mongo/db/repl/primary_only_service.h" #include "mongo/db/repl/repl_server_parameters_gen.h" @@ -149,10 +150,19 @@ public: const NamespaceString _stateDocumentsNS = NamespaceString::kTenantMigrationDonorsNamespace; /** + * Fetches all key documents from the recipient's admin.system.keys collection, stores + * them in admin.system.external_validation_keys, and refreshes the keys cache. + */ + ExecutorFuture<void> _fetchAndStoreRecipientClusterTimeKeyDocs( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, + const CancelationToken& token); + + /** * Inserts the state document to _stateDocumentsNS and returns the opTime for the insert * oplog entry. */ - ExecutorFuture<repl::OpTime> _insertStateDocument( + ExecutorFuture<repl::OpTime> _insertStateDoc( std::shared_ptr<executor::ScopedTaskExecutor> executor); /** @@ -161,7 +171,7 @@ public: * commitOrAbortTimestamp depending on the state. Returns the opTime for the update oplog * entry. */ - ExecutorFuture<repl::OpTime> _updateStateDocument( + ExecutorFuture<repl::OpTime> _updateStateDoc( std::shared_ptr<executor::ScopedTaskExecutor> executor, const TenantMigrationDonorStateEnum nextState); @@ -169,7 +179,7 @@ public: * Sets the "expireAt" time for the state document to be garbage collected, and returns the * the opTime for the write. */ - ExecutorFuture<repl::OpTime> _markStateDocumentAsGarbageCollectable( + ExecutorFuture<repl::OpTime> _markStateDocAsGarbageCollectable( std::shared_ptr<executor::ScopedTaskExecutor> executor); /** @@ -216,6 +226,10 @@ public: return recipientCmdThreadPoolLimits; } + // Weak pointer to the Fetcher used for fetching admin.system.keys documents from the + // recipient. It is only not null when the instance is actively fetching the documents. + std::weak_ptr<Fetcher> _recipientKeysFetcher; + boost::optional<Status> _abortReason; // Protects the durable state and the promises below. diff --git a/src/mongo/db/repl/tenant_migration_pem_payload.idl b/src/mongo/db/repl/tenant_migration_pem_payload.idl index 2e4aa666c62..6886c4be34a 100644 --- a/src/mongo/db/repl/tenant_migration_pem_payload.idl +++ b/src/mongo/db/repl/tenant_migration_pem_payload.idl @@ -42,9 +42,9 @@ structs: type: string description: "Certificate PEM blob." validator: - callback: "validateCertificatePEMPayload" + callback: "tenant_migration_util::validateCertificatePEMPayload" privateKey: type: string description: "Private key PEM blob." validator: - callback: "validatePrivateKeyPEMPayload" + callback: "tenant_migration_util::validatePrivateKeyPEMPayload" diff --git a/src/mongo/db/repl/tenant_migration_state_machine.idl b/src/mongo/db/repl/tenant_migration_state_machine.idl index 02d6d7f42b8..a473cd06a88 100644 --- a/src/mongo/db/repl/tenant_migration_state_machine.idl +++ b/src/mongo/db/repl/tenant_migration_state_machine.idl @@ -72,7 +72,7 @@ structs: The URI string that the donor will utilize to create a connection with the recipient. validator: - callback: "validateConnectionString" + callback: "tenant_migration_util::validateConnectionString" readPreference: type: readPreference description: >- @@ -132,12 +132,12 @@ structs: The URI string that the recipient will utilize to create a connection with the donor. validator: - callback: "validateConnectionString" + callback: "tenant_migration_util::validateConnectionString" tenantId: type: string description: "The tenantId for the migration." validator: - callback: "validateDatabasePrefix" + callback: "tenant_migration_util::validateDatabasePrefix" readPreference: type: readPreference description: >- diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp new file mode 100644 index 00000000000..69e3e935759 --- /dev/null +++ b/src/mongo/db/repl/tenant_migration_util.cpp @@ -0,0 +1,112 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/repl/tenant_migration_util.h" + +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbhelpers.h" +#include "mongo/db/logical_time_validator.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/repl_server_parameters_gen.h" +#include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/util/future_util.h" + +namespace mongo { + +namespace tenant_migration_util { + +ExternalKeysCollectionDocument makeExternalClusterTimeKeyDoc(ServiceContext* serviceContext, + std::string rsName, + BSONObj keyDoc) { + auto originalKeyDoc = KeysCollectionDocument::parse(IDLParserErrorContext("keyDoc"), keyDoc); + + ExternalKeysCollectionDocument externalKeyDoc( + OID::gen(), + originalKeyDoc.getKeyId(), + rsName, + serviceContext->getFastClockSource()->now() + + Seconds{repl::tenantMigrationExternalKeysRemovalDelaySecs.load()}); + externalKeyDoc.setKeysCollectionDocumentBase(originalKeyDoc.getKeysCollectionDocumentBase()); + + return externalKeyDoc; +} + +ExecutorFuture<void> storeExternalClusterTimeKeyDocsAndRefreshCache( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + std::vector<ExternalKeysCollectionDocument> keyDocs, + const CancelationToken& token) { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + auto nss = NamespaceString::kExternalKeysCollectionNamespace; + + for (auto& keyDoc : keyDocs) { + AutoGetCollection collection(opCtx, nss, MODE_IX); + + writeConflictRetry(opCtx, "CloneExternalKeyDocs", nss.ns(), [&] { + const auto filter = BSON(ExternalKeysCollectionDocument::kKeyIdFieldName + << keyDoc.getKeyId() + << ExternalKeysCollectionDocument::kReplicaSetNameFieldName + << keyDoc.getReplicaSetName() + << ExternalKeysCollectionDocument::kTTLExpiresAtFieldName + << BSON("$lt" << keyDoc.getTTLExpiresAt())); + + // Remove _id since updating _id is not allowed. + const auto updateMod = keyDoc.toBSON().removeField("_id"); + + Helpers::upsert(opCtx, + nss.ns(), + filter, + updateMod, + /*fromMigrate=*/false); + }); + } + + const auto opTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + + return WaitForMajorityService::get(opCtx->getServiceContext()) + .waitUntilMajority(opTime) + .thenRunOn(**executor) + .then([] { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + + auto validator = LogicalTimeValidator::get(opCtx); + if (validator) { + // Refresh the keys cache to avoid validation errors for external cluster times with + // a keyId that matches the keyId of an internal key since the LogicalTimeValidator + // only refreshes the cache when it cannot find a matching internal key. + validator->refreshKeyManagerCache(opCtx); + } + }); +} + +} // namespace tenant_migration_util + +} // namespace mongo diff --git a/src/mongo/db/repl/tenant_migration_util.h b/src/mongo/db/repl/tenant_migration_util.h index 009d9649097..e7d253ed6fd 100644 --- a/src/mongo/db/repl/tenant_migration_util.h +++ b/src/mongo/db/repl/tenant_migration_util.h @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2020-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -26,6 +26,7 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ + #pragma once #include <set> @@ -34,7 +35,9 @@ #include "mongo/bson/timestamp.h" #include "mongo/client/mongo_uri.h" #include "mongo/config.h" +#include "mongo/db/keys_collection_document_gen.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/executor/scoped_task_executor.h" #include "mongo/util/net/ssl_util.h" #include "mongo/util/str.h" @@ -46,6 +49,8 @@ const std::set<std::string> kUnsupportedTenantIds{"", "admin", "local", "config" } // namespace +namespace tenant_migration_util { + inline Status validateDatabasePrefix(const std::string& tenantId) { const bool isPrefixSupported = kUnsupportedTenantIds.find(tenantId) == kUnsupportedTenantIds.end(); @@ -122,4 +127,25 @@ inline Status validatePrivateKeyPEMPayload(const StringData& payload) { #endif } +/* + * Creates an ExternalKeysCollectionDocument representing an admin.system.external_validation_keys + * document from the given the admin.system.keys document BSONObj. + */ +ExternalKeysCollectionDocument makeExternalClusterTimeKeyDoc(ServiceContext* serviceContext, + std::string rsName, + BSONObj keyDoc); + +/* + * For each given ExternalKeysCollectionDocument, inserts it if there is not an existing document in + * admin.system.external_validation_keys for it with the same keyId and replicaSetName. Otherwise, + * updates the ttlExpiresAt of the existing document if it is less than the new ttlExpiresAt. Waits + * for the writes to be majority-committed, and refreshes the logical validator's cache. + */ +ExecutorFuture<void> storeExternalClusterTimeKeyDocsAndRefreshCache( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + std::vector<ExternalKeysCollectionDocument> keyDocs, + const CancelationToken& token); + +} // namespace tenant_migration_util + } // namespace mongo |