summaryrefslogtreecommitdiff
path: root/src/mongo/db/change_collection_expired_documents_remover.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/change_collection_expired_documents_remover.cpp')
-rw-r--r--src/mongo/db/change_collection_expired_documents_remover.cpp29
1 files changed, 18 insertions, 11 deletions
diff --git a/src/mongo/db/change_collection_expired_documents_remover.cpp b/src/mongo/db/change_collection_expired_documents_remover.cpp
index 80a816945be..d84fb21b672 100644
--- a/src/mongo/db/change_collection_expired_documents_remover.cpp
+++ b/src/mongo/db/change_collection_expired_documents_remover.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/change_streams_cluster_parameter_gen.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -56,25 +57,31 @@ MONGO_FAIL_POINT_DEFINE(injectCurrentWallTimeForRemovingExpiredDocuments);
namespace {
-// TODO SERVER-61822 Provide the real implementation after 'listDatabasesForAllTenants' is
-// available.
-std::vector<boost::optional<TenantId>> getAllTenants() {
- return {boost::none};
+change_stream_serverless_helpers::TenantSet getConfigDbTenants(OperationContext* opCtx) {
+ auto tenantIds = change_stream_serverless_helpers::getConfigDbTenants(opCtx);
+ if (auto testTenantId = change_stream_serverless_helpers::resolveTenantId(boost::none)) {
+ tenantIds.insert(*testTenantId);
+ }
+
+ return tenantIds;
}
-boost::optional<int64_t> getExpireAfterSeconds(boost::optional<TenantId> tid) {
+boost::optional<int64_t> getExpireAfterSeconds(const TenantId& tenantId) {
auto* clusterParameters = ServerParameterSet::getClusterParameterSet();
auto* changeStreamsParam =
clusterParameters->get<ClusterParameterWithStorage<ChangeStreamsClusterParameterStorage>>(
"changeStreams");
- return changeStreamsParam->getValue(tid).getExpireAfterSeconds();
+
+ // TODO SERVER-69511 Pass 'tenantId' instead of 'boost::none'. Move this function to
+ // 'change_stream_serverless_helpers'.
+ return changeStreamsParam->getValue(boost::none).getExpireAfterSeconds();
}
void removeExpiredDocuments(Client* client) {
// TODO SERVER-66717 Remove this logic from this method. Due to the delay in the feature flag
// activation it was placed here. The remover job should ultimately be initialized at the mongod
// startup when launched in serverless mode.
- if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ if (!change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
return;
}
@@ -98,7 +105,7 @@ void removeExpiredDocuments(Client* client) {
long long maxStartWallTime = 0;
auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx.get());
- for (const auto& tenantId : getAllTenants()) {
+ for (const auto& tenantId : getConfigDbTenants(opCtx.get())) {
auto expiredAfterSeconds = getExpireAfterSeconds(tenantId);
invariant(expiredAfterSeconds);
@@ -169,13 +176,13 @@ void removeExpiredDocuments(Client* client) {
/**
* Defines a periodic background job to remove expired documents from change collections.
- * The job will run every 'changeCollectionRemoverJobSleepSeconds', as defined in the cluster
- * parameter.
+ * The job will run every 'changeCollectionExpiredDocumentsRemoverJobSleepSeconds', as defined in
+ * the cluster parameter.
*/
class ChangeCollectionExpiredDocumentsRemover {
public:
ChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext) {
- const auto period = Seconds{gChangeCollectionRemoverJobSleepSeconds.load()};
+ const auto period = Seconds{gChangeCollectionExpiredDocumentsRemoverJobSleepSeconds.load()};
_jobAnchor = serviceContext->getPeriodicRunner()->makeJob(
{"ChangeCollectionExpiredDocumentsRemover", removeExpiredDocuments, period});
_jobAnchor.start();