diff options
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp | 103 |
1 files changed, 96 insertions, 7 deletions
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp index 40ce2dcf21e..559d9cbe630 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp @@ -478,8 +478,8 @@ TEST_F(ReshardingOplogFetcherTest, TestAwaitInsert) { auto hasSeenStartAtFuture = fetcher.awaitInsert(startAt); ASSERT_FALSE(hasSeenStartAtFuture.isReady()); - // iterate() won't lead to any documents being inserted into the output collection (because no - // writes have happened to the data collection) so `hasSeenStartAtFuture` still won't be ready. + // Because no writes have happened to the data collection, the fetcher will insert a no-op entry + // with the latestOplogTimestamp, so `hasSeenStartAtFuture` will be ready. auto fetcherJob = launchAsync([&, this] { ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr); fetcher.useReadConcernForTest(false); @@ -487,12 +487,12 @@ TEST_F(ReshardingOplogFetcherTest, TestAwaitInsert) { auto factory = makeCancelableOpCtx(); return fetcher.iterate(&cc(), factory); }); + ASSERT_TRUE(requestPassthroughHandler(fetcherJob)); - ASSERT_FALSE(hasSeenStartAtFuture.isReady()); + ASSERT_TRUE(hasSeenStartAtFuture.isReady()); // Insert a document into the data collection and have it generate an oplog entry with a - // "destinedRecipient" field. Only after iterate() is called again and inserts a record into the - // output collection will `hasSeenStartAtFuture` have become ready. + // "destinedRecipient" field. auto dataWriteTimestamp = [&] { FailPointEnableBlock fp("addDestinedRecipient", BSON("destinedRecipient" << _destinationShard.toString())); @@ -505,7 +505,6 @@ TEST_F(ReshardingOplogFetcherTest, TestAwaitInsert) { repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx); return repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx); }(); - ASSERT_FALSE(hasSeenStartAtFuture.isReady()); fetcherJob = launchAsync([&, this] { ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr); @@ -521,9 +520,99 @@ TEST_F(ReshardingOplogFetcherTest, TestAwaitInsert) { ASSERT_TRUE(fetcher.awaitInsert(startAt).isReady()); // However, asking for `dataWriteTimestamp` wouldn't become ready until the next record is - // insert into the output collection. + // inserted into the output collection. ASSERT_FALSE(fetcher.awaitInsert({dataWriteTimestamp, dataWriteTimestamp}).isReady()); } +TEST_F(ReshardingOplogFetcherTest, TestStartAtUpdatedWithProgressMarkOplogTs) { + const NamespaceString outputCollectionNss("dbtests.outputCollection"); + const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); + const NamespaceString otherCollection("dbtests.collectionNotBeingResharded"); + + create(outputCollectionNss); + create(dataCollectionNss); + create(otherCollection); + _fetchTimestamp = repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx); + + const auto& collectionUUID = [&] { + AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); + return dataColl->uuid(); + }(); + + ReshardingDonorOplogId startAt{_fetchTimestamp, _fetchTimestamp}; + ReshardingOplogFetcher fetcher(makeFetcherEnv(), + _reshardingUUID, + collectionUUID, + startAt, + _donorShard, + _destinationShard, + outputCollectionNss); + + // Insert a document into the data collection and have it generate an oplog entry with a + // "destinedRecipient" field. + auto writeToDataCollectionTs = [&] { + FailPointEnableBlock fp("addDestinedRecipient", + BSON("destinedRecipient" << _destinationShard.toString())); + + AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); + WriteUnitOfWork wuow(_opCtx); + insertDocument(dataColl.getCollection(), InsertStatement(BSON("_id" << 1 << "a" << 1))); + wuow.commit(); + + repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx); + return repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx); + }(); + + auto fetcherJob = launchAsync([&, this] { + ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr); + fetcher.useReadConcernForTest(false); + fetcher.setInitialBatchSizeForTest(2); + auto factory = makeCancelableOpCtx(); + return fetcher.iterate(&cc(), factory); + }); + ASSERT_TRUE(requestPassthroughHandler(fetcherJob)); + + // The fetcher's lastSeenTimestamp should be equal to `writeToDataCollectionTs`. + ASSERT_TRUE(fetcher.getLastSeenTimestamp().getClusterTime() == writeToDataCollectionTs); + ASSERT_TRUE(fetcher.getLastSeenTimestamp().getTs() == writeToDataCollectionTs); + ASSERT_EQ(1, metricsFetchedCount()) << " Verify reported metrics"; + + // Now, insert a document into a different collection that is not involved in resharding. + auto writeToOtherCollectionTs = [&] { + AutoGetCollection dataColl(_opCtx, otherCollection, LockMode::MODE_IX); + WriteUnitOfWork wuow(_opCtx); + insertDocument(dataColl.getCollection(), InsertStatement(BSON("_id" << 1 << "a" << 1))); + wuow.commit(); + + repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx); + return repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx); + }(); + + fetcherJob = launchAsync([&, this] { + ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr); + fetcher.useReadConcernForTest(false); + fetcher.setInitialBatchSizeForTest(2); + auto factory = makeCancelableOpCtx(); + return fetcher.iterate(&cc(), factory); + }); + ASSERT_TRUE(requestPassthroughHandler(fetcherJob)); + + // The fetcher's lastSeenTimestamp should now be equal to `writeToOtherCollectionTs` + // because the lastSeenTimestamp will be updated with the latest oplog timestamp from the + // donor's cursor response. + ASSERT_TRUE(fetcher.getLastSeenTimestamp().getClusterTime() == writeToOtherCollectionTs); + ASSERT_TRUE(fetcher.getLastSeenTimestamp().getTs() == writeToOtherCollectionTs); + ASSERT_EQ(2, metricsFetchedCount()) << " Verify reported metrics"; + + // The last document returned by ReshardingDonorOplogIterator::getNextBatch() would be + // `writeToDataCollectionTs`, but ReshardingOplogFetcher would have inserted a doc with + // `writeToOtherCollectionTs` after this so `awaitInsert` should be immediately ready when + // passed `writeToDataCollectionTs`. + ASSERT_TRUE(fetcher.awaitInsert({writeToDataCollectionTs, writeToDataCollectionTs}).isReady()); + + // `awaitInsert` should not be ready if passed `writeToOtherCollectionTs`. + ASSERT_FALSE( + fetcher.awaitInsert({writeToOtherCollectionTs, writeToOtherCollectionTs}).isReady()); +} } // namespace } // namespace mongo |