summaryrefslogtreecommitdiff
path: root/src/mongo/db/change_collection_expired_documents_remover.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/change_collection_expired_documents_remover.cpp')
-rw-r--r--src/mongo/db/change_collection_expired_documents_remover.cpp43
1 files changed, 40 insertions, 3 deletions
diff --git a/src/mongo/db/change_collection_expired_documents_remover.cpp b/src/mongo/db/change_collection_expired_documents_remover.cpp
index dc1aadeaa06..5eae8abdf76 100644
--- a/src/mongo/db/change_collection_expired_documents_remover.cpp
+++ b/src/mongo/db/change_collection_expired_documents_remover.cpp
@@ -29,9 +29,11 @@
#include "mongo/db/change_collection_expired_documents_remover.h"
+#include "mongo/db/catalog_raii.h"
#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_streams_cluster_parameter_gen.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/mutex.h"
@@ -83,23 +85,58 @@ void removeExpiredDocuments(Client* client) {
// Number of documents removed in the current pass.
size_t removedCount = 0;
+ long long maxStartWallTime = 0;
+ auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx.get());
for (const auto& tenantId : getAllTenants()) {
auto expiredAfterSeconds = getExpireAfterSeconds(tenantId);
invariant(expiredAfterSeconds);
+
+ // Acquire intent-exclusive lock on the change collection.
+ AutoGetChangeCollection changeCollection{
+ opCtx.get(), AutoGetChangeCollection::AccessMode::kWrite, tenantId};
+
+ // Early exit if collection does not exist or if running on a secondary (requires
+ // opCtx->lockState()->isRSTLLocked()).
+ if (!changeCollection ||
+ !repl::ReplicationCoordinator::get(opCtx.get())
+ ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kConfigDb)) {
+ continue;
+ }
+
+ // Get the metadata required for the removal of the expired change collection
+ // documents. Early exit if the metadata is missing, indicating that there is nothing
+ // to remove.
+ auto purgingJobMetadata =
+ ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata(
+ opCtx.get(),
+ &*changeCollection,
+ currentWallTime - Seconds(*expiredAfterSeconds));
+ if (!purgingJobMetadata) {
+ continue;
+ }
+
removedCount +=
ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments(
- opCtx.get(), tenantId, currentWallTime - Seconds(*expiredAfterSeconds));
+ opCtx.get(), &*changeCollection, purgingJobMetadata->maxRecordIdBound);
+ changeCollectionManager.getPurgingJobStats().scannedCollections.fetchAndAddRelaxed(1);
+ maxStartWallTime =
+ std::max(maxStartWallTime, purgingJobMetadata->firstDocWallTimeMillis);
}
+ changeCollectionManager.getPurgingJobStats().maxStartWallTimeMillis.store(maxStartWallTime);
- // TODO SERVER-66636 Use server parameters to track periodic job statistics per tenant.
+ const auto jobDurationMillis = clock->now() - currentWallTime;
if (removedCount > 0) {
LOGV2_DEBUG(6663503,
3,
"Periodic expired change collection job finished executing",
"numberOfRemovals"_attr = removedCount,
- "jobDuration"_attr = (clock->now() - currentWallTime).toString());
+ "jobDuration"_attr = jobDurationMillis.toString());
}
+
+ changeCollectionManager.getPurgingJobStats().totalPass.fetchAndAddRelaxed(1);
+ changeCollectionManager.getPurgingJobStats().timeElapsedMillis.fetchAndAddRelaxed(
+ jobDurationMillis.count());
} catch (const DBException& exception) {
if (exception.toStatus() != ErrorCodes::OK) {
LOGV2_WARNING(6663504,