diff options
Diffstat (limited to 'src/mongo/db/change_stream_change_collection_manager.cpp')
-rw-r--r-- | src/mongo/db/change_stream_change_collection_manager.cpp | 81 |
1 files changed, 39 insertions, 42 deletions
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp index 93c1007cfb3..23c780ed5f1 100644 --- a/src/mongo/db/change_stream_change_collection_manager.cpp +++ b/src/mongo/db/change_stream_change_collection_manager.cpp @@ -671,13 +671,11 @@ size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocume // No marker means it's a new collection, or we've just performed startup. Initialize // the TruncateMarkers instance. if (!truncateMarkers) { - writeConflictRetry(opCtx, - "initialise change collection truncate markers", - changeCollectionPtr->ns().ns(), - [&] { - truncateMarkers = initialiseTruncateMarkers( - opCtx, changeCollectionPtr.get(), truncateMap); - }); + writeConflictRetry( + opCtx, "initialise change collection truncate markers", changeCollectionPtr->ns(), [&] { + truncateMarkers = + initialiseTruncateMarkers(opCtx, changeCollectionPtr.get(), truncateMap); + }); } int64_t numRecordsDeleted = 0; @@ -685,41 +683,40 @@ size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocume auto removeExpiredMarkers = [&] { auto rs = changeCollectionPtr->getRecordStore(); while (auto marker = truncateMarkers->peekOldestMarkerIfNeeded(opCtx)) { - writeConflictRetry( - opCtx, "truncate change collection", changeCollectionPtr->ns().ns(), [&] { - // The session might be in use from marker initialisation so we must reset it - // here in order to allow an untimestamped write. - opCtx->recoveryUnit()->abandonSnapshot(); - opCtx->recoveryUnit()->allowOneUntimestampedWrite(); - WriteUnitOfWork wuow(opCtx); - - auto bytesDeleted = marker->bytes; - auto docsDeleted = marker->records; - - auto status = rs->rangeTruncate( - opCtx, - // Truncate from the beginning of the collection, this will - // cover cases where some leftover documents are present. - RecordId(), - marker->lastRecord, - -bytesDeleted, - -docsDeleted); - invariantStatusOK(status); - - wuow.commit(); - - truncateMarkers->popOldestMarker(); - numRecordsDeleted += docsDeleted; - - auto& purgingJobStats = changeCollectionManager.getPurgingJobStats(); - purgingJobStats.docsDeleted.fetchAndAddRelaxed(docsDeleted); - purgingJobStats.bytesDeleted.fetchAndAddRelaxed(bytesDeleted); - - auto millisWallTime = marker->wallTime.toMillisSinceEpoch(); - if (purgingJobStats.maxStartWallTimeMillis.load() < millisWallTime) { - purgingJobStats.maxStartWallTimeMillis.store(millisWallTime); - } - }); + writeConflictRetry(opCtx, "truncate change collection", changeCollectionPtr->ns(), [&] { + // The session might be in use from marker initialisation so we must reset it + // here in order to allow an untimestamped write. + opCtx->recoveryUnit()->abandonSnapshot(); + opCtx->recoveryUnit()->allowOneUntimestampedWrite(); + WriteUnitOfWork wuow(opCtx); + + auto bytesDeleted = marker->bytes; + auto docsDeleted = marker->records; + + auto status = + rs->rangeTruncate(opCtx, + // Truncate from the beginning of the collection, this will + // cover cases where some leftover documents are present. + RecordId(), + marker->lastRecord, + -bytesDeleted, + -docsDeleted); + invariantStatusOK(status); + + wuow.commit(); + + truncateMarkers->popOldestMarker(); + numRecordsDeleted += docsDeleted; + + auto& purgingJobStats = changeCollectionManager.getPurgingJobStats(); + purgingJobStats.docsDeleted.fetchAndAddRelaxed(docsDeleted); + purgingJobStats.bytesDeleted.fetchAndAddRelaxed(bytesDeleted); + + auto millisWallTime = marker->wallTime.toMillisSinceEpoch(); + if (purgingJobStats.maxStartWallTimeMillis.load() < millisWallTime) { + purgingJobStats.maxStartWallTimeMillis.store(millisWallTime); + } + }); } }; |