summaryrefslogtreecommitdiff
path: root/src/mongo/db/timeseries
diff options
context:
space:
mode:
authorDan Larkin-York <dan.larkin-york@mongodb.com>2021-04-01 23:06:23 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-02 00:01:02 +0000
commitb55f6c86a1a47f28de7815ef942c3c422590dea9 (patch)
tree3ad4664be27f03f0e601d1fb6fd32d4276ca25f6 /src/mongo/db/timeseries
parentc8e74f9e82a9aab3f549ff77aa539fa6b3ab6b45 (diff)
downloadmongo-b55f6c86a1a47f28de7815ef942c3c422590dea9.tar.gz
SERVER-55060 Direct modification must remove buckets from the time-series bucket catalog
Diffstat (limited to 'src/mongo/db/timeseries')
-rw-r--r--src/mongo/db/timeseries/SConscript2
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.cpp233
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.h55
-rw-r--r--src/mongo/db/timeseries/bucket_catalog_test.cpp64
4 files changed, 264 insertions, 90 deletions
diff --git a/src/mongo/db/timeseries/SConscript b/src/mongo/db/timeseries/SConscript
index c2f1959a441..b1b35582fe2 100644
--- a/src/mongo/db/timeseries/SConscript
+++ b/src/mongo/db/timeseries/SConscript
@@ -23,7 +23,9 @@ env.Library(
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/catalog/database_holder',
'$BUILD_DIR/mongo/db/commands/server_status',
+ '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception',
'$BUILD_DIR/mongo/db/views/views',
+ '$BUILD_DIR/mongo/util/fail_point',
'timeseries_idl',
],
)
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 17fa905b8d9..a6fe7f89e14 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -35,13 +35,16 @@
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/commands/server_status.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/views/view_catalog.h"
#include "mongo/stdx/thread.h"
+#include "mongo/util/fail_point.h"
namespace mongo {
namespace {
const auto getBucketCatalog = ServiceContext::declareDecoration<BucketCatalog>();
+MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationBeforeWriteConflict);
uint8_t numDigits(uint32_t num) {
uint8_t numDigits = 0;
@@ -128,6 +131,7 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert(
auto time = timeElem.Date();
BucketAccess bucket{this, key, stats.get(), time};
+ invariant(bucket);
StringSet newFieldNamesToBeInserted;
uint32_t newFieldNamesSize = 0;
@@ -202,46 +206,59 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert(
return batch;
}
-void BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
- invariant(!batch->finished());
+bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
+ if (batch->finished()) {
+ // In this case, someone else aborted the batch behind our back. Oops.
+ return false;
+ }
_waitToCommitBatch(batch);
BucketAccess bucket(this, batch->bucket());
- invariant(bucket);
+ if (!bucket) {
+ abort(batch);
+ return false;
+ }
+
+ invariant(_setBucketState(bucket->_id, BucketState::kPrepared));
auto prevMemoryUsage = bucket->_memoryUsage;
batch->_prepareCommit();
_memoryUsage.fetchAndAdd(bucket->_memoryUsage - prevMemoryUsage);
bucket->_batches.erase(batch->_lsid);
+
+ return true;
}
void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info) {
invariant(!batch->finished());
invariant(!batch->active());
- auto stats = _getExecutionStats(batch->bucket()->_ns);
-
BucketAccess bucket(this, batch->bucket());
- invariant(bucket);
batch->_finish(info);
- bucket->_preparedBatch.reset();
+ if (bucket) {
+ invariant(_setBucketState(bucket->_id, BucketState::kNormal));
+ bucket->_preparedBatch.reset();
+ }
if (info.result.isOK()) {
+ auto& stats = batch->_stats;
stats->numCommits.fetchAndAddRelaxed(1);
- if (bucket->_numCommittedMeasurements == 0) {
+ if (batch->numPreviouslyCommittedMeasurements() == 0) {
stats->numBucketInserts.fetchAndAddRelaxed(1);
} else {
stats->numBucketUpdates.fetchAndAddRelaxed(1);
}
stats->numMeasurementsCommitted.fetchAndAddRelaxed(batch->measurements().size());
- bucket->_numCommittedMeasurements += batch->measurements().size();
+ if (bucket) {
+ bucket->_numCommittedMeasurements += batch->measurements().size();
+ }
}
- if (bucket->allCommitted()) {
+ if (bucket && bucket->allCommitted()) {
if (bucket->_full) {
// Everything in the bucket has been committed, and nothing more will be added since the
// bucket is full. Thus, we can remove it.
@@ -255,6 +272,10 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo&
// happened in BucketAccess::rollover, and that there is already a new open bucket for
// this metadata.
_markBucketNotIdle(ptr, false /* locked */);
+ {
+ stdx::lock_guard statesLk{_statesMutex};
+ _bucketStates.erase(ptr->_id);
+ }
_allBuckets.erase(ptr);
} else {
_markBucketIdle(bucket);
@@ -265,54 +286,26 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo&
void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch) {
invariant(batch);
invariant(!batch->finished());
+ invariant(batch->_commitRights.load());
Bucket* bucket = batch->bucket();
- while (true) {
- std::vector<std::shared_ptr<WriteBatch>> batchesToWaitOn;
-
- {
- auto lk = _lockExclusive();
-
- // Before we access the bucket, make sure it's still there.
- if (!_allBuckets.contains(bucket)) {
- // Someone else already cleaned up the bucket while we didn't hold the lock.
- invariant(batch->finished());
- return;
- }
-
- stdx::unique_lock blk{bucket->_mutex};
- if (bucket->allCommitted()) {
- // No uncommitted batches left to abort, go ahead and remove the bucket.
- blk.unlock();
- _removeBucket(bucket, false /* expiringBuckets */);
- break;
- }
-
- // For any uncommitted batches that we need to abort, see if we already have the rights,
- // otherwise try to claim the rights and abort it. If we don't get the rights, then wait
- // for the other writer to resolve the batch.
- for (const auto& [_, active] : bucket->_batches) {
- if (active == batch || active->claimCommitRights()) {
- active->_abort();
- } else {
- batchesToWaitOn.push_back(active);
- }
- }
- bucket->_batches.clear();
+ // Before we access the bucket, make sure it's still there.
+ auto lk = _lockExclusive();
+ if (!_allBuckets.contains(bucket)) {
+ // Special case, bucket has already been cleared, and we need only abort this batch.
+ batch->_abort();
+ return;
+ }
- if (auto& prepared = bucket->_preparedBatch) {
- if (prepared == batch) {
- prepared->_abort();
- } else {
- batchesToWaitOn.push_back(prepared);
- }
- prepared.reset();
- }
- }
+ stdx::unique_lock blk{bucket->_mutex};
+ _abort(blk, bucket, batch);
+}
- for (const auto& batchToWaitOn : batchesToWaitOn) {
- batchToWaitOn->getResult().getStatus().ignore();
- }
+void BucketCatalog::clear(const OID& oid) {
+ auto result = _setBucketState(oid, BucketState::kCleared);
+ if (result && *result == BucketState::kPreparedAndCleared) {
+ hangTimeseriesDirectModificationBeforeWriteConflict.pauseWhileSet();
+ throw WriteConflictException();
}
}
@@ -328,10 +321,10 @@ void BucketCatalog::clear(const NamespaceString& ns) {
auto nextIt = std::next(it);
const auto& bucket = *it;
- _verifyBucketIsUnused(bucket.get());
+ stdx::unique_lock blk{bucket->_mutex};
if (shouldClear(bucket->_ns)) {
_executionStats.erase(bucket->_ns);
- _removeBucket(bucket.get(), false /* expiringBuckets */);
+ _abort(blk, bucket.get(), nullptr);
}
it = nextIt;
@@ -394,7 +387,9 @@ BucketCatalog::StripedMutex::ExclusiveLock BucketCatalog::_lockExclusive() const
void BucketCatalog::_waitToCommitBatch(const std::shared_ptr<WriteBatch>& batch) {
while (true) {
BucketAccess bucket{this, batch->bucket()};
- invariant(bucket);
+ if (!bucket) {
+ return;
+ }
auto current = bucket->_preparedBatch;
if (!current) {
@@ -416,15 +411,42 @@ bool BucketCatalog::_removeBucket(Bucket* bucket, bool expiringBuckets) {
}
invariant(bucket->_batches.empty());
+ invariant(!bucket->_preparedBatch);
_memoryUsage.fetchAndSubtract(bucket->_memoryUsage);
_markBucketNotIdle(bucket, expiringBuckets /* locked */);
_openBuckets.erase({std::move(bucket->_ns), std::move(bucket->_metadata)});
+ {
+ stdx::lock_guard statesLk{_statesMutex};
+ _bucketStates.erase(bucket->_id);
+ }
_allBuckets.erase(it);
return true;
}
+void BucketCatalog::_abort(stdx::unique_lock<Mutex>& lk,
+ Bucket* bucket,
+ std::shared_ptr<WriteBatch> batch) {
+ // For any uncommitted batches that we need to abort, see if we already have the rights,
+ // otherwise try to claim the rights and abort it. If we don't get the rights, then wait
+ // for the other writer to resolve the batch.
+ for (const auto& [_, current] : bucket->_batches) {
+ current->_abort();
+ }
+ bucket->_batches.clear();
+
+ if (auto& prepared = bucket->_preparedBatch) {
+ if (prepared == batch) {
+ prepared->_abort();
+ }
+ prepared.reset();
+ }
+
+ lk.unlock();
+ _removeBucket(bucket, true /* bucketIsUnused */);
+}
+
void BucketCatalog::_markBucketIdle(Bucket* bucket) {
invariant(bucket);
stdx::lock_guard lk{_idleMutex};
@@ -482,6 +504,7 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(
auto [it, inserted] = _allBuckets.insert(std::make_unique<Bucket>());
Bucket* bucket = it->get();
_setIdTimestamp(bucket, time);
+ _bucketStates.emplace(bucket->_id, BucketState::kNormal);
_openBuckets[key] = bucket;
if (openedDuetoMetadata) {
@@ -518,7 +541,52 @@ const std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutio
}
void BucketCatalog::_setIdTimestamp(Bucket* bucket, const Date_t& time) {
+ auto oldId = bucket->_id;
bucket->_id.setTimestamp(durationCount<Seconds>(time.toDurationSinceEpoch()));
+ stdx::lock_guard statesLk{_statesMutex};
+ _bucketStates.erase(oldId);
+ _bucketStates.emplace(bucket->_id, BucketState::kNormal);
+}
+
+boost::optional<BucketCatalog::BucketState> BucketCatalog::_setBucketState(const OID& id,
+ BucketState target) {
+ stdx::lock_guard statesLk{_statesMutex};
+ auto it = _bucketStates.find(id);
+ if (it == _bucketStates.end()) {
+ return boost::none;
+ }
+
+ auto& [_, state] = *it;
+ switch (target) {
+ case BucketState::kNormal: {
+ if (state == BucketState::kPrepared) {
+ state = BucketState::kNormal;
+ } else if (state == BucketState::kPreparedAndCleared) {
+ state = BucketState::kCleared;
+ } else {
+ invariant(state != BucketState::kCleared);
+ }
+ break;
+ }
+ case BucketState::kPrepared: {
+ invariant(state == BucketState::kNormal);
+ state = BucketState::kPrepared;
+ break;
+ }
+ case BucketState::kCleared: {
+ if (state == BucketState::kNormal) {
+ state = BucketState::kCleared;
+ } else if (state == BucketState::kPrepared) {
+ state = BucketState::kPreparedAndCleared;
+ }
+ break;
+ }
+ case BucketState::kPreparedAndCleared: {
+ invariant(target != BucketState::kPreparedAndCleared);
+ }
+ }
+
+ return state;
}
BucketCatalog::BucketMetadata::BucketMetadata(BSONObj&& obj,
@@ -607,8 +675,8 @@ BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog,
{
auto lk = _catalog->_lockShared();
- bool bucketExisted = _findOpenBucketAndLock(hash);
- if (bucketExisted) {
+ auto bucketState = _findOpenBucketAndLock(hash);
+ if (bucketState == BucketState::kNormal || bucketState == BucketState::kPrepared) {
return;
}
}
@@ -620,12 +688,21 @@ BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog,
BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog, Bucket* bucket)
: _catalog(catalog) {
auto lk = _catalog->_lockShared();
- auto it = _catalog->_allBuckets.find(bucket);
- if (it == _catalog->_allBuckets.end()) {
+ auto bucketIt = _catalog->_allBuckets.find(bucket);
+ if (bucketIt == _catalog->_allBuckets.end()) {
return;
}
+
_bucket = bucket;
_acquire();
+
+ stdx::lock_guard statesLk{_catalog->_statesMutex};
+ auto statesIt = _catalog->_bucketStates.find(_bucket->_id);
+ invariant(statesIt != _catalog->_bucketStates.end());
+ auto& [_, state] = *statesIt;
+ if (state == BucketState::kCleared) {
+ release();
+ }
}
BucketCatalog::BucketAccess::~BucketAccess() {
@@ -634,18 +711,27 @@ BucketCatalog::BucketAccess::~BucketAccess() {
}
}
-bool BucketCatalog::BucketAccess::_findOpenBucketAndLock(std::size_t hash) {
+BucketCatalog::BucketState BucketCatalog::BucketAccess::_findOpenBucketAndLock(std::size_t hash) {
auto it = _catalog->_openBuckets.find(*_key, hash);
if (it == _catalog->_openBuckets.end()) {
// Bucket does not exist.
- return false;
+ return BucketState::kCleared;
}
_bucket = it->second;
_acquire();
- _catalog->_markBucketNotIdle(_bucket, false /* locked */);
- return true;
+ stdx::lock_guard statesLk{_catalog->_statesMutex};
+ auto statesIt = _catalog->_bucketStates.find(_bucket->_id);
+ invariant(statesIt != _catalog->_bucketStates.end());
+ auto& [_, state] = *statesIt;
+ if (state == BucketState::kCleared || state == BucketState::kPreparedAndCleared) {
+ release();
+ } else {
+ _catalog->_markBucketNotIdle(_bucket, false /* locked */);
+ }
+
+ return state;
}
void BucketCatalog::BucketAccess::_findOrCreateOpenBucketAndLock(std::size_t hash) {
@@ -658,7 +744,20 @@ void BucketCatalog::BucketAccess::_findOrCreateOpenBucketAndLock(std::size_t has
_bucket = it->second;
_acquire();
- _catalog->_markBucketNotIdle(_bucket, false /* locked */);
+
+ {
+ stdx::lock_guard statesLk{_catalog->_statesMutex};
+ auto statesIt = _catalog->_bucketStates.find(_bucket->_id);
+ invariant(statesIt != _catalog->_bucketStates.end());
+ auto& [_, state] = *statesIt;
+ if (state == BucketState::kNormal || state == BucketState::kPrepared) {
+ _catalog->_markBucketNotIdle(_bucket, false /* locked */);
+ return;
+ }
+ }
+
+ _catalog->_abort(_guard, _bucket, nullptr);
+ _create();
}
void BucketCatalog::BucketAccess::_acquire() {
@@ -985,7 +1084,6 @@ StatusWith<BucketCatalog::CommitInfo> BucketCatalog::WriteBatch::getResult() con
}
BucketCatalog::Bucket* BucketCatalog::WriteBatch::bucket() const {
- invariant(!finished());
return _bucket;
}
@@ -1083,7 +1181,6 @@ void BucketCatalog::WriteBatch::_finish(const CommitInfo& info) {
}
void BucketCatalog::WriteBatch::_abort() {
- invariant(_commitRights.load());
_active = false;
_promise.setError({ErrorCodes::TimeseriesBucketCleared,
str::stream() << "Time-series bucket " << _bucket->id() << " for "
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index b6fc129313f..66d762d5efe 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -190,9 +190,10 @@ public:
/**
* Prepares a batch for commit, transitioning it to an inactive state. Caller must already have
- * commit rights on batch.
+ * commit rights on batch. Returns true if the batch was successfully prepared, or false if the
+ * batch was aborted.
*/
- void prepareCommit(std::shared_ptr<WriteBatch> batch);
+ bool prepareCommit(std::shared_ptr<WriteBatch> batch);
/**
* Records the result of a batch commit. Caller must already have commit rights on batch, and
@@ -207,6 +208,12 @@ public:
void abort(std::shared_ptr<WriteBatch> batch);
/**
+ * Marks any bucket with the specified OID as cleared and prevents any future inserts from
+ * landing in that bucket.
+ */
+ void clear(const OID& oid);
+
+ /**
* Clears the buckets for the given namespace.
*/
void clear(const NamespaceString& ns);
@@ -461,6 +468,20 @@ private:
AtomicWord<long long> numMeasurementsCommitted;
};
+ enum class BucketState {
+ // Bucket can be inserted into, and does not have an outstanding prepared commit
+ kNormal,
+ // Bucket can be inserted into, and has a prepared commit outstanding.
+ kPrepared,
+ // Bucket can no longer be inserted into, does not have an outstanding prepared
+ // commit.
+ kCleared,
+ // Bucket can no longer be inserted into, but still has an outstanding
+ // prepared commit. Any writer other than the one who prepared the
+ // commit should receive a WriteConflictException.
+ kPreparedAndCleared,
+ };
+
/**
* Helper class to handle all the locking necessary to lookup and lock a bucket for use. This
* is intended primarily for using a single bucket, including replacing it when it becomes full.
@@ -500,9 +521,13 @@ private:
Date_t getTime() const;
private:
- // Helper to find and lock an open bucket for the given metadata if it exists. Requires a
- // shared lock on the catalog. Returns true if the bucket exists and was locked.
- bool _findOpenBucketAndLock(std::size_t hash);
+ /**
+ * Helper to find and lock an open bucket for the given metadata if it exists. Requires a
+ * shared lock on the catalog. Returns the state of the bucket if it is locked and usable.
+ * In case the bucket does not exist or was previously cleared and thus is not usable, the
+ * return value will be BucketState::kCleared.
+ */
+ BucketState _findOpenBucketAndLock(std::size_t hash);
// Helper to find an open bucket for the given metadata if it exists, create it if it
// doesn't, and lock it. Requires an exclusive lock on the catalog.
@@ -537,6 +562,12 @@ private:
bool _removeBucket(Bucket* bucket, bool expiringBuckets);
/**
+ * Aborts any batches it can for the given bucket, then removes the bucket. If batch is
+ * non-null, it is assumed that the caller has commit rights for that batch.
+ */
+ void _abort(stdx::unique_lock<Mutex>& lk, Bucket* bucket, std::shared_ptr<WriteBatch> batch);
+
+ /**
* Adds the bucket to a list of idle buckets to be expired at a later date
*/
void _markBucketIdle(Bucket* bucket);
@@ -572,6 +603,16 @@ private:
void _setIdTimestamp(Bucket* bucket, const Date_t& time);
/**
+ * Changes the bucket state, taking into account the current state, the specified target state,
+ * and allowed state transitions. The return value, if set, is the final state of the bucket
+ * with the given id; if no such bucket exists, the return value will not be set.
+ *
+ * Ex. For a bucket with state kPrepared, and a target of kCleared, the return will be
+ * kPreparedAndCleared.
+ */
+ boost::optional<BucketState> _setBucketState(const OID& id, BucketState target);
+
+ /**
* You must hold a lock on _bucketMutex when accessing _allBuckets or _openBuckets.
* While holding a lock on _bucketMutex, you can take a lock on an individual bucket, then
* release _bucketMutex. Any iterators on the protected structures should be considered invalid
@@ -595,6 +636,10 @@ private:
// The current open bucket for each namespace and metadata pair.
stdx::unordered_map<std::tuple<NamespaceString, BucketMetadata>, Bucket*> _openBuckets;
+ // Bucket state
+ mutable Mutex _statesMutex = MONGO_MAKE_LATCH("BucketCatalog::_statesMutex");
+ stdx::unordered_map<OID, BucketState, OID::Hasher> _bucketStates;
+
// This mutex protects access to _idleBuckets
mutable Mutex _idleMutex = MONGO_MAKE_LATCH("BucketCatalog::_idleMutex");
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index 5b0f398eebd..f21376f5eb9 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/catalog/catalog_test_fixture.h"
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/timeseries/bucket_catalog.h"
#include "mongo/db/views/view_catalog.h"
#include "mongo/stdx/future.h"
@@ -445,7 +446,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
ASSERT(batch2->newFieldNamesToBeInserted().count("a")) << batch2->toBSON();
}
-TEST_F(BucketCatalogTest, AbortBatchWithOutstandingInsertsOnBucket) {
+TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) {
auto batch1 = _bucketCatalog
->insert(_opCtx,
_ns1,
@@ -470,27 +471,56 @@ TEST_F(BucketCatalogTest, AbortBatchWithOutstandingInsertsOnBucket) {
.getValue();
ASSERT_NE(batch1, batch2);
- ASSERT_EQ(0, _getNumWaits(_ns1));
-
- // Aborting the batch will have to wait for the commit of batch1 to finish, then will proceed
- // to abort batch2.
ASSERT(batch2->claimCommitRights());
- auto task = Task{[&]() { _bucketCatalog->abort(batch2); }};
- // Add a little extra wait to make sure abort actually gets to the blocking point.
- stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10));
- ASSERT(task.future().valid());
- ASSERT(stdx::future_status::timeout == task.future().wait_for(stdx::chrono::microseconds(1)))
- << "clear finished before expected";
+ _bucketCatalog->abort(batch2);
+ ASSERT(batch2->finished());
+ ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
_bucketCatalog->finish(batch1, _commitInfo);
ASSERT(batch1->finished());
+ ASSERT_OK(batch1->getResult().getStatus());
+}
- // Now the clear should be able to continue, and will eventually abort batch2.
- task.future().wait();
- ASSERT_EQ(1, _getNumWaits(_ns1));
- ASSERT(batch2->finished());
- ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
- ASSERT_EQ(1, _getNumWaits(_ns1));
+TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) {
+ auto batch = _bucketCatalog
+ ->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
+ .getValue();
+ ASSERT(batch->claimCommitRights());
+ _bucketCatalog->prepareCommit(batch);
+ ASSERT_EQ(batch->measurements().size(), 1);
+ ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0);
+
+ ASSERT_THROWS(_bucketCatalog->clear(batch->bucket()->id()), WriteConflictException);
+
+ _bucketCatalog->abort(batch);
+ ASSERT(batch->finished());
+ ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
+}
+
+TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrows) {
+ auto batch = _bucketCatalog
+ ->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
+ .getValue();
+ ASSERT(batch->claimCommitRights());
+
+ _bucketCatalog->abort(batch);
+ ASSERT(batch->finished());
+ ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
+
+ bool prepared = _bucketCatalog->prepareCommit(batch);
+ ASSERT(!prepared);
+ ASSERT(batch->finished());
+ ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
}
TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {