summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2015-02-12 13:41:04 -0500
committerRamon Fernandez <ramon.fernandez@mongodb.com>2015-02-12 14:33:14 -0500
commit3eeddde0149962a3e3ed3656da7f851d17028ec7 (patch)
treeb75a6cfaec4341358ad4b3d1dc3014f7de1b5346
parentaf66c7f7ca777d81275ae1994ba6bfebbda17e39 (diff)
downloadmongo-3eeddde0149962a3e3ed3656da7f851d17028ec7.tar.gz
SERVER-17271 WiredTiger SizeStorer now uses a single WT_CURSOR for its lifetime
(cherry picked from commit 347da921b66b57142fbec68f143132c35cfd3396) Conflicts: src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp19
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp13
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp35
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp100
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h32
5 files changed, 114 insertions, 85 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index fbe0a8da1ab..8bb18967ede 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -176,9 +176,8 @@ namespace mongo {
log() << "Repairing size cache";
fassertNoTrace(28577, _salvageIfNeeded(_sizeStorerUri.c_str()));
}
- WiredTigerSizeStorer* ss = new WiredTigerSizeStorer();
- ss->loadFrom( &session, _sizeStorerUri );
- _sizeStorer.reset( ss );
+ _sizeStorer.reset(new WiredTigerSizeStorer(_conn, _sizeStorerUri));
+ _sizeStorer->fillCache();
}
}
@@ -215,9 +214,9 @@ namespace mongo {
const StringData& toNS,
const StringData& ident,
const RecordStore* originalRecordStore ) const {
- _sizeStorer->store( _uri( ident ),
- originalRecordStore->numRecords( opCtx ),
- originalRecordStore->dataSize( opCtx ) );
+ _sizeStorer->storeToCache(_uri( ident ),
+ originalRecordStore->numRecords( opCtx ),
+ originalRecordStore->dataSize( opCtx ) );
syncSizeInfo(true);
return Status::OK();
}
@@ -276,14 +275,10 @@ namespace mongo {
return;
try {
- WiredTigerSession session(_conn);
- WT_SESSION* s = session.getSession();
- invariantWTOK( s->begin_transaction( s, sync ? "sync=true" : NULL ) );
- _sizeStorer->storeInto( &session, _sizeStorerUri );
- invariantWTOK( s->commit_transaction( s, NULL ) );
+ _sizeStorer->syncCache(sync);
}
catch (const WriteConflictException&) {
- // ignore, it means someone else is doing it
+ // ignore, we'll try again later.
}
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index 9d15e025e02..8d853163711 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -219,7 +219,7 @@ namespace {
if ( _sizeStorer ) {
long long numRecords;
long long dataSize;
- _sizeStorer->load( uri, &numRecords, &dataSize );
+ _sizeStorer->loadFromCache( uri, &numRecords, &dataSize );
_numRecords.store( numRecords );
_dataSize.store( dataSize );
_sizeStorer->onCreate( this, numRecords, dataSize );
@@ -239,7 +239,7 @@ namespace {
}
if ( _sizeStorer ) {
- _sizeStorer->store( _uri, _numRecords.load(), _dataSize.load() );
+ _sizeStorer->storeToCache( _uri, _numRecords.load(), _dataSize.load() );
}
}
@@ -257,7 +257,6 @@ namespace {
LOG(1) << "~WiredTigerRecordStore for: " << ns();
if ( _sizeStorer ) {
_sizeStorer->onDestroy( this );
- _sizeStorer->store( _uri, _numRecords.load(), _dataSize.load() );
}
}
@@ -766,7 +765,7 @@ namespace {
long long oldNumRecords;
long long oldDataSize;
- _sizeStorer->load(_uri, &oldNumRecords, &oldDataSize);
+ _sizeStorer->loadFromCache(_uri, &oldNumRecords, &oldDataSize);
if (nrecords != oldNumRecords || dataSizeTotal != oldDataSize) {
warning() << _uri << ": Existing data in size storer ("
<< oldNumRecords << " records " << oldDataSize << " bytes) "
@@ -775,7 +774,7 @@ namespace {
<< "Updating size storer with new values.";
}
- _sizeStorer->store(_uri, _numRecords.load(), _dataSize.load());
+ _sizeStorer->storeToCache(_uri, _numRecords.load(), _dataSize.load());
}
output->appendNumber( "nrecords", nrecords );
@@ -932,7 +931,7 @@ namespace {
long long dataSize) {
_numRecords.store(numRecords);
_dataSize.store(dataSize);
- _sizeStorer->store(_uri, numRecords, dataSize);
+ _sizeStorer->storeToCache(_uri, numRecords, dataSize);
}
RecordId WiredTigerRecordStore::_nextId() {
@@ -996,7 +995,7 @@ namespace {
}
if ( _sizeStorer && _sizeStorerCounter++ % 1000 == 0 ) {
- _sizeStorer->store( _uri, _numRecords.load(), _dataSize.load() );
+ _sizeStorer->storeToCache( _uri, _numRecords.load(), _dataSize.load() );
}
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
index 66a6943c0a8..efecfe86c56 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
@@ -137,6 +137,9 @@ namespace mongo {
virtual RecoveryUnit* newRecoveryUnit() {
return new WiredTigerRecoveryUnit( _sessionCache );
}
+
+ WT_CONNECTION* conn() const { return _conn; }
+
private:
unittest::TempDir _dbpath;
WT_CONNECTION* _conn;
@@ -271,12 +274,13 @@ namespace mongo {
}
TEST(WiredTigerRecordStoreTest, SizeStorer1 ) {
- scoped_ptr<HarnessHelper> harnessHelper( newHarnessHelper() );
+ scoped_ptr<WiredTigerHarnessHelper> harnessHelper(new WiredTigerHarnessHelper());
scoped_ptr<RecordStore> rs( harnessHelper->newNonCappedRecordStore() );
string uri = checked_cast<WiredTigerRecordStore*>( rs.get() )->getURI();
- WiredTigerSizeStorer ss;
+ string indexUri = "table:myindex";
+ WiredTigerSizeStorer ss(harnessHelper->conn(), indexUri);
checked_cast<WiredTigerRecordStore*>( rs.get() )->setSizeStorer( &ss );
int N = 12;
@@ -303,7 +307,7 @@ namespace mongo {
{
long long numRecords;
long long dataSize;
- ss.load( uri, &numRecords, &dataSize );
+ ss.loadFromCache( uri, &numRecords, &dataSize );
ASSERT_EQUALS( N, numRecords );
}
@@ -318,7 +322,6 @@ namespace mongo {
ASSERT_EQUALS( N, rs->numRecords( opCtx.get() ) );
}
- string indexUri = "table:myindex";
{
scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() );
WiredTigerRecoveryUnit* ru =
@@ -331,22 +334,16 @@ namespace mongo {
uow.commit();
}
- {
- WriteUnitOfWork uow( opCtx.get() );
- ss.storeInto( WiredTigerRecoveryUnit::get( opCtx.get() )->getSession(opCtx.get()),
- indexUri );
- uow.commit();
- }
+ ss.syncCache(true);
}
{
scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() );
- WiredTigerSizeStorer ss2;
- ss2.loadFrom( WiredTigerRecoveryUnit::get( opCtx.get() )->getSession(opCtx.get()),
- indexUri );
+ WiredTigerSizeStorer ss2(harnessHelper->conn(), indexUri);
+ ss2.fillCache();
long long numRecords;
long long dataSize;
- ss2.load( uri, &numRecords, &dataSize );
+ ss2.loadFromCache( uri, &numRecords, &dataSize );
ASSERT_EQUALS( N, numRecords );
}
@@ -375,7 +372,7 @@ namespace {
private:
virtual void setUp() {
harnessHelper.reset(new WiredTigerHarnessHelper());
- sizeStorer.reset(new WiredTigerSizeStorer());
+ sizeStorer.reset(new WiredTigerSizeStorer(harnessHelper->conn(), "table:sizeStorer"));
rs.reset(harnessHelper->newNonCappedRecordStore());
WiredTigerRecordStore* wtrs = checked_cast<WiredTigerRecordStore*>(rs.get());
wtrs->setSizeStorer(sizeStorer.get());
@@ -393,7 +390,7 @@ namespace {
}
ASSERT_EQUALS(expectedNumRecords, rs->numRecords(NULL));
ASSERT_EQUALS(expectedDataSize, rs->dataSize(NULL));
- sizeStorer->store(uri, 0, 0);
+ sizeStorer->storeToCache(uri, 0, 0);
}
virtual void tearDown() {
expectedNumRecords = 0;
@@ -409,14 +406,14 @@ namespace {
long long getNumRecords() const {
long long numRecords;
long long unused;
- sizeStorer->load(uri, &numRecords, &unused);
+ sizeStorer->loadFromCache(uri, &numRecords, &unused);
return numRecords;
}
long long getDataSize() const {
long long unused;
long long dataSize;
- sizeStorer->load(uri, &unused, &dataSize);
+ sizeStorer->loadFromCache(uri, &unused, &dataSize);
return dataSize;
}
@@ -472,7 +469,7 @@ namespace {
rs.reset(NULL);
scoped_ptr<OperationContext> opCtx(harnessHelper->newOperationContext());
- sizeStorer->store(uri, expectedNumRecords*2, expectedDataSize*2);
+ sizeStorer->storeToCache(uri, expectedNumRecords*2, expectedDataSize*2);
rs.reset(new WiredTigerRecordStore(opCtx.get(), "a.b", uri, false, -1, -1, NULL,
sizeStorer.get()));
ASSERT_EQUALS(expectedNumRecords*2, rs->numRecords(NULL));
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp
index aad21c18eaa..f3936c84acc 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/storage/wiredtiger/wiredtiger_size_storer.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -49,16 +50,34 @@ namespace mongo {
int MAGIC = 123123;
}
- WiredTigerSizeStorer::WiredTigerSizeStorer() {
+ WiredTigerSizeStorer::WiredTigerSizeStorer(WT_CONNECTION* conn, const std::string& storageUri)
+ : _session(conn)
+ {
+ WT_SESSION* session = _session.getSession();
+ int ret = session->open_cursor(session, storageUri.c_str(), NULL,
+ "overwrite=true", &_cursor);
+ if (ret == ENOENT) {
+ // Need to create table.
+ // TODO any config options we want?
+ invariantWTOK(session->create(session, storageUri.c_str(), NULL));
+ ret = session->open_cursor(session, storageUri.c_str(), NULL,
+ "overwrite=true", &_cursor);
+ }
+ invariantWTOK(ret);
+
_magic = MAGIC;
}
WiredTigerSizeStorer::~WiredTigerSizeStorer() {
+ // This shouldn't be necessary, but protects us if we screw up.
+ boost::mutex::scoped_lock cursorLock( _cursorMutex );
+
_magic = 11111;
+ _cursor->close(_cursor);
}
void WiredTigerSizeStorer::_checkMagic() const {
- if ( _magic == MAGIC )
+ if ( MONGO_likely(_magic == MAGIC) )
return;
log() << "WiredTigerSizeStorer magic wrong: " << _magic;
invariant( _magic == MAGIC );
@@ -86,8 +105,8 @@ namespace mongo {
}
- void WiredTigerSizeStorer::store( const StringData& uri,
- long long numRecords, long long dataSize ) {
+ void WiredTigerSizeStorer::storeToCache( const StringData& uri,
+ long long numRecords, long long dataSize ) {
_checkMagic();
boost::mutex::scoped_lock lk( _entriesMutex );
Entry& entry = _entries[uri.toString()];
@@ -96,8 +115,8 @@ namespace mongo {
entry.dirty = true;
}
- void WiredTigerSizeStorer::load( const StringData& uri,
- long long* numRecords, long long* dataSize ) const {
+ void WiredTigerSizeStorer::loadFromCache( const StringData& uri,
+ long long* numRecords, long long* dataSize ) const {
_checkMagic();
boost::mutex::scoped_lock lk( _entriesMutex );
Map::const_iterator it = _entries.find( uri.toString() );
@@ -110,26 +129,26 @@ namespace mongo {
*dataSize = it->second.dataSize;
}
- void WiredTigerSizeStorer::loadFrom( WiredTigerSession* session,
- const std::string& uri ) {
+ void WiredTigerSizeStorer::fillCache() {
+ boost::mutex::scoped_lock cursorLock( _cursorMutex );
_checkMagic();
Map m;
{
- WT_SESSION* s = session->getSession();
- WT_CURSOR* c = NULL;
- int ret = s->open_cursor( s, uri.c_str(), NULL, NULL, &c );
- if ( ret == ENOENT ) {
- // doesn't exist, we'll create later
- return;
- }
- invariantWTOK( ret );
+ // Seek to beginning if needed.
+ invariantWTOK(_cursor->reset(_cursor));
+
+ // Intentionally ignoring return value.
+ ON_BLOCK_EXIT(_cursor->reset, _cursor);
+
+ int cursorNextRet;
+ while ((cursorNextRet = _cursor->next(_cursor)) != WT_NOTFOUND) {
+ invariantWTOK(cursorNextRet);
- while ( c->next(c) == 0 ) {
WT_ITEM key;
WT_ITEM value;
- invariantWTOK( c->get_key(c, &key ) );
- invariantWTOK( c->get_value(c, &value ) );
+ invariantWTOK( _cursor->get_key(_cursor, &key ) );
+ invariantWTOK( _cursor->get_value(_cursor, &value ) );
std::string uriKey( reinterpret_cast<const char*>( key.data ), key.size );
BSONObj data( reinterpret_cast<const char*>( value.data ) );
@@ -141,15 +160,16 @@ namespace mongo {
e.dirty = false;
e.rs = NULL;
}
- invariantWTOK( c->close(c) );
}
boost::mutex::scoped_lock lk( _entriesMutex );
- _entries = m;
+ _entries.swap(m);
}
- void WiredTigerSizeStorer::storeInto( WiredTigerSession* session,
- const std::string& uri ) {
+ void WiredTigerSizeStorer::syncCache(bool syncToDisk) {
+ boost::mutex::scoped_lock cursorLock( _cursorMutex );
+ _checkMagic();
+
Map myMap;
{
boost::mutex::scoped_lock lk( _entriesMutex );
@@ -173,14 +193,12 @@ namespace mongo {
}
}
- WT_SESSION* s = session->getSession();
- WT_CURSOR* c = NULL;
- int ret = s->open_cursor( s, uri.c_str(), NULL, NULL, &c );
- if ( ret == ENOENT ) {
- invariantWTOK( s->create( s, uri.c_str(), "" ) );
- ret = s->open_cursor( s, uri.c_str(), NULL, NULL, &c );
- }
- invariantWTOK( ret );
+ if (myMap.empty())
+ return; // Nothing to do.
+
+ WT_SESSION* session = _session.getSession();
+ invariantWTOK(session->begin_transaction(session, syncToDisk ? "sync=true" : ""));
+ ScopeGuard rollbacker = MakeGuard(session->rollback_transaction, session, "");
for ( Map::iterator it = myMap.begin(); it != myMap.end(); ++it ) {
string uriKey = it->first;
@@ -198,16 +216,22 @@ namespace mongo {
WiredTigerItem key( uriKey.c_str(), uriKey.size() );
WiredTigerItem value( data.objdata(), data.objsize() );
- c->set_key( c, key.Get() );
- c->set_value( c, value.Get() );
- invariantWTOK( c->insert(c) );
- entry.dirty = false;
-
- c->reset(c);
+ _cursor->set_key( _cursor, key.Get() );
+ _cursor->set_value( _cursor, value.Get() );
+ invariantWTOK( _cursor->insert(_cursor) );
}
- invariantWTOK( c->close(c) );
+ invariantWTOK(_cursor->reset(_cursor));
+
+ rollbacker.Dismiss();
+ invariantWTOK(session->commit_transaction(session, NULL));
+ {
+ boost::mutex::scoped_lock lk( _entriesMutex );
+ for (Map::iterator it = _entries.begin(); it != _entries.end(); ++it) {
+ it->second.dirty = false;
+ }
+ }
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h
index 15df6874139..c046b034991 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h
@@ -31,12 +31,13 @@
#pragma once
+#include <boost/thread/mutex.hpp>
#include <map>
#include <string>
-
-#include <boost/thread/mutex.hpp>
+#include <wiredtiger.h>
#include "mongo/base/string_data.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
namespace mongo {
@@ -45,20 +46,27 @@ namespace mongo {
class WiredTigerSizeStorer {
public:
- WiredTigerSizeStorer();
+ WiredTigerSizeStorer(WT_CONNECTION* conn, const std::string& storageUri);
~WiredTigerSizeStorer();
void onCreate( WiredTigerRecordStore* rs, long long nr, long long ds );
void onDestroy( WiredTigerRecordStore* rs );
- void store( const StringData& uri,
- long long numRecords, long long dataSize );
+ void storeToCache( const StringData& uri,
+ long long numRecords, long long dataSize );
- void load( const StringData& uri,
- long long* numRecords, long long* dataSize ) const;
+ void loadFromCache( const StringData& uri,
+ long long* numRecords, long long* dataSize ) const;
- void loadFrom( WiredTigerSession* cursor, const std::string& uri );
- void storeInto( WiredTigerSession* cursor, const std::string& uri );
+ /**
+ * Loads from the underlying table.
+ */
+ void fillCache();
+
+ /**
+ * Writes all changes to the underlying table.
+ */
+ void syncCache(bool syncToDisk);
private:
void _checkMagic() const;
@@ -73,9 +81,15 @@ namespace mongo {
int _magic;
+ // Guards _cursor. Acquire *before* _entriesMutex.
+ mutable boost::mutex _cursorMutex;
+ const WiredTigerSession _session;
+ WT_CURSOR* _cursor; // pointer is const after constructor
+
typedef std::map<std::string,Entry> Map;
Map _entries;
mutable boost::mutex _entriesMutex;
+
};
}