diff options
Diffstat (limited to 'src/mongo/db/change_stream_change_collection_manager.cpp')
-rw-r--r-- | src/mongo/db/change_stream_change_collection_manager.cpp | 79 |
1 files changed, 49 insertions, 30 deletions
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp index 5559e49390a..e6aaf3e86f8 100644 --- a/src/mongo/db/change_stream_change_collection_manager.cpp +++ b/src/mongo/db/change_stream_change_collection_manager.cpp @@ -60,6 +60,15 @@ const auto getChangeCollectionManager = ServiceContext::declareDecoration<boost::optional<ChangeStreamChangeCollectionManager>>(); /** + * Returns the list of all tenant ids in the replica set. + * TODO SERVER-61822 Provide the real implementation after 'listDatabasesForAllTenants' is + * available. + */ +std::vector<boost::optional<TenantId>> getAllTenants() { + return {boost::none}; +} + +/** * Creates a Document object from the supplied oplog entry, performs necessary modifications to it * and then returns it as a BSON object. */ @@ -186,6 +195,14 @@ private: } // namespace +BSONObj ChangeStreamChangeCollectionManager::PurgingJobStats::toBSON() const { + return BSON("totalPass" << totalPass.load() << "docsDeleted" << docsDeleted.load() + << "bytesDeleted" << bytesDeleted.load() << "scannedCollections" + << scannedCollections.load() << "maxStartWallTimeMillis" + << maxStartWallTimeMillis.load() << "timeElapsedMillis" + << timeElapsedMillis.load()); +} + ChangeStreamChangeCollectionManager& ChangeStreamChangeCollectionManager::get( ServiceContext* service) { return *getChangeCollectionManager(service); @@ -338,8 +355,8 @@ Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection( return changeCollectionsWriter.write(opCtx, opDebug); } -boost::optional<RecordIdBound> -ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId( +boost::optional<ChangeCollectionPurgingJobMetadata> +ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata( OperationContext* opCtx, const CollectionPtr* changeCollection, const Date_t& expirationTime) { const auto isExpired = [&](const BSONObj& changeDoc) { const BSONElement& wallElem = changeDoc["wall"]; @@ -354,6 +371,7 @@ ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId( BSONObj currChangeDoc; RecordId currRecordId; + boost::optional<long long> firstDocWallTime; boost::optional<RecordId> prevRecordId; boost::optional<RecordId> prevPrevRecordId; @@ -364,17 +382,29 @@ ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId( while (true) { auto getNextState = scanExecutor->getNext(&currChangeDoc, &currRecordId); switch (getNextState) { - case PlanExecutor::IS_EOF: + case PlanExecutor::IS_EOF: { // Either the collection is empty (case in which return boost::none), or all the // documents have expired. The remover job should never delete the last entry of a // change collection, so return the recordId of the document previous to the last // one. - return prevPrevRecordId ? RecordIdBound(prevPrevRecordId.get()) - : boost::optional<RecordIdBound>(); + if (!prevPrevRecordId) { + return boost::none; + } + + return {{*firstDocWallTime, RecordIdBound(*prevPrevRecordId)}}; + } case PlanExecutor::ADVANCED: { + if (!prevRecordId.has_value()) { + firstDocWallTime = + boost::make_optional(currChangeDoc["wall"].Date().toMillisSinceEpoch()); + } + if (!isExpired(currChangeDoc)) { - return prevRecordId ? RecordIdBound(prevRecordId.get()) - : boost::optional<RecordIdBound>(); + if (!prevRecordId) { + return boost::none; + } + + return {{*firstDocWallTime, RecordIdBound(*prevRecordId)}}; } } } @@ -387,28 +417,9 @@ ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId( } size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments( - OperationContext* opCtx, boost::optional<TenantId> tenantId, const Date_t& expirationTime) { - // Acquire intent-exclusive lock on the change collection. Early exit if the collection - // doesn't exist. - const auto changeCollection = - AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId}; - - // Early exit if collection does not exist, or if running on a secondary (requires - // opCtx->lockState()->isRSTLLocked()). - if (!changeCollection || - !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase( - opCtx, NamespaceString::kConfigDb)) { - return 0; - } - - const auto maxRecordIdBound = - getChangeCollectionMaxExpiredRecordId(opCtx, &*changeCollection, expirationTime); - - // Early exit if there are no expired documents to be removed. - if (!maxRecordIdBound.has_value()) { - return 0; - } - + OperationContext* opCtx, + const CollectionPtr* changeCollection, + const RecordIdBound& maxRecordIdBound) { auto params = std::make_unique<DeleteStageParams>(); params->isMulti = true; @@ -425,7 +436,15 @@ size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocume std::move(batchedDeleteParams)); try { - return deleteExecutor->executeDelete(); + (void)deleteExecutor->executeDelete(); + auto batchedDeleteStats = deleteExecutor->getBatchedDeleteStats(); + auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); + changeCollectionManager.getPurgingJobStats().docsDeleted.fetchAndAddRelaxed( + batchedDeleteStats.docsDeleted); + changeCollectionManager.getPurgingJobStats().bytesDeleted.fetchAndAddRelaxed( + batchedDeleteStats.bytesDeleted); + + return batchedDeleteStats.docsDeleted; } catch (const ExceptionFor<ErrorCodes::QueryPlanKilled>&) { // It is expected that a collection drop can kill a query plan while deleting an old // document, so ignore this error. |