summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Denis Grebennicov <denis.grebennicov@mongodb.com> 2022-10-20 13:01:48 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-11-04 10:48:19 +0000
commit 1d9d6d49f3e713bcdb50896ed412daa3150d7116 (patch)
tree 6b15acf38486db03bf84026fdad39947f414db52
parent ea4b2d784664f3240b56ad99ba66b33ff4e0330f (diff)
download mongo-1d9d6d49f3e713bcdb50896ed412daa3150d7116.tar.gz
SERVER-70274 Improve performance of change stream pre-images purging job by leveraging EOF filter
-rw-r--r-- src/mongo/db/change_stream_pre_images_collection_manager.cpp 428
-rw-r--r-- src/mongo/db/change_stream_pre_images_collection_manager.h 18
-rw-r--r-- src/mongo/db/exec/collection_scan.cpp 22
-rw-r--r-- src/mongo/db/exec/collection_scan_common.h 4
-rw-r--r-- src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp 44
-rw-r--r-- src/mongo/db/query/internal_plans.cpp 32
-rw-r--r-- src/mongo/db/query/internal_plans.h 10
-rw-r--r-- src/mongo/dbtests/query_stage_collscan.cpp 32
8 files changed, 223 insertions, 367 deletions
diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.cpp b/src/mongo/db/change_stream_pre_images_collection_manager.cpp
index 3b82e7a7c46..30ee0c3c7da 100644
--- a/src/mongo/db/change_stream_pre_images_collection_manager.cpp
+++ b/src/mongo/db/change_stream_pre_images_collection_manager.cpp
@@ -61,18 +61,6 @@ MONGO_FAIL_POINT_DEFINE(failPreimagesCollectionCreation);
namespace change_stream_pre_image_helpers {
-bool PreImageAttributes::isExpiredPreImage(const boost::optional<Date_t>& preImageExpirationTime,
- const Timestamp& earliestOplogEntryTimestamp) {
- // Pre-image oplog entry is no longer present in the oplog if its timestamp is smaller
- // than the 'earliestOplogEntryTimestamp'.
- const bool preImageOplogEntryIsDeleted = ts < earliestOplogEntryTimestamp;
- const auto expirationTime = preImageExpirationTime.get_value_or(Date_t::min());
-
- // Pre-image is expired if its corresponding oplog entry is deleted or its operation
- // time is less than or equal to the expiration time.
- return preImageOplogEntryIsDeleted || operationTime <= expirationTime;
-}
-
// Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if not 'off', boost::none otherwise.
boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions(
ChangeStreamOptions& changeStreamOptions) {
@@ -187,18 +175,48 @@ RecordId toRecordId(ChangeStreamPreImageId id) {
}
/**
+ * Finds the next collection UUID in the change stream pre-images collection 'preImagesCollPtr'
+ * that is ordered after 'collectionUUID', or the first collection UUID if 'collectionUUID' is not
+ * set. Returns boost::none if no such collection is found.
+ */
+boost::optional<UUID> findNextCollectionUUID(OperationContext* opCtx,
+ const CollectionPtr* preImagesCollPtr,
+ boost::optional<UUID> collectionUUID
+
+) {
+ BSONObj preImageObj;
+ auto minRecordId = collectionUUID
+ ? boost::make_optional(RecordIdBound(toRecordId(ChangeStreamPreImageId(
+ *collectionUUID, Timestamp::max(), std::numeric_limits<int64_t>::max()))))
+ : boost::none;
+ auto planExecutor =
+ InternalPlanner::collectionScan(opCtx,
+ preImagesCollPtr,
+ PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY,
+ InternalPlanner::Direction::FORWARD,
+ boost::none /* resumeAfterRecordId */,
+ std::move(minRecordId));
+ if (planExecutor->getNext(&preImageObj, nullptr) == PlanExecutor::IS_EOF) {
+ return boost::none;
+ }
+ auto parsedUUID = UUID::parse(preImageObj["_id"].Obj()["nsUUID"]);
+ tassert(7027400, "Pre-image collection UUID must be of UUID type", parsedUUID.isOK());
+ return {std::move(parsedUUID.getValue())};
+}
+
+/**
* Scans the 'config.system.preimages' collection and deletes the expired pre-images from it.
*
* Pre-images are ordered by collection UUID, ie. if UUID of collection A is ordered before UUID of
* collection B, then pre-images of collection A will be stored before pre-images of collection B.
*
- * While scanning the collection for expired pre-images, each pre-image timestamp is compared
- * against the 'earliestOplogEntryTimestamp' value. Any pre-image that has a timestamp greater than
- * the 'earliestOplogEntryTimestamp' value is not considered for deletion and the cursor seeks to
- * the next UUID in the collection.
- *
- * Seek to the next UUID is done by setting the values of 'Timestamp' and 'ApplyOpsIndex' fields to
- * max, ie. (currentPreImage.nsUUID, Timestamp::max(), ApplyOpsIndex::max()).
+ * Whether a pre-image is expired depends on the expiration parameter. When the expiration
+ * parameter is not set, a pre-image is considered expired if its timestamp is smaller than the
+ * timestamp of the earliest oplog entry. When the expiration parameter is set, the wall clock
+ * time of the pre-image recording ('operationTime') is checked in addition to the timestamp: the
+ * pre-image is also considered expired if its 'operationTime' is less than or equal to the
+ * expiration time computed from 'currentTimeForTimeBasedExpiration'. In that case, either of the
+ * two conditions being true makes the pre-image eligible for deletion.
*
* +-------------------------+
* | config.system.preimages |
@@ -213,257 +231,116 @@ RecordId toRecordId(ChangeStreamPreImageId id) {
* | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 1 |
* +-------------------+ +-------------------+ +-------------------+ +-------------------+
*/
-class ChangeStreamExpiredPreImageIterator {
-public:
- // Iterator over the range of pre-image documents, where each range defines a set of expired
- // pre-image documents of one collection eligible for deletion due to expiration. Lower and
- // upper bounds of a range are inclusive.
- class Iterator {
- public:
- using RecordIdRange = std::pair<RecordId, RecordId>;
-
- Iterator(OperationContext* opCtx,
- const CollectionPtr* preImagesCollPtr,
- Timestamp earliestOplogEntryTimestamp,
- boost::optional<Date_t> preImageExpirationTime,
- bool isEndIterator = false)
- : _opCtx(opCtx),
- _preImagesCollPtr(preImagesCollPtr),
- _earliestOplogEntryTimestamp(earliestOplogEntryTimestamp),
- _preImageExpirationTime(preImageExpirationTime) {
- if (!isEndIterator) {
- advance();
- }
- }
-
- const RecordIdRange& operator*() const {
- return _currentExpiredPreImageRange;
- }
-
- const RecordIdRange* operator->() const {
- return &_currentExpiredPreImageRange;
- }
-
- Iterator& operator++() {
- advance();
- return *this;
- }
-
- // Both iterators are equal if they are both pointing to the same expired pre-image range.
- friend bool operator==(const Iterator& a, const Iterator& b) {
- return a._currentExpiredPreImageRange == b._currentExpiredPreImageRange;
- };
-
- friend bool operator!=(const Iterator& a, const Iterator& b) {
- return !(a == b);
- };
-
- private:
- // Scans the pre-images collection and gets the next expired pre-image range or sets
- // '_currentExpiredPreImageRange' to the range with empty record ids in case there are no
- // more expired pre-images left.
- void advance() {
- const auto getNextPreImageAttributes =
- [&](std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>& planExecutor)
- -> boost::optional<change_stream_pre_image_helpers::PreImageAttributes> {
- BSONObj preImageObj;
- if (planExecutor->getNext(&preImageObj, nullptr) == PlanExecutor::IS_EOF) {
- return boost::none;
- }
-
- auto preImage =
- ChangeStreamPreImage::parse(IDLParserContext("pre-image"), preImageObj);
- return {{std::move(preImage.getId().getNsUUID()),
- std::move(preImage.getId().getTs()),
- std::move(preImage.getOperationTime())}};
- };
-
- while (true) {
- // Fetch the first pre-image from the next collection, that has pre-images enabled.
- auto planExecutor = _previousCollectionUUID
- ? createCollectionScan(RecordIdBound(
- toRecordId(ChangeStreamPreImageId(*_previousCollectionUUID,
- Timestamp::max(),
- std::numeric_limits<int64_t>::max()))))
- : createCollectionScan(boost::none);
- auto preImageAttributes = getNextPreImageAttributes(planExecutor);
-
- // If there aren't any pre-images left, set the range to the empty record ids and
- // return.
- if (!preImageAttributes) {
- _currentExpiredPreImageRange = std::pair(RecordId(), RecordId());
- return;
- }
- const auto currentCollectionUUID = preImageAttributes->collectionUUID;
- _previousCollectionUUID = currentCollectionUUID;
-
- // If the first pre-image in the current collection is not expired, fetch the first
- // pre-image from the next collection.
- if (!preImageAttributes->isExpiredPreImage(_preImageExpirationTime,
- _earliestOplogEntryTimestamp)) {
- continue;
- }
-
- // If an expired pre-image is found, compute the max expired pre-image RecordId for
- // this collection depending on the expiration parameter being set.
- const auto minKey =
- toRecordId(ChangeStreamPreImageId(currentCollectionUUID, Timestamp(), 0));
- RecordId maxKey;
- if (_preImageExpirationTime) {
- // Reset the collection scan to start one increment before the
- // '_earliestOplogEntryTimestamp', as the pre-images with smaller or equal
- // timestamp are guaranteed to be expired.
- Timestamp lastExpiredPreimageTs(_earliestOplogEntryTimestamp.asULL() - 1);
- auto planExecutor = createCollectionScan(RecordIdBound(
- toRecordId(ChangeStreamPreImageId(currentCollectionUUID,
- lastExpiredPreimageTs,
- std::numeric_limits<int64_t>::max()))));
-
- // Iterate over all the expired pre-images in the collection in order to find
- // the max RecordId.
- while ((preImageAttributes = getNextPreImageAttributes(planExecutor)) &&
- preImageAttributes->isExpiredPreImage(_preImageExpirationTime,
- _earliestOplogEntryTimestamp) &&
- preImageAttributes->collectionUUID == currentCollectionUUID) {
- lastExpiredPreimageTs = preImageAttributes->ts;
- }
-
- maxKey =
- toRecordId(ChangeStreamPreImageId(currentCollectionUUID,
- lastExpiredPreimageTs,
- std::numeric_limits<int64_t>::max()));
- } else {
- // If the expiration parameter is not set, then the last expired pre-image
- // timestamp equals to one increment before the '_earliestOplogEntryTimestamp'.
- maxKey = toRecordId(
- ChangeStreamPreImageId(currentCollectionUUID,
- Timestamp(_earliestOplogEntryTimestamp.asULL() - 1),
- std::numeric_limits<int64_t>::max()));
- }
- tassert(6138300,
- "Max key of the expired pre-image range has to be valid",
- maxKey.isValid());
- _currentExpiredPreImageRange = std::pair(minKey, maxKey);
- return;
- }
- }
-
- // Set up the new collection scan to start from the 'minKey'.
- std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> createCollectionScan(
- boost::optional<RecordIdBound> minKey) const {
- return InternalPlanner::collectionScan(_opCtx,
- _preImagesCollPtr,
- PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY,
- InternalPlanner::Direction::FORWARD,
- boost::none,
- minKey);
- }
-
- OperationContext* _opCtx;
- const CollectionPtr* _preImagesCollPtr;
- RecordIdRange _currentExpiredPreImageRange;
- boost::optional<UUID> _previousCollectionUUID;
- const Timestamp _earliestOplogEntryTimestamp;
-
- // The pre-images with operation time less than or equal to the '_preImageExpirationTime'
- // are considered expired.
- const boost::optional<Date_t> _preImageExpirationTime;
- };
-
- ChangeStreamExpiredPreImageIterator(
- OperationContext* opCtx,
- const CollectionPtr* preImagesCollPtr,
- const Timestamp earliestOplogEntryTimestamp,
- const boost::optional<Date_t> preImageExpirationTime = boost::none)
- : _opCtx(opCtx),
- _preImagesCollPtr(preImagesCollPtr),
- _earliestOplogEntryTimestamp(earliestOplogEntryTimestamp),
- _preImageExpirationTime(preImageExpirationTime) {}
-
- Iterator begin() const {
- return Iterator(
- _opCtx, _preImagesCollPtr, _earliestOplogEntryTimestamp, _preImageExpirationTime);
+size_t deleteExpiredChangeStreamPreImages(OperationContext* opCtx,
+ Date_t currentTimeForTimeBasedExpiration) {
+ // Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection
+ // doesn't exist.
+ // TODO SERVER-66642 Account for multitenancy.
+ AutoGetCollection autoColl(
+ opCtx, NamespaceString::makePreImageCollectionNSS(boost::none), MODE_IX);
+ const auto& preImagesColl = autoColl.getCollection();
+ if (!preImagesColl) {
+ return 0;
}
- Iterator end() const {
- return Iterator(_opCtx,
- _preImagesCollPtr,
- _earliestOplogEntryTimestamp,
- _preImageExpirationTime,
- true /*isEndIterator*/);
+ // Do not run the job on secondaries.
+ if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(
+ opCtx, NamespaceString::kConfigDb)) {
+ return 0;
}
-private:
- OperationContext* _opCtx;
- const CollectionPtr* _preImagesCollPtr;
- const Timestamp _earliestOplogEntryTimestamp;
- const boost::optional<Date_t> _preImageExpirationTime;
-};
+ // Get the timestamp of the earliest oplog entry.
+ const auto currentEarliestOplogEntryTs =
+ repl::StorageInterface::get(opCtx->getServiceContext())->getEarliestOplogTimestamp(opCtx);
+
+ const bool isBatchedRemoval = gBatchedExpiredChangeStreamPreImageRemoval.load();
+ size_t numberOfRemovals = 0;
+ const auto preImageExpirationTime = change_stream_pre_image_helpers::getPreImageExpirationTime(
+ opCtx, currentTimeForTimeBasedExpiration);
+
+ // Configure the filter for the case when expiration parameter is set.
+ OrMatchExpression filter;
+ const MatchExpression* filterPtr = nullptr;
+ if (preImageExpirationTime) {
+ filter.add(
+ std::make_unique<LTMatchExpression>("_id.ts"_sd, Value(currentEarliestOplogEntryTs)));
+ filter.add(std::make_unique<LTEMatchExpression>("operationTime"_sd,
+ Value(*preImageExpirationTime)));
+ filterPtr = &filter;
+ }
+ const bool shouldReturnEofOnFilterMismatch = preImageExpirationTime.has_value();
+
+ // TODO SERVER-66642 Account for multitenancy.
+ boost::optional<UUID> currentCollectionUUID = boost::none;
+ while ((currentCollectionUUID =
+ findNextCollectionUUID(opCtx, &preImagesColl, currentCollectionUUID))) {
+ writeConflictRetry(
+ opCtx,
+ "ChangeStreamExpiredPreImagesRemover",
+ NamespaceString::makePreImageCollectionNSS(boost::none).ns(),
+ [&] {
+ auto params = std::make_unique<DeleteStageParams>();
+ params->isMulti = true;
+
+ std::unique_ptr<BatchedDeleteStageParams> batchedDeleteParams;
+ if (isBatchedRemoval) {
+ batchedDeleteParams = std::make_unique<BatchedDeleteStageParams>();
+ }
+ RecordIdBound minRecordId(
+ toRecordId(ChangeStreamPreImageId(*currentCollectionUUID, Timestamp(), 0)));
+
+ // If the expiration parameter is set, 'maxRecordId' is set to the maximum
+ // RecordId for this collection. Whether a pre-image has to be deleted is then
+ // determined by the filtering MatchExpression.
+ //
+ // If the expiration parameter is not set, then the last expired pre-image timestamp
+ // is one increment before the 'currentEarliestOplogEntryTs'.
+ RecordIdBound maxRecordId = RecordIdBound(toRecordId(ChangeStreamPreImageId(
+ *currentCollectionUUID,
+ preImageExpirationTime ? Timestamp::max()
+ : Timestamp(currentEarliestOplogEntryTs.asULL() - 1),
+ std::numeric_limits<int64_t>::max())));
+
+ auto exec = InternalPlanner::deleteWithCollectionScan(
+ opCtx,
+ &preImagesColl,
+ std::move(params),
+ PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
+ InternalPlanner::Direction::FORWARD,
+ std::move(minRecordId),
+ std::move(maxRecordId),
+ CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords,
+ std::move(batchedDeleteParams),
+ filterPtr,
+ shouldReturnEofOnFilterMismatch);
+ numberOfRemovals += exec->executeDelete();
+ });
+ }
+ return numberOfRemovals;
+}
+} // namespace
-void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTimeBasedExpiration) {
+void ChangeStreamPreImagesCollectionManager::performExpiredChangeStreamPreImagesRemovalPass(
+ Client* client) {
+ Date_t currentTimeForTimeBasedExpiration = Date_t::now();
+ changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) {
+ // Populate the current time for time based expiration of pre-images.
+ if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) {
+ const BSONType bsonType = currentTimeElem.type();
+ tassert(5869300,
+ str::stream() << "Expected type for 'currentTimeForTimeBasedExpiration' is "
+ "'date', but found: "
+ << bsonType,
+ bsonType == BSONType::Date);
+ currentTimeForTimeBasedExpiration = currentTimeElem.Date();
+ }
+ });
const auto startTime = Date_t::now();
ServiceContext::UniqueOperationContext opCtx;
try {
opCtx = client->makeOperationContext();
-
- // Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection
- // doesn't exist.
- AutoGetCollection autoColl(
- opCtx.get(), NamespaceString::kChangeStreamPreImagesNamespace, MODE_IX);
- const auto& preImagesColl = autoColl.getCollection();
- if (!preImagesColl) {
- return;
- }
-
- // Do not run the job on secondaries.
- if (!repl::ReplicationCoordinator::get(opCtx.get())
- ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb)) {
- return;
- }
-
- // Get the timestamp of the earliest oplog entry.
- const auto currentEarliestOplogEntryTs =
- repl::StorageInterface::get(client->getServiceContext())
- ->getEarliestOplogTimestamp(opCtx.get());
-
- const bool isBatchedRemoval = gBatchedExpiredChangeStreamPreImageRemoval.load();
- size_t numberOfRemovals = 0;
-
- ChangeStreamExpiredPreImageIterator expiredPreImages(
- opCtx.get(),
- &preImagesColl,
- currentEarliestOplogEntryTs,
- change_stream_pre_image_helpers::getPreImageExpirationTime(
- opCtx.get(), currentTimeForTimeBasedExpiration));
-
- for (const auto& collectionRange : expiredPreImages) {
- writeConflictRetry(
- opCtx.get(),
- "ChangeStreamExpiredPreImagesRemover",
- NamespaceString::kChangeStreamPreImagesNamespace.ns(),
- [&] {
- auto params = std::make_unique<DeleteStageParams>();
- params->isMulti = true;
-
- std::unique_ptr<BatchedDeleteStageParams> batchedDeleteParams;
- if (isBatchedRemoval) {
- batchedDeleteParams = std::make_unique<BatchedDeleteStageParams>();
- }
-
- auto exec = InternalPlanner::deleteWithCollectionScan(
- opCtx.get(),
- &preImagesColl,
- std::move(params),
- PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
- InternalPlanner::Direction::FORWARD,
- RecordIdBound(collectionRange.first),
- RecordIdBound(collectionRange.second),
- CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords,
- std::move(batchedDeleteParams));
- numberOfRemovals += exec->executeDelete();
- });
- }
-
+ auto numberOfRemovals =
+ deleteExpiredChangeStreamPreImages(opCtx.get(), currentTimeForTimeBasedExpiration);
if (numberOfRemovals > 0) {
LOGV2_DEBUG(5869104,
3,
@@ -472,11 +349,12 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim
"jobDuration"_attr = (Date_t::now() - startTime).toString());
}
} catch (const DBException& exception) {
- if (opCtx && opCtx.get()->getKillStatus() != ErrorCodes::OK) {
+ Status interruptStatus = opCtx ? opCtx.get()->checkForInterruptNoAssert() : Status::OK();
+ if (!interruptStatus.isOK()) {
LOGV2_DEBUG(5869105,
3,
- "Periodic expired pre-images removal job operation was killed",
- "errorCode"_attr = opCtx.get()->getKillStatus());
+ "Periodic expired pre-images removal job operation was interrupted",
+ "errorCode"_attr = interruptStatus);
} else {
LOGV2_ERROR(5869106,
"Periodic expired pre-images removal job failed",
@@ -484,26 +362,4 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim
}
}
}
-} // namespace
-
-void ChangeStreamPreImagesCollectionManager::performExpiredChangeStreamPreImagesRemovalPass(
- Client* client) {
- Date_t currentTimeForTimeBasedExpiration = Date_t::now();
-
- changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) {
- // Populate the current time for time based expiration of pre-images.
- if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) {
- const BSONType bsonType = currentTimeElem.type();
- tassert(5869300,
- str::stream() << "Expected type for 'currentTimeForTimeBasedExpiration' is "
- "'date', but found: "
- << bsonType,
- bsonType == BSONType::Date);
-
- currentTimeForTimeBasedExpiration = currentTimeElem.Date();
- }
- });
- deleteExpiredChangeStreamPreImages(client, currentTimeForTimeBasedExpiration);
-}
-
} // namespace mongo
diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.h b/src/mongo/db/change_stream_pre_images_collection_manager.h
index dede0e38c96..4af7ef6a8e9 100644
--- a/src/mongo/db/change_stream_pre_images_collection_manager.h
+++ b/src/mongo/db/change_stream_pre_images_collection_manager.h
@@ -39,25 +39,7 @@
namespace mongo {
namespace change_stream_pre_image_helpers {
-
-/**
- * Specifies attributes that determines if the pre-image has been expired or not.
- */
-struct PreImageAttributes {
- mongo::UUID collectionUUID;
- Timestamp ts;
- Date_t operationTime;
-
- /**
- * Determines if the pre-image is considered expired based on the expiration parameter being
- * set.
- */
- bool isExpiredPreImage(const boost::optional<Date_t>& preImageExpirationTime,
- const Timestamp& earliestOplogEntryTimestamp);
-};
-
boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime);
-
} // namespace change_stream_pre_image_helpers
/**
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index ce38acde2ee..1bee034cd0c 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -413,16 +413,24 @@ PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member,
// In the future, we could change seekNear() to always return a record after minRecord in the
// direction of the scan. However, tailable scans depend on the current behavior in order to
// mark their position for resuming the tailable scan later on.
- if (!beforeStartOfRange(_params, *member) && Filter::passes(member, _filter)) {
- if (_params.stopApplyingFilterAfterFirstMatch) {
- _filter = nullptr;
- }
- *out = memberID;
- return PlanStage::ADVANCED;
- } else {
+ if (beforeStartOfRange(_params, *member)) {
_workingSet->free(memberID);
return PlanStage::NEED_TIME;
}
+
+ if (!Filter::passes(member, _filter)) {
+ _workingSet->free(memberID);
+ if (_params.shouldReturnEofOnFilterMismatch) {
+ _commonStats.isEOF = true;
+ return PlanStage::IS_EOF;
+ }
+ return PlanStage::NEED_TIME;
+ }
+ if (_params.stopApplyingFilterAfterFirstMatch) {
+ _filter = nullptr;
+ }
+ *out = memberID;
+ return PlanStage::ADVANCED;
}
bool CollectionScan::isEOF() {
diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h
index dd701264941..2031fafe8b4 100644
--- a/src/mongo/db/exec/collection_scan_common.h
+++ b/src/mongo/db/exec/collection_scan_common.h
@@ -111,6 +111,10 @@ struct CollectionScanParams {
// Whether or not to wait for oplog visibility on oplog collection scans.
bool shouldWaitForOplogVisibility = false;
+
+ // Whether or not to return EOF and stop further scanning once MatchExpression evaluates to
+ // false. Can only be set to true if the MatchExpression is present.
+ bool shouldReturnEofOnFilterMismatch = false;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp
index b1bcb82b454..b759c9f68db 100644
--- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp
+++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp
@@ -62,16 +62,6 @@ public:
ASSERT_EQ(changeStreamOptionsManager.setOptions(opCtx, changeStreamOptions).getStatus(),
ErrorCodes::OK);
}
-
- bool isExpiredPreImage(const Timestamp& preImageTs,
- const Date_t& preImageOperationTime,
- const boost::optional<Date_t>& preImageExpirationTime,
- const Timestamp& earliestOplogEntryTimestamp) {
- change_stream_pre_image_helpers::PreImageAttributes preImageAttributes{
- UUID::gen(), preImageTs, preImageOperationTime};
- return preImageAttributes.isExpiredPreImage(preImageExpirationTime,
- earliestOplogEntryTimestamp);
- }
};
TEST_F(ChangeStreamPreImageExpirationPolicyTest, getPreImageExpirationTimeWithValidIntegralValue) {
@@ -108,39 +98,5 @@ TEST_F(ChangeStreamPreImageExpirationPolicyTest, getPreImageExpirationTimeWithOf
change_stream_pre_image_helpers::getPreImageExpirationTime(opCtx.get(), currentTime);
ASSERT_FALSE(receivedExpireAfterSeconds);
}
-
-TEST_F(ChangeStreamPreImageExpirationPolicyTest, preImageShouldHaveExpiredWithOlderTimestamp) {
- ASSERT_TRUE(
- isExpiredPreImage(Timestamp(Seconds(100000), 0U) /* preImageTs */,
- Date_t::now() /* preImageOperationTime */,
- Date_t::now() /* preImageExpirationTime */,
- Timestamp(Seconds(100000), 1U)) /* earliestOplogEntryTimestamp */);
-}
-
-TEST_F(ChangeStreamPreImageExpirationPolicyTest, preImageShouldNotHaveExpired) {
- ASSERT_FALSE(
- isExpiredPreImage(Timestamp(Seconds(100000), 1U) /* preImageTs */,
- Date_t::now() + Seconds(1) /* preImageOperationTime */,
- Date_t::now() /* preImageExpirationTime */,
- Timestamp(Seconds(100000), 0U)) /* earliestOplogEntryTimestamp */);
-}
-
-TEST_F(ChangeStreamPreImageExpirationPolicyTest, preImageShouldHaveExpiredWithOlderOperationTime) {
- ASSERT_TRUE(
- isExpiredPreImage(Timestamp(Seconds(100000), 1U) /* preImageTs */,
- Date_t::now() /* preImageOperationTime */,
- Date_t::now() + Seconds(1) /* preImageExpirationTime */,
- Timestamp(Seconds(100000), 0U)) /* earliestOplogEntryTimestamp */);
-}
-
-TEST_F(ChangeStreamPreImageExpirationPolicyTest,
- preImageShouldNotHaveExpiredWithNullExpirationTime) {
- ASSERT_TRUE(
- isExpiredPreImage(Timestamp(Seconds(100000), 0U) /* preImageTs */,
- Date_t::now() /* preImageOperationTime */,
- boost::none /* preImageExpirationTime */,
- Timestamp(Seconds(100000), 1U)) /* earliestOplogEntryTimestamp */);
-}
-
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp
index 110c76addfe..de914d66f27 100644
--- a/src/mongo/db/query/internal_plans.cpp
+++ b/src/mongo/db/query/internal_plans.cpp
@@ -123,7 +123,8 @@ CollectionScanParams createCollectionScanParams(
const boost::optional<RecordId>& resumeAfterRecordId,
boost::optional<RecordIdBound> minRecord,
boost::optional<RecordIdBound> maxRecord,
- CollectionScanParams::ScanBoundInclusion boundInclusion) {
+ CollectionScanParams::ScanBoundInclusion boundInclusion,
+ bool shouldReturnEofOnFilterMismatch) {
const auto& collection = *coll;
invariant(collection);
@@ -139,6 +140,7 @@ CollectionScanParams createCollectionScanParams(
params.direction = CollectionScanParams::BACKWARD;
}
params.boundInclusion = boundInclusion;
+ params.shouldReturnEofOnFilterMismatch = shouldReturnEofOnFilterMismatch;
return params;
}
} // namespace
@@ -151,7 +153,8 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::collection
const boost::optional<RecordId>& resumeAfterRecordId,
boost::optional<RecordIdBound> minRecord,
boost::optional<RecordIdBound> maxRecord,
- CollectionScanParams::ScanBoundInclusion boundInclusion) {
+ CollectionScanParams::ScanBoundInclusion boundInclusion,
+ bool shouldReturnEofOnFilterMismatch) {
const auto& collection = *coll;
invariant(collection);
@@ -167,7 +170,8 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::collection
resumeAfterRecordId,
minRecord,
maxRecord,
- boundInclusion);
+ boundInclusion,
+ shouldReturnEofOnFilterMismatch);
auto cs = _collectionScan(expCtx, ws.get(), &collection, collScanParams);
@@ -218,11 +222,19 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::deleteWith
boost::optional<RecordIdBound> minRecord,
boost::optional<RecordIdBound> maxRecord,
CollectionScanParams::ScanBoundInclusion boundInclusion,
- std::unique_ptr<BatchedDeleteStageParams> batchedDeleteParams) {
+ std::unique_ptr<BatchedDeleteStageParams> batchedDeleteParams,
+ const MatchExpression* filter,
+ bool shouldReturnEofOnFilterMismatch) {
const auto& collection = *coll;
invariant(collection);
- auto ws = std::make_unique<WorkingSet>();
+ if (shouldReturnEofOnFilterMismatch) {
+ tassert(7010801,
+ "MatchExpression filter must be provided when 'shouldReturnEofOnFilterMismatch' is "
+ "set to true ",
+ filter);
+ }
+ auto ws = std::make_unique<WorkingSet>();
auto expCtx = make_intrusive<ExpressionContext>(
opCtx, std::unique_ptr<CollatorInterface>(nullptr), collection->ns());
@@ -237,9 +249,10 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::deleteWith
boost::none /* resumeAfterId */,
minRecord,
maxRecord,
- boundInclusion);
+ boundInclusion,
+ shouldReturnEofOnFilterMismatch);
- auto root = _collectionScan(expCtx, ws.get(), &collection, collScanParams);
+ auto root = _collectionScan(expCtx, ws.get(), &collection, collScanParams, filter);
if (batchedDeleteParams) {
root = std::make_unique<BatchedDeleteStage>(expCtx.get(),
@@ -464,12 +477,13 @@ std::unique_ptr<PlanStage> InternalPlanner::_collectionScan(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WorkingSet* ws,
const CollectionPtr* coll,
- const CollectionScanParams& params) {
+ const CollectionScanParams& params,
+ const MatchExpression* filter) {
const auto& collection = *coll;
invariant(collection);
- return std::make_unique<CollectionScan>(expCtx.get(), collection, params, ws, nullptr);
+ return std::make_unique<CollectionScan>(expCtx.get(), collection, params, ws, filter);
}
std::unique_ptr<PlanStage> InternalPlanner::_indexScan(
diff --git a/src/mongo/db/query/internal_plans.h b/src/mongo/db/query/internal_plans.h
index b0d13bb6d5e..aa1beb31e21 100644
--- a/src/mongo/db/query/internal_plans.h
+++ b/src/mongo/db/query/internal_plans.h
@@ -83,7 +83,8 @@ public:
boost::optional<RecordIdBound> minRecord = boost::none,
boost::optional<RecordIdBound> maxRecord = boost::none,
CollectionScanParams::ScanBoundInclusion boundInclusion =
- CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords);
+ CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords,
+ bool shouldReturnEofOnFilterMismatch = false);
static std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> collectionScan(
OperationContext* opCtx,
@@ -105,7 +106,9 @@ public:
boost::optional<RecordIdBound> maxRecord = boost::none,
CollectionScanParams::ScanBoundInclusion boundInclusion =
CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords,
- std::unique_ptr<BatchedDeleteStageParams> batchedDeleteParams = nullptr);
+ std::unique_ptr<BatchedDeleteStageParams> batchedDeleteParams = nullptr,
+ const MatchExpression* filter = nullptr,
+ bool shouldReturnEofOnFilterMismatch = false);
/**
* Returns an index scan. Caller owns returned pointer.
@@ -200,7 +203,8 @@ private:
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WorkingSet* ws,
const CollectionPtr* collection,
- const CollectionScanParams& params);
+ const CollectionScanParams& params,
+ const MatchExpression* filter = nullptr);
/**
* Returns a plan stage that is either an index scan or an index scan with a fetch stage.
diff --git a/src/mongo/dbtests/query_stage_collscan.cpp b/src/mongo/dbtests/query_stage_collscan.cpp
index fa180a525b6..aec34cd36c6 100644
--- a/src/mongo/dbtests/query_stage_collscan.cpp
+++ b/src/mongo/dbtests/query_stage_collscan.cpp
@@ -335,6 +335,38 @@ TEST_F(QueryStageCollectionScanTest, QueryStageCollscanBasicBackwardWithMatch) {
ASSERT_EQUALS(25, countResults(CollectionScanParams::BACKWARD, obj));
}
+TEST_F(QueryStageCollectionScanTest,
+ QueryTestCollscanStopsScanningOnFilterFailureInClusteredCollectionIfSpecified) {
+ auto ns = NamespaceString("a.b");
+ auto collDeleter = createClusteredCollection(ns, false /* prePopulate */);
+ for (int i = 1; i <= numObj(); ++i) {
+ insertDocument(ns, BSON("_id" << i << "foo" << i));
+ }
+
+ AutoGetCollectionForRead autoColl(&_opCtx, ns);
+ const CollectionPtr& coll = autoColl.getCollection();
+ ASSERT(coll->isClustered());
+
+ // Configure the threshold and the expected number of scanned documents.
+ const int threshold = numObj() / 2;
+ const int expectedNumberOfScannedDocuments = threshold + 1;
+
+ // Configure the scan.
+ CollectionScanParams params;
+ params.shouldReturnEofOnFilterMismatch = true;
+ WorkingSet ws;
+ LTEMatchExpression filter{"foo"_sd, Value(threshold)};
+ auto scan = std::make_unique<CollectionScan>(_expCtx.get(), coll, params, &ws, &filter);
+
+ // Scan all matching documents.
+ WorkingSetID id = WorkingSet::INVALID_ID;
+ while (!scan->isEOF()) {
+ scan->work(&id);
+ }
+ auto collScanStats = static_cast<const CollectionScanStats*>(scan->getSpecificStats());
+ ASSERT_EQUALS(expectedNumberOfScannedDocuments, collScanStats->docsTested);
+}
+
// Get objects in the order we inserted them.
TEST_F(QueryStageCollectionScanTest, QueryStageCollscanObjectsInOrderForward) {
AutoGetCollectionForReadCommand collection(&_opCtx, nss);