author    Matthew Russotto <matthew.russotto@mongodb.com>  2020-09-16 09:53:31 -0400
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-09-29 14:59:45 +0000
commit    3237a1428c6b07cb8019bbb8d47301a680667033 (patch)
tree      d8301dfbe7e505f00013a55a11e2889119807a93
parent    cd14c4aad38bae7fa0555f13ee3e9538d850d719 (diff)
SERVER-48808 Oplog Fetching in MigrationServiceInstance
-rw-r--r--  src/mongo/db/repl/SConscript                                   |   4
-rw-r--r--  src/mongo/db/repl/cloner_utils.cpp                             |   8
-rw-r--r--  src/mongo/db/repl/cloner_utils.h                               |   7
-rw-r--r--  src/mongo/db/repl/oplog_fetcher.cpp                            |  25
-rw-r--r--  src/mongo/db/repl/oplog_fetcher.h                              |   5
-rw-r--r--  src/mongo/db/repl/repl_server_parameters.idl                   |  21
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp      | 166
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.h        |  45
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service_test.cpp |  41
9 files changed, 312 insertions(+), 10 deletions(-)
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index bff884e7844..cd279da1fbf 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1293,7 +1293,11 @@ env.Library(
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/client/clientdriver_network',
'$BUILD_DIR/mongo/db/transaction',
+ 'cloner_utils',
+ 'oplog_buffer_collection',
'oplog_entry',
+ 'oplog_fetcher',
+ 'repl_server_parameters',
'tenant_migration_state_machine_idl',
]
)
diff --git a/src/mongo/db/repl/cloner_utils.cpp b/src/mongo/db/repl/cloner_utils.cpp
index ab63050d669..7f061a4f1ff 100644
--- a/src/mongo/db/repl/cloner_utils.cpp
+++ b/src/mongo/db/repl/cloner_utils.cpp
@@ -37,9 +37,13 @@
namespace mongo {
namespace repl {
+BSONObj ClonerUtils::makeTenantDatabaseRegex(StringData prefix) {
+ return BSON("$regexp"
+ << "^" + prefix + "_");
+}
+
BSONObj ClonerUtils::makeTenantDatabaseFilter(StringData prefix) {
- return BSON("name" << BSON("$regexp"
- << "^" + prefix + "_"));
+ return BSON("name" << makeTenantDatabaseRegex(prefix));
}
BSONObj ClonerUtils::buildMajorityWaitRequest(Timestamp operationTime) {
diff --git a/src/mongo/db/repl/cloner_utils.h b/src/mongo/db/repl/cloner_utils.h
index b9acd75b2ee..8623b2fdd1f 100644
--- a/src/mongo/db/repl/cloner_utils.h
+++ b/src/mongo/db/repl/cloner_utils.h
@@ -47,6 +47,11 @@ class ClonerUtils {
public:
/**
+ * Builds a regex that matches database names prefixed with a specific tenantId.
+ */
+ static BSONObj makeTenantDatabaseRegex(StringData prefix);
+
+ /**
* Builds a filter that matches database names prefixed with a specific tenantId.
*/
static BSONObj makeTenantDatabaseFilter(StringData prefix);
@@ -64,4 +69,4 @@ public:
} // namespace repl
-} // namespace mongo
\ No newline at end of file
+} // namespace mongo
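For context, a minimal sketch of what the two helpers above produce (illustrative only; the tenant id "tenantA" and the wrapper function are assumptions, not part of this patch):

#include "mongo/bson/bsonobj.h"
#include "mongo/db/repl/cloner_utils.h"

// Sketch: given the implementation above, for the hypothetical prefix "tenantA"
//   makeTenantDatabaseRegex("tenantA")  builds { "$regexp": "^tenantA_" }
//   makeTenantDatabaseFilter("tenantA") builds { "name": { "$regexp": "^tenantA_" } }
void clonerFilterExample() {
    mongo::BSONObj regex = mongo::repl::ClonerUtils::makeTenantDatabaseRegex("tenantA");
    mongo::BSONObj filter = mongo::repl::ClonerUtils::makeTenantDatabaseFilter("tenantA");
    // The regex form is what _getOplogFetcherFilter() later embeds under "ns"; the filter
    // form matches database names, e.g. in a listDatabases response.
}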
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index e9ec434ffd4..d9f3db55ae7 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -216,6 +216,12 @@ OplogFetcher::~OplogFetcher() {
join();
}
+void OplogFetcher::setConnection(std::unique_ptr<DBClientConnection>&& _connectedClient) {
+ // Can only call this once, before startup.
+ invariant(!_conn);
+ _conn = std::move(_connectedClient);
+}
+
Status OplogFetcher::_doStartup_inlock() noexcept {
return _scheduleWorkAndSaveHandle_inlock(
[this](const executor::TaskExecutor::CallbackArgs& args) {
@@ -345,18 +351,25 @@ void OplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& callbac
return;
}
+ bool hadExistingConnection = true;
{
stdx::lock_guard<Latch> lock(_mutex);
- _conn = _createClientFn();
+ if (!_conn) {
+ _conn = _createClientFn();
+ hadExistingConnection = false;
+ }
}
hangAfterOplogFetcherCallbackScheduled.pauseWhileSet();
- auto connectStatus = _connect();
- // Error out if we failed to connect after exhausting the allowed retry attempts.
- if (!connectStatus.isOK()) {
- _finishCallback(connectStatus);
- return;
+ if (!hadExistingConnection) {
+ auto connectStatus = _connect();
+
+ // Error out if we failed to connect after exhausting the allowed retry attempts.
+ if (!connectStatus.isOK()) {
+ _finishCallback(connectStatus);
+ return;
+ }
}
_setMetadataWriterAndReader();
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index f8f80c34d76..459b2c42fe2 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -196,6 +196,11 @@ public:
Timestamp lastTS,
StartingPoint startingPoint = StartingPoint::kSkipFirstDoc);
+ /**
+ * Allows the OplogFetcher to use an already-established connection from the caller. Ownership
+ * of the connection is taken by the OplogFetcher. Must be called before startup.
+ */
+ void setConnection(std::unique_ptr<DBClientConnection>&& _connectedClient);
/**
* Prints out the status and settings of the oplog fetcher.
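The new setConnection() hook pairs with the _runQuery() change above: when a connection is already present, the fetcher skips its own connect step. A minimal usage sketch (assumptions: the fetcher was constructed with the usual arguments and the caller has already connected the client, as the tenant migration recipient does with its dedicated _oplogFetcherClient; this is not code from the patch):

#include <memory>

#include "mongo/client/dbclient_connection.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/util/assert_util.h"

// Sketch only: hand an already-established connection to the fetcher.
void reuseCallerConnection(mongo::repl::OplogFetcher* fetcher,
                           std::unique_ptr<mongo::DBClientConnection> conn) {
    fetcher->setConnection(std::move(conn));     // must precede startup(); asserts no prior connection
    mongo::uassertStatusOK(fetcher->startup());  // _runQuery() sees _conn set and skips _connect()
}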
diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl
index 4153ad4ef0e..289bf286c9a 100644
--- a/src/mongo/db/repl/repl_server_parameters.idl
+++ b/src/mongo/db/repl/repl_server_parameters.idl
@@ -342,3 +342,24 @@ server_parameters:
cpp_varname: tenantMigrationGarbageCollectionDelayMS
default:
expr: 48 * 60 * 60 * 1000
+
+ tenantMigrationOplogBufferPeekCacheSize:
+ description: >-
+ Set this to specify the size of the read-ahead buffer in the OplogBufferCollection for
+ tenant migrations.
+ set_at: startup
+ cpp_vartype: int
+ cpp_varname: tenantMigrationOplogBufferPeekCacheSize
+ default: 10000
+
+ tenantMigrationOplogFetcherBatchSize:
+ description: >-
+ The batchSize to use for the find/getMore queries called by the OplogFetcher for
+ tenant migrations.
+ set_at: startup
+ cpp_vartype: int
+ cpp_varname: tenantMigrationOplogFetcherBatchSize
+ # 16MB max batch size / 12 byte min doc size * 10 (for good measure) =
+ # defaultBatchSize to use.
+ default:
+ expr: (16 * 1024 * 1024) / 12 * 10
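For reference, with integer arithmetic the default expression above evaluates to (16 * 1024 * 1024) / 12 * 10 = 1,398,101 * 10 = 13,981,010, i.e. roughly ten times the number of minimum-size (12-byte) documents that fit in a single 16 MB batch.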
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 560ad7ba86e..ba7916eeda2 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -39,9 +39,13 @@
#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/cloner_utils.h"
+#include "mongo/db/repl/data_replicator_external_state.h"
+#include "mongo/db/repl/oplog_buffer_collection.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h"
#include "mongo/db/repl/tenant_migration_recipient_service.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
@@ -53,15 +57,89 @@
namespace mongo {
namespace repl {
+namespace {
+constexpr StringData kOplogBufferPrefix = "repl.migration.oplog_"_sd;
+} // namespace
+
+// A convenient place to set test-specific parameters.
+MONGO_FAIL_POINT_DEFINE(pauseBeforeRunTenantMigrationRecipientInstance);
// Fails before waiting for the state doc to be majority replicated.
MONGO_FAIL_POINT_DEFINE(failWhilePersistingTenantMigrationRecipientInstanceStateDoc);
MONGO_FAIL_POINT_DEFINE(stopAfterPersistingTenantMigrationRecipientInstanceStateDoc);
MONGO_FAIL_POINT_DEFINE(stopAfterConnectingTenantMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(stopAfterRetrievingStartOpTimesMigrationRecipientInstance);
+MONGO_FAIL_POINT_DEFINE(stopAfterStartingOplogFetcherMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(setTenantMigrationRecipientInstanceHostTimeout);
MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingLastTxnMigrationRecipientInstance);
+namespace {
+// We never restart just the oplog fetcher. If a failure occurs, we restart the whole state machine
+// and recover from there. So the restart decision is always "no".
+class OplogFetcherRestartDecisionTenantMigration
+ : public OplogFetcher::OplogFetcherRestartDecision {
+public:
+ ~OplogFetcherRestartDecisionTenantMigration(){};
+ bool shouldContinue(OplogFetcher* fetcher, Status status) final {
+ return false;
+ }
+ void fetchSuccessful(OplogFetcher* fetcher) final {}
+};
+
+// The oplog fetcher requires some of the methods in DataReplicatorExternalState to operate.
+class DataReplicatorExternalStateTenantMigration : public DataReplicatorExternalState {
+public:
+ // The oplog fetcher is passed its executor directly and does not use the one from the
+ // DataReplicatorExternalState.
+ executor::TaskExecutor* getTaskExecutor() const final {
+ MONGO_UNREACHABLE;
+ }
+ std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const final {
+ MONGO_UNREACHABLE;
+ }
+
+ // The oplog fetcher uses the current term and opTime to inform the sync source of term changes.
+ // As the term on the donor and the term on the recipient have nothing to do with each other,
+ // we do not want to do that.
+ OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() final {
+ return {OpTime::kUninitializedTerm, OpTime()};
+ }
+
+ // Tenant migration does not require the metadata from the oplog query.
+ void processMetadata(const rpc::ReplSetMetadata& replMetadata,
+ rpc::OplogQueryMetadata oqMetadata) final {}
+
+ // Tenant migration does not change sync source depending on metadata.
+ ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source,
+ const rpc::ReplSetMetadata& replMetadata,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) final {
+ return ChangeSyncSourceAction::kContinueSyncing;
+ }
+
+ // The oplog fetcher should never call the rest of the methods.
+ std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* opCtx) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ std::unique_ptr<OplogApplier> makeOplogApplier(
+ OplogBuffer* oplogBuffer,
+ OplogApplier::Observer* observer,
+ ReplicationConsistencyMarkers* consistencyMarkers,
+ StorageInterface* storageInterface,
+ const OplogApplier::Options& options,
+ ThreadPool* writerPool) final {
+ MONGO_UNREACHABLE;
+ };
+
+ virtual StatusWith<ReplSetConfig> getCurrentConfig() const final {
+ MONGO_UNREACHABLE;
+ }
+};
+
+
+} // namespace
TenantMigrationRecipientService::TenantMigrationRecipientService(ServiceContext* serviceContext)
: PrimaryOnlyService(serviceContext) {}
@@ -292,6 +370,72 @@ void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLo
_stateDoc.setStartFetchingOpTime(startFetchingOpTime);
}
+void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
+ auto opCtx = cc().makeOperationContext();
+ OplogBufferCollection::Options options;
+ options.peekCacheSize = static_cast<size_t>(tenantMigrationOplogBufferPeekCacheSize);
+ options.dropCollectionAtStartup = false;
+ options.dropCollectionAtShutdown = false;
+ NamespaceString oplogBufferNs(NamespaceString::kConfigDb,
+ kOplogBufferPrefix + getMigrationUUID().toString());
+ stdx::lock_guard lk(_mutex);
+ invariant(_stateDoc.getStartFetchingOpTime());
+ _donorOplogBuffer = std::make_unique<OplogBufferCollection>(
+ StorageInterface::get(opCtx.get()), oplogBufferNs, options);
+ _dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateTenantMigration>();
+ _donorOplogFetcher = (*_createOplogFetcherFn)(
+ (**_scopedExecutor).get(),
+ *_stateDoc.getStartFetchingOpTime(),
+ _oplogFetcherClient->getServerHostAndPort(),
+ // The config is only used for setting the awaitData timeout; the defaults are fine.
+ ReplSetConfig::parse(BSON("_id"
+ << "dummy"
+ << "version" << 1 << "members" << BSONObj())),
+ std::make_unique<OplogFetcherRestartDecisionTenantMigration>(),
+ // We do not need to check the rollback ID.
+ ReplicationProcess::kUninitializedRollbackId,
+ false /* requireFresherSyncSource */,
+ _dataReplicatorExternalState.get(),
+ [this](OplogFetcher::Documents::const_iterator first,
+ OplogFetcher::Documents::const_iterator last,
+ const OplogFetcher::DocumentsInfo& info) {
+ return _enqueueDocuments(first, last, info);
+ },
+ [this](const Status& s, int rbid) { _oplogFetcherCallback(s); },
+ tenantMigrationOplogFetcherBatchSize,
+ OplogFetcher::StartingPoint::kEnqueueFirstDoc,
+ _getOplogFetcherFilter(),
+ ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern),
+ "TenantOplogFetcher_" + getTenantId() + "_" + getMigrationUUID().toString());
+ _donorOplogFetcher->setConnection(std::move(_oplogFetcherClient));
+ uassertStatusOK(_donorOplogFetcher->startup());
+}
+
+Status TenantMigrationRecipientService::Instance::_enqueueDocuments(
+ OplogFetcher::Documents::const_iterator begin,
+ OplogFetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info) {
+
+ invariant(_donorOplogBuffer);
+
+ if (info.toApplyDocumentCount == 0)
+ return Status::OK();
+
+ auto opCtx = cc().makeOperationContext();
+ // Wait for enough space.
+ _donorOplogBuffer->waitForSpace(opCtx.get(), info.toApplyDocumentBytes);
+
+ // Buffer docs for later application.
+ _donorOplogBuffer->push(opCtx.get(), begin, end);
+
+ return Status::OK();
+}
+
+void TenantMigrationRecipientService::Instance::_oplogFetcherCallback(Status oplogFetcherStatus) {
+ // TODO(SERVER-48812): Abort the migration unless the error is CallbackCanceled and
+ // the migration has finished.
+}
+
namespace {
constexpr std::int32_t stopFailPointErrorCode = 4880402;
void stopOnFailPoint(FailPoint* fp) {
@@ -309,9 +453,25 @@ void TenantMigrationRecipientService::Instance::interrupt(Status status) {
}
}
+BSONObj TenantMigrationRecipientService::Instance::_getOplogFetcherFilter() const {
+ // Either the namespace belongs to the tenant, or it's an applyOps in the admin namespace
+ // and the first operation belongs to the tenant. A transaction with mixed tenant/non-tenant
+ // operations should not be possible and will fail in the TenantOplogApplier.
+ //
+ // Commit of prepared transactions is not handled here; we'd need to handle them in the applier
+ // by allowing all commits through here and ignoring those not corresponding to active
+ // transactions.
+ BSONObj namespaceRegex = ClonerUtils::makeTenantDatabaseRegex(getTenantId());
+ return BSON("$or" << BSON_ARRAY(BSON("ns" << namespaceRegex)
+ << BSON("ns"
+ << "admin.$cmd"
+ << "o.applyOps.0.ns" << namespaceRegex)));
+}
+
void TenantMigrationRecipientService::Instance::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
_scopedExecutor = executor;
+ pauseBeforeRunTenantMigrationRecipientInstance.pauseWhileSet();
ExecutorFuture(**executor)
.then([this]() { return _initializeStateDoc(); })
.then([this] {
@@ -338,8 +498,12 @@ void TenantMigrationRecipientService::Instance::run(
})
.then([this] {
stopOnFailPoint(&stopAfterRetrievingStartOpTimesMigrationRecipientInstance);
+ _startOplogFetcher();
+ })
+ .then([this] {
+ stopOnFailPoint(&stopAfterStartingOplogFetcherMigrationRecipientInstance);
// TODO SERVER-48808: Run cloners in MigrationServiceInstance
- // TODO SERVER-48811: Oplog fetching in MigrationServiceInstance
+ // TODO SERVER-48811: Oplog application in MigrationServiceInstance
})
.getAsync([this](Status status) {
LOGV2(4878501,
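To make the new filter concrete: for a hypothetical tenant id "tenantA", _getOplogFetcherFilter() above builds a predicate equivalent to

    { $or: [ { ns: { $regexp: "^tenantA_" } },
             { ns: "admin.$cmd", "o.applyOps.0.ns": { $regexp: "^tenantA_" } } ] }

so the fetcher pulls plain tenant-namespace entries plus admin.$cmd applyOps entries whose first inner operation targets a tenant namespace. The fetched entries are buffered in an OplogBufferCollection whose namespace is config.repl.migration.oplog_<migrationUUID> (kConfigDb plus the kOplogBufferPrefix defined above).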
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 2b3f1a0777b..debea0f5ba5 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -32,8 +32,10 @@
#include <boost/optional.hpp>
#include <memory>
+#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/primary_only_service.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
namespace mongo {
@@ -43,6 +45,7 @@ class ReplicaSetMonitor;
class ServiceContext;
namespace repl {
+class OplogBufferCollection;
/**
* TenantMigrationRecipientService is a primary only service to handle
@@ -104,6 +107,14 @@ public:
*/
const std::string& getTenantId() const;
+ /*
+ * Set the oplog fetcher creator functor, to allow use of a mock oplog fetcher.
+ */
+ void setCreateOplogFetcherFn_forTest(
+ std::unique_ptr<OplogFetcherFactory>&& createOplogFetcherFn) {
+ _createOplogFetcherFn = std::move(createOplogFetcherFn);
+ }
+
private:
friend class TenantMigrationRecipientServiceTest;
@@ -135,6 +146,33 @@ public:
*/
void _getStartOpTimesFromDonor(WithLock);
+ /**
+ * Pushes documents from oplog fetcher to oplog buffer.
+ *
+ * Returns a status even though it always returns OK, to conform to the interface OplogFetcher
+ * expects for the EnqueueDocumentsFn.
+ */
+ Status _enqueueDocuments(OplogFetcher::Documents::const_iterator begin,
+ OplogFetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info);
+
+ /**
+ * Starts the tenant oplog fetcher.
+ */
+
+ void _startOplogFetcher();
+
+ /**
+ * Called when the oplog fetcher finishes. Usually the oplog fetcher finishes only when
+ * cancelled or on error.
+ */
+ void _oplogFetcherCallback(Status oplogFetcherStatus);
+
+ /**
+ * Returns the filter used to get only oplog documents related to the appropriate tenant.
+ */
+ BSONObj _getOplogFetcherFilter() const;
+
std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor;
// Protects below non-const data members.
@@ -160,6 +198,13 @@ public:
// optimes while the '_oplogFetcherClient' will be reserved for the oplog fetcher only.
std::unique_ptr<DBClientConnection> _client;
std::unique_ptr<DBClientConnection> _oplogFetcherClient;
+
+
+ std::unique_ptr<OplogFetcherFactory> _createOplogFetcherFn =
+ std::make_unique<CreateOplogFetcherFn>();
+ std::unique_ptr<OplogBufferCollection> _donorOplogBuffer;
+ std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState;
+ std::unique_ptr<OplogFetcher> _donorOplogFetcher;
};
};
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index f3f1b517e2b..e83e7549258 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/op_observer_impl.h"
#include "mongo/db/op_observer_registry.h"
#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_fetcher_mock.h"
#include "mongo/db/repl/primary_only_service.h"
#include "mongo/db/repl/primary_only_service_op_observer.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
@@ -213,6 +214,11 @@ protected:
return instance->_oplogFetcherClient.get();
}
+ OplogFetcher* getDonorOplogFetcher(
+ const TenantMigrationRecipientService::Instance* instance) const {
+ return instance->_donorOplogFetcher.get();
+ }
+
const TenantMigrationRecipientDocument& getStateDoc(
const TenantMigrationRecipientService::Instance* instance) const {
return instance->_stateDoc;
@@ -652,5 +658,40 @@ TEST_F(TenantMigrationRecipientServiceTest,
checkStateDocPersisted(opCtx.get(), instance.get());
}
+TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartOplogFetcher) {
+ FailPointEnableBlock fp("stopAfterStartingOplogFetcherMigrationRecipientInstance");
+
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+
+ // Wait for task completion success.
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+ checkStateDocPersisted(opCtx.get(), instance.get());
+ // The oplog fetcher should exist and be running.
+ auto oplogFetcher = getDonorOplogFetcher(instance.get());
+ ASSERT_TRUE(oplogFetcher != nullptr);
+ ASSERT_TRUE(oplogFetcher->isActive());
+}
+
} // namespace repl
} // namespace mongo