summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGregory Wlodarek <gregory.wlodarek@mongodb.com>2021-11-04 13:37:23 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-11-05 00:29:30 +0000
commit6da66799d036cfb42e18a75ca9bd2272574a814d (patch)
treec001aff6ea5b13fd6db810ef0501544acd0ae489
parente5cb44d46d108c3cf3784621264bb04cd3c2f503 (diff)
downloadmongo-6da66799d036cfb42e18a75ca9bd2272574a814d.tar.gz
SERVER-60334 Avoid caching the cursor and session in WiredTigerSizeStorer
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp2
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp59
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h14
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp17
4 files changed, 48 insertions, 44 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index c5d3ba831fc..3037578f391 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -894,7 +894,7 @@ WiredTigerRecordStore::WiredTigerRecordStore(WiredTigerKVEngine* kvEngine,
// case for temporary RecordStores (those not associated with any collection) and in unit
// tests. Persistent size information is not required in either case. If a RecordStore needs
// persistent size information, we require it to use a SizeStorer.
- _sizeInfo = _sizeStorer ? _sizeStorer->load(_uri)
+ _sizeInfo = _sizeStorer ? _sizeStorer->load(ctx, _uri)
: std::make_shared<WiredTigerSizeStorer::SizeInfo>(0, 0);
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp
index 933f3c0f64e..d0a01c83403 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp
@@ -50,22 +50,20 @@ namespace mongo {
WiredTigerSizeStorer::WiredTigerSizeStorer(WT_CONNECTION* conn,
const std::string& storageUri,
bool readOnly)
- : _session(conn), _readOnly(readOnly) {
- WT_SESSION* session = _session.getSession();
+ : _conn(conn),
+ _storageUri(storageUri),
+ _tableId(WiredTigerSession::genTableId()),
+ _readOnly(readOnly) {
+ if (_readOnly) {
+ return;
+ }
std::string config = WiredTigerCustomizationHooks::get(getGlobalServiceContext())
- ->getTableCreateConfig(storageUri);
- if (!readOnly) {
- invariantWTOK(session->create(session, storageUri.c_str(), config.c_str()));
- }
+ ->getTableCreateConfig(_storageUri);
+ WiredTigerSession session(_conn);
invariantWTOK(
- session->open_cursor(session, storageUri.c_str(), nullptr, "overwrite=true", &_cursor));
-}
-
-WiredTigerSizeStorer::~WiredTigerSizeStorer() {
- stdx::lock_guard<Latch> cursorLock(_cursorMutex);
- _cursor->close(_cursor);
+ session.getSession()->create(session.getSession(), _storageUri.c_str(), config.c_str()));
}
void WiredTigerSizeStorer::store(StringData uri, std::shared_ptr<SizeInfo> sizeInfo) {
@@ -91,7 +89,8 @@ void WiredTigerSizeStorer::store(StringData uri, std::shared_ptr<SizeInfo> sizeI
"entryUseCount"_attr = entry.use_count());
}
-std::shared_ptr<WiredTigerSizeStorer::SizeInfo> WiredTigerSizeStorer::load(StringData uri) const {
+std::shared_ptr<WiredTigerSizeStorer::SizeInfo> WiredTigerSizeStorer::load(OperationContext* opCtx,
+ StringData uri) const {
{
// Check if we can satisfy the read from the buffer.
stdx::lock_guard<Latch> bufferLock(_bufferMutex);
@@ -100,23 +99,19 @@ std::shared_ptr<WiredTigerSizeStorer::SizeInfo> WiredTigerSizeStorer::load(Strin
return it->second;
}
- stdx::lock_guard<Latch> cursorLock(_cursorMutex);
- // Intentionally ignoring return value.
- ON_BLOCK_EXIT([&] { _cursor->reset(_cursor); });
-
- _cursor->reset(_cursor);
+ WiredTigerCursor cursor(_storageUri, _tableId, /*allowOverwrite=*/false, opCtx);
{
WT_ITEM key = {uri.rawData(), uri.size()};
- _cursor->set_key(_cursor, &key);
- int ret = _cursor->search(_cursor);
+ cursor->set_key(cursor.get(), &key);
+ int ret = cursor->search(cursor.get());
if (ret == WT_NOTFOUND)
return std::make_shared<SizeInfo>();
invariantWTOK(ret);
}
WT_ITEM value;
- invariantWTOK(_cursor->get_value(_cursor, &value));
+ invariantWTOK(cursor->get_value(cursor.get(), &value));
BSONObj data(reinterpret_cast<const char*>(value.data));
LOGV2_DEBUG(
@@ -136,11 +131,18 @@ void WiredTigerSizeStorer::flush(bool syncToDisk) {
return; // Nothing to do.
Timer t;
- stdx::lock_guard<Latch> cursorLock(_cursorMutex);
+
+ // We serialize flushing to disk to avoid running into write conflicts from having multiple
+ // threads try to flush at the same time.
+ stdx::lock_guard<Latch> flushLock(_flushMutex);
+
+ // When the session is destructed, it closes any cursors that remain open.
+ WiredTigerSession session(_conn);
+ WT_CURSOR* cursor = session.getNewCursor(_storageUri, "overwrite=true");
+
{
// On failure, place entries back into the map, unless a newer value already exists.
ON_BLOCK_EXIT([this, &buffer]() {
- this->_cursor->reset(this->_cursor);
if (!buffer.empty()) {
stdx::lock_guard<Latch> bufferLock(this->_bufferMutex);
for (auto& it : buffer)
@@ -148,8 +150,7 @@ void WiredTigerSizeStorer::flush(bool syncToDisk) {
}
});
- WT_SESSION* session = _session.getSession();
- WiredTigerBeginTxnBlock txnOpen(session, syncToDisk ? "sync=true" : nullptr);
+ WiredTigerBeginTxnBlock txnOpen(session.getSession(), syncToDisk ? "sync=true" : nullptr);
for (auto it = buffer.begin(); it != buffer.end(); ++it) {
@@ -169,12 +170,12 @@ void WiredTigerSizeStorer::flush(bool syncToDisk) {
"data"_attr = redact(data));
WiredTigerItem key(uri.c_str(), uri.size());
WiredTigerItem value(data.objdata(), data.objsize());
- _cursor->set_key(_cursor, key.Get());
- _cursor->set_value(_cursor, value.Get());
- invariantWTOK(_cursor->insert(_cursor));
+ cursor->set_key(cursor, key.Get());
+ cursor->set_value(cursor, value.Get());
+ invariantWTOK(cursor->insert(cursor));
}
txnOpen.done();
- invariantWTOK(session->commit_transaction(session, nullptr));
+ invariantWTOK(session.getSession()->commit_transaction(session.getSession(), nullptr));
buffer.clear();
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h
index 60b2b885ae0..50eb23324c0 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h
@@ -77,7 +77,7 @@ public:
};
WiredTigerSizeStorer(WT_CONNECTION* conn, const std::string& storageUri, bool readOnly = false);
- ~WiredTigerSizeStorer();
+ ~WiredTigerSizeStorer() = default;
/**
* Ensure that the shared SizeInfo will be stored by the next call to flush.
@@ -85,7 +85,7 @@ public:
*/
void store(StringData uri, std::shared_ptr<SizeInfo> sizeInfo);
- std::shared_ptr<SizeInfo> load(StringData uri) const;
+ std::shared_ptr<SizeInfo> load(OperationContext* opCtx, StringData uri) const;
/**
* Writes all changes to the underlying table.
@@ -93,11 +93,13 @@ public:
void flush(bool syncToDisk);
private:
- const WiredTigerSession _session;
+ WT_CONNECTION* _conn;
+ const std::string _storageUri;
+ const uint64_t _tableId; // Not persisted
const bool _readOnly;
- // Guards _cursor. Acquire *before* _bufferMutex.
- mutable Mutex _cursorMutex = MONGO_MAKE_LATCH("WiredTigerSessionStorer::_cursorMutex");
- WT_CURSOR* _cursor; // pointer is const after constructor
+
+ // Serializes flushes to disk.
+ Mutex _flushMutex = MONGO_MAKE_LATCH("WiredTigerSessionStorer::_flushMutex");
using Buffer = StringMap<std::shared_ptr<SizeInfo>>;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp
index fc2414886b0..676c429791f 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp
@@ -98,7 +98,8 @@ TEST(WiredTigerRecordStoreTest, SizeStorer1) {
rs.reset(nullptr);
{
- auto& info = *ss.load(uri);
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ auto& info = *ss.load(opCtx.get(), uri);
ASSERT_EQUALS(N, info.numRecords.load());
}
@@ -146,7 +147,7 @@ TEST(WiredTigerRecordStoreTest, SizeStorer1) {
ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
const bool enableWtLogging = false;
WiredTigerSizeStorer ss2(harnessHelper->conn(), indexUri, enableWtLogging);
- auto info = ss2.load(uri);
+ auto info = ss2.load(opCtx.get(), uri);
ASSERT_EQUALS(N, info->numRecords.load());
}
@@ -176,12 +177,12 @@ private:
}
protected:
- long long getNumRecords() const {
- return sizeStorer->load(uri)->numRecords.load();
+ long long getNumRecords(OperationContext* opCtx) const {
+ return sizeStorer->load(opCtx, uri)->numRecords.load();
}
- long long getDataSize() const {
- return sizeStorer->load(uri)->dataSize.load();
+ long long getDataSize(OperationContext* opCtx) const {
+ return sizeStorer->load(opCtx, uri)->dataSize.load();
}
std::unique_ptr<WiredTigerHarnessHelper> harnessHelper;
@@ -196,8 +197,8 @@ TEST_F(SizeStorerUpdateTest, Basic) {
ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
long long val = 5;
rs->updateStatsAfterRepair(opCtx.get(), val, val);
- ASSERT_EQUALS(getNumRecords(), val);
- ASSERT_EQUALS(getDataSize(), val);
+ ASSERT_EQUALS(getNumRecords(opCtx.get()), val);
+ ASSERT_EQUALS(getDataSize(opCtx.get()), val);
}
} // namespace