author    | Jordi Olivares Provencio <jordi.olivares-provencio@mongodb.com> | 2023-04-28 14:10:12 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-04-28 15:50:38 +0000
commit    | c0a7899adb1d6ba9c015c93be83606b090a22662 (patch)
tree      | f78affc9d8072bba98db36235ef0c418e2514a63 /src/mongo
parent    | 657341a3615bf63ecab75957a7b51c242978180e (diff)
download  | mongo-c0a7899adb1d6ba9c015c93be83606b090a22662.tar.gz
SERVER-74749 Integrate Change Collection with truncate markers
Diffstat (limited to 'src/mongo')
14 files changed, 500 insertions, 151 deletions
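The core pattern this commit adopts: instead of expiring change-collection documents through a replicated collection scan plus batched delete, the collection is tracked as a sequence of truncate markers, each covering a contiguous range of records with known record/byte counts and the wall time of its newest record; once a marker's newest record is older than the expiry point, the whole range is dropped with one unreplicated range truncate. The following is a minimal standalone sketch of that bookkeeping (plain C++17, standard library only; all names are illustrative stand-ins, not MongoDB's actual CollectionTruncateMarkers API):

```cpp
#include <algorithm>
#include <cstdint>
#include <deque>
#include <iostream>
#include <mutex>
#include <optional>

// Illustrative stand-ins for MongoDB's RecordId and Date_t.
using RecordId = std::int64_t;
using WallTime = std::int64_t;  // seconds since epoch

// A marker summarizes a contiguous range of records that can later be dropped
// with a single range truncate once the whole range has expired.
struct Marker {
    std::int64_t records;  // number of records covered
    std::int64_t bytes;    // total size of those records
    RecordId lastRecord;   // highest RecordId covered
    WallTime wallTime;     // wall time of the newest record covered
};

class TruncateMarkers {
public:
    explicit TruncateMarkers(std::int64_t minBytesPerMarker)
        : _minBytesPerMarker(minBytesPerMarker) {}

    // Called for every committed insert: grow the partial marker, and seal it
    // into a whole marker once it crosses the size threshold.
    void updateAfterInsert(std::int64_t bytes, RecordId id, WallTime wall, std::int64_t count) {
        std::lock_guard lk(_mutex);
        _currentRecords += count;
        _currentBytes += bytes;
        _highestRecordId = std::max(_highestRecordId, id);
        _highestWallTime = std::max(_highestWallTime, wall);
        if (_currentBytes >= _minBytesPerMarker) {
            _markers.push_back(
                {_currentRecords, _currentBytes, _highestRecordId, _highestWallTime});
            _currentRecords = 0;
            _currentBytes = 0;
        }
    }

    // Return the oldest marker only if even its newest record has expired.
    std::optional<Marker> peekOldestIfExpired(WallTime expirationTime) const {
        std::lock_guard lk(_mutex);
        if (_markers.empty() || _markers.front().wallTime > expirationTime)
            return std::nullopt;
        return _markers.front();
    }

    void popOldest() {
        std::lock_guard lk(_mutex);
        _markers.pop_front();
    }

private:
    mutable std::mutex _mutex;
    std::deque<Marker> _markers;
    std::int64_t _minBytesPerMarker;
    std::int64_t _currentRecords = 0;
    std::int64_t _currentBytes = 0;
    RecordId _highestRecordId = 0;
    WallTime _highestWallTime = 0;
};

int main() {
    TruncateMarkers markers(/*minBytesPerMarker=*/256);
    markers.updateAfterInsert(200, /*id=*/1, /*wall=*/100, 1);
    markers.updateAfterInsert(200, /*id=*/2, /*wall=*/110, 1);  // seals a marker
    if (auto m = markers.peekOldestIfExpired(/*expirationTime=*/120)) {
        std::cout << "truncate up to RecordId " << m->lastRecord << "\n";
        markers.popOldest();
    }
}
```

Expiry then reduces to: while peekOldestIfExpired returns a marker, range-truncate everything up to marker.lastRecord and pop it; that is the loop the new removeExpiredChangeCollectionsDocumentsWithTruncate in the diff below runs against the real record store.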
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index d86f51eee3a..a241ab43198 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -531,6 +531,8 @@ env.Library( '$BUILD_DIR/mongo/db/dbhelpers', '$BUILD_DIR/mongo/db/server_feature_flags', '$BUILD_DIR/mongo/db/service_context', + 'change_streams_cluster_parameter', + 'record_id_helpers', ], ) diff --git a/src/mongo/db/change_collection_expired_change_remover_test.cpp b/src/mongo/db/change_collection_expired_change_remover_test.cpp index 16d4e516b51..85882be52db 100644 --- a/src/mongo/db/change_collection_expired_change_remover_test.cpp +++ b/src/mongo/db/change_collection_expired_change_remover_test.cpp @@ -145,8 +145,9 @@ protected: ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata( opCtx, changeCollection) ->maxRecordIdBound; - return ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments( - opCtx, changeCollection, maxRecordIdBound, expirationTime); + return ChangeStreamChangeCollectionManager:: + removeExpiredChangeCollectionsDocumentsWithCollScan( + opCtx, changeCollection, maxRecordIdBound, expirationTime); } const TenantId _tenantId; @@ -174,58 +175,26 @@ protected: changeStreamsParam->setValue(oldSettings, _tenantId).ignore(); } - void insertDocumentToChangeCollection(OperationContext* opCtx, - const TenantId& tenantId, - const BSONObj& obj) { - WriteUnitOfWork wuow(opCtx); - ChangeCollectionExpiredChangeRemoverTest::insertDocumentToChangeCollection( - opCtx, tenantId, obj); - const auto wallTime = now(); - Timestamp timestamp{wallTime}; - RecordId recordId = - record_id_helpers::keyForOptime(timestamp, KeyFormat::String).getValue(); - - _truncateMarkers->updateCurrentMarkerAfterInsertOnCommit( - opCtx, obj.objsize(), recordId, wallTime, 1); - wuow.commit(); - } - - void dropAndRecreateChangeCollection(OperationContext* opCtx, - const TenantId& tenantId, - int64_t minBytesPerMarker) { - auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); - changeCollectionManager.dropChangeCollection(opCtx, tenantId); - _truncateMarkers.reset(); - changeCollectionManager.createChangeCollection(opCtx, tenantId); - _truncateMarkers = std::make_unique<ChangeCollectionTruncateMarkers>( - tenantId, std::deque<CollectionTruncateMarkers::Marker>{}, 0, 0, minBytesPerMarker); - } - size_t removeExpiredChangeCollectionsDocuments(OperationContext* opCtx, const TenantId& tenantId, Date_t expirationTime) { // Acquire intent-exclusive lock on the change collection. Early exit if the collection // doesn't exist. 
- const auto changeCollection = - AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId}; - - WriteUnitOfWork wuow(opCtx); - size_t numRecordsDeleted = 0; - while (boost::optional<CollectionTruncateMarkers::Marker> marker = - _truncateMarkers->peekOldestMarkerIfNeeded(opCtx)) { - auto recordStore = changeCollection->getRecordStore(); - - ASSERT_OK(recordStore->rangeTruncate( - opCtx, RecordId(), marker->lastRecord, -marker->bytes, -marker->records)); + const auto changeCollection = acquireCollection( + opCtx, + CollectionAcquisitionRequest(NamespaceString::makeChangeCollectionNSS(tenantId), + PlacementConcern{boost::none, ShardVersion::UNSHARDED()}, + repl::ReadConcernArgs::get(opCtx), + AcquisitionPrerequisites::kWrite), + MODE_IX); - _truncateMarkers->popOldestMarker(); - numRecordsDeleted += marker->records; - } - wuow.commit(); - return numRecordsDeleted; + return ChangeStreamChangeCollectionManager:: removeExpiredChangeCollectionsDocumentsWithTruncate( + opCtx, changeCollection, expirationTime); } - std::unique_ptr<ChangeCollectionTruncateMarkers> _truncateMarkers; + RAIIServerParameterControllerForTest truncateFeatureFlag{ "featureFlagUseUnreplicatedTruncatesForDeletions", true}; }; // Tests that the last expired document retrieved is the expected one. @@ -336,10 +305,13 @@ TEST_F(ChangeCollectionTruncateExpirationTest, ShouldRemoveOnlyExpiredDocument_M const BSONObj notExpired = BSON("_id" << "notExpired"); + RAIIServerParameterControllerForTest minBytesPerMarker{ + "changeCollectionTruncateMarkersMinBytes", + firstExpired.objsize() + secondExpired.objsize()}; + const auto timeAtStart = now(); const auto opCtx = operationContext(); - dropAndRecreateChangeCollection( - opCtx, _tenantId, firstExpired.objsize() + secondExpired.objsize()); + dropAndRecreateChangeCollection(opCtx, _tenantId); insertDocumentToChangeCollection(opCtx, _tenantId, firstExpired); clockSource()->advance(Hours(1)); @@ -363,8 +335,11 @@ // Tests that the last expired document is never deleted. TEST_F(ChangeCollectionTruncateExpirationTest, ShouldLeaveAtLeastOneDocument_Markers) { + RAIIServerParameterControllerForTest minBytesPerMarker{ + "changeCollectionTruncateMarkersMinBytes", 1}; const auto opCtx = operationContext(); - dropAndRecreateChangeCollection(opCtx, _tenantId, 1); + + dropAndRecreateChangeCollection(opCtx, _tenantId); setExpireAfterSeconds(opCtx, Seconds{1}); diff --git a/src/mongo/db/change_collection_expired_documents_remover.cpp b/src/mongo/db/change_collection_expired_documents_remover.cpp index 5dc4d1fe7e6..84b9f4817ea 100644 --- a/src/mongo/db/change_collection_expired_documents_remover.cpp +++ b/src/mongo/db/change_collection_expired_documents_remover.cpp @@ -35,6 +35,7 @@ #include "mongo/db/change_streams_cluster_parameter_gen.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/server_feature_flags_gen.h" #include "mongo/db/service_context.h" #include "mongo/db/shard_role.h" #include "mongo/logv2/log.h" @@ -67,9 +68,18 @@ change_stream_serverless_helpers::TenantSet getConfigDbTenants(OperationContext* return tenantIds; } +bool usesUnreplicatedTruncates() { + // (Ignore FCV check): This feature flag is potentially backported to previous versions of the + // server. We can't rely on the FCV version to see whether it's enabled or not.
+ return feature_flags::gFeatureFlagUseUnreplicatedTruncatesForDeletions + .isEnabledAndIgnoreFCVUnsafe(); +} + void removeExpiredDocuments(Client* client) { hangBeforeRemovingExpiredChanges.pauseWhileSet(); + bool useUnreplicatedTruncates = usesUnreplicatedTruncates(); + try { auto opCtx = client->makeOperationContext(); const auto clock = client->getServiceContext()->getFastClockSource(); @@ -77,11 +87,9 @@ void removeExpiredDocuments(Client* client) { // If the fail point 'injectCurrentWallTimeForRemovingDocuments' is enabled then set the // 'currentWallTime' with the provided wall time. - if (injectCurrentWallTimeForRemovingExpiredDocuments.shouldFail()) { - injectCurrentWallTimeForRemovingExpiredDocuments.execute([&](const BSONObj& data) { - currentWallTime = data.getField("currentWallTime").date(); - }); - } + injectCurrentWallTimeForRemovingExpiredDocuments.execute([&](const BSONObj& data) { + currentWallTime = data.getField("currentWallTime").date(); + }); // Number of documents removed in the current pass. size_t removedCount = 0; @@ -102,39 +110,53 @@ void removeExpiredDocuments(Client* client) { AcquisitionPrerequisites::kWrite), MODE_IX); - // Early exit if collection does not exist or if running on a secondary (requires - // opCtx->lockState()->isRSTLLocked()). - if (!changeCollection.exists() || + // Early exit if collection does not exist. + if (!changeCollection.exists()) { + continue; + } + // Early exit if running on a secondary and we haven't enabled the unreplicated truncate + // maintenance flag (requires opCtx->lockState()->isRSTLLocked()). + if (!useUnreplicatedTruncates && !repl::ReplicationCoordinator::get(opCtx.get()) ->canAcceptWritesForDatabase(opCtx.get(), DatabaseName::kConfig.toString())) { continue; } - // Get the metadata required for the removal of the expired change collection - // documents. Early exit if the metadata is missing, indicating that there is nothing - // to remove. - auto purgingJobMetadata = - ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata( - opCtx.get(), changeCollection); - if (!purgingJobMetadata) { - continue; + if (useUnreplicatedTruncates) { + removedCount += ChangeStreamChangeCollectionManager:: + removeExpiredChangeCollectionsDocumentsWithTruncate( + opCtx.get(), + changeCollection, + currentWallTime - Seconds(expiredAfterSeconds)); + } else { + // Get the metadata required for the removal of the expired change collection + // documents. Early exit if the metadata is missing, indicating that there is + // nothing to remove. 
+ auto purgingJobMetadata = + ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata( + opCtx.get(), changeCollection); + if (!purgingJobMetadata) { + continue; + } + + removedCount += ChangeStreamChangeCollectionManager:: + removeExpiredChangeCollectionsDocumentsWithCollScan( + opCtx.get(), + changeCollection, + purgingJobMetadata->maxRecordIdBound, + currentWallTime - Seconds(expiredAfterSeconds)); + maxStartWallTime = + std::max(maxStartWallTime, purgingJobMetadata->firstDocWallTimeMillis); } - removedCount += - ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments( - opCtx.get(), - changeCollection, - purgingJobMetadata->maxRecordIdBound, - currentWallTime - Seconds(expiredAfterSeconds)); changeCollectionManager.getPurgingJobStats().scannedCollections.fetchAndAddRelaxed(1); - maxStartWallTime = - std::max(maxStartWallTime, purgingJobMetadata->firstDocWallTimeMillis); } // The purging job metadata will be 'boost::none' if none of the change collections have // more than one oplog entry, as such the 'maxStartWallTimeMillis' will be zero. Avoid - // reporting 0 as 'maxStartWallTimeMillis'. - if (maxStartWallTime > 0) { + // reporting 0 as 'maxStartWallTimeMillis'. If using unreplicated truncates, this is + // maintained by the call to removeExpiredChangeCollectionsDocumentsWithTruncate. + if (!useUnreplicatedTruncates && maxStartWallTime > 0) { changeCollectionManager.getPurgingJobStats().maxStartWallTimeMillis.store( maxStartWallTime); } diff --git a/src/mongo/db/change_collection_truncate_markers.cpp b/src/mongo/db/change_collection_truncate_markers.cpp index f1670f93d58..ce50a3aec32 100644 --- a/src/mongo/db/change_collection_truncate_markers.cpp +++ b/src/mongo/db/change_collection_truncate_markers.cpp @@ -28,10 +28,33 @@ */ #include "mongo/db/change_collection_truncate_markers.h" +#include "mongo/db/catalog/collection.h" #include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/operation_context.h" namespace mongo { + +namespace { +MONGO_FAIL_POINT_DEFINE(injectCurrentWallTimeForCheckingMarkers); + +Date_t getWallTimeToUse(OperationContext* opCtx) { + auto now = opCtx->getServiceContext()->getFastClockSource()->now(); + injectCurrentWallTimeForCheckingMarkers.execute( + [&](const BSONObj& data) { now = data.getField("currentWallTime").date(); }); + return now; +} + +bool hasMarkerWallTimeExpired(OperationContext* opCtx, + Date_t markerWallTime, + const TenantId& tenantId) { + auto now = getWallTimeToUse(opCtx); + auto expireAfterSeconds = + Seconds{change_stream_serverless_helpers::getExpireAfterSeconds(tenantId)}; + auto expirationTime = now - expireAfterSeconds; + return markerWallTime <= expirationTime; +} +} // namespace + ChangeCollectionTruncateMarkers::ChangeCollectionTruncateMarkers(TenantId tenantId, std::deque<Marker> markers, int64_t leftoverRecordsCount, @@ -56,11 +79,88 @@ bool ChangeCollectionTruncateMarkers::_hasExcessMarkers(OperationContext* opCtx) return false; } - auto now = opCtx->getServiceContext()->getFastClockSource()->now(); - auto expireAfterSeconds = - Seconds{change_stream_serverless_helpers::getExpireAfterSeconds(_tenantId)}; - auto expirationTime = now - expireAfterSeconds; + return hasMarkerWallTimeExpired(opCtx, oldestMarker.wallTime, _tenantId); +} + +bool ChangeCollectionTruncateMarkers::_hasPartialMarkerExpired(OperationContext* opCtx) const { + const auto& [_, highestSeenWallTime] = getPartialMarker(); + + return hasMarkerWallTimeExpired(opCtx, highestSeenWallTime, _tenantId); +} + +void 
ChangeCollectionTruncateMarkers::expirePartialMarker(OperationContext* opCtx, + const Collection* changeCollection) { + createPartialMarkerIfNecessary(opCtx); + // We can't use the normal peekOldestMarkerIfNeeded method since that calls _hasExcessMarkers + // and it will return false since the new oldest marker will have the last entry. + auto oldestMarker = + checkMarkersWith([&](const std::deque<CollectionTruncateMarkers::Marker>& markers) + -> boost::optional<CollectionTruncateMarkers::Marker> { + // Partial marker did not get generated, early exit. + if (markers.empty()) { + return {}; + } + auto firstMarker = markers.front(); + // We will only consider the case of an expired marker. + if (!hasMarkerWallTimeExpired(opCtx, firstMarker.wallTime, _tenantId)) { + return {}; + } + return firstMarker; + }); + + if (!oldestMarker) { + // The oldest marker hasn't expired, nothing to do here. + return; + } + + // Abandon the snapshot so we can fetch the most recent version of the table. This increases the + // chances the last entry isn't present in the new partial marker. + opCtx->recoveryUnit()->abandonSnapshot(); + WriteUnitOfWork wuow(opCtx); + + auto backCursor = changeCollection->getRecordStore()->getCursor(opCtx, false); + // If the oldest marker does not contain the last entry it's a normal marker, don't perform any + // modifications to it. + auto obj = backCursor->next(); + if (!obj || obj->id > oldestMarker->lastRecord) { + return; + } + + // At this point the marker contains the last entry of the collection, we have to shift the last + // entry to the next marker so we can expire the previous entries. + auto bytesNotTruncated = obj->data.size(); + const auto& doc = obj->data.toBson(); + auto wallTime = doc[repl::OplogEntry::kWallClockTimeFieldName].Date(); + + updateCurrentMarkerAfterInsertOnCommit(opCtx, bytesNotTruncated, obj->id, wallTime, 1); + + auto bytesDeleted = oldestMarker->bytes - bytesNotTruncated; + auto docsDeleted = oldestMarker->records - 1; + + // We build the previous record id based on the extracted value + auto previousRecordId = [&] { + auto currId = doc[repl::OplogEntry::k_idFieldName].timestamp(); + invariant(currId > Timestamp::min(), "Last entry timestamp must be larger than 0"); + + auto fixedBson = BSON(repl::OplogEntry::k_idFieldName << (currId - 1)); + + auto recordId = invariantStatusOK( + record_id_helpers::keyForDoc(fixedBson, + changeCollection->getClusteredInfo()->getIndexSpec(), + changeCollection->getDefaultCollator())); + return recordId; + }(); + auto newMarker = + CollectionTruncateMarkers::Marker{docsDeleted, bytesDeleted, previousRecordId, wallTime}; - return oldestMarker.wallTime <= expirationTime; + // Replace now the oldest marker with a version that doesn't contain the last entry. This is + // susceptible to races with concurrent inserts. But the invariant of metrics being correct in + // aggregate still holds. Ignoring this issue is a valid strategy here as we move the ignored + // bytes to the next partial marker and we only guarantee eventual correctness. 
+ modifyMarkersWith([&](std::deque<CollectionTruncateMarkers::Marker>& markers) { + markers.pop_front(); + markers.emplace_front(std::move(newMarker)); + }); + wuow.commit(); } } // namespace mongo diff --git a/src/mongo/db/change_collection_truncate_markers.h b/src/mongo/db/change_collection_truncate_markers.h index b60242cd260..5ea4ee330d0 100644 --- a/src/mongo/db/change_collection_truncate_markers.h +++ b/src/mongo/db/change_collection_truncate_markers.h @@ -45,9 +45,18 @@ public: int64_t leftoverRecordsBytes, int64_t minBytesPerMarker); + // Expires the partial marker with proper care for the last entry. Expiring here means: + // * Turning the partial marker into an actual marker + // * Ensuring the last entry isn't present in the generated marker + // The last entry is necessary for correctness of the change collection. This method will shift + // the last entry size and count to the next partial marker. + void expirePartialMarker(OperationContext* opCtx, const Collection* changeCollection); + private: bool _hasExcessMarkers(OperationContext* opCtx) const override; + bool _hasPartialMarkerExpired(OperationContext* opCtx) const override; + TenantId _tenantId; }; } // namespace mongo diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp index e75140f9de3..aad6f315c74 100644 --- a/src/mongo/db/change_stream_change_collection_manager.cpp +++ b/src/mongo/db/change_stream_change_collection_manager.cpp @@ -39,6 +39,7 @@ #include "mongo/db/catalog/drop_collection.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/change_stream_serverless_helpers.h" +#include "mongo/db/change_streams_cluster_parameter_gen.h" #include "mongo/db/concurrency/exception_util.h" #include "mongo/db/multitenancy_gen.h" #include "mongo/db/namespace_string.h" @@ -165,6 +166,13 @@ boost::optional<BSONObj> createChangeCollectionEntryFromOplog(const BSONObj& opl auto readyChangeCollDoc = changeCollDoc.freeze(); return readyChangeCollDoc.toBson(); } + +bool usesUnreplicatedTruncates() { + // (Ignore FCV check): This feature flag is potentially backported to previous version of the + // server. We can't rely on the FCV version to see whether it's enabled or not. + return feature_flags::gFeatureFlagUseUnreplicatedTruncatesForDeletions + .isEnabledAndIgnoreFCVUnsafe(); +} } // namespace /** @@ -173,10 +181,15 @@ boost::optional<BSONObj> createChangeCollectionEntryFromOplog(const BSONObj& opl */ class ChangeStreamChangeCollectionManager::ChangeCollectionsWriterInternal { public: - explicit ChangeCollectionsWriterInternal(OperationContext* opCtx, - OpDebug* opDebug, - const AutoGetChangeCollection::AccessMode& accessMode) - : _accessMode{accessMode}, _opCtx{opCtx}, _opDebug{opDebug} {} + explicit ChangeCollectionsWriterInternal( + OperationContext* opCtx, + OpDebug* opDebug, + const AutoGetChangeCollection::AccessMode& accessMode, + ConcurrentSharedValuesMap<UUID, ChangeCollectionTruncateMarkers, UUID::Hash>* map) + : _accessMode{accessMode}, + _opCtx{opCtx}, + _opDebug{opDebug}, + _tenantTruncateMarkersMap(map) {} /** * Adds the insert statement for the provided tenant that will be written to the change @@ -227,6 +240,14 @@ public: // Writes to the change collection should not be replicated. repl::UnreplicatedWritesBlock unReplBlock(_opCtx); + // To avoid creating a lot of unnecessary calls to + // CollectionTruncateMarkers::updateCurrentMarkerAfterInsertOnCommit we aggregate all + // the results and make a singular call. 
This requires storing the highest + // RecordId/WallTime seen from the insert statements. + RecordId maxRecordIdSeen; + Date_t maxWallTimeSeen; + int64_t bytesInserted = 0; + /** * For a serverless shard merge, we clone all change collection entries from the donor * and then fetch/apply retryable writes that took place before the migration. As a @@ -246,6 +267,8 @@ public: LOGV2(7282901, "Ignoring DuplicateKey error for change collection insert", "doc"_attr = insertStatement.doc.toString()); + // Continue to the next insert statement as we've omitted the current one. + continue; } else if (!status.isOK()) { return Status(status.code(), str::stream() @@ -254,6 +277,38 @@ public: << "failed") .withReason(status.reason()); } + + // Right now we assume that the tenant change collection is clustered and + // reconstruct the RecordId used in the KV store. Ideally we want the write path to + // return the record ids used for the insert but as it isn't available we + // reconstruct the key here. + dassert(tenantChangeCollection->isClustered()); + auto recordId = invariantStatusOK(record_id_helpers::keyForDoc( + insertStatement.doc, + tenantChangeCollection->getClusteredInfo()->getIndexSpec(), + tenantChangeCollection->getDefaultCollator())); + + maxRecordIdSeen = std::max(std::move(recordId), maxRecordIdSeen); + auto docWallTime = + insertStatement.doc[repl::OplogEntry::kWallClockTimeFieldName].Date(); + maxWallTimeSeen = std::max(maxWallTimeSeen, docWallTime); + + bytesInserted += insertStatement.doc.objsize(); + } + + std::shared_ptr<ChangeCollectionTruncateMarkers> truncateMarkers = + usesUnreplicatedTruncates() + ? _tenantTruncateMarkersMap->find(tenantChangeCollection->uuid()) + : nullptr; + if (truncateMarkers && bytesInserted > 0) { + // We update the TruncateMarkers instance if it exists. Creation is performed + // asynchronously by the remover thread. + truncateMarkers->updateCurrentMarkerAfterInsertOnCommit( + _opCtx, + bytesInserted, + maxRecordIdSeen, + maxWallTimeSeen, + insertStatementsAndChangeCollection.insertStatements.size()); } } return Status::OK(); @@ -301,19 +356,23 @@ private: // Indicates if locks have been acquired. bool _locksAcquired{false}; + + ConcurrentSharedValuesMap<UUID, ChangeCollectionTruncateMarkers, UUID::Hash>* + _tenantTruncateMarkersMap; }; ChangeStreamChangeCollectionManager::ChangeCollectionsWriter::ChangeCollectionsWriter( OperationContext* opCtx, std::vector<InsertStatement>::const_iterator beginOplogEntries, std::vector<InsertStatement>::const_iterator endOplogEntries, - OpDebug* opDebug) { + OpDebug* opDebug, + ConcurrentSharedValuesMap<UUID, ChangeCollectionTruncateMarkers, UUID::Hash>* tenantMarkerMap) { // This method must be called within a 'WriteUnitOfWork'. The caller must be responsible for // committing the unit of work. invariant(opCtx->lockState()->inAWriteUnitOfWork()); _writer = std::make_unique<ChangeCollectionsWriterInternal>( - opCtx, opDebug, AutoGetChangeCollection::AccessMode::kWrite); + opCtx, opDebug, AutoGetChangeCollection::AccessMode::kWrite, tenantMarkerMap); // Transform oplog entries to change collections entries and group them by tenant id.
for (auto oplogEntryIter = beginOplogEntries; oplogEntryIter != endOplogEntries; @@ -356,7 +415,8 @@ ChangeStreamChangeCollectionManager::createChangeCollectionsWriter( std::vector<InsertStatement>::const_iterator beginOplogEntries, std::vector<InsertStatement>::const_iterator endOplogEntries, OpDebug* opDebug) { - return ChangeCollectionsWriter{opCtx, beginOplogEntries, endOplogEntries, opDebug}; + return ChangeCollectionsWriter{ + opCtx, beginOplogEntries, endOplogEntries, opDebug, &_tenantTruncateMarkersMap}; } BSONObj ChangeStreamChangeCollectionManager::PurgingJobStats::toBSON() const { @@ -402,6 +462,22 @@ void ChangeStreamChangeCollectionManager::dropChangeCollection(OperationContext* DropReply dropReply; const auto changeCollNss = NamespaceString::makeChangeCollectionNSS(tenantId); + const bool useUnreplicatedDeletes = usesUnreplicatedTruncates(); + // We get the UUID now in order to remove the collection from the map later. We can't get the + // UUID once the collection has been dropped. + auto collUUID = [&]() -> boost::optional<UUID> { + if (!useUnreplicatedDeletes) { + // Won't update the truncate markers map so no need to get the UUID. + return boost::none; + } + AutoGetDb lk(opCtx, changeCollNss.dbName(), MODE_IS); + auto collection = + CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, changeCollNss); + if (collection) { + return collection->uuid(); + } + return boost::none; + }(); const auto status = dropCollection(opCtx, changeCollNss, @@ -411,6 +487,12 @@ void ChangeStreamChangeCollectionManager::dropChangeCollection(OperationContext* str::stream() << "Failed to drop change collection: " << changeCollNss.toStringForErrorMsg() << causedBy(status.reason()), status.isOK() || status.code() == ErrorCodes::NamespaceNotFound); + + if (useUnreplicatedDeletes && collUUID) { + // Remove the collection from the TruncateMarkers map. As we are dropping the collection + // there's no need to keep it for the remover. Data will be deleted anyways. + _tenantTruncateMarkersMap.erase(*collUUID); + } } void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection( @@ -424,7 +506,10 @@ void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection( invariant(opCtx->lockState()->inAWriteUnitOfWork()); ChangeCollectionsWriterInternal changeCollectionsWriter{ - opCtx, nullptr /*opDebug*/, AutoGetChangeCollection::AccessMode::kWriteInOplogContext}; + opCtx, + nullptr /*opDebug*/, + AutoGetChangeCollection::AccessMode::kWriteInOplogContext, + &_tenantTruncateMarkersMap}; for (size_t idx = 0; idx < oplogRecords.size(); idx++) { auto& record = oplogRecords[idx]; @@ -478,7 +563,7 @@ ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata( return {{firstDocAttributes->first, RecordIdBound(std::move(lastDocRecordId))}}; } -size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments( +size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocumentsWithCollScan( OperationContext* opCtx, const ScopedCollectionAcquisition& changeCollection, RecordIdBound maxRecordIdBound, @@ -511,6 +596,11 @@ size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocume changeCollectionManager.getPurgingJobStats().bytesDeleted.fetchAndAddRelaxed( batchedDeleteStats.bytesDeleted); + // As we are using collection scans this means we aren't using truncate markers. Clear the + // map since they will not get updated anyways. The markers will get recreated if the + // feature flag is turned on again. 
+ changeCollectionManager._tenantTruncateMarkersMap.clear(); + return batchedDeleteStats.docsDeleted; } catch (const ExceptionFor<ErrorCodes::QueryPlanKilled>&) { // It is expected that a collection drop can kill a query plan while deleting an old @@ -518,4 +608,123 @@ size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocume return 0; } } + +namespace { +std::shared_ptr<ChangeCollectionTruncateMarkers> initialiseTruncateMarkers( + OperationContext* opCtx, + const Collection* changeCollectionPtr, + ConcurrentSharedValuesMap<UUID, ChangeCollectionTruncateMarkers, UUID::Hash>& truncateMap) { + auto rs = changeCollectionPtr->getRecordStore(); + const auto& ns = changeCollectionPtr->ns(); + + WriteUnitOfWork wuow(opCtx); + + auto minBytesPerMarker = gChangeCollectionTruncateMarkersMinBytes; + CollectionTruncateMarkers::InitialSetOfMarkers initialSetOfMarkers = + CollectionTruncateMarkers::createFromExistingRecordStore( + opCtx, rs, ns, minBytesPerMarker, [](const Record& record) { + const auto obj = record.data.toBson(); + auto wallTime = obj[repl::OplogEntry::kWallClockTimeFieldName].Date(); + return CollectionTruncateMarkers::RecordIdAndWallTime{record.id, wallTime}; + }); + // Leftover bytes contains the difference between the amount of bytes we had for the + // markers and the latest collection size/count. This is susceptible to a race + // condition, but metrics are already assumed to be approximate. Ignoring this issue is + // a valid strategy here. + auto truncateMarkers = truncateMap.getOrEmplace(changeCollectionPtr->uuid(), + *ns.tenantId(), + std::move(initialSetOfMarkers.markers), + initialSetOfMarkers.leftoverRecordsCount, + initialSetOfMarkers.leftoverRecordsBytes, + minBytesPerMarker); + // Update the truncate markers with the last collection entry's RecordId and wall time. + // This is necessary for correct marker expiration. Otherwise the highest seen points + // would be null. Nothing would expire since we have to maintain the last entry in the + // change collection and null RecordId < any initialised RecordId. This would only get + // fixed once an entry has been inserted, initialising the data points. + auto backCursor = rs->getCursor(opCtx, false); + if (auto obj = backCursor->next()) { + auto wallTime = obj->data.toBson()[repl::OplogEntry::kWallClockTimeFieldName].Date(); + truncateMarkers->updateCurrentMarkerAfterInsertOnCommit(opCtx, 0, obj->id, wallTime, 0); + } + + wuow.commit(); + + return truncateMarkers; +} +} // namespace + +size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocumentsWithTruncate( + OperationContext* opCtx, + const ScopedCollectionAcquisition& changeCollection, + Date_t expirationTime) { + auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); + auto& truncateMap = changeCollectionManager._tenantTruncateMarkersMap; + + const auto& changeCollectionPtr = changeCollection.getCollectionPtr(); + auto truncateMarkers = truncateMap.find(changeCollectionPtr->uuid()); + + // No marker means it's a new collection, or we've just performed startup. Initialize + // the TruncateMarkers instance. 
+ if (!truncateMarkers) { + writeConflictRetry(opCtx, + "initialise change collection truncate markers", + changeCollectionPtr->ns().ns(), + [&] { + truncateMarkers = initialiseTruncateMarkers( + opCtx, changeCollectionPtr.get(), truncateMap); + }); + } + + int64_t numRecordsDeleted = 0; + + auto removeExpiredMarkers = [&] { + auto rs = changeCollectionPtr->getRecordStore(); + while (auto marker = truncateMarkers->peekOldestMarkerIfNeeded(opCtx)) { + writeConflictRetry( + opCtx, "truncate change collection", changeCollectionPtr->ns().ns(), [&] { + // The session might be in use from marker initialisation so we must reset it + // here in order to allow an untimestamped write. + opCtx->recoveryUnit()->abandonSnapshot(); + opCtx->recoveryUnit()->allowOneUntimestampedWrite(); + WriteUnitOfWork wuow(opCtx); + + auto bytesDeleted = marker->bytes; + auto docsDeleted = marker->records; + + auto status = rs->rangeTruncate( + opCtx, + // Truncate from the beginning of the collection, this will + // cover cases where some leftover documents are present. + RecordId(), + marker->lastRecord, + -bytesDeleted, + -docsDeleted); + invariantStatusOK(status); + + wuow.commit(); + + truncateMarkers->popOldestMarker(); + numRecordsDeleted += docsDeleted; + + auto& purgingJobStats = changeCollectionManager.getPurgingJobStats(); + purgingJobStats.docsDeleted.fetchAndAddRelaxed(docsDeleted); + purgingJobStats.bytesDeleted.fetchAndAddRelaxed(bytesDeleted); + + auto millisWallTime = marker->wallTime.toMillisSinceEpoch(); + if (purgingJobStats.maxStartWallTimeMillis.load() < millisWallTime) { + purgingJobStats.maxStartWallTimeMillis.store(millisWallTime); + } + }); + } + }; + + removeExpiredMarkers(); + // We now create a partial marker that will shift the last entry to the next marker if it's + // present there. This will allow us to expire all entries up to the last one. + truncateMarkers->expirePartialMarker(opCtx, changeCollectionPtr.get()); + // Second pass to remove the potentially new partial marker. + removeExpiredMarkers(); + return numRecordsDeleted; +} } // namespace mongo diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h index 9d7f3277d44..68838767d0d 100644 --- a/src/mongo/db/change_stream_change_collection_manager.h +++ b/src/mongo/db/change_stream_change_collection_manager.h @@ -30,10 +30,13 @@ #pragma once #include "mongo/db/catalog/collection_catalog.h" +#include "mongo/db/change_collection_truncate_markers.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/service_context.h" #include "mongo/db/shard_role.h" +#include "mongo/db/storage/collection_truncate_markers.h" +#include "mongo/util/concurrent_shared_values_map.h" namespace mongo { @@ -124,15 +127,16 @@ public: void dropChangeCollection(OperationContext* opCtx, const TenantId& tenantId); /** - * Inserts documents to change collections. The parameter 'oplogRecords' is a vector of oplog - * records and the parameter 'oplogTimestamps' is a vector for respective timestamp for each - * oplog record. + * Inserts documents to change collections. The parameter 'oplogRecords' is a vector of + * oplog records and the parameter 'oplogTimestamps' is a vector for respective timestamp + * for each oplog record. * - * The method fetches the tenant-id from the oplog entry, performs necessary modification to the - * document and then write to the tenant's change collection at the specified oplog timestamp. 
+ * The method fetches the tenant-id from the oplog entry, performs necessary modification to + * the document and then write to the tenant's change collection at the specified oplog + * timestamp. * - * Failure in insertion to any change collection will result in a fatal exception and will bring - * down the node. + * Failure in insertion to any change collection will result in a fatal exception and will + * bring down the node. */ void insertDocumentsToChangeCollection(OperationContext* opCtx, const std::vector<Record>& oplogRecords, @@ -142,9 +146,9 @@ public: /** * Change Collection Writer. After acquiring ChangeCollectionsWriter the user should trigger - * acquisition of the locks by calling 'acquireLocks()' before the first write in the Write Unit - * of Work. Then the write of documents to change collections can be triggered by calling - * 'write()'. + * acquisition of the locks by calling 'acquireLocks()' before the first write in the Write + * Unit of Work. Then the write of documents to change collections can be triggered by + * calling 'write()'. */ class ChangeCollectionsWriter { friend class ChangeStreamChangeCollectionManager; @@ -153,10 +157,13 @@ public: * Constructs a writer from a range ['beginOplogEntries', 'endOplogEntries') of oplog * entries. */ - ChangeCollectionsWriter(OperationContext* opCtx, - std::vector<InsertStatement>::const_iterator beginOplogEntries, - std::vector<InsertStatement>::const_iterator endOplogEntries, - OpDebug* opDebug); + ChangeCollectionsWriter( + OperationContext* opCtx, + std::vector<InsertStatement>::const_iterator beginOplogEntries, + std::vector<InsertStatement>::const_iterator endOplogEntries, + OpDebug* opDebug, + ConcurrentSharedValuesMap<UUID, ChangeCollectionTruncateMarkers, UUID::Hash>* + tenantMarkerMap); public: ChangeCollectionsWriter(ChangeCollectionsWriter&&); @@ -179,9 +186,9 @@ public: }; /** - * Returns a change collection writer that can insert change collection entries into respective - * change collections. The entries are constructed from a range ['beginOplogEntries', - * 'endOplogEntries') of oplog entries. + * Returns a change collection writer that can insert change collection entries into + * respective change collections. The entries are constructed from a range + * ['beginOplogEntries', 'endOplogEntries') of oplog entries. */ ChangeCollectionsWriter createChangeCollectionsWriter( OperationContext* opCtx, @@ -195,25 +202,43 @@ public: /** * Scans the provided change collection and returns its metadata that will be used by the - * purging job to perform deletion on it. The method returns 'boost::none' if the collection is - * empty. + * purging job to perform deletion on it. The method returns 'boost::none' if the collection + * is empty. */ static boost::optional<ChangeCollectionPurgingJobMetadata> getChangeCollectionPurgingJobMetadata(OperationContext* opCtx, const ScopedCollectionAcquisition& changeCollection); - /** Removes documents from a change collection whose wall time is less than the + /** + * Removes documents from a change collection whose wall time is less than the * 'expirationTime'. Returns the number of documents deleted. The 'maxRecordIdBound' is the * maximum record id bound that will not be included in the collection scan. + * + * The removal process is performed with a collection scan + batch delete. 
*/ - static size_t removeExpiredChangeCollectionsDocuments( + static size_t removeExpiredChangeCollectionsDocumentsWithCollScan( OperationContext* opCtx, const ScopedCollectionAcquisition& changeCollection, RecordIdBound maxRecordIdBound, Date_t expirationTime); + /** + * Removes documents from a change collection whose wall time is less than the + * 'expirationTime'. Returns the number of documents deleted. + * + * The removal process is performed with a series of range truncate calls to the record + * store. Some documents might survive this process as deletion happens in chunks and we can + * only delete a chunk if we guarantee it is fully expired. + */ + static size_t removeExpiredChangeCollectionsDocumentsWithTruncate( + OperationContext* opCtx, + const ScopedCollectionAcquisition& changeCollection, + Date_t expirationTime); + private: // Change collections purging job stats. PurgingJobStats _purgingJobStats; + ConcurrentSharedValuesMap<UUID, ChangeCollectionTruncateMarkers, UUID::Hash> + _tenantTruncateMarkersMap; }; } // namespace mongo diff --git a/src/mongo/db/change_streams_cluster_parameter.idl b/src/mongo/db/change_streams_cluster_parameter.idl index 06b1f3a73f4..8dfc7d9c576 100644 --- a/src/mongo/db/change_streams_cluster_parameter.idl +++ b/src/mongo/db/change_streams_cluster_parameter.idl @@ -68,3 +68,13 @@ server_parameters: validator: gte: 1 default: 10 + changeCollectionTruncateMarkersMinBytes: + description: "Server parameter that specifies the minimum number of bytes contained in each + truncate marker for change collections. This is only used if + featureFlagUseUnreplicatedTruncatesForDeletions is enabled" + set_at: startup + cpp_varname: gChangeCollectionTruncateMarkersMinBytes + cpp_vartype: int32_t + default: 33_554_432 # 32 MiB + validator: + gt: 0 diff --git a/src/mongo/db/storage/collection_truncate_markers.cpp b/src/mongo/db/storage/collection_truncate_markers.cpp index ffd452f5c36..dcac54dff54 100644 --- a/src/mongo/db/storage/collection_truncate_markers.cpp +++ b/src/mongo/db/storage/collection_truncate_markers.cpp @@ -98,7 +98,6 @@ CollectionTruncateMarkers::peekOldestMarkerIfNeeded(OperationContext* opCtx) con return _markers.front(); } - void CollectionTruncateMarkers::popOldestMarker() { stdx::lock_guard<Latch> lk(_markersMutex); _markers.pop_front(); @@ -159,7 +158,6 @@ void CollectionTruncateMarkers::createNewMarkerIfNeeded(OperationContext* opCtx, pokeReclaimThread(opCtx); } - void CollectionTruncateMarkers::updateCurrentMarkerAfterInsertOnCommit( OperationContext* opCtx, int64_t bytesInserted, @@ -511,7 +509,7 @@ void CollectionTruncateMarkersWithPartialExpiration::updateCurrentMarkerAfterIns // will happen after the marker has been created. This guarantees that the metrics // will eventually be correct as long as the expiration criteria checks for the // metrics and the highest marker expiration. 
- collectionMarkers->_replaceNewHighestMarkingIfNecessary(recordId, wallTime); + collectionMarkers->_updateHighestSeenRecordIdAndWallTime(recordId, wallTime); collectionMarkers->_currentRecords.addAndFetch(countInserted); int64_t newCurrentBytes = collectionMarkers->_currentBytes.addAndFetch(bytesInserted); if (newCurrentBytes >= collectionMarkers->_minBytesPerMarker) { @@ -568,7 +566,7 @@ void CollectionTruncateMarkersWithPartialExpiration::createPartialMarkerIfNecess } } -void CollectionTruncateMarkersWithPartialExpiration::_replaceNewHighestMarkingIfNecessary( +void CollectionTruncateMarkersWithPartialExpiration::_updateHighestSeenRecordIdAndWallTime( const RecordId& rId, Date_t wallTime) { stdx::unique_lock lk(_lastHighestRecordMutex); _lastHighestRecordId = std::max(_lastHighestRecordId, rId); diff --git a/src/mongo/db/storage/collection_truncate_markers.h b/src/mongo/db/storage/collection_truncate_markers.h index 05ce2590918..7122c97cade 100644 --- a/src/mongo/db/storage/collection_truncate_markers.h +++ b/src/mongo/db/storage/collection_truncate_markers.h @@ -239,6 +239,22 @@ private: protected: CollectionTruncateMarkers(CollectionTruncateMarkers&& other); + template <typename F> + auto modifyMarkersWith(F&& f) { + static_assert(std::is_invocable_v<F, std::deque<Marker>&>, + "Function must be of type T(std::deque<Marker>&)"); + stdx::lock_guard lk(_markersMutex); + return f(_markers); + } + + template <typename F> + auto checkMarkersWith(F&& f) const { + static_assert(std::is_invocable_v<F, const std::deque<Marker>&>, + "Function must be of type T(const std::deque<Marker>&)"); + stdx::lock_guard lk(_markersMutex); + return f(_markers); + } + const std::deque<Marker>& getMarkers() const { return _markers; } @@ -285,10 +301,6 @@ private: RecordId _lastHighestRecordId; Date_t _lastHighestWallTime; - // Replaces the highest marker if _isMarkerLargerThanHighest returns true. - void _replaceNewHighestMarkingIfNecessary(const RecordId& newMarkerRecordId, - Date_t newMarkerWallTime); - // Used to decide if the current partially built marker has expired. virtual bool _hasPartialMarkerExpired(OperationContext* opCtx) const { return false; @@ -301,6 +313,9 @@ protected: std::pair<const RecordId&, const Date_t&> getPartialMarker() const { return {_lastHighestRecordId, _lastHighestWallTime}; } + + // Updates the highest seen RecordId and wall time if they are above the current ones. 
+ void _updateHighestSeenRecordIdAndWallTime(const RecordId& rId, Date_t wallTime); }; } // namespace mongo diff --git a/src/mongo/db/storage/record_store.cpp b/src/mongo/db/storage/record_store.cpp index 1bae257ba7b..51e301cf00e 100644 --- a/src/mongo/db/storage/record_store.cpp +++ b/src/mongo/db/storage/record_store.cpp @@ -88,6 +88,8 @@ Status RecordStore::rangeTruncate(OperationContext* opCtx, int64_t hintDataSizeDiff, int64_t hintNumRecordsDiff) { validateWriteAllowed(opCtx); + invariant(minRecordId != RecordId() || maxRecordId != RecordId(), + "Ranged truncate must have one bound defined"); invariant(minRecordId <= maxRecordId, "Start position cannot be after end position"); return doRangeTruncate(opCtx, minRecordId, maxRecordId, hintDataSizeDiff, hintNumRecordsDiff); } diff --git a/src/mongo/db/storage/record_store_test_truncate.cpp b/src/mongo/db/storage/record_store_test_truncate.cpp index a26f2a5155e..00d75c0e869 100644 --- a/src/mongo/db/storage/record_store_test_truncate.cpp +++ b/src/mongo/db/storage/record_store_test_truncate.cpp @@ -33,6 +33,7 @@ #include "mongo/db/storage/record_store.h" +#include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -114,5 +115,15 @@ TEST(RecordStoreTestHarness, TruncateNonEmpty) { } } +DEATH_TEST(RecordStoreTestHarness, + RangeTruncateMustHaveBoundsTest, + "Ranged truncate must have one bound defined") { + const auto harnessHelper(newRecordStoreHarnessHelper()); + unique_ptr<RecordStore> rs(harnessHelper->newRecordStore()); + + auto opCtx = harnessHelper->newOperationContext(); + + auto result = rs->rangeTruncate(opCtx.get(), RecordId(), RecordId(), 0, 0); +} } // namespace } // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 6aa988f13d8..4286d32c2c3 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -1406,12 +1406,15 @@ Status WiredTigerRecordStore::doRangeTruncate(OperationContext* opCtx, return Status::OK(); } invariantWTOK(ret, start->session); + // Make sure to reset the cursor since we have to replace it with what the user provided us. 
+ invariantWTOK(start->reset(start), start->session); boost::optional<CursorKey> startKey; if (minRecordId != RecordId()) { - invariantWTOK(start->reset(start), start->session); startKey = makeCursorKey(minRecordId, _keyFormat); setKey(start, &(*startKey)); + } else { + start = nullptr; } WiredTigerCursor endWrap(_uri, _tableId, true, opCtx); boost::optional<CursorKey> endKey; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp index bd2e356eb0f..3d921b3e45e 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp @@ -1098,38 +1098,6 @@ DEATH_TEST(WiredTigerRecordStoreTest, testTruncateRange(100, 4, 3); } -TEST(WiredTigerRecordStoreTest, RangeTruncateAllTest) { - unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper()); - unique_ptr<RecordStore> rs(harnessHelper->newRecordStore()); - - auto wtrs = checked_cast<WiredTigerRecordStore*>(rs.get()); - - auto opCtx = harnessHelper->newOperationContext(); - - static constexpr auto kNumRecordsToInsert = 100; - for (int i = 0; i < kNumRecordsToInsert; i++) { - auto recordId = insertBSONWithSize(opCtx.get(), wtrs, Timestamp(1, 0), 100); - ASSERT_OK(recordId); - } - - auto sizePerRecord = wtrs->dataSize(opCtx.get()) / wtrs->numRecords(opCtx.get()); - - { - WriteUnitOfWork wuow(opCtx.get()); - ASSERT_OK(wtrs->rangeTruncate(opCtx.get(), - RecordId(), - RecordId(), - -(sizePerRecord * kNumRecordsToInsert), - -kNumRecordsToInsert)); - ASSERT_EQ(wtrs->dataSize(opCtx.get()), 0); - ASSERT_EQ(wtrs->numRecords(opCtx.get()), 0); - wuow.commit(); - } - - auto cursor = wtrs->getCursor(opCtx.get(), true); - ASSERT_FALSE(cursor->next()); -} - TEST(WiredTigerRecordStoreTest, GetLatestOplogTest) { unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper()); unique_ptr<RecordStore> rs(harnessHelper->newOplogRecordStore());
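On the insert path, ChangeCollectionsWriterInternal::write now folds an entire batch of change-collection inserts into one updateCurrentMarkerAfterInsertOnCommit call by tracking the highest RecordId, the highest wall time, and the byte total across the batch. A minimal standalone model of that aggregation (C++17; the types are illustrative stand-ins for MongoDB's):

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

using RecordId = std::int64_t;
using WallTime = std::int64_t;

struct InsertStatement {
    RecordId id;         // reconstructed clustered-key RecordId
    WallTime wallTime;   // value of the document's wall clock field
    std::int64_t bytes;  // document size
};

int main() {
    std::vector<InsertStatement> batch{{5, 100, 64}, {9, 140, 128}, {7, 120, 32}};

    // Aggregate once over the batch, then issue a single marker update.
    RecordId maxRecordIdSeen = 0;
    WallTime maxWallTimeSeen = 0;
    std::int64_t bytesInserted = 0;
    for (const auto& stmt : batch) {
        maxRecordIdSeen = std::max(maxRecordIdSeen, stmt.id);
        maxWallTimeSeen = std::max(maxWallTimeSeen, stmt.wallTime);
        bytesInserted += stmt.bytes;
    }

    // One call instead of batch.size() calls; the marker only needs the
    // highest point seen plus the byte/record totals to stay correct.
    std::cout << "updateCurrentMarkerAfterInsertOnCommit(bytes=" << bytesInserted
              << ", maxId=" << maxRecordIdSeen << ", maxWall=" << maxWallTimeSeen
              << ", count=" << batch.size() << ")\n";
}
```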
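When removeExpiredChangeCollectionsDocumentsWithTruncate finds no marker entry for a collection (a new collection, or the first pass after startup), initialiseTruncateMarkers scans the record store to rebuild markers and seeds the highest-seen RecordId/wall time from the last record so expiry has a reference point. A simplified standalone reconstruction of that scan, with a std::map standing in for the record store and illustrative field names:

```cpp
#include <cstdint>
#include <deque>
#include <iostream>
#include <map>

using RecordId = std::int64_t;
using WallTime = std::int64_t;

struct Record { std::int64_t bytes; WallTime wallTime; };
struct Marker { std::int64_t records, bytes; RecordId lastRecord; WallTime wallTime; };

int main() {
    // Pretend record store: RecordId -> record, ordered like a clustered table.
    std::map<RecordId, Record> store{
        {1, {400, 100}}, {2, {400, 110}}, {3, {400, 120}}, {4, {400, 130}}};
    const std::int64_t minBytesPerMarker = 1000;

    std::deque<Marker> markers;
    std::int64_t curRecords = 0, curBytes = 0;
    for (const auto& [id, rec] : store) {
        curRecords++;
        curBytes += rec.bytes;
        if (curBytes >= minBytesPerMarker) {
            // Cut a marker every time the running byte count crosses the threshold.
            markers.push_back({curRecords, curBytes, id, rec.wallTime});
            curRecords = 0;
            curBytes = 0;
        }
    }
    // curRecords/curBytes are the "leftover" counts handed to the markers
    // object; a final back-cursor step would also record the last entry's
    // RecordId/wall time so partial-marker expiry can make progress.
    std::cout << markers.size() << " markers, leftovers: " << curRecords
              << " records / " << curBytes << " bytes\n";
}
```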
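The removal loop itself: peek the oldest marker, range-truncate from the beginning of the collection up to the marker's lastRecord (which also sweeps up any leftover documents below the range), pop the marker, and repeat; the function then expires the partial marker (see the next sketch) and runs the loop a second time to pick up the marker that step may have produced. A runnable toy version of the loop, again with a std::map as the record store:

```cpp
#include <cstdint>
#include <deque>
#include <iostream>
#include <map>

using RecordId = std::int64_t;
using WallTime = std::int64_t;
struct Marker { std::int64_t records, bytes; RecordId lastRecord; WallTime wallTime; };

// Toy rangeTruncate: erase everything up to and including maxInclusive.
// Starting from a null (zero) min bound means "from the beginning".
void rangeTruncate(std::map<RecordId, std::int64_t>& store, RecordId maxInclusive) {
    store.erase(store.begin(), store.upper_bound(maxInclusive));
}

int main() {
    std::map<RecordId, std::int64_t> store{{1, 64}, {2, 64}, {3, 64}, {4, 64}};
    std::deque<Marker> markers{{2, 128, 2, 100}, {1, 64, 3, 200}};
    const WallTime expirationTime = 150;  // now - expireAfterSeconds

    std::int64_t numRecordsDeleted = 0;
    while (!markers.empty() && markers.front().wallTime <= expirationTime) {
        const Marker m = markers.front();
        rangeTruncate(store, m.lastRecord);  // one cheap range delete per marker
        markers.pop_front();
        numRecordsDeleted += m.records;
        // The real code wraps each truncate in writeConflictRetry plus a
        // WriteUnitOfWork and bumps the purging-job stats here.
    }
    std::cout << "deleted " << numRecordsDeleted << ", remaining docs: "
              << store.size() << "\n";  // deleted 2, remaining 2
}
```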
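expirePartialMarker is the subtle piece: a change collection must always retain its last entry, so when the sealed partial marker turns out to contain that entry, its byte/record contribution is shifted into the next partial marker and the marker is rewritten to end just before the last record (the real code derives that "previous" RecordId from the clustered _id timestamp minus one). A standalone sketch of the shift, under those simplifying assumptions:

```cpp
#include <cstdint>
#include <deque>
#include <iostream>

using RecordId = std::int64_t;
using WallTime = std::int64_t;
struct Marker { std::int64_t records, bytes; RecordId lastRecord; WallTime wallTime; };

int main() {
    // The oldest (and only) marker happens to cover up to the collection's
    // last record, id 10, which must survive truncation.
    std::deque<Marker> markers{{3, 300, /*lastRecord=*/10, /*wallTime=*/50}};
    const RecordId lastEntryId = 10;
    const std::int64_t lastEntryBytes = 100;

    Marker& oldest = markers.front();
    if (oldest.lastRecord >= lastEntryId) {
        // Shift the last entry out of the marker: its bytes/count move to the
        // next partial marker (modeled here as plain counters), and the marker
        // is rewritten to end just before the last record.
        std::int64_t partialBytes = lastEntryBytes;
        std::int64_t partialRecords = 1;
        markers.front() = Marker{oldest.records - 1,
                                 oldest.bytes - lastEntryBytes,
                                 lastEntryId - 1,  // "previous" RecordId
                                 oldest.wallTime};
        std::cout << "marker now ends at " << markers.front().lastRecord
                  << "; partial holds " << partialRecords << " record / "
                  << partialBytes << " bytes\n";
    }
}
```

As the diff's own comment notes, this races with concurrent inserts, but the shifted bytes land in the next partial marker, so the totals stay correct in aggregate; only eventual correctness is guaranteed.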
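RecordStore::rangeTruncate also gains a precondition: at least one bound must be a real RecordId, which is why the old RangeTruncateAllTest (null bounds on both sides) is deleted and a DEATH_TEST added. A small standalone illustration of the tightened contract, with assert standing in for MongoDB's invariant:

```cpp
#include <cassert>
#include <cstdint>
#include <iostream>
#include <map>

using RecordId = std::int64_t;  // 0 models the "null" RecordId()

// Toy range truncate over [min, max] with the new precondition: an unbounded
// truncate on both sides is a programming error, not "delete everything".
void rangeTruncate(std::map<RecordId, int>& store, RecordId minId, RecordId maxId) {
    assert((minId != 0 || maxId != 0) && "Ranged truncate must have one bound defined");
    assert(minId <= maxId || maxId == 0);  // null max means "to the end"
    auto first = (minId == 0) ? store.begin() : store.lower_bound(minId);
    auto last = (maxId == 0) ? store.end() : store.upper_bound(maxId);
    store.erase(first, last);
}

int main() {
    std::map<RecordId, int> store{{1, 0}, {2, 0}, {3, 0}};
    rangeTruncate(store, 0, 2);         // "from the beginning up to id 2"
    std::cout << store.size() << "\n";  // 1
    // rangeTruncate(store, 0, 0);      // would trip the precondition, which
    //                                  // is what the new DEATH_TEST verifies
}
```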
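Finally, the manager keeps one ChangeCollectionTruncateMarkers instance per collection UUID in a ConcurrentSharedValuesMap, so the writer (find), the remover (getOrEmplace), the drop path (erase), and the coll-scan path (clear) can race safely while existing holders keep their instance alive through a shared_ptr. A plausible minimal shape for such a map (a sketch, not MongoDB's actual ConcurrentSharedValuesMap implementation):

```cpp
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_map>

// Minimal concurrent map from key to shared value. Lookups hand out
// shared_ptrs, so erase()/clear() never invalidates what a caller holds.
template <typename Key, typename Value>
class ConcurrentSharedValuesMap {
public:
    std::shared_ptr<Value> find(const Key& k) const {
        std::lock_guard lk(_mutex);
        auto it = _map.find(k);
        return it == _map.end() ? nullptr : it->second;
    }

    template <typename... Args>
    std::shared_ptr<Value> getOrEmplace(const Key& k, Args&&... args) {
        std::lock_guard lk(_mutex);
        auto it = _map.find(k);
        if (it == _map.end())
            it = _map.emplace(k, std::make_shared<Value>(std::forward<Args>(args)...)).first;
        return it->second;
    }

    void erase(const Key& k) {
        std::lock_guard lk(_mutex);
        _map.erase(k);
    }

    void clear() {
        std::lock_guard lk(_mutex);
        _map.clear();
    }

private:
    mutable std::mutex _mutex;
    std::unordered_map<Key, std::shared_ptr<Value>> _map;
};

int main() {
    ConcurrentSharedValuesMap<std::uint64_t, int> map;  // UUID modeled as u64
    auto markers = map.getOrEmplace(42, 7);             // remover initializes
    map.erase(42);                                      // collection dropped
    std::cout << *markers << "\n";  // holder's shared_ptr is still valid: 7
}
```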