author | mathisbessamdb <mathis.bessa@mongodb.com> | 2022-11-01 19:07:15 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-01 19:42:23 +0000
commit | 6f8af2868adc224f762555b8132992cd421ee6fb (patch)
tree | c0b25491a132c43779d0aa7d6494d77ff4827e38 /src/mongo/db/repl
parent | 774dec4733a7f3066aace01e87842f085275b034 (diff)
download | mongo-6f8af2868adc224f762555b8132992cd421ee6fb.tar.gz
SERVER-69551 Shard Merge recipient should retry opening the backup cursor if backupCursorCheckpointTimestamp is < startMigrationDonorTimestamp
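In outline, the change below makes two donor-side failures retryable when the shard merge recipient opens the donor's $backupCursor: the new error code 6929900, raised when the cursor's checkpoint timestamp is behind startMigrationDonorTimestamp, and the existing code 50886, raised when a previous backup cursor is still open. In both cases the recipient schedules a killCursors command against the donor and retries rather than cancelling the migration. As a reading aid, here is a minimal standalone sketch of that retry classification; the Status struct and the shouldRetryOpenBackupCursor helper are simplified stand-ins invented for this illustration, not server types.

#include <iostream>

// Error-code values taken from the diff below.
constexpr int kCheckpointTsBackupCursorErrorCode = 6929900;
constexpr int kCloseCursorBeforeOpenErrorCode = 50886;

// Simplified stand-in for mongo::Status (an assumption for this sketch).
struct Status {
    int code;
    bool transient;  // e.g. a network error the executor reports as retriable
};

// Hypothetical helper mirroring the retry decision in _openBackupCursorWithRetry:
// returns true to retry; sets killCursorFirst when the stale donor-side backup
// cursor must be closed (via killCursors) before the next attempt.
bool shouldRetryOpenBackupCursor(const Status& status, bool& killCursorFirst) {
    killCursorFirst = false;
    if (status.transient) {
        return true;  // plain retry; the migration is not cancelled
    }
    if (status.code == kCheckpointTsBackupCursorErrorCode ||
        status.code == kCloseCursorBeforeOpenErrorCode) {
        killCursorFirst = true;  // schedule killCursors, then retry
        return true;
    }
    return false;  // non-retryable: surface the error to the caller
}

int main() {
    bool killFirst = false;
    const Status checkpointTooOld{kCheckpointTsBackupCursorErrorCode, false};
    std::cout << shouldRetryOpenBackupCursor(checkpointTooOld, killFirst)  // prints 1
              << ' ' << killFirst << '\n';                                 // prints 1
}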
Diffstat (limited to 'src/mongo/db/repl')
4 files changed, 685 insertions, 43 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index c09a9b4e2e3..ad8be47af0a 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1718,6 +1718,7 @@ if wiredtiger:
             'tenant_migration_recipient_access_blocker_test.cpp',
             'tenant_migration_recipient_entry_helpers_test.cpp',
             'tenant_migration_recipient_service_test.cpp',
+            'tenant_migration_recipient_service_shard_merge_test.cpp',
         ],
         LIBDEPS=[
             '$BUILD_DIR/mongo/bson/mutable/mutable_bson',
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 57ec62819a1..c6c4d4d2958 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -76,6 +76,7 @@
 #include "mongo/db/transaction/transaction_participant.h"
 #include "mongo/db/vector_clock_mutable.h"
 #include "mongo/db/write_concern_options.h"
+#include "mongo/executor/task_executor.h"
 #include "mongo/logv2/log.h"
 #include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/util/assert_util.h"
@@ -93,6 +94,8 @@ const std::string kTTLIndexName = "TenantMigrationRecipientTTLIndex";
 const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
 constexpr StringData kOplogBufferPrefix = "repl.migration.oplog_"_sd;
 constexpr int kBackupCursorFileFetcherRetryAttempts = 10;
+constexpr int kCheckpointTsBackupCursorErrorCode = 6929900;
+constexpr int kCloseCursorBeforeOpenErrorCode = 50886;

 NamespaceString getOplogBufferNs(const UUID& migrationUUID) {
     return NamespaceString(NamespaceString::kConfigDb,
@@ -979,25 +982,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_killBackupCursor()
         nullptr);
     request.sslMode = _donorUri.getSSLMode();

-    auto scheduleResult =
-        (_recipientService->getInstanceCleanupExecutor())
-            ->scheduleRemoteCommand(
-                request, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
-                    if (!args.response.isOK()) {
-                        LOGV2_WARNING(6113005,
-                                      "killCursors command task failed",
-                                      "error"_attr = redact(args.response.status));
-                        return;
-                    }
-                    auto status = getStatusFromCommandResult(args.response.data);
-                    if (status.isOK()) {
-                        LOGV2_INFO(6113415, "Killed backup cursor");
-                    } else {
-                        LOGV2_WARNING(6113006,
-                                      "killCursors command failed",
-                                      "error"_attr = redact(status));
-                    }
-                });
+    const auto scheduleResult = _scheduleKillBackupCursorWithLock(
+        lk, _recipientService->getInstanceCleanupExecutor());
     if (!scheduleResult.isOK()) {
         LOGV2_WARNING(6113004,
                       "Failed to run killCursors command on backup cursor",
@@ -1009,13 +995,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_killBackupCursor()

 SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
     const CancellationToken& token) {
-    stdx::lock_guard lk(_mutex);
-    LOGV2_DEBUG(6113000,
-                1,
-                "Trying to open backup cursor on donor primary",
-                "migrationId"_attr = _stateDoc.getId(),
-                "donorConnectionString"_attr = _stateDoc.getDonorConnectionString());
-    const auto cmdObj = [] {
+
+    const auto aggregateCommandRequestObj = [] {
         AggregateCommandRequest aggRequest(
             NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
             {BSON("$backupCursor" << BSONObj())});
@@ -1024,11 +1005,18 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
         return aggRequest.toBSON(BSONObj());
     }();

-    auto startMigrationDonorTimestamp = _stateDoc.getStartMigrationDonorTimestamp();
+    stdx::lock_guard lk(_mutex);
+    LOGV2_DEBUG(6113000,
+                1,
+                "Trying to open backup cursor on donor primary",
+                "migrationId"_attr = _stateDoc.getId(),
+                "donorConnectionString"_attr = _stateDoc.getDonorConnectionString());
+
+    const auto startMigrationDonorTimestamp = _stateDoc.getStartMigrationDonorTimestamp();

     auto fetchStatus = std::make_shared<boost::optional<Status>>();
     auto uniqueMetadataInfo = std::make_unique<boost::optional<shard_merge_utils::MetadataInfo>>();
-    auto fetcherCallback =
+    const auto fetcherCallback =
         [this,
          self = shared_from_this(),
@@ -1043,8 +1031,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
             uassertStatusOK(dataStatus);
             uassert(ErrorCodes::CallbackCanceled, "backup cursor interrupted", !token.isCanceled());

-            auto uniqueOpCtx = cc().makeOperationContext();
-            auto opCtx = uniqueOpCtx.get();
+            const auto uniqueOpCtx = cc().makeOperationContext();
+            const auto opCtx = uniqueOpCtx.get();

             const auto& data = dataStatus.getValue();
             for (const BSONObj& doc : data.documents) {
@@ -1059,14 +1047,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
                         "backupCursorId"_attr = data.cursorId,
                         "backupCursorCheckpointTimestamp"_attr = checkpointTimestamp);

-                    // This ensures that the recipient won’t receive any 2 phase index build donor
-                    // oplog entries during the migration. We also have a check in the tenant oplog
-                    // applier to detect such oplog entries. Adding a check here helps us to detect
-                    // the problem earlier.
-                    uassert(6929900,
-                            "backupCursorCheckpointTimestamp should be greater than or equal to "
-                            "startMigrationDonorTimestamp",
-                            checkpointTimestamp >= startMigrationDonorTimestamp);
                     {
                         stdx::lock_guard lk(_mutex);
                         stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData);
@@ -1075,6 +1055,15 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
                             BackupCursorInfo{data.cursorId, data.nss, checkpointTimestamp});
                     }

+                    // This ensures that the recipient won’t receive any 2 phase index build donor
+                    // oplog entries during the migration. We also have a check in the tenant oplog
+                    // applier to detect such oplog entries. Adding a check here helps us to detect
+                    // the problem earlier.
+                    uassert(kCheckpointTsBackupCursorErrorCode,
+                            "backupCursorCheckpointTimestamp should be greater than or equal to "
+                            "startMigrationDonorTimestamp",
+                            checkpointTimestamp >= startMigrationDonorTimestamp);
+
                     invariant(metadataInfoPtr && !*metadataInfoPtr);
                     (*metadataInfoPtr) = shard_merge_utils::MetadataInfo::constructMetadataInfo(
                         getMigrationUUID(), _client->getServerAddress(), metadata);
@@ -1133,10 +1122,10 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
         };

     _donorFilenameBackupCursorFileFetcher = std::make_unique<Fetcher>(
-        (**_scopedExecutor).get(),
+        _backupCursorExecutor.get(),
         _client->getServerHostAndPort(),
         NamespaceString::kAdminDb.toString(),
-        cmdObj,
+        aggregateCommandRequestObj,
         fetcherCallback,
         ReadPreferenceSetting(ReadPreference::PrimaryPreferred).toContainingBSON(),
         executor::RemoteCommandRequest::kNoTimeout, /* aggregateTimeout */
@@ -1160,6 +1149,35 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
         .semi();
 }

+StatusWith<executor::TaskExecutor::CallbackHandle>
+TenantMigrationRecipientService::Instance::_scheduleKillBackupCursorWithLock(
+    WithLock lk, std::shared_ptr<executor::TaskExecutor> executor) {
+    auto& donorBackupCursorInfo = _getDonorBackupCursorInfo(lk);
+    executor::RemoteCommandRequest killCursorsRequest(
+        _client->getServerHostAndPort(),
+        donorBackupCursorInfo.nss.db().toString(),
+        BSON("killCursors" << donorBackupCursorInfo.nss.coll().toString() << "cursors"
+                           << BSON_ARRAY(donorBackupCursorInfo.cursorId)),
+        nullptr);
+    killCursorsRequest.sslMode = _donorUri.getSSLMode();
+
+    return executor->scheduleRemoteCommand(
+        killCursorsRequest, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
+            if (!args.response.isOK()) {
+                LOGV2_WARNING(6113005,
+                              "killCursors command task failed",
+                              "error"_attr = redact(args.response.status));
+                return;
+            }
+            auto status = getStatusFromCommandResult(args.response.data);
+            if (status.isOK()) {
+                LOGV2_INFO(6113415, "Killed backup cursor");
+            } else {
+                LOGV2_WARNING(6113006, "killCursors command failed", "error"_attr = redact(status));
+            }
+        });
+}
+
 SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursorWithRetry(
     const CancellationToken& token) {
     return AsyncTry([this, self = shared_from_this(), token] { return _openBackupCursor(token); })
@@ -1169,8 +1187,20 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursorWithRetry(
                                "Retrying backup cursor creation after transient error",
                                "migrationId"_attr = getMigrationUUID(),
                                "status"_attr = status);
-                // A checkpoint took place while opening a backup cursor. We
-                // should retry and *not* cancel migration.
+
+                return false;
+            } else if (status.code() == kCheckpointTsBackupCursorErrorCode ||
+                       status.code() == kCloseCursorBeforeOpenErrorCode) {
+                LOGV2_INFO(6955100,
+                           "Closing backup cursor and retrying after getting retryable error",
+                           "migrationId"_attr = getMigrationUUID(),
+                           "status"_attr = status);
+
+                stdx::lock_guard lk(_mutex);
+                const auto scheduleResult =
+                    _scheduleKillBackupCursorWithLock(lk, _backupCursorExecutor);
+                uassertStatusOK(scheduleResult);
+
                 return false;
             }

@@ -1199,7 +1229,7 @@ void TenantMigrationRecipientService::Instance::_keepBackupCursorAlive(
     auto& donorBackupCursorInfo = _getDonorBackupCursorInfo(lk);
     _backupCursorKeepAliveFuture =
         shard_merge_utils::keepBackupCursorAlive(_backupCursorKeepAliveCancellation,
-                                                 **_scopedExecutor,
+                                                 _backupCursorExecutor,
                                                  _client->getServerHostAndPort(),
                                                  donorBackupCursorInfo.cursorId,
                                                  donorBackupCursorInfo.nss);
@@ -2703,6 +2733,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
     std::shared_ptr<executor::ScopedTaskExecutor> executor,
     const CancellationToken& token) noexcept {
     _scopedExecutor = executor;
+    _backupCursorExecutor = **_scopedExecutor;
     auto scopedOutstandingMigrationCounter =
         TenantMigrationStatistics::get(_serviceContext)->getScopedOutstandingReceivingCount();
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 22de9a00fd1..a71f29139d9 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -213,6 +213,15 @@ public:

     private:
         friend class TenantMigrationRecipientServiceTest;
+        friend class TenantMigrationRecipientServiceShardMergeTest;
+
+        /**
+         * Only used for testing. Allows setting a custom task executor for backup cursor
+         * fetcher.
+         */
+        void setBackupCursorFetcherExecutor_forTest(
+            std::shared_ptr<executor::TaskExecutor> taskExecutor) {
+            _backupCursorExecutor = taskExecutor;
+        }

         const NamespaceString _stateDocumentsNS =
             NamespaceString::kTenantMigrationRecipientsNamespace;
@@ -605,6 +614,13 @@ public:
         SemiFuture<TenantOplogApplier::OpTimePair> _migrateUsingShardMergeProtocol(
             const CancellationToken& token);

+        /*
+         * Send the killBackupCursor command to the remote in order to close the backup cursor
+         * connection on the donor.
+         */
+        StatusWith<executor::TaskExecutor::CallbackHandle> _scheduleKillBackupCursorWithLock(
+            WithLock lk, std::shared_ptr<executor::TaskExecutor> executor);
+
         mutable Mutex _mutex = MONGO_MAKE_LATCH("TenantMigrationRecipientService::_mutex");

         // All member variables are labeled with one of the following codes indicating the
@@ -618,6 +634,7 @@ public:
         ServiceContext* const _serviceContext;
         const TenantMigrationRecipientService* const _recipientService;  // (R) (not owned)
         std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor;   // (M)
+        std::shared_ptr<executor::TaskExecutor> _backupCursorExecutor;   // (M)
         TenantMigrationRecipientDocument _stateDoc;                      // (M)

         // This data is provided in the initial state doc and never changes.  We keep copies to
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_shard_merge_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_shard_merge_test.cpp
new file mode 100644
index 00000000000..be6f0bbc799
--- /dev/null
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_shard_merge_test.cpp
@@ -0,0 +1,593 @@
+/**
+ *    Copyright (C) 2020-present MongoDB, Inc.
+ *
+ *    This program is free software: you can redistribute it and/or modify
+ *    it under the terms of the Server Side Public License, version 1,
+ *    as published by MongoDB, Inc.
+ *
+ *    This program is distributed in the hope that it will be useful,
+ *    but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ *    Server Side Public License for more details.
+ *
+ *    You should have received a copy of the Server Side Public License
+ *    along with this program. If not, see
+ *    <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ *    As a special exception, the copyright holders give permission to link the
+ *    code of portions of this program with the OpenSSL library under certain
+ *    conditions as described in each individual source file and distribute
+ *    linked combinations including the program with the OpenSSL library. You
+ *    must comply with the Server Side Public License in all respects for
+ *    all of the code used other than as permitted herein. If you modify file(s)
+ *    with this exception, you may extend this exception to your version of the
+ *    file(s), but you are not obligated to do so. If you do not wish to do so,
+ *    delete this exception statement from your version. If you delete this
+ *    exception statement from all source files in the program, then also delete
+ *    it in the license file.
+ */
+
+#include <fstream>
+#include <memory>
+
+#include "mongo/client/connpool.h"
+#include "mongo/client/replica_set_monitor.h"
+#include "mongo/client/replica_set_monitor_protocol_test_util.h"
+#include "mongo/client/streamable_replica_set_monitor_for_testing.h"
+#include "mongo/config.h"
+#include "mongo/db/client.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/feature_compatibility_version_document_gen.h"
+#include "mongo/db/op_observer/op_observer_impl.h"
+#include "mongo/db/op_observer/op_observer_registry.h"
+#include "mongo/db/op_observer/oplog_writer_impl.h"
+#include "mongo/db/repl/drop_pending_collection_reaper.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_buffer_collection.h"
+#include "mongo/db/repl/oplog_fetcher_mock.h"
+#include "mongo/db/repl/primary_only_service.h"
+#include "mongo/db/repl/primary_only_service_op_observer.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h"
+#include "mongo/db/repl/tenant_migration_recipient_service.h"
+#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/session/session_txn_record_gen.h"
+#include "mongo/db/storage/backup_cursor_hooks.h"
+#include "mongo/dbtests/mock/mock_conn_registry.h"
+#include "mongo/dbtests/mock/mock_replica_set.h"
+#include "mongo/executor/mock_network_fixture.h"
+#include "mongo/executor/network_interface.h"
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/thread_pool_mock.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/idl/server_parameter_test_util.h"
+#include "mongo/logv2/log.h"
+#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
+#include "mongo/transport/transport_layer_manager.h"
+#include "mongo/transport/transport_layer_mock.h"
+#include "mongo/unittest/log_test.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/clock_source_mock.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/future.h"
+#include "mongo/util/net/ssl_util.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+
+namespace mongo {
+namespace repl {
+
+namespace {
+constexpr std::int32_t stopFailPointErrorCode = 4880402;
+const Timestamp kDefaultStartMigrationTimestamp(1, 1);
+
+OplogEntry makeOplogEntry(OpTime opTime,
+                          OpTypeEnum opType,
+                          NamespaceString nss,
+                          const boost::optional<UUID>& uuid,
+                          BSONObj o,
+                          boost::optional<BSONObj> o2) {
+    return {DurableOplogEntry(opTime,                     // optime
+                              opType,                     // opType
+                              nss,                        // namespace
+                              uuid,                       // uuid
+                              boost::none,                // fromMigrate
+                              OplogEntry::kOplogVersion,  // version
+                              o,                          // o
+                              o2,                         // o2
+                              {},                         // sessionInfo
+                              boost::none,                // upsert
+                              Date_t(),                   // wall clock time
+                              {},                         // statement ids
+                              boost::none,  // optime of previous write within same transaction
+                              boost::none,  // pre-image optime
+                              boost::none,  // post-image optime
+                              boost::none,  // ShardId of resharding recipient
+                              boost::none,  // _id
+                              boost::none)};  // needsRetryImage
+}
+
+}  // namespace
+
+class TenantMigrationRecipientServiceShardMergeTest : public ServiceContextMongoDTest {
+public:
+    class stopFailPointEnableBlock : public FailPointEnableBlock {
+    public:
+        explicit stopFailPointEnableBlock(StringData failPointName,
+                                          std::int32_t error = stopFailPointErrorCode)
+            : FailPointEnableBlock(failPointName,
+                                   BSON("action"
+                                        << "stop"
+                                        << "stopErrorCode" << error)) {}
+    };
+
+    void setUp() override {
+        ServiceContextMongoDTest::setUp();
+        auto serviceContext = getServiceContext();
+
+        // Fake replSet just for creating consistent URI for monitor
+        MockReplicaSet replSet("donorSet", 1, true /* hasPrimary */, true /* dollarPrefixHosts */);
+        _rsmMonitor.setup(replSet.getURI());
+
+        ConnectionString::setConnectionHook(mongo::MockConnRegistry::get()->getConnStrHook());
+
+        WaitForMajorityService::get(serviceContext).startup(serviceContext);
+
+        // Automatically mark the state doc garbage collectable after data sync completion.
+        globalFailPointRegistry()
+            .find("autoRecipientForgetMigration")
+            ->setMode(FailPoint::alwaysOn);
+
+        {
+            auto opCtx = cc().makeOperationContext();
+            auto replCoord = std::make_unique<ReplicationCoordinatorMock>(serviceContext);
+            ReplicationCoordinator::set(serviceContext, std::move(replCoord));
+
+            repl::createOplog(opCtx.get());
+
+            {
+                Lock::GlobalWrite lk(opCtx.get());
+                OldClientContext ctx(opCtx.get(), NamespaceString::kRsOplogNamespace);
+                tenant_migration_util::createOplogViewForTenantMigrations(opCtx.get(), ctx.db());
+            }
+
+            // Need real (non-mock) storage for the oplog buffer.
+            StorageInterface::set(serviceContext, std::make_unique<StorageInterfaceImpl>());
+
+            // The DropPendingCollectionReaper is required to drop the oplog buffer collection.
+            repl::DropPendingCollectionReaper::set(
+                serviceContext,
+                std::make_unique<repl::DropPendingCollectionReaper>(
+                    StorageInterface::get(serviceContext)));
+
+            // Set up OpObserver so that repl::logOp() will store the oplog entry's optime in
+            // ReplClientInfo.
+            OpObserverRegistry* opObserverRegistry =
+                dynamic_cast<OpObserverRegistry*>(serviceContext->getOpObserver());
+            opObserverRegistry->addObserver(
+                std::make_unique<OpObserverImpl>(std::make_unique<OplogWriterImpl>()));
+            opObserverRegistry->addObserver(
+                std::make_unique<PrimaryOnlyServiceOpObserver>(serviceContext));
+
+            _registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext());
+            std::unique_ptr<TenantMigrationRecipientService> service =
+                std::make_unique<TenantMigrationRecipientService>(getServiceContext());
+            _registry->registerService(std::move(service));
+            _registry->onStartup(opCtx.get());
+        }
+        stepUp();
+
+        _service = _registry->lookupServiceByName(
+            TenantMigrationRecipientService::kTenantMigrationRecipientServiceName);
+        ASSERT(_service);
+
+        // MockReplicaSet uses custom connection string which does not support auth.
+        auto authFp = globalFailPointRegistry().find("skipTenantMigrationRecipientAuth");
+        authFp->setMode(FailPoint::alwaysOn);
+
+        // Set the sslMode to allowSSL to avoid validation error.
+        sslGlobalParams.sslMode.store(SSLParams::SSLMode_allowSSL);
+        // Skipped unless tested explicitly, as we will not receive an FCV document from the donor
+        // in these unittests without (unsightly) intervention.
+        auto compFp = globalFailPointRegistry().find("skipComparingRecipientAndDonorFCV");
+        compFp->setMode(FailPoint::alwaysOn);
+
+        // Skip fetching retryable writes, as we will test this logic entirely in integration
+        // tests.
+        auto fetchRetryableWritesFp =
+            globalFailPointRegistry().find("skipFetchingRetryableWritesEntriesBeforeStartOpTime");
+        fetchRetryableWritesFp->setMode(FailPoint::alwaysOn);
+
+        // Skip fetching committed transactions, as we will test this logic entirely in integration
+        // tests.
+        auto fetchCommittedTransactionsFp =
+            globalFailPointRegistry().find("skipFetchingCommittedTransactions");
+        fetchCommittedTransactionsFp->setMode(FailPoint::alwaysOn);
+
+        // Set up mock networking that will be used to mock the backup cursor traffic.
+        auto net = std::make_unique<executor::NetworkInterfaceMock>();
+        _net = net.get();
+
+        executor::ThreadPoolMock::Options dbThreadPoolOptions;
+        dbThreadPoolOptions.onCreateThread = []() { Client::initThread("FetchMockTaskExecutor"); };
+
+        auto pool = std::make_unique<executor::ThreadPoolMock>(_net, 1, dbThreadPoolOptions);
+        _threadpoolTaskExecutor =
+            std::make_shared<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
+        _threadpoolTaskExecutor->startup();
+    }
+
+    void tearDown() override {
+        _threadpoolTaskExecutor->shutdown();
+        _threadpoolTaskExecutor->join();
+
+        auto authFp = globalFailPointRegistry().find("skipTenantMigrationRecipientAuth");
+        authFp->setMode(FailPoint::off);
+
+        // Unset the sslMode.
+        sslGlobalParams.sslMode.store(SSLParams::SSLMode_disabled);
+
+        WaitForMajorityService::get(getServiceContext()).shutDown();
+
+        _registry->onShutdown();
+        _service = nullptr;
+
+        StorageInterface::set(getServiceContext(), {});
+
+        // Clearing the connection pool is necessary when doing tests which use the
+        // ReplicaSetMonitor. See src/mongo/dbtests/mock/mock_replica_set.h for details.
+        ScopedDbConnection::clearPool();
+        ReplicaSetMonitorProtocolTestUtil::resetRSMProtocol();
+        ServiceContextMongoDTest::tearDown();
+    }
+
+    void stepDown() {
+        ASSERT_OK(ReplicationCoordinator::get(getServiceContext())
+                      ->setFollowerMode(MemberState::RS_SECONDARY));
+        _registry->onStepDown();
+    }
+
+    void stepUp() {
+        auto opCtx = cc().makeOperationContext();
+        auto replCoord = ReplicationCoordinator::get(getServiceContext());
+
+        // Advance term
+        _term++;
+
+        ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_PRIMARY));
+        ASSERT_OK(replCoord->updateTerm(opCtx.get(), _term));
+        replCoord->setMyLastAppliedOpTimeAndWallTime(
+            OpTimeAndWallTime(OpTime(Timestamp(1, 1), _term), Date_t()));
+
+        _registry->onStepUpComplete(opCtx.get(), _term);
+    }
+
+protected:
+    TenantMigrationRecipientServiceShardMergeTest()
+        : ServiceContextMongoDTest(Options{}.useMockClock(true)) {}
+
+    PrimaryOnlyServiceRegistry* _registry;
+    PrimaryOnlyService* _service;
+    long long _term = 0;
+
+    bool _collCreated = false;
+    size_t _numSecondaryIndexesCreated{0};
+    size_t _numDocsInserted{0};
+
+    const TenantMigrationPEMPayload kRecipientPEMPayload = [&] {
+        std::ifstream infile("jstests/libs/client.pem");
+        std::string buf((std::istreambuf_iterator<char>(infile)),
+                        std::istreambuf_iterator<char>());
+
+        auto swCertificateBlob =
+            ssl_util::findPEMBlob(buf, "CERTIFICATE"_sd, 0 /* position */, false /* allowEmpty */);
+        ASSERT_TRUE(swCertificateBlob.isOK());
+
+        auto swPrivateKeyBlob =
+            ssl_util::findPEMBlob(buf, "PRIVATE KEY"_sd, 0 /* position */, false /* allowEmpty */);
+        ASSERT_TRUE(swPrivateKeyBlob.isOK());
+
+        return TenantMigrationPEMPayload{swCertificateBlob.getValue().toString(),
+                                         swPrivateKeyBlob.getValue().toString()};
+    }();
+
+    void checkStateDocPersisted(OperationContext* opCtx,
+                                const TenantMigrationRecipientService::Instance* instance) {
+        auto memoryStateDoc = getStateDoc(instance);
+        auto persistedStateDocWithStatus =
+            tenantMigrationRecipientEntryHelpers::getStateDoc(opCtx, memoryStateDoc.getId());
+        ASSERT_OK(persistedStateDocWithStatus.getStatus());
+        ASSERT_BSONOBJ_EQ(memoryStateDoc.toBSON(),
+                          persistedStateDocWithStatus.getValue().toBSON());
+    }
+
+    void insertToNodes(MockReplicaSet* replSet,
+                       const std::string& nss,
+                       BSONObj obj,
+                       const std::vector<HostAndPort>& hosts) {
+        for (const auto& host : hosts) {
+            replSet->getNode(host.toString())->insert(nss, obj);
+        }
+    }
+
+    void clearCollection(MockReplicaSet* replSet,
+                         const std::string& nss,
+                         const std::vector<HostAndPort>& hosts) {
+        for (const auto& host : hosts) {
+            replSet->getNode(host.toString())->remove(nss, BSONObj{} /*filter*/);
+        }
+    }
+
+    void insertTopOfOplog(MockReplicaSet* replSet,
+                          const OpTime& topOfOplogOpTime,
+                          const std::vector<HostAndPort> hosts = {}) {
+        const auto targetHosts = hosts.empty() ? replSet->getHosts() : hosts;
+        // The MockRemoteDBService does not actually implement the database, so to make our
+        // find work correctly we must make sure there's only one document to find.
+        clearCollection(replSet, NamespaceString::kRsOplogNamespace.ns(), targetHosts);
+        insertToNodes(replSet,
+                      NamespaceString::kRsOplogNamespace.ns(),
+                      makeOplogEntry(topOfOplogOpTime,
+                                     OpTypeEnum::kNoop,
+                                     {} /* namespace */,
+                                     boost::none /* uuid */,
+                                     BSONObj() /* o */,
+                                     boost::none /* o2 */)
+                          .getEntry()
+                          .toBSON(),
+                      targetHosts);
+    }
+
+    // Accessors to class private members
+    DBClientConnection* getClient(
+        const TenantMigrationRecipientService::Instance* instance) const {
+        return instance->_client.get();
+    }
+
+    const TenantMigrationRecipientDocument& getStateDoc(
+        const TenantMigrationRecipientService::Instance* instance) const {
+        return instance->_stateDoc;
+    }
+
+    sdam::MockTopologyManager* getTopologyManager() {
+        return _rsmMonitor.getTopologyManager();
+    }
+
+    ClockSource* clock() {
+        return &_clkSource;
+    }
+
+    executor::NetworkInterfaceMock* getNet() {
+        return _net;
+    }
+
+    executor::NetworkInterfaceMock* _net = nullptr;
+    std::shared_ptr<executor::TaskExecutor> _threadpoolTaskExecutor;
+
+    void setInstanceBackupCursorFetcherExecutor(
+        std::shared_ptr<TenantMigrationRecipientService::Instance> instance) {
+        instance->setBackupCursorFetcherExecutor_forTest(_threadpoolTaskExecutor);
+    }
+
+private:
+    ClockSourceMock _clkSource;
+
+    unittest::MinimumLoggedSeverityGuard _replicationSeverityGuard{
+        logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(1)};
+    unittest::MinimumLoggedSeverityGuard _tenantMigrationSeverityGuard{
+        logv2::LogComponent::kTenantMigration, logv2::LogSeverity::Debug(1)};
+
+    StreamableReplicaSetMonitorForTesting _rsmMonitor;
+    RAIIServerParameterControllerForTest _findHostTimeout{"defaultFindReplicaSetHostTimeoutMS", 10};
+};
+
+#ifdef MONGO_CONFIG_SSL
+
+void waitForReadyRequest(executor::NetworkInterfaceMock* net) {
+    while (!net->hasReadyRequests()) {
+        net->advanceTime(net->now() + Milliseconds{1});
+    }
+}
+
+BSONObj createEmptyCursorResponse(const NamespaceString& nss, CursorId backupCursorId) {
+    return BSON(
+        "cursor" << BSON("nextBatch" << BSONArray() << "id" << backupCursorId << "ns" << nss.ns())
+                 << "ok" << 1.0);
+}
+
+BSONObj createBackupCursorResponse(const Timestamp& checkpointTimestamp,
+                                   const NamespaceString& nss,
+                                   CursorId backupCursorId) {
+    const UUID backupId =
+        UUID(uassertStatusOK(UUID::parse(("2b068e03-5961-4d8e-b47a-d1c8cbd4b835"))));
+    StringData remoteDbPath = "/data/db/job0/mongorunner/test-1";
+    BSONObjBuilder cursor;
+    BSONArrayBuilder batch(cursor.subarrayStart("firstBatch"));
+    auto metaData = BSON("backupId" << backupId << "checkpointTimestamp" << checkpointTimestamp
+                                    << "dbpath" << remoteDbPath);
+    batch.append(BSON("metadata" << metaData));
+
+    batch.done();
+    cursor.append("id", backupCursorId);
+    cursor.append("ns", nss.ns());
+    BSONObjBuilder backupCursorReply;
+    backupCursorReply.append("cursor", cursor.obj());
+    backupCursorReply.append("ok", 1.0);
+    return backupCursorReply.obj();
+}
+
+void sendReponseToExpectedRequest(const BSONObj& backupCursorResponse,
+                                  const std::string& expectedRequestFieldName,
+                                  executor::NetworkInterfaceMock* net) {
+    auto noi = net->getNextReadyRequest();
+    auto request = noi->getRequest();
+    ASSERT_EQUALS(expectedRequestFieldName, request.cmdObj.firstElementFieldNameStringData());
+    net->scheduleSuccessfulResponse(
+        noi, executor::RemoteCommandResponse(backupCursorResponse, Milliseconds()));
+    net->runReadyNetworkOperations();
+}
+
+BSONObj createServerAggregateReply() {
+    return CursorResponse(
+               NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
+               0 /* cursorId */,
+               {BSON("byteOffset" << 0 << "endOfFile" << true << "data"
+                                  << BSONBinData(0, 0, BinDataGeneral))})
+        .toBSONAsInitialResponse();
+}
+
+TEST_F(TenantMigrationRecipientServiceShardMergeTest, OpenBackupCursorSuccessfully) {
+    stopFailPointEnableBlock fp("fpBeforeAdvancingStableTimestamp");
+    const UUID migrationUUID = UUID::gen();
+    const CursorId backupCursorId = 12345;
+    const NamespaceString aggregateNs = NamespaceString("admin.$cmd.aggregate");
+
+    auto taskFp = globalFailPointRegistry().find("hangBeforeTaskCompletion");
+    auto initialTimesEntered = taskFp->setMode(FailPoint::alwaysOn);
+
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+    getTopologyManager()->setTopologyDescription(replSet.getTopologyDescription(clock()));
+    insertTopOfOplog(&replSet, OpTime(Timestamp(5, 1), 1));
+
+    // Mock the aggregate response from the donor.
+    MockRemoteDBServer* const _donorServer =
+        mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+    _donorServer->setCommandReply("aggregate", createServerAggregateReply());
+
+    TenantMigrationRecipientDocument initialStateDocument(
+        migrationUUID,
+        replSet.getConnectionString(),
+        "tenantA",
+        kDefaultStartMigrationTimestamp,
+        ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+    initialStateDocument.setProtocol(MigrationProtocolEnum::kShardMerge);
+    initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
+
+    auto opCtx = makeOperationContext();
+    std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+    {
+        auto fp = globalFailPointRegistry().find("pauseBeforeRunTenantMigrationRecipientInstance");
+        auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn);
+        instance = TenantMigrationRecipientService::Instance::getOrCreate(
+            opCtx.get(), _service, initialStateDocument.toBSON());
+        ASSERT(instance.get());
+        fp->waitForTimesEntered(initialTimesEntered + 1);
+        setInstanceBackupCursorFetcherExecutor(instance);
+        instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+        fp->setMode(FailPoint::off);
+    }
+
+    {
+        auto net = getNet();
+        executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+        waitForReadyRequest(net);
+        // Mocking the aggregate command network response of the backup cursor in order to have
+        // data to parse.
+        sendReponseToExpectedRequest(createBackupCursorResponse(kDefaultStartMigrationTimestamp,
+                                                                aggregateNs,
+                                                                backupCursorId),
+                                     "aggregate",
+                                     net);
+        sendReponseToExpectedRequest(
+            createEmptyCursorResponse(aggregateNs, backupCursorId), "getMore", net);
+        sendReponseToExpectedRequest(
+            createEmptyCursorResponse(aggregateNs, backupCursorId), "getMore", net);
+    }
+
+    taskFp->waitForTimesEntered(initialTimesEntered + 1);
+
+    checkStateDocPersisted(opCtx.get(), instance.get());
+
+    taskFp->setMode(FailPoint::off);
+
+    ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+    ASSERT_OK(instance->getForgetMigrationDurableFuture().getNoThrow());
+}
+
+TEST_F(TenantMigrationRecipientServiceShardMergeTest, OpenBackupCursorAndRetriesDueToTs) {
+    stopFailPointEnableBlock fp("fpBeforeAdvancingStableTimestamp");
+    const UUID migrationUUID = UUID::gen();
+    const CursorId backupCursorId = 12345;
+    const NamespaceString aggregateNs = NamespaceString("admin.$cmd.aggregate");
+
+    auto taskFp = globalFailPointRegistry().find("hangBeforeTaskCompletion");
+    auto initialTimesEntered = taskFp->setMode(FailPoint::alwaysOn);
+
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+    getTopologyManager()->setTopologyDescription(replSet.getTopologyDescription(clock()));
+    insertTopOfOplog(&replSet, OpTime(Timestamp(5, 1), 1));
+
+    // Mock the aggregate response from the donor.
+    MockRemoteDBServer* const _donorServer =
+        mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+    _donorServer->setCommandReply("aggregate", createServerAggregateReply());
+
+    TenantMigrationRecipientDocument initialStateDocument(
+        migrationUUID,
+        replSet.getConnectionString(),
+        "tenantA",
+        kDefaultStartMigrationTimestamp,
+        ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+    initialStateDocument.setProtocol(MigrationProtocolEnum::kShardMerge);
+    initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
+
+    auto opCtx = makeOperationContext();
+    std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+    {
+        auto fp = globalFailPointRegistry().find("pauseBeforeRunTenantMigrationRecipientInstance");
+        auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn);
+        instance = TenantMigrationRecipientService::Instance::getOrCreate(
+            opCtx.get(), _service, initialStateDocument.toBSON());
+        ASSERT(instance.get());
+        fp->waitForTimesEntered(initialTimesEntered + 1);
+        setInstanceBackupCursorFetcherExecutor(instance);
+        instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+        fp->setMode(FailPoint::off);
+    }
+
+    {
+        auto net = getNet();
+        executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+        waitForReadyRequest(net);
+
+        // Mocking the aggregate command network response of the backup cursor in order to have
+        // data to parse. In this case we pass a timestamp that is less than the
+        // startMigrationTimestamp, which will cause a retry. We then provide a correct timestamp
+        // in the next response and succeed.
+        sendReponseToExpectedRequest(
+            createBackupCursorResponse(Timestamp(0, 0), aggregateNs, backupCursorId),
+            "aggregate",
+            net);
+        sendReponseToExpectedRequest(createBackupCursorResponse(kDefaultStartMigrationTimestamp,
+                                                                aggregateNs,
+                                                                backupCursorId),
+                                     "killCursors",
+                                     net);
+        sendReponseToExpectedRequest(
+            createEmptyCursorResponse(aggregateNs, backupCursorId), "killCursors", net);
+        sendReponseToExpectedRequest(createBackupCursorResponse(kDefaultStartMigrationTimestamp,
+                                                                aggregateNs,
+                                                                backupCursorId),
+                                     "aggregate",
+                                     net);
+        sendReponseToExpectedRequest(
+            createEmptyCursorResponse(aggregateNs, backupCursorId), "getMore", net);
+        sendReponseToExpectedRequest(
+            createEmptyCursorResponse(aggregateNs, backupCursorId), "getMore", net);
+    }
+
+    taskFp->waitForTimesEntered(initialTimesEntered + 1);
+
+    checkStateDocPersisted(opCtx.get(), instance.get());
+
+    taskFp->setMode(FailPoint::off);
+
+    ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+    ASSERT_OK(instance->getForgetMigrationDurableFuture().getNoThrow());
+}
+
+#endif
+}  // namespace repl
+}  // namespace mongo
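For intuition on the timestamps the retry test drives: the first mocked aggregate reply reports a checkpoint at Timestamp(0, 0), which is behind kDefaultStartMigrationTimestamp(1, 1), so the fetcher hits error 6929900 and the recipient kills the donor-side cursor and reopens it; the second aggregate reply reports kDefaultStartMigrationTimestamp and passes the check. A tiny self-contained illustration of that ordering, where this Timestamp struct is a simplified stand-in for mongo::Timestamp:

#include <cassert>

// Simplified stand-in for mongo::Timestamp ordering (an assumption for this sketch).
struct Timestamp {
    unsigned secs, inc;
    bool operator>=(const Timestamp& o) const {
        return secs != o.secs ? secs > o.secs : inc >= o.inc;
    }
};

int main() {
    const Timestamp startMigrationDonorTimestamp{1, 1};  // kDefaultStartMigrationTimestamp above
    const Timestamp staleCheckpoint{0, 0};  // first mocked reply: triggers error 6929900, retry
    const Timestamp freshCheckpoint{1, 1};  // second mocked reply: accepted, migration proceeds
    assert(!(staleCheckpoint >= startMigrationDonorTimestamp));  // kill-and-retry path
    assert(freshCheckpoint >= startMigrationDonorTimestamp);     // success path
}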