summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorcsum112 <catalin.sumanaru@mongodb.com>2022-07-26 13:30:57 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-07-26 16:32:49 +0000
commit1dbab74005fb10825d3cd11b796257f0daae1ba7 (patch)
treed6818e95f38cbcb4d2372e55e2d2d8f27871271e /src/mongo
parent7f487523d90cc37f4cfdcd8ba4be03b6f69c2d26 (diff)
downloadmongo-1dbab74005fb10825d3cd11b796257f0daae1ba7.tar.gz
SERVER-66639 Manager class for pre-image collection
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/SConscript24
-rw-r--r--src/mongo/db/catalog/create_collection.cpp19
-rw-r--r--src/mongo/db/catalog/create_collection.h6
-rw-r--r--src/mongo/db/change_stream_pre_images_collection_manager.cpp501
-rw-r--r--src/mongo/db/change_stream_pre_images_collection_manager.h96
-rw-r--r--src/mongo/db/commands/SConscript1
-rw-r--r--src/mongo/db/ftdc/SConscript1
-rw-r--r--src/mongo/db/namespace_string.cpp6
-rw-r--r--src/mongo/db/namespace_string.h6
-rw-r--r--src/mongo/db/op_observer/SConscript4
-rw-r--r--src/mongo/db/op_observer/op_observer_impl.cpp20
-rw-r--r--src/mongo/db/pipeline/SConscript28
-rw-r--r--src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp395
-rw-r--r--src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h23
-rw-r--r--src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp9
-rw-r--r--src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp78
-rw-r--r--src/mongo/db/pipeline/change_stream_pre_image_helpers.h42
-rw-r--r--src/mongo/db/repl/SConscript4
-rw-r--r--src/mongo/db/repl/oplog.cpp7
-rw-r--r--src/mongo/db/repl/oplog_applier_impl_test.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp4
-rw-r--r--src/mongo/db/update/SConscript1
-rw-r--r--src/mongo/transport/SConscript1
-rw-r--r--src/mongo/util/SConscript1
-rw-r--r--src/mongo/util/concurrency/SConscript1
-rw-r--r--src/mongo/watchdog/SConscript1
26 files changed, 685 insertions, 598 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 0d434266dbb..f930ce201c0 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -532,6 +532,30 @@ env.Library(
)
env.Library(
+ target='change_stream_pre_images_collection_manager',
+ source=[
+ 'change_stream_pre_images_collection_manager.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/pipeline/change_stream_preimage',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/catalog/catalog_helpers',
+ '$BUILD_DIR/mongo/db/catalog/clustered_collection_options',
+ '$BUILD_DIR/mongo/db/catalog/collection',
+ '$BUILD_DIR/mongo/db/catalog_raii',
+ '$BUILD_DIR/mongo/db/change_stream_options_manager',
+ '$BUILD_DIR/mongo/db/concurrency/exception_util',
+ '$BUILD_DIR/mongo/db/curop',
+ '$BUILD_DIR/mongo/db/db_raii',
+ '$BUILD_DIR/mongo/db/namespace_string',
+ '$BUILD_DIR/mongo/db/query_exec',
+ '$BUILD_DIR/mongo/db/record_id_helpers',
+ '$BUILD_DIR/mongo/db/repl/storage_interface',
+ ],
+)
+
+env.Library(
target='write_block_bypass',
source=[
'write_block_bypass.cpp',
diff --git a/src/mongo/db/catalog/create_collection.cpp b/src/mongo/db/catalog/create_collection.cpp
index b48c504401e..e9f07b88523 100644
--- a/src/mongo/db/catalog/create_collection.cpp
+++ b/src/mongo/db/catalog/create_collection.cpp
@@ -68,7 +68,6 @@ namespace mongo {
namespace {
MONGO_FAIL_POINT_DEFINE(failTimeseriesViewCreation);
-MONGO_FAIL_POINT_DEFINE(failPreimagesCollectionCreation);
MONGO_FAIL_POINT_DEFINE(clusterAllCollectionsByDefault);
using IndexVersion = IndexDescriptor::IndexVersion;
@@ -676,24 +675,6 @@ Status createCollection(OperationContext* opCtx,
return createCollection(opCtx, ns, options, idIndex);
}
-void createChangeStreamPreImagesCollection(OperationContext* opCtx) {
- uassert(5868501,
- "Failpoint failPreimagesCollectionCreation enabled. Throwing exception",
- !MONGO_unlikely(failPreimagesCollectionCreation.shouldFail()));
-
- const auto nss = NamespaceString::kChangeStreamPreImagesNamespace;
- CollectionOptions preImagesCollectionOptions;
-
- // Make the collection clustered by _id.
- preImagesCollectionOptions.clusteredIndex.emplace(
- clustered_util::makeCanonicalClusteredInfoForLegacyFormat());
- const auto status = _createCollection(opCtx, nss, preImagesCollectionOptions, BSONObj());
- uassert(status.code(),
- str::stream() << "Failed to create the pre-images collection: " << nss.coll()
- << causedBy(status.reason()),
- status.isOK() || status.code() == ErrorCodes::NamespaceExists);
-}
-
// TODO SERVER-62395 Pass DatabaseName instead of dbName, and pass to isDbLockedForMode.
Status createCollectionForApplyOps(OperationContext* opCtx,
const std::string& dbName,
diff --git a/src/mongo/db/catalog/create_collection.h b/src/mongo/db/catalog/create_collection.h
index 694f4c6ed36..babef43e890 100644
--- a/src/mongo/db/catalog/create_collection.h
+++ b/src/mongo/db/catalog/create_collection.h
@@ -65,12 +65,6 @@ Status createCollection(OperationContext* opCtx,
const boost::optional<BSONObj>& idIndex);
/**
- * Creates the change stream pre-images collection. The collection is clustered by the primary key,
- * _id.
- */
-void createChangeStreamPreImagesCollection(OperationContext* opCtx);
-
-/**
* As above, but only used by replication to apply operations. This allows recreating collections
* with specific UUIDs (if ui is given). If ui is given and and a collection exists with the same
* name, the existing collection will be renamed to a temporary name if allowRenameOutOfTheWay is
diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.cpp b/src/mongo/db/change_stream_pre_images_collection_manager.cpp
new file mode 100644
index 00000000000..d7d8bbfe955
--- /dev/null
+++ b/src/mongo/db/change_stream_pre_images_collection_manager.cpp
@@ -0,0 +1,501 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/change_stream_pre_images_collection_manager.h"
+
+#include "mongo/base/error_codes.h"
+#include "mongo/db/catalog/clustered_collection_util.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/create_collection.h"
+#include "mongo/db/catalog/drop_collection.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/change_stream_options_manager.h"
+#include "mongo/db/concurrency/exception_util.h"
+#include "mongo/db/concurrency/lock_manager_defs.h"
+#include "mongo/db/concurrency/locker.h"
+#include "mongo/db/curop.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/query/internal_plans.h"
+#include "mongo/db/repl/storage_interface.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/concurrency/idle_thread_block.h"
+#include "mongo/util/fail_point.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
+namespace mongo {
+namespace {
+
+// Fail point to set current time for time-based expiration of pre-images.
+MONGO_FAIL_POINT_DEFINE(changeStreamPreImageRemoverCurrentTime);
+MONGO_FAIL_POINT_DEFINE(failPreimagesCollectionCreation);
+} // namespace
+
+namespace change_stream_pre_image_helpers {
+
+bool PreImageAttributes::isExpiredPreImage(const boost::optional<Date_t>& preImageExpirationTime,
+ const Timestamp& earliestOplogEntryTimestamp) {
+ // Pre-image oplog entry is no longer present in the oplog if its timestamp is smaller
+ // than the 'earliestOplogEntryTimestamp'.
+ const bool preImageOplogEntryIsDeleted = ts < earliestOplogEntryTimestamp;
+ const auto expirationTime = preImageExpirationTime.get_value_or(Date_t::min());
+
+ // Pre-image is expired if its corresponding oplog entry is deleted or its operation
+ // time is less than or equal to the expiration time.
+ return preImageOplogEntryIsDeleted || operationTime <= expirationTime;
+}
+
+// Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if not 'off', boost::none otherwise.
+boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions(
+ ChangeStreamOptions& changeStreamOptions) {
+ const stdx::variant<std::string, std::int64_t>& expireAfterSeconds =
+ changeStreamOptions.getPreAndPostImages().getExpireAfterSeconds();
+
+ if (!stdx::holds_alternative<std::string>(expireAfterSeconds)) {
+ return stdx::get<std::int64_t>(expireAfterSeconds);
+ }
+
+ return boost::none;
+}
+
+// Returns pre-images expiry time in milliseconds since the epoch time if configured, boost::none
+// otherwise.
+boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime) {
+ boost::optional<std::int64_t> expireAfterSeconds = boost::none;
+
+ // Get the expiration time directly from the change stream manager.
+ auto changeStreamOptions = ChangeStreamOptionsManager::get(opCtx).getOptions(opCtx);
+ expireAfterSeconds = getExpireAfterSecondsFromChangeStreamOptions(changeStreamOptions);
+
+ // A pre-image is eligible for deletion if:
+ // pre-image's op-time + expireAfterSeconds < currentTime.
+ return expireAfterSeconds ? boost::optional<Date_t>(currentTime - Seconds(*expireAfterSeconds))
+ : boost::none;
+}
+} // namespace change_stream_pre_image_helpers
+
+void ChangeStreamPreImagesCollectionManager::createPreImagesCollection(
+ OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+ uassert(5868501,
+ "Failpoint failPreimagesCollectionCreation enabled. Throwing exception",
+ !MONGO_unlikely(failPreimagesCollectionCreation.shouldFail()));
+ const auto preImagesCollectionNamespace = NamespaceString::makePreImageCollectionNSS(tenantId);
+
+ CollectionOptions preImagesCollectionOptions;
+
+ // Make the collection clustered by _id.
+ preImagesCollectionOptions.clusteredIndex.emplace(
+ clustered_util::makeCanonicalClusteredInfoForLegacyFormat());
+ const auto status = createCollection(
+ opCtx, preImagesCollectionNamespace, preImagesCollectionOptions, BSONObj());
+ uassert(status.code(),
+ str::stream() << "Failed to create the pre-images collection: "
+ << preImagesCollectionNamespace.coll() << causedBy(status.reason()),
+ status.isOK() || status.code() == ErrorCodes::NamespaceExists);
+}
+
+void ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(
+ OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+ const auto preImagesCollectionNamespace = NamespaceString::makePreImageCollectionNSS(tenantId);
+ DropReply dropReply;
+ const auto status =
+ dropCollection(opCtx,
+ preImagesCollectionNamespace,
+ &dropReply,
+ DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops);
+ uassert(status.code(),
+ str::stream() << "Failed to drop the pre-images collection: "
+ << preImagesCollectionNamespace.coll() << causedBy(status.reason()),
+ status.isOK() || status.code() == ErrorCodes::NamespaceNotFound);
+}
+
+void ChangeStreamPreImagesCollectionManager::insertPreImage(OperationContext* opCtx,
+ boost::optional<TenantId> tenantId,
+ const ChangeStreamPreImage& preImage) {
+ tassert(6646200,
+ "Expected to be executed in a write unit of work",
+ opCtx->lockState()->inAWriteUnitOfWork());
+ tassert(5869404,
+ str::stream() << "Invalid pre-images document applyOpsIndex: "
+ << preImage.getId().getApplyOpsIndex(),
+ preImage.getId().getApplyOpsIndex() >= 0);
+
+ // This lock acquisition can block on a stronger lock held by another operation modifying
+ // the pre-images collection. There are no known cases where an operation holding an
+ // exclusive lock on the pre-images collection also waits for oplog visibility.
+ AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState());
+ const auto preImagesCollectionNamespace = NamespaceString::makePreImageCollectionNSS(tenantId);
+ AutoGetCollection preImagesCollectionRaii(
+ opCtx, preImagesCollectionNamespace, LockMode::MODE_IX);
+ auto& changeStreamPreImagesCollection = preImagesCollectionRaii.getCollection();
+ tassert(6646201,
+ "The change stream pre-images collection is not present",
+ changeStreamPreImagesCollection);
+
+ const auto insertionStatus = changeStreamPreImagesCollection->insertDocument(
+ opCtx, InsertStatement{preImage.toBSON()}, &CurOp::get(opCtx)->debug());
+ tassert(5868601,
+ str::stream() << "Attempted to insert a duplicate document into the pre-images "
+ "collection. Pre-image id: "
+ << preImage.getId().toBSON().toString(),
+ insertionStatus != ErrorCodes::DuplicateKey);
+ uassertStatusOK(insertionStatus);
+}
+
+namespace {
+RecordId toRecordId(ChangeStreamPreImageId id) {
+ return record_id_helpers::keyForElem(
+ BSON(ChangeStreamPreImage::kIdFieldName << id.toBSON()).firstElement());
+}
+
+/**
+ * Scans the 'config.system.preimages' collection and deletes the expired pre-images from it.
+ *
+ * Pre-images are ordered by collection UUID, ie. if UUID of collection A is ordered before UUID of
+ * collection B, then pre-images of collection A will be stored before pre-images of collection B.
+ *
+ * While scanning the collection for expired pre-images, each pre-image timestamp is compared
+ * against the 'earliestOplogEntryTimestamp' value. Any pre-image that has a timestamp greater than
+ * the 'earliestOplogEntryTimestamp' value is not considered for deletion and the cursor seeks to
+ * the next UUID in the collection.
+ *
+ * Seek to the next UUID is done by setting the values of 'Timestamp' and 'ApplyOpsIndex' fields to
+ * max, ie. (currentPreImage.nsUUID, Timestamp::max(), ApplyOpsIndex::max()).
+ *
+ * +-------------------------+
+ * | config.system.preimages |
+ * +------------+------------+
+ * |
+ * +--------------------+---------+---------+-----------------------+
+ * | | | |
+ * +-----------+-------+ +----------+--------+ +--------+----------+ +----------+--------+
+ * | collA.preImageA | | collA.preImageB | | collB.preImageC | | collB.preImageD |
+ * +-----------+-------+ +----------+--------+ +---------+---------+ +----------+--------+
+ * | timestamp: 1 | | timestamp: 10 | | timestamp: 5 | | timestamp: 9 |
+ * | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 1 |
+ * +-------------------+ +-------------------+ +-------------------+ +-------------------+
+ */
+class ChangeStreamExpiredPreImageIterator {
+public:
+ // Iterator over the range of pre-image documents, where each range defines a set of expired
+ // pre-image documents of one collection eligible for deletion due to expiration. Lower and
+ // upper bounds of a range are inclusive.
+ class Iterator {
+ public:
+ using RecordIdRange = std::pair<RecordId, RecordId>;
+
+ Iterator(OperationContext* opCtx,
+ const CollectionPtr* preImagesCollPtr,
+ Timestamp earliestOplogEntryTimestamp,
+ boost::optional<Date_t> preImageExpirationTime,
+ bool isEndIterator = false)
+ : _opCtx(opCtx),
+ _preImagesCollPtr(preImagesCollPtr),
+ _earliestOplogEntryTimestamp(earliestOplogEntryTimestamp),
+ _preImageExpirationTime(preImageExpirationTime) {
+ if (!isEndIterator) {
+ advance();
+ }
+ }
+
+ const RecordIdRange& operator*() const {
+ return _currentExpiredPreImageRange;
+ }
+
+ const RecordIdRange* operator->() const {
+ return &_currentExpiredPreImageRange;
+ }
+
+ Iterator& operator++() {
+ advance();
+ return *this;
+ }
+
+ // Both iterators are equal if they are both pointing to the same expired pre-image range.
+ friend bool operator==(const Iterator& a, const Iterator& b) {
+ return a._currentExpiredPreImageRange == b._currentExpiredPreImageRange;
+ };
+
+ friend bool operator!=(const Iterator& a, const Iterator& b) {
+ return !(a == b);
+ };
+
+ private:
+ // Scans the pre-images collection and gets the next expired pre-image range or sets
+ // '_currentExpiredPreImageRange' to the range with empty record ids in case there are no
+ // more expired pre-images left.
+ void advance() {
+ const auto getNextPreImageAttributes =
+ [&](std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>& planExecutor)
+ -> boost::optional<change_stream_pre_image_helpers::PreImageAttributes> {
+ BSONObj preImageObj;
+ if (planExecutor->getNext(&preImageObj, nullptr) == PlanExecutor::IS_EOF) {
+ return boost::none;
+ }
+
+ auto preImage =
+ ChangeStreamPreImage::parse(IDLParserErrorContext("pre-image"), preImageObj);
+ return {{std::move(preImage.getId().getNsUUID()),
+ std::move(preImage.getId().getTs()),
+ std::move(preImage.getOperationTime())}};
+ };
+
+ while (true) {
+ // Fetch the first pre-image from the next collection, that has pre-images enabled.
+ auto planExecutor = _previousCollectionUUID
+ ? createCollectionScan(RecordIdBound(
+ toRecordId(ChangeStreamPreImageId(*_previousCollectionUUID,
+ Timestamp::max(),
+ std::numeric_limits<int64_t>::max()))))
+ : createCollectionScan(boost::none);
+ auto preImageAttributes = getNextPreImageAttributes(planExecutor);
+
+ // If there aren't any pre-images left, set the range to the empty record ids and
+ // return.
+ if (!preImageAttributes) {
+ _currentExpiredPreImageRange = std::pair(RecordId(), RecordId());
+ return;
+ }
+ const auto currentCollectionUUID = preImageAttributes->collectionUUID;
+ _previousCollectionUUID = currentCollectionUUID;
+
+ // If the first pre-image in the current collection is not expired, fetch the first
+ // pre-image from the next collection.
+ if (!preImageAttributes->isExpiredPreImage(_preImageExpirationTime,
+ _earliestOplogEntryTimestamp)) {
+ continue;
+ }
+
+ // If an expired pre-image is found, compute the max expired pre-image RecordId for
+ // this collection depending on the expiration parameter being set.
+ const auto minKey =
+ toRecordId(ChangeStreamPreImageId(currentCollectionUUID, Timestamp(), 0));
+ RecordId maxKey;
+ if (_preImageExpirationTime) {
+ // Reset the collection scan to start one increment before the
+ // '_earliestOplogEntryTimestamp', as the pre-images with smaller or equal
+ // timestamp are guaranteed to be expired.
+ Timestamp lastExpiredPreimageTs(_earliestOplogEntryTimestamp.asULL() - 1);
+ auto planExecutor = createCollectionScan(RecordIdBound(
+ toRecordId(ChangeStreamPreImageId(currentCollectionUUID,
+ lastExpiredPreimageTs,
+ std::numeric_limits<int64_t>::max()))));
+
+ // Iterate over all the expired pre-images in the collection in order to find
+ // the max RecordId.
+ while ((preImageAttributes = getNextPreImageAttributes(planExecutor)) &&
+ preImageAttributes->isExpiredPreImage(_preImageExpirationTime,
+ _earliestOplogEntryTimestamp) &&
+ preImageAttributes->collectionUUID == currentCollectionUUID) {
+ lastExpiredPreimageTs = preImageAttributes->ts;
+ }
+
+ maxKey =
+ toRecordId(ChangeStreamPreImageId(currentCollectionUUID,
+ lastExpiredPreimageTs,
+ std::numeric_limits<int64_t>::max()));
+ } else {
+ // If the expiration parameter is not set, then the last expired pre-image
+ // timestamp equals to one increment before the '_earliestOplogEntryTimestamp'.
+ maxKey = toRecordId(
+ ChangeStreamPreImageId(currentCollectionUUID,
+ Timestamp(_earliestOplogEntryTimestamp.asULL() - 1),
+ std::numeric_limits<int64_t>::max()));
+ }
+ tassert(6138300,
+ "Max key of the expired pre-image range has to be valid",
+ maxKey.isValid());
+ _currentExpiredPreImageRange = std::pair(minKey, maxKey);
+ return;
+ }
+ }
+
+ // Set up the new collection scan to start from the 'minKey'.
+ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> createCollectionScan(
+ boost::optional<RecordIdBound> minKey) const {
+ return InternalPlanner::collectionScan(_opCtx,
+ _preImagesCollPtr,
+ PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY,
+ InternalPlanner::Direction::FORWARD,
+ boost::none,
+ minKey);
+ }
+
+ OperationContext* _opCtx;
+ const CollectionPtr* _preImagesCollPtr;
+ RecordIdRange _currentExpiredPreImageRange;
+ boost::optional<UUID> _previousCollectionUUID;
+ const Timestamp _earliestOplogEntryTimestamp;
+
+ // The pre-images with operation time less than or equal to the '_preImageExpirationTime'
+ // are considered expired.
+ const boost::optional<Date_t> _preImageExpirationTime;
+ };
+
+ ChangeStreamExpiredPreImageIterator(
+ OperationContext* opCtx,
+ const CollectionPtr* preImagesCollPtr,
+ const Timestamp earliestOplogEntryTimestamp,
+ const boost::optional<Date_t> preImageExpirationTime = boost::none)
+ : _opCtx(opCtx),
+ _preImagesCollPtr(preImagesCollPtr),
+ _earliestOplogEntryTimestamp(earliestOplogEntryTimestamp),
+ _preImageExpirationTime(preImageExpirationTime) {}
+
+ Iterator begin() const {
+ return Iterator(
+ _opCtx, _preImagesCollPtr, _earliestOplogEntryTimestamp, _preImageExpirationTime);
+ }
+
+ Iterator end() const {
+ return Iterator(_opCtx,
+ _preImagesCollPtr,
+ _earliestOplogEntryTimestamp,
+ _preImageExpirationTime,
+ true /*isEndIterator*/);
+ }
+
+private:
+ OperationContext* _opCtx;
+ const CollectionPtr* _preImagesCollPtr;
+ const Timestamp _earliestOplogEntryTimestamp;
+ const boost::optional<Date_t> _preImageExpirationTime;
+};
+
+void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTimeBasedExpiration) {
+ const auto startTime = Date_t::now();
+ ServiceContext::UniqueOperationContext opCtx;
+ try {
+ opCtx = client->makeOperationContext();
+
+ // Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection
+ // doesn't exist.
+ AutoGetCollection autoColl(
+ opCtx.get(), NamespaceString::kChangeStreamPreImagesNamespace, MODE_IX);
+ const auto& preImagesColl = autoColl.getCollection();
+ if (!preImagesColl) {
+ return;
+ }
+
+ // Do not run the job on secondaries.
+ if (!repl::ReplicationCoordinator::get(opCtx.get())
+ ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb)) {
+ return;
+ }
+
+ // Get the timestamp of the earliest oplog entry.
+ const auto currentEarliestOplogEntryTs =
+ repl::StorageInterface::get(client->getServiceContext())
+ ->getEarliestOplogTimestamp(opCtx.get());
+
+ const bool isBatchedRemoval = gBatchedExpiredChangeStreamPreImageRemoval.load();
+ size_t numberOfRemovals = 0;
+
+ ChangeStreamExpiredPreImageIterator expiredPreImages(
+ opCtx.get(),
+ &preImagesColl,
+ currentEarliestOplogEntryTs,
+ change_stream_pre_image_helpers::getPreImageExpirationTime(
+ opCtx.get(), currentTimeForTimeBasedExpiration));
+
+ for (const auto& collectionRange : expiredPreImages) {
+ writeConflictRetry(
+ opCtx.get(),
+ "ChangeStreamExpiredPreImagesRemover",
+ NamespaceString::kChangeStreamPreImagesNamespace.ns(),
+ [&] {
+ auto params = std::make_unique<DeleteStageParams>();
+ params->isMulti = true;
+
+ std::unique_ptr<BatchedDeleteStageParams> batchedDeleteParams;
+ if (isBatchedRemoval) {
+ batchedDeleteParams = std::make_unique<BatchedDeleteStageParams>();
+ }
+
+ auto exec = InternalPlanner::deleteWithCollectionScan(
+ opCtx.get(),
+ &preImagesColl,
+ std::move(params),
+ PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
+ InternalPlanner::Direction::FORWARD,
+ RecordIdBound(collectionRange.first),
+ RecordIdBound(collectionRange.second),
+ CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords,
+ std::move(batchedDeleteParams));
+ numberOfRemovals += exec->executeDelete();
+ });
+ }
+
+ if (numberOfRemovals > 0) {
+ LOGV2_DEBUG(5869104,
+ 3,
+ "Periodic expired pre-images removal job finished executing",
+ "numberOfRemovals"_attr = numberOfRemovals,
+ "jobDuration"_attr = (Date_t::now() - startTime).toString());
+ }
+ } catch (const DBException& exception) {
+ if (opCtx && opCtx.get()->getKillStatus() != ErrorCodes::OK) {
+ LOGV2_DEBUG(5869105,
+ 3,
+ "Periodic expired pre-images removal job operation was killed",
+ "errorCode"_attr = opCtx.get()->getKillStatus());
+ } else {
+ LOGV2_ERROR(5869106,
+ "Periodic expired pre-images removal job failed",
+ "reason"_attr = exception.reason());
+ }
+ }
+}
+} // namespace
+
+void ChangeStreamPreImagesCollectionManager::performExpiredChangeStreamPreImagesRemovalPass(
+ Client* client) {
+ Date_t currentTimeForTimeBasedExpiration = Date_t::now();
+
+ changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) {
+ // Populate the current time for time based expiration of pre-images.
+ if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) {
+ const BSONType bsonType = currentTimeElem.type();
+ tassert(5869300,
+ str::stream() << "Expected type for 'currentTimeForTimeBasedExpiration' is "
+ "'date', but found: "
+ << bsonType,
+ bsonType == BSONType::Date);
+
+ currentTimeForTimeBasedExpiration = currentTimeElem.Date();
+ }
+ });
+ deleteExpiredChangeStreamPreImages(client, currentTimeForTimeBasedExpiration);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.h b/src/mongo/db/change_stream_pre_images_collection_manager.h
new file mode 100644
index 00000000000..75efb28c22d
--- /dev/null
+++ b/src/mongo/db/change_stream_pre_images_collection_manager.h
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional/optional.hpp>
+
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/pipeline/change_stream_preimage_gen.h"
+#include "mongo/db/tenant_id.h"
+#include "mongo/util/background.h"
+
+namespace mongo {
+namespace change_stream_pre_image_helpers {
+
+/**
+ * Specifies attributes that determines if the pre-image has been expired or not.
+ */
+struct PreImageAttributes {
+ mongo::UUID collectionUUID;
+ Timestamp ts;
+ Date_t operationTime;
+
+ /**
+ * Determines if the pre-image is considered expired based on the expiration parameter being
+ * set.
+ */
+ bool isExpiredPreImage(const boost::optional<Date_t>& preImageExpirationTime,
+ const Timestamp& earliestOplogEntryTimestamp);
+};
+
+boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime);
+
+} // namespace change_stream_pre_image_helpers
+
+/**
+ * Manages the lifecycle of the change stream pre-images collection(s). Also is responsible for
+ * inserting the pre-images into the pre-images collection.
+ */
+class ChangeStreamPreImagesCollectionManager {
+public:
+ /**
+ * Creates the pre-images collection, clustered by the primary key '_id'. The collection is
+ * created for the specific tenant if the 'tenantId' is specified.
+ */
+ static void createPreImagesCollection(OperationContext* opCtx,
+ boost::optional<TenantId> tenantId);
+
+ /**
+ * Drops the pre-images collection. The collection is dropped for the specific tenant if
+ * the 'tenantId' is specified.
+ */
+ static void dropPreImagesCollection(OperationContext* opCtx,
+ boost::optional<TenantId> tenantId);
+
+ /**
+ * Inserts the document into the pre-images collection. The document is inserted into the
+ * tenant's pre-images collection if the 'tenantId' is specified.
+ */
+ static void insertPreImage(OperationContext* opCtx,
+ boost::optional<TenantId> tenantId,
+ const ChangeStreamPreImage& preImage);
+
+ /**
+ * Scans the system pre-images collection and deletes the expired pre-images from it.
+ */
+ static void performExpiredChangeStreamPreImagesRemovalPass(Client* client);
+};
+} // namespace mongo
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 7beeeb4cc6a..c327ed5851a 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -561,6 +561,7 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/index_key_validate',
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/change_stream_options_manager',
+ '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
'$BUILD_DIR/mongo/db/cluster_transaction_api',
'$BUILD_DIR/mongo/db/commands',
'$BUILD_DIR/mongo/db/curop_failpoint_helpers',
diff --git a/src/mongo/db/ftdc/SConscript b/src/mongo/db/ftdc/SConscript
index ccbb8d4d8e3..98e530daa44 100644
--- a/src/mongo/db/ftdc/SConscript
+++ b/src/mongo/db/ftdc/SConscript
@@ -107,6 +107,7 @@ env.CppUnitTest(
'varint_test.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/service_context_test_fixture',
'$BUILD_DIR/mongo/util/clock_source_mock',
'ftdc',
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 88a821840c3..af904acf05c 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -318,6 +318,12 @@ NamespaceString NamespaceString::makeChangeCollectionNSS(
return NamespaceString{NamespaceString::kConfigDb, NamespaceString::kChangeCollectionName};
}
+NamespaceString NamespaceString::makePreImageCollectionNSS(
+ const boost::optional<TenantId>& tenantId) {
+ return tenantId ? NamespaceString(tenantId, kConfigDb, "system.preimages")
+ : kChangeStreamPreImagesNamespace;
+}
+
std::string NamespaceString::getSisterNS(StringData local) const {
verify(local.size() && local[0] != '.');
return db().toString() + "." + local.toString();
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index ac19873b8ab..631352569d8 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -318,6 +318,12 @@ public:
static NamespaceString makeChangeCollectionNSS(const boost::optional<TenantId>& tenantId);
/**
+ * Constructs the pre-images collection namespace for a tenant if the 'tenantId' is specified,
+ * otherwise creates a default pre-images collection namespace.
+ */
+ static NamespaceString makePreImageCollectionNSS(const boost::optional<TenantId>& tenantId);
+
+ /**
* Constructs a NamespaceString representing a listCollections namespace. The format for this
* namespace is "<dbName>.$cmd.listCollections".
*/
diff --git a/src/mongo/db/op_observer/SConscript b/src/mongo/db/op_observer/SConscript
index 7042f75bc9b..91447ba89c1 100644
--- a/src/mongo/db/op_observer/SConscript
+++ b/src/mongo/db/op_observer/SConscript
@@ -56,6 +56,9 @@ env.Library(
source=[
"op_observer_impl.cpp",
],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
+ ],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/batched_write_context',
@@ -68,7 +71,6 @@ env.Library(
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/internal_transactions_feature_flag',
'$BUILD_DIR/mongo/db/multitenancy',
- '$BUILD_DIR/mongo/db/pipeline/change_stream_pre_image_helpers',
'$BUILD_DIR/mongo/db/pipeline/change_stream_preimage',
'$BUILD_DIR/mongo/db/read_write_concern_defaults',
'$BUILD_DIR/mongo/db/repl/image_collection_entry',
diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp
index 055c0fc5150..1c87f872769 100644
--- a/src/mongo/db/op_observer/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer/op_observer_impl.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog/import_collection_oplog_entry_gen.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/change_stream_pre_images_collection_manager.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/exception_util.h"
@@ -56,7 +57,6 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer/op_observer_util.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/pipeline/change_stream_pre_image_helpers.h"
#include "mongo/db/pipeline/change_stream_preimage_gen.h"
#include "mongo/db/read_write_concern_defaults.h"
#include "mongo/db/repl/image_collection_entry_gen.h"
@@ -854,7 +854,11 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
ChangeStreamPreImageId id(args.uuid, opTime.writeOpTime.getTimestamp(), 0);
ChangeStreamPreImage preImage(id, opTime.wallClockTime, preImageDoc.get());
- writeToChangeStreamPreImagesCollection(opCtx, preImage);
+
+ // TODO SERVER-66643 Pass tenant id to the pre-images collection if running in the
+ // serverless.
+ ChangeStreamPreImagesCollectionManager::insertPreImage(
+ opCtx, /* tenantId */ boost::none, preImage);
}
SessionTxnRecord sessionTxnRecord;
@@ -1058,7 +1062,11 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
ChangeStreamPreImageId id(uuid, opTime.writeOpTime.getTimestamp(), 0);
ChangeStreamPreImage preImage(id, opTime.wallClockTime, *args.deletedDoc);
- writeToChangeStreamPreImagesCollection(opCtx, preImage);
+
+ // TODO SERVER-66643 Pass tenant id to the pre-images collection if running in the
+ // serverless.
+ ChangeStreamPreImagesCollectionManager::insertPreImage(
+ opCtx, /* tenantId */ boost::none, preImage);
}
SessionTxnRecord sessionTxnRecord;
@@ -1483,8 +1491,12 @@ void writeChangeStreamPreImagesForApplyOpsEntries(
!operation.getNss().isTemporaryReshardingCollection()) {
invariant(operation.getUuid());
invariant(!operation.getPreImage().isEmpty());
- writeToChangeStreamPreImagesCollection(
+
+ // TODO SERVER-66643 Pass tenant id to the pre-images collection if running in the
+ // serverless.
+ ChangeStreamPreImagesCollectionManager::insertPreImage(
opCtx,
+ /* tenantId */ boost::none,
ChangeStreamPreImage{
ChangeStreamPreImageId{*operation.getUuid(), applyOpsTimestamp, applyOpsIndex},
operationTime,
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 169a1e816bd..097e61f218a 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -504,33 +504,10 @@ env.Library(
source=[
'change_stream_expired_pre_image_remover.cpp',
],
- LIBDEPS=[
- '$BUILD_DIR/mongo/db/change_stream_options_manager',
- '$BUILD_DIR/mongo/db/concurrency/exception_util',
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
'$BUILD_DIR/mongo/db/db_raii',
- '$BUILD_DIR/mongo/db/query_exec',
- '$BUILD_DIR/mongo/db/record_id_helpers',
- '$BUILD_DIR/mongo/db/repl/oplog_entry',
- '$BUILD_DIR/mongo/db/repl/storage_interface',
'$BUILD_DIR/mongo/util/periodic_runner',
- 'change_stream_preimage',
- ],
-)
-
-env.Library(
- target='change_stream_pre_image_helpers',
- source=[
- 'change_stream_pre_image_helpers.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/db/service_context',
- 'change_stream_preimage',
- ],
- LIBDEPS_PRIVATE=[
- '$BUILD_DIR/mongo/db/catalog_raii',
- '$BUILD_DIR/mongo/db/concurrency/lock_manager_defs',
- '$BUILD_DIR/mongo/db/dbhelpers',
- '$BUILD_DIR/mongo/db/namespace_string',
],
)
@@ -664,6 +641,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/change_stream_options',
'$BUILD_DIR/mongo/db/change_stream_options_manager',
+ '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
'$BUILD_DIR/mongo/db/cst/cst',
'$BUILD_DIR/mongo/db/exec/document_value/document_value',
'$BUILD_DIR/mongo/db/exec/document_value/document_value_test_util',
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
index 81931be466c..f9c59747448 100644
--- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
+++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
@@ -33,407 +33,19 @@
#include "change_stream_expired_pre_image_remover.h"
#include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/catalog/collection.h"
-#include "mongo/db/change_stream_options_manager.h"
-#include "mongo/db/client.h"
-#include "mongo/db/concurrency/exception_util.h"
-#include "mongo/db/db_raii.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/pipeline/change_stream_preimage_gen.h"
-#include "mongo/db/query/internal_plans.h"
-#include "mongo/db/record_id_helpers.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/repl/storage_interface.h"
-#include "mongo/db/service_context.h"
-#include "mongo/db/storage/storage_parameters_gen.h"
+#include "mongo/db/change_stream_pre_images_collection_manager.h"
#include "mongo/logv2/log.h"
#include "mongo/util/background.h"
#include "mongo/util/concurrency/idle_thread_block.h"
-#include "mongo/util/fail_point.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
-
namespace mongo {
-// Fail point to set current time for time-based expiration of pre-images.
-MONGO_FAIL_POINT_DEFINE(changeStreamPreImageRemoverCurrentTime);
-
-namespace preImageRemoverInternal {
-
-bool PreImageAttributes::isExpiredPreImage(const boost::optional<Date_t>& preImageExpirationTime,
- const Timestamp& earliestOplogEntryTimestamp) {
- // Pre-image oplog entry is no longer present in the oplog if its timestamp is smaller
- // than the 'earliestOplogEntryTimestamp'.
- const bool preImageOplogEntryIsDeleted = ts < earliestOplogEntryTimestamp;
- const auto expirationTime = preImageExpirationTime.get_value_or(Date_t::min());
-
- // Pre-image is expired if its corresponding oplog entry is deleted or its operation
- // time is less than or equal to the expiration time.
- return preImageOplogEntryIsDeleted || operationTime <= expirationTime;
-}
-
-// Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if not 'off', boost::none otherwise.
-boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions(
- ChangeStreamOptions& changeStreamOptions) {
- const stdx::variant<std::string, std::int64_t>& expireAfterSeconds =
- changeStreamOptions.getPreAndPostImages().getExpireAfterSeconds();
-
- if (!stdx::holds_alternative<std::string>(expireAfterSeconds)) {
- return stdx::get<std::int64_t>(expireAfterSeconds);
- }
-
- return boost::none;
-}
-
-// Returns pre-images expiry time in milliseconds since the epoch time if configured, boost::none
-// otherwise.
-boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime) {
- boost::optional<std::int64_t> expireAfterSeconds = boost::none;
-
- // Get the expiration time directly from the change stream manager.
- auto changeStreamOptions = ChangeStreamOptionsManager::get(opCtx).getOptions(opCtx);
- expireAfterSeconds = getExpireAfterSecondsFromChangeStreamOptions(changeStreamOptions);
-
- // A pre-image is eligible for deletion if:
- // pre-image's op-time + expireAfterSeconds < currentTime.
- return expireAfterSeconds ? boost::optional<Date_t>(currentTime - Seconds(*expireAfterSeconds))
- : boost::none;
-}
-
-} // namespace preImageRemoverInternal
-
namespace {
-
-RecordId toRecordId(ChangeStreamPreImageId id) {
- return record_id_helpers::keyForElem(
- BSON(ChangeStreamPreImage::kIdFieldName << id.toBSON()).firstElement());
-}
-
-/**
- * Scans the 'config.system.preimages' collection and deletes the expired pre-images from it.
- *
- * Pre-images are ordered by collection UUID, ie. if UUID of collection A is ordered before UUID of
- * collection B, then pre-images of collection A will be stored before pre-images of collection B.
- *
- * While scanning the collection for expired pre-images, each pre-image timestamp is compared
- * against the 'earliestOplogEntryTimestamp' value. Any pre-image that has a timestamp greater than
- * the 'earliestOplogEntryTimestamp' value is not considered for deletion and the cursor seeks to
- * the next UUID in the collection.
- *
- * Seek to the next UUID is done by setting the values of 'Timestamp' and 'ApplyOpsIndex' fields to
- * max, ie. (currentPreImage.nsUUID, Timestamp::max(), ApplyOpsIndex::max()).
- *
- * +-------------------------+
- * | config.system.preimages |
- * +------------+------------+
- * |
- * +--------------------+---------+---------+-----------------------+
- * | | | |
- * +-----------+-------+ +----------+--------+ +--------+----------+ +----------+--------+
- * | collA.preImageA | | collA.preImageB | | collB.preImageC | | collB.preImageD |
- * +-----------+-------+ +----------+--------+ +---------+---------+ +----------+--------+
- * | timestamp: 1 | | timestamp: 10 | | timestamp: 5 | | timestamp: 9 |
- * | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 1 |
- * +-------------------+ +-------------------+ +-------------------+ +-------------------+
- */
-class ChangeStreamExpiredPreImageIterator {
-public:
- // Iterator over the range of pre-image documents, where each range defines a set of expired
- // pre-image documents of one collection eligible for deletion due to expiration. Lower and
- // upper bounds of a range are inclusive.
- class Iterator {
- public:
- using RecordIdRange = std::pair<RecordId, RecordId>;
-
- Iterator(OperationContext* opCtx,
- const CollectionPtr* preImagesCollPtr,
- Timestamp earliestOplogEntryTimestamp,
- boost::optional<Date_t> preImageExpirationTime,
- bool isEndIterator = false)
- : _opCtx(opCtx),
- _preImagesCollPtr(preImagesCollPtr),
- _earliestOplogEntryTimestamp(earliestOplogEntryTimestamp),
- _preImageExpirationTime(preImageExpirationTime) {
- if (!isEndIterator) {
- advance();
- }
- }
-
- const RecordIdRange& operator*() const {
- return _currentExpiredPreImageRange;
- }
-
- const RecordIdRange* operator->() const {
- return &_currentExpiredPreImageRange;
- }
-
- Iterator& operator++() {
- advance();
- return *this;
- }
-
- // Both iterators are equal if they are both pointing to the same expired pre-image range.
- friend bool operator==(const Iterator& a, const Iterator& b) {
- return a._currentExpiredPreImageRange == b._currentExpiredPreImageRange;
- };
-
- friend bool operator!=(const Iterator& a, const Iterator& b) {
- return !(a == b);
- };
-
- private:
- // Scans the pre-images collection and gets the next expired pre-image range or sets
- // '_currentExpiredPreImageRange' to the range with empty record ids in case there are no
- // more expired pre-images left.
- void advance() {
- const auto getNextPreImageAttributes =
- [&](std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>& planExecutor)
- -> boost::optional<preImageRemoverInternal::PreImageAttributes> {
- BSONObj preImageObj;
- if (planExecutor->getNext(&preImageObj, nullptr) == PlanExecutor::IS_EOF) {
- return boost::none;
- }
-
- auto preImage =
- ChangeStreamPreImage::parse(IDLParserErrorContext("pre-image"), preImageObj);
- return {{std::move(preImage.getId().getNsUUID()),
- std::move(preImage.getId().getTs()),
- std::move(preImage.getOperationTime())}};
- };
-
- while (true) {
- // Fetch the first pre-image from the next collection, that has pre-images enabled.
- auto planExecutor = _previousCollectionUUID
- ? createCollectionScan(RecordIdBound(
- toRecordId(ChangeStreamPreImageId(*_previousCollectionUUID,
- Timestamp::max(),
- std::numeric_limits<int64_t>::max()))))
- : createCollectionScan(boost::none);
- auto preImageAttributes = getNextPreImageAttributes(planExecutor);
-
- // If there aren't any pre-images left, set the range to the empty record ids and
- // return.
- if (!preImageAttributes) {
- _currentExpiredPreImageRange = std::pair(RecordId(), RecordId());
- return;
- }
- const auto currentCollectionUUID = preImageAttributes->collectionUUID;
- _previousCollectionUUID = currentCollectionUUID;
-
- // If the first pre-image in the current collection is not expired, fetch the first
- // pre-image from the next collection.
- if (!preImageAttributes->isExpiredPreImage(_preImageExpirationTime,
- _earliestOplogEntryTimestamp)) {
- continue;
- }
-
- // If an expired pre-image is found, compute the max expired pre-image RecordId for
- // this collection depending on the expiration parameter being set.
- const auto minKey =
- toRecordId(ChangeStreamPreImageId(currentCollectionUUID, Timestamp(), 0));
- RecordId maxKey;
- if (_preImageExpirationTime) {
- // Reset the collection scan to start one increment before the
- // '_earliestOplogEntryTimestamp', as the pre-images with smaller or equal
- // timestamp are guaranteed to be expired.
- Timestamp lastExpiredPreimageTs(_earliestOplogEntryTimestamp.asULL() - 1);
- auto planExecutor = createCollectionScan(RecordIdBound(
- toRecordId(ChangeStreamPreImageId(currentCollectionUUID,
- lastExpiredPreimageTs,
- std::numeric_limits<int64_t>::max()))));
-
- // Iterate over all the expired pre-images in the collection in order to find
- // the max RecordId.
- while ((preImageAttributes = getNextPreImageAttributes(planExecutor)) &&
- preImageAttributes->isExpiredPreImage(_preImageExpirationTime,
- _earliestOplogEntryTimestamp) &&
- preImageAttributes->collectionUUID == currentCollectionUUID) {
- lastExpiredPreimageTs = preImageAttributes->ts;
- }
-
- maxKey =
- toRecordId(ChangeStreamPreImageId(currentCollectionUUID,
- lastExpiredPreimageTs,
- std::numeric_limits<int64_t>::max()));
- } else {
- // If the expiration parameter is not set, then the last expired pre-image
- // timestamp equals to one increment before the '_earliestOplogEntryTimestamp'.
- maxKey = toRecordId(
- ChangeStreamPreImageId(currentCollectionUUID,
- Timestamp(_earliestOplogEntryTimestamp.asULL() - 1),
- std::numeric_limits<int64_t>::max()));
- }
- tassert(6138300,
- "Max key of the expired pre-image range has to be valid",
- maxKey.isValid());
- _currentExpiredPreImageRange = std::pair(minKey, maxKey);
- return;
- }
- }
-
- // Set up the new collection scan to start from the 'minKey'.
- std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> createCollectionScan(
- boost::optional<RecordIdBound> minKey) const {
- return InternalPlanner::collectionScan(_opCtx,
- _preImagesCollPtr,
- PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY,
- InternalPlanner::Direction::FORWARD,
- boost::none,
- minKey);
- }
-
- OperationContext* _opCtx;
- const CollectionPtr* _preImagesCollPtr;
- RecordIdRange _currentExpiredPreImageRange;
- boost::optional<UUID> _previousCollectionUUID;
- const Timestamp _earliestOplogEntryTimestamp;
-
- // The pre-images with operation time less than or equal to the '_preImageExpirationTime'
- // are considered expired.
- const boost::optional<Date_t> _preImageExpirationTime;
- };
-
- ChangeStreamExpiredPreImageIterator(
- OperationContext* opCtx,
- const CollectionPtr* preImagesCollPtr,
- const Timestamp earliestOplogEntryTimestamp,
- const boost::optional<Date_t> preImageExpirationTime = boost::none)
- : _opCtx(opCtx),
- _preImagesCollPtr(preImagesCollPtr),
- _earliestOplogEntryTimestamp(earliestOplogEntryTimestamp),
- _preImageExpirationTime(preImageExpirationTime) {}
-
- Iterator begin() const {
- return Iterator(
- _opCtx, _preImagesCollPtr, _earliestOplogEntryTimestamp, _preImageExpirationTime);
- }
-
- Iterator end() const {
- return Iterator(_opCtx,
- _preImagesCollPtr,
- _earliestOplogEntryTimestamp,
- _preImageExpirationTime,
- true /*isEndIterator*/);
- }
-
-private:
- OperationContext* _opCtx;
- const CollectionPtr* _preImagesCollPtr;
- const Timestamp _earliestOplogEntryTimestamp;
- const boost::optional<Date_t> _preImageExpirationTime;
-};
-
-void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTimeBasedExpiration) {
- const auto startTime = Date_t::now();
- ServiceContext::UniqueOperationContext opCtx;
- try {
- opCtx = client->makeOperationContext();
-
- // Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection
- // doesn't exist.
- AutoGetCollection autoColl(
- opCtx.get(), NamespaceString::kChangeStreamPreImagesNamespace, MODE_IX);
- const auto& preImagesColl = autoColl.getCollection();
- if (!preImagesColl) {
- return;
- }
-
- // Do not run the job on secondaries.
- if (!repl::ReplicationCoordinator::get(opCtx.get())
- ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb)) {
- return;
- }
-
- // Get the timestamp of the earliest oplog entry.
- const auto currentEarliestOplogEntryTs =
- repl::StorageInterface::get(client->getServiceContext())
- ->getEarliestOplogTimestamp(opCtx.get());
-
- const bool isBatchedRemoval = gBatchedExpiredChangeStreamPreImageRemoval.load();
- size_t numberOfRemovals = 0;
-
- ChangeStreamExpiredPreImageIterator expiredPreImages(
- opCtx.get(),
- &preImagesColl,
- currentEarliestOplogEntryTs,
- ::mongo::preImageRemoverInternal::getPreImageExpirationTime(
- opCtx.get(), currentTimeForTimeBasedExpiration));
-
- for (const auto& collectionRange : expiredPreImages) {
- writeConflictRetry(
- opCtx.get(),
- "ChangeStreamExpiredPreImagesRemover",
- NamespaceString::kChangeStreamPreImagesNamespace.ns(),
- [&] {
- auto params = std::make_unique<DeleteStageParams>();
- params->isMulti = true;
-
- std::unique_ptr<BatchedDeleteStageParams> batchedDeleteParams;
- if (isBatchedRemoval) {
- batchedDeleteParams = std::make_unique<BatchedDeleteStageParams>();
- }
-
- auto exec = InternalPlanner::deleteWithCollectionScan(
- opCtx.get(),
- &preImagesColl,
- std::move(params),
- PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
- InternalPlanner::Direction::FORWARD,
- RecordIdBound(collectionRange.first),
- RecordIdBound(collectionRange.second),
- CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords,
- std::move(batchedDeleteParams));
- numberOfRemovals += exec->executeDelete();
- });
- }
-
- if (numberOfRemovals > 0) {
- LOGV2_DEBUG(5869104,
- 3,
- "Periodic expired pre-images removal job finished executing",
- "numberOfRemovals"_attr = numberOfRemovals,
- "jobDuration"_attr = (Date_t::now() - startTime).toString());
- }
- } catch (const DBException& exception) {
- if (opCtx && opCtx.get()->getKillStatus() != ErrorCodes::OK) {
- LOGV2_DEBUG(5869105,
- 3,
- "Periodic expired pre-images removal job operation was killed",
- "errorCode"_attr = opCtx.get()->getKillStatus());
- } else {
- LOGV2_ERROR(5869106,
- "Periodic expired pre-images removal job failed",
- "reason"_attr = exception.reason());
- }
- }
-}
-
-void performExpiredChangeStreamPreImagesRemovalPass(Client* client) {
- Date_t currentTimeForTimeBasedExpiration = Date_t::now();
-
- changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) {
- // Populate the current time for time based expiration of pre-images.
- if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) {
- const BSONType bsonType = currentTimeElem.type();
- tassert(5869300,
- str::stream() << "Expected type for 'currentTimeForTimeBasedExpiration' is "
- "'date', but found: "
- << bsonType,
- bsonType == BSONType::Date);
-
- currentTimeForTimeBasedExpiration = currentTimeElem.Date();
- }
- });
- deleteExpiredChangeStreamPreImages(client, currentTimeForTimeBasedExpiration);
-}
-} // namespace
-
class ChangeStreamExpiredPreImagesRemover;
-namespace {
const auto getChangeStreamExpiredPreImagesRemover =
ServiceContext::declareDecoration<std::unique_ptr<ChangeStreamExpiredPreImagesRemover>>();
-} // namespace
/**
* A periodic background job that removes expired change stream pre-image documents from the
@@ -485,7 +97,8 @@ public:
while (true) {
LOGV2_DEBUG(6278517, 3, "Thread awake");
auto iterationStartTime = Date_t::now();
- performExpiredChangeStreamPreImagesRemovalPass(tc.get());
+ ChangeStreamPreImagesCollectionManager::performExpiredChangeStreamPreImagesRemovalPass(
+ tc.get());
{
// Wait until either gExpiredChangeStreamPreImageRemovalJobSleepSecs passes or a
// shutdown is requested.
@@ -528,6 +141,7 @@ private:
bool _shuttingDown = false;
};
+} // namespace
void startChangeStreamExpiredPreImagesRemover(ServiceContext* serviceContext) {
std::unique_ptr<ChangeStreamExpiredPreImagesRemover> changeStreamExpiredPreImagesRemover =
@@ -546,5 +160,4 @@ void shutdownChangeStreamExpiredPreImagesRemover(ServiceContext* serviceContext)
changeStreamExpiredPreImagesRemover->shutdown();
}
}
-
} // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h
index 0ddd491991f..8d2b4ea5228 100644
--- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h
+++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h
@@ -29,32 +29,9 @@
#pragma once
-#include "mongo/db/change_stream_options_gen.h"
#include "mongo/db/service_context.h"
namespace mongo {
-namespace preImageRemoverInternal {
-
-/**
- * Specifies attributes that determines if the pre-image has been expired or not.
- */
-struct PreImageAttributes {
- mongo::UUID collectionUUID;
- Timestamp ts;
- Date_t operationTime;
-
- /**
- * Determines if the pre-image is considered expired based on the expiration parameter being
- * set.
- */
- bool isExpiredPreImage(const boost::optional<Date_t>& preImageExpirationTime,
- const Timestamp& earliestOplogEntryTimestamp);
-};
-
-boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime);
-
-} // namespace preImageRemoverInternal
-
/**
* Starts a periodic background job to remove expired documents from 'system.preimages' collection.
*/
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp
index ec49c8453d3..b1bcb82b454 100644
--- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp
+++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp
@@ -30,6 +30,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/change_stream_options_manager.h"
+#include "mongo/db/change_stream_pre_images_collection_manager.h"
#include "mongo/db/pipeline/change_stream_expired_pre_image_remover.h"
#include "mongo/db/service_context_test_fixture.h"
#include "mongo/unittest/unittest.h"
@@ -66,7 +67,7 @@ public:
const Date_t& preImageOperationTime,
const boost::optional<Date_t>& preImageExpirationTime,
const Timestamp& earliestOplogEntryTimestamp) {
- preImageRemoverInternal::PreImageAttributes preImageAttributes{
+ change_stream_pre_image_helpers::PreImageAttributes preImageAttributes{
UUID::gen(), preImageTs, preImageOperationTime};
return preImageAttributes.isExpiredPreImage(preImageExpirationTime,
earliestOplogEntryTimestamp);
@@ -82,7 +83,7 @@ TEST_F(ChangeStreamPreImageExpirationPolicyTest, getPreImageExpirationTimeWithVa
auto currentTime = Date_t::now();
auto receivedExpireAfterSeconds =
- preImageRemoverInternal::getPreImageExpirationTime(opCtx.get(), currentTime);
+ change_stream_pre_image_helpers::getPreImageExpirationTime(opCtx.get(), currentTime);
ASSERT(receivedExpireAfterSeconds);
ASSERT_EQ(*receivedExpireAfterSeconds, currentTime - Seconds(expireAfterSeconds));
}
@@ -92,7 +93,7 @@ TEST_F(ChangeStreamPreImageExpirationPolicyTest, getPreImageExpirationTimeWithUn
auto currentTime = Date_t::now();
auto receivedExpireAfterSeconds =
- preImageRemoverInternal::getPreImageExpirationTime(opCtx.get(), currentTime);
+ change_stream_pre_image_helpers::getPreImageExpirationTime(opCtx.get(), currentTime);
ASSERT_FALSE(receivedExpireAfterSeconds);
}
@@ -104,7 +105,7 @@ TEST_F(ChangeStreamPreImageExpirationPolicyTest, getPreImageExpirationTimeWithOf
auto currentTime = Date_t::now();
auto receivedExpireAfterSeconds =
- preImageRemoverInternal::getPreImageExpirationTime(opCtx.get(), currentTime);
+ change_stream_pre_image_helpers::getPreImageExpirationTime(opCtx.get(), currentTime);
ASSERT_FALSE(receivedExpireAfterSeconds);
}
diff --git a/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp b/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp
deleted file mode 100644
index d92c505d1a9..00000000000
--- a/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/pipeline/change_stream_pre_image_helpers.h"
-
-#include "mongo/base/error_codes.h"
-#include "mongo/db/catalog/collection.h"
-#include "mongo/db/catalog_raii.h"
-#include "mongo/db/concurrency/lock_manager_defs.h"
-#include "mongo/db/concurrency/locker.h"
-#include "mongo/db/curop.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/util/assert_util.h"
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
-
-namespace mongo {
-void writeToChangeStreamPreImagesCollection(OperationContext* opCtx,
- const ChangeStreamPreImage& preImage) {
- tassert(6646200,
- "Expected to be executed in a write unit of work",
- opCtx->lockState()->inAWriteUnitOfWork());
- tassert(5869404,
- str::stream() << "Invalid pre-image document applyOpsIndex: "
- << preImage.getId().getApplyOpsIndex(),
- preImage.getId().getApplyOpsIndex() >= 0);
-
- // This lock acquisition can block on a stronger lock held by another operation modifying
- // the pre-images collection. There are no known cases where an operation holding an
- // exclusive lock on the pre-images collection also waits for oplog visibility.
- AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState());
- AutoGetCollection preImagesCollectionRaii(
- opCtx, NamespaceString::kChangeStreamPreImagesNamespace, LockMode::MODE_IX);
- auto& changeStreamPreImagesCollection = preImagesCollectionRaii.getCollection();
- tassert(6646201,
- "The change stream pre-images collection is not present",
- changeStreamPreImagesCollection);
-
- const auto insertionStatus = changeStreamPreImagesCollection->insertDocument(
- opCtx, InsertStatement{preImage.toBSON()}, &CurOp::get(opCtx)->debug());
- tassert(5868601,
- str::stream() << "Attempted to insert a duplicate document into the pre-images "
- "collection. Pre-image id: "
- << preImage.getId().toBSON().toString(),
- insertionStatus != ErrorCodes::DuplicateKey);
- uassertStatusOK(insertionStatus);
-}
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_pre_image_helpers.h b/src/mongo/db/pipeline/change_stream_pre_image_helpers.h
deleted file mode 100644
index 6d670218688..00000000000
--- a/src/mongo/db/pipeline/change_stream_pre_image_helpers.h
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/operation_context.h"
-#include "mongo/db/pipeline/change_stream_preimage_gen.h"
-
-namespace mongo {
-
-/**
- * Inserts document pre-image 'preImage' into the change stream pre-images collection.
- */
-void writeToChangeStreamPreImagesCollection(OperationContext* opCtx,
- const ChangeStreamPreImage& preImage);
-} // namespace mongo
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 13b1e4a3264..41a61e891b7 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -74,6 +74,7 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/local_oplog_info',
'$BUILD_DIR/mongo/db/catalog/multi_index_block',
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
'$BUILD_DIR/mongo/db/commands/feature_compatibility_parsers',
'$BUILD_DIR/mongo/db/concurrency/exception_util',
'$BUILD_DIR/mongo/db/db_raii',
@@ -84,7 +85,6 @@ env.Library(
'$BUILD_DIR/mongo/db/op_observer/op_observer',
'$BUILD_DIR/mongo/db/op_observer/op_observer_util',
'$BUILD_DIR/mongo/db/ops/write_ops',
- '$BUILD_DIR/mongo/db/pipeline/change_stream_pre_image_helpers',
'$BUILD_DIR/mongo/db/pipeline/change_stream_preimage',
'$BUILD_DIR/mongo/db/stats/counters',
'$BUILD_DIR/mongo/db/stats/server_read_concern_write_concern_metrics',
@@ -1492,6 +1492,7 @@ env.Library(
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/client/clientdriver_network',
'$BUILD_DIR/mongo/db/auth/auth',
+ '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
'$BUILD_DIR/mongo/db/cloner',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
'$BUILD_DIR/mongo/db/curop',
@@ -1700,6 +1701,7 @@ if wiredtiger:
'$BUILD_DIR/mongo/db/auth/authorization_manager_global',
'$BUILD_DIR/mongo/db/catalog/catalog_helpers',
'$BUILD_DIR/mongo/db/catalog_raii',
+ '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
'$BUILD_DIR/mongo/db/commands/feature_compatibility_parsers',
'$BUILD_DIR/mongo/db/commands/mongod_fcv',
'$BUILD_DIR/mongo/db/commands/txn_cmd_request',
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 85b323106d7..a8cfa64541d 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -59,6 +59,7 @@
#include "mongo/db/catalog/multi_index_block.h"
#include "mongo/db/catalog/rename_collection.h"
#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/change_stream_pre_images_collection_manager.h"
#include "mongo/db/client.h"
#include "mongo/db/coll_mod_gen.h"
#include "mongo/db/commands.h"
@@ -78,7 +79,6 @@
#include "mongo/db/ops/delete.h"
#include "mongo/db/ops/delete_request_gen.h"
#include "mongo/db/ops/update.h"
-#include "mongo/db/pipeline/change_stream_pre_image_helpers.h"
#include "mongo/db/pipeline/change_stream_preimage_gen.h"
#include "mongo/db/repl/apply_ops.h"
#include "mongo/db/repl/bgsync.h"
@@ -1095,7 +1095,10 @@ void writeChangeStreamPreImage(OperationContext* opCtx,
static_cast<int64_t>(oplogEntry.getApplyOpsIndex())};
ChangeStreamPreImage preImageDocument{
std::move(preImageId), oplogEntry.getWallClockTimeForPreImage(), preImage};
- writeToChangeStreamPreImagesCollection(opCtx, preImageDocument);
+
+ // TODO SERVER-66643 Pass tenant id to the pre-images collection if running in the serverless.
+ ChangeStreamPreImagesCollectionManager::insertPreImage(
+ opCtx, /* tenantId */ boost::none, preImageDocument);
}
} // namespace
diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp
index 2b96b39ef52..88eb0fb7fb7 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/change_stream_pre_images_collection_manager.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/feature_compatibility_version_parser.h"
#include "mongo/db/concurrency/d_concurrency.h"
@@ -402,7 +403,8 @@ TEST_F(OplogApplierImplTest, applyOplogEntryOrGroupedInsertsDeleteDocumentCollec
TEST_F(OplogApplierImplTest, applyOplogEntryToRecordChangeStreamPreImages) {
// Setup the pre-images collection.
- createChangeStreamPreImagesCollection(_opCtx.get());
+ ChangeStreamPreImagesCollectionManager::createPreImagesCollection(_opCtx.get(),
+ boost::none /* tenantId */);
// Create the collection.
const NamespaceString nss("test.t");
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index d3d2ab66136..c2dd1653361 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/local_oplog_info.h"
#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/change_stream_pre_images_collection_manager.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/feature_compatibility_version.h"
#include "mongo/db/commands/rwc_defaults_commands_gen.h"
@@ -545,7 +546,8 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC
});
// Create the pre-images collection if it doesn't exist yet.
- createChangeStreamPreImagesCollection(opCtx);
+ ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx,
+ boost::none /* tenantId */);
// TODO: SERVER-66631 move the change collection creation logic from here to the PM-2502 hooks.
// The change collection will be created when the change stream is enabled.
diff --git a/src/mongo/db/update/SConscript b/src/mongo/db/update/SConscript
index 0fed04974ab..dd5b5772083 100644
--- a/src/mongo/db/update/SConscript
+++ b/src/mongo/db/update/SConscript
@@ -162,6 +162,7 @@ env.CppUnitTest(
LIBDEPS=[
'$BUILD_DIR/mongo/bson/mutable/mutable_bson',
'$BUILD_DIR/mongo/bson/mutable/mutable_bson_test_utils',
+ '$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/exec/document_value/document_value_test_util',
'$BUILD_DIR/mongo/db/matcher/expressions',
'$BUILD_DIR/mongo/db/query/collation/collator_interface_mock',
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index a7433a979cd..e23efca5551 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -197,6 +197,7 @@ tlEnv.CppUnitTest(
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/client/clientdriver_network',
+ '$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/dbmessage',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/service_context_test_fixture',
diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript
index 12674d2e461..20b1a8623e5 100644
--- a/src/mongo/util/SConscript
+++ b/src/mongo/util/SConscript
@@ -750,6 +750,7 @@ icuEnv.CppUnitTest(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/service_context_test_fixture',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
'alarm',
diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript
index 816b1e171d6..0dcad986c97 100644
--- a/src/mongo/util/concurrency/SConscript
+++ b/src/mongo/util/concurrency/SConscript
@@ -51,6 +51,7 @@ env.CppUnitTest(
'with_lock_test.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/service_context_test_fixture',
'spin_lock',
'thread_pool',
diff --git a/src/mongo/watchdog/SConscript b/src/mongo/watchdog/SConscript
index f26af597f28..b04db62fba7 100644
--- a/src/mongo/watchdog/SConscript
+++ b/src/mongo/watchdog/SConscript
@@ -42,6 +42,7 @@ env.CppUnitTest(
'watchdog_test.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/service_context_test_fixture',
'$BUILD_DIR/mongo/util/clock_source_mock',
'watchdog',