 jstests/core/timeseries/timeseries_bucket_manual_removal.js |  74
 src/mongo/base/error_codes.yml                              |   2
 src/mongo/db/commands/write_commands/write_commands.cpp     | 315
 src/mongo/db/timeseries/bucket_catalog.cpp                  |  63
 src/mongo/db/timeseries/bucket_catalog.h                    |  19
 5 files changed, 338 insertions(+), 135 deletions(-)
diff --git a/jstests/core/timeseries/timeseries_bucket_manual_removal.js b/jstests/core/timeseries/timeseries_bucket_manual_removal.js
new file mode 100644
index 00000000000..da529437203
--- /dev/null
+++ b/jstests/core/timeseries/timeseries_bucket_manual_removal.js
@@ -0,0 +1,74 @@
+/**
+ * Tests that a time-series collection handles a bucket being manually removed from the buckets
+ * collection.
+ *
+ * @tags: [
+ * assumes_no_implicit_collection_creation_after_drop,
+ * does_not_support_retryable_writes, # Batches containing more than one measurement
+ * does_not_support_stepdowns,
+ * requires_fcv_49,
+ * requires_find_command,
+ * requires_getmore,
+ * sbe_incompatible,
+ * ]
+ */
+(function() {
+'use strict';
+
+load('jstests/core/timeseries/libs/timeseries.js');
+
+if (!TimeseriesTest.timeseriesCollectionsEnabled(db.getMongo())) {
+ jsTestLog('Skipping test because the time-series collection feature flag is disabled');
+ return;
+}
+
+const testDB = db.getSiblingDB(jsTestName());
+
+const coll = testDB.getCollection('t');
+const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName());
+
+const timeFieldName = 'time';
+
+coll.drop();
+assert.commandWorked(
+ testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}}));
+assert.contains(bucketsColl.getName(), testDB.getCollectionNames());
+
+const docs1 = [
+ {
+ _id: 0,
+ [timeFieldName]: ISODate("2021-01-01T01:00:00Z"),
+ },
+ {
+ _id: 1,
+ [timeFieldName]: ISODate("2021-01-01T01:01:00Z"),
+ },
+];
+const docs2 = [
+ {
+ _id: 2,
+ [timeFieldName]: ISODate("2021-01-01T01:02:00Z"),
+ },
+ {
+ _id: 3,
+ [timeFieldName]: ISODate("2021-01-01T01:03:00Z"),
+ },
+];
+
+assert.commandWorked(coll.insert(docs1));
+assert.docEq(coll.find().toArray(), docs1);
+let buckets = bucketsColl.find().toArray();
+assert.eq(buckets.length, 1, 'Expected one bucket but found ' + tojson(buckets));
+const bucketId = buckets[0]._id;
+
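+// Remove the bucket document directly from the buckets collection. The in-memory
+// bucket catalog is not notified and still considers this bucket open.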
+assert.commandWorked(bucketsColl.remove({_id: bucketId}));
+assert.docEq(coll.find().toArray(), []);
+buckets = bucketsColl.find().toArray();
+assert.eq(buckets.length, 0, 'Expected no buckets but found ' + tojson(buckets));
+
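+// A subsequent insert should notice the bucket is gone, clear the stale catalog entry,
+// and write the new measurements to a brand-new bucket.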
+assert.commandWorked(coll.insert(docs2));
+assert.docEq(coll.find().toArray(), docs2);
+buckets = bucketsColl.find().toArray();
+assert.eq(buckets.length, 1, 'Expected one bucket but found ' + tojson(buckets));
+assert.neq(buckets[0]._id, bucketId);
+})();
\ No newline at end of file
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index ac1e8159d87..50936d9cb53 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -414,6 +414,8 @@ error_codes:
- {code: 335, name: TenantMigrationForgotten}
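+    # Internal-only error used to fail waiters when a time-series bucket is cleared.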
+ - {code: 336, name: TimeseriesBucketCleared, categories: [InternalOnly]}
+
# Error codes 4000-8999 are reserved.
# Non-sequential error codes (for compatibility only)
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index baf5815ac7f..302b6a69a1d 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -67,6 +67,7 @@
#include "mongo/db/transaction_participant.h"
#include "mongo/db/views/view_catalog.h"
#include "mongo/db/write_concern.h"
+#include "mongo/logv2/redaction.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/string_map.h"
@@ -593,154 +594,234 @@ public:
auth::checkAuthForInsertCommand(authzSession, getBypass(), _batch);
}
- /**
- * Writes to the underlying system.buckets collection.
- */
- void _performTimeseriesWrites(OperationContext* opCtx, BSONObjBuilder* result) const {
- if (isRetryableTimeseriesWriteExecuted(opCtx, _batch, result)) {
- return;
+ StatusWith<SingleWriteResult> _getTimeseriesSingleWriteResult(
+ const write_ops_exec::WriteResult& reply) const {
+ invariant(reply.results.size() == 1,
+ str::stream() << "Unexpected number of results (" << reply.results.size()
+ << ") for insert on time-series collection " << ns());
+
+ return reply.results[0];
+ }
+
+ StatusWith<SingleWriteResult> _performTimeseriesInsert(
+ OperationContext* opCtx,
+ const OID& bucketId,
+ const BucketCatalog::CommitData& data,
+ const BSONObj& metadata) const {
+ auto bucketsNs = ns().makeTimeseriesBucketsNamespace();
+
+ BSONObjBuilder builder;
+ builder.append(write_ops::Insert::kCommandName, bucketsNs.coll());
+ // The schema validation configured in the bucket collection is intended for direct
+ // operations by end users and is not applicable here.
+ builder.append(write_ops::Insert::kBypassDocumentValidationFieldName, true);
+
+ // Statement IDs are not meaningful because of the way we combine and convert inserts
+ // for the bucket collection. A retryable write is the only situation where it is
+ // appropriate to forward statement IDs.
+ if (isTimeseriesWriteRetryable(opCtx)) {
+ if (auto stmtId = _batch.getStmtId()) {
+ builder.append(write_ops::Insert::kStmtIdFieldName, *stmtId);
+ } else if (auto stmtIds = _batch.getStmtIds()) {
+ builder.append(write_ops::Insert::kStmtIdsFieldName, *stmtIds);
+ }
+ }
+
+ builder.append(write_ops::Insert::kDocumentsFieldName,
+ makeTimeseriesInsertDocument(bucketId, data, metadata));
+
+ auto request = OpMsgRequest::fromDBAndBody(bucketsNs.db(), builder.obj());
+ auto timeseriesInsertBatch = InsertOp::parse(request);
+
+ return _getTimeseriesSingleWriteResult(
+ write_ops_exec::performInserts(opCtx, timeseriesInsertBatch));
+ }
+
+ StatusWith<SingleWriteResult> _performTimeseriesUpdate(
+ OperationContext* opCtx,
+ const OID& bucketId,
+ const BucketCatalog::CommitData& data,
+ const BSONObj& metadata) const {
+ auto update = makeTimeseriesUpdateOpEntry(bucketId, data, metadata);
+ write_ops::Update timeseriesUpdateBatch(ns().makeTimeseriesBucketsNamespace(),
+ {update});
+
+ write_ops::WriteCommandBase writeCommandBase;
+ // The schema validation configured in the bucket collection is intended for direct
+ // operations by end users and is not applicable here.
+ writeCommandBase.setBypassDocumentValidation(true);
+ writeCommandBase.setOrdered(_batch.getOrdered());
+
+ // Statement IDs are not meaningful because of the way we combine and convert inserts
+ // for the bucket collection. A retryable write is the only situation where it is
+ // appropriate to forward statement IDs.
+ if (isTimeseriesWriteRetryable(opCtx)) {
+ if (auto stmtId = _batch.getStmtId()) {
+ writeCommandBase.setStmtId(*stmtId);
+ } else if (auto stmtIds = _batch.getStmtIds()) {
+ writeCommandBase.setStmtIds(*stmtIds);
+ }
}
- auto ns = _batch.getNamespace();
- auto bucketsNs = ns.makeTimeseriesBucketsNamespace();
+ timeseriesUpdateBatch.setWriteCommandBase(std::move(writeCommandBase));
+
+ return _getTimeseriesSingleWriteResult(
+ write_ops_exec::performUpdates(opCtx, timeseriesUpdateBatch));
+ }
+
+ void _commitTimeseriesBucket(OperationContext* opCtx,
+ const OID& bucketId,
+ size_t index,
+ std::vector<BSONObj>* errors,
+ boost::optional<repl::OpTime>* opTime,
+ boost::optional<OID>* electionId,
+ std::vector<size_t>* updatesToRetryAsInserts) const {
+ auto& bucketCatalog = BucketCatalog::get(opCtx);
+
+ auto metadata = bucketCatalog.getMetadata(bucketId);
+ auto data = bucketCatalog.commit(bucketId);
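+        // Each commit returns the next batch of uncommitted measurements for this
+        // bucket; keep committing until nothing is left.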
+ while (!data.docs.empty()) {
+ auto result = data.numCommittedMeasurements == 0
+ ? _performTimeseriesInsert(opCtx, bucketId, data, metadata)
+ : _performTimeseriesUpdate(opCtx, bucketId, data, metadata);
+
+ if (data.numCommittedMeasurements != 0 && result.isOK() &&
+ result.getValue().getNModified() == 0) {
+                // No bucket was found to update, meaning that it was manually removed.
+                // Clear the stale catalog entry and let the caller retry the write as an
+                // insert into a new bucket.
+ bucketCatalog.clear(bucketId);
+ updatesToRetryAsInserts->push_back(index);
+ return;
+ }
+
+ if (auto error = generateError(opCtx, result, index, errors->size())) {
+ errors->push_back(*error);
+ }
+
+ auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext());
+ const auto replMode = replCoord->getReplicationMode();
+
+ *opTime = replMode != repl::ReplicationCoordinator::modeNone
+ ? boost::make_optional(
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp())
+ : boost::none;
+ *electionId = replMode == repl::ReplicationCoordinator::modeReplSet
+ ? boost::make_optional(replCoord->getElectionId())
+ : boost::none;
+
+ data = bucketCatalog.commit(
+ bucketId, BucketCatalog::CommitInfo{std::move(result), *opTime, *electionId});
+ }
+ }
+ /**
+     * Writes to the underlying system.buckets collection. Returns the indices of the
+     * measurements that were attempted as updates but found no bucket to update. These
+     * indices can be passed as the optional 'indices' parameter in a subsequent call to
+     * this function in order to be retried as inserts.
+ */
+ std::vector<size_t> _performTimeseriesWrites(
+ OperationContext* opCtx,
+ std::vector<BSONObj>* errors,
+ boost::optional<repl::OpTime>* opTime,
+ boost::optional<OID>* electionId,
+ const boost::optional<std::vector<size_t>>& indices = boost::none) const {
auto& bucketCatalog = BucketCatalog::get(opCtx);
+
std::vector<std::pair<OID, size_t>> bucketsToCommit;
std::vector<std::pair<Future<BucketCatalog::CommitInfo>, size_t>> bucketsToWaitOn;
- for (size_t i = 0; i < _batch.getDocuments().size(); i++) {
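+        // Stage a measurement in the in-memory catalog: we either get back a bucket to
+        // commit ourselves, or a future to wait on while another writer commits it.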
+ auto insert = [&](size_t index) {
auto [bucketId, commitInfo] =
- bucketCatalog.insert(opCtx, ns, _batch.getDocuments()[i]);
+ bucketCatalog.insert(opCtx, ns(), _batch.getDocuments()[index]);
if (commitInfo) {
- bucketsToWaitOn.push_back({std::move(*commitInfo), i});
+ bucketsToWaitOn.push_back({std::move(*commitInfo), index});
} else {
- bucketsToCommit.push_back({std::move(bucketId), i});
+ bucketsToCommit.push_back({std::move(bucketId), index});
+ }
+ };
+
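+        // On a retry pass only the given indices are staged; otherwise stage the whole
+        // batch.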
+ if (indices) {
+ std::for_each(indices->begin(), indices->end(), insert);
+ } else {
+ for (size_t i = 0; i < _batch.getDocuments().size(); i++) {
+ insert(i);
}
}
hangTimeseriesInsertBeforeCommit.pauseWhileSet();
- std::vector<BSONObj> errors;
- boost::optional<repl::OpTime> opTime;
- boost::optional<OID> electionId;
+ std::vector<size_t> updatesToRetryAsInserts;
for (const auto& [bucketId, index] : bucketsToCommit) {
- auto metadata = bucketCatalog.getMetadata(bucketId);
- auto data = bucketCatalog.commit(bucketId);
- while (!data.docs.empty()) {
- write_ops_exec::WriteResult reply;
- if (data.numCommittedMeasurements == 0) {
- BSONObjBuilder builder;
- builder.append(write_ops::Insert::kCommandName, bucketsNs.coll());
- // The schema validation configured in the bucket collection is intended for
- // direct operations by end users and is not applicable here.
- builder.append(write_ops::Insert::kBypassDocumentValidationFieldName, true);
- builder.append(write_ops::Insert::kOrderedFieldName, _batch.getOrdered());
-
- // Statement IDs are not meaningful because of the way we combine and
- // convert inserts for the bucket collection. A retryable write is the only
- // situation where it is appropriate to forward statement IDs.
- if (isTimeseriesWriteRetryable(opCtx)) {
- if (auto stmtId = _batch.getStmtId()) {
- builder.append(write_ops::Insert::kStmtIdFieldName, *stmtId);
- } else if (auto stmtIds = _batch.getStmtIds()) {
- builder.append(write_ops::Insert::kStmtIdsFieldName, *stmtIds);
- }
- }
-
- builder.append(write_ops::Insert::kDocumentsFieldName,
- makeTimeseriesInsertDocument(bucketId, data, metadata));
-
- auto request = OpMsgRequest::fromDBAndBody(bucketsNs.db(), builder.obj());
- auto timeseriesInsertBatch = InsertOp::parse(request);
- reply = write_ops_exec::performInserts(opCtx, timeseriesInsertBatch);
- } else {
- auto update = makeTimeseriesUpdateOpEntry(bucketId, data, metadata);
- write_ops::Update timeseriesUpdateBatch(bucketsNs, {update});
- {
- write_ops::WriteCommandBase writeCommandBase;
- // The schema validation configured in the bucket collection is intended
- // for direct operations by end users and is not applicable here.
- writeCommandBase.setBypassDocumentValidation(true);
- writeCommandBase.setOrdered(_batch.getOrdered());
-
- // Statement IDs are not meaningful because of the way we combine and
- // convert inserts for the bucket collection. A retryable write is the
- // only situation where it is appropriate to forward statement IDs.
- if (isTimeseriesWriteRetryable(opCtx)) {
- if (auto stmtId = _batch.getStmtId()) {
- writeCommandBase.setStmtId(*stmtId);
- } else if (auto stmtIds = _batch.getStmtIds()) {
- writeCommandBase.setStmtIds(*stmtIds);
- }
- }
-
- timeseriesUpdateBatch.setWriteCommandBase(std::move(writeCommandBase));
- }
-
- reply = write_ops_exec::performUpdates(opCtx, timeseriesUpdateBatch);
- }
-
- invariant(reply.results.size() == 1,
- str::stream()
- << "Unexpected number of results (" << reply.results.size()
- << ") for insert on time-series collection " << ns);
-
- if (auto error = generateError(opCtx, reply.results[0], index, errors.size())) {
- errors.push_back(*error);
- }
-
- auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext());
- const auto replMode = replCoord->getReplicationMode();
-
- opTime = replMode != repl::ReplicationCoordinator::modeNone
- ? boost::make_optional(
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp())
- : boost::none;
- electionId = replMode == repl::ReplicationCoordinator::modeReplSet
- ? boost::make_optional(replCoord->getElectionId())
- : boost::none;
-
- data = bucketCatalog.commit(
- bucketId,
- BucketCatalog::CommitInfo{std::move(reply.results[0]), opTime, electionId});
- }
+ _commitTimeseriesBucket(
+ opCtx, bucketId, index, errors, opTime, electionId, &updatesToRetryAsInserts);
}
for (const auto& [future, index] : bucketsToWaitOn) {
- auto commitInfo = future.get(opCtx);
- if (auto error = generateError(opCtx, commitInfo.result, index, errors.size())) {
- errors.push_back(*error);
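+            // The only expected failure is the bucket being cleared after its document
+            // was manually removed; those writes are retried as inserts.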
+ auto swCommitInfo = future.getNoThrow(opCtx);
+ if (!swCommitInfo.isOK()) {
+ invariant(swCommitInfo.getStatus() == ErrorCodes::TimeseriesBucketCleared,
+ str::stream()
+ << "Got unexpected error (" << swCommitInfo.getStatus()
+ << ") waiting for time-series bucket to be committed for " << ns()
+ << ": " << redact(_batch.toBSON({})));
+
+ updatesToRetryAsInserts.push_back(index);
+ continue;
+ }
+
+ const auto& commitInfo = swCommitInfo.getValue();
+ if (auto error = generateError(opCtx, commitInfo.result, index, errors->size())) {
+ errors->push_back(*error);
}
if (commitInfo.opTime) {
- opTime = std::max(opTime.value_or(repl::OpTime()), *commitInfo.opTime);
+ *opTime = std::max(opTime->value_or(repl::OpTime()), *commitInfo.opTime);
}
if (commitInfo.electionId) {
- electionId = std::max(electionId.value_or(OID()), *commitInfo.electionId);
+ *electionId = std::max(electionId->value_or(OID()), *commitInfo.electionId);
}
}
- result->appendNumber("n", _batch.getDocuments().size() - errors.size());
- if (!errors.empty()) {
- result->append("writeErrors", errors);
- }
- if (opTime) {
- appendOpTime(*opTime, result);
- }
- if (electionId) {
- result->append("electionId", *electionId);
+ return updatesToRetryAsInserts;
+ }
+
+ void _checkAndPerformTimeseriesWrites(OperationContext* opCtx,
+ BSONObjBuilder* result) const {
+ // Re-throw parsing exceptions to be consistent with CmdInsert::Invocation's
+ // constructor.
+ try {
+ if (isRetryableTimeseriesWriteExecuted(opCtx, _batch, result)) {
+ return;
+ }
+
+ std::vector<BSONObj> errors;
+ boost::optional<repl::OpTime> opTime;
+ boost::optional<OID> electionId;
+
+ auto updatesToRetryAsInserts =
+ _performTimeseriesWrites(opCtx, &errors, &opTime, &electionId);
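+            // The retries are performed as inserts into new buckets, so the second pass
+            // should not report anything further to retry.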
+ invariant(_performTimeseriesWrites(
+ opCtx, &errors, &opTime, &electionId, updatesToRetryAsInserts)
+ .empty());
+
+ result->appendNumber("n", _batch.getDocuments().size() - errors.size());
+ if (!errors.empty()) {
+ result->append("writeErrors", errors);
+ }
+ if (opTime) {
+ appendOpTime(*opTime, result);
+ }
+ if (electionId) {
+ result->append("electionId", *electionId);
+ }
+ } catch (DBException& ex) {
+ ex.addContext(str::stream() << "time-series insert failed: " << ns().ns());
+ throw;
}
}
void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override {
if (isTimeseries(opCtx, ns())) {
- // Re-throw parsing exceptions to be consistent with CmdInsert::Invocation's
- // constructor.
- try {
- _performTimeseriesWrites(opCtx, &result);
- } catch (DBException& ex) {
- ex.addContext(str::stream() << "time-series insert failed: " << ns().ns());
- throw;
- }
+ _checkAndPerformTimeseriesWrites(opCtx, &result);
return;
}
auto reply = write_ops_exec::performInserts(opCtx, _batch);
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index ebf66828c5a..a17f682bc42 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -274,6 +274,25 @@ BucketCatalog::CommitData BucketCatalog::commit(const OID& bucketId,
return data;
}
+void BucketCatalog::clear(const OID& bucketId) {
+ stdx::lock_guard lk(_mutex);
+
+ auto it = _buckets.find(bucketId);
+ if (it == _buckets.end()) {
+ return;
+ }
+ auto& bucket = it->second;
+
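+    // Fail any outstanding commit waiters on this bucket so their writes can be retried.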
+ while (!bucket.promises.empty()) {
+ bucket.promises.front().setError({ErrorCodes::TimeseriesBucketCleared,
+ str::stream() << "Time-series bucket " << bucketId
+ << " for " << bucket.ns << " was cleared"});
+ bucket.promises.pop();
+ }
+
+ _removeBucket(bucketId);
+}
+
void BucketCatalog::clear(const NamespaceString& ns) {
stdx::lock_guard lk(_mutex);
@@ -282,16 +301,11 @@ void BucketCatalog::clear(const NamespaceString& ns) {
};
for (auto it = _orderedBuckets.lower_bound({ns, {}, {}});
- it != _orderedBuckets.end() && shouldClear(std::get<NamespaceString>(*it));
- it = _orderedBuckets.erase(it)) {
- const auto& bucketId = std::get<OID>(*it);
- const auto& bucketNs = std::get<NamespaceString>(*it);
- auto bucketIt = _buckets.find(bucketId);
- _memoryUsage -= bucketIt->second.memoryUsage;
- _buckets.erase(bucketIt);
- _idleBuckets.erase(bucketId);
- _bucketIds.erase({bucketNs, std::get<BucketMetadata>(*it)});
- _executionStats.erase(bucketNs);
+ it != _orderedBuckets.end() && shouldClear(std::get<NamespaceString>(*it));) {
+ auto nextIt = std::next(it);
+ _executionStats.erase(std::get<NamespaceString>(*it));
+ _removeBucket(std::get<OID>(*it), it);
+ it = nextIt;
}
}
@@ -325,14 +339,31 @@ void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuild
}
}
+void BucketCatalog::_removeBucket(const OID& bucketId,
+ boost::optional<OrderedBuckets::iterator> orderedBucketsIt,
+ boost::optional<IdleBuckets::iterator> idleBucketsIt) {
+ auto it = _buckets.find(bucketId);
+ _memoryUsage -= it->second.memoryUsage;
+
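+    // Erase via the caller's iterator when one is provided: this avoids a second lookup
+    // and keeps erase-while-iterating loops (such as clear(ns)) valid.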
+ if (orderedBucketsIt) {
+ _orderedBuckets.erase(*orderedBucketsIt);
+ } else {
+ _orderedBuckets.erase({it->second.ns, it->second.metadata, it->first});
+ }
+
+ if (idleBucketsIt) {
+ _idleBuckets.erase(*idleBucketsIt);
+ } else {
+ _idleBuckets.erase(it->first);
+ }
+
+ _bucketIds.erase({std::move(it->second.ns), std::move(it->second.metadata)});
+ _buckets.erase(it);
+}
+
void BucketCatalog::_expireIdleBuckets(ExecutionStats* stats) {
while (!_idleBuckets.empty() && _memoryUsage > kIdleBucketExpiryMemoryUsageThreshold) {
- auto it = _buckets.find(*_idleBuckets.begin());
- _memoryUsage -= it->second.memoryUsage;
- _idleBuckets.erase(_idleBuckets.begin());
- _bucketIds.erase({it->second.ns, it->second.metadata});
- _orderedBuckets.erase({it->second.ns, it->second.metadata, it->first});
- _buckets.erase(it);
+ _removeBucket(*_idleBuckets.begin(), boost::none, _idleBuckets.begin());
stats->numBucketsClosedDueToMemoryThreshold++;
}
}
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index 496dc2bbb71..4b2382e92fa 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -100,6 +100,11 @@ public:
boost::optional<CommitInfo> previousCommitInfo = boost::none);
/**
+     * Clears the given bucket, failing any pending commits on it with a
+     * TimeseriesBucketCleared error.
+ */
+ void clear(const OID& bucketId);
+
+ /**
* Clears the buckets for the given namespace.
*/
void clear(const NamespaceString& ns);
@@ -269,6 +274,16 @@ private:
class ServerStatus;
+ using OrderedBuckets = std::set<std::tuple<NamespaceString, BucketMetadata, OID>>;
+ using IdleBuckets = std::set<OID>;
+
+ /**
+     * Removes the given bucket from the bucket catalog's internal data structures.
+     * Callers already holding iterators into '_orderedBuckets' or '_idleBuckets' may pass
+     * them in to erase by position rather than by key.
+ */
+ void _removeBucket(const OID& bucketId,
+ boost::optional<OrderedBuckets::iterator> orderedBucketsIt = boost::none,
+ boost::optional<IdleBuckets::iterator> idleBucketsIt = boost::none);
+
/**
* Expires idle buckets until the bucket catalog's memory usage is below the expiry threshold.
*/
@@ -283,10 +298,10 @@ private:
stdx::unordered_map<std::pair<NamespaceString, BucketMetadata>, OID> _bucketIds;
// All namespace, metadata, and _id tuples which currently have a bucket in the catalog.
- std::set<std::tuple<NamespaceString, BucketMetadata, OID>> _orderedBuckets;
+ OrderedBuckets _orderedBuckets;
// Buckets that do not have any writers.
- std::set<OID> _idleBuckets;
+ IdleBuckets _idleBuckets;
// Per-collection execution stats.
stdx::unordered_map<NamespaceString, ExecutionStats> _executionStats;