author     Gregory Noma <gregory.noma@gmail.com>             2021-01-29 12:32:55 -0500
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-01-30 02:29:18 +0000
commit     c08203fe14a89e789b7dc5353c427e34c4c2b31e (patch)
tree       ffca74e13f167012c57e560c138ff0665947fe79 /src/mongo/db
parent     92d10390cbab0206c05e24c40d9284e71b282dd6 (diff)
download   mongo-c08203fe14a89e789b7dc5353c427e34c4c2b31e.tar.gz
SERVER-53964 Handle time-series bucket removal
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/commands/write_commands/write_commands.cpp | 315
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.cpp              |  63
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.h                |  19
3 files changed, 262 insertions(+), 135 deletions(-)
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index baf5815ac7f..302b6a69a1d 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -67,6 +67,7 @@
 #include "mongo/db/transaction_participant.h"
 #include "mongo/db/views/view_catalog.h"
 #include "mongo/db/write_concern.h"
+#include "mongo/logv2/redaction.h"
 #include "mongo/s/stale_exception.h"
 #include "mongo/util/fail_point.h"
 #include "mongo/util/string_map.h"
@@ -593,154 +594,234 @@ public:
             auth::checkAuthForInsertCommand(authzSession, getBypass(), _batch);
         }
 
-        /**
-         * Writes to the underlying system.buckets collection.
-         */
-        void _performTimeseriesWrites(OperationContext* opCtx, BSONObjBuilder* result) const {
-            if (isRetryableTimeseriesWriteExecuted(opCtx, _batch, result)) {
-                return;
+        StatusWith<SingleWriteResult> _getTimeseriesSingleWriteResult(
+            const write_ops_exec::WriteResult& reply) const {
+            invariant(reply.results.size() == 1,
+                      str::stream() << "Unexpected number of results (" << reply.results.size()
+                                    << ") for insert on time-series collection " << ns());
+
+            return reply.results[0];
+        }
+
+        StatusWith<SingleWriteResult> _performTimeseriesInsert(
+            OperationContext* opCtx,
+            const OID& bucketId,
+            const BucketCatalog::CommitData& data,
+            const BSONObj& metadata) const {
+            auto bucketsNs = ns().makeTimeseriesBucketsNamespace();
+
+            BSONObjBuilder builder;
+            builder.append(write_ops::Insert::kCommandName, bucketsNs.coll());
+            // The schema validation configured in the bucket collection is intended for direct
+            // operations by end users and is not applicable here.
+            builder.append(write_ops::Insert::kBypassDocumentValidationFieldName, true);
+
+            // Statement IDs are not meaningful because of the way we combine and convert inserts
+            // for the bucket collection. A retryable write is the only situation where it is
+            // appropriate to forward statement IDs.
+            if (isTimeseriesWriteRetryable(opCtx)) {
+                if (auto stmtId = _batch.getStmtId()) {
+                    builder.append(write_ops::Insert::kStmtIdFieldName, *stmtId);
+                } else if (auto stmtIds = _batch.getStmtIds()) {
+                    builder.append(write_ops::Insert::kStmtIdsFieldName, *stmtIds);
+                }
+            }
+
+            builder.append(write_ops::Insert::kDocumentsFieldName,
+                           makeTimeseriesInsertDocument(bucketId, data, metadata));
+
+            auto request = OpMsgRequest::fromDBAndBody(bucketsNs.db(), builder.obj());
+            auto timeseriesInsertBatch = InsertOp::parse(request);
+
+            return _getTimeseriesSingleWriteResult(
+                write_ops_exec::performInserts(opCtx, timeseriesInsertBatch));
+        }
+
+        StatusWith<SingleWriteResult> _performTimeseriesUpdate(
+            OperationContext* opCtx,
+            const OID& bucketId,
+            const BucketCatalog::CommitData& data,
+            const BSONObj& metadata) const {
+            auto update = makeTimeseriesUpdateOpEntry(bucketId, data, metadata);
+            write_ops::Update timeseriesUpdateBatch(ns().makeTimeseriesBucketsNamespace(),
+                                                    {update});
+
+            write_ops::WriteCommandBase writeCommandBase;
+            // The schema validation configured in the bucket collection is intended for direct
+            // operations by end users and is not applicable here.
+            writeCommandBase.setBypassDocumentValidation(true);
+            writeCommandBase.setOrdered(_batch.getOrdered());
+
+            // Statement IDs are not meaningful because of the way we combine and convert inserts
+            // for the bucket collection. A retryable write is the only situation where it is
+            // appropriate to forward statement IDs.
+            if (isTimeseriesWriteRetryable(opCtx)) {
+                if (auto stmtId = _batch.getStmtId()) {
+                    writeCommandBase.setStmtId(*stmtId);
+                } else if (auto stmtIds = _batch.getStmtIds()) {
+                    writeCommandBase.setStmtIds(*stmtIds);
+                }
             }
 
-            auto ns = _batch.getNamespace();
-            auto bucketsNs = ns.makeTimeseriesBucketsNamespace();
+            timeseriesUpdateBatch.setWriteCommandBase(std::move(writeCommandBase));
+
+            return _getTimeseriesSingleWriteResult(
+                write_ops_exec::performUpdates(opCtx, timeseriesUpdateBatch));
+        }
+
+        void _commitTimeseriesBucket(OperationContext* opCtx,
+                                     const OID& bucketId,
+                                     size_t index,
+                                     std::vector<BSONObj>* errors,
+                                     boost::optional<repl::OpTime>* opTime,
+                                     boost::optional<OID>* electionId,
+                                     std::vector<size_t>* updatesToRetryAsInserts) const {
+            auto& bucketCatalog = BucketCatalog::get(opCtx);
+
+            auto metadata = bucketCatalog.getMetadata(bucketId);
+            auto data = bucketCatalog.commit(bucketId);
+            while (!data.docs.empty()) {
+                auto result = data.numCommittedMeasurements == 0
+                    ? _performTimeseriesInsert(opCtx, bucketId, data, metadata)
+                    : _performTimeseriesUpdate(opCtx, bucketId, data, metadata);
+
+                if (data.numCommittedMeasurements != 0 && result.isOK() &&
+                    result.getValue().getNModified() == 0) {
+                    // No bucket was found to update, meaning that it was manually removed.
+                    bucketCatalog.clear(bucketId);
+                    updatesToRetryAsInserts->push_back(index);
+                    return;
+                }
+
+                if (auto error = generateError(opCtx, result, index, errors->size())) {
+                    errors->push_back(*error);
+                }
+
+                auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext());
+                const auto replMode = replCoord->getReplicationMode();
+
+                *opTime = replMode != repl::ReplicationCoordinator::modeNone
+                    ? boost::make_optional(
+                          repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp())
+                    : boost::none;
+                *electionId = replMode == repl::ReplicationCoordinator::modeReplSet
+                    ? boost::make_optional(replCoord->getElectionId())
+                    : boost::none;
+
+                data = bucketCatalog.commit(
+                    bucketId, BucketCatalog::CommitInfo{std::move(result), *opTime, *electionId});
+            }
+        }
+
+        /**
+         * Writes to the underlying system.buckets collection. Returns the indices of the batch
+         * which were attempted in an update operation, but found no bucket to update. These indices
+         * can be passed as the optional 'indices' parameter in a subsequent call to this function,
+         * in order to to be retried as inserts.
+         */
+        std::vector<size_t> _performTimeseriesWrites(
+            OperationContext* opCtx,
+            std::vector<BSONObj>* errors,
+            boost::optional<repl::OpTime>* opTime,
+            boost::optional<OID>* electionId,
+            const boost::optional<std::vector<size_t>>& indices = boost::none) const {
             auto& bucketCatalog = BucketCatalog::get(opCtx);
+
             std::vector<std::pair<OID, size_t>> bucketsToCommit;
             std::vector<std::pair<Future<BucketCatalog::CommitInfo>, size_t>> bucketsToWaitOn;
-            for (size_t i = 0; i < _batch.getDocuments().size(); i++) {
+            auto insert = [&](size_t index) {
                 auto [bucketId, commitInfo] =
-                    bucketCatalog.insert(opCtx, ns, _batch.getDocuments()[i]);
+                    bucketCatalog.insert(opCtx, ns(), _batch.getDocuments()[index]);
                 if (commitInfo) {
-                    bucketsToWaitOn.push_back({std::move(*commitInfo), i});
+                    bucketsToWaitOn.push_back({std::move(*commitInfo), index});
                 } else {
-                    bucketsToCommit.push_back({std::move(bucketId), i});
+                    bucketsToCommit.push_back({std::move(bucketId), index});
+                }
+            };
+
+            if (indices) {
+                std::for_each(indices->begin(), indices->end(), insert);
+            } else {
+                for (size_t i = 0; i < _batch.getDocuments().size(); i++) {
+                    insert(i);
                 }
             }
 
             hangTimeseriesInsertBeforeCommit.pauseWhileSet();
 
-            std::vector<BSONObj> errors;
-            boost::optional<repl::OpTime> opTime;
-            boost::optional<OID> electionId;
+            std::vector<size_t> updatesToRetryAsInserts;
 
             for (const auto& [bucketId, index] : bucketsToCommit) {
-                auto metadata = bucketCatalog.getMetadata(bucketId);
-                auto data = bucketCatalog.commit(bucketId);
-                while (!data.docs.empty()) {
-                    write_ops_exec::WriteResult reply;
-                    if (data.numCommittedMeasurements == 0) {
-                        BSONObjBuilder builder;
-                        builder.append(write_ops::Insert::kCommandName, bucketsNs.coll());
-                        // The schema validation configured in the bucket collection is intended for
-                        // direct operations by end users and is not applicable here.
-                        builder.append(write_ops::Insert::kBypassDocumentValidationFieldName, true);
-                        builder.append(write_ops::Insert::kOrderedFieldName, _batch.getOrdered());
-
-                        // Statement IDs are not meaningful because of the way we combine and
-                        // convert inserts for the bucket collection. A retryable write is the only
-                        // situation where it is appropriate to forward statement IDs.
-                        if (isTimeseriesWriteRetryable(opCtx)) {
-                            if (auto stmtId = _batch.getStmtId()) {
-                                builder.append(write_ops::Insert::kStmtIdFieldName, *stmtId);
-                            } else if (auto stmtIds = _batch.getStmtIds()) {
-                                builder.append(write_ops::Insert::kStmtIdsFieldName, *stmtIds);
-                            }
-                        }
-
-                        builder.append(write_ops::Insert::kDocumentsFieldName,
-                                       makeTimeseriesInsertDocument(bucketId, data, metadata));
-
-                        auto request = OpMsgRequest::fromDBAndBody(bucketsNs.db(), builder.obj());
-                        auto timeseriesInsertBatch = InsertOp::parse(request);
-                        reply = write_ops_exec::performInserts(opCtx, timeseriesInsertBatch);
-                    } else {
-                        auto update = makeTimeseriesUpdateOpEntry(bucketId, data, metadata);
-                        write_ops::Update timeseriesUpdateBatch(bucketsNs, {update});
-                        {
-                            write_ops::WriteCommandBase writeCommandBase;
-                            // The schema validation configured in the bucket collection is intended
-                            // for direct operations by end users and is not applicable here.
-                            writeCommandBase.setBypassDocumentValidation(true);
-                            writeCommandBase.setOrdered(_batch.getOrdered());
-
-                            // Statement IDs are not meaningful because of the way we combine and
-                            // convert inserts for the bucket collection. A retryable write is the
-                            // only situation where it is appropriate to forward statement IDs.
-                            if (isTimeseriesWriteRetryable(opCtx)) {
-                                if (auto stmtId = _batch.getStmtId()) {
-                                    writeCommandBase.setStmtId(*stmtId);
-                                } else if (auto stmtIds = _batch.getStmtIds()) {
-                                    writeCommandBase.setStmtIds(*stmtIds);
-                                }
-                            }
-
-                            timeseriesUpdateBatch.setWriteCommandBase(std::move(writeCommandBase));
-                        }
-
-                        reply = write_ops_exec::performUpdates(opCtx, timeseriesUpdateBatch);
-                    }
-
-                    invariant(reply.results.size() == 1,
-                              str::stream()
-                                  << "Unexpected number of results (" << reply.results.size()
-                                  << ") for insert on time-series collection " << ns);
-
-                    if (auto error = generateError(opCtx, reply.results[0], index, errors.size())) {
-                        errors.push_back(*error);
-                    }
-
-                    auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext());
-                    const auto replMode = replCoord->getReplicationMode();
-
-                    opTime = replMode != repl::ReplicationCoordinator::modeNone
-                        ? boost::make_optional(
-                              repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp())
-                        : boost::none;
-                    electionId = replMode == repl::ReplicationCoordinator::modeReplSet
-                        ? boost::make_optional(replCoord->getElectionId())
-                        : boost::none;
-
-                    data = bucketCatalog.commit(
-                        bucketId,
-                        BucketCatalog::CommitInfo{std::move(reply.results[0]), opTime, electionId});
-                }
+                _commitTimeseriesBucket(
+                    opCtx, bucketId, index, errors, opTime, electionId, &updatesToRetryAsInserts);
             }
 
             for (const auto& [future, index] : bucketsToWaitOn) {
-                auto commitInfo = future.get(opCtx);
-                if (auto error = generateError(opCtx, commitInfo.result, index, errors.size())) {
-                    errors.push_back(*error);
+                auto swCommitInfo = future.getNoThrow(opCtx);
+                if (!swCommitInfo.isOK()) {
+                    invariant(swCommitInfo.getStatus() == ErrorCodes::TimeseriesBucketCleared,
+                              str::stream()
+                                  << "Got unexpected error (" << swCommitInfo.getStatus()
+                                  << ") waiting for time-series bucket to be committed for " << ns()
+                                  << ": " << redact(_batch.toBSON({})));
+
+                    updatesToRetryAsInserts.push_back(index);
+                    continue;
+                }
+
+                const auto& commitInfo = swCommitInfo.getValue();
+                if (auto error = generateError(opCtx, commitInfo.result, index, errors->size())) {
+                    errors->push_back(*error);
                 }
                 if (commitInfo.opTime) {
-                    opTime = std::max(opTime.value_or(repl::OpTime()), *commitInfo.opTime);
+                    *opTime = std::max(opTime->value_or(repl::OpTime()), *commitInfo.opTime);
                 }
                 if (commitInfo.electionId) {
-                    electionId = std::max(electionId.value_or(OID()), *commitInfo.electionId);
+                    *electionId = std::max(electionId->value_or(OID()), *commitInfo.electionId);
                 }
             }
 
-            result->appendNumber("n", _batch.getDocuments().size() - errors.size());
-            if (!errors.empty()) {
-                result->append("writeErrors", errors);
-            }
-            if (opTime) {
-                appendOpTime(*opTime, result);
-            }
-            if (electionId) {
-                result->append("electionId", *electionId);
+            return updatesToRetryAsInserts;
+        }
+
+        void _checkAndPerformTimeseriesWrites(OperationContext* opCtx,
+                                              BSONObjBuilder* result) const {
+            // Re-throw parsing exceptions to be consistent with CmdInsert::Invocation's
+            // constructor.
+            try {
+                if (isRetryableTimeseriesWriteExecuted(opCtx, _batch, result)) {
+                    return;
+                }
+
+                std::vector<BSONObj> errors;
+                boost::optional<repl::OpTime> opTime;
+                boost::optional<OID> electionId;
+
+                auto updatesToRetryAsInserts =
+                    _performTimeseriesWrites(opCtx, &errors, &opTime, &electionId);
+                invariant(_performTimeseriesWrites(
+                              opCtx, &errors, &opTime, &electionId, updatesToRetryAsInserts)
+                              .empty());
+
+                result->appendNumber("n", _batch.getDocuments().size() - errors.size());
+                if (!errors.empty()) {
+                    result->append("writeErrors", errors);
+                }
+                if (opTime) {
+                    appendOpTime(*opTime, result);
+                }
+                if (electionId) {
+                    result->append("electionId", *electionId);
+                }
+            } catch (DBException& ex) {
+                ex.addContext(str::stream() << "time-series insert failed: " << ns().ns());
+                throw;
             }
         }
 
         void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override {
             if (isTimeseries(opCtx, ns())) {
-                // Re-throw parsing exceptions to be consistent with CmdInsert::Invocation's
-                // constructor.
-                try {
-                    _performTimeseriesWrites(opCtx, &result);
-                } catch (DBException& ex) {
-                    ex.addContext(str::stream() << "time-series insert failed: " << ns().ns());
-                    throw;
-                }
+                _checkAndPerformTimeseriesWrites(opCtx, &result);
                 return;
             }
 
             auto reply = write_ops_exec::performInserts(opCtx, _batch);
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index ebf66828c5a..a17f682bc42 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -274,6 +274,25 @@ BucketCatalog::CommitData BucketCatalog::commit(const OID& bucketId,
     return data;
 }
 
+void BucketCatalog::clear(const OID& bucketId) {
+    stdx::lock_guard lk(_mutex);
+
+    auto it = _buckets.find(bucketId);
+    if (it == _buckets.end()) {
+        return;
+    }
+    auto& bucket = it->second;
+
+    while (!bucket.promises.empty()) {
+        bucket.promises.front().setError({ErrorCodes::TimeseriesBucketCleared,
+                                          str::stream() << "Time-series bucket " << bucketId
+                                                        << " for " << bucket.ns << " was cleared"});
+        bucket.promises.pop();
+    }
+
+    _removeBucket(bucketId);
+}
+
 void BucketCatalog::clear(const NamespaceString& ns) {
     stdx::lock_guard lk(_mutex);
 
@@ -282,16 +301,11 @@ void BucketCatalog::clear(const NamespaceString& ns) {
     };
 
     for (auto it = _orderedBuckets.lower_bound({ns, {}, {}});
-         it != _orderedBuckets.end() && shouldClear(std::get<NamespaceString>(*it));
-         it = _orderedBuckets.erase(it)) {
-        const auto& bucketId = std::get<OID>(*it);
-        const auto& bucketNs = std::get<NamespaceString>(*it);
-        auto bucketIt = _buckets.find(bucketId);
-        _memoryUsage -= bucketIt->second.memoryUsage;
-        _buckets.erase(bucketIt);
-        _idleBuckets.erase(bucketId);
-        _bucketIds.erase({bucketNs, std::get<BucketMetadata>(*it)});
-        _executionStats.erase(bucketNs);
+         it != _orderedBuckets.end() && shouldClear(std::get<NamespaceString>(*it));) {
+        auto nextIt = std::next(it);
+        _executionStats.erase(std::get<NamespaceString>(*it));
+        _removeBucket(std::get<OID>(*it), it);
+        it = nextIt;
     }
 }
 
@@ -325,14 +339,31 @@ void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuild
     }
 }
 
+void BucketCatalog::_removeBucket(const OID& bucketId,
+                                  boost::optional<OrderedBuckets::iterator> orderedBucketsIt,
+                                  boost::optional<IdleBuckets::iterator> idleBucketsIt) {
+    auto it = _buckets.find(bucketId);
+    _memoryUsage -= it->second.memoryUsage;
+
+    if (orderedBucketsIt) {
+        _orderedBuckets.erase(*orderedBucketsIt);
+    } else {
+        _orderedBuckets.erase({it->second.ns, it->second.metadata, it->first});
+    }
+
+    if (idleBucketsIt) {
+        _idleBuckets.erase(*idleBucketsIt);
+    } else {
+        _idleBuckets.erase(it->first);
+    }
+
+    _bucketIds.erase({std::move(it->second.ns), std::move(it->second.metadata)});
+    _buckets.erase(it);
+}
+
 void BucketCatalog::_expireIdleBuckets(ExecutionStats* stats) {
     while (!_idleBuckets.empty() && _memoryUsage > kIdleBucketExpiryMemoryUsageThreshold) {
-        auto it = _buckets.find(*_idleBuckets.begin());
-        _memoryUsage -= it->second.memoryUsage;
-        _idleBuckets.erase(_idleBuckets.begin());
-        _bucketIds.erase({it->second.ns, it->second.metadata});
-        _orderedBuckets.erase({it->second.ns, it->second.metadata, it->first});
-        _buckets.erase(it);
+        _removeBucket(*_idleBuckets.begin(), boost::none, _idleBuckets.begin());
         stats->numBucketsClosedDueToMemoryThreshold++;
     }
 }
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index 496dc2bbb71..4b2382e92fa 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -100,6 +100,11 @@ public:
                       boost::optional<CommitInfo> previousCommitInfo = boost::none);
 
     /**
+     * Clears the given bucket.
+     */
+    void clear(const OID& bucketId);
+
+    /**
      * Clears the buckets for the given namespace.
      */
    void clear(const NamespaceString& ns);
@@ -269,6 +274,16 @@ private:
     class ServerStatus;
 
+    using OrderedBuckets = std::set<std::tuple<NamespaceString, BucketMetadata, OID>>;
+    using IdleBuckets = std::set<OID>;
+
+    /**
+     * Removes the given bucket from the bucket catalog's internal data structures.
+     */
+    void _removeBucket(const OID& bucketId,
+                       boost::optional<OrderedBuckets::iterator> orderedBucketsIt = boost::none,
+                       boost::optional<IdleBuckets::iterator> idleBucketsIt = boost::none);
+
     /**
      * Expires idle buckets until the bucket catalog's memory usage is below the expiry threshold.
      */
@@ -283,10 +298,10 @@ private:
     stdx::unordered_map<std::pair<NamespaceString, BucketMetadata>, OID> _bucketIds;
 
     // All namespace, metadata, and _id tuples which currently have a bucket in the catalog.
-    std::set<std::tuple<NamespaceString, BucketMetadata, OID>> _orderedBuckets;
+    OrderedBuckets _orderedBuckets;
 
     // Buckets that do not have any writers.
-    std::set<OID> _idleBuckets;
+    IdleBuckets _idleBuckets;
 
     // Per-collection execution stats.
     stdx::unordered_map<NamespaceString, ExecutionStats> _executionStats;
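
Below is a rough, self-contained sketch of the retry-as-inserts flow this commit introduces (the `FakeBucketStore` type and everything else here are hypothetical stand-ins, not MongoDB code): a first pass attempts updates against buckets that are believed to exist; an update that modifies nothing means the bucket was removed out of band, so its cached state is dropped and that batch index is retried as an insert in a second pass.

```cpp
// Hypothetical illustration of the retry-as-inserts pattern; not MongoDB's API.
#include <cstddef>
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

struct FakeBucketStore {
    std::map<int, std::string> docs;  // bucketId -> bucket document

    // Returns the number of documents modified, mimicking an update's nModified.
    std::size_t update(int bucketId, const std::string& measurement) {
        auto it = docs.find(bucketId);
        if (it == docs.end()) {
            return 0;  // Bucket was manually removed; nothing to update.
        }
        it->second += "," + measurement;
        return 1;
    }

    void insert(int bucketId, const std::string& measurement) {
        docs[bucketId] = measurement;  // Start a fresh bucket document.
    }
};

int main() {
    FakeBucketStore store;
    store.docs[1] = "m0";  // Bucket 1 exists; bucket 2 was dropped out-of-band.

    std::vector<std::pair<int, std::string>> batch = {{1, "m1"}, {2, "m2"}};

    // First pass: attempt updates, recording the indices that found no bucket
    // (analogous to the updatesToRetryAsInserts vector in the diff above).
    std::vector<std::size_t> updatesToRetryAsInserts;
    for (std::size_t index = 0; index < batch.size(); ++index) {
        if (store.update(batch[index].first, batch[index].second) == 0) {
            // Analogous to clearing the bucket from the catalog and remembering the index.
            updatesToRetryAsInserts.push_back(index);
        }
    }

    // Second pass: the recorded indices are retried as inserts into new buckets.
    for (std::size_t index : updatesToRetryAsInserts) {
        store.insert(batch[index].first, batch[index].second);
    }

    for (const auto& [id, doc] : store.docs) {
        std::cout << "bucket " << id << ": " << doc << "\n";
    }
    return 0;
}
```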