diff options
author | Mathias Stearn <mathias@10gen.com> | 2015-02-12 13:41:04 -0500 |
---|---|---|
committer | Ramon Fernandez <ramon.fernandez@mongodb.com> | 2015-02-12 14:33:14 -0500 |
commit | 3eeddde0149962a3e3ed3656da7f851d17028ec7 (patch) | |
tree | b75a6cfaec4341358ad4b3d1dc3014f7de1b5346 | |
parent | af66c7f7ca777d81275ae1994ba6bfebbda17e39 (diff) | |
download | mongo-3eeddde0149962a3e3ed3656da7f851d17028ec7.tar.gz |
SERVER-17271 WiredTiger SizeStorer now uses a single WT_CURSOR for its lifetime
(cherry picked from commit 347da921b66b57142fbec68f143132c35cfd3396)
Conflicts:
src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp
src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h
5 files changed, 114 insertions, 85 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index fbe0a8da1ab..8bb18967ede 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -176,9 +176,8 @@ namespace mongo { log() << "Repairing size cache"; fassertNoTrace(28577, _salvageIfNeeded(_sizeStorerUri.c_str())); } - WiredTigerSizeStorer* ss = new WiredTigerSizeStorer(); - ss->loadFrom( &session, _sizeStorerUri ); - _sizeStorer.reset( ss ); + _sizeStorer.reset(new WiredTigerSizeStorer(_conn, _sizeStorerUri)); + _sizeStorer->fillCache(); } } @@ -215,9 +214,9 @@ namespace mongo { const StringData& toNS, const StringData& ident, const RecordStore* originalRecordStore ) const { - _sizeStorer->store( _uri( ident ), - originalRecordStore->numRecords( opCtx ), - originalRecordStore->dataSize( opCtx ) ); + _sizeStorer->storeToCache(_uri( ident ), + originalRecordStore->numRecords( opCtx ), + originalRecordStore->dataSize( opCtx ) ); syncSizeInfo(true); return Status::OK(); } @@ -276,14 +275,10 @@ namespace mongo { return; try { - WiredTigerSession session(_conn); - WT_SESSION* s = session.getSession(); - invariantWTOK( s->begin_transaction( s, sync ? "sync=true" : NULL ) ); - _sizeStorer->storeInto( &session, _sizeStorerUri ); - invariantWTOK( s->commit_transaction( s, NULL ) ); + _sizeStorer->syncCache(sync); } catch (const WriteConflictException&) { - // ignore, it means someone else is doing it + // ignore, we'll try again later. } } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 9d15e025e02..8d853163711 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -219,7 +219,7 @@ namespace { if ( _sizeStorer ) { long long numRecords; long long dataSize; - _sizeStorer->load( uri, &numRecords, &dataSize ); + _sizeStorer->loadFromCache( uri, &numRecords, &dataSize ); _numRecords.store( numRecords ); _dataSize.store( dataSize ); _sizeStorer->onCreate( this, numRecords, dataSize ); @@ -239,7 +239,7 @@ namespace { } if ( _sizeStorer ) { - _sizeStorer->store( _uri, _numRecords.load(), _dataSize.load() ); + _sizeStorer->storeToCache( _uri, _numRecords.load(), _dataSize.load() ); } } @@ -257,7 +257,6 @@ namespace { LOG(1) << "~WiredTigerRecordStore for: " << ns(); if ( _sizeStorer ) { _sizeStorer->onDestroy( this ); - _sizeStorer->store( _uri, _numRecords.load(), _dataSize.load() ); } } @@ -766,7 +765,7 @@ namespace { long long oldNumRecords; long long oldDataSize; - _sizeStorer->load(_uri, &oldNumRecords, &oldDataSize); + _sizeStorer->loadFromCache(_uri, &oldNumRecords, &oldDataSize); if (nrecords != oldNumRecords || dataSizeTotal != oldDataSize) { warning() << _uri << ": Existing data in size storer (" << oldNumRecords << " records " << oldDataSize << " bytes) " @@ -775,7 +774,7 @@ namespace { << "Updating size storer with new values."; } - _sizeStorer->store(_uri, _numRecords.load(), _dataSize.load()); + _sizeStorer->storeToCache(_uri, _numRecords.load(), _dataSize.load()); } output->appendNumber( "nrecords", nrecords ); @@ -932,7 +931,7 @@ namespace { long long dataSize) { _numRecords.store(numRecords); _dataSize.store(dataSize); - _sizeStorer->store(_uri, numRecords, dataSize); + _sizeStorer->storeToCache(_uri, numRecords, dataSize); } RecordId WiredTigerRecordStore::_nextId() { @@ -996,7 +995,7 @@ namespace { } if ( _sizeStorer && _sizeStorerCounter++ % 1000 == 0 ) { - _sizeStorer->store( _uri, _numRecords.load(), _dataSize.load() ); + _sizeStorer->storeToCache( _uri, _numRecords.load(), _dataSize.load() ); } } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp index 66a6943c0a8..efecfe86c56 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp @@ -137,6 +137,9 @@ namespace mongo { virtual RecoveryUnit* newRecoveryUnit() { return new WiredTigerRecoveryUnit( _sessionCache ); } + + WT_CONNECTION* conn() const { return _conn; } + private: unittest::TempDir _dbpath; WT_CONNECTION* _conn; @@ -271,12 +274,13 @@ namespace mongo { } TEST(WiredTigerRecordStoreTest, SizeStorer1 ) { - scoped_ptr<HarnessHelper> harnessHelper( newHarnessHelper() ); + scoped_ptr<WiredTigerHarnessHelper> harnessHelper(new WiredTigerHarnessHelper()); scoped_ptr<RecordStore> rs( harnessHelper->newNonCappedRecordStore() ); string uri = checked_cast<WiredTigerRecordStore*>( rs.get() )->getURI(); - WiredTigerSizeStorer ss; + string indexUri = "table:myindex"; + WiredTigerSizeStorer ss(harnessHelper->conn(), indexUri); checked_cast<WiredTigerRecordStore*>( rs.get() )->setSizeStorer( &ss ); int N = 12; @@ -303,7 +307,7 @@ namespace mongo { { long long numRecords; long long dataSize; - ss.load( uri, &numRecords, &dataSize ); + ss.loadFromCache( uri, &numRecords, &dataSize ); ASSERT_EQUALS( N, numRecords ); } @@ -318,7 +322,6 @@ namespace mongo { ASSERT_EQUALS( N, rs->numRecords( opCtx.get() ) ); } - string indexUri = "table:myindex"; { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); WiredTigerRecoveryUnit* ru = @@ -331,22 +334,16 @@ namespace mongo { uow.commit(); } - { - WriteUnitOfWork uow( opCtx.get() ); - ss.storeInto( WiredTigerRecoveryUnit::get( opCtx.get() )->getSession(opCtx.get()), - indexUri ); - uow.commit(); - } + ss.syncCache(true); } { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); - WiredTigerSizeStorer ss2; - ss2.loadFrom( WiredTigerRecoveryUnit::get( opCtx.get() )->getSession(opCtx.get()), - indexUri ); + WiredTigerSizeStorer ss2(harnessHelper->conn(), indexUri); + ss2.fillCache(); long long numRecords; long long dataSize; - ss2.load( uri, &numRecords, &dataSize ); + ss2.loadFromCache( uri, &numRecords, &dataSize ); ASSERT_EQUALS( N, numRecords ); } @@ -375,7 +372,7 @@ namespace { private: virtual void setUp() { harnessHelper.reset(new WiredTigerHarnessHelper()); - sizeStorer.reset(new WiredTigerSizeStorer()); + sizeStorer.reset(new WiredTigerSizeStorer(harnessHelper->conn(), "table:sizeStorer")); rs.reset(harnessHelper->newNonCappedRecordStore()); WiredTigerRecordStore* wtrs = checked_cast<WiredTigerRecordStore*>(rs.get()); wtrs->setSizeStorer(sizeStorer.get()); @@ -393,7 +390,7 @@ namespace { } ASSERT_EQUALS(expectedNumRecords, rs->numRecords(NULL)); ASSERT_EQUALS(expectedDataSize, rs->dataSize(NULL)); - sizeStorer->store(uri, 0, 0); + sizeStorer->storeToCache(uri, 0, 0); } virtual void tearDown() { expectedNumRecords = 0; @@ -409,14 +406,14 @@ namespace { long long getNumRecords() const { long long numRecords; long long unused; - sizeStorer->load(uri, &numRecords, &unused); + sizeStorer->loadFromCache(uri, &numRecords, &unused); return numRecords; } long long getDataSize() const { long long unused; long long dataSize; - sizeStorer->load(uri, &unused, &dataSize); + sizeStorer->loadFromCache(uri, &unused, &dataSize); return dataSize; } @@ -472,7 +469,7 @@ namespace { rs.reset(NULL); scoped_ptr<OperationContext> opCtx(harnessHelper->newOperationContext()); - sizeStorer->store(uri, expectedNumRecords*2, expectedDataSize*2); + sizeStorer->storeToCache(uri, expectedNumRecords*2, expectedDataSize*2); rs.reset(new WiredTigerRecordStore(opCtx.get(), "a.b", uri, false, -1, -1, NULL, sizeStorer.get())); ASSERT_EQUALS(expectedNumRecords*2, rs->numRecords(NULL)); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp index aad21c18eaa..f3936c84acc 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp @@ -40,6 +40,7 @@ #include "mongo/db/storage/wiredtiger/wiredtiger_size_storer.h" #include "mongo/db/storage/wiredtiger/wiredtiger_util.h" #include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" namespace mongo { @@ -49,16 +50,34 @@ namespace mongo { int MAGIC = 123123; } - WiredTigerSizeStorer::WiredTigerSizeStorer() { + WiredTigerSizeStorer::WiredTigerSizeStorer(WT_CONNECTION* conn, const std::string& storageUri) + : _session(conn) + { + WT_SESSION* session = _session.getSession(); + int ret = session->open_cursor(session, storageUri.c_str(), NULL, + "overwrite=true", &_cursor); + if (ret == ENOENT) { + // Need to create table. + // TODO any config options we want? + invariantWTOK(session->create(session, storageUri.c_str(), NULL)); + ret = session->open_cursor(session, storageUri.c_str(), NULL, + "overwrite=true", &_cursor); + } + invariantWTOK(ret); + _magic = MAGIC; } WiredTigerSizeStorer::~WiredTigerSizeStorer() { + // This shouldn't be necessary, but protects us if we screw up. + boost::mutex::scoped_lock cursorLock( _cursorMutex ); + _magic = 11111; + _cursor->close(_cursor); } void WiredTigerSizeStorer::_checkMagic() const { - if ( _magic == MAGIC ) + if ( MONGO_likely(_magic == MAGIC) ) return; log() << "WiredTigerSizeStorer magic wrong: " << _magic; invariant( _magic == MAGIC ); @@ -86,8 +105,8 @@ namespace mongo { } - void WiredTigerSizeStorer::store( const StringData& uri, - long long numRecords, long long dataSize ) { + void WiredTigerSizeStorer::storeToCache( const StringData& uri, + long long numRecords, long long dataSize ) { _checkMagic(); boost::mutex::scoped_lock lk( _entriesMutex ); Entry& entry = _entries[uri.toString()]; @@ -96,8 +115,8 @@ namespace mongo { entry.dirty = true; } - void WiredTigerSizeStorer::load( const StringData& uri, - long long* numRecords, long long* dataSize ) const { + void WiredTigerSizeStorer::loadFromCache( const StringData& uri, + long long* numRecords, long long* dataSize ) const { _checkMagic(); boost::mutex::scoped_lock lk( _entriesMutex ); Map::const_iterator it = _entries.find( uri.toString() ); @@ -110,26 +129,26 @@ namespace mongo { *dataSize = it->second.dataSize; } - void WiredTigerSizeStorer::loadFrom( WiredTigerSession* session, - const std::string& uri ) { + void WiredTigerSizeStorer::fillCache() { + boost::mutex::scoped_lock cursorLock( _cursorMutex ); _checkMagic(); Map m; { - WT_SESSION* s = session->getSession(); - WT_CURSOR* c = NULL; - int ret = s->open_cursor( s, uri.c_str(), NULL, NULL, &c ); - if ( ret == ENOENT ) { - // doesn't exist, we'll create later - return; - } - invariantWTOK( ret ); + // Seek to beginning if needed. + invariantWTOK(_cursor->reset(_cursor)); + + // Intentionally ignoring return value. + ON_BLOCK_EXIT(_cursor->reset, _cursor); + + int cursorNextRet; + while ((cursorNextRet = _cursor->next(_cursor)) != WT_NOTFOUND) { + invariantWTOK(cursorNextRet); - while ( c->next(c) == 0 ) { WT_ITEM key; WT_ITEM value; - invariantWTOK( c->get_key(c, &key ) ); - invariantWTOK( c->get_value(c, &value ) ); + invariantWTOK( _cursor->get_key(_cursor, &key ) ); + invariantWTOK( _cursor->get_value(_cursor, &value ) ); std::string uriKey( reinterpret_cast<const char*>( key.data ), key.size ); BSONObj data( reinterpret_cast<const char*>( value.data ) ); @@ -141,15 +160,16 @@ namespace mongo { e.dirty = false; e.rs = NULL; } - invariantWTOK( c->close(c) ); } boost::mutex::scoped_lock lk( _entriesMutex ); - _entries = m; + _entries.swap(m); } - void WiredTigerSizeStorer::storeInto( WiredTigerSession* session, - const std::string& uri ) { + void WiredTigerSizeStorer::syncCache(bool syncToDisk) { + boost::mutex::scoped_lock cursorLock( _cursorMutex ); + _checkMagic(); + Map myMap; { boost::mutex::scoped_lock lk( _entriesMutex ); @@ -173,14 +193,12 @@ namespace mongo { } } - WT_SESSION* s = session->getSession(); - WT_CURSOR* c = NULL; - int ret = s->open_cursor( s, uri.c_str(), NULL, NULL, &c ); - if ( ret == ENOENT ) { - invariantWTOK( s->create( s, uri.c_str(), "" ) ); - ret = s->open_cursor( s, uri.c_str(), NULL, NULL, &c ); - } - invariantWTOK( ret ); + if (myMap.empty()) + return; // Nothing to do. + + WT_SESSION* session = _session.getSession(); + invariantWTOK(session->begin_transaction(session, syncToDisk ? "sync=true" : "")); + ScopeGuard rollbacker = MakeGuard(session->rollback_transaction, session, ""); for ( Map::iterator it = myMap.begin(); it != myMap.end(); ++it ) { string uriKey = it->first; @@ -198,16 +216,22 @@ namespace mongo { WiredTigerItem key( uriKey.c_str(), uriKey.size() ); WiredTigerItem value( data.objdata(), data.objsize() ); - c->set_key( c, key.Get() ); - c->set_value( c, value.Get() ); - invariantWTOK( c->insert(c) ); - entry.dirty = false; - - c->reset(c); + _cursor->set_key( _cursor, key.Get() ); + _cursor->set_value( _cursor, value.Get() ); + invariantWTOK( _cursor->insert(_cursor) ); } - invariantWTOK( c->close(c) ); + invariantWTOK(_cursor->reset(_cursor)); + + rollbacker.Dismiss(); + invariantWTOK(session->commit_transaction(session, NULL)); + { + boost::mutex::scoped_lock lk( _entriesMutex ); + for (Map::iterator it = _entries.begin(); it != _entries.end(); ++it) { + it->second.dirty = false; + } + } } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h index 15df6874139..c046b034991 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h @@ -31,12 +31,13 @@ #pragma once +#include <boost/thread/mutex.hpp> #include <map> #include <string> - -#include <boost/thread/mutex.hpp> +#include <wiredtiger.h> #include "mongo/base/string_data.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" namespace mongo { @@ -45,20 +46,27 @@ namespace mongo { class WiredTigerSizeStorer { public: - WiredTigerSizeStorer(); + WiredTigerSizeStorer(WT_CONNECTION* conn, const std::string& storageUri); ~WiredTigerSizeStorer(); void onCreate( WiredTigerRecordStore* rs, long long nr, long long ds ); void onDestroy( WiredTigerRecordStore* rs ); - void store( const StringData& uri, - long long numRecords, long long dataSize ); + void storeToCache( const StringData& uri, + long long numRecords, long long dataSize ); - void load( const StringData& uri, - long long* numRecords, long long* dataSize ) const; + void loadFromCache( const StringData& uri, + long long* numRecords, long long* dataSize ) const; - void loadFrom( WiredTigerSession* cursor, const std::string& uri ); - void storeInto( WiredTigerSession* cursor, const std::string& uri ); + /** + * Loads from the underlying table. + */ + void fillCache(); + + /** + * Writes all changes to the underlying table. + */ + void syncCache(bool syncToDisk); private: void _checkMagic() const; @@ -73,9 +81,15 @@ namespace mongo { int _magic; + // Guards _cursor. Acquire *before* _entriesMutex. + mutable boost::mutex _cursorMutex; + const WiredTigerSession _session; + WT_CURSOR* _cursor; // pointer is const after constructor + typedef std::map<std::string,Entry> Map; Map _entries; mutable boost::mutex _entriesMutex; + }; } |