From e2bc23486212cf112156f281a0ab0d315f9f5f88 Mon Sep 17 00:00:00 2001 From: Rishab Joshi Date: Sun, 21 Aug 2022 12:17:00 +0000 Subject: SERVER-66635 Introduce TTL job to delete entries from change collections. --- .../change_collection_expired_document_remover.js | 151 ++++++++++++++ src/mongo/db/SConscript | 17 ++ src/mongo/db/catalog_raii.cpp | 11 +- src/mongo/db/catalog_raii.h | 5 +- ...ange_collection_expired_change_remover_test.cpp | 225 +++++++++++++++++++++ ...change_collection_expired_documents_remover.cpp | 169 ++++++++++++++++ .../change_collection_expired_documents_remover.h | 47 +++++ .../db/change_stream_change_collection_manager.cpp | 150 ++++++++++++-- .../db/change_stream_change_collection_manager.h | 20 +- src/mongo/db/change_streams_cluster_parameter.idl | 8 + src/mongo/db/mongod_main.cpp | 9 +- 11 files changed, 791 insertions(+), 21 deletions(-) create mode 100644 jstests/serverless/change_collection_expired_document_remover.js create mode 100644 src/mongo/db/change_collection_expired_change_remover_test.cpp create mode 100644 src/mongo/db/change_collection_expired_documents_remover.cpp create mode 100644 src/mongo/db/change_collection_expired_documents_remover.h diff --git a/jstests/serverless/change_collection_expired_document_remover.js b/jstests/serverless/change_collection_expired_document_remover.js new file mode 100644 index 00000000000..9c76d84c7bc --- /dev/null +++ b/jstests/serverless/change_collection_expired_document_remover.js @@ -0,0 +1,151 @@ +/** + * Tests the change collection periodic remover job. + * + * @tags: [requires_fcv_61] + */ + +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); // For configureFailPoint. +load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. + +const kExpiredChangeRemovalJobSleepSeconds = 5; +const kExpireAfterSeconds = 1; +const kSleepBetweenWritesSeconds = 5; +const kSafetyMarginMillis = 1; + +const rst = new ReplSetTest({nodes: 2}); + +// TODO SERVER-67267: Add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and +// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'. +rst.startSet({ + setParameter: { + "failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"}), + changeCollectionRemoverJobSleepSeconds: kExpiredChangeRemovalJobSleepSeconds + } +}); +rst.initiate(); + +const primary = rst.getPrimary(); +const secondary = rst.getSecondary(); +const testDb = primary.getDB(jsTestName()); + +// Enable change streams to ensure the creation of change collections. +assert.commandWorked(primary.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true})); + +// Set the 'expireAfterSeconds' to 'kExpireAfterSeconds'. +assert.commandWorked(primary.getDB("admin").runCommand( + {setClusterParameter: {changeStreams: {expireAfterSeconds: kExpireAfterSeconds}}})); + +// TODO SERVER-65950 Extend the test case to account for multi-tenancy. +const primaryChangeCollection = primary.getDB("config").system.change_collection; +const secondaryChangeCollection = secondary.getDB("config").system.change_collection; + +// Assert that the change collection contains all documents in 'expectedRetainedDocs' and no +// document in 'expectedDeletedDocs' for the collection 'testColl'. 
+function assertChangeCollectionDocuments( + changeColl, testColl, expectedDeletedDocs, expectedRetainedDocs) { + const collNss = `${testDb.getName()}.${testColl.getName()}`; + const pipeline = (collectionEntries) => [{$match: {op: "i", ns: collNss}}, + {$replaceRoot: {"newRoot": "$o"}}, + {$match: {$or: collectionEntries}}]; + + // Assert that querying for 'expectedRetainedDocs' yields documents that are exactly the same as + // 'expectedRetainedDocs'. + if (expectedRetainedDocs.length > 0) { + const retainedDocs = changeColl.aggregate(pipeline(expectedRetainedDocs)).toArray(); + assert.eq(retainedDocs, expectedRetainedDocs); + } + + // Assert that the query for any `expectedDeletedDocs` yields no results. + if (expectedDeletedDocs.length > 0) { + const deletedDocs = changeColl.aggregate(pipeline(expectedDeletedDocs)).toArray(); + assert.eq(deletedDocs.length, 0); + } +} + +// Returns the operation time for the provided document 'doc'. +function getDocumentOperationTime(doc) { + const oplogEntry = primary.getDB("local").oplog.rs.findOne({o: doc}); + assert(oplogEntry); + return oplogEntry.wall.getTime(); +} + +(function testOnlyExpiredDocumentsDeleted() { + assertDropAndRecreateCollection(testDb, "stocks"); + const testColl = testDb.stocks; + + // Wait until the remover job hangs. + let fpHangBeforeRemovingDocs = configureFailPoint(primary, "hangBeforeRemovingExpiredChanges"); + fpHangBeforeRemovingDocs.wait(); + + // Insert 5 documents. + const expiredDocuments = [ + {_id: "aapl", price: 140}, + {_id: "dis", price: 100}, + {_id: "nflx", price: 185}, + {_id: "baba", price: 66}, + {_id: "amc", price: 185} + ]; + + assert.commandWorked(testColl.insertMany(expiredDocuments)); + assertChangeCollectionDocuments(primaryChangeCollection, + testColl, + /* expectedDeletedDocs */[], + /* expectedRetainedDocs */ expiredDocuments); + const lastExpiredDocumentTime = getDocumentOperationTime(expiredDocuments.at(-1)); + + // Sleep for 'kSleepBetweenWritesSeconds' duration such that the next batch of inserts + // has a sufficient delay in their wall time relative to the previous batch. + sleep(kSleepBetweenWritesSeconds * 1000); + + // Insert 5 more documents. + const nonExpiredDocuments = [ + {_id: "wmt", price: 11}, + {_id: "coin", price: 23}, + {_id: "ddog", price: 15}, + {_id: "goog", price: 199}, + {_id: "tsla", price: 12} + ]; + + assert.commandWorked(testColl.insertMany(nonExpiredDocuments)); + assertChangeCollectionDocuments(primaryChangeCollection, + testColl, + /* expectedDeletedDocs */[], + /* expectedRetainedDocs */ nonExpiredDocuments); + + // Calculate the 'currentWallTime' such that only the first batch of inserted documents + // should be expired, ie.: 'lastExpiredDocumentTime' + 'kExpireAfterSeconds' < + // 'currentWallTime' < 'firstNonExpiredDocument' + const currentWallTime = + new Date(lastExpiredDocumentTime + kExpireAfterSeconds * 1000 + kSafetyMarginMillis); + const fpInjectWallTime = configureFailPoint( + primary, "injectCurrentWallTimeForRemovingExpiredDocuments", {currentWallTime}); + + // Unblock the change collection remover job such that it picks up on the injected + // 'currentWallTime'. + fpHangBeforeRemovingDocs.off(); + + // Wait until the remover job has retrieved the injected 'currentWallTime' and reset the first + // failpoint. + fpInjectWallTime.wait(); + + // Wait for a complete cycle of the TTL job. 
+ fpHangBeforeRemovingDocs = configureFailPoint(primary, "hangBeforeRemovingExpiredChanges"); + fpHangBeforeRemovingDocs.wait(); + + // Assert that the first 5 documents got deleted, but the later 5 documents did not. + assertChangeCollectionDocuments( + primaryChangeCollection, testColl, expiredDocuments, nonExpiredDocuments); + + // Wait for the replication to complete and assert that the expired documents also have been + // deleted from the secondary. + rst.awaitReplication(); + assertChangeCollectionDocuments( + secondaryChangeCollection, testColl, expiredDocuments, nonExpiredDocuments); + fpHangBeforeRemovingDocs.off(); +})(); + +rst.stopSet(); +})(); diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 200c6878ab7..9f78f6d3dc4 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -526,6 +526,19 @@ env.Library( ], ) +env.Library( + target='change_collection_expired_change_remover', + source=[ + 'change_collection_expired_documents_remover.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', + '$BUILD_DIR/mongo/db/change_streams_cluster_parameter', + '$BUILD_DIR/mongo/db/query_exec', + '$BUILD_DIR/mongo/util/periodic_runner', + ], +) + env.Library( target='change_stream_pre_images_collection_manager', source=[ @@ -2278,6 +2291,7 @@ env.Library( # mongod_initializers. '$BUILD_DIR/mongo/client/clientdriver_minimal', '$BUILD_DIR/mongo/db/catalog/collection_crud', + '$BUILD_DIR/mongo/db/change_collection_expired_change_remover', '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', '$BUILD_DIR/mongo/db/change_stream_options_manager', '$BUILD_DIR/mongo/db/change_streams_cluster_parameter', @@ -2456,6 +2470,7 @@ if wiredtiger: source=[ 'cancelable_operation_context_test.cpp', 'catalog_raii_test.cpp', + 'change_collection_expired_change_remover_test.cpp', 'client_context_test.cpp', 'client_strand_test.cpp', 'collection_index_usage_tracker_test.cpp', @@ -2531,6 +2546,8 @@ if wiredtiger: '$BUILD_DIR/mongo/db/catalog/import_collection_oplog_entry', '$BUILD_DIR/mongo/db/catalog/index_build_entry_idl', '$BUILD_DIR/mongo/db/catalog/local_oplog_info', + '$BUILD_DIR/mongo/db/change_collection_expired_change_remover', + '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', '$BUILD_DIR/mongo/db/change_streams_cluster_parameter', '$BUILD_DIR/mongo/db/mongohasher', '$BUILD_DIR/mongo/db/op_observer/fcv_op_observer', diff --git a/src/mongo/db/catalog_raii.cpp b/src/mongo/db/catalog_raii.cpp index 4809e2a5344..4ba0dd3e650 100644 --- a/src/mongo/db/catalog_raii.cpp +++ b/src/mongo/db/catalog_raii.cpp @@ -587,12 +587,15 @@ AutoGetChangeCollection::AutoGetChangeCollection(OperationContext* opCtx, invariant(opCtx->lockState()->isWriteLocked()); } - // TODO SERVER-66715 avoid taking 'AutoGetCollection' and remove - // 'AllowLockAcquisitionOnTimestampedUnitOfWork'. - AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState()); + if (mode != AccessMode::kRead) { + // TODO SERVER-66715 avoid taking 'AutoGetCollection' and remove + // 'AllowLockAcquisitionOnTimestampedUnitOfWork'. + _allowLockAcquisitionTsWuow.emplace(opCtx->lockState()); + } + _coll.emplace(opCtx, NamespaceString::makeChangeCollectionNSS(tenantId), - LockMode::MODE_IX, + mode == AccessMode::kRead ? 
MODE_IS : MODE_IX, AutoGetCollectionViewMode::kViewsForbidden, deadline); } diff --git a/src/mongo/db/catalog_raii.h b/src/mongo/db/catalog_raii.h index e4dc1fda484..3535cb1adc2 100644 --- a/src/mongo/db/catalog_raii.h +++ b/src/mongo/db/catalog_raii.h @@ -479,10 +479,11 @@ private: * the oplog in the same 'WriteUnitOfWork' and assumes that the global IX * lock is already held. * kWrite - takes the IX lock on a tenant's change collection to perform any writes. + * kRead - takes the IS lock on a tenant's change collection to perform any reads. */ class AutoGetChangeCollection { public: - enum class AccessMode { kWriteInOplogContext, kWrite }; + enum class AccessMode { kWriteInOplogContext, kWrite, kRead }; AutoGetChangeCollection(OperationContext* opCtx, AccessMode mode, @@ -498,6 +499,8 @@ public: private: boost::optional _coll; + + boost::optional _allowLockAcquisitionTsWuow; }; } // namespace mongo diff --git a/src/mongo/db/change_collection_expired_change_remover_test.cpp b/src/mongo/db/change_collection_expired_change_remover_test.cpp new file mode 100644 index 00000000000..ccd323202e8 --- /dev/null +++ b/src/mongo/db/change_collection_expired_change_remover_test.cpp @@ -0,0 +1,225 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/change_collection_expired_documents_remover.h" + +#include "mongo/bson/bsonobj.h" +#include "mongo/db/catalog/catalog_test_fixture.h" +#include "mongo/db/catalog_raii.h" +#include "mongo/db/change_stream_change_collection_manager.h" +#include "mongo/db/exec/document_value/document_value_test_util.h" +#include "mongo/db/query/internal_plans.h" +#include "mongo/db/query/plan_executor.h" +#include "mongo/db/record_id_helpers.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/service_context.h" +#include "mongo/db/storage/record_data.h" +#include "mongo/idl/server_parameter_test_util.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/clock_source_mock.h" +#include "mongo/util/duration.h" +#include "pipeline/change_stream_test_helpers.h" + +namespace mongo { + +class ChangeCollectionExpiredChangeRemoverTest : public CatalogTestFixture { +protected: + ChangeCollectionExpiredChangeRemoverTest() + : CatalogTestFixture(Options{}.useMockClock(true)), _tenantId(OID::gen()) { + ChangeStreamChangeCollectionManager::create(getServiceContext()); + } + + ClockSourceMock* clockSource() { + return static_cast(getServiceContext()->getFastClockSource()); + } + + Date_t now() { + return clockSource()->now(); + } + + void insertDocumentToChangeCollection(OperationContext* opCtx, + boost::optional tenantId, + const BSONObj& obj) { + const auto wallTime = now(); + Timestamp timestamp{wallTime}; + + repl::MutableOplogEntry oplogEntry; + oplogEntry.setOpTime(repl::OpTime{timestamp, 0}); + oplogEntry.setOpType(repl::OpTypeEnum::kNoop); + oplogEntry.setNss(NamespaceString::makeChangeCollectionNSS(tenantId)); + oplogEntry.setObject(obj); + oplogEntry.setWallClockTime(wallTime); + auto oplogEntryBson = oplogEntry.toBSON(); + + RecordData recordData{oplogEntryBson.objdata(), oplogEntryBson.objsize()}; + RecordId recordId = record_id_helpers::keyForDate(wallTime); + + AutoGetChangeCollection changeCollection{ + opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId}; + + WriteUnitOfWork wunit(opCtx); + ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection( + opCtx, {Record{recordId, recordData}}, {timestamp}); + wunit.commit(); + } + + std::vector readChangeCollection(OperationContext* opCtx, + boost::optional tenantId) { + auto changeCollection = + AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kRead, tenantId}; + + auto scanExecutor = + InternalPlanner::collectionScan(opCtx, + &(*changeCollection), + PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY, + InternalPlanner::Direction::FORWARD); + + BSONObj currChangeDoc; + std::vector entries; + while (scanExecutor->getNext(&currChangeDoc, nullptr) == PlanExecutor::ADVANCED) { + entries.push_back(repl::OplogEntry(currChangeDoc)); + } + + return entries; + } + + void dropAndRecreateChangeCollection(OperationContext* opCtx, + boost::optional tenantId) { + auto changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); + changeCollectionManager.dropChangeCollection(opCtx, tenantId); + changeCollectionManager.createChangeCollection(opCtx, tenantId); + } + + const boost::optional _tenantId; + RAIIServerParameterControllerForTest featureFlagController{"featureFlagServerlessChangeStreams", + true}; + + boost::optional _changeCollectionManager; +}; + +// Tests that the last expired focument retrieved is the expected one. 
+TEST_F(ChangeCollectionExpiredChangeRemoverTest, VerifyLastExpiredDocument) { + const auto opCtx = operationContext(); + dropAndRecreateChangeCollection(opCtx, _tenantId); + + Date_t expirationTime; + BSONObj lastExpiredDocument; + + // Create 100 change documents and consider the first 50 of them as expired. + for (int i = 0; i < 100; ++i) { + auto doc = BSON("_id" << i); + insertDocumentToChangeCollection(opCtx, _tenantId, doc); + + // Store the last expired document and it's wallTime. + if (i == 50) { + lastExpiredDocument = doc; + expirationTime = now(); + } + + clockSource()->advance(Milliseconds(1)); + } + + auto changeCollection = + AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kRead, _tenantId}; + + const auto maxExpiredRecordId = + ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId( + opCtx, &*changeCollection, expirationTime); + + // Get the document found at 'maxExpiredRecordId' and test it against 'lastExpiredDocument'. + auto scanExecutor = + InternalPlanner::collectionScan(opCtx, + &(*changeCollection), + PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY, + InternalPlanner::Direction::FORWARD, + boost::none, + maxExpiredRecordId, + maxExpiredRecordId); + + BSONObj changeDocAtId; + ASSERT_EQ(scanExecutor->getNext(&changeDocAtId, nullptr), PlanExecutor::ADVANCED); + ASSERT_BSONOBJ_EQ(repl::OplogEntry(changeDocAtId).getObject(), lastExpiredDocument); +} + +// Tests that only the expired documents are removed from the change collection. +TEST_F(ChangeCollectionExpiredChangeRemoverTest, ShouldRemoveOnlyExpiredDocument) { + const auto opCtx = operationContext(); + dropAndRecreateChangeCollection(opCtx, _tenantId); + + const BSONObj firstExpired = BSON("_id" + << "firstExpired"); + const BSONObj secondExpired = BSON("_id" + << "secondExpired"); + const BSONObj notExpired = BSON("_id" + << "notExpired"); + + insertDocumentToChangeCollection(opCtx, _tenantId, firstExpired); + clockSource()->advance(Hours(1)); + insertDocumentToChangeCollection(opCtx, _tenantId, secondExpired); + + // Store the wallTime of the last expired document. + const auto expirationTime = now(); + clockSource()->advance(Hours(1)); + insertDocumentToChangeCollection(opCtx, _tenantId, notExpired); + + // Verify that only the required documents are removed. + ASSERT_EQ(ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments( + opCtx, _tenantId, expirationTime), + 2); + + // Only the 'notExpired' document is left in the change collection. + const auto changeCollectionEntries = readChangeCollection(opCtx, _tenantId); + ASSERT_EQ(changeCollectionEntries.size(), 1); + ASSERT_BSONOBJ_EQ(changeCollectionEntries[0].getObject(), notExpired); +} + +// Tests that the last expired document is never deleted. +TEST_F(ChangeCollectionExpiredChangeRemoverTest, ShouldLeaveAtLeastOneDocument) { + const auto opCtx = operationContext(); + dropAndRecreateChangeCollection(opCtx, _tenantId); + + for (int i = 0; i < 100; ++i) { + insertDocumentToChangeCollection(opCtx, _tenantId, BSON("_id" << i)); + clockSource()->advance(Milliseconds{1}); + } + + // Verify that all but the last document is removed. + ASSERT_EQ(ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments( + opCtx, _tenantId, now()), + 99); + + // Only the last document is left in the change collection. 
+ const auto changeCollectionEntries = readChangeCollection(opCtx, _tenantId); + ASSERT_EQ(changeCollectionEntries.size(), 1); + ASSERT_BSONOBJ_EQ(changeCollectionEntries[0].getObject(), BSON("_id" << 99)); +} +} // namespace mongo diff --git a/src/mongo/db/change_collection_expired_documents_remover.cpp b/src/mongo/db/change_collection_expired_documents_remover.cpp new file mode 100644 index 00000000000..dc1aadeaa06 --- /dev/null +++ b/src/mongo/db/change_collection_expired_documents_remover.cpp @@ -0,0 +1,169 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/change_collection_expired_documents_remover.h" + +#include "mongo/db/change_stream_change_collection_manager.h" +#include "mongo/db/change_streams_cluster_parameter_gen.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/service_context.h" +#include "mongo/logv2/log.h" +#include "mongo/platform/mutex.h" +#include "mongo/util/duration.h" +#include "mongo/util/fail_point.h" +#include "mongo/util/periodic_runner.h" +#include +#include + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery + +namespace mongo { + +// Hangs the change collection remover job before initiating the deletion process of documents. +MONGO_FAIL_POINT_DEFINE(hangBeforeRemovingExpiredChanges); + +// Injects the provided ISO date to currentWallTime which is used to determine if the change +// collection document is expired or not. +MONGO_FAIL_POINT_DEFINE(injectCurrentWallTimeForRemovingExpiredDocuments); + +namespace { + +// TODO SERVER-61822 Provide the real implementation after 'listDatabasesForAllTenants' is +// available. +std::vector> getAllTenants() { + return {boost::none}; +} + +boost::optional getExpireAfterSeconds(boost::optional tid) { + // TODO SERVER-65950 Fetch 'expiredAfterSeconds' per tenant basis. 
+ return {gChangeStreamsClusterParameter.getExpireAfterSeconds()}; +} + +void removeExpiredDocuments(Client* client) { + hangBeforeRemovingExpiredChanges.pauseWhileSet(); + + try { + auto opCtx = client->makeOperationContext(); + const auto clock = client->getServiceContext()->getFastClockSource(); + auto currentWallTime = clock->now(); + + // If the fail point 'injectCurrentWallTimeForRemovingDocuments' is enabled then set the + // 'currentWallTime' with the provided wall time. + if (injectCurrentWallTimeForRemovingExpiredDocuments.shouldFail()) { + injectCurrentWallTimeForRemovingExpiredDocuments.execute([&](const BSONObj& data) { + currentWallTime = data.getField("currentWallTime").date(); + }); + } + + // Number of documents removed in the current pass. + size_t removedCount = 0; + + for (const auto& tenantId : getAllTenants()) { + auto expiredAfterSeconds = getExpireAfterSeconds(tenantId); + invariant(expiredAfterSeconds); + removedCount += + ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments( + opCtx.get(), tenantId, currentWallTime - Seconds(*expiredAfterSeconds)); + } + + // TODO SERVER-66636 Use server parameters to track periodic job statistics per tenant. + if (removedCount > 0) { + LOGV2_DEBUG(6663503, + 3, + "Periodic expired change collection job finished executing", + "numberOfRemovals"_attr = removedCount, + "jobDuration"_attr = (clock->now() - currentWallTime).toString()); + } + } catch (const DBException& exception) { + if (exception.toStatus() != ErrorCodes::OK) { + LOGV2_WARNING(6663504, + "Periodic expired change collection job was killed", + "errorCode"_attr = exception.toStatus()); + } else { + LOGV2_ERROR(6663505, + "Periodic expired change collection job failed", + "reason"_attr = exception.reason()); + } + } +} + +/** + * Defines a periodic background job to remove expired documents from change collections. + * The job will run every 'changeCollectionRemoverJobSleepSeconds', as defined in the cluster + * parameter. 
+ */ +class ChangeCollectionExpiredDocumentsRemover { +public: + ChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext) { + const auto period = Seconds{gChangeCollectionRemoverJobSleepSeconds.load()}; + _jobAnchor = serviceContext->getPeriodicRunner()->makeJob( + {"ChangeCollectionExpiredDocumentsRemover", removeExpiredDocuments, period}); + _jobAnchor.start(); + } + + static void start(ServiceContext* serviceContext) { + auto& changeCollectionExpiredDocumentsRemover = _serviceDecoration(serviceContext); + changeCollectionExpiredDocumentsRemover = + std::make_unique(serviceContext); + } + + static void shutdown(ServiceContext* serviceContext) { + auto& changeCollectionExpiredDocumentsRemover = _serviceDecoration(serviceContext); + if (changeCollectionExpiredDocumentsRemover) { + changeCollectionExpiredDocumentsRemover->_jobAnchor.stop(); + changeCollectionExpiredDocumentsRemover.reset(); + } + } + +private: + inline static const auto _serviceDecoration = ServiceContext::declareDecoration< + std::unique_ptr>(); + + PeriodicJobAnchor _jobAnchor; +}; +} // namespace + +void startChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext) { + if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + return; + } + + LOGV2(6663507, "Starting the ChangeCollectionExpiredChangeRemover"); + ChangeCollectionExpiredDocumentsRemover::start(serviceContext); +} + +void shutdownChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext) { + if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + return; + } + + LOGV2(6663508, "Shutting down the ChangeCollectionExpiredChangeRemover"); + ChangeCollectionExpiredDocumentsRemover::shutdown(serviceContext); +} +} // namespace mongo diff --git a/src/mongo/db/change_collection_expired_documents_remover.h b/src/mongo/db/change_collection_expired_documents_remover.h new file mode 100644 index 00000000000..bf9e36ae1f4 --- /dev/null +++ b/src/mongo/db/change_collection_expired_documents_remover.h @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include "mongo/db/service_context.h" + +namespace mongo { + +/** + * Starts a periodic background job to remove expired documents from change collections. The job + * will run every 'changeCollectionRemoverJobSleepSeconds' as defined in the cluster parameter. + */ +void startChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext); + +/** + * Stops the periodic background job that removes expired documents from change collections. + */ +void shutdownChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext); + +} // namespace mongo diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp index 0afcb527b9e..d085b514d18 100644 --- a/src/mongo/db/change_stream_change_collection_manager.cpp +++ b/src/mongo/db/change_stream_change_collection_manager.cpp @@ -27,6 +27,7 @@ * it in the license file. */ + #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery #include "mongo/db/change_stream_change_collection_manager.h" @@ -38,9 +39,13 @@ #include "mongo/db/catalog/drop_collection.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/change_stream_pre_images_collection_manager.h" +#include "mongo/db/concurrency/exception_util.h" #include "mongo/db/multitenancy_gen.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/query/internal_plans.h" +#include "mongo/db/repl/apply_ops_command_info.h" #include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_entry_gen.h" #include "mongo/logv2/log.h" namespace mongo { @@ -127,20 +132,44 @@ private: // TODO SERVER-65950 retreive tenant from the oplog. // TODO SERVER-67170 avoid inspecting the oplog BSON object. + if (auto nssFieldElem = oplogDoc[repl::OplogEntry::kNssFieldName]) { + if (nssFieldElem.String() == "config.$cmd"_sd) { + if (auto objectFieldElem = oplogDoc[repl::OplogEntry::kObjectFieldName]) { + // The oplog entry might be a drop command on the change collection. Check if + // the drop request is for the already deleted change collection, as such do not + // attempt to write to the change collection if that is the case. This scenario + // is possible because 'WriteUnitOfWork' will stage the changes and while + // committing the staged 'CollectionImpl::insertDocuments' change the collection + // object might have already been deleted. + if (auto dropFieldElem = objectFieldElem["drop"_sd]) { + return dropFieldElem.String() != NamespaceString::kChangeCollectionName; + } + } + } - if (auto nssFieldElem = oplogDoc[repl::OplogEntry::kNssFieldName]; - nssFieldElem && nssFieldElem.String() == "config.$cmd"_sd) { - if (auto objectFieldElem = oplogDoc[repl::OplogEntry::kObjectFieldName]) { - // The oplog entry might be a drop command on the change collection. Check if the - // drop request is for the already deleted change collection, as such do not attempt - // to write to the change collection if that is the case. This scenario is possible - // because 'WriteUnitOfWork' will stage the changes and while committing the staged - // 'CollectionImpl::insertDocuments' change the collection object might have already - // been deleted. 
- if (auto dropFieldElem = objectFieldElem["drop"_sd]) { - return dropFieldElem.String() != NamespaceString::kChangeCollectionName; + if (nssFieldElem.String() == "admin.$cmd"_sd) { + if (auto objectFieldElem = oplogDoc[repl::OplogEntry::kObjectFieldName]) { + // The oplog entry might be a batch delete command on a change collection, avoid + // inserting such oplog entries back to the change collection. + if (auto applyOpsFieldElem = objectFieldElem["applyOps"_sd]) { + const auto nestedOperations = repl::ApplyOps::extractOperations(oplogDoc); + for (auto& op : nestedOperations) { + if (op.getNss().isChangeCollection() && + op.getOpType() == repl::OpTypeEnum::kDelete) { + return false; + } + } + } } } + + // The oplog entry might be a single delete command on a change collection, avoid + // inserting such oplog entries back to the change collection. + if (auto opTypeFieldElem = oplogDoc[repl::OplogEntry::kOpTypeFieldName]; + opTypeFieldElem && + opTypeFieldElem.String() == repl::OpType_serializer(repl::OpTypeEnum::kDelete)) { + return !NamespaceString(nssFieldElem.String()).isChangeCollection(); + } } return true; @@ -177,8 +206,9 @@ bool ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive() { } // TODO SERVER-67267 guard with 'multitenancySupport' and 'isServerless' flag. - return feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled( - serverGlobalParams.featureCompatibility); + return serverGlobalParams.featureCompatibility.isVersionInitialized() && + feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled( + serverGlobalParams.featureCompatibility); } bool ChangeStreamChangeCollectionManager::hasChangeCollection( @@ -302,4 +332,98 @@ Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection( return changeCollectionsWriter.write(opCtx, opDebug); } +boost::optional +ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId( + OperationContext* opCtx, const CollectionPtr* changeCollection, const Date_t& expirationTime) { + const auto isExpired = [&](const BSONObj& changeDoc) { + const BSONElement& wallElem = changeDoc["wall"]; + invariant(wallElem); + + auto bsonType = wallElem.type(); + invariant(bsonType == BSONType::Date); + + return wallElem.Date() <= expirationTime; + }; + + BSONObj currChangeDoc; + RecordId currRecordId; + + boost::optional prevRecordId; + boost::optional prevPrevRecordId; + + auto scanExecutor = InternalPlanner::collectionScan(opCtx, + changeCollection, + PlanYieldPolicy::YieldPolicy::YIELD_AUTO, + InternalPlanner::Direction::FORWARD); + while (true) { + auto getNextState = scanExecutor->getNext(&currChangeDoc, &currRecordId); + switch (getNextState) { + case PlanExecutor::IS_EOF: + // Either the collection is empty (case in which return boost::none), or all the + // documents have expired. The remover job should never delete the last entry of a + // change collection, so return the recordId of the document previous to the last + // one. + return prevPrevRecordId ? RecordIdBound(prevPrevRecordId.get()) + : boost::optional(); + case PlanExecutor::ADVANCED: { + if (!isExpired(currChangeDoc)) { + return prevRecordId ? 
RecordIdBound(prevRecordId.get()) + : boost::optional(); + } + } + } + + prevPrevRecordId = prevRecordId; + prevRecordId = currRecordId; + } + + MONGO_UNREACHABLE; +} + +size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments( + OperationContext* opCtx, boost::optional tenantId, const Date_t& expirationTime) { + // Acquire intent-exclusive lock on the change collection. Early exit if the collection + // doesn't exist. + const auto changeCollection = + AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId}; + + // Early exit if collection does not exist, or if running on a secondary (requires + // opCtx->lockState()->isRSTLLocked()). + if (!changeCollection || + !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase( + opCtx, NamespaceString::kConfigDb)) { + return 0; + } + + const auto maxRecordIdBound = + getChangeCollectionMaxExpiredRecordId(opCtx, &*changeCollection, expirationTime); + + // Early exit if there are no expired documents to be removed. + if (!maxRecordIdBound.has_value()) { + return 0; + } + + auto params = std::make_unique(); + params->isMulti = true; + + auto batchedDeleteParams = std::make_unique(); + auto deleteExecutor = InternalPlanner::deleteWithCollectionScan( + opCtx, + &(*changeCollection), + std::move(params), + PlanYieldPolicy::YieldPolicy::YIELD_AUTO, + InternalPlanner::Direction::FORWARD, + boost::none, + std::move(maxRecordIdBound), + CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords, + std::move(batchedDeleteParams)); + + try { + return deleteExecutor->executeDelete(); + } catch (const ExceptionFor&) { + // It is expected that a collection drop can kill a query plan while deleting an old + // document, so ignore this error. + return 0; + } +} } // namespace mongo diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h index b6836ac0c1e..cd2e166b1df 100644 --- a/src/mongo/db/change_stream_change_collection_manager.h +++ b/src/mongo/db/change_stream_change_collection_manager.h @@ -121,6 +121,24 @@ public: std::vector::const_iterator endOplogEntries, bool isGlobalIXLockAcquired, OpDebug* opDebug); -}; + /** + * Forward-scans the given change collection to return the recordId of the last, non-terminal + * document having the wall time less than the 'expirationTime'. Returns 'boost::none' if the + * collection is empty, or there are no expired documents, or the collection contains a single + * expired document. + */ + static boost::optional getChangeCollectionMaxExpiredRecordId( + OperationContext* opCtx, + const CollectionPtr* changeCollection, + const Date_t& expirationTime); + + /** + * Removes expired documents from the change collection for the provided 'tenantId'. A document + * whose retention time is less than the 'expirationTime' is deleted. 
+ */ + static size_t removeExpiredChangeCollectionsDocuments(OperationContext* opCtx, + boost::optional tenantId, + const Date_t& expirationTime); +}; } // namespace mongo diff --git a/src/mongo/db/change_streams_cluster_parameter.idl b/src/mongo/db/change_streams_cluster_parameter.idl index 06282d7c8d5..fa97168380d 100644 --- a/src/mongo/db/change_streams_cluster_parameter.idl +++ b/src/mongo/db/change_streams_cluster_parameter.idl @@ -58,3 +58,11 @@ server_parameters: cpp_varname: gChangeStreamsClusterParameter validator: callback: validateChangeStreamsClusterParameter + changeCollectionRemoverJobSleepSeconds: + description: "Specifies the number of seconds for which the periodic change collection remover job will sleep between each cycle." + set_at: [ startup ] + cpp_vartype: AtomicWord + cpp_varname: "gChangeCollectionRemoverJobSleepSeconds" + validator: + gte: 1 + default: 10 diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index 3aceda74989..0b9ede46ae0 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -57,6 +57,7 @@ #include "mongo/db/catalog/health_log.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/catalog/index_key_validate.h" +#include "mongo/db/change_collection_expired_documents_remover.h" #include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_stream_options_manager.h" #include "mongo/db/client.h" @@ -778,13 +779,15 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) { } } - // Start a background task to periodically remove expired pre-images from the 'system.preimages' - // collection if not in standalone mode. + // If not in standalone mode, start background tasks to: + // * Periodically remove expired pre-images from the 'system.preimages' + // * Periodically remove expired documents from change collections const auto isStandalone = repl::ReplicationCoordinator::get(serviceContext)->getReplicationMode() == repl::ReplicationCoordinator::modeNone; if (!isStandalone) { startChangeStreamExpiredPreImagesRemover(serviceContext); + startChangeCollectionExpiredDocumentsRemover(serviceContext); } // Set up the logical session cache @@ -1414,6 +1417,8 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) { LOGV2(6278511, "Shutting down the Change Stream Expired Pre-images Remover"); shutdownChangeStreamExpiredPreImagesRemover(serviceContext); + shutdownChangeCollectionExpiredDocumentsRemover(serviceContext); + // We should always be able to acquire the global lock at shutdown. // An OperationContext is not necessary to call lockGlobal() during shutdown, as it's only used // to check that lockGlobal() is not called after a transaction timestamp has been set. -- cgit v1.2.1
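Usage sketch (not part of the patch): the snippet below shows how the knobs introduced by this change fit together, modeled directly on the jstest above. The `changeCollectionRemoverJobSleepSeconds` startup parameter controls how often the remover job wakes up, and the `changeStreams.expireAfterSeconds` cluster parameter controls retention; the `forceEnableChangeCollectionsMode` failpoint mirrors the TODO SERVER-67267 workaround used in the test. This is a minimal, hedged sketch assuming the standard ReplSetTest shell helpers, not an additional test shipped with this commit.

    // Minimal sketch; parameter and failpoint names are taken from this patch,
    // the surrounding scaffolding is the standard ReplSetTest shell helper.
    const rst = new ReplSetTest({nodes: 1});
    rst.startSet({
        setParameter: {
            // Change collections are force-enabled via failpoint until the
            // serverless/multitenancy flags are wired up (TODO SERVER-67267).
            "failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"}),
            // The remover job wakes up every 5 seconds (the new IDL parameter defaults to 10).
            changeCollectionRemoverJobSleepSeconds: 5,
        }
    });
    rst.initiate();

    const admin = rst.getPrimary().getDB("admin");

    // Enable change streams so that the change collection is created.
    assert.commandWorked(admin.runCommand({setChangeStreamState: 1, enabled: true}));

    // Documents whose wall time is older than 60 seconds become eligible for
    // removal on the next pass of the remover job.
    assert.commandWorked(
        admin.runCommand({setClusterParameter: {changeStreams: {expireAfterSeconds: 60}}}));

    rst.stopSet();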