summaryrefslogtreecommitdiff
path: root/src/mongo/db/change_stream_change_collection_manager.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/change_stream_change_collection_manager.h')
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.h67
1 files changed, 46 insertions, 21 deletions
diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h
index 9d7f3277d44..68838767d0d 100644
--- a/src/mongo/db/change_stream_change_collection_manager.h
+++ b/src/mongo/db/change_stream_change_collection_manager.h
@@ -30,10 +30,13 @@
#pragma once
#include "mongo/db/catalog/collection_catalog.h"
+#include "mongo/db/change_collection_truncate_markers.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/service_context.h"
#include "mongo/db/shard_role.h"
+#include "mongo/db/storage/collection_truncate_markers.h"
+#include "mongo/util/concurrent_shared_values_map.h"
namespace mongo {
@@ -124,15 +127,16 @@ public:
void dropChangeCollection(OperationContext* opCtx, const TenantId& tenantId);
/**
- * Inserts documents to change collections. The parameter 'oplogRecords' is a vector of oplog
- * records and the parameter 'oplogTimestamps' is a vector for respective timestamp for each
- * oplog record.
+ * Inserts documents to change collections. The parameter 'oplogRecords' is a vector of
+ * oplog records and the parameter 'oplogTimestamps' is a vector for respective timestamp
+ * for each oplog record.
*
- * The method fetches the tenant-id from the oplog entry, performs necessary modification to the
- * document and then write to the tenant's change collection at the specified oplog timestamp.
+ * The method fetches the tenant-id from the oplog entry, performs necessary modification to
+ * the document and then write to the tenant's change collection at the specified oplog
+ * timestamp.
*
- * Failure in insertion to any change collection will result in a fatal exception and will bring
- * down the node.
+ * Failure in insertion to any change collection will result in a fatal exception and will
+ * bring down the node.
*/
void insertDocumentsToChangeCollection(OperationContext* opCtx,
const std::vector<Record>& oplogRecords,
@@ -142,9 +146,9 @@ public:
/**
* Change Collection Writer. After acquiring ChangeCollectionsWriter the user should trigger
- * acquisition of the locks by calling 'acquireLocks()' before the first write in the Write Unit
- * of Work. Then the write of documents to change collections can be triggered by calling
- * 'write()'.
+ * acquisition of the locks by calling 'acquireLocks()' before the first write in the Write
+ * Unit of Work. Then the write of documents to change collections can be triggered by
+ * calling 'write()'.
*/
class ChangeCollectionsWriter {
friend class ChangeStreamChangeCollectionManager;
@@ -153,10 +157,13 @@ public:
* Constructs a writer from a range ['beginOplogEntries', 'endOplogEntries') of oplog
* entries.
*/
- ChangeCollectionsWriter(OperationContext* opCtx,
- std::vector<InsertStatement>::const_iterator beginOplogEntries,
- std::vector<InsertStatement>::const_iterator endOplogEntries,
- OpDebug* opDebug);
+ ChangeCollectionsWriter(
+ OperationContext* opCtx,
+ std::vector<InsertStatement>::const_iterator beginOplogEntries,
+ std::vector<InsertStatement>::const_iterator endOplogEntries,
+ OpDebug* opDebug,
+ ConcurrentSharedValuesMap<UUID, ChangeCollectionTruncateMarkers, UUID::Hash>*
+ tenantMarkerMap);
public:
ChangeCollectionsWriter(ChangeCollectionsWriter&&);
@@ -179,9 +186,9 @@ public:
};
/**
- * Returns a change collection writer that can insert change collection entries into respective
- * change collections. The entries are constructed from a range ['beginOplogEntries',
- * 'endOplogEntries') of oplog entries.
+ * Returns a change collection writer that can insert change collection entries into
+ * respective change collections. The entries are constructed from a range
+ * ['beginOplogEntries', 'endOplogEntries') of oplog entries.
*/
ChangeCollectionsWriter createChangeCollectionsWriter(
OperationContext* opCtx,
@@ -195,25 +202,43 @@ public:
/**
* Scans the provided change collection and returns its metadata that will be used by the
- * purging job to perform deletion on it. The method returns 'boost::none' if the collection is
- * empty.
+ * purging job to perform deletion on it. The method returns 'boost::none' if the collection
+ * is empty.
*/
static boost::optional<ChangeCollectionPurgingJobMetadata>
getChangeCollectionPurgingJobMetadata(OperationContext* opCtx,
const ScopedCollectionAcquisition& changeCollection);
- /** Removes documents from a change collection whose wall time is less than the
+ /**
+ * Removes documents from a change collection whose wall time is less than the
* 'expirationTime'. Returns the number of documents deleted. The 'maxRecordIdBound' is the
* maximum record id bound that will not be included in the collection scan.
+ *
+ * The removal process is performed with a collection scan + batch delete.
*/
- static size_t removeExpiredChangeCollectionsDocuments(
+ static size_t removeExpiredChangeCollectionsDocumentsWithCollScan(
OperationContext* opCtx,
const ScopedCollectionAcquisition& changeCollection,
RecordIdBound maxRecordIdBound,
Date_t expirationTime);
+ /**
+ * Removes documents from a change collection whose wall time is less than the
+ * 'expirationTime'. Returns the number of documents deleted.
+ *
+ * The removal process is performed with a series of range truncate calls to the record
+ * store. Some documents might survive this process as deletion happens in chunks and we can
+ * only delete a chunk if we guarantee it is fully expired.
+ */
+ static size_t removeExpiredChangeCollectionsDocumentsWithTruncate(
+ OperationContext* opCtx,
+ const ScopedCollectionAcquisition& changeCollection,
+ Date_t expirationTime);
+
private:
// Change collections purging job stats.
PurgingJobStats _purgingJobStats;
+ ConcurrentSharedValuesMap<UUID, ChangeCollectionTruncateMarkers, UUID::Hash>
+ _tenantTruncateMarkersMap;
};
} // namespace mongo