summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
authorGregory Noma <gregory.noma@gmail.com>2021-06-24 10:08:55 -0400
committerGregory Noma <gregory.noma@gmail.com>2021-06-24 16:56:03 -0400
commit174ce8fa6dde67b5c11c702848744c38cf2170bb (patch)
tree72c03f3db9ae45745dbdf8bfc1b1fefa44fc1545
parentc6ebda4b612b0f1da93e1892a3765c9581f3dc62 (diff)
downloadmongo-174ce8fa6dde67b5c11c702848744c38cf2170bb.tar.gz
SERVER-57842 Use OperationId instead of LogicalSessionId in the BucketCatalog
(cherry picked from commit 63c647943e3cf9dd68d4a0bfff93b383d4520fbf)
-rw-r--r--  src/mongo/db/operation_id.cpp                       6
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.cpp         23
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.h           10
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog_test.cpp    29
4 files changed, 38 insertions(+), 30 deletions(-)
diff --git a/src/mongo/db/operation_id.cpp b/src/mongo/db/operation_id.cpp
index 5a4ca82ed13..3184fc44d2e 100644
--- a/src/mongo/db/operation_id.cpp
+++ b/src/mongo/db/operation_id.cpp
@@ -41,7 +41,11 @@ OperationIdSlot UniqueOperationIdRegistry::acquireSlot() {
invariant(_activeIds.size() < (1 << 20));
while (true) {
- const auto&& [it, ok] = _activeIds.insert(_nextOpId++);
+ auto opId = _nextOpId++;
+ if (!_nextOpId) {
+ _nextOpId = 1U;
+ }
+ const auto&& [it, ok] = _activeIds.insert(opId);
if (ok) {
return OperationIdSlot(*it, shared_from_this());
}
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 4c579b49be2..8563e4333c4 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -114,13 +114,14 @@ void normalizeTopLevel(BSONObjBuilder* builder, const BSONElement& elem) {
}
}
-UUID getLsid(OperationContext* opCtx, BucketCatalog::CombineWithInsertsFromOtherClients combine) {
- static const UUID common{UUID::gen()};
+OperationId getOpId(OperationContext* opCtx,
+ BucketCatalog::CombineWithInsertsFromOtherClients combine) {
switch (combine) {
case BucketCatalog::CombineWithInsertsFromOtherClients::kAllow:
- return common;
+ return 0;
case BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow:
- return opCtx->getLogicalSessionId()->getId();
+ invariant(opCtx->getOpID());
+ return opCtx->getOpID();
}
MONGO_UNREACHABLE;
}
@@ -229,7 +230,7 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert(
&sizeToBeAdded);
}
- auto batch = bucket->_activeBatch(getLsid(opCtx, combine), stats);
+ auto batch = bucket->_activeBatch(getOpId(opCtx, combine), stats);
batch->_addMeasurement(doc);
batch->_recordNewFields(std::move(newFieldNamesToBeInserted));
@@ -279,7 +280,7 @@ bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
batch->_prepareCommit();
_memoryUsage.fetchAndAdd(bucket->_memoryUsage - prevMemoryUsage);
- bucket->_batches.erase(batch->_lsid);
+ bucket->_batches.erase(batch->_opId);
return true;
}
@@ -782,10 +783,10 @@ bool BucketCatalog::Bucket::allCommitted() const {
}
std::shared_ptr<BucketCatalog::WriteBatch> BucketCatalog::Bucket::_activeBatch(
- const UUID& lsid, const std::shared_ptr<ExecutionStats>& stats) {
- auto it = _batches.find(lsid);
+ OperationId opId, const std::shared_ptr<ExecutionStats>& stats) {
+ auto it = _batches.find(opId);
if (it == _batches.end()) {
- it = _batches.try_emplace(lsid, std::make_shared<WriteBatch>(this, lsid, stats)).first;
+ it = _batches.try_emplace(opId, std::make_shared<WriteBatch>(this, opId, stats)).first;
}
return it->second;
}
@@ -1069,9 +1070,9 @@ Date_t BucketCatalog::BucketAccess::getTime() const {
}
BucketCatalog::WriteBatch::WriteBatch(Bucket* bucket,
- const UUID& lsid,
+ OperationId opId,
const std::shared_ptr<ExecutionStats>& stats)
- : _bucket{bucket}, _lsid(lsid), _stats{stats} {}
+ : _bucket{bucket}, _opId(opId), _stats{stats} {}
bool BucketCatalog::WriteBatch::claimCommitRights() {
return !_commitRights.swap(true);
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index 2a9501c25b2..d8a5ffcf574 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -81,7 +81,7 @@ public:
public:
WriteBatch() = delete;
- WriteBatch(Bucket* bucket, const UUID& lsid, const std::shared_ptr<ExecutionStats>& stats);
+ WriteBatch(Bucket* bucket, OperationId opId, const std::shared_ptr<ExecutionStats>& stats);
/**
* Attempt to claim the right to commit (or abort) a batch. If it returns true, rights are
@@ -149,7 +149,7 @@ public:
Bucket* _bucket;
- const UUID _lsid;
+ OperationId _opId;
std::shared_ptr<ExecutionStats> _stats;
std::vector<BSONObj> _measurements;
@@ -366,7 +366,7 @@ public:
/**
* Return a pointer to the current, open batch.
*/
- std::shared_ptr<WriteBatch> _activeBatch(const UUID& lsid,
+ std::shared_ptr<WriteBatch> _activeBatch(OperationId opId,
const std::shared_ptr<ExecutionStats>& stats);
// Access to the bucket is controlled by this lock
@@ -414,8 +414,8 @@ public:
// any.
std::shared_ptr<WriteBatch> _preparedBatch;
- // Batches, per logical session, that haven't been committed or aborted yet.
- stdx::unordered_map<UUID, std::shared_ptr<WriteBatch>, UUID::Hash> _batches;
+ // Batches, per operation, that haven't been committed or aborted yet.
+ stdx::unordered_map<OperationId, std::shared_ptr<WriteBatch>> _batches;
// If the bucket is in _idleBuckets, then its position is recorded here.
boost::optional<IdleList::iterator> _idleListEntry = boost::none;
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index 6414a52fca6..b76619d9eb1 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -58,6 +58,10 @@ protected:
};
void setUp() override;
+
+ std::pair<ServiceContext::UniqueClient, ServiceContext::UniqueOperationContext>
+ _makeOperationContext();
+
virtual BSONObj _makeTimeseriesOptionsForCreate() const;
TimeseriesOptions _getTimeseriesOptions(const NamespaceString& ns) const;
@@ -120,6 +124,13 @@ void BucketCatalogTest::setUp() {
}
}
+std::pair<ServiceContext::UniqueClient, ServiceContext::UniqueOperationContext>
+BucketCatalogTest::_makeOperationContext() {
+ auto client = getServiceContext()->makeClient("BucketCatalogTest");
+ auto opCtx = client->makeOperationContext();
+ return {std::move(client), std::move(opCtx)};
+}
+
BSONObj BucketCatalogTest::_makeTimeseriesOptionsForCreate() const {
return BSON("timeField" << _timeField << "metaField" << _metaField);
}
@@ -657,7 +668,6 @@ TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) {
}
TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
- _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
auto batch1 = _bucketCatalog
->insert(_opCtx,
_ns1,
@@ -667,9 +677,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
.getValue();
- _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
auto batch2 = _bucketCatalog
- ->insert(_opCtx,
+ ->insert(_makeOperationContext().second.get(),
_ns1,
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
@@ -677,9 +686,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
.getValue();
- _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
auto batch3 = _bucketCatalog
- ->insert(_opCtx,
+ ->insert(_makeOperationContext().second.get(),
_ns1,
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
@@ -687,9 +695,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
.getValue();
- _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
auto batch4 = _bucketCatalog
- ->insert(_opCtx,
+ ->insert(_makeOperationContext().second.get(),
_ns1,
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
@@ -708,7 +715,6 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
}
TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
- _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
auto batch1 = _bucketCatalog
->insert(_opCtx,
_ns1,
@@ -718,9 +724,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
.getValue();
- _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
auto batch2 = _bucketCatalog
- ->insert(_opCtx,
+ ->insert(_makeOperationContext().second.get(),
_ns1,
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
@@ -746,7 +751,6 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
}
TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
- _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
auto batch1 = _bucketCatalog
->insert(_opCtx,
_ns1,
@@ -756,9 +760,8 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
.getValue();
- _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
auto batch2 = _bucketCatalog
- ->insert(_opCtx,
+ ->insert(_makeOperationContext().second.get(),
_ns1,
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),