summaryrefslogtreecommitdiff
path: root/src/mongo/db/change_stream_change_collection_manager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/change_stream_change_collection_manager.cpp')
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.cpp79
1 files changed, 49 insertions, 30 deletions
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp
index 5559e49390a..e6aaf3e86f8 100644
--- a/src/mongo/db/change_stream_change_collection_manager.cpp
+++ b/src/mongo/db/change_stream_change_collection_manager.cpp
@@ -60,6 +60,15 @@ const auto getChangeCollectionManager =
ServiceContext::declareDecoration<boost::optional<ChangeStreamChangeCollectionManager>>();
/**
+ * Returns the list of all tenant ids in the replica set.
+ * TODO SERVER-61822 Provide the real implementation after 'listDatabasesForAllTenants' is
+ * available.
+ */
+std::vector<boost::optional<TenantId>> getAllTenants() {
+ return {boost::none};
+}
+
+/**
* Creates a Document object from the supplied oplog entry, performs necessary modifications to it
* and then returns it as a BSON object.
*/
@@ -186,6 +195,14 @@ private:
} // namespace
+BSONObj ChangeStreamChangeCollectionManager::PurgingJobStats::toBSON() const {
+ return BSON("totalPass" << totalPass.load() << "docsDeleted" << docsDeleted.load()
+ << "bytesDeleted" << bytesDeleted.load() << "scannedCollections"
+ << scannedCollections.load() << "maxStartWallTimeMillis"
+ << maxStartWallTimeMillis.load() << "timeElapsedMillis"
+ << timeElapsedMillis.load());
+}
+
ChangeStreamChangeCollectionManager& ChangeStreamChangeCollectionManager::get(
ServiceContext* service) {
return *getChangeCollectionManager(service);
@@ -338,8 +355,8 @@ Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
return changeCollectionsWriter.write(opCtx, opDebug);
}
-boost::optional<RecordIdBound>
-ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId(
+boost::optional<ChangeCollectionPurgingJobMetadata>
+ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata(
OperationContext* opCtx, const CollectionPtr* changeCollection, const Date_t& expirationTime) {
const auto isExpired = [&](const BSONObj& changeDoc) {
const BSONElement& wallElem = changeDoc["wall"];
@@ -354,6 +371,7 @@ ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId(
BSONObj currChangeDoc;
RecordId currRecordId;
+ boost::optional<long long> firstDocWallTime;
boost::optional<RecordId> prevRecordId;
boost::optional<RecordId> prevPrevRecordId;
@@ -364,17 +382,29 @@ ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId(
while (true) {
auto getNextState = scanExecutor->getNext(&currChangeDoc, &currRecordId);
switch (getNextState) {
- case PlanExecutor::IS_EOF:
+ case PlanExecutor::IS_EOF: {
// Either the collection is empty (case in which return boost::none), or all the
// documents have expired. The remover job should never delete the last entry of a
// change collection, so return the recordId of the document previous to the last
// one.
- return prevPrevRecordId ? RecordIdBound(prevPrevRecordId.get())
- : boost::optional<RecordIdBound>();
+ if (!prevPrevRecordId) {
+ return boost::none;
+ }
+
+ return {{*firstDocWallTime, RecordIdBound(*prevPrevRecordId)}};
+ }
case PlanExecutor::ADVANCED: {
+ if (!prevRecordId.has_value()) {
+ firstDocWallTime =
+ boost::make_optional(currChangeDoc["wall"].Date().toMillisSinceEpoch());
+ }
+
if (!isExpired(currChangeDoc)) {
- return prevRecordId ? RecordIdBound(prevRecordId.get())
- : boost::optional<RecordIdBound>();
+ if (!prevRecordId) {
+ return boost::none;
+ }
+
+ return {{*firstDocWallTime, RecordIdBound(*prevRecordId)}};
}
}
}
@@ -387,28 +417,9 @@ ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId(
}
size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments(
- OperationContext* opCtx, boost::optional<TenantId> tenantId, const Date_t& expirationTime) {
- // Acquire intent-exclusive lock on the change collection. Early exit if the collection
- // doesn't exist.
- const auto changeCollection =
- AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId};
-
- // Early exit if collection does not exist, or if running on a secondary (requires
- // opCtx->lockState()->isRSTLLocked()).
- if (!changeCollection ||
- !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(
- opCtx, NamespaceString::kConfigDb)) {
- return 0;
- }
-
- const auto maxRecordIdBound =
- getChangeCollectionMaxExpiredRecordId(opCtx, &*changeCollection, expirationTime);
-
- // Early exit if there are no expired documents to be removed.
- if (!maxRecordIdBound.has_value()) {
- return 0;
- }
-
+ OperationContext* opCtx,
+ const CollectionPtr* changeCollection,
+ const RecordIdBound& maxRecordIdBound) {
auto params = std::make_unique<DeleteStageParams>();
params->isMulti = true;
@@ -425,7 +436,15 @@ size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocume
std::move(batchedDeleteParams));
try {
- return deleteExecutor->executeDelete();
+ (void)deleteExecutor->executeDelete();
+ auto batchedDeleteStats = deleteExecutor->getBatchedDeleteStats();
+ auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ changeCollectionManager.getPurgingJobStats().docsDeleted.fetchAndAddRelaxed(
+ batchedDeleteStats.docsDeleted);
+ changeCollectionManager.getPurgingJobStats().bytesDeleted.fetchAndAddRelaxed(
+ batchedDeleteStats.bytesDeleted);
+
+ return batchedDeleteStats.docsDeleted;
} catch (const ExceptionFor<ErrorCodes::QueryPlanKilled>&) {
// It is expected that a collection drop can kill a query plan while deleting an old
// document, so ignore this error.