From 64a5d2927e191d468a8c6595e07c9113be7d1e14 Mon Sep 17 00:00:00 2001
From: Dan Larkin-York
Date: Wed, 2 Nov 2022 17:13:34 +0000
Subject: SERVER-71020 SERVER-71939 Ensure aborting time series batch
 eventually removes bucket from catalog

(cherry picked from commit 6c1b68295151a074848f8cfd36060fc8949928fe)
(cherry picked from commit bc4e993a11b434a7cf60aff416182a8ee640d6b3)
---
 src/mongo/db/timeseries/bucket_catalog.cpp      |  22 ++-
 src/mongo/db/timeseries/bucket_catalog_test.cpp | 180 +++++++++++++++++++-----
 2 files changed, 165 insertions(+), 37 deletions(-)

diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 7090cabc8ec..f548308bacc 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -52,6 +52,7 @@ void normalizeObject(BSONObjBuilder* builder, const BSONObj& obj);
 const auto getBucketCatalog = ServiceContext::declareDecoration<BucketCatalog>();
 
 MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationBeforeWriteConflict);
+MONGO_FAIL_POINT_DEFINE(hangWaitingForConflictingPreparedBatch);
 
 uint8_t numDigits(uint32_t num) {
     uint8_t numDigits = 0;
@@ -325,7 +326,19 @@ bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
 
     BucketAccess bucket(this, batch->bucketId(), BucketState::kPrepared);
     if (batch->finished()) {
-        // Someone may have aborted it while we were waiting.
+        // Someone may have aborted it while we were waiting. Since we have the prepared batch, we
+        // should now be able to fully abort the bucket.
+        if (bucket) {
+            bucket.release();
+            auto lk = _lockExclusive();
+            auto it = _allBuckets.find(batch->bucketId());
+            if (it != _allBuckets.end()) {
+                auto bucket = it->second.get();
+                stdx::unique_lock blk{bucket->_mutex};
+                bucket->_preparedBatch.reset();
+                _abort(blk, bucket, nullptr, boost::none);
+            }
+        }
         return false;
     } else if (!bucket) {
         abort(batch);
@@ -529,6 +542,11 @@ void BucketCatalog::_waitToCommitBatch(const std::shared_ptr<WriteBatch>& batch)
 
         // We have to wait for someone else to finish.
         bucket.release();
+
+        // We only hit this failpoint when there are conflicting prepared batches on the same
+        // bucket.
+        hangWaitingForConflictingPreparedBatch.pauseWhileSet();
+
         current->getResult().getStatus().ignore();  // We don't care about the result.
     }
 }
