summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDenis Grebennicov <denis.grebennicov@mongodb.com>2022-09-01 12:56:22 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-01 13:48:08 +0000
commitd1f935358d6f31c972c5a717b3bbc994b258d2d3 (patch)
tree554654bd52216ed5cc8e66f48a29fc8ed03655c8
parentce3443291cd663ae5375941f380d4bc52bc88a85 (diff)
downloadmongo-d1f935358d6f31c972c5a717b3bbc994b258d2d3.tar.gz
SERVER-66636 Introduce server parameters to track storage and TTL stats for the change collections
Co-author: csum112<catalin.sumanaru@mongodb.com>
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/change_collection_expired_change_remover_test.cpp34
-rw-r--r--src/mongo/db/change_collection_expired_documents_remover.cpp43
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.cpp79
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.h84
-rw-r--r--src/mongo/db/exec/batched_delete_stage.cpp7
-rw-r--r--src/mongo/db/exec/batched_delete_stage.h1
-rw-r--r--src/mongo/db/exec/batched_delete_stage.idl1
-rw-r--r--src/mongo/db/exec/delete_stage.cpp1
-rw-r--r--src/mongo/db/exec/plan_stats.h1
-rw-r--r--src/mongo/db/stats/SConscript11
-rw-r--r--src/mongo/db/stats/change_collection_server_status.cpp62
12 files changed, 273 insertions, 52 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 93c4345d51e..946158a1ca5 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -2241,6 +2241,7 @@ env.Library(
'session/sessions_collection_rs',
'session/sessions_collection_standalone',
'startup_warnings_mongod',
+ 'stats/change_collection_server_status',
'stats/counters',
'stats/serveronly_stats',
'stats/top',
diff --git a/src/mongo/db/change_collection_expired_change_remover_test.cpp b/src/mongo/db/change_collection_expired_change_remover_test.cpp
index ccd323202e8..e3ee3c411f2 100644
--- a/src/mongo/db/change_collection_expired_change_remover_test.cpp
+++ b/src/mongo/db/change_collection_expired_change_remover_test.cpp
@@ -114,11 +114,28 @@ protected:
void dropAndRecreateChangeCollection(OperationContext* opCtx,
boost::optional<TenantId> tenantId) {
- auto changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
changeCollectionManager.dropChangeCollection(opCtx, tenantId);
changeCollectionManager.createChangeCollection(opCtx, tenantId);
}
+ size_t removeExpiredChangeCollectionsDocuments(OperationContext* opCtx,
+ boost::optional<TenantId> tenantId,
+ const Date_t& expirationTime) {
+ // Acquire intent-exclusive lock on the change collection. Early exit if the collection
+ // doesn't exist.
+ const auto changeCollection =
+ AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId};
+
+ // Get the 'maxRecordIdBound' and perform the removal of the expired documents.
+ const auto maxRecordIdBound =
+ ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata(
+ opCtx, &*changeCollection, expirationTime)
+ ->maxRecordIdBound;
+ return ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments(
+ opCtx, &*changeCollection, maxRecordIdBound);
+ }
+
const boost::optional<TenantId> _tenantId;
RAIIServerParameterControllerForTest featureFlagController{"featureFlagServerlessChangeStreams",
true};
@@ -151,9 +168,10 @@ TEST_F(ChangeCollectionExpiredChangeRemoverTest, VerifyLastExpiredDocument) {
auto changeCollection =
AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kRead, _tenantId};
- const auto maxExpiredRecordId =
- ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId(
- opCtx, &*changeCollection, expirationTime);
+ auto maxExpiredRecordId =
+ ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata(
+ opCtx, &*changeCollection, expirationTime)
+ ->maxRecordIdBound;
// Get the document found at 'maxExpiredRecordId' and test it against 'lastExpiredDocument'.
auto scanExecutor =
@@ -192,9 +210,7 @@ TEST_F(ChangeCollectionExpiredChangeRemoverTest, ShouldRemoveOnlyExpiredDocument
insertDocumentToChangeCollection(opCtx, _tenantId, notExpired);
// Verify that only the required documents are removed.
- ASSERT_EQ(ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments(
- opCtx, _tenantId, expirationTime),
- 2);
+ ASSERT_EQ(removeExpiredChangeCollectionsDocuments(opCtx, _tenantId, expirationTime), 2);
// Only the 'notExpired' document is left in the change collection.
const auto changeCollectionEntries = readChangeCollection(opCtx, _tenantId);
@@ -213,9 +229,7 @@ TEST_F(ChangeCollectionExpiredChangeRemoverTest, ShouldLeaveAtLeastOneDocument)
}
// Verify that all but the last document is removed.
- ASSERT_EQ(ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments(
- opCtx, _tenantId, now()),
- 99);
+ ASSERT_EQ(removeExpiredChangeCollectionsDocuments(opCtx, _tenantId, now()), 99);
// Only the last document is left in the change collection.
const auto changeCollectionEntries = readChangeCollection(opCtx, _tenantId);
diff --git a/src/mongo/db/change_collection_expired_documents_remover.cpp b/src/mongo/db/change_collection_expired_documents_remover.cpp
index dc1aadeaa06..5eae8abdf76 100644
--- a/src/mongo/db/change_collection_expired_documents_remover.cpp
+++ b/src/mongo/db/change_collection_expired_documents_remover.cpp
@@ -29,9 +29,11 @@
#include "mongo/db/change_collection_expired_documents_remover.h"
+#include "mongo/db/catalog_raii.h"
#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_streams_cluster_parameter_gen.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/mutex.h"
@@ -83,23 +85,58 @@ void removeExpiredDocuments(Client* client) {
// Number of documents removed in the current pass.
size_t removedCount = 0;
+ long long maxStartWallTime = 0;
+ auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx.get());
for (const auto& tenantId : getAllTenants()) {
auto expiredAfterSeconds = getExpireAfterSeconds(tenantId);
invariant(expiredAfterSeconds);
+
+ // Acquire intent-exclusive lock on the change collection.
+ AutoGetChangeCollection changeCollection{
+ opCtx.get(), AutoGetChangeCollection::AccessMode::kWrite, tenantId};
+
+ // Early exit if collection does not exist or if running on a secondary (requires
+ // opCtx->lockState()->isRSTLLocked()).
+ if (!changeCollection ||
+ !repl::ReplicationCoordinator::get(opCtx.get())
+ ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kConfigDb)) {
+ continue;
+ }
+
+ // Get the metadata required for the removal of the expired change collection
+ // documents. Early exit if the metadata is missing, indicating that there is nothing
+ // to remove.
+ auto purgingJobMetadata =
+ ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata(
+ opCtx.get(),
+ &*changeCollection,
+ currentWallTime - Seconds(*expiredAfterSeconds));
+ if (!purgingJobMetadata) {
+ continue;
+ }
+
removedCount +=
ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments(
- opCtx.get(), tenantId, currentWallTime - Seconds(*expiredAfterSeconds));
+ opCtx.get(), &*changeCollection, purgingJobMetadata->maxRecordIdBound);
+ changeCollectionManager.getPurgingJobStats().scannedCollections.fetchAndAddRelaxed(1);
+ maxStartWallTime =
+ std::max(maxStartWallTime, purgingJobMetadata->firstDocWallTimeMillis);
}
+ changeCollectionManager.getPurgingJobStats().maxStartWallTimeMillis.store(maxStartWallTime);
- // TODO SERVER-66636 Use server parameters to track periodic job statistics per tenant.
+ const auto jobDurationMillis = clock->now() - currentWallTime;
if (removedCount > 0) {
LOGV2_DEBUG(6663503,
3,
"Periodic expired change collection job finished executing",
"numberOfRemovals"_attr = removedCount,
- "jobDuration"_attr = (clock->now() - currentWallTime).toString());
+ "jobDuration"_attr = jobDurationMillis.toString());
}
+
+ changeCollectionManager.getPurgingJobStats().totalPass.fetchAndAddRelaxed(1);
+ changeCollectionManager.getPurgingJobStats().timeElapsedMillis.fetchAndAddRelaxed(
+ jobDurationMillis.count());
} catch (const DBException& exception) {
if (exception.toStatus() != ErrorCodes::OK) {
LOGV2_WARNING(6663504,
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp
index 5559e49390a..e6aaf3e86f8 100644
--- a/src/mongo/db/change_stream_change_collection_manager.cpp
+++ b/src/mongo/db/change_stream_change_collection_manager.cpp
@@ -60,6 +60,15 @@ const auto getChangeCollectionManager =
ServiceContext::declareDecoration<boost::optional<ChangeStreamChangeCollectionManager>>();
/**
+ * Returns the list of all tenant ids in the replica set.
+ * TODO SERVER-61822 Provide the real implementation after 'listDatabasesForAllTenants' is
+ * available.
+ */
+std::vector<boost::optional<TenantId>> getAllTenants() {
+ return {boost::none};
+}
+
+/**
* Creates a Document object from the supplied oplog entry, performs necessary modifications to it
* and then returns it as a BSON object.
*/
@@ -186,6 +195,14 @@ private:
} // namespace
+BSONObj ChangeStreamChangeCollectionManager::PurgingJobStats::toBSON() const {
+ return BSON("totalPass" << totalPass.load() << "docsDeleted" << docsDeleted.load()
+ << "bytesDeleted" << bytesDeleted.load() << "scannedCollections"
+ << scannedCollections.load() << "maxStartWallTimeMillis"
+ << maxStartWallTimeMillis.load() << "timeElapsedMillis"
+ << timeElapsedMillis.load());
+}
+
ChangeStreamChangeCollectionManager& ChangeStreamChangeCollectionManager::get(
ServiceContext* service) {
return *getChangeCollectionManager(service);
@@ -338,8 +355,8 @@ Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
return changeCollectionsWriter.write(opCtx, opDebug);
}
-boost::optional<RecordIdBound>
-ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId(
+boost::optional<ChangeCollectionPurgingJobMetadata>
+ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata(
OperationContext* opCtx, const CollectionPtr* changeCollection, const Date_t& expirationTime) {
const auto isExpired = [&](const BSONObj& changeDoc) {
const BSONElement& wallElem = changeDoc["wall"];
@@ -354,6 +371,7 @@ ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId(
BSONObj currChangeDoc;
RecordId currRecordId;
+ boost::optional<long long> firstDocWallTime;
boost::optional<RecordId> prevRecordId;
boost::optional<RecordId> prevPrevRecordId;
@@ -364,17 +382,29 @@ ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId(
while (true) {
auto getNextState = scanExecutor->getNext(&currChangeDoc, &currRecordId);
switch (getNextState) {
- case PlanExecutor::IS_EOF:
+ case PlanExecutor::IS_EOF: {
// Either the collection is empty (case in which return boost::none), or all the
// documents have expired. The remover job should never delete the last entry of a
// change collection, so return the recordId of the document previous to the last
// one.
- return prevPrevRecordId ? RecordIdBound(prevPrevRecordId.get())
- : boost::optional<RecordIdBound>();
+ if (!prevPrevRecordId) {
+ return boost::none;
+ }
+
+ return {{*firstDocWallTime, RecordIdBound(*prevPrevRecordId)}};
+ }
case PlanExecutor::ADVANCED: {
+ if (!prevRecordId.has_value()) {
+ firstDocWallTime =
+ boost::make_optional(currChangeDoc["wall"].Date().toMillisSinceEpoch());
+ }
+
if (!isExpired(currChangeDoc)) {
- return prevRecordId ? RecordIdBound(prevRecordId.get())
- : boost::optional<RecordIdBound>();
+ if (!prevRecordId) {
+ return boost::none;
+ }
+
+ return {{*firstDocWallTime, RecordIdBound(*prevRecordId)}};
}
}
}
@@ -387,28 +417,9 @@ ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId(
}
size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments(
- OperationContext* opCtx, boost::optional<TenantId> tenantId, const Date_t& expirationTime) {
- // Acquire intent-exclusive lock on the change collection. Early exit if the collection
- // doesn't exist.
- const auto changeCollection =
- AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId};
-
- // Early exit if collection does not exist, or if running on a secondary (requires
- // opCtx->lockState()->isRSTLLocked()).
- if (!changeCollection ||
- !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(
- opCtx, NamespaceString::kConfigDb)) {
- return 0;
- }
-
- const auto maxRecordIdBound =
- getChangeCollectionMaxExpiredRecordId(opCtx, &*changeCollection, expirationTime);
-
- // Early exit if there are no expired documents to be removed.
- if (!maxRecordIdBound.has_value()) {
- return 0;
- }
-
+ OperationContext* opCtx,
+ const CollectionPtr* changeCollection,
+ const RecordIdBound& maxRecordIdBound) {
auto params = std::make_unique<DeleteStageParams>();
params->isMulti = true;
@@ -425,7 +436,15 @@ size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocume
std::move(batchedDeleteParams));
try {
- return deleteExecutor->executeDelete();
+ (void)deleteExecutor->executeDelete();
+ auto batchedDeleteStats = deleteExecutor->getBatchedDeleteStats();
+ auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ changeCollectionManager.getPurgingJobStats().docsDeleted.fetchAndAddRelaxed(
+ batchedDeleteStats.docsDeleted);
+ changeCollectionManager.getPurgingJobStats().bytesDeleted.fetchAndAddRelaxed(
+ batchedDeleteStats.bytesDeleted);
+
+ return batchedDeleteStats.docsDeleted;
} catch (const ExceptionFor<ErrorCodes::QueryPlanKilled>&) {
// It is expected that a collection drop can kill a query plan while deleting an old
// document, so ignore this error.
diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h
index 21a60c1bf0e..49ff64d635b 100644
--- a/src/mongo/db/change_stream_change_collection_manager.h
+++ b/src/mongo/db/change_stream_change_collection_manager.h
@@ -37,10 +37,62 @@
namespace mongo {
/**
+ * Metadata associated with a particular change collection that is used by the purging job.
+ */
+struct ChangeCollectionPurgingJobMetadata {
+ // The wall time in milliseconds of the first document of the change collection.
+ long long firstDocWallTimeMillis;
+
+ // The maximum record id beyond which the change collection documents will be not deleted.
+ RecordIdBound maxRecordIdBound;
+};
+
+/**
* Manages the creation, deletion and insertion lifecycle of the change collection.
*/
class ChangeStreamChangeCollectionManager {
public:
+ /**
+ * Statistics of the change collection purging job.
+ */
+ struct PurgingJobStats {
+ /**
+ * Total number of deletion passes completed by the purging job.
+ */
+ AtomicWord<long long> totalPass;
+
+ /**
+ * Cumulative number of change collections documents deleted by the purging job.
+ */
+ AtomicWord<long long> docsDeleted;
+
+ /**
+ * Cumulative size in bytes of all deleted documents from all change collections by the
+ * purging job.
+ */
+ AtomicWord<long long> bytesDeleted;
+
+ /**
+ * Cumulative number of change collections scanned by the purging job.
+ */
+ AtomicWord<long long> scannedCollections;
+
+ /**
+ * Cumulative number of milliseconds elapsed since the first pass by the purging job.
+ */
+ AtomicWord<long long> timeElapsedMillis;
+
+ /**
+ * Maximum wall time in milliseconds from the first document of each change collection.
+ */
+ AtomicWord<long long> maxStartWallTimeMillis;
+
+ /**
+ * Serializes the purging job statistics to the BSON object.
+ */
+ BSONObj toBSON() const;
+ };
+
explicit ChangeStreamChangeCollectionManager(ServiceContext* service) {}
~ChangeStreamChangeCollectionManager() = default;
@@ -122,23 +174,39 @@ public:
bool isGlobalIXLockAcquired,
OpDebug* opDebug);
+ PurgingJobStats& getPurgingJobStats() {
+ return _purgingJobStats;
+ }
+
/**
- * Forward-scans the given change collection to return the recordId of the last, non-terminal
- * document having the wall time less than the 'expirationTime'. Returns 'boost::none' if the
+ * Forward-scans the given change collection to return the wall time of the first document as
+ * well as recordId of the last, non-terminal document having the wall time less than the
+ * 'expirationTime'. Returns 'boost::none' if the collection is empty, or there are no expired
+ * documents, or the collection contains a single expired document.
+ */
+
+ /**
+ * Forward scans the provided change collection and returns its metadata that will be used by
+ * the purging job to perform deletion on it. The method returns 'boost::none' if either the
* collection is empty, or there are no expired documents, or the collection contains a single
* expired document.
*/
- static boost::optional<RecordIdBound> getChangeCollectionMaxExpiredRecordId(
- OperationContext* opCtx,
- const CollectionPtr* changeCollection,
- const Date_t& expirationTime);
+ static boost::optional<ChangeCollectionPurgingJobMetadata>
+ getChangeCollectionPurgingJobMetadata(OperationContext* opCtx,
+ const CollectionPtr* changeCollection,
+ const Date_t& expirationTime);
/**
* Removes expired documents from the change collection for the provided 'tenantId'. A document
* whose retention time is less than the 'expirationTime' is deleted.
+ * Returns wall time of the first document as well as number of documents deleted.
*/
static size_t removeExpiredChangeCollectionsDocuments(OperationContext* opCtx,
- boost::optional<TenantId> tenantId,
- const Date_t& expirationTime);
+ const CollectionPtr* changeCollection,
+ const RecordIdBound& maxRecordIdBound);
+
+private:
+ // Change collections purging job stats.
+ PurgingJobStats _purgingJobStats;
};
} // namespace mongo
diff --git a/src/mongo/db/exec/batched_delete_stage.cpp b/src/mongo/db/exec/batched_delete_stage.cpp
index 63242ad5206..b12a5c0d73b 100644
--- a/src/mongo/db/exec/batched_delete_stage.cpp
+++ b/src/mongo/db/exec/batched_delete_stage.cpp
@@ -255,6 +255,7 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
std::set<WorkingSetID> recordsToSkip;
unsigned int docsDeleted = 0;
+ unsigned int bytesDeleted = 0;
unsigned int bufferOffset = 0;
long long timeInBatch = 0;
@@ -264,7 +265,8 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
"BatchedDeleteStage::_deleteBatch",
collection()->ns().ns(),
[&] {
- timeInBatch = _commitBatch(out, &recordsToSkip, &docsDeleted, &bufferOffset);
+ timeInBatch =
+ _commitBatch(out, &recordsToSkip, &docsDeleted, &bytesDeleted, &bufferOffset);
return PlanStage::NEED_TIME;
},
[&] {
@@ -292,6 +294,7 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
incrementSSSMetricNoOverflow(batchedDeletesSSS.batches, 1);
incrementSSSMetricNoOverflow(batchedDeletesSSS.timeInBatchMillis, timeInBatch);
_specificStats.docsDeleted += docsDeleted;
+ _specificStats.bytesDeleted += bytesDeleted;
if (bufferOffset < _stagedDeletesBuffer.size()) {
// targetBatchTimeMS was met. Remove staged deletes that have been evaluated
@@ -312,6 +315,7 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
long long BatchedDeleteStage::_commitBatch(WorkingSetID* out,
std::set<WorkingSetID>* recordsToSkip,
unsigned int* docsDeleted,
+ unsigned int* bytesDeleted,
unsigned int* bufferOffset) {
// Estimate the size of the oplog entry that would result from committing the batch,
// to ensure we emit an oplog entry that's within the 16MB BSON limit.
@@ -395,6 +399,7 @@ long long BatchedDeleteStage::_commitBatch(WorkingSetID* out,
: Collection::StoreDeletedDoc::Off);
(*docsDeleted)++;
+ (*bytesDeleted) += bsonObjDoc.objsize();
batchedDeleteStageSleepAfterNDocuments.executeIf(
[&](const BSONObj& data) {
diff --git a/src/mongo/db/exec/batched_delete_stage.h b/src/mongo/db/exec/batched_delete_stage.h
index 8e5bbff2b21..84c166ba0dd 100644
--- a/src/mongo/db/exec/batched_delete_stage.h
+++ b/src/mongo/db/exec/batched_delete_stage.h
@@ -137,6 +137,7 @@ private:
long long _commitBatch(WorkingSetID* out,
std::set<WorkingSetID>* recordsToSkip,
unsigned int* docsDeleted,
+ unsigned int* bytesDeleted,
unsigned int* bufferOffset);
// Attempts to stage a new delete in the _stagedDeletesBuffer. Returns the PlanStage::StageState
diff --git a/src/mongo/db/exec/batched_delete_stage.idl b/src/mongo/db/exec/batched_delete_stage.idl
index 5449c0e2562..af9542ed6b2 100644
--- a/src/mongo/db/exec/batched_delete_stage.idl
+++ b/src/mongo/db/exec/batched_delete_stage.idl
@@ -41,6 +41,7 @@ server_parameters:
default: 2097152 # 2MB
validator:
gte: 0
+ lte: 2147483647
batchedDeletesTargetBatchDocs:
description: "Threshold of documents at which a batch of document deletions is committed. A value of zero means unlimited"
set_at: [startup, runtime]
diff --git a/src/mongo/db/exec/delete_stage.cpp b/src/mongo/db/exec/delete_stage.cpp
index 57368624775..8b85e082fd0 100644
--- a/src/mongo/db/exec/delete_stage.cpp
+++ b/src/mongo/db/exec/delete_stage.cpp
@@ -304,6 +304,7 @@ PlanStage::StageState DeleteStage::doWork(WorkingSetID* out) {
}
}
_specificStats.docsDeleted += _params->numStatsForDoc ? _params->numStatsForDoc(bsonObjDoc) : 1;
+ _specificStats.bytesDeleted += bsonObjDoc.objsize();
if (_params->returnDeleted) {
// After deleting the document, the RecordId associated with this member is invalid.
diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h
index 6f621872985..d7336e79a31 100644
--- a/src/mongo/db/exec/plan_stats.h
+++ b/src/mongo/db/exec/plan_stats.h
@@ -411,6 +411,7 @@ struct DeleteStats : public SpecificStats {
}
size_t docsDeleted = 0u;
+ size_t bytesDeleted = 0u;
};
struct BatchedDeleteStats : public DeleteStats {
diff --git a/src/mongo/db/stats/SConscript b/src/mongo/db/stats/SConscript
index 668c60eed5b..eb6ff375190 100644
--- a/src/mongo/db/stats/SConscript
+++ b/src/mongo/db/stats/SConscript
@@ -85,6 +85,17 @@ env.Library(
)
env.Library(
+ target='change_collection_server_status',
+ source=['change_collection_server_status.cpp'],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
+ '$BUILD_DIR/mongo/db/commands/server_status_core',
+ '$BUILD_DIR/mongo/db/server_base',
+ ],
+)
+
+env.Library(
target='transaction_stats',
source=[
'single_transaction_stats.cpp',
diff --git a/src/mongo/db/stats/change_collection_server_status.cpp b/src/mongo/db/stats/change_collection_server_status.cpp
new file mode 100644
index 00000000000..ee424a4ae43
--- /dev/null
+++ b/src/mongo/db/stats/change_collection_server_status.cpp
@@ -0,0 +1,62 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/commands/server_status.h"
+
+namespace mongo {
+/**
+ * Adds a section 'changeCollections' to the serverStatus metrics that provides aggregated
+ * statistics for change collections.
+ */
+class ChangeCollectionServerStatus final : public ServerStatusSection {
+public:
+ ChangeCollectionServerStatus() : ServerStatusSection("changeCollections") {}
+
+ bool includeByDefault() const override {
+ return true;
+ }
+
+ void appendSection(OperationContext* opCtx,
+ const BSONElement& configElement,
+ BSONObjBuilder* result) const override {
+ // Append the section only when running in serverless.
+ if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ return;
+ }
+
+ auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ result->append(getSectionName(),
+ BSON("purgingJob" << changeCollectionManager.getPurgingJobStats().toBSON()));
+ }
+} changeCollectionServerStatus;
+
+} // namespace mongo