diff options
author | Jordi Olivares Provencio <jordi.olivares-provencio@mongodb.com> | 2023-03-17 09:41:12 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-03-17 10:53:20 +0000 |
commit | 693239bfae711a52dd5e574bf1d8694952f782e3 (patch) | |
tree | e515f16ceb25b4efb0633af15d5cd56e8b5c4241 /src/mongo/db | |
parent | e82a8e8adcf5aab2d39a219e6a31ef2168ff9b64 (diff) | |
download | mongo-693239bfae711a52dd5e574bf1d8694952f782e3.tar.gz |
SERVER-74098 Implement initial version of ChangeCollection truncate markers
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/change_collection_expired_change_remover_test.cpp | 132 | ||||
-rw-r--r-- | src/mongo/db/change_collection_truncate_markers.cpp | 72 | ||||
-rw-r--r-- | src/mongo/db/change_collection_truncate_markers.h | 52 |
4 files changed, 257 insertions, 1 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 3c32e5da968..d5527065857 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -519,6 +519,7 @@ env.Library( env.Library( target='change_stream_change_collection_manager', source=[ + 'change_collection_truncate_markers.cpp', 'change_stream_change_collection_manager.cpp', ], LIBDEPS_PRIVATE=[ @@ -2581,6 +2582,7 @@ if wiredtiger: '$BUILD_DIR/mongo/util/clock_source_mock', '$BUILD_DIR/mongo/util/net/network', '$BUILD_DIR/mongo/util/net/ssl_options_server', + 'change_stream_options_manager', 'collection_index_usage_tracker', 'commands', 'common', diff --git a/src/mongo/db/change_collection_expired_change_remover_test.cpp b/src/mongo/db/change_collection_expired_change_remover_test.cpp index 92c9cdabc15..28779c57af7 100644 --- a/src/mongo/db/change_collection_expired_change_remover_test.cpp +++ b/src/mongo/db/change_collection_expired_change_remover_test.cpp @@ -34,14 +34,18 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/catalog/catalog_test_fixture.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/change_collection_truncate_markers.h" #include "mongo/db/change_stream_change_collection_manager.h" +#include "mongo/db/change_stream_options_manager.h" #include "mongo/db/change_stream_serverless_helpers.h" +#include "mongo/db/change_streams_cluster_parameter_gen.h" #include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/record_id_helpers.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/optime.h" +#include "mongo/db/server_parameter_with_storage.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/record_data.h" #include "mongo/idl/server_parameter_test_util.h" @@ -84,7 +88,8 @@ protected: auto oplogEntryBson = oplogEntry.toBSON(); RecordData recordData{oplogEntryBson.objdata(), oplogEntryBson.objsize()}; - RecordId recordId = 
record_id_helpers::keyForDate(wallTime); + RecordId recordId = + record_id_helpers::keyForOptime(timestamp, KeyFormat::String).getValue(); AutoGetChangeCollection changeCollection{ opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId}; @@ -147,6 +152,76 @@ protected: "internalChangeStreamUseTenantIdForTesting", true}; }; +class ChangeCollectionTruncateExpirationTest : public ChangeCollectionExpiredChangeRemoverTest { +protected: + ChangeCollectionTruncateExpirationTest() : ChangeCollectionExpiredChangeRemoverTest() {} + + void setExpireAfterSeconds(OperationContext* opCtx, Seconds seconds) { + auto* clusterParameters = ServerParameterSet::getClusterParameterSet(); + auto* changeStreamsParam = + clusterParameters + ->get<ClusterParameterWithStorage<ChangeStreamsClusterParameterStorage>>( + "changeStreams"); + + auto oldSettings = changeStreamsParam->getValue(_tenantId); + oldSettings.setExpireAfterSeconds(seconds.count()); + changeStreamsParam->setValue(oldSettings, _tenantId).ignore(); + } + + void insertDocumentToChangeCollection(OperationContext* opCtx, + const TenantId& tenantId, + const BSONObj& obj) { + WriteUnitOfWork wuow(opCtx); + ChangeCollectionExpiredChangeRemoverTest::insertDocumentToChangeCollection( + opCtx, tenantId, obj); + const auto wallTime = now(); + Timestamp timestamp{wallTime}; + RecordId recordId = + record_id_helpers::keyForOptime(timestamp, KeyFormat::String).getValue(); + + _truncateMarkers->updateCurrentMarkerAfterInsertOnCommit( + opCtx, obj.objsize(), recordId, wallTime, 1); + wuow.commit(); + } + + void dropAndRecreateChangeCollection(OperationContext* opCtx, + const TenantId& tenantId, + int64_t minBytesPerMarker) { + auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); + changeCollectionManager.dropChangeCollection(opCtx, tenantId); + _truncateMarkers.reset(); + changeCollectionManager.createChangeCollection(opCtx, tenantId); + _truncateMarkers = std::make_unique<ChangeCollectionTruncateMarkers>( 
+ tenantId, std::deque<CollectionTruncateMarkers::Marker>{}, 0, 0, minBytesPerMarker); + } + + size_t removeExpiredChangeCollectionsDocuments(OperationContext* opCtx, + boost::optional<TenantId> tenantId, + Date_t expirationTime) { + // Acquire intent-exclusive lock on the change collection. Early exit if the collection + // doesn't exist. + const auto changeCollection = + AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId}; + + WriteUnitOfWork wuow(opCtx); + size_t numRecordsDeleted = 0; + while (boost::optional<CollectionTruncateMarkers::Marker> marker = + _truncateMarkers->peekOldestMarkerIfNeeded(opCtx)) { + auto recordStore = changeCollection->getRecordStore(); + + ASSERT_OK(recordStore->rangeTruncate( + opCtx, RecordId(), marker->lastRecord, -marker->bytes, -marker->records)); + + _truncateMarkers->popOldestMarker(); + numRecordsDeleted += marker->records; + } + wuow.commit(); + return numRecordsDeleted; + } + + std::unique_ptr<ChangeCollectionTruncateMarkers> _truncateMarkers; +}; + // Tests that the last expired document retrieved is the expected one. TEST_F(ChangeCollectionExpiredChangeRemoverTest, VerifyLastExpiredDocument) { const auto opCtx = operationContext(); @@ -240,4 +315,59 @@ TEST_F(ChangeCollectionExpiredChangeRemoverTest, ShouldLeaveAtLeastOneDocument) ASSERT_EQ(changeCollectionEntries.size(), 1); ASSERT_BSONOBJ_EQ(changeCollectionEntries[0].getObject(), BSON("_id" << 99)); } + +// Tests that only the expired documents are removed from the change collection. 
+TEST_F(ChangeCollectionTruncateExpirationTest, ShouldRemoveOnlyExpiredDocument_Markers) { + const BSONObj firstExpired = BSON("_id" + << "firstExpired"); + const BSONObj secondExpired = BSON("_id" + << "secondExpired"); + const BSONObj notExpired = BSON("_id" + << "notExpired"); + + const auto timeAtStart = now(); + const auto opCtx = operationContext(); + dropAndRecreateChangeCollection( + opCtx, _tenantId, firstExpired.objsize() + secondExpired.objsize()); + + insertDocumentToChangeCollection(opCtx, _tenantId, firstExpired); + clockSource()->advance(Hours(1)); + insertDocumentToChangeCollection(opCtx, _tenantId, secondExpired); + + // Store the wallTime of the last expired document. + const auto expirationTime = now(); + const auto expirationTimeInSeconds = duration_cast<Seconds>(expirationTime - timeAtStart); + setExpireAfterSeconds(opCtx, expirationTimeInSeconds); + clockSource()->advance(Hours(1)); + insertDocumentToChangeCollection(opCtx, _tenantId, notExpired); + + // Verify that only the required documents are removed. + ASSERT_EQ(removeExpiredChangeCollectionsDocuments(opCtx, _tenantId, expirationTime), 2); + + // Only the 'notExpired' document is left in the change collection. + const auto changeCollectionEntries = readChangeCollection(opCtx, _tenantId); + ASSERT_EQ(changeCollectionEntries.size(), 1); + ASSERT_BSONOBJ_EQ(changeCollectionEntries[0].getObject(), notExpired); +} + +// Tests that the last expired document is never deleted. +TEST_F(ChangeCollectionTruncateExpirationTest, ShouldLeaveAtLeastOneDocument_Markers) { + const auto opCtx = operationContext(); + dropAndRecreateChangeCollection(opCtx, _tenantId, 1); + + setExpireAfterSeconds(opCtx, Seconds{1}); + + for (int i = 0; i < 100; ++i) { + insertDocumentToChangeCollection(opCtx, _tenantId, BSON("_id" << i)); + clockSource()->advance(Seconds{1}); + } + + // Verify that all but the last document is removed. 
+ ASSERT_EQ(removeExpiredChangeCollectionsDocuments(opCtx, _tenantId, now()), 99); + + // Only the last document is left in the change collection. + const auto changeCollectionEntries = readChangeCollection(opCtx, _tenantId); + ASSERT_EQ(changeCollectionEntries.size(), 1); + ASSERT_BSONOBJ_EQ(changeCollectionEntries[0].getObject(), BSON("_id" << 99)); +} } // namespace mongo diff --git a/src/mongo/db/change_collection_truncate_markers.cpp b/src/mongo/db/change_collection_truncate_markers.cpp new file mode 100644 index 00000000000..949877c3dfc --- /dev/null +++ b/src/mongo/db/change_collection_truncate_markers.cpp @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2023-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. 
If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/change_collection_truncate_markers.h" +#include "mongo/db/change_stream_serverless_helpers.h" +#include "mongo/db/operation_context.h" + +namespace mongo { +ChangeCollectionTruncateMarkers::ChangeCollectionTruncateMarkers(TenantId tenantId, + std::deque<Marker> markers, + int64_t leftoverRecordsCount, + int64_t leftoverRecordsBytes, + int64_t minBytesPerMarker) + : CollectionTruncateMarkers( + std::move(markers), + leftoverRecordsCount, + leftoverRecordsBytes, + minBytesPerMarker, + /* We enable partial marker support as we need to know the last + recordId inserted. Otherwise we risk fully emptying the collection */ + true), + _tenantId(std::move(tenantId)) {} + +bool ChangeCollectionTruncateMarkers::_hasExcessMarkers(OperationContext* opCtx) const { + const auto& markers = getMarkers(); + if (markers.empty()) { + // If there's nothing in the markers queue then we don't have excess markers by definition. + return false; + } + + const Marker& oldestMarker = markers.front(); + const auto& [highestRecordIdInserted, _] = getPartialMarker(); + if (highestRecordIdInserted <= oldestMarker.lastRecord) { + // We cannot expire the marker when the last entry is present there as it would break the + // requirement of always having at least 1 entry present in the collection. 
+ return false; + } + + auto now = opCtx->getServiceContext()->getFastClockSource()->now(); + auto expireAfterSeconds = + Seconds{change_stream_serverless_helpers::getExpireAfterSeconds(_tenantId)}; + auto expirationTime = now - expireAfterSeconds; + + return oldestMarker.wallTime <= expirationTime; +} +} // namespace mongo diff --git a/src/mongo/db/change_collection_truncate_markers.h b/src/mongo/db/change_collection_truncate_markers.h new file mode 100644 index 00000000000..5874d7065a4 --- /dev/null +++ b/src/mongo/db/change_collection_truncate_markers.h @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2023-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. 
If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/storage/collection_markers.h" + +/** + * Implementation of truncate markers for Change Collections. Respects the requirement of always + * maintaining at least 1 entry in the change collection. + */ +namespace mongo { +class ChangeCollectionTruncateMarkers final : public CollectionTruncateMarkers { +public: + ChangeCollectionTruncateMarkers(TenantId tenantId, + std::deque<Marker> markers, + int64_t leftoverRecordsCount, + int64_t leftoverRecordsBytes, + int64_t minBytesPerMarker); + +private: + bool _hasExcessMarkers(OperationContext* opCtx) const override; + + TenantId _tenantId; +}; +} // namespace mongo |