summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorSuganthi Mani <suganthi.mani@mongodb.com>2020-12-02 07:13:29 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-12-02 12:44:04 +0000
commit2e916b7179a28923127a2933c58e109b5daf20a7 (patch)
treedb12695184f270be3f09271a90aa15a89de017c0 /src/mongo/db/repl
parent6cf43fbc5c296c26ec03193c70c5fc3ba742c942 (diff)
downloadmongo-2e916b7179a28923127a2933c58e109b5daf20a7.tar.gz
SERVER-52934 Tenant oplog applier typo error fix and unit test failure fix.
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier.cpp2
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier_test.cpp254
2 files changed, 128 insertions, 128 deletions
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index 8b0356a1b29..afc287175cc 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -94,7 +94,7 @@ SemiFuture<TenantOplogApplier::OpTimePair> TenantOplogApplier::getNotificationFo
}
Status TenantOplogApplier::_doStartup_inlock() noexcept {
- _oplogBatcher = std::make_unique<TenantOplogBatcher>(_tenantId, _oplogBuffer, _executor);
+ _oplogBatcher = std::make_shared<TenantOplogBatcher>(_tenantId, _oplogBuffer, _executor);
auto status = _oplogBatcher->startup();
if (!status.isOK())
return status;
diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
index cee53e4286e..02e654a2d01 100644
--- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
@@ -202,18 +202,18 @@ TEST_F(TenantOplogApplierTest, NoOpsForSingleBatch) {
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
+ ASSERT_OK(applier->startup());
// Even if we wait for the first op in a batch, it is the last op we should be notified on.
- auto lastBatchTimes = applier.getNotificationForOpTime(srcOps.front().getOpTime()).get();
+ auto lastBatchTimes = applier->getNotificationForOpTime(srcOps.front().getOpTime()).get();
ASSERT_EQ(srcOps.back().getOpTime(), lastBatchTimes.donorOpTime);
auto entries = _opObserver->getEntries();
ASSERT_EQ(2, entries.size());
assertNoOpMatches(srcOps[0], entries[0]);
assertNoOpMatches(srcOps[1], entries[1]);
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, NoOpsForLargeBatch) {
@@ -226,19 +226,19 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeBatch) {
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
+ ASSERT_OK(applier->startup());
// Even if we wait for the first op in a batch, it is the last op we should be notified on.
- auto lastBatchTimes = applier.getNotificationForOpTime(srcOps.front().getOpTime()).get();
+ auto lastBatchTimes = applier->getNotificationForOpTime(srcOps.front().getOpTime()).get();
ASSERT_EQ(srcOps.back().getOpTime(), lastBatchTimes.donorOpTime);
auto entries = _opObserver->getEntries();
ASSERT_EQ(srcOps.size(), entries.size());
for (size_t i = 0; i < srcOps.size(); i++) {
assertNoOpMatches(srcOps[i], entries[i]);
}
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) {
@@ -250,15 +250,15 @@ TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) {
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
tenantApplierBatchSizeBytes.store(100 * 1024 /* bytes */);
tenantApplierBatchSizeOps.store(2 /* ops */);
- ASSERT_OK(applier.startup());
- auto firstBatchFuture = applier.getNotificationForOpTime(srcOps[0].getOpTime());
- auto secondBatchFuture = applier.getNotificationForOpTime(srcOps[2].getOpTime());
+ ASSERT_OK(applier->startup());
+ auto firstBatchFuture = applier->getNotificationForOpTime(srcOps[0].getOpTime());
+ auto secondBatchFuture = applier->getNotificationForOpTime(srcOps[2].getOpTime());
pushOps(srcOps);
// We should see the last batch optime for each batch in our notifications.
ASSERT_EQ(srcOps[1].getOpTime(), firstBatchFuture.get().donorOpTime);
@@ -269,8 +269,8 @@ TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) {
assertNoOpMatches(srcOps[1], entries[1]);
assertNoOpMatches(srcOps[2], entries[2]);
assertNoOpMatches(srcOps[3], entries[3]);
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, NoOpsForLargeTransaction) {
@@ -291,14 +291,14 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeTransaction) {
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
+ ASSERT_OK(applier->startup());
// The first two ops should come in the first batch.
- auto firstBatchFuture = applier.getNotificationForOpTime(srcOps[0].getOpTime());
+ auto firstBatchFuture = applier->getNotificationForOpTime(srcOps[0].getOpTime());
ASSERT_EQ(srcOps[1].getOpTime(), firstBatchFuture.get().donorOpTime);
// The last op is in a batch by itself.
- auto secondBatchFuture = applier.getNotificationForOpTime(srcOps[2].getOpTime());
+ auto secondBatchFuture = applier->getNotificationForOpTime(srcOps[2].getOpTime());
ASSERT_EQ(srcOps[2].getOpTime(), secondBatchFuture.get().donorOpTime);
auto entries = _opObserver->getEntries();
ASSERT_EQ(srcOps.size(), entries.size());
@@ -309,8 +309,8 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeTransaction) {
ASSERT_EQ(OpTime(), entries[0].getPrevWriteOpTimeInTransaction());
ASSERT_EQ(entries[0].getOpTime(), entries[1].getPrevWriteOpTimeInTransaction());
ASSERT_EQ(entries[1].getOpTime(), entries[2].getPrevWriteOpTimeInTransaction());
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyInsert_DatabaseMissing) {
@@ -322,15 +322,15 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_DatabaseMissing) {
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// Since no database was available, the insert shouldn't actually happen.
ASSERT_FALSE(onInsertsCalled);
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyInsert_CollectionMissing) {
@@ -343,15 +343,15 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_CollectionMissing) {
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// Since no collection was available, the insert shouldn't actually happen.
ASSERT_FALSE(onInsertsCalled);
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyInsert_InsertExisting) {
@@ -374,16 +374,16 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_InsertExisting) {
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// This insert gets converted to an upsert.
ASSERT_FALSE(onInsertsCalled);
ASSERT_TRUE(onUpdateCalled);
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyInsert_Success) {
@@ -403,14 +403,14 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_Success) {
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
ASSERT_TRUE(onInsertsCalled);
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyInserts_Grouped) {
@@ -457,15 +457,15 @@ TEST_F(TenantOplogApplierTest, ApplyInserts_Grouped) {
// Make sure all ops end up in a single thread so they can be batched.
auto writerPool = makeTenantMigrationWriterPool(1);
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(entries.back().getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(entries.back().getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
ASSERT_TRUE(onInsertsCalledNss1);
ASSERT_TRUE(onInsertsCalledNss2);
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyUpdate_MissingDocument) {
@@ -484,16 +484,16 @@ TEST_F(TenantOplogApplierTest, ApplyUpdate_MissingDocument) {
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// Updates to missing documents should just be dropped, neither inserted nor updated.
ASSERT_FALSE(onInsertsCalled);
ASSERT_FALSE(onUpdateCalled);
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyUpdate_Success) {
@@ -511,14 +511,14 @@ TEST_F(TenantOplogApplierTest, ApplyUpdate_Success) {
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
ASSERT_TRUE(onUpdateCalled);
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyDelete_DatabaseMissing) {
@@ -533,15 +533,15 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_DatabaseMissing) {
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// Since no database was available, the delete shouldn't actually happen.
ASSERT_FALSE(onDeleteCalled);
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyDelete_CollectionMissing) {
@@ -557,15 +557,15 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_CollectionMissing) {
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// Since no collection was available, the delete shouldn't actually happen.
ASSERT_FALSE(onDeleteCalled);
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyDelete_DocumentMissing) {
@@ -582,15 +582,15 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_DocumentMissing) {
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// Since the document wasn't available, onDelete should not be called.
ASSERT_FALSE(onDeleteCalled);
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyDelete_Success) {
@@ -618,14 +618,14 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_Success) {
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
ASSERT_TRUE(onDeleteCalled);
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyCommand_Success) {
@@ -651,14 +651,14 @@ TEST_F(TenantOplogApplierTest, ApplyCommand_Success) {
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
ASSERT_TRUE(applyCmdCalled);
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyCommand_WrongNSS) {
@@ -679,14 +679,14 @@ TEST_F(TenantOplogApplierTest, ApplyCommand_WrongNSS) {
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_NOT_OK(opAppliedFuture.getNoThrow().getStatus());
ASSERT_FALSE(applyCmdCalled);
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongNSS) {
@@ -701,14 +701,14 @@ TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongNSS) {
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_NOT_OK(opAppliedFuture.getNoThrow().getStatus());
ASSERT_FALSE(onInsertsCalled);
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongUUID) {
@@ -725,14 +725,14 @@ TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongUUID) {
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_NOT_OK(opAppliedFuture.getNoThrow().getStatus());
ASSERT_FALSE(onInsertsCalled);
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyNoop_Success) {
@@ -741,10 +741,10 @@ TEST_F(TenantOplogApplierTest, ApplyNoop_Success) {
pushOps(srcOps);
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[0].getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[0].getOpTime());
auto futureRes = opAppliedFuture.getNoThrow();
auto entries = _opObserver->getEntries();
@@ -754,8 +754,8 @@ TEST_F(TenantOplogApplierTest, ApplyNoop_Success) {
ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[0].getOpTime());
ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoop_Success) {
@@ -764,10 +764,10 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoop_Success) {
pushOps(srcOps);
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[0].getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[0].getOpTime());
auto futureRes = opAppliedFuture.getNoThrow();
auto entries = _opObserver->getEntries();
@@ -777,8 +777,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoop_Success) {
ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[0].getOpTime());
ASSERT_EQUALS(futureRes.getValue().recipientOpTime, OpTime());
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyInsertThenResumeTokenNoopInDifferentBatch_Success) {
@@ -788,14 +788,14 @@ TEST_F(TenantOplogApplierTest, ApplyInsertThenResumeTokenNoopInDifferentBatch_Su
pushOps(srcOps);
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
tenantApplierBatchSizeBytes.store(100 * 1024 /* bytes */);
tenantApplierBatchSizeOps.store(1 /* ops */);
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[1].getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[1].getOpTime());
auto futureRes = opAppliedFuture.getNoThrow();
auto entries = _opObserver->getEntries();
@@ -806,8 +806,8 @@ TEST_F(TenantOplogApplierTest, ApplyInsertThenResumeTokenNoopInDifferentBatch_Su
ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[1].getOpTime());
ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoopThenInsertInSameBatch_Success) {
@@ -817,10 +817,10 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoopThenInsertInSameBatch_Success
pushOps(srcOps);
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[1].getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[1].getOpTime());
auto futureRes = opAppliedFuture.getNoThrow();
auto entries = _opObserver->getEntries();
@@ -831,8 +831,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoopThenInsertInSameBatch_Success
ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[1].getOpTime());
ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoopSameTimestamp_Success) {
@@ -843,10 +843,10 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoopSameTimestamp_Succe
ASSERT_EQ(srcOps[0].getOpTime(), srcOps[1].getOpTime());
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[1].getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[1].getOpTime());
auto futureRes = opAppliedFuture.getNoThrow();
auto entries = _opObserver->getEntries();
@@ -857,8 +857,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoopSameTimestamp_Succe
ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[1].getOpTime());
ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoop_Success) {
@@ -868,10 +868,10 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoop_Success) {
pushOps(srcOps);
auto writerPool = makeTenantMigrationWriterPool();
- TenantOplogApplier applier(
+ auto applier = std::make_shared<TenantOplogApplier>(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- ASSERT_OK(applier.startup());
- auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[1].getOpTime());
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[1].getOpTime());
auto futureRes = opAppliedFuture.getNoThrow();
auto entries = _opObserver->getEntries();
@@ -882,8 +882,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoop_Success) {
ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[1].getOpTime());
ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
- applier.shutdown();
- applier.join();
+ applier->shutdown();
+ applier->join();
}
} // namespace repl