summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGeert Bosch <geert@mongodb.com>2017-04-21 18:09:06 -0400
committerGeert Bosch <geert@mongodb.com>2017-05-01 14:57:51 -0400
commit498df9ab853bb03514b8803b9b1f6c2b6900b533 (patch)
treeac6e5974014177968a12b66745f7b97ac3622893
parentdb53b03f5c1420b9bcade637873522f1847f9e3f (diff)
downloadmongo-498df9ab853bb03514b8803b9b1f6c2b6900b533.tar.gz
SERVER-28427 Implement timeouts for the TicketHolder
-rw-r--r--src/mongo/db/concurrency/d_concurrency.cpp15
-rw-r--r--src/mongo/db/concurrency/d_concurrency.h7
-rw-r--r--src/mongo/db/concurrency/d_concurrency_test.cpp56
-rw-r--r--src/mongo/db/concurrency/lock_state.cpp64
-rw-r--r--src/mongo/db/concurrency/lock_state.h22
-rw-r--r--src/mongo/db/concurrency/lock_state_test.cpp15
-rw-r--r--src/mongo/db/concurrency/lock_stats_test.cpp3
-rw-r--r--src/mongo/db/concurrency/locker.h18
-rw-r--r--src/mongo/db/concurrency/locker_noop.h8
-rw-r--r--src/mongo/db/db.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp3
-rw-r--r--src/mongo/util/concurrency/SConscript9
-rw-r--r--src/mongo/util/concurrency/ticketholder.cpp42
-rw-r--r--src/mongo/util/concurrency/ticketholder.h3
-rw-r--r--src/mongo/util/concurrency/ticketholder_test.cpp74
15 files changed, 257 insertions, 86 deletions
diff --git a/src/mongo/db/concurrency/d_concurrency.cpp b/src/mongo/db/concurrency/d_concurrency.cpp
index 5360ca958f2..cbd3655febe 100644
--- a/src/mongo/db/concurrency/d_concurrency.cpp
+++ b/src/mongo/db/concurrency/d_concurrency.cpp
@@ -136,28 +136,31 @@ bool Lock::ResourceMutex::isAtLeastReadLocked(Locker* locker) {
}
Lock::GlobalLock::GlobalLock(OperationContext* opCtx, LockMode lockMode, unsigned timeoutMs)
- : GlobalLock(opCtx, lockMode, EnqueueOnly()) {
+ : GlobalLock(opCtx, lockMode, timeoutMs, EnqueueOnly()) {
waitForLock(timeoutMs);
}
-Lock::GlobalLock::GlobalLock(OperationContext* opCtx, LockMode lockMode, EnqueueOnly enqueueOnly)
+Lock::GlobalLock::GlobalLock(OperationContext* opCtx,
+ LockMode lockMode,
+ unsigned timeoutMs,
+ EnqueueOnly enqueueOnly)
: _opCtx(opCtx),
_result(LOCK_INVALID),
_pbwm(opCtx->lockState(), resourceIdParallelBatchWriterMode) {
- _enqueue(lockMode);
+ _enqueue(lockMode, timeoutMs);
}
-void Lock::GlobalLock::_enqueue(LockMode lockMode) {
+void Lock::GlobalLock::_enqueue(LockMode lockMode, unsigned timeoutMs) {
if (_opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()) {
_pbwm.lock(MODE_IS);
}
- _result = _opCtx->lockState()->lockGlobalBegin(lockMode);
+ _result = _opCtx->lockState()->lockGlobalBegin(lockMode, Milliseconds(timeoutMs));
}
void Lock::GlobalLock::waitForLock(unsigned timeoutMs) {
if (_result == LOCK_WAITING) {
- _result = _opCtx->lockState()->lockGlobalComplete(timeoutMs);
+ _result = _opCtx->lockState()->lockGlobalComplete(Milliseconds(timeoutMs));
}
if (_result != LOCK_OK && _opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()) {
diff --git a/src/mongo/db/concurrency/d_concurrency.h b/src/mongo/db/concurrency/d_concurrency.h
index 5eb5de3a5e2..d5dc7599b3b 100644
--- a/src/mongo/db/concurrency/d_concurrency.h
+++ b/src/mongo/db/concurrency/d_concurrency.h
@@ -179,7 +179,10 @@ public:
* Enqueues lock but does not block on lock acquisition.
* Call waitForLock() to complete locking process.
*/
- GlobalLock(OperationContext* opCtx, LockMode lockMode, EnqueueOnly enqueueOnly);
+ GlobalLock(OperationContext* opCtx,
+ LockMode lockMode,
+ unsigned timeoutMs,
+ EnqueueOnly enqueueOnly);
~GlobalLock() {
_unlock();
@@ -198,7 +201,7 @@ public:
}
private:
- void _enqueue(LockMode lockMode);
+ void _enqueue(LockMode lockMode, unsigned timeoutMs);
void _unlock();
OperationContext* const _opCtx;
diff --git a/src/mongo/db/concurrency/d_concurrency_test.cpp b/src/mongo/db/concurrency/d_concurrency_test.cpp
index 2c497e4fa84..b0ece4e9069 100644
--- a/src/mongo/db/concurrency/d_concurrency_test.cpp
+++ b/src/mongo/db/concurrency/d_concurrency_test.cpp
@@ -39,9 +39,11 @@
#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/debug_util.h"
#include "mongo/util/log.h"
#include "mongo/util/progress_meter.h"
+#include "mongo/util/time_support.h"
namespace mongo {
@@ -70,10 +72,31 @@ private:
bool _oldSupportsDocLocking;
};
+/**
+ * A RAII object that instantiates a TicketHolder that limits the number of allowed global lock
+ * acquisitions to numTickets. The opCtx must live as long as the UseGlobalThrottling instance.
+ */
+class UseGlobalThrottling {
+public:
+ explicit UseGlobalThrottling(OperationContext* opCtx, int numTickets)
+ : _opCtx(opCtx), _holder(numTickets) {
+ _opCtx->lockState()->setGlobalThrottling(&_holder, &_holder);
+ }
+ ~UseGlobalThrottling() {
+ // Reset the global setting as we're about to destroy the ticket holder.
+ _opCtx->lockState()->setGlobalThrottling(nullptr, nullptr);
+ }
+
+private:
+ OperationContext* _opCtx;
+ TicketHolder _holder;
+};
+
class DConcurrencyTestFixture : public unittest::Test {
public:
DConcurrencyTestFixture() : _client(getGlobalServiceContext()->makeClient("testClient")) {}
+ ~DConcurrencyTestFixture() {}
/**
* Constructs and returns a new OperationContext.
@@ -694,6 +717,39 @@ TEST_F(DConcurrencyTestFixture, ResourceMutexLabels) {
ASSERT(mutex2.getName() == "label2");
}
+TEST_F(DConcurrencyTestFixture, Throttling) {
+ auto clientOpctxPairs = makeKClientsWithLockers<DefaultLockerImpl>(2);
+ auto opctx1 = clientOpctxPairs[0].second.get();
+ auto opctx2 = clientOpctxPairs[1].second.get();
+ UseGlobalThrottling throttle(opctx1, 1);
+
+ bool overlongWait;
+ int tries = 0;
+ const int maxTries = 15;
+ const int timeoutMillis = 42;
+
+ do {
+ // Test that throttling will correctly handle timeouts.
+ Lock::GlobalRead R1(opctx1, 0);
+ ASSERT(R1.isLocked());
+
+ Date_t t1 = Date_t::now();
+ {
+ Lock::GlobalRead R2(opctx2, timeoutMillis);
+ ASSERT(!R2.isLocked());
+ }
+ Date_t t2 = Date_t::now();
+
+ // Test that the timeout did result in at least the requested wait.
+ ASSERT_GTE(t2 - t1, Milliseconds(timeoutMillis));
+
+ // Timeouts should be reasonably immediate. In maxTries attempts at least one test should be
+ // able to complete within a second, as the theoretical test duration is less than 50 ms.
+ overlongWait = t2 - t1 >= Seconds(1);
+ } while (overlongWait && ++tries < maxTries);
+ ASSERT(!overlongWait);
+}
+
// These tests exercise single- and multi-threaded performance of uncontended lock acquisition. It
// is neither practical nor useful to run them on debug builds.
diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp
index 9235f56bbb4..0f8f6d79a3b 100644
--- a/src/mongo/db/concurrency/lock_state.cpp
+++ b/src/mongo/db/concurrency/lock_state.cpp
@@ -116,7 +116,7 @@ const ResourceId resourceIdMMAPV1Flush =
ResourceId(RESOURCE_MMAPV1_FLUSH, ResourceId::SINGLETON_MMAPV1_FLUSH);
// How often (in millis) to check for deadlock if a lock has not been granted for some time
-const unsigned DeadlockTimeoutMs = 500;
+const Milliseconds DeadlockTimeout = Milliseconds(500);
// Dispenses unique LockerId identifiers
AtomicUInt64 idCounter(0);
@@ -220,18 +220,12 @@ void CondVarLockGrantNotification::clear() {
_result = LOCK_INVALID;
}
-LockResult CondVarLockGrantNotification::wait(unsigned timeoutMs) {
+LockResult CondVarLockGrantNotification::wait(Milliseconds timeout) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
- while (_result == LOCK_INVALID) {
- if (stdx::cv_status::timeout ==
- _cond.wait_for(lock,
- Milliseconds(static_cast<int64_t>(timeoutMs)).toSystemDuration())) {
- // Timeout
- return LOCK_TIMEOUT;
- }
- }
-
- return _result;
+ return _cond.wait_for(
+ lock, timeout.toSystemDuration(), [this] { return _result != LOCK_INVALID; })
+ ? _result
+ : LOCK_TIMEOUT;
}
void CondVarLockGrantNotification::notify(ResourceId resId, LockResult result) {
@@ -293,10 +287,11 @@ Locker::ClientState LockerImpl<IsForMMAPV1>::getClientState() const {
}
template <bool IsForMMAPV1>
-LockResult LockerImpl<IsForMMAPV1>::lockGlobal(LockMode mode, unsigned timeoutMs) {
- LockResult result = lockGlobalBegin(mode);
+LockResult LockerImpl<IsForMMAPV1>::lockGlobal(LockMode mode) {
+ LockResult result = _lockGlobalBegin(mode, Milliseconds::max());
+
if (result == LOCK_WAITING) {
- result = lockGlobalComplete(timeoutMs);
+ result = lockGlobalComplete(Milliseconds::max());
}
if (result == LOCK_OK) {
@@ -307,14 +302,19 @@ LockResult LockerImpl<IsForMMAPV1>::lockGlobal(LockMode mode, unsigned timeoutMs
}
template <bool IsForMMAPV1>
-LockResult LockerImpl<IsForMMAPV1>::lockGlobalBegin(LockMode mode) {
+LockResult LockerImpl<IsForMMAPV1>::_lockGlobalBegin(LockMode mode, Milliseconds timeout) {
dassert(isLocked() == (_modeForTicket != MODE_NONE));
if (_modeForTicket == MODE_NONE) {
const bool reader = isSharedLockMode(mode);
auto holder = ticketHolders[mode];
if (holder) {
_clientState.store(reader ? kQueuedReader : kQueuedWriter);
- holder->waitForTicket();
+ if (timeout == Milliseconds::max()) {
+ holder->waitForTicket();
+ } else if (!holder->waitForTicketUntil(Date_t::now() + timeout)) {
+ _clientState.store(kInactive);
+ return LOCK_TIMEOUT;
+ }
}
_clientState.store(reader ? kActiveReader : kActiveWriter);
_modeForTicket = mode;
@@ -331,8 +331,8 @@ LockResult LockerImpl<IsForMMAPV1>::lockGlobalBegin(LockMode mode) {
}
template <bool IsForMMAPV1>
-LockResult LockerImpl<IsForMMAPV1>::lockGlobalComplete(unsigned timeoutMs) {
- return lockComplete(resourceIdGlobal, getLockMode(resourceIdGlobal), timeoutMs, false);
+LockResult LockerImpl<IsForMMAPV1>::lockGlobalComplete(Milliseconds timeout) {
+ return lockComplete(resourceIdGlobal, getLockMode(resourceIdGlobal), timeout, false);
}
template <bool IsForMMAPV1>
@@ -430,7 +430,7 @@ void LockerImpl<IsForMMAPV1>::endWriteUnitOfWork() {
template <bool IsForMMAPV1>
LockResult LockerImpl<IsForMMAPV1>::lock(ResourceId resId,
LockMode mode,
- unsigned timeoutMs,
+ Milliseconds timeout,
bool checkDeadlock) {
const LockResult result = lockBegin(resId, mode);
@@ -442,7 +442,7 @@ LockResult LockerImpl<IsForMMAPV1>::lock(ResourceId resId,
// unsuccessful result that the lock manager would return is LOCK_WAITING.
invariant(result == LOCK_WAITING);
- return lockComplete(resId, mode, timeoutMs, checkDeadlock);
+ return lockComplete(resId, mode, timeout, checkDeadlock);
}
template <bool IsForMMAPV1>
@@ -718,7 +718,7 @@ LockResult LockerImpl<IsForMMAPV1>::lockBegin(ResourceId resId, LockMode mode) {
template <bool IsForMMAPV1>
LockResult LockerImpl<IsForMMAPV1>::lockComplete(ResourceId resId,
LockMode mode,
- unsigned timeoutMs,
+ Milliseconds timeout,
bool checkDeadlock) {
// Under MMAP V1 engine a deadlock can occur if a thread goes to sleep waiting on
// DB lock, while holding the flush lock, so it has to be released. This is only
@@ -734,14 +734,14 @@ LockResult LockerImpl<IsForMMAPV1>::lockComplete(ResourceId resId,
// Don't go sleeping without bound in order to be able to report long waits or wake up for
// deadlock detection.
- unsigned waitTimeMs = std::min(timeoutMs, DeadlockTimeoutMs);
+ Milliseconds waitTime = std::min(timeout, DeadlockTimeout);
const uint64_t startOfTotalWaitTime = curTimeMicros64();
uint64_t startOfCurrentWaitTime = startOfTotalWaitTime;
while (true) {
// It is OK if this call wakes up spuriously, because we re-evaluate the remaining
// wait time anyways.
- result = _notify.wait(waitTimeMs);
+ result = _notify.wait(waitTime);
// Account for the time spent waiting on the notification object
const uint64_t curTimeMicros = curTimeMicros64();
@@ -768,16 +768,16 @@ LockResult LockerImpl<IsForMMAPV1>::lockComplete(ResourceId resId,
}
// If infinite timeout was requested, just keep waiting
- if (timeoutMs == UINT_MAX) {
+ if (timeout == Milliseconds::max()) {
continue;
}
- const unsigned totalBlockTimeMs = (curTimeMicros - startOfTotalWaitTime) / 1000;
- waitTimeMs = (totalBlockTimeMs < timeoutMs)
- ? std::min(timeoutMs - totalBlockTimeMs, DeadlockTimeoutMs)
- : 0;
+ const auto totalBlockTime = duration_cast<Milliseconds>(
+ Microseconds(int64_t(curTimeMicros - startOfTotalWaitTime)));
+ waitTime = (totalBlockTime < timeout) ? std::min(timeout - totalBlockTime, DeadlockTimeout)
+ : Milliseconds(0);
- if (waitTimeMs == 0) {
+ if (waitTime == Milliseconds(0)) {
break;
}
}
@@ -877,7 +877,7 @@ AutoAcquireFlushLockForMMAPV1Commit::AutoAcquireFlushLockForMMAPV1Commit(Locker*
// due to too much uncommitted in-memory journal, but won't have corruption.
while (true) {
- LockResult result = _locker->lock(resourceIdMMAPV1Flush, MODE_S, UINT_MAX, true);
+ LockResult result = _locker->lock(resourceIdMMAPV1Flush, MODE_S, Milliseconds::max(), true);
if (result == LOCK_OK) {
break;
}
@@ -894,7 +894,7 @@ void AutoAcquireFlushLockForMMAPV1Commit::upgradeFlushLockToExclusive() {
// This should not be able to deadlock, since we already hold the S journal lock, which
// means all writers are kicked out. Readers always yield the journal lock if they block
// waiting on any other lock.
- invariant(LOCK_OK == _locker->lock(resourceIdMMAPV1Flush, MODE_X, UINT_MAX, false));
+ invariant(LOCK_OK == _locker->lock(resourceIdMMAPV1Flush, MODE_X, Milliseconds::max(), false));
// Lock bumps the recursive count. Drop it back down so that the destructor doesn't
// complain.
diff --git a/src/mongo/db/concurrency/lock_state.h b/src/mongo/db/concurrency/lock_state.h
index e4bdded0bdf..04e700c9aa1 100644
--- a/src/mongo/db/concurrency/lock_state.h
+++ b/src/mongo/db/concurrency/lock_state.h
@@ -55,9 +55,9 @@ public:
/**
* Uninterruptible blocking method, which waits for the notification to fire.
*
- * @param timeoutMs How many milliseconds to wait before returning LOCK_TIMEOUT.
+ * @param timeout How long to wait before returning LOCK_TIMEOUT.
*/
- LockResult wait(unsigned timeoutMs);
+ LockResult wait(Milliseconds timeout);
private:
virtual void notify(ResourceId resId, LockResult result);
@@ -101,9 +101,11 @@ public:
stdx::thread::id getThreadId() const override;
- virtual LockResult lockGlobal(LockMode mode, unsigned timeoutMs = UINT_MAX);
- virtual LockResult lockGlobalBegin(LockMode mode);
- virtual LockResult lockGlobalComplete(unsigned timeoutMs);
+ virtual LockResult lockGlobal(LockMode mode);
+ virtual LockResult lockGlobalBegin(LockMode mode, Milliseconds timeout) {
+ return _lockGlobalBegin(mode, timeout);
+ }
+ virtual LockResult lockGlobalComplete(Milliseconds timeout);
virtual void lockMMAPV1Flush();
virtual void downgradeGlobalXtoSForMMAPV1();
@@ -118,7 +120,7 @@ public:
virtual LockResult lock(ResourceId resId,
LockMode mode,
- unsigned timeoutMs = UINT_MAX,
+ Milliseconds timeout = Milliseconds::max(),
bool checkDeadlock = false);
virtual void downgrade(ResourceId resId, LockMode newMode);
@@ -167,12 +169,12 @@ public:
*
* @param resId Resource id which was passed to an earlier lockBegin call. Must match.
* @param mode Mode which was passed to an earlier lockBegin call. Must match.
- * @param timeoutMs How long to wait for the lock acquisition to complete.
+ * @param timeout How long to wait for the lock acquisition to complete.
* @param checkDeadlock whether to perform deadlock detection while waiting.
*/
LockResult lockComplete(ResourceId resId,
LockMode mode,
- unsigned timeoutMs,
+ Milliseconds timeout,
bool checkDeadlock);
private:
@@ -180,6 +182,10 @@ private:
typedef FastMapNoAlloc<ResourceId, LockRequest, 16> LockRequestsMap;
+ /**
+ * Like lockGlobalBegin, but accepts a timeout for acquiring a ticket.
+ */
+ LockResult _lockGlobalBegin(LockMode, Milliseconds timeout);
/**
* The main functionality of the unlock method, except accepts iterator in order to avoid
diff --git a/src/mongo/db/concurrency/lock_state_test.cpp b/src/mongo/db/concurrency/lock_state_test.cpp
index 13bc6b58cbc..e96a460d61f 100644
--- a/src/mongo/db/concurrency/lock_state_test.cpp
+++ b/src/mongo/db/concurrency/lock_state_test.cpp
@@ -88,7 +88,7 @@ TEST(LockerImpl, ConflictWithTimeout) {
DefaultLockerImpl locker2;
ASSERT(LOCK_OK == locker2.lockGlobal(MODE_IX));
- ASSERT(LOCK_TIMEOUT == locker2.lock(resId, MODE_S, 0));
+ ASSERT(LOCK_TIMEOUT == locker2.lock(resId, MODE_S, Milliseconds(0)));
ASSERT(locker2.getLockMode(resId) == MODE_NONE);
@@ -110,7 +110,7 @@ TEST(LockerImpl, ConflictUpgradeWithTimeout) {
ASSERT(LOCK_OK == locker2.lock(resId, MODE_S));
// Try upgrading locker 1, which should block and timeout
- ASSERT(LOCK_TIMEOUT == locker1.lock(resId, MODE_X, 1));
+ ASSERT(LOCK_TIMEOUT == locker1.lock(resId, MODE_X, Milliseconds(1)));
locker1.unlockGlobal();
locker2.unlockGlobal();
@@ -276,13 +276,14 @@ TEST(LockerImpl, CanceledDeadlockUnblocks) {
ASSERT(LOCK_WAITING == locker3.lockBegin(db1, MODE_S));
// Detect deadlock, canceling our request
- ASSERT(LOCK_DEADLOCK == locker2.lockComplete(db1, MODE_X, 1, /*checkDeadlock*/ true));
+ ASSERT(LOCK_DEADLOCK ==
+ locker2.lockComplete(db1, MODE_X, Milliseconds(1), /*checkDeadlock*/ true));
// Now locker3 must be able to complete its request
- ASSERT(LOCK_OK == locker3.lockComplete(db1, MODE_S, 1, /*checkDeadlock*/ false));
+ ASSERT(LOCK_OK == locker3.lockComplete(db1, MODE_S, Milliseconds(1), /*checkDeadlock*/ false));
// Locker1 still can't complete its request
- ASSERT(LOCK_TIMEOUT == locker1.lockComplete(db2, MODE_X, 1, false));
+ ASSERT(LOCK_TIMEOUT == locker1.lockComplete(db2, MODE_X, Milliseconds(1), false));
// Check ownership for db1
ASSERT(locker1.getLockMode(db1) == MODE_S);
@@ -373,10 +374,10 @@ TEST(LockerImpl, GetLockerInfoShouldReportPendingLocks) {
ASSERT(successfulLocker.unlock(dbId));
ASSERT(successfulLocker.unlockGlobal());
- const unsigned timeoutMs = 0;
+ const Milliseconds timeout = Milliseconds(0);
const bool checkDeadlock = false;
ASSERT_EQ(LOCK_OK,
- conflictingLocker.lockComplete(collectionId, MODE_IS, timeoutMs, checkDeadlock));
+ conflictingLocker.lockComplete(collectionId, MODE_IS, timeout, checkDeadlock));
conflictingLocker.getLockerInfo(&lockerInfo);
ASSERT_FALSE(lockerInfo.waitingResource.isValid());
diff --git a/src/mongo/db/concurrency/lock_stats_test.cpp b/src/mongo/db/concurrency/lock_stats_test.cpp
index 8ae6eb3c010..2dc06744c88 100644
--- a/src/mongo/db/concurrency/lock_stats_test.cpp
+++ b/src/mongo/db/concurrency/lock_stats_test.cpp
@@ -66,7 +66,8 @@ TEST(LockStats, Wait) {
ASSERT_EQUALS(LOCK_WAITING, lockerConflict.lockBegin(resId, MODE_S));
// Sleep 1 millisecond so the wait time passes
- ASSERT_EQUALS(LOCK_TIMEOUT, lockerConflict.lockComplete(resId, MODE_S, 1, false));
+ ASSERT_EQUALS(LOCK_TIMEOUT,
+ lockerConflict.lockComplete(resId, MODE_S, Milliseconds(1), false));
}
// Make sure that the waits/blocks are non-zero
diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h
index 6dacd299e65..b973da6d66b 100644
--- a/src/mongo/db/concurrency/locker.h
+++ b/src/mongo/db/concurrency/locker.h
@@ -105,22 +105,22 @@ public:
*
* @param mode Mode in which the global lock should be acquired. Also indicates the intent
* of the operation.
- * @param timeoutMs How long to wait for the global lock (and the flush lock, for the MMAP
- * V1 engine) to be acquired.
*
* @return LOCK_OK, if the global lock (and the flush lock, for the MMAP V1 engine) were
* acquired within the specified time bound. Otherwise, the respective failure
* code and neither lock will be acquired.
*/
- virtual LockResult lockGlobal(LockMode mode, unsigned timeoutMs = UINT_MAX) = 0;
+ virtual LockResult lockGlobal(LockMode mode) = 0;
/**
* Requests the global lock to be acquired in the specified mode.
*
* See the comments for lockBegin/Complete for more information on the semantics.
+ * The timeout indicates how long to wait for the lock to be acquired. The lockGlobalBegin
+ * method has a timeout for use with the TicketHolder, if there is one.
*/
- virtual LockResult lockGlobalBegin(LockMode mode) = 0;
- virtual LockResult lockGlobalComplete(unsigned timeoutMs) = 0;
+ virtual LockResult lockGlobalBegin(LockMode mode, Milliseconds timeout) = 0;
+ virtual LockResult lockGlobalComplete(Milliseconds timeout) = 0;
/**
* This method is used only in the MMAP V1 storage engine, otherwise it is a no-op. See the
@@ -171,9 +171,9 @@ public:
*
* @param resId Id of the resource to be locked.
* @param mode Mode in which the resource should be locked. Lock upgrades are allowed.
- * @param timeoutMs How many milliseconds to wait for the lock to be granted, before
- * returning LOCK_TIMEOUT. This parameter defaults to UINT_MAX, which means
- * wait infinitely. If 0 is passed, the request will return immediately, if
+ * @param timeout How long to wait for the lock to be granted, before
+ * returning LOCK_TIMEOUT. This parameter defaults to an infinite timeout.
+ * If Milliseconds(0) is passed, the request will return immediately, if
* the request could not be granted right away.
* @param checkDeadlock Whether to enable deadlock detection for this acquisition. This
* parameter is put in place until we can handle deadlocks at all places,
@@ -183,7 +183,7 @@ public:
*/
virtual LockResult lock(ResourceId resId,
LockMode mode,
- unsigned timeoutMs = UINT_MAX,
+ Milliseconds timeout = Milliseconds::max(),
bool checkDeadlock = false) = 0;
/**
diff --git a/src/mongo/db/concurrency/locker_noop.h b/src/mongo/db/concurrency/locker_noop.h
index 29a1a5f6d55..2a84be36577 100644
--- a/src/mongo/db/concurrency/locker_noop.h
+++ b/src/mongo/db/concurrency/locker_noop.h
@@ -57,15 +57,15 @@ public:
invariant(false);
}
- virtual LockResult lockGlobal(LockMode mode, unsigned timeoutMs) {
+ virtual LockResult lockGlobal(LockMode mode) {
invariant(false);
}
- virtual LockResult lockGlobalBegin(LockMode mode) {
+ virtual LockResult lockGlobalBegin(LockMode mode, Milliseconds timeout) {
invariant(false);
}
- virtual LockResult lockGlobalComplete(unsigned timeoutMs) {
+ virtual LockResult lockGlobalComplete(Milliseconds timeout) {
invariant(false);
}
@@ -91,7 +91,7 @@ public:
virtual LockResult lock(ResourceId resId,
LockMode mode,
- unsigned timeoutMs,
+ Milliseconds timeout,
bool checkDeadlock) {
return LockResult::LOCK_OK;
}
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index fa811572abc..34f18c08e80 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -999,9 +999,9 @@ static void shutdownTask() {
// of this function to prevent any operations from running that need a lock.
//
DefaultLockerImpl* globalLocker = new DefaultLockerImpl();
- LockResult result = globalLocker->lockGlobalBegin(MODE_X);
+ LockResult result = globalLocker->lockGlobalBegin(MODE_X, Milliseconds::max());
if (result == LOCK_WAITING) {
- result = globalLocker->lockGlobalComplete(UINT_MAX);
+ result = globalLocker->lockGlobalComplete(Milliseconds::max());
}
invariant(LOCK_OK == result);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 20085d98fb5..aa99ec29eac 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1770,7 +1770,8 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
return {ErrorCodes::NotMaster, "not primary so can't step down"};
}
- Lock::GlobalLock globalReadLock(opCtx, MODE_S, Lock::GlobalLock::EnqueueOnly());
+ Lock::GlobalLock globalReadLock(
+ opCtx, MODE_S, durationCount<Milliseconds>(stepdownTime), Lock::GlobalLock::EnqueueOnly());
// We've requested the global shared lock which will stop new writes from coming in,
// but existing writes could take a long time to finish, so kill all user operations
diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript
index 5038bdca4bb..65de766a218 100644
--- a/src/mongo/util/concurrency/SConscript
+++ b/src/mongo/util/concurrency/SConscript
@@ -40,6 +40,15 @@ env.Library('ticketholder',
LIBDEPS=['$BUILD_DIR/mongo/base',
'$BUILD_DIR/third_party/shim_boost'])
+
+env.CppUnitTest(
+ target='ticketholder_test',
+ source=['ticketholder_test.cpp'],
+ LIBDEPS=[
+ 'ticketholder',
+ '$BUILD_DIR/mongo/unittest/unittest',
+ ])
+
env.Library(
target='spin_lock',
source=[
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp
index 3a99d643419..27eee5f84cc 100644
--- a/src/mongo/util/concurrency/ticketholder.cpp
+++ b/src/mongo/util/concurrency/ticketholder.cpp
@@ -59,27 +59,35 @@ TicketHolder::~TicketHolder() {
bool TicketHolder::tryAcquire() {
while (0 != sem_trywait(&_sem)) {
- switch (errno) {
- case EAGAIN:
- return false;
- case EINTR:
- break;
- default:
- _check(-1);
- }
+ if (errno == EAGAIN)
+ return false;
+ if (errno != EINTR)
+ _check(-1);
}
return true;
}
void TicketHolder::waitForTicket() {
while (0 != sem_wait(&_sem)) {
- switch (errno) {
- case EINTR:
- break;
- default:
- _check(-1);
- }
+ if (errno != EINTR)
+ _check(-1);
+ }
+}
+
+bool TicketHolder::waitForTicketUntil(Date_t until) {
+ const long long millisSinceEpoch = until.toMillisSinceEpoch();
+ struct timespec ts;
+
+ ts.tv_sec = millisSinceEpoch / 1000;
+ ts.tv_nsec = (millisSinceEpoch % 1000) * (1000 * 1000);
+ while (0 != sem_timedwait(&_sem, &ts)) {
+ if (errno == ETIMEDOUT)
+ return false;
+
+ if (errno != EINTR)
+ _check(-1);
}
+ return true;
}
void TicketHolder::release() {
@@ -146,6 +154,12 @@ void TicketHolder::waitForTicket() {
}
}
+bool TicketHolder::waitForTicketUntil(Date_t until) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ return _newTicket.wait_until(lk, until.toSystemTimePoint(), [this] { return _tryAcquire(); });
+}
+
void TicketHolder::release() {
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
index 6906dafa897..7a8b34083f0 100644
--- a/src/mongo/util/concurrency/ticketholder.h
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -34,6 +34,7 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/mutex.h"
+#include "mongo/util/time_support.h"
namespace mongo {
@@ -48,6 +49,8 @@ public:
void waitForTicket();
+ bool waitForTicketUntil(Date_t until);
+
void release();
Status resize(int newSize);
diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp
new file mode 100644
index 00000000000..7f5afcbe0cb
--- /dev/null
+++ b/src/mongo/util/concurrency/ticketholder_test.cpp
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/ticketholder.h"
+
+namespace {
+using namespace mongo;
+
+TEST(TicketholderTest, BasicTimeout) {
+ TicketHolder holder(1);
+ ASSERT_EQ(holder.used(), 0);
+ ASSERT_EQ(holder.available(), 1);
+ ASSERT_EQ(holder.outof(), 1);
+
+ {
+ ScopedTicket ticket(&holder);
+ ASSERT_EQ(holder.used(), 1);
+ ASSERT_EQ(holder.available(), 0);
+ ASSERT_EQ(holder.outof(), 1);
+
+ ASSERT_FALSE(holder.tryAcquire());
+ ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now()));
+ ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now() + Milliseconds(1)));
+ ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now() + Milliseconds(42)));
+ }
+
+ ASSERT_EQ(holder.used(), 0);
+ ASSERT_EQ(holder.available(), 1);
+ ASSERT_EQ(holder.outof(), 1);
+
+ ASSERT(holder.waitForTicketUntil(Date_t::now()));
+ holder.release();
+
+ ASSERT_EQ(holder.used(), 0);
+
+ ASSERT(holder.waitForTicketUntil(Date_t::now() + Milliseconds(20)));
+ ASSERT_EQ(holder.used(), 1);
+
+ ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now() + Milliseconds(2)));
+ holder.release();
+ ASSERT_EQ(holder.used(), 0);
+}
+} // namespace