author    Jordi Olivares Provencio <jordi.olivares-provencio@mongodb.com>  2023-04-28 14:10:12 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2023-04-28 15:50:38 +0000
commit    c0a7899adb1d6ba9c015c93be83606b090a22662 (patch)
tree      f78affc9d8072bba98db36235ef0c418e2514a63 /src/mongo
parent    657341a3615bf63ecab75957a7b51c242978180e (diff)
download  mongo-c0a7899adb1d6ba9c015c93be83606b090a22662.tar.gz
SERVER-74749 Integrate Change Collection with truncate markers
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/SConscript2
-rw-r--r--src/mongo/db/change_collection_expired_change_remover_test.cpp73
-rw-r--r--src/mongo/db/change_collection_expired_documents_remover.cpp74
-rw-r--r--src/mongo/db/change_collection_truncate_markers.cpp110
-rw-r--r--src/mongo/db/change_collection_truncate_markers.h9
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.cpp227
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.h67
-rw-r--r--src/mongo/db/change_streams_cluster_parameter.idl10
-rw-r--r--src/mongo/db/storage/collection_truncate_markers.cpp6
-rw-r--r--src/mongo/db/storage/collection_truncate_markers.h23
-rw-r--r--src/mongo/db/storage/record_store.cpp2
-rw-r--r--src/mongo/db/storage/record_store_test_truncate.cpp11
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp5
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp32
14 files changed, 500 insertions, 151 deletions
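At a high level, this change replaces the replicated collection-scan delete path for serverless change collections with truncate markers: the collection is tracked as a sequence of markers, each recording the records, bytes, highest RecordId, and highest wall time it covers, and expiration becomes a series of unreplicated rangeTruncate calls. A minimal sketch of the expiry loop, with simplified types for illustration (the real implementation in change_stream_change_collection_manager.cpp below adds write-conflict retries, snapshot handling, and purging-job stats):

    // Sketch of marker-based expiry; types simplified for illustration.
    struct Marker {
        int64_t records;      // records covered by this marker
        int64_t bytes;        // bytes covered by this marker
        RecordId lastRecord;  // highest RecordId in this marker
        Date_t wallTime;      // highest wall time in this marker
    };

    size_t truncateExpiredMarkers(OperationContext* opCtx,
                                  RecordStore* rs,
                                  std::deque<Marker>& markers,
                                  Date_t expirationTime) {
        size_t numRecordsDeleted = 0;
        // A whole marker may only be truncated once its newest entry has expired.
        while (!markers.empty() && markers.front().wallTime <= expirationTime) {
            const Marker& m = markers.front();
            // Truncate from the start of the collection up to the marker's last
            // record; the negative hints adjust size/count metadata downwards.
            invariantStatusOK(rs->rangeTruncate(
                opCtx, RecordId(), m.lastRecord, -m.bytes, -m.records));
            numRecordsDeleted += m.records;
            markers.pop_front();
        }
        return numRecordsDeleted;
    }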
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index d86f51eee3a..a241ab43198 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -531,6 +531,8 @@ env.Library(
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/server_feature_flags',
'$BUILD_DIR/mongo/db/service_context',
+ 'change_streams_cluster_parameter',
+ 'record_id_helpers',
],
)
diff --git a/src/mongo/db/change_collection_expired_change_remover_test.cpp b/src/mongo/db/change_collection_expired_change_remover_test.cpp
index 16d4e516b51..85882be52db 100644
--- a/src/mongo/db/change_collection_expired_change_remover_test.cpp
+++ b/src/mongo/db/change_collection_expired_change_remover_test.cpp
@@ -145,8 +145,9 @@ protected:
ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata(
opCtx, changeCollection)
->maxRecordIdBound;
- return ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments(
- opCtx, changeCollection, maxRecordIdBound, expirationTime);
+ return ChangeStreamChangeCollectionManager::
+ removeExpiredChangeCollectionsDocumentsWithCollScan(
+ opCtx, changeCollection, maxRecordIdBound, expirationTime);
}
const TenantId _tenantId;
@@ -174,58 +175,26 @@ protected:
changeStreamsParam->setValue(oldSettings, _tenantId).ignore();
}
- void insertDocumentToChangeCollection(OperationContext* opCtx,
- const TenantId& tenantId,
- const BSONObj& obj) {
- WriteUnitOfWork wuow(opCtx);
- ChangeCollectionExpiredChangeRemoverTest::insertDocumentToChangeCollection(
- opCtx, tenantId, obj);
- const auto wallTime = now();
- Timestamp timestamp{wallTime};
- RecordId recordId =
- record_id_helpers::keyForOptime(timestamp, KeyFormat::String).getValue();
-
- _truncateMarkers->updateCurrentMarkerAfterInsertOnCommit(
- opCtx, obj.objsize(), recordId, wallTime, 1);
- wuow.commit();
- }
-
- void dropAndRecreateChangeCollection(OperationContext* opCtx,
- const TenantId& tenantId,
- int64_t minBytesPerMarker) {
- auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
- changeCollectionManager.dropChangeCollection(opCtx, tenantId);
- _truncateMarkers.reset();
- changeCollectionManager.createChangeCollection(opCtx, tenantId);
- _truncateMarkers = std::make_unique<ChangeCollectionTruncateMarkers>(
- tenantId, std::deque<CollectionTruncateMarkers::Marker>{}, 0, 0, minBytesPerMarker);
- }
-
size_t removeExpiredChangeCollectionsDocuments(OperationContext* opCtx,
const TenantId& tenantId,
Date_t expirationTime) {
// Acquire intent-exclusive lock on the change collection. Early exit if the collection
// doesn't exist.
- const auto changeCollection =
- AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kWrite, tenantId};
-
- WriteUnitOfWork wuow(opCtx);
- size_t numRecordsDeleted = 0;
- while (boost::optional<CollectionTruncateMarkers::Marker> marker =
- _truncateMarkers->peekOldestMarkerIfNeeded(opCtx)) {
- auto recordStore = changeCollection->getRecordStore();
-
- ASSERT_OK(recordStore->rangeTruncate(
- opCtx, RecordId(), marker->lastRecord, -marker->bytes, -marker->records));
+ const auto changeCollection = acquireCollection(
+ opCtx,
+ CollectionAcquisitionRequest(NamespaceString::makeChangeCollectionNSS(tenantId),
+ PlacementConcern{boost::none, ShardVersion::UNSHARDED()},
+ repl::ReadConcernArgs::get(opCtx),
+ AcquisitionPrerequisites::kWrite),
+ MODE_IX);
- _truncateMarkers->popOldestMarker();
- numRecordsDeleted += marker->records;
- }
- wuow.commit();
- return numRecordsDeleted;
+ return ChangeStreamChangeCollectionManager::
+ removeExpiredChangeCollectionsDocumentsWithTruncate(
+ opCtx, changeCollection, expirationTime);
}
- std::unique_ptr<ChangeCollectionTruncateMarkers> _truncateMarkers;
+ RAIIServerParameterControllerForTest truncateFeatureFlag{
+ "featureFlagUseUnreplicatedTruncatesForDeletions", true};
};
// Tests that the last expired document retrieved is the expected one.
@@ -336,10 +305,13 @@ TEST_F(ChangeCollectionTruncateExpirationTest, ShouldRemoveOnlyExpiredDocument_M
const BSONObj notExpired = BSON("_id"
<< "notExpired");
+ RAIIServerParameterControllerForTest minBytesPerMarker{
+ "changeCollectionTruncateMarkersMinBytes",
+ firstExpired.objsize() + secondExpired.objsize()};
+
const auto timeAtStart = now();
const auto opCtx = operationContext();
- dropAndRecreateChangeCollection(
- opCtx, _tenantId, firstExpired.objsize() + secondExpired.objsize());
+ dropAndRecreateChangeCollection(opCtx, _tenantId);
insertDocumentToChangeCollection(opCtx, _tenantId, firstExpired);
clockSource()->advance(Hours(1));
@@ -363,8 +335,11 @@ TEST_F(ChangeCollectionTruncateExpirationTest, ShouldRemoveOnlyExpiredDocument_M
// Tests that the last expired document is never deleted.
TEST_F(ChangeCollectionTruncateExpirationTest, ShouldLeaveAtLeastOneDocument_Markers) {
+ RAIIServerParameterControllerForTest minBytesPerMarker{
+ "changeCollectionTruncateMarkersMinBytes", 1};
const auto opCtx = operationContext();
- dropAndRecreateChangeCollection(opCtx, _tenantId, 1);
+
+ dropAndRecreateChangeCollection(opCtx, _tenantId);
setExpireAfterSeconds(opCtx, Seconds{1});
diff --git a/src/mongo/db/change_collection_expired_documents_remover.cpp b/src/mongo/db/change_collection_expired_documents_remover.cpp
index 5dc4d1fe7e6..84b9f4817ea 100644
--- a/src/mongo/db/change_collection_expired_documents_remover.cpp
+++ b/src/mongo/db/change_collection_expired_documents_remover.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/change_streams_cluster_parameter_gen.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/service_context.h"
#include "mongo/db/shard_role.h"
#include "mongo/logv2/log.h"
@@ -67,9 +68,18 @@ change_stream_serverless_helpers::TenantSet getConfigDbTenants(OperationContext*
return tenantIds;
}
+bool usesUnreplicatedTruncates() {
+ // (Ignore FCV check): This feature flag is potentially backported to previous versions of
+ // the server. We can't rely on the FCV to see whether it's enabled or not.
+ return feature_flags::gFeatureFlagUseUnreplicatedTruncatesForDeletions
+ .isEnabledAndIgnoreFCVUnsafe();
+}
+
void removeExpiredDocuments(Client* client) {
hangBeforeRemovingExpiredChanges.pauseWhileSet();
+ bool useUnreplicatedTruncates = usesUnreplicatedTruncates();
+
try {
auto opCtx = client->makeOperationContext();
const auto clock = client->getServiceContext()->getFastClockSource();
@@ -77,11 +87,9 @@ void removeExpiredDocuments(Client* client) {
// If the fail point 'injectCurrentWallTimeForRemovingDocuments' is enabled then set the
// 'currentWallTime' with the provided wall time.
- if (injectCurrentWallTimeForRemovingExpiredDocuments.shouldFail()) {
- injectCurrentWallTimeForRemovingExpiredDocuments.execute([&](const BSONObj& data) {
- currentWallTime = data.getField("currentWallTime").date();
- });
- }
+ injectCurrentWallTimeForRemovingExpiredDocuments.execute([&](const BSONObj& data) {
+ currentWallTime = data.getField("currentWallTime").date();
+ });
// Number of documents removed in the current pass.
size_t removedCount = 0;
@@ -102,39 +110,53 @@ void removeExpiredDocuments(Client* client) {
AcquisitionPrerequisites::kWrite),
MODE_IX);
- // Early exit if collection does not exist or if running on a secondary (requires
- // opCtx->lockState()->isRSTLLocked()).
- if (!changeCollection.exists() ||
+ // Early exit if collection does not exist.
+ if (!changeCollection.exists()) {
+ continue;
+ }
+ // Early exit if running on a secondary and we haven't enabled the unreplicated truncate
+ // maintenance flag (requires opCtx->lockState()->isRSTLLocked()).
+ if (!useUnreplicatedTruncates &&
!repl::ReplicationCoordinator::get(opCtx.get())
->canAcceptWritesForDatabase(opCtx.get(), DatabaseName::kConfig.toString())) {
continue;
}
- // Get the metadata required for the removal of the expired change collection
- // documents. Early exit if the metadata is missing, indicating that there is nothing
- // to remove.
- auto purgingJobMetadata =
- ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata(
- opCtx.get(), changeCollection);
- if (!purgingJobMetadata) {
- continue;
+ if (useUnreplicatedTruncates) {
+ removedCount += ChangeStreamChangeCollectionManager::
+ removeExpiredChangeCollectionsDocumentsWithTruncate(
+ opCtx.get(),
+ changeCollection,
+ currentWallTime - Seconds(expiredAfterSeconds));
+ } else {
+ // Get the metadata required for the removal of the expired change collection
+ // documents. Early exit if the metadata is missing, indicating that there is
+ // nothing to remove.
+ auto purgingJobMetadata =
+ ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata(
+ opCtx.get(), changeCollection);
+ if (!purgingJobMetadata) {
+ continue;
+ }
+
+ removedCount += ChangeStreamChangeCollectionManager::
+ removeExpiredChangeCollectionsDocumentsWithCollScan(
+ opCtx.get(),
+ changeCollection,
+ purgingJobMetadata->maxRecordIdBound,
+ currentWallTime - Seconds(expiredAfterSeconds));
+ maxStartWallTime =
+ std::max(maxStartWallTime, purgingJobMetadata->firstDocWallTimeMillis);
}
- removedCount +=
- ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments(
- opCtx.get(),
- changeCollection,
- purgingJobMetadata->maxRecordIdBound,
- currentWallTime - Seconds(expiredAfterSeconds));
changeCollectionManager.getPurgingJobStats().scannedCollections.fetchAndAddRelaxed(1);
- maxStartWallTime =
- std::max(maxStartWallTime, purgingJobMetadata->firstDocWallTimeMillis);
}
// The purging job metadata will be 'boost::none' if none of the change collections have
// more than one oplog entry, as such the 'maxStartWallTimeMillis' will be zero. Avoid
- // reporting 0 as 'maxStartWallTimeMillis'.
- if (maxStartWallTime > 0) {
+ // reporting 0 as 'maxStartWallTimeMillis'. If using unreplicated truncates, this is
+ // maintained by the call to removeExpiredChangeCollectionsDocumentsWithTruncate.
+ if (!useUnreplicatedTruncates && maxStartWallTime > 0) {
changeCollectionManager.getPurgingJobStats().maxStartWallTimeMillis.store(
maxStartWallTime);
}
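With the shouldFail() guard removed above, the fail point's execute() callback runs exactly when the fail point is enabled. A minimal test-side sketch of driving the injected wall time, assuming the standard FailPointEnableBlock test helper:

    // Sketch: pin the wall clock seen by the remover from a test.
    FailPointEnableBlock injectWallTime(
        "injectCurrentWallTimeForRemovingExpiredDocuments",
        BSON("currentWallTime" << Date_t::fromMillisSinceEpoch(1000)));
    // While this block is in scope, removeExpiredDocuments() uses the injected
    // 'currentWallTime' instead of the fast clock source.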
diff --git a/src/mongo/db/change_collection_truncate_markers.cpp b/src/mongo/db/change_collection_truncate_markers.cpp
index f1670f93d58..ce50a3aec32 100644
--- a/src/mongo/db/change_collection_truncate_markers.cpp
+++ b/src/mongo/db/change_collection_truncate_markers.cpp
@@ -28,10 +28,33 @@
*/
#include "mongo/db/change_collection_truncate_markers.h"
+#include "mongo/db/catalog/collection.h"
#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/operation_context.h"
namespace mongo {
+
+namespace {
+MONGO_FAIL_POINT_DEFINE(injectCurrentWallTimeForCheckingMarkers);
+
+Date_t getWallTimeToUse(OperationContext* opCtx) {
+ auto now = opCtx->getServiceContext()->getFastClockSource()->now();
+ injectCurrentWallTimeForCheckingMarkers.execute(
+ [&](const BSONObj& data) { now = data.getField("currentWallTime").date(); });
+ return now;
+}
+
+bool hasMarkerWallTimeExpired(OperationContext* opCtx,
+ Date_t markerWallTime,
+ const TenantId& tenantId) {
+ auto now = getWallTimeToUse(opCtx);
+ auto expireAfterSeconds =
+ Seconds{change_stream_serverless_helpers::getExpireAfterSeconds(tenantId)};
+ auto expirationTime = now - expireAfterSeconds;
+ return markerWallTime <= expirationTime;
+}
+} // namespace
+
ChangeCollectionTruncateMarkers::ChangeCollectionTruncateMarkers(TenantId tenantId,
std::deque<Marker> markers,
int64_t leftoverRecordsCount,
@@ -56,11 +79,88 @@ bool ChangeCollectionTruncateMarkers::_hasExcessMarkers(OperationContext* opCtx)
return false;
}
- auto now = opCtx->getServiceContext()->getFastClockSource()->now();
- auto expireAfterSeconds =
- Seconds{change_stream_serverless_helpers::getExpireAfterSeconds(_tenantId)};
- auto expirationTime = now - expireAfterSeconds;
+ return hasMarkerWallTimeExpired(opCtx, oldestMarker.wallTime, _tenantId);
+}
+
+bool ChangeCollectionTruncateMarkers::_hasPartialMarkerExpired(OperationContext* opCtx) const {
+ const auto& [_, highestSeenWallTime] = getPartialMarker();
+
+ return hasMarkerWallTimeExpired(opCtx, highestSeenWallTime, _tenantId);
+}
+
+void ChangeCollectionTruncateMarkers::expirePartialMarker(OperationContext* opCtx,
+ const Collection* changeCollection) {
+ createPartialMarkerIfNecessary(opCtx);
+ // We can't use the normal peekOldestMarkerIfNeeded method since that calls _hasExcessMarkers,
+ // which will return false because the new oldest marker will contain the last entry.
+ auto oldestMarker =
+ checkMarkersWith([&](const std::deque<CollectionTruncateMarkers::Marker>& markers)
+ -> boost::optional<CollectionTruncateMarkers::Marker> {
+ // Partial marker did not get generated, early exit.
+ if (markers.empty()) {
+ return {};
+ }
+ auto firstMarker = markers.front();
+ // We will only consider the case of an expired marker.
+ if (!hasMarkerWallTimeExpired(opCtx, firstMarker.wallTime, _tenantId)) {
+ return {};
+ }
+ return firstMarker;
+ });
+
+ if (!oldestMarker) {
+ // The oldest marker hasn't expired, nothing to do here.
+ return;
+ }
+
+ // Abandon the snapshot so we can fetch the most recent version of the table. This increases the
+ // chances the last entry isn't present in the new partial marker.
+ opCtx->recoveryUnit()->abandonSnapshot();
+ WriteUnitOfWork wuow(opCtx);
+
+ auto backCursor = changeCollection->getRecordStore()->getCursor(opCtx, false);
+ // If the oldest marker does not contain the last entry, it's a normal marker, so don't
+ // perform any modifications to it.
+ auto obj = backCursor->next();
+ if (!obj || obj->id > oldestMarker->lastRecord) {
+ return;
+ }
+
+ // At this point the marker contains the last entry of the collection, so we have to shift
+ // the last entry to the next marker in order to expire the previous entries.
+ auto bytesNotTruncated = obj->data.size();
+ const auto& doc = obj->data.toBson();
+ auto wallTime = doc[repl::OplogEntry::kWallClockTimeFieldName].Date();
+
+ updateCurrentMarkerAfterInsertOnCommit(opCtx, bytesNotTruncated, obj->id, wallTime, 1);
+
+ auto bytesDeleted = oldestMarker->bytes - bytesNotTruncated;
+ auto docsDeleted = oldestMarker->records - 1;
+
+ // We build the previous record id based on the extracted value
+ auto previousRecordId = [&] {
+ auto currId = doc[repl::OplogEntry::k_idFieldName].timestamp();
+ invariant(currId > Timestamp::min(), "Last entry timestamp must be larger than 0");
+
+ auto fixedBson = BSON(repl::OplogEntry::k_idFieldName << (currId - 1));
+
+ auto recordId = invariantStatusOK(
+ record_id_helpers::keyForDoc(fixedBson,
+ changeCollection->getClusteredInfo()->getIndexSpec(),
+ changeCollection->getDefaultCollator()));
+ return recordId;
+ }();
+ auto newMarker =
+ CollectionTruncateMarkers::Marker{docsDeleted, bytesDeleted, previousRecordId, wallTime};
- return oldestMarker.wallTime <= expirationTime;
+ // Now replace the oldest marker with a version that doesn't contain the last entry. This is
+ // susceptible to races with concurrent inserts. But the invariant of metrics being correct in
+ // aggregate still holds. Ignoring this issue is a valid strategy here as we move the ignored
+ // bytes to the next partial marker and we only guarantee eventual correctness.
+ modifyMarkersWith([&](std::deque<CollectionTruncateMarkers::Marker>& markers) {
+ markers.pop_front();
+ markers.emplace_front(std::move(newMarker));
+ });
+ wuow.commit();
}
} // namespace mongo
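To make the accounting in expirePartialMarker concrete: if the oldest marker covers 10 records totalling 1,000 bytes and its last record is the collection's final, 100-byte entry, the method re-registers that entry against the next partial marker and replaces the oldest marker with one covering 9 records and 900 bytes, whose lastRecord is rebuilt from the final entry's _id timestamp minus one. The subsequent truncate pass can then remove everything except the last document.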
diff --git a/src/mongo/db/change_collection_truncate_markers.h b/src/mongo/db/change_collection_truncate_markers.h
index b60242cd260..5ea4ee330d0 100644
--- a/src/mongo/db/change_collection_truncate_markers.h
+++ b/src/mongo/db/change_collection_truncate_markers.h
@@ -45,9 +45,18 @@ public:
int64_t leftoverRecordsBytes,
int64_t minBytesPerMarker);
+ // Expires the partial marker with proper care for the last entry. Expiring here means:
+ // * Turning the partial marker into an actual marker
+ // * Ensuring the last entry isn't present in the generated marker
+ // The last entry is necessary for correctness of the change collection. This method will
+ // shift the last entry's size and count to the next partial marker.
+ void expirePartialMarker(OperationContext* opCtx, const Collection* changeCollection);
+
private:
bool _hasExcessMarkers(OperationContext* opCtx) const override;
+ bool _hasPartialMarkerExpired(OperationContext* opCtx) const override;
+
TenantId _tenantId;
};
} // namespace mongo
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp
index e75140f9de3..aad6f315c74 100644
--- a/src/mongo/db/change_stream_change_collection_manager.cpp
+++ b/src/mongo/db/change_stream_change_collection_manager.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/catalog/drop_collection.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/change_stream_serverless_helpers.h"
+#include "mongo/db/change_streams_cluster_parameter_gen.h"
#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/multitenancy_gen.h"
#include "mongo/db/namespace_string.h"
@@ -165,6 +166,13 @@ boost::optional<BSONObj> createChangeCollectionEntryFromOplog(const BSONObj& opl
auto readyChangeCollDoc = changeCollDoc.freeze();
return readyChangeCollDoc.toBson();
}
+
+bool usesUnreplicatedTruncates() {
+ // (Ignore FCV check): This feature flag is potentially backported to previous versions of
+ // the server. We can't rely on the FCV to see whether it's enabled or not.
+ return feature_flags::gFeatureFlagUseUnreplicatedTruncatesForDeletions
+ .isEnabledAndIgnoreFCVUnsafe();
+}
} // namespace
/**
@@ -173,10 +181,15 @@ boost::optional<BSONObj> createChangeCollectionEntryFromOplog(const BSONObj& opl
*/
class ChangeStreamChangeCollectionManager::ChangeCollectionsWriterInternal {
public:
- explicit ChangeCollectionsWriterInternal(OperationContext* opCtx,
- OpDebug* opDebug,
- const AutoGetChangeCollection::AccessMode& accessMode)
- : _accessMode{accessMode}, _opCtx{opCtx}, _opDebug{opDebug} {}
+ explicit ChangeCollectionsWriterInternal(
+ OperationContext* opCtx,
+ OpDebug* opDebug,
+ const AutoGetChangeCollection::AccessMode& accessMode,
+ ConcurrentSharedValuesMap<UUID, ChangeCollectionTruncateMarkers, UUID::Hash>* map)
+ : _accessMode{accessMode},
+ _opCtx{opCtx},
+ _opDebug{opDebug},
+ _tenantTruncateMarkersMap(map) {}
/**
* Adds the insert statement for the provided tenant that will be written to the change
@@ -227,6 +240,14 @@ public:
// Writes to the change collection should not be replicated.
repl::UnreplicatedWritesBlock unReplBlock(_opCtx);
+ // To avoid making many unnecessary calls to
+ // CollectionTruncateMarkers::updateCurrentMarkerAfterInsertOnCommit we aggregate all
+ // the results and make a single call. This requires storing the highest
+ // RecordId/WallTime seen across the insert statements.
+ RecordId maxRecordIdSeen;
+ Date_t maxWallTimeSeen;
+ int64_t bytesInserted = 0;
+
/**
* For a serverless shard merge, we clone all change collection entries from the donor
* and then fetch/apply retryable writes that took place before the migration. As a
@@ -246,6 +267,8 @@ public:
LOGV2(7282901,
"Ignoring DuplicateKey error for change collection insert",
"doc"_attr = insertStatement.doc.toString());
+ // Continue to the next insert statement as we've omitted the current one.
+ continue;
} else if (!status.isOK()) {
return Status(status.code(),
str::stream()
@@ -254,6 +277,38 @@ public:
<< "failed")
.withReason(status.reason());
}
+
+ // Right now we assume that the tenant change collection is clustered and
+ // reconstruct the RecordId used in the KV store. Ideally we want the write path to
+ // return the record ids used for the insert but as it isn't available we
+ // reconstruct the key here.
+ dassert(tenantChangeCollection->isClustered());
+ auto recordId = invariantStatusOK(record_id_helpers::keyForDoc(
+ insertStatement.doc,
+ tenantChangeCollection->getClusteredInfo()->getIndexSpec(),
+ tenantChangeCollection->getDefaultCollator()));
+
+ maxRecordIdSeen = std::max(std::move(recordId), maxRecordIdSeen);
+ auto docWallTime =
+ insertStatement.doc[repl::OplogEntry::kWallClockTimeFieldName].Date();
+ maxWallTimeSeen = std::max(maxWallTimeSeen, docWallTime);
+
+ bytesInserted += insertStatement.doc.objsize();
+ }
+
+ std::shared_ptr<ChangeCollectionTruncateMarkers> truncateMarkers =
+ usesUnreplicatedTruncates()
+ ? _tenantTruncateMarkersMap->find(tenantChangeCollection->uuid())
+ : nullptr;
+ if (truncateMarkers && bytesInserted > 0) {
+ // We update the TruncateMarkers instance if it exists. Creation is performed
+ // asynchronously by the remover thread.
+ truncateMarkers->updateCurrentMarkerAfterInsertOnCommit(
+ _opCtx,
+ bytesInserted,
+ maxRecordIdSeen,
+ maxWallTimeSeen,
+ insertStatementsAndChangeCollection.insertStatements.size());
}
}
return Status::OK();
@@ -301,19 +356,23 @@ private:
// Indicates if locks have been acquired.
bool _locksAcquired{false};
+
+ ConcurrentSharedValuesMap<UUID, ChangeCollectionTruncateMarkers, UUID::Hash>*
+ _tenantTruncateMarkersMap;
};
ChangeStreamChangeCollectionManager::ChangeCollectionsWriter::ChangeCollectionsWriter(
OperationContext* opCtx,
std::vector<InsertStatement>::const_iterator beginOplogEntries,
std::vector<InsertStatement>::const_iterator endOplogEntries,
- OpDebug* opDebug) {
+ OpDebug* opDebug,
+ ConcurrentSharedValuesMap<UUID, ChangeCollectionTruncateMarkers, UUID::Hash>* tenantMarkerMap) {
// This method must be called within a 'WriteUnitOfWork'. The caller must be responsible for
// commiting the unit of work.
invariant(opCtx->lockState()->inAWriteUnitOfWork());
_writer = std::make_unique<ChangeCollectionsWriterInternal>(
- opCtx, opDebug, AutoGetChangeCollection::AccessMode::kWrite);
+ opCtx, opDebug, AutoGetChangeCollection::AccessMode::kWrite, tenantMarkerMap);
// Transform oplog entries to change collections entries and group them by tenant id.
for (auto oplogEntryIter = beginOplogEntries; oplogEntryIter != endOplogEntries;
@@ -356,7 +415,8 @@ ChangeStreamChangeCollectionManager::createChangeCollectionsWriter(
std::vector<InsertStatement>::const_iterator beginOplogEntries,
std::vector<InsertStatement>::const_iterator endOplogEntries,
OpDebug* opDebug) {
- return ChangeCollectionsWriter{opCtx, beginOplogEntries, endOplogEntries, opDebug};
+ return ChangeCollectionsWriter{
+ opCtx, beginOplogEntries, endOplogEntries, opDebug, &_tenantTruncateMarkersMap};
}
BSONObj ChangeStreamChangeCollectionManager::PurgingJobStats::toBSON() const {
@@ -402,6 +462,22 @@ void ChangeStreamChangeCollectionManager::dropChangeCollection(OperationContext*
DropReply dropReply;
const auto changeCollNss = NamespaceString::makeChangeCollectionNSS(tenantId);
+ const bool useUnreplicatedDeletes = usesUnreplicatedTruncates();
+ // We get the UUID now in order to remove the collection from the map later. We can't get the
+ // UUID once the collection has been dropped.
+ auto collUUID = [&]() -> boost::optional<UUID> {
+ if (!useUnreplicatedDeletes) {
+ // Won't update the truncate markers map so no need to get the UUID.
+ return boost::none;
+ }
+ AutoGetDb lk(opCtx, changeCollNss.dbName(), MODE_IS);
+ auto collection =
+ CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, changeCollNss);
+ if (collection) {
+ return collection->uuid();
+ }
+ return boost::none;
+ }();
const auto status =
dropCollection(opCtx,
changeCollNss,
@@ -411,6 +487,12 @@ void ChangeStreamChangeCollectionManager::dropChangeCollection(OperationContext*
str::stream() << "Failed to drop change collection: "
<< changeCollNss.toStringForErrorMsg() << causedBy(status.reason()),
status.isOK() || status.code() == ErrorCodes::NamespaceNotFound);
+
+ if (useUnreplicatedDeletes && collUUID) {
+ // Remove the collection from the TruncateMarkers map. As we are dropping the collection,
+ // there's no need to keep it for the remover; the data will be deleted anyway.
+ _tenantTruncateMarkersMap.erase(*collUUID);
+ }
}
void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
@@ -424,7 +506,10 @@ void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
invariant(opCtx->lockState()->inAWriteUnitOfWork());
ChangeCollectionsWriterInternal changeCollectionsWriter{
- opCtx, nullptr /*opDebug*/, AutoGetChangeCollection::AccessMode::kWriteInOplogContext};
+ opCtx,
+ nullptr /*opDebug*/,
+ AutoGetChangeCollection::AccessMode::kWriteInOplogContext,
+ &_tenantTruncateMarkersMap};
for (size_t idx = 0; idx < oplogRecords.size(); idx++) {
auto& record = oplogRecords[idx];
@@ -478,7 +563,7 @@ ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata(
return {{firstDocAttributes->first, RecordIdBound(std::move(lastDocRecordId))}};
}
-size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocuments(
+size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocumentsWithCollScan(
OperationContext* opCtx,
const ScopedCollectionAcquisition& changeCollection,
RecordIdBound maxRecordIdBound,
@@ -511,6 +596,11 @@ size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocume
changeCollectionManager.getPurgingJobStats().bytesDeleted.fetchAndAddRelaxed(
batchedDeleteStats.bytesDeleted);
+ // Since we are using collection scans, we aren't using truncate markers. Clear the
+ // map since the markers will not get updated anyway; they will be recreated if the
+ // feature flag is turned on again.
+ changeCollectionManager._tenantTruncateMarkersMap.clear();
+
return batchedDeleteStats.docsDeleted;
} catch (const ExceptionFor<ErrorCodes::QueryPlanKilled>&) {
// It is expected that a collection drop can kill a query plan while deleting an old
@@ -518,4 +608,123 @@ size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocume
return 0;
}
}
+
+namespace {
+std::shared_ptr<ChangeCollectionTruncateMarkers> initialiseTruncateMarkers(
+ OperationContext* opCtx,
+ const Collection* changeCollectionPtr,
+ ConcurrentSharedValuesMap<UUID, ChangeCollectionTruncateMarkers, UUID::Hash>& truncateMap) {
+ auto rs = changeCollectionPtr->getRecordStore();
+ const auto& ns = changeCollectionPtr->ns();
+
+ WriteUnitOfWork wuow(opCtx);
+
+ auto minBytesPerMarker = gChangeCollectionTruncateMarkersMinBytes;
+ CollectionTruncateMarkers::InitialSetOfMarkers initialSetOfMarkers =
+ CollectionTruncateMarkers::createFromExistingRecordStore(
+ opCtx, rs, ns, minBytesPerMarker, [](const Record& record) {
+ const auto obj = record.data.toBson();
+ auto wallTime = obj[repl::OplogEntry::kWallClockTimeFieldName].Date();
+ return CollectionTruncateMarkers::RecordIdAndWallTime{record.id, wallTime};
+ });
+ // The leftover bytes count is the difference between the number of bytes covered by the
+ // markers and the latest collection size/count. This is susceptible to a race
+ // condition, but metrics are already assumed to be approximate. Ignoring this issue is
+ // a valid strategy here.
+ auto truncateMarkers = truncateMap.getOrEmplace(changeCollectionPtr->uuid(),
+ *ns.tenantId(),
+ std::move(initialSetOfMarkers.markers),
+ initialSetOfMarkers.leftoverRecordsCount,
+ initialSetOfMarkers.leftoverRecordsBytes,
+ minBytesPerMarker);
+ // Update the truncate markers with the last collection entry's RecordId and wall time.
+ // This is necessary for correct marker expiration: otherwise the highest seen points
+ // would be null, and nothing would expire, since we must maintain the last entry in the
+ // change collection and a null RecordId compares below any initialised RecordId. This
+ // would only get fixed once a new entry had been inserted, initialising the data points.
+ auto backCursor = rs->getCursor(opCtx, false);
+ if (auto obj = backCursor->next()) {
+ auto wallTime = obj->data.toBson()[repl::OplogEntry::kWallClockTimeFieldName].Date();
+ truncateMarkers->updateCurrentMarkerAfterInsertOnCommit(opCtx, 0, obj->id, wallTime, 0);
+ }
+
+ wuow.commit();
+
+ return truncateMarkers;
+}
+} // namespace
+
+size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocumentsWithTruncate(
+ OperationContext* opCtx,
+ const ScopedCollectionAcquisition& changeCollection,
+ Date_t expirationTime) {
+ auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ auto& truncateMap = changeCollectionManager._tenantTruncateMarkersMap;
+
+ const auto& changeCollectionPtr = changeCollection.getCollectionPtr();
+ auto truncateMarkers = truncateMap.find(changeCollectionPtr->uuid());
+
+ // No marker means it's a new collection, or we've just performed startup. Initialize
+ // the TruncateMarkers instance.
+ if (!truncateMarkers) {
+ writeConflictRetry(opCtx,
+ "initialise change collection truncate markers",
+ changeCollectionPtr->ns().ns(),
+ [&] {
+ truncateMarkers = initialiseTruncateMarkers(
+ opCtx, changeCollectionPtr.get(), truncateMap);
+ });
+ }
+
+ int64_t numRecordsDeleted = 0;
+
+ auto removeExpiredMarkers = [&] {
+ auto rs = changeCollectionPtr->getRecordStore();
+ while (auto marker = truncateMarkers->peekOldestMarkerIfNeeded(opCtx)) {
+ writeConflictRetry(
+ opCtx, "truncate change collection", changeCollectionPtr->ns().ns(), [&] {
+ // The session might be in use from marker initialisation so we must reset it
+ // here in order to allow an untimestamped write.
+ opCtx->recoveryUnit()->abandonSnapshot();
+ opCtx->recoveryUnit()->allowOneUntimestampedWrite();
+ WriteUnitOfWork wuow(opCtx);
+
+ auto bytesDeleted = marker->bytes;
+ auto docsDeleted = marker->records;
+
+ auto status = rs->rangeTruncate(
+ opCtx,
+ // Truncate from the beginning of the collection; this
+ // covers cases where some leftover documents are present.
+ RecordId(),
+ marker->lastRecord,
+ -bytesDeleted,
+ -docsDeleted);
+ invariantStatusOK(status);
+
+ wuow.commit();
+
+ truncateMarkers->popOldestMarker();
+ numRecordsDeleted += docsDeleted;
+
+ auto& purgingJobStats = changeCollectionManager.getPurgingJobStats();
+ purgingJobStats.docsDeleted.fetchAndAddRelaxed(docsDeleted);
+ purgingJobStats.bytesDeleted.fetchAndAddRelaxed(bytesDeleted);
+
+ auto millisWallTime = marker->wallTime.toMillisSinceEpoch();
+ if (purgingJobStats.maxStartWallTimeMillis.load() < millisWallTime) {
+ purgingJobStats.maxStartWallTimeMillis.store(millisWallTime);
+ }
+ });
+ }
+ };
+
+ removeExpiredMarkers();
+ // We now expire the partial marker, shifting the last entry into the next partial marker
+ // if it is present in the oldest one. This allows us to expire all entries up to the last.
+ truncateMarkers->expirePartialMarker(opCtx, changeCollectionPtr.get());
+ // Second pass to remove the potentially new partial marker.
+ removeExpiredMarkers();
+ return numRecordsDeleted;
+}
} // namespace mongo
diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h
index 9d7f3277d44..68838767d0d 100644
--- a/src/mongo/db/change_stream_change_collection_manager.h
+++ b/src/mongo/db/change_stream_change_collection_manager.h
@@ -30,10 +30,13 @@
#pragma once
#include "mongo/db/catalog/collection_catalog.h"
+#include "mongo/db/change_collection_truncate_markers.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/service_context.h"
#include "mongo/db/shard_role.h"
+#include "mongo/db/storage/collection_truncate_markers.h"
+#include "mongo/util/concurrent_shared_values_map.h"
namespace mongo {
@@ -124,15 +127,16 @@ public:
void dropChangeCollection(OperationContext* opCtx, const TenantId& tenantId);
/**
- * Inserts documents to change collections. The parameter 'oplogRecords' is a vector of oplog
- * records and the parameter 'oplogTimestamps' is a vector for respective timestamp for each
- * oplog record.
+ * Inserts documents to change collections. The parameter 'oplogRecords' is a vector of
+ * oplog records and the parameter 'oplogTimestamps' is a vector for respective timestamp
+ * for each oplog record.
*
- * The method fetches the tenant-id from the oplog entry, performs necessary modification to the
- * document and then write to the tenant's change collection at the specified oplog timestamp.
+ * The method fetches the tenant-id from the oplog entry, performs the necessary
+ * modifications to the document and then writes to the tenant's change collection at the
+ * specified oplog timestamp.
*
- * Failure in insertion to any change collection will result in a fatal exception and will bring
- * down the node.
+ * Failure in insertion to any change collection will result in a fatal exception and will
+ * bring down the node.
*/
void insertDocumentsToChangeCollection(OperationContext* opCtx,
const std::vector<Record>& oplogRecords,
@@ -142,9 +146,9 @@ public:
/**
* Change Collection Writer. After acquiring ChangeCollectionsWriter the user should trigger
- * acquisition of the locks by calling 'acquireLocks()' before the first write in the Write Unit
- * of Work. Then the write of documents to change collections can be triggered by calling
- * 'write()'.
+ * acquisition of the locks by calling 'acquireLocks()' before the first write in the Write
+ * Unit of Work. Then the write of documents to change collections can be triggered by
+ * calling 'write()'.
*/
class ChangeCollectionsWriter {
friend class ChangeStreamChangeCollectionManager;
@@ -153,10 +157,13 @@ public:
* Constructs a writer from a range ['beginOplogEntries', 'endOplogEntries') of oplog
* entries.
*/
- ChangeCollectionsWriter(OperationContext* opCtx,
- std::vector<InsertStatement>::const_iterator beginOplogEntries,
- std::vector<InsertStatement>::const_iterator endOplogEntries,
- OpDebug* opDebug);
+ ChangeCollectionsWriter(
+ OperationContext* opCtx,
+ std::vector<InsertStatement>::const_iterator beginOplogEntries,
+ std::vector<InsertStatement>::const_iterator endOplogEntries,
+ OpDebug* opDebug,
+ ConcurrentSharedValuesMap<UUID, ChangeCollectionTruncateMarkers, UUID::Hash>*
+ tenantMarkerMap);
public:
ChangeCollectionsWriter(ChangeCollectionsWriter&&);
@@ -179,9 +186,9 @@ public:
};
/**
- * Returns a change collection writer that can insert change collection entries into respective
- * change collections. The entries are constructed from a range ['beginOplogEntries',
- * 'endOplogEntries') of oplog entries.
+ * Returns a change collection writer that can insert change collection entries into
+ * respective change collections. The entries are constructed from a range
+ * ['beginOplogEntries', 'endOplogEntries') of oplog entries.
*/
ChangeCollectionsWriter createChangeCollectionsWriter(
OperationContext* opCtx,
@@ -195,25 +202,43 @@ public:
/**
* Scans the provided change collection and returns its metadata that will be used by the
- * purging job to perform deletion on it. The method returns 'boost::none' if the collection is
- * empty.
+ * purging job to perform deletion on it. The method returns 'boost::none' if the collection
+ * is empty.
*/
static boost::optional<ChangeCollectionPurgingJobMetadata>
getChangeCollectionPurgingJobMetadata(OperationContext* opCtx,
const ScopedCollectionAcquisition& changeCollection);
- /** Removes documents from a change collection whose wall time is less than the
+ /**
+ * Removes documents from a change collection whose wall time is less than the
* 'expirationTime'. Returns the number of documents deleted. The 'maxRecordIdBound' is the
* maximum record id bound that will not be included in the collection scan.
+ *
+ * The removal process is performed with a collection scan + batch delete.
*/
- static size_t removeExpiredChangeCollectionsDocuments(
+ static size_t removeExpiredChangeCollectionsDocumentsWithCollScan(
OperationContext* opCtx,
const ScopedCollectionAcquisition& changeCollection,
RecordIdBound maxRecordIdBound,
Date_t expirationTime);
+ /**
+ * Removes documents from a change collection whose wall time is less than the
+ * 'expirationTime'. Returns the number of documents deleted.
+ *
+ * The removal process is performed with a series of range truncate calls to the record
+ * store. Some documents might survive this process as deletion happens in chunks and we can
+ * only delete a chunk if we guarantee it is fully expired.
+ */
+ static size_t removeExpiredChangeCollectionsDocumentsWithTruncate(
+ OperationContext* opCtx,
+ const ScopedCollectionAcquisition& changeCollection,
+ Date_t expirationTime);
+
private:
// Change collections purging job stats.
PurgingJobStats _purgingJobStats;
+ ConcurrentSharedValuesMap<UUID, ChangeCollectionTruncateMarkers, UUID::Hash>
+ _tenantTruncateMarkersMap;
};
} // namespace mongo
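The per-tenant marker state lives in a ConcurrentSharedValuesMap keyed by collection UUID, which hands out shared_ptr values so the writer, the purging job, and dropChangeCollection can access it concurrently. A minimal sketch of the three access patterns used in the .cpp above (constructor arguments abbreviated; the marker-size value is illustrative):

    // Sketch: how the manager uses the concurrent map (see the .cpp above).
    using TruncateMap =
        ConcurrentSharedValuesMap<UUID, ChangeCollectionTruncateMarkers, UUID::Hash>;

    void accessPatterns(TruncateMap& map, const UUID& uuid, const TenantId& tenantId) {
        // Writer path: observe-only; creation is left to the remover thread.
        if (auto markers = map.find(uuid)) {
            // markers->updateCurrentMarkerAfterInsertOnCommit(...);
        }
        // Remover path: create-or-fetch atomically on first use.
        auto markers = map.getOrEmplace(uuid,
                                        tenantId,
                                        std::deque<CollectionTruncateMarkers::Marker>{},
                                        /*leftoverRecordsCount=*/0,
                                        /*leftoverRecordsBytes=*/0,
                                        /*minBytesPerMarker=*/1024 * 1024);
        // Drop path: discard the state once the collection is gone.
        map.erase(uuid);
    }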
diff --git a/src/mongo/db/change_streams_cluster_parameter.idl b/src/mongo/db/change_streams_cluster_parameter.idl
index 06b1f3a73f4..8dfc7d9c576 100644
--- a/src/mongo/db/change_streams_cluster_parameter.idl
+++ b/src/mongo/db/change_streams_cluster_parameter.idl
@@ -68,3 +68,13 @@ server_parameters:
validator:
gte: 1
default: 10
+ changeCollectionTruncateMarkersMinBytes:
+ description: "Server parameter that specifies the minimum number of bytes contained in each
+ truncate marker for change collections. This is only used if
+ featureFlagUseUnreplicatedTruncatesForDeletions is enabled"
+ set_at: startup
+ cpp_varname: gChangeCollectionTruncateMarkersMinBytes
+ cpp_vartype: int32_t
+ default: 33_554_432 # 32 MiB
+ validator:
+ gt: 0
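The parameter is startup-only, but unit tests can override it through the server-parameter test controller, as the updated remover tests above do; a minimal sketch:

    // Sketch: force small markers in a test so expiry triggers quickly.
    RAIIServerParameterControllerForTest minBytesPerMarker{
        "changeCollectionTruncateMarkersMinBytes", 1024};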
diff --git a/src/mongo/db/storage/collection_truncate_markers.cpp b/src/mongo/db/storage/collection_truncate_markers.cpp
index ffd452f5c36..dcac54dff54 100644
--- a/src/mongo/db/storage/collection_truncate_markers.cpp
+++ b/src/mongo/db/storage/collection_truncate_markers.cpp
@@ -98,7 +98,6 @@ CollectionTruncateMarkers::peekOldestMarkerIfNeeded(OperationContext* opCtx) con
return _markers.front();
}
-
void CollectionTruncateMarkers::popOldestMarker() {
stdx::lock_guard<Latch> lk(_markersMutex);
_markers.pop_front();
@@ -159,7 +158,6 @@ void CollectionTruncateMarkers::createNewMarkerIfNeeded(OperationContext* opCtx,
pokeReclaimThread(opCtx);
}
-
void CollectionTruncateMarkers::updateCurrentMarkerAfterInsertOnCommit(
OperationContext* opCtx,
int64_t bytesInserted,
@@ -511,7 +509,7 @@ void CollectionTruncateMarkersWithPartialExpiration::updateCurrentMarkerAfterIns
// will happen after the marker has been created. This guarantees that the metrics
// will eventually be correct as long as the expiration criteria checks for the
// metrics and the highest marker expiration.
- collectionMarkers->_replaceNewHighestMarkingIfNecessary(recordId, wallTime);
+ collectionMarkers->_updateHighestSeenRecordIdAndWallTime(recordId, wallTime);
collectionMarkers->_currentRecords.addAndFetch(countInserted);
int64_t newCurrentBytes = collectionMarkers->_currentBytes.addAndFetch(bytesInserted);
if (newCurrentBytes >= collectionMarkers->_minBytesPerMarker) {
@@ -568,7 +566,7 @@ void CollectionTruncateMarkersWithPartialExpiration::createPartialMarkerIfNecess
}
}
-void CollectionTruncateMarkersWithPartialExpiration::_replaceNewHighestMarkingIfNecessary(
+void CollectionTruncateMarkersWithPartialExpiration::_updateHighestSeenRecordIdAndWallTime(
const RecordId& rId, Date_t wallTime) {
stdx::unique_lock lk(_lastHighestRecordMutex);
_lastHighestRecordId = std::max(_lastHighestRecordId, rId);
diff --git a/src/mongo/db/storage/collection_truncate_markers.h b/src/mongo/db/storage/collection_truncate_markers.h
index 05ce2590918..7122c97cade 100644
--- a/src/mongo/db/storage/collection_truncate_markers.h
+++ b/src/mongo/db/storage/collection_truncate_markers.h
@@ -239,6 +239,22 @@ private:
protected:
CollectionTruncateMarkers(CollectionTruncateMarkers&& other);
+ template <typename F>
+ auto modifyMarkersWith(F&& f) {
+ static_assert(std::is_invocable_v<F, std::deque<Marker>&>,
+ "Function must be of type T(std::deque<Marker>&)");
+ stdx::lock_guard lk(_markersMutex);
+ return f(_markers);
+ }
+
+ template <typename F>
+ auto checkMarkersWith(F&& f) const {
+ static_assert(std::is_invocable_v<F, const std::deque<Marker>&>,
+ "Function must be of type T(const std::deque<Marker>&)");
+ stdx::lock_guard lk(_markersMutex);
+ return f(_markers);
+ }
+
const std::deque<Marker>& getMarkers() const {
return _markers;
}
@@ -285,10 +301,6 @@ private:
RecordId _lastHighestRecordId;
Date_t _lastHighestWallTime;
- // Replaces the highest marker if _isMarkerLargerThanHighest returns true.
- void _replaceNewHighestMarkingIfNecessary(const RecordId& newMarkerRecordId,
- Date_t newMarkerWallTime);
-
// Used to decide if the current partially built marker has expired.
virtual bool _hasPartialMarkerExpired(OperationContext* opCtx) const {
return false;
@@ -301,6 +313,9 @@ protected:
std::pair<const RecordId&, const Date_t&> getPartialMarker() const {
return {_lastHighestRecordId, _lastHighestWallTime};
}
+
+ // Updates the highest seen RecordId and wall time if they are above the current ones.
+ void _updateHighestSeenRecordIdAndWallTime(const RecordId& rId, Date_t wallTime);
};
} // namespace mongo
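The new modifyMarkersWith/checkMarkersWith helpers give subclasses access to the marker deque under the internal mutex without exposing the lock itself. A minimal sketch of a locked read from a subclass, mirroring the pattern expirePartialMarker uses above:

    // Sketch: locked, read-only peek at the oldest marker from a subclass.
    boost::optional<CollectionTruncateMarkers::Marker> peekOldest() const {
        return checkMarkersWith(
            [](const std::deque<CollectionTruncateMarkers::Marker>& markers)
                -> boost::optional<CollectionTruncateMarkers::Marker> {
                if (markers.empty()) {
                    return {};
                }
                return markers.front();
            });
    }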
diff --git a/src/mongo/db/storage/record_store.cpp b/src/mongo/db/storage/record_store.cpp
index 1bae257ba7b..51e301cf00e 100644
--- a/src/mongo/db/storage/record_store.cpp
+++ b/src/mongo/db/storage/record_store.cpp
@@ -88,6 +88,8 @@ Status RecordStore::rangeTruncate(OperationContext* opCtx,
int64_t hintDataSizeDiff,
int64_t hintNumRecordsDiff) {
validateWriteAllowed(opCtx);
+ invariant(minRecordId != RecordId() || maxRecordId != RecordId(),
+ "Ranged truncate must have one bound defined");
invariant(minRecordId <= maxRecordId, "Start position cannot be after end position");
return doRangeTruncate(opCtx, minRecordId, maxRecordId, hintDataSizeDiff, hintNumRecordsDiff);
}
diff --git a/src/mongo/db/storage/record_store_test_truncate.cpp b/src/mongo/db/storage/record_store_test_truncate.cpp
index a26f2a5155e..00d75c0e869 100644
--- a/src/mongo/db/storage/record_store_test_truncate.cpp
+++ b/src/mongo/db/storage/record_store_test_truncate.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/storage/record_store.h"
+#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -114,5 +115,15 @@ TEST(RecordStoreTestHarness, TruncateNonEmpty) {
}
}
+DEATH_TEST(RecordStoreTestHarness,
+ RangeTruncateMustHaveBoundsTest,
+ "Ranged truncate must have one bound defined") {
+ const auto harnessHelper(newRecordStoreHarnessHelper());
+ unique_ptr<RecordStore> rs(harnessHelper->newRecordStore());
+
+ auto opCtx = harnessHelper->newOperationContext();
+
+ auto result = rs->rangeTruncate(opCtx.get(), RecordId(), RecordId(), 0, 0);
+}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index 6aa988f13d8..4286d32c2c3 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -1406,12 +1406,15 @@ Status WiredTigerRecordStore::doRangeTruncate(OperationContext* opCtx,
return Status::OK();
}
invariantWTOK(ret, start->session);
+ // Make sure to reset the cursor since we will either reposition it to the caller's bound
+ // or discard it below.
+ invariantWTOK(start->reset(start), start->session);
boost::optional<CursorKey> startKey;
if (minRecordId != RecordId()) {
- invariantWTOK(start->reset(start), start->session);
startKey = makeCursorKey(minRecordId, _keyFormat);
setKey(start, &(*startKey));
+ } else {
+ start = nullptr;
}
WiredTigerCursor endWrap(_uri, _tableId, true, opCtx);
boost::optional<CursorKey> endKey;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
index bd2e356eb0f..3d921b3e45e 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
@@ -1098,38 +1098,6 @@ DEATH_TEST(WiredTigerRecordStoreTest,
testTruncateRange(100, 4, 3);
}
-TEST(WiredTigerRecordStoreTest, RangeTruncateAllTest) {
- unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper());
- unique_ptr<RecordStore> rs(harnessHelper->newRecordStore());
-
- auto wtrs = checked_cast<WiredTigerRecordStore*>(rs.get());
-
- auto opCtx = harnessHelper->newOperationContext();
-
- static constexpr auto kNumRecordsToInsert = 100;
- for (int i = 0; i < kNumRecordsToInsert; i++) {
- auto recordId = insertBSONWithSize(opCtx.get(), wtrs, Timestamp(1, 0), 100);
- ASSERT_OK(recordId);
- }
-
- auto sizePerRecord = wtrs->dataSize(opCtx.get()) / wtrs->numRecords(opCtx.get());
-
- {
- WriteUnitOfWork wuow(opCtx.get());
- ASSERT_OK(wtrs->rangeTruncate(opCtx.get(),
- RecordId(),
- RecordId(),
- -(sizePerRecord * kNumRecordsToInsert),
- -kNumRecordsToInsert));
- ASSERT_EQ(wtrs->dataSize(opCtx.get()), 0);
- ASSERT_EQ(wtrs->numRecords(opCtx.get()), 0);
- wuow.commit();
- }
-
- auto cursor = wtrs->getCursor(opCtx.get(), true);
- ASSERT_FALSE(cursor->next());
-}
-
TEST(WiredTigerRecordStoreTest, GetLatestOplogTest) {
unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper());
unique_ptr<RecordStore> rs(harnessHelper->newOplogRecordStore());