summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSuganthi Mani <suganthi.mani@mongodb.com>2020-08-11 06:04:16 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-09-09 05:12:40 +0000
commit2bcab5793e96a6b8466b1cf8a3d7289ec4263b8d (patch)
treef9afd88512007872a5880cd341ad84d14435b8ea
parentcbb82fd1b270f84e544243acbba2cb3fed779c28 (diff)
downloadmongo-2bcab5793e96a6b8466b1cf8a3d7289ec4263b8d.tar.gz
SERVER-48785 Create TenantMigrationRecipientService and TenantMigrationRecipientServiceInstance.
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/mongod_main.cpp15
-rw-r--r--src/mongo/db/namespace_string.cpp3
-rw-r--r--src/mongo/db/namespace_string.h3
-rw-r--r--src/mongo/db/repl/SConscript50
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp139
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h67
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp156
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.h96
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service_test.cpp169
-rw-r--r--src/mongo/db/repl/tenant_migration_state_machine.idl3
11 files changed, 694 insertions, 8 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index ff76c2daefb..25c4d852403 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -2113,6 +2113,7 @@ env.Library(
'repl/replication_recovery',
'repl/serveronly_repl',
'repl/storage_interface_impl',
+ 'repl/tenant_migration_recipient_service',
'repl/topology_coordinator',
'repl/wait_for_majority_service',
's/sessions_collection_config_server',
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index f6809e00bf9..093bb83eb18 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -125,6 +125,7 @@
#include "mongo/db/repl/replication_recovery.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/tenant_migration_donor_service.h"
+#include "mongo/db/repl/tenant_migration_recipient_service.h"
#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/repl_set_member_in_standalone_mode.h"
@@ -303,13 +304,15 @@ void initializeCommandHooks(ServiceContext* serviceContext) {
void registerPrimaryOnlyServices(ServiceContext* serviceContext) {
auto registry = repl::PrimaryOnlyServiceRegistry::get(serviceContext);
- std::unique_ptr<TenantMigrationDonorService> tenantMigrationDonorService =
- std::make_unique<TenantMigrationDonorService>(serviceContext);
- registry->registerService(std::move(tenantMigrationDonorService));
- std::unique_ptr<ReshardingCoordinatorService> reshardingCoordinatorService =
- std::make_unique<ReshardingCoordinatorService>(serviceContext);
- registry->registerService(std::move(reshardingCoordinatorService));
+ std::vector<std::unique_ptr<repl::PrimaryOnlyService>> services;
+ services.push_back(std::make_unique<TenantMigrationDonorService>(serviceContext));
+ services.push_back(std::make_unique<repl::TenantMigrationRecipientService>(serviceContext));
+ services.push_back(std::make_unique<ReshardingCoordinatorService>(serviceContext));
+
+ for (auto& service : services) {
+ registry->registerService(std::move(service));
+ }
}
MONGO_FAIL_POINT_DEFINE(shutdownAtStartup);
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 5e4100ea8ac..9471aca909c 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -72,6 +72,9 @@ const NamespaceString NamespaceString::kMigrationCoordinatorsNamespace(Namespace
const NamespaceString NamespaceString::kTenantMigrationDonorsNamespace(NamespaceString::kConfigDb,
"tenantMigrationDonors");
+const NamespaceString NamespaceString::kTenantMigrationRecipientsNamespace(
+ NamespaceString::kConfigDb, "tenantMigrationRecipients");
+
const NamespaceString NamespaceString::kShardConfigCollectionsNamespace(NamespaceString::kConfigDb,
"cache.collections");
const NamespaceString NamespaceString::kShardConfigDatabasesNamespace(NamespaceString::kConfigDb,
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index ad3c5e7bbb9..a43406f8bd4 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -112,6 +112,9 @@ public:
// Namespace for storing the persisted state of tenant migration donors.
static const NamespaceString kTenantMigrationDonorsNamespace;
+ // Namespace for storing the persisted state of tenant migration recipient service instances.
+ static const NamespaceString kTenantMigrationRecipientsNamespace;
+
// Namespace for replica set configuration settings.
static const NamespaceString kSystemReplSetNamespace;
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index d1fd2714f33..eeddbed7948 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1248,13 +1248,56 @@ env.Library(
)
env.Library(
+ target='tenant_migration_state_machine_idl',
+ source=[
+ env.Idlc('tenant_migration_state_machine.idl')[0],
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/client/read_preference',
+ '$BUILD_DIR/mongo/idl/idl_parser',
+ 'optime',
+ ],
+)
+
+env.Library(
+ target='tenant_migration_recipient_utils',
+ source=[
+ "tenant_migration_recipient_entry_helpers.cpp",
+ ],
+ LIBDEPS_PRIVATE=[
+ "$BUILD_DIR/mongo/base",
+ "$BUILD_DIR/mongo/db/catalog_raii",
+ "$BUILD_DIR/mongo/db/dbhelpers",
+ "$BUILD_DIR/mongo/db/namespace_string",
+ '$BUILD_DIR/mongo/db/service_context',
+ "$BUILD_DIR/mongo/db/storage/write_unit_of_work",
+ "tenant_migration_state_machine_idl",
+ ],
+)
+
+env.Library(
+ target='tenant_migration_recipient_service',
+ source= [
+ 'tenant_migration_recipient_service.cpp',
+ ],
+ LIBDEPS=[
+ 'primary_only_service',
+ 'tenant_migration_recipient_utils',
+ 'wait_for_majority_service',
+ ],
+ LIBDEPS_PRIVATE=[
+ 'tenant_migration_state_machine_idl',
+ ]
+)
+
+env.Library(
target='tenant_migration_donor',
source=[
'tenant_migration_access_blocker.cpp',
'tenant_migration_access_blocker_by_prefix.cpp',
'tenant_migration_access_blocker_server_status_section.cpp',
'tenant_migration_donor_util.cpp',
- env.Idlc('tenant_migration_state_machine.idl')[0],
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
@@ -1270,7 +1313,8 @@ env.Library(
'local_oplog_info',
'optime',
'repl_coordinator_interface',
- 'tenant_migration_conflict_info'
+ 'tenant_migration_conflict_info',
+ 'tenant_migration_state_machine_idl'
],
)
@@ -1411,6 +1455,7 @@ env.CppUnitTest(
'tenant_oplog_batcher_test.cpp',
'vote_requester_test.cpp',
'wait_for_majority_service_test.cpp',
+ 'tenant_migration_recipient_service_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
@@ -1488,6 +1533,7 @@ env.CppUnitTest(
'sync_source_selector_mock',
'task_executor_mock',
'task_runner',
+ 'tenant_migration_recipient_service',
'tenant_oplog_processing',
'wait_for_majority_service',
],
diff --git a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp
new file mode 100644
index 00000000000..69b181302f7
--- /dev/null
+++ b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp
@@ -0,0 +1,139 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/catalog/database.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbhelpers.h"
+#include "mongo/db/index_build_entry_helpers.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+#include "mongo/db/storage/write_unit_of_work.h"
+#include "mongo/util/str.h"
+
+namespace mongo {
+namespace repl {
+namespace {
+/**
+ * Creates the tenant migration recipients collection if it doesn't exist.
+ * Note: Throws WriteConflictException if the collection already exist.
+ */
+const Collection* ensureTenantMigrationRecipientsCollectionExists(OperationContext* opCtx,
+ Database* db,
+ const NamespaceString& nss) {
+ // Sanity checks.
+ invariant(db);
+ invariant(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IX));
+
+ auto collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss);
+ if (!collection) {
+ WriteUnitOfWork wuow(opCtx);
+
+ collection = db->createCollection(opCtx, nss, CollectionOptions());
+ // Ensure the collection exists.
+ invariant(collection);
+
+ wuow.commit();
+ }
+ return collection;
+}
+
+} // namespace
+
+namespace tenantMigrationRecipientEntryHelpers {
+
+Status insertStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDocument& stateDoc) {
+ const auto nss = NamespaceString::kTenantMigrationRecipientsNamespace;
+ return writeConflictRetry(opCtx, "insertStateDoc", nss.ns(), [&]() -> Status {
+ // TODO SERVER-50741: Should be replaced by AutoGetCollection.
+ AutoGetOrCreateDb db(opCtx, nss.db(), MODE_IX);
+ Lock::CollectionLock collLock(opCtx, nss, MODE_IX);
+
+ uassert(ErrorCodes::PrimarySteppedDown,
+ str::stream() << "No longer primary while attempting to insert tenant migration "
+ "recipient state document",
+ repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss));
+
+ // TODO SERVER-50741: ensureTenantMigrationRecipientsCollectionExists() should be removed
+ // and this should return ErrorCodes::NamespaceNotFound when collection is missing.
+ auto collection = ensureTenantMigrationRecipientsCollectionExists(opCtx, db.getDb(), nss);
+
+ WriteUnitOfWork wuow(opCtx);
+ Status status =
+ collection->insertDocument(opCtx, InsertStatement(stateDoc.toBSON()), nullptr);
+ if (!status.isOK()) {
+ return status;
+ }
+ wuow.commit();
+ return Status::OK();
+ });
+}
+
+StatusWith<TenantMigrationRecipientDocument> getStateDoc(OperationContext* opCtx,
+ const UUID& migrationUUID) {
+ // Read the most up to date data.
+ ReadSourceScope readSourceScope(opCtx, RecoveryUnit::ReadSource::kNoTimestamp);
+ AutoGetCollectionForRead autoCollection(opCtx,
+ NamespaceString::kTenantMigrationRecipientsNamespace);
+ const Collection* collection = autoCollection.getCollection();
+
+ if (!collection) {
+ return Status(ErrorCodes::NamespaceNotFound,
+ str::stream() << "Collection not found: "
+ << NamespaceString::kTenantMigrationRecipientsNamespace.ns());
+ }
+
+ BSONObj result;
+ auto foundDoc = Helpers::findOne(
+ opCtx, collection, BSON("_id" << migrationUUID), result, /*requireIndex=*/true);
+ if (!foundDoc) {
+ return Status(ErrorCodes::NoMatchingDocument,
+ str::stream() << "No matching state doc found with tenant migration UUID: "
+ << migrationUUID);
+ }
+
+ try {
+ return TenantMigrationRecipientDocument::parse(IDLParserErrorContext("recipientStateDoc"),
+ result);
+ } catch (DBException& ex) {
+ return ex.toStatus(
+ str::stream() << "Invalid BSON found for matching document with tenant migration UUID: "
+ << migrationUUID << " , res: " << result);
+ }
+}
+
+} // namespace tenantMigrationRecipientEntryHelpers
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h
new file mode 100644
index 00000000000..7bd3ece2ab2
--- /dev/null
+++ b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+namespace mongo {
+
+class OperationContext;
+class TenantMigrationRecipientDocument;
+class Status;
+class UUID;
+template <typename T>
+class StatusWith;
+
+namespace repl {
+namespace tenantMigrationRecipientEntryHelpers {
+
+/**
+ * Writes the state doc to the disk.
+ *
+ * Returns 'DuplicateKey' error code if a document already exists on the disk with the same
+ * 'migrationUUID'.
+ */
+Status insertStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDocument& stateDoc);
+
+/**
+ * Returns the state doc matching the document with 'migrationUUID' from the disk if it
+ * exists. Reads at "no" timestamp i.e, reading with the "latest" snapshot reflecting up to date
+ * data.
+ *
+ * If the stored state doc on disk contains invalid BSON, the 'InvalidBSON' error code is
+ * returned.
+ *
+ * Returns 'NoMatchingDocument' error code if no document with 'migrationUUID' is found.
+ */
+StatusWith<TenantMigrationRecipientDocument> getStateDoc(OperationContext* opCtx,
+ const UUID& migrationUUID);
+
+} // namespace tenantMigrationRecipientEntryHelpers
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
new file mode 100644
index 00000000000..02b0f7be1ab
--- /dev/null
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -0,0 +1,156 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/client.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h"
+#include "mongo/db/repl/tenant_migration_recipient_service.h"
+#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/write_concern_options.h"
+#include "mongo/logv2/log.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+
+namespace mongo {
+namespace repl {
+
+// Fails before waiting for the state doc to be majority replicated.
+MONGO_FAIL_POINT_DEFINE(failWhilePersistingTenantMigrationRecipientInstanceStateDoc);
+
+TenantMigrationRecipientService::TenantMigrationRecipientService(ServiceContext* serviceContext)
+ : PrimaryOnlyService(serviceContext) {}
+
+StringData TenantMigrationRecipientService::getServiceName() const {
+ return kTenantMigrationRecipientServiceName;
+}
+
+NamespaceString TenantMigrationRecipientService::getStateDocumentsNS() const {
+ return NamespaceString::kTenantMigrationRecipientsNamespace;
+}
+
+ThreadPool::Limits TenantMigrationRecipientService::getThreadPoolLimits() const {
+ // TODO SERVER-50669: This will be replaced by a tunable server parameter.
+ return ThreadPool::Limits();
+}
+
+std::shared_ptr<PrimaryOnlyService::Instance> TenantMigrationRecipientService::constructInstance(
+ BSONObj initialStateDoc) const {
+ return std::make_shared<TenantMigrationRecipientService::Instance>(initialStateDoc);
+}
+
+TenantMigrationRecipientService::Instance::Instance(BSONObj stateDoc)
+ : PrimaryOnlyService::TypedInstance<Instance>() {
+ _stateDoc = TenantMigrationRecipientDocument::parse(IDLParserErrorContext("recipientStateDoc"),
+ stateDoc);
+}
+
+SemiFuture<void> TenantMigrationRecipientService::Instance::run(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
+ return ExecutorFuture(**executor)
+ .then([this]() -> SharedSemiFuture<void> {
+ auto uniqueOpCtx = cc().makeOperationContext();
+ auto opCtx = uniqueOpCtx.get();
+
+ // The instance is marked as garbage collect if the migration is either
+ // committed or aborted on donor side. So, don't start the recipient task if the
+ // instance state doc is marked for garbage collect.
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "Can't start the data sync as the state doc is already marked "
+ "for garbage collect for migration uuid: "
+ << getMigrationUUID(),
+ !isMarkedForGarbageCollect());
+
+ auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+
+ // Persist the state doc before starting the data sync.
+ auto status = tenantMigrationRecipientEntryHelpers::insertStateDoc(opCtx, _stateDoc);
+
+ // TODO SERVER-50742: Ignoring duplicate check step should be removed.
+ // We can hit duplicate key error when the instances are rebuilt on a new primary after
+ // step up. So, it's ok to ignore duplicate key errors.
+ if (status != ErrorCodes::DuplicateKey) {
+ uassertStatusOK(status);
+ }
+
+ auto lastOpAfterRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ // No writes happened implies that the state doc is already on disk. This can happen
+ // only when the instances are rebuilt on node step up. And,
+ // PrimaryOnlyService::onStepUp() waits for majority commit of the primary no-op oplog
+ // entry written by the node in the newer term before scheduling the Instance::run().
+ // So, it's safe to assume that instance's state document written in an older term on
+ // disk won't get rolled back for step up case.
+ if (lastOpBeforeRun == lastOpAfterRun) {
+ // TODO SERVER-50742: Add an invariant check to make sure this case can happen only
+ // for step up.
+ return {Future<void>::makeReady()};
+ }
+
+ if (MONGO_unlikely(
+ failWhilePersistingTenantMigrationRecipientInstanceStateDoc.shouldFail())) {
+ LOGV2(4878500, "Persisting state doc failed due to fail point enabled.");
+ uassert(ErrorCodes::NotWritablePrimary, "not writable primary ", false);
+ }
+
+ // Wait for the state doc to be majority replicated.
+ return WaitForMajorityService::get(opCtx->getServiceContext())
+ .waitUntilMajority(lastOpAfterRun);
+ })
+ .then([this] {
+ // TODO SERVER-48808: Run cloners in MigrationServiceInstance
+ // TODO SERVER-48811: Oplog fetching in MigrationServiceInstance
+ })
+ .onCompletion([this](Status status) {
+ LOGV2(4878501,
+ "Tenant Recipient data sync completed.",
+ "migrationId"_attr = getMigrationUUID(),
+ "dbPrefix"_attr = _stateDoc.getDatabasePrefix(),
+ "status"_attr = status);
+ return status;
+ })
+ .semi();
+}
+
+const UUID& TenantMigrationRecipientService::Instance::getMigrationUUID() const {
+ stdx::lock_guard lk(_mutex);
+ return _stateDoc.getId();
+}
+
+bool TenantMigrationRecipientService::Instance::isMarkedForGarbageCollect() const {
+ stdx::lock_guard lk(_mutex);
+ return _stateDoc.getGarbageCollect();
+}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
new file mode 100644
index 00000000000..c57fdb1a062
--- /dev/null
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <memory>
+
+#include "mongo/db/repl/primary_only_service.h"
+#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+
+namespace mongo {
+
+class OperationContext;
+class ServiceContext;
+
+namespace repl {
+
+/**
+ * TenantMigrationRecipientService is a primary only service to handle
+ * data copy portion of a multitenant migration on recipient side.
+ */
+class TenantMigrationRecipientService final : public PrimaryOnlyService {
+ // Disallows copying.
+ TenantMigrationRecipientService(const TenantMigrationRecipientService&) = delete;
+ TenantMigrationRecipientService& operator=(const TenantMigrationRecipientService&) = delete;
+
+public:
+ static constexpr StringData kTenantMigrationRecipientServiceName =
+ "TenantMigrationRecipientService"_sd;
+
+ explicit TenantMigrationRecipientService(ServiceContext* serviceContext);
+ ~TenantMigrationRecipientService() = default;
+
+ StringData getServiceName() const final;
+
+ NamespaceString getStateDocumentsNS() const final;
+
+ ThreadPool::Limits getThreadPoolLimits() const final;
+
+ std::shared_ptr<PrimaryOnlyService::Instance> constructInstance(
+ BSONObj initialStateDoc) const final;
+
+ class Instance final : public PrimaryOnlyService::TypedInstance<Instance> {
+ public:
+ explicit Instance(BSONObj stateDoc);
+
+ SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept final;
+
+ /*
+ * Returns the instance id.
+ */
+ const UUID& getMigrationUUID() const;
+
+ /*
+ * Returns true if the instance state doc is marked for garbage collect.
+ */
+ bool isMarkedForGarbageCollect() const;
+
+ private:
+ // Protects below data members.
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("TenantMigrationRecipientService::_mutex");
+
+ TenantMigrationRecipientDocument _stateDoc;
+ };
+};
+
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
new file mode 100644
index 00000000000..874f1a11a02
--- /dev/null
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -0,0 +1,169 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include <memory>
+
+#include "mongo/db/client.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/op_observer_impl.h"
+#include "mongo/db/op_observer_registry.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/primary_only_service.h"
+#include "mongo/db/repl/primary_only_service_op_observer.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/tenant_migration_recipient_service.h"
+#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/executor/network_interface.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/future.h"
+
+using namespace mongo;
+using namespace mongo::repl;
+
+class TenantMigrationRecipientServiceTest : public ServiceContextMongoDTest {
+public:
+ void setUp() override {
+ ServiceContextMongoDTest::setUp();
+ auto serviceContext = getServiceContext();
+
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
+
+ {
+ auto opCtx = cc().makeOperationContext();
+ auto replCoord = std::make_unique<ReplicationCoordinatorMock>(serviceContext);
+ ReplicationCoordinator::set(serviceContext, std::move(replCoord));
+
+ repl::setOplogCollectionName(serviceContext);
+ repl::createOplog(opCtx.get());
+ // Set up OpObserver so that repl::logOp() will store the oplog entry's optime in
+ // ReplClientInfo.
+ OpObserverRegistry* opObserverRegistry =
+ dynamic_cast<OpObserverRegistry*>(serviceContext->getOpObserver());
+ opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>());
+ opObserverRegistry->addObserver(
+ std::make_unique<PrimaryOnlyServiceOpObserver>(serviceContext));
+
+ _registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext());
+ std::unique_ptr<TenantMigrationRecipientService> service =
+ std::make_unique<TenantMigrationRecipientService>(getServiceContext());
+ _registry->registerService(std::move(service));
+ _registry->onStartup(opCtx.get());
+ }
+ stepUp();
+
+ _service = _registry->lookupServiceByName(
+ TenantMigrationRecipientService::kTenantMigrationRecipientServiceName);
+ ASSERT(_service);
+ }
+
+ void tearDown() override {
+ WaitForMajorityService::get(getServiceContext()).shutDown();
+
+ _registry->shutdown();
+ _service = nullptr;
+
+ ServiceContextMongoDTest::tearDown();
+ }
+
+ void stepDown() {
+ ASSERT_OK(ReplicationCoordinator::get(getServiceContext())
+ ->setFollowerMode(MemberState::RS_SECONDARY));
+ _registry->onStepDown();
+ }
+
+ void stepUp() {
+ auto opCtx = cc().makeOperationContext();
+ auto replCoord = ReplicationCoordinator::get(getServiceContext());
+
+ // Advance term
+ _term++;
+
+ ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_PRIMARY));
+ ASSERT_OK(replCoord->updateTerm(opCtx.get(), _term));
+ replCoord->setMyLastAppliedOpTimeAndWallTime(
+ OpTimeAndWallTime(OpTime(Timestamp(1, 1), _term), Date_t()));
+
+ _registry->onStepUpComplete(opCtx.get(), _term);
+ }
+
+protected:
+ PrimaryOnlyServiceRegistry* _registry;
+ PrimaryOnlyService* _service;
+ long long _term = 0;
+};
+
+
+TEST_F(TenantMigrationRecipientServiceTest, BasicTenantMigrationRecipientServiceInstanceCreation) {
+ const UUID migrationUUID = UUID::gen();
+
+ TenantMigrationRecipientDocument TenantMigrationRecipientInstance(
+ migrationUUID,
+ "DonorHost:12345",
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly, TagSet::primaryOnly()));
+
+ // Create and start the instance.
+ auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ _service, TenantMigrationRecipientInstance.toBSON());
+ ASSERT(instance.get());
+ ASSERT_EQ(migrationUUID, instance->getMigrationUUID());
+
+ // Wait for task completion success.
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+}
+
+
+TEST_F(TenantMigrationRecipientServiceTest, InstanceReportsErrorOnFailureWhilePersisitingStateDoc) {
+ FailPointEnableBlock failPoint("failWhilePersistingTenantMigrationRecipientInstanceStateDoc");
+
+ const UUID migrationUUID = UUID::gen();
+
+ TenantMigrationRecipientDocument TenantMigrationRecipientInstance(
+ migrationUUID,
+ "DonorHost:12345",
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly, TagSet::primaryOnly()));
+
+ // Create and start the instance.
+ auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ _service, TenantMigrationRecipientInstance.toBSON());
+ ASSERT(instance.get());
+ ASSERT_EQ(migrationUUID, instance->getMigrationUUID());
+
+ // Should be able to see the instance task failure error.
+ auto status = instance->getCompletionFuture().getNoThrow();
+ ASSERT_EQ(ErrorCodes::NotWritablePrimary, status.code());
+} \ No newline at end of file
diff --git a/src/mongo/db/repl/tenant_migration_state_machine.idl b/src/mongo/db/repl/tenant_migration_state_machine.idl
index e29354bac78..4c2fe5bd430 100644
--- a/src/mongo/db/repl/tenant_migration_state_machine.idl
+++ b/src/mongo/db/repl/tenant_migration_state_machine.idl
@@ -109,9 +109,12 @@ structs:
startApplyingOpTime:
description: "Populated during data sync; the donor's operation time when the data cloning starts."
type: optime
+ optional: true
startFetchingOpTime:
description: "Populated during data sync; the donor's operation time of the last open transaction when the data cloning started."
type: optime
+ optional: true
cloneFinishedOptime:
description: "Populated during data sync; the recipient operation time when the data cloning finishes."
type: optime
+ optional: true