summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2015-01-27 21:17:02 -0500
committerDan Pasette <dan@mongodb.com>2015-01-27 23:44:25 -0500
commit81cb23e84653d66158bca6052e21cdae6eec64f4 (patch)
treedad18a13328cb99fa7f026e358f82ddeaa21f260
parent124386bc6077c1a9e56578fe800b31d43e15a648 (diff)
downloadmongo-81cb23e84653d66158bca6052e21cdae6eec64f4.tar.gz
SERVER-16951: option to limit # open wt transactions
(cherry picked from commit bbd95ca6a8b538b4cffece0b9d9c3ed811a455a7)
-rw-r--r--src/mongo/db/concurrency/lock_state.cpp19
-rw-r--r--src/mongo/db/concurrency/lock_state.h2
-rw-r--r--src/mongo/db/concurrency/locker.h6
-rw-r--r--src/mongo/db/concurrency/locker_noop.h6
-rw-r--r--src/mongo/db/operation_context.h2
-rw-r--r--src/mongo/db/storage/in_memory/in_memory_recovery_unit.cpp2
-rw-r--r--src/mongo/db/storage/in_memory/in_memory_recovery_unit.h2
-rw-r--r--src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp2
-rw-r--r--src/mongo/db/storage/mmap_v1/dur_recovery_unit.h2
-rw-r--r--src/mongo/db/storage/mmap_v1/heap_record_store_btree.cpp2
-rw-r--r--src/mongo/db/storage/mmap_v1/heap_record_store_btree.h2
-rw-r--r--src/mongo/db/storage/recovery_unit.h3
-rw-r--r--src/mongo/db/storage/recovery_unit_noop.h4
-rw-r--r--src/mongo/db/storage/wiredtiger/SConscript1
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp10
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp7
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp12
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp3
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp12
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp158
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h24
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_server_status.cpp4
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp12
23 files changed, 222 insertions, 75 deletions
diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp
index a76e75ff0d1..dc9bbdfd024 100644
--- a/src/mongo/db/concurrency/lock_state.cpp
+++ b/src/mongo/db/concurrency/lock_state.cpp
@@ -222,7 +222,7 @@ namespace {
_lock.lock();
LockRequestsMap::ConstIterator it = _requests.begin();
while (!it.finished()) {
- ss << " " << it.key().toString();
+ ss << " " << it.key().toString() << " held in " << modeName(it->mode);
it.next();
}
_lock.unlock();
@@ -786,6 +786,23 @@ namespace {
}
}
+ template<bool IsForMMAPV1>
+ bool LockerImpl<IsForMMAPV1>::hasStrongLocks() const {
+ if (!isLocked()) return false;
+
+ boost::lock_guard<SpinLock> lk(_lock);
+ LockRequestsMap::ConstIterator it = _requests.begin();
+ while (!it.finished()) {
+ if (it->mode == MODE_X || it->mode == MODE_S) {
+ return true;
+ }
+
+ it.next();
+ }
+
+ return false;
+ }
+
//
// Auto classes
diff --git a/src/mongo/db/concurrency/lock_state.h b/src/mongo/db/concurrency/lock_state.h
index 2022199c6d3..86847e7a475 100644
--- a/src/mongo/db/concurrency/lock_state.h
+++ b/src/mongo/db/concurrency/lock_state.h
@@ -244,6 +244,8 @@ namespace mongo {
_lockPendingParallelWriter = newValue;
}
+ virtual bool hasStrongLocks() const;
+
private:
bool _batchWriter;
diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h
index 81b1ccdddd1..f19edfe4d3f 100644
--- a/src/mongo/db/concurrency/locker.h
+++ b/src/mongo/db/concurrency/locker.h
@@ -277,6 +277,12 @@ namespace mongo {
virtual bool isBatchWriter() const = 0;
virtual void setLockPendingParallelWriter(bool newValue) = 0;
+ /**
+ * A string lock is MODE_X or MODE_S.
+ * These are incompatible with other locks and therefore are strong.
+ */
+ virtual bool hasStrongLocks() const = 0;
+
protected:
Locker() { }
};
diff --git a/src/mongo/db/concurrency/locker_noop.h b/src/mongo/db/concurrency/locker_noop.h
index 44510c49f9d..eaa17b102d7 100644
--- a/src/mongo/db/concurrency/locker_noop.h
+++ b/src/mongo/db/concurrency/locker_noop.h
@@ -143,7 +143,7 @@ namespace mongo {
}
virtual bool isWriteLocked() const {
- invariant(false);
+ return false;
}
virtual bool isReadLocked() const {
@@ -170,6 +170,10 @@ namespace mongo {
invariant(false);
}
+ virtual bool hasStrongLocks() const {
+ return false;
+ }
+
};
} // namespace mongo
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 0a72c557fef..f12bdcea6b6 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -149,7 +149,7 @@ namespace mongo {
_ended(false) {
_txn->lockState()->beginWriteUnitOfWork();
- _txn->recoveryUnit()->beginUnitOfWork();
+ _txn->recoveryUnit()->beginUnitOfWork(_txn);
}
~WriteUnitOfWork() {
diff --git a/src/mongo/db/storage/in_memory/in_memory_recovery_unit.cpp b/src/mongo/db/storage/in_memory/in_memory_recovery_unit.cpp
index 1690baea054..5ade0603a96 100644
--- a/src/mongo/db/storage/in_memory/in_memory_recovery_unit.cpp
+++ b/src/mongo/db/storage/in_memory/in_memory_recovery_unit.cpp
@@ -37,7 +37,7 @@ namespace mongo {
invariant(_depth == 0);
}
- void InMemoryRecoveryUnit::beginUnitOfWork() {
+ void InMemoryRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) {
_depth++;
}
diff --git a/src/mongo/db/storage/in_memory/in_memory_recovery_unit.h b/src/mongo/db/storage/in_memory/in_memory_recovery_unit.h
index ddf38dff73e..78de9b71d68 100644
--- a/src/mongo/db/storage/in_memory/in_memory_recovery_unit.h
+++ b/src/mongo/db/storage/in_memory/in_memory_recovery_unit.h
@@ -45,7 +45,7 @@ namespace mongo {
InMemoryRecoveryUnit() : _depth(0) {}
virtual ~InMemoryRecoveryUnit();
- virtual void beginUnitOfWork();
+ virtual void beginUnitOfWork(OperationContext* opCtx);
virtual void commitUnitOfWork();
virtual void endUnitOfWork();
diff --git a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
index 5947cae00bd..85ff9e8e4f9 100644
--- a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
@@ -46,7 +46,7 @@ namespace mongo {
}
- void DurRecoveryUnit::beginUnitOfWork() {
+ void DurRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) {
_startOfUncommittedChangesForLevel.push_back(Indexes(_changes.size(), _writes.size()));
}
diff --git a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h
index 6307bf66b31..322dd170dcd 100644
--- a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h
+++ b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h
@@ -45,7 +45,7 @@ namespace mongo {
virtual ~DurRecoveryUnit() { }
- virtual void beginUnitOfWork();
+ virtual void beginUnitOfWork(OperationContext* opCtx);
virtual void commitUnitOfWork();
virtual void endUnitOfWork();
diff --git a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.cpp b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.cpp
index 3d8323d3682..9dfd6628a4a 100644
--- a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.cpp
+++ b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.cpp
@@ -112,7 +112,7 @@ namespace mongo {
invariant( _depth == 0 );
}
- void HeapRecordStoreBtreeRecoveryUnit::beginUnitOfWork() {
+ void HeapRecordStoreBtreeRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) {
_depth++;
}
diff --git a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h
index 710a34a8eec..a3596d88851 100644
--- a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h
+++ b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h
@@ -189,7 +189,7 @@ namespace mongo {
virtual ~HeapRecordStoreBtreeRecoveryUnit();
- virtual void beginUnitOfWork();
+ virtual void beginUnitOfWork(OperationContext* opCtx);
virtual void commitUnitOfWork();
virtual void endUnitOfWork();
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index ac456e02b7c..15d2bae4466 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -37,6 +37,7 @@
namespace mongo {
class BSONObjBuilder;
+ class OperationContext;
/**
* A RecoveryUnit is responsible for ensuring that data is persisted.
@@ -72,7 +73,7 @@ namespace mongo {
*
* TODO see if we can get rid of nested UnitsOfWork.
*/
- virtual void beginUnitOfWork() = 0;
+ virtual void beginUnitOfWork(OperationContext* opCtx) = 0;
virtual void commitUnitOfWork() = 0;
virtual void endUnitOfWork() = 0;
diff --git a/src/mongo/db/storage/recovery_unit_noop.h b/src/mongo/db/storage/recovery_unit_noop.h
index e6f560a7871..58c341fb76b 100644
--- a/src/mongo/db/storage/recovery_unit_noop.h
+++ b/src/mongo/db/storage/recovery_unit_noop.h
@@ -32,10 +32,12 @@
namespace mongo {
+ class OperationContext;
+
class RecoveryUnitNoop : public RecoveryUnit {
public:
// TODO implement rollback
- virtual void beginUnitOfWork() {}
+ virtual void beginUnitOfWork(OperationContext* opCtx) {}
virtual void commitUnitOfWork() {}
virtual void endUnitOfWork() {}
diff --git a/src/mongo/db/storage/wiredtiger/SConscript b/src/mongo/db/storage/wiredtiger/SConscript
index 1c5353fdd6d..dc0ccc3be30 100644
--- a/src/mongo/db/storage/wiredtiger/SConscript
+++ b/src/mongo/db/storage/wiredtiger/SConscript
@@ -30,6 +30,7 @@ if wiredtiger:
'$BUILD_DIR/mongo/elapsed_tracker',
'$BUILD_DIR/mongo/foundation',
'$BUILD_DIR/mongo/processinfo',
+ '$BUILD_DIR/mongo/util/concurrency/ticketholder',
'$BUILD_DIR/third_party/shim_wiredtiger',
'$BUILD_DIR/third_party/shim_snappy',
'$BUILD_DIR/third_party/shim_zlib',
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
index 19a1e6d0817..0f854078e07 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
@@ -193,7 +193,7 @@ namespace {
int WiredTigerIndex::Create(OperationContext* txn,
const std::string& uri,
const std::string& config) {
- WT_SESSION* s = WiredTigerRecoveryUnit::get( txn )->getSession()->getSession();
+ WT_SESSION* s = WiredTigerRecoveryUnit::get( txn )->getSession(txn)->getSession();
LOG(1) << "create uri: " << uri << " config: " << config;
return s->create(s, uri.c_str(), config.c_str());
}
@@ -301,7 +301,7 @@ namespace {
output->append("type", type);
}
- WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession();
+ WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn);
WT_SESSION* s = session->getSession();
Status status = WiredTigerUtil::exportTableToBSON(s, "statistics:" + uri(),
"statistics=(fast)", output);
@@ -340,7 +340,7 @@ namespace {
}
long long WiredTigerIndex::getSpaceUsedBytes( OperationContext* txn ) const {
- WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession();
+ WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn);
return static_cast<long long>( WiredTigerUtil::getIdentSize( session->getSession(),
_uri ) );
}
@@ -399,7 +399,7 @@ namespace {
WT_CURSOR* openBulkCursor(WiredTigerIndex* idx) {
// Open cursors can cause bulk open_cursor to fail with EBUSY.
// TODO any other cases that could cause EBUSY?
- WiredTigerSession* outerSession = WiredTigerRecoveryUnit::get(_txn)->getSession();
+ WiredTigerSession* outerSession = WiredTigerRecoveryUnit::get(_txn)->getSession(_txn);
outerSession->closeAllCursors();
// Not using cursor cache since we need to set "bulk".
@@ -662,7 +662,7 @@ namespace {
if ( !wt_keeptxnopen() && !_eof ) {
// Ensure an active session exists, so any restored cursors will bind to it
- WiredTigerRecoveryUnit::get(txn)->getSession();
+ WiredTigerRecoveryUnit::get(txn)->getSession(txn);
_locate(_savedLoc);
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 84122dfa2b2..fbe0a8da1ab 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -224,13 +224,13 @@ namespace mongo {
int64_t WiredTigerKVEngine::getIdentSize( OperationContext* opCtx,
const StringData& ident ) {
- WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSession();
+ WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx);
return WiredTigerUtil::getIdentSize(session->getSession(), _uri(ident) );
}
Status WiredTigerKVEngine::repairIdent( OperationContext* opCtx,
const StringData& ident ) {
- WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSession();
+ WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx);
session->closeAllCursors();
string uri = _uri(ident);
return _salvageIfNeeded(uri.c_str());
@@ -455,7 +455,8 @@ namespace mongo {
}
bool WiredTigerKVEngine::hasIdent(OperationContext* opCtx, const StringData& ident) const {
- return _hasUri(WiredTigerRecoveryUnit::get(opCtx)->getSession()->getSession(), _uri(ident));
+ return _hasUri(WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx)->getSession(),
+ _uri(ident));
}
bool WiredTigerKVEngine::_hasUri(WT_SESSION* session, const std::string& uri) const {
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index 2e77a729397..29b5346705b 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -303,7 +303,7 @@ namespace {
int64_t WiredTigerRecordStore::storageSize( OperationContext* txn,
BSONObjBuilder* extraInfo,
int infoLevel ) const {
- WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession();
+ WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn);
StatusWith<int64_t> result = WiredTigerUtil::getStatisticsValueAs<int64_t>(
session->getSession(),
"statistics:" + getURI(), "statistics=(fast)", WT_STAT_DSRC_BLOCK_SIZE);
@@ -447,7 +447,9 @@ namespace {
invariant( realRecoveryUnit );
WiredTigerSessionCache* sc = realRecoveryUnit->getSessionCache();
txn->setRecoveryUnit( new WiredTigerRecoveryUnit( sc ) );
- WT_SESSION* session = WiredTigerRecoveryUnit::get(txn)->getSession()->getSession();
+
+ WiredTigerRecoveryUnit::get(txn)->markNoTicketRequired(); // realRecoveryUnit already has
+ WT_SESSION* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn)->getSession();
int64_t dataSize = _dataSize.load();
int64_t numRecords = _numRecords.load();
@@ -783,7 +785,7 @@ namespace {
output->appendNumber( "nrecords", nrecords );
- WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession();
+ WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn);
WT_SESSION* s = session->getSession();
BSONObjBuilder bob(output->subobjStart(kWiredTigerEngineName));
Status status = WiredTigerUtil::exportTableToBSON(s, "statistics:" + getURI(),
@@ -804,7 +806,7 @@ namespace {
result->appendIntOrLL( "max", _cappedMaxDocs );
result->appendIntOrLL( "maxSize", static_cast<long long>(_cappedMaxSize / scale) );
}
- WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession();
+ WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn);
WT_SESSION* s = session->getSession();
BSONObjBuilder bob(result->subobjStart(kWiredTigerEngineName));
{
@@ -1198,7 +1200,7 @@ namespace {
invariant( _savedRecoveryUnit == txn->recoveryUnit() );
if ( needRestore || !wt_keeptxnopen() ) {
// This will ensure an active session exists, so any restored cursors will bind to it
- invariant(WiredTigerRecoveryUnit::get(txn)->getSession() == _cursor->getSession());
+ invariant(WiredTigerRecoveryUnit::get(txn)->getSession(txn) == _cursor->getSession());
RecordId saved = _lastLoc;
_locate(_lastLoc, false);
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp
index ab9099c2ea9..d2c0f30869b 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp
@@ -33,12 +33,14 @@
#include "mongo/platform/basic.h"
+#include "mongo/base/checked_cast.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/global_environment_experiment.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
#include "mongo/util/background.h"
#include "mongo/util/log.h"
@@ -67,6 +69,7 @@ namespace mongo {
}
OperationContextImpl txn;
+ checked_cast<WiredTigerRecoveryUnit*>(txn.recoveryUnit())->markNoTicketRequired();
try {
Lock::DBLock dbLock(txn.lockState(), _ns.db(), MODE_IX);
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
index 28b74c5908c..9a0f2d5e110 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
@@ -100,7 +100,7 @@ namespace mongo {
{
WriteUnitOfWork uow(&txn);
- WT_SESSION* s = ru->getSession()->getSession();
+ WT_SESSION* s = ru->getSession(&txn)->getSession();
invariantWTOK( s->create( s, uri.c_str(), config.c_str() ) );
uow.commit();
}
@@ -126,7 +126,7 @@ namespace mongo {
{
WriteUnitOfWork uow(&txn);
- WT_SESSION* s = ru->getSession()->getSession();
+ WT_SESSION* s = ru->getSession(&txn)->getSession();
invariantWTOK( s->create( s, uri.c_str(), config.c_str() ) );
uow.commit();
}
@@ -326,14 +326,15 @@ namespace mongo {
{
WriteUnitOfWork uow( opCtx.get() );
- WT_SESSION* s = ru->getSession()->getSession();
+ WT_SESSION* s = ru->getSession(opCtx.get())->getSession();
invariantWTOK( s->create( s, indexUri.c_str(), "" ) );
uow.commit();
}
{
WriteUnitOfWork uow( opCtx.get() );
- ss.storeInto( WiredTigerRecoveryUnit::get( opCtx.get() )->getSession(), indexUri );
+ ss.storeInto( WiredTigerRecoveryUnit::get( opCtx.get() )->getSession(opCtx.get()),
+ indexUri );
uow.commit();
}
}
@@ -341,7 +342,8 @@ namespace mongo {
{
scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() );
WiredTigerSizeStorer ss2;
- ss2.loadFrom( WiredTigerRecoveryUnit::get( opCtx.get() )->getSession(), indexUri );
+ ss2.loadFrom( WiredTigerRecoveryUnit::get( opCtx.get() )->getSession(opCtx.get()),
+ indexUri );
long long numRecords;
long long dataSize;
ss2.load( uri, &numRecords, &dataSize );
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index 93dd31e6528..a248f84368a 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -34,11 +34,16 @@
#include <boost/thread/mutex.hpp>
#include "mongo/base/checked_cast.h"
+#include "mongo/base/init.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/commands/server_status_metric.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
+#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
#include "mongo/util/stacktrace.h"
namespace mongo {
@@ -82,7 +87,8 @@ namespace mongo {
_myTransactionCount( 0 ),
_everStartedWrite( false ),
_currentlySquirreled( false ),
- _syncing( false ) {
+ _syncing( false ),
+ _noTicketNeeded( false ) {
}
WiredTigerRecoveryUnit::~WiredTigerRecoveryUnit() {
@@ -95,11 +101,13 @@ namespace mongo {
}
void WiredTigerRecoveryUnit::reportState( BSONObjBuilder* b ) const {
- b->append( "wt_depth", _depth );
- b->append( "wt_active", _active );
- b->append( "wt_everStartedWrite", _everStartedWrite );
- if ( _active )
- b->append( "wt_millisSinceCommit", _timer.millis() );
+ b->append("wt_depth", _depth);
+ b->append("wt_active", _active);
+ b->append("wt_everStartedWrite", _everStartedWrite);
+ b->append("wt_hasTicket", _ticket.hasTicket());
+ b->appendNumber("wt_myTransactionCount", static_cast<long long>(_myTransactionCount));
+ if (_active)
+ b->append("wt_millisSinceCommit", _timer.millis());
}
void WiredTigerRecoveryUnit::_commit() {
@@ -125,10 +133,11 @@ namespace mongo {
_changes.clear();
}
- void WiredTigerRecoveryUnit::beginUnitOfWork() {
+ void WiredTigerRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) {
invariant( !_currentlySquirreled );
_depth++;
_everStartedWrite = true;
+ _getTicket(opCtx);
}
void WiredTigerRecoveryUnit::commitUnitOfWork() {
@@ -176,13 +185,13 @@ namespace mongo {
fassert( 28575, _active );
}
- WiredTigerSession* WiredTigerRecoveryUnit::getSession() {
+ WiredTigerSession* WiredTigerRecoveryUnit::getSession(OperationContext* opCtx) {
if ( !_session ) {
_session = _sessionCache->getSession();
}
if ( !_active ) {
- _txnOpen();
+ _txnOpen(opCtx);
}
return _session;
}
@@ -198,6 +207,81 @@ namespace mongo {
_oplogReadTill = loc;
}
+ namespace {
+
+
+ class TicketServerParameter : public ServerParameter {
+ MONGO_DISALLOW_COPYING(TicketServerParameter);
+ public:
+ TicketServerParameter(TicketHolder* holder, const std::string& name)
+ : ServerParameter(ServerParameterSet::getGlobal(),
+ name,
+ true,
+ true),
+ _holder( holder ) {
+ }
+
+ virtual void append(OperationContext* txn, BSONObjBuilder& b, const std::string& name) {
+ b.append(name, _holder->outof());
+ }
+
+ virtual Status set( const BSONElement& newValueElement ) {
+ if (!newValueElement.isNumber())
+ return Status(ErrorCodes::BadValue,
+ str::stream() << name() << " has to be a number");
+ return _set(newValueElement.numberInt());
+ }
+
+ virtual Status setFromString( const std::string& str ) {
+ int num = 0;
+ Status status = parseNumberFromString(str, &num);
+ if (!status.isOK())
+ return status;
+ return _set(num);
+ }
+
+ Status _set(int newNum) {
+ if (newNum <= 0) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << name() << " has to be > 0");
+ }
+
+ return _holder->resize(newNum);
+ }
+
+ private:
+ TicketHolder* _holder;
+ };
+
+ TicketHolder openWriteTransaction(128);
+ TicketServerParameter openWriteTransactionParam(&openWriteTransaction,
+ "wiredTigerConcurrentWriteTransactions");
+
+ TicketHolder openReadTransaction(256);
+ TicketServerParameter openReadTransactionParam(&openReadTransaction,
+ "wiredTigerConcurrentReadTransactions");
+
+ }
+
+ void WiredTigerRecoveryUnit::appendGlobalStats(BSONObjBuilder& b) {
+ BSONObjBuilder bb(b.subobjStart("concurrentTransactions"));
+ {
+ BSONObjBuilder bbb(bb.subobjStart("write"));
+ bbb.append("out", openWriteTransaction.used());
+ bbb.append("available", openWriteTransaction.available());
+ bbb.append("totalTickets", openWriteTransaction.outof());
+ bbb.done();
+ }
+ {
+ BSONObjBuilder bbb(bb.subobjStart("read"));
+ bbb.append("out", openReadTransaction.used());
+ bbb.append("available", openReadTransaction.available());
+ bbb.append("totalTickets", openReadTransaction.outof());
+ bbb.done();
+ }
+ bb.done();
+ }
+
void WiredTigerRecoveryUnit::_txnClose( bool commit ) {
invariant( _active );
WT_SESSION *s = _session->getSession();
@@ -213,14 +297,49 @@ namespace mongo {
}
_active = false;
_myTransactionCount++;
+ _ticket.reset(NULL);
}
uint64_t WiredTigerRecoveryUnit::getMyTransactionCount() const {
return _myTransactionCount;
}
- void WiredTigerRecoveryUnit::_txnOpen() {
+ void WiredTigerRecoveryUnit::markNoTicketRequired() {
+ invariant(!_ticket.hasTicket());
+ _noTicketNeeded = true;
+ }
+
+ void WiredTigerRecoveryUnit::_getTicket(OperationContext* opCtx) {
+ // already have a ticket
+ if (_ticket.hasTicket())
+ return;
+
+ if (_noTicketNeeded)
+ return;
+
+ bool writeLocked;
+
+ // If we have a strong lock, waiting for a ticket can cause a deadlock.
+ if (opCtx != NULL &&
+ opCtx->lockState() != NULL) {
+ if (opCtx->lockState()->hasStrongLocks())
+ return;
+ writeLocked = opCtx->lockState()->isWriteLocked();
+ }
+ else {
+ writeLocked = _everStartedWrite;
+ }
+
+ TicketHolder* holder = writeLocked ? &openWriteTransaction : &openReadTransaction;
+
+ holder->waitForTicket();
+ _ticket.reset(holder);
+ }
+
+ void WiredTigerRecoveryUnit::_txnOpen(OperationContext* opCtx) {
invariant( !_active );
+ _getTicket(opCtx);
+
WT_SESSION *s = _session->getSession();
_syncing = _syncing || awaitCommitData.numWaitingForSync.load() > 0;
invariantWTOK( s->begin_transaction(s, _syncing ? "sync=true" : NULL) );
@@ -247,24 +366,10 @@ namespace mongo {
WiredTigerCursor::WiredTigerCursor(const std::string& uri,
uint64_t id,
bool forRecordStore,
- WiredTigerRecoveryUnit* ru) {
- _init( uri, id, forRecordStore, ru );
- }
-
- WiredTigerCursor::WiredTigerCursor(const std::string& uri,
- uint64_t id,
- bool forRecordStore,
OperationContext* txn) {
- _init( uri, id, forRecordStore, WiredTigerRecoveryUnit::get( txn ) );
- }
-
- void WiredTigerCursor::_init( const std::string& uri,
- uint64_t id,
- bool forRecordStore,
- WiredTigerRecoveryUnit* ru ) {
_uriID = id;
- _ru = ru;
- _session = _ru->getSession();
+ _ru = WiredTigerRecoveryUnit::get( txn );
+ _session = _ru->getSession(txn);
_cursor = _session->getCursor( uri, id, forRecordStore );
if ( !_cursor ) {
error() << "no cursor for uri: " << uri;
@@ -272,7 +377,6 @@ namespace mongo {
}
WiredTigerCursor::~WiredTigerCursor() {
- invariant( _session == _ru->getSession() );
_session->releaseCursor( _uriID, _cursor );
_cursor = NULL;
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
index ed2b77bff00..e1247457420 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
@@ -40,6 +40,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/record_id.h"
#include "mongo/db/storage/recovery_unit.h"
+#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/timer.h"
namespace mongo {
@@ -56,7 +57,7 @@ namespace mongo {
virtual void reportState( BSONObjBuilder* b ) const;
- virtual void beginUnitOfWork();
+ virtual void beginUnitOfWork(OperationContext* opCtx);
virtual void commitUnitOfWork();
@@ -81,7 +82,7 @@ namespace mongo {
// ---- WT STUFF
- WiredTigerSession* getSession();
+ WiredTigerSession* getSession(OperationContext* opCtx);
WiredTigerSessionCache* getSessionCache() { return _sessionCache; }
bool inActiveTxn() const { return _active; }
void assertInActiveTxn() const;
@@ -92,15 +93,18 @@ namespace mongo {
void setOplogReadTill( const RecordId& loc );
RecordId getOplogReadTill() const { return _oplogReadTill; }
+ void markNoTicketRequired();
+
static WiredTigerRecoveryUnit* get(OperationContext *txn);
+ static void appendGlobalStats(BSONObjBuilder& b);
private:
void _abort();
void _commit();
void _txnClose( bool commit );
- void _txnOpen();
+ void _txnOpen(OperationContext* opCtx);
WiredTigerSessionCache* _sessionCache; // not owned
WiredTigerSession* _session; // owned, but from pool
@@ -116,6 +120,10 @@ namespace mongo {
typedef OwnedPointerVector<Change> Changes;
Changes _changes;
+
+ bool _noTicketNeeded;
+ void _getTicket(OperationContext* opCtx);
+ TicketHolderReleaser _ticket;
};
/**
@@ -127,10 +135,7 @@ namespace mongo {
uint64_t uriID,
bool forRecordStore,
OperationContext* txn);
- WiredTigerCursor(const std::string& uri,
- uint64_t uriID,
- bool forRecordStore,
- WiredTigerRecoveryUnit* ru);
+
~WiredTigerCursor();
@@ -149,11 +154,6 @@ namespace mongo {
void assertInActiveTxn() const { _ru->assertInActiveTxn(); }
private:
- void _init( const std::string& uri,
- uint64_t uriID,
- bool forRecordStore,
- WiredTigerRecoveryUnit* ru );
-
uint64_t _uriID;
WiredTigerRecoveryUnit* _ru; // not owned
WiredTigerSession* _session;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.cpp
index a640793cc29..25f68dfeb72 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.cpp
@@ -62,7 +62,7 @@ namespace mongo {
const BSONElement& configElement) const {
WiredTigerSession* session =
- checked_cast<WiredTigerRecoveryUnit*>(txn->recoveryUnit())->getSession();
+ checked_cast<WiredTigerRecoveryUnit*>(txn->recoveryUnit())->getSession(txn);
invariant(session);
WT_SESSION* s = session->getSession();
@@ -78,6 +78,8 @@ namespace mongo {
bob.append("reason", status.reason());
}
+ WiredTigerRecoveryUnit::appendGlobalStats(bob);
+
return bob.obj();
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp
index cffc1a01284..e35ecd35f2f 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp
@@ -112,7 +112,7 @@ namespace mongo {
void createSession(const char* config) {
WT_SESSION* wtSession =
- WiredTigerRecoveryUnit::get(_opCtx.get())->getSession()->getSession();
+ WiredTigerRecoveryUnit::get(_opCtx.get())->getSession(_opCtx.get())->getSession();
ASSERT_OK(wtRCToStatus(wtSession->create(wtSession, getURI(), config)));
}
private:
@@ -262,7 +262,7 @@ namespace mongo {
TEST(WiredTigerUtilTest, GetStatisticsValueMissingTable) {
WiredTigerUtilHarnessHelper harnessHelper("statistics=(all)");
WiredTigerRecoveryUnit recoveryUnit(harnessHelper.getSessionCache());
- WiredTigerSession* session = recoveryUnit.getSession();
+ WiredTigerSession* session = recoveryUnit.getSession(NULL);
StatusWith<uint64_t> result = WiredTigerUtil::getStatisticsValue(session->getSession(),
"statistics:table:no_such_table", "statistics=(fast)", WT_STAT_DSRC_BLOCK_SIZE);
ASSERT_NOT_OK(result.getStatus());
@@ -272,7 +272,7 @@ namespace mongo {
TEST(WiredTigerUtilTest, GetStatisticsValueStatisticsDisabled) {
WiredTigerUtilHarnessHelper harnessHelper("statistics=(none)");
WiredTigerRecoveryUnit recoveryUnit(harnessHelper.getSessionCache());
- WiredTigerSession* session = recoveryUnit.getSession();
+ WiredTigerSession* session = recoveryUnit.getSession(NULL);
WT_SESSION* wtSession = session->getSession();
ASSERT_OK(wtRCToStatus(wtSession->create(wtSession, "table:mytable", NULL)));
StatusWith<uint64_t> result = WiredTigerUtil::getStatisticsValue(session->getSession(),
@@ -284,7 +284,7 @@ namespace mongo {
TEST(WiredTigerUtilTest, GetStatisticsValueInvalidKey) {
WiredTigerUtilHarnessHelper harnessHelper("statistics=(all)");
WiredTigerRecoveryUnit recoveryUnit(harnessHelper.getSessionCache());
- WiredTigerSession* session = recoveryUnit.getSession();
+ WiredTigerSession* session = recoveryUnit.getSession(NULL);
WT_SESSION* wtSession = session->getSession();
ASSERT_OK(wtRCToStatus(wtSession->create(wtSession, "table:mytable", NULL)));
// Use connection statistics key which does not apply to a table.
@@ -297,7 +297,7 @@ namespace mongo {
TEST(WiredTigerUtilTest, GetStatisticsValueValidKey) {
WiredTigerUtilHarnessHelper harnessHelper("statistics=(all)");
WiredTigerRecoveryUnit recoveryUnit(harnessHelper.getSessionCache());
- WiredTigerSession* session = recoveryUnit.getSession();
+ WiredTigerSession* session = recoveryUnit.getSession(NULL);
WT_SESSION* wtSession = session->getSession();
ASSERT_OK(wtRCToStatus(wtSession->create(wtSession, "table:mytable", NULL)));
// Use connection statistics key which does not apply to a table.
@@ -311,7 +311,7 @@ namespace mongo {
TEST(WiredTigerUtilTest, GetStatisticsValueAsUInt8) {
WiredTigerUtilHarnessHelper harnessHelper("statistics=(all)");
WiredTigerRecoveryUnit recoveryUnit(harnessHelper.getSessionCache());
- WiredTigerSession* session = recoveryUnit.getSession();
+ WiredTigerSession* session = recoveryUnit.getSession(NULL);
WT_SESSION* wtSession = session->getSession();
ASSERT_OK(wtRCToStatus(wtSession->create(wtSession, "table:mytable", NULL)));