path: root/src/mongo/db/change_stream_change_collection_manager.cpp
diff options
Diffstat (limited to 'src/mongo/db/change_stream_change_collection_manager.cpp')
1 files changed, 137 insertions, 13 deletions
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp
index 0afcb527b9e..d085b514d18 100644
--- a/src/mongo/db/change_stream_change_collection_manager.cpp
+++ b/src/mongo/db/change_stream_change_collection_manager.cpp
@@ -27,6 +27,7 @@
* it in the license file.
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
#include "mongo/db/change_stream_change_collection_manager.h"
@@ -38,9 +39,13 @@
#include "mongo/db/catalog/drop_collection.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/change_stream_pre_images_collection_manager.h"
+#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/multitenancy_gen.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/query/internal_plans.h"
+#include "mongo/db/repl/apply_ops_command_info.h"
#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_entry_gen.h"
#include "mongo/logv2/log.h"
namespace mongo {
@@ -127,20 +132,44 @@ private:
// TODO SERVER-65950 retreive tenant from the oplog.
// TODO SERVER-67170 avoid inspecting the oplog BSON object.
+ if (auto nssFieldElem = oplogDoc[repl::OplogEntry::kNssFieldName]) {
+ if (nssFieldElem.String() == "config.$cmd"_sd) {
+ if (auto objectFieldElem = oplogDoc[repl::OplogEntry::kObjectFieldName]) {
+ // The oplog entry might be a drop command on the change collection. Check if
+ // the drop request is for the already deleted change collection, as such do not
+ // attempt to write to the change collection if that is the case. This scenario
+ // is possible because 'WriteUnitOfWork' will stage the changes and while
+ // committing the staged 'CollectionImpl::insertDocuments' change the collection
+ // object might have already been deleted.
+ if (auto dropFieldElem = objectFieldElem["drop"_sd]) {
+ return dropFieldElem.String() != NamespaceString::kChangeCollectionName;
+ }
+ }
+ }
- if (auto nssFieldElem = oplogDoc[repl::OplogEntry::kNssFieldName];
- nssFieldElem && nssFieldElem.String() == "config.$cmd"_sd) {
- if (auto objectFieldElem = oplogDoc[repl::OplogEntry::kObjectFieldName]) {
- // The oplog entry might be a drop command on the change collection. Check if the
- // drop request is for the already deleted change collection, as such do not attempt
- // to write to the change collection if that is the case. This scenario is possible
- // because 'WriteUnitOfWork' will stage the changes and while committing the staged
- // 'CollectionImpl::insertDocuments' change the collection object might have already
- // been deleted.
- if (auto dropFieldElem = objectFieldElem["drop"_sd]) {
- return dropFieldElem.String() != NamespaceString::kChangeCollectionName;
+ if (nssFieldElem.String() == "admin.$cmd"_sd) {
+ if (auto objectFieldElem = oplogDoc[repl::OplogEntry::kObjectFieldName]) {
+ // The oplog entry might be a batch delete command on a change collection, avoid
+ // inserting such oplog entries back to the change collection.
+ if (auto applyOpsFieldElem = objectFieldElem["applyOps"_sd]) {
+ const auto nestedOperations = repl::ApplyOps::extractOperations(oplogDoc);
+ for (auto& op : nestedOperations) {
+ if (op.getNss().isChangeCollection() &&
+ op.getOpType() == repl::OpTypeEnum::kDelete) {
+ return false;
+ }
+ }
+ }
+ // The oplog entry might be a single delete command on a change collection, avoid
+ // inserting such oplog entries back to the change collection.
+ if (auto opTypeFieldElem = oplogDoc[repl::OplogEntry::kOpTypeFieldName];
+ opTypeFieldElem &&
+ opTypeFieldElem.String() == repl::OpType_serializer(repl::OpTypeEnum::kDelete)) {
+ return !NamespaceString(nssFieldElem.String()).isChangeCollection();
+ }
return true;
@@ -177,8 +206,9 @@ bool ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive() {
// TODO SERVER-67267 guard with 'multitenancySupport' and 'isServerless' flag.
- return feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled(
- serverGlobalParams.featureCompatibility);
+ return serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled(
+ serverGlobalParams.featureCompatibility);
bool ChangeStreamChangeCollectionManager::hasChangeCollection(
@@ -302,4 +332,98 @@ Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
return changeCollectionsWriter.write(opCtx, opDebug);
+ OperationContext* opCtx, const CollectionPtr* changeCollection, const Date_t& expirationTime) {
+ const auto isExpired = [&](const BSONObj& changeDoc) {
+ const BSONElement& wallElem = changeDoc["wall"];
+ invariant(wallElem);
+ auto bsonType = wallElem.type();
+ invariant(bsonType == BSONType::Date);
+ return wallElem.Date() <= expirationTime;
+ };
+ BSONObj currChangeDoc;
+ RecordId currRecordId;
+ boost::optional<RecordId> prevRecordId;
+ boost::optional<RecordId> prevPrevRecordId;
+ auto scanExecutor = InternalPlanner::collectionScan(opCtx,
+ changeCollection,
+ PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
+ InternalPlanner::Direction::FORWARD);
+ while (true) {
+ auto getNextState = scanExecutor->getNext(&currChangeDoc, &currRecordId);
+ switch (getNextState) {
+ case PlanExecutor::IS_EOF:
+ // Either the collection is empty (case in which return boost::none), or all the
+ // documents have expired. The remover job should never delete the last entry of a
+ // change collection, so return the recordId of the document previous to the last
+ // one.
+ return prevPrevRecordId ? RecordIdBound(prevPrevRecordId.get())
+ : boost::optional<RecordIdBound>();
+ case PlanExecutor::ADVANCED: {
+ if (!isExpired(currChangeDoc)) {
+ return prevRecordId ? RecordIdBound(prevRecordId.get())
+ : boost::optional<RecordIdBound>();
+ }
+ }
+ }
+ prevPrevRecordId = prevRecordId;
+ prevRecordId = currRecordId;
+ }
+size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments(
+ OperationContext* opCtx, boost::optional<TenantId> tenantId, const Date_t& expirationTime) {
+ // Acquire intent-exclusive lock on the change collection. Early exit if the collection
+ // doesn't exist.
+ const auto changeCollection =
+ AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId};
+ // Early exit if collection does not exist, or if running on a secondary (requires
+ // opCtx->lockState()->isRSTLLocked()).
+ if (!changeCollection ||
+ !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(
+ opCtx, NamespaceString::kConfigDb)) {
+ return 0;
+ }
+ const auto maxRecordIdBound =
+ getChangeCollectionMaxExpiredRecordId(opCtx, &*changeCollection, expirationTime);
+ // Early exit if there are no expired documents to be removed.
+ if (!maxRecordIdBound.has_value()) {
+ return 0;
+ }
+ auto params = std::make_unique<DeleteStageParams>();
+ params->isMulti = true;
+ auto batchedDeleteParams = std::make_unique<BatchedDeleteStageParams>();
+ auto deleteExecutor = InternalPlanner::deleteWithCollectionScan(
+ opCtx,
+ &(*changeCollection),
+ std::move(params),
+ PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
+ InternalPlanner::Direction::FORWARD,
+ boost::none,
+ std::move(maxRecordIdBound),
+ CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords,
+ std::move(batchedDeleteParams));
+ try {
+ return deleteExecutor->executeDelete();
+ } catch (const ExceptionFor<ErrorCodes::QueryPlanKilled>&) {
+ // It is expected that a collection drop can kill a query plan while deleting an old
+ // document, so ignore this error.
+ return 0;
+ }
} // namespace mongo