author     Dan Larkin-York <dan.larkin-york@mongodb.com>      2022-11-02 17:13:34 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-12-21 21:16:33 +0000
commit     64a5d2927e191d468a8c6595e07c9113be7d1e14 (patch)
tree       028373d66cb4e1b3d69f87dcee881b7f65d71645
parent     897c960e0faf9ada0bc247567e57cb8b876324ee (diff)
SERVER-71020 SERVER-71939 Ensure aborting time series batch eventually removes bucket from catalog
(cherry picked from commit 6c1b68295151a074848f8cfd36060fc8949928fe) (cherry picked from commit bc4e993a11b434a7cf60aff416182a8ee640d6b3)
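
In short, the guarantee this patch adds is that aborting a write batch for a time-series bucket eventually removes the bucket from the catalog (or marks it cleared), even when a conflicting prepared batch keeps it alive for a while, so later inserts land in a fresh bucket. Below is a condensed sketch of that scenario, modeled directly on the new regression tests further down and assuming the BucketCatalogTest fixture (_bucketCatalog, _opCtx, _ns1, _timeField, etc.); it is an illustration, not additional code from this commit:

    // Two batches from different clients land in the same bucket.
    auto batch1 = _bucketCatalog
                      ->insert(_opCtx, _ns1, _getCollator(_ns1), _getTimeseriesOptions(_ns1),
                               BSON(_timeField << Date_t::now()),
                               BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
                      .getValue();
    auto batch2 = _bucketCatalog
                      ->insert(_makeOperationContext().second.get(), _ns1, _getCollator(_ns1),
                               _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()),
                               BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
                      .getValue();
    ASSERT_EQ(batch1->bucketId(), batch2->bucketId());
    ASSERT(batch1->claimCommitRights());
    ASSERT(batch2->claimCommitRights());

    // Prepare the first batch, then abort the second; the bucket cannot be removed yet
    // because a prepared batch is still outstanding.
    ASSERT_TRUE(_bucketCatalog->prepareCommit(batch1));
    _bucketCatalog->abort(batch2, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"});

    // Finishing the prepared batch lets the catalog clean the bucket up.
    _bucketCatalog->finish(batch1, {});
    ASSERT(batch1->finished());
    ASSERT(batch2->finished());

    // A later insert must not reuse the aborted bucket.
    auto batch3 = _bucketCatalog
                      ->insert(_opCtx, _ns1, _getCollator(_ns1), _getTimeseriesOptions(_ns1),
                               BSON(_timeField << Date_t::now()),
                               BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
                      .getValue();
    ASSERT_NE(batch1->bucketId(), batch3->bucketId());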
-rw-r--r--   src/mongo/db/timeseries/bucket_catalog.cpp        22
-rw-r--r--   src/mongo/db/timeseries/bucket_catalog_test.cpp   180
2 files changed, 165 insertions, 37 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 7090cabc8ec..f548308bacc 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -52,6 +52,7 @@ void normalizeObject(BSONObjBuilder* builder, const BSONObj& obj);
const auto getBucketCatalog = ServiceContext::declareDecoration<BucketCatalog>();
MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationBeforeWriteConflict);
+MONGO_FAIL_POINT_DEFINE(hangWaitingForConflictingPreparedBatch);
uint8_t numDigits(uint32_t num) {
uint8_t numDigits = 0;
@@ -325,7 +326,19 @@ bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
BucketAccess bucket(this, batch->bucketId(), BucketState::kPrepared);
if (batch->finished()) {
- // Someone may have aborted it while we were waiting.
+ // Someone may have aborted it while we were waiting. Since we have the prepared batch, we
+ // should now be able to fully abort the bucket.
+ if (bucket) {
+ bucket.release();
+ auto lk = _lockExclusive();
+ auto it = _allBuckets.find(batch->bucketId());
+ if (it != _allBuckets.end()) {
+ auto bucket = it->second.get();
+ stdx::unique_lock blk{bucket->_mutex};
+ bucket->_preparedBatch.reset();
+ _abort(blk, bucket, nullptr, boost::none);
+ }
+ }
return false;
} else if (!bucket) {
abort(batch);
@@ -529,6 +542,11 @@ void BucketCatalog::_waitToCommitBatch(const std::shared_ptr<WriteBatch>& batch)
// We have to wait for someone else to finish.
bucket.release();
+
+ // We only hit this failpoint when there are conflicting prepared batches on the same
+ // bucket.
+ hangWaitingForConflictingPreparedBatch.pauseWhileSet();
+
current->getResult().getStatus().ignore(); // We don't care about the result.
}
}
@@ -590,6 +608,8 @@ void BucketCatalog::_abort(stdx::unique_lock<Mutex>& lk,
lk.unlock();
if (doRemove) {
[[maybe_unused]] bool removed = _removeBucket(bucket, false /* expiringBuckets */);
+ } else {
+ _setBucketState(bucket->id(), BucketState::kCleared);
}
}
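
The server-side half of the change also adds the hangWaitingForConflictingPreparedBatch failpoint so that tests can deterministically pause a writer blocked behind a conflicting prepared batch on the same bucket. A rough sketch of the handshake the new test helper (below) performs with that failpoint, assuming the test fixture's _bucketCatalog and a second batch (batch2) targeting the same bucket; illustrative only, not part of the patch:

    // Arm the failpoint before starting the background writer.
    auto fp = globalFailPointRegistry().find("hangWaitingForConflictingPreparedBatch");
    auto timesEntered = fp->setMode(FailPoint::alwaysOn, 0);

    // The background thread blocks in _waitToCommitBatch() once it sees the
    // conflicting prepared batch on the same bucket.
    stdx::thread worker([&] { ASSERT_TRUE(_bucketCatalog->prepareCommit(batch2)); });

    // Wait until the thread has actually reached the failpoint, then disarm it; the
    // thread then waits on the conflicting batch's result until it finishes or aborts.
    fp->waitForTimesEntered(timesEntered + 1);
    fp->setMode(FailPoint::off, 0);

    // ... finish or abort the conflicting prepared batch here, then:
    worker.join();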
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index f354142cf6a..0fc28b5d0c4 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -44,17 +44,13 @@ namespace mongo {
namespace {
class BucketCatalogTest : public CatalogTestFixture {
protected:
- class Task {
- AtomicWord<bool> _running{false};
- stdx::packaged_task<void()> _task;
- stdx::future<void> _future;
+ class RunBackgroundTaskAndWaitForFailpoint {
stdx::thread _taskThread;
public:
- Task(std::function<void()>&& fn);
- ~Task();
-
- const stdx::future<void>& future();
+ RunBackgroundTaskAndWaitForFailpoint(const std::string& failpointName,
+ std::function<void()>&& fn);
+ ~RunBackgroundTaskAndWaitForFailpoint();
};
void setUp() override;
@@ -91,25 +87,6 @@ protected:
BSONObj _makeTimeseriesOptionsForCreate() const override;
};
-BucketCatalogTest::Task::Task(std::function<void()>&& fn)
- : _task{[this, fn = std::move(fn)]() {
- _running.store(true);
- fn();
- }},
- _future{_task.get_future()},
- _taskThread{std::move(_task)} {
- while (!_running.load()) {
- stdx::this_thread::yield();
- }
-}
-BucketCatalogTest::Task::~Task() {
- _taskThread.join();
-}
-
-const stdx::future<void>& BucketCatalogTest::Task::future() {
- return _future;
-}
-
void BucketCatalogTest::setUp() {
CatalogTestFixture::setUp();
@@ -124,6 +101,23 @@ void BucketCatalogTest::setUp() {
}
}
+BucketCatalogTest::RunBackgroundTaskAndWaitForFailpoint::RunBackgroundTaskAndWaitForFailpoint(
+ const std::string& failpointName, std::function<void()>&& fn) {
+ auto fp = globalFailPointRegistry().find(failpointName);
+ auto timesEntered = fp->setMode(FailPoint::alwaysOn, 0);
+
+ // Start background job.
+ _taskThread = stdx::thread(std::move(fn));
+
+ // Once we hit the failpoint once, turn it off.
+ fp->waitForTimesEntered(timesEntered + 1);
+ fp->setMode(FailPoint::off, 0);
+}
+
+BucketCatalogTest::RunBackgroundTaskAndWaitForFailpoint::~RunBackgroundTaskAndWaitForFailpoint() {
+ _taskThread.join();
+}
+
std::pair<ServiceContext::UniqueClient, ServiceContext::UniqueOperationContext>
BucketCatalogTest::_makeOperationContext() {
auto client = getServiceContext()->makeClient("BucketCatalogTest");
@@ -831,17 +825,131 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
ASSERT(batch2->claimCommitRights());
// Batch 2 will not be able to commit until batch 1 has finished.
- _bucketCatalog->prepareCommit(batch1);
- auto task = Task{[&]() { _bucketCatalog->prepareCommit(batch2); }};
- // Add a little extra wait to make sure prepareCommit actually gets to the blocking point.
- stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10));
- ASSERT(task.future().valid());
- ASSERT(stdx::future_status::timeout == task.future().wait_for(stdx::chrono::microseconds(1)))
- << "prepareCommit finished before expected";
+ ASSERT_TRUE(_bucketCatalog->prepareCommit(batch1));
+
+ {
+ auto task = RunBackgroundTaskAndWaitForFailpoint{
+ "hangWaitingForConflictingPreparedBatch",
+ [&]() { ASSERT_TRUE(_bucketCatalog->prepareCommit(batch2)); }};
+
+ // Finish the first batch.
+ _bucketCatalog->finish(batch1, {});
+ ASSERT(batch1->finished());
+ }
- _bucketCatalog->finish(batch1, {});
- task.future().wait();
_bucketCatalog->finish(batch2, {});
+ ASSERT(batch2->finished());
+}
+
+TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
+ auto batch1 = _bucketCatalog
+ ->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue();
+
+ auto batch2 = _bucketCatalog
+ ->insert(_makeOperationContext().second.get(),
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue();
+ auto batch3 = _bucketCatalog
+ ->insert(_makeOperationContext().second.get(),
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue();
+ ASSERT_EQ(batch1->bucketId(), batch2->bucketId());
+ ASSERT_EQ(batch1->bucketId(), batch3->bucketId());
+
+ ASSERT(batch1->claimCommitRights());
+ ASSERT(batch2->claimCommitRights());
+ ASSERT(batch3->claimCommitRights());
+
+ // Batch 2 will not be able to commit until batch 1 has finished.
+ ASSERT_TRUE(_bucketCatalog->prepareCommit(batch1));
+
+ {
+ auto task = RunBackgroundTaskAndWaitForFailpoint{
+ "hangWaitingForConflictingPreparedBatch",
+ [&]() { ASSERT_FALSE(_bucketCatalog->prepareCommit(batch2)); }};
+
+ // If we abort the third batch, it should abort the second one too, as it isn't prepared.
+ // However, since the first batch is prepared, we can't abort it or clean up the bucket. We
+ // can then finish the first batch, which will allow the second batch to proceed. It should
+ // recognize it has been aborted and clean up the bucket.
+ _bucketCatalog->abort(batch3, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"});
+ _bucketCatalog->finish(batch1, {});
+ ASSERT(batch1->finished());
+ }
+ // Wait for the batch 2 task to finish preparing commit. Since batch 1 finished, batch 2 should
+ // be unblocked. Note that after aborting batch 3, batch 2 was not in a prepared state, so we
+ // expect the prepareCommit() call to fail.
+ ASSERT(batch2->finished());
+
+ // Make sure a new batch ends up in a new bucket.
+ auto batch4 = _bucketCatalog
+ ->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue();
+ ASSERT_NE(batch2->bucketId(), batch4->bucketId());
+}
+
+TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) {
+ auto batch1 = _bucketCatalog
+ ->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue();
+
+ auto batch2 = _bucketCatalog
+ ->insert(_makeOperationContext().second.get(),
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue();
+
+ // Batch 1 and 2 use the same bucket.
+ ASSERT_EQ(batch1->bucketId(), batch2->bucketId());
+ ASSERT(batch1->claimCommitRights());
+ ASSERT(batch2->claimCommitRights());
+ ASSERT_TRUE(_bucketCatalog->prepareCommit(batch1));
+
+ // Batch 1 will be in a prepared state now. Abort the second batch so that bucket 1 will be
+ // closed after batch 1 finishes.
+ _bucketCatalog->abort(batch2, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"});
+ _bucketCatalog->finish(batch1, {});
+ ASSERT(batch1->finished());
+ ASSERT(batch2->finished());
+
+ // Ensure a batch started after batch 2 aborts does not insert future measurements into the
+ // aborted batch/bucket.
+ auto batch3 = _bucketCatalog
+ ->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue();
+ ASSERT_NE(batch1->bucketId(), batch3->bucketId());
}
TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {