diff options
4 files changed, 48 insertions, 44 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index eed04767a6e..30ba0eb4c45 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -722,7 +722,7 @@ WiredTigerRecordStore::WiredTigerRecordStore(WiredTigerKVEngine* kvEngine, // the case for temporary RecordStores (those not associated with any collection) and in unit // tests. Persistent size information is not required in either case. If a RecordStore needs // persistent size information, we require it to use a SizeStorer. - _sizeInfo = _sizeStorer ? _sizeStorer->load(_uri) + _sizeInfo = _sizeStorer ? _sizeStorer->load(ctx, _uri) : std::make_shared<WiredTigerSizeStorer::SizeInfo>(0, 0); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp index be5dfca336e..9decf12275f 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp @@ -50,22 +50,20 @@ namespace mongo { WiredTigerSizeStorer::WiredTigerSizeStorer(WT_CONNECTION* conn, const std::string& storageUri, bool readOnly) - : _session(conn), _readOnly(readOnly) { - WT_SESSION* session = _session.getSession(); + : _conn(conn), + _storageUri(storageUri), + _tableId(WiredTigerSession::genTableId()), + _readOnly(readOnly) { + if (_readOnly) { + return; + } std::string config = WiredTigerCustomizationHooks::get(getGlobalServiceContext()) - ->getTableCreateConfig(storageUri); - if (!readOnly) { - invariantWTOK(session->create(session, storageUri.c_str(), config.c_str())); - } + ->getTableCreateConfig(_storageUri); + WiredTigerSession session(_conn); invariantWTOK( - session->open_cursor(session, storageUri.c_str(), nullptr, "overwrite=true", &_cursor)); -} - -WiredTigerSizeStorer::~WiredTigerSizeStorer() { - stdx::lock_guard<Latch> cursorLock(_cursorMutex); - _cursor->close(_cursor); + session.getSession()->create(session.getSession(), _storageUri.c_str(), config.c_str())); } void WiredTigerSizeStorer::store(StringData uri, std::shared_ptr<SizeInfo> sizeInfo) { @@ -87,7 +85,8 @@ void WiredTigerSizeStorer::store(StringData uri, std::shared_ptr<SizeInfo> sizeI << ", dataSize: " << sizeInfo->dataSize.load() << ", use_count: " << entry.use_count(); } -std::shared_ptr<WiredTigerSizeStorer::SizeInfo> WiredTigerSizeStorer::load(StringData uri) const { +std::shared_ptr<WiredTigerSizeStorer::SizeInfo> WiredTigerSizeStorer::load(OperationContext* opCtx, + StringData uri) const { { // Check if we can satisfy the read from the buffer. stdx::lock_guard<Latch> bufferLock(_bufferMutex); @@ -96,23 +95,19 @@ std::shared_ptr<WiredTigerSizeStorer::SizeInfo> WiredTigerSizeStorer::load(Strin return it->second; } - stdx::lock_guard<Latch> cursorLock(_cursorMutex); - // Intentionally ignoring return value. - ON_BLOCK_EXIT([&] { _cursor->reset(_cursor); }); - - _cursor->reset(_cursor); + WiredTigerCursor cursor(_storageUri, _tableId, /*allowOverwrite=*/false, opCtx); { WT_ITEM key = {uri.rawData(), uri.size()}; - _cursor->set_key(_cursor, &key); - int ret = _cursor->search(_cursor); + cursor->set_key(cursor.get(), &key); + int ret = cursor->search(cursor.get()); if (ret == WT_NOTFOUND) return std::make_shared<SizeInfo>(); invariantWTOK(ret); } WT_ITEM value; - invariantWTOK(_cursor->get_value(_cursor, &value)); + invariantWTOK(cursor->get_value(cursor.get(), &value)); BSONObj data(reinterpret_cast<const char*>(value.data)); LOG(2) << "WiredTigerSizeStorer::load " << uri << " -> " << redact(data); @@ -131,11 +126,18 @@ void WiredTigerSizeStorer::flush(bool syncToDisk) { return; // Nothing to do. Timer t; - stdx::lock_guard<Latch> cursorLock(_cursorMutex); + + // We serialize flushing to disk to avoid running into write conflicts from having multiple + // threads try to flush at the same time. + stdx::lock_guard<Latch> flushLock(_flushMutex); + + // When the session is destructed, it closes any cursors that remain open. + WiredTigerSession session(_conn); + WT_CURSOR* cursor = session.getCursor(_storageUri, _tableId, true); + { // On failure, place entries back into the map, unless a newer value already exists. ON_BLOCK_EXIT([this, &buffer]() { - this->_cursor->reset(this->_cursor); if (!buffer.empty()) { stdx::lock_guard<Latch> bufferLock(this->_bufferMutex); for (auto& it : buffer) @@ -150,8 +152,7 @@ void WiredTigerSizeStorer::flush(bool syncToDisk) { if (syncToDisk) { txnConfig += ",sync=true"; } - WT_SESSION* session = _session.getSession(); - WiredTigerBeginTxnBlock txnOpen(session, txnConfig.c_str()); + WiredTigerBeginTxnBlock txnOpen(session.getSession(), txnConfig.c_str()); for (auto it = buffer.begin(); it != buffer.end(); ++it) { @@ -167,9 +168,9 @@ void WiredTigerSizeStorer::flush(bool syncToDisk) { LOG(2) << "WiredTigerSizeStorer::flush " << uri << " -> " << redact(data); WiredTigerItem key(uri.c_str(), uri.size()); WiredTigerItem value(data.objdata(), data.objsize()); - _cursor->set_key(_cursor, key.Get()); - _cursor->set_value(_cursor, value.Get()); - int ret = _cursor->insert(_cursor); + cursor->set_key(cursor, key.Get()); + cursor->set_value(cursor, value.Get()); + int ret = cursor->insert(cursor); if (ret == WT_ROLLBACK) { // One of the code paths calling this function is when a session is checked back // into the session cache. This could involve read-only operations which don't @@ -180,7 +181,7 @@ void WiredTigerSizeStorer::flush(bool syncToDisk) { invariantWTOK(ret); } txnOpen.done(); - invariantWTOK(session->commit_transaction(session, nullptr)); + invariantWTOK(session.getSession()->commit_transaction(session.getSession(), nullptr)); buffer.clear(); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h index c143b9e87f5..99d2f16ec7d 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h @@ -79,7 +79,7 @@ public: WiredTigerSizeStorer(WT_CONNECTION* conn, const std::string& storageUri, const bool readOnly = false); - ~WiredTigerSizeStorer(); + ~WiredTigerSizeStorer() = default; /** * Ensure that the shared SizeInfo will be stored by the next call to flush. @@ -87,7 +87,7 @@ public: */ void store(StringData uri, std::shared_ptr<SizeInfo> sizeInfo); - std::shared_ptr<SizeInfo> load(StringData uri) const; + std::shared_ptr<SizeInfo> load(OperationContext* opCtx, StringData uri) const; /** * Writes all changes to the underlying table. @@ -95,11 +95,13 @@ public: void flush(bool syncToDisk); private: - const WiredTigerSession _session; + WT_CONNECTION* _conn; + const std::string _storageUri; + const uint64_t _tableId; // Not persisted const bool _readOnly; - // Guards _cursor. Acquire *before* _bufferMutex. - mutable Mutex _cursorMutex = MONGO_MAKE_LATCH("WiredTigerSessionStorer::_cursorMutex"); - WT_CURSOR* _cursor; // pointer is const after constructor + + // Serializes flushes to disk. + Mutex _flushMutex = MONGO_MAKE_LATCH("WiredTigerSessionStorer::_flushMutex"); using Buffer = StringMap<std::shared_ptr<SizeInfo>>; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp index 2da84b493cc..c446e637b3d 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp @@ -249,7 +249,8 @@ TEST(WiredTigerRecordStoreTest, SizeStorer1) { rs.reset(NULL); { - auto& info = *ss.load(uri); + ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); + auto& info = *ss.load(opCtx.get(), uri); ASSERT_EQUALS(N, info.numRecords.load()); } @@ -295,7 +296,7 @@ TEST(WiredTigerRecordStoreTest, SizeStorer1) { ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); const bool enableWtLogging = false; WiredTigerSizeStorer ss2(harnessHelper->conn(), indexUri, enableWtLogging); - auto info = ss2.load(uri); + auto info = ss2.load(opCtx.get(), uri); ASSERT_EQUALS(N, info->numRecords.load()); } @@ -325,12 +326,12 @@ private: } protected: - long long getNumRecords() const { - return sizeStorer->load(uri)->numRecords.load(); + long long getNumRecords(OperationContext* opCtx) const { + return sizeStorer->load(opCtx, uri)->numRecords.load(); } - long long getDataSize() const { - return sizeStorer->load(uri)->dataSize.load(); + long long getDataSize(OperationContext* opCtx) const { + return sizeStorer->load(opCtx, uri)->dataSize.load(); } std::unique_ptr<WiredTigerHarnessHelper> harnessHelper; @@ -345,8 +346,8 @@ TEST_F(SizeStorerUpdateTest, Basic) { ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); long long val = 5; rs->updateStatsAfterRepair(opCtx.get(), val, val); - ASSERT_EQUALS(getNumRecords(), val); - ASSERT_EQUALS(getDataSize(), val); + ASSERT_EQUALS(getNumRecords(opCtx.get()), val); + ASSERT_EQUALS(getDataSize(opCtx.get()), val); } } // namespace |