author     Jason Chan <jason.chan@mongodb.com>    2021-01-08 00:35:31 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-01-26 21:37:22 +0000
commit     3b87ecca61a77614d03f01a36a6ea4e155917ff0 (patch)
tree       f77f3c04a0df2ffe50b9564573bfbfadd522ef24
parent     66f0361649bc5239460a6764cacf6c448fe190e9 (diff)
download   mongo-3b87ecca61a77614d03f01a36a6ea4e155917ff0.tar.gz
SERVER-52723 Handle oplog application restart in TenantMigrationRecipientService
-rw-r--r--  jstests/replsets/tenant_migration_resume_oplog_application.js  | 108
-rw-r--r--  src/mongo/db/repl/SConscript                                    |   1
-rw-r--r--  src/mongo/db/repl/oplog_fetcher.cpp                             |   5
-rw-r--r--  src/mongo/db/repl/oplog_fetcher.h                               |   5
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp        | 113
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.h          |  10
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service_test.cpp   | 664
-rw-r--r--  src/mongo/db/repl/tenant_oplog_applier.cpp                      |  31
-rw-r--r--  src/mongo/db/repl/tenant_oplog_applier.h                        |   9
-rw-r--r--  src/mongo/db/repl/tenant_oplog_batcher.cpp                      |  27
-rw-r--r--  src/mongo/db/repl/tenant_oplog_batcher.h                        |   4
-rw-r--r--  src/mongo/db/repl/tenant_oplog_batcher_test.cpp                 |  82
12 files changed, 1011 insertions(+), 48 deletions(-)
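
This patch makes a restarted TenantMigrationRecipientService instance resume where the previous primary left off instead of starting over: the oplog fetcher restarts from the newest entry already persisted in the donor oplog buffer (and skips it so it is not enqueued twice), the oplog applier restarts from the newest no-op entry this migration has already written to the recipient's local oplog, and the tenant oplog batcher seeks past a resume timestamp before producing batches. As a rough orientation for the hunks below, the decision logic can be sketched as follows; OpTime, StartingPoint, ResumePlan, and planResume here are simplified stand-ins for illustration, not the real MongoDB types or code.

#include <optional>

struct OpTime {
    long long ts = 0;      // stand-in for Timestamp
    long long term = 0;
    bool operator>(const OpTime& other) const {
        return ts != other.ts ? ts > other.ts : term > other.term;
    }
};

enum class StartingPoint { kEnqueueFirstDoc, kSkipFirstDoc };

struct ResumePlan {
    OpTime startFetch;            // where the oplog fetcher restarts
    StartingPoint startingPoint;  // skip the first doc if it is already buffered
    OpTime beginApplyingAfter;    // where the oplog applier restarts
    bool applierIsResuming;       // whether the batcher must seek before batching
};

// cloneCompleted: the state doc has cloneFinishedRecipientOpTime set.
// topOfOplogBuffer: last entry already persisted in the donor oplog buffer, if any.
// lastAppliedMigrationNoOp: donor optime of the newest no-op this migration already applied, if any.
ResumePlan planResume(bool cloneCompleted,
                      OpTime startFetchingDonorOpTime,
                      OpTime startApplyingDonorOpTime,
                      std::optional<OpTime> topOfOplogBuffer,
                      std::optional<OpTime> lastAppliedMigrationNoOp) {
    ResumePlan plan{startFetchingDonorOpTime,
                    StartingPoint::kEnqueueFirstDoc,
                    startApplyingDonorOpTime,
                    false};
    if (!cloneCompleted) {
        return plan;  // first attempt: start from the state document values
    }
    if (topOfOplogBuffer) {
        // Restart fetching at the newest buffered entry, but skip it so it is
        // not enqueued a second time.
        plan.startFetch = *topOfOplogBuffer;
        plan.startingPoint = StartingPoint::kSkipFirstDoc;
    }
    if (lastAppliedMigrationNoOp && *lastAppliedMigrationNoOp > startApplyingDonorOpTime) {
        // Resume applying strictly after the newest no-op this migration already wrote.
        plan.beginApplyingAfter = *lastAppliedMigrationNoOp;
        plan.applierIsResuming = true;
    }
    return plan;
}

On a first attempt, when cloneFinishedRecipientOpTime is not yet set in the state document, all of this collapses to the previous behavior of starting from the state document's optimes.
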
diff --git a/jstests/replsets/tenant_migration_resume_oplog_application.js b/jstests/replsets/tenant_migration_resume_oplog_application.js
new file mode 100644
index 00000000000..bff439365bd
--- /dev/null
+++ b/jstests/replsets/tenant_migration_resume_oplog_application.js
@@ -0,0 +1,108 @@
+/**
+ * Tests that in a tenant migration, the recipient primary will resume oplog application on
+ * failover.
+ * @tags: [requires_majority_read_concern, requires_fcv_49]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/uuid_util.js"); // for 'extractUUIDFromObject'
+load("jstests/libs/parallelTester.js"); // for 'Thread'
+load("jstests/libs/write_concern_util.js"); // for 'stopReplicationOnSecondaries'
+load("jstests/aggregation/extras/utils.js"); // For assertArrayEq.
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+
+const recipientRst = new ReplSetTest({
+ nodes: 3,
+ name: jsTestName() + "_recipient",
+ // Use a batch size of 2 so that we can hang in the middle of tenant oplog application.
+ nodeOptions: Object.assign(TenantMigrationUtil.makeX509OptionsForTest().recipient,
+ {setParameter: {tenantApplierBatchSizeOps: 2}})
+});
+
+recipientRst.startSet();
+recipientRst.initiate();
+if (!TenantMigrationUtil.isFeatureFlagEnabled(recipientRst.getPrimary())) {
+ jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
+ recipientRst.stopSet();
+ return;
+}
+const tenantMigrationTest =
+ new TenantMigrationTest({name: jsTestName(), recipientRst: recipientRst});
+
+const tenantId = "testTenantId";
+const dbName = tenantMigrationTest.tenantDB(tenantId, "testDB");
+const collName = "testColl";
+
+const donorPrimary = tenantMigrationTest.getDonorPrimary();
+const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
+const donorRst = tenantMigrationTest.getDonorRst();
+const donorTestColl = donorPrimary.getDB(dbName).getCollection(collName);
+
+// Populate the donor replica set with some initial data and make sure it is majority committed.
+const majorityCommittedDocs = [{_id: 0, x: 0}, {_id: 1, x: 1}];
+assert.commandWorked(donorTestColl.insert(majorityCommittedDocs, {writeConcern: {w: "majority"}}));
+assert.eq(2, donorTestColl.find().readConcern("majority").itcount());
+
+const migrationId = UUID();
+const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(migrationId),
+ recipientConnString: tenantMigrationTest.getRecipientConnString(),
+ tenantId: tenantId,
+};
+
+// Configure fail point to have the recipient primary hang after the cloner completes and the oplog
+// applier has started.
+let waitAfterDatabaseClone = configureFailPoint(
+ recipientPrimary, "fpAfterStartingOplogApplierMigrationRecipientInstance", {action: "hang"});
+// Configure fail point to hang the tenant oplog applier after it applies the first batch.
+let waitInOplogApplier = configureFailPoint(recipientPrimary, "hangInTenantOplogApplication");
+
+// Start a migration and wait for the recipient to hang after starting the tenant oplog applier.
+const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst);
+const migrationThread =
+ new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs);
+migrationThread.start();
+waitAfterDatabaseClone.wait();
+
+// Insert some writes that will eventually be picked up by the tenant oplog applier on the
+// recipient.
+const docsToApply = [{_id: 2, x: 2}, {_id: 3, x: 3}, {_id: 4, x: 4}];
+tenantMigrationTest.insertDonorDB(dbName, collName, docsToApply);
+
+// Wait for the applied oplog batch to be replicated.
+waitInOplogApplier.wait();
+recipientRst.awaitReplication();
+let local = recipientPrimary.getDB("local");
+let appliedNoOps = local.oplog.rs.find({fromTenantMigration: migrationId, op: "n"});
+let resultsArr = appliedNoOps.toArray();
+// We should have applied the no-op oplog entries for the first batch of documents (size 2).
+assert.eq(2, appliedNoOps.count(), appliedNoOps);
+// No-op entries will be in the same order.
+assert.eq(docsToApply[0], resultsArr[0].o.o, resultsArr);
+assert.eq(docsToApply[1], resultsArr[1].o.o, resultsArr);
+
+// Step up a new node in the recipient set to trigger a failover. The new primary should resume
+// fetching and applying from the first unapplied document.
+const newRecipientPrimary = recipientRst.getSecondaries()[0];
+assert.commandWorked(newRecipientPrimary.adminCommand({replSetStepUp: 1}));
+waitAfterDatabaseClone.off();
+waitInOplogApplier.off();
+recipientRst.getPrimary();
+
+// The migration should go through after recipient failover.
+assert.commandWorked(migrationThread.returnData());
+// Validate that the last no-op entry is applied.
+local = newRecipientPrimary.getDB("local");
+appliedNoOps = local.oplog.rs.find({fromTenantMigration: migrationId, op: "n"});
+resultsArr = appliedNoOps.toArray();
+assert.eq(3, appliedNoOps.count(), appliedNoOps);
+assert.eq(docsToApply[2], resultsArr[2].o.o, resultsArr);
+
+tenantMigrationTest.checkTenantDBHashes(tenantId);
+tenantMigrationTest.stop();
+recipientRst.stopSet();
+})(); \ No newline at end of file
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index d92e3262a38..ca18216794f 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1318,6 +1318,7 @@ env.Library(
'oplog_buffer_collection',
'oplog_entry',
'oplog_fetcher',
+ 'oplog_interface_local',
'repl_server_parameters',
'replication_auth',
'tenant_migration_cloners',
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 8f82f9be8ec..d5948177dbb 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -129,7 +129,6 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
"least 1 document matching ts: "
<< lastTS.toString());
}
-
DocumentsInfo info;
// The count of the bytes of the documents read off the network.
info.networkDocumentBytes = 0;
@@ -252,6 +251,10 @@ std::string OplogFetcher::toString() {
return output;
}
+OplogFetcher::StartingPoint OplogFetcher::getStartingPoint_forTest() const {
+ return _config.startingPoint;
+}
+
OpTime OplogFetcher::getLastOpTimeFetched_forTest() const {
return _getLastOpTimeFetched();
}
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index acd43fbb18b..58a0f80f9a7 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -261,6 +261,11 @@ public:
// ================== Test support API ===================
/**
+ * Returns the StartingPoint defined in the OplogFetcher::Config.
+ */
+ StartingPoint getStartingPoint_forTest() const;
+
+ /**
* Returns the `find` query run on the sync source's oplog.
*/
BSONObj getFindQuery_forTest(long long findTimeout) const;
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index a63f2094bd7..725bf40ffc4 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/repl/oplog_applier.h"
#include "mongo/db/repl/oplog_buffer_collection.h"
#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/oplog_interface_local.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
@@ -56,6 +57,7 @@
#include "mongo/db/write_concern_options.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/util/assert_util.h"
#include "mongo/util/future_util.h"
namespace mongo {
@@ -69,6 +71,7 @@ MONGO_FAIL_POINT_DEFINE(pauseBeforeRunTenantMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(pauseAfterRunTenantMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(skipTenantMigrationRecipientAuth);
MONGO_FAIL_POINT_DEFINE(autoRecipientForgetMigration);
+MONGO_FAIL_POINT_DEFINE(pauseAfterCreatingOplogBuffer);
// Fails before waiting for the state doc to be majority replicated.
MONGO_FAIL_POINT_DEFINE(failWhilePersistingTenantMigrationRecipientInstanceStateDoc);
@@ -484,7 +487,12 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_initializeStateDoc(
.semi();
}
-void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLock) {
+void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLock lk) {
+ if (_isCloneCompletedMarkerSet(lk)) {
+ invariant(_stateDoc.getStartApplyingDonorOpTime().has_value());
+ invariant(_stateDoc.getStartFetchingDonorOpTime().has_value());
+ return;
+ }
// Get the last oplog entry at the read concern majority optime in the remote oplog. It
// does not matter which tenant it is for.
auto oplogOpTimeFields =
@@ -572,9 +580,21 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
_donorOplogBuffer = std::make_unique<OplogBufferCollection>(
StorageInterface::get(opCtx.get()), oplogBufferNs, options);
_donorOplogBuffer->startup(opCtx.get());
+
+ pauseAfterCreatingOplogBuffer.pauseWhileSet();
+
_dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateTenantMigration>();
+ auto startFetchOpTime = *_stateDoc.getStartFetchingDonorOpTime();
+ auto resumingFromOplogBuffer = false;
+ if (_isCloneCompletedMarkerSet(lk)) {
+ auto topOfOplogBuffer = _donorOplogBuffer->lastObjectPushed(opCtx.get());
+ if (topOfOplogBuffer) {
+ startFetchOpTime = uassertStatusOK(OpTime::parseFromOplogEntry(topOfOplogBuffer.get()));
+ resumingFromOplogBuffer = true;
+ }
+ }
OplogFetcher::Config oplogFetcherConfig(
- *_stateDoc.getStartFetchingDonorOpTime(),
+ startFetchOpTime,
_oplogFetcherClient->getServerHostAndPort(),
// The config is only used for setting the awaitData timeout; the defaults are fine.
ReplSetConfig::parse(BSON("_id"
@@ -590,7 +610,9 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
oplogFetcherConfig.requestResumeToken = true;
oplogFetcherConfig.name =
"TenantOplogFetcher_" + getTenantId() + "_" + getMigrationUUID().toString();
- oplogFetcherConfig.startingPoint = OplogFetcher::StartingPoint::kEnqueueFirstDoc;
+ oplogFetcherConfig.startingPoint = resumingFromOplogBuffer
+ ? OplogFetcher::StartingPoint::kSkipFirstDoc
+ : OplogFetcher::StartingPoint::kEnqueueFirstDoc;
_donorOplogFetcher = (*_createOplogFetcherFn)(
(**_scopedExecutor).get(),
@@ -705,6 +727,54 @@ bool TenantMigrationRecipientService::Instance::_isCloneCompletedMarkerSet(WithL
return _stateDoc.getCloneFinishedRecipientOpTime().has_value();
}
+OpTime TenantMigrationRecipientService::Instance::_getOplogResumeApplyingDonorOptime(
+ const OpTime startApplyingDonorOpTime, const OpTime cloneFinishedRecipientOpTime) const {
+ invariant(_stateDoc.getCloneFinishedRecipientOpTime().has_value());
+ auto opCtx = cc().makeOperationContext();
+ OplogInterfaceLocal oplog(opCtx.get());
+ auto oplogIter = oplog.makeIterator();
+ auto result = oplogIter->next();
+
+ while (result.isOK()) {
+ const auto oplogObj = result.getValue().first;
+ auto swRecipientOpTime = repl::OpTime::parseFromOplogEntry(oplogObj);
+ uassert(5272311,
+ str::stream() << "Unable to parse opTime from oplog entry: " << redact(oplogObj)
+ << ", error: " << swRecipientOpTime.getStatus(),
+ swRecipientOpTime.isOK());
+ if (swRecipientOpTime.getValue() <= cloneFinishedRecipientOpTime) {
+ break;
+ }
+ const bool isFromCurrentMigration = oplogObj.hasField("fromTenantMigration") &&
+ (uassertStatusOK(UUID::parse(oplogObj.getField("fromTenantMigration"))) ==
+ getMigrationUUID());
+ // Find the most recent no-op oplog entry from the current migration.
+ if (isFromCurrentMigration &&
+ (oplogObj.getStringField("op") == OpType_serializer(repl::OpTypeEnum::kNoop))) {
+ const auto migratedEntryObj = oplogObj.getObjectField("o");
+ const auto swDonorOpTime = repl::OpTime::parseFromOplogEntry(migratedEntryObj);
+ uassert(5272305,
+ str::stream() << "Unable to parse opTime from tenant migration oplog entry: "
+ << redact(oplogObj) << ", error: " << swDonorOpTime.getStatus(),
+ swDonorOpTime.isOK());
+ if (swDonorOpTime.getValue() < startApplyingDonorOpTime) {
+ break;
+ }
+ LOGV2_DEBUG(5272302,
+ 1,
+ "Found an optime to resume oplog application from",
+ "opTime"_attr = swDonorOpTime.getValue());
+ return swDonorOpTime.getValue();
+ }
+ result = oplogIter->next();
+ }
+ LOGV2_DEBUG(5272304,
+ 1,
+ "Resuming oplog application from startApplyingDonorOpTime",
+ "opTime"_attr = startApplyingDonorOpTime);
+ return startApplyingDonorOpTime;
+}
+
Future<void> TenantMigrationRecipientService::Instance::_startTenantAllDatabaseCloner(WithLock lk) {
// If the state is data consistent, do not start the cloner.
if (_isCloneCompletedMarkerSet(lk)) {
@@ -1096,7 +1166,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
.then([this, self = shared_from_this()] {
_stopOrHangOnFailPoint(&fpAfterStartingOplogFetcherMigrationRecipientInstance);
- stdx::lock_guard lk(_mutex);
+ stdx::unique_lock lk(_mutex);
{
// Throwing error when cloner is canceled externally via interrupt(), makes the
@@ -1110,20 +1180,37 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
// Create the oplog applier but do not start it yet.
invariant(_stateDoc.getStartApplyingDonorOpTime());
+
+ OpTime beginApplyingAfterOpTime;
+ bool isResuming = false;
+ if (_isCloneCompletedMarkerSet(lk)) {
+ const auto startApplyingDonorOpTime = *_stateDoc.getStartApplyingDonorOpTime();
+ const auto cloneFinishedRecipientOptime =
+ *_stateDoc.getCloneFinishedRecipientOpTime();
+ lk.unlock();
+ // We avoid holding the mutex while scanning the local oplog, which acquires the RSTL
+ // in IX mode. This allows us to be interrupted by a concurrent stepDown, which
+ // acquires the RSTL in X mode.
+ beginApplyingAfterOpTime = _getOplogResumeApplyingDonorOptime(
+ startApplyingDonorOpTime, cloneFinishedRecipientOptime);
+ isResuming = beginApplyingAfterOpTime > startApplyingDonorOpTime;
+ lk.lock();
+ } else {
+ beginApplyingAfterOpTime = *_stateDoc.getStartApplyingDonorOpTime();
+ }
LOGV2_DEBUG(4881202,
1,
"Recipient migration service creating oplog applier",
"tenantId"_attr = getTenantId(),
"migrationId"_attr = getMigrationUUID(),
- "startApplyingDonorOpTime"_attr = *_stateDoc.getStartApplyingDonorOpTime());
-
- _tenantOplogApplier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- _tenantId,
- *_stateDoc.getStartApplyingDonorOpTime(),
- _donorOplogBuffer.get(),
- **_scopedExecutor,
- _writerPool.get());
+ "startApplyingDonorOpTime"_attr = beginApplyingAfterOpTime);
+ _tenantOplogApplier = std::make_shared<TenantOplogApplier>(_migrationUuid,
+ _tenantId,
+ beginApplyingAfterOpTime,
+ _donorOplogBuffer.get(),
+ **_scopedExecutor,
+ _writerPool.get(),
+ isResuming);
// Start the cloner.
auto clonerFuture = _startTenantAllDatabaseCloner(lk);
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 5b8cf10bf47..ddcb0b16b2e 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -306,7 +306,7 @@ public:
/**
* Retrieves the start optimes from the donor and updates the in-memory state accordingly.
*/
- void _getStartOpTimesFromDonor(WithLock);
+ void _getStartOpTimesFromDonor(WithLock lk);
/**
* Pushes documents from oplog fetcher to oplog buffer.
@@ -340,6 +340,14 @@ public:
bool _isCloneCompletedMarkerSet(WithLock) const;
/*
+ * Traverse backwards through the oplog to find the optime which tenant oplog application
+ * should resume from. The oplog applier should resume applying entries that have a greater
+ * optime than the returned value.
+ */
+ OpTime _getOplogResumeApplyingDonorOptime(const OpTime startApplyingDonorOpTime,
+ const OpTime cloneFinishedRecipientOpTime) const;
+
+ /*
* Starts the tenant cloner.
* Returns future that will be fulfilled when the cloner completes.
*/
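
The resume optime for the applier comes from _getOplogResumeApplyingDonorOptime above: it walks the recipient's local oplog newest-first, stops once it reaches entries at or before cloneFinishedRecipientOpTime (or a migration no-op whose donor optime predates startApplyingDonorOpTime), and otherwise returns the donor optime of the newest no-op written by this migration. A reduced stand-alone rendering of that loop, with a hypothetical LocalOplogEntry struct and plain integer timestamps in place of BSON oplog documents and OpTime (illustration only, not the production code):

#include <vector>

struct LocalOplogEntry {
    long long recipientTs;  // recipient optime of this local oplog entry
    bool isMigrationNoOp;   // no-op tagged with this migration's fromTenantMigration UUID
    long long donorTs;      // donor optime embedded in the no-op's 'o' field (when isMigrationNoOp)
};

// Entries are visited newest-first, mirroring the OplogInterfaceLocal iterator above.
long long resumeDonorTs(const std::vector<LocalOplogEntry>& newestFirst,
                        long long startApplyingDonorTs,
                        long long cloneFinishedRecipientTs) {
    for (const auto& entry : newestFirst) {
        if (entry.recipientTs <= cloneFinishedRecipientTs) {
            break;  // reached entries written at or before the end of collection cloning
        }
        if (!entry.isMigrationNoOp) {
            continue;  // unrelated local writes interleaved in the oplog
        }
        if (entry.donorTs < startApplyingDonorTs) {
            break;  // a no-op from before this attempt's apply phase; stop searching
        }
        return entry.donorTs;  // newest no-op from this migration: resume after it
    }
    return startApplyingDonorTs;  // nothing applied yet; fall back to the state document
}
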
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index fd04b139e13..26cb2aad961 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -101,6 +101,23 @@ OplogEntry makeOplogEntry(OpTime opTime,
boost::none)}; // _id
}
+MutableOplogEntry makeNoOpOplogEntry(OpTime opTime,
+ NamespaceString nss,
+ OptionalCollectionUUID uuid,
+ BSONObj o,
+ boost::optional<UUID> migrationUUID) {
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
+ oplogEntry.setOpTime(opTime);
+ oplogEntry.setNss(nss);
+ oplogEntry.setObject(o);
+ oplogEntry.setWallClockTime(Date_t::now());
+ if (migrationUUID) {
+ oplogEntry.setFromTenantMigration(migrationUUID.get());
+ }
+ return oplogEntry;
+}
+
/**
* Generates a listDatabases response for an TenantAllDatabaseCloner to consume.
*/
@@ -324,6 +341,11 @@ protected:
return instance->_donorOplogBuffer.get();
}
+ TenantOplogApplier* getTenantOplogApplier(
+ const TenantMigrationRecipientService::Instance* instance) const {
+ return instance->_tenantOplogApplier.get();
+ }
+
const TenantMigrationRecipientDocument& getStateDoc(
const TenantMigrationRecipientService::Instance* instance) const {
return instance->_stateDoc;
@@ -341,6 +363,23 @@ protected:
*/
Date_t now() {
return _clkSource->now();
+ };
+
+ /*
+ * Populates the migration state document to simulate a recipient service restart where cloning
+ * has already finished. This requires the oplog buffer to contain an oplog entry with the
+ * optime to resume from. Otherwise, oplog application will fail when the OplogBatcher seeks
+ * to the resume timestamp.
+ */
+ void updateStateDocToCloningFinished(TenantMigrationRecipientDocument& initialStateDoc,
+ OpTime cloneFinishedRecipientOpTime,
+ OpTime dataConsistentStopDonorOpTime,
+ OpTime startApplyingDonorOpTime,
+ OpTime startFetchingDonorOptime) {
+ initialStateDoc.setCloneFinishedRecipientOpTime(cloneFinishedRecipientOpTime);
+ initialStateDoc.setDataConsistentStopDonorOpTime(dataConsistentStopDonorOpTime);
+ initialStateDoc.setStartApplyingDonorOpTime(startApplyingDonorOpTime);
+ initialStateDoc.setStartFetchingDonorOpTime(startFetchingDonorOptime);
}
private:
@@ -1259,9 +1298,11 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherFailsDuringOplogApplicat
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
kRecipientPEMPayload);
- // Setting these causes us to skip cloning.
- initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime);
- initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime);
+ // Skip the cloners in this test, so we provide an empty list of databases.
+ MockRemoteDBServer* const _donorServer =
+ mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+ _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({}));
+ _donorServer->setCommandReply("find", makeFindResponse());
auto opCtx = makeOperationContext();
std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
@@ -1296,6 +1337,591 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherFailsDuringOplogApplicat
ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
+TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherResumesFromTopOfOplogBuffer) {
+ const UUID migrationUUID = UUID::gen();
+ const OpTime initialOpTime(Timestamp(1, 1), 1);
+ const OpTime dataConsistentOpTime(Timestamp(4, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, initialOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ kRecipientPEMPayload);
+
+ // We skip cloning here as a way to simulate that the recipient service has detected an existing
+ // migration on startup and will resume oplog fetching from the appropriate optime.
+ updateStateDocToCloningFinished(
+ initialStateDocument, initialOpTime, dataConsistentOpTime, initialOpTime, initialOpTime);
+
+ // Hang after creating the oplog buffer collection but before starting the oplog fetcher.
+ const auto hangBeforeFetcherFp =
+ globalFailPointRegistry().find("pauseAfterCreatingOplogBuffer");
+ auto initialTimesEntered = hangBeforeFetcherFp->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+
+ hangBeforeFetcherFp->waitForTimesEntered(initialTimesEntered + 1);
+
+ const auto oplogBuffer = getDonorOplogBuffer(instance.get());
+ OplogBuffer::Batch batch1;
+ const OpTime resumeOpTime(Timestamp(2, 1), initialOpTime.getTerm());
+ auto resumeOplogBson = makeOplogEntry(resumeOpTime,
+ OpTypeEnum::kInsert,
+ NamespaceString("tenantA_foo.bar"),
+ UUID::gen(),
+ BSON("doc" << 2),
+ boost::none /* o2 */)
+ .getEntry()
+ .toBSON();
+ batch1.push_back(resumeOplogBson);
+ oplogBuffer->push(opCtx.get(), batch1.cbegin(), batch1.cend());
+ ASSERT_EQUALS(oplogBuffer->getCount(), 1);
+
+ // Continue the recipient service to hang after starting the oplog applier.
+ const auto hangAfterStartingOplogApplier =
+ globalFailPointRegistry().find("fpAfterStartingOplogApplierMigrationRecipientInstance");
+ initialTimesEntered = hangAfterStartingOplogApplier->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+ hangBeforeFetcherFp->setMode(FailPoint::off);
+ hangAfterStartingOplogApplier->waitForTimesEntered(initialTimesEntered + 1);
+
+ // The oplog fetcher should exist and be running.
+ auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get()));
+ ASSERT_TRUE(oplogFetcher != nullptr);
+ ASSERT_TRUE(oplogFetcher->isActive());
+ // The oplog fetcher should have started fetching from resumeOpTime.
+ ASSERT_EQUALS(oplogFetcher->getLastOpTimeFetched_forTest(), resumeOpTime);
+ ASSERT(oplogFetcher->getStartingPoint_forTest() == OplogFetcher::StartingPoint::kSkipFirstDoc);
+
+ hangAfterStartingOplogApplier->setMode(FailPoint::off);
+
+ // Feed the oplog fetcher the last doc required for us to be considered consistent.
+ auto dataConsistentOplogEntry = makeOplogEntry(dataConsistentOpTime,
+ OpTypeEnum::kInsert,
+ NamespaceString("tenantA_foo.bar"),
+ UUID::gen(),
+ BSON("doc" << 3),
+ boost::none /* o2 */);
+ oplogFetcher->receiveBatch(
+ 1, {dataConsistentOplogEntry.getEntry().toBSON()}, dataConsistentOpTime.getTimestamp());
+
+ LOGV2(5272308,
+ "Waiting for recipient service to reach consistent state",
+ "suite"_attr = _agent.getSuiteName(),
+ "test"_attr = _agent.getTestName());
+ instance->waitUntilMigrationReachesConsistentState(opCtx.get());
+
+ // Stop the oplog applier.
+ instance->stopOplogApplier_forTest();
+ // Wait for task completion. Since we're using a test function to cancel the applier,
+ // the actual result is not critical.
+ ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherNoDocInBufferToResumeFrom) {
+ const UUID migrationUUID = UUID::gen();
+ const OpTime initialOpTime(Timestamp(1, 1), 1);
+ const OpTime startFetchingOpTime(Timestamp(2, 1), 1);
+ const OpTime clonerFinishedOpTime(Timestamp(3, 1), 1);
+ const OpTime resumeFetchingOpTime(Timestamp(4, 1), 1);
+ const OpTime dataConsistentOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, initialOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ kRecipientPEMPayload);
+
+ // We skip cloning here as a way to simulate that the recipient service has detected an existing
+ // migration on startup and will attempt to resume oplog fetching from the appropriate optime.
+ updateStateDocToCloningFinished(initialStateDocument,
+ clonerFinishedOpTime /* clonerFinishedRecipientOpTime */,
+ dataConsistentOpTime /* dataConsistentStopDonorOpTime */,
+ startFetchingOpTime /* startApplyingDonorOpTime */,
+ startFetchingOpTime /* startFetchingDonorOpTime */);
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+
+ // Hang after creating the oplog buffer collection but before starting the oplog fetcher.
+ const auto hangBeforeFetcherFp =
+ globalFailPointRegistry().find("pauseAfterCreatingOplogBuffer");
+ auto initialTimesEntered = hangBeforeFetcherFp->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+
+ hangBeforeFetcherFp->waitForTimesEntered(initialTimesEntered + 1);
+
+ // There are no documents in the oplog buffer to resume fetching from.
+ const auto oplogBuffer = getDonorOplogBuffer(instance.get());
+ ASSERT_EQUALS(oplogBuffer->getCount(), 0);
+
+ // Continue and hang before starting the oplog applier.
+ const auto hangAfterStartingOplogApplier =
+ globalFailPointRegistry().find("fpAfterStartingOplogApplierMigrationRecipientInstance");
+ hangBeforeFetcherFp->setMode(FailPoint::off);
+ initialTimesEntered = hangAfterStartingOplogApplier->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+ hangAfterStartingOplogApplier->waitForTimesEntered(initialTimesEntered + 1);
+
+ // The oplog fetcher should exist and be running.
+ auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get()));
+ ASSERT_TRUE(oplogFetcher != nullptr);
+ ASSERT_TRUE(oplogFetcher->isActive());
+ // The oplog fetcher should have started fetching from 'startFetchingOpTime'. Since no document
+ // was found in the oplog buffer, we should have set the 'StartingPoint' to 'kEnqueueFirstDoc'.
+ ASSERT_EQUALS(oplogFetcher->getLastOpTimeFetched_forTest(), startFetchingOpTime);
+ ASSERT(oplogFetcher->getStartingPoint_forTest() ==
+ OplogFetcher::StartingPoint::kEnqueueFirstDoc);
+
+ // Feed the oplog fetcher the last doc required for the recipient to be considered consistent.
+ const auto tenantNss = NamespaceString("tenantA_foo.bar");
+ auto resumeFetchingOplogEntry = makeOplogEntry(resumeFetchingOpTime,
+ OpTypeEnum::kInsert,
+ tenantNss,
+ UUID::gen(),
+ BSON("doc" << 1),
+ boost::none /* o2 */);
+ auto dataConsistentOplogEntry = makeOplogEntry(dataConsistentOpTime,
+ OpTypeEnum::kInsert,
+ tenantNss,
+ UUID::gen(),
+ BSON("doc" << 3),
+ boost::none /* o2 */);
+ oplogFetcher->receiveBatch(1,
+ {resumeFetchingOplogEntry.getEntry().toBSON(),
+ dataConsistentOplogEntry.getEntry().toBSON()},
+ dataConsistentOpTime.getTimestamp());
+
+ // Allow the service to continue.
+ hangAfterStartingOplogApplier->setMode(FailPoint::off);
+ LOGV2(5272310,
+ "Waiting for recipient service to reach consistent state",
+ "suite"_attr = _agent.getSuiteName(),
+ "test"_attr = _agent.getTestName());
+ instance->waitUntilMigrationReachesConsistentState(opCtx.get());
+
+ // Stop the oplog applier.
+ instance->stopOplogApplier_forTest();
+ // Wait for task completion. Since we're using a test function to cancel the applier,
+ // the actual result is not critical.
+ ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromLastNoOpOplogEntry) {
+ const UUID migrationUUID = UUID::gen();
+ const OpTime initialOpTime(Timestamp(1, 1), 1);
+ const OpTime clonerFinishedOpTime(Timestamp(2, 1), 1);
+ const OpTime resumeOpTime(Timestamp(3, 1), 1);
+ const OpTime dataConsistentOpTime(Timestamp(4, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, initialOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ kRecipientPEMPayload);
+
+ // We skip cloning here as a way to simulate that the recipient service has detected an existing
+ // migration on startup and will attempt to resume oplog fetching from the appropriate optime.
+ updateStateDocToCloningFinished(initialStateDocument,
+ clonerFinishedOpTime /* cloneFinishedRecipientOpTime */,
+ dataConsistentOpTime /* dataConsistentStopDonorOpTime */,
+ clonerFinishedOpTime /* startApplyingDonorOpTime */,
+ initialOpTime /* startFetchingDonorOpTime */);
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+ // Create and insert two tenant migration no-op entries into the oplog. The oplog applier should
+ // resume from the no-op entry with the most recent donor opTime.
+ const auto insertNss = NamespaceString("tenantA_foo.bar");
+ const auto earlierOplogBson = makeOplogEntry(clonerFinishedOpTime,
+ OpTypeEnum::kInsert,
+ insertNss,
+ UUID::gen(),
+ BSON("doc" << 1),
+ boost::none /* o2 */)
+ .getEntry()
+ .toBSON();
+ const auto resumeOplogBson = makeOplogEntry(resumeOpTime,
+ OpTypeEnum::kInsert,
+ insertNss,
+ UUID::gen(),
+ BSON("doc" << 2),
+ boost::none /* o2 */)
+ .getEntry()
+ .toBSON();
+ auto storage = StorageInterface::get(opCtx->getServiceContext());
+ const auto oplogNss = NamespaceString::kRsOplogNamespace;
+ const OpTime earlierRecipientOpTime(Timestamp(9, 1), 1);
+ const OpTime resumeRecipientOpTime(Timestamp(10, 1), 1);
+ auto earlierNoOpEntry = makeNoOpOplogEntry(earlierRecipientOpTime,
+ insertNss,
+ UUID::gen(),
+ earlierOplogBson,
+ instance->getMigrationUUID());
+ auto resumeNoOpEntry = makeNoOpOplogEntry(resumeRecipientOpTime,
+ insertNss,
+ UUID::gen(),
+ resumeOplogBson,
+ instance->getMigrationUUID());
+ ASSERT_OK(
+ storage->insertDocument(opCtx.get(),
+ oplogNss,
+ {earlierNoOpEntry.toBSON(), earlierRecipientOpTime.getTimestamp()},
+ earlierRecipientOpTime.getTerm()));
+ ASSERT_OK(
+ storage->insertDocument(opCtx.get(),
+ oplogNss,
+ {resumeNoOpEntry.toBSON(), resumeRecipientOpTime.getTimestamp()},
+ resumeRecipientOpTime.getTerm()));
+
+ // Hang before starting the oplog applier.
+ const auto hangAfterStartingOplogApplier =
+ globalFailPointRegistry().find("fpAfterStartingOplogApplierMigrationRecipientInstance");
+ auto initialTimesEntered = hangAfterStartingOplogApplier->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+ hangAfterStartingOplogApplier->waitForTimesEntered(initialTimesEntered + 1);
+
+ auto oplogFetcher = getDonorOplogFetcher(instance.get());
+ auto dataConsistentOplogEntry = makeOplogEntry(dataConsistentOpTime,
+ OpTypeEnum::kInsert,
+ insertNss,
+ UUID::gen(),
+ BSON("doc" << 3),
+ boost::none /* o2 */);
+ // Feed the oplog fetcher the last doc required for the recipient to be considered consistent.
+ oplogFetcher->receiveBatch(
+ 1, {dataConsistentOplogEntry.getEntry().toBSON()}, dataConsistentOpTime.getTimestamp());
+
+ // Allow the service to continue.
+ hangAfterStartingOplogApplier->setMode(FailPoint::off);
+ LOGV2(5272350,
+ "Waiting for recipient service to reach consistent state",
+ "suite"_attr = _agent.getSuiteName(),
+ "test"_attr = _agent.getTestName());
+ instance->waitUntilMigrationReachesConsistentState(opCtx.get());
+
+ // The oplog applier should have started applying at the 'resumeOpTime'.
+ const auto oplogApplier = getTenantOplogApplier(instance.get());
+ ASSERT_EQUALS(resumeOpTime, oplogApplier->getBeginApplyingOpTime_forTest());
+
+ // Stop the oplog applier.
+ instance->stopOplogApplier_forTest();
+ // Wait for task completion. Since we're using a test function to cancel the applier,
+ // the actual result is not critical.
+ ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromStartDonorApplyingOpTime) {
+ const UUID migrationUUID = UUID::gen();
+ const OpTime initialOpTime(Timestamp(1, 1), 1);
+ const OpTime startApplyingOpTime(Timestamp(2, 1), 1);
+ const OpTime dataConsistentOpTime(Timestamp(4, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, initialOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ kRecipientPEMPayload);
+
+ // We skip cloning here as a way to simulate that the recipient service has detected an existing
+ // migration on startup and will attempt to resume oplog fetching from the appropriate optime.
+ updateStateDocToCloningFinished(initialStateDocument,
+ OpTime(Timestamp(10, 1), 1) /* cloneFinishedRecipientOpTime */,
+ dataConsistentOpTime /* dataConsistentStopDonorOpTime */,
+ startApplyingOpTime /* startApplyingDonorOpTime */,
+ initialOpTime /* startFetchingDonorOpTime */);
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+
+ // Create and insert the following into the oplog:
+ // - (1) An oplog entry with opTime earlier than 'cloneFinishedRecipientOpTime'.
+ // - (2) An oplog entry with opTime greater than 'cloneFinishedRecipientOpTime'.
+ // - (3) A no-op oplog entry with an inner donor oplog entry as the 'o' field. The donor opTime
+ // is less than the 'startApplyingDonorOpTime'.
+ // - (4) A no-op oplog entry with an inner oplog entry as the 'o' field but no
+ // 'fromTenantMigration' field. These oplog entries do not satisfy the conditions for the
+ // oplog applier to resume from, so we default to resuming from 'startApplyingDonorOpTime'.
+ const auto insertNss = NamespaceString("tenantA_foo.bar");
+ const auto entryBeforeStartApplyingOpTime = makeOplogEntry(
+ initialOpTime,
+ OpTypeEnum::kInsert,
+ insertNss,
+ UUID::gen(),
+ BSON("doc"
+ << "before startApplyingDonorOpTime"),
+ boost::none /* o2 */)
+ .getEntry()
+ .toBSON();
+ const auto afterStartApplyingOpTime = OpTime(Timestamp(3, 1), 1);
+ const auto entryAfterStartApplyingOpTime = makeOplogEntry(
+ afterStartApplyingOpTime,
+ OpTypeEnum::kInsert,
+ insertNss,
+ UUID::gen(),
+ BSON("doc"
+ << "after startApplyingDonorOpTime"),
+ boost::none /* o2 */)
+ .getEntry()
+ .toBSON();
+ auto storage = StorageInterface::get(opCtx->getServiceContext());
+ const auto oplogNss = NamespaceString::kRsOplogNamespace;
+ const auto collUuid = UUID::gen();
+ std::vector<DurableOplogEntry> oplogEntries;
+ std::vector<MutableOplogEntry> noOpEntries;
+ // (1)
+ oplogEntries.push_back(makeOplogEntry(OpTime(Timestamp(9, 1), 1),
+ OpTypeEnum::kInsert,
+ insertNss,
+ collUuid,
+ BSON("doc"
+ << "before clonerFinishedOpTime"),
+ boost::none /* o2 */)
+ .getEntry());
+ // (2)
+ oplogEntries.push_back(makeOplogEntry(OpTime(Timestamp(11, 1), 1),
+ OpTypeEnum::kInsert,
+ insertNss,
+ collUuid,
+ BSON("doc"
+ << "after clonerFinishedOpTime"),
+ boost::none /* o2 */)
+ .getEntry());
+ // (3)
+ noOpEntries.push_back(makeNoOpOplogEntry(OpTime(Timestamp(12, 1), 1),
+ insertNss,
+ collUuid,
+ entryBeforeStartApplyingOpTime,
+ instance->getMigrationUUID()));
+ // (4)
+ noOpEntries.push_back(makeNoOpOplogEntry(OpTime(Timestamp(13, 1), 1),
+ insertNss,
+ collUuid,
+ entryAfterStartApplyingOpTime,
+ boost::none /* migrationUUID */));
+ for (auto entry : oplogEntries) {
+ auto opTime = entry.getOpTime();
+ ASSERT_OK(storage->insertDocument(
+ opCtx.get(), oplogNss, {entry.toBSON(), opTime.getTimestamp()}, opTime.getTerm()));
+ }
+ for (auto entry : noOpEntries) {
+ auto opTime = entry.getOpTime();
+ ASSERT_OK(storage->insertDocument(
+ opCtx.get(), oplogNss, {entry.toBSON(), opTime.getTimestamp()}, opTime.getTerm()));
+ }
+
+ // Hang before starting the oplog applier.
+ const auto hangAfterStartingOplogApplier =
+ globalFailPointRegistry().find("fpAfterStartingOplogApplierMigrationRecipientInstance");
+ auto initialTimesEntered = hangAfterStartingOplogApplier->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+ hangAfterStartingOplogApplier->waitForTimesEntered(initialTimesEntered + 1);
+
+ auto dataConsistentOplogEntry = makeOplogEntry(dataConsistentOpTime,
+ OpTypeEnum::kInsert,
+ NamespaceString("tenantA_foo.bar"),
+ UUID::gen(),
+ BSON("doc" << 3),
+ boost::none /* o2 */);
+
+ auto oplogFetcher = getDonorOplogFetcher(instance.get());
+ // Feed the oplog fetcher the last doc required for the recipient to be considered consistent.
+ oplogFetcher->receiveBatch(
+ 1, {dataConsistentOplogEntry.getEntry().toBSON()}, dataConsistentOpTime.getTimestamp());
+
+ // Allow the service to continue.
+ hangAfterStartingOplogApplier->setMode(FailPoint::off);
+ LOGV2(5272340,
+ "Waiting for recipient service to reach consistent state",
+ "suite"_attr = _agent.getSuiteName(),
+ "test"_attr = _agent.getTestName());
+ instance->waitUntilMigrationReachesConsistentState(opCtx.get());
+
+ // The oplog applier starts applying from the first opTime after the 'beginApplyingOpTime'.
+ const auto oplogApplier = getTenantOplogApplier(instance.get());
+ ASSERT_EQUALS(startApplyingOpTime, oplogApplier->getBeginApplyingOpTime_forTest());
+
+ // Stop the oplog applier.
+ instance->stopOplogApplier_forTest();
+ // Wait for task completion. Since we're using a test function to cancel the applier,
+ // the actual result is not critical.
+ ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest,
+ OplogFetcherResumesFromStartFetchingOpTimeWithDocInBuffer) {
+ const UUID migrationUUID = UUID::gen();
+ const OpTime initialOpTime(Timestamp(1, 1), 1);
+ const OpTime startFetchingOpTime(Timestamp(2, 1), 1);
+ const OpTime dataConsistentOpTime(Timestamp(4, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, initialOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ kRecipientPEMPayload);
+
+ // We skip cloning here as a way to simulate that the recipient service has detected an existing
+ // migration on startup and will resume oplog fetching from the appropriate optime.
+ updateStateDocToCloningFinished(initialStateDocument,
+ startFetchingOpTime,
+ dataConsistentOpTime,
+ startFetchingOpTime,
+ startFetchingOpTime);
+
+ // Hang after creating the oplog buffer collection but before starting the oplog fetcher.
+ const auto hangBeforeFetcherFp =
+ globalFailPointRegistry().find("pauseAfterCreatingOplogBuffer");
+ auto initialTimesEntered = hangBeforeFetcherFp->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+
+ hangBeforeFetcherFp->waitForTimesEntered(initialTimesEntered + 1);
+
+ // Insert the first document with 'startFetchingOpTime' into the oplog buffer. The fetcher
+ // should know to skip this document on service restart.
+ const auto oplogBuffer = getDonorOplogBuffer(instance.get());
+ OplogBuffer::Batch batch1;
+ batch1.push_back(makeOplogEntry(startFetchingOpTime,
+ OpTypeEnum::kInsert,
+ NamespaceString("tenantA_foo.bar"),
+ UUID::gen(),
+ BSON("doc" << 2),
+ boost::none /* o2 */)
+ .getEntry()
+ .toBSON());
+ oplogBuffer->push(opCtx.get(), batch1.cbegin(), batch1.cend());
+ ASSERT_EQUALS(oplogBuffer->getCount(), 1);
+
+ auto dataConsistentOplogEntry = makeOplogEntry(dataConsistentOpTime,
+ OpTypeEnum::kInsert,
+ NamespaceString("tenantA_foo.bar"),
+ UUID::gen(),
+ BSON("doc" << 3),
+ boost::none /* o2 */);
+ // Continue the recipient service to hang before starting the oplog applier.
+ const auto hangAfterStartingOplogApplier =
+ globalFailPointRegistry().find("fpAfterStartingOplogApplierMigrationRecipientInstance");
+ initialTimesEntered = hangAfterStartingOplogApplier->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+ hangBeforeFetcherFp->setMode(FailPoint::off);
+ hangAfterStartingOplogApplier->waitForTimesEntered(initialTimesEntered + 1);
+
+ // The oplog fetcher should exist and be running.
+ auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get()));
+ ASSERT_TRUE(oplogFetcher != nullptr);
+ ASSERT_TRUE(oplogFetcher->isActive());
+ // The oplog fetcher should have started fetching from 'startFetchingOpTime'. However, the
+ // fetcher should skip the first doc from being fetched since it already exists in the buffer.
+ ASSERT_EQUALS(oplogFetcher->getLastOpTimeFetched_forTest(), startFetchingOpTime);
+ ASSERT(oplogFetcher->getStartingPoint_forTest() == OplogFetcher::StartingPoint::kSkipFirstDoc);
+
+ // Feed the oplog fetcher the last doc required for us to be considered consistent.
+ oplogFetcher->receiveBatch(
+ 1, {dataConsistentOplogEntry.getEntry().toBSON()}, dataConsistentOpTime.getTimestamp());
+
+ // Allow the service to continue.
+ hangAfterStartingOplogApplier->setMode(FailPoint::off);
+ LOGV2(5272317,
+ "Waiting for recipient service to reach consistent state",
+ "suite"_attr = _agent.getSuiteName(),
+ "test"_attr = _agent.getTestName());
+ instance->waitUntilMigrationReachesConsistentState(opCtx.get());
+
+ // Stop the oplog applier.
+ instance->stopOplogApplier_forTest();
+ // Wait for task completion. Since we're using a test function to cancel the applier,
+ // the actual result is not critical.
+ ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+}
+
TEST_F(TenantMigrationRecipientServiceTest, OplogApplierFails) {
const UUID migrationUUID = UUID::gen();
const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
@@ -1311,9 +1937,11 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierFails) {
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
kRecipientPEMPayload);
- // Setting these causes us to skip cloning.
- initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime);
- initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime);
+ // Skip the cloners in this test, so we provide an empty list of databases.
+ MockRemoteDBServer* const _donorServer =
+ mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+ _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({}));
+ _donorServer->setCommandReply("find", makeFindResponse());
auto opCtx = makeOperationContext();
std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
@@ -1373,9 +2001,11 @@ TEST_F(TenantMigrationRecipientServiceTest, StoppingApplierAllowsCompletion) {
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
kRecipientPEMPayload);
- // Setting these causes us to skip cloning.
- initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime);
- initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime);
+ // Skip the cloners in this test, so we provide an empty list of databases.
+ MockRemoteDBServer* const _donorServer =
+ mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+ _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({}));
+ _donorServer->setCommandReply("find", makeFindResponse());
auto opCtx = makeOperationContext();
std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
@@ -1732,9 +2362,11 @@ TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_AfterConsis
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
kRecipientPEMPayload);
- // Setting these causes us to skip cloning.
- initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime);
- initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime);
+ // Skip the cloners in this test, so we provide an empty list of databases.
+ MockRemoteDBServer* const _donorServer =
+ mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+ _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({}));
+ _donorServer->setCommandReply("find", makeFindResponse());
auto opCtx = makeOperationContext();
std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
@@ -1816,9 +2448,11 @@ TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_AfterFail)
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
kRecipientPEMPayload);
- // Setting these causes us to skip cloning.
- initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime);
- initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime);
+ // Skip the cloners in this test, so we provide an empty list of databases.
+ MockRemoteDBServer* const _donorServer =
+ mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+ _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({}));
+ _donorServer->setCommandReply("find", makeFindResponse());
auto opCtx = makeOperationContext();
std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index 54698b77f00..c36d5c73d64 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -55,19 +55,23 @@
namespace mongo {
namespace repl {
+MONGO_FAIL_POINT_DEFINE(hangInTenantOplogApplication);
+
TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid,
const std::string& tenantId,
OpTime applyFromOpTime,
RandomAccessOplogBuffer* oplogBuffer,
std::shared_ptr<executor::TaskExecutor> executor,
- ThreadPool* writerPool)
+ ThreadPool* writerPool,
+ const bool isResuming)
: AbstractAsyncComponent(executor.get(), std::string("TenantOplogApplier_") + tenantId),
_migrationUuid(migrationUuid),
_tenantId(tenantId),
_beginApplyingAfterOpTime(applyFromOpTime),
_oplogBuffer(oplogBuffer),
_executor(std::move(executor)),
- _writerPool(writerPool) {}
+ _writerPool(writerPool),
+ _isResuming(isResuming) {}
TenantOplogApplier::~TenantOplogApplier() {
shutdown();
@@ -93,8 +97,17 @@ SemiFuture<TenantOplogApplier::OpTimePair> TenantOplogApplier::getNotificationFo
return iter->second.getFuture().semi();
}
+OpTime TenantOplogApplier::getBeginApplyingOpTime_forTest() const {
+ return _beginApplyingAfterOpTime;
+}
+
Status TenantOplogApplier::_doStartup_inlock() noexcept {
- _oplogBatcher = std::make_shared<TenantOplogBatcher>(_tenantId, _oplogBuffer, _executor);
+ Timestamp resumeTs;
+ if (_isResuming) {
+ resumeTs = _beginApplyingAfterOpTime.getTimestamp();
+ }
+ _oplogBatcher =
+ std::make_shared<TenantOplogBatcher>(_tenantId, _oplogBuffer, _executor, resumeTs);
auto status = _oplogBatcher->startup();
if (!status.isOK())
return status;
@@ -288,6 +301,18 @@ void TenantOplogApplier::_applyOplogBatch(TenantOplogBatch* batch) {
iter->second.emplaceValue(_lastAppliedOpTimesUpToLastBatch);
}
_opTimeNotificationList.erase(_opTimeNotificationList.begin(), firstUnexpiredIter);
+
+ hangInTenantOplogApplication.executeIf(
+ [&](const BSONObj& data) {
+ LOGV2(
+ 5272315,
+ "hangInTenantOplogApplication failpoint enabled -- blocking until it is disabled.",
+ "tenant"_attr = _tenantId,
+ "migrationUuid"_attr = _migrationUuid,
+ "lastBatchCompletedOpTimes"_attr = lastBatchCompletedOpTimes);
+ hangInTenantOplogApplication.pauseWhileSet(opCtx.get());
+ },
+ [&](const BSONObj& data) { return !lastBatchCompletedOpTimes.recipientOpTime.isNull(); });
}
void TenantOplogApplier::_checkNsAndUuidsBelongToTenant(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h
index ce9bd29136d..bae6772f296 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.h
+++ b/src/mongo/db/repl/tenant_oplog_applier.h
@@ -76,7 +76,8 @@ public:
OpTime applyFromOpTime,
RandomAccessOplogBuffer* oplogBuffer,
std::shared_ptr<executor::TaskExecutor> executor,
- ThreadPool* writerPool);
+ ThreadPool* writerPool,
+ const bool isResuming = false);
virtual ~TenantOplogApplier();
@@ -89,6 +90,11 @@ public:
*/
SemiFuture<OpTimePair> getNotificationForOpTime(OpTime donorOpTime);
+ /**
+ * Returns the optime the applier will start applying from. Used for testing.
+ */
+ OpTime getBeginApplyingOpTime_forTest() const;
+
private:
Status _doStartup_inlock() noexcept final;
void _doShutdown_inlock() noexcept final;
@@ -153,6 +159,7 @@ private:
Status _finalStatus = Status::OK(); // (M)
stdx::unordered_set<UUID, UUID::Hash> _knownGoodUuids; // (X)
bool _applyLoopApplyingBatch = false; // (M)
+ const bool _isResuming; // (R)
};
/**
diff --git a/src/mongo/db/repl/tenant_oplog_batcher.cpp b/src/mongo/db/repl/tenant_oplog_batcher.cpp
index c564d4b0119..47cbbef1c5d 100644
--- a/src/mongo/db/repl/tenant_oplog_batcher.cpp
+++ b/src/mongo/db/repl/tenant_oplog_batcher.cpp
@@ -41,10 +41,12 @@ namespace mongo {
namespace repl {
TenantOplogBatcher::TenantOplogBatcher(const std::string& tenantId,
RandomAccessOplogBuffer* oplogBuffer,
- std::shared_ptr<executor::TaskExecutor> executor)
+ std::shared_ptr<executor::TaskExecutor> executor,
+ Timestamp resumeBatchingTs)
: AbstractAsyncComponent(executor.get(), std::string("TenantOplogBatcher_") + tenantId),
_oplogBuffer(oplogBuffer),
- _executor(executor) {}
+ _executor(executor),
+ _resumeBatchingTs(resumeBatchingTs) {}
TenantOplogBatcher::~TenantOplogBatcher() {
shutdown();
@@ -208,6 +210,27 @@ SemiFuture<TenantOplogBatch> TenantOplogBatcher::getNextBatch(BatchLimits limits
Status TenantOplogBatcher::_doStartup_inlock() noexcept {
LOGV2_DEBUG(
4885604, 1, "Tenant Oplog Batcher starting up", "component"_attr = _getComponentName());
+ if (!_resumeBatchingTs.isNull()) {
+ auto opCtx = cc().makeOperationContext();
+ uassert(5272303,
+ str::stream() << "Error resuming oplog batcher",
+ _oplogBuffer
+ ->seekToTimestamp(opCtx.get(),
+ _resumeBatchingTs,
+ RandomAccessOplogBuffer::SeekStrategy::kInexact)
+ .isOK());
+ // Doing a 'seekToTimestamp' will not set the '_lastPoppedKey' on its own if a document
+ // with '_resumeBatchingTs' exists in the buffer collection. We do a 'tryPop' here to set
+ // '_lastPoppedKey' to equal '_resumeBatchingTs'.
+ if (_oplogBuffer->findByTimestamp(opCtx.get(), _resumeBatchingTs).isOK()) {
+ BSONObj opToPopAndDiscard;
+ _oplogBuffer->tryPop(opCtx.get(), &opToPopAndDiscard);
+ }
+ LOGV2_DEBUG(5272306,
+ 1,
+ "Tenant Oplog Batcher will resume batching from after timestamp",
+ "timestamp"_attr = _resumeBatchingTs);
+ }
return Status::OK();
}
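
With a non-null _resumeBatchingTs, the batcher above seeks the RandomAccessOplogBuffer to that timestamp (inexact seek) and, if an entry with exactly that timestamp is present, pops and discards it, so the first batch starts strictly after the resume point; the two new ResumeOplogBatcher tests at the bottom of tenant_oplog_batcher_test.cpp exercise both cases. A minimal sketch of that behavior, with a plain std::deque of simplified integer timestamps standing in for the buffer and a hypothetical resumeBuffer helper rather than the real API:

#include <deque>
#include <iostream>

// Models the net effect of the resume seek: entries before resumeTs are skipped,
// and an exact match at resumeTs is popped and discarded (mirroring the tryPop above).
std::deque<int> resumeBuffer(std::deque<int> buffer, int resumeTs) {
    while (!buffer.empty() && buffer.front() < resumeTs) {
        buffer.pop_front();  // inexact seek: position at the first entry >= resumeTs
    }
    if (!buffer.empty() && buffer.front() == resumeTs) {
        buffer.pop_front();  // exact match: discard it so batching restarts after resumeTs
    }
    return buffer;  // what the first batch will draw from
}

int main() {
    for (int ts : resumeBuffer({1, 2, 3, 4, 5}, 4)) std::cout << ts << ' ';  // prints: 5
    std::cout << '\n';
    for (int ts : resumeBuffer({4, 5}, 3)) std::cout << ts << ' ';           // prints: 4 5
    std::cout << '\n';
    return 0;
}
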
diff --git a/src/mongo/db/repl/tenant_oplog_batcher.h b/src/mongo/db/repl/tenant_oplog_batcher.h
index ddb8f769111..c8b51f0cbd9 100644
--- a/src/mongo/db/repl/tenant_oplog_batcher.h
+++ b/src/mongo/db/repl/tenant_oplog_batcher.h
@@ -75,7 +75,8 @@ public:
TenantOplogBatcher(const std::string& tenantId,
RandomAccessOplogBuffer* oplogBuffer,
- std::shared_ptr<executor::TaskExecutor> executor);
+ std::shared_ptr<executor::TaskExecutor> executor,
+ const Timestamp resumeBatchingTs);
virtual ~TenantOplogBatcher();
@@ -114,6 +115,7 @@ private:
RandomAccessOplogBuffer* _oplogBuffer; // (S)
bool _batchRequested = false; // (M)
std::shared_ptr<executor::TaskExecutor> _executor; // (R)
+ const Timestamp _resumeBatchingTs; // (R)
};
} // namespace repl
diff --git a/src/mongo/db/repl/tenant_oplog_batcher_test.cpp b/src/mongo/db/repl/tenant_oplog_batcher_test.cpp
index 18ca8741d6a..7bb3d2273f2 100644
--- a/src/mongo/db/repl/tenant_oplog_batcher_test.cpp
+++ b/src/mongo/db/repl/tenant_oplog_batcher_test.cpp
@@ -49,6 +49,7 @@ class TenantOplogBatcherTest : public unittest::Test, public ScopedGlobalService
public:
void setUp() override {
unittest::Test::setUp();
+ Client::initThread("TenantOplogBatcherTest");
auto network = std::make_unique<executor::NetworkInterfaceMock>();
_net = network.get();
executor::ThreadPoolMock::Options thread_pool_options;
@@ -58,6 +59,10 @@ public:
_oplogBuffer.startup(nullptr);
}
+ void tearDown() override {
+ Client::releaseCurrent();
+ }
+
protected:
TenantOplogBatcher::BatchLimits bigBatchLimits =
TenantOplogBatcher::BatchLimits(1ULL << 32, 1ULL << 32);
@@ -94,7 +99,8 @@ std::string toString(TenantOplogBatch& batch) {
constexpr auto dbName = "tenant_test"_sd;
TEST_F(TenantOplogBatcherTest, CannotRequestTwoBatchesAtOnce) {
- auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor);
+ auto batcher = std::make_shared<TenantOplogBatcher>(
+ "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);
// We just started, no batch should be available.
@@ -107,7 +113,8 @@ TEST_F(TenantOplogBatcherTest, CannotRequestTwoBatchesAtOnce) {
}
TEST_F(TenantOplogBatcherTest, OplogBatcherGroupsCrudOps) {
- auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor);
+ auto batcher = std::make_shared<TenantOplogBatcher>(
+ "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);
// We just started, no batch should be available.
@@ -131,7 +138,8 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherGroupsCrudOps) {
}
TEST_F(TenantOplogBatcherTest, OplogBatcherFailsOnPreparedApplyOps) {
- auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor);
+ auto batcher = std::make_shared<TenantOplogBatcher>(
+ "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);
@@ -145,7 +153,8 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherFailsOnPreparedApplyOps) {
}
TEST_F(TenantOplogBatcherTest, OplogBatcherFailsOnPreparedCommit) {
- auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor);
+ auto batcher = std::make_shared<TenantOplogBatcher>(
+ "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);
@@ -176,7 +185,8 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchGroupsUnpreparedApplyOpsOpWith
srcOps.push_back(makeApplyOpsOplogEntry(1, false, innerOps).getEntry().toBSON());
srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")).getEntry().toBSON());
- auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor);
+ auto batcher = std::make_shared<TenantOplogBatcher>(
+ "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);
_oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend());
@@ -209,7 +219,8 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchGroupsMultipleTransactions) {
srcOps.push_back(makeApplyOpsOplogEntry(1, false, innerOps1).getEntry().toBSON());
srcOps.push_back(makeApplyOpsOplogEntry(2, false, innerOps2).getEntry().toBSON());
- auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor);
+ auto batcher = std::make_shared<TenantOplogBatcher>(
+ "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);
_oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend());
@@ -252,7 +263,8 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchChecksBatchLimitsForNumberOfOp
// Set batch limits so that each batch contains a maximum of 'BatchLimit::ops'.
auto limits = bigBatchLimits;
limits.ops = 3U;
- auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor);
+ auto batcher = std::make_shared<TenantOplogBatcher>(
+ "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(limits);
@@ -283,7 +295,8 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchChecksBatchLimitsForSizeOfOper
// Set batch limits so that only the first two operations can fit into the first batch.
auto limits = bigBatchLimits;
limits.bytes = std::size_t(srcOps[0].objsize() + srcOps[1].objsize());
- auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor);
+ auto batcher = std::make_shared<TenantOplogBatcher>(
+ "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(limits);
@@ -328,7 +341,8 @@ TEST_F(TenantOplogBatcherTest, LargeTransactionProcessedIndividuallyAndExpanded)
_oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend());
- auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor);
+ auto batcher = std::make_shared<TenantOplogBatcher>(
+ "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);
@@ -380,7 +394,8 @@ TEST_F(TenantOplogBatcherTest, LargeTransactionProcessedIndividuallyAndExpanded)
}
TEST_F(TenantOplogBatcherTest, GetNextApplierBatchRejectsZeroBatchOpsLimits) {
- auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor);
+ auto batcher = std::make_shared<TenantOplogBatcher>(
+ "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
ASSERT_OK(batcher->startup());
// bigBatchLimits is a legal batch limit.
auto limits = bigBatchLimits;
@@ -392,7 +407,8 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchRejectsZeroBatchOpsLimits) {
}
TEST_F(TenantOplogBatcherTest, GetNextApplierBatchRejectsZeroBatchSizeLimits) {
- auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor);
+ auto batcher = std::make_shared<TenantOplogBatcher>(
+ "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
ASSERT_OK(batcher->startup());
// bigBatchLimits is a legal batch limit.
auto limits = bigBatchLimits;
@@ -403,5 +419,49 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchRejectsZeroBatchSizeLimits) {
batcher->join();
}
+TEST_F(TenantOplogBatcherTest, ResumeOplogBatcherFromTimestamp) {
+ std::vector<BSONObj> srcOps;
+ srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar")).getEntry().toBSON());
+ srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")).getEntry().toBSON());
+ srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar")).getEntry().toBSON());
+ srcOps.push_back(makeInsertOplogEntry(4, NamespaceString(dbName, "bar")).getEntry().toBSON());
+ srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")).getEntry().toBSON());
+ _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend());
+
+ auto batcher =
+ std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor, Timestamp(4, 1));
+ ASSERT_OK(batcher->startup());
+
+ auto batchFuture = batcher->getNextBatch(bigBatchLimits);
+
+ auto batch = batchFuture.get();
+ ASSERT_EQUALS(1U, batch.ops.size()) << toString(batch);
+ ASSERT_BSONOBJ_EQ(srcOps[4], batch.ops[0].entry.getEntry().toBSON());
+
+ batcher->shutdown();
+ batcher->join();
+}
+
+TEST_F(TenantOplogBatcherTest, ResumeOplogBatcherFromNonExistentTimestamp) {
+ std::vector<BSONObj> srcOps;
+ srcOps.push_back(makeInsertOplogEntry(4, NamespaceString(dbName, "bar")).getEntry().toBSON());
+ srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")).getEntry().toBSON());
+ _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend());
+
+ auto batcher =
+ std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor, Timestamp(3, 1));
+ ASSERT_OK(batcher->startup());
+
+ auto batchFuture = batcher->getNextBatch(bigBatchLimits);
+
+ auto batch = batchFuture.get();
+ ASSERT_EQUALS(2U, batch.ops.size()) << toString(batch);
+ ASSERT_BSONOBJ_EQ(srcOps[0], batch.ops[0].entry.getEntry().toBSON());
+ ASSERT_BSONOBJ_EQ(srcOps[1], batch.ops[1].entry.getEntry().toBSON());
+
+ batcher->shutdown();
+ batcher->join();
+}
+
} // namespace repl
} // namespace mongo