diff options
author | Dan Larkin-York <dan.larkin-york@mongodb.com> | 2021-04-01 23:06:23 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-02 00:01:02 +0000 |
commit | b55f6c86a1a47f28de7815ef942c3c422590dea9 (patch) | |
tree | 3ad4664be27f03f0e601d1fb6fd32d4276ca25f6 /src/mongo/db/timeseries | |
parent | c8e74f9e82a9aab3f549ff77aa539fa6b3ab6b45 (diff) | |
download | mongo-b55f6c86a1a47f28de7815ef942c3c422590dea9.tar.gz |
SERVER-55060 Direct modification must remove buckets from the time-series bucket catalog
Diffstat (limited to 'src/mongo/db/timeseries')
-rw-r--r-- | src/mongo/db/timeseries/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.cpp | 233 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.h | 55 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog_test.cpp | 64 |
4 files changed, 264 insertions, 90 deletions
diff --git a/src/mongo/db/timeseries/SConscript b/src/mongo/db/timeseries/SConscript index c2f1959a441..b1b35582fe2 100644 --- a/src/mongo/db/timeseries/SConscript +++ b/src/mongo/db/timeseries/SConscript @@ -23,7 +23,9 @@ env.Library( LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/catalog/database_holder', '$BUILD_DIR/mongo/db/commands/server_status', + '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception', '$BUILD_DIR/mongo/db/views/views', + '$BUILD_DIR/mongo/util/fail_point', 'timeseries_idl', ], ) diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index 17fa905b8d9..a6fe7f89e14 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -35,13 +35,16 @@ #include "mongo/db/catalog/database_holder.h" #include "mongo/db/commands/server_status.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/operation_context.h" #include "mongo/db/views/view_catalog.h" #include "mongo/stdx/thread.h" +#include "mongo/util/fail_point.h" namespace mongo { namespace { const auto getBucketCatalog = ServiceContext::declareDecoration<BucketCatalog>(); +MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationBeforeWriteConflict); uint8_t numDigits(uint32_t num) { uint8_t numDigits = 0; @@ -128,6 +131,7 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert( auto time = timeElem.Date(); BucketAccess bucket{this, key, stats.get(), time}; + invariant(bucket); StringSet newFieldNamesToBeInserted; uint32_t newFieldNamesSize = 0; @@ -202,46 +206,59 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert( return batch; } -void BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { - invariant(!batch->finished()); +bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { + if (batch->finished()) { + // In this case, someone else aborted the batch behind our back. Oops. + return false; + } _waitToCommitBatch(batch); BucketAccess bucket(this, batch->bucket()); - invariant(bucket); + if (!bucket) { + abort(batch); + return false; + } + + invariant(_setBucketState(bucket->_id, BucketState::kPrepared)); auto prevMemoryUsage = bucket->_memoryUsage; batch->_prepareCommit(); _memoryUsage.fetchAndAdd(bucket->_memoryUsage - prevMemoryUsage); bucket->_batches.erase(batch->_lsid); + + return true; } void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info) { invariant(!batch->finished()); invariant(!batch->active()); - auto stats = _getExecutionStats(batch->bucket()->_ns); - BucketAccess bucket(this, batch->bucket()); - invariant(bucket); batch->_finish(info); - bucket->_preparedBatch.reset(); + if (bucket) { + invariant(_setBucketState(bucket->_id, BucketState::kNormal)); + bucket->_preparedBatch.reset(); + } if (info.result.isOK()) { + auto& stats = batch->_stats; stats->numCommits.fetchAndAddRelaxed(1); - if (bucket->_numCommittedMeasurements == 0) { + if (batch->numPreviouslyCommittedMeasurements() == 0) { stats->numBucketInserts.fetchAndAddRelaxed(1); } else { stats->numBucketUpdates.fetchAndAddRelaxed(1); } stats->numMeasurementsCommitted.fetchAndAddRelaxed(batch->measurements().size()); - bucket->_numCommittedMeasurements += batch->measurements().size(); + if (bucket) { + bucket->_numCommittedMeasurements += batch->measurements().size(); + } } - if (bucket->allCommitted()) { + if (bucket && bucket->allCommitted()) { if (bucket->_full) { // Everything in the bucket has been committed, and nothing more will be added since the // bucket is full. Thus, we can remove it. @@ -255,6 +272,10 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& // happened in BucketAccess::rollover, and that there is already a new open bucket for // this metadata. _markBucketNotIdle(ptr, false /* locked */); + { + stdx::lock_guard statesLk{_statesMutex}; + _bucketStates.erase(ptr->_id); + } _allBuckets.erase(ptr); } else { _markBucketIdle(bucket); @@ -265,54 +286,26 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch) { invariant(batch); invariant(!batch->finished()); + invariant(batch->_commitRights.load()); Bucket* bucket = batch->bucket(); - while (true) { - std::vector<std::shared_ptr<WriteBatch>> batchesToWaitOn; - - { - auto lk = _lockExclusive(); - - // Before we access the bucket, make sure it's still there. - if (!_allBuckets.contains(bucket)) { - // Someone else already cleaned up the bucket while we didn't hold the lock. - invariant(batch->finished()); - return; - } - - stdx::unique_lock blk{bucket->_mutex}; - if (bucket->allCommitted()) { - // No uncommitted batches left to abort, go ahead and remove the bucket. - blk.unlock(); - _removeBucket(bucket, false /* expiringBuckets */); - break; - } - - // For any uncommitted batches that we need to abort, see if we already have the rights, - // otherwise try to claim the rights and abort it. If we don't get the rights, then wait - // for the other writer to resolve the batch. - for (const auto& [_, active] : bucket->_batches) { - if (active == batch || active->claimCommitRights()) { - active->_abort(); - } else { - batchesToWaitOn.push_back(active); - } - } - bucket->_batches.clear(); + // Before we access the bucket, make sure it's still there. + auto lk = _lockExclusive(); + if (!_allBuckets.contains(bucket)) { + // Special case, bucket has already been cleared, and we need only abort this batch. + batch->_abort(); + return; + } - if (auto& prepared = bucket->_preparedBatch) { - if (prepared == batch) { - prepared->_abort(); - } else { - batchesToWaitOn.push_back(prepared); - } - prepared.reset(); - } - } + stdx::unique_lock blk{bucket->_mutex}; + _abort(blk, bucket, batch); +} - for (const auto& batchToWaitOn : batchesToWaitOn) { - batchToWaitOn->getResult().getStatus().ignore(); - } +void BucketCatalog::clear(const OID& oid) { + auto result = _setBucketState(oid, BucketState::kCleared); + if (result && *result == BucketState::kPreparedAndCleared) { + hangTimeseriesDirectModificationBeforeWriteConflict.pauseWhileSet(); + throw WriteConflictException(); } } @@ -328,10 +321,10 @@ void BucketCatalog::clear(const NamespaceString& ns) { auto nextIt = std::next(it); const auto& bucket = *it; - _verifyBucketIsUnused(bucket.get()); + stdx::unique_lock blk{bucket->_mutex}; if (shouldClear(bucket->_ns)) { _executionStats.erase(bucket->_ns); - _removeBucket(bucket.get(), false /* expiringBuckets */); + _abort(blk, bucket.get(), nullptr); } it = nextIt; @@ -394,7 +387,9 @@ BucketCatalog::StripedMutex::ExclusiveLock BucketCatalog::_lockExclusive() const void BucketCatalog::_waitToCommitBatch(const std::shared_ptr<WriteBatch>& batch) { while (true) { BucketAccess bucket{this, batch->bucket()}; - invariant(bucket); + if (!bucket) { + return; + } auto current = bucket->_preparedBatch; if (!current) { @@ -416,15 +411,42 @@ bool BucketCatalog::_removeBucket(Bucket* bucket, bool expiringBuckets) { } invariant(bucket->_batches.empty()); + invariant(!bucket->_preparedBatch); _memoryUsage.fetchAndSubtract(bucket->_memoryUsage); _markBucketNotIdle(bucket, expiringBuckets /* locked */); _openBuckets.erase({std::move(bucket->_ns), std::move(bucket->_metadata)}); + { + stdx::lock_guard statesLk{_statesMutex}; + _bucketStates.erase(bucket->_id); + } _allBuckets.erase(it); return true; } +void BucketCatalog::_abort(stdx::unique_lock<Mutex>& lk, + Bucket* bucket, + std::shared_ptr<WriteBatch> batch) { + // For any uncommitted batches that we need to abort, see if we already have the rights, + // otherwise try to claim the rights and abort it. If we don't get the rights, then wait + // for the other writer to resolve the batch. + for (const auto& [_, current] : bucket->_batches) { + current->_abort(); + } + bucket->_batches.clear(); + + if (auto& prepared = bucket->_preparedBatch) { + if (prepared == batch) { + prepared->_abort(); + } + prepared.reset(); + } + + lk.unlock(); + _removeBucket(bucket, true /* bucketIsUnused */); +} + void BucketCatalog::_markBucketIdle(Bucket* bucket) { invariant(bucket); stdx::lock_guard lk{_idleMutex}; @@ -482,6 +504,7 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket( auto [it, inserted] = _allBuckets.insert(std::make_unique<Bucket>()); Bucket* bucket = it->get(); _setIdTimestamp(bucket, time); + _bucketStates.emplace(bucket->_id, BucketState::kNormal); _openBuckets[key] = bucket; if (openedDuetoMetadata) { @@ -518,7 +541,52 @@ const std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutio } void BucketCatalog::_setIdTimestamp(Bucket* bucket, const Date_t& time) { + auto oldId = bucket->_id; bucket->_id.setTimestamp(durationCount<Seconds>(time.toDurationSinceEpoch())); + stdx::lock_guard statesLk{_statesMutex}; + _bucketStates.erase(oldId); + _bucketStates.emplace(bucket->_id, BucketState::kNormal); +} + +boost::optional<BucketCatalog::BucketState> BucketCatalog::_setBucketState(const OID& id, + BucketState target) { + stdx::lock_guard statesLk{_statesMutex}; + auto it = _bucketStates.find(id); + if (it == _bucketStates.end()) { + return boost::none; + } + + auto& [_, state] = *it; + switch (target) { + case BucketState::kNormal: { + if (state == BucketState::kPrepared) { + state = BucketState::kNormal; + } else if (state == BucketState::kPreparedAndCleared) { + state = BucketState::kCleared; + } else { + invariant(state != BucketState::kCleared); + } + break; + } + case BucketState::kPrepared: { + invariant(state == BucketState::kNormal); + state = BucketState::kPrepared; + break; + } + case BucketState::kCleared: { + if (state == BucketState::kNormal) { + state = BucketState::kCleared; + } else if (state == BucketState::kPrepared) { + state = BucketState::kPreparedAndCleared; + } + break; + } + case BucketState::kPreparedAndCleared: { + invariant(target != BucketState::kPreparedAndCleared); + } + } + + return state; } BucketCatalog::BucketMetadata::BucketMetadata(BSONObj&& obj, @@ -607,8 +675,8 @@ BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog, { auto lk = _catalog->_lockShared(); - bool bucketExisted = _findOpenBucketAndLock(hash); - if (bucketExisted) { + auto bucketState = _findOpenBucketAndLock(hash); + if (bucketState == BucketState::kNormal || bucketState == BucketState::kPrepared) { return; } } @@ -620,12 +688,21 @@ BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog, BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog, Bucket* bucket) : _catalog(catalog) { auto lk = _catalog->_lockShared(); - auto it = _catalog->_allBuckets.find(bucket); - if (it == _catalog->_allBuckets.end()) { + auto bucketIt = _catalog->_allBuckets.find(bucket); + if (bucketIt == _catalog->_allBuckets.end()) { return; } + _bucket = bucket; _acquire(); + + stdx::lock_guard statesLk{_catalog->_statesMutex}; + auto statesIt = _catalog->_bucketStates.find(_bucket->_id); + invariant(statesIt != _catalog->_bucketStates.end()); + auto& [_, state] = *statesIt; + if (state == BucketState::kCleared) { + release(); + } } BucketCatalog::BucketAccess::~BucketAccess() { @@ -634,18 +711,27 @@ BucketCatalog::BucketAccess::~BucketAccess() { } } -bool BucketCatalog::BucketAccess::_findOpenBucketAndLock(std::size_t hash) { +BucketCatalog::BucketState BucketCatalog::BucketAccess::_findOpenBucketAndLock(std::size_t hash) { auto it = _catalog->_openBuckets.find(*_key, hash); if (it == _catalog->_openBuckets.end()) { // Bucket does not exist. - return false; + return BucketState::kCleared; } _bucket = it->second; _acquire(); - _catalog->_markBucketNotIdle(_bucket, false /* locked */); - return true; + stdx::lock_guard statesLk{_catalog->_statesMutex}; + auto statesIt = _catalog->_bucketStates.find(_bucket->_id); + invariant(statesIt != _catalog->_bucketStates.end()); + auto& [_, state] = *statesIt; + if (state == BucketState::kCleared || state == BucketState::kPreparedAndCleared) { + release(); + } else { + _catalog->_markBucketNotIdle(_bucket, false /* locked */); + } + + return state; } void BucketCatalog::BucketAccess::_findOrCreateOpenBucketAndLock(std::size_t hash) { @@ -658,7 +744,20 @@ void BucketCatalog::BucketAccess::_findOrCreateOpenBucketAndLock(std::size_t has _bucket = it->second; _acquire(); - _catalog->_markBucketNotIdle(_bucket, false /* locked */); + + { + stdx::lock_guard statesLk{_catalog->_statesMutex}; + auto statesIt = _catalog->_bucketStates.find(_bucket->_id); + invariant(statesIt != _catalog->_bucketStates.end()); + auto& [_, state] = *statesIt; + if (state == BucketState::kNormal || state == BucketState::kPrepared) { + _catalog->_markBucketNotIdle(_bucket, false /* locked */); + return; + } + } + + _catalog->_abort(_guard, _bucket, nullptr); + _create(); } void BucketCatalog::BucketAccess::_acquire() { @@ -985,7 +1084,6 @@ StatusWith<BucketCatalog::CommitInfo> BucketCatalog::WriteBatch::getResult() con } BucketCatalog::Bucket* BucketCatalog::WriteBatch::bucket() const { - invariant(!finished()); return _bucket; } @@ -1083,7 +1181,6 @@ void BucketCatalog::WriteBatch::_finish(const CommitInfo& info) { } void BucketCatalog::WriteBatch::_abort() { - invariant(_commitRights.load()); _active = false; _promise.setError({ErrorCodes::TimeseriesBucketCleared, str::stream() << "Time-series bucket " << _bucket->id() << " for " diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index b6fc129313f..66d762d5efe 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -190,9 +190,10 @@ public: /** * Prepares a batch for commit, transitioning it to an inactive state. Caller must already have - * commit rights on batch. + * commit rights on batch. Returns true if the batch was successfully prepared, or false if the + * batch was aborted. */ - void prepareCommit(std::shared_ptr<WriteBatch> batch); + bool prepareCommit(std::shared_ptr<WriteBatch> batch); /** * Records the result of a batch commit. Caller must already have commit rights on batch, and @@ -207,6 +208,12 @@ public: void abort(std::shared_ptr<WriteBatch> batch); /** + * Marks any bucket with the specified OID as cleared and prevents any future inserts from + * landing in that bucket. + */ + void clear(const OID& oid); + + /** * Clears the buckets for the given namespace. */ void clear(const NamespaceString& ns); @@ -461,6 +468,20 @@ private: AtomicWord<long long> numMeasurementsCommitted; }; + enum class BucketState { + // Bucket can be inserted into, and does not have an outstanding prepared commit + kNormal, + // Bucket can be inserted into, and has a prepared commit outstanding. + kPrepared, + // Bucket can no longer be inserted into, does not have an outstanding prepared + // commit. + kCleared, + // Bucket can no longer be inserted into, but still has an outstanding + // prepared commit. Any writer other than the one who prepared the + // commit should receive a WriteConflictException. + kPreparedAndCleared, + }; + /** * Helper class to handle all the locking necessary to lookup and lock a bucket for use. This * is intended primarily for using a single bucket, including replacing it when it becomes full. @@ -500,9 +521,13 @@ private: Date_t getTime() const; private: - // Helper to find and lock an open bucket for the given metadata if it exists. Requires a - // shared lock on the catalog. Returns true if the bucket exists and was locked. - bool _findOpenBucketAndLock(std::size_t hash); + /** + * Helper to find and lock an open bucket for the given metadata if it exists. Requires a + * shared lock on the catalog. Returns the state of the bucket if it is locked and usable. + * In case the bucket does not exist or was previously cleared and thus is not usable, the + * return value will be BucketState::kCleared. + */ + BucketState _findOpenBucketAndLock(std::size_t hash); // Helper to find an open bucket for the given metadata if it exists, create it if it // doesn't, and lock it. Requires an exclusive lock on the catalog. @@ -537,6 +562,12 @@ private: bool _removeBucket(Bucket* bucket, bool expiringBuckets); /** + * Aborts any batches it can for the given bucket, then removes the bucket. If batch is + * non-null, it is assumed that the caller has commit rights for that batch. + */ + void _abort(stdx::unique_lock<Mutex>& lk, Bucket* bucket, std::shared_ptr<WriteBatch> batch); + + /** * Adds the bucket to a list of idle buckets to be expired at a later date */ void _markBucketIdle(Bucket* bucket); @@ -572,6 +603,16 @@ private: void _setIdTimestamp(Bucket* bucket, const Date_t& time); /** + * Changes the bucket state, taking into account the current state, the specified target state, + * and allowed state transitions. The return value, if set, is the final state of the bucket + * with the given id; if no such bucket exists, the return value will not be set. + * + * Ex. For a bucket with state kPrepared, and a target of kCleared, the return will be + * kPreparedAndCleared. + */ + boost::optional<BucketState> _setBucketState(const OID& id, BucketState target); + + /** * You must hold a lock on _bucketMutex when accessing _allBuckets or _openBuckets. * While holding a lock on _bucketMutex, you can take a lock on an individual bucket, then * release _bucketMutex. Any iterators on the protected structures should be considered invalid @@ -595,6 +636,10 @@ private: // The current open bucket for each namespace and metadata pair. stdx::unordered_map<std::tuple<NamespaceString, BucketMetadata>, Bucket*> _openBuckets; + // Bucket state + mutable Mutex _statesMutex = MONGO_MAKE_LATCH("BucketCatalog::_statesMutex"); + stdx::unordered_map<OID, BucketState, OID::Hasher> _bucketStates; + // This mutex protects access to _idleBuckets mutable Mutex _idleMutex = MONGO_MAKE_LATCH("BucketCatalog::_idleMutex"); diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp index 5b0f398eebd..f21376f5eb9 100644 --- a/src/mongo/db/timeseries/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp @@ -32,6 +32,7 @@ #include "mongo/db/catalog/catalog_test_fixture.h" #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/timeseries/bucket_catalog.h" #include "mongo/db/views/view_catalog.h" #include "mongo/stdx/future.h" @@ -445,7 +446,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { ASSERT(batch2->newFieldNamesToBeInserted().count("a")) << batch2->toBSON(); } -TEST_F(BucketCatalogTest, AbortBatchWithOutstandingInsertsOnBucket) { +TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) { auto batch1 = _bucketCatalog ->insert(_opCtx, _ns1, @@ -470,27 +471,56 @@ TEST_F(BucketCatalogTest, AbortBatchWithOutstandingInsertsOnBucket) { .getValue(); ASSERT_NE(batch1, batch2); - ASSERT_EQ(0, _getNumWaits(_ns1)); - - // Aborting the batch will have to wait for the commit of batch1 to finish, then will proceed - // to abort batch2. ASSERT(batch2->claimCommitRights()); - auto task = Task{[&]() { _bucketCatalog->abort(batch2); }}; - // Add a little extra wait to make sure abort actually gets to the blocking point. - stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10)); - ASSERT(task.future().valid()); - ASSERT(stdx::future_status::timeout == task.future().wait_for(stdx::chrono::microseconds(1))) - << "clear finished before expected"; + _bucketCatalog->abort(batch2); + ASSERT(batch2->finished()); + ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); _bucketCatalog->finish(batch1, _commitInfo); ASSERT(batch1->finished()); + ASSERT_OK(batch1->getResult().getStatus()); +} - // Now the clear should be able to continue, and will eventually abort batch2. - task.future().wait(); - ASSERT_EQ(1, _getNumWaits(_ns1)); - ASSERT(batch2->finished()); - ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); - ASSERT_EQ(1, _getNumWaits(_ns1)); +TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) { + auto batch = _bucketCatalog + ->insert(_opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + .getValue(); + ASSERT(batch->claimCommitRights()); + _bucketCatalog->prepareCommit(batch); + ASSERT_EQ(batch->measurements().size(), 1); + ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0); + + ASSERT_THROWS(_bucketCatalog->clear(batch->bucket()->id()), WriteConflictException); + + _bucketCatalog->abort(batch); + ASSERT(batch->finished()); + ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); +} + +TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrows) { + auto batch = _bucketCatalog + ->insert(_opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + .getValue(); + ASSERT(batch->claimCommitRights()); + + _bucketCatalog->abort(batch); + ASSERT(batch->finished()); + ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); + + bool prepared = _bucketCatalog->prepareCommit(batch); + ASSERT(!prepared); + ASSERT(batch->finished()); + ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); } TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) { |