summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorJordi Olivares Provencio <jordi.olivares-provencio@mongodb.com>2023-03-17 09:41:12 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-03-17 10:53:20 +0000
commit693239bfae711a52dd5e574bf1d8694952f782e3 (patch)
treee515f16ceb25b4efb0633af15d5cd56e8b5c4241 /src/mongo/db
parente82a8e8adcf5aab2d39a219e6a31ef2168ff9b64 (diff)
downloadmongo-693239bfae711a52dd5e574bf1d8694952f782e3.tar.gz
SERVER-74098 Implement initial version of ChangeCollection truncate markers
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/SConscript2
-rw-r--r--src/mongo/db/change_collection_expired_change_remover_test.cpp132
-rw-r--r--src/mongo/db/change_collection_truncate_markers.cpp72
-rw-r--r--src/mongo/db/change_collection_truncate_markers.h52
4 files changed, 257 insertions, 1 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 3c32e5da968..d5527065857 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -519,6 +519,7 @@ env.Library(
env.Library(
target='change_stream_change_collection_manager',
source=[
+ 'change_collection_truncate_markers.cpp',
'change_stream_change_collection_manager.cpp',
],
LIBDEPS_PRIVATE=[
@@ -2581,6 +2582,7 @@ if wiredtiger:
'$BUILD_DIR/mongo/util/clock_source_mock',
'$BUILD_DIR/mongo/util/net/network',
'$BUILD_DIR/mongo/util/net/ssl_options_server',
+ 'change_stream_options_manager',
'collection_index_usage_tracker',
'commands',
'common',
diff --git a/src/mongo/db/change_collection_expired_change_remover_test.cpp b/src/mongo/db/change_collection_expired_change_remover_test.cpp
index 92c9cdabc15..28779c57af7 100644
--- a/src/mongo/db/change_collection_expired_change_remover_test.cpp
+++ b/src/mongo/db/change_collection_expired_change_remover_test.cpp
@@ -34,14 +34,18 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/catalog/catalog_test_fixture.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/change_collection_truncate_markers.h"
#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/change_stream_options_manager.h"
#include "mongo/db/change_stream_serverless_helpers.h"
+#include "mongo/db/change_streams_cluster_parameter_gen.h"
#include "mongo/db/exec/document_value/document_value_test_util.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/record_id_helpers.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/server_parameter_with_storage.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/record_data.h"
#include "mongo/idl/server_parameter_test_util.h"
@@ -84,7 +88,8 @@ protected:
auto oplogEntryBson = oplogEntry.toBSON();
RecordData recordData{oplogEntryBson.objdata(), oplogEntryBson.objsize()};
- RecordId recordId = record_id_helpers::keyForDate(wallTime);
+ RecordId recordId =
+ record_id_helpers::keyForOptime(timestamp, KeyFormat::String).getValue();
AutoGetChangeCollection changeCollection{
opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId};
@@ -147,6 +152,76 @@ protected:
"internalChangeStreamUseTenantIdForTesting", true};
};
+class ChangeCollectionTruncateExpirationTest : public ChangeCollectionExpiredChangeRemoverTest {
+protected:
+ ChangeCollectionTruncateExpirationTest() : ChangeCollectionExpiredChangeRemoverTest() {}
+
+ void setExpireAfterSeconds(OperationContext* opCtx, Seconds seconds) {
+ auto* clusterParameters = ServerParameterSet::getClusterParameterSet();
+ auto* changeStreamsParam =
+ clusterParameters
+ ->get<ClusterParameterWithStorage<ChangeStreamsClusterParameterStorage>>(
+ "changeStreams");
+
+ auto oldSettings = changeStreamsParam->getValue(_tenantId);
+ oldSettings.setExpireAfterSeconds(seconds.count());
+ changeStreamsParam->setValue(oldSettings, _tenantId).ignore();
+ }
+
+ void insertDocumentToChangeCollection(OperationContext* opCtx,
+ const TenantId& tenantId,
+ const BSONObj& obj) {
+ WriteUnitOfWork wuow(opCtx);
+ ChangeCollectionExpiredChangeRemoverTest::insertDocumentToChangeCollection(
+ opCtx, tenantId, obj);
+ const auto wallTime = now();
+ Timestamp timestamp{wallTime};
+ RecordId recordId =
+ record_id_helpers::keyForOptime(timestamp, KeyFormat::String).getValue();
+
+ _truncateMarkers->updateCurrentMarkerAfterInsertOnCommit(
+ opCtx, obj.objsize(), recordId, wallTime, 1);
+ wuow.commit();
+ }
+
+ void dropAndRecreateChangeCollection(OperationContext* opCtx,
+ const TenantId& tenantId,
+ int64_t minBytesPerMarker) {
+ auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ changeCollectionManager.dropChangeCollection(opCtx, tenantId);
+ _truncateMarkers.reset();
+ changeCollectionManager.createChangeCollection(opCtx, tenantId);
+ _truncateMarkers = std::make_unique<ChangeCollectionTruncateMarkers>(
+ tenantId, std::deque<CollectionTruncateMarkers::Marker>{}, 0, 0, minBytesPerMarker);
+ }
+
+ size_t removeExpiredChangeCollectionsDocuments(OperationContext* opCtx,
+ boost::optional<TenantId> tenantId,
+ Date_t expirationTime) {
+ // Acquire intent-exclusive lock on the change collection. Early exit if the collection
+ // doesn't exist.
+ const auto changeCollection =
+ AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId};
+
+ WriteUnitOfWork wuow(opCtx);
+ size_t numRecordsDeleted = 0;
+ while (boost::optional<CollectionTruncateMarkers::Marker> marker =
+ _truncateMarkers->peekOldestMarkerIfNeeded(opCtx)) {
+ auto recordStore = changeCollection->getRecordStore();
+
+ ASSERT_OK(recordStore->rangeTruncate(
+ opCtx, RecordId(), marker->lastRecord, -marker->bytes, -marker->records));
+
+ _truncateMarkers->popOldestMarker();
+ numRecordsDeleted += marker->records;
+ }
+ wuow.commit();
+ return numRecordsDeleted;
+ }
+
+ std::unique_ptr<ChangeCollectionTruncateMarkers> _truncateMarkers;
+};
+
// Tests that the last expired focument retrieved is the expected one.
TEST_F(ChangeCollectionExpiredChangeRemoverTest, VerifyLastExpiredDocument) {
const auto opCtx = operationContext();
@@ -240,4 +315,59 @@ TEST_F(ChangeCollectionExpiredChangeRemoverTest, ShouldLeaveAtLeastOneDocument)
ASSERT_EQ(changeCollectionEntries.size(), 1);
ASSERT_BSONOBJ_EQ(changeCollectionEntries[0].getObject(), BSON("_id" << 99));
}
+
+// Tests that only the expired documents are removed from the change collection.
+TEST_F(ChangeCollectionTruncateExpirationTest, ShouldRemoveOnlyExpiredDocument_Markers) {
+ const BSONObj firstExpired = BSON("_id"
+ << "firstExpired");
+ const BSONObj secondExpired = BSON("_id"
+ << "secondExpired");
+ const BSONObj notExpired = BSON("_id"
+ << "notExpired");
+
+ const auto timeAtStart = now();
+ const auto opCtx = operationContext();
+ dropAndRecreateChangeCollection(
+ opCtx, _tenantId, firstExpired.objsize() + secondExpired.objsize());
+
+ insertDocumentToChangeCollection(opCtx, _tenantId, firstExpired);
+ clockSource()->advance(Hours(1));
+ insertDocumentToChangeCollection(opCtx, _tenantId, secondExpired);
+
+ // Store the wallTime of the last expired document.
+ const auto expirationTime = now();
+ const auto expirationTimeInSeconds = duration_cast<Seconds>(expirationTime - timeAtStart);
+ setExpireAfterSeconds(opCtx, expirationTimeInSeconds);
+ clockSource()->advance(Hours(1));
+ insertDocumentToChangeCollection(opCtx, _tenantId, notExpired);
+
+ // Verify that only the required documents are removed.
+ ASSERT_EQ(removeExpiredChangeCollectionsDocuments(opCtx, _tenantId, expirationTime), 2);
+
+ // Only the 'notExpired' document is left in the change collection.
+ const auto changeCollectionEntries = readChangeCollection(opCtx, _tenantId);
+ ASSERT_EQ(changeCollectionEntries.size(), 1);
+ ASSERT_BSONOBJ_EQ(changeCollectionEntries[0].getObject(), notExpired);
+}
+
+// Tests that the last expired document is never deleted.
+TEST_F(ChangeCollectionTruncateExpirationTest, ShouldLeaveAtLeastOneDocument_Markers) {
+ const auto opCtx = operationContext();
+ dropAndRecreateChangeCollection(opCtx, _tenantId, 1);
+
+ setExpireAfterSeconds(opCtx, Seconds{1});
+
+ for (int i = 0; i < 100; ++i) {
+ insertDocumentToChangeCollection(opCtx, _tenantId, BSON("_id" << i));
+ clockSource()->advance(Seconds{1});
+ }
+
+ // Verify that all but the last document is removed.
+ ASSERT_EQ(removeExpiredChangeCollectionsDocuments(opCtx, _tenantId, now()), 99);
+
+ // Only the last document is left in the change collection.
+ const auto changeCollectionEntries = readChangeCollection(opCtx, _tenantId);
+ ASSERT_EQ(changeCollectionEntries.size(), 1);
+ ASSERT_BSONOBJ_EQ(changeCollectionEntries[0].getObject(), BSON("_id" << 99));
+}
} // namespace mongo
diff --git a/src/mongo/db/change_collection_truncate_markers.cpp b/src/mongo/db/change_collection_truncate_markers.cpp
new file mode 100644
index 00000000000..949877c3dfc
--- /dev/null
+++ b/src/mongo/db/change_collection_truncate_markers.cpp
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2023-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/change_collection_truncate_markers.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
+#include "mongo/db/operation_context.h"
+
+namespace mongo {
+ChangeCollectionTruncateMarkers::ChangeCollectionTruncateMarkers(TenantId tenantId,
+ std::deque<Marker> markers,
+ int64_t leftoverRecordsCount,
+ int64_t leftoverRecordsBytes,
+ int64_t minBytesPerMarker)
+ : CollectionTruncateMarkers(
+ std::move(markers),
+ leftoverRecordsCount,
+ leftoverRecordsBytes,
+ minBytesPerMarker,
+ /* We enable partial marker support as we need to know the last
+ recordId inserted. Otherwise we risk fully emptying the collection */
+ true),
+ _tenantId(std::move(tenantId)) {}
+
+bool ChangeCollectionTruncateMarkers::_hasExcessMarkers(OperationContext* opCtx) const {
+ const auto& markers = getMarkers();
+ if (markers.empty()) {
+ // If there's nothing in the markers queue then we don't have excess markers by definition.
+ return false;
+ }
+
+ const Marker& oldestMarker = markers.front();
+ const auto& [highestRecordIdInserted, _] = getPartialMarker();
+ if (highestRecordIdInserted <= oldestMarker.lastRecord) {
+ // We cannot expire the marker when the last entry is present there as it would break the
+ // requirement of always having at least 1 entry present in the collection.
+ return false;
+ }
+
+ auto now = opCtx->getServiceContext()->getFastClockSource()->now();
+ auto expireAfterSeconds =
+ Seconds{change_stream_serverless_helpers::getExpireAfterSeconds(_tenantId)};
+ auto expirationTime = now - expireAfterSeconds;
+
+ return oldestMarker.wallTime <= expirationTime;
+}
+} // namespace mongo
diff --git a/src/mongo/db/change_collection_truncate_markers.h b/src/mongo/db/change_collection_truncate_markers.h
new file mode 100644
index 00000000000..5874d7065a4
--- /dev/null
+++ b/src/mongo/db/change_collection_truncate_markers.h
@@ -0,0 +1,52 @@
+/**
+ * Copyright (C) 2023-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/storage/collection_markers.h"
+
+/**
+ * Implementation of truncate markers for Change Collections. Respects the requirement of always
+ * maintaining at least 1 entry in the change collection.
+ */
+namespace mongo {
+class ChangeCollectionTruncateMarkers final : public CollectionTruncateMarkers {
+public:
+ ChangeCollectionTruncateMarkers(TenantId tenantId,
+ std::deque<Marker> markers,
+ int64_t leftoverRecordsCount,
+ int64_t leftoverRecordsBytes,
+ int64_t minBytesPerMarker);
+
+private:
+ bool _hasExcessMarkers(OperationContext* opCtx) const override;
+
+ TenantId _tenantId;
+};
+} // namespace mongo