diff options
Diffstat (limited to 'src/mongo/db/change_collection_expired_documents_remover.cpp')
-rw-r--r-- | src/mongo/db/change_collection_expired_documents_remover.cpp | 43 |
1 files changed, 40 insertions, 3 deletions
diff --git a/src/mongo/db/change_collection_expired_documents_remover.cpp b/src/mongo/db/change_collection_expired_documents_remover.cpp index dc1aadeaa06..5eae8abdf76 100644 --- a/src/mongo/db/change_collection_expired_documents_remover.cpp +++ b/src/mongo/db/change_collection_expired_documents_remover.cpp @@ -29,9 +29,11 @@ #include "mongo/db/change_collection_expired_documents_remover.h" +#include "mongo/db/catalog_raii.h" #include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_streams_cluster_parameter_gen.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/service_context.h" #include "mongo/logv2/log.h" #include "mongo/platform/mutex.h" @@ -83,23 +85,58 @@ void removeExpiredDocuments(Client* client) { // Number of documents removed in the current pass. size_t removedCount = 0; + long long maxStartWallTime = 0; + auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx.get()); for (const auto& tenantId : getAllTenants()) { auto expiredAfterSeconds = getExpireAfterSeconds(tenantId); invariant(expiredAfterSeconds); + + // Acquire intent-exclusive lock on the change collection. + AutoGetChangeCollection changeCollection{ + opCtx.get(), AutoGetChangeCollection::AccessMode::kWrite, tenantId}; + + // Early exit if collection does not exist or if running on a secondary (requires + // opCtx->lockState()->isRSTLLocked()). + if (!changeCollection || + !repl::ReplicationCoordinator::get(opCtx.get()) + ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kConfigDb)) { + continue; + } + + // Get the metadata required for the removal of the expired change collection + // documents. Early exit if the metadata is missing, indicating that there is nothing + // to remove. + auto purgingJobMetadata = + ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata( + opCtx.get(), + &*changeCollection, + currentWallTime - Seconds(*expiredAfterSeconds)); + if (!purgingJobMetadata) { + continue; + } + removedCount += ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments( - opCtx.get(), tenantId, currentWallTime - Seconds(*expiredAfterSeconds)); + opCtx.get(), &*changeCollection, purgingJobMetadata->maxRecordIdBound); + changeCollectionManager.getPurgingJobStats().scannedCollections.fetchAndAddRelaxed(1); + maxStartWallTime = + std::max(maxStartWallTime, purgingJobMetadata->firstDocWallTimeMillis); } + changeCollectionManager.getPurgingJobStats().maxStartWallTimeMillis.store(maxStartWallTime); - // TODO SERVER-66636 Use server parameters to track periodic job statistics per tenant. + const auto jobDurationMillis = clock->now() - currentWallTime; if (removedCount > 0) { LOGV2_DEBUG(6663503, 3, "Periodic expired change collection job finished executing", "numberOfRemovals"_attr = removedCount, - "jobDuration"_attr = (clock->now() - currentWallTime).toString()); + "jobDuration"_attr = jobDurationMillis.toString()); } + + changeCollectionManager.getPurgingJobStats().totalPass.fetchAndAddRelaxed(1); + changeCollectionManager.getPurgingJobStats().timeElapsedMillis.fetchAndAddRelaxed( + jobDurationMillis.count()); } catch (const DBException& exception) { if (exception.toStatus() != ErrorCodes::OK) { LOGV2_WARNING(6663504, |