diff options
Diffstat (limited to 'src/mongo/db/change_stream_change_collection_manager.cpp')
-rw-r--r-- | src/mongo/db/change_stream_change_collection_manager.cpp | 150 |
1 files changed, 137 insertions, 13 deletions
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp index 0afcb527b9e..d085b514d18 100644 --- a/src/mongo/db/change_stream_change_collection_manager.cpp +++ b/src/mongo/db/change_stream_change_collection_manager.cpp @@ -27,6 +27,7 @@ * it in the license file. */ + #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery #include "mongo/db/change_stream_change_collection_manager.h" @@ -38,9 +39,13 @@ #include "mongo/db/catalog/drop_collection.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/change_stream_pre_images_collection_manager.h" +#include "mongo/db/concurrency/exception_util.h" #include "mongo/db/multitenancy_gen.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/query/internal_plans.h" +#include "mongo/db/repl/apply_ops_command_info.h" #include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_entry_gen.h" #include "mongo/logv2/log.h" namespace mongo { @@ -127,20 +132,44 @@ private: // TODO SERVER-65950 retreive tenant from the oplog. // TODO SERVER-67170 avoid inspecting the oplog BSON object. + if (auto nssFieldElem = oplogDoc[repl::OplogEntry::kNssFieldName]) { + if (nssFieldElem.String() == "config.$cmd"_sd) { + if (auto objectFieldElem = oplogDoc[repl::OplogEntry::kObjectFieldName]) { + // The oplog entry might be a drop command on the change collection. Check if + // the drop request is for the already deleted change collection, as such do not + // attempt to write to the change collection if that is the case. This scenario + // is possible because 'WriteUnitOfWork' will stage the changes and while + // committing the staged 'CollectionImpl::insertDocuments' change the collection + // object might have already been deleted. + if (auto dropFieldElem = objectFieldElem["drop"_sd]) { + return dropFieldElem.String() != NamespaceString::kChangeCollectionName; + } + } + } - if (auto nssFieldElem = oplogDoc[repl::OplogEntry::kNssFieldName]; - nssFieldElem && nssFieldElem.String() == "config.$cmd"_sd) { - if (auto objectFieldElem = oplogDoc[repl::OplogEntry::kObjectFieldName]) { - // The oplog entry might be a drop command on the change collection. Check if the - // drop request is for the already deleted change collection, as such do not attempt - // to write to the change collection if that is the case. This scenario is possible - // because 'WriteUnitOfWork' will stage the changes and while committing the staged - // 'CollectionImpl::insertDocuments' change the collection object might have already - // been deleted. - if (auto dropFieldElem = objectFieldElem["drop"_sd]) { - return dropFieldElem.String() != NamespaceString::kChangeCollectionName; + if (nssFieldElem.String() == "admin.$cmd"_sd) { + if (auto objectFieldElem = oplogDoc[repl::OplogEntry::kObjectFieldName]) { + // The oplog entry might be a batch delete command on a change collection, avoid + // inserting such oplog entries back to the change collection. + if (auto applyOpsFieldElem = objectFieldElem["applyOps"_sd]) { + const auto nestedOperations = repl::ApplyOps::extractOperations(oplogDoc); + for (auto& op : nestedOperations) { + if (op.getNss().isChangeCollection() && + op.getOpType() == repl::OpTypeEnum::kDelete) { + return false; + } + } + } } } + + // The oplog entry might be a single delete command on a change collection, avoid + // inserting such oplog entries back to the change collection. + if (auto opTypeFieldElem = oplogDoc[repl::OplogEntry::kOpTypeFieldName]; + opTypeFieldElem && + opTypeFieldElem.String() == repl::OpType_serializer(repl::OpTypeEnum::kDelete)) { + return !NamespaceString(nssFieldElem.String()).isChangeCollection(); + } } return true; @@ -177,8 +206,9 @@ bool ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive() { } // TODO SERVER-67267 guard with 'multitenancySupport' and 'isServerless' flag. - return feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled( - serverGlobalParams.featureCompatibility); + return serverGlobalParams.featureCompatibility.isVersionInitialized() && + feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled( + serverGlobalParams.featureCompatibility); } bool ChangeStreamChangeCollectionManager::hasChangeCollection( @@ -302,4 +332,98 @@ Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection( return changeCollectionsWriter.write(opCtx, opDebug); } +boost::optional<RecordIdBound> +ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId( + OperationContext* opCtx, const CollectionPtr* changeCollection, const Date_t& expirationTime) { + const auto isExpired = [&](const BSONObj& changeDoc) { + const BSONElement& wallElem = changeDoc["wall"]; + invariant(wallElem); + + auto bsonType = wallElem.type(); + invariant(bsonType == BSONType::Date); + + return wallElem.Date() <= expirationTime; + }; + + BSONObj currChangeDoc; + RecordId currRecordId; + + boost::optional<RecordId> prevRecordId; + boost::optional<RecordId> prevPrevRecordId; + + auto scanExecutor = InternalPlanner::collectionScan(opCtx, + changeCollection, + PlanYieldPolicy::YieldPolicy::YIELD_AUTO, + InternalPlanner::Direction::FORWARD); + while (true) { + auto getNextState = scanExecutor->getNext(&currChangeDoc, &currRecordId); + switch (getNextState) { + case PlanExecutor::IS_EOF: + // Either the collection is empty (case in which return boost::none), or all the + // documents have expired. The remover job should never delete the last entry of a + // change collection, so return the recordId of the document previous to the last + // one. + return prevPrevRecordId ? RecordIdBound(prevPrevRecordId.get()) + : boost::optional<RecordIdBound>(); + case PlanExecutor::ADVANCED: { + if (!isExpired(currChangeDoc)) { + return prevRecordId ? RecordIdBound(prevRecordId.get()) + : boost::optional<RecordIdBound>(); + } + } + } + + prevPrevRecordId = prevRecordId; + prevRecordId = currRecordId; + } + + MONGO_UNREACHABLE; +} + +size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments( + OperationContext* opCtx, boost::optional<TenantId> tenantId, const Date_t& expirationTime) { + // Acquire intent-exclusive lock on the change collection. Early exit if the collection + // doesn't exist. + const auto changeCollection = + AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId}; + + // Early exit if collection does not exist, or if running on a secondary (requires + // opCtx->lockState()->isRSTLLocked()). + if (!changeCollection || + !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase( + opCtx, NamespaceString::kConfigDb)) { + return 0; + } + + const auto maxRecordIdBound = + getChangeCollectionMaxExpiredRecordId(opCtx, &*changeCollection, expirationTime); + + // Early exit if there are no expired documents to be removed. + if (!maxRecordIdBound.has_value()) { + return 0; + } + + auto params = std::make_unique<DeleteStageParams>(); + params->isMulti = true; + + auto batchedDeleteParams = std::make_unique<BatchedDeleteStageParams>(); + auto deleteExecutor = InternalPlanner::deleteWithCollectionScan( + opCtx, + &(*changeCollection), + std::move(params), + PlanYieldPolicy::YieldPolicy::YIELD_AUTO, + InternalPlanner::Direction::FORWARD, + boost::none, + std::move(maxRecordIdBound), + CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords, + std::move(batchedDeleteParams)); + + try { + return deleteExecutor->executeDelete(); + } catch (const ExceptionFor<ErrorCodes::QueryPlanKilled>&) { + // It is expected that a collection drop can kill a query plan while deleting an old + // document, so ignore this error. + return 0; + } +} } // namespace mongo |