summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp2
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp59
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h14
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp17
4 files changed, 48 insertions, 44 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index eed04767a6e..30ba0eb4c45 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -722,7 +722,7 @@ WiredTigerRecordStore::WiredTigerRecordStore(WiredTigerKVEngine* kvEngine,
// the case for temporary RecordStores (those not associated with any collection) and in unit
// tests. Persistent size information is not required in either case. If a RecordStore needs
// persistent size information, we require it to use a SizeStorer.
- _sizeInfo = _sizeStorer ? _sizeStorer->load(_uri)
+ _sizeInfo = _sizeStorer ? _sizeStorer->load(ctx, _uri)
: std::make_shared<WiredTigerSizeStorer::SizeInfo>(0, 0);
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp
index be5dfca336e..9decf12275f 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp
@@ -50,22 +50,20 @@ namespace mongo {
WiredTigerSizeStorer::WiredTigerSizeStorer(WT_CONNECTION* conn,
const std::string& storageUri,
bool readOnly)
- : _session(conn), _readOnly(readOnly) {
- WT_SESSION* session = _session.getSession();
+ : _conn(conn),
+ _storageUri(storageUri),
+ _tableId(WiredTigerSession::genTableId()),
+ _readOnly(readOnly) {
+ if (_readOnly) {
+ return;
+ }
std::string config = WiredTigerCustomizationHooks::get(getGlobalServiceContext())
- ->getTableCreateConfig(storageUri);
- if (!readOnly) {
- invariantWTOK(session->create(session, storageUri.c_str(), config.c_str()));
- }
+ ->getTableCreateConfig(_storageUri);
+ WiredTigerSession session(_conn);
invariantWTOK(
- session->open_cursor(session, storageUri.c_str(), nullptr, "overwrite=true", &_cursor));
-}
-
-WiredTigerSizeStorer::~WiredTigerSizeStorer() {
- stdx::lock_guard<Latch> cursorLock(_cursorMutex);
- _cursor->close(_cursor);
+ session.getSession()->create(session.getSession(), _storageUri.c_str(), config.c_str()));
}
void WiredTigerSizeStorer::store(StringData uri, std::shared_ptr<SizeInfo> sizeInfo) {
@@ -87,7 +85,8 @@ void WiredTigerSizeStorer::store(StringData uri, std::shared_ptr<SizeInfo> sizeI
<< ", dataSize: " << sizeInfo->dataSize.load() << ", use_count: " << entry.use_count();
}
-std::shared_ptr<WiredTigerSizeStorer::SizeInfo> WiredTigerSizeStorer::load(StringData uri) const {
+std::shared_ptr<WiredTigerSizeStorer::SizeInfo> WiredTigerSizeStorer::load(OperationContext* opCtx,
+ StringData uri) const {
{
// Check if we can satisfy the read from the buffer.
stdx::lock_guard<Latch> bufferLock(_bufferMutex);
@@ -96,23 +95,19 @@ std::shared_ptr<WiredTigerSizeStorer::SizeInfo> WiredTigerSizeStorer::load(Strin
return it->second;
}
- stdx::lock_guard<Latch> cursorLock(_cursorMutex);
- // Intentionally ignoring return value.
- ON_BLOCK_EXIT([&] { _cursor->reset(_cursor); });
-
- _cursor->reset(_cursor);
+ WiredTigerCursor cursor(_storageUri, _tableId, /*allowOverwrite=*/false, opCtx);
{
WT_ITEM key = {uri.rawData(), uri.size()};
- _cursor->set_key(_cursor, &key);
- int ret = _cursor->search(_cursor);
+ cursor->set_key(cursor.get(), &key);
+ int ret = cursor->search(cursor.get());
if (ret == WT_NOTFOUND)
return std::make_shared<SizeInfo>();
invariantWTOK(ret);
}
WT_ITEM value;
- invariantWTOK(_cursor->get_value(_cursor, &value));
+ invariantWTOK(cursor->get_value(cursor.get(), &value));
BSONObj data(reinterpret_cast<const char*>(value.data));
LOG(2) << "WiredTigerSizeStorer::load " << uri << " -> " << redact(data);
@@ -131,11 +126,18 @@ void WiredTigerSizeStorer::flush(bool syncToDisk) {
return; // Nothing to do.
Timer t;
- stdx::lock_guard<Latch> cursorLock(_cursorMutex);
+
+ // We serialize flushing to disk to avoid running into write conflicts from having multiple
+ // threads try to flush at the same time.
+ stdx::lock_guard<Latch> flushLock(_flushMutex);
+
+ // When the session is destructed, it closes any cursors that remain open.
+ WiredTigerSession session(_conn);
+ WT_CURSOR* cursor = session.getCursor(_storageUri, _tableId, true);
+
{
// On failure, place entries back into the map, unless a newer value already exists.
ON_BLOCK_EXIT([this, &buffer]() {
- this->_cursor->reset(this->_cursor);
if (!buffer.empty()) {
stdx::lock_guard<Latch> bufferLock(this->_bufferMutex);
for (auto& it : buffer)
@@ -150,8 +152,7 @@ void WiredTigerSizeStorer::flush(bool syncToDisk) {
if (syncToDisk) {
txnConfig += ",sync=true";
}
- WT_SESSION* session = _session.getSession();
- WiredTigerBeginTxnBlock txnOpen(session, txnConfig.c_str());
+ WiredTigerBeginTxnBlock txnOpen(session.getSession(), txnConfig.c_str());
for (auto it = buffer.begin(); it != buffer.end(); ++it) {
@@ -167,9 +168,9 @@ void WiredTigerSizeStorer::flush(bool syncToDisk) {
LOG(2) << "WiredTigerSizeStorer::flush " << uri << " -> " << redact(data);
WiredTigerItem key(uri.c_str(), uri.size());
WiredTigerItem value(data.objdata(), data.objsize());
- _cursor->set_key(_cursor, key.Get());
- _cursor->set_value(_cursor, value.Get());
- int ret = _cursor->insert(_cursor);
+ cursor->set_key(cursor, key.Get());
+ cursor->set_value(cursor, value.Get());
+ int ret = cursor->insert(cursor);
if (ret == WT_ROLLBACK) {
// One of the code paths calling this function is when a session is checked back
// into the session cache. This could involve read-only operations which don't
@@ -180,7 +181,7 @@ void WiredTigerSizeStorer::flush(bool syncToDisk) {
invariantWTOK(ret);
}
txnOpen.done();
- invariantWTOK(session->commit_transaction(session, nullptr));
+ invariantWTOK(session.getSession()->commit_transaction(session.getSession(), nullptr));
buffer.clear();
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h
index c143b9e87f5..99d2f16ec7d 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h
@@ -79,7 +79,7 @@ public:
WiredTigerSizeStorer(WT_CONNECTION* conn,
const std::string& storageUri,
const bool readOnly = false);
- ~WiredTigerSizeStorer();
+ ~WiredTigerSizeStorer() = default;
/**
* Ensure that the shared SizeInfo will be stored by the next call to flush.
@@ -87,7 +87,7 @@ public:
*/
void store(StringData uri, std::shared_ptr<SizeInfo> sizeInfo);
- std::shared_ptr<SizeInfo> load(StringData uri) const;
+ std::shared_ptr<SizeInfo> load(OperationContext* opCtx, StringData uri) const;
/**
* Writes all changes to the underlying table.
@@ -95,11 +95,13 @@ public:
void flush(bool syncToDisk);
private:
- const WiredTigerSession _session;
+ WT_CONNECTION* _conn;
+ const std::string _storageUri;
+ const uint64_t _tableId; // Not persisted
const bool _readOnly;
- // Guards _cursor. Acquire *before* _bufferMutex.
- mutable Mutex _cursorMutex = MONGO_MAKE_LATCH("WiredTigerSessionStorer::_cursorMutex");
- WT_CURSOR* _cursor; // pointer is const after constructor
+
+ // Serializes flushes to disk.
+ Mutex _flushMutex = MONGO_MAKE_LATCH("WiredTigerSessionStorer::_flushMutex");
using Buffer = StringMap<std::shared_ptr<SizeInfo>>;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp
index 2da84b493cc..c446e637b3d 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp
@@ -249,7 +249,8 @@ TEST(WiredTigerRecordStoreTest, SizeStorer1) {
rs.reset(NULL);
{
- auto& info = *ss.load(uri);
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ auto& info = *ss.load(opCtx.get(), uri);
ASSERT_EQUALS(N, info.numRecords.load());
}
@@ -295,7 +296,7 @@ TEST(WiredTigerRecordStoreTest, SizeStorer1) {
ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
const bool enableWtLogging = false;
WiredTigerSizeStorer ss2(harnessHelper->conn(), indexUri, enableWtLogging);
- auto info = ss2.load(uri);
+ auto info = ss2.load(opCtx.get(), uri);
ASSERT_EQUALS(N, info->numRecords.load());
}
@@ -325,12 +326,12 @@ private:
}
protected:
- long long getNumRecords() const {
- return sizeStorer->load(uri)->numRecords.load();
+ long long getNumRecords(OperationContext* opCtx) const {
+ return sizeStorer->load(opCtx, uri)->numRecords.load();
}
- long long getDataSize() const {
- return sizeStorer->load(uri)->dataSize.load();
+ long long getDataSize(OperationContext* opCtx) const {
+ return sizeStorer->load(opCtx, uri)->dataSize.load();
}
std::unique_ptr<WiredTigerHarnessHelper> harnessHelper;
@@ -345,8 +346,8 @@ TEST_F(SizeStorerUpdateTest, Basic) {
ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
long long val = 5;
rs->updateStatsAfterRepair(opCtx.get(), val, val);
- ASSERT_EQUALS(getNumRecords(), val);
- ASSERT_EQUALS(getDataSize(), val);
+ ASSERT_EQUALS(getNumRecords(opCtx.get()), val);
+ ASSERT_EQUALS(getDataSize(opCtx.get()), val);
}
} // namespace