summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands/SConscript3
-rw-r--r--src/mongo/db/commands/tenant_migration_donor_cmds.cpp19
-rw-r--r--src/mongo/db/commands/tenant_migration_donor_cmds.idl2
-rw-r--r--src/mongo/db/commands/tenant_migration_recipient_cmds.cpp32
-rw-r--r--src/mongo/db/commands/tenant_migration_recipient_cmds.idl31
-rw-r--r--src/mongo/db/concurrency/d_concurrency.h22
-rw-r--r--src/mongo/db/dbhelpers.cpp23
-rw-r--r--src/mongo/db/dbhelpers.h21
-rw-r--r--src/mongo/db/repl/SConscript2
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.cpp2
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp109
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h15
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_entry_helpers_test.cpp187
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp41
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.h29
-rw-r--r--src/mongo/db/repl/tenant_migration_state_machine.idl10
-rw-r--r--src/mongo/db/repl/tenant_migration_util.h36
17 files changed, 450 insertions, 134 deletions
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 7b74363fa7e..d451c3cd4f3 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -462,6 +462,7 @@ env.Library(
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
'$BUILD_DIR/mongo/db/repl/repl_server_parameters',
'$BUILD_DIR/mongo/db/repl/tenant_migration_donor_service',
+ '$BUILD_DIR/mongo/db/repl/tenant_migration_recipient_service',
'$BUILD_DIR/mongo/db/rw_concern_d',
'$BUILD_DIR/mongo/db/s/sharding_runtime_d',
'$BUILD_DIR/mongo/db/server_options_core',
@@ -557,7 +558,9 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/client/connection_string',
'$BUILD_DIR/mongo/client/read_preference',
+ '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
'$BUILD_DIR/mongo/db/repl/tenant_migration_state_machine_idl',
'$BUILD_DIR/mongo/idl/idl_parser',
]
diff --git a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp
index d451be395f3..467e4c4c9c9 100644
--- a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp
+++ b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp
@@ -55,25 +55,6 @@ public:
const RequestType& requestBody = request();
- // Sanity check that donor and recipient do not share any of the same hosts for
- // migration
- const auto donorConnectionString =
- repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString();
- const auto donorServers = donorConnectionString.getServers();
- const auto recipientServers =
- uassertStatusOK(
- MongoURI::parse(requestBody.getRecipientConnectionString().toString()))
- .getServers();
- for (const auto& server : donorServers) {
- uassert(ErrorCodes::InvalidOptions,
- "recipient and donor hosts must be different",
- std::none_of(recipientServers.begin(),
- recipientServers.end(),
- [&](const HostAndPort& recipientServer) {
- return server == recipientServer;
- }));
- }
-
const auto donorStateDoc =
TenantMigrationDonorDocument(requestBody.getMigrationId(),
requestBody.getRecipientConnectionString().toString(),
diff --git a/src/mongo/db/commands/tenant_migration_donor_cmds.idl b/src/mongo/db/commands/tenant_migration_donor_cmds.idl
index 1c3b75c34d7..b1e3416a379 100644
--- a/src/mongo/db/commands/tenant_migration_donor_cmds.idl
+++ b/src/mongo/db/commands/tenant_migration_donor_cmds.idl
@@ -63,6 +63,8 @@ commands:
recipientConnectionString:
description: "The URI string that the donor will utilize to create a connection with the recipient."
type: string
+ validator:
+ callback: "validateConnectionString"
tenantId:
description: "The prefix from which the migrating database will be matched. The prefixes 'admin', 'local', 'config', the empty string, are not allowed."
type: string
diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp b/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp
index 7013f72be93..c20be7a533b 100644
--- a/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp
+++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp
@@ -28,8 +28,11 @@
*/
#include "mongo/db/commands.h"
+#include "mongo/db/commands/tenant_migration_donor_cmds_gen.h"
#include "mongo/db/commands/tenant_migration_recipient_cmds_gen.h"
+#include "mongo/db/repl/primary_only_service.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
+#include "mongo/db/repl/tenant_migration_recipient_service.h"
namespace mongo {
namespace {
@@ -37,16 +40,40 @@ namespace {
class RecipientSyncDataCmd : public TypedCommand<RecipientSyncDataCmd> {
public:
using Request = RecipientSyncData;
+ using Response = RecipientSyncDataResponse;
class Invocation : public InvocationBase {
public:
using InvocationBase::InvocationBase;
- void typedRun(OperationContext* opCtx) {
+ Response typedRun(OperationContext* opCtx) {
uassert(ErrorCodes::CommandNotSupported,
"recipientSyncData command not enabled",
repl::enableTenantMigrations);
+
+ const auto& cmd = request();
+
+ TenantMigrationRecipientDocument stateDoc(cmd.getMigrationId(),
+ cmd.getDonorConnectionString().toString(),
+ cmd.getTenantId().toString(),
+ cmd.getReadPreference());
+
+ auto recipientService =
+ repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext())
+ ->lookupServiceByName(repl::TenantMigrationRecipientService::
+ kTenantMigrationRecipientServiceName);
+ auto recipientInstance = repl::TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx, recipientService, stateDoc.toBSON());
+ uassertStatusOK(recipientInstance->checkIfOptionsConflict(stateDoc));
+
+ auto returnAfterReachingTimestamp = cmd.getReturnAfterReachingTimestamp();
+ if (!returnAfterReachingTimestamp) {
+ return Response(recipientInstance->waitUntilMigrationReachesConsistentState(opCtx));
+ }
+
+ return Response(recipientInstance->waitUntilTimestampIsMajorityCommitted(
+ opCtx, *returnAfterReachingTimestamp));
}
void doCheckAuthorization(OperationContext* opCtx) const {}
@@ -61,7 +88,8 @@ public:
};
std::string help() const {
- return "Instructs the recipient to sync data as part of a tenant migration.";
+ return "Internal replica set command; instructs the recipient to sync data as part of a "
+ "tenant migration.";
}
bool adminOnly() const override {
diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
index 01a772c4c38..c43fdef58e1 100644
--- a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
+++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
@@ -31,6 +31,7 @@ global:
cpp_includes:
- "mongo/client/read_preference.h"
- "mongo/db/repl/tenant_migration_util.h"
+ - "mongo/db/repl/optime.h"
imports:
- "mongo/client/read_preference_setting.idl"
@@ -38,6 +39,15 @@ imports:
- "mongo/s/sharding_types.idl"
- "mongo/db/repl/replication_types.idl"
+structs:
+ recipientSyncDataResponse:
+ description: "Response for the 'recipientSyncData' command"
+ strict: false
+ fields:
+ majorityAppliedDonorOpTime:
+ type: optime
+ description: "Majority applied donor optime by the recipient"
+
commands:
recipientSyncData:
description: "Parser for the 'recipientSyncData' command."
@@ -48,20 +58,31 @@ commands:
description: "Unique identifier for the tenant migration."
type: uuid
donorConnectionString:
- description: "The URI string that the donor will utilize to create a connection with the recipient."
+ description: >-
+ The URI string that the recipient will utilize to create a connection with the
+ donor.
type: string
+ validator:
+ callback: "validateConnectionString"
tenantId:
- description: "The prefix from which the migrating database will be matched. The prefixes 'admin', 'local', 'config', the empty string, are not allowed."
+ description: >-
+ The prefix from which the migrating database will be matched. The prefixes 'admin',
+ 'local', 'config', the empty string, are not allowed.
type: string
validator:
callback: "validateDatabasePrefix"
readPreference:
- description: "The read preference settings that the donor will pass on to the recipient."
+ description: >-
+ The read preference settings that the donor will pass on to the recipient.
type: readPreference
- returnAfterReachingOpTime:
- description: "If provided, the recipient should return after syncing up to this OpTime. Otherwise, the recipient will return once its copy of the data is consistent."
+ returnAfterReachingTimestamp:
+ description: >-
+ If provided, the recipient should return after syncing up to this timestamp.
+ Otherwise, the recipient will return once its copy of the data is consistent.
type: timestamp
optional: true
+ validator:
+ callback: "validateTimestampNotNull"
recipientForgetMigration:
description: "Parser for the 'recipientForgetMigration' command."
diff --git a/src/mongo/db/concurrency/d_concurrency.h b/src/mongo/db/concurrency/d_concurrency.h
index 4ae7af8cdb9..5c19dbf8d02 100644
--- a/src/mongo/db/concurrency/d_concurrency.h
+++ b/src/mongo/db/concurrency/d_concurrency.h
@@ -81,8 +81,11 @@ public:
: _rid(rid), _locker(locker), _result(LOCK_INVALID) {}
ResourceLock(Locker* locker, ResourceId rid, LockMode mode)
+ : ResourceLock(nullptr, locker, rid, mode) {}
+
+ ResourceLock(OperationContext* opCtx, Locker* locker, ResourceId rid, LockMode mode)
: _rid(rid), _locker(locker), _result(LOCK_INVALID) {
- lock(nullptr, mode);
+ lock(opCtx, mode);
}
ResourceLock(ResourceLock&& otherLock)
@@ -165,7 +168,13 @@ public:
class ExclusiveLock : public ResourceLock {
public:
ExclusiveLock(Locker* locker, ResourceMutex mutex)
- : ResourceLock(locker, mutex.rid(), MODE_X) {}
+ : ExclusiveLock(nullptr, locker, mutex) {}
+
+ /**
+ * Interruptible lock acquisition.
+ */
+ ExclusiveLock(OperationContext* opCtx, Locker* locker, ResourceMutex mutex)
+ : ResourceLock(opCtx, locker, mutex.rid(), MODE_X) {}
using ResourceLock::lock;
@@ -185,8 +194,13 @@ public:
*/
class SharedLock : public ResourceLock {
public:
- SharedLock(Locker* locker, ResourceMutex mutex)
- : ResourceLock(locker, mutex.rid(), MODE_IS) {}
+ SharedLock(Locker* locker, ResourceMutex mutex) : SharedLock(nullptr, locker, mutex) {}
+
+ /**
+ * Interruptible lock acquisition.
+ */
+ SharedLock(OperationContext* opCtx, Locker* locker, ResourceMutex mutex)
+ : ResourceLock(opCtx, locker, mutex.rid(), MODE_IS) {}
};
/**
diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp
index 09993f84475..4730e429f1c 100644
--- a/src/mongo/db/dbhelpers.cpp
+++ b/src/mongo/db/dbhelpers.cpp
@@ -44,7 +44,6 @@
#include "mongo/db/ops/delete.h"
#include "mongo/db/ops/update.h"
#include "mongo/db/ops/update_request.h"
-#include "mongo/db/ops/update_result.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/query/query_planner.h"
@@ -224,21 +223,21 @@ bool Helpers::getLast(OperationContext* opCtx, const char* ns, BSONObj& result)
return false;
}
-void Helpers::upsert(OperationContext* opCtx,
- const string& ns,
- const BSONObj& o,
- bool fromMigrate) {
+UpdateResult Helpers::upsert(OperationContext* opCtx,
+ const string& ns,
+ const BSONObj& o,
+ bool fromMigrate) {
BSONElement e = o["_id"];
verify(e.type());
BSONObj id = e.wrap();
- upsert(opCtx, ns, id, o, fromMigrate);
+ return upsert(opCtx, ns, id, o, fromMigrate);
}
-void Helpers::upsert(OperationContext* opCtx,
- const string& ns,
- const BSONObj& filter,
- const BSONObj& updateMod,
- bool fromMigrate) {
+UpdateResult Helpers::upsert(OperationContext* opCtx,
+ const string& ns,
+ const BSONObj& filter,
+ const BSONObj& updateMod,
+ bool fromMigrate) {
OldClientContext context(opCtx, ns);
const NamespaceString requestNs(ns);
@@ -251,7 +250,7 @@ void Helpers::upsert(OperationContext* opCtx,
request.setFromMigration(fromMigrate);
request.setYieldPolicy(PlanYieldPolicy::YieldPolicy::NO_YIELD);
- ::mongo::update(opCtx, context.db(), request);
+ return ::mongo::update(opCtx, context.db(), request);
}
void Helpers::update(OperationContext* opCtx,
diff --git a/src/mongo/db/dbhelpers.h b/src/mongo/db/dbhelpers.h
index 339f5165455..17b1494c1ae 100644
--- a/src/mongo/db/dbhelpers.h
+++ b/src/mongo/db/dbhelpers.h
@@ -30,6 +30,7 @@
#pragma once
#include "mongo/db/namespace_string.h"
+#include "mongo/db/ops/update_result.h"
#include "mongo/db/record_id.h"
namespace mongo {
@@ -117,14 +118,14 @@ struct Helpers {
static void putSingleton(OperationContext* opCtx, const char* ns, BSONObj obj);
/**
- * you have to lock
+ * Callers are expected to hold the collection lock.
* you do not have to have Context set
* o has to have an _id field or will assert
*/
- static void upsert(OperationContext* opCtx,
- const std::string& ns,
- const BSONObj& o,
- bool fromMigrate = false);
+ static UpdateResult upsert(OperationContext* opCtx,
+ const std::string& ns,
+ const BSONObj& o,
+ bool fromMigrate = false);
/**
* Performs an upsert of 'updateMod' if we don't match the given 'filter'.
@@ -132,11 +133,11 @@ struct Helpers {
* Note: Query yielding is turned off, so both read and writes are performed
* on the same storage snapshot.
*/
- static void upsert(OperationContext* opCtx,
- const std::string& ns,
- const BSONObj& filter,
- const BSONObj& updateMod,
- bool fromMigrate = false);
+ static UpdateResult upsert(OperationContext* opCtx,
+ const std::string& ns,
+ const BSONObj& filter,
+ const BSONObj& updateMod,
+ bool fromMigrate = false);
/**
* Performs an update of 'updateMod' for the entry matching the given 'filter'.
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 03498cf54b9..c765b83033b 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1476,6 +1476,7 @@ env.CppUnitTest(
'tenant_oplog_batcher_test.cpp',
'vote_requester_test.cpp',
'wait_for_majority_service_test.cpp',
+ 'tenant_migration_recipient_entry_helpers_test.cpp',
'tenant_migration_recipient_service_test.cpp',
],
LIBDEPS=[
@@ -1556,6 +1557,7 @@ env.CppUnitTest(
'task_executor_mock',
'task_runner',
'tenant_migration_recipient_service',
+ 'tenant_migration_recipient_utils',
'tenant_oplog_processing',
'wait_for_majority_service',
],
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 1478ad2cf9c..6ae9760ccbd 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -448,7 +448,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa
donorConnString.toString(),
_stateDoc.getTenantId().toString(),
_stateDoc.getReadPreference());
- request.setReturnAfterReachingOpTime(_stateDoc.getBlockTimestamp());
+ request.setReturnAfterReachingTimestamp(_stateDoc.getBlockTimestamp());
return request.toBSON(BSONObj());
}());
diff --git a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp
index de656c59ca6..e5e0ff5c0c9 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp
@@ -36,71 +36,55 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
-#include "mongo/db/index_build_entry_helpers.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/update.h"
#include "mongo/db/ops/update_request.h"
+#include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/db/storage/write_unit_of_work.h"
#include "mongo/util/str.h"
+
namespace mongo {
namespace repl {
-namespace {
-/**
- * Creates the tenant migration recipients collection if it doesn't exist.
- * Note: Throws WriteConflictException if the collection already exist.
- */
-CollectionPtr ensureTenantMigrationRecipientsCollectionExists(OperationContext* opCtx,
- Database* db,
- const NamespaceString& nss) {
- // Sanity checks.
- invariant(db);
- invariant(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IX));
-
- auto collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss);
- if (!collection) {
- WriteUnitOfWork wuow(opCtx);
-
- collection = db->createCollection(opCtx, nss, CollectionOptions());
- // Ensure the collection exists.
- invariant(collection);
-
- wuow.commit();
- }
- return collection;
-}
-
-} // namespace
namespace tenantMigrationRecipientEntryHelpers {
Status insertStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDocument& stateDoc) {
const auto nss = NamespaceString::kTenantMigrationRecipientsNamespace;
- return writeConflictRetry(opCtx, "insertStateDoc", nss.ns(), [&]() -> Status {
- // TODO SERVER-50741: Should be replaced by AutoGetCollection.
- AutoGetOrCreateDb db(opCtx, nss.db(), MODE_IX);
- Lock::CollectionLock collLock(opCtx, nss, MODE_IX);
-
- uassert(ErrorCodes::PrimarySteppedDown,
- str::stream() << "No longer primary while attempting to insert tenant migration "
- "recipient state document",
- repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss));
-
- // TODO SERVER-50741: ensureTenantMigrationRecipientsCollectionExists() should be removed
- // and this should return ErrorCodes::NamespaceNotFound when collection is missing.
- auto collection = ensureTenantMigrationRecipientsCollectionExists(opCtx, db.getDb(), nss);
-
- WriteUnitOfWork wuow(opCtx);
- Status status =
- collection->insertDocument(opCtx, InsertStatement(stateDoc.toBSON()), nullptr);
- if (!status.isOK()) {
- return status;
- }
- wuow.commit();
- return Status::OK();
- });
+ AutoGetCollection collection(opCtx, nss, MODE_IX);
+
+ // Sanity check
+ uassert(ErrorCodes::PrimarySteppedDown,
+ str::stream() << "No longer primary while attempting to insert tenant migration "
+ "recipient state document",
+ repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss));
+
+ return writeConflictRetry(
+ opCtx, "insertTenantMigrationRecipientStateDoc", nss.ns(), [&]() -> Status {
+ // Insert the 'stateDoc' if no active tenant migration found for the 'tenantId' provided
+ // in the 'stateDoc'. Tenant Migration is considered as active for a tenantId if a state
+ // document exists on the disk for that 'tenantId' and not marked to be garbage
+ // collected (i.e, 'expireAt' not set).
+ const auto filter = BSON(TenantMigrationRecipientDocument::kTenantIdFieldName
+ << stateDoc.getTenantId().toString()
+ << TenantMigrationRecipientDocument::kExpireAtFieldName
+ << BSON("$exists" << false));
+ const auto updateMod = BSON("$setOnInsert" << stateDoc.toBSON());
+ auto updateResult =
+ Helpers::upsert(opCtx, nss.ns(), filter, updateMod, /*fromMigrate=*/false);
+
+ // '$setOnInsert' update operator can no way modify the existing on-disk state doc.
+ invariant(!updateResult.numDocsModified);
+ if (updateResult.upsertedId.isEmpty()) {
+ return {ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Failed to insert the state doc: " << stateDoc.toBSON()
+ << "; Active tenant migration found for tenantId: "
+ << stateDoc.getTenantId()};
+ }
+ return Status::OK();
+ });
}
Status updateStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDocument& stateDoc) {
@@ -111,18 +95,19 @@ Status updateStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDoc
return Status(ErrorCodes::NamespaceNotFound,
str::stream() << nss.ns() << " does not exist");
}
- auto updateReq = UpdateRequest();
- updateReq.setNamespaceString(nss);
- updateReq.setQuery(BSON("_id" << stateDoc.getId()));
- updateReq.setUpdateModification(
- write_ops::UpdateModification::parseFromClassicUpdate(stateDoc.toBSON()));
- auto updateResult = update(opCtx, collection.getDb(), updateReq);
- if (updateResult.numMatched == 0) {
- return {ErrorCodes::NoSuchKey,
- str::stream() << "Existing Tenant Migration State Document not found for id: "
- << stateDoc.getId()};
- }
- return Status::OK();
+
+ return writeConflictRetry(
+ opCtx, "updateTenantMigrationRecipientStateDoc", nss.ns(), [&]() -> Status {
+ auto updateResult =
+ Helpers::upsert(opCtx, nss.ns(), stateDoc.toBSON(), /*fromMigrate=*/false);
+ if (updateResult.numMatched == 0) {
+ return {ErrorCodes::NoSuchKey,
+ str::stream()
+ << "Existing Tenant Migration State Document not found for id: "
+ << stateDoc.getId()};
+ }
+ return Status::OK();
+ });
}
StatusWith<TenantMigrationRecipientDocument> getStateDoc(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h
index 3a6d88beb61..998af60a60b 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h
@@ -42,10 +42,19 @@ namespace repl {
namespace tenantMigrationRecipientEntryHelpers {
/**
- * Writes the state doc to the disk.
+ * Inserts the tenant migration recipient state document 'stateDoc' into
+ * 'config.tenantMigrationRecipients' collection. Also, creates the collection if not present
+ * before inserting the document.
*
- * Returns 'DuplicateKey' error code if a document already exists on the disk with the same
- * 'migrationUUID'.
+ * NOTE: A state doc might get inserted based on a decision made out of a stale read within a
+ * storage transaction. Callers are expected to have their own concurrency mechanism to handle
+ * write skew problem.
+ *
+ * @Returns 'ConflictingOperationInProgress' error code if an active tenant migration found for the
+ * tenantId provided in the 'stateDoc'.
+ *
+ * Throws 'DuplicateKey' error code if a document already exists on the disk with the same
+ * 'migrationUUID', irrespective of the document marked for garbage collect or not.
*/
Status insertStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDocument& stateDoc);
diff --git a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers_test.cpp
new file mode 100644
index 00000000000..cb1cb69d146
--- /dev/null
+++ b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers_test.cpp
@@ -0,0 +1,187 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h"
+#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+
+namespace mongo {
+namespace repl {
+
+using namespace tenantMigrationRecipientEntryHelpers;
+
+class TenantMigrationRecipientEntryHelpersTest : public ServiceContextMongoDTest {
+public:
+ void setUp() override {
+ ServiceContextMongoDTest::setUp();
+ auto serviceContext = getServiceContext();
+
+ auto opCtx = cc().makeOperationContext();
+ ReplicationCoordinator::set(serviceContext,
+ std::make_unique<ReplicationCoordinatorMock>(serviceContext));
+ StorageInterface::set(serviceContext, std::make_unique<StorageInterfaceImpl>());
+
+ repl::setOplogCollectionName(serviceContext);
+ repl::createOplog(opCtx.get());
+
+ // Step up the node.
+ long long term = 1;
+ auto replCoord = ReplicationCoordinator::get(getServiceContext());
+ ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_PRIMARY));
+ ASSERT_OK(replCoord->updateTerm(opCtx.get(), term));
+ replCoord->setMyLastAppliedOpTimeAndWallTime(
+ OpTimeAndWallTime(OpTime(Timestamp(1, 1), term), Date_t()));
+ }
+
+ void tearDown() override {
+ ServiceContextMongoDTest::tearDown();
+ }
+
+protected:
+ bool checkStateDocPersisted(OperationContext* opCtx,
+ const TenantMigrationRecipientDocument& stateDoc) {
+ auto persistedStateDocWithStatus = getStateDoc(opCtx, stateDoc.getId());
+
+ auto status = persistedStateDocWithStatus.getStatus();
+ if (status == ErrorCodes::NoMatchingDocument) {
+ return false;
+ }
+ ASSERT_OK(status);
+
+ ASSERT_BSONOBJ_EQ(stateDoc.toBSON(), persistedStateDocWithStatus.getValue().toBSON());
+ return true;
+ }
+};
+
+TEST_F(TenantMigrationRecipientEntryHelpersTest, AddTenantMigrationRecipientStateDoc) {
+ auto opCtx = cc().makeOperationContext();
+
+ const UUID migrationUUID = UUID::gen();
+ TenantMigrationRecipientDocument activeTenantAStateDoc(
+ migrationUUID,
+ "DonorHost:12345",
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ ASSERT_OK(insertStateDoc(opCtx.get(), activeTenantAStateDoc));
+ ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), activeTenantAStateDoc));
+
+ // Same migration uuid and same tenant id.
+ TenantMigrationRecipientDocument stateDoc1(migrationUUID,
+ "AnotherDonorHost:12345",
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ auto status = insertStateDoc(opCtx.get(), stateDoc1);
+ ASSERT_EQUALS(ErrorCodes::ConflictingOperationInProgress, status.code());
+ ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), activeTenantAStateDoc));
+
+ // Same migration uuid and different tenant id.
+ TenantMigrationRecipientDocument stateDoc2(migrationUUID,
+ "DonorHost:12345",
+ "tenantB",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ ASSERT_THROWS_CODE(
+ insertStateDoc(opCtx.get(), stateDoc2), DBException, ErrorCodes::DuplicateKey);
+ ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), activeTenantAStateDoc));
+
+ // Different migration uuid and same tenant id.
+ TenantMigrationRecipientDocument stateDoc3(UUID::gen(),
+ "DonorHost:12345",
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ status = insertStateDoc(opCtx.get(), stateDoc3);
+ ASSERT_EQUALS(ErrorCodes::ConflictingOperationInProgress, status.code());
+ ASSERT_FALSE(checkStateDocPersisted(opCtx.get(), stateDoc3));
+
+ // Different migration uuid and different tenant id.
+ TenantMigrationRecipientDocument stateDoc4(UUID::gen(),
+ "DonorHost:12345",
+ "tenantB",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ ASSERT_OK(insertStateDoc(opCtx.get(), stateDoc4));
+ ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), stateDoc4));
+}
+
+TEST_F(TenantMigrationRecipientEntryHelpersTest,
+ AddTenantMigrationRecipientStateDoc_GarbageCollect) {
+ auto opCtx = cc().makeOperationContext();
+
+ const UUID migrationUUID = UUID::gen();
+ TenantMigrationRecipientDocument inactiveTenantAStateDoc(
+ migrationUUID,
+ "DonorHost:12345",
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ inactiveTenantAStateDoc.setExpireAt(Date_t::now());
+ ASSERT_OK(insertStateDoc(opCtx.get(), inactiveTenantAStateDoc));
+ ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), inactiveTenantAStateDoc));
+
+ // Same migration uuid and same tenant id.
+ TenantMigrationRecipientDocument stateDoc1(migrationUUID,
+ "AnotherDonorHost:12345",
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ ASSERT_THROWS_CODE(
+ insertStateDoc(opCtx.get(), stateDoc1), DBException, ErrorCodes::DuplicateKey);
+ ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), inactiveTenantAStateDoc));
+
+ // Same migration uuid and different tenant id.
+ TenantMigrationRecipientDocument stateDoc2(migrationUUID,
+ "DonorHost:12345",
+ "tenantB",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ ASSERT_THROWS_CODE(
+ insertStateDoc(opCtx.get(), stateDoc2), DBException, ErrorCodes::DuplicateKey);
+ ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), inactiveTenantAStateDoc));
+
+ // Different migration uuid and same tenant id.
+ TenantMigrationRecipientDocument stateDoc3(UUID::gen(),
+ "DonorHost:12345",
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ ASSERT_OK(insertStateDoc(opCtx.get(), stateDoc3));
+ ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), stateDoc3));
+
+ // Different migration uuid and different tenant id.
+ TenantMigrationRecipientDocument stateDoc4(UUID::gen(),
+ "DonorHost:12345",
+ "tenantC",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ ASSERT_OK(insertStateDoc(opCtx.get(), stateDoc4));
+ ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), stateDoc4));
+}
+
+} // namespace repl
+} // namespace mongo \ No newline at end of file
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 9b7842ac673..03901fc5fe8 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -163,11 +163,13 @@ ThreadPool::Limits TenantMigrationRecipientService::getThreadPoolLimits() const
std::shared_ptr<PrimaryOnlyService::Instance> TenantMigrationRecipientService::constructInstance(
BSONObj initialStateDoc) const {
- return std::make_shared<TenantMigrationRecipientService::Instance>(initialStateDoc);
+ return std::make_shared<TenantMigrationRecipientService::Instance>(this, initialStateDoc);
}
-TenantMigrationRecipientService::Instance::Instance(BSONObj stateDoc)
+TenantMigrationRecipientService::Instance::Instance(
+ const TenantMigrationRecipientService* recipientService, BSONObj stateDoc)
: PrimaryOnlyService::TypedInstance<Instance>(),
+ _recipientService(recipientService),
_stateDoc(TenantMigrationRecipientDocument::parse(IDLParserErrorContext("recipientStateDoc"),
stateDoc)),
_tenantId(_stateDoc.getTenantId().toString()),
@@ -175,6 +177,25 @@ TenantMigrationRecipientService::Instance::Instance(BSONObj stateDoc)
_donorConnectionString(_stateDoc.getDonorConnectionString().toString()),
_readPreference(_stateDoc.getReadPreference()) {}
+Status TenantMigrationRecipientService::Instance::checkIfOptionsConflict(
+ const TenantMigrationRecipientDocument& requestedStateDoc) const {
+ invariant(requestedStateDoc.getId() == _migrationUuid);
+
+ if (requestedStateDoc.getTenantId() == _tenantId &&
+ requestedStateDoc.getDonorConnectionString() == _donorConnectionString &&
+ requestedStateDoc.getReadPreference().equals(_readPreference)) {
+ return Status::OK();
+ }
+
+ return Status(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Requested options for tenant migration doesn't match"
+ << " the active migration options, migrationId: " << _migrationUuid
+ << ", tenantId: " << _tenantId
+ << ", connectionString: " << _donorConnectionString
+ << ", readPreference: " << _readPreference.toString()
+ << ", requested options:" << requestedStateDoc.toBSON());
+}
+
OpTime TenantMigrationRecipientService::Instance::waitUntilMigrationReachesConsistentState(
OperationContext* opCtx) const {
return _dataConsistentPromise.getFuture().get(opCtx);
@@ -314,7 +335,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_initializeStateDoc(
auto uniqueOpCtx = cc().makeOperationContext();
auto opCtx = uniqueOpCtx.get();
-
LOGV2_DEBUG(5081400,
2,
"Recipient migration service initializing state document",
@@ -325,7 +345,11 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_initializeStateDoc(
// Persist the state doc before starting the data sync.
_stateDoc.setState(TenantMigrationRecipientStateEnum::kStarted);
- uassertStatusOK(tenantMigrationRecipientEntryHelpers::insertStateDoc(opCtx, _stateDoc));
+ {
+ Lock::ExclusiveLock stateDocInsertLock(
+ opCtx, opCtx->lockState(), _recipientService->_stateDocInsertMutex);
+ uassertStatusOK(tenantMigrationRecipientEntryHelpers::insertStateDoc(opCtx, _stateDoc));
+ }
if (MONGO_unlikely(failWhilePersistingTenantMigrationRecipientInstanceStateDoc.shouldFail())) {
LOGV2(4878500, "Persisting state doc failed due to fail point enabled.");
@@ -724,6 +748,13 @@ void TenantMigrationRecipientService::Instance::run(
_scopedExecutor = executor;
pauseBeforeRunTenantMigrationRecipientInstance.pauseWhileSet();
+ LOGV2(4879607,
+ "Starting tenant migration recipient instance: ",
+ "migrationId"_attr = getMigrationUUID(),
+ "tenantId"_attr = getTenantId(),
+ "connectionString"_attr = _donorConnectionString,
+ "readPreference"_attr = _readPreference);
+
ExecutorFuture(**executor)
.then([this] {
stdx::lock_guard lk(_mutex);
@@ -768,7 +799,7 @@ void TenantMigrationRecipientService::Instance::run(
str::stream() << "Can't start the data sync as the state doc is already marked "
"for garbage collect for migration uuid: "
<< getMigrationUUID(),
- !_stateDoc.getGarbageCollect());
+ !_stateDoc.getExpireAt());
_getStartOpTimesFromDonor(lk);
auto opCtx = cc().makeOperationContext();
uassertStatusOK(
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index d8c80112d04..e0b460479be 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -75,7 +75,8 @@ public:
class Instance final : public PrimaryOnlyService::TypedInstance<Instance> {
public:
- explicit Instance(BSONObj stateDoc);
+ explicit Instance(const TenantMigrationRecipientService* recipientService,
+ BSONObj stateDoc);
void run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept final;
@@ -112,17 +113,24 @@ public:
*/
const std::string& getTenantId() const;
+ /**
+ * To be called on the instance returned by PrimaryOnlyService::getOrCreate(). Returns an
+ * error if the options this Instance was created with are incompatible with a request for
+ * an instance with the options given in 'options'.
+ */
+ Status checkIfOptionsConflict(const TenantMigrationRecipientDocument& StateDoc) const;
+
/*
* Blocks the thread until the tenant migration reaches consistent state in an interruptible
* mode. Returns the donor optime at which the migration reached consistent state. Throws
- * exceptions on error.
+ * exception on error.
*/
OpTime waitUntilMigrationReachesConsistentState(OperationContext* opCtx) const;
/*
* Blocks the thread until the tenant oplog applier applied data past the given 'donorTs'
* in an interruptible mode. Returns the majority applied donor optime which may be greater
- * or equal to given 'donorTs'. Throws throw exceptions on error.
+ * or equal to given 'donorTs'. Throws exception on error.
*/
OpTime waitUntilTimestampIsMajorityCommitted(OperationContext* opCtx,
const Timestamp& donorTs) const;
@@ -341,8 +349,9 @@ public:
// (M) Reads and writes guarded by _mutex.
// (W) Synchronization required only for writes.
- std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor; // (M)
- TenantMigrationRecipientDocument _stateDoc; // (M)
+ const TenantMigrationRecipientService* const _recipientService; // (R) (not owned)
+ std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor; // (M)
+ TenantMigrationRecipientDocument _stateDoc; // (M)
// This data is provided in the initial state doc and never changes. We keep copies to
// avoid having to obtain the mutex to access them.
@@ -385,6 +394,16 @@ public:
// Promise that is resolved Signaled when the tenant data sync has reached consistent point.
SharedPromise<OpTime> _dataConsistentPromise; // (W)
};
+
+private:
+ /*
+ * Ensures that only one Instance is able to insert the initial state doc provided by the user,
+ * into NamespaceString::kTenantMigrationRecipientsNamespace collection at a time.
+ *
+ * No other locks should be held when locking this. RSTl/global/db/collection locks have to be
+ * taken after taking this.
+ */
+ Lock::ResourceMutex _stateDocInsertMutex{"TenantMigrationRecipientStateDocInsert::mutex"};
};
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_state_machine.idl b/src/mongo/db/repl/tenant_migration_state_machine.idl
index 6e9c4a2cd6a..a23047231bf 100644
--- a/src/mongo/db/repl/tenant_migration_state_machine.idl
+++ b/src/mongo/db/repl/tenant_migration_state_machine.idl
@@ -132,12 +132,12 @@ structs:
type: TenantMigrationRecipientState
description: "The state of the tenant migration."
default: kUninitialized
- garbageCollect:
- type: bool
+ expireAt:
+ type: date
description: >-
- A boolean that determines whether the state machine should be deleted
- after a delay via the TTL monitor.
- default: false
+ The wall-clock time at which the state machine document should be
+ removed by the TTL monitor.
+ optional: true
startApplyingOpTime:
description: >-
Populated during data sync; the donor's operation time when the data
diff --git a/src/mongo/db/repl/tenant_migration_util.h b/src/mongo/db/repl/tenant_migration_util.h
index 04dc44fec88..84b0a1c6c8a 100644
--- a/src/mongo/db/repl/tenant_migration_util.h
+++ b/src/mongo/db/repl/tenant_migration_util.h
@@ -31,6 +31,9 @@
#include <set>
#include "mongo/base/status.h"
+#include "mongo/bson/timestamp.h"
+#include "mongo/client/mongo_uri.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/util/str.h"
namespace mongo {
@@ -48,6 +51,37 @@ inline Status validateDatabasePrefix(const std::string& tenantId) {
return isPrefixSupported
? Status::OK()
: Status(ErrorCodes::BadValue,
- str::stream() << "cannot migrate databases with prefix \'" << tenantId << "'");
+ str::stream() << "cannot migrate databases for tenant \'" << tenantId << "'");
}
+
+inline Status validateTimestampNotNull(const Timestamp& ts) {
+ return (!ts.isNull())
+ ? Status::OK()
+ : Status(ErrorCodes::BadValue, str::stream() << "Timestamp can't be null");
+}
+
+inline Status validateConnectionString(const StringData& donorOrRecipientConnectionString) {
+ // Sanity check to make sure that donor and recipient do not share any of the same hosts
+ // for tenant migration.
+ const auto donorOrRecipientServers =
+ uassertStatusOK(MongoURI::parse(donorOrRecipientConnectionString.toString())).getServers();
+
+ const auto servers = repl::ReplicationCoordinator::get(cc().getServiceContext())
+ ->getConfig()
+ .getConnectionString()
+ .getServers();
+
+ for (auto&& server : servers) {
+ bool foundMatch = std::any_of(
+ donorOrRecipientServers.begin(),
+ donorOrRecipientServers.end(),
+ [&](const HostAndPort& donorOrRecipient) { return server == donorOrRecipient; });
+ if (foundMatch) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "Donor and recipient hosts must be different.");
+ }
+ }
+ return Status::OK();
+}
+
} // namespace mongo