diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier_test.cpp | 254 |
2 files changed, 128 insertions, 128 deletions
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index 8b0356a1b29..afc287175cc 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -94,7 +94,7 @@ SemiFuture<TenantOplogApplier::OpTimePair> TenantOplogApplier::getNotificationFo } Status TenantOplogApplier::_doStartup_inlock() noexcept { - _oplogBatcher = std::make_unique<TenantOplogBatcher>(_tenantId, _oplogBuffer, _executor); + _oplogBatcher = std::make_shared<TenantOplogBatcher>(_tenantId, _oplogBuffer, _executor); auto status = _oplogBatcher->startup(); if (!status.isOK()) return status; diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp index cee53e4286e..02e654a2d01 100644 --- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp @@ -202,18 +202,18 @@ TEST_F(TenantOplogApplierTest, NoOpsForSingleBatch) { auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); + ASSERT_OK(applier->startup()); // Even if we wait for the first op in a batch, it is the last op we should be notified on. - auto lastBatchTimes = applier.getNotificationForOpTime(srcOps.front().getOpTime()).get(); + auto lastBatchTimes = applier->getNotificationForOpTime(srcOps.front().getOpTime()).get(); ASSERT_EQ(srcOps.back().getOpTime(), lastBatchTimes.donorOpTime); auto entries = _opObserver->getEntries(); ASSERT_EQ(2, entries.size()); assertNoOpMatches(srcOps[0], entries[0]); assertNoOpMatches(srcOps[1], entries[1]); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, NoOpsForLargeBatch) { @@ -226,19 +226,19 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeBatch) { auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); + ASSERT_OK(applier->startup()); // Even if we wait for the first op in a batch, it is the last op we should be notified on. - auto lastBatchTimes = applier.getNotificationForOpTime(srcOps.front().getOpTime()).get(); + auto lastBatchTimes = applier->getNotificationForOpTime(srcOps.front().getOpTime()).get(); ASSERT_EQ(srcOps.back().getOpTime(), lastBatchTimes.donorOpTime); auto entries = _opObserver->getEntries(); ASSERT_EQ(srcOps.size(), entries.size()); for (size_t i = 0; i < srcOps.size(); i++) { assertNoOpMatches(srcOps[i], entries[i]); } - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) { @@ -250,15 +250,15 @@ TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) { auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); tenantApplierBatchSizeBytes.store(100 * 1024 /* bytes */); tenantApplierBatchSizeOps.store(2 /* ops */); - ASSERT_OK(applier.startup()); - auto firstBatchFuture = applier.getNotificationForOpTime(srcOps[0].getOpTime()); - auto secondBatchFuture = applier.getNotificationForOpTime(srcOps[2].getOpTime()); + ASSERT_OK(applier->startup()); + auto firstBatchFuture = applier->getNotificationForOpTime(srcOps[0].getOpTime()); + auto secondBatchFuture = applier->getNotificationForOpTime(srcOps[2].getOpTime()); pushOps(srcOps); // We should see the last batch optime for each batch in our notifications. ASSERT_EQ(srcOps[1].getOpTime(), firstBatchFuture.get().donorOpTime); @@ -269,8 +269,8 @@ TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) { assertNoOpMatches(srcOps[1], entries[1]); assertNoOpMatches(srcOps[2], entries[2]); assertNoOpMatches(srcOps[3], entries[3]); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, NoOpsForLargeTransaction) { @@ -291,14 +291,14 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeTransaction) { auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); + ASSERT_OK(applier->startup()); // The first two ops should come in the first batch. - auto firstBatchFuture = applier.getNotificationForOpTime(srcOps[0].getOpTime()); + auto firstBatchFuture = applier->getNotificationForOpTime(srcOps[0].getOpTime()); ASSERT_EQ(srcOps[1].getOpTime(), firstBatchFuture.get().donorOpTime); // The last op is in a batch by itself. - auto secondBatchFuture = applier.getNotificationForOpTime(srcOps[2].getOpTime()); + auto secondBatchFuture = applier->getNotificationForOpTime(srcOps[2].getOpTime()); ASSERT_EQ(srcOps[2].getOpTime(), secondBatchFuture.get().donorOpTime); auto entries = _opObserver->getEntries(); ASSERT_EQ(srcOps.size(), entries.size()); @@ -309,8 +309,8 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeTransaction) { ASSERT_EQ(OpTime(), entries[0].getPrevWriteOpTimeInTransaction()); ASSERT_EQ(entries[0].getOpTime(), entries[1].getPrevWriteOpTimeInTransaction()); ASSERT_EQ(entries[1].getOpTime(), entries[2].getPrevWriteOpTimeInTransaction()); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyInsert_DatabaseMissing) { @@ -322,15 +322,15 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_DatabaseMissing) { pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // Since no database was available, the insert shouldn't actually happen. ASSERT_FALSE(onInsertsCalled); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyInsert_CollectionMissing) { @@ -343,15 +343,15 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_CollectionMissing) { pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // Since no collection was available, the insert shouldn't actually happen. ASSERT_FALSE(onInsertsCalled); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyInsert_InsertExisting) { @@ -374,16 +374,16 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_InsertExisting) { pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // This insert gets converted to an upsert. ASSERT_FALSE(onInsertsCalled); ASSERT_TRUE(onUpdateCalled); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyInsert_Success) { @@ -403,14 +403,14 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_Success) { pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); ASSERT_TRUE(onInsertsCalled); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyInserts_Grouped) { @@ -457,15 +457,15 @@ TEST_F(TenantOplogApplierTest, ApplyInserts_Grouped) { // Make sure all ops end up in a single thread so they can be batched. auto writerPool = makeTenantMigrationWriterPool(1); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(entries.back().getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(entries.back().getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); ASSERT_TRUE(onInsertsCalledNss1); ASSERT_TRUE(onInsertsCalledNss2); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyUpdate_MissingDocument) { @@ -484,16 +484,16 @@ TEST_F(TenantOplogApplierTest, ApplyUpdate_MissingDocument) { pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // Updates to missing documents should just be dropped, neither inserted nor updated. ASSERT_FALSE(onInsertsCalled); ASSERT_FALSE(onUpdateCalled); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyUpdate_Success) { @@ -511,14 +511,14 @@ TEST_F(TenantOplogApplierTest, ApplyUpdate_Success) { pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); ASSERT_TRUE(onUpdateCalled); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyDelete_DatabaseMissing) { @@ -533,15 +533,15 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_DatabaseMissing) { pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // Since no database was available, the delete shouldn't actually happen. ASSERT_FALSE(onDeleteCalled); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyDelete_CollectionMissing) { @@ -557,15 +557,15 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_CollectionMissing) { pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // Since no collection was available, the delete shouldn't actually happen. ASSERT_FALSE(onDeleteCalled); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyDelete_DocumentMissing) { @@ -582,15 +582,15 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_DocumentMissing) { pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); // Since the document wasn't available, onDelete should not be called. ASSERT_FALSE(onDeleteCalled); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyDelete_Success) { @@ -618,14 +618,14 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_Success) { pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); ASSERT_TRUE(onDeleteCalled); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyCommand_Success) { @@ -651,14 +651,14 @@ TEST_F(TenantOplogApplierTest, ApplyCommand_Success) { pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); ASSERT_TRUE(applyCmdCalled); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyCommand_WrongNSS) { @@ -679,14 +679,14 @@ TEST_F(TenantOplogApplierTest, ApplyCommand_WrongNSS) { pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_NOT_OK(opAppliedFuture.getNoThrow().getStatus()); ASSERT_FALSE(applyCmdCalled); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongNSS) { @@ -701,14 +701,14 @@ TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongNSS) { pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_NOT_OK(opAppliedFuture.getNoThrow().getStatus()); ASSERT_FALSE(onInsertsCalled); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongUUID) { @@ -725,14 +725,14 @@ TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongUUID) { pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(entry.getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_NOT_OK(opAppliedFuture.getNoThrow().getStatus()); ASSERT_FALSE(onInsertsCalled); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyNoop_Success) { @@ -741,10 +741,10 @@ TEST_F(TenantOplogApplierTest, ApplyNoop_Success) { pushOps(srcOps); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[0].getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[0].getOpTime()); auto futureRes = opAppliedFuture.getNoThrow(); auto entries = _opObserver->getEntries(); @@ -754,8 +754,8 @@ TEST_F(TenantOplogApplierTest, ApplyNoop_Success) { ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[0].getOpTime()); ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime()); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoop_Success) { @@ -764,10 +764,10 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoop_Success) { pushOps(srcOps); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[0].getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[0].getOpTime()); auto futureRes = opAppliedFuture.getNoThrow(); auto entries = _opObserver->getEntries(); @@ -777,8 +777,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoop_Success) { ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[0].getOpTime()); ASSERT_EQUALS(futureRes.getValue().recipientOpTime, OpTime()); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyInsertThenResumeTokenNoopInDifferentBatch_Success) { @@ -788,14 +788,14 @@ TEST_F(TenantOplogApplierTest, ApplyInsertThenResumeTokenNoopInDifferentBatch_Su pushOps(srcOps); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); tenantApplierBatchSizeBytes.store(100 * 1024 /* bytes */); tenantApplierBatchSizeOps.store(1 /* ops */); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[1].getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[1].getOpTime()); auto futureRes = opAppliedFuture.getNoThrow(); auto entries = _opObserver->getEntries(); @@ -806,8 +806,8 @@ TEST_F(TenantOplogApplierTest, ApplyInsertThenResumeTokenNoopInDifferentBatch_Su ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[1].getOpTime()); ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime()); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoopThenInsertInSameBatch_Success) { @@ -817,10 +817,10 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoopThenInsertInSameBatch_Success pushOps(srcOps); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[1].getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[1].getOpTime()); auto futureRes = opAppliedFuture.getNoThrow(); auto entries = _opObserver->getEntries(); @@ -831,8 +831,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoopThenInsertInSameBatch_Success ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[1].getOpTime()); ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime()); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoopSameTimestamp_Success) { @@ -843,10 +843,10 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoopSameTimestamp_Succe ASSERT_EQ(srcOps[0].getOpTime(), srcOps[1].getOpTime()); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[1].getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[1].getOpTime()); auto futureRes = opAppliedFuture.getNoThrow(); auto entries = _opObserver->getEntries(); @@ -857,8 +857,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoopSameTimestamp_Succe ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[1].getOpTime()); ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime()); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoop_Success) { @@ -868,10 +868,10 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoop_Success) { pushOps(srcOps); auto writerPool = makeTenantMigrationWriterPool(); - TenantOplogApplier applier( + auto applier = std::make_shared<TenantOplogApplier>( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - ASSERT_OK(applier.startup()); - auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[1].getOpTime()); + ASSERT_OK(applier->startup()); + auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[1].getOpTime()); auto futureRes = opAppliedFuture.getNoThrow(); auto entries = _opObserver->getEntries(); @@ -882,8 +882,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoop_Success) { ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[1].getOpTime()); ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime()); - applier.shutdown(); - applier.join(); + applier->shutdown(); + applier->join(); } } // namespace repl |