summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorGregory Noma <gregory.noma@gmail.com>2022-02-28 16:55:25 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-28 17:39:42 +0000
commit335bef8b6c3632ce7228553ef1c8fa3522768858 (patch)
tree8ed3f2fc3775c83eef237bf8523da0bd0d2568c6 /src/mongo/db
parentc05a030a3e807d1f444b4ee91b2331d5b7c19916 (diff)
downloadmongo-335bef8b6c3632ce7228553ef1c8fa3522768858.tar.gz
SERVER-61010 Require a `Status` for `BucketCatalog::abort`
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/commands/write_commands.cpp108
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.cpp68
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.h21
-rw-r--r--src/mongo/db/timeseries/bucket_catalog_test.cpp42
4 files changed, 124 insertions, 115 deletions
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp
index 276af6bb02a..47c688309f9 100644
--- a/src/mongo/db/commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands.cpp
@@ -740,18 +740,16 @@ public:
std::vector<write_ops::WriteError>* errors,
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
- std::vector<size_t>* docsToRetry) const {
+ std::vector<size_t>* docsToRetry) const try {
auto& bucketCatalog = BucketCatalog::get(opCtx);
auto metadata = bucketCatalog.getMetadata(batch->bucket());
- bool prepared = bucketCatalog.prepareCommit(batch);
- if (!prepared) {
+ auto status = bucketCatalog.prepareCommit(batch);
+ if (!status.isOK()) {
invariant(batch->finished());
docsToRetry->push_back(index);
return true;
}
- // Now that the batch is prepared, make sure we clean up if we throw.
- ScopeGuard batchGuard([&] { bucketCatalog.abort(batch); });
hangTimeseriesInsertBeforeWrite.pauseWhileSet();
@@ -764,7 +762,6 @@ public:
generateError(opCtx, output.result, start + index, errors->size())) {
errors->emplace_back(std::move(*error));
bucketCatalog.abort(batch, output.result.getStatus());
- batchGuard.dismiss();
return output.canContinue;
}
@@ -779,7 +776,6 @@ public:
generateError(opCtx, output.result, start + index, errors->size())) {
errors->emplace_back(std::move(*error));
bucketCatalog.abort(batch, output.result.getStatus());
- batchGuard.dismiss();
return output.canContinue;
}
@@ -794,8 +790,6 @@ public:
auto closedBucket =
bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId});
- batchGuard.dismiss();
-
if (closedBucket) {
// If this write closed a bucket, compress the bucket
auto output = _performTimeseriesBucketCompression(opCtx, *closedBucket);
@@ -806,6 +800,9 @@ public:
}
}
return true;
+ } catch (const DBException& ex) {
+ BucketCatalog::get(opCtx).abort(batch, ex.toStatus());
+ throw;
}
bool _commitTimeseriesBucketsAtomically(OperationContext* opCtx,
@@ -834,7 +831,7 @@ public:
return left.get()->bucket().id < right.get()->bucket().id;
});
- boost::optional<Status> abortStatus;
+ Status abortStatus = Status::OK();
ScopeGuard batchGuard{[&] {
for (auto batch : batchesToCommit) {
if (batch.get()) {
@@ -843,58 +840,63 @@ public:
}
}};
- std::vector<write_ops::InsertCommandRequest> insertOps;
- std::vector<write_ops::UpdateCommandRequest> updateOps;
+ try {
+ std::vector<write_ops::InsertCommandRequest> insertOps;
+ std::vector<write_ops::UpdateCommandRequest> updateOps;
- for (auto batch : batchesToCommit) {
- auto metadata = bucketCatalog.getMetadata(batch.get()->bucket());
- if (!bucketCatalog.prepareCommit(batch)) {
- return false;
- }
+ for (auto batch : batchesToCommit) {
+ auto metadata = bucketCatalog.getMetadata(batch.get()->bucket());
+ auto prepareCommitStatus = bucketCatalog.prepareCommit(batch);
+ if (!prepareCommitStatus.isOK()) {
+ abortStatus = prepareCommitStatus;
+ return false;
+ }
- if (batch.get()->numPreviouslyCommittedMeasurements() == 0) {
- insertOps.push_back(_makeTimeseriesInsertOp(
- batch, metadata, std::move(stmtIds[batch.get()->bucket().id])));
- } else {
- updateOps.push_back(_makeTimeseriesUpdateOp(
- opCtx, batch, metadata, std::move(stmtIds[batch.get()->bucket().id])));
+ if (batch.get()->numPreviouslyCommittedMeasurements() == 0) {
+ insertOps.push_back(_makeTimeseriesInsertOp(
+ batch, metadata, std::move(stmtIds[batch.get()->bucket().id])));
+ } else {
+ updateOps.push_back(_makeTimeseriesUpdateOp(
+ opCtx, batch, metadata, std::move(stmtIds[batch.get()->bucket().id])));
+ }
}
- }
- hangTimeseriesInsertBeforeWrite.pauseWhileSet();
+ hangTimeseriesInsertBeforeWrite.pauseWhileSet();
- auto result =
- write_ops_exec::performAtomicTimeseriesWrites(opCtx, insertOps, updateOps);
- if (!result.isOK()) {
- abortStatus = result;
- return false;
- }
+ auto result =
+ write_ops_exec::performAtomicTimeseriesWrites(opCtx, insertOps, updateOps);
+ if (!result.isOK()) {
+ abortStatus = result;
+ return false;
+ }
- getOpTimeAndElectionId(opCtx, opTime, electionId);
+ getOpTimeAndElectionId(opCtx, opTime, electionId);
- bool compressClosedBuckets = true;
- for (auto batch : batchesToCommit) {
- auto closedBucket =
- bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId});
- batch.get().reset();
+ bool compressClosedBuckets = true;
+ for (auto batch : batchesToCommit) {
+ auto closedBucket = bucketCatalog.finish(
+ batch, BucketCatalog::CommitInfo{*opTime, *electionId});
+ batch.get().reset();
- if (!closedBucket || !compressClosedBuckets) {
- continue;
- }
+ if (!closedBucket || !compressClosedBuckets) {
+ continue;
+ }
- // If this write closed a bucket, compress the bucket
- auto ret = _performTimeseriesBucketCompression(opCtx, *closedBucket);
- if (!ret.result.isOK()) {
- // Don't try to compress any other buckets if we fail. We're not allowed to do
- // more write operations.
- compressClosedBuckets = false;
- }
- if (!ret.canContinue) {
+ // If this write closed a bucket, compress the bucket
+ auto ret = _performTimeseriesBucketCompression(opCtx, *closedBucket);
if (!ret.result.isOK()) {
+ // Don't try to compress any other buckets if we fail. We're not allowed to
+ // do more write operations.
+ compressClosedBuckets = false;
+ }
+ if (!ret.canContinue) {
abortStatus = ret.result.getStatus();
+ return false;
}
- return false;
}
+ } catch (const DBException& ex) {
+ abortStatus = ex.toStatus();
+ throw;
}
batchGuard.dismiss();
@@ -1026,13 +1028,7 @@ public:
// If there are any unprocessed batches, we mark them as error with the last known
// error.
if (itr > indexOfLastProcessedBatch && batch->claimCommitRights()) {
- auto& bucketCatalog = BucketCatalog::get(opCtx);
- bucketCatalog.abort(batch);
-
- tassert(
- 6023100,
- "there should be at least one error if the batch processing exited early",
- lastError);
+ BucketCatalog::get(opCtx).abort(batch, lastError->getStatus());
errors->emplace_back(start + index, lastError->getStatus());
continue;
}
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index d817f3e1bc5..f2710fff960 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -192,6 +192,17 @@ std::pair<OID, Date_t> generateBucketId(const Date_t& time, const TimeseriesOpti
return {bucketId, roundedTime};
}
+
+Status getTimeseriesBucketClearedError(const OID& bucketId,
+ const boost::optional<NamespaceString>& ns = boost::none) {
+ std::string nsIdentification;
+ if (ns) {
+ nsIdentification.assign(str::stream() << " for namespace " << *ns);
+ }
+ return {ErrorCodes::TimeseriesBucketCleared,
+ str::stream() << "Time-series bucket " << bucketId << nsIdentification
+ << " was cleared"};
+}
} // namespace
struct BucketCatalog::ExecutionStats {
@@ -524,19 +535,12 @@ void BucketCatalog::WriteBatch::_finish(const CommitInfo& info) {
_promise.emplaceValue(info);
}
-void BucketCatalog::WriteBatch::_abort(const boost::optional<Status>& status,
- const Bucket* bucket) {
+void BucketCatalog::WriteBatch::_abort(const Status& status) {
if (finished()) {
return;
}
- std::string nsIdentification;
- if (bucket) {
- nsIdentification.append(str::stream() << " for namespace " << bucket->_ns);
- }
- _promise.setError(status.value_or(Status{ErrorCodes::TimeseriesBucketCleared,
- str::stream() << "Time-series bucket " << _bucket.id
- << nsIdentification << " was cleared"}));
+ _promise.setError(status);
}
BucketCatalog& BucketCatalog::get(ServiceContext* svcCtx) {
@@ -675,10 +679,12 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(
return InsertResult{batch, closedBuckets};
}
-bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
+Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
+ auto getBatchStatus = [&] { return batch->_promise.getFuture().getNoThrow().getStatus(); };
+
if (batch->finished()) {
// In this case, someone else aborted the batch behind our back. Oops.
- return false;
+ return getBatchStatus();
}
auto& stripe = _stripes[batch->bucket().stripe];
@@ -690,17 +696,17 @@ bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
if (batch->finished()) {
// Someone may have aborted it while we were waiting.
- return false;
+ return getBatchStatus();
} else if (!bucket) {
- _abort(&stripe, stripeLock, batch, boost::none);
- return false;
+ _abort(&stripe, stripeLock, batch, getTimeseriesBucketClearedError(batch->bucket().id));
+ return getBatchStatus();
}
auto prevMemoryUsage = bucket->_memoryUsage;
batch->_prepareCommit(bucket);
_memoryUsage.fetchAndAdd(bucket->_memoryUsage - prevMemoryUsage);
- return true;
+ return Status::OK();
}
boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
@@ -741,7 +747,11 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
if (it != stripe.allBuckets.end()) {
bucket = it->second.get();
bucket->_preparedBatch.reset();
- _abort(&stripe, stripeLock, bucket, nullptr, boost::none);
+ _abort(&stripe,
+ stripeLock,
+ bucket,
+ nullptr,
+ getTimeseriesBucketClearedError(bucket->id(), bucket->_ns));
}
} else if (bucket->allCommitted()) {
if (bucket->_full) {
@@ -772,8 +782,7 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
return closedBucket;
}
-void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch,
- const boost::optional<Status>& status) {
+void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch, const Status& status) {
invariant(batch);
invariant(batch->_commitRights.load());
@@ -807,7 +816,11 @@ void BucketCatalog::clear(const std::function<bool(const NamespaceString&)>& sho
stdx::lock_guard catalogLock{_mutex};
_executionStats.erase(bucket->_ns);
}
- _abort(&stripe, stripeLock, bucket.get(), nullptr, boost::none);
+ _abort(&stripe,
+ stripeLock,
+ bucket.get(),
+ nullptr,
+ getTimeseriesBucketClearedError(bucket->id(), bucket->_ns));
}
it = nextIt;
@@ -948,7 +961,12 @@ BucketCatalog::Bucket* BucketCatalog::_useOrCreateBucket(Stripe* stripe,
return bucket;
}
- _abort(stripe, stripeLock, bucket, nullptr, boost::none);
+ _abort(stripe,
+ stripeLock,
+ bucket,
+ nullptr,
+ getTimeseriesBucketClearedError(bucket->id(), bucket->_ns));
+
return _allocateBucket(stripe, stripeLock, info);
}
@@ -1000,12 +1018,12 @@ bool BucketCatalog::_removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* b
void BucketCatalog::_abort(Stripe* stripe,
WithLock stripeLock,
std::shared_ptr<WriteBatch> batch,
- const boost::optional<Status>& status) {
+ const Status& status) {
// Before we access the bucket, make sure it's still there.
Bucket* bucket = _useBucket(stripe, stripeLock, batch->bucket().id, ReturnClearedBuckets::kYes);
if (!bucket) {
// Special case, bucket has already been cleared, and we need only abort this batch.
- batch->_abort(status, nullptr);
+ batch->_abort(status);
return;
}
@@ -1017,11 +1035,11 @@ void BucketCatalog::_abort(Stripe* stripe,
WithLock stripeLock,
Bucket* bucket,
std::shared_ptr<WriteBatch> batch,
- const boost::optional<Status>& status) {
+ const Status& status) {
// Abort any unprepared batches. This should be safe since we have a lock on the stripe,
// preventing anyone else from using these.
for (const auto& [_, current] : bucket->_batches) {
- current->_abort(status, bucket);
+ current->_abort(status);
}
bucket->_batches.clear();
@@ -1032,7 +1050,7 @@ void BucketCatalog::_abort(Stripe* stripe,
if (auto& prepared = bucket->_preparedBatch) {
if (prepared == batch) {
// We own the prepared batch, so we can go ahead and abort it and remove the bucket.
- prepared->_abort(status, bucket);
+ prepared->_abort(status);
prepared.reset();
} else {
doRemove = false;
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index b33a9025b3c..9807cdea1a1 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -160,10 +160,8 @@ public:
/**
* Abandons the write batch and notifies any waiters that the bucket has been cleared.
- * Parameter 'bucket' provides a pointer to the bucket if still available, nullptr
- * otherwise.
*/
- void _abort(const boost::optional<Status>& status, const Bucket* bucket);
+ void _abort(const Status& status);
const BucketHandle _bucket;
OperationId _opId;
@@ -220,10 +218,10 @@ public:
/**
* Prepares a batch for commit, transitioning it to an inactive state. Caller must already have
- * commit rights on batch. Returns true if the batch was successfully prepared, or false if the
- * batch was aborted.
+ * commit rights on batch. Returns OK if the batch was successfully prepared, or a status
+ * indicating why the batch was previously aborted by another operation.
*/
- bool prepareCommit(std::shared_ptr<WriteBatch> batch);
+ Status prepareCommit(std::shared_ptr<WriteBatch> batch);
/**
* Records the result of a batch commit. Caller must already have commit rights on batch, and
@@ -234,11 +232,10 @@ public:
boost::optional<ClosedBucket> finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info);
/**
- * Aborts the given write batch and any other outstanding batches on the same bucket. Uses the
- * provided status when clearing the bucket, or TimeseriesBucketCleared if not provided.
+ * Aborts the given write batch and any other outstanding batches on the same bucket, using the
+ * provided status.
*/
- void abort(std::shared_ptr<WriteBatch> batch,
- const boost::optional<Status>& status = boost::none);
+ void abort(std::shared_ptr<WriteBatch> batch, const Status& status);
/**
* Marks any bucket with the specified OID as cleared and prevents any future inserts from
@@ -415,7 +412,7 @@ private:
void _abort(Stripe* stripe,
WithLock stripeLock,
std::shared_ptr<WriteBatch> batch,
- const boost::optional<Status>& status = boost::none);
+ const Status& status);
/**
* Aborts any unprepared batches for the given bucket, then removes the bucket if there is no
@@ -426,7 +423,7 @@ private:
WithLock stripeLock,
Bucket* bucket,
std::shared_ptr<WriteBatch> batch,
- const boost::optional<Status>& status);
+ const Status& status);
/**
* Adds the bucket to a list of idle buckets to be expired at a later date.
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index 93352e64bb9..b39b1065b89 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -158,7 +158,7 @@ void BucketCatalogTest::_commit(const std::shared_ptr<BucketCatalog::WriteBatch>
uint16_t numPreviouslyCommittedMeasurements,
size_t expectedBatchSize) {
ASSERT(batch->claimCommitRights());
- _bucketCatalog->prepareCommit(batch);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch));
ASSERT_EQ(batch->measurements().size(), expectedBatchSize);
ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), numPreviouslyCommittedMeasurements);
@@ -261,7 +261,7 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) {
// The batch hasn't actually been committed yet.
ASSERT(!batch1->finished());
- _bucketCatalog->prepareCommit(batch1);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
// Still not finished.
ASSERT(!batch1->finished());
@@ -290,7 +290,7 @@ TEST_F(BucketCatalogTest, GetMetadataReturnsEmptyDocOnMissingBucket) {
.batch;
ASSERT(batch->claimCommitRights());
auto bucket = batch->bucket();
- _bucketCatalog->abort(batch);
+ _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""});
ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(bucket));
}
@@ -502,7 +502,7 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) {
.getValue()
.batch;
ASSERT(batch1->claimCommitRights());
- _bucketCatalog->prepareCommit(batch1);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
ASSERT_EQ(batch1->measurements().size(), 1);
ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0);
@@ -533,7 +533,7 @@ DEATH_TEST_F(BucketCatalogTest, CannotCommitWithoutRights, "invariant") {
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
auto& batch = result.getValue().batch;
- _bucketCatalog->prepareCommit(batch);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch));
// BucketCatalog::prepareCommit uses dassert, so it will only invariant in debug mode. Ensure we
// die here in non-debug mode as well.
@@ -640,7 +640,7 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) {
.getValue()
.batch;
ASSERT(batch1->claimCommitRights());
- _bucketCatalog->prepareCommit(batch1);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
ASSERT_EQ(batch1->measurements().size(), 1);
ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0);
@@ -657,7 +657,7 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) {
ASSERT_NE(batch1, batch2);
ASSERT(batch2->claimCommitRights());
- _bucketCatalog->abort(batch2);
+ _bucketCatalog->abort(batch2, {ErrorCodes::TimeseriesBucketCleared, ""});
ASSERT(batch2->finished());
ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
@@ -680,8 +680,7 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) {
_bucketCatalog->clear(_ns1);
- bool prepared = _bucketCatalog->prepareCommit(batch);
- ASSERT(!prepared);
+ ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch));
ASSERT(batch->finished());
ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
@@ -695,7 +694,7 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) {
.getValue()
.batch;
ASSERT(batch->claimCommitRights());
- _bucketCatalog->prepareCommit(batch);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch));
ASSERT_EQ(batch->measurements().size(), 1);
ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0);
@@ -723,13 +722,13 @@ TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrowsConflict) {
.getValue()
.batch;
ASSERT(batch->claimCommitRights());
- _bucketCatalog->prepareCommit(batch);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch));
ASSERT_EQ(batch->measurements().size(), 1);
ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0);
ASSERT_THROWS(_bucketCatalog->clear(batch->bucket().id), WriteConflictException);
- _bucketCatalog->abort(batch);
+ _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""});
ASSERT(batch->finished());
ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
}
@@ -745,7 +744,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
.getValue()
.batch;
ASSERT(batch1->claimCommitRights());
- _bucketCatalog->prepareCommit(batch1);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
ASSERT_EQ(batch1->measurements().size(), 1);
ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0);
@@ -767,7 +766,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
// Now try to prepare the second batch. Ensure it aborts the batch.
ASSERT(batch2->claimCommitRights());
- ASSERT(!_bucketCatalog->prepareCommit(batch2));
+ ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch2));
ASSERT(batch2->finished());
ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
@@ -790,7 +789,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
ASSERT_NE(batch1->bucket().id, batch3->bucket().id);
// Clean up this batch
ASSERT(batch3->claimCommitRights());
- _bucketCatalog->abort(batch3);
+ _bucketCatalog->abort(batch3, {ErrorCodes::TimeseriesBucketCleared, ""});
// Make sure we can finish the cleanly prepared batch.
_bucketCatalog->finish(batch1, {});
@@ -810,12 +809,11 @@ TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) {
.batch;
ASSERT(batch->claimCommitRights());
- _bucketCatalog->abort(batch);
+ _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""});
ASSERT(batch->finished());
ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
- bool prepared = _bucketCatalog->prepareCommit(batch);
- ASSERT(!prepared);
+ ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch));
ASSERT(batch->finished());
ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
}
@@ -896,8 +894,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
ASSERT(batch2->claimCommitRights());
// Batch 2 will not be able to commit until batch 1 has finished.
- _bucketCatalog->prepareCommit(batch1);
- auto task = Task{[&]() { _bucketCatalog->prepareCommit(batch2); }};
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
+ auto task = Task{[&]() { ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); }};
// Add a little extra wait to make sure prepareCommit actually gets to the blocking point.
stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10));
ASSERT(task.future().valid());
@@ -932,7 +930,7 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
// Batch 2 is the first batch to commit the time field.
ASSERT(batch2->claimCommitRights());
- _bucketCatalog->prepareCommit(batch2);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch2));
ASSERT_EQ(batch2->newFieldNamesToBeInserted().size(), 1);
ASSERT_EQ(batch2->newFieldNamesToBeInserted().begin()->first, _timeField);
_bucketCatalog->finish(batch2, {});
@@ -940,7 +938,7 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
// Batch 1 was the first batch to insert the time field, but by commit time it was already
// committed by batch 2.
ASSERT(batch1->claimCommitRights());
- _bucketCatalog->prepareCommit(batch1);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
ASSERT(batch1->newFieldNamesToBeInserted().empty());
_bucketCatalog->finish(batch1, {});
}