summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp')
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp103
1 files changed, 96 insertions, 7 deletions
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
index 40ce2dcf21e..559d9cbe630 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
@@ -478,8 +478,8 @@ TEST_F(ReshardingOplogFetcherTest, TestAwaitInsert) {
auto hasSeenStartAtFuture = fetcher.awaitInsert(startAt);
ASSERT_FALSE(hasSeenStartAtFuture.isReady());
- // iterate() won't lead to any documents being inserted into the output collection (because no
- // writes have happened to the data collection) so `hasSeenStartAtFuture` still won't be ready.
+ // Because no writes have happened to the data collection, the fetcher will insert a no-op entry
+ // with the latestOplogTimestamp, so `hasSeenStartAtFuture` will be ready.
auto fetcherJob = launchAsync([&, this] {
ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr);
fetcher.useReadConcernForTest(false);
@@ -487,12 +487,12 @@ TEST_F(ReshardingOplogFetcherTest, TestAwaitInsert) {
auto factory = makeCancelableOpCtx();
return fetcher.iterate(&cc(), factory);
});
+
ASSERT_TRUE(requestPassthroughHandler(fetcherJob));
- ASSERT_FALSE(hasSeenStartAtFuture.isReady());
+ ASSERT_TRUE(hasSeenStartAtFuture.isReady());
// Insert a document into the data collection and have it generate an oplog entry with a
- // "destinedRecipient" field. Only after iterate() is called again and inserts a record into the
- // output collection will `hasSeenStartAtFuture` have become ready.
+ // "destinedRecipient" field.
auto dataWriteTimestamp = [&] {
FailPointEnableBlock fp("addDestinedRecipient",
BSON("destinedRecipient" << _destinationShard.toString()));
@@ -505,7 +505,6 @@ TEST_F(ReshardingOplogFetcherTest, TestAwaitInsert) {
repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
return repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx);
}();
- ASSERT_FALSE(hasSeenStartAtFuture.isReady());
fetcherJob = launchAsync([&, this] {
ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr);
@@ -521,9 +520,99 @@ TEST_F(ReshardingOplogFetcherTest, TestAwaitInsert) {
ASSERT_TRUE(fetcher.awaitInsert(startAt).isReady());
// However, asking for `dataWriteTimestamp` wouldn't become ready until the next record is
- // insert into the output collection.
+ // inserted into the output collection.
ASSERT_FALSE(fetcher.awaitInsert({dataWriteTimestamp, dataWriteTimestamp}).isReady());
}
+TEST_F(ReshardingOplogFetcherTest, TestStartAtUpdatedWithProgressMarkOplogTs) {
+ const NamespaceString outputCollectionNss("dbtests.outputCollection");
+ const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
+ const NamespaceString otherCollection("dbtests.collectionNotBeingResharded");
+
+ create(outputCollectionNss);
+ create(dataCollectionNss);
+ create(otherCollection);
+ _fetchTimestamp = repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx);
+
+ const auto& collectionUUID = [&] {
+ AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
+ return dataColl->uuid();
+ }();
+
+ ReshardingDonorOplogId startAt{_fetchTimestamp, _fetchTimestamp};
+ ReshardingOplogFetcher fetcher(makeFetcherEnv(),
+ _reshardingUUID,
+ collectionUUID,
+ startAt,
+ _donorShard,
+ _destinationShard,
+ outputCollectionNss);
+
+ // Insert a document into the data collection and have it generate an oplog entry with a
+ // "destinedRecipient" field.
+ auto writeToDataCollectionTs = [&] {
+ FailPointEnableBlock fp("addDestinedRecipient",
+ BSON("destinedRecipient" << _destinationShard.toString()));
+
+ AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
+ WriteUnitOfWork wuow(_opCtx);
+ insertDocument(dataColl.getCollection(), InsertStatement(BSON("_id" << 1 << "a" << 1)));
+ wuow.commit();
+
+ repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
+ return repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx);
+ }();
+
+ auto fetcherJob = launchAsync([&, this] {
+ ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr);
+ fetcher.useReadConcernForTest(false);
+ fetcher.setInitialBatchSizeForTest(2);
+ auto factory = makeCancelableOpCtx();
+ return fetcher.iterate(&cc(), factory);
+ });
+ ASSERT_TRUE(requestPassthroughHandler(fetcherJob));
+
+ // The fetcher's lastSeenTimestamp should be equal to `writeToDataCollectionTs`.
+ ASSERT_TRUE(fetcher.getLastSeenTimestamp().getClusterTime() == writeToDataCollectionTs);
+ ASSERT_TRUE(fetcher.getLastSeenTimestamp().getTs() == writeToDataCollectionTs);
+ ASSERT_EQ(1, metricsFetchedCount()) << " Verify reported metrics";
+
+ // Now, insert a document into a different collection that is not involved in resharding.
+ auto writeToOtherCollectionTs = [&] {
+ AutoGetCollection dataColl(_opCtx, otherCollection, LockMode::MODE_IX);
+ WriteUnitOfWork wuow(_opCtx);
+ insertDocument(dataColl.getCollection(), InsertStatement(BSON("_id" << 1 << "a" << 1)));
+ wuow.commit();
+
+ repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
+ return repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx);
+ }();
+
+ fetcherJob = launchAsync([&, this] {
+ ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr);
+ fetcher.useReadConcernForTest(false);
+ fetcher.setInitialBatchSizeForTest(2);
+ auto factory = makeCancelableOpCtx();
+ return fetcher.iterate(&cc(), factory);
+ });
+ ASSERT_TRUE(requestPassthroughHandler(fetcherJob));
+
+ // The fetcher's lastSeenTimestamp should now be equal to `writeToOtherCollectionTs`
+ // because the lastSeenTimestamp will be updated with the latest oplog timestamp from the
+ // donor's cursor response.
+ ASSERT_TRUE(fetcher.getLastSeenTimestamp().getClusterTime() == writeToOtherCollectionTs);
+ ASSERT_TRUE(fetcher.getLastSeenTimestamp().getTs() == writeToOtherCollectionTs);
+ ASSERT_EQ(2, metricsFetchedCount()) << " Verify reported metrics";
+
+ // The last document returned by ReshardingDonorOplogIterator::getNextBatch() would be
+ // `writeToDataCollectionTs`, but ReshardingOplogFetcher would have inserted a doc with
+ // `writeToOtherCollectionTs` after this so `awaitInsert` should be immediately ready when
+ // passed `writeToDataCollectionTs`.
+ ASSERT_TRUE(fetcher.awaitInsert({writeToDataCollectionTs, writeToDataCollectionTs}).isReady());
+
+ // `awaitInsert` should not be ready if passed `writeToOtherCollectionTs`.
+ ASSERT_FALSE(
+ fetcher.awaitInsert({writeToOtherCollectionTs, writeToOtherCollectionTs}).isReady());
+}
} // namespace
} // namespace mongo