author    mathisbessamdb <mathis.bessa@mongodb.com>  2022-11-01 19:07:15 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-11-01 19:42:23 +0000
commit    6f8af2868adc224f762555b8132992cd421ee6fb (patch)
tree      c0b25491a132c43779d0aa7d6494d77ff4827e38 /src/mongo/db/repl
parent    774dec4733a7f3066aace01e87842f085275b034 (diff)
download  mongo-6f8af2868adc224f762555b8132992cd421ee6fb.tar.gz
SERVER-69551 Shard Merge recipient should retry opening the backup cursor if backupCursorCheckpointTimestamp is < startMigrationDonorTimestamp
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--  src/mongo/db/repl/SConscript                                                |   1
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp                    | 117
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.h                      |  17
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service_shard_merge_test.cpp   | 593
4 files changed, 685 insertions(+), 43 deletions(-)
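
In short, this change makes the recipient treat a stale backupCursorCheckpointTimestamp as retryable rather than fatal: it kills the stale cursor and reopens it until the donor's checkpoint catches up to startMigrationDonorTimestamp. A minimal sketch of that policy in isolation follows; every name in it is an illustrative placeholder, not MongoDB's actual API.

    #include <functional>

    // Illustrative stand-ins for the real types.
    struct Timestamp {
        unsigned long long secs = 0, inc = 0;
        bool operator<(const Timestamp& o) const {
            return secs < o.secs || (secs == o.secs && inc < o.inc);
        }
    };

    struct BackupCursor {
        Timestamp checkpointTimestamp;
    };

    // Keep reopening the backup cursor until its checkpointTimestamp reaches
    // startMigrationDonorTimestamp; a stale checkpoint is retryable, not fatal.
    BackupCursor openBackupCursorWithRetry(
        const std::function<BackupCursor()>& open,
        const std::function<void(const BackupCursor&)>& kill,
        const Timestamp& startMigrationDonorTimestamp) {
        while (true) {
            BackupCursor cursor = open();
            if (!(cursor.checkpointTimestamp < startMigrationDonorTimestamp)) {
                return cursor;  // Donor checkpoint is recent enough; proceed.
            }
            // Stale checkpoint (error code 6929900 in the patch below): close the
            // cursor so the donor can take a newer checkpoint, then try again.
            kill(cursor);
        }
    }
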
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index c09a9b4e2e3..ad8be47af0a 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1718,6 +1718,7 @@ if wiredtiger:
'tenant_migration_recipient_access_blocker_test.cpp',
'tenant_migration_recipient_entry_helpers_test.cpp',
'tenant_migration_recipient_service_test.cpp',
+ 'tenant_migration_recipient_service_shard_merge_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/bson/mutable/mutable_bson',
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 57ec62819a1..c6c4d4d2958 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -76,6 +76,7 @@
#include "mongo/db/transaction/transaction_participant.h"
#include "mongo/db/vector_clock_mutable.h"
#include "mongo/db/write_concern_options.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/util/assert_util.h"
@@ -93,6 +94,8 @@ const std::string kTTLIndexName = "TenantMigrationRecipientTTLIndex";
const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
constexpr StringData kOplogBufferPrefix = "repl.migration.oplog_"_sd;
constexpr int kBackupCursorFileFetcherRetryAttempts = 10;
+constexpr int kCheckpointTsBackupCursorErrorCode = 6929900;
+constexpr int kCloseCursorBeforeOpenErrorCode = 50886;
NamespaceString getOplogBufferNs(const UUID& migrationUUID) {
return NamespaceString(NamespaceString::kConfigDb,
@@ -979,25 +982,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_killBackupCursor()
nullptr);
request.sslMode = _donorUri.getSSLMode();
- auto scheduleResult =
- (_recipientService->getInstanceCleanupExecutor())
- ->scheduleRemoteCommand(
- request, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
- if (!args.response.isOK()) {
- LOGV2_WARNING(6113005,
- "killCursors command task failed",
- "error"_attr = redact(args.response.status));
- return;
- }
- auto status = getStatusFromCommandResult(args.response.data);
- if (status.isOK()) {
- LOGV2_INFO(6113415, "Killed backup cursor");
- } else {
- LOGV2_WARNING(6113006,
- "killCursors command failed",
- "error"_attr = redact(status));
- }
- });
+ const auto scheduleResult = _scheduleKillBackupCursorWithLock(
+ lk, _recipientService->getInstanceCleanupExecutor());
if (!scheduleResult.isOK()) {
LOGV2_WARNING(6113004,
"Failed to run killCursors command on backup cursor",
@@ -1009,13 +995,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_killBackupCursor()
SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
const CancellationToken& token) {
- stdx::lock_guard lk(_mutex);
- LOGV2_DEBUG(6113000,
- 1,
- "Trying to open backup cursor on donor primary",
- "migrationId"_attr = _stateDoc.getId(),
- "donorConnectionString"_attr = _stateDoc.getDonorConnectionString());
- const auto cmdObj = [] {
+
+ const auto aggregateCommandRequestObj = [] {
AggregateCommandRequest aggRequest(
NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
{BSON("$backupCursor" << BSONObj())});
@@ -1024,11 +1005,18 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
return aggRequest.toBSON(BSONObj());
}();
- auto startMigrationDonorTimestamp = _stateDoc.getStartMigrationDonorTimestamp();
+ stdx::lock_guard lk(_mutex);
+ LOGV2_DEBUG(6113000,
+ 1,
+ "Trying to open backup cursor on donor primary",
+ "migrationId"_attr = _stateDoc.getId(),
+ "donorConnectionString"_attr = _stateDoc.getDonorConnectionString());
+
+ const auto startMigrationDonorTimestamp = _stateDoc.getStartMigrationDonorTimestamp();
auto fetchStatus = std::make_shared<boost::optional<Status>>();
auto uniqueMetadataInfo = std::make_unique<boost::optional<shard_merge_utils::MetadataInfo>>();
- auto fetcherCallback =
+ const auto fetcherCallback =
[
this,
self = shared_from_this(),
@@ -1043,8 +1031,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
uassertStatusOK(dataStatus);
uassert(ErrorCodes::CallbackCanceled, "backup cursor interrupted", !token.isCanceled());
- auto uniqueOpCtx = cc().makeOperationContext();
- auto opCtx = uniqueOpCtx.get();
+ const auto uniqueOpCtx = cc().makeOperationContext();
+ const auto opCtx = uniqueOpCtx.get();
const auto& data = dataStatus.getValue();
for (const BSONObj& doc : data.documents) {
@@ -1059,14 +1047,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
"backupCursorId"_attr = data.cursorId,
"backupCursorCheckpointTimestamp"_attr = checkpointTimestamp);
- // This ensures that the recipient won't receive any 2 phase index build donor
- // oplog entries during the migration. We also have a check in the tenant oplog
- // applier to detect such oplog entries. Adding a check here helps us to detect
- // the problem earlier.
- uassert(6929900,
- "backupCursorCheckpointTimestamp should be greater than or equal to "
- "startMigrationDonorTimestamp",
- checkpointTimestamp >= startMigrationDonorTimestamp);
{
stdx::lock_guard lk(_mutex);
stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData);
@@ -1075,6 +1055,15 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
BackupCursorInfo{data.cursorId, data.nss, checkpointTimestamp});
}
+ // This ensures that the recipient won't receive any 2 phase index build donor
+ // oplog entries during the migration. We also have a check in the tenant oplog
+ // applier to detect such oplog entries. Adding a check here helps us to detect
+ // the problem earlier.
+ uassert(kCheckpointTsBackupCursorErrorCode,
+ "backupCursorCheckpointTimestamp should be greater than or equal to "
+ "startMigrationDonorTimestamp",
+ checkpointTimestamp >= startMigrationDonorTimestamp);
+
invariant(metadataInfoPtr && !*metadataInfoPtr);
(*metadataInfoPtr) = shard_merge_utils::MetadataInfo::constructMetadataInfo(
getMigrationUUID(), _client->getServerAddress(), metadata);
@@ -1133,10 +1122,10 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
};
_donorFilenameBackupCursorFileFetcher = std::make_unique<Fetcher>(
- (**_scopedExecutor).get(),
+ _backupCursorExecutor.get(),
_client->getServerHostAndPort(),
NamespaceString::kAdminDb.toString(),
- cmdObj,
+ aggregateCommandRequestObj,
fetcherCallback,
ReadPreferenceSetting(ReadPreference::PrimaryPreferred).toContainingBSON(),
executor::RemoteCommandRequest::kNoTimeout, /* aggregateTimeout */
@@ -1160,6 +1149,35 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
.semi();
}
+StatusWith<executor::TaskExecutor::CallbackHandle>
+TenantMigrationRecipientService::Instance::_scheduleKillBackupCursorWithLock(
+ WithLock lk, std::shared_ptr<executor::TaskExecutor> executor) {
+ auto& donorBackupCursorInfo = _getDonorBackupCursorInfo(lk);
+ executor::RemoteCommandRequest killCursorsRequest(
+ _client->getServerHostAndPort(),
+ donorBackupCursorInfo.nss.db().toString(),
+ BSON("killCursors" << donorBackupCursorInfo.nss.coll().toString() << "cursors"
+ << BSON_ARRAY(donorBackupCursorInfo.cursorId)),
+ nullptr);
+ killCursorsRequest.sslMode = _donorUri.getSSLMode();
+
+ return executor->scheduleRemoteCommand(
+ killCursorsRequest, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
+ if (!args.response.isOK()) {
+ LOGV2_WARNING(6113005,
+ "killCursors command task failed",
+ "error"_attr = redact(args.response.status));
+ return;
+ }
+ auto status = getStatusFromCommandResult(args.response.data);
+ if (status.isOK()) {
+ LOGV2_INFO(6113415, "Killed backup cursor");
+ } else {
+ LOGV2_WARNING(6113006, "killCursors command failed", "error"_attr = redact(status));
+ }
+ });
+}
+
SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursorWithRetry(
const CancellationToken& token) {
return AsyncTry([this, self = shared_from_this(), token] { return _openBackupCursor(token); })
@@ -1169,8 +1187,20 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursorWit
"Retrying backup cursor creation after transient error",
"migrationId"_attr = getMigrationUUID(),
"status"_attr = status);
- // A checkpoint took place while opening a backup cursor. We
- // should retry and *not* cancel migration.
+
+ return false;
+ } else if (status.code() == kCheckpointTsBackupCursorErrorCode ||
+ status.code() == kCloseCursorBeforeOpenErrorCode) {
+ LOGV2_INFO(6955100,
+ "Closing backup cursor and retrying after getting retryable error",
+ "migrationId"_attr = getMigrationUUID(),
+ "status"_attr = status);
+
+ stdx::lock_guard lk(_mutex);
+ const auto scheduleResult =
+ _scheduleKillBackupCursorWithLock(lk, _backupCursorExecutor);
+ uassertStatusOK(scheduleResult);
+
return false;
}
@@ -1199,7 +1229,7 @@ void TenantMigrationRecipientService::Instance::_keepBackupCursorAlive(
auto& donorBackupCursorInfo = _getDonorBackupCursorInfo(lk);
_backupCursorKeepAliveFuture =
shard_merge_utils::keepBackupCursorAlive(_backupCursorKeepAliveCancellation,
- **_scopedExecutor,
+ _backupCursorExecutor,
_client->getServerHostAndPort(),
donorBackupCursorInfo.cursorId,
donorBackupCursorInfo.nss);
@@ -2703,6 +2733,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept {
_scopedExecutor = executor;
+ _backupCursorExecutor = **_scopedExecutor;
auto scopedOutstandingMigrationCounter =
TenantMigrationStatistics::get(_serviceContext)->getScopedOutstandingReceivingCount();
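
The patch above also extracts the killCursors scheduling into _scheduleKillBackupCursorWithLock, so the existing cleanup path (_killBackupCursor) and the new retry path share one implementation. For reference, a hedged sketch of the request body the helper builds, using the same BSON builder macros as the patch (include paths assumed; makeKillBackupCursorCmd, the collection name, and the cursor id are illustrative):

    #include <string>

    #include "mongo/bson/bsonmisc.h"  // BSON, BSON_ARRAY (include path assumed)
    #include "mongo/bson/bsonobj.h"

    namespace mongo {
    // Builds { killCursors: "<coll>", cursors: [ <cursorId> ] }, the request body
    // that _scheduleKillBackupCursorWithLock sends to the donor.
    BSONObj makeKillBackupCursorCmd(const std::string& collName, long long cursorId) {
        return BSON("killCursors" << collName << "cursors" << BSON_ARRAY(cursorId));
    }
    }  // namespace mongo
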
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 22de9a00fd1..a71f29139d9 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -213,6 +213,15 @@ public:
private:
friend class TenantMigrationRecipientServiceTest;
+ friend class TenantMigrationRecipientServiceShardMergeTest;
+
+ /**
+ * Only used for testing. Allows setting a custom task executor for backup cursor fetcher.
+ */
+ void setBackupCursorFetcherExecutor_forTest(
+ std::shared_ptr<executor::TaskExecutor> taskExecutor) {
+ _backupCursorExecutor = taskExecutor;
+ }
const NamespaceString _stateDocumentsNS =
NamespaceString::kTenantMigrationRecipientsNamespace;
@@ -605,6 +614,13 @@ public:
SemiFuture<TenantOplogApplier::OpTimePair> _migrateUsingShardMergeProtocol(
const CancellationToken& token);
+ /*
+ * Send the killBackupCursor command to the remote in order to close the backup cursor
+ * connection on the donor.
+ */
+ StatusWith<executor::TaskExecutor::CallbackHandle> _scheduleKillBackupCursorWithLock(
+ WithLock lk, std::shared_ptr<executor::TaskExecutor> executor);
+
mutable Mutex _mutex = MONGO_MAKE_LATCH("TenantMigrationRecipientService::_mutex");
// All member variables are labeled with one of the following codes indicating the
@@ -618,6 +634,7 @@ public:
ServiceContext* const _serviceContext;
const TenantMigrationRecipientService* const _recipientService; // (R) (not owned)
std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor; // (M)
+ std::shared_ptr<executor::TaskExecutor> _backupCursorExecutor; // (M)
TenantMigrationRecipientDocument _stateDoc; // (M)
// This data is provided in the initial state doc and never changes. We keep copies to
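
The header changes add the test-only hook used by the new unit test: in production, run() aliases _backupCursorExecutor to the scoped executor, while setBackupCursorFetcherExecutor_forTest lets a test substitute an executor backed by a NetworkInterfaceMock before the instance starts. Condensed from the test fixture below, the injection sequence looks like this:

    // Pause the instance before it runs, inject the mock-network executor, resume.
    auto fp = globalFailPointRegistry().find("pauseBeforeRunTenantMigrationRecipientInstance");
    auto timesEntered = fp->setMode(FailPoint::alwaysOn);
    auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
        opCtx.get(), _service, initialStateDocument.toBSON());
    fp->waitForTimesEntered(timesEntered + 1);
    instance->setBackupCursorFetcherExecutor_forTest(_threadpoolTaskExecutor);
    fp->setMode(FailPoint::off);
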
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_shard_merge_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_shard_merge_test.cpp
new file mode 100644
index 00000000000..be6f0bbc799
--- /dev/null
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_shard_merge_test.cpp
@@ -0,0 +1,593 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include <fstream>
+#include <memory>
+
+#include "mongo/client/connpool.h"
+#include "mongo/client/replica_set_monitor.h"
+#include "mongo/client/replica_set_monitor_protocol_test_util.h"
+#include "mongo/client/streamable_replica_set_monitor_for_testing.h"
+#include "mongo/config.h"
+#include "mongo/db/client.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/feature_compatibility_version_document_gen.h"
+#include "mongo/db/op_observer/op_observer_impl.h"
+#include "mongo/db/op_observer/op_observer_registry.h"
+#include "mongo/db/op_observer/oplog_writer_impl.h"
+#include "mongo/db/repl/drop_pending_collection_reaper.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_buffer_collection.h"
+#include "mongo/db/repl/oplog_fetcher_mock.h"
+#include "mongo/db/repl/primary_only_service.h"
+#include "mongo/db/repl/primary_only_service_op_observer.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h"
+#include "mongo/db/repl/tenant_migration_recipient_service.h"
+#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/session/session_txn_record_gen.h"
+#include "mongo/db/storage/backup_cursor_hooks.h"
+#include "mongo/dbtests/mock/mock_conn_registry.h"
+#include "mongo/dbtests/mock/mock_replica_set.h"
+#include "mongo/executor/mock_network_fixture.h"
+#include "mongo/executor/network_interface.h"
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/thread_pool_mock.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/idl/server_parameter_test_util.h"
+#include "mongo/logv2/log.h"
+#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
+#include "mongo/transport/transport_layer_manager.h"
+#include "mongo/transport/transport_layer_mock.h"
+#include "mongo/unittest/log_test.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/clock_source_mock.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/future.h"
+#include "mongo/util/net/ssl_util.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+
+namespace mongo {
+namespace repl {
+
+namespace {
+constexpr std::int32_t stopFailPointErrorCode = 4880402;
+const Timestamp kDefaultStartMigrationTimestamp(1, 1);
+
+OplogEntry makeOplogEntry(OpTime opTime,
+ OpTypeEnum opType,
+ NamespaceString nss,
+ const boost::optional<UUID>& uuid,
+ BSONObj o,
+ boost::optional<BSONObj> o2) {
+ return {DurableOplogEntry(opTime, // optime
+ opType, // opType
+ nss, // namespace
+ uuid, // uuid
+ boost::none, // fromMigrate
+ OplogEntry::kOplogVersion, // version
+ o, // o
+ o2, // o2
+ {}, // sessionInfo
+ boost::none, // upsert
+ Date_t(), // wall clock time
+ {}, // statement ids
+ boost::none, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none, // post-image optime
+ boost::none, // ShardId of resharding recipient
+ boost::none, // _id
+ boost::none)}; // needsRetryImage
+}
+
+} // namespace
+
+class TenantMigrationRecipientServiceShardMergeTest : public ServiceContextMongoDTest {
+public:
+ class stopFailPointEnableBlock : public FailPointEnableBlock {
+ public:
+ explicit stopFailPointEnableBlock(StringData failPointName,
+ std::int32_t error = stopFailPointErrorCode)
+ : FailPointEnableBlock(failPointName,
+ BSON("action"
+ << "stop"
+ << "stopErrorCode" << error)) {}
+ };
+
+ void setUp() override {
+ ServiceContextMongoDTest::setUp();
+ auto serviceContext = getServiceContext();
+
+ // Fake replSet just for creating consistent URI for monitor
+ MockReplicaSet replSet("donorSet", 1, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ _rsmMonitor.setup(replSet.getURI());
+
+ ConnectionString::setConnectionHook(mongo::MockConnRegistry::get()->getConnStrHook());
+
+ WaitForMajorityService::get(serviceContext).startup(serviceContext);
+
+ // Automatically mark the state doc garbage collectable after data sync completion.
+ globalFailPointRegistry()
+ .find("autoRecipientForgetMigration")
+ ->setMode(FailPoint::alwaysOn);
+
+ {
+ auto opCtx = cc().makeOperationContext();
+ auto replCoord = std::make_unique<ReplicationCoordinatorMock>(serviceContext);
+ ReplicationCoordinator::set(serviceContext, std::move(replCoord));
+
+ repl::createOplog(opCtx.get());
+ {
+ Lock::GlobalWrite lk(opCtx.get());
+ OldClientContext ctx(opCtx.get(), NamespaceString::kRsOplogNamespace);
+ tenant_migration_util::createOplogViewForTenantMigrations(opCtx.get(), ctx.db());
+ }
+
+ // Need real (non-mock) storage for the oplog buffer.
+ StorageInterface::set(serviceContext, std::make_unique<StorageInterfaceImpl>());
+
+ // The DropPendingCollectionReaper is required to drop the oplog buffer collection.
+ repl::DropPendingCollectionReaper::set(
+ serviceContext,
+ std::make_unique<repl::DropPendingCollectionReaper>(
+ StorageInterface::get(serviceContext)));
+
+ // Set up OpObserver so that repl::logOp() will store the oplog entry's optime in
+ // ReplClientInfo.
+ OpObserverRegistry* opObserverRegistry =
+ dynamic_cast<OpObserverRegistry*>(serviceContext->getOpObserver());
+ opObserverRegistry->addObserver(
+ std::make_unique<OpObserverImpl>(std::make_unique<OplogWriterImpl>()));
+ opObserverRegistry->addObserver(
+ std::make_unique<PrimaryOnlyServiceOpObserver>(serviceContext));
+
+ _registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext());
+ std::unique_ptr<TenantMigrationRecipientService> service =
+ std::make_unique<TenantMigrationRecipientService>(getServiceContext());
+ _registry->registerService(std::move(service));
+ _registry->onStartup(opCtx.get());
+ }
+ stepUp();
+
+ _service = _registry->lookupServiceByName(
+ TenantMigrationRecipientService::kTenantMigrationRecipientServiceName);
+ ASSERT(_service);
+
+ // MockReplicaSet uses custom connection string which does not support auth.
+ auto authFp = globalFailPointRegistry().find("skipTenantMigrationRecipientAuth");
+ authFp->setMode(FailPoint::alwaysOn);
+
+ // Set the sslMode to allowSSL to avoid validation error.
+ sslGlobalParams.sslMode.store(SSLParams::SSLMode_allowSSL);
+ // Skipped unless tested explicitly, as we will not receive an FCV document from the donor
+ // in these unittests without (unsightly) intervention.
+ auto compFp = globalFailPointRegistry().find("skipComparingRecipientAndDonorFCV");
+ compFp->setMode(FailPoint::alwaysOn);
+
+ // Skip fetching retryable writes, as we will test this logic entirely in integration
+ // tests.
+ auto fetchRetryableWritesFp =
+ globalFailPointRegistry().find("skipFetchingRetryableWritesEntriesBeforeStartOpTime");
+ fetchRetryableWritesFp->setMode(FailPoint::alwaysOn);
+
+ // Skip fetching committed transactions, as we will test this logic entirely in integration
+ // tests.
+ auto fetchCommittedTransactionsFp =
+ globalFailPointRegistry().find("skipFetchingCommittedTransactions");
+ fetchCommittedTransactionsFp->setMode(FailPoint::alwaysOn);
+
+ // Set up mock networking that will be used to mock the backup cursor traffic.
+ auto net = std::make_unique<executor::NetworkInterfaceMock>();
+ _net = net.get();
+
+ executor::ThreadPoolMock::Options dbThreadPoolOptions;
+ dbThreadPoolOptions.onCreateThread = []() { Client::initThread("FetchMockTaskExecutor"); };
+
+ auto pool = std::make_unique<executor::ThreadPoolMock>(_net, 1, dbThreadPoolOptions);
+ _threadpoolTaskExecutor =
+ std::make_shared<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
+ _threadpoolTaskExecutor->startup();
+ }
+
+ void tearDown() override {
+ _threadpoolTaskExecutor->shutdown();
+ _threadpoolTaskExecutor->join();
+
+ auto authFp = globalFailPointRegistry().find("skipTenantMigrationRecipientAuth");
+ authFp->setMode(FailPoint::off);
+
+ // Unset the sslMode.
+ sslGlobalParams.sslMode.store(SSLParams::SSLMode_disabled);
+
+ WaitForMajorityService::get(getServiceContext()).shutDown();
+
+ _registry->onShutdown();
+ _service = nullptr;
+
+ StorageInterface::set(getServiceContext(), {});
+
+ // Clearing the connection pool is necessary when doing tests which use the
+ // ReplicaSetMonitor. See src/mongo/dbtests/mock/mock_replica_set.h for details.
+ ScopedDbConnection::clearPool();
+ ReplicaSetMonitorProtocolTestUtil::resetRSMProtocol();
+ ServiceContextMongoDTest::tearDown();
+ }
+
+ void stepDown() {
+ ASSERT_OK(ReplicationCoordinator::get(getServiceContext())
+ ->setFollowerMode(MemberState::RS_SECONDARY));
+ _registry->onStepDown();
+ }
+
+ void stepUp() {
+ auto opCtx = cc().makeOperationContext();
+ auto replCoord = ReplicationCoordinator::get(getServiceContext());
+
+ // Advance term
+ _term++;
+
+ ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_PRIMARY));
+ ASSERT_OK(replCoord->updateTerm(opCtx.get(), _term));
+ replCoord->setMyLastAppliedOpTimeAndWallTime(
+ OpTimeAndWallTime(OpTime(Timestamp(1, 1), _term), Date_t()));
+
+ _registry->onStepUpComplete(opCtx.get(), _term);
+ }
+
+protected:
+ TenantMigrationRecipientServiceShardMergeTest()
+ : ServiceContextMongoDTest(Options{}.useMockClock(true)) {}
+
+ PrimaryOnlyServiceRegistry* _registry;
+ PrimaryOnlyService* _service;
+ long long _term = 0;
+
+ bool _collCreated = false;
+ size_t _numSecondaryIndexesCreated{0};
+ size_t _numDocsInserted{0};
+
+ const TenantMigrationPEMPayload kRecipientPEMPayload = [&] {
+ std::ifstream infile("jstests/libs/client.pem");
+ std::string buf((std::istreambuf_iterator<char>(infile)), std::istreambuf_iterator<char>());
+
+ auto swCertificateBlob =
+ ssl_util::findPEMBlob(buf, "CERTIFICATE"_sd, 0 /* position */, false /* allowEmpty */);
+ ASSERT_TRUE(swCertificateBlob.isOK());
+
+ auto swPrivateKeyBlob =
+ ssl_util::findPEMBlob(buf, "PRIVATE KEY"_sd, 0 /* position */, false /* allowEmpty */);
+ ASSERT_TRUE(swPrivateKeyBlob.isOK());
+
+ return TenantMigrationPEMPayload{swCertificateBlob.getValue().toString(),
+ swPrivateKeyBlob.getValue().toString()};
+ }();
+
+ void checkStateDocPersisted(OperationContext* opCtx,
+ const TenantMigrationRecipientService::Instance* instance) {
+ auto memoryStateDoc = getStateDoc(instance);
+ auto persistedStateDocWithStatus =
+ tenantMigrationRecipientEntryHelpers::getStateDoc(opCtx, memoryStateDoc.getId());
+ ASSERT_OK(persistedStateDocWithStatus.getStatus());
+ ASSERT_BSONOBJ_EQ(memoryStateDoc.toBSON(), persistedStateDocWithStatus.getValue().toBSON());
+ }
+ void insertToNodes(MockReplicaSet* replSet,
+ const std::string& nss,
+ BSONObj obj,
+ const std::vector<HostAndPort>& hosts) {
+ for (const auto& host : hosts) {
+ replSet->getNode(host.toString())->insert(nss, obj);
+ }
+ }
+
+ void clearCollection(MockReplicaSet* replSet,
+ const std::string& nss,
+ const std::vector<HostAndPort>& hosts) {
+ for (const auto& host : hosts) {
+ replSet->getNode(host.toString())->remove(nss, BSONObj{} /*filter*/);
+ }
+ }
+
+ void insertTopOfOplog(MockReplicaSet* replSet,
+ const OpTime& topOfOplogOpTime,
+ const std::vector<HostAndPort> hosts = {}) {
+ const auto targetHosts = hosts.empty() ? replSet->getHosts() : hosts;
+ // The MockRemoteDBService does not actually implement the database, so to make our
+ // find work correctly we must make sure there's only one document to find.
+ clearCollection(replSet, NamespaceString::kRsOplogNamespace.ns(), targetHosts);
+ insertToNodes(replSet,
+ NamespaceString::kRsOplogNamespace.ns(),
+ makeOplogEntry(topOfOplogOpTime,
+ OpTypeEnum::kNoop,
+ {} /* namespace */,
+ boost::none /* uuid */,
+ BSONObj() /* o */,
+ boost::none /* o2 */)
+ .getEntry()
+ .toBSON(),
+ targetHosts);
+ }
+
+ // Accessors to class private members
+ DBClientConnection* getClient(const TenantMigrationRecipientService::Instance* instance) const {
+ return instance->_client.get();
+ }
+
+ const TenantMigrationRecipientDocument& getStateDoc(
+ const TenantMigrationRecipientService::Instance* instance) const {
+ return instance->_stateDoc;
+ }
+
+ sdam::MockTopologyManager* getTopologyManager() {
+ return _rsmMonitor.getTopologyManager();
+ }
+
+ ClockSource* clock() {
+ return &_clkSource;
+ }
+
+ executor::NetworkInterfaceMock* getNet() {
+ return _net;
+ }
+
+ executor::NetworkInterfaceMock* _net = nullptr;
+ std::shared_ptr<executor::TaskExecutor> _threadpoolTaskExecutor;
+
+ void setInstanceBackupCursorFetcherExecutor(
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance) {
+ instance->setBackupCursorFetcherExecutor_forTest(_threadpoolTaskExecutor);
+ }
+
+private:
+ ClockSourceMock _clkSource;
+
+ unittest::MinimumLoggedSeverityGuard _replicationSeverityGuard{
+ logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(1)};
+ unittest::MinimumLoggedSeverityGuard _tenantMigrationSeverityGuard{
+ logv2::LogComponent::kTenantMigration, logv2::LogSeverity::Debug(1)};
+
+ StreamableReplicaSetMonitorForTesting _rsmMonitor;
+ RAIIServerParameterControllerForTest _findHostTimeout{"defaultFindReplicaSetHostTimeoutMS", 10};
+};
+
+#ifdef MONGO_CONFIG_SSL
+
+void waitForReadyRequest(executor::NetworkInterfaceMock* net) {
+ while (!net->hasReadyRequests()) {
+ net->advanceTime(net->now() + Milliseconds{1});
+ }
+}
+
+BSONObj createEmptyCursorResponse(const NamespaceString& nss, CursorId backupCursorId) {
+ return BSON(
+ "cursor" << BSON("nextBatch" << BSONArray() << "id" << backupCursorId << "ns" << nss.ns())
+ << "ok" << 1.0);
+}
+
+BSONObj createBackupCursorResponse(const Timestamp& checkpointTimestamp,
+ const NamespaceString& nss,
+ CursorId backupCursorId) {
+ const UUID backupId =
+ UUID(uassertStatusOK(UUID::parse(("2b068e03-5961-4d8e-b47a-d1c8cbd4b835"))));
+ StringData remoteDbPath = "/data/db/job0/mongorunner/test-1";
+ BSONObjBuilder cursor;
+ BSONArrayBuilder batch(cursor.subarrayStart("firstBatch"));
+ auto metaData = BSON("backupId" << backupId << "checkpointTimestamp" << checkpointTimestamp
+ << "dbpath" << remoteDbPath);
+ batch.append(BSON("metadata" << metaData));
+
+ batch.done();
+ cursor.append("id", backupCursorId);
+ cursor.append("ns", nss.ns());
+ BSONObjBuilder backupCursorReply;
+ backupCursorReply.append("cursor", cursor.obj());
+ backupCursorReply.append("ok", 1.0);
+ return backupCursorReply.obj();
+}
+
+void sendResponseToExpectedRequest(const BSONObj& backupCursorResponse,
+ const std::string& expectedRequestFieldName,
+ executor::NetworkInterfaceMock* net) {
+ auto noi = net->getNextReadyRequest();
+ auto request = noi->getRequest();
+ ASSERT_EQUALS(expectedRequestFieldName, request.cmdObj.firstElementFieldNameStringData());
+ net->scheduleSuccessfulResponse(
+ noi, executor::RemoteCommandResponse(backupCursorResponse, Milliseconds()));
+ net->runReadyNetworkOperations();
+}
+
+BSONObj createServerAggregateReply() {
+ return CursorResponse(
+ NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
+ 0 /* cursorId */,
+ {BSON("byteOffset" << 0 << "endOfFile" << true << "data"
+ << BSONBinData(0, 0, BinDataGeneral))})
+ .toBSONAsInitialResponse();
+}
+
+TEST_F(TenantMigrationRecipientServiceShardMergeTest, OpenBackupCursorSuccessfully) {
+ stopFailPointEnableBlock fp("fpBeforeAdvancingStableTimestamp");
+ const UUID migrationUUID = UUID::gen();
+ const CursorId backupCursorId = 12345;
+ const NamespaceString aggregateNs = NamespaceString("admin.$cmd.aggregate");
+
+ auto taskFp = globalFailPointRegistry().find("hangBeforeTaskCompletion");
+ auto initialTimesEntered = taskFp->setMode(FailPoint::alwaysOn);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ getTopologyManager()->setTopologyDescription(replSet.getTopologyDescription(clock()));
+ insertTopOfOplog(&replSet, OpTime(Timestamp(5, 1), 1));
+
+ // Mock the aggregate response from the donor.
+ MockRemoteDBServer* const _donorServer =
+ mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+ _donorServer->setCommandReply("aggregate", createServerAggregateReply());
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ kDefaultStartMigrationTimestamp,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ initialStateDocument.setProtocol(MigrationProtocolEnum::kShardMerge);
+ initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ auto fp = globalFailPointRegistry().find("pauseBeforeRunTenantMigrationRecipientInstance");
+ auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn);
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ fp->waitForTimesEntered(initialTimesEntered + 1);
+ setInstanceBackupCursorFetcherExecutor(instance);
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ fp->setMode(FailPoint::off);
+ }
+
+ {
+ auto net = getNet();
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ waitForReadyRequest(net);
+ // Mock the aggregate command network response of the backup cursor so there
+ // is data to parse.
+ sendResponseToExpectedRequest(createBackupCursorResponse(kDefaultStartMigrationTimestamp,
+ aggregateNs,
+ backupCursorId),
+ "aggregate",
+ net);
+ sendResponseToExpectedRequest(
+ createEmptyCursorResponse(aggregateNs, backupCursorId), "getMore", net);
+ sendResponseToExpectedRequest(
+ createEmptyCursorResponse(aggregateNs, backupCursorId), "getMore", net);
+ }
+
+ taskFp->waitForTimesEntered(initialTimesEntered + 1);
+
+ checkStateDocPersisted(opCtx.get(), instance.get());
+
+ taskFp->setMode(FailPoint::off);
+
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getForgetMigrationDurableFuture().getNoThrow());
+}
+
+TEST_F(TenantMigrationRecipientServiceShardMergeTest, OpenBackupCursorAndRetriesDueToTs) {
+ stopFailPointEnableBlock fp("fpBeforeAdvancingStableTimestamp");
+ const UUID migrationUUID = UUID::gen();
+ const CursorId backupCursorId = 12345;
+ const NamespaceString aggregateNs = NamespaceString("admin.$cmd.aggregate");
+
+ auto taskFp = globalFailPointRegistry().find("hangBeforeTaskCompletion");
+ auto initialTimesEntered = taskFp->setMode(FailPoint::alwaysOn);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ getTopologyManager()->setTopologyDescription(replSet.getTopologyDescription(clock()));
+ insertTopOfOplog(&replSet, OpTime(Timestamp(5, 1), 1));
+
+ // Mock the aggregate response from the donor.
+ MockRemoteDBServer* const _donorServer =
+ mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+ _donorServer->setCommandReply("aggregate", createServerAggregateReply());
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ kDefaultStartMigrationTimestamp,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ initialStateDocument.setProtocol(MigrationProtocolEnum::kShardMerge);
+ initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ auto fp = globalFailPointRegistry().find("pauseBeforeRunTenantMigrationRecipientInstance");
+ auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn);
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ fp->waitForTimesEntered(initialTimesEntered + 1);
+ setInstanceBackupCursorFetcherExecutor(instance);
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ fp->setMode(FailPoint::off);
+ }
+
+ {
+ auto net = getNet();
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ waitForReadyRequest(net);
+
+ // Mock the aggregate command network response of the backup cursor so there is data
+ // to parse. In this case we first pass a timestamp that is less than the
+ // startMigrationDonorTimestamp, which causes a retry. We then provide a correct
+ // timestamp in the next response and succeed.
+ sendResponseToExpectedRequest(
+ createBackupCursorResponse(Timestamp(0, 0), aggregateNs, backupCursorId),
+ "aggregate",
+ net);
+ sendResponseToExpectedRequest(createBackupCursorResponse(kDefaultStartMigrationTimestamp,
+ aggregateNs,
+ backupCursorId),
+ "killCursors",
+ net);
+ sendResponseToExpectedRequest(
+ createEmptyCursorResponse(aggregateNs, backupCursorId), "killCursors", net);
+ sendResponseToExpectedRequest(createBackupCursorResponse(kDefaultStartMigrationTimestamp,
+ aggregateNs,
+ backupCursorId),
+ "aggregate",
+ net);
+ sendResponseToExpectedRequest(
+ createEmptyCursorResponse(aggregateNs, backupCursorId), "getMore", net);
+ sendResponseToExpectedRequest(
+ createEmptyCursorResponse(aggregateNs, backupCursorId), "getMore", net);
+ }
+
+ taskFp->waitForTimesEntered(initialTimesEntered + 1);
+
+ checkStateDocPersisted(opCtx.get(), instance.get());
+
+ taskFp->setMode(FailPoint::off);
+
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getForgetMigrationDurableFuture().getNoThrow());
+}
+
+#endif
+} // namespace repl
+} // namespace mongo