author     Matthew Russotto <matthew.russotto@mongodb.com>   2020-10-15 10:53:04 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-10-15 15:28:49 +0000
commit     d9e78a012398850fdfbe6fbf4fceda25d06a0f51 (patch)
tree       8ba3bf20c02ca5b7de8dced6fca9ac63bc2fe1f4
parent     3d37adb36aac3a16449fc42902c9a3689b581593 (diff)
SERVER-48812 Start oplog application in MigrationServiceInstance
-rw-r--r--  src/mongo/db/repl/abstract_async_component.cpp                  28
-rw-r--r--  src/mongo/db/repl/abstract_async_component.h                    11
-rw-r--r--  src/mongo/db/repl/oplog_buffer_collection.cpp                   10
-rw-r--r--  src/mongo/db/repl/oplog_buffer_collection.h                     10
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp       104
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.h           8
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service_test.cpp  218
-rw-r--r--  src/mongo/db/repl/tenant_oplog_applier.cpp                       3
8 files changed, 330 insertions(+), 62 deletions(-)
diff --git a/src/mongo/db/repl/abstract_async_component.cpp b/src/mongo/db/repl/abstract_async_component.cpp
index 77b086af97e..fb0995047ba 100644
--- a/src/mongo/db/repl/abstract_async_component.cpp
+++ b/src/mongo/db/repl/abstract_async_component.cpp
@@ -43,6 +43,19 @@ AbstractAsyncComponent::AbstractAsyncComponent(executor::TaskExecutor* executor,
uassert(ErrorCodes::BadValue, "task executor cannot be null", executor);
}
+AbstractAsyncComponent::~AbstractAsyncComponent() {
+ // This is required because it's illegal to destroy a promise that has not been set,
+ // which can happen if a component is created and then destroyed without being started.
+ // (e.g. if system shutdown happens between creation and start)
+ // Note we _cannot_ use the mutex here; it belongs to the subclass, which has already
+ // been destroyed.
+ if (_state != State::kComplete) {
+ _statePromise.setError(
+ {ErrorCodes::CallbackCanceled,
+ str::stream() << "Component " << _componentName << " destroyed when not finished"});
+ }
+}
+
executor::TaskExecutor* AbstractAsyncComponent::_getExecutor() {
return _executor;
}
@@ -116,8 +129,19 @@ void AbstractAsyncComponent::shutdown() noexcept {
}
void AbstractAsyncComponent::join() noexcept {
+ // Right now the only possible error value is due to destruction.
+ joinAsync().getNoThrow().ignore();
+}
+
+SemiFuture<void> AbstractAsyncComponent::joinAsync() noexcept {
stdx::unique_lock<Latch> lk(*_getMutex());
- _stateCondition.wait(lk, [this]() { return !_isActive_inlock(); });
+ // If the component has never been started or is already shut down, just return a ready future.
+ if (!_isActive_inlock()) {
+ return SemiFuture<void>::makeReady();
+ }
+
+ // Otherwise, we are running and the future will be set when shutdown is complete.
+ return _statePromise.getFuture().semi();
}
AbstractAsyncComponent::State AbstractAsyncComponent::getState_forTest() noexcept {
@@ -133,7 +157,7 @@ void AbstractAsyncComponent::_transitionToComplete() noexcept {
void AbstractAsyncComponent::_transitionToComplete_inlock() noexcept {
invariant(State::kComplete != _state);
_state = State::kComplete;
- _stateCondition.notify_all();
+ _statePromise.emplaceValue();
}
Status AbstractAsyncComponent::_checkForShutdownAndConvertStatus(
diff --git a/src/mongo/db/repl/abstract_async_component.h b/src/mongo/db/repl/abstract_async_component.h
index 5b0e6426900..084d21b9328 100644
--- a/src/mongo/db/repl/abstract_async_component.h
+++ b/src/mongo/db/repl/abstract_async_component.h
@@ -58,7 +58,7 @@ class AbstractAsyncComponent {
public:
AbstractAsyncComponent(executor::TaskExecutor* executor, const std::string& componentName);
- virtual ~AbstractAsyncComponent() = default;
+ virtual ~AbstractAsyncComponent();
/**
* Returns true if this component is currently running or in the process of shutting down.
@@ -85,6 +85,11 @@ public:
void join() noexcept;
/**
+ * Returns a future that will complete when the component becomes inactive.
+ */
+ SemiFuture<void> joinAsync() noexcept;
+
+ /**
* State transitions:
* PreStart --> Running --> ShuttingDown --> Complete
* It is possible to skip intermediate states. For example, calling shutdown() when the
@@ -216,6 +221,8 @@ private:
// (R) Read-only in concurrent operation; no synchronization required.
// (S) Self-synchronizing; access in any way from any context.
// (M) Reads and writes guarded by mutex returned by _getMutex().
+ // (W) Reads and calls to const methods need no synchronization;
+ //     writes and calls to non-const methods must hold the mutex.
// Task executor used to schedule tasks and remote commands.
executor::TaskExecutor* const _executor; // (R)
@@ -228,7 +235,7 @@ private:
State _state = State::kPreStart; // (M)
// Used by _transitionToComplete_inlock() to signal changes in '_state'.
- mutable stdx::condition_variable _stateCondition; // (S)
+ mutable SharedPromise<void> _statePromise; // (W)
};
/**
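For reference, the change above replaces the condition-variable wait with a SharedPromise so that shutdown completion can be awaited either synchronously or as a future. A minimal usage sketch, not part of this patch (the caller-side names `component` and `executor` are illustrative):

    // Blocking wait, as before:
    component->shutdown();
    component->join();         // returns once the shared promise is fulfilled

    // Non-blocking composition, newly possible via joinAsync():
    component->shutdown();
    component->joinAsync()     // ready immediately if the component never
        .thenRunOn(executor)   // started or has already completed
        .getAsync([](Status s) {
            // The only expected error today is CallbackCanceled, set by the
            // destructor when the component is destroyed before finishing.
        });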
diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp
index e6b9a97f643..86f25deec44 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection.cpp
@@ -96,6 +96,9 @@ void OplogBufferCollection::startup(OperationContext* opCtx) {
return;
}
+ // If the collection doesn't already exist, create it.
+ _createCollection(opCtx);
+
stdx::lock_guard<Latch> lk(_mutex);
// If we are starting from an existing collection, we must populate the in memory state of the
// buffer.
@@ -387,9 +390,12 @@ BSONObj OplogBufferCollection::_peek_inlock(OperationContext* opCtx, PeekMode pe
void OplogBufferCollection::_createCollection(OperationContext* opCtx) {
CollectionOptions options;
- options.temp = true;
+ options.temp = _options.useTemporaryCollection;
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- fassert(40154, _storageInterface->createCollection(opCtx, _nss, options));
+ auto status = _storageInterface->createCollection(opCtx, _nss, options);
+ if (status.code() == ErrorCodes::NamespaceExists)
+ return;
+ fassert(40154, status);
}
void OplogBufferCollection::_dropCollection(OperationContext* opCtx) {
diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h
index 30cee7fc9cf..2f50bfd288c 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.h
+++ b/src/mongo/db/repl/oplog_buffer_collection.h
@@ -43,8 +43,9 @@ namespace repl {
class StorageInterface;
/**
- * Oplog buffer backed by a temporary collection. This collection is created in startup() and
- * removed in shutdown(). The documents will be popped and peeked in timestamp order.
+ * Oplog buffer backed by a collection, which may optionally be temporary. The collection is
+ * created in startup() if it does not already exist, and optionally removed in shutdown(). The
+ * documents will be popped and peeked in timestamp order.
*/
class OplogBufferCollection : public RandomAccessOplogBuffer {
public:
@@ -56,11 +57,12 @@ public:
std::size_t peekCacheSize = 0;
bool dropCollectionAtStartup = true;
bool dropCollectionAtShutdown = true;
+ bool useTemporaryCollection = true;
Options() {}
};
/**
- * Returns default namespace for temporary collection used to hold data in oplog buffer.
+ * Returns default namespace for collection used to hold data in oplog buffer.
*/
static NamespaceString getDefaultNamespace();
@@ -129,7 +131,7 @@ public:
private:
/*
- * Creates a temporary collection with the _nss namespace.
+ * Creates an (optionally temporary) collection with the _nss namespace.
*/
void _createCollection(OperationContext* opCtx);
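Taken together, the new useTemporaryCollection option and the NamespaceExists tolerance in _createCollection() let a caller keep one durable buffer collection across restarts. A sketch of such a configuration, assuming `storageInterface`, `nss`, and `opCtx` are in scope; it mirrors how the tenant migration recipient configures its buffer later in this patch:

    OplogBufferCollection::Options options;
    options.dropCollectionAtStartup = false;   // reuse any previously buffered entries
    options.dropCollectionAtShutdown = false;  // keep entries for a later restart
    options.useTemporaryCollection = false;    // a temp collection would not survive anyway

    OplogBufferCollection buffer(storageInterface, nss, options);
    buffer.startup(opCtx);  // creates the collection only if it does not already exist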
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index f15073da8e3..9b7842ac673 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -74,6 +74,8 @@ MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogFetcherMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(setTenantMigrationRecipientInstanceHostTimeout);
MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingLastTxnMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(fpAfterCollectionClonerDone);
+MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogApplierMigrationRecipientInstance);
+MONGO_FAIL_POINT_DEFINE(fpAfterDataConsistentMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(hangBeforeTaskCompletion);
namespace {
@@ -421,12 +423,14 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
options.peekCacheSize = static_cast<size_t>(tenantMigrationOplogBufferPeekCacheSize);
options.dropCollectionAtStartup = false;
options.dropCollectionAtShutdown = false;
+ options.useTemporaryCollection = false;
NamespaceString oplogBufferNs(NamespaceString::kConfigDb,
kOplogBufferPrefix + getMigrationUUID().toString());
stdx::lock_guard lk(_mutex);
invariant(_stateDoc.getStartFetchingOpTime());
_donorOplogBuffer = std::make_unique<OplogBufferCollection>(
StorageInterface::get(opCtx.get()), oplogBufferNs, options);
+ _donorOplogBuffer->startup(opCtx.get());
_dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateTenantMigration>();
_donorOplogFetcher = (*_createOplogFetcherFn)(
(**_scopedExecutor).get(),
@@ -477,8 +481,27 @@ Status TenantMigrationRecipientService::Instance::_enqueueDocuments(
}
void TenantMigrationRecipientService::Instance::_oplogFetcherCallback(Status oplogFetcherStatus) {
- // TODO(SERVER-48812): Abort the migration unless the error is CallbackCanceled and
- // the migration has finished.
+ // The oplog fetcher is normally canceled when migration is done; any other error
+ // indicates failure.
+ if (oplogFetcherStatus.isOK()) {
+ // Oplog fetcher status of "OK" means the stopReplProducer failpoint is set. Migration
+ // cannot continue in this state so force a failure.
+ LOGV2_ERROR(
+ 4881205,
+ "Recipient migration service oplog fetcher stopped due to stopReplProducer failpoint",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID());
+ interrupt({ErrorCodes::Error(4881206),
+ "Recipient migration service oplog fetcher stopped due to stopReplProducer "
+ "failpoint"});
+ } else if (oplogFetcherStatus.code() != ErrorCodes::CallbackCanceled) {
+ LOGV2_ERROR(4881204,
+ "Recipient migration service oplog fetcher failed",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID(),
+ "error"_attr = oplogFetcherStatus);
+ interrupt(oplogFetcherStatus);
+ }
}
namespace {
@@ -608,6 +631,15 @@ void TenantMigrationRecipientService::Instance::_shutdownComponents(WithLock lk)
// interrupts running tenant oplog fetcher.
_oplogFetcherClient->shutdownAndDisallowReconnect();
}
+
+ if (_tenantOplogApplier) {
+ _tenantOplogApplier->shutdown();
+ }
+
+ if (_donorOplogBuffer) {
+ auto opCtx = cc().makeOperationContext();
+ _donorOplogBuffer->shutdown(opCtx.get());
+ }
}
void TenantMigrationRecipientService::Instance::interrupt(Status status) {
@@ -637,6 +669,12 @@ void TenantMigrationRecipientService::Instance::_cleanupOnTaskCompletion(Status
stdx::lock_guard lk(_mutex);
_shutdownComponents(lk);
+
+ if (_donorOplogFetcher) {
+ _donorOplogFetcher->shutdown();
+ _donorOplogFetcher->join();
+ }
+
if (_writerPool) {
_writerPool->join();
}
@@ -746,8 +784,22 @@ void TenantMigrationRecipientService::Instance::run(
_stopOrHangOnFailPoint(&fpAfterStartingOplogFetcherMigrationRecipientInstance);
stdx::lock_guard lk(_mutex);
- // Create the oplog applier and do not start now.
- // TODO SERVER-48812: Oplog application in MigrationServiceInstance.
+ // Create the oplog applier but do not start it yet.
+ invariant(_stateDoc.getStartApplyingOpTime());
+ LOGV2_DEBUG(4881202,
+ 1,
+ "Recipient migration service creating oplog applier",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID(),
+ "startApplyingOpTime"_attr = *_stateDoc.getStartApplyingOpTime());
+
+ _tenantOplogApplier =
+ std::make_unique<TenantOplogApplier>(_migrationUuid,
+ _tenantId,
+ *_stateDoc.getStartApplyingOpTime(),
+ _donorOplogBuffer.get(),
+ **_scopedExecutor,
+ _writerPool.get());
// Start the cloner.
auto clonerFuture = _startTenantAllDatabaseCloner(lk);
@@ -759,10 +811,16 @@ void TenantMigrationRecipientService::Instance::run(
.then([this] { return _onCloneSuccess(); })
.then([this] {
_stopOrHangOnFailPoint(&fpAfterCollectionClonerDone);
- // Start the oplog applier.
- // TODO SERVER-48812: Oplog application in MigrationServiceInstance
+ LOGV2_DEBUG(4881200,
+ 1,
+ "Recipient migration service starting oplog applier",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID());
+
+ uassertStatusOK(_tenantOplogApplier->startup());
+ _stopOrHangOnFailPoint(&fpAfterStartingOplogApplierMigrationRecipientInstance);
+ return _getDataConsistentFuture();
})
- .then([this] { return _getDataConsistentFuture(); })
.then([this] {
stdx::lock_guard lk(_mutex);
LOGV2_DEBUG(4881101,
@@ -775,10 +833,36 @@ void TenantMigrationRecipientService::Instance::run(
_dataConsistentPromise.emplaceValue(_stateDoc.getDataConsistentStopOpTime().get());
})
.then([this] {
- // wait for the oplog applier to complete/stop.
- // TODO SERVER-48812: Oplog application in MigrationServiceInstance
+ _stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance);
+ // Wait for the oplog applier to complete or stop.
+ // The oplog applier does not exit normally; it must be shut down externally,
+ // e.g. by recipientForgetMigration.
+ return _tenantOplogApplier->getNotificationForOpTime(OpTime::max());
})
- .getAsync([this](Status status) {
+ .getAsync([this](StatusOrStatusWith<TenantOplogApplier::OpTimePair> applierStatus) {
+ // We don't need the final optime from the oplog applier.
+ Status status = applierStatus.getStatus();
+ {
+ // If we were interrupted during oplog application, replace the oplog
+ // application status with the interrupt status.
+ stdx::lock_guard lk(_mutex);
+ if ((status.isOK() || ErrorCodes::isCancelationError(status)) &&
+ _taskState.isInterrupted()) {
+ // We get an "OK" result when the stopReplProducer failpoint is set. This also
+ // cancels the migration; we will have already logged it in
+ // _oplogFetcherCallback().
+ if (!status.isOK()) {
+ LOGV2(4881207,
+ "Migration completed with both error and interrupt",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID(),
+ "completionStatus"_attr = status,
+ "interruptStatus"_attr = _taskState.getInterruptStatus());
+ }
+ status = _taskState.getInterruptStatus();
+ }
+ }
+
LOGV2(4878501,
"Tenant migration recipient instance: Data sync completed.",
"tenantId"_attr = getTenantId(),
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 4ad755393a9..d8c80112d04 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -135,6 +135,14 @@ public:
_createOplogFetcherFn = std::move(createOplogFetcherFn);
}
+ /**
+ * Stops the oplog applier without going through tenantForgetMigration.
+ */
+ void stopOplogApplier_forTest() {
+ stdx::lock_guard lk(_mutex);
+ _tenantOplogApplier->shutdown();
+ }
+
private:
friend class TenantMigrationRecipientServiceTest;
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index b7cfaf9dc79..1a0d38eef05 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
#include <boost/optional/optional_io.hpp>
#include <memory>
@@ -42,7 +44,7 @@
#include "mongo/db/repl/primary_only_service.h"
#include "mongo/db/repl/primary_only_service_op_observer.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
-#include "mongo/db/repl/storage_interface_mock.h"
+#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h"
#include "mongo/db/repl/tenant_migration_recipient_service.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
@@ -54,6 +56,7 @@
#include "mongo/executor/network_interface.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/logv2/log.h"
#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
#include "mongo/unittest/log_test.h"
#include "mongo/unittest/unittest.h"
@@ -138,6 +141,10 @@ public:
repl::setOplogCollectionName(serviceContext);
repl::createOplog(opCtx.get());
+
+ // Need real (non-mock) storage for the oplog buffer.
+ StorageInterface::set(serviceContext, std::make_unique<StorageInterfaceImpl>());
+
// Set up OpObserver so that repl::logOp() will store the oplog entry's optime in
// ReplClientInfo.
OpObserverRegistry* opObserverRegistry =
@@ -154,30 +161,6 @@ public:
}
stepUp();
- auto storageInterfaceMock = std::make_unique<StorageInterfaceMock>();
- storageInterfaceMock->createCollFn = [this](OperationContext* opCtx,
- const NamespaceString& nss,
- const CollectionOptions& options) -> Status {
- this->_collCreated = true;
- return Status::OK();
- };
- storageInterfaceMock->createIndexesOnEmptyCollFn =
- [this](OperationContext* opCtx,
- const NamespaceString& nss,
- const std::vector<BSONObj>& secondaryIndexSpecs) -> Status {
- this->_numSecondaryIndexesCreated += secondaryIndexSpecs.size();
- return Status::OK();
- };
- storageInterfaceMock->insertDocumentsFn = [this](OperationContext* opCtx,
- const NamespaceStringOrUUID& nsOrUUID,
- const std::vector<InsertStatement>& ops) {
- this->_numDocsInserted += ops.size();
- return Status::OK();
- };
-
- // Set the mock storge interface for the tests.
- StorageInterface::set(serviceContext, std::move(storageInterfaceMock));
-
_service = _registry->lookupServiceByName(
TenantMigrationRecipientService::kTenantMigrationRecipientServiceName);
ASSERT(_service);
@@ -189,6 +172,8 @@ public:
_registry->onShutdown();
_service = nullptr;
+ StorageInterface::set(getServiceContext(), {});
+
// Clearing the connection pool is necessary when doing tests which use the
// ReplicaSetMonitor. See src/mongo/dbtests/mock/mock_replica_set.h for details.
ScopedDbConnection::clearPool();
@@ -284,10 +269,6 @@ protected:
private:
unittest::MinimumLoggedSeverityGuard _replicationSeverityGuard{
logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(1)};
- StorageInterfaceMock::CreateCollectionFn _standardCreateCollectionFn;
- StorageInterfaceMock::CreateIndexesOnEmptyCollectionFn
- _standardCreateIndexesOnEmptyCollectionFn;
- std::unique_ptr<StorageInterfaceMock> _storageInterface;
};
@@ -351,7 +332,7 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P
const UUID migrationUUID = UUID::gen();
- MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
TenantMigrationRecipientDocument TenantMigrationRecipientInstance(
migrationUUID,
@@ -399,7 +380,7 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_S
const UUID migrationUUID = UUID::gen();
- MockReplicaSet replSet("donorSet", 2, true /* hasPrimary */, true /*dollarPrefixHosts */);
+ MockReplicaSet replSet("donorSet", 2, true /* hasPrimary */, true /* dollarPrefixHosts */);
TenantMigrationRecipientDocument TenantMigrationRecipientInstance(
migrationUUID,
@@ -450,7 +431,7 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P
const UUID migrationUUID = UUID::gen();
- MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
// Primary is unavailable.
replSet.kill(replSet.getHosts()[0].toString());
@@ -508,7 +489,7 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P
const UUID migrationUUID = UUID::gen();
- MockReplicaSet replSet("donorSet", 2, true /* hasPrimary */, true /*dollarPrefixHosts */);
+ MockReplicaSet replSet("donorSet", 2, true /* hasPrimary */, true /* dollarPrefixHosts */);
// Primary is unavailable.
replSet.kill(replSet.getHosts()[0].toString());
@@ -604,7 +585,7 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientGetStartOpTi
const UUID migrationUUID = UUID::gen();
const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
- MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
insertTopOfOplog(&replSet, topOfOplogOpTime);
TenantMigrationRecipientDocument initialStateDocument(
@@ -640,7 +621,7 @@ TEST_F(TenantMigrationRecipientServiceTest,
const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
const OpTime newTopOfOplogOpTime(Timestamp(6, 1), 1);
- MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
insertTopOfOplog(&replSet, topOfOplogOpTime);
TenantMigrationRecipientDocument initialStateDocument(
@@ -677,7 +658,7 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientGetStartOpTi
const OpTime txnLastWriteOpTime(Timestamp(4, 1), 1);
const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
- MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
insertTopOfOplog(&replSet, topOfOplogOpTime);
SessionTxnRecord lastTxn(makeLogicalSessionIdForTest(), 100, txnLastWriteOpTime, Date_t());
lastTxn.setStartOpTime(txnStartOpTime);
@@ -720,7 +701,7 @@ TEST_F(TenantMigrationRecipientServiceTest,
const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
const OpTime newTopOfOplogOpTime(Timestamp(6, 1), 1);
- MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
insertTopOfOplog(&replSet, topOfOplogOpTime);
SessionTxnRecord lastTxn(makeLogicalSessionIdForTest(), 100, txnLastWriteOpTime, Date_t());
lastTxn.setStartOpTime(txnStartOpTime);
@@ -760,7 +741,7 @@ TEST_F(TenantMigrationRecipientServiceTest,
const UUID migrationUUID = UUID::gen();
- MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
TenantMigrationRecipientDocument initialStateDocument(
migrationUUID,
@@ -792,7 +773,7 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartOplogFe
const UUID migrationUUID = UUID::gen();
const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
- MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
insertTopOfOplog(&replSet, topOfOplogOpTime);
TenantMigrationRecipientDocument initialStateDocument(
@@ -832,12 +813,14 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartsCloner
<< "stop"));
auto taskFp = globalFailPointRegistry().find("hangBeforeTaskCompletion");
+ auto taskFpGuard = makeGuard([&taskFp] { taskFp->setMode(FailPoint::off); });
+
auto initialTimesEntered = taskFp->setMode(FailPoint::alwaysOn);
const UUID migrationUUID = UUID::gen();
const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
- MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
insertTopOfOplog(&replSet, topOfOplogOpTime);
// Skip the cloners in this test, so we provide an empty list of databases.
@@ -856,7 +839,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartsCloner
std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
OpTime cloneCompletionRecipientOpTime;
{
- FailPointEnableBlock fp("fpAfterRetrievingStartOpTimesMigrationRecipientInstance",
+ // This failpoint will stop us just before the cloner starts.
+ FailPointEnableBlock fp("fpAfterStartingOplogFetcherMigrationRecipientInstance",
BSON("action"
<< "hang"));
// Create and start the instance.
@@ -881,11 +865,163 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartsCloner
ASSERT_EQ(cloneCompletionRecipientOpTime, getStateDoc(instance.get()).getCloneFinishedOpTime());
checkStateDocPersisted(opCtx.get(), instance.get());
+ taskFpGuard.dismiss();
taskFp->setMode(FailPoint::off);
// Wait for task completion success.
ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
+TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherFailsDuringOplogApplication) {
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ // Setting these causes us to skip cloning.
+ initialStateDocument.setCloneFinishedOpTime(topOfOplogOpTime);
+ initialStateDocument.setDataConsistentStopOpTime(topOfOplogOpTime);
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+
+ LOGV2(4881201,
+ "Waiting for recipient service to reach consistent state",
+ "suite"_attr = _agent.getSuiteName(),
+ "test"_attr = _agent.getTestName());
+ instance->waitUntilMigrationReachesConsistentState(opCtx.get());
+
+ checkStateDocPersisted(opCtx.get(), instance.get());
+ // The oplog fetcher should exist and be running.
+ auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get()));
+ ASSERT_TRUE(oplogFetcher != nullptr);
+ ASSERT_TRUE(oplogFetcher->isActive());
+
+ // Kill it.
+ oplogFetcher->shutdownWith({ErrorCodes::Error(4881203), "Injected error"});
+
+ // Wait for task completion failure.
+ auto status = instance->getCompletionFuture().getNoThrow();
+ ASSERT_EQ(4881203, status.code());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, OplogApplierFails) {
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+ const OpTime injectedEntryOpTime(Timestamp(6, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ // Setting these causes us to skip cloning.
+ initialStateDocument.setCloneFinishedOpTime(topOfOplogOpTime);
+ initialStateDocument.setDataConsistentStopOpTime(topOfOplogOpTime);
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+
+ LOGV2(4881208,
+ "Waiting for recipient service to reach consistent state",
+ "suite"_attr = _agent.getSuiteName(),
+ "test"_attr = _agent.getTestName());
+ instance->waitUntilMigrationReachesConsistentState(opCtx.get());
+
+ checkStateDocPersisted(opCtx.get(), instance.get());
+ // The oplog fetcher should exist and be running.
+ auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get()));
+ ASSERT_TRUE(oplogFetcher != nullptr);
+ ASSERT_TRUE(oplogFetcher->isActive());
+
+ // Send an oplog entry not from our tenant, which should cause the oplog applier to assert.
+ auto oplogEntry = makeOplogEntry(injectedEntryOpTime,
+ OpTypeEnum::kInsert,
+ NamespaceString("admin.bogus"),
+ UUID::gen(),
+ BSON("_id"
+ << "bad insert"),
+ boost::none /* o2 */);
+ oplogFetcher->receiveBatch(1LL, {oplogEntry.toBSON()});
+
+ // Wait for task completion failure.
+ ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, StoppingApplierAllowsCompletion) {
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ // Setting these causes us to skip cloning.
+ initialStateDocument.setCloneFinishedOpTime(topOfOplogOpTime);
+ initialStateDocument.setDataConsistentStopOpTime(topOfOplogOpTime);
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+
+ LOGV2(4881209,
+ "Waiting for recipient service to reach consistent state",
+ "suite"_attr = _agent.getSuiteName(),
+ "test"_attr = _agent.getTestName());
+ instance->waitUntilMigrationReachesConsistentState(opCtx.get());
+
+ checkStateDocPersisted(opCtx.get(), instance.get());
+ // The oplog fetcher should exist and be running.
+ auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get()));
+ ASSERT_TRUE(oplogFetcher != nullptr);
+ ASSERT_TRUE(oplogFetcher->isActive());
+
+ // Stop the oplog applier.
+ instance->stopOplogApplier_forTest();
+
+ // Wait for task completion. Since we're using a test function to cancel the applier,
+ // the task fails with an error whose specific value is not critical.
+ ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow());
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index cab5881cfdf..72c5294867f 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -93,7 +93,8 @@ SemiFuture<TenantOplogApplier::OpTimePair> TenantOplogApplier::getNotificationFo
return SemiFuture<OpTimePair>::makeReady(_finalStatus);
}
// If this optime has already passed, just return a ready future.
- if (_lastBatchCompletedOpTimes.donorOpTime >= donorOpTime) {
+ if (_lastBatchCompletedOpTimes.donorOpTime >= donorOpTime ||
+ _beginApplyingAfterOpTime >= donorOpTime) {
return SemiFuture<OpTimePair>::makeReady(_lastBatchCompletedOpTimes);
}