diff options
4 files changed, 48 insertions, 44 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index c5d3ba831fc..3037578f391 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -894,7 +894,7 @@ WiredTigerRecordStore::WiredTigerRecordStore(WiredTigerKVEngine* kvEngine, // case for temporary RecordStores (those not associated with any collection) and in unit // tests. Persistent size information is not required in either case. If a RecordStore needs // persistent size information, we require it to use a SizeStorer. - _sizeInfo = _sizeStorer ? _sizeStorer->load(_uri) + _sizeInfo = _sizeStorer ? _sizeStorer->load(ctx, _uri) : std::make_shared<WiredTigerSizeStorer::SizeInfo>(0, 0); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp index 933f3c0f64e..d0a01c83403 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp @@ -50,22 +50,20 @@ namespace mongo { WiredTigerSizeStorer::WiredTigerSizeStorer(WT_CONNECTION* conn, const std::string& storageUri, bool readOnly) - : _session(conn), _readOnly(readOnly) { - WT_SESSION* session = _session.getSession(); + : _conn(conn), + _storageUri(storageUri), + _tableId(WiredTigerSession::genTableId()), + _readOnly(readOnly) { + if (_readOnly) { + return; + } std::string config = WiredTigerCustomizationHooks::get(getGlobalServiceContext()) - ->getTableCreateConfig(storageUri); - if (!readOnly) { - invariantWTOK(session->create(session, storageUri.c_str(), config.c_str())); - } + ->getTableCreateConfig(_storageUri); + WiredTigerSession session(_conn); invariantWTOK( - session->open_cursor(session, storageUri.c_str(), nullptr, "overwrite=true", &_cursor)); -} - -WiredTigerSizeStorer::~WiredTigerSizeStorer() { - stdx::lock_guard<Latch> cursorLock(_cursorMutex); - _cursor->close(_cursor); + session.getSession()->create(session.getSession(), _storageUri.c_str(), config.c_str())); } void WiredTigerSizeStorer::store(StringData uri, std::shared_ptr<SizeInfo> sizeInfo) { @@ -91,7 +89,8 @@ void WiredTigerSizeStorer::store(StringData uri, std::shared_ptr<SizeInfo> sizeI "entryUseCount"_attr = entry.use_count()); } -std::shared_ptr<WiredTigerSizeStorer::SizeInfo> WiredTigerSizeStorer::load(StringData uri) const { +std::shared_ptr<WiredTigerSizeStorer::SizeInfo> WiredTigerSizeStorer::load(OperationContext* opCtx, + StringData uri) const { { // Check if we can satisfy the read from the buffer. stdx::lock_guard<Latch> bufferLock(_bufferMutex); @@ -100,23 +99,19 @@ std::shared_ptr<WiredTigerSizeStorer::SizeInfo> WiredTigerSizeStorer::load(Strin return it->second; } - stdx::lock_guard<Latch> cursorLock(_cursorMutex); - // Intentionally ignoring return value. - ON_BLOCK_EXIT([&] { _cursor->reset(_cursor); }); - - _cursor->reset(_cursor); + WiredTigerCursor cursor(_storageUri, _tableId, /*allowOverwrite=*/false, opCtx); { WT_ITEM key = {uri.rawData(), uri.size()}; - _cursor->set_key(_cursor, &key); - int ret = _cursor->search(_cursor); + cursor->set_key(cursor.get(), &key); + int ret = cursor->search(cursor.get()); if (ret == WT_NOTFOUND) return std::make_shared<SizeInfo>(); invariantWTOK(ret); } WT_ITEM value; - invariantWTOK(_cursor->get_value(_cursor, &value)); + invariantWTOK(cursor->get_value(cursor.get(), &value)); BSONObj data(reinterpret_cast<const char*>(value.data)); LOGV2_DEBUG( @@ -136,11 +131,18 @@ void WiredTigerSizeStorer::flush(bool syncToDisk) { return; // Nothing to do. Timer t; - stdx::lock_guard<Latch> cursorLock(_cursorMutex); + + // We serialize flushing to disk to avoid running into write conflicts from having multiple + // threads try to flush at the same time. + stdx::lock_guard<Latch> flushLock(_flushMutex); + + // When the session is destructed, it closes any cursors that remain open. + WiredTigerSession session(_conn); + WT_CURSOR* cursor = session.getNewCursor(_storageUri, "overwrite=true"); + { // On failure, place entries back into the map, unless a newer value already exists. ON_BLOCK_EXIT([this, &buffer]() { - this->_cursor->reset(this->_cursor); if (!buffer.empty()) { stdx::lock_guard<Latch> bufferLock(this->_bufferMutex); for (auto& it : buffer) @@ -148,8 +150,7 @@ void WiredTigerSizeStorer::flush(bool syncToDisk) { } }); - WT_SESSION* session = _session.getSession(); - WiredTigerBeginTxnBlock txnOpen(session, syncToDisk ? "sync=true" : nullptr); + WiredTigerBeginTxnBlock txnOpen(session.getSession(), syncToDisk ? "sync=true" : nullptr); for (auto it = buffer.begin(); it != buffer.end(); ++it) { @@ -169,12 +170,12 @@ void WiredTigerSizeStorer::flush(bool syncToDisk) { "data"_attr = redact(data)); WiredTigerItem key(uri.c_str(), uri.size()); WiredTigerItem value(data.objdata(), data.objsize()); - _cursor->set_key(_cursor, key.Get()); - _cursor->set_value(_cursor, value.Get()); - invariantWTOK(_cursor->insert(_cursor)); + cursor->set_key(cursor, key.Get()); + cursor->set_value(cursor, value.Get()); + invariantWTOK(cursor->insert(cursor)); } txnOpen.done(); - invariantWTOK(session->commit_transaction(session, nullptr)); + invariantWTOK(session.getSession()->commit_transaction(session.getSession(), nullptr)); buffer.clear(); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h index 60b2b885ae0..50eb23324c0 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h @@ -77,7 +77,7 @@ public: }; WiredTigerSizeStorer(WT_CONNECTION* conn, const std::string& storageUri, bool readOnly = false); - ~WiredTigerSizeStorer(); + ~WiredTigerSizeStorer() = default; /** * Ensure that the shared SizeInfo will be stored by the next call to flush. @@ -85,7 +85,7 @@ public: */ void store(StringData uri, std::shared_ptr<SizeInfo> sizeInfo); - std::shared_ptr<SizeInfo> load(StringData uri) const; + std::shared_ptr<SizeInfo> load(OperationContext* opCtx, StringData uri) const; /** * Writes all changes to the underlying table. @@ -93,11 +93,13 @@ public: void flush(bool syncToDisk); private: - const WiredTigerSession _session; + WT_CONNECTION* _conn; + const std::string _storageUri; + const uint64_t _tableId; // Not persisted const bool _readOnly; - // Guards _cursor. Acquire *before* _bufferMutex. - mutable Mutex _cursorMutex = MONGO_MAKE_LATCH("WiredTigerSessionStorer::_cursorMutex"); - WT_CURSOR* _cursor; // pointer is const after constructor + + // Serializes flushes to disk. + Mutex _flushMutex = MONGO_MAKE_LATCH("WiredTigerSessionStorer::_flushMutex"); using Buffer = StringMap<std::shared_ptr<SizeInfo>>; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp index fc2414886b0..676c429791f 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp @@ -98,7 +98,8 @@ TEST(WiredTigerRecordStoreTest, SizeStorer1) { rs.reset(nullptr); { - auto& info = *ss.load(uri); + ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); + auto& info = *ss.load(opCtx.get(), uri); ASSERT_EQUALS(N, info.numRecords.load()); } @@ -146,7 +147,7 @@ TEST(WiredTigerRecordStoreTest, SizeStorer1) { ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); const bool enableWtLogging = false; WiredTigerSizeStorer ss2(harnessHelper->conn(), indexUri, enableWtLogging); - auto info = ss2.load(uri); + auto info = ss2.load(opCtx.get(), uri); ASSERT_EQUALS(N, info->numRecords.load()); } @@ -176,12 +177,12 @@ private: } protected: - long long getNumRecords() const { - return sizeStorer->load(uri)->numRecords.load(); + long long getNumRecords(OperationContext* opCtx) const { + return sizeStorer->load(opCtx, uri)->numRecords.load(); } - long long getDataSize() const { - return sizeStorer->load(uri)->dataSize.load(); + long long getDataSize(OperationContext* opCtx) const { + return sizeStorer->load(opCtx, uri)->dataSize.load(); } std::unique_ptr<WiredTigerHarnessHelper> harnessHelper; @@ -196,8 +197,8 @@ TEST_F(SizeStorerUpdateTest, Basic) { ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); long long val = 5; rs->updateStatsAfterRepair(opCtx.get(), val, val); - ASSERT_EQUALS(getNumRecords(), val); - ASSERT_EQUALS(getDataSize(), val); + ASSERT_EQUALS(getNumRecords(opCtx.get()), val); + ASSERT_EQUALS(getDataSize(opCtx.get()), val); } } // namespace |