From d1f935358d6f31c972c5a717b3bbc994b258d2d3 Mon Sep 17 00:00:00 2001 From: Denis Grebennicov Date: Thu, 1 Sep 2022 12:56:22 +0000 Subject: SERVER-66636 Introduce server parameters to track storage and TTL stats for the change collections Co-author: csum112 --- src/mongo/db/SConscript | 1 + ...ange_collection_expired_change_remover_test.cpp | 34 ++++++--- ...change_collection_expired_documents_remover.cpp | 43 ++++++++++- .../db/change_stream_change_collection_manager.cpp | 79 ++++++++++++-------- .../db/change_stream_change_collection_manager.h | 84 +++++++++++++++++++--- src/mongo/db/exec/batched_delete_stage.cpp | 7 +- src/mongo/db/exec/batched_delete_stage.h | 1 + src/mongo/db/exec/batched_delete_stage.idl | 1 + src/mongo/db/exec/delete_stage.cpp | 1 + src/mongo/db/exec/plan_stats.h | 1 + src/mongo/db/stats/SConscript | 11 +++ .../db/stats/change_collection_server_status.cpp | 62 ++++++++++++++++ 12 files changed, 273 insertions(+), 52 deletions(-) create mode 100644 src/mongo/db/stats/change_collection_server_status.cpp diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 93c4345d51e..946158a1ca5 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -2241,6 +2241,7 @@ env.Library( 'session/sessions_collection_rs', 'session/sessions_collection_standalone', 'startup_warnings_mongod', + 'stats/change_collection_server_status', 'stats/counters', 'stats/serveronly_stats', 'stats/top', diff --git a/src/mongo/db/change_collection_expired_change_remover_test.cpp b/src/mongo/db/change_collection_expired_change_remover_test.cpp index ccd323202e8..e3ee3c411f2 100644 --- a/src/mongo/db/change_collection_expired_change_remover_test.cpp +++ b/src/mongo/db/change_collection_expired_change_remover_test.cpp @@ -114,11 +114,28 @@ protected: void dropAndRecreateChangeCollection(OperationContext* opCtx, boost::optional tenantId) { - auto changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); + auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); changeCollectionManager.dropChangeCollection(opCtx, tenantId); changeCollectionManager.createChangeCollection(opCtx, tenantId); } + size_t removeExpiredChangeCollectionsDocuments(OperationContext* opCtx, + boost::optional tenantId, + const Date_t& expirationTime) { + // Acquire intent-exclusive lock on the change collection. Early exit if the collection + // doesn't exist. + const auto changeCollection = + AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId}; + + // Get the 'maxRecordIdBound' and perform the removal of the expired documents. 
+ const auto maxRecordIdBound = + ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata( + opCtx, &*changeCollection, expirationTime) + ->maxRecordIdBound; + return ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments( + opCtx, &*changeCollection, maxRecordIdBound); + } + const boost::optional _tenantId; RAIIServerParameterControllerForTest featureFlagController{"featureFlagServerlessChangeStreams", true}; @@ -151,9 +168,10 @@ TEST_F(ChangeCollectionExpiredChangeRemoverTest, VerifyLastExpiredDocument) { auto changeCollection = AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kRead, _tenantId}; - const auto maxExpiredRecordId = - ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId( - opCtx, &*changeCollection, expirationTime); + auto maxExpiredRecordId = + ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata( + opCtx, &*changeCollection, expirationTime) + ->maxRecordIdBound; // Get the document found at 'maxExpiredRecordId' and test it against 'lastExpiredDocument'. auto scanExecutor = @@ -192,9 +210,7 @@ TEST_F(ChangeCollectionExpiredChangeRemoverTest, ShouldRemoveOnlyExpiredDocument insertDocumentToChangeCollection(opCtx, _tenantId, notExpired); // Verify that only the required documents are removed. - ASSERT_EQ(ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments( - opCtx, _tenantId, expirationTime), - 2); + ASSERT_EQ(removeExpiredChangeCollectionsDocuments(opCtx, _tenantId, expirationTime), 2); // Only the 'notExpired' document is left in the change collection. const auto changeCollectionEntries = readChangeCollection(opCtx, _tenantId); @@ -213,9 +229,7 @@ TEST_F(ChangeCollectionExpiredChangeRemoverTest, ShouldLeaveAtLeastOneDocument) } // Verify that all but the last document is removed. - ASSERT_EQ(ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments( - opCtx, _tenantId, now()), - 99); + ASSERT_EQ(removeExpiredChangeCollectionsDocuments(opCtx, _tenantId, now()), 99); // Only the last document is left in the change collection. const auto changeCollectionEntries = readChangeCollection(opCtx, _tenantId); diff --git a/src/mongo/db/change_collection_expired_documents_remover.cpp b/src/mongo/db/change_collection_expired_documents_remover.cpp index dc1aadeaa06..5eae8abdf76 100644 --- a/src/mongo/db/change_collection_expired_documents_remover.cpp +++ b/src/mongo/db/change_collection_expired_documents_remover.cpp @@ -29,9 +29,11 @@ #include "mongo/db/change_collection_expired_documents_remover.h" +#include "mongo/db/catalog_raii.h" #include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_streams_cluster_parameter_gen.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/service_context.h" #include "mongo/logv2/log.h" #include "mongo/platform/mutex.h" @@ -83,23 +85,58 @@ void removeExpiredDocuments(Client* client) { // Number of documents removed in the current pass. size_t removedCount = 0; + long long maxStartWallTime = 0; + auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx.get()); for (const auto& tenantId : getAllTenants()) { auto expiredAfterSeconds = getExpireAfterSeconds(tenantId); invariant(expiredAfterSeconds); + + // Acquire intent-exclusive lock on the change collection. 
+ AutoGetChangeCollection changeCollection{ + opCtx.get(), AutoGetChangeCollection::AccessMode::kWrite, tenantId}; + + // Early exit if collection does not exist or if running on a secondary (requires + // opCtx->lockState()->isRSTLLocked()). + if (!changeCollection || + !repl::ReplicationCoordinator::get(opCtx.get()) + ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kConfigDb)) { + continue; + } + + // Get the metadata required for the removal of the expired change collection + // documents. Early exit if the metadata is missing, indicating that there is nothing + // to remove. + auto purgingJobMetadata = + ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata( + opCtx.get(), + &*changeCollection, + currentWallTime - Seconds(*expiredAfterSeconds)); + if (!purgingJobMetadata) { + continue; + } + removedCount += ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments( - opCtx.get(), tenantId, currentWallTime - Seconds(*expiredAfterSeconds)); + opCtx.get(), &*changeCollection, purgingJobMetadata->maxRecordIdBound); + changeCollectionManager.getPurgingJobStats().scannedCollections.fetchAndAddRelaxed(1); + maxStartWallTime = + std::max(maxStartWallTime, purgingJobMetadata->firstDocWallTimeMillis); } + changeCollectionManager.getPurgingJobStats().maxStartWallTimeMillis.store(maxStartWallTime); - // TODO SERVER-66636 Use server parameters to track periodic job statistics per tenant. + const auto jobDurationMillis = clock->now() - currentWallTime; if (removedCount > 0) { LOGV2_DEBUG(6663503, 3, "Periodic expired change collection job finished executing", "numberOfRemovals"_attr = removedCount, - "jobDuration"_attr = (clock->now() - currentWallTime).toString()); + "jobDuration"_attr = jobDurationMillis.toString()); } + + changeCollectionManager.getPurgingJobStats().totalPass.fetchAndAddRelaxed(1); + changeCollectionManager.getPurgingJobStats().timeElapsedMillis.fetchAndAddRelaxed( + jobDurationMillis.count()); } catch (const DBException& exception) { if (exception.toStatus() != ErrorCodes::OK) { LOGV2_WARNING(6663504, diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp index 5559e49390a..e6aaf3e86f8 100644 --- a/src/mongo/db/change_stream_change_collection_manager.cpp +++ b/src/mongo/db/change_stream_change_collection_manager.cpp @@ -59,6 +59,15 @@ namespace { const auto getChangeCollectionManager = ServiceContext::declareDecoration>(); +/** + * Returns the list of all tenant ids in the replica set. + * TODO SERVER-61822 Provide the real implementation after 'listDatabasesForAllTenants' is + * available. + */ +std::vector> getAllTenants() { + return {boost::none}; +} + /** * Creates a Document object from the supplied oplog entry, performs necessary modifications to it * and then returns it as a BSON object. 
@@ -186,6 +195,14 @@
 
 } // namespace
 
