diff options
author | Jordi Olivares Provencio <jordi.olivares-provencio@mongodb.com> | 2023-03-03 15:25:40 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-03-03 19:19:27 +0000 |
commit | 202703cd8d23e3894b707fdf94ad56f9fb9044fe (patch) | |
tree | 33d2524c7d476de1fe896cd250782bbebe345ac9 /src/mongo/db/storage | |
parent | 15a363b98be42d7dcc5db063185d143e0fa72b3c (diff) | |
download | mongo-202703cd8d23e3894b707fdf94ad56f9fb9044fe.tar.gz |
SERVER-74472 Add ranged truncate method to RecordStore
Diffstat (limited to 'src/mongo/db/storage')
9 files changed, 235 insertions, 8 deletions
diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp index 9ebd245b15a..ac1d9394f1a 100644 --- a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp +++ b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp @@ -144,6 +144,14 @@ public: return Status::OK(); } + Status doRangeTruncate(OperationContext* opCtx, + const RecordId& minRecordId, + const RecordId& maxRecordId, + int64_t hintDataSizeDiff, + int64_t hintNumRecordsDiff) override { + return Status::OK(); + } + void doCappedTruncateAfter(OperationContext* opCtx, const RecordId& end, bool inclusive, diff --git a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp index f70bfb10ac9..661f09bec59 100644 --- a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp +++ b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp @@ -74,12 +74,29 @@ private: class EphemeralForTestRecordStore::TruncateChange : public RecoveryUnit::Change { public: - TruncateChange(OperationContext* opCtx, Data* data) : _opCtx(opCtx), _data(data), _dataSize(0) { + TruncateChange(OperationContext* opCtx, Data* data, const RecordId& begin, const RecordId& end) + : _opCtx(opCtx), _data(data), _dataSize(0) { using std::swap; stdx::lock_guard<stdx::recursive_mutex> lock(_data->recordsMutex); - swap(_dataSize, _data->dataSize); - swap(_records, _data->records); + + auto it = _data->records.begin(); + while (it != _data->records.end() && it->first < begin) { + it++; + } + + auto beginId = it->first; + + while (it != _data->records.end() && it->first <= end) { + _deletedRecords.try_emplace(it->first, it->second); + // RecordId size + value size. + _dataSize += it->first.memUsage() + it->second.size; + it++; + } + auto endId = it->first; + + _data->records.erase(_data->records.find(beginId), _data->records.find(endId)); + _data->dataSize -= _dataSize; } virtual void commit(OperationContext* opCtx, boost::optional<Timestamp>) {} @@ -87,15 +104,15 @@ public: using std::swap; stdx::lock_guard<stdx::recursive_mutex> lock(_data->recordsMutex); - swap(_dataSize, _data->dataSize); - swap(_records, _data->records); + _data->records.merge(std::move(_deletedRecords)); + _data->dataSize += _dataSize; } private: OperationContext* _opCtx; + Records _deletedRecords; Data* const _data; int64_t _dataSize; - Records _records; }; class EphemeralForTestRecordStore::Cursor final : public SeekableRecordCursor { @@ -476,7 +493,20 @@ std::unique_ptr<SeekableRecordCursor> EphemeralForTestRecordStore::getCursor( Status EphemeralForTestRecordStore::doTruncate(OperationContext* opCtx) { // Unlike other changes, TruncateChange mutates _data on construction to perform the // truncate - opCtx->recoveryUnit()->registerChange(std::make_unique<TruncateChange>(opCtx, _data)); + opCtx->recoveryUnit()->registerChange( + std::make_unique<TruncateChange>(opCtx, _data, RecordId::minLong(), RecordId::maxLong())); + return Status::OK(); +} + +Status EphemeralForTestRecordStore::doRangeTruncate(OperationContext* opCtx, + const RecordId& minRecordId, + const RecordId& maxRecordId, + int64_t hintDataSizeDiff, + int64_t hintNumRecordsDiff) { + // Unlike other changes, TruncateChange mutates _data on construction to perform the + // truncate. + opCtx->recoveryUnit()->registerChange( + std::make_unique<TruncateChange>(opCtx, _data, minRecordId, maxRecordId)); return Status::OK(); } diff --git a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h index 14b832dc069..df4bb197ca2 100644 --- a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h +++ b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h @@ -85,6 +85,11 @@ public: bool forward) const final; Status doTruncate(OperationContext* opCtx) override; + Status doRangeTruncate(OperationContext* opCtx, + const RecordId& minRecordId, + const RecordId& maxRecordId, + int64_t hintDataSizeDiff, + int64_t hintNumRecordsDiff) override; void doCappedTruncateAfter(OperationContext* opCtx, const RecordId& end, diff --git a/src/mongo/db/storage/external_record_store.h b/src/mongo/db/storage/external_record_store.h index 58a29d2a66c..c759d2321d5 100644 --- a/src/mongo/db/storage/external_record_store.h +++ b/src/mongo/db/storage/external_record_store.h @@ -127,6 +127,15 @@ protected: return {ErrorCodes::Error::UnknownError, "Unknown error"}; } + Status doRangeTruncate(OperationContext* opCtx, + const RecordId& minRecordId, + const RecordId& maxRecordId, + int64_t hintDataSizeIncrement, + int64_t hintNumRecordsIncrement) final { + unimplementedTasserted(); + return {ErrorCodes::Error::UnknownError, "Unknown error"}; + } + void doCappedTruncateAfter(OperationContext*, const RecordId&, bool, diff --git a/src/mongo/db/storage/record_store.cpp b/src/mongo/db/storage/record_store.cpp index d79d53b8187..2fc2c7f8b16 100644 --- a/src/mongo/db/storage/record_store.cpp +++ b/src/mongo/db/storage/record_store.cpp @@ -81,6 +81,16 @@ Status RecordStore::truncate(OperationContext* opCtx) { return doTruncate(opCtx); } +Status RecordStore::rangeTruncate(OperationContext* opCtx, + const RecordId& minRecordId, + const RecordId& maxRecordId, + int64_t hintDataSizeDiff, + int64_t hintNumRecordsDiff) { + validateWriteAllowed(opCtx); + invariant(minRecordId <= maxRecordId, "Start position cannot be after end position"); + return doRangeTruncate(opCtx, minRecordId, maxRecordId, hintDataSizeDiff, hintNumRecordsDiff); +} + void RecordStore::cappedTruncateAfter(OperationContext* opCtx, const RecordId& end, bool inclusive, diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h index 03e99d4b222..8ace312c059 100644 --- a/src/mongo/db/storage/record_store.h +++ b/src/mongo/db/storage/record_store.h @@ -532,11 +532,25 @@ public: // higher level /** - * removes all Records + * Removes all Records. */ Status truncate(OperationContext* opCtx); /** + * Removes all Records in the range [minRecordId, maxRecordId] inclusive of both. The hint* + * arguments serve as a hint to the record store of how much data will be truncated. This is + * necessary for some implementations to avoid reading the data between the two RecordIds in + * order to update numRecords and dataSize correctly. Implementations are free to ignore the + * hints if they have a way of obtaining the correct values without the help of external + * callers. + */ + Status rangeTruncate(OperationContext* opCtx, + const RecordId& minRecordId = RecordId(), + const RecordId& maxRecordId = RecordId(), + int64_t hintDataSizeIncrement = 0, + int64_t hintNumRecordsIncrement = 0); + + /** * Truncate documents newer than the document at 'end' from the capped * collection. The collection cannot be completely emptied using this * function. An assertion will be thrown if that is attempted. @@ -722,6 +736,11 @@ protected: const char* damageSource, const mutablebson::DamageVector& damages) = 0; virtual Status doTruncate(OperationContext* opCtx) = 0; + virtual Status doRangeTruncate(OperationContext* opCtx, + const RecordId& minRecordId, + const RecordId& maxRecordId, + int64_t hintDataSizeDiff, + int64_t hintNumRecordsDiff) = 0; virtual void doCappedTruncateAfter(OperationContext* opCtx, const RecordId& end, bool inclusive, diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index b663ab48fe5..f5d593754ee 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -1343,6 +1343,45 @@ Status WiredTigerRecordStore::doTruncate(OperationContext* opCtx) { return Status::OK(); } +Status WiredTigerRecordStore::doRangeTruncate(OperationContext* opCtx, + const RecordId& minRecordId, + const RecordId& maxRecordId, + int64_t hintDataSizeDiff, + int64_t hintNumRecordsDiff) { + WiredTigerCursor startWrap(_uri, _tableId, true, opCtx); + WT_CURSOR* start = startWrap.get(); + int ret = wiredTigerPrepareConflictRetry(opCtx, [&] { return start->next(start); }); + // Empty collections don't have anything to truncate. + if (ret == WT_NOTFOUND) { + return Status::OK(); + } + invariantWTOK(ret, start->session); + + boost::optional<CursorKey> startKey; + if (minRecordId != RecordId()) { + invariantWTOK(start->reset(start), start->session); + startKey = makeCursorKey(minRecordId, _keyFormat); + setKey(start, &(*startKey)); + } + WiredTigerCursor endWrap(_uri, _tableId, true, opCtx); + boost::optional<CursorKey> endKey; + WT_CURSOR* finish = [&]() -> WT_CURSOR* { + if (maxRecordId == RecordId()) { + return nullptr; + } + endKey = makeCursorKey(maxRecordId, _keyFormat); + setKey(endWrap.get(), &(*endKey)); + return endWrap.get(); + }(); + + WT_SESSION* session = WiredTigerRecoveryUnit::get(opCtx)->getSession()->getSession(); + invariantWTOK(WT_OP_CHECK(session->truncate(session, nullptr, start, finish, nullptr)), + session); + _changeNumRecordsAndDataSize(opCtx, hintNumRecordsDiff, hintDataSizeDiff); + + return Status::OK(); +} + Status WiredTigerRecordStore::doCompact(OperationContext* opCtx) { dassert(opCtx->lockState()->isWriteLocked()); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h index d5b92f691f4..4cf89e58f09 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h @@ -176,6 +176,11 @@ public: std::unique_ptr<RecordCursor> getRandomCursor(OperationContext* opCtx) const final; Status doTruncate(OperationContext* opCtx) final; + Status doRangeTruncate(OperationContext* opCtx, + const RecordId& minRecordId, + const RecordId& maxRecordId, + int64_t hintDataSizeDiff, + int64_t hintNumRecordsDiff) final; virtual bool compactSupported() const { return !_isEphemeral; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp index b54f99fa022..05444f0d3f0 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp @@ -45,6 +45,7 @@ #include "mongo/db/storage/wiredtiger/wiredtiger_size_storer.h" #include "mongo/db/storage/wiredtiger/wiredtiger_util.h" #include "mongo/unittest/barrier.h" +#include "mongo/unittest/death_test.h" #include "mongo/unittest/temp_dir.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" @@ -979,6 +980,107 @@ TEST(WiredTigerRecordStoreTest, OplogTruncateMarkers_Duplicates) { } } +void testTruncateRange(int64_t numRecordsToInsert, + int64_t deletionPosBegin, + int64_t deletionPosEnd) { + auto harnessHelper = newRecordStoreHarnessHelper(); + unique_ptr<RecordStore> rs(harnessHelper->newRecordStore()); + + auto wtrs = checked_cast<WiredTigerRecordStore*>(rs.get()); + + std::vector<RecordId> recordIds; + + auto opCtx = harnessHelper->newOperationContext(); + + for (int i = 0; i < numRecordsToInsert; i++) { + auto recordId = insertBSONWithSize(opCtx.get(), wtrs, Timestamp(1, i), 100); + ASSERT_OK(recordId); + recordIds.emplace_back(std::move(recordId.getValue())); + } + + auto sizePerRecord = wtrs->dataSize(opCtx.get()) / wtrs->numRecords(opCtx.get()); + + std::sort(recordIds.begin(), recordIds.end()); + + const auto& beginId = recordIds[deletionPosBegin]; + const auto& endId = recordIds[deletionPosEnd]; + { + WriteUnitOfWork wuow(opCtx.get()); + + auto numRecordsDeleted = deletionPosEnd - deletionPosBegin + 1; + + ASSERT_OK(wtrs->rangeTruncate( + opCtx.get(), beginId, endId, -(sizePerRecord * numRecordsDeleted), -numRecordsDeleted)); + + ASSERT_EQ(wtrs->dataSize(opCtx.get()), + sizePerRecord * (numRecordsToInsert - numRecordsDeleted)); + ASSERT_EQ(wtrs->numRecords(opCtx.get()), (numRecordsToInsert - numRecordsDeleted)); + + wuow.commit(); + } + std::set<RecordId> expectedRemainingRecordIds; + std::copy(recordIds.begin(), + recordIds.begin() + deletionPosBegin, + std::inserter(expectedRemainingRecordIds, expectedRemainingRecordIds.end())); + std::copy(recordIds.begin() + deletionPosEnd + 1, + recordIds.end(), + std::inserter(expectedRemainingRecordIds, expectedRemainingRecordIds.end())); + + std::set<RecordId> actualRemainingRecordIds; + + auto cursor = wtrs->getCursor(opCtx.get(), true); + while (auto record = cursor->next()) { + actualRemainingRecordIds.emplace(record->id); + } + ASSERT_EQ(expectedRemainingRecordIds, actualRemainingRecordIds); +} + +TEST(WiredTigerRecordStoreTest, RangeTruncateTest) { + testTruncateRange(100, 3, 50); +} + +TEST(WiredTigerRecordStoreTest, RangeTruncateSameValueTest) { + testTruncateRange(100, 3, 3); +} + +DEATH_TEST(WiredTigerRecordStoreTest, + RangeTruncateIncorrectOrderTest, + "Start position cannot be after end position") { + testTruncateRange(100, 4, 3); +} + +TEST(WiredTigerRecordStoreTest, RangeTruncateAllTest) { + unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper()); + unique_ptr<RecordStore> rs(harnessHelper->newRecordStore()); + + auto wtrs = checked_cast<WiredTigerRecordStore*>(rs.get()); + + auto opCtx = harnessHelper->newOperationContext(); + + static constexpr auto kNumRecordsToInsert = 100; + for (int i = 0; i < kNumRecordsToInsert; i++) { + auto recordId = insertBSONWithSize(opCtx.get(), wtrs, Timestamp(1, 0), 100); + ASSERT_OK(recordId); + } + + auto sizePerRecord = wtrs->dataSize(opCtx.get()) / wtrs->numRecords(opCtx.get()); + + { + WriteUnitOfWork wuow(opCtx.get()); + ASSERT_OK(wtrs->rangeTruncate(opCtx.get(), + RecordId(), + RecordId(), + -(sizePerRecord * kNumRecordsToInsert), + -kNumRecordsToInsert)); + ASSERT_EQ(wtrs->dataSize(opCtx.get()), 0); + ASSERT_EQ(wtrs->numRecords(opCtx.get()), 0); + wuow.commit(); + } + + auto cursor = wtrs->getCursor(opCtx.get(), true); + ASSERT_FALSE(cursor->next()); +} + TEST(WiredTigerRecordStoreTest, GetLatestOplogTest) { unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper()); unique_ptr<RecordStore> rs(harnessHelper->newOplogRecordStore()); |