summaryrefslogtreecommitdiff
path: root/src/mongo/db/storage
diff options
context:
space:
mode:
authorJordi Olivares Provencio <jordi.olivares-provencio@mongodb.com>2023-03-03 15:25:40 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-03-03 19:19:27 +0000
commit202703cd8d23e3894b707fdf94ad56f9fb9044fe (patch)
tree33d2524c7d476de1fe896cd250782bbebe345ac9 /src/mongo/db/storage
parent15a363b98be42d7dcc5db063185d143e0fa72b3c (diff)
downloadmongo-202703cd8d23e3894b707fdf94ad56f9fb9044fe.tar.gz
SERVER-74472 Add ranged truncate method to RecordStore
Diffstat (limited to 'src/mongo/db/storage')
-rw-r--r--src/mongo/db/storage/devnull/devnull_kv_engine.cpp8
-rw-r--r--src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp44
-rw-r--r--src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h5
-rw-r--r--src/mongo/db/storage/external_record_store.h9
-rw-r--r--src/mongo/db/storage/record_store.cpp10
-rw-r--r--src/mongo/db/storage/record_store.h21
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp39
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h5
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp102
9 files changed, 235 insertions, 8 deletions
diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
index 9ebd245b15a..ac1d9394f1a 100644
--- a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
+++ b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
@@ -144,6 +144,14 @@ public:
return Status::OK();
}
+ Status doRangeTruncate(OperationContext* opCtx,  // No-op override: the range and both hints are ignored.
+ const RecordId& minRecordId,
+ const RecordId& maxRecordId,
+ int64_t hintDataSizeDiff,
+ int64_t hintNumRecordsDiff) override {
+ return Status::OK();  // Always reports success; nothing is removed here.
+ }
+
void doCappedTruncateAfter(OperationContext* opCtx,
const RecordId& end,
bool inclusive,
diff --git a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp
index f70bfb10ac9..661f09bec59 100644
--- a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp
+++ b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp
@@ -74,12 +74,29 @@ private:
class EphemeralForTestRecordStore::TruncateChange : public RecoveryUnit::Change {
public:
- TruncateChange(OperationContext* opCtx, Data* data) : _opCtx(opCtx), _data(data), _dataSize(0) {
+    /**
+     * Removes every record whose id lies in [begin, end] from '_data' at construction time and
+     * keeps the removed records (and their accumulated size) so rollback can restore them.
+     *
+     * Safe when the store is empty, when no record id is >= 'begin', and when the range reaches
+     * the last record: iterators are never dereferenced at end().
+     */
+    TruncateChange(OperationContext* opCtx, Data* data, const RecordId& begin, const RecordId& end)
+        : _opCtx(opCtx), _data(data), _dataSize(0) {
using std::swap;
stdx::lock_guard<stdx::recursive_mutex> lock(_data->recordsMutex);
- swap(_dataSize, _data->dataSize);
- swap(_records, _data->records);
+
+        // First record with id >= begin; this is end() when nothing qualifies.
+        const auto rangeBegin = _data->records.lower_bound(begin);
+
+        // Walk forward while ids are still <= end, saving a copy of each record for rollback.
+        auto rangeEnd = rangeBegin;
+        for (; rangeEnd != _data->records.end() && rangeEnd->first <= end; ++rangeEnd) {
+            _deletedRecords.try_emplace(rangeEnd->first, rangeEnd->second);
+            // RecordId size + value size.
+            // NOTE(review): confirm the insert/delete paths account dataSize the same way
+            // (i.e. that they also include RecordId memory), otherwise dataSize will drift.
+            _dataSize += rangeEnd->first.memUsage() + rangeEnd->second.size;
+        }
+
+        // Erase by iterator range rather than re-looking up keys read from a possibly
+        // past-the-end iterator. An empty range (rangeBegin == rangeEnd) erases nothing.
+        _data->records.erase(rangeBegin, rangeEnd);
+        _data->dataSize -= _dataSize;
}
virtual void commit(OperationContext* opCtx, boost::optional<Timestamp>) {}
@@ -87,15 +104,15 @@ public:
using std::swap;
stdx::lock_guard<stdx::recursive_mutex> lock(_data->recordsMutex);
- swap(_dataSize, _data->dataSize);
- swap(_records, _data->records);
+ _data->records.merge(std::move(_deletedRecords));
+ _data->dataSize += _dataSize;
}
private:
OperationContext* _opCtx;
+ Records _deletedRecords;
Data* const _data;
int64_t _dataSize;
- Records _records;
};
class EphemeralForTestRecordStore::Cursor final : public SeekableRecordCursor {
@@ -476,7 +493,20 @@ std::unique_ptr<SeekableRecordCursor> EphemeralForTestRecordStore::getCursor(
Status EphemeralForTestRecordStore::doTruncate(OperationContext* opCtx) {
// Unlike other changes, TruncateChange mutates _data on construction to perform the
// truncate
- opCtx->recoveryUnit()->registerChange(std::make_unique<TruncateChange>(opCtx, _data));
+    // A full truncate is expressed as a range truncate over [minLong, maxLong].
+    auto wholeRangeChange =
+        std::make_unique<TruncateChange>(opCtx, _data, RecordId::minLong(), RecordId::maxLong());
+    opCtx->recoveryUnit()->registerChange(std::move(wholeRangeChange));
+    return Status::OK();
+}
+
+// Truncates the records in [minRecordId, maxRecordId]. Both hint arguments are unused here:
+// TruncateChange computes the removed size itself while it walks the range.
+Status EphemeralForTestRecordStore::doRangeTruncate(OperationContext* opCtx,
+                                                    const RecordId& minRecordId,
+                                                    const RecordId& maxRecordId,
+                                                    int64_t hintDataSizeDiff,
+                                                    int64_t hintNumRecordsDiff) {
+    // The truncate itself happens inside TruncateChange's constructor; registering the change
+    // afterwards is what allows it to be undone on rollback.
+    auto rangeChange = std::make_unique<TruncateChange>(opCtx, _data, minRecordId, maxRecordId);
+    opCtx->recoveryUnit()->registerChange(std::move(rangeChange));
return Status::OK();
}
diff --git a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h
index 14b832dc069..df4bb197ca2 100644
--- a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h
+++ b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h
@@ -85,6 +85,11 @@ public:
bool forward) const final;
Status doTruncate(OperationContext* opCtx) override;
+ Status doRangeTruncate(OperationContext* opCtx,
+ const RecordId& minRecordId,
+ const RecordId& maxRecordId,
+ int64_t hintDataSizeDiff,
+ int64_t hintNumRecordsDiff) override;
void doCappedTruncateAfter(OperationContext* opCtx,
const RecordId& end,
diff --git a/src/mongo/db/storage/external_record_store.h b/src/mongo/db/storage/external_record_store.h
index 58a29d2a66c..c759d2321d5 100644
--- a/src/mongo/db/storage/external_record_store.h
+++ b/src/mongo/db/storage/external_record_store.h
@@ -127,6 +127,15 @@ protected:
return {ErrorCodes::Error::UnknownError, "Unknown error"};
}
+    // Not implemented for this record store: the call is flagged via unimplementedTasserted().
+    // The UnknownError status below is only produced if that call returns.
+    Status doRangeTruncate(OperationContext*,
+                           const RecordId&,
+                           const RecordId&,
+                           int64_t,
+                           int64_t) final {
+        unimplementedTasserted();
+        return {ErrorCodes::Error::UnknownError, "Unknown error"};
+    }
+
void doCappedTruncateAfter(OperationContext*,
const RecordId&,
bool,
diff --git a/src/mongo/db/storage/record_store.cpp b/src/mongo/db/storage/record_store.cpp
index d79d53b8187..2fc2c7f8b16 100644
--- a/src/mongo/db/storage/record_store.cpp
+++ b/src/mongo/db/storage/record_store.cpp
@@ -81,6 +81,16 @@ Status RecordStore::truncate(OperationContext* opCtx) {
return doTruncate(opCtx);
}
+// Public entry point for ranged truncation: checks that writes are allowed, sanity-checks the
+// bounds, then forwards everything unchanged to the storage-specific doRangeTruncate().
+Status RecordStore::rangeTruncate(OperationContext* opCtx,
+                                  const RecordId& minRecordId,
+                                  const RecordId& maxRecordId,
+                                  int64_t hintDataSizeDiff,
+                                  int64_t hintNumRecordsDiff) {
+    validateWriteAllowed(opCtx);
+
+    // A default-constructed RecordId is the declaration's default for either bound and is used
+    // as a "no bound on this side" sentinel, so the ordering check is only meaningful when both
+    // bounds are real ids. Comparing a real id against the sentinel must not trip the invariant.
+    const bool bothBoundsSet = minRecordId != RecordId() && maxRecordId != RecordId();
+    invariant(!bothBoundsSet || minRecordId <= maxRecordId,
+              "Start position cannot be after end position");
+
+    return doRangeTruncate(opCtx, minRecordId, maxRecordId, hintDataSizeDiff, hintNumRecordsDiff);
+}
+
void RecordStore::cappedTruncateAfter(OperationContext* opCtx,
const RecordId& end,
bool inclusive,
diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h
index 03e99d4b222..8ace312c059 100644
--- a/src/mongo/db/storage/record_store.h
+++ b/src/mongo/db/storage/record_store.h
@@ -532,11 +532,25 @@ public:
// higher level
/**
- * removes all Records
+ * Removes all Records.
*/
Status truncate(OperationContext* opCtx);
/**
+     * Removes all Records in the range [minRecordId, maxRecordId] inclusive of both.
+     * Implementations may treat a default-constructed (null) RecordId as "no bound" on that
+     * side, as the WiredTiger record store does.
+     *
+     * The hint* arguments tell the record store how much data the truncation removes, expressed
+     * as signed deltas to apply to dataSize and numRecords (i.e. negative values when Records
+     * are removed). This is necessary for some implementations to avoid reading the data between
+     * the two RecordIds in order to update numRecords and dataSize correctly. Implementations
+     * are free to ignore the hints if they have a way of obtaining the correct values without
+     * the help of external callers.
+     */
+    Status rangeTruncate(OperationContext* opCtx,
+                         const RecordId& minRecordId = RecordId(),
+                         const RecordId& maxRecordId = RecordId(),
+                         int64_t hintDataSizeDiff = 0,
+                         int64_t hintNumRecordsDiff = 0);
+
+ /**
* Truncate documents newer than the document at 'end' from the capped
* collection. The collection cannot be completely emptied using this
* function. An assertion will be thrown if that is attempted.
@@ -722,6 +736,11 @@ protected:
const char* damageSource,
const mutablebson::DamageVector& damages) = 0;
virtual Status doTruncate(OperationContext* opCtx) = 0;
+ virtual Status doRangeTruncate(OperationContext* opCtx,
+ const RecordId& minRecordId,
+ const RecordId& maxRecordId,
+ int64_t hintDataSizeDiff,
+ int64_t hintNumRecordsDiff) = 0;
virtual void doCappedTruncateAfter(OperationContext* opCtx,
const RecordId& end,
bool inclusive,
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index b663ab48fe5..f5d593754ee 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -1343,6 +1343,45 @@ Status WiredTigerRecordStore::doTruncate(OperationContext* opCtx) {
return Status::OK();
}
+// Truncates [minRecordId, maxRecordId] through WT_SESSION::truncate. A default-constructed
+// RecordId on either side leaves that side without a key (lower) or without a cursor (upper).
+// The record/size counters are adjusted using the caller-supplied hints.
+Status WiredTigerRecordStore::doRangeTruncate(OperationContext* opCtx,
+                                              const RecordId& minRecordId,
+                                              const RecordId& maxRecordId,
+                                              int64_t hintDataSizeDiff,
+                                              int64_t hintNumRecordsDiff) {
+    // Step onto the first record to find out whether the table holds anything at all.
+    WiredTigerCursor lowerCursor(_uri, _tableId, true, opCtx);
+    WT_CURSOR* const lower = lowerCursor.get();
+    const int probeRet =
+        wiredTigerPrepareConflictRetry(opCtx, [&] { return lower->next(lower); });
+    if (probeRet == WT_NOTFOUND) {
+        // Empty collections don't have anything to truncate.
+        return Status::OK();
+    }
+    invariantWTOK(probeRet, lower->session);
+
+    // With an explicit lower bound, drop the probe position and key the cursor on that bound.
+    // Otherwise the cursor stays where the probe left it.
+    // The key object is declared at function scope; presumably it has to stay alive until the
+    // truncate call because setKey() receives a pointer to it.
+    boost::optional<CursorKey> lowerKey;
+    if (minRecordId != RecordId()) {
+        invariantWTOK(lower->reset(lower), lower->session);
+        lowerKey = makeCursorKey(minRecordId, _keyFormat);
+        setKey(lower, &(*lowerKey));
+    }
+
+    // The upper cursor is always opened, but only handed to truncate when a bound was given.
+    WiredTigerCursor upperCursor(_uri, _tableId, true, opCtx);
+    boost::optional<CursorKey> upperKey;
+    WT_CURSOR* upper = nullptr;
+    if (maxRecordId != RecordId()) {
+        upperKey = makeCursorKey(maxRecordId, _keyFormat);
+        upper = upperCursor.get();
+        setKey(upper, &(*upperKey));
+    }
+
+    WT_SESSION* session = WiredTigerRecoveryUnit::get(opCtx)->getSession()->getSession();
+    invariantWTOK(WT_OP_CHECK(session->truncate(session, nullptr, lower, upper, nullptr)),
+                  session);
+
+    // The truncated records are not read back, so the counters rely on the caller's hints.
+    _changeNumRecordsAndDataSize(opCtx, hintNumRecordsDiff, hintDataSizeDiff);
+
+    return Status::OK();
+}
+
Status WiredTigerRecordStore::doCompact(OperationContext* opCtx) {
dassert(opCtx->lockState()->isWriteLocked());
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
index d5b92f691f4..4cf89e58f09 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
@@ -176,6 +176,11 @@ public:
std::unique_ptr<RecordCursor> getRandomCursor(OperationContext* opCtx) const final;
Status doTruncate(OperationContext* opCtx) final;
+ Status doRangeTruncate(OperationContext* opCtx,
+ const RecordId& minRecordId,
+ const RecordId& maxRecordId,
+ int64_t hintDataSizeDiff,
+ int64_t hintNumRecordsDiff) final;
virtual bool compactSupported() const {
return !_isEphemeral;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
index b54f99fa022..05444f0d3f0 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/storage/wiredtiger/wiredtiger_size_storer.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
#include "mongo/unittest/barrier.h"
+#include "mongo/unittest/death_test.h"
#include "mongo/unittest/temp_dir.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
@@ -979,6 +980,107 @@ TEST(WiredTigerRecordStoreTest, OplogTruncateMarkers_Duplicates) {
}
}
+// Inserts 'numRecordsToInsert' records, range-truncates the records at sorted positions
+// [deletionPosBegin, deletionPosEnd], and verifies the reported dataSize/numRecords as well as
+// the exact set of RecordIds that remain.
+void testTruncateRange(int64_t numRecordsToInsert,
+                       int64_t deletionPosBegin,
+                       int64_t deletionPosEnd) {
+    auto harnessHelper = newRecordStoreHarnessHelper();
+    unique_ptr<RecordStore> rs(harnessHelper->newRecordStore());
+    auto wtrs = checked_cast<WiredTigerRecordStore*>(rs.get());
+    auto opCtx = harnessHelper->newOperationContext();
+
+    // Populate the store, remembering every id that was handed out.
+    std::vector<RecordId> recordIds;
+    for (int i = 0; i < numRecordsToInsert; i++) {
+        auto inserted = insertBSONWithSize(opCtx.get(), wtrs, Timestamp(1, i), 100);
+        ASSERT_OK(inserted);
+        recordIds.emplace_back(std::move(inserted.getValue()));
+    }
+
+    auto sizePerRecord = wtrs->dataSize(opCtx.get()) / wtrs->numRecords(opCtx.get());
+
+    // Positions are interpreted against the ids in sorted order.
+    std::sort(recordIds.begin(), recordIds.end());
+
+    const auto& firstDeleted = recordIds[deletionPosBegin];
+    const auto& lastDeleted = recordIds[deletionPosEnd];
+    {
+        WriteUnitOfWork wuow(opCtx.get());
+
+        auto numRecordsDeleted = deletionPosEnd - deletionPosBegin + 1;
+        auto numRecordsKept = numRecordsToInsert - numRecordsDeleted;
+
+        // Hints are passed as negative deltas because data is being removed.
+        ASSERT_OK(wtrs->rangeTruncate(opCtx.get(),
+                                      firstDeleted,
+                                      lastDeleted,
+                                      -(sizePerRecord * numRecordsDeleted),
+                                      -numRecordsDeleted));
+
+        ASSERT_EQ(wtrs->dataSize(opCtx.get()), sizePerRecord * numRecordsKept);
+        ASSERT_EQ(wtrs->numRecords(opCtx.get()), numRecordsKept);
+
+        wuow.commit();
+    }
+
+    // Everything before the deleted window plus everything after it should survive.
+    std::set<RecordId> expectedRemainingRecordIds(recordIds.begin(),
+                                                  recordIds.begin() + deletionPosBegin);
+    expectedRemainingRecordIds.insert(recordIds.begin() + deletionPosEnd + 1, recordIds.end());
+
+    // Collect what a forward scan actually returns.
+    std::set<RecordId> actualRemainingRecordIds;
+    auto cursor = wtrs->getCursor(opCtx.get(), true);
+    for (auto record = cursor->next(); record; record = cursor->next()) {
+        actualRemainingRecordIds.insert(record->id);
+    }
+
+    ASSERT_EQ(expectedRemainingRecordIds, actualRemainingRecordIds);
+}
+
+TEST(WiredTigerRecordStoreTest, RangeTruncateTest) {  // Multi-record range truncate.
+ testTruncateRange(100, 3, 50);  // 100 records inserted; sorted positions 3..50 are truncated.
+}
+
+TEST(WiredTigerRecordStoreTest, RangeTruncateSameValueTest) {  // Range whose two bounds are the same record.
+ testTruncateRange(100, 3, 3);  // Begin and end positions coincide, so exactly one record is truncated.
+}
+
+DEATH_TEST(WiredTigerRecordStoreTest,  // Expects the process to die with the message given below.
+ RangeTruncateIncorrectOrderTest,
+ "Start position cannot be after end position") {
+ testTruncateRange(100, 4, 3);  // Begin position (4) is after end position (3).
+}
+
+// Range-truncating with both bounds left as default-constructed RecordIds must empty the store
+// and bring the reported dataSize and numRecords down to zero.
+TEST(WiredTigerRecordStoreTest, RangeTruncateAllTest) {
+    auto harnessHelper = newRecordStoreHarnessHelper();
+    unique_ptr<RecordStore> rs(harnessHelper->newRecordStore());
+    auto wtrs = checked_cast<WiredTigerRecordStore*>(rs.get());
+    auto opCtx = harnessHelper->newOperationContext();
+
+    static constexpr auto kNumRecordsToInsert = 100;
+    for (int inserted = 0; inserted < kNumRecordsToInsert; ++inserted) {
+        auto insertResult = insertBSONWithSize(opCtx.get(), wtrs, Timestamp(1, 0), 100);
+        ASSERT_OK(insertResult);
+    }
+
+    auto sizePerRecord = wtrs->dataSize(opCtx.get()) / wtrs->numRecords(opCtx.get());
+
+    {
+        WriteUnitOfWork wuow(opCtx.get());
+
+        // Hints are negative deltas covering every record that was inserted.
+        const RecordId noLowerBound;
+        const RecordId noUpperBound;
+        ASSERT_OK(wtrs->rangeTruncate(opCtx.get(),
+                                      noLowerBound,
+                                      noUpperBound,
+                                      -(sizePerRecord * kNumRecordsToInsert),
+                                      -kNumRecordsToInsert));
+
+        ASSERT_EQ(wtrs->dataSize(opCtx.get()), 0);
+        ASSERT_EQ(wtrs->numRecords(opCtx.get()), 0);
+        wuow.commit();
+    }
+
+    // A forward scan over the truncated store must yield nothing.
+    auto remaining = wtrs->getCursor(opCtx.get(), true);
+    ASSERT_FALSE(remaining->next());
+}
+
TEST(WiredTigerRecordStoreTest, GetLatestOplogTest) {
unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper());
unique_ptr<RecordStore> rs(harnessHelper->newOplogRecordStore());