author    Rishab Joshi <rishab.joshi@mongodb.com>  2022-08-21 12:17:00 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-08-21 12:46:17 +0000
commit    e2bc23486212cf112156f281a0ab0d315f9f5f88 (patch)
tree      80e3485ae4755069eda730851bdf4a9d3cdd34a7
parent    cfd2a26b68cc7ee59fbc94c7ea437500b09ef124 (diff)
SERVER-66635 Introduce TTL job to delete entries from change collections.
-rw-r--r--  jstests/serverless/change_collection_expired_document_remover.js | 151
-rw-r--r--  src/mongo/db/SConscript | 17
-rw-r--r--  src/mongo/db/catalog_raii.cpp | 11
-rw-r--r--  src/mongo/db/catalog_raii.h | 5
-rw-r--r--  src/mongo/db/change_collection_expired_change_remover_test.cpp | 225
-rw-r--r--  src/mongo/db/change_collection_expired_documents_remover.cpp | 169
-rw-r--r--  src/mongo/db/change_collection_expired_documents_remover.h | 47
-rw-r--r--  src/mongo/db/change_stream_change_collection_manager.cpp | 150
-rw-r--r--  src/mongo/db/change_stream_change_collection_manager.h | 20
-rw-r--r--  src/mongo/db/change_streams_cluster_parameter.idl | 8
-rw-r--r--  src/mongo/db/mongod_main.cpp | 9
11 files changed, 791 insertions(+), 21 deletions(-)
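This commit ties together three pieces: the 'changeStreams.expireAfterSeconds' cluster parameter that defines the retention window, a new startup-only 'changeCollectionRemoverJobSleepSeconds' server parameter that paces the periodic remover, and the remover job itself. As a rough mongo-shell sketch (assuming a test replica set started the way the new jstest below starts one, i.e. with 'failpoint.forceEnableChangeCollectionsMode' turned on), enabling change collections and giving their entries a one-second retention window looks like:

    const admin = db.getSiblingDB("admin");
    // Enable change streams so that change collections get created.
    assert.commandWorked(admin.runCommand({setChangeStreamState: 1, enabled: true}));
    // Entries older than one second become eligible for removal by the new TTL job.
    assert.commandWorked(
        admin.runCommand({setClusterParameter: {changeStreams: {expireAfterSeconds: 1}}}));
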
diff --git a/jstests/serverless/change_collection_expired_document_remover.js b/jstests/serverless/change_collection_expired_document_remover.js
new file mode 100644
index 00000000000..9c76d84c7bc
--- /dev/null
+++ b/jstests/serverless/change_collection_expired_document_remover.js
@@ -0,0 +1,151 @@
+/**
+ * Tests the change collection periodic remover job.
+ *
+ * @tags: [requires_fcv_61]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js"); // For configureFailPoint.
+load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
+
+const kExpiredChangeRemovalJobSleepSeconds = 5;
+const kExpireAfterSeconds = 1;
+const kSleepBetweenWritesSeconds = 5;
+const kSafetyMarginMillis = 1;
+
+const rst = new ReplSetTest({nodes: 2});
+
+// TODO SERVER-67267: Add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and
+// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'.
+rst.startSet({
+ setParameter: {
+ "failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"}),
+ changeCollectionRemoverJobSleepSeconds: kExpiredChangeRemovalJobSleepSeconds
+ }
+});
+rst.initiate();
+
+const primary = rst.getPrimary();
+const secondary = rst.getSecondary();
+const testDb = primary.getDB(jsTestName());
+
+// Enable change streams to ensure the creation of change collections.
+assert.commandWorked(primary.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true}));
+
+// Set the 'expireAfterSeconds' to 'kExpireAfterSeconds'.
+assert.commandWorked(primary.getDB("admin").runCommand(
+ {setClusterParameter: {changeStreams: {expireAfterSeconds: kExpireAfterSeconds}}}));
+
+// TODO SERVER-65950 Extend the test case to account for multi-tenancy.
+const primaryChangeCollection = primary.getDB("config").system.change_collection;
+const secondaryChangeCollection = secondary.getDB("config").system.change_collection;
+
+// Assert that the change collection contains all documents in 'expectedRetainedDocs' and no
+// document in 'expectedDeletedDocs' for the collection 'testColl'.
+function assertChangeCollectionDocuments(
+ changeColl, testColl, expectedDeletedDocs, expectedRetainedDocs) {
+ const collNss = `${testDb.getName()}.${testColl.getName()}`;
+ const pipeline = (collectionEntries) => [{$match: {op: "i", ns: collNss}},
+ {$replaceRoot: {"newRoot": "$o"}},
+ {$match: {$or: collectionEntries}}];
+
+ // Assert that querying for 'expectedRetainedDocs' yields documents that are exactly the same as
+ // 'expectedRetainedDocs'.
+ if (expectedRetainedDocs.length > 0) {
+ const retainedDocs = changeColl.aggregate(pipeline(expectedRetainedDocs)).toArray();
+ assert.eq(retainedDocs, expectedRetainedDocs);
+ }
+
+ // Assert that the query for any `expectedDeletedDocs` yields no results.
+ if (expectedDeletedDocs.length > 0) {
+ const deletedDocs = changeColl.aggregate(pipeline(expectedDeletedDocs)).toArray();
+ assert.eq(deletedDocs.length, 0);
+ }
+}
+
+// Returns the operation time for the provided document 'doc'.
+function getDocumentOperationTime(doc) {
+ const oplogEntry = primary.getDB("local").oplog.rs.findOne({o: doc});
+ assert(oplogEntry);
+ return oplogEntry.wall.getTime();
+}
+
+(function testOnlyExpiredDocumentsDeleted() {
+ assertDropAndRecreateCollection(testDb, "stocks");
+ const testColl = testDb.stocks;
+
+ // Wait until the remover job hangs.
+ let fpHangBeforeRemovingDocs = configureFailPoint(primary, "hangBeforeRemovingExpiredChanges");
+ fpHangBeforeRemovingDocs.wait();
+
+ // Insert 5 documents.
+ const expiredDocuments = [
+ {_id: "aapl", price: 140},
+ {_id: "dis", price: 100},
+ {_id: "nflx", price: 185},
+ {_id: "baba", price: 66},
+ {_id: "amc", price: 185}
+ ];
+
+ assert.commandWorked(testColl.insertMany(expiredDocuments));
+ assertChangeCollectionDocuments(primaryChangeCollection,
+ testColl,
+ /* expectedDeletedDocs */[],
+ /* expectedRetainedDocs */ expiredDocuments);
+ const lastExpiredDocumentTime = getDocumentOperationTime(expiredDocuments.at(-1));
+
+ // Sleep for 'kSleepBetweenWritesSeconds' duration such that the next batch of inserts
+ // has a sufficient delay in their wall time relative to the previous batch.
+ sleep(kSleepBetweenWritesSeconds * 1000);
+
+ // Insert 5 more documents.
+ const nonExpiredDocuments = [
+ {_id: "wmt", price: 11},
+ {_id: "coin", price: 23},
+ {_id: "ddog", price: 15},
+ {_id: "goog", price: 199},
+ {_id: "tsla", price: 12}
+ ];
+
+ assert.commandWorked(testColl.insertMany(nonExpiredDocuments));
+ assertChangeCollectionDocuments(primaryChangeCollection,
+ testColl,
+ /* expectedDeletedDocs */[],
+ /* expectedRetainedDocs */ nonExpiredDocuments);
+
+ // Calculate the 'currentWallTime' such that only the first batch of inserted documents
+    // should be expired, i.e. 'lastExpiredDocumentTime' + 'kExpireAfterSeconds' <
+    // 'currentWallTime' < wall time of the first non-expired document.
+ const currentWallTime =
+ new Date(lastExpiredDocumentTime + kExpireAfterSeconds * 1000 + kSafetyMarginMillis);
+ const fpInjectWallTime = configureFailPoint(
+ primary, "injectCurrentWallTimeForRemovingExpiredDocuments", {currentWallTime});
+
+ // Unblock the change collection remover job such that it picks up on the injected
+ // 'currentWallTime'.
+ fpHangBeforeRemovingDocs.off();
+
+ // Wait until the remover job has retrieved the injected 'currentWallTime' and reset the first
+ // failpoint.
+ fpInjectWallTime.wait();
+
+ // Wait for a complete cycle of the TTL job.
+ fpHangBeforeRemovingDocs = configureFailPoint(primary, "hangBeforeRemovingExpiredChanges");
+ fpHangBeforeRemovingDocs.wait();
+
+ // Assert that the first 5 documents got deleted, but the later 5 documents did not.
+ assertChangeCollectionDocuments(
+ primaryChangeCollection, testColl, expiredDocuments, nonExpiredDocuments);
+
+ // Wait for the replication to complete and assert that the expired documents also have been
+ // deleted from the secondary.
+ rst.awaitReplication();
+ assertChangeCollectionDocuments(
+ secondaryChangeCollection, testColl, expiredDocuments, nonExpiredDocuments);
+ fpHangBeforeRemovingDocs.off();
+})();
+
+rst.stopSet();
+})();
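The test above drives the remover deterministically through the two fail points added by this commit: 'hangBeforeRemovingExpiredChanges' parks the job before a deletion pass, and 'injectCurrentWallTimeForRemovingExpiredDocuments' overrides the wall time that pass uses. A condensed sketch of that choreography, assuming the same fail-point helpers from jstests/libs/fail_point_util.js and a connection named 'primary':

    let hangFp = configureFailPoint(primary, "hangBeforeRemovingExpiredChanges");
    hangFp.wait();  // The remover job is now parked before deleting anything.

    // Make the next pass believe the wall clock has moved past the expiry boundary.
    const injectFp = configureFailPoint(
        primary, "injectCurrentWallTimeForRemovingExpiredDocuments", {currentWallTime: new Date()});

    hangFp.off();     // Let one removal pass run with the injected wall time.
    injectFp.wait();  // The pass has picked up the injected time.

    // Park the job again so assertions run against a stable collection state.
    hangFp = configureFailPoint(primary, "hangBeforeRemovingExpiredChanges");
    hangFp.wait();
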
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 200c6878ab7..9f78f6d3dc4 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -527,6 +527,19 @@ env.Library(
)
env.Library(
+ target='change_collection_expired_change_remover',
+ source=[
+ 'change_collection_expired_documents_remover.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
+ '$BUILD_DIR/mongo/db/change_streams_cluster_parameter',
+ '$BUILD_DIR/mongo/db/query_exec',
+ '$BUILD_DIR/mongo/util/periodic_runner',
+ ],
+)
+
+env.Library(
target='change_stream_pre_images_collection_manager',
source=[
'change_stream_pre_images_collection_manager.cpp',
@@ -2278,6 +2291,7 @@ env.Library(
# mongod_initializers.
'$BUILD_DIR/mongo/client/clientdriver_minimal',
'$BUILD_DIR/mongo/db/catalog/collection_crud',
+ '$BUILD_DIR/mongo/db/change_collection_expired_change_remover',
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/change_stream_options_manager',
'$BUILD_DIR/mongo/db/change_streams_cluster_parameter',
@@ -2456,6 +2470,7 @@ if wiredtiger:
source=[
'cancelable_operation_context_test.cpp',
'catalog_raii_test.cpp',
+ 'change_collection_expired_change_remover_test.cpp',
'client_context_test.cpp',
'client_strand_test.cpp',
'collection_index_usage_tracker_test.cpp',
@@ -2531,6 +2546,8 @@ if wiredtiger:
'$BUILD_DIR/mongo/db/catalog/import_collection_oplog_entry',
'$BUILD_DIR/mongo/db/catalog/index_build_entry_idl',
'$BUILD_DIR/mongo/db/catalog/local_oplog_info',
+ '$BUILD_DIR/mongo/db/change_collection_expired_change_remover',
+ '$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/change_streams_cluster_parameter',
'$BUILD_DIR/mongo/db/mongohasher',
'$BUILD_DIR/mongo/db/op_observer/fcv_op_observer',
diff --git a/src/mongo/db/catalog_raii.cpp b/src/mongo/db/catalog_raii.cpp
index 4809e2a5344..4ba0dd3e650 100644
--- a/src/mongo/db/catalog_raii.cpp
+++ b/src/mongo/db/catalog_raii.cpp
@@ -587,12 +587,15 @@ AutoGetChangeCollection::AutoGetChangeCollection(OperationContext* opCtx,
invariant(opCtx->lockState()->isWriteLocked());
}
- // TODO SERVER-66715 avoid taking 'AutoGetCollection' and remove
- // 'AllowLockAcquisitionOnTimestampedUnitOfWork'.
- AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState());
+ if (mode != AccessMode::kRead) {
+ // TODO SERVER-66715 avoid taking 'AutoGetCollection' and remove
+ // 'AllowLockAcquisitionOnTimestampedUnitOfWork'.
+ _allowLockAcquisitionTsWuow.emplace(opCtx->lockState());
+ }
+
_coll.emplace(opCtx,
NamespaceString::makeChangeCollectionNSS(tenantId),
- LockMode::MODE_IX,
+ mode == AccessMode::kRead ? MODE_IS : MODE_IX,
AutoGetCollectionViewMode::kViewsForbidden,
deadline);
}
diff --git a/src/mongo/db/catalog_raii.h b/src/mongo/db/catalog_raii.h
index e4dc1fda484..3535cb1adc2 100644
--- a/src/mongo/db/catalog_raii.h
+++ b/src/mongo/db/catalog_raii.h
@@ -479,10 +479,11 @@ private:
* the oplog in the same 'WriteUnitOfWork' and assumes that the global IX
* lock is already held.
* kWrite - takes the IX lock on a tenant's change collection to perform any writes.
+ * kRead - takes the IS lock on a tenant's change collection to perform any reads.
*/
class AutoGetChangeCollection {
public:
- enum class AccessMode { kWriteInOplogContext, kWrite };
+ enum class AccessMode { kWriteInOplogContext, kWrite, kRead };
AutoGetChangeCollection(OperationContext* opCtx,
AccessMode mode,
@@ -498,6 +499,8 @@ public:
private:
boost::optional<AutoGetCollection> _coll;
+
+ boost::optional<AllowLockAcquisitionOnTimestampedUnitOfWork> _allowLockAcquisitionTsWuow;
};
} // namespace mongo
diff --git a/src/mongo/db/change_collection_expired_change_remover_test.cpp b/src/mongo/db/change_collection_expired_change_remover_test.cpp
new file mode 100644
index 00000000000..ccd323202e8
--- /dev/null
+++ b/src/mongo/db/change_collection_expired_change_remover_test.cpp
@@ -0,0 +1,225 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/change_collection_expired_documents_remover.h"
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/catalog/catalog_test_fixture.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/exec/document_value/document_value_test_util.h"
+#include "mongo/db/query/internal_plans.h"
+#include "mongo/db/query/plan_executor.h"
+#include "mongo/db/record_id_helpers.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/storage/record_data.h"
+#include "mongo/idl/server_parameter_test_util.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/clock_source_mock.h"
+#include "mongo/util/duration.h"
+#include "pipeline/change_stream_test_helpers.h"
+
+namespace mongo {
+
+class ChangeCollectionExpiredChangeRemoverTest : public CatalogTestFixture {
+protected:
+ ChangeCollectionExpiredChangeRemoverTest()
+ : CatalogTestFixture(Options{}.useMockClock(true)), _tenantId(OID::gen()) {
+ ChangeStreamChangeCollectionManager::create(getServiceContext());
+ }
+
+ ClockSourceMock* clockSource() {
+ return static_cast<ClockSourceMock*>(getServiceContext()->getFastClockSource());
+ }
+
+ Date_t now() {
+ return clockSource()->now();
+ }
+
+ void insertDocumentToChangeCollection(OperationContext* opCtx,
+ boost::optional<TenantId> tenantId,
+ const BSONObj& obj) {
+ const auto wallTime = now();
+ Timestamp timestamp{wallTime};
+
+ repl::MutableOplogEntry oplogEntry;
+ oplogEntry.setOpTime(repl::OpTime{timestamp, 0});
+ oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
+ oplogEntry.setNss(NamespaceString::makeChangeCollectionNSS(tenantId));
+ oplogEntry.setObject(obj);
+ oplogEntry.setWallClockTime(wallTime);
+ auto oplogEntryBson = oplogEntry.toBSON();
+
+ RecordData recordData{oplogEntryBson.objdata(), oplogEntryBson.objsize()};
+ RecordId recordId = record_id_helpers::keyForDate(wallTime);
+
+ AutoGetChangeCollection changeCollection{
+ opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId};
+
+ WriteUnitOfWork wunit(opCtx);
+ ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection(
+ opCtx, {Record{recordId, recordData}}, {timestamp});
+ wunit.commit();
+ }
+
+ std::vector<repl::OplogEntry> readChangeCollection(OperationContext* opCtx,
+ boost::optional<TenantId> tenantId) {
+ auto changeCollection =
+ AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kRead, tenantId};
+
+ auto scanExecutor =
+ InternalPlanner::collectionScan(opCtx,
+ &(*changeCollection),
+ PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY,
+ InternalPlanner::Direction::FORWARD);
+
+ BSONObj currChangeDoc;
+ std::vector<repl::OplogEntry> entries;
+ while (scanExecutor->getNext(&currChangeDoc, nullptr) == PlanExecutor::ADVANCED) {
+ entries.push_back(repl::OplogEntry(currChangeDoc));
+ }
+
+ return entries;
+ }
+
+ void dropAndRecreateChangeCollection(OperationContext* opCtx,
+ boost::optional<TenantId> tenantId) {
+ auto changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ changeCollectionManager.dropChangeCollection(opCtx, tenantId);
+ changeCollectionManager.createChangeCollection(opCtx, tenantId);
+ }
+
+ const boost::optional<TenantId> _tenantId;
+ RAIIServerParameterControllerForTest featureFlagController{"featureFlagServerlessChangeStreams",
+ true};
+
+ boost::optional<ChangeStreamChangeCollectionManager> _changeCollectionManager;
+};
+
+// Tests that the last expired document retrieved is the expected one.
+TEST_F(ChangeCollectionExpiredChangeRemoverTest, VerifyLastExpiredDocument) {
+ const auto opCtx = operationContext();
+ dropAndRecreateChangeCollection(opCtx, _tenantId);
+
+ Date_t expirationTime;
+ BSONObj lastExpiredDocument;
+
+ // Create 100 change documents and consider the first 50 of them as expired.
+ for (int i = 0; i < 100; ++i) {
+ auto doc = BSON("_id" << i);
+ insertDocumentToChangeCollection(opCtx, _tenantId, doc);
+
+        // Store the last expired document and its wallTime.
+ if (i == 50) {
+ lastExpiredDocument = doc;
+ expirationTime = now();
+ }
+
+ clockSource()->advance(Milliseconds(1));
+ }
+
+ auto changeCollection =
+ AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kRead, _tenantId};
+
+ const auto maxExpiredRecordId =
+ ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId(
+ opCtx, &*changeCollection, expirationTime);
+
+ // Get the document found at 'maxExpiredRecordId' and test it against 'lastExpiredDocument'.
+ auto scanExecutor =
+ InternalPlanner::collectionScan(opCtx,
+ &(*changeCollection),
+ PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY,
+ InternalPlanner::Direction::FORWARD,
+ boost::none,
+ maxExpiredRecordId,
+ maxExpiredRecordId);
+
+ BSONObj changeDocAtId;
+ ASSERT_EQ(scanExecutor->getNext(&changeDocAtId, nullptr), PlanExecutor::ADVANCED);
+ ASSERT_BSONOBJ_EQ(repl::OplogEntry(changeDocAtId).getObject(), lastExpiredDocument);
+}
+
+// Tests that only the expired documents are removed from the change collection.
+TEST_F(ChangeCollectionExpiredChangeRemoverTest, ShouldRemoveOnlyExpiredDocument) {
+ const auto opCtx = operationContext();
+ dropAndRecreateChangeCollection(opCtx, _tenantId);
+
+ const BSONObj firstExpired = BSON("_id"
+ << "firstExpired");
+ const BSONObj secondExpired = BSON("_id"
+ << "secondExpired");
+ const BSONObj notExpired = BSON("_id"
+ << "notExpired");
+
+ insertDocumentToChangeCollection(opCtx, _tenantId, firstExpired);
+ clockSource()->advance(Hours(1));
+ insertDocumentToChangeCollection(opCtx, _tenantId, secondExpired);
+
+ // Store the wallTime of the last expired document.
+ const auto expirationTime = now();
+ clockSource()->advance(Hours(1));
+ insertDocumentToChangeCollection(opCtx, _tenantId, notExpired);
+
+ // Verify that only the required documents are removed.
+ ASSERT_EQ(ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments(
+ opCtx, _tenantId, expirationTime),
+ 2);
+
+ // Only the 'notExpired' document is left in the change collection.
+ const auto changeCollectionEntries = readChangeCollection(opCtx, _tenantId);
+ ASSERT_EQ(changeCollectionEntries.size(), 1);
+ ASSERT_BSONOBJ_EQ(changeCollectionEntries[0].getObject(), notExpired);
+}
+
+// Tests that the last expired document is never deleted.
+TEST_F(ChangeCollectionExpiredChangeRemoverTest, ShouldLeaveAtLeastOneDocument) {
+ const auto opCtx = operationContext();
+ dropAndRecreateChangeCollection(opCtx, _tenantId);
+
+ for (int i = 0; i < 100; ++i) {
+ insertDocumentToChangeCollection(opCtx, _tenantId, BSON("_id" << i));
+ clockSource()->advance(Milliseconds{1});
+ }
+
+    // Verify that all but the last document are removed.
+ ASSERT_EQ(ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments(
+ opCtx, _tenantId, now()),
+ 99);
+
+ // Only the last document is left in the change collection.
+ const auto changeCollectionEntries = readChangeCollection(opCtx, _tenantId);
+ ASSERT_EQ(changeCollectionEntries.size(), 1);
+ ASSERT_BSONOBJ_EQ(changeCollectionEntries[0].getObject(), BSON("_id" << 99));
+}
+} // namespace mongo
diff --git a/src/mongo/db/change_collection_expired_documents_remover.cpp b/src/mongo/db/change_collection_expired_documents_remover.cpp
new file mode 100644
index 00000000000..dc1aadeaa06
--- /dev/null
+++ b/src/mongo/db/change_collection_expired_documents_remover.cpp
@@ -0,0 +1,169 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/change_collection_expired_documents_remover.h"
+
+#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/change_streams_cluster_parameter_gen.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/service_context.h"
+#include "mongo/logv2/log.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/util/duration.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/periodic_runner.h"
+#include <algorithm>
+#include <memory>
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
+namespace mongo {
+
+// Hangs the change collection remover job before initiating the deletion process of documents.
+MONGO_FAIL_POINT_DEFINE(hangBeforeRemovingExpiredChanges);
+
+// Injects the provided ISO date into 'currentWallTime', which is used to determine whether a
+// change collection document has expired.
+MONGO_FAIL_POINT_DEFINE(injectCurrentWallTimeForRemovingExpiredDocuments);
+
+namespace {
+
+// TODO SERVER-61822 Provide the real implementation after 'listDatabasesForAllTenants' is
+// available.
+std::vector<boost::optional<TenantId>> getAllTenants() {
+ return {boost::none};
+}
+
+boost::optional<int64_t> getExpireAfterSeconds(boost::optional<TenantId> tid) {
+    // TODO SERVER-65950 Fetch 'expireAfterSeconds' on a per-tenant basis.
+ return {gChangeStreamsClusterParameter.getExpireAfterSeconds()};
+}
+
+void removeExpiredDocuments(Client* client) {
+ hangBeforeRemovingExpiredChanges.pauseWhileSet();
+
+ try {
+ auto opCtx = client->makeOperationContext();
+ const auto clock = client->getServiceContext()->getFastClockSource();
+ auto currentWallTime = clock->now();
+
+        // If the fail point 'injectCurrentWallTimeForRemovingExpiredDocuments' is enabled, then
+        // set 'currentWallTime' to the provided wall time.
+ if (injectCurrentWallTimeForRemovingExpiredDocuments.shouldFail()) {
+ injectCurrentWallTimeForRemovingExpiredDocuments.execute([&](const BSONObj& data) {
+ currentWallTime = data.getField("currentWallTime").date();
+ });
+ }
+
+ // Number of documents removed in the current pass.
+ size_t removedCount = 0;
+
+ for (const auto& tenantId : getAllTenants()) {
+ auto expiredAfterSeconds = getExpireAfterSeconds(tenantId);
+ invariant(expiredAfterSeconds);
+ removedCount +=
+ ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments(
+ opCtx.get(), tenantId, currentWallTime - Seconds(*expiredAfterSeconds));
+ }
+
+ // TODO SERVER-66636 Use server parameters to track periodic job statistics per tenant.
+ if (removedCount > 0) {
+ LOGV2_DEBUG(6663503,
+ 3,
+ "Periodic expired change collection job finished executing",
+ "numberOfRemovals"_attr = removedCount,
+ "jobDuration"_attr = (clock->now() - currentWallTime).toString());
+ }
+ } catch (const DBException& exception) {
+ if (exception.toStatus() != ErrorCodes::OK) {
+ LOGV2_WARNING(6663504,
+ "Periodic expired change collection job was killed",
+ "errorCode"_attr = exception.toStatus());
+ } else {
+ LOGV2_ERROR(6663505,
+ "Periodic expired change collection job failed",
+ "reason"_attr = exception.reason());
+ }
+ }
+}
+
+/**
+ * Defines a periodic background job to remove expired documents from change collections.
+ * The job will run every 'changeCollectionRemoverJobSleepSeconds' seconds, as defined by the
+ * server parameter.
+ */
+class ChangeCollectionExpiredDocumentsRemover {
+public:
+ ChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext) {
+ const auto period = Seconds{gChangeCollectionRemoverJobSleepSeconds.load()};
+ _jobAnchor = serviceContext->getPeriodicRunner()->makeJob(
+ {"ChangeCollectionExpiredDocumentsRemover", removeExpiredDocuments, period});
+ _jobAnchor.start();
+ }
+
+ static void start(ServiceContext* serviceContext) {
+ auto& changeCollectionExpiredDocumentsRemover = _serviceDecoration(serviceContext);
+ changeCollectionExpiredDocumentsRemover =
+ std::make_unique<ChangeCollectionExpiredDocumentsRemover>(serviceContext);
+ }
+
+ static void shutdown(ServiceContext* serviceContext) {
+ auto& changeCollectionExpiredDocumentsRemover = _serviceDecoration(serviceContext);
+ if (changeCollectionExpiredDocumentsRemover) {
+ changeCollectionExpiredDocumentsRemover->_jobAnchor.stop();
+ changeCollectionExpiredDocumentsRemover.reset();
+ }
+ }
+
+private:
+ inline static const auto _serviceDecoration = ServiceContext::declareDecoration<
+ std::unique_ptr<ChangeCollectionExpiredDocumentsRemover>>();
+
+ PeriodicJobAnchor _jobAnchor;
+};
+} // namespace
+
+void startChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext) {
+ if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ return;
+ }
+
+ LOGV2(6663507, "Starting the ChangeCollectionExpiredChangeRemover");
+ ChangeCollectionExpiredDocumentsRemover::start(serviceContext);
+}
+
+void shutdownChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext) {
+ if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ return;
+ }
+
+ LOGV2(6663508, "Shutting down the ChangeCollectionExpiredChangeRemover");
+ ChangeCollectionExpiredDocumentsRemover::shutdown(serviceContext);
+}
+} // namespace mongo
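The per-tenant arithmetic in removeExpiredDocuments() is deliberately simple: an entry becomes removable once its wall time is at or before 'currentWallTime - expireAfterSeconds', the value passed to the collection manager as 'expirationTime'. A standalone illustration in plain JavaScript (the helper name is hypothetical, not server code):

    // A change-collection entry is removable once its oplog wall time is at or before
    // now - expireAfterSeconds.
    function isRemovable(entryWallTime, nowMillis, expireAfterSeconds) {
        return entryWallTime.getTime() <= nowMillis - expireAfterSeconds * 1000;
    }

    // With expireAfterSeconds = 1, an entry written five seconds ago is removable.
    assert(isRemovable(new Date(Date.now() - 5000), Date.now(), 1));
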
diff --git a/src/mongo/db/change_collection_expired_documents_remover.h b/src/mongo/db/change_collection_expired_documents_remover.h
new file mode 100644
index 00000000000..bf9e36ae1f4
--- /dev/null
+++ b/src/mongo/db/change_collection_expired_documents_remover.h
@@ -0,0 +1,47 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/service_context.h"
+
+namespace mongo {
+
+/**
+ * Starts a periodic background job to remove expired documents from change collections. The job
+ * will run every 'changeCollectionRemoverJobSleepSeconds' seconds, as defined by the server
+ * parameter.
+ */
+void startChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext);
+
+/**
+ * Stops the periodic background job that removes expired documents from change collections.
+ */
+void shutdownChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext);
+
+} // namespace mongo
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp
index 0afcb527b9e..d085b514d18 100644
--- a/src/mongo/db/change_stream_change_collection_manager.cpp
+++ b/src/mongo/db/change_stream_change_collection_manager.cpp
@@ -27,6 +27,7 @@
* it in the license file.
*/
+
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
#include "mongo/db/change_stream_change_collection_manager.h"
@@ -38,9 +39,13 @@
#include "mongo/db/catalog/drop_collection.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/change_stream_pre_images_collection_manager.h"
+#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/multitenancy_gen.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/query/internal_plans.h"
+#include "mongo/db/repl/apply_ops_command_info.h"
#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_entry_gen.h"
#include "mongo/logv2/log.h"
namespace mongo {
@@ -127,20 +132,44 @@ private:
// TODO SERVER-65950 retrieve tenant from the oplog.
// TODO SERVER-67170 avoid inspecting the oplog BSON object.
+ if (auto nssFieldElem = oplogDoc[repl::OplogEntry::kNssFieldName]) {
+ if (nssFieldElem.String() == "config.$cmd"_sd) {
+ if (auto objectFieldElem = oplogDoc[repl::OplogEntry::kObjectFieldName]) {
+ // The oplog entry might be a drop command on the change collection. Check if
+ // the drop request is for the already deleted change collection, as such do not
+ // attempt to write to the change collection if that is the case. This scenario
+ // is possible because 'WriteUnitOfWork' will stage the changes and while
+ // committing the staged 'CollectionImpl::insertDocuments' change the collection
+ // object might have already been deleted.
+ if (auto dropFieldElem = objectFieldElem["drop"_sd]) {
+ return dropFieldElem.String() != NamespaceString::kChangeCollectionName;
+ }
+ }
+ }
- if (auto nssFieldElem = oplogDoc[repl::OplogEntry::kNssFieldName];
- nssFieldElem && nssFieldElem.String() == "config.$cmd"_sd) {
- if (auto objectFieldElem = oplogDoc[repl::OplogEntry::kObjectFieldName]) {
- // The oplog entry might be a drop command on the change collection. Check if the
- // drop request is for the already deleted change collection, as such do not attempt
- // to write to the change collection if that is the case. This scenario is possible
- // because 'WriteUnitOfWork' will stage the changes and while committing the staged
- // 'CollectionImpl::insertDocuments' change the collection object might have already
- // been deleted.
- if (auto dropFieldElem = objectFieldElem["drop"_sd]) {
- return dropFieldElem.String() != NamespaceString::kChangeCollectionName;
+ if (nssFieldElem.String() == "admin.$cmd"_sd) {
+ if (auto objectFieldElem = oplogDoc[repl::OplogEntry::kObjectFieldName]) {
+ // The oplog entry might be a batch delete command on a change collection, avoid
+ // inserting such oplog entries back to the change collection.
+ if (auto applyOpsFieldElem = objectFieldElem["applyOps"_sd]) {
+ const auto nestedOperations = repl::ApplyOps::extractOperations(oplogDoc);
+ for (auto& op : nestedOperations) {
+ if (op.getNss().isChangeCollection() &&
+ op.getOpType() == repl::OpTypeEnum::kDelete) {
+ return false;
+ }
+ }
+ }
}
}
+
+ // The oplog entry might be a single delete command on a change collection, avoid
+ // inserting such oplog entries back to the change collection.
+ if (auto opTypeFieldElem = oplogDoc[repl::OplogEntry::kOpTypeFieldName];
+ opTypeFieldElem &&
+ opTypeFieldElem.String() == repl::OpType_serializer(repl::OpTypeEnum::kDelete)) {
+ return !NamespaceString(nssFieldElem.String()).isChangeCollection();
+ }
}
return true;
@@ -177,8 +206,9 @@ bool ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive() {
}
// TODO SERVER-67267 guard with 'multitenancySupport' and 'isServerless' flag.
- return feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled(
- serverGlobalParams.featureCompatibility);
+ return serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled(
+ serverGlobalParams.featureCompatibility);
}
bool ChangeStreamChangeCollectionManager::hasChangeCollection(
@@ -302,4 +332,98 @@ Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
return changeCollectionsWriter.write(opCtx, opDebug);
}
+boost::optional<RecordIdBound>
+ChangeStreamChangeCollectionManager::getChangeCollectionMaxExpiredRecordId(
+ OperationContext* opCtx, const CollectionPtr* changeCollection, const Date_t& expirationTime) {
+ const auto isExpired = [&](const BSONObj& changeDoc) {
+ const BSONElement& wallElem = changeDoc["wall"];
+ invariant(wallElem);
+
+ auto bsonType = wallElem.type();
+ invariant(bsonType == BSONType::Date);
+
+ return wallElem.Date() <= expirationTime;
+ };
+
+ BSONObj currChangeDoc;
+ RecordId currRecordId;
+
+ boost::optional<RecordId> prevRecordId;
+ boost::optional<RecordId> prevPrevRecordId;
+
+ auto scanExecutor = InternalPlanner::collectionScan(opCtx,
+ changeCollection,
+ PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
+ InternalPlanner::Direction::FORWARD);
+ while (true) {
+ auto getNextState = scanExecutor->getNext(&currChangeDoc, &currRecordId);
+ switch (getNextState) {
+ case PlanExecutor::IS_EOF:
+                // Either the collection is empty (in which case boost::none is returned), or all
+                // the documents have expired. The remover job should never delete the last entry
+                // of a change collection, so return the recordId of the document previous to the
+                // last one.
+ return prevPrevRecordId ? RecordIdBound(prevPrevRecordId.get())
+ : boost::optional<RecordIdBound>();
+ case PlanExecutor::ADVANCED: {
+ if (!isExpired(currChangeDoc)) {
+ return prevRecordId ? RecordIdBound(prevRecordId.get())
+ : boost::optional<RecordIdBound>();
+ }
+ }
+ }
+
+ prevPrevRecordId = prevRecordId;
+ prevRecordId = currRecordId;
+ }
+
+ MONGO_UNREACHABLE;
+}
+
+size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments(
+ OperationContext* opCtx, boost::optional<TenantId> tenantId, const Date_t& expirationTime) {
+ // Acquire intent-exclusive lock on the change collection. Early exit if the collection
+ // doesn't exist.
+ const auto changeCollection =
+ AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId};
+
+ // Early exit if collection does not exist, or if running on a secondary (requires
+ // opCtx->lockState()->isRSTLLocked()).
+ if (!changeCollection ||
+ !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(
+ opCtx, NamespaceString::kConfigDb)) {
+ return 0;
+ }
+
+ const auto maxRecordIdBound =
+ getChangeCollectionMaxExpiredRecordId(opCtx, &*changeCollection, expirationTime);
+
+ // Early exit if there are no expired documents to be removed.
+ if (!maxRecordIdBound.has_value()) {
+ return 0;
+ }
+
+ auto params = std::make_unique<DeleteStageParams>();
+ params->isMulti = true;
+
+ auto batchedDeleteParams = std::make_unique<BatchedDeleteStageParams>();
+ auto deleteExecutor = InternalPlanner::deleteWithCollectionScan(
+ opCtx,
+ &(*changeCollection),
+ std::move(params),
+ PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
+ InternalPlanner::Direction::FORWARD,
+ boost::none,
+ std::move(maxRecordIdBound),
+ CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords,
+ std::move(batchedDeleteParams));
+
+ try {
+ return deleteExecutor->executeDelete();
+ } catch (const ExceptionFor<ErrorCodes::QueryPlanKilled>&) {
+ // It is expected that a collection drop can kill a query plan while deleting an old
+ // document, so ignore this error.
+ return 0;
+ }
+}
} // namespace mongo
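getChangeCollectionMaxExpiredRecordId() above encodes one non-obvious rule: the scan returns the boundary of the last expired document, but the final document of a change collection is never eligible, so when everything has expired the boundary falls back to the second-to-last record, and to 'boost::none' when the collection is empty or holds only a single expired document. A standalone sketch of that boundary selection over an in-memory array (plain JavaScript with a hypothetical helper, not server code):

    // Returns the index of the last deletable document, or null if nothing should be deleted.
    function maxExpiredIndex(docs, expirationTime) {
        let prev = null, prevPrev = null;
        for (let i = 0; i < docs.length; ++i) {
            if (docs[i].wall > expirationTime)
                return prev;    // First non-expired document: delete up to the previous one.
            prevPrev = prev;
            prev = i;
        }
        return prevPrev;        // Everything expired: always retain the final document.
    }

    const doc = (wall) => ({wall: wall});
    assert.eq(maxExpiredIndex([doc(1), doc(2), doc(3)], 10), 1);  // All expired; index 2 survives.
    assert.eq(maxExpiredIndex([doc(1), doc(2), doc(3)], 2), 1);   // Only doc(3) is unexpired.
    assert.eq(maxExpiredIndex([doc(5)], 10), null);               // A single expired doc is kept.
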
diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h
index b6836ac0c1e..cd2e166b1df 100644
--- a/src/mongo/db/change_stream_change_collection_manager.h
+++ b/src/mongo/db/change_stream_change_collection_manager.h
@@ -121,6 +121,24 @@ public:
std::vector<InsertStatement>::const_iterator endOplogEntries,
bool isGlobalIXLockAcquired,
OpDebug* opDebug);
-};
+ /**
+ * Forward-scans the given change collection and returns the recordId of the last non-terminal
+ * document whose wall time is less than or equal to the 'expirationTime'. Returns 'boost::none'
+ * if the collection is empty, there are no expired documents, or the collection contains only a
+ * single expired document.
+ */
+ static boost::optional<RecordIdBound> getChangeCollectionMaxExpiredRecordId(
+ OperationContext* opCtx,
+ const CollectionPtr* changeCollection,
+ const Date_t& expirationTime);
+
+ /**
+ * Removes expired documents from the change collection for the provided 'tenantId'. A document
+ * whose wall time is less than or equal to the 'expirationTime' is deleted.
+ */
+ static size_t removeExpiredChangeCollectionsDocuments(OperationContext* opCtx,
+ boost::optional<TenantId> tenantId,
+ const Date_t& expirationTime);
+};
} // namespace mongo
diff --git a/src/mongo/db/change_streams_cluster_parameter.idl b/src/mongo/db/change_streams_cluster_parameter.idl
index 06282d7c8d5..fa97168380d 100644
--- a/src/mongo/db/change_streams_cluster_parameter.idl
+++ b/src/mongo/db/change_streams_cluster_parameter.idl
@@ -58,3 +58,11 @@ server_parameters:
cpp_varname: gChangeStreamsClusterParameter
validator:
callback: validateChangeStreamsClusterParameter
+ changeCollectionRemoverJobSleepSeconds:
+ description: "Specifies the number of seconds for which the periodic change collection remover job will sleep between each cycle."
+ set_at: [ startup ]
+ cpp_vartype: AtomicWord<int>
+ cpp_varname: "gChangeCollectionRemoverJobSleepSeconds"
+ validator:
+ gte: 1
+ default: 10
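Because 'changeCollectionRemoverJobSleepSeconds' is registered with set_at: [startup] (minimum 1, default 10), a test that wants faster removal cycles has to lower it when the node starts, exactly as the jstest earlier in this patch does. A minimal sketch, assuming change-collection mode is force-enabled via the same fail point:

    const rst = new ReplSetTest({nodes: 1});
    rst.startSet({
        setParameter: {
            "failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"}),
            changeCollectionRemoverJobSleepSeconds: 5  // Run a removal pass every 5 seconds.
        }
    });
    rst.initiate();
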
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index 3aceda74989..0b9ede46ae0 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -57,6 +57,7 @@
#include "mongo/db/catalog/health_log.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/catalog/index_key_validate.h"
+#include "mongo/db/change_collection_expired_documents_remover.h"
#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_stream_options_manager.h"
#include "mongo/db/client.h"
@@ -778,13 +779,15 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
}
}
- // Start a background task to periodically remove expired pre-images from the 'system.preimages'
- // collection if not in standalone mode.
+ // If not in standalone mode, start background tasks to:
+    // * Periodically remove expired pre-images from the 'system.preimages' collection
+ // * Periodically remove expired documents from change collections
const auto isStandalone =
repl::ReplicationCoordinator::get(serviceContext)->getReplicationMode() ==
repl::ReplicationCoordinator::modeNone;
if (!isStandalone) {
startChangeStreamExpiredPreImagesRemover(serviceContext);
+ startChangeCollectionExpiredDocumentsRemover(serviceContext);
}
// Set up the logical session cache
@@ -1414,6 +1417,8 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
LOGV2(6278511, "Shutting down the Change Stream Expired Pre-images Remover");
shutdownChangeStreamExpiredPreImagesRemover(serviceContext);
+ shutdownChangeCollectionExpiredDocumentsRemover(serviceContext);
+
// We should always be able to acquire the global lock at shutdown.
// An OperationContext is not necessary to call lockGlobal() during shutdown, as it's only used
// to check that lockGlobal() is not called after a transaction timestamp has been set.