From 1d9d6d49f3e713bcdb50896ed412daa3150d7116 Mon Sep 17 00:00:00 2001 From: Denis Grebennicov Date: Thu, 20 Oct 2022 13:01:48 +0000 Subject: SERVER-70274 Improve performance of change stream pre-images purging job by leveraging EOF filter --- ...change_stream_pre_images_collection_manager.cpp | 428 +++++++-------------- .../change_stream_pre_images_collection_manager.h | 18 - src/mongo/db/exec/collection_scan.cpp | 22 +- src/mongo/db/exec/collection_scan_common.h | 4 + ...hange_stream_expired_pre_image_remover_test.cpp | 44 --- src/mongo/db/query/internal_plans.cpp | 32 +- src/mongo/db/query/internal_plans.h | 10 +- src/mongo/dbtests/query_stage_collscan.cpp | 32 ++ 8 files changed, 223 insertions(+), 367 deletions(-) diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.cpp b/src/mongo/db/change_stream_pre_images_collection_manager.cpp index 3b82e7a7c46..30ee0c3c7da 100644 --- a/src/mongo/db/change_stream_pre_images_collection_manager.cpp +++ b/src/mongo/db/change_stream_pre_images_collection_manager.cpp @@ -61,18 +61,6 @@ MONGO_FAIL_POINT_DEFINE(failPreimagesCollectionCreation); namespace change_stream_pre_image_helpers { -bool PreImageAttributes::isExpiredPreImage(const boost::optional& preImageExpirationTime, - const Timestamp& earliestOplogEntryTimestamp) { - // Pre-image oplog entry is no longer present in the oplog if its timestamp is smaller - // than the 'earliestOplogEntryTimestamp'. - const bool preImageOplogEntryIsDeleted = ts < earliestOplogEntryTimestamp; - const auto expirationTime = preImageExpirationTime.get_value_or(Date_t::min()); - - // Pre-image is expired if its corresponding oplog entry is deleted or its operation - // time is less than or equal to the expiration time. - return preImageOplogEntryIsDeleted || operationTime <= expirationTime; -} - // Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if not 'off', boost::none otherwise. boost::optional getExpireAfterSecondsFromChangeStreamOptions( ChangeStreamOptions& changeStreamOptions) { @@ -186,19 +174,49 @@ RecordId toRecordId(ChangeStreamPreImageId id) { BSON(ChangeStreamPreImage::kIdFieldName << id.toBSON()).firstElement()); } +/** + * Finds the next collection UUID in the change stream pre-images collection 'preImagesCollPtr' for + * which collection UUID is greater than 'collectionUUID'. Returns boost::none if the next + * collection is not found. + */ +boost::optional findNextCollectionUUID(OperationContext* opCtx, + const CollectionPtr* preImagesCollPtr, + boost::optional collectionUUID + +) { + BSONObj preImageObj; + auto minRecordId = collectionUUID + ? boost::make_optional(RecordIdBound(toRecordId(ChangeStreamPreImageId( + *collectionUUID, Timestamp::max(), std::numeric_limits::max())))) + : boost::none; + auto planExecutor = + InternalPlanner::collectionScan(opCtx, + preImagesCollPtr, + PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY, + InternalPlanner::Direction::FORWARD, + boost::none /* resumeAfterRecordId */, + std::move(minRecordId)); + if (planExecutor->getNext(&preImageObj, nullptr) == PlanExecutor::IS_EOF) { + return boost::none; + } + auto parsedUUID = UUID::parse(preImageObj["_id"].Obj()["nsUUID"]); + tassert(7027400, "Pre-image collection UUID must be of UUID type", parsedUUID.isOK()); + return {std::move(parsedUUID.getValue())}; +} + /** * Scans the 'config.system.preimages' collection and deletes the expired pre-images from it. * * Pre-images are ordered by collection UUID, ie. if UUID of collection A is ordered before UUID of * collection B, then pre-images of collection A will be stored before pre-images of collection B. * - * While scanning the collection for expired pre-images, each pre-image timestamp is compared - * against the 'earliestOplogEntryTimestamp' value. Any pre-image that has a timestamp greater than - * the 'earliestOplogEntryTimestamp' value is not considered for deletion and the cursor seeks to - * the next UUID in the collection. - * - * Seek to the next UUID is done by setting the values of 'Timestamp' and 'ApplyOpsIndex' fields to - * max, ie. (currentPreImage.nsUUID, Timestamp::max(), ApplyOpsIndex::max()). + * Pre-images are considered expired based on expiration parameter. In case when expiration + * parameter is not set a pre-image is considered expired if its timestamp is smaller than the + * timestamp of the earliest oplog entry. In case when expiration parameter is specified, aside from + * timestamp check a check on the wall clock time of the pre-image recording ('operationTime') is + * performed. If the difference between 'currentTimeForTimeBasedExpiration' and 'operationTime' is + * larger than expiration parameter, the pre-image is considered expired. One of those two + * conditions must be true for a pre-image to be eligible for deletion. * * +-------------------------+ * | config.system.preimages | @@ -213,257 +231,116 @@ RecordId toRecordId(ChangeStreamPreImageId id) { * | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 1 | * +-------------------+ +-------------------+ +-------------------+ +-------------------+ */ -class ChangeStreamExpiredPreImageIterator { -public: - // Iterator over the range of pre-image documents, where each range defines a set of expired - // pre-image documents of one collection eligible for deletion due to expiration. Lower and - // upper bounds of a range are inclusive. - class Iterator { - public: - using RecordIdRange = std::pair; - - Iterator(OperationContext* opCtx, - const CollectionPtr* preImagesCollPtr, - Timestamp earliestOplogEntryTimestamp, - boost::optional preImageExpirationTime, - bool isEndIterator = false) - : _opCtx(opCtx), - _preImagesCollPtr(preImagesCollPtr), - _earliestOplogEntryTimestamp(earliestOplogEntryTimestamp), - _preImageExpirationTime(preImageExpirationTime) { - if (!isEndIterator) { - advance(); - } - } - - const RecordIdRange& operator*() const { - return _currentExpiredPreImageRange; - } - - const RecordIdRange* operator->() const { - return &_currentExpiredPreImageRange; - } - - Iterator& operator++() { - advance(); - return *this; - } - - // Both iterators are equal if they are both pointing to the same expired pre-image range. - friend bool operator==(const Iterator& a, const Iterator& b) { - return a._currentExpiredPreImageRange == b._currentExpiredPreImageRange; - }; - - friend bool operator!=(const Iterator& a, const Iterator& b) { - return !(a == b); - }; - - private: - // Scans the pre-images collection and gets the next expired pre-image range or sets - // '_currentExpiredPreImageRange' to the range with empty record ids in case there are no - // more expired pre-images left. - void advance() { - const auto getNextPreImageAttributes = - [&](std::unique_ptr& planExecutor) - -> boost::optional { - BSONObj preImageObj; - if (planExecutor->getNext(&preImageObj, nullptr) == PlanExecutor::IS_EOF) { - return boost::none; - } - - auto preImage = - ChangeStreamPreImage::parse(IDLParserContext("pre-image"), preImageObj); - return {{std::move(preImage.getId().getNsUUID()), - std::move(preImage.getId().getTs()), - std::move(preImage.getOperationTime())}}; - }; - - while (true) { - // Fetch the first pre-image from the next collection, that has pre-images enabled. - auto planExecutor = _previousCollectionUUID - ? createCollectionScan(RecordIdBound( - toRecordId(ChangeStreamPreImageId(*_previousCollectionUUID, - Timestamp::max(), - std::numeric_limits::max())))) - : createCollectionScan(boost::none); - auto preImageAttributes = getNextPreImageAttributes(planExecutor); - - // If there aren't any pre-images left, set the range to the empty record ids and - // return. - if (!preImageAttributes) { - _currentExpiredPreImageRange = std::pair(RecordId(), RecordId()); - return; - } - const auto currentCollectionUUID = preImageAttributes->collectionUUID; - _previousCollectionUUID = currentCollectionUUID; - - // If the first pre-image in the current collection is not expired, fetch the first - // pre-image from the next collection. - if (!preImageAttributes->isExpiredPreImage(_preImageExpirationTime, - _earliestOplogEntryTimestamp)) { - continue; - } - - // If an expired pre-image is found, compute the max expired pre-image RecordId for - // this collection depending on the expiration parameter being set. - const auto minKey = - toRecordId(ChangeStreamPreImageId(currentCollectionUUID, Timestamp(), 0)); - RecordId maxKey; - if (_preImageExpirationTime) { - // Reset the collection scan to start one increment before the - // '_earliestOplogEntryTimestamp', as the pre-images with smaller or equal - // timestamp are guaranteed to be expired. - Timestamp lastExpiredPreimageTs(_earliestOplogEntryTimestamp.asULL() - 1); - auto planExecutor = createCollectionScan(RecordIdBound( - toRecordId(ChangeStreamPreImageId(currentCollectionUUID, - lastExpiredPreimageTs, - std::numeric_limits::max())))); - - // Iterate over all the expired pre-images in the collection in order to find - // the max RecordId. - while ((preImageAttributes = getNextPreImageAttributes(planExecutor)) && - preImageAttributes->isExpiredPreImage(_preImageExpirationTime, - _earliestOplogEntryTimestamp) && - preImageAttributes->collectionUUID == currentCollectionUUID) { - lastExpiredPreimageTs = preImageAttributes->ts; - } - - maxKey = - toRecordId(ChangeStreamPreImageId(currentCollectionUUID, - lastExpiredPreimageTs, - std::numeric_limits::max())); - } else { - // If the expiration parameter is not set, then the last expired pre-image - // timestamp equals to one increment before the '_earliestOplogEntryTimestamp'. - maxKey = toRecordId( - ChangeStreamPreImageId(currentCollectionUUID, - Timestamp(_earliestOplogEntryTimestamp.asULL() - 1), - std::numeric_limits::max())); - } - tassert(6138300, - "Max key of the expired pre-image range has to be valid", - maxKey.isValid()); - _currentExpiredPreImageRange = std::pair(minKey, maxKey); - return; - } - } - - // Set up the new collection scan to start from the 'minKey'. - std::unique_ptr createCollectionScan( - boost::optional minKey) const { - return InternalPlanner::collectionScan(_opCtx, - _preImagesCollPtr, - PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY, - InternalPlanner::Direction::FORWARD, - boost::none, - minKey); - } - - OperationContext* _opCtx; - const CollectionPtr* _preImagesCollPtr; - RecordIdRange _currentExpiredPreImageRange; - boost::optional _previousCollectionUUID; - const Timestamp _earliestOplogEntryTimestamp; - - // The pre-images with operation time less than or equal to the '_preImageExpirationTime' - // are considered expired. - const boost::optional _preImageExpirationTime; - }; - - ChangeStreamExpiredPreImageIterator( - OperationContext* opCtx, - const CollectionPtr* preImagesCollPtr, - const Timestamp earliestOplogEntryTimestamp, - const boost::optional preImageExpirationTime = boost::none) - : _opCtx(opCtx), - _preImagesCollPtr(preImagesCollPtr), - _earliestOplogEntryTimestamp(earliestOplogEntryTimestamp), - _preImageExpirationTime(preImageExpirationTime) {} - - Iterator begin() const { - return Iterator( - _opCtx, _preImagesCollPtr, _earliestOplogEntryTimestamp, _preImageExpirationTime); +size_t deleteExpiredChangeStreamPreImages(OperationContext* opCtx, + Date_t currentTimeForTimeBasedExpiration) { + // Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection + // doesn't exist. + // TODO SERVER-66642 Account for multitenancy. + AutoGetCollection autoColl( + opCtx, NamespaceString::makePreImageCollectionNSS(boost::none), MODE_IX); + const auto& preImagesColl = autoColl.getCollection(); + if (!preImagesColl) { + return 0; } - Iterator end() const { - return Iterator(_opCtx, - _preImagesCollPtr, - _earliestOplogEntryTimestamp, - _preImageExpirationTime, - true /*isEndIterator*/); + // Do not run the job on secondaries. + if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase( + opCtx, NamespaceString::kConfigDb)) { + return 0; } -private: - OperationContext* _opCtx; - const CollectionPtr* _preImagesCollPtr; - const Timestamp _earliestOplogEntryTimestamp; - const boost::optional _preImageExpirationTime; -}; + // Get the timestamp of the earliest oplog entry. + const auto currentEarliestOplogEntryTs = + repl::StorageInterface::get(opCtx->getServiceContext())->getEarliestOplogTimestamp(opCtx); + + const bool isBatchedRemoval = gBatchedExpiredChangeStreamPreImageRemoval.load(); + size_t numberOfRemovals = 0; + const auto preImageExpirationTime = change_stream_pre_image_helpers::getPreImageExpirationTime( + opCtx, currentTimeForTimeBasedExpiration); + + // Configure the filter for the case when expiration parameter is set. + OrMatchExpression filter; + const MatchExpression* filterPtr = nullptr; + if (preImageExpirationTime) { + filter.add( + std::make_unique("_id.ts"_sd, Value(currentEarliestOplogEntryTs))); + filter.add(std::make_unique("operationTime"_sd, + Value(*preImageExpirationTime))); + filterPtr = &filter; + } + const bool shouldReturnEofOnFilterMismatch = preImageExpirationTime.has_value(); + + // TODO SERVER-66642 Account for multitenancy. + boost::optional currentCollectionUUID = boost::none; + while ((currentCollectionUUID = + findNextCollectionUUID(opCtx, &preImagesColl, currentCollectionUUID))) { + writeConflictRetry( + opCtx, + "ChangeStreamExpiredPreImagesRemover", + NamespaceString::makePreImageCollectionNSS(boost::none).ns(), + [&] { + auto params = std::make_unique(); + params->isMulti = true; + + std::unique_ptr batchedDeleteParams; + if (isBatchedRemoval) { + batchedDeleteParams = std::make_unique(); + } + RecordIdBound minRecordId( + toRecordId(ChangeStreamPreImageId(*currentCollectionUUID, Timestamp(), 0))); + + // If the expiration parameter is set, the 'maxRecord' is set to the maximum + // RecordId for this collection. Whether the pre-image has to be deleted will be + // determined by the filtering MatchExpression. + // + // If the expiration parameter is not set, then the last expired pre-image timestamp + // equals to one increment before the 'currentEarliestOplogEntryTs'. + RecordIdBound maxRecordId = RecordIdBound(toRecordId(ChangeStreamPreImageId( + *currentCollectionUUID, + preImageExpirationTime ? Timestamp::max() + : Timestamp(currentEarliestOplogEntryTs.asULL() - 1), + std::numeric_limits::max()))); + + auto exec = InternalPlanner::deleteWithCollectionScan( + opCtx, + &preImagesColl, + std::move(params), + PlanYieldPolicy::YieldPolicy::YIELD_AUTO, + InternalPlanner::Direction::FORWARD, + std::move(minRecordId), + std::move(maxRecordId), + CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords, + std::move(batchedDeleteParams), + filterPtr, + shouldReturnEofOnFilterMismatch); + numberOfRemovals += exec->executeDelete(); + }); + } + return numberOfRemovals; +} +} // namespace -void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTimeBasedExpiration) { +void ChangeStreamPreImagesCollectionManager::performExpiredChangeStreamPreImagesRemovalPass( + Client* client) { + Date_t currentTimeForTimeBasedExpiration = Date_t::now(); + changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) { + // Populate the current time for time based expiration of pre-images. + if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) { + const BSONType bsonType = currentTimeElem.type(); + tassert(5869300, + str::stream() << "Expected type for 'currentTimeForTimeBasedExpiration' is " + "'date', but found: " + << bsonType, + bsonType == BSONType::Date); + currentTimeForTimeBasedExpiration = currentTimeElem.Date(); + } + }); const auto startTime = Date_t::now(); ServiceContext::UniqueOperationContext opCtx; try { opCtx = client->makeOperationContext(); - - // Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection - // doesn't exist. - AutoGetCollection autoColl( - opCtx.get(), NamespaceString::kChangeStreamPreImagesNamespace, MODE_IX); - const auto& preImagesColl = autoColl.getCollection(); - if (!preImagesColl) { - return; - } - - // Do not run the job on secondaries. - if (!repl::ReplicationCoordinator::get(opCtx.get()) - ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb)) { - return; - } - - // Get the timestamp of the earliest oplog entry. - const auto currentEarliestOplogEntryTs = - repl::StorageInterface::get(client->getServiceContext()) - ->getEarliestOplogTimestamp(opCtx.get()); - - const bool isBatchedRemoval = gBatchedExpiredChangeStreamPreImageRemoval.load(); - size_t numberOfRemovals = 0; - - ChangeStreamExpiredPreImageIterator expiredPreImages( - opCtx.get(), - &preImagesColl, - currentEarliestOplogEntryTs, - change_stream_pre_image_helpers::getPreImageExpirationTime( - opCtx.get(), currentTimeForTimeBasedExpiration)); - - for (const auto& collectionRange : expiredPreImages) { - writeConflictRetry( - opCtx.get(), - "ChangeStreamExpiredPreImagesRemover", - NamespaceString::kChangeStreamPreImagesNamespace.ns(), - [&] { - auto params = std::make_unique(); - params->isMulti = true; - - std::unique_ptr batchedDeleteParams; - if (isBatchedRemoval) { - batchedDeleteParams = std::make_unique(); - } - - auto exec = InternalPlanner::deleteWithCollectionScan( - opCtx.get(), - &preImagesColl, - std::move(params), - PlanYieldPolicy::YieldPolicy::YIELD_AUTO, - InternalPlanner::Direction::FORWARD, - RecordIdBound(collectionRange.first), - RecordIdBound(collectionRange.second), - CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords, - std::move(batchedDeleteParams)); - numberOfRemovals += exec->executeDelete(); - }); - } - + auto numberOfRemovals = + deleteExpiredChangeStreamPreImages(opCtx.get(), currentTimeForTimeBasedExpiration); if (numberOfRemovals > 0) { LOGV2_DEBUG(5869104, 3, @@ -472,11 +349,12 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim "jobDuration"_attr = (Date_t::now() - startTime).toString()); } } catch (const DBException& exception) { - if (opCtx && opCtx.get()->getKillStatus() != ErrorCodes::OK) { + Status interruptStatus = opCtx ? opCtx.get()->checkForInterruptNoAssert() : Status::OK(); + if (!interruptStatus.isOK()) { LOGV2_DEBUG(5869105, 3, - "Periodic expired pre-images removal job operation was killed", - "errorCode"_attr = opCtx.get()->getKillStatus()); + "Periodic expired pre-images removal job operation was interrupted", + "errorCode"_attr = interruptStatus); } else { LOGV2_ERROR(5869106, "Periodic expired pre-images removal job failed", @@ -484,26 +362,4 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim } } } -} // namespace - -void ChangeStreamPreImagesCollectionManager::performExpiredChangeStreamPreImagesRemovalPass( - Client* client) { - Date_t currentTimeForTimeBasedExpiration = Date_t::now(); - - changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) { - // Populate the current time for time based expiration of pre-images. - if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) { - const BSONType bsonType = currentTimeElem.type(); - tassert(5869300, - str::stream() << "Expected type for 'currentTimeForTimeBasedExpiration' is " - "'date', but found: " - << bsonType, - bsonType == BSONType::Date); - - currentTimeForTimeBasedExpiration = currentTimeElem.Date(); - } - }); - deleteExpiredChangeStreamPreImages(client, currentTimeForTimeBasedExpiration); -} - } // namespace mongo diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.h b/src/mongo/db/change_stream_pre_images_collection_manager.h index dede0e38c96..4af7ef6a8e9 100644 --- a/src/mongo/db/change_stream_pre_images_collection_manager.h +++ b/src/mongo/db/change_stream_pre_images_collection_manager.h @@ -39,25 +39,7 @@ namespace mongo { namespace change_stream_pre_image_helpers { - -/** - * Specifies attributes that determines if the pre-image has been expired or not. - */ -struct PreImageAttributes { - mongo::UUID collectionUUID; - Timestamp ts; - Date_t operationTime; - - /** - * Determines if the pre-image is considered expired based on the expiration parameter being - * set. - */ - bool isExpiredPreImage(const boost::optional& preImageExpirationTime, - const Timestamp& earliestOplogEntryTimestamp); -}; - boost::optional getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime); - } // namespace change_stream_pre_image_helpers /** diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index ce38acde2ee..1bee034cd0c 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -413,16 +413,24 @@ PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member, // In the future, we could change seekNear() to always return a record after minRecord in the // direction of the scan. However, tailable scans depend on the current behavior in order to // mark their position for resuming the tailable scan later on. - if (!beforeStartOfRange(_params, *member) && Filter::passes(member, _filter)) { - if (_params.stopApplyingFilterAfterFirstMatch) { - _filter = nullptr; - } - *out = memberID; - return PlanStage::ADVANCED; - } else { + if (beforeStartOfRange(_params, *member)) { _workingSet->free(memberID); return PlanStage::NEED_TIME; } + + if (!Filter::passes(member, _filter)) { + _workingSet->free(memberID); + if (_params.shouldReturnEofOnFilterMismatch) { + _commonStats.isEOF = true; + return PlanStage::IS_EOF; + } + return PlanStage::NEED_TIME; + } + if (_params.stopApplyingFilterAfterFirstMatch) { + _filter = nullptr; + } + *out = memberID; + return PlanStage::ADVANCED; } bool CollectionScan::isEOF() { diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h index dd701264941..2031fafe8b4 100644 --- a/src/mongo/db/exec/collection_scan_common.h +++ b/src/mongo/db/exec/collection_scan_common.h @@ -111,6 +111,10 @@ struct CollectionScanParams { // Whether or not to wait for oplog visibility on oplog collection scans. bool shouldWaitForOplogVisibility = false; + + // Whether or not to return EOF and stop further scanning once MatchExpression evaluates to + // false. Can only be set to true if the MatchExpression is present. + bool shouldReturnEofOnFilterMismatch = false; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp index b1bcb82b454..b759c9f68db 100644 --- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp @@ -62,16 +62,6 @@ public: ASSERT_EQ(changeStreamOptionsManager.setOptions(opCtx, changeStreamOptions).getStatus(), ErrorCodes::OK); } - - bool isExpiredPreImage(const Timestamp& preImageTs, - const Date_t& preImageOperationTime, - const boost::optional& preImageExpirationTime, - const Timestamp& earliestOplogEntryTimestamp) { - change_stream_pre_image_helpers::PreImageAttributes preImageAttributes{ - UUID::gen(), preImageTs, preImageOperationTime}; - return preImageAttributes.isExpiredPreImage(preImageExpirationTime, - earliestOplogEntryTimestamp); - } }; TEST_F(ChangeStreamPreImageExpirationPolicyTest, getPreImageExpirationTimeWithValidIntegralValue) { @@ -108,39 +98,5 @@ TEST_F(ChangeStreamPreImageExpirationPolicyTest, getPreImageExpirationTimeWithOf change_stream_pre_image_helpers::getPreImageExpirationTime(opCtx.get(), currentTime); ASSERT_FALSE(receivedExpireAfterSeconds); } - -TEST_F(ChangeStreamPreImageExpirationPolicyTest, preImageShouldHaveExpiredWithOlderTimestamp) { - ASSERT_TRUE( - isExpiredPreImage(Timestamp(Seconds(100000), 0U) /* preImageTs */, - Date_t::now() /* preImageOperationTime */, - Date_t::now() /* preImageExpirationTime */, - Timestamp(Seconds(100000), 1U)) /* earliestOplogEntryTimestamp */); -} - -TEST_F(ChangeStreamPreImageExpirationPolicyTest, preImageShouldNotHaveExpired) { - ASSERT_FALSE( - isExpiredPreImage(Timestamp(Seconds(100000), 1U) /* preImageTs */, - Date_t::now() + Seconds(1) /* preImageOperationTime */, - Date_t::now() /* preImageExpirationTime */, - Timestamp(Seconds(100000), 0U)) /* earliestOplogEntryTimestamp */); -} - -TEST_F(ChangeStreamPreImageExpirationPolicyTest, preImageShouldHaveExpiredWithOlderOperationTime) { - ASSERT_TRUE( - isExpiredPreImage(Timestamp(Seconds(100000), 1U) /* preImageTs */, - Date_t::now() /* preImageOperationTime */, - Date_t::now() + Seconds(1) /* preImageExpirationTime */, - Timestamp(Seconds(100000), 0U)) /* earliestOplogEntryTimestamp */); -} - -TEST_F(ChangeStreamPreImageExpirationPolicyTest, - preImageShouldNotHaveExpiredWithNullExpirationTime) { - ASSERT_TRUE( - isExpiredPreImage(Timestamp(Seconds(100000), 0U) /* preImageTs */, - Date_t::now() /* preImageOperationTime */, - boost::none /* preImageExpirationTime */, - Timestamp(Seconds(100000), 1U)) /* earliestOplogEntryTimestamp */); -} - } // namespace } // namespace mongo diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp index 110c76addfe..de914d66f27 100644 --- a/src/mongo/db/query/internal_plans.cpp +++ b/src/mongo/db/query/internal_plans.cpp @@ -123,7 +123,8 @@ CollectionScanParams createCollectionScanParams( const boost::optional& resumeAfterRecordId, boost::optional minRecord, boost::optional maxRecord, - CollectionScanParams::ScanBoundInclusion boundInclusion) { + CollectionScanParams::ScanBoundInclusion boundInclusion, + bool shouldReturnEofOnFilterMismatch) { const auto& collection = *coll; invariant(collection); @@ -139,6 +140,7 @@ CollectionScanParams createCollectionScanParams( params.direction = CollectionScanParams::BACKWARD; } params.boundInclusion = boundInclusion; + params.shouldReturnEofOnFilterMismatch = shouldReturnEofOnFilterMismatch; return params; } } // namespace @@ -151,7 +153,8 @@ std::unique_ptr InternalPlanner::collection const boost::optional& resumeAfterRecordId, boost::optional minRecord, boost::optional maxRecord, - CollectionScanParams::ScanBoundInclusion boundInclusion) { + CollectionScanParams::ScanBoundInclusion boundInclusion, + bool shouldReturnEofOnFilterMismatch) { const auto& collection = *coll; invariant(collection); @@ -167,7 +170,8 @@ std::unique_ptr InternalPlanner::collection resumeAfterRecordId, minRecord, maxRecord, - boundInclusion); + boundInclusion, + shouldReturnEofOnFilterMismatch); auto cs = _collectionScan(expCtx, ws.get(), &collection, collScanParams); @@ -218,11 +222,19 @@ std::unique_ptr InternalPlanner::deleteWith boost::optional minRecord, boost::optional maxRecord, CollectionScanParams::ScanBoundInclusion boundInclusion, - std::unique_ptr batchedDeleteParams) { + std::unique_ptr batchedDeleteParams, + const MatchExpression* filter, + bool shouldReturnEofOnFilterMismatch) { const auto& collection = *coll; invariant(collection); - auto ws = std::make_unique(); + if (shouldReturnEofOnFilterMismatch) { + tassert(7010801, + "MatchExpression filter must be provided when 'shouldReturnEofOnFilterMismatch' is " + "set to true ", + filter); + } + auto ws = std::make_unique(); auto expCtx = make_intrusive( opCtx, std::unique_ptr(nullptr), collection->ns()); @@ -237,9 +249,10 @@ std::unique_ptr InternalPlanner::deleteWith boost::none /* resumeAfterId */, minRecord, maxRecord, - boundInclusion); + boundInclusion, + shouldReturnEofOnFilterMismatch); - auto root = _collectionScan(expCtx, ws.get(), &collection, collScanParams); + auto root = _collectionScan(expCtx, ws.get(), &collection, collScanParams, filter); if (batchedDeleteParams) { root = std::make_unique(expCtx.get(), @@ -464,12 +477,13 @@ std::unique_ptr InternalPlanner::_collectionScan( const boost::intrusive_ptr& expCtx, WorkingSet* ws, const CollectionPtr* coll, - const CollectionScanParams& params) { + const CollectionScanParams& params, + const MatchExpression* filter) { const auto& collection = *coll; invariant(collection); - return std::make_unique(expCtx.get(), collection, params, ws, nullptr); + return std::make_unique(expCtx.get(), collection, params, ws, filter); } std::unique_ptr InternalPlanner::_indexScan( diff --git a/src/mongo/db/query/internal_plans.h b/src/mongo/db/query/internal_plans.h index b0d13bb6d5e..aa1beb31e21 100644 --- a/src/mongo/db/query/internal_plans.h +++ b/src/mongo/db/query/internal_plans.h @@ -83,7 +83,8 @@ public: boost::optional minRecord = boost::none, boost::optional maxRecord = boost::none, CollectionScanParams::ScanBoundInclusion boundInclusion = - CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords); + CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords, + bool shouldReturnEofOnFilterMismatch = false); static std::unique_ptr collectionScan( OperationContext* opCtx, @@ -105,7 +106,9 @@ public: boost::optional maxRecord = boost::none, CollectionScanParams::ScanBoundInclusion boundInclusion = CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords, - std::unique_ptr batchedDeleteParams = nullptr); + std::unique_ptr batchedDeleteParams = nullptr, + const MatchExpression* filter = nullptr, + bool shouldReturnEofOnFilterMismatch = false); /** * Returns an index scan. Caller owns returned pointer. @@ -200,7 +203,8 @@ private: const boost::intrusive_ptr& expCtx, WorkingSet* ws, const CollectionPtr* collection, - const CollectionScanParams& params); + const CollectionScanParams& params, + const MatchExpression* filter = nullptr); /** * Returns a plan stage that is either an index scan or an index scan with a fetch stage. diff --git a/src/mongo/dbtests/query_stage_collscan.cpp b/src/mongo/dbtests/query_stage_collscan.cpp index fa180a525b6..aec34cd36c6 100644 --- a/src/mongo/dbtests/query_stage_collscan.cpp +++ b/src/mongo/dbtests/query_stage_collscan.cpp @@ -335,6 +335,38 @@ TEST_F(QueryStageCollectionScanTest, QueryStageCollscanBasicBackwardWithMatch) { ASSERT_EQUALS(25, countResults(CollectionScanParams::BACKWARD, obj)); } +TEST_F(QueryStageCollectionScanTest, + QueryTestCollscanStopsScanningOnFilterFailureInClusteredCollectionIfSpecified) { + auto ns = NamespaceString("a.b"); + auto collDeleter = createClusteredCollection(ns, false /* prePopulate */); + for (int i = 1; i <= numObj(); ++i) { + insertDocument(ns, BSON("_id" << i << "foo" << i)); + } + + AutoGetCollectionForRead autoColl(&_opCtx, ns); + const CollectionPtr& coll = autoColl.getCollection(); + ASSERT(coll->isClustered()); + + // Configure the threshold and the expected number of scanned documents. + const int threshold = numObj() / 2; + const int expectedNumberOfScannedDocuments = threshold + 1; + + // Configure the scan. + CollectionScanParams params; + params.shouldReturnEofOnFilterMismatch = true; + WorkingSet ws; + LTEMatchExpression filter{"foo"_sd, Value(threshold)}; + auto scan = std::make_unique(_expCtx.get(), coll, params, &ws, &filter); + + // Scan all matching documents. + WorkingSetID id = WorkingSet::INVALID_ID; + while (!scan->isEOF()) { + scan->work(&id); + } + auto collScanStats = static_cast(scan->getSpecificStats()); + ASSERT_EQUALS(expectedNumberOfScannedDocuments, collScanStats->docsTested); +} + // Get objects in the order we inserted them. TEST_F(QueryStageCollectionScanTest, QueryStageCollscanObjectsInOrderForward) { AutoGetCollectionForReadCommand collection(&_opCtx, nss); -- cgit v1.2.1