diff options
Diffstat (limited to 'src/mongo/db/change_stream_change_collection_manager.h')
-rw-r--r-- | src/mongo/db/change_stream_change_collection_manager.h | 67 |
1 files changed, 46 insertions, 21 deletions
diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h index 9d7f3277d44..68838767d0d 100644 --- a/src/mongo/db/change_stream_change_collection_manager.h +++ b/src/mongo/db/change_stream_change_collection_manager.h @@ -30,10 +30,13 @@ #pragma once #include "mongo/db/catalog/collection_catalog.h" +#include "mongo/db/change_collection_truncate_markers.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/service_context.h" #include "mongo/db/shard_role.h" +#include "mongo/db/storage/collection_truncate_markers.h" +#include "mongo/util/concurrent_shared_values_map.h" namespace mongo { @@ -124,15 +127,16 @@ public: void dropChangeCollection(OperationContext* opCtx, const TenantId& tenantId); /** - * Inserts documents to change collections. The parameter 'oplogRecords' is a vector of oplog - * records and the parameter 'oplogTimestamps' is a vector for respective timestamp for each - * oplog record. + * Inserts documents to change collections. The parameter 'oplogRecords' is a vector of + * oplog records and the parameter 'oplogTimestamps' is a vector for respective timestamp + * for each oplog record. * - * The method fetches the tenant-id from the oplog entry, performs necessary modification to the - * document and then write to the tenant's change collection at the specified oplog timestamp. + * The method fetches the tenant-id from the oplog entry, performs necessary modification to + * the document and then write to the tenant's change collection at the specified oplog + * timestamp. * - * Failure in insertion to any change collection will result in a fatal exception and will bring - * down the node. + * Failure in insertion to any change collection will result in a fatal exception and will + * bring down the node. */ void insertDocumentsToChangeCollection(OperationContext* opCtx, const std::vector<Record>& oplogRecords, @@ -142,9 +146,9 @@ public: /** * Change Collection Writer. After acquiring ChangeCollectionsWriter the user should trigger - * acquisition of the locks by calling 'acquireLocks()' before the first write in the Write Unit - * of Work. Then the write of documents to change collections can be triggered by calling - * 'write()'. + * acquisition of the locks by calling 'acquireLocks()' before the first write in the Write + * Unit of Work. Then the write of documents to change collections can be triggered by + * calling 'write()'. */ class ChangeCollectionsWriter { friend class ChangeStreamChangeCollectionManager; @@ -153,10 +157,13 @@ public: * Constructs a writer from a range ['beginOplogEntries', 'endOplogEntries') of oplog * entries. */ - ChangeCollectionsWriter(OperationContext* opCtx, - std::vector<InsertStatement>::const_iterator beginOplogEntries, - std::vector<InsertStatement>::const_iterator endOplogEntries, - OpDebug* opDebug); + ChangeCollectionsWriter( + OperationContext* opCtx, + std::vector<InsertStatement>::const_iterator beginOplogEntries, + std::vector<InsertStatement>::const_iterator endOplogEntries, + OpDebug* opDebug, + ConcurrentSharedValuesMap<UUID, ChangeCollectionTruncateMarkers, UUID::Hash>* + tenantMarkerMap); public: ChangeCollectionsWriter(ChangeCollectionsWriter&&); @@ -179,9 +186,9 @@ public: }; /** - * Returns a change collection writer that can insert change collection entries into respective - * change collections. The entries are constructed from a range ['beginOplogEntries', - * 'endOplogEntries') of oplog entries. + * Returns a change collection writer that can insert change collection entries into + * respective change collections. The entries are constructed from a range + * ['beginOplogEntries', 'endOplogEntries') of oplog entries. */ ChangeCollectionsWriter createChangeCollectionsWriter( OperationContext* opCtx, @@ -195,25 +202,43 @@ public: /** * Scans the provided change collection and returns its metadata that will be used by the - * purging job to perform deletion on it. The method returns 'boost::none' if the collection is - * empty. + * purging job to perform deletion on it. The method returns 'boost::none' if the collection + * is empty. */ static boost::optional<ChangeCollectionPurgingJobMetadata> getChangeCollectionPurgingJobMetadata(OperationContext* opCtx, const ScopedCollectionAcquisition& changeCollection); - /** Removes documents from a change collection whose wall time is less than the + /** + * Removes documents from a change collection whose wall time is less than the * 'expirationTime'. Returns the number of documents deleted. The 'maxRecordIdBound' is the * maximum record id bound that will not be included in the collection scan. + * + * The removal process is performed with a collection scan + batch delete. */ - static size_t removeExpiredChangeCollectionsDocuments( + static size_t removeExpiredChangeCollectionsDocumentsWithCollScan( OperationContext* opCtx, const ScopedCollectionAcquisition& changeCollection, RecordIdBound maxRecordIdBound, Date_t expirationTime); + /** + * Removes documents from a change collection whose wall time is less than the + * 'expirationTime'. Returns the number of documents deleted. + * + * The removal process is performed with a series of range truncate calls to the record + * store. Some documents might survive this process as deletion happens in chunks and we can + * only delete a chunk if we guarantee it is fully expired. + */ + static size_t removeExpiredChangeCollectionsDocumentsWithTruncate( + OperationContext* opCtx, + const ScopedCollectionAcquisition& changeCollection, + Date_t expirationTime); + private: // Change collections purging job stats. PurgingJobStats _purgingJobStats; + ConcurrentSharedValuesMap<UUID, ChangeCollectionTruncateMarkers, UUID::Hash> + _tenantTruncateMarkersMap; }; } // namespace mongo |