-rw-r--r--  src/mongo/db/repl/abstract_async_component.cpp                |  28
-rw-r--r--  src/mongo/db/repl/abstract_async_component.h                  |  11
-rw-r--r--  src/mongo/db/repl/oplog_buffer_collection.cpp                 |  10
-rw-r--r--  src/mongo/db/repl/oplog_buffer_collection.h                   |  10
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp      | 104
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.h        |   8
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service_test.cpp | 218
-rw-r--r--  src/mongo/db/repl/tenant_oplog_applier.cpp                    |   3
8 files changed, 330 insertions, 62 deletions
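
The central API change below is AbstractAsyncComponent::joinAsync(), which exposes component
completion as a future instead of a blocking condition-variable wait. A minimal usage sketch,
illustrative only and not part of this commit ("component" stands for any concrete
AbstractAsyncComponent subclass that has been started):

    // Illustrative sketch, not part of the commit below.
    void shutdownAndWait(AbstractAsyncComponent* component) {
        component->shutdown();

        // Non-blocking: the future is fulfilled by _transitionToComplete_inlock(), or set to
        // CallbackCanceled by the new destructor if the component is destroyed before finishing.
        SemiFuture<void> done = component->joinAsync();

        // The new blocking join() is implemented the same way: joinAsync().getNoThrow().ignore().
        done.getNoThrow().ignore();
    }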
diff --git a/src/mongo/db/repl/abstract_async_component.cpp b/src/mongo/db/repl/abstract_async_component.cpp
index 77b086af97e..fb0995047ba 100644
--- a/src/mongo/db/repl/abstract_async_component.cpp
+++ b/src/mongo/db/repl/abstract_async_component.cpp
@@ -43,6 +43,19 @@ AbstractAsyncComponent::AbstractAsyncComponent(executor::TaskExecutor* executor,
     uassert(ErrorCodes::BadValue, "task executor cannot be null", executor);
 }
 
+AbstractAsyncComponent::~AbstractAsyncComponent() {
+    // This is required because it's illegal to destroy a promise that has not been set,
+    // which can happen if a component is created and then destroyed without being started.
+    // (e.g. if system shutdown happens between creation and start)
+    // Note we _cannot_ use the mutex here; it belongs to the subclass, which has already
+    // been destroyed.
+    if (_state != State::kComplete) {
+        _statePromise.setError(
+            {ErrorCodes::CallbackCanceled,
+             str::stream() << "Component " << _componentName << " destroyed when not finished"});
+    }
+}
+
 executor::TaskExecutor* AbstractAsyncComponent::_getExecutor() {
     return _executor;
 }
@@ -116,8 +129,19 @@ void AbstractAsyncComponent::shutdown() noexcept {
 }
 
 void AbstractAsyncComponent::join() noexcept {
+    // Right now the only possible error value is due to destruction.
+    joinAsync().getNoThrow().ignore();
+}
+
+SemiFuture<void> AbstractAsyncComponent::joinAsync() noexcept {
     stdx::unique_lock<Latch> lk(*_getMutex());
-    _stateCondition.wait(lk, [this]() { return !_isActive_inlock(); });
+    // If the component has never been started or is already shut down, just return a ready future.
+    if (!_isActive_inlock()) {
+        return SemiFuture<void>::makeReady();
+    }
+
+    // Otherwise, we are running and the future will be set when shutdown is complete.
+    return _statePromise.getFuture().semi();
 }
 
 AbstractAsyncComponent::State AbstractAsyncComponent::getState_forTest() noexcept {
@@ -133,7 +157,7 @@ void AbstractAsyncComponent::_transitionToComplete() noexcept {
 void AbstractAsyncComponent::_transitionToComplete_inlock() noexcept {
     invariant(State::kComplete != _state);
     _state = State::kComplete;
-    _stateCondition.notify_all();
+    _statePromise.emplaceValue();
 }
 
 Status AbstractAsyncComponent::_checkForShutdownAndConvertStatus(
diff --git a/src/mongo/db/repl/abstract_async_component.h b/src/mongo/db/repl/abstract_async_component.h
index 5b0e6426900..084d21b9328 100644
--- a/src/mongo/db/repl/abstract_async_component.h
+++ b/src/mongo/db/repl/abstract_async_component.h
@@ -58,7 +58,7 @@ class AbstractAsyncComponent {
 public:
     AbstractAsyncComponent(executor::TaskExecutor* executor, const std::string& componentName);
 
-    virtual ~AbstractAsyncComponent() = default;
+    virtual ~AbstractAsyncComponent();
 
     /**
      * Returns true if this component is currently running or in the process of shutting down.
@@ -85,6 +85,11 @@ public:
    void join() noexcept;
 
     /**
+     * Returns a future that will complete when the component becomes inactive.
+     */
+    SemiFuture<void> joinAsync() noexcept;
+
+    /**
      * State transitions:
      * PreStart --> Running --> ShuttingDown --> Complete
      * It is possible to skip intermediate states. For example, calling shutdown() when the
@@ -216,6 +221,8 @@ private:
     // (R)  Read-only in concurrent operation; no synchronization required.
     // (S)  Self-synchronizing; access in any way from any context.
     // (M)  Reads and writes guarded by mutex returned by _getMutex().
+    // (W)  May read and call const methods without synchronization,
+    //      must hold mutex to write and call non-const methods.
 
     // Task executor used to schedule tasks and remote commands.
     executor::TaskExecutor* const _executor;  // (R)
@@ -228,7 +235,7 @@ private:
     State _state = State::kPreStart;  // (M)
 
     // Used by _transitionToComplete_inlock() to signal changes in '_state'.
-    mutable stdx::condition_variable _stateCondition;  // (S)
+    mutable SharedPromise<void> _statePromise;  // (W)
 };
 
 /**
diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp
index e6b9a97f643..86f25deec44 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection.cpp
@@ -96,6 +96,9 @@ void OplogBufferCollection::startup(OperationContext* opCtx) {
         return;
     }
 
+    // If the collection doesn't already exist, create it.
+    _createCollection(opCtx);
+
     stdx::lock_guard<Latch> lk(_mutex);
     // If we are starting from an existing collection, we must populate the in memory state of the
     // buffer.
@@ -387,9 +390,12 @@ BSONObj OplogBufferCollection::_peek_inlock(OperationContext* opCtx, PeekMode pe
 
 void OplogBufferCollection::_createCollection(OperationContext* opCtx) {
     CollectionOptions options;
-    options.temp = true;
+    options.temp = _options.useTemporaryCollection;
     UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-    fassert(40154, _storageInterface->createCollection(opCtx, _nss, options));
+    auto status = _storageInterface->createCollection(opCtx, _nss, options);
+    if (status.code() == ErrorCodes::NamespaceExists)
+        return;
+    fassert(40154, status);
 }
 
 void OplogBufferCollection::_dropCollection(OperationContext* opCtx) {
diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h
index 30cee7fc9cf..2f50bfd288c 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.h
+++ b/src/mongo/db/repl/oplog_buffer_collection.h
@@ -43,8 +43,9 @@ namespace repl {
 class StorageInterface;
 
 /**
- * Oplog buffer backed by a temporary collection. This collection is created in startup() and
- * removed in shutdown(). The documents will be popped and peeked in timestamp order.
+ * Oplog buffer backed by an optionally temporary collection. This collection is optionally created
+ * in startup() and removed in shutdown(). The documents will be popped and peeked in timestamp
+ * order.
  */
 class OplogBufferCollection : public RandomAccessOplogBuffer {
 public:
@@ -56,11 +57,12 @@ public:
         std::size_t peekCacheSize = 0;
         bool dropCollectionAtStartup = true;
         bool dropCollectionAtShutdown = true;
+        bool useTemporaryCollection = true;
         Options() {}
     };
 
     /**
-     * Returns default namespace for temporary collection used to hold data in oplog buffer.
+     * Returns default namespace for collection used to hold data in oplog buffer.
      */
     static NamespaceString getDefaultNamespace();
 
@@ -129,7 +131,7 @@ public:
 
 private:
     /*
-     * Creates a temporary collection with the _nss namespace.
+     * Creates an (optionally temporary) collection with the _nss namespace.
      */
     void _createCollection(OperationContext* opCtx);
 
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index f15073da8e3..9b7842ac673 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -74,6 +74,8 @@ MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogFetcherMigrationRecipientInstance);
 MONGO_FAIL_POINT_DEFINE(setTenantMigrationRecipientInstanceHostTimeout);
 MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingLastTxnMigrationRecipientInstance);
 MONGO_FAIL_POINT_DEFINE(fpAfterCollectionClonerDone);
+MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogApplierMigrationRecipientInstance);
+MONGO_FAIL_POINT_DEFINE(fpAfterDataConsistentMigrationRecipientInstance);
 MONGO_FAIL_POINT_DEFINE(hangBeforeTaskCompletion);
 
 namespace {
@@ -421,12 +423,14 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
     options.peekCacheSize = static_cast<size_t>(tenantMigrationOplogBufferPeekCacheSize);
     options.dropCollectionAtStartup = false;
     options.dropCollectionAtShutdown = false;
+    options.useTemporaryCollection = false;
     NamespaceString oplogBufferNs(NamespaceString::kConfigDb,
                                   kOplogBufferPrefix + getMigrationUUID().toString());
     stdx::lock_guard lk(_mutex);
     invariant(_stateDoc.getStartFetchingOpTime());
     _donorOplogBuffer = std::make_unique<OplogBufferCollection>(
         StorageInterface::get(opCtx.get()), oplogBufferNs, options);
+    _donorOplogBuffer->startup(opCtx.get());
     _dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateTenantMigration>();
     _donorOplogFetcher = (*_createOplogFetcherFn)(
         (**_scopedExecutor).get(),
@@ -477,8 +481,27 @@ Status TenantMigrationRecipientService::Instance::_enqueueDocuments(
 }
 
 void TenantMigrationRecipientService::Instance::_oplogFetcherCallback(Status oplogFetcherStatus) {
-    // TODO(SERVER-48812): Abort the migration unless the error is CallbackCanceled and
-    // the migration has finished.
+    // The oplog fetcher is normally canceled when migration is done; any other error
+    // indicates failure.
+    if (oplogFetcherStatus.isOK()) {
+        // Oplog fetcher status of "OK" means the stopReplProducer failpoint is set. Migration
+        // cannot continue in this state so force a failure.
+        LOGV2_ERROR(
+            4881205,
+            "Recipient migration service oplog fetcher stopped due to stopReplProducer failpoint",
+            "tenantId"_attr = getTenantId(),
+            "migrationId"_attr = getMigrationUUID());
+        interrupt({ErrorCodes::Error(4881206),
+                   "Recipient migration service oplog fetcher stopped due to stopReplProducer "
+                   "failpoint"});
+    } else if (oplogFetcherStatus.code() != ErrorCodes::CallbackCanceled) {
+        LOGV2_ERROR(4881204,
+                    "Recipient migration service oplog fetcher failed",
+                    "tenantId"_attr = getTenantId(),
+                    "migrationId"_attr = getMigrationUUID(),
+                    "error"_attr = oplogFetcherStatus);
+        interrupt(oplogFetcherStatus);
+    }
 }
 
 namespace {
@@ -608,6 +631,15 @@ void TenantMigrationRecipientService::Instance::_shutdownComponents(WithLock lk)
         // interrupts running tenant oplog fetcher.
         _oplogFetcherClient->shutdownAndDisallowReconnect();
     }
+
+    if (_tenantOplogApplier) {
+        _tenantOplogApplier->shutdown();
+    }
+
+    if (_donorOplogBuffer) {
+        auto opCtx = cc().makeOperationContext();
+        _donorOplogBuffer->shutdown(opCtx.get());
+    }
 }
 
 void TenantMigrationRecipientService::Instance::interrupt(Status status) {
@@ -637,6 +669,12 @@ void TenantMigrationRecipientService::Instance::_cleanupOnTaskCompletion(Status
     stdx::lock_guard lk(_mutex);
     _shutdownComponents(lk);
+
+    if (_donorOplogFetcher) {
+        _donorOplogFetcher->shutdown();
+        _donorOplogFetcher->join();
+    }
+
     if (_writerPool) {
         _writerPool->join();
     }
@@ -746,8 +784,22 @@ void TenantMigrationRecipientService::Instance::run(
             _stopOrHangOnFailPoint(&fpAfterStartingOplogFetcherMigrationRecipientInstance);
 
             stdx::lock_guard lk(_mutex);
-            // Create the oplog applier and do not start now.
-            // TODO SERVER-48812: Oplog application in MigrationServiceInstance.
+            // Create the oplog applier but do not start it yet.
+            invariant(_stateDoc.getStartApplyingOpTime());
+            LOGV2_DEBUG(4881202,
+                        1,
+                        "Recipient migration service creating oplog applier",
+                        "tenantId"_attr = getTenantId(),
+                        "migrationId"_attr = getMigrationUUID(),
+                        "startApplyingOpTime"_attr = *_stateDoc.getStartApplyingOpTime());
+
+            _tenantOplogApplier =
+                std::make_unique<TenantOplogApplier>(_migrationUuid,
+                                                     _tenantId,
+                                                     *_stateDoc.getStartApplyingOpTime(),
+                                                     _donorOplogBuffer.get(),
+                                                     **_scopedExecutor,
+                                                     _writerPool.get());
 
             // Start the cloner.
             auto clonerFuture = _startTenantAllDatabaseCloner(lk);
@@ -759,10 +811,16 @@ void TenantMigrationRecipientService::Instance::run(
         .then([this] { return _onCloneSuccess(); })
         .then([this] {
             _stopOrHangOnFailPoint(&fpAfterCollectionClonerDone);
-            // Start the oplog applier.
-            // TODO SERVER-48812: Oplog application in MigrationServiceInstance
+            LOGV2_DEBUG(4881200,
+                        1,
+                        "Recipient migration service starting oplog applier",
+                        "tenantId"_attr = getTenantId(),
+                        "migrationId"_attr = getMigrationUUID());
+
+            uassertStatusOK(_tenantOplogApplier->startup());
+            _stopOrHangOnFailPoint(&fpAfterStartingOplogApplierMigrationRecipientInstance);
+            return _getDataConsistentFuture();
         })
-        .then([this] { return _getDataConsistentFuture(); })
         .then([this] {
             stdx::lock_guard lk(_mutex);
             LOGV2_DEBUG(4881101,
@@ -775,10 +833,36 @@ void TenantMigrationRecipientService::Instance::run(
             _dataConsistentPromise.emplaceValue(_stateDoc.getDataConsistentStopOpTime().get());
         })
         .then([this] {
-            // wait for the oplog applier to complete/stop.
-            // TODO SERVER-48812: Oplog application in MigrationServiceInstance
+            _stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance);
+            // wait for oplog applier to complete/stop.
+            // The oplog applier does not exit normally; it must be shut down externally,
+            // e.g. by recipientForgetMigration.
+            return _tenantOplogApplier->getNotificationForOpTime(OpTime::max());
         })
-        .getAsync([this](Status status) {
+        .getAsync([this](StatusOrStatusWith<TenantOplogApplier::OpTimePair> applierStatus) {
+            // We don't need the final optime from the oplog applier.
+            Status status = applierStatus.getStatus();
+            {
+                // If we were interrupted during oplog application, replace oplog application
+                // status with error state.
+                stdx::lock_guard lk(_mutex);
+                if ((status.isOK() || ErrorCodes::isCancelationError(status)) &&
+                    _taskState.isInterrupted()) {
+                    // We get an "OK" result when the stopReplProducer failpoint is set. This
+                    // also cancels the migration. We will have already logged this in
+                    // _oplogFetcherCallback()
+                    if (!status.isOK()) {
+                        LOGV2(4881207,
+                              "Migration completed with both error and interrupt",
+                              "tenantId"_attr = getTenantId(),
+                              "migrationId"_attr = getMigrationUUID(),
+                              "completionStatus"_attr = status,
+                              "interruptStatus"_attr = _taskState.getInterruptStatus());
+                    }
+                    status = _taskState.getInterruptStatus();
+                }
+            }
+
             LOGV2(4878501,
                   "Tenant migration recipient instance: Data sync completed.",
                   "tenantId"_attr = getTenantId(),
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 4ad755393a9..d8c80112d04 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -135,6 +135,14 @@ public:
             _createOplogFetcherFn = std::move(createOplogFetcherFn);
         }
 
+        /**
+         * Stops the oplog applier without going through tenantForgetMigration.
+         */
+        void stopOplogApplier_forTest() {
+            stdx::lock_guard lk(_mutex);
+            _tenantOplogApplier->shutdown();
+        }
+
     private:
         friend class TenantMigrationRecipientServiceTest;
 
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index b7cfaf9dc79..1a0d38eef05 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -27,6 +27,8 @@
  *    it in the license file.
  */
 
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
 #include <boost/optional/optional_io.hpp>
 #include <memory>
 
@@ -42,7 +44,7 @@
 #include "mongo/db/repl/primary_only_service.h"
 #include "mongo/db/repl/primary_only_service_op_observer.h"
 #include "mongo/db/repl/replication_coordinator_mock.h"
-#include "mongo/db/repl/storage_interface_mock.h"
+#include "mongo/db/repl/storage_interface_impl.h"
 #include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h"
 #include "mongo/db/repl/tenant_migration_recipient_service.h"
 #include "mongo/db/repl/tenant_migration_state_machine_gen.h"
@@ -54,6 +56,7 @@
 #include "mongo/executor/network_interface.h"
 #include "mongo/executor/network_interface_factory.h"
 #include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/logv2/log.h"
 #include "mongo/rpc/metadata/egress_metadata_hook_list.h"
 #include "mongo/unittest/log_test.h"
 #include "mongo/unittest/unittest.h"
@@ -138,6 +141,10 @@ public:
         repl::setOplogCollectionName(serviceContext);
         repl::createOplog(opCtx.get());
 
+        // Need real (non-mock) storage for the oplog buffer.
+        StorageInterface::set(serviceContext, std::make_unique<StorageInterfaceImpl>());
+
         // Set up OpObserver so that repl::logOp() will store the oplog entry's optime in
        // ReplClientInfo.
         OpObserverRegistry* opObserverRegistry =
@@ -154,30 +161,6 @@ public:
         }
 
         stepUp();
 
-        auto storageInterfaceMock = std::make_unique<StorageInterfaceMock>();
-        storageInterfaceMock->createCollFn = [this](OperationContext* opCtx,
-                                                    const NamespaceString& nss,
-                                                    const CollectionOptions& options) -> Status {
-            this->_collCreated = true;
-            return Status::OK();
-        };
-        storageInterfaceMock->createIndexesOnEmptyCollFn =
-            [this](OperationContext* opCtx,
-                   const NamespaceString& nss,
-                   const std::vector<BSONObj>& secondaryIndexSpecs) -> Status {
-            this->_numSecondaryIndexesCreated += secondaryIndexSpecs.size();
-            return Status::OK();
-        };
-        storageInterfaceMock->insertDocumentsFn = [this](OperationContext* opCtx,
-                                                         const NamespaceStringOrUUID& nsOrUUID,
-                                                         const std::vector<InsertStatement>& ops) {
-            this->_numDocsInserted += ops.size();
-            return Status::OK();
-        };
-
-        // Set the mock storge interface for the tests.
-        StorageInterface::set(serviceContext, std::move(storageInterfaceMock));
-
         _service = _registry->lookupServiceByName(
             TenantMigrationRecipientService::kTenantMigrationRecipientServiceName);
         ASSERT(_service);
@@ -189,6 +172,8 @@ public:
         _registry->onShutdown();
         _service = nullptr;
 
+        StorageInterface::set(getServiceContext(), {});
+
         // Clearing the connection pool is necessary when doing tests which use the
         // ReplicaSetMonitor. See src/mongo/dbtests/mock/mock_replica_set.h for details.
         ScopedDbConnection::clearPool();
@@ -284,10 +269,6 @@ protected:
 private:
     unittest::MinimumLoggedSeverityGuard _replicationSeverityGuard{
         logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(1)};
-    StorageInterfaceMock::CreateCollectionFn _standardCreateCollectionFn;
-    StorageInterfaceMock::CreateIndexesOnEmptyCollectionFn
-        _standardCreateIndexesOnEmptyCollectionFn;
-    std::unique_ptr<StorageInterfaceMock> _storageInterface;
 };
 
@@ -351,7 +332,7 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P
 
     const UUID migrationUUID = UUID::gen();
 
-    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
 
     TenantMigrationRecipientDocument TenantMigrationRecipientInstance(
         migrationUUID,
@@ -399,7 +380,7 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_S
 
     const UUID migrationUUID = UUID::gen();
 
-    MockReplicaSet replSet("donorSet", 2, true /* hasPrimary */, true /*dollarPrefixHosts */);
+    MockReplicaSet replSet("donorSet", 2, true /* hasPrimary */, true /* dollarPrefixHosts */);
 
     TenantMigrationRecipientDocument TenantMigrationRecipientInstance(
         migrationUUID,
@@ -450,7 +431,7 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P
 
     const UUID migrationUUID = UUID::gen();
 
-    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
 
     // Primary is unavailable.
     replSet.kill(replSet.getHosts()[0].toString());
@@ -508,7 +489,7 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P
 
     const UUID migrationUUID = UUID::gen();
 
-    MockReplicaSet replSet("donorSet", 2, true /* hasPrimary */, true /*dollarPrefixHosts */);
+    MockReplicaSet replSet("donorSet", 2, true /* hasPrimary */, true /* dollarPrefixHosts */);
 
     // Primary is unavailable.
     replSet.kill(replSet.getHosts()[0].toString());
@@ -604,7 +585,7 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientGetStartOpTi
     const UUID migrationUUID = UUID::gen();
     const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
 
-    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
     insertTopOfOplog(&replSet, topOfOplogOpTime);
 
     TenantMigrationRecipientDocument initialStateDocument(
@@ -640,7 +621,7 @@ TEST_F(TenantMigrationRecipientServiceTest,
     const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
     const OpTime newTopOfOplogOpTime(Timestamp(6, 1), 1);
 
-    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
     insertTopOfOplog(&replSet, topOfOplogOpTime);
 
     TenantMigrationRecipientDocument initialStateDocument(
@@ -677,7 +658,7 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientGetStartOpTi
     const OpTime txnLastWriteOpTime(Timestamp(4, 1), 1);
     const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
 
-    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
    insertTopOfOplog(&replSet, topOfOplogOpTime);
     SessionTxnRecord lastTxn(makeLogicalSessionIdForTest(), 100, txnLastWriteOpTime, Date_t());
     lastTxn.setStartOpTime(txnStartOpTime);
@@ -720,7 +701,7 @@ TEST_F(TenantMigrationRecipientServiceTest,
     const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
     const OpTime newTopOfOplogOpTime(Timestamp(6, 1), 1);
 
-    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
     insertTopOfOplog(&replSet, topOfOplogOpTime);
     SessionTxnRecord lastTxn(makeLogicalSessionIdForTest(), 100, txnLastWriteOpTime, Date_t());
     lastTxn.setStartOpTime(txnStartOpTime);
@@ -760,7 +741,7 @@ TEST_F(TenantMigrationRecipientServiceTest,
 
     const UUID migrationUUID = UUID::gen();
 
-    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
 
     TenantMigrationRecipientDocument initialStateDocument(
         migrationUUID,
@@ -792,7 +773,7 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartOplogFe
     const UUID migrationUUID = UUID::gen();
     const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
 
-    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
     insertTopOfOplog(&replSet, topOfOplogOpTime);
 
     TenantMigrationRecipientDocument initialStateDocument(
@@ -832,12 +813,14 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartsCloner
                                        << "stop"));
 
     auto taskFp = globalFailPointRegistry().find("hangBeforeTaskCompletion");
+    auto taskFpGuard = makeGuard([&taskFp] { taskFp->setMode(FailPoint::off); });
+
     auto initialTimesEntered = taskFp->setMode(FailPoint::alwaysOn);
 
     const UUID migrationUUID = UUID::gen();
     const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
 
-    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
     insertTopOfOplog(&replSet, topOfOplogOpTime);
 
     // Skip the cloners in this test, so we provide an empty list of databases.
@@ -856,7 +839,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartsCloner
     std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
     OpTime cloneCompletionRecipientOpTime;
     {
-        FailPointEnableBlock fp("fpAfterRetrievingStartOpTimesMigrationRecipientInstance",
+        // This failpoint will stop us just before the cloner starts.
+        FailPointEnableBlock fp("fpAfterStartingOplogFetcherMigrationRecipientInstance",
                                 BSON("action"
                                      << "hang"));
         // Create and start the instance.
@@ -881,11 +865,163 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartsCloner
     ASSERT_EQ(cloneCompletionRecipientOpTime,
               getStateDoc(instance.get()).getCloneFinishedOpTime());
     checkStateDocPersisted(opCtx.get(), instance.get());
 
+    taskFpGuard.dismiss();
     taskFp->setMode(FailPoint::off);
 
     // Wait for task completion success.
     ASSERT_OK(instance->getCompletionFuture().getNoThrow());
 }
 
+TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherFailsDuringOplogApplication) {
+    const UUID migrationUUID = UUID::gen();
+    const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+    insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+    TenantMigrationRecipientDocument initialStateDocument(
+        migrationUUID,
+        replSet.getConnectionString(),
+        "tenantA",
+        ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+    // Setting these causes us to skip cloning.
+    initialStateDocument.setCloneFinishedOpTime(topOfOplogOpTime);
+    initialStateDocument.setDataConsistentStopOpTime(topOfOplogOpTime);
+
+    auto opCtx = makeOperationContext();
+    std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+    {
+        FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+        // Create and start the instance.
+        instance = TenantMigrationRecipientService::Instance::getOrCreate(
+            opCtx.get(), _service, initialStateDocument.toBSON());
+        ASSERT(instance.get());
+        instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+    }
+
+    LOGV2(4881201,
+          "Waiting for recipient service to reach consistent state",
+          "suite"_attr = _agent.getSuiteName(),
+          "test"_attr = _agent.getTestName());
+    instance->waitUntilMigrationReachesConsistentState(opCtx.get());
+
+    checkStateDocPersisted(opCtx.get(), instance.get());
+    // The oplog fetcher should exist and be running.
+    auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get()));
+    ASSERT_TRUE(oplogFetcher != nullptr);
+    ASSERT_TRUE(oplogFetcher->isActive());
+
+    // Kill it.
+    oplogFetcher->shutdownWith({ErrorCodes::Error(4881203), "Injected error"});
+
+    // Wait for task completion failure.
+    auto status = instance->getCompletionFuture().getNoThrow();
+    ASSERT_EQ(4881203, status.code());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, OplogApplierFails) {
+    const UUID migrationUUID = UUID::gen();
+    const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+    const OpTime injectedEntryOpTime(Timestamp(6, 1), 1);
+
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+    insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+    TenantMigrationRecipientDocument initialStateDocument(
+        migrationUUID,
+        replSet.getConnectionString(),
+        "tenantA",
+        ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+    // Setting these causes us to skip cloning.
+    initialStateDocument.setCloneFinishedOpTime(topOfOplogOpTime);
+    initialStateDocument.setDataConsistentStopOpTime(topOfOplogOpTime);
+
+    auto opCtx = makeOperationContext();
+    std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+    {
+        FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+        // Create and start the instance.
+        instance = TenantMigrationRecipientService::Instance::getOrCreate(
+            opCtx.get(), _service, initialStateDocument.toBSON());
+        ASSERT(instance.get());
+        instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+    }
+
+    LOGV2(4881208,
+          "Waiting for recipient service to reach consistent state",
+          "suite"_attr = _agent.getSuiteName(),
+          "test"_attr = _agent.getTestName());
+    instance->waitUntilMigrationReachesConsistentState(opCtx.get());
+
+    checkStateDocPersisted(opCtx.get(), instance.get());
+    // The oplog fetcher should exist and be running.
+    auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get()));
+    ASSERT_TRUE(oplogFetcher != nullptr);
+    ASSERT_TRUE(oplogFetcher->isActive());
+
+    // Send an oplog entry not from our tenant, which should cause the oplog applier to assert.
+    auto oplogEntry = makeOplogEntry(injectedEntryOpTime,
+                                     OpTypeEnum::kInsert,
+                                     NamespaceString("admin.bogus"),
+                                     UUID::gen(),
+                                     BSON("_id"
+                                          << "bad insert"),
+                                     boost::none /* o2 */);
+    oplogFetcher->receiveBatch(1LL, {oplogEntry.toBSON()});
+
+    // Wait for task completion failure.
+    ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, StoppingApplierAllowsCompletion) {
+    const UUID migrationUUID = UUID::gen();
+    const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+    insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+    TenantMigrationRecipientDocument initialStateDocument(
+        migrationUUID,
+        replSet.getConnectionString(),
+        "tenantA",
+        ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+    // Setting these causes us to skip cloning.
+    initialStateDocument.setCloneFinishedOpTime(topOfOplogOpTime);
+    initialStateDocument.setDataConsistentStopOpTime(topOfOplogOpTime);
+
+    auto opCtx = makeOperationContext();
+    std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+    {
+        FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+        // Create and start the instance.
+        instance = TenantMigrationRecipientService::Instance::getOrCreate(
+            opCtx.get(), _service, initialStateDocument.toBSON());
+        ASSERT(instance.get());
+        instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+    }
+
+    LOGV2(4881209,
+          "Waiting for recipient service to reach consistent state",
+          "suite"_attr = _agent.getSuiteName(),
+          "test"_attr = _agent.getTestName());
+    instance->waitUntilMigrationReachesConsistentState(opCtx.get());
+
+    checkStateDocPersisted(opCtx.get(), instance.get());
+    // The oplog fetcher should exist and be running.
+    auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get()));
+    ASSERT_TRUE(oplogFetcher != nullptr);
+    ASSERT_TRUE(oplogFetcher->isActive());
+
+    // Stop the oplog applier.
+    instance->stopOplogApplier_forTest();
+
+    // Wait for task completion success.  Since we're using a test function to cancel the applier,
+    // the actual result is not critical.
+    ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow());
+}
+
 }  // namespace repl
 }  // namespace mongo
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index cab5881cfdf..72c5294867f 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -93,7 +93,8 @@ SemiFuture<TenantOplogApplier::OpTimePair> TenantOplogApplier::getNotificationFo
         return SemiFuture<OpTimePair>::makeReady(_finalStatus);
     }
     // If this optime has already passed, just return a ready future.
-    if (_lastBatchCompletedOpTimes.donorOpTime >= donorOpTime) {
+    if (_lastBatchCompletedOpTimes.donorOpTime >= donorOpTime ||
+        _beginApplyingAfterOpTime >= donorOpTime) {
        return SemiFuture<OpTimePair>::makeReady(_lastBatchCompletedOpTimes);
    }
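
For reference, a minimal sketch of how the new OplogBufferCollection options fit together,
mirroring the recipient-service usage above. Illustrative only, not code from this commit; the
helper name and the collection-name prefix are placeholders:

    // Illustrative sketch, not part of the commit above.
    std::unique_ptr<OplogBufferCollection> makeMigrationOplogBuffer(OperationContext* opCtx,
                                                                    const UUID& migrationId) {
        OplogBufferCollection::Options options;
        options.dropCollectionAtStartup = false;   // keep any previously buffered entries
        options.dropCollectionAtShutdown = false;  // keep the buffer across restarts
        options.useTemporaryCollection = false;    // new option introduced in this change

        // Placeholder namespace; the real code uses kOplogBufferPrefix + migration UUID.
        NamespaceString nss(NamespaceString::kConfigDb,
                            "repl.migration.oplog_" + migrationId.toString());
        auto buffer = std::make_unique<OplogBufferCollection>(
            StorageInterface::get(opCtx), nss, options);

        // startup() now creates the collection if it does not already exist (NamespaceExists is
        // tolerated) and repopulates in-memory state from any existing documents.
        buffer->startup(opCtx);
        return buffer;
    }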