@@ -590,6 +608,8 @@ void BucketCatalog::_abort(stdx::unique_lock<Mutex>& lk,
     lk.unlock();
     if (doRemove) {
         [[maybe_unused]] bool removed = _removeBucket(bucket, false /* expiringBuckets */);
+    } else {
+        _setBucketState(bucket->id(), BucketState::kCleared);
     }
 }
 
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index f354142cf6a..0fc28b5d0c4 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -44,17 +44,13 @@ namespace mongo {
 namespace {
 class BucketCatalogTest : public CatalogTestFixture {
 protected:
-    class Task {
-        AtomicWord<bool> _running{false};
-        stdx::packaged_task<void()> _task;
-        stdx::future<void> _future;
+    class RunBackgroundTaskAndWaitForFailpoint {
         stdx::thread _taskThread;
 
     public:
-        Task(std::function<void()>&& fn);
-        ~Task();
-
-        const stdx::future<void>& future();
+        RunBackgroundTaskAndWaitForFailpoint(const std::string& failpointName,
+                                             std::function<void()>&& fn);
+        ~RunBackgroundTaskAndWaitForFailpoint();
     };
 
     void setUp() override;
@@ -91,25 +87,6 @@ protected:
     BSONObj _makeTimeseriesOptionsForCreate() const override;
 };
 
-BucketCatalogTest::Task::Task(std::function<void()>&& fn)
-    : _task{[this, fn = std::move(fn)]() {
-          _running.store(true);
-          fn();
-      }},
-      _future{_task.get_future()},
-      _taskThread{std::move(_task)} {
-    while (!_running.load()) {
-        stdx::this_thread::yield();
-    }
-}
-BucketCatalogTest::Task::~Task() {
-    _taskThread.join();
-}
-
-const stdx::future<void>& BucketCatalogTest::Task::future() {
-    return _future;
-}
-
 void BucketCatalogTest::setUp() {
     CatalogTestFixture::setUp();
 
@@ -124,6 +101,23 @@ void BucketCatalogTest::setUp() {
     }
 }
 
+BucketCatalogTest::RunBackgroundTaskAndWaitForFailpoint::RunBackgroundTaskAndWaitForFailpoint(
+    const std::string& failpointName, std::function<void()>&& fn) {
+    auto fp = globalFailPointRegistry().find(failpointName);
+    auto timesEntered = fp->setMode(FailPoint::alwaysOn, 0);
+
+    // Start background job.
+    _taskThread = stdx::thread(std::move(fn));
+
+    // Once we hit the failpoint once, turn it off.
+    fp->waitForTimesEntered(timesEntered + 1);
+    fp->setMode(FailPoint::off, 0);
+}
+
+BucketCatalogTest::RunBackgroundTaskAndWaitForFailpoint::~RunBackgroundTaskAndWaitForFailpoint() {
+    _taskThread.join();
+}
+
 std::pair<ServiceContext::UniqueClient, ServiceContext::UniqueOperationContext>
 BucketCatalogTest::_makeOperationContext() {
     auto client = getServiceContext()->makeClient("BucketCatalogTest");
@@ -831,17 +825,131 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
     ASSERT(batch2->claimCommitRights());
 
     // Batch 2 will not be able to commit until batch 1 has finished.
-    _bucketCatalog->prepareCommit(batch1);
-    auto task = Task{[&]() { _bucketCatalog->prepareCommit(batch2); }};
-    // Add a little extra wait to make sure prepareCommit actually gets to the blocking point.
-    stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10));
-    ASSERT(task.future().valid());
-    ASSERT(stdx::future_status::timeout == task.future().wait_for(stdx::chrono::microseconds(1)))
-        << "prepareCommit finished before expected";
+    ASSERT_TRUE(_bucketCatalog->prepareCommit(batch1));
+
+    {
+        auto task = RunBackgroundTaskAndWaitForFailpoint{
+            "hangWaitingForConflictingPreparedBatch",
+            [&]() { ASSERT_TRUE(_bucketCatalog->prepareCommit(batch2)); }};
+
+        // Finish the first batch.
+        _bucketCatalog->finish(batch1, {});
+        ASSERT(batch1->finished());
+    }
 
-    _bucketCatalog->finish(batch1, {});
-    task.future().wait();
     _bucketCatalog->finish(batch2, {});
+    ASSERT(batch2->finished());
+}
+
+TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
+    auto batch1 = _bucketCatalog
+                      ->insert(_opCtx,
+                               _ns1,
+                               _getCollator(_ns1),
+                               _getTimeseriesOptions(_ns1),
+                               BSON(_timeField << Date_t::now()),
+                               BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+                      .getValue();
+
+    auto batch2 = _bucketCatalog
+                      ->insert(_makeOperationContext().second.get(),
+                               _ns1,
+                               _getCollator(_ns1),
+                               _getTimeseriesOptions(_ns1),
+                               BSON(_timeField << Date_t::now()),
+                               BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+                      .getValue();
+    auto batch3 = _bucketCatalog
+                      ->insert(_makeOperationContext().second.get(),
+                               _ns1,
+                               _getCollator(_ns1),
+                               _getTimeseriesOptions(_ns1),
+                               BSON(_timeField << Date_t::now()),
+                               BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+                      .getValue();
+    ASSERT_EQ(batch1->bucketId(), batch2->bucketId());
+    ASSERT_EQ(batch1->bucketId(), batch3->bucketId());
+
+    ASSERT(batch1->claimCommitRights());
+    ASSERT(batch2->claimCommitRights());
+    ASSERT(batch3->claimCommitRights());
+
+    // Batch 2 will not be able to commit until batch 1 has finished.
+    ASSERT_TRUE(_bucketCatalog->prepareCommit(batch1));
+
+    {
+        auto task = RunBackgroundTaskAndWaitForFailpoint{
+            "hangWaitingForConflictingPreparedBatch",
+            [&]() { ASSERT_FALSE(_bucketCatalog->prepareCommit(batch2)); }};
+
+        // If we abort the third batch, it should abort the second one too, as it isn't prepared.
+        // However, since the first batch is prepared, we can't abort it or clean up the bucket. We
+        // can then finish the first batch, which will allow the second batch to proceed. It should
+        // recognize it has been aborted and clean up the bucket.
+        _bucketCatalog->abort(batch3, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"});
+        _bucketCatalog->finish(batch1, {});
+        ASSERT(batch1->finished());
+    }
+    // Wait for the batch 2 task to finish preparing commit. Since batch 1 finished, batch 2 should
+    // be unblocked. Note that after aborting batch 3, batch 2 was not in a prepared state, so we
+    // expect the prepareCommit() call to fail.
+    ASSERT(batch2->finished());
+
+    // Make sure a new batch ends up in a new bucket.
+    auto batch4 = _bucketCatalog
+                      ->insert(_opCtx,
+                               _ns1,
+                               _getCollator(_ns1),
+                               _getTimeseriesOptions(_ns1),
+                               BSON(_timeField << Date_t::now()),
+                               BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+                      .getValue();
+    ASSERT_NE(batch2->bucketId(), batch4->bucketId());
+}
+
+TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) {
+    auto batch1 = _bucketCatalog
+                      ->insert(_opCtx,
+                               _ns1,
+                               _getCollator(_ns1),
+                               _getTimeseriesOptions(_ns1),
+                               BSON(_timeField << Date_t::now()),
+                               BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+                      .getValue();
+
+    auto batch2 = _bucketCatalog
+                      ->insert(_makeOperationContext().second.get(),
+                               _ns1,
+                               _getCollator(_ns1),
+                               _getTimeseriesOptions(_ns1),
+                               BSON(_timeField << Date_t::now()),
+                               BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+                      .getValue();
+
+    // Batch 1 and 2 use the same bucket.
+    ASSERT_EQ(batch1->bucketId(), batch2->bucketId());
+    ASSERT(batch1->claimCommitRights());
+    ASSERT(batch2->claimCommitRights());
+    ASSERT_TRUE(_bucketCatalog->prepareCommit(batch1));
+
+    // Batch 1 will be in a prepared state now. Abort the second batch so that bucket 1 will be
+    // closed after batch 1 finishes.
+    _bucketCatalog->abort(batch2, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"});
+    _bucketCatalog->finish(batch1, {});
+    ASSERT(batch1->finished());
+    ASSERT(batch2->finished());
+
+    // Ensure a batch started after batch 2 aborts, does not insert future measurements into the
+    // aborted batch/bucket.
+    auto batch3 = _bucketCatalog
+                      ->insert(_opCtx,
+                               _ns1,
+                               _getCollator(_ns1),
+                               _getTimeseriesOptions(_ns1),
+                               BSON(_timeField << Date_t::now()),
+                               BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+                      .getValue();
+    ASSERT_NE(batch1->bucketId(), batch3->bucketId());
 }
 
 TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
-- 
cgit v1.2.1