diff options
author | Gregory Noma <gregory.noma@gmail.com> | 2021-02-11 09:58:42 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-11 16:18:43 +0000 |
commit | dc03b6aa04c099922c026866d3abca68040c09dd (patch) | |
tree | 6654142a0af185ae44084678707c498946bce17f | |
parent | 011dba7c9ae31a681074dc7bee468c03edb23ef3 (diff) | |
download | mongo-dc03b6aa04c099922c026866d3abca68040c09dd.tar.gz |
SERVER-54426 Set all expected promises when committing time-series bucket
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.h | 2 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog_test.cpp | 17 |
3 files changed, 29 insertions, 6 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index b8a70923bb8..f4d21d6af7d 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -187,6 +187,8 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(OperationContext* auto [promise, future] = makePromiseFuture<CommitInfo>(); bucket->promises.push(std::move(promise)); commitInfoFuture = std::move(future); + } else { + bucket->promises.push(boost::none); } bucket->numWriters++; @@ -243,8 +245,10 @@ BucketCatalog::CommitData BucketCatalog::commit(const BucketId& bucketId, stats.numMeasurementsCommitted += measurements.size(); // Inform waiters that their measurements have been committed. - for (uint32_t i = 1; i < bucket.numPendingCommitMeasurements; i++) { - bucket.promises.front().emplaceValue(*previousCommitInfo); + for (uint32_t i = 0; i < bucket.numPendingCommitMeasurements; i++) { + if (auto& promise = bucket.promises.front()) { + promise->emplaceValue(*previousCommitInfo); + } bucket.promises.pop(); } if (bucket.numPendingCommitMeasurements) { @@ -302,9 +306,11 @@ void BucketCatalog::clear(const BucketId& bucketId) { auto& bucket = it->second; while (!bucket.promises.empty()) { - bucket.promises.front().setError({ErrorCodes::TimeseriesBucketCleared, - str::stream() << "Time-series bucket " << *bucketId - << " for " << bucket.ns << " was cleared"}); + if (auto& promise = bucket.promises.front()) { + promise->setError({ErrorCodes::TimeseriesBucketCleared, + str::stream() << "Time-series bucket " << *bucketId << " for " + << bucket.ns << " was cleared"}); + } bucket.promises.pop(); } diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index 8766711a37b..87687161b79 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -270,7 +270,7 @@ private: // Promises for committers to fulfill in order to signal to waiters that their measurements // have been committed. - std::queue<Promise<CommitInfo>> promises; + std::queue<boost::optional<Promise<CommitInfo>>> promises; // Whether the bucket is full. This can be due to number of measurements, size, or time // range. diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp index ad60654f675..cbdb57f977a 100644 --- a/src/mongo/db/timeseries/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp @@ -195,6 +195,23 @@ TEST_F(BucketCatalogTest, ClearDatabaseBuckets) { _insertOneAndCommit(_ns3, 1); } +TEST_F(BucketCatalogTest, InsertBetweenCommits) { + auto result1 = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now())); + ASSERT(!result1.getValue().commitInfo); + + auto data = _bucketCatalog->commit(result1.getValue().bucketId); + ASSERT_EQ(data.docs.size(), 1); + ASSERT_EQ(data.numCommittedMeasurements, 0); + + // Insert before the second commit so that the committer gets more documents to commit. + auto result2 = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now())); + ASSERT(result2.getValue().commitInfo); + + _commit(result1.getValue().bucketId, 1); + + ASSERT(result2.getValue().commitInfo->isReady()); +} + DEATH_TEST_F(BucketCatalogTest, CannotProvideCommitInfoOnFirstCommit, "invariant") { auto result = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now())); auto& [bucketId, _] = result.getValue(); |