+BSONObj ChangeStreamChangeCollectionManager::PurgingJobStats::toBSON() const {
+    return BSON("totalPass" << totalPass.load() << "docsDeleted" << docsDeleted.load()
+                            << "bytesDeleted" << bytesDeleted.load() << "scannedCollections"
+                            << scannedCollections.load() << "maxStartWallTimeMillis"
+                            << maxStartWallTimeMillis.load() << "timeElapsedMillis"
+                            << timeElapsedMillis.load());
+}
+
 ChangeStreamChangeCollectionManager& ChangeStreamChangeCollectionManager::get(
     ServiceContext* service) {
     return *getChangeCollectionManager(service);
@@ -338,8 +355,8 @@ Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
     return changeCollectionsWriter.write(opCtx, opDebug);
 }
 
-boost::optional<RecordIdBound>
-ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId(
+boost::optional<ChangeCollectionPurgingJobMetadata>
+ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata(
     OperationContext* opCtx, const CollectionPtr* changeCollection, const Date_t& expirationTime) {
     const auto isExpired = [&](const BSONObj& changeDoc) {
         const BSONElement& wallElem = changeDoc["wall"];
@@ -354,6 +371,7 @@ ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId(
 
     BSONObj currChangeDoc;
     RecordId currRecordId;
+    boost::optional<long long> firstDocWallTime;
 
     boost::optional<RecordId> prevRecordId;
     boost::optional<RecordId> prevPrevRecordId;
 
@@ -364,17 +382,29 @@ ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId(
     while (true) {
         auto getNextState = scanExecutor->getNext(&currChangeDoc, &currRecordId);
         switch (getNextState) {
-            case PlanExecutor::IS_EOF:
+            case PlanExecutor::IS_EOF: {
                 // Either the collection is empty (case in which return boost::none), or all the
                 // documents have expired. The remover job should never delete the last entry of a
                 // change collection, so return the recordId of the document previous to the last
                 // one.
-                return prevPrevRecordId ? RecordIdBound(prevPrevRecordId.get())
-                                        : boost::optional<RecordIdBound>();
+                if (!prevPrevRecordId) {
+                    return boost::none;
+                }
+
+                return {{*firstDocWallTime, RecordIdBound(*prevPrevRecordId)}};
+            }
             case PlanExecutor::ADVANCED: {
+                if (!prevRecordId.has_value()) {
+                    firstDocWallTime =
+                        boost::make_optional(currChangeDoc["wall"].Date().toMillisSinceEpoch());
+                }
+
                 if (!isExpired(currChangeDoc)) {
-                    return prevRecordId ? RecordIdBound(prevRecordId.get())
-                                        : boost::optional<RecordIdBound>();
+                    if (!prevRecordId) {
+                        return boost::none;
+                    }
+
+                    return {{*firstDocWallTime, RecordIdBound(*prevRecordId)}};
                 }
             }
         }
@@ -387,28 +417,9 @@ ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId(
 }
 
 size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments(
-    OperationContext* opCtx, boost::optional<TenantId> tenantId, const Date_t& expirationTime) {
-    // Acquire intent-exclusive lock on the change collection. Early exit if the collection
-    // doesn't exist.
-    const auto changeCollection =
-        AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId};
-
-    // Early exit if collection does not exist, or if running on a secondary (requires
-    // opCtx->lockState()->isRSTLLocked()).
-    if (!changeCollection ||
-        !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(
-            opCtx, NamespaceString::kConfigDb)) {
-        return 0;
-    }
-
-    const auto maxRecordIdBound =
-        getChangeCollectionMaxExpiredRecordId(opCtx, &*changeCollection, expirationTime);
-
-    // Early exit if there are no expired documents to be removed.
-    if (!maxRecordIdBound.has_value()) {
-        return 0;
-    }
-
+    OperationContext* opCtx,
+    const CollectionPtr* changeCollection,
+    const RecordIdBound& maxRecordIdBound) {
     auto params = std::make_unique<DeleteStageParams>();
     params->isMulti = true;
 
@@ -425,7 +436,15 @@ size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocume
         std::move(batchedDeleteParams));
 
     try {
-        return deleteExecutor->executeDelete();
+        (void)deleteExecutor->executeDelete();
+        auto batchedDeleteStats = deleteExecutor->getBatchedDeleteStats();
+        auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+        changeCollectionManager.getPurgingJobStats().docsDeleted.fetchAndAddRelaxed(
+            batchedDeleteStats.docsDeleted);
+        changeCollectionManager.getPurgingJobStats().bytesDeleted.fetchAndAddRelaxed(
+            batchedDeleteStats.bytesDeleted);
+
+        return batchedDeleteStats.docsDeleted;
     } catch (const ExceptionFor<ErrorCodes::QueryPlanKilled>&) {
         // It is expected that a collection drop can kill a query plan while deleting an old
         // document, so ignore this error.
diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h
index 21a60c1bf0e..49ff64d635b 100644
--- a/src/mongo/db/change_stream_change_collection_manager.h
+++ b/src/mongo/db/change_stream_change_collection_manager.h
@@ -36,11 +36,63 @@
 
 namespace mongo {
 
+/**
+ * Metadata associated with a particular change collection that is used by the purging job.
+ */
+struct ChangeCollectionPurgingJobMetadata {
+    // The wall time in milliseconds of the first document of the change collection.
+    long long firstDocWallTimeMillis;
+
+    // The maximum record id beyond which the change collection documents will not be deleted.
+    RecordIdBound maxRecordIdBound;
+};
+
 /**
  * Manages the creation, deletion and insertion lifecycle of the change collection.
  */
 class ChangeStreamChangeCollectionManager {
 public:
+    /**
+     * Statistics of the change collection purging job.
+     */
+    struct PurgingJobStats {
+        /**
+         * Total number of deletion passes completed by the purging job.
+         */
+        AtomicWord<long long> totalPass;
+
+        /**
+         * Cumulative number of change collection documents deleted by the purging job.
+         */
+        AtomicWord<long long> docsDeleted;
+
+        /**
+         * Cumulative size in bytes of all documents deleted from all change collections by the
+         * purging job.
+         */
+        AtomicWord<long long> bytesDeleted;
+
+        /**
+         * Cumulative number of change collections scanned by the purging job.
+         */
+        AtomicWord<long long> scannedCollections;
+
+        /**
+         * Cumulative number of milliseconds the purging job has spent across all deletion passes.
+         */
+        AtomicWord<long long> timeElapsedMillis;
+
+        /**
+         * Maximum wall time in milliseconds from the first document of each change collection.
+         */
+        AtomicWord<long long> maxStartWallTimeMillis;
+
+        /**
+         * Serializes the purging job statistics to a BSON object.
+         */
+        BSONObj toBSON() const;
+    };
+
     explicit ChangeStreamChangeCollectionManager(ServiceContext* service) {}
 
     ~ChangeStreamChangeCollectionManager() = default;
@@ -122,23 +174,39 @@ public:
                                             bool isGlobalIXLockAcquired,
                                             OpDebug* opDebug);
 
+    PurgingJobStats& getPurgingJobStats() {
+        return _purgingJobStats;
+    }
+
     /**
-     * Forward-scans the given change collection to return the recordId of the last, non-terminal
-     * document having the wall time less than the 'expirationTime'. Returns 'boost::none' if the
+     * Forward-scans the provided change collection and returns the metadata used by the purging
+     * job to perform deletion on it: the wall time of the first document and the recordId of the
+     * last, non-terminal document whose wall time is less than the 'expirationTime'. Returns
+     * 'boost::none' if the
      * collection is empty, or there are no expired documents, or the collection contains a single
      * expired document.
      */
-    static boost::optional<RecordIdBound> getChangeCollectionMaxExpiredRecordId(
-        OperationContext* opCtx,
-        const CollectionPtr* changeCollection,
-        const Date_t& expirationTime);
+    static boost::optional<ChangeCollectionPurgingJobMetadata>
+    getChangeCollectionPurgingJobMetadata(OperationContext* opCtx,
+                                          const CollectionPtr* changeCollection,
+                                          const Date_t& expirationTime);
 
     /**
-     * Removes expired documents from the change collection for the provided 'tenantId'. A document
-     * whose retention time is less than the 'expirationTime' is deleted.
+     * Removes documents from the provided change collection whose recordId is less than or equal
+     * to the provided 'maxRecordIdBound'. Returns the number of documents deleted.
      */
     static size_t removeExpiredChangeCollectionsDocuments(OperationContext* opCtx,
-                                                          boost::optional<TenantId> tenantId,
-                                                          const Date_t& expirationTime);
+                                                          const CollectionPtr* changeCollection,
+                                                          const RecordIdBound& maxRecordIdBound);
+
+private:
+    // Change collections purging job stats.
+    PurgingJobStats _purgingJobStats;
 };
 }  // namespace mongo
diff --git a/src/mongo/db/exec/batched_delete_stage.cpp b/src/mongo/db/exec/batched_delete_stage.cpp
index 63242ad5206..b12a5c0d73b 100644
--- a/src/mongo/db/exec/batched_delete_stage.cpp
+++ b/src/mongo/db/exec/batched_delete_stage.cpp
@@ -255,6 +255,7 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
 
     std::set<WorkingSetID> recordsToSkip;
     unsigned int docsDeleted = 0;
+    unsigned int bytesDeleted = 0;
     unsigned int bufferOffset = 0;
     long long timeInBatch = 0;
 
@@ -264,7 +265,8 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
         "BatchedDeleteStage::_deleteBatch",
         collection()->ns().ns(),
         [&] {
-            timeInBatch = _commitBatch(out, &recordsToSkip, &docsDeleted, &bufferOffset);
+            timeInBatch =
+                _commitBatch(out, &recordsToSkip, &docsDeleted, &bytesDeleted, &bufferOffset);
             return PlanStage::NEED_TIME;
         },
         [&] {
@@ -292,6 +294,7 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
     incrementSSSMetricNoOverflow(batchedDeletesSSS.batches, 1);
     incrementSSSMetricNoOverflow(batchedDeletesSSS.timeInBatchMillis, timeInBatch);
     _specificStats.docsDeleted += docsDeleted;
+    _specificStats.bytesDeleted += bytesDeleted;
 
     if (bufferOffset < _stagedDeletesBuffer.size()) {
         // targetBatchTimeMS was met. Remove staged deletes that have been evaluated
@@ -312,6 +315,7 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
 long long BatchedDeleteStage::_commitBatch(WorkingSetID* out,
                                            std::set<WorkingSetID>* recordsToSkip,
                                            unsigned int* docsDeleted,
+                                           unsigned int* bytesDeleted,
                                            unsigned int* bufferOffset) {
     // Estimate the size of the oplog entry that would result from committing the batch,
     // to ensure we emit an oplog entry that's within the 16MB BSON limit.
@@ -395,6 +399,7 @@ long long BatchedDeleteStage::_commitBatch(WorkingSetID* out, : Collection::StoreDeletedDoc::Off); (*docsDeleted)++; + (*bytesDeleted) += bsonObjDoc.objsize(); batchedDeleteStageSleepAfterNDocuments.executeIf( [&](const BSONObj& data) { diff --git a/src/mongo/db/exec/batched_delete_stage.h b/src/mongo/db/exec/batched_delete_stage.h index 8e5bbff2b21..84c166ba0dd 100644 --- a/src/mongo/db/exec/batched_delete_stage.h +++ b/src/mongo/db/exec/batched_delete_stage.h @@ -137,6 +137,7 @@ private: long long _commitBatch(WorkingSetID* out, std::set* recordsToSkip, unsigned int* docsDeleted, + unsigned int* bytesDeleted, unsigned int* bufferOffset); // Attempts to stage a new delete in the _stagedDeletesBuffer. Returns the PlanStage::StageState diff --git a/src/mongo/db/exec/batched_delete_stage.idl b/src/mongo/db/exec/batched_delete_stage.idl index 5449c0e2562..af9542ed6b2 100644 --- a/src/mongo/db/exec/batched_delete_stage.idl +++ b/src/mongo/db/exec/batched_delete_stage.idl @@ -41,6 +41,7 @@ server_parameters: default: 2097152 # 2MB validator: gte: 0 + lte: 2147483647 batchedDeletesTargetBatchDocs: description: "Threshold of documents at which a batch of document deletions is committed. A value of zero means unlimited" set_at: [startup, runtime] diff --git a/src/mongo/db/exec/delete_stage.cpp b/src/mongo/db/exec/delete_stage.cpp index 57368624775..8b85e082fd0 100644 --- a/src/mongo/db/exec/delete_stage.cpp +++ b/src/mongo/db/exec/delete_stage.cpp @@ -304,6 +304,7 @@ PlanStage::StageState DeleteStage::doWork(WorkingSetID* out) { } } _specificStats.docsDeleted += _params->numStatsForDoc ? _params->numStatsForDoc(bsonObjDoc) : 1; + _specificStats.bytesDeleted += bsonObjDoc.objsize(); if (_params->returnDeleted) { // After deleting the document, the RecordId associated with this member is invalid. diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h index 6f621872985..d7336e79a31 100644 --- a/src/mongo/db/exec/plan_stats.h +++ b/src/mongo/db/exec/plan_stats.h @@ -411,6 +411,7 @@ struct DeleteStats : public SpecificStats { } size_t docsDeleted = 0u; + size_t bytesDeleted = 0u; }; struct BatchedDeleteStats : public DeleteStats { diff --git a/src/mongo/db/stats/SConscript b/src/mongo/db/stats/SConscript index 668c60eed5b..eb6ff375190 100644 --- a/src/mongo/db/stats/SConscript +++ b/src/mongo/db/stats/SConscript @@ -84,6 +84,17 @@ env.Library( ], ) +env.Library( + target='change_collection_server_status', + source=['change_collection_server_status.cpp'], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', + '$BUILD_DIR/mongo/db/commands/server_status_core', + '$BUILD_DIR/mongo/db/server_base', + ], +) + env.Library( target='transaction_stats', source=[ diff --git a/src/mongo/db/stats/change_collection_server_status.cpp b/src/mongo/db/stats/change_collection_server_status.cpp new file mode 100644 index 00000000000..ee424a4ae43 --- /dev/null +++ b/src/mongo/db/stats/change_collection_server_status.cpp @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. 
+ * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/change_stream_change_collection_manager.h" +#include "mongo/db/commands/server_status.h" + +namespace mongo { +/** + * Adds a section 'changeCollections' to the serverStatus metrics that provides aggregated + * statistics for change collections. + */ +class ChangeCollectionServerStatus final : public ServerStatusSection { +public: + ChangeCollectionServerStatus() : ServerStatusSection("changeCollections") {} + + bool includeByDefault() const override { + return true; + } + + void appendSection(OperationContext* opCtx, + const BSONElement& configElement, + BSONObjBuilder* result) const override { + // Append the section only when running in serverless. + if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + return; + } + + auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); + result->append(getSectionName(), + BSON("purgingJob" << changeCollectionManager.getPurgingJobStats().toBSON())); + } +} changeCollectionServerStatus; + +} // namespace mongo -- cgit v1.2.1