summaryrefslogtreecommitdiff
path: root/src/mongo/db/change_stream_change_collection_manager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/change_stream_change_collection_manager.cpp')
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.cpp81
1 files changed, 39 insertions, 42 deletions
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp
index 93c1007cfb3..23c780ed5f1 100644
--- a/src/mongo/db/change_stream_change_collection_manager.cpp
+++ b/src/mongo/db/change_stream_change_collection_manager.cpp
@@ -671,13 +671,11 @@ size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocume
// No marker means it's a new collection, or we've just performed startup. Initialize
// the TruncateMarkers instance.
if (!truncateMarkers) {
- writeConflictRetry(opCtx,
- "initialise change collection truncate markers",
- changeCollectionPtr->ns().ns(),
- [&] {
- truncateMarkers = initialiseTruncateMarkers(
- opCtx, changeCollectionPtr.get(), truncateMap);
- });
+ writeConflictRetry(
+ opCtx, "initialise change collection truncate markers", changeCollectionPtr->ns(), [&] {
+ truncateMarkers =
+ initialiseTruncateMarkers(opCtx, changeCollectionPtr.get(), truncateMap);
+ });
}
int64_t numRecordsDeleted = 0;
@@ -685,41 +683,40 @@ size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocume
auto removeExpiredMarkers = [&] {
auto rs = changeCollectionPtr->getRecordStore();
while (auto marker = truncateMarkers->peekOldestMarkerIfNeeded(opCtx)) {
- writeConflictRetry(
- opCtx, "truncate change collection", changeCollectionPtr->ns().ns(), [&] {
- // The session might be in use from marker initialisation so we must reset it
- // here in order to allow an untimestamped write.
- opCtx->recoveryUnit()->abandonSnapshot();
- opCtx->recoveryUnit()->allowOneUntimestampedWrite();
- WriteUnitOfWork wuow(opCtx);
-
- auto bytesDeleted = marker->bytes;
- auto docsDeleted = marker->records;
-
- auto status = rs->rangeTruncate(
- opCtx,
- // Truncate from the beginning of the collection, this will
- // cover cases where some leftover documents are present.
- RecordId(),
- marker->lastRecord,
- -bytesDeleted,
- -docsDeleted);
- invariantStatusOK(status);
-
- wuow.commit();
-
- truncateMarkers->popOldestMarker();
- numRecordsDeleted += docsDeleted;
-
- auto& purgingJobStats = changeCollectionManager.getPurgingJobStats();
- purgingJobStats.docsDeleted.fetchAndAddRelaxed(docsDeleted);
- purgingJobStats.bytesDeleted.fetchAndAddRelaxed(bytesDeleted);
-
- auto millisWallTime = marker->wallTime.toMillisSinceEpoch();
- if (purgingJobStats.maxStartWallTimeMillis.load() < millisWallTime) {
- purgingJobStats.maxStartWallTimeMillis.store(millisWallTime);
- }
- });
+ writeConflictRetry(opCtx, "truncate change collection", changeCollectionPtr->ns(), [&] {
+ // The session might be in use from marker initialisation so we must reset it
+ // here in order to allow an untimestamped write.
+ opCtx->recoveryUnit()->abandonSnapshot();
+ opCtx->recoveryUnit()->allowOneUntimestampedWrite();
+ WriteUnitOfWork wuow(opCtx);
+
+ auto bytesDeleted = marker->bytes;
+ auto docsDeleted = marker->records;
+
+ auto status =
+ rs->rangeTruncate(opCtx,
+ // Truncate from the beginning of the collection, this will
+ // cover cases where some leftover documents are present.
+ RecordId(),
+ marker->lastRecord,
+ -bytesDeleted,
+ -docsDeleted);
+ invariantStatusOK(status);
+
+ wuow.commit();
+
+ truncateMarkers->popOldestMarker();
+ numRecordsDeleted += docsDeleted;
+
+ auto& purgingJobStats = changeCollectionManager.getPurgingJobStats();
+ purgingJobStats.docsDeleted.fetchAndAddRelaxed(docsDeleted);
+ purgingJobStats.bytesDeleted.fetchAndAddRelaxed(bytesDeleted);
+
+ auto millisWallTime = marker->wallTime.toMillisSinceEpoch();
+ if (purgingJobStats.maxStartWallTimeMillis.load() < millisWallTime) {
+ purgingJobStats.maxStartWallTimeMillis.store(millisWallTime);
+ }
+ });
}
};