From 154777a52a5588885c35e17a688a75ec3a18d924 Mon Sep 17 00:00:00 2001 From: Faustoleyva54 Date: Mon, 12 Dec 2022 22:46:26 +0000 Subject: SERVER-71939 Update BucketCatalogTest::Task class with failpoint --- src/mongo/db/timeseries/bucket_catalog.cpp | 12 ++ src/mongo/db/timeseries/bucket_catalog_test.cpp | 146 ++++++++++++++++-------- 2 files changed, 109 insertions(+), 49 deletions(-) (limited to 'src/mongo/db/timeseries') diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index 83885d3d8ca..4c0e2e33773 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -63,6 +63,7 @@ MONGO_FAIL_POINT_DEFINE(hangTimeseriesInsertBeforeReopeningBucket); MONGO_FAIL_POINT_DEFINE(alwaysUseSameBucketCatalogStripe); MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationAfterStart); MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationBeforeFinish); +MONGO_FAIL_POINT_DEFINE(hangWaitingForConflictingPreparedBatch); uint8_t numDigits(uint32_t num) { @@ -1954,6 +1955,10 @@ void BucketCatalog::_waitToCommitBatch(Stripe* stripe, const std::shared_ptrgetResult().getStatus().ignore(); // We don't care about the result. } @@ -2177,6 +2182,13 @@ void BucketCatalog::_abort(Stripe* stripe, if (doRemove) { _removeBucket(stripe, stripeLock, bucket, RemovalMode::kAbort); + } else { + _bucketStateManager.changeBucketState( + bucket->bucketId(), + [](boost::optional input, std::uint64_t) -> boost::optional { + invariant(input.has_value()); + return input.value().setFlag(BucketStateFlag::kCleared); + }); } } diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp index 2acb41bc3d8..d3ffaa64732 100644 --- a/src/mongo/db/timeseries/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp @@ -54,17 +54,13 @@ constexpr StringData kNumClosedDueToMemoryThreshold = "numBucketsClosedDueToMemo class BucketCatalogTest : public CatalogTestFixture { protected: - class Task { - AtomicWord _running{false}; - stdx::packaged_task _task; - stdx::future _future; + class RunBackgroundTaskAndWaitForFailpoint { stdx::thread _taskThread; public: - Task(std::function&& fn); - ~Task(); - - const stdx::future& future(); + RunBackgroundTaskAndWaitForFailpoint(const std::string& failpointName, + std::function&& fn); + ~RunBackgroundTaskAndWaitForFailpoint(); }; void setUp() override; @@ -106,25 +102,6 @@ protected: BSONObj _makeTimeseriesOptionsForCreate() const override; }; -BucketCatalogTest::Task::Task(std::function&& fn) - : _task{[this, fn = std::move(fn)]() { - _running.store(true); - fn(); - }}, - _future{_task.get_future()}, - _taskThread{std::move(_task)} { - while (!_running.load()) { - stdx::this_thread::yield(); - } -} -BucketCatalogTest::Task::~Task() { - _taskThread.join(); -} - -const stdx::future& BucketCatalogTest::Task::future() { - return _future; -} - void BucketCatalogTest::setUp() { CatalogTestFixture::setUp(); @@ -139,6 +116,23 @@ void BucketCatalogTest::setUp() { } } +BucketCatalogTest::RunBackgroundTaskAndWaitForFailpoint::RunBackgroundTaskAndWaitForFailpoint( + const std::string& failpointName, std::function&& fn) { + auto fp = globalFailPointRegistry().find(failpointName); + auto timesEntered = fp->setMode(FailPoint::alwaysOn, 0); + + // Start background job. + _taskThread = stdx::thread(std::move(fn)); + + // Once we hit the failpoint once, turn it off. + fp->waitForTimesEntered(timesEntered + 1); + fp->setMode(FailPoint::off, 0); +} + +BucketCatalogTest::RunBackgroundTaskAndWaitForFailpoint::~RunBackgroundTaskAndWaitForFailpoint() { + _taskThread.join(); +} + std::pair BucketCatalogTest::_makeOperationContext() { auto client = getServiceContext()->makeClient("BucketCatalogTest"); @@ -902,16 +896,19 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) { // Batch 2 will not be able to commit until batch 1 has finished. ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); - auto task = Task{[&]() { ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); }}; - // Add a little extra wait to make sure prepareCommit actually gets to the blocking point. - stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10)); - ASSERT(task.future().valid()); - ASSERT(stdx::future_status::timeout == task.future().wait_for(stdx::chrono::microseconds(1))) - << "prepareCommit finished before expected"; - _bucketCatalog->finish(batch1, {}); - task.future().wait(); + { + auto task = RunBackgroundTaskAndWaitForFailpoint{ + "hangWaitingForConflictingPreparedBatch", + [&]() { ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); }}; + + // Finish the first batch. + _bucketCatalog->finish(batch1, {}); + ASSERT(batch1->finished()); + } + _bucketCatalog->finish(batch2, {}); + ASSERT(batch2->finished()); } TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) { @@ -952,20 +949,23 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) { // Batch 2 will not be able to commit until batch 1 has finished. ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); - auto task = Task{[&]() { ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); }}; - // Add a little extra wait to make sure prepareCommit actually gets to the blocking point. - stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10)); - ASSERT(task.future().valid()); - ASSERT(stdx::future_status::timeout == task.future().wait_for(stdx::chrono::microseconds(1))) - << "prepareCommit finished before expected"; - - // If we abort the third batch, it should abort the second one too, as it isn't prepared. - // However, since the first batch is prepared, we can't abort it or clean up the bucket. We can - // then finish the first batch, which will allow the second batch to proceed. It should - // recognize it has been aborted and clean up the bucket. - _bucketCatalog->abort(batch3, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"}); - _bucketCatalog->finish(batch1, {}); - task.future().wait(); + + { + auto task = RunBackgroundTaskAndWaitForFailpoint{ + "hangWaitingForConflictingPreparedBatch", + [&]() { ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch2)); }}; + + // If we abort the third batch, it should abort the second one too, as it isn't prepared. + // However, since the first batch is prepared, we can't abort it or clean up the bucket. We + // can then finish the first batch, which will allow the second batch to proceed. It should + // recognize it has been aborted and clean up the bucket. + _bucketCatalog->abort(batch3, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"}); + _bucketCatalog->finish(batch1, {}); + ASSERT(batch1->finished()); + } + // Wait for the batch 2 task to finish preparing commit. Since batch 1 finished, batch 2 should + // be unblocked. Note that after aborting batch 3, batch 2 was not in a prepared state, so we + // expect the prepareCommit() call to fail. ASSERT(batch2->finished()); // Make sure a new batch ends up in a new bucket. @@ -981,6 +981,54 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) { ASSERT_NE(batch2->bucket().bucketId, batch4->bucket().bucketId); } +TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) { + auto batch1 = _bucketCatalog + ->insert(_opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + .getValue() + .batch; + + auto batch2 = _bucketCatalog + ->insert(_makeOperationContext().second.get(), + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + .getValue() + .batch; + + // Batch 1 and 2 use the same bucket. + ASSERT_EQ(batch1->bucket().bucketId, batch2->bucket().bucketId); + ASSERT(batch1->claimCommitRights()); + ASSERT(batch2->claimCommitRights()); + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); + + // Batch 1 will be in a prepared state now. Abort the second batch so that bucket 1 will be + // closed after batch 1 finishes. + _bucketCatalog->abort(batch2, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"}); + _bucketCatalog->finish(batch1, {}); + ASSERT(batch1->finished()); + ASSERT(batch2->finished()); + + // Ensure a batch started after batch 2 aborts, does not insert future measurements into the + // aborted batch/bucket. + auto batch3 = _bucketCatalog + ->insert(_opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + .getValue() + .batch; + ASSERT_NE(batch1->bucket().bucketId, batch3->bucket().bucketId); +} + TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) { auto batch1 = _bucketCatalog ->insert(_opCtx, -- cgit v1.2.1