diff options
author | Gregory Noma <gregory.noma@gmail.com> | 2021-06-24 10:08:55 -0400 |
---|---|---|
committer | Gregory Noma <gregory.noma@gmail.com> | 2021-06-24 16:56:03 -0400 |
commit | 174ce8fa6dde67b5c11c702848744c38cf2170bb (patch) | |
tree | 72c03f3db9ae45745dbdf8bfc1b1fefa44fc1545 | |
parent | c6ebda4b612b0f1da93e1892a3765c9581f3dc62 (diff) | |
download | mongo-174ce8fa6dde67b5c11c702848744c38cf2170bb.tar.gz |
SERVER-57842 Use OperationId instead of LogicalSessionId in the BucketCatalog
(cherry picked from commit 63c647943e3cf9dd68d4a0bfff93b383d4520fbf)
-rw-r--r-- | src/mongo/db/operation_id.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.cpp | 23 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.h | 10 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog_test.cpp | 29 |
4 files changed, 38 insertions, 30 deletions
diff --git a/src/mongo/db/operation_id.cpp b/src/mongo/db/operation_id.cpp index 5a4ca82ed13..3184fc44d2e 100644 --- a/src/mongo/db/operation_id.cpp +++ b/src/mongo/db/operation_id.cpp @@ -41,7 +41,11 @@ OperationIdSlot UniqueOperationIdRegistry::acquireSlot() { invariant(_activeIds.size() < (1 << 20)); while (true) { - const auto&& [it, ok] = _activeIds.insert(_nextOpId++); + auto opId = _nextOpId++; + if (!_nextOpId) { + _nextOpId = 1U; + } + const auto&& [it, ok] = _activeIds.insert(opId); if (ok) { return OperationIdSlot(*it, shared_from_this()); } diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index 4c579b49be2..8563e4333c4 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -114,13 +114,14 @@ void normalizeTopLevel(BSONObjBuilder* builder, const BSONElement& elem) { } } -UUID getLsid(OperationContext* opCtx, BucketCatalog::CombineWithInsertsFromOtherClients combine) { - static const UUID common{UUID::gen()}; +OperationId getOpId(OperationContext* opCtx, + BucketCatalog::CombineWithInsertsFromOtherClients combine) { switch (combine) { case BucketCatalog::CombineWithInsertsFromOtherClients::kAllow: - return common; + return 0; case BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow: - return opCtx->getLogicalSessionId()->getId(); + invariant(opCtx->getOpID()); + return opCtx->getOpID(); } MONGO_UNREACHABLE; } @@ -229,7 +230,7 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert( &sizeToBeAdded); } - auto batch = bucket->_activeBatch(getLsid(opCtx, combine), stats); + auto batch = bucket->_activeBatch(getOpId(opCtx, combine), stats); batch->_addMeasurement(doc); batch->_recordNewFields(std::move(newFieldNamesToBeInserted)); @@ -279,7 +280,7 @@ bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { batch->_prepareCommit(); _memoryUsage.fetchAndAdd(bucket->_memoryUsage - prevMemoryUsage); - bucket->_batches.erase(batch->_lsid); + bucket->_batches.erase(batch->_opId); return true; } @@ -782,10 +783,10 @@ bool BucketCatalog::Bucket::allCommitted() const { } std::shared_ptr<BucketCatalog::WriteBatch> BucketCatalog::Bucket::_activeBatch( - const UUID& lsid, const std::shared_ptr<ExecutionStats>& stats) { - auto it = _batches.find(lsid); + OperationId opId, const std::shared_ptr<ExecutionStats>& stats) { + auto it = _batches.find(opId); if (it == _batches.end()) { - it = _batches.try_emplace(lsid, std::make_shared<WriteBatch>(this, lsid, stats)).first; + it = _batches.try_emplace(opId, std::make_shared<WriteBatch>(this, opId, stats)).first; } return it->second; } @@ -1069,9 +1070,9 @@ Date_t BucketCatalog::BucketAccess::getTime() const { } BucketCatalog::WriteBatch::WriteBatch(Bucket* bucket, - const UUID& lsid, + OperationId opId, const std::shared_ptr<ExecutionStats>& stats) - : _bucket{bucket}, _lsid(lsid), _stats{stats} {} + : _bucket{bucket}, _opId(opId), _stats{stats} {} bool BucketCatalog::WriteBatch::claimCommitRights() { return !_commitRights.swap(true); diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index 2a9501c25b2..d8a5ffcf574 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -81,7 +81,7 @@ public: public: WriteBatch() = delete; - WriteBatch(Bucket* bucket, const UUID& lsid, const std::shared_ptr<ExecutionStats>& stats); + WriteBatch(Bucket* bucket, OperationId opId, const std::shared_ptr<ExecutionStats>& stats); /** * Attempt to claim the right to commit (or abort) a batch. If it returns true, rights are @@ -149,7 +149,7 @@ public: Bucket* _bucket; - const UUID _lsid; + OperationId _opId; std::shared_ptr<ExecutionStats> _stats; std::vector<BSONObj> _measurements; @@ -366,7 +366,7 @@ public: /** * Return a pointer to the current, open batch. */ - std::shared_ptr<WriteBatch> _activeBatch(const UUID& lsid, + std::shared_ptr<WriteBatch> _activeBatch(OperationId opId, const std::shared_ptr<ExecutionStats>& stats); // Access to the bucket is controlled by this lock @@ -414,8 +414,8 @@ public: // any. std::shared_ptr<WriteBatch> _preparedBatch; - // Batches, per logical session, that haven't been committed or aborted yet. - stdx::unordered_map<UUID, std::shared_ptr<WriteBatch>, UUID::Hash> _batches; + // Batches, per operation, that haven't been committed or aborted yet. + stdx::unordered_map<OperationId, std::shared_ptr<WriteBatch>> _batches; // If the bucket is in _idleBuckets, then its position is recorded here. boost::optional<IdleList::iterator> _idleListEntry = boost::none; diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp index 6414a52fca6..b76619d9eb1 100644 --- a/src/mongo/db/timeseries/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp @@ -58,6 +58,10 @@ protected: }; void setUp() override; + + std::pair<ServiceContext::UniqueClient, ServiceContext::UniqueOperationContext> + _makeOperationContext(); + virtual BSONObj _makeTimeseriesOptionsForCreate() const; TimeseriesOptions _getTimeseriesOptions(const NamespaceString& ns) const; @@ -120,6 +124,13 @@ void BucketCatalogTest::setUp() { } } +std::pair<ServiceContext::UniqueClient, ServiceContext::UniqueOperationContext> +BucketCatalogTest::_makeOperationContext() { + auto client = getServiceContext()->makeClient("BucketCatalogTest"); + auto opCtx = client->makeOperationContext(); + return {std::move(client), std::move(opCtx)}; +} + BSONObj BucketCatalogTest::_makeTimeseriesOptionsForCreate() const { return BSON("timeField" << _timeField << "metaField" << _metaField); } @@ -657,7 +668,6 @@ TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) { } TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) { - _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); auto batch1 = _bucketCatalog ->insert(_opCtx, _ns1, @@ -667,9 +677,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) { BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) .getValue(); - _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); auto batch2 = _bucketCatalog - ->insert(_opCtx, + ->insert(_makeOperationContext().second.get(), _ns1, _getCollator(_ns1), _getTimeseriesOptions(_ns1), @@ -677,9 +686,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) { BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) .getValue(); - _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); auto batch3 = _bucketCatalog - ->insert(_opCtx, + ->insert(_makeOperationContext().second.get(), _ns1, _getCollator(_ns1), _getTimeseriesOptions(_ns1), @@ -687,9 +695,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) .getValue(); - _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); auto batch4 = _bucketCatalog - ->insert(_opCtx, + ->insert(_makeOperationContext().second.get(), _ns1, _getCollator(_ns1), _getTimeseriesOptions(_ns1), @@ -708,7 +715,6 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) { } TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) { - _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); auto batch1 = _bucketCatalog ->insert(_opCtx, _ns1, @@ -718,9 +724,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) { BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) .getValue(); - _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); auto batch2 = _bucketCatalog - ->insert(_opCtx, + ->insert(_makeOperationContext().second.get(), _ns1, _getCollator(_ns1), _getTimeseriesOptions(_ns1), @@ -746,7 +751,6 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) { } TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) { - _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); auto batch1 = _bucketCatalog ->insert(_opCtx, _ns1, @@ -756,9 +760,8 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) { BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) .getValue(); - _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); auto batch2 = _bucketCatalog - ->insert(_opCtx, + ->insert(_makeOperationContext().second.get(), _ns1, _getCollator(_ns1), _getTimeseriesOptions(_ns1), |