diff options
author | Denis Grebennicov <denis.grebennicov@mongodb.com> | 2022-10-20 13:01:48 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-04 17:41:12 +0000 |
commit | 6a5faba612b86518985d33a5d23954be10530a5d (patch) | |
tree | 41c932e6027cc18fc07db7e672fb0c5e8199b547 | |
parent | 826697fdc36740ab8543d81880d6d641eba0685a (diff) | |
download | mongo-6a5faba612b86518985d33a5d23954be10530a5d.tar.gz |
SERVER-70274 Improve performance of change stream pre-images purging job by leveraging EOF filter
-rw-r--r-- | src/mongo/db/exec/collection_scan.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/exec/collection_scan_common.h | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp | 333 | ||||
-rw-r--r-- | src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h | 16 | ||||
-rw-r--r-- | src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp | 44 | ||||
-rw-r--r-- | src/mongo/db/query/internal_plans.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/query/internal_plans.h | 10 | ||||
-rw-r--r-- | src/mongo/dbtests/query_stage_collscan.cpp | 32 |
8 files changed, 173 insertions, 324 deletions
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index 506ce18d1ce..c34f65afdca 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -354,16 +354,24 @@ PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member, // In the future, we could change seekNear() to always return a record after minRecord in the // direction of the scan. However, tailable scans depend on the current behavior in order to // mark their position for resuming the tailable scan later on. - if (!beforeStartOfRange(_params, *member) && Filter::passes(member, _filter)) { - if (_params.stopApplyingFilterAfterFirstMatch) { - _filter = nullptr; - } - *out = memberID; - return PlanStage::ADVANCED; - } else { + if (beforeStartOfRange(_params, *member)) { _workingSet->free(memberID); return PlanStage::NEED_TIME; } + + if (!Filter::passes(member, _filter)) { + _workingSet->free(memberID); + if (_params.shouldReturnEofOnFilterMismatch) { + _commonStats.isEOF = true; + return PlanStage::IS_EOF; + } + return PlanStage::NEED_TIME; + } + if (_params.stopApplyingFilterAfterFirstMatch) { + _filter = nullptr; + } + *out = memberID; + return PlanStage::ADVANCED; } bool CollectionScan::isEOF() { diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h index ba5559a4491..5770943229c 100644 --- a/src/mongo/db/exec/collection_scan_common.h +++ b/src/mongo/db/exec/collection_scan_common.h @@ -111,6 +111,10 @@ struct CollectionScanParams { // Whether or not to wait for oplog visibility on oplog collection scans. bool shouldWaitForOplogVisibility = false; + + // Whether or not to return EOF and stop further scanning once MatchExpression evaluates to + // false. Can only be set to true if the MatchExpression is present. + bool shouldReturnEofOnFilterMismatch = false; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp index 2b16d4dacda..d84cb183e48 100644 --- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp @@ -58,18 +58,6 @@ MONGO_FAIL_POINT_DEFINE(changeStreamPreImageRemoverCurrentTime); namespace preImageRemoverInternal { -bool PreImageAttributes::isExpiredPreImage(const boost::optional<Date_t>& preImageExpirationTime, - const Timestamp& earliestOplogEntryTimestamp) { - // Pre-image oplog entry is no longer present in the oplog if its timestamp is smaller - // than the 'earliestOplogEntryTimestamp'. - const bool preImageOplogEntryIsDeleted = ts < earliestOplogEntryTimestamp; - const auto expirationTime = preImageExpirationTime.get_value_or(Date_t::min()); - - // Pre-image is expired if its corresponding oplog entry is deleted or its operation - // time is less than or equal to the expiration time. - return preImageOplogEntryIsDeleted || operationTime <= expirationTime; -} - // Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if not 'off', boost::none otherwise. boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions( ChangeStreamOptions& changeStreamOptions) { @@ -108,18 +96,48 @@ RecordId toRecordId(ChangeStreamPreImageId id) { } /** + * Finds the next collection UUID in the change stream pre-images collection 'preImagesCollPtr' for + * which collection UUID is greater than 'collectionUUID'. Returns boost::none if the next + * collection is not found. + */ +boost::optional<UUID> findNextCollectionUUID(OperationContext* opCtx, + const CollectionPtr* preImagesCollPtr, + boost::optional<UUID> collectionUUID + +) { + BSONObj preImageObj; + auto minRecordId = collectionUUID + ? boost::make_optional(RecordIdBound(toRecordId(ChangeStreamPreImageId( + *collectionUUID, Timestamp::max(), std::numeric_limits<int64_t>::max())))) + : boost::none; + auto planExecutor = + InternalPlanner::collectionScan(opCtx, + preImagesCollPtr, + PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY, + InternalPlanner::Direction::FORWARD, + boost::none /* resumeAfterRecordId */, + std::move(minRecordId)); + if (planExecutor->getNext(&preImageObj, nullptr) == PlanExecutor::IS_EOF) { + return boost::none; + } + auto parsedUUID = UUID::parse(preImageObj["_id"].Obj()["nsUUID"]); + tassert(7027400, "Pre-image collection UUID must be of UUID type", parsedUUID.isOK()); + return {std::move(parsedUUID.getValue())}; +} + +/** * Scans the 'config.system.preimages' collection and deletes the expired pre-images from it. * * Pre-images are ordered by collection UUID, ie. if UUID of collection A is ordered before UUID of * collection B, then pre-images of collection A will be stored before pre-images of collection B. * - * While scanning the collection for expired pre-images, each pre-image timestamp is compared - * against the 'earliestOplogEntryTimestamp' value. Any pre-image that has a timestamp greater than - * the 'earliestOplogEntryTimestamp' value is not considered for deletion and the cursor seeks to - * the next UUID in the collection. - * - * Seek to the next UUID is done by setting the values of 'Timestamp' and 'ApplyOpsIndex' fields to - * max, ie. (currentPreImage.nsUUID, Timestamp::max(), ApplyOpsIndex::max()). + * Pre-images are considered expired based on expiration parameter. In case when expiration + * parameter is not set a pre-image is considered expired if its timestamp is smaller than the + * timestamp of the earliest oplog entry. In case when expiration parameter is specified, aside from + * timestamp check a check on the wall clock time of the pre-image recording ('operationTime') is + * performed. If the difference between 'currentTimeForTimeBasedExpiration' and 'operationTime' is + * larger than expiration parameter, the pre-image is considered expired. One of those two + * conditions must be true for a pre-image to be eligible for deletion. * * +-------------------------+ * | config.system.preimages | @@ -134,267 +152,96 @@ RecordId toRecordId(ChangeStreamPreImageId id) { * | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 1 | * +-------------------+ +-------------------+ +-------------------+ +-------------------+ */ -class ChangeStreamExpiredPreImageIterator { -public: - // Iterator over the range of pre-image documents, where each range defines a set of expired - // pre-image documents of one collection eligible for deletion due to expiration. Lower and - // upper bounds of a range are inclusive. - class Iterator { - public: - using RecordIdRange = std::pair<RecordId, RecordId>; - - Iterator(OperationContext* opCtx, - const CollectionPtr* preImagesCollPtr, - Timestamp earliestOplogEntryTimestamp, - boost::optional<Date_t> preImageExpirationTime, - bool isEndIterator = false) - : _opCtx(opCtx), - _preImagesCollPtr(preImagesCollPtr), - _earliestOplogEntryTimestamp(earliestOplogEntryTimestamp), - _preImageExpirationTime(preImageExpirationTime) { - if (!isEndIterator) { - advance(); - } - } - - const RecordIdRange& operator*() const { - return _currentExpiredPreImageRange; - } - - const RecordIdRange* operator->() const { - return &_currentExpiredPreImageRange; - } - - Iterator& operator++() { - advance(); - return *this; - } - - // Both iterators are equal if they are both pointing to the same expired pre-image range. - friend bool operator==(const Iterator& a, const Iterator& b) { - return a._currentExpiredPreImageRange == b._currentExpiredPreImageRange; - }; - - friend bool operator!=(const Iterator& a, const Iterator& b) { - return !(a == b); - }; - - private: - // Scans the pre-images collection and gets the next expired pre-image range or sets - // '_currentExpiredPreImageRange' to the range with empty record ids in case there are no - // more expired pre-images left. - void advance() { - const auto getNextPreImageAttributes = - [&](std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>& planExecutor) - -> boost::optional<preImageRemoverInternal::PreImageAttributes> { - BSONObj preImageObj; - if (planExecutor->getNext(&preImageObj, nullptr) == PlanExecutor::IS_EOF) { - return boost::none; - } - - auto preImage = - ChangeStreamPreImage::parse(IDLParserErrorContext("pre-image"), preImageObj); - return {{std::move(preImage.getId().getNsUUID()), - std::move(preImage.getId().getTs()), - std::move(preImage.getOperationTime())}}; - }; - - while (true) { - // Fetch the first pre-image from the next collection, that has pre-images enabled. - auto planExecutor = _previousCollectionUUID - ? createCollectionScan(RecordIdBound( - toRecordId(ChangeStreamPreImageId(*_previousCollectionUUID, - Timestamp::max(), - std::numeric_limits<int64_t>::max())))) - : createCollectionScan(boost::none); - auto preImageAttributes = getNextPreImageAttributes(planExecutor); - - // If there aren't any pre-images left, set the range to the empty record ids and - // return. - if (!preImageAttributes) { - _currentExpiredPreImageRange = std::pair(RecordId(), RecordId()); - return; - } - const auto currentCollectionUUID = preImageAttributes->collectionUUID; - _previousCollectionUUID = currentCollectionUUID; - - // If the first pre-image in the current collection is not expired, fetch the first - // pre-image from the next collection. - if (!preImageAttributes->isExpiredPreImage(_preImageExpirationTime, - _earliestOplogEntryTimestamp)) { - continue; - } - - // If an expired pre-image is found, compute the max expired pre-image RecordId for - // this collection depending on the expiration parameter being set. - const auto minKey = - toRecordId(ChangeStreamPreImageId(currentCollectionUUID, Timestamp(), 0)); - RecordId maxKey; - if (_preImageExpirationTime) { - // Reset the collection scan to start one increment before the - // '_earliestOplogEntryTimestamp', as the pre-images with smaller or equal - // timestamp are guaranteed to be expired. - Timestamp lastExpiredPreimageTs(_earliestOplogEntryTimestamp.asULL() - 1); - auto planExecutor = createCollectionScan(RecordIdBound( - toRecordId(ChangeStreamPreImageId(currentCollectionUUID, - lastExpiredPreimageTs, - std::numeric_limits<int64_t>::max())))); - - // Iterate over all the expired pre-images in the collection in order to find - // the max RecordId. - while ((preImageAttributes = getNextPreImageAttributes(planExecutor)) && - preImageAttributes->isExpiredPreImage(_preImageExpirationTime, - _earliestOplogEntryTimestamp) && - preImageAttributes->collectionUUID == currentCollectionUUID) { - lastExpiredPreimageTs = preImageAttributes->ts; - } - - maxKey = - toRecordId(ChangeStreamPreImageId(currentCollectionUUID, - lastExpiredPreimageTs, - std::numeric_limits<int64_t>::max())); - } else { - // If the expiration parameter is not set, then the last expired pre-image - // timestamp equals to one increment before the '_earliestOplogEntryTimestamp'. - maxKey = toRecordId( - ChangeStreamPreImageId(currentCollectionUUID, - Timestamp(_earliestOplogEntryTimestamp.asULL() - 1), - std::numeric_limits<int64_t>::max())); - } - tassert(6138300, - "Max key of the expired pre-image range has to be valid", - maxKey.isValid()); - _currentExpiredPreImageRange = std::pair(minKey, maxKey); - return; - } - } - - // Set up the new collection scan to start from the 'minKey'. - std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> createCollectionScan( - boost::optional<RecordIdBound> minKey) const { - return InternalPlanner::collectionScan(_opCtx, - _preImagesCollPtr, - PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY, - InternalPlanner::Direction::FORWARD, - boost::none, - minKey); - } - - OperationContext* _opCtx; - const CollectionPtr* _preImagesCollPtr; - RecordIdRange _currentExpiredPreImageRange; - boost::optional<UUID> _previousCollectionUUID; - const Timestamp _earliestOplogEntryTimestamp; - - // The pre-images with operation time less than or equal to the '_preImageExpirationTime' - // are considered expired. - const boost::optional<Date_t> _preImageExpirationTime; - }; - - ChangeStreamExpiredPreImageIterator( - OperationContext* opCtx, - const CollectionPtr* preImagesCollPtr, - const Timestamp earliestOplogEntryTimestamp, - const boost::optional<Date_t> preImageExpirationTime = boost::none) - : _opCtx(opCtx), - _preImagesCollPtr(preImagesCollPtr), - _earliestOplogEntryTimestamp(earliestOplogEntryTimestamp), - _preImageExpirationTime(preImageExpirationTime) {} - - Iterator begin() const { - return Iterator( - _opCtx, _preImagesCollPtr, _earliestOplogEntryTimestamp, _preImageExpirationTime); - } - - Iterator end() const { - return Iterator(_opCtx, - _preImagesCollPtr, - _earliestOplogEntryTimestamp, - _preImageExpirationTime, - true /*isEndIterator*/); - } - -private: - OperationContext* _opCtx; - const CollectionPtr* _preImagesCollPtr; - const Timestamp _earliestOplogEntryTimestamp; - const boost::optional<Date_t> _preImageExpirationTime; -}; - -void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTimeBasedExpiration) { - const auto startTime = Date_t::now(); - auto opCtx = client->makeOperationContext(); - +size_t deleteExpiredChangeStreamPreImages(OperationContext* opCtx, + Date_t currentTimeForTimeBasedExpiration) { // Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection // doesn't exist. - AutoGetCollection autoColl( - opCtx.get(), NamespaceString::kChangeStreamPreImagesNamespace, MODE_IX); + AutoGetCollection autoColl(opCtx, NamespaceString::kChangeStreamPreImagesNamespace, MODE_IX); const auto& preImagesColl = autoColl.getCollection(); if (!preImagesColl) { - return; + return 0; } // Do not run the job on secondaries. - if (!repl::ReplicationCoordinator::get(opCtx.get()) - ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb)) { - return; + if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase( + opCtx, NamespaceString::kAdminDb)) { + return 0; } - // Get the timestamp of the ealiest oplog entry. + // Get the timestamp of the earliest oplog entry. const auto currentEarliestOplogEntryTs = - repl::StorageInterface::get(client->getServiceContext()) - ->getEarliestOplogTimestamp(opCtx.get()); + repl::StorageInterface::get(opCtx->getServiceContext())->getEarliestOplogTimestamp(opCtx); const bool isBatchedRemoval = gBatchedExpiredChangeStreamPreImageRemoval.load(); size_t numberOfRemovals = 0; + const auto preImageExpirationTime = ::mongo::preImageRemoverInternal::getPreImageExpirationTime( + opCtx, currentTimeForTimeBasedExpiration); + + // Configure the filter for the case when expiration parameter is set. + OrMatchExpression filter; + const MatchExpression* filterPtr = nullptr; + if (preImageExpirationTime) { + filter.add( + std::make_unique<LTMatchExpression>("_id.ts"_sd, Value(currentEarliestOplogEntryTs))); + filter.add(std::make_unique<LTEMatchExpression>("operationTime"_sd, + Value(*preImageExpirationTime))); + filterPtr = &filter; + } + const bool shouldReturnEofOnFilterMismatch = preImageExpirationTime.has_value(); - ChangeStreamExpiredPreImageIterator expiredPreImages( - opCtx.get(), - &preImagesColl, - currentEarliestOplogEntryTs, - ::mongo::preImageRemoverInternal::getPreImageExpirationTime( - opCtx.get(), currentTimeForTimeBasedExpiration)); - - for (const auto& collectionRange : expiredPreImages) { + boost::optional<UUID> currentCollectionUUID = boost::none; + while ((currentCollectionUUID = + findNextCollectionUUID(opCtx, &preImagesColl, currentCollectionUUID))) { writeConflictRetry( - opCtx.get(), + opCtx, "ChangeStreamExpiredPreImagesRemover", NamespaceString::kChangeStreamPreImagesNamespace.ns(), [&] { auto params = std::make_unique<DeleteStageParams>(); params->isMulti = true; - boost::optional<std::unique_ptr<BatchedDeleteStageBatchParams>> batchParams; + std::unique_ptr<BatchedDeleteStageBatchParams> batchedDeleteParams; if (isBatchedRemoval) { - batchParams = std::make_unique<BatchedDeleteStageBatchParams>(); + batchedDeleteParams = std::make_unique<BatchedDeleteStageBatchParams>(); } + RecordIdBound minRecordId( + toRecordId(ChangeStreamPreImageId(*currentCollectionUUID, Timestamp(), 0))); + + // If the expiration parameter is set, the 'maxRecord' is set to the maximum + // RecordId for this collection. Whether the pre-image has to be deleted will be + // determined by the filtering MatchExpression. + // + // If the expiration parameter is not set, then the last expired pre-image timestamp + // equals to one increment before the 'currentEarliestOplogEntryTs'. + RecordIdBound maxRecordId = RecordIdBound(toRecordId(ChangeStreamPreImageId( + *currentCollectionUUID, + preImageExpirationTime ? Timestamp::max() + : Timestamp(currentEarliestOplogEntryTs.asULL() - 1), + std::numeric_limits<int64_t>::max()))); auto exec = InternalPlanner::deleteWithCollectionScan( - opCtx.get(), + opCtx, &preImagesColl, std::move(params), PlanYieldPolicy::YieldPolicy::YIELD_AUTO, InternalPlanner::Direction::FORWARD, - RecordIdBound(collectionRange.first), - RecordIdBound(collectionRange.second), + std::move(minRecordId), + std::move(maxRecordId), CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords, - std::move(batchParams)); + std::move(batchedDeleteParams), + filterPtr, + shouldReturnEofOnFilterMismatch); numberOfRemovals += exec->executeDelete(); }); } - - if (numberOfRemovals > 0) { - LOGV2_DEBUG(5869104, - 3, - "Periodic expired pre-images removal job finished executing", - "numberOfRemovals"_attr = numberOfRemovals, - "jobDuration"_attr = (Date_t::now() - startTime).toString()); - } + return numberOfRemovals; } void performExpiredChangeStreamPreImagesRemovalPass(Client* client) { + ServiceContext::UniqueOperationContext opCtx; try { Date_t currentTimeForTimeBasedExpiration = Date_t::now(); + opCtx = client->makeOperationContext(); changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) { // Populate the current time for time based expiration of pre-images. @@ -409,7 +256,7 @@ void performExpiredChangeStreamPreImagesRemovalPass(Client* client) { currentTimeForTimeBasedExpiration = currentTimeElem.Date(); } }); - deleteExpiredChangeStreamPreImages(client, currentTimeForTimeBasedExpiration); + deleteExpiredChangeStreamPreImages(opCtx.get(), currentTimeForTimeBasedExpiration); } catch (const ExceptionForCat<ErrorCategory::Interruption>&) { LOGV2_WARNING(5869105, "Periodic expired pre-images removal job was interrupted"); } catch (const DBException& exception) { diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h index 0ddd491991f..19315825043 100644 --- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h @@ -35,22 +35,6 @@ namespace mongo { namespace preImageRemoverInternal { -/** - * Specifies attributes that determines if the pre-image has been expired or not. - */ -struct PreImageAttributes { - mongo::UUID collectionUUID; - Timestamp ts; - Date_t operationTime; - - /** - * Determines if the pre-image is considered expired based on the expiration parameter being - * set. - */ - bool isExpiredPreImage(const boost::optional<Date_t>& preImageExpirationTime, - const Timestamp& earliestOplogEntryTimestamp); -}; - boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime); } // namespace preImageRemoverInternal diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp index ec49c8453d3..b2562a1db44 100644 --- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp @@ -61,16 +61,6 @@ public: ASSERT_EQ(changeStreamOptionsManager.setOptions(opCtx, changeStreamOptions).getStatus(), ErrorCodes::OK); } - - bool isExpiredPreImage(const Timestamp& preImageTs, - const Date_t& preImageOperationTime, - const boost::optional<Date_t>& preImageExpirationTime, - const Timestamp& earliestOplogEntryTimestamp) { - preImageRemoverInternal::PreImageAttributes preImageAttributes{ - UUID::gen(), preImageTs, preImageOperationTime}; - return preImageAttributes.isExpiredPreImage(preImageExpirationTime, - earliestOplogEntryTimestamp); - } }; TEST_F(ChangeStreamPreImageExpirationPolicyTest, getPreImageExpirationTimeWithValidIntegralValue) { @@ -107,39 +97,5 @@ TEST_F(ChangeStreamPreImageExpirationPolicyTest, getPreImageExpirationTimeWithOf preImageRemoverInternal::getPreImageExpirationTime(opCtx.get(), currentTime); ASSERT_FALSE(receivedExpireAfterSeconds); } - -TEST_F(ChangeStreamPreImageExpirationPolicyTest, preImageShouldHaveExpiredWithOlderTimestamp) { - ASSERT_TRUE( - isExpiredPreImage(Timestamp(Seconds(100000), 0U) /* preImageTs */, - Date_t::now() /* preImageOperationTime */, - Date_t::now() /* preImageExpirationTime */, - Timestamp(Seconds(100000), 1U)) /* earliestOplogEntryTimestamp */); -} - -TEST_F(ChangeStreamPreImageExpirationPolicyTest, preImageShouldNotHaveExpired) { - ASSERT_FALSE( - isExpiredPreImage(Timestamp(Seconds(100000), 1U) /* preImageTs */, - Date_t::now() + Seconds(1) /* preImageOperationTime */, - Date_t::now() /* preImageExpirationTime */, - Timestamp(Seconds(100000), 0U)) /* earliestOplogEntryTimestamp */); -} - -TEST_F(ChangeStreamPreImageExpirationPolicyTest, preImageShouldHaveExpiredWithOlderOperationTime) { - ASSERT_TRUE( - isExpiredPreImage(Timestamp(Seconds(100000), 1U) /* preImageTs */, - Date_t::now() /* preImageOperationTime */, - Date_t::now() + Seconds(1) /* preImageExpirationTime */, - Timestamp(Seconds(100000), 0U)) /* earliestOplogEntryTimestamp */); -} - -TEST_F(ChangeStreamPreImageExpirationPolicyTest, - preImageShouldNotHaveExpiredWithNullExpirationTime) { - ASSERT_TRUE( - isExpiredPreImage(Timestamp(Seconds(100000), 0U) /* preImageTs */, - Date_t::now() /* preImageOperationTime */, - boost::none /* preImageExpirationTime */, - Timestamp(Seconds(100000), 1U)) /* earliestOplogEntryTimestamp */); -} - } // namespace } // namespace mongo diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp index 04f70b1d2cc..78820eb8111 100644 --- a/src/mongo/db/query/internal_plans.cpp +++ b/src/mongo/db/query/internal_plans.cpp @@ -123,7 +123,8 @@ CollectionScanParams createCollectionScanParams( boost::optional<RecordId> resumeAfterRecordId, boost::optional<RecordIdBound> minRecord, boost::optional<RecordIdBound> maxRecord, - CollectionScanParams::ScanBoundInclusion boundInclusion) { + CollectionScanParams::ScanBoundInclusion boundInclusion, + bool shouldReturnEofOnFilterMismatch) { const auto& collection = *coll; invariant(collection); @@ -139,6 +140,7 @@ CollectionScanParams createCollectionScanParams( params.direction = CollectionScanParams::BACKWARD; } params.boundInclusion = boundInclusion; + params.shouldReturnEofOnFilterMismatch = shouldReturnEofOnFilterMismatch; return params; } } // namespace @@ -151,7 +153,8 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::collection boost::optional<RecordId> resumeAfterRecordId, boost::optional<RecordIdBound> minRecord, boost::optional<RecordIdBound> maxRecord, - CollectionScanParams::ScanBoundInclusion boundInclusion) { + CollectionScanParams::ScanBoundInclusion boundInclusion, + bool shouldReturnEofOnFilterMismatch) { const auto& collection = *coll; invariant(collection); @@ -167,7 +170,8 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::collection resumeAfterRecordId, minRecord, maxRecord, - boundInclusion); + boundInclusion, + shouldReturnEofOnFilterMismatch); auto cs = _collectionScan(expCtx, ws.get(), &collection, collScanParams); @@ -218,11 +222,19 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::deleteWith boost::optional<RecordIdBound> minRecord, boost::optional<RecordIdBound> maxRecord, CollectionScanParams::ScanBoundInclusion boundInclusion, - boost::optional<std::unique_ptr<BatchedDeleteStageBatchParams>> batchParams) { + std::unique_ptr<BatchedDeleteStageBatchParams> batchedDeleteParams, + const MatchExpression* filter, + bool shouldReturnEofOnFilterMismatch) { const auto& collection = *coll; invariant(collection); - auto ws = std::make_unique<WorkingSet>(); + if (shouldReturnEofOnFilterMismatch) { + tassert(7010801, + "MatchExpression filter must be provided when 'shouldReturnEofOnFilterMismatch' is " + "set to true ", + filter); + } + auto ws = std::make_unique<WorkingSet>(); auto expCtx = make_intrusive<ExpressionContext>( opCtx, std::unique_ptr<CollatorInterface>(nullptr), collection->ns()); @@ -237,14 +249,15 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::deleteWith boost::none /* resumeAfterId */, minRecord, maxRecord, - boundInclusion); + boundInclusion, + shouldReturnEofOnFilterMismatch); - auto root = _collectionScan(expCtx, ws.get(), &collection, collScanParams); + auto root = _collectionScan(expCtx, ws.get(), &collection, collScanParams, filter); - if (batchParams) { + if (batchedDeleteParams) { root = std::make_unique<BatchedDeleteStage>(expCtx.get(), std::move(params), - std::move(*batchParams), + std::move(batchedDeleteParams), ws.get(), collection, root.release()); @@ -454,12 +467,13 @@ std::unique_ptr<PlanStage> InternalPlanner::_collectionScan( const boost::intrusive_ptr<ExpressionContext>& expCtx, WorkingSet* ws, const CollectionPtr* coll, - const CollectionScanParams& params) { + const CollectionScanParams& params, + const MatchExpression* filter) { const auto& collection = *coll; invariant(collection); - return std::make_unique<CollectionScan>(expCtx.get(), collection, params, ws, nullptr); + return std::make_unique<CollectionScan>(expCtx.get(), collection, params, ws, filter); } std::unique_ptr<PlanStage> InternalPlanner::_indexScan( diff --git a/src/mongo/db/query/internal_plans.h b/src/mongo/db/query/internal_plans.h index ea8de7c0042..46400c4e3fa 100644 --- a/src/mongo/db/query/internal_plans.h +++ b/src/mongo/db/query/internal_plans.h @@ -83,7 +83,8 @@ public: boost::optional<RecordIdBound> minRecord = boost::none, boost::optional<RecordIdBound> maxRecord = boost::none, CollectionScanParams::ScanBoundInclusion boundInclusion = - CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords); + CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords, + bool shouldReturnEofOnFilterMismatch = false); static std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> collectionScan( OperationContext* opCtx, @@ -104,7 +105,9 @@ public: boost::optional<RecordIdBound> maxRecord = boost::none, CollectionScanParams::ScanBoundInclusion boundInclusion = CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords, - boost::optional<std::unique_ptr<BatchedDeleteStageBatchParams>> batchParams = boost::none); + std::unique_ptr<BatchedDeleteStageBatchParams> batchedDeleteParams = nullptr, + const MatchExpression* filter = nullptr, + bool shouldReturnEofOnFilterMismatch = false); /** * Returns an index scan. Caller owns returned pointer. @@ -197,7 +200,8 @@ private: const boost::intrusive_ptr<ExpressionContext>& expCtx, WorkingSet* ws, const CollectionPtr* collection, - const CollectionScanParams& params); + const CollectionScanParams& params, + const MatchExpression* filter = nullptr); /** * Returns a plan stage that is either an index scan or an index scan with a fetch stage. diff --git a/src/mongo/dbtests/query_stage_collscan.cpp b/src/mongo/dbtests/query_stage_collscan.cpp index fafbd58c0ef..3f022034fd0 100644 --- a/src/mongo/dbtests/query_stage_collscan.cpp +++ b/src/mongo/dbtests/query_stage_collscan.cpp @@ -339,6 +339,38 @@ TEST_F(QueryStageCollectionScanTest, QueryStageCollscanBasicBackwardWithMatch) { ASSERT_EQUALS(25, countResults(CollectionScanParams::BACKWARD, obj)); } +TEST_F(QueryStageCollectionScanTest, + QueryTestCollscanStopsScanningOnFilterFailureInClusteredCollectionIfSpecified) { + auto ns = NamespaceString("a.b"); + auto collDeleter = createClusteredCollection(ns, false /* prePopulate */); + for (int i = 1; i <= numObj(); ++i) { + insertDocument(ns, BSON("_id" << i << "foo" << i)); + } + + AutoGetCollectionForRead autoColl(&_opCtx, ns); + const CollectionPtr& coll = autoColl.getCollection(); + ASSERT(coll->isClustered()); + + // Configure the threshold and the expected number of scanned documents. + const int threshold = numObj() / 2; + const int expectedNumberOfScannedDocuments = threshold + 1; + + // Configure the scan. + CollectionScanParams params; + params.shouldReturnEofOnFilterMismatch = true; + WorkingSet ws; + LTEMatchExpression filter{"foo"_sd, Value(threshold)}; + auto scan = std::make_unique<CollectionScan>(_expCtx.get(), coll, params, &ws, &filter); + + // Scan all matching documents. + WorkingSetID id = WorkingSet::INVALID_ID; + while (!scan->isEOF()) { + scan->work(&id); + } + auto collScanStats = static_cast<const CollectionScanStats*>(scan->getSpecificStats()); + ASSERT_EQUALS(expectedNumberOfScannedDocuments, collScanStats->docsTested); +} + // Get objects in the order we inserted them. TEST_F(QueryStageCollectionScanTest, QueryStageCollscanObjectsInOrderForward) { AutoGetCollectionForReadCommand collection(&_opCtx, nss); |