diff options
author | Dan Larkin-York <dan.larkin-york@mongodb.com> | 2022-11-02 17:13:34 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-12-21 16:03:38 +0000 |
commit | e7acc50f7d2e0ee599a13721b6d235495c6c0ca9 (patch) | |
tree | cff13dd145571190e34ae7b156af79db79b51c6a | |
parent | 9ba29038b9316acb6f0a5a414285ca39fddb3fa5 (diff) | |
download | mongo-e7acc50f7d2e0ee599a13721b6d235495c6c0ca9.tar.gz |
SERVER-71020 SERVER-71939 Ensure aborting time series batch eventually removes bucket from catalog
(cherry picked from commit 6c1b68295151a074848f8cfd36060fc8949928fe)
(cherry picked from commit bc4e993a11b434a7cf60aff416182a8ee640d6b3)
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog_test.cpp | 185 |
2 files changed, 163 insertions, 37 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index 10e0e2df903..5a935ae439e 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -51,6 +51,7 @@ void normalizeObject(BSONObjBuilder* builder, const BSONObj& obj); const auto getBucketCatalog = ServiceContext::declareDecoration<BucketCatalog>(); MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationBeforeWriteConflict); +MONGO_FAIL_POINT_DEFINE(hangWaitingForConflictingPreparedBatch); uint8_t numDigits(uint32_t num) { uint8_t numDigits = 0; @@ -746,7 +747,11 @@ Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { _useBucketInState(&stripe, stripeLock, batch->bucket().id, BucketState::kPrepared); if (batch->finished()) { - // Someone may have aborted it while we were waiting. + // Someone may have aborted it while we were waiting. Since we have the prepared batch, we + // should now be able to fully abort the bucket. + if (bucket) { + _abort(&stripe, stripeLock, batch, getBatchStatus()); + } return getBatchStatus(); } else if (!bucket) { _abort(&stripe, stripeLock, batch, getTimeseriesBucketClearedError(batch->bucket().id)); @@ -1051,6 +1056,10 @@ void BucketCatalog::_waitToCommitBatch(Stripe* stripe, const std::shared_ptr<Wri } } + // We only hit this failpoint when there are conflicting prepared batches on the same + // bucket. + hangWaitingForConflictingPreparedBatch.pauseWhileSet(); + // We have to wait for someone else to finish. current->getResult().getStatus().ignore(); // We don't care about the result. } @@ -1108,7 +1117,7 @@ void BucketCatalog::_abort(Stripe* stripe, // user is doing with it, but we need to keep the bucket around until // that batch is finished. if (auto& prepared = bucket->_preparedBatch) { - if (prepared == batch) { + if (batch && prepared == batch) { // We own the prepared batch, so we can go ahead and abort it and remove the bucket. prepared->_abort(status); prepared.reset(); @@ -1119,6 +1128,8 @@ void BucketCatalog::_abort(Stripe* stripe, if (doRemove) { [[maybe_unused]] bool removed = _removeBucket(stripe, stripeLock, bucket); + } else { + _setBucketState(bucket->id(), BucketState::kCleared); } } diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp index b39b1065b89..99ce87d5b1b 100644 --- a/src/mongo/db/timeseries/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp @@ -43,17 +43,13 @@ namespace mongo { namespace { class BucketCatalogTest : public CatalogTestFixture { protected: - class Task { - AtomicWord<bool> _running{false}; - stdx::packaged_task<void()> _task; - stdx::future<void> _future; + class RunBackgroundTaskAndWaitForFailpoint { stdx::thread _taskThread; public: - Task(std::function<void()>&& fn); - ~Task(); - - const stdx::future<void>& future(); + RunBackgroundTaskAndWaitForFailpoint(const std::string& failpointName, + std::function<void()>&& fn); + ~RunBackgroundTaskAndWaitForFailpoint(); }; void setUp() override; @@ -96,25 +92,6 @@ protected: BSONObj _makeTimeseriesOptionsForCreate() const override; }; -BucketCatalogTest::Task::Task(std::function<void()>&& fn) - : _task{[this, fn = std::move(fn)]() { - _running.store(true); - fn(); - }}, - _future{_task.get_future()}, - _taskThread{std::move(_task)} { - while (!_running.load()) { - stdx::this_thread::yield(); - } -} -BucketCatalogTest::Task::~Task() { - _taskThread.join(); -} - -const stdx::future<void>& BucketCatalogTest::Task::future() { - return _future; -} - void BucketCatalogTest::setUp() { CatalogTestFixture::setUp(); @@ -129,6 +106,23 @@ void BucketCatalogTest::setUp() { } } +BucketCatalogTest::RunBackgroundTaskAndWaitForFailpoint::RunBackgroundTaskAndWaitForFailpoint( + const std::string& failpointName, std::function<void()>&& fn) { + auto fp = globalFailPointRegistry().find(failpointName); + auto timesEntered = fp->setMode(FailPoint::alwaysOn, 0); + + // Start background job. + _taskThread = stdx::thread(std::move(fn)); + + // Once we hit the failpoint once, turn it off. + fp->waitForTimesEntered(timesEntered + 1); + fp->setMode(FailPoint::off, 0); +} + +BucketCatalogTest::RunBackgroundTaskAndWaitForFailpoint::~RunBackgroundTaskAndWaitForFailpoint() { + _taskThread.join(); +} + std::pair<ServiceContext::UniqueClient, ServiceContext::UniqueOperationContext> BucketCatalogTest::_makeOperationContext() { auto client = getServiceContext()->makeClient("BucketCatalogTest"); @@ -895,16 +889,137 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) { // Batch 2 will not be able to commit until batch 1 has finished. ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); - auto task = Task{[&]() { ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); }}; - // Add a little extra wait to make sure prepareCommit actually gets to the blocking point. - stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10)); - ASSERT(task.future().valid()); - ASSERT(stdx::future_status::timeout == task.future().wait_for(stdx::chrono::microseconds(1))) - << "prepareCommit finished before expected"; - _bucketCatalog->finish(batch1, {}); - task.future().wait(); + { + auto task = RunBackgroundTaskAndWaitForFailpoint{ + "hangWaitingForConflictingPreparedBatch", + [&]() { ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); }}; + + // Finish the first batch. + _bucketCatalog->finish(batch1, {}); + ASSERT(batch1->finished()); + } + _bucketCatalog->finish(batch2, {}); + ASSERT(batch2->finished()); +} + +TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) { + auto batch1 = _bucketCatalog + ->insert(_opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + .getValue() + .batch; + + auto batch2 = _bucketCatalog + ->insert(_makeOperationContext().second.get(), + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + .getValue() + .batch; + auto batch3 = _bucketCatalog + ->insert(_makeOperationContext().second.get(), + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + .getValue() + .batch; + ASSERT_EQ(batch1->bucket().id, batch2->bucket().id); + ASSERT_EQ(batch1->bucket().id, batch3->bucket().id); + + ASSERT(batch1->claimCommitRights()); + ASSERT(batch2->claimCommitRights()); + ASSERT(batch3->claimCommitRights()); + + // Batch 2 will not be able to commit until batch 1 has finished. + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); + + { + auto task = RunBackgroundTaskAndWaitForFailpoint{ + "hangWaitingForConflictingPreparedBatch", + [&]() { ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch2)); }}; + + // If we abort the third batch, it should abort the second one too, as it isn't prepared. + // However, since the first batch is prepared, we can't abort it or clean up the bucket. We + // can then finish the first batch, which will allow the second batch to proceed. It should + // recognize it has been aborted and clean up the bucket. + _bucketCatalog->abort(batch3, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"}); + _bucketCatalog->finish(batch1, {}); + ASSERT(batch1->finished()); + } + // Wait for the batch 2 task to finish preparing commit. Since batch 1 finished, batch 2 should + // be unblocked. Note that after aborting batch 3, batch 2 was not in a prepared state, so we + // expect the prepareCommit() call to fail. + ASSERT(batch2->finished()); + + // Make sure a new batch ends up in a new bucket. + auto batch4 = _bucketCatalog + ->insert(_opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + .getValue() + .batch; + ASSERT_NE(batch2->bucket().id, batch4->bucket().id); +} + +TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) { + auto batch1 = _bucketCatalog + ->insert(_opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + .getValue() + .batch; + + auto batch2 = _bucketCatalog + ->insert(_makeOperationContext().second.get(), + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + .getValue() + .batch; + + // Batch 1 and 2 use the same bucket. + ASSERT_EQ(batch1->bucket().id, batch2->bucket().id); + ASSERT(batch1->claimCommitRights()); + ASSERT(batch2->claimCommitRights()); + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); + + // Batch 1 will be in a prepared state now. Abort the second batch so that bucket 1 will be + // closed after batch 1 finishes. + _bucketCatalog->abort(batch2, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"}); + _bucketCatalog->finish(batch1, {}); + ASSERT(batch1->finished()); + ASSERT(batch2->finished()); + + // Ensure a batch started after batch 2 aborts, does not insert future measurements into the + // aborted batch/bucket. + auto batch3 = _bucketCatalog + ->insert(_opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + .getValue() + .batch; + ASSERT_NE(batch1->bucket().id, batch3->bucket().id); } TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) { |