summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGregory Noma <gregory.noma@gmail.com>2021-02-11 09:58:42 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-02-11 16:18:43 +0000
commitdc03b6aa04c099922c026866d3abca68040c09dd (patch)
tree6654142a0af185ae44084678707c498946bce17f
parent011dba7c9ae31a681074dc7bee468c03edb23ef3 (diff)
downloadmongo-dc03b6aa04c099922c026866d3abca68040c09dd.tar.gz
SERVER-54426 Set all expected promises when committing time-series bucket
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.cpp16
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.h2
-rw-r--r--src/mongo/db/timeseries/bucket_catalog_test.cpp17
3 files changed, 29 insertions, 6 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index b8a70923bb8..f4d21d6af7d 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -187,6 +187,8 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(OperationContext*
auto [promise, future] = makePromiseFuture<CommitInfo>();
bucket->promises.push(std::move(promise));
commitInfoFuture = std::move(future);
+ } else {
+ bucket->promises.push(boost::none);
}
bucket->numWriters++;
@@ -243,8 +245,10 @@ BucketCatalog::CommitData BucketCatalog::commit(const BucketId& bucketId,
stats.numMeasurementsCommitted += measurements.size();
// Inform waiters that their measurements have been committed.
- for (uint32_t i = 1; i < bucket.numPendingCommitMeasurements; i++) {
- bucket.promises.front().emplaceValue(*previousCommitInfo);
+ for (uint32_t i = 0; i < bucket.numPendingCommitMeasurements; i++) {
+ if (auto& promise = bucket.promises.front()) {
+ promise->emplaceValue(*previousCommitInfo);
+ }
bucket.promises.pop();
}
if (bucket.numPendingCommitMeasurements) {
@@ -302,9 +306,11 @@ void BucketCatalog::clear(const BucketId& bucketId) {
auto& bucket = it->second;
while (!bucket.promises.empty()) {
- bucket.promises.front().setError({ErrorCodes::TimeseriesBucketCleared,
- str::stream() << "Time-series bucket " << *bucketId
- << " for " << bucket.ns << " was cleared"});
+ if (auto& promise = bucket.promises.front()) {
+ promise->setError({ErrorCodes::TimeseriesBucketCleared,
+ str::stream() << "Time-series bucket " << *bucketId << " for "
+ << bucket.ns << " was cleared"});
+ }
bucket.promises.pop();
}
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index 8766711a37b..87687161b79 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -270,7 +270,7 @@ private:
// Promises for committers to fulfill in order to signal to waiters that their measurements
// have been committed.
- std::queue<Promise<CommitInfo>> promises;
+ std::queue<boost::optional<Promise<CommitInfo>>> promises;
// Whether the bucket is full. This can be due to number of measurements, size, or time
// range.
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index ad60654f675..cbdb57f977a 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -195,6 +195,23 @@ TEST_F(BucketCatalogTest, ClearDatabaseBuckets) {
_insertOneAndCommit(_ns3, 1);
}
+TEST_F(BucketCatalogTest, InsertBetweenCommits) {
+ auto result1 = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now()));
+ ASSERT(!result1.getValue().commitInfo);
+
+ auto data = _bucketCatalog->commit(result1.getValue().bucketId);
+ ASSERT_EQ(data.docs.size(), 1);
+ ASSERT_EQ(data.numCommittedMeasurements, 0);
+
+ // Insert before the second commit so that the committer gets more documents to commit.
+ auto result2 = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now()));
+ ASSERT(result2.getValue().commitInfo);
+
+ _commit(result1.getValue().bucketId, 1);
+
+ ASSERT(result2.getValue().commitInfo->isReady());
+}
+
DEATH_TEST_F(BucketCatalogTest, CannotProvideCommitInfoOnFirstCommit, "invariant") {
auto result = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now()));
auto& [bucketId, _] = result.getValue();