author      Alyssa Wagenmaker <alyssa.wagenmaker@mongodb.com>   2023-03-23 13:21:26 +0000
committer   Evergreen Agent <no-reply@evergreen.mongodb.com>    2023-03-23 14:25:04 +0000
commit      27f17569feb62dc075f1e733bc1d7b53ed226ea9 (patch)
tree        f0000440e022063641c80e71cb47a41810224665 /src/mongo/db/exec
parent      642d0a1318833017fe865e8c3b4216ae48a00991 (diff)
download    mongo-27f17569feb62dc075f1e733bc1d7b53ed226ea9.tar.gz
SERVER-74955 Refactor TS_MODIFY to unpack and write a bucket in one doWork() call
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r--   src/mongo/db/exec/timeseries_modify.cpp   266
-rw-r--r--   src/mongo/db/exec/timeseries_modify.h      33
2 files changed, 118 insertions, 181 deletions
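
As context for the change below: the commit message says the stage now unpacks and writes a bucket within a single doWork() call. The following is a minimal, standalone C++ sketch of that control flow only; Measurement, Bucket, StageState and the callbacks are simplified, hypothetical stand-ins, not the real PlanStage/WorkingSet/BucketUnpacker API.

#include <cstddef>
#include <functional>
#include <iostream>
#include <optional>
#include <vector>

// Hypothetical stand-ins for illustration only.
struct Measurement {
    int value;
};

struct Bucket {
    std::vector<Measurement> measurements;
};

enum class StageState { NEED_TIME, IS_EOF };

// One doWork() call handles a whole bucket: fetch it, unpack every measurement,
// partition by the residual predicate, and write the result back before
// returning, instead of unpacking a single measurement per call.
StageState doWorkOnce(const std::function<std::optional<Bucket>()>& getNextBucket,
                      const std::function<bool(const Measurement&)>& shouldDelete,
                      const std::function<void(const std::vector<Measurement>&)>& writeBucket) {
    auto bucket = getNextBucket();
    if (!bucket) {
        return StageState::IS_EOF;
    }

    std::vector<Measurement> unchanged;
    std::vector<Measurement> deleted;
    for (const auto& m : bucket->measurements) {
        (shouldDelete(m) ? deleted : unchanged).push_back(m);
    }

    if (!deleted.empty()) {
        // Delete the whole bucket or replace it with only the unchanged measurements.
        writeBucket(unchanged);
    }
    return StageState::NEED_TIME;  // ready for the next bucket on the next call
}

int main() {
    std::vector<Bucket> buckets = {Bucket{{{1}, {2}, {3}}}, Bucket{{{4}, {5}}}};
    std::size_t next = 0;

    auto getNextBucket = [&]() -> std::optional<Bucket> {
        if (next >= buckets.size()) {
            return std::nullopt;
        }
        return buckets[next++];
    };
    auto shouldDelete = [](const Measurement& m) { return m.value % 2 == 0; };
    auto writeBucket = [](const std::vector<Measurement>& kept) {
        std::cout << "rewriting bucket, keeping " << kept.size() << " measurements\n";
    };

    while (doWorkOnce(getNextBucket, shouldDelete, writeBucket) != StageState::IS_EOF) {
    }
    return 0;
}

Because the whole bucket is handled in one call, no partially-unpacked state has to be carried between doWork() calls, which is why the patch can drop _currentBucketRid, _currentBucketSnapshotId and the snapshot-change retry path seen in the removed lines below.
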
diff --git a/src/mongo/db/exec/timeseries_modify.cpp b/src/mongo/db/exec/timeseries_modify.cpp
index c2298ff645d..f0e0f274923 100644
--- a/src/mongo/db/exec/timeseries_modify.cpp
+++ b/src/mongo/db/exec/timeseries_modify.cpp
@@ -55,8 +55,7 @@ TimeseriesModifyStage::TimeseriesModifyStage(ExpressionContext* expCtx,
 }
 
 bool TimeseriesModifyStage::isEOF() {
-    return !_bucketUnpacker.hasNext() && child()->isEOF() &&
-        _retryBucketId == WorkingSet::INVALID_ID;
+    return child()->isEOF() && _retryBucketId == WorkingSet::INVALID_ID;
 }
 
 std::unique_ptr<PlanStageStats> TimeseriesModifyStage::getStats() {
@@ -69,33 +68,18 @@ std::unique_ptr<PlanStageStats> TimeseriesModifyStage::getStats() {
     return ret;
 }
 
-void TimeseriesModifyStage::resetCurrentBucket() {
-    _deletedMeasurements.clear();
-    _unchangedMeasurements.clear();
-    _currentBucketFromMigrate = false;
-    _currentBucketRid = RecordId{};
-    _currentBucketSnapshotId = SnapshotId{};
-}
-
-PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets() {
-    ON_BLOCK_EXIT([&] { resetCurrentBucket(); });
-
+PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets(
+    WorkingSetID bucketWsmId,
+    const std::vector<BSONObj>& unchangedMeasurements,
+    const std::vector<BSONObj>& deletedMeasurements,
+    bool bucketFromMigrate) {
     if (_params->isExplain) {
-        _specificStats.measurementsDeleted += _deletedMeasurements.size();
+        _specificStats.measurementsDeleted += deletedMeasurements.size();
         return PlanStage::NEED_TIME;
     }
 
     // No measurements needed to be deleted from the bucket document.
-    if (_deletedMeasurements.empty()) {
-        return PlanStage::NEED_TIME;
-    }
-
-    if (opCtx()->recoveryUnit()->getSnapshotId() != _currentBucketSnapshotId) {
-        // The snapshot has changed, so we have no way to prove that the bucket we're
-        // unwinding still exists in the same shape it did originally. If it has changed, we
-        // risk re-inserting a measurement that we can see in our cache but which has
-        // actually since been deleted. So we have to fetch and retry this bucket.
-        _retryBucket(_currentBucketRid);
+    if (deletedMeasurements.empty()) {
         return PlanStage::NEED_TIME;
     }
 
@@ -112,29 +96,35 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets() {
         std::terminate();
     });
 
-    OID bucketId = record_id_helpers::toBSONAs(_currentBucketRid, "_id")["_id"].OID();
-    if (_unchangedMeasurements.empty()) {
+    auto recordId = _ws->get(bucketWsmId)->recordId;
+
+    auto yieldAndRetry = [&](unsigned logId) {
+        LOGV2_DEBUG(logId,
+                    5,
+                    "Retrying bucket due to conflict attempting to write out changes",
+                    "bucket_rid"_attr = recordId);
+        _retryBucket(bucketWsmId);
+        return PlanStage::NEED_YIELD;
+    };
+
+    OID bucketId = record_id_helpers::toBSONAs(recordId, "_id")["_id"].OID();
+    if (unchangedMeasurements.empty()) {
         write_ops::DeleteOpEntry deleteEntry(BSON("_id" << bucketId), false);
         write_ops::DeleteCommandRequest op(collection()->ns(), {deleteEntry});
-        auto result = timeseries::performAtomicWrites(
-            opCtx(), collection(), _currentBucketRid, op, _currentBucketFromMigrate);
+        auto result =
+            timeseries::performAtomicWrites(opCtx(), collection(), recordId, op, bucketFromMigrate);
         if (!result.isOK()) {
-            LOGV2_DEBUG(7309300,
-                        5,
-                        "Retrying bucket due to conflict attempting to write out changes",
-                        "bucket_rid"_attr = _currentBucketRid);
-            _retryBucket(_currentBucketRid);
-            return PlanStage::NEED_YIELD;
+            return yieldAndRetry(7309300);
         }
     } else {
         auto timeseriesOptions = collection()->getTimeseriesOptions();
         auto metaFieldName = timeseriesOptions->getMetaField();
         auto metadata =
-            metaFieldName ? _unchangedMeasurements[0].getField(*metaFieldName).wrap() : BSONObj();
+            metaFieldName ? unchangedMeasurements[0].getField(*metaFieldName).wrap() : BSONObj();
         auto replaceBucket =
             timeseries::makeNewDocumentForWrite(bucketId,
-                                                _unchangedMeasurements,
+                                                unchangedMeasurements,
                                                 metadata,
                                                 timeseriesOptions,
                                                 collection()->getDefaultCollator());
@@ -143,22 +133,17 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets() {
         write_ops::UpdateOpEntry updateEntry(BSON("_id" << bucketId), std::move(u));
         write_ops::UpdateCommandRequest op(collection()->ns(), {updateEntry});
-        auto result = timeseries::performAtomicWrites(
-            opCtx(), collection(), _currentBucketRid, op, _currentBucketFromMigrate);
+        auto result =
+            timeseries::performAtomicWrites(opCtx(), collection(), recordId, op, bucketFromMigrate);
        if (!result.isOK()) {
-            LOGV2_DEBUG(7309301,
-                        5,
-                        "Retrying bucket due to conflict attempting to write out changes",
-                        "bucket_rid"_attr = _currentBucketRid);
-            _retryBucket(_currentBucketRid);
-            return PlanStage::NEED_YIELD;
+            return yieldAndRetry(7309301);
        }
     }
 
-    _specificStats.measurementsDeleted += _deletedMeasurements.size();
+    _specificStats.measurementsDeleted += deletedMeasurements.size();
 
-    // As restoreState may restore (recreate) cursors, cursors are tied to the transaction in
-    // which they are created, and a WriteUnitOfWork is a transaction, make sure to restore the
-    // state outside of the WriteUnitOfWork.
+    // As restoreState may restore (recreate) cursors, cursors are tied to the transaction in which
+    // they are created, and a WriteUnitOfWork is a transaction, make sure to restore the state
+    // outside of the WriteUnitOfWork.
     return handlePlanStageYield(
         expCtx(),
         "TimeseriesModifyStage restoreState",
@@ -168,46 +153,33 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets() {
             return PlanStage::NEED_TIME;
         },
         // yieldHandler
-        // Note we don't need to retry anything in this case since the
-        // delete already was committed. However, we still need to
-        // return the deleted document (if it was requested).
-        // TODO for findAndModify we need to return the deleted doc.
+        // Note we don't need to retry anything in this case since the delete already was committed.
+        // However, we still need to return the deleted document (if it was requested).
+        // TODO SERVER-73089 for findAndModify we need to return the deleted doc.
         [&] { /* noop */ });
 }
 
 template <typename F>
-boost::optional<PlanStage::StageState> TimeseriesModifyStage::_rememberIfWritingToOrphanedBucket(
-    ScopeGuard<F>& bucketFreer, WorkingSetID id) {
+std::pair<boost::optional<PlanStage::StageState>, bool>
+TimeseriesModifyStage::_checkIfWritingToOrphanedBucket(ScopeGuard<F>& bucketFreer,
+                                                       WorkingSetID id) {
     // If we are in explain mode, we do not need to check if the bucket is orphaned since we're not
     // writing to bucket. If we are migrating a bucket, we also do not need to check if the bucket
-    // is not writable and just remember it.
+    // is not writable and just return it.
     if (_params->isExplain || _params->fromMigrate) {
-        _currentBucketFromMigrate = _params->fromMigrate;
-        return boost::none;
+        return {boost::none, _params->fromMigrate};
     }
-
-    auto [immediateReturnStageState, currentBucketFromMigrate] =
-        _preWriteFilter.checkIfNotWritable(_ws->get(id)->doc.value(),
-                                           "timeseriesDelete"_sd,
-                                           collection()->ns(),
-                                           [&](const ExceptionFor<ErrorCodes::StaleConfig>& ex) {
-                                               planExecutorShardingCriticalSectionFuture(opCtx()) =
-                                                   ex->getCriticalSectionSignal();
-                                               // Retry the write if we're in the sharding critical
-                                               // section.
-                                               bucketFreer.dismiss();
-                                               _retryBucket(id);
-                                           });
-
-    // We need to immediately return if the bucket is orphaned or we're in the sharding critical
-    // section and hence should yield.
-    if (immediateReturnStageState) {
-        return *immediateReturnStageState;
-    }
-
-    _currentBucketFromMigrate = currentBucketFromMigrate;
-
-    return boost::none;
+    return _preWriteFilter.checkIfNotWritable(_ws->get(id)->doc.value(),
+                                              "timeseriesDelete"_sd,
+                                              collection()->ns(),
+                                              [&](const ExceptionFor<ErrorCodes::StaleConfig>& ex) {
+                                                  planExecutorShardingCriticalSectionFuture(
+                                                      opCtx()) = ex->getCriticalSectionSignal();
+                                                  // Retry the write if we're in the sharding
+                                                  // critical section.
+                                                  bucketFreer.dismiss();
+                                                  _retryBucket(id);
+                                              });
 }
 
 PlanStage::StageState TimeseriesModifyStage::_getNextBucket(WorkingSetID& id) {
@@ -216,25 +188,16 @@ PlanStage::StageState TimeseriesModifyStage::_getNextBucket(WorkingSetID& id) {
         if (status != PlanStage::ADVANCED) {
             return status;
         }
-
-        auto member = _ws->get(id);
-        // TODO SERVER-73142 remove this assert, we may not have an object if we have a spool child.
-        tassert(7443600, "Child should have provided the whole document", member->hasObj());
-        if (member->hasObj()) {
-            // We already got the whole document from our child, no need to fetch.
-            return PlanStage::ADVANCED;
-        }
     } else {
-        // We have a bucket that we need to fetch before we can unpack it.
         id = _retryBucketId;
         _retryBucketId = WorkingSet::INVALID_ID;
     }
 
-    // We don't have an up-to-date document for this RecordId. Fetch it and ensure that it still
-    // exists and matches our predicate.
+    // We may not have an up-to-date bucket for this RecordId. Fetch it and ensure that it still
+    // exists and matches our bucket-level predicate if it is not believed to be up-to-date.
     bool docStillMatches;
-    const auto ret = handlePlanStageYield(
+    const auto status = handlePlanStageYield(
         expCtx(),
         "TimeseriesModifyStage:: ensureStillMatches",
         collection()->ns().ns(),
@@ -248,14 +211,13 @@ PlanStage::StageState TimeseriesModifyStage::_getNextBucket(WorkingSetID& id) {
            // There was a problem trying to detect if the document still exists, so retry.
            _retryBucket(id);
        });
-    if (ret != PlanStage::NEED_TIME) {
-        return ret;
+    if (status != PlanStage::NEED_TIME) {
+        return status;
     }
-
     return docStillMatches ? PlanStage::ADVANCED : PlanStage::NEED_TIME;
 }
 
-void TimeseriesModifyStage::_retryBucket(const stdx::variant<WorkingSetID, RecordId>& bucketId) {
+void TimeseriesModifyStage::_retryBucket(WorkingSetID bucketId) {
     tassert(7309302,
             "Cannot be in the middle of unpacking a bucket if retrying",
             !_bucketUnpacker.hasNext());
@@ -263,20 +225,7 @@ void TimeseriesModifyStage::_retryBucket(const stdx::variant<WorkingSetID, Recor
             "Cannot retry two buckets at the same time",
             _retryBucketId == WorkingSet::INVALID_ID);
 
-    stdx::visit(OverloadedVisitor{
-                    [&](WorkingSetID id) { _retryBucketId = id; },
-                    [&](const RecordId& rid) {
-                        // We don't have a working set member referencing this bucket, allocate one.
-                        _retryBucketId = _ws->allocate();
-                        auto member = _ws->get(_retryBucketId);
-                        member->recordId = rid;
-                        member->doc.setSnapshotId(_currentBucketSnapshotId);
-                        member->transitionToRecordIdAndObj();
-                    },
-                },
-                bucketId);
-
-    resetCurrentBucket();
+    _retryBucketId = bucketId;
 }
 
 PlanStage::StageState TimeseriesModifyStage::doWork(WorkingSetID* out) {
@@ -284,61 +233,62 @@ PlanStage::StageState TimeseriesModifyStage::doWork(WorkingSetID* out) {
         return PlanStage::IS_EOF;
     }
 
-    // The current bucket is exhausted. Perform an atomic write to modify the bucket and move on to
-    // the next.
-    if (!_bucketUnpacker.hasNext()) {
-        auto status = _writeToTimeseriesBuckets();
-        if (status != PlanStage::NEED_TIME) {
-            *out = WorkingSet::INVALID_ID;
-            return status;
+    tassert(7495500,
+            "Expected bucketUnpacker's current bucket to be exhausted",
+            !_bucketUnpacker.hasNext());
+
+    auto id = WorkingSet::INVALID_ID;
+    auto status = _getNextBucket(id);
+    if (status != PlanStage::ADVANCED) {
+        if (status == PlanStage::NEED_YIELD) {
+            *out = id;
         }
+        return status;
+    }
+
+    // We want to free this member when we return because we either have an owned copy of the bucket
+    // for normal write and write to orphan cases, or we skip the bucket.
+    ScopeGuard bucketFreer([&] { _ws->free(id); });
+
+    auto member = _ws->get(id);
+    tassert(7459100, "Expected a RecordId from the child stage", member->hasRecordId());
+
+    // Determine if we are writing to an orphaned bucket - such writes should be excluded from
+    // user-visible change stream events. This will be achieved later by setting 'fromMigrate' flag
+    // when calling performAtomicWrites().
+    auto [immediateReturnStageState, bucketFromMigrate] =
+        _checkIfWritingToOrphanedBucket(bucketFreer, id);
+    if (immediateReturnStageState) {
+        return *immediateReturnStageState;
+    }
+    tassert(7309304,
+            "Expected no bucket to retry after getting a new bucket",
+            _retryBucketId == WorkingSet::INVALID_ID);
+
+    // Unpack the bucket and determine which measurements match the residual predicate.
+    auto ownedBucket = member->doc.value().toBson().getOwned();
+    _bucketUnpacker.reset(std::move(ownedBucket));
+    ++_specificStats.bucketsUnpacked;
-        auto id = WorkingSet::INVALID_ID;
-        status = _getNextBucket(id);
-
-        if (PlanStage::ADVANCED == status) {
-            // We want to free this member when we return because we either have an owned copy
-            // of the bucket for normal write and write to orphan cases, or we skip the bucket.
-            ScopeGuard bucketFreer([&] { _ws->free(id); });
-
-            auto member = _ws->get(id);
-            tassert(7459100, "Expected a RecordId from the child stage", member->hasRecordId());
-
-            if (auto immediateReturnStageState =
-                    _rememberIfWritingToOrphanedBucket(bucketFreer, id)) {
-                return *immediateReturnStageState;
-            }
-
-            tassert(7309304,
-                    "Expected no bucket to retry after getting a new bucket",
-                    _retryBucketId == WorkingSet::INVALID_ID);
-            _currentBucketRid = member->recordId;
-            _currentBucketSnapshotId = member->doc.snapshotId();
-
-            // Make an owned copy of the bucket document if necessary. The bucket will be
-            // unwound across multiple calls to 'doWork()', so we need to hold our own copy in
-            // the query execution layer in case the storage engine reclaims the memory for the
-            // bucket between calls to 'doWork()'.
-            auto ownedBucket = member->doc.value().toBson().getOwned();
-            _bucketUnpacker.reset(std::move(ownedBucket));
-            ++_specificStats.bucketsUnpacked;
+    std::vector<BSONObj> unchangedMeasurements;
+    std::vector<BSONObj> deletedMeasurements;
+
+    while (_bucketUnpacker.hasNext()) {
+        auto measurement = _bucketUnpacker.getNext().toBson();
+        if (_residualPredicate->matchesBSON(measurement)) {
+            deletedMeasurements.push_back(measurement);
         } else {
-            if (PlanStage::NEED_YIELD == status) {
-                *out = id;
-            }
-            return status;
+            unchangedMeasurements.push_back(measurement);
         }
     }
 
-    invariant(_bucketUnpacker.hasNext());
-    auto measurement = _bucketUnpacker.getNext().toBson().getOwned();
-    if (_residualPredicate->matchesBSON(measurement)) {
-        _deletedMeasurements.push_back(measurement);
-    } else {
-        _unchangedMeasurements.push_back(measurement);
+    status = _writeToTimeseriesBuckets(
+        id, unchangedMeasurements, deletedMeasurements, bucketFromMigrate);
+    if (status != PlanStage::NEED_TIME) {
+        *out = WorkingSet::INVALID_ID;
+        bucketFreer.dismiss();
     }
-
-    return PlanStage::NEED_TIME;
+    return status;
 }
 
 void TimeseriesModifyStage::doRestoreStateRequiresCollection() {
diff --git a/src/mongo/db/exec/timeseries_modify.h b/src/mongo/db/exec/timeseries_modify.h
index 2c14996f83c..e428698309b 100644
--- a/src/mongo/db/exec/timeseries_modify.h
+++ b/src/mongo/db/exec/timeseries_modify.h
@@ -40,8 +40,8 @@ namespace mongo {
 /**
  * Unpacks time-series bucket documents and writes the modified documents.
  *
- * The stage processes one measurement at a time, but only performs a write after each bucket is
- * exhausted.
+ * The stage processes one bucket at a time, unpacking all the measurements and writing the output
+ * bucket in a single doWork() call.
  */
 class TimeseriesModifyStage final : public RequiresMutableCollectionStage {
 public:
@@ -78,18 +78,22 @@ protected:
 
 private:
     /**
-     * Writes the modifications to a bucket when the end of the bucket is detected.
+     * Writes the modifications to a bucket.
      */
-    PlanStage::StageState _writeToTimeseriesBuckets();
+    PlanStage::StageState _writeToTimeseriesBuckets(
+        WorkingSetID bucketWsmId,
+        const std::vector<BSONObj>& unchangedMeasurements,
+        const std::vector<BSONObj>& deletedMeasurements,
+        bool bucketFromMigrate);
 
     /**
      * Helper to set up state to retry 'bucketId' after yielding and establishing a new storage
      * snapshot.
      */
-    void _retryBucket(const stdx::variant<WorkingSetID, RecordId>& bucketId);
+    void _retryBucket(WorkingSetID bucketId);
 
     template <typename F>
-    boost::optional<PlanStage::StageState> _rememberIfWritingToOrphanedBucket(
+    std::pair<boost::optional<PlanStage::StageState>, bool> _checkIfWritingToOrphanedBucket(
         ScopeGuard<F>& bucketFreer, WorkingSetID id);
 
     /**
@@ -97,8 +101,6 @@ private:
      */
     PlanStage::StageState _getNextBucket(WorkingSetID& id);
 
-    void resetCurrentBucket();
-
     std::unique_ptr<DeleteStageParams> _params;
 
     WorkingSet* _ws;
@@ -113,16 +115,6 @@ private:
     // unmodified.
     std::unique_ptr<MatchExpression> _residualPredicate;
 
-    // The RecordId (also "_id" for the clustered collection) value of the current bucket.
-    RecordId _currentBucketRid = RecordId{};
-    // Maintained similarly to '_currentBucketRid', but used to determine if we can actually use the
-    // results of unpacking to do a write. If the storage engine snapshot has changed, all bets are
-    // off and it's unsafe to proceed - more details in the implementation which reads this value.
-    SnapshotId _currentBucketSnapshotId = SnapshotId{};
-
-    std::vector<BSONObj> _unchangedMeasurements;
-    std::vector<BSONObj> _deletedMeasurements;
-
     /**
      * This member is used to check whether the write should be performed, and if so, any other
     * behavior that should be done as part of the write (e.g. skipping it because it affects an
      */
     write_stage_common::PreWriteFilter _preWriteFilter;
 
-    // True if the current bucket is an orphan and we're writing to an orphaned bucket, when such
-    // writes should be excluded from user-visible change stream events. This can be achieved by
-    // setting 'fromMigrate' flag when calling performAtomicWrites().
-    bool _currentBucketFromMigrate = false;
-
     TimeseriesModifyStats _specificStats{};
 
     // A pending retry to get to after a NEED_YIELD propagation and a new storage snapshot is
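
The header diff above ends at the member that records a pending retry after a NEED_YIELD. As a rough, standalone sketch of that stash-and-retry pattern only: BucketRetryState, kInvalidId and the atomicWrite callback below are hypothetical stand-ins, not MongoDB's WorkingSet or performAtomicWrites API.

#include <functional>
#include <iostream>
#include <optional>

enum class StageState { NEED_TIME, NEED_YIELD };

constexpr int kInvalidId = -1;  // stand-in for WorkingSet::INVALID_ID

class BucketRetryState {
public:
    // Try the write; on a conflict, remember the bucket's working-set id and
    // ask the executor to yield so a new storage snapshot can be established.
    StageState tryWrite(int bucketWsmId, const std::function<bool(int)>& atomicWrite) {
        if (atomicWrite(bucketWsmId)) {
            return StageState::NEED_TIME;
        }
        _retryBucketId = bucketWsmId;  // mirrors stashing the id for a later retry
        return StageState::NEED_YIELD;
    }

    // On the next doWork() call, a stashed bucket takes priority over pulling a
    // new one from the child stage.
    std::optional<int> takePendingRetry() {
        if (_retryBucketId == kInvalidId) {
            return std::nullopt;
        }
        int id = _retryBucketId;
        _retryBucketId = kInvalidId;
        return id;
    }

private:
    int _retryBucketId = kInvalidId;
};

int main() {
    BucketRetryState state;
    int attempts = 0;
    // First attempt conflicts, the second (after the simulated yield) succeeds.
    auto atomicWrite = [&](int) { return ++attempts > 1; };

    if (state.tryWrite(42, atomicWrite) == StageState::NEED_YIELD) {
        std::cout << "yielding, will retry bucket " << *state.takePendingRetry() << "\n";
        state.tryWrite(42, atomicWrite);  // retried against the new snapshot
    }
    return 0;
}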