diff options
author | Eliot Horowitz <eliot@10gen.com> | 2015-01-27 21:17:02 -0500 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2015-01-27 22:51:22 -0500 |
commit | bbd95ca6a8b538b4cffece0b9d9c3ed811a455a7 (patch) | |
tree | 4cef07ca31be4896a1ab91e432eb074f4d0254dd /src/mongo | |
parent | 8299d7435855820d16b25f4a66b572ddf6a11cf5 (diff) | |
download | mongo-bbd95ca6a8b538b4cffece0b9d9c3ed811a455a7.tar.gz |
SERVER-16951: option to limit # open wt transactions
Diffstat (limited to 'src/mongo')
23 files changed, 222 insertions, 75 deletions
diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp index a76e75ff0d1..dc9bbdfd024 100644 --- a/src/mongo/db/concurrency/lock_state.cpp +++ b/src/mongo/db/concurrency/lock_state.cpp @@ -222,7 +222,7 @@ namespace { _lock.lock(); LockRequestsMap::ConstIterator it = _requests.begin(); while (!it.finished()) { - ss << " " << it.key().toString(); + ss << " " << it.key().toString() << " held in " << modeName(it->mode); it.next(); } _lock.unlock(); @@ -786,6 +786,23 @@ namespace { } } + template<bool IsForMMAPV1> + bool LockerImpl<IsForMMAPV1>::hasStrongLocks() const { + if (!isLocked()) return false; + + boost::lock_guard<SpinLock> lk(_lock); + LockRequestsMap::ConstIterator it = _requests.begin(); + while (!it.finished()) { + if (it->mode == MODE_X || it->mode == MODE_S) { + return true; + } + + it.next(); + } + + return false; + } + // // Auto classes diff --git a/src/mongo/db/concurrency/lock_state.h b/src/mongo/db/concurrency/lock_state.h index 2022199c6d3..86847e7a475 100644 --- a/src/mongo/db/concurrency/lock_state.h +++ b/src/mongo/db/concurrency/lock_state.h @@ -244,6 +244,8 @@ namespace mongo { _lockPendingParallelWriter = newValue; } + virtual bool hasStrongLocks() const; + private: bool _batchWriter; diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h index 81b1ccdddd1..f19edfe4d3f 100644 --- a/src/mongo/db/concurrency/locker.h +++ b/src/mongo/db/concurrency/locker.h @@ -277,6 +277,12 @@ namespace mongo { virtual bool isBatchWriter() const = 0; virtual void setLockPendingParallelWriter(bool newValue) = 0; + /** + * A string lock is MODE_X or MODE_S. + * These are incompatible with other locks and therefore are strong. + */ + virtual bool hasStrongLocks() const = 0; + protected: Locker() { } }; diff --git a/src/mongo/db/concurrency/locker_noop.h b/src/mongo/db/concurrency/locker_noop.h index 44510c49f9d..eaa17b102d7 100644 --- a/src/mongo/db/concurrency/locker_noop.h +++ b/src/mongo/db/concurrency/locker_noop.h @@ -143,7 +143,7 @@ namespace mongo { } virtual bool isWriteLocked() const { - invariant(false); + return false; } virtual bool isReadLocked() const { @@ -170,6 +170,10 @@ namespace mongo { invariant(false); } + virtual bool hasStrongLocks() const { + return false; + } + }; } // namespace mongo diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 0a72c557fef..f12bdcea6b6 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -149,7 +149,7 @@ namespace mongo { _ended(false) { _txn->lockState()->beginWriteUnitOfWork(); - _txn->recoveryUnit()->beginUnitOfWork(); + _txn->recoveryUnit()->beginUnitOfWork(_txn); } ~WriteUnitOfWork() { diff --git a/src/mongo/db/storage/in_memory/in_memory_recovery_unit.cpp b/src/mongo/db/storage/in_memory/in_memory_recovery_unit.cpp index 1690baea054..5ade0603a96 100644 --- a/src/mongo/db/storage/in_memory/in_memory_recovery_unit.cpp +++ b/src/mongo/db/storage/in_memory/in_memory_recovery_unit.cpp @@ -37,7 +37,7 @@ namespace mongo { invariant(_depth == 0); } - void InMemoryRecoveryUnit::beginUnitOfWork() { + void InMemoryRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) { _depth++; } diff --git a/src/mongo/db/storage/in_memory/in_memory_recovery_unit.h b/src/mongo/db/storage/in_memory/in_memory_recovery_unit.h index ddf38dff73e..78de9b71d68 100644 --- a/src/mongo/db/storage/in_memory/in_memory_recovery_unit.h +++ b/src/mongo/db/storage/in_memory/in_memory_recovery_unit.h @@ -45,7 +45,7 @@ namespace mongo { InMemoryRecoveryUnit() : _depth(0) {} virtual ~InMemoryRecoveryUnit(); - virtual void beginUnitOfWork(); + virtual void beginUnitOfWork(OperationContext* opCtx); virtual void commitUnitOfWork(); virtual void endUnitOfWork(); diff --git a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp index 5947cae00bd..85ff9e8e4f9 100644 --- a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp +++ b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp @@ -46,7 +46,7 @@ namespace mongo { } - void DurRecoveryUnit::beginUnitOfWork() { + void DurRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) { _startOfUncommittedChangesForLevel.push_back(Indexes(_changes.size(), _writes.size())); } diff --git a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h index 6307bf66b31..322dd170dcd 100644 --- a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h +++ b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h @@ -45,7 +45,7 @@ namespace mongo { virtual ~DurRecoveryUnit() { } - virtual void beginUnitOfWork(); + virtual void beginUnitOfWork(OperationContext* opCtx); virtual void commitUnitOfWork(); virtual void endUnitOfWork(); diff --git a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.cpp b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.cpp index 3d8323d3682..9dfd6628a4a 100644 --- a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.cpp +++ b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.cpp @@ -112,7 +112,7 @@ namespace mongo { invariant( _depth == 0 ); } - void HeapRecordStoreBtreeRecoveryUnit::beginUnitOfWork() { + void HeapRecordStoreBtreeRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) { _depth++; } diff --git a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h index 710a34a8eec..a3596d88851 100644 --- a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h +++ b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h @@ -189,7 +189,7 @@ namespace mongo { virtual ~HeapRecordStoreBtreeRecoveryUnit(); - virtual void beginUnitOfWork(); + virtual void beginUnitOfWork(OperationContext* opCtx); virtual void commitUnitOfWork(); virtual void endUnitOfWork(); diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h index ac456e02b7c..15d2bae4466 100644 --- a/src/mongo/db/storage/recovery_unit.h +++ b/src/mongo/db/storage/recovery_unit.h @@ -37,6 +37,7 @@ namespace mongo { class BSONObjBuilder; + class OperationContext; /** * A RecoveryUnit is responsible for ensuring that data is persisted. @@ -72,7 +73,7 @@ namespace mongo { * * TODO see if we can get rid of nested UnitsOfWork. */ - virtual void beginUnitOfWork() = 0; + virtual void beginUnitOfWork(OperationContext* opCtx) = 0; virtual void commitUnitOfWork() = 0; virtual void endUnitOfWork() = 0; diff --git a/src/mongo/db/storage/recovery_unit_noop.h b/src/mongo/db/storage/recovery_unit_noop.h index e6f560a7871..58c341fb76b 100644 --- a/src/mongo/db/storage/recovery_unit_noop.h +++ b/src/mongo/db/storage/recovery_unit_noop.h @@ -32,10 +32,12 @@ namespace mongo { + class OperationContext; + class RecoveryUnitNoop : public RecoveryUnit { public: // TODO implement rollback - virtual void beginUnitOfWork() {} + virtual void beginUnitOfWork(OperationContext* opCtx) {} virtual void commitUnitOfWork() {} virtual void endUnitOfWork() {} diff --git a/src/mongo/db/storage/wiredtiger/SConscript b/src/mongo/db/storage/wiredtiger/SConscript index 1c5353fdd6d..dc0ccc3be30 100644 --- a/src/mongo/db/storage/wiredtiger/SConscript +++ b/src/mongo/db/storage/wiredtiger/SConscript @@ -30,6 +30,7 @@ if wiredtiger: '$BUILD_DIR/mongo/elapsed_tracker', '$BUILD_DIR/mongo/foundation', '$BUILD_DIR/mongo/processinfo', + '$BUILD_DIR/mongo/util/concurrency/ticketholder', '$BUILD_DIR/third_party/shim_wiredtiger', '$BUILD_DIR/third_party/shim_snappy', '$BUILD_DIR/third_party/shim_zlib', diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp index 19a1e6d0817..0f854078e07 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp @@ -193,7 +193,7 @@ namespace { int WiredTigerIndex::Create(OperationContext* txn, const std::string& uri, const std::string& config) { - WT_SESSION* s = WiredTigerRecoveryUnit::get( txn )->getSession()->getSession(); + WT_SESSION* s = WiredTigerRecoveryUnit::get( txn )->getSession(txn)->getSession(); LOG(1) << "create uri: " << uri << " config: " << config; return s->create(s, uri.c_str(), config.c_str()); } @@ -301,7 +301,7 @@ namespace { output->append("type", type); } - WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(); + WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn); WT_SESSION* s = session->getSession(); Status status = WiredTigerUtil::exportTableToBSON(s, "statistics:" + uri(), "statistics=(fast)", output); @@ -340,7 +340,7 @@ namespace { } long long WiredTigerIndex::getSpaceUsedBytes( OperationContext* txn ) const { - WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(); + WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn); return static_cast<long long>( WiredTigerUtil::getIdentSize( session->getSession(), _uri ) ); } @@ -399,7 +399,7 @@ namespace { WT_CURSOR* openBulkCursor(WiredTigerIndex* idx) { // Open cursors can cause bulk open_cursor to fail with EBUSY. // TODO any other cases that could cause EBUSY? - WiredTigerSession* outerSession = WiredTigerRecoveryUnit::get(_txn)->getSession(); + WiredTigerSession* outerSession = WiredTigerRecoveryUnit::get(_txn)->getSession(_txn); outerSession->closeAllCursors(); // Not using cursor cache since we need to set "bulk". @@ -662,7 +662,7 @@ namespace { if ( !wt_keeptxnopen() && !_eof ) { // Ensure an active session exists, so any restored cursors will bind to it - WiredTigerRecoveryUnit::get(txn)->getSession(); + WiredTigerRecoveryUnit::get(txn)->getSession(txn); _locate(_savedLoc); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index 84122dfa2b2..fbe0a8da1ab 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -224,13 +224,13 @@ namespace mongo { int64_t WiredTigerKVEngine::getIdentSize( OperationContext* opCtx, const StringData& ident ) { - WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(); + WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx); return WiredTigerUtil::getIdentSize(session->getSession(), _uri(ident) ); } Status WiredTigerKVEngine::repairIdent( OperationContext* opCtx, const StringData& ident ) { - WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(); + WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx); session->closeAllCursors(); string uri = _uri(ident); return _salvageIfNeeded(uri.c_str()); @@ -455,7 +455,8 @@ namespace mongo { } bool WiredTigerKVEngine::hasIdent(OperationContext* opCtx, const StringData& ident) const { - return _hasUri(WiredTigerRecoveryUnit::get(opCtx)->getSession()->getSession(), _uri(ident)); + return _hasUri(WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx)->getSession(), + _uri(ident)); } bool WiredTigerKVEngine::_hasUri(WT_SESSION* session, const std::string& uri) const { diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 2e77a729397..29b5346705b 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -303,7 +303,7 @@ namespace { int64_t WiredTigerRecordStore::storageSize( OperationContext* txn, BSONObjBuilder* extraInfo, int infoLevel ) const { - WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(); + WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn); StatusWith<int64_t> result = WiredTigerUtil::getStatisticsValueAs<int64_t>( session->getSession(), "statistics:" + getURI(), "statistics=(fast)", WT_STAT_DSRC_BLOCK_SIZE); @@ -447,7 +447,9 @@ namespace { invariant( realRecoveryUnit ); WiredTigerSessionCache* sc = realRecoveryUnit->getSessionCache(); txn->setRecoveryUnit( new WiredTigerRecoveryUnit( sc ) ); - WT_SESSION* session = WiredTigerRecoveryUnit::get(txn)->getSession()->getSession(); + + WiredTigerRecoveryUnit::get(txn)->markNoTicketRequired(); // realRecoveryUnit already has + WT_SESSION* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn)->getSession(); int64_t dataSize = _dataSize.load(); int64_t numRecords = _numRecords.load(); @@ -783,7 +785,7 @@ namespace { output->appendNumber( "nrecords", nrecords ); - WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(); + WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn); WT_SESSION* s = session->getSession(); BSONObjBuilder bob(output->subobjStart(kWiredTigerEngineName)); Status status = WiredTigerUtil::exportTableToBSON(s, "statistics:" + getURI(), @@ -804,7 +806,7 @@ namespace { result->appendIntOrLL( "max", _cappedMaxDocs ); result->appendIntOrLL( "maxSize", static_cast<long long>(_cappedMaxSize / scale) ); } - WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(); + WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn); WT_SESSION* s = session->getSession(); BSONObjBuilder bob(result->subobjStart(kWiredTigerEngineName)); { @@ -1198,7 +1200,7 @@ namespace { invariant( _savedRecoveryUnit == txn->recoveryUnit() ); if ( needRestore || !wt_keeptxnopen() ) { // This will ensure an active session exists, so any restored cursors will bind to it - invariant(WiredTigerRecoveryUnit::get(txn)->getSession() == _cursor->getSession()); + invariant(WiredTigerRecoveryUnit::get(txn)->getSession(txn) == _cursor->getSession()); RecordId saved = _lastLoc; _locate(_lastLoc, false); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp index ab9099c2ea9..d2c0f30869b 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp @@ -33,12 +33,14 @@ #include "mongo/platform/basic.h" +#include "mongo/base/checked_cast.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/global_environment_experiment.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h" #include "mongo/util/background.h" #include "mongo/util/log.h" @@ -67,6 +69,7 @@ namespace mongo { } OperationContextImpl txn; + checked_cast<WiredTigerRecoveryUnit*>(txn.recoveryUnit())->markNoTicketRequired(); try { Lock::DBLock dbLock(txn.lockState(), _ns.db(), MODE_IX); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp index 28b74c5908c..9a0f2d5e110 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp @@ -100,7 +100,7 @@ namespace mongo { { WriteUnitOfWork uow(&txn); - WT_SESSION* s = ru->getSession()->getSession(); + WT_SESSION* s = ru->getSession(&txn)->getSession(); invariantWTOK( s->create( s, uri.c_str(), config.c_str() ) ); uow.commit(); } @@ -126,7 +126,7 @@ namespace mongo { { WriteUnitOfWork uow(&txn); - WT_SESSION* s = ru->getSession()->getSession(); + WT_SESSION* s = ru->getSession(&txn)->getSession(); invariantWTOK( s->create( s, uri.c_str(), config.c_str() ) ); uow.commit(); } @@ -326,14 +326,15 @@ namespace mongo { { WriteUnitOfWork uow( opCtx.get() ); - WT_SESSION* s = ru->getSession()->getSession(); + WT_SESSION* s = ru->getSession(opCtx.get())->getSession(); invariantWTOK( s->create( s, indexUri.c_str(), "" ) ); uow.commit(); } { WriteUnitOfWork uow( opCtx.get() ); - ss.storeInto( WiredTigerRecoveryUnit::get( opCtx.get() )->getSession(), indexUri ); + ss.storeInto( WiredTigerRecoveryUnit::get( opCtx.get() )->getSession(opCtx.get()), + indexUri ); uow.commit(); } } @@ -341,7 +342,8 @@ namespace mongo { { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); WiredTigerSizeStorer ss2; - ss2.loadFrom( WiredTigerRecoveryUnit::get( opCtx.get() )->getSession(), indexUri ); + ss2.loadFrom( WiredTigerRecoveryUnit::get( opCtx.get() )->getSession(opCtx.get()), + indexUri ); long long numRecords; long long dataSize; ss2.load( uri, &numRecords, &dataSize ); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp index 93dd31e6528..a248f84368a 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp @@ -34,11 +34,16 @@ #include <boost/thread/mutex.hpp> #include "mongo/base/checked_cast.h" +#include "mongo/base/init.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/commands/server_status_metric.h" +#include "mongo/db/server_parameters.h" #include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h" #include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" #include "mongo/db/storage/wiredtiger/wiredtiger_util.h" +#include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" #include "mongo/util/stacktrace.h" namespace mongo { @@ -82,7 +87,8 @@ namespace mongo { _myTransactionCount( 0 ), _everStartedWrite( false ), _currentlySquirreled( false ), - _syncing( false ) { + _syncing( false ), + _noTicketNeeded( false ) { } WiredTigerRecoveryUnit::~WiredTigerRecoveryUnit() { @@ -95,11 +101,13 @@ namespace mongo { } void WiredTigerRecoveryUnit::reportState( BSONObjBuilder* b ) const { - b->append( "wt_depth", _depth ); - b->append( "wt_active", _active ); - b->append( "wt_everStartedWrite", _everStartedWrite ); - if ( _active ) - b->append( "wt_millisSinceCommit", _timer.millis() ); + b->append("wt_depth", _depth); + b->append("wt_active", _active); + b->append("wt_everStartedWrite", _everStartedWrite); + b->append("wt_hasTicket", _ticket.hasTicket()); + b->appendNumber("wt_myTransactionCount", static_cast<long long>(_myTransactionCount)); + if (_active) + b->append("wt_millisSinceCommit", _timer.millis()); } void WiredTigerRecoveryUnit::_commit() { @@ -125,10 +133,11 @@ namespace mongo { _changes.clear(); } - void WiredTigerRecoveryUnit::beginUnitOfWork() { + void WiredTigerRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) { invariant( !_currentlySquirreled ); _depth++; _everStartedWrite = true; + _getTicket(opCtx); } void WiredTigerRecoveryUnit::commitUnitOfWork() { @@ -176,13 +185,13 @@ namespace mongo { fassert( 28575, _active ); } - WiredTigerSession* WiredTigerRecoveryUnit::getSession() { + WiredTigerSession* WiredTigerRecoveryUnit::getSession(OperationContext* opCtx) { if ( !_session ) { _session = _sessionCache->getSession(); } if ( !_active ) { - _txnOpen(); + _txnOpen(opCtx); } return _session; } @@ -198,6 +207,81 @@ namespace mongo { _oplogReadTill = loc; } + namespace { + + + class TicketServerParameter : public ServerParameter { + MONGO_DISALLOW_COPYING(TicketServerParameter); + public: + TicketServerParameter(TicketHolder* holder, const std::string& name) + : ServerParameter(ServerParameterSet::getGlobal(), + name, + true, + true), + _holder( holder ) { + } + + virtual void append(OperationContext* txn, BSONObjBuilder& b, const std::string& name) { + b.append(name, _holder->outof()); + } + + virtual Status set( const BSONElement& newValueElement ) { + if (!newValueElement.isNumber()) + return Status(ErrorCodes::BadValue, + str::stream() << name() << " has to be a number"); + return _set(newValueElement.numberInt()); + } + + virtual Status setFromString( const std::string& str ) { + int num = 0; + Status status = parseNumberFromString(str, &num); + if (!status.isOK()) + return status; + return _set(num); + } + + Status _set(int newNum) { + if (newNum <= 0) { + return Status(ErrorCodes::BadValue, + str::stream() << name() << " has to be > 0"); + } + + return _holder->resize(newNum); + } + + private: + TicketHolder* _holder; + }; + + TicketHolder openWriteTransaction(128); + TicketServerParameter openWriteTransactionParam(&openWriteTransaction, + "wiredTigerConcurrentWriteTransactions"); + + TicketHolder openReadTransaction(256); + TicketServerParameter openReadTransactionParam(&openReadTransaction, + "wiredTigerConcurrentReadTransactions"); + + } + + void WiredTigerRecoveryUnit::appendGlobalStats(BSONObjBuilder& b) { + BSONObjBuilder bb(b.subobjStart("concurrentTransactions")); + { + BSONObjBuilder bbb(bb.subobjStart("write")); + bbb.append("out", openWriteTransaction.used()); + bbb.append("available", openWriteTransaction.available()); + bbb.append("totalTickets", openWriteTransaction.outof()); + bbb.done(); + } + { + BSONObjBuilder bbb(bb.subobjStart("read")); + bbb.append("out", openReadTransaction.used()); + bbb.append("available", openReadTransaction.available()); + bbb.append("totalTickets", openReadTransaction.outof()); + bbb.done(); + } + bb.done(); + } + void WiredTigerRecoveryUnit::_txnClose( bool commit ) { invariant( _active ); WT_SESSION *s = _session->getSession(); @@ -213,14 +297,49 @@ namespace mongo { } _active = false; _myTransactionCount++; + _ticket.reset(NULL); } uint64_t WiredTigerRecoveryUnit::getMyTransactionCount() const { return _myTransactionCount; } - void WiredTigerRecoveryUnit::_txnOpen() { + void WiredTigerRecoveryUnit::markNoTicketRequired() { + invariant(!_ticket.hasTicket()); + _noTicketNeeded = true; + } + + void WiredTigerRecoveryUnit::_getTicket(OperationContext* opCtx) { + // already have a ticket + if (_ticket.hasTicket()) + return; + + if (_noTicketNeeded) + return; + + bool writeLocked; + + // If we have a strong lock, waiting for a ticket can cause a deadlock. + if (opCtx != NULL && + opCtx->lockState() != NULL) { + if (opCtx->lockState()->hasStrongLocks()) + return; + writeLocked = opCtx->lockState()->isWriteLocked(); + } + else { + writeLocked = _everStartedWrite; + } + + TicketHolder* holder = writeLocked ? &openWriteTransaction : &openReadTransaction; + + holder->waitForTicket(); + _ticket.reset(holder); + } + + void WiredTigerRecoveryUnit::_txnOpen(OperationContext* opCtx) { invariant( !_active ); + _getTicket(opCtx); + WT_SESSION *s = _session->getSession(); _syncing = _syncing || awaitCommitData.numWaitingForSync.load() > 0; invariantWTOK( s->begin_transaction(s, _syncing ? "sync=true" : NULL) ); @@ -247,24 +366,10 @@ namespace mongo { WiredTigerCursor::WiredTigerCursor(const std::string& uri, uint64_t id, bool forRecordStore, - WiredTigerRecoveryUnit* ru) { - _init( uri, id, forRecordStore, ru ); - } - - WiredTigerCursor::WiredTigerCursor(const std::string& uri, - uint64_t id, - bool forRecordStore, OperationContext* txn) { - _init( uri, id, forRecordStore, WiredTigerRecoveryUnit::get( txn ) ); - } - - void WiredTigerCursor::_init( const std::string& uri, - uint64_t id, - bool forRecordStore, - WiredTigerRecoveryUnit* ru ) { _uriID = id; - _ru = ru; - _session = _ru->getSession(); + _ru = WiredTigerRecoveryUnit::get( txn ); + _session = _ru->getSession(txn); _cursor = _session->getCursor( uri, id, forRecordStore ); if ( !_cursor ) { error() << "no cursor for uri: " << uri; @@ -272,7 +377,6 @@ namespace mongo { } WiredTigerCursor::~WiredTigerCursor() { - invariant( _session == _ru->getSession() ); _session->releaseCursor( _uriID, _cursor ); _cursor = NULL; } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h index ed2b77bff00..e1247457420 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h @@ -40,6 +40,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/record_id.h" #include "mongo/db/storage/recovery_unit.h" +#include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/timer.h" namespace mongo { @@ -56,7 +57,7 @@ namespace mongo { virtual void reportState( BSONObjBuilder* b ) const; - virtual void beginUnitOfWork(); + virtual void beginUnitOfWork(OperationContext* opCtx); virtual void commitUnitOfWork(); @@ -81,7 +82,7 @@ namespace mongo { // ---- WT STUFF - WiredTigerSession* getSession(); + WiredTigerSession* getSession(OperationContext* opCtx); WiredTigerSessionCache* getSessionCache() { return _sessionCache; } bool inActiveTxn() const { return _active; } void assertInActiveTxn() const; @@ -92,15 +93,18 @@ namespace mongo { void setOplogReadTill( const RecordId& loc ); RecordId getOplogReadTill() const { return _oplogReadTill; } + void markNoTicketRequired(); + static WiredTigerRecoveryUnit* get(OperationContext *txn); + static void appendGlobalStats(BSONObjBuilder& b); private: void _abort(); void _commit(); void _txnClose( bool commit ); - void _txnOpen(); + void _txnOpen(OperationContext* opCtx); WiredTigerSessionCache* _sessionCache; // not owned WiredTigerSession* _session; // owned, but from pool @@ -116,6 +120,10 @@ namespace mongo { typedef OwnedPointerVector<Change> Changes; Changes _changes; + + bool _noTicketNeeded; + void _getTicket(OperationContext* opCtx); + TicketHolderReleaser _ticket; }; /** @@ -127,10 +135,7 @@ namespace mongo { uint64_t uriID, bool forRecordStore, OperationContext* txn); - WiredTigerCursor(const std::string& uri, - uint64_t uriID, - bool forRecordStore, - WiredTigerRecoveryUnit* ru); + ~WiredTigerCursor(); @@ -149,11 +154,6 @@ namespace mongo { void assertInActiveTxn() const { _ru->assertInActiveTxn(); } private: - void _init( const std::string& uri, - uint64_t uriID, - bool forRecordStore, - WiredTigerRecoveryUnit* ru ); - uint64_t _uriID; WiredTigerRecoveryUnit* _ru; // not owned WiredTigerSession* _session; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.cpp index a640793cc29..25f68dfeb72 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.cpp @@ -62,7 +62,7 @@ namespace mongo { const BSONElement& configElement) const { WiredTigerSession* session = - checked_cast<WiredTigerRecoveryUnit*>(txn->recoveryUnit())->getSession(); + checked_cast<WiredTigerRecoveryUnit*>(txn->recoveryUnit())->getSession(txn); invariant(session); WT_SESSION* s = session->getSession(); @@ -78,6 +78,8 @@ namespace mongo { bob.append("reason", status.reason()); } + WiredTigerRecoveryUnit::appendGlobalStats(bob); + return bob.obj(); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp index cffc1a01284..e35ecd35f2f 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp @@ -112,7 +112,7 @@ namespace mongo { void createSession(const char* config) { WT_SESSION* wtSession = - WiredTigerRecoveryUnit::get(_opCtx.get())->getSession()->getSession(); + WiredTigerRecoveryUnit::get(_opCtx.get())->getSession(_opCtx.get())->getSession(); ASSERT_OK(wtRCToStatus(wtSession->create(wtSession, getURI(), config))); } private: @@ -262,7 +262,7 @@ namespace mongo { TEST(WiredTigerUtilTest, GetStatisticsValueMissingTable) { WiredTigerUtilHarnessHelper harnessHelper("statistics=(all)"); WiredTigerRecoveryUnit recoveryUnit(harnessHelper.getSessionCache()); - WiredTigerSession* session = recoveryUnit.getSession(); + WiredTigerSession* session = recoveryUnit.getSession(NULL); StatusWith<uint64_t> result = WiredTigerUtil::getStatisticsValue(session->getSession(), "statistics:table:no_such_table", "statistics=(fast)", WT_STAT_DSRC_BLOCK_SIZE); ASSERT_NOT_OK(result.getStatus()); @@ -272,7 +272,7 @@ namespace mongo { TEST(WiredTigerUtilTest, GetStatisticsValueStatisticsDisabled) { WiredTigerUtilHarnessHelper harnessHelper("statistics=(none)"); WiredTigerRecoveryUnit recoveryUnit(harnessHelper.getSessionCache()); - WiredTigerSession* session = recoveryUnit.getSession(); + WiredTigerSession* session = recoveryUnit.getSession(NULL); WT_SESSION* wtSession = session->getSession(); ASSERT_OK(wtRCToStatus(wtSession->create(wtSession, "table:mytable", NULL))); StatusWith<uint64_t> result = WiredTigerUtil::getStatisticsValue(session->getSession(), @@ -284,7 +284,7 @@ namespace mongo { TEST(WiredTigerUtilTest, GetStatisticsValueInvalidKey) { WiredTigerUtilHarnessHelper harnessHelper("statistics=(all)"); WiredTigerRecoveryUnit recoveryUnit(harnessHelper.getSessionCache()); - WiredTigerSession* session = recoveryUnit.getSession(); + WiredTigerSession* session = recoveryUnit.getSession(NULL); WT_SESSION* wtSession = session->getSession(); ASSERT_OK(wtRCToStatus(wtSession->create(wtSession, "table:mytable", NULL))); // Use connection statistics key which does not apply to a table. @@ -297,7 +297,7 @@ namespace mongo { TEST(WiredTigerUtilTest, GetStatisticsValueValidKey) { WiredTigerUtilHarnessHelper harnessHelper("statistics=(all)"); WiredTigerRecoveryUnit recoveryUnit(harnessHelper.getSessionCache()); - WiredTigerSession* session = recoveryUnit.getSession(); + WiredTigerSession* session = recoveryUnit.getSession(NULL); WT_SESSION* wtSession = session->getSession(); ASSERT_OK(wtRCToStatus(wtSession->create(wtSession, "table:mytable", NULL))); // Use connection statistics key which does not apply to a table. @@ -311,7 +311,7 @@ namespace mongo { TEST(WiredTigerUtilTest, GetStatisticsValueAsUInt8) { WiredTigerUtilHarnessHelper harnessHelper("statistics=(all)"); WiredTigerRecoveryUnit recoveryUnit(harnessHelper.getSessionCache()); - WiredTigerSession* session = recoveryUnit.getSession(); + WiredTigerSession* session = recoveryUnit.getSession(NULL); WT_SESSION* wtSession = session->getSession(); ASSERT_OK(wtRCToStatus(wtSession->create(wtSession, "table:mytable", NULL))); |