author     Faustoleyva54 <fausto.leyva@mongodb.com>           2022-12-12 22:46:26 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-12-12 23:17:38 +0000
commit     154777a52a5588885c35e17a688a75ec3a18d924 (patch)
tree       579fd96b7c229fd21db47c2e52dd3cc215cb2514 /src/mongo/db/timeseries
parent     831ce278f4caa2603e5da3077b13df2338c3ed06 (diff)
SERVER-71939 Update BucketCatalogTest::Task class with failpoint
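
The test helper now synchronizes with its background thread through a failpoint handshake instead of a fixed sleep. For reference, a minimal sketch of the pattern, assembled only from the FailPoint calls that appear in the diff below (the worker body is illustrative; in the real tests it is prepareCommit() that reaches the failpoint):

    // Test side: arm the failpoint, start the blocking work in the background,
    // then wait until that thread has provably entered the failpoint.
    auto fp = globalFailPointRegistry().find("hangWaitingForConflictingPreparedBatch");
    auto timesEntered = fp->setMode(FailPoint::alwaysOn, 0);  // entries so far

    stdx::thread worker([&] {
        // Server side: pauseWhileSet() blocks while the failpoint is enabled
        // and is a no-op when it is off, so production behavior is unchanged.
        hangWaitingForConflictingPreparedBatch.pauseWhileSet();
    });

    fp->waitForTimesEntered(timesEntered + 1);  // worker is parked at the point
    fp->setMode(FailPoint::off, 0);             // release it
    worker.join();

This replaces the old Task class, which slept for 10ms and hoped the background prepareCommit() had reached its blocking point by then.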
Diffstat (limited to 'src/mongo/db/timeseries')
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.cpp      |  12
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog_test.cpp | 146
2 files changed, 109 insertions, 49 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 83885d3d8ca..4c0e2e33773 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -63,6 +63,7 @@ MONGO_FAIL_POINT_DEFINE(hangTimeseriesInsertBeforeReopeningBucket);
MONGO_FAIL_POINT_DEFINE(alwaysUseSameBucketCatalogStripe);
MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationAfterStart);
MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationBeforeFinish);
+MONGO_FAIL_POINT_DEFINE(hangWaitingForConflictingPreparedBatch);
uint8_t numDigits(uint32_t num) {
@@ -1954,6 +1955,10 @@ void BucketCatalog::_waitToCommitBatch(Stripe* stripe, const std::shared_ptr<Wri
}
}
+ // We only hit this failpoint when there are conflicting prepared batches on the same
+ // bucket.
+ hangWaitingForConflictingPreparedBatch.pauseWhileSet();
+
// We have to wait for someone else to finish.
current->getResult().getStatus().ignore(); // We don't care about the result.
}
@@ -2177,6 +2182,13 @@ void BucketCatalog::_abort(Stripe* stripe,
if (doRemove) {
_removeBucket(stripe, stripeLock, bucket, RemovalMode::kAbort);
+ } else {
+ _bucketStateManager.changeBucketState(
+ bucket->bucketId(),
+ [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> {
+ invariant(input.has_value());
+ return input.value().setFlag(BucketStateFlag::kCleared);
+ });
}
}
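
The else-branch above handles aborts where the bucket cannot be removed outright because a prepared batch still references it: the bucket's tracked state is flagged kCleared instead, so later wakeups see the clear and subsequent inserts roll over to a fresh bucket (the new AbortingBatchEnsuresNewInsertsGoToNewBucket test exercises this). A hedged sketch of the transition-callback contract, as inferred from this single call site; the meaning of a boost::none input and of the second parameter (apparently an era counter) are assumptions:

    // Shape of the callback passed to changeBucketState: it receives the
    // currently tracked state (this call site invariants that one exists)
    // and returns the state the catalog should store in its place.
    auto markCleared = [](boost::optional<BucketState> input,
                          std::uint64_t /*era*/) -> boost::optional<BucketState> {
        invariant(input.has_value());
        return input.value().setFlag(BucketStateFlag::kCleared);
    };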
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index 2acb41bc3d8..d3ffaa64732 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -54,17 +54,13 @@ constexpr StringData kNumClosedDueToMemoryThreshold = "numBucketsClosedDueToMemo
class BucketCatalogTest : public CatalogTestFixture {
protected:
- class Task {
- AtomicWord<bool> _running{false};
- stdx::packaged_task<void()> _task;
- stdx::future<void> _future;
+ class RunBackgroundTaskAndWaitForFailpoint {
stdx::thread _taskThread;
public:
- Task(std::function<void()>&& fn);
- ~Task();
-
- const stdx::future<void>& future();
+ RunBackgroundTaskAndWaitForFailpoint(const std::string& failpointName,
+ std::function<void()>&& fn);
+ ~RunBackgroundTaskAndWaitForFailpoint();
};
void setUp() override;
@@ -106,25 +102,6 @@ protected:
BSONObj _makeTimeseriesOptionsForCreate() const override;
};
-BucketCatalogTest::Task::Task(std::function<void()>&& fn)
- : _task{[this, fn = std::move(fn)]() {
- _running.store(true);
- fn();
- }},
- _future{_task.get_future()},
- _taskThread{std::move(_task)} {
- while (!_running.load()) {
- stdx::this_thread::yield();
- }
-}
-BucketCatalogTest::Task::~Task() {
- _taskThread.join();
-}
-
-const stdx::future<void>& BucketCatalogTest::Task::future() {
- return _future;
-}
-
void BucketCatalogTest::setUp() {
CatalogTestFixture::setUp();
@@ -139,6 +116,23 @@ void BucketCatalogTest::setUp() {
}
}
+BucketCatalogTest::RunBackgroundTaskAndWaitForFailpoint::RunBackgroundTaskAndWaitForFailpoint(
+ const std::string& failpointName, std::function<void()>&& fn) {
+ auto fp = globalFailPointRegistry().find(failpointName);
+ auto timesEntered = fp->setMode(FailPoint::alwaysOn, 0);
+
+ // Start background job.
+ _taskThread = stdx::thread(std::move(fn));
+
+ // Once the failpoint has been entered, turn it off.
+ fp->waitForTimesEntered(timesEntered + 1);
+ fp->setMode(FailPoint::off, 0);
+}
+
+BucketCatalogTest::RunBackgroundTaskAndWaitForFailpoint::~RunBackgroundTaskAndWaitForFailpoint() {
+ _taskThread.join();
+}
+
std::pair<ServiceContext::UniqueClient, ServiceContext::UniqueOperationContext>
BucketCatalogTest::_makeOperationContext() {
auto client = getServiceContext()->makeClient("BucketCatalogTest");
@@ -902,16 +896,19 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
// Batch 2 will not be able to commit until batch 1 has finished.
ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
- auto task = Task{[&]() { ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); }};
- // Add a little extra wait to make sure prepareCommit actually gets to the blocking point.
- stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10));
- ASSERT(task.future().valid());
- ASSERT(stdx::future_status::timeout == task.future().wait_for(stdx::chrono::microseconds(1)))
- << "prepareCommit finished before expected";
- _bucketCatalog->finish(batch1, {});
- task.future().wait();
+ {
+ auto task = RunBackgroundTaskAndWaitForFailpoint{
+ "hangWaitingForConflictingPreparedBatch",
+ [&]() { ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); }};
+
+ // Finish the first batch.
+ _bucketCatalog->finish(batch1, {});
+ ASSERT(batch1->finished());
+ }
+
_bucketCatalog->finish(batch2, {});
+ ASSERT(batch2->finished());
}
TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
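
One detail in the rewritten tests is easy to miss: the braces around the helper are load-bearing. RunBackgroundTaskAndWaitForFailpoint joins its thread in the destructor, so leaving the scope guarantees the background prepareCommit() call has returned before the test touches the batch again. A reduced, self-contained sketch of the idiom in standard C++ (names are illustrative, not from the patch):

    #include <functional>
    #include <thread>

    // Reduced form of the idiom: construction starts the background work, and
    // the destructor's join() turns scope exit into a synchronization point.
    class JoinOnScopeExit {
    public:
        explicit JoinOnScopeExit(std::function<void()> fn) : _t(std::move(fn)) {}
        ~JoinOnScopeExit() { _t.join(); }  // blocks until fn has completed
    private:
        std::thread _t;
    };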
@@ -952,20 +949,23 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
// Batch 2 will not be able to commit until batch 1 has finished.
ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
- auto task = Task{[&]() { ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); }};
- // Add a little extra wait to make sure prepareCommit actually gets to the blocking point.
- stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10));
- ASSERT(task.future().valid());
- ASSERT(stdx::future_status::timeout == task.future().wait_for(stdx::chrono::microseconds(1)))
- << "prepareCommit finished before expected";
-
- // If we abort the third batch, it should abort the second one too, as it isn't prepared.
- // However, since the first batch is prepared, we can't abort it or clean up the bucket. We can
- // then finish the first batch, which will allow the second batch to proceed. It should
- // recognize it has been aborted and clean up the bucket.
- _bucketCatalog->abort(batch3, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"});
- _bucketCatalog->finish(batch1, {});
- task.future().wait();
+
+ {
+ auto task = RunBackgroundTaskAndWaitForFailpoint{
+ "hangWaitingForConflictingPreparedBatch",
+ [&]() { ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch2)); }};
+
+ // If we abort the third batch, it should abort the second one too, as it isn't prepared.
+ // However, since the first batch is prepared, we can't abort it or clean up the bucket. We
+ // can then finish the first batch, which will allow the second batch to proceed. It should
+ // recognize it has been aborted and clean up the bucket.
+ _bucketCatalog->abort(batch3, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"});
+ _bucketCatalog->finish(batch1, {});
+ ASSERT(batch1->finished());
+ }
+ // Wait for the batch 2 task to finish preparing commit. Since batch 1 finished, batch 2 should
+ // be unblocked. Note that after aborting batch 3, batch 2 was not in a prepared state, so we
+ // expect the prepareCommit() call to fail.
ASSERT(batch2->finished());
// Make sure a new batch ends up in a new bucket.
@@ -981,6 +981,54 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
ASSERT_NE(batch2->bucket().bucketId, batch4->bucket().bucketId);
}
+TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) {
+ auto batch1 = _bucketCatalog
+ ->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue()
+ .batch;
+
+ auto batch2 = _bucketCatalog
+ ->insert(_makeOperationContext().second.get(),
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue()
+ .batch;
+
+ // Batch 1 and 2 use the same bucket.
+ ASSERT_EQ(batch1->bucket().bucketId, batch2->bucket().bucketId);
+ ASSERT(batch1->claimCommitRights());
+ ASSERT(batch2->claimCommitRights());
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
+
+ // Batch 1 will be in a prepared state now. Abort the second batch so that bucket 1 will be
+ // closed after batch 1 finishes.
+ _bucketCatalog->abort(batch2, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"});
+ _bucketCatalog->finish(batch1, {});
+ ASSERT(batch1->finished());
+ ASSERT(batch2->finished());
+
+ // Ensure a batch started after batch 2 aborts does not insert future measurements into the
+ // aborted batch/bucket.
+ auto batch3 = _bucketCatalog
+ ->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue()
+ .batch;
+ ASSERT_NE(batch1->bucket().bucketId, batch3->bucket().bucketId);
+}
+
TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
auto batch1 = _bucketCatalog
->insert(_opCtx,