diff options
author | Suganthi Mani <suganthi.mani@mongodb.com> | 2020-08-11 06:04:16 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-09 05:12:40 +0000 |
commit | 2bcab5793e96a6b8466b1cf8a3d7289ec4263b8d (patch) | |
tree | f9afd88512007872a5880cd341ad84d14435b8ea | |
parent | cbb82fd1b270f84e544243acbba2cb3fed779c28 (diff) | |
download | mongo-2bcab5793e96a6b8466b1cf8a3d7289ec4263b8d.tar.gz |
SERVER-48785 Create TenantMigrationRecipientService and TenantMigrationRecipientServiceInstance.
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/mongod_main.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 50 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp | 139 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h | 67 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.cpp | 156 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.h | 96 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service_test.cpp | 169 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_state_machine.idl | 3 |
11 files changed, 694 insertions, 8 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index ff76c2daefb..25c4d852403 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -2113,6 +2113,7 @@ env.Library( 'repl/replication_recovery', 'repl/serveronly_repl', 'repl/storage_interface_impl', + 'repl/tenant_migration_recipient_service', 'repl/topology_coordinator', 'repl/wait_for_majority_service', 's/sessions_collection_config_server', diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index f6809e00bf9..093bb83eb18 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -125,6 +125,7 @@ #include "mongo/db/repl/replication_recovery.h" #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/repl/tenant_migration_donor_service.h" +#include "mongo/db/repl/tenant_migration_recipient_service.h" #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/repl_set_member_in_standalone_mode.h" @@ -303,13 +304,15 @@ void initializeCommandHooks(ServiceContext* serviceContext) { void registerPrimaryOnlyServices(ServiceContext* serviceContext) { auto registry = repl::PrimaryOnlyServiceRegistry::get(serviceContext); - std::unique_ptr<TenantMigrationDonorService> tenantMigrationDonorService = - std::make_unique<TenantMigrationDonorService>(serviceContext); - registry->registerService(std::move(tenantMigrationDonorService)); - std::unique_ptr<ReshardingCoordinatorService> reshardingCoordinatorService = - std::make_unique<ReshardingCoordinatorService>(serviceContext); - registry->registerService(std::move(reshardingCoordinatorService)); + std::vector<std::unique_ptr<repl::PrimaryOnlyService>> services; + services.push_back(std::make_unique<TenantMigrationDonorService>(serviceContext)); + services.push_back(std::make_unique<repl::TenantMigrationRecipientService>(serviceContext)); + services.push_back(std::make_unique<ReshardingCoordinatorService>(serviceContext)); + + for (auto& service : services) { + registry->registerService(std::move(service)); + } } MONGO_FAIL_POINT_DEFINE(shutdownAtStartup); diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 5e4100ea8ac..9471aca909c 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -72,6 +72,9 @@ const NamespaceString NamespaceString::kMigrationCoordinatorsNamespace(Namespace const NamespaceString NamespaceString::kTenantMigrationDonorsNamespace(NamespaceString::kConfigDb, "tenantMigrationDonors"); +const NamespaceString NamespaceString::kTenantMigrationRecipientsNamespace( + NamespaceString::kConfigDb, "tenantMigrationRecipients"); + const NamespaceString NamespaceString::kShardConfigCollectionsNamespace(NamespaceString::kConfigDb, "cache.collections"); const NamespaceString NamespaceString::kShardConfigDatabasesNamespace(NamespaceString::kConfigDb, diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index ad3c5e7bbb9..a43406f8bd4 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -112,6 +112,9 @@ public: // Namespace for storing the persisted state of tenant migration donors. static const NamespaceString kTenantMigrationDonorsNamespace; + // Namespace for storing the persisted state of tenant migration recipient service instances. + static const NamespaceString kTenantMigrationRecipientsNamespace; + // Namespace for replica set configuration settings. static const NamespaceString kSystemReplSetNamespace; diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index d1fd2714f33..eeddbed7948 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1248,13 +1248,56 @@ env.Library( ) env.Library( + target='tenant_migration_state_machine_idl', + source=[ + env.Idlc('tenant_migration_state_machine.idl')[0], + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/client/read_preference', + '$BUILD_DIR/mongo/idl/idl_parser', + 'optime', + ], +) + +env.Library( + target='tenant_migration_recipient_utils', + source=[ + "tenant_migration_recipient_entry_helpers.cpp", + ], + LIBDEPS_PRIVATE=[ + "$BUILD_DIR/mongo/base", + "$BUILD_DIR/mongo/db/catalog_raii", + "$BUILD_DIR/mongo/db/dbhelpers", + "$BUILD_DIR/mongo/db/namespace_string", + '$BUILD_DIR/mongo/db/service_context', + "$BUILD_DIR/mongo/db/storage/write_unit_of_work", + "tenant_migration_state_machine_idl", + ], +) + +env.Library( + target='tenant_migration_recipient_service', + source= [ + 'tenant_migration_recipient_service.cpp', + ], + LIBDEPS=[ + 'primary_only_service', + 'tenant_migration_recipient_utils', + 'wait_for_majority_service', + ], + LIBDEPS_PRIVATE=[ + 'tenant_migration_state_machine_idl', + ] +) + +env.Library( target='tenant_migration_donor', source=[ 'tenant_migration_access_blocker.cpp', 'tenant_migration_access_blocker_by_prefix.cpp', 'tenant_migration_access_blocker_server_status_section.cpp', 'tenant_migration_donor_util.cpp', - env.Idlc('tenant_migration_state_machine.idl')[0], ], LIBDEPS=[ '$BUILD_DIR/mongo/base', @@ -1270,7 +1313,8 @@ env.Library( 'local_oplog_info', 'optime', 'repl_coordinator_interface', - 'tenant_migration_conflict_info' + 'tenant_migration_conflict_info', + 'tenant_migration_state_machine_idl' ], ) @@ -1411,6 +1455,7 @@ env.CppUnitTest( 'tenant_oplog_batcher_test.cpp', 'vote_requester_test.cpp', 'wait_for_majority_service_test.cpp', + 'tenant_migration_recipient_service_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', @@ -1488,6 +1533,7 @@ env.CppUnitTest( 'sync_source_selector_mock', 'task_executor_mock', 'task_runner', + 'tenant_migration_recipient_service', 'tenant_oplog_processing', 'wait_for_majority_service', ], diff --git a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp new file mode 100644 index 00000000000..69b181302f7 --- /dev/null +++ b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp @@ -0,0 +1,139 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage + +#include "mongo/platform/basic.h" + +#include "mongo/db/catalog/database.h" +#include "mongo/db/catalog_raii.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbhelpers.h" +#include "mongo/db/index_build_entry_helpers.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/tenant_migration_state_machine_gen.h" +#include "mongo/db/storage/write_unit_of_work.h" +#include "mongo/util/str.h" + +namespace mongo { +namespace repl { +namespace { +/** + * Creates the tenant migration recipients collection if it doesn't exist. + * Note: Throws WriteConflictException if the collection already exist. + */ +const Collection* ensureTenantMigrationRecipientsCollectionExists(OperationContext* opCtx, + Database* db, + const NamespaceString& nss) { + // Sanity checks. + invariant(db); + invariant(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IX)); + + auto collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss); + if (!collection) { + WriteUnitOfWork wuow(opCtx); + + collection = db->createCollection(opCtx, nss, CollectionOptions()); + // Ensure the collection exists. + invariant(collection); + + wuow.commit(); + } + return collection; +} + +} // namespace + +namespace tenantMigrationRecipientEntryHelpers { + +Status insertStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDocument& stateDoc) { + const auto nss = NamespaceString::kTenantMigrationRecipientsNamespace; + return writeConflictRetry(opCtx, "insertStateDoc", nss.ns(), [&]() -> Status { + // TODO SERVER-50741: Should be replaced by AutoGetCollection. + AutoGetOrCreateDb db(opCtx, nss.db(), MODE_IX); + Lock::CollectionLock collLock(opCtx, nss, MODE_IX); + + uassert(ErrorCodes::PrimarySteppedDown, + str::stream() << "No longer primary while attempting to insert tenant migration " + "recipient state document", + repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss)); + + // TODO SERVER-50741: ensureTenantMigrationRecipientsCollectionExists() should be removed + // and this should return ErrorCodes::NamespaceNotFound when collection is missing. + auto collection = ensureTenantMigrationRecipientsCollectionExists(opCtx, db.getDb(), nss); + + WriteUnitOfWork wuow(opCtx); + Status status = + collection->insertDocument(opCtx, InsertStatement(stateDoc.toBSON()), nullptr); + if (!status.isOK()) { + return status; + } + wuow.commit(); + return Status::OK(); + }); +} + +StatusWith<TenantMigrationRecipientDocument> getStateDoc(OperationContext* opCtx, + const UUID& migrationUUID) { + // Read the most up to date data. + ReadSourceScope readSourceScope(opCtx, RecoveryUnit::ReadSource::kNoTimestamp); + AutoGetCollectionForRead autoCollection(opCtx, + NamespaceString::kTenantMigrationRecipientsNamespace); + const Collection* collection = autoCollection.getCollection(); + + if (!collection) { + return Status(ErrorCodes::NamespaceNotFound, + str::stream() << "Collection not found: " + << NamespaceString::kTenantMigrationRecipientsNamespace.ns()); + } + + BSONObj result; + auto foundDoc = Helpers::findOne( + opCtx, collection, BSON("_id" << migrationUUID), result, /*requireIndex=*/true); + if (!foundDoc) { + return Status(ErrorCodes::NoMatchingDocument, + str::stream() << "No matching state doc found with tenant migration UUID: " + << migrationUUID); + } + + try { + return TenantMigrationRecipientDocument::parse(IDLParserErrorContext("recipientStateDoc"), + result); + } catch (DBException& ex) { + return ex.toStatus( + str::stream() << "Invalid BSON found for matching document with tenant migration UUID: " + << migrationUUID << " , res: " << result); + } +} + +} // namespace tenantMigrationRecipientEntryHelpers +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h new file mode 100644 index 00000000000..7bd3ece2ab2 --- /dev/null +++ b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +namespace mongo { + +class OperationContext; +class TenantMigrationRecipientDocument; +class Status; +class UUID; +template <typename T> +class StatusWith; + +namespace repl { +namespace tenantMigrationRecipientEntryHelpers { + +/** + * Writes the state doc to the disk. + * + * Returns 'DuplicateKey' error code if a document already exists on the disk with the same + * 'migrationUUID'. + */ +Status insertStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDocument& stateDoc); + +/** + * Returns the state doc matching the document with 'migrationUUID' from the disk if it + * exists. Reads at "no" timestamp i.e, reading with the "latest" snapshot reflecting up to date + * data. + * + * If the stored state doc on disk contains invalid BSON, the 'InvalidBSON' error code is + * returned. + * + * Returns 'NoMatchingDocument' error code if no document with 'migrationUUID' is found. + */ +StatusWith<TenantMigrationRecipientDocument> getStateDoc(OperationContext* opCtx, + const UUID& migrationUUID); + +} // namespace tenantMigrationRecipientEntryHelpers +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp new file mode 100644 index 00000000000..02b0f7be1ab --- /dev/null +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -0,0 +1,156 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/client.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h" +#include "mongo/db/repl/tenant_migration_recipient_service.h" +#include "mongo/db/repl/tenant_migration_state_machine_gen.h" +#include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/db/write_concern_options.h" +#include "mongo/logv2/log.h" +#include "mongo/rpc/get_status_from_command_result.h" + +namespace mongo { +namespace repl { + +// Fails before waiting for the state doc to be majority replicated. +MONGO_FAIL_POINT_DEFINE(failWhilePersistingTenantMigrationRecipientInstanceStateDoc); + +TenantMigrationRecipientService::TenantMigrationRecipientService(ServiceContext* serviceContext) + : PrimaryOnlyService(serviceContext) {} + +StringData TenantMigrationRecipientService::getServiceName() const { + return kTenantMigrationRecipientServiceName; +} + +NamespaceString TenantMigrationRecipientService::getStateDocumentsNS() const { + return NamespaceString::kTenantMigrationRecipientsNamespace; +} + +ThreadPool::Limits TenantMigrationRecipientService::getThreadPoolLimits() const { + // TODO SERVER-50669: This will be replaced by a tunable server parameter. + return ThreadPool::Limits(); +} + +std::shared_ptr<PrimaryOnlyService::Instance> TenantMigrationRecipientService::constructInstance( + BSONObj initialStateDoc) const { + return std::make_shared<TenantMigrationRecipientService::Instance>(initialStateDoc); +} + +TenantMigrationRecipientService::Instance::Instance(BSONObj stateDoc) + : PrimaryOnlyService::TypedInstance<Instance>() { + _stateDoc = TenantMigrationRecipientDocument::parse(IDLParserErrorContext("recipientStateDoc"), + stateDoc); +} + +SemiFuture<void> TenantMigrationRecipientService::Instance::run( + std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept { + return ExecutorFuture(**executor) + .then([this]() -> SharedSemiFuture<void> { + auto uniqueOpCtx = cc().makeOperationContext(); + auto opCtx = uniqueOpCtx.get(); + + // The instance is marked as garbage collect if the migration is either + // committed or aborted on donor side. So, don't start the recipient task if the + // instance state doc is marked for garbage collect. + uassert(ErrorCodes::IllegalOperation, + str::stream() << "Can't start the data sync as the state doc is already marked " + "for garbage collect for migration uuid: " + << getMigrationUUID(), + !isMarkedForGarbageCollect()); + + auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + + // Persist the state doc before starting the data sync. + auto status = tenantMigrationRecipientEntryHelpers::insertStateDoc(opCtx, _stateDoc); + + // TODO SERVER-50742: Ignoring duplicate check step should be removed. + // We can hit duplicate key error when the instances are rebuilt on a new primary after + // step up. So, it's ok to ignore duplicate key errors. + if (status != ErrorCodes::DuplicateKey) { + uassertStatusOK(status); + } + + auto lastOpAfterRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + // No writes happened implies that the state doc is already on disk. This can happen + // only when the instances are rebuilt on node step up. And, + // PrimaryOnlyService::onStepUp() waits for majority commit of the primary no-op oplog + // entry written by the node in the newer term before scheduling the Instance::run(). + // So, it's safe to assume that instance's state document written in an older term on + // disk won't get rolled back for step up case. + if (lastOpBeforeRun == lastOpAfterRun) { + // TODO SERVER-50742: Add an invariant check to make sure this case can happen only + // for step up. + return {Future<void>::makeReady()}; + } + + if (MONGO_unlikely( + failWhilePersistingTenantMigrationRecipientInstanceStateDoc.shouldFail())) { + LOGV2(4878500, "Persisting state doc failed due to fail point enabled."); + uassert(ErrorCodes::NotWritablePrimary, "not writable primary ", false); + } + + // Wait for the state doc to be majority replicated. + return WaitForMajorityService::get(opCtx->getServiceContext()) + .waitUntilMajority(lastOpAfterRun); + }) + .then([this] { + // TODO SERVER-48808: Run cloners in MigrationServiceInstance + // TODO SERVER-48811: Oplog fetching in MigrationServiceInstance + }) + .onCompletion([this](Status status) { + LOGV2(4878501, + "Tenant Recipient data sync completed.", + "migrationId"_attr = getMigrationUUID(), + "dbPrefix"_attr = _stateDoc.getDatabasePrefix(), + "status"_attr = status); + return status; + }) + .semi(); +} + +const UUID& TenantMigrationRecipientService::Instance::getMigrationUUID() const { + stdx::lock_guard lk(_mutex); + return _stateDoc.getId(); +} + +bool TenantMigrationRecipientService::Instance::isMarkedForGarbageCollect() const { + stdx::lock_guard lk(_mutex); + return _stateDoc.getGarbageCollect(); +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h new file mode 100644 index 00000000000..c57fdb1a062 --- /dev/null +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> +#include <memory> + +#include "mongo/db/repl/primary_only_service.h" +#include "mongo/db/repl/tenant_migration_state_machine_gen.h" + +namespace mongo { + +class OperationContext; +class ServiceContext; + +namespace repl { + +/** + * TenantMigrationRecipientService is a primary only service to handle + * data copy portion of a multitenant migration on recipient side. + */ +class TenantMigrationRecipientService final : public PrimaryOnlyService { + // Disallows copying. + TenantMigrationRecipientService(const TenantMigrationRecipientService&) = delete; + TenantMigrationRecipientService& operator=(const TenantMigrationRecipientService&) = delete; + +public: + static constexpr StringData kTenantMigrationRecipientServiceName = + "TenantMigrationRecipientService"_sd; + + explicit TenantMigrationRecipientService(ServiceContext* serviceContext); + ~TenantMigrationRecipientService() = default; + + StringData getServiceName() const final; + + NamespaceString getStateDocumentsNS() const final; + + ThreadPool::Limits getThreadPoolLimits() const final; + + std::shared_ptr<PrimaryOnlyService::Instance> constructInstance( + BSONObj initialStateDoc) const final; + + class Instance final : public PrimaryOnlyService::TypedInstance<Instance> { + public: + explicit Instance(BSONObj stateDoc); + + SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept final; + + /* + * Returns the instance id. + */ + const UUID& getMigrationUUID() const; + + /* + * Returns true if the instance state doc is marked for garbage collect. + */ + bool isMarkedForGarbageCollect() const; + + private: + // Protects below data members. + mutable Mutex _mutex = MONGO_MAKE_LATCH("TenantMigrationRecipientService::_mutex"); + + TenantMigrationRecipientDocument _stateDoc; + }; +}; + + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp new file mode 100644 index 00000000000..874f1a11a02 --- /dev/null +++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp @@ -0,0 +1,169 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include <memory> + +#include "mongo/db/client.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/op_observer_impl.h" +#include "mongo/db/op_observer_registry.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/primary_only_service.h" +#include "mongo/db/repl/primary_only_service_op_observer.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/tenant_migration_recipient_service.h" +#include "mongo/db/repl/tenant_migration_state_machine_gen.h" +#include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/executor/network_interface.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/rpc/metadata/egress_metadata_hook_list.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/fail_point.h" +#include "mongo/util/future.h" + +using namespace mongo; +using namespace mongo::repl; + +class TenantMigrationRecipientServiceTest : public ServiceContextMongoDTest { +public: + void setUp() override { + ServiceContextMongoDTest::setUp(); + auto serviceContext = getServiceContext(); + + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); + + { + auto opCtx = cc().makeOperationContext(); + auto replCoord = std::make_unique<ReplicationCoordinatorMock>(serviceContext); + ReplicationCoordinator::set(serviceContext, std::move(replCoord)); + + repl::setOplogCollectionName(serviceContext); + repl::createOplog(opCtx.get()); + // Set up OpObserver so that repl::logOp() will store the oplog entry's optime in + // ReplClientInfo. + OpObserverRegistry* opObserverRegistry = + dynamic_cast<OpObserverRegistry*>(serviceContext->getOpObserver()); + opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>()); + opObserverRegistry->addObserver( + std::make_unique<PrimaryOnlyServiceOpObserver>(serviceContext)); + + _registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext()); + std::unique_ptr<TenantMigrationRecipientService> service = + std::make_unique<TenantMigrationRecipientService>(getServiceContext()); + _registry->registerService(std::move(service)); + _registry->onStartup(opCtx.get()); + } + stepUp(); + + _service = _registry->lookupServiceByName( + TenantMigrationRecipientService::kTenantMigrationRecipientServiceName); + ASSERT(_service); + } + + void tearDown() override { + WaitForMajorityService::get(getServiceContext()).shutDown(); + + _registry->shutdown(); + _service = nullptr; + + ServiceContextMongoDTest::tearDown(); + } + + void stepDown() { + ASSERT_OK(ReplicationCoordinator::get(getServiceContext()) + ->setFollowerMode(MemberState::RS_SECONDARY)); + _registry->onStepDown(); + } + + void stepUp() { + auto opCtx = cc().makeOperationContext(); + auto replCoord = ReplicationCoordinator::get(getServiceContext()); + + // Advance term + _term++; + + ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_PRIMARY)); + ASSERT_OK(replCoord->updateTerm(opCtx.get(), _term)); + replCoord->setMyLastAppliedOpTimeAndWallTime( + OpTimeAndWallTime(OpTime(Timestamp(1, 1), _term), Date_t())); + + _registry->onStepUpComplete(opCtx.get(), _term); + } + +protected: + PrimaryOnlyServiceRegistry* _registry; + PrimaryOnlyService* _service; + long long _term = 0; +}; + + +TEST_F(TenantMigrationRecipientServiceTest, BasicTenantMigrationRecipientServiceInstanceCreation) { + const UUID migrationUUID = UUID::gen(); + + TenantMigrationRecipientDocument TenantMigrationRecipientInstance( + migrationUUID, + "DonorHost:12345", + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly, TagSet::primaryOnly())); + + // Create and start the instance. + auto instance = TenantMigrationRecipientService::Instance::getOrCreate( + _service, TenantMigrationRecipientInstance.toBSON()); + ASSERT(instance.get()); + ASSERT_EQ(migrationUUID, instance->getMigrationUUID()); + + // Wait for task completion success. + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); +} + + +TEST_F(TenantMigrationRecipientServiceTest, InstanceReportsErrorOnFailureWhilePersisitingStateDoc) { + FailPointEnableBlock failPoint("failWhilePersistingTenantMigrationRecipientInstanceStateDoc"); + + const UUID migrationUUID = UUID::gen(); + + TenantMigrationRecipientDocument TenantMigrationRecipientInstance( + migrationUUID, + "DonorHost:12345", + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly, TagSet::primaryOnly())); + + // Create and start the instance. + auto instance = TenantMigrationRecipientService::Instance::getOrCreate( + _service, TenantMigrationRecipientInstance.toBSON()); + ASSERT(instance.get()); + ASSERT_EQ(migrationUUID, instance->getMigrationUUID()); + + // Should be able to see the instance task failure error. + auto status = instance->getCompletionFuture().getNoThrow(); + ASSERT_EQ(ErrorCodes::NotWritablePrimary, status.code()); +}
\ No newline at end of file diff --git a/src/mongo/db/repl/tenant_migration_state_machine.idl b/src/mongo/db/repl/tenant_migration_state_machine.idl index e29354bac78..4c2fe5bd430 100644 --- a/src/mongo/db/repl/tenant_migration_state_machine.idl +++ b/src/mongo/db/repl/tenant_migration_state_machine.idl @@ -109,9 +109,12 @@ structs: startApplyingOpTime: description: "Populated during data sync; the donor's operation time when the data cloning starts." type: optime + optional: true startFetchingOpTime: description: "Populated during data sync; the donor's operation time of the last open transaction when the data cloning started." type: optime + optional: true cloneFinishedOptime: description: "Populated during data sync; the recipient operation time when the data cloning finishes." type: optime + optional: true |