author     Geert Bosch <geert@mongodb.com>   2017-04-21 18:09:06 -0400
committer  Geert Bosch <geert@mongodb.com>   2017-05-05 18:29:43 -0400
commit     aa8ab6611d27a6a4b014d82a37eb658760fa7425 (patch)
tree       559e80bf696e2a62d6c979ae768f52cdb1ae6856
parent     02728d11d54f6bd43276ff3c76dea6d2484b4134 (diff)
download   mongo-aa8ab6611d27a6a4b014d82a37eb658760fa7425.tar.gz
SERVER-28427 Implement timeouts for the TicketHolder
(cherry picked from commit 498df9ab853bb03514b8803b9b1f6c2b6900b533)

Conflicts:
    src/mongo/db/concurrency/SConscript
    src/mongo/db/concurrency/d_concurrency.cpp
    src/mongo/db/concurrency/d_concurrency.h
    src/mongo/db/concurrency/d_concurrency_test.cpp
    src/mongo/db/repl/replication_coordinator_impl.cpp
-rw-r--r-- | src/mongo/db/concurrency/SConscript                 |  1
-rw-r--r-- | src/mongo/db/concurrency/d_concurrency.cpp          | 15
-rw-r--r-- | src/mongo/db/concurrency/d_concurrency.h            |  4
-rw-r--r-- | src/mongo/db/concurrency/d_concurrency_test.cpp     | 78
-rw-r--r-- | src/mongo/db/concurrency/lock_state.cpp             | 64
-rw-r--r-- | src/mongo/db/concurrency/lock_state.h               | 22
-rw-r--r-- | src/mongo/db/concurrency/lock_state_test.cpp        | 15
-rw-r--r-- | src/mongo/db/concurrency/lock_stats_test.cpp        |  3
-rw-r--r-- | src/mongo/db/concurrency/locker.h                   | 18
-rw-r--r-- | src/mongo/db/concurrency/locker_noop.h              |  8
-rw-r--r-- | src/mongo/db/db.cpp                                 |  4
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp  |  5
-rw-r--r-- | src/mongo/util/concurrency/SConscript               |  9
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.cpp         | 42
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.h           |  3
-rw-r--r-- | src/mongo/util/concurrency/ticketholder_test.cpp    | 74
16 files changed, 279 insertions, 86 deletions
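
The heart of this patch is threading a Milliseconds timeout from Lock::GlobalLock down through LockerImpl to the TicketHolder, so that a throttled global lock request can give up instead of blocking forever. A minimal caller-side sketch of the resulting behavior (the function and the 42 ms value are hypothetical; the types and timeout semantics come from the diff and its tests below):

#include "mongo/db/concurrency/d_concurrency.h"

void tryThrottledRead(mongo::Locker* locker) {
    // With a TicketHolder installed via setGlobalThrottling(), this waits at most
    // 42 ms for a ticket and the global S lock before giving up.
    mongo::Lock::GlobalRead readLock(locker, /*timeoutMs=*/42);
    if (!readLock.isLocked()) {
        return;  // LOCK_TIMEOUT: neither ticket nor lock arrived within the deadline
    }
    // ... read work under the global lock ...
}  // destructor releases the lock and returns the ticket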
diff --git a/src/mongo/db/concurrency/SConscript b/src/mongo/db/concurrency/SConscript
index c823280f415..942b7a4f7f1 100644
--- a/src/mongo/db/concurrency/SConscript
+++ b/src/mongo/db/concurrency/SConscript
@@ -42,6 +42,7 @@ env.CppUnitTest(
'lock_stats_test.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/service_context_noop_init',
'$BUILD_DIR/mongo/util/progress_meter',
'lock_manager',
]
diff --git a/src/mongo/db/concurrency/d_concurrency.cpp b/src/mongo/db/concurrency/d_concurrency.cpp
index de670e0a49e..f7dabcc2248 100644
--- a/src/mongo/db/concurrency/d_concurrency.cpp
+++ b/src/mongo/db/concurrency/d_concurrency.cpp
@@ -63,26 +63,29 @@ AtomicWord<uint64_t> lastResourceMutexHash{0};
Lock::ResourceMutex::ResourceMutex() : _rid(RESOURCE_MUTEX, lastResourceMutexHash.fetchAndAdd(1)) {}
Lock::GlobalLock::GlobalLock(Locker* locker, LockMode lockMode, unsigned timeoutMs)
- : GlobalLock(locker, lockMode, EnqueueOnly()) {
+ : GlobalLock(locker, lockMode, timeoutMs, EnqueueOnly()) {
waitForLock(timeoutMs);
}
-Lock::GlobalLock::GlobalLock(Locker* locker, LockMode lockMode, EnqueueOnly enqueueOnly)
+Lock::GlobalLock::GlobalLock(Locker* locker,
+ LockMode lockMode,
+ unsigned timeoutMs,
+ EnqueueOnly enqueueOnly)
: _locker(locker), _result(LOCK_INVALID), _pbwm(locker, resourceIdParallelBatchWriterMode) {
- _enqueue(lockMode);
+ _enqueue(lockMode, timeoutMs);
}
-void Lock::GlobalLock::_enqueue(LockMode lockMode) {
+void Lock::GlobalLock::_enqueue(LockMode lockMode, unsigned timeoutMs) {
if (_locker->shouldConflictWithSecondaryBatchApplication()) {
_pbwm.lock(MODE_IS);
}
- _result = _locker->lockGlobalBegin(lockMode);
+ _result = _locker->lockGlobalBegin(lockMode, Milliseconds(timeoutMs));
}
void Lock::GlobalLock::waitForLock(unsigned timeoutMs) {
if (_result == LOCK_WAITING) {
- _result = _locker->lockGlobalComplete(timeoutMs);
+ _result = _locker->lockGlobalComplete(Milliseconds(timeoutMs));
}
if (_result != LOCK_OK && _locker->shouldConflictWithSecondaryBatchApplication()) {
diff --git a/src/mongo/db/concurrency/d_concurrency.h b/src/mongo/db/concurrency/d_concurrency.h
index 3f909d92614..6274814ca1c 100644
--- a/src/mongo/db/concurrency/d_concurrency.h
+++ b/src/mongo/db/concurrency/d_concurrency.h
@@ -164,7 +164,7 @@ public:
* Enqueues lock but does not block on lock acquisition.
* Call waitForLock() to complete locking process.
*/
- GlobalLock(Locker* locker, LockMode lockMode, EnqueueOnly enqueueOnly);
+ GlobalLock(Locker* locker, LockMode lockMode, unsigned timeoutMs, EnqueueOnly enqueueOnly);
~GlobalLock() {
_unlock();
@@ -180,7 +180,7 @@ public:
}
private:
- void _enqueue(LockMode lockMode);
+ void _enqueue(LockMode lockMode, unsigned timeoutMs);
void _unlock();
Locker* const _locker;
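
The extra timeoutMs parameter on the EnqueueOnly constructor keeps two-phase acquisition working under throttling: the ticket wait is bounded at enqueue time, while waitForLock() bounds the wait for the lock grant itself. A minimal sketch of the pattern (caller and values hypothetical; the stepDown() change later in this patch uses the same shape):

mongo::Lock::GlobalLock globalLock(locker,
                                   mongo::MODE_S,
                                   /*timeoutMs=*/10000,
                                   mongo::Lock::GlobalLock::EnqueueOnly());
// ... e.g. kill user operations that would block the shared lock ...
globalLock.waitForLock(/*timeoutMs=*/10000);
if (!globalLock.isLocked()) {
    // LOCK_TIMEOUT: neither a ticket nor the lock was granted within the deadline.
}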
diff --git a/src/mongo/db/concurrency/d_concurrency_test.cpp b/src/mongo/db/concurrency/d_concurrency_test.cpp
index 34b97d0e909..22aae759650 100644
--- a/src/mongo/db/concurrency/d_concurrency_test.cpp
+++ b/src/mongo/db/concurrency/d_concurrency_test.cpp
@@ -35,12 +35,16 @@
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/lock_manager_test_help.h"
+#include "mongo/db/operation_context.h"
#include "mongo/stdx/functional.h"
+#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/debug_util.h"
#include "mongo/util/log.h"
#include "mongo/util/progress_meter.h"
+#include "mongo/util/time_support.h"
namespace mongo {
@@ -70,6 +74,47 @@ private:
};
/**
+ * A RAII object that instantiates a TicketHolder limiting the number of allowed global lock
+ * acquisitions to numTickets. The opCtx must live as long as the UseGlobalThrottling instance.
+ */
+class UseGlobalThrottling {
+public:
+ explicit UseGlobalThrottling(OperationContext* opCtx, int numTickets)
+ : _opCtx(opCtx), _holder(numTickets) {
+ _opCtx->lockState()->setGlobalThrottling(&_holder, &_holder);
+ }
+ ~UseGlobalThrottling() {
+ // Reset the global setting as we're about to destroy the ticket holder.
+ _opCtx->lockState()->setGlobalThrottling(nullptr, nullptr);
+ }
+
+private:
+ OperationContext* _opCtx;
+ TicketHolder _holder;
+};
+
+/**
+ * Returns a vector of Clients of length 'k', each of which has an OperationContext with its
+ * lockState set to an instance of the given LockerType.
+ */
+template <typename LockerType>
+std::vector<std::pair<ServiceContext::UniqueClient, ServiceContext::UniqueOperationContext>>
+makeKClientsWithLockers(int k) {
+ std::vector<std::pair<ServiceContext::UniqueClient, ServiceContext::UniqueOperationContext>>
+ clients;
+ clients.reserve(k);
+ for (int i = 0; i < k; ++i) {
+ auto client =
+ getGlobalServiceContext()->makeClient(str::stream() << "test client for thread " << i);
+ auto opCtx = client->makeOperationContext();
+ opCtx->releaseLockState();
+ opCtx->setLockState(stdx::make_unique<LockerType>());
+ clients.emplace_back(std::move(client), std::move(opCtx));
+ }
+ return clients;
+}
+
+/**
* Calls fn for the given number of iterations, spread out over up to maxThreads threads.
* The threadNr passed is an integer between 0 and maxThreads exclusive. Logs timing
* statistics for all power-of-two thread counts from 1 up to maxThreads.
@@ -642,6 +687,39 @@ TEST(DConcurrency, StressPartitioned) {
}
}
+TEST(DConcurrency, Throttling) {
+ auto clientOpctxPairs = makeKClientsWithLockers<DefaultLockerImpl>(2);
+ auto opctx1 = clientOpctxPairs[0].second.get();
+ auto opctx2 = clientOpctxPairs[1].second.get();
+ UseGlobalThrottling throttle(opctx1, 1);
+
+ bool overlongWait;
+ int tries = 0;
+ const int maxTries = 15;
+ const int timeoutMillis = 42;
+
+ do {
+ // Test that throttling will correctly handle timeouts.
+ Lock::GlobalRead R1(opctx1->lockState(), 0);
+ ASSERT(R1.isLocked());
+
+ Date_t t1 = Date_t::now();
+ {
+ Lock::GlobalRead R2(opctx2->lockState(), timeoutMillis);
+ ASSERT(!R2.isLocked());
+ }
+ Date_t t2 = Date_t::now();
+
+ // Test that the timeout did result in at least the requested wait.
+ ASSERT_GTE(t2 - t1, Milliseconds(timeoutMillis));
+
+ // Timeouts should be reasonably immediate. In maxTries attempts at least one test should be
+ // able to complete within a second, as the theoretical test duration is less than 50 ms.
+ overlongWait = t2 - t1 >= Seconds(1);
+ } while (overlongWait && ++tries < maxTries);
+ ASSERT(!overlongWait);
+}
+
// These tests exercise single- and multi-threaded performance of uncontended lock acquisition. It
// is neither practical nor useful to run them on debug builds.
diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp
index 898f9cd462e..45ea279c137 100644
--- a/src/mongo/db/concurrency/lock_state.cpp
+++ b/src/mongo/db/concurrency/lock_state.cpp
@@ -116,7 +116,7 @@ const ResourceId resourceIdMMAPV1Flush =
ResourceId(RESOURCE_MMAPV1_FLUSH, ResourceId::SINGLETON_MMAPV1_FLUSH);
// How often (in millis) to check for deadlock if a lock has not been granted for some time
-const unsigned DeadlockTimeoutMs = 500;
+const Milliseconds DeadlockTimeout = Milliseconds(500);
// Dispenses unique LockerId identifiers
AtomicUInt64 idCounter(0);
@@ -231,18 +231,12 @@ void CondVarLockGrantNotification::clear() {
_result = LOCK_INVALID;
}
-LockResult CondVarLockGrantNotification::wait(unsigned timeoutMs) {
+LockResult CondVarLockGrantNotification::wait(Milliseconds timeout) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
- while (_result == LOCK_INVALID) {
- if (stdx::cv_status::timeout ==
- _cond.wait_for(lock,
- Milliseconds(static_cast<int64_t>(timeoutMs)).toSystemDuration())) {
- // Timeout
- return LOCK_TIMEOUT;
- }
- }
-
- return _result;
+ return _cond.wait_for(
+ lock, timeout.toSystemDuration(), [this] { return _result != LOCK_INVALID; })
+ ? _result
+ : LOCK_TIMEOUT;
}
void CondVarLockGrantNotification::notify(ResourceId resId, LockResult result) {
@@ -298,10 +292,11 @@ Locker::ClientState LockerImpl<IsForMMAPV1>::getClientState() const {
}
template <bool IsForMMAPV1>
-LockResult LockerImpl<IsForMMAPV1>::lockGlobal(LockMode mode, unsigned timeoutMs) {
- LockResult result = lockGlobalBegin(mode);
+LockResult LockerImpl<IsForMMAPV1>::lockGlobal(LockMode mode) {
+ LockResult result = _lockGlobalBegin(mode, Milliseconds::max());
+
if (result == LOCK_WAITING) {
- result = lockGlobalComplete(timeoutMs);
+ result = lockGlobalComplete(Milliseconds::max());
}
if (result == LOCK_OK) {
@@ -312,14 +307,19 @@ LockResult LockerImpl<IsForMMAPV1>::lockGlobal(LockMode mode, unsigned timeoutMs
}
template <bool IsForMMAPV1>
-LockResult LockerImpl<IsForMMAPV1>::lockGlobalBegin(LockMode mode) {
+LockResult LockerImpl<IsForMMAPV1>::_lockGlobalBegin(LockMode mode, Milliseconds timeout) {
dassert(isLocked() == (_modeForTicket != MODE_NONE));
if (_modeForTicket == MODE_NONE) {
const bool reader = isSharedLockMode(mode);
auto holder = ticketHolders[mode];
if (holder) {
_clientState.store(reader ? kQueuedReader : kQueuedWriter);
- holder->waitForTicket();
+ if (timeout == Milliseconds::max()) {
+ holder->waitForTicket();
+ } else if (!holder->waitForTicketUntil(Date_t::now() + timeout)) {
+ _clientState.store(kInactive);
+ return LOCK_TIMEOUT;
+ }
}
_clientState.store(reader ? kActiveReader : kActiveWriter);
_modeForTicket = mode;
@@ -336,8 +336,8 @@ LockResult LockerImpl<IsForMMAPV1>::lockGlobalBegin(LockMode mode) {
}
template <bool IsForMMAPV1>
-LockResult LockerImpl<IsForMMAPV1>::lockGlobalComplete(unsigned timeoutMs) {
- return lockComplete(resourceIdGlobal, getLockMode(resourceIdGlobal), timeoutMs, false);
+LockResult LockerImpl<IsForMMAPV1>::lockGlobalComplete(Milliseconds timeout) {
+ return lockComplete(resourceIdGlobal, getLockMode(resourceIdGlobal), timeout, false);
}
template <bool IsForMMAPV1>
@@ -435,7 +435,7 @@ void LockerImpl<IsForMMAPV1>::endWriteUnitOfWork() {
template <bool IsForMMAPV1>
LockResult LockerImpl<IsForMMAPV1>::lock(ResourceId resId,
LockMode mode,
- unsigned timeoutMs,
+ Milliseconds timeout,
bool checkDeadlock) {
const LockResult result = lockBegin(resId, mode);
@@ -447,7 +447,7 @@ LockResult LockerImpl<IsForMMAPV1>::lock(ResourceId resId,
// unsuccessful result that the lock manager would return is LOCK_WAITING.
invariant(result == LOCK_WAITING);
- return lockComplete(resId, mode, timeoutMs, checkDeadlock);
+ return lockComplete(resId, mode, timeout, checkDeadlock);
}
template <bool IsForMMAPV1>
@@ -723,7 +723,7 @@ LockResult LockerImpl<IsForMMAPV1>::lockBegin(ResourceId resId, LockMode mode) {
template <bool IsForMMAPV1>
LockResult LockerImpl<IsForMMAPV1>::lockComplete(ResourceId resId,
LockMode mode,
- unsigned timeoutMs,
+ Milliseconds timeout,
bool checkDeadlock) {
// Under MMAP V1 engine a deadlock can occur if a thread goes to sleep waiting on
// DB lock, while holding the flush lock, so it has to be released. This is only
@@ -739,14 +739,14 @@ LockResult LockerImpl<IsForMMAPV1>::lockComplete(ResourceId resId,
// Don't go sleeping without bound in order to be able to report long waits or wake up for
// deadlock detection.
- unsigned waitTimeMs = std::min(timeoutMs, DeadlockTimeoutMs);
+ Milliseconds waitTime = std::min(timeout, DeadlockTimeout);
const uint64_t startOfTotalWaitTime = curTimeMicros64();
uint64_t startOfCurrentWaitTime = startOfTotalWaitTime;
while (true) {
// It is OK if this call wakes up spuriously, because we re-evaluate the remaining
// wait time anyways.
- result = _notify.wait(waitTimeMs);
+ result = _notify.wait(waitTime);
// Account for the time spent waiting on the notification object
const uint64_t curTimeMicros = curTimeMicros64();
@@ -773,16 +773,16 @@ LockResult LockerImpl<IsForMMAPV1>::lockComplete(ResourceId resId,
}
// If infinite timeout was requested, just keep waiting
- if (timeoutMs == UINT_MAX) {
+ if (timeout == Milliseconds::max()) {
continue;
}
- const unsigned totalBlockTimeMs = (curTimeMicros - startOfTotalWaitTime) / 1000;
- waitTimeMs = (totalBlockTimeMs < timeoutMs)
- ? std::min(timeoutMs - totalBlockTimeMs, DeadlockTimeoutMs)
- : 0;
+ const auto totalBlockTime = duration_cast<Milliseconds>(
+ Microseconds(int64_t(curTimeMicros - startOfTotalWaitTime)));
+ waitTime = (totalBlockTime < timeout) ? std::min(timeout - totalBlockTime, DeadlockTimeout)
+ : Milliseconds(0);
- if (waitTimeMs == 0) {
+ if (waitTime == Milliseconds(0)) {
break;
}
}
@@ -882,7 +882,7 @@ AutoAcquireFlushLockForMMAPV1Commit::AutoAcquireFlushLockForMMAPV1Commit(Locker*
// due to too much uncommitted in-memory journal, but won't have corruption.
while (true) {
- LockResult result = _locker->lock(resourceIdMMAPV1Flush, MODE_S, UINT_MAX, true);
+ LockResult result = _locker->lock(resourceIdMMAPV1Flush, MODE_S, Milliseconds::max(), true);
if (result == LOCK_OK) {
break;
}
@@ -899,7 +899,7 @@ void AutoAcquireFlushLockForMMAPV1Commit::upgradeFlushLockToExclusive() {
// This should not be able to deadlock, since we already hold the S journal lock, which
// means all writers are kicked out. Readers always yield the journal lock if they block
// waiting on any other lock.
- invariant(LOCK_OK == _locker->lock(resourceIdMMAPV1Flush, MODE_X, UINT_MAX, false));
+ invariant(LOCK_OK == _locker->lock(resourceIdMMAPV1Flush, MODE_X, Milliseconds::max(), false));
// Lock bumps the recursive count. Drop it back down so that the destructor doesn't
// complain.
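
For reference, the retry loop in lockComplete() above reduces to the following shape (a simplified sketch, not the member code verbatim; the free function and notification argument are stand-ins for the real members). The locker never sleeps longer than DeadlockTimeout at a time, so it can report long waits and run deadlock detection, yet it still honors the caller's total timeout, with Milliseconds::max() as the "wait forever" sentinel:

#include <algorithm>
#include "mongo/db/concurrency/lock_state.h"
#include "mongo/util/time_support.h"

namespace mongo {
LockResult waitBounded(CondVarLockGrantNotification& notify, Milliseconds timeout) {
    const Milliseconds deadlockTimeout{500};  // mirrors DeadlockTimeout above
    Milliseconds waitTime = std::min(timeout, deadlockTimeout);
    const Date_t start = Date_t::now();
    while (true) {
        LockResult result = notify.wait(waitTime);  // may time out or wake spuriously
        if (result != LOCK_TIMEOUT)
            return result;  // granted, deadlock detected, etc.
        if (timeout == Milliseconds::max())
            continue;  // infinite timeout: just keep waiting in slices
        const Milliseconds blocked = Date_t::now() - start;
        waitTime = (blocked < timeout) ? std::min(timeout - blocked, deadlockTimeout)
                                       : Milliseconds(0);
        if (waitTime == Milliseconds(0))
            return LOCK_TIMEOUT;  // overall deadline exhausted
    }
}
}  // namespace mongo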
diff --git a/src/mongo/db/concurrency/lock_state.h b/src/mongo/db/concurrency/lock_state.h
index 79f348d4ac6..2be9f3c9c87 100644
--- a/src/mongo/db/concurrency/lock_state.h
+++ b/src/mongo/db/concurrency/lock_state.h
@@ -55,9 +55,9 @@ public:
/**
* Uninterruptible blocking method, which waits for the notification to fire.
*
- * @param timeoutMs How many milliseconds to wait before returning LOCK_TIMEOUT.
+ * @param timeout How long to wait before returning LOCK_TIMEOUT.
*/
- LockResult wait(unsigned timeoutMs);
+ LockResult wait(Milliseconds timeout);
private:
virtual void notify(ResourceId resId, LockResult result);
@@ -101,9 +101,11 @@ public:
stdx::thread::id getThreadId() const override;
- virtual LockResult lockGlobal(LockMode mode, unsigned timeoutMs = UINT_MAX);
- virtual LockResult lockGlobalBegin(LockMode mode);
- virtual LockResult lockGlobalComplete(unsigned timeoutMs);
+ virtual LockResult lockGlobal(LockMode mode);
+ virtual LockResult lockGlobalBegin(LockMode mode, Milliseconds timeout) {
+ return _lockGlobalBegin(mode, timeout);
+ }
+ virtual LockResult lockGlobalComplete(Milliseconds timeout);
virtual void lockMMAPV1Flush();
virtual void downgradeGlobalXtoSForMMAPV1();
@@ -118,7 +120,7 @@ public:
virtual LockResult lock(ResourceId resId,
LockMode mode,
- unsigned timeoutMs = UINT_MAX,
+ Milliseconds timeout = Milliseconds::max(),
bool checkDeadlock = false);
virtual void downgrade(ResourceId resId, LockMode newMode);
@@ -167,12 +169,12 @@ public:
*
* @param resId Resource id which was passed to an earlier lockBegin call. Must match.
* @param mode Mode which was passed to an earlier lockBegin call. Must match.
- * @param timeoutMs How long to wait for the lock acquisition to complete.
+ * @param timeout How long to wait for the lock acquisition to complete.
* @param checkDeadlock whether to perform deadlock detection while waiting.
*/
LockResult lockComplete(ResourceId resId,
LockMode mode,
- unsigned timeoutMs,
+ Milliseconds timeout,
bool checkDeadlock);
private:
@@ -180,6 +182,10 @@ private:
typedef FastMapNoAlloc<ResourceId, LockRequest, 16> LockRequestsMap;
+ /**
+ * Like lockGlobalBegin, but accepts a timeout for acquiring a ticket.
+ */
+ LockResult _lockGlobalBegin(LockMode, Milliseconds timeout);
/**
* The main functionality of the unlock method, except accepts iterator in order to avoid
diff --git a/src/mongo/db/concurrency/lock_state_test.cpp b/src/mongo/db/concurrency/lock_state_test.cpp
index 13bc6b58cbc..e96a460d61f 100644
--- a/src/mongo/db/concurrency/lock_state_test.cpp
+++ b/src/mongo/db/concurrency/lock_state_test.cpp
@@ -88,7 +88,7 @@ TEST(LockerImpl, ConflictWithTimeout) {
DefaultLockerImpl locker2;
ASSERT(LOCK_OK == locker2.lockGlobal(MODE_IX));
- ASSERT(LOCK_TIMEOUT == locker2.lock(resId, MODE_S, 0));
+ ASSERT(LOCK_TIMEOUT == locker2.lock(resId, MODE_S, Milliseconds(0)));
ASSERT(locker2.getLockMode(resId) == MODE_NONE);
@@ -110,7 +110,7 @@ TEST(LockerImpl, ConflictUpgradeWithTimeout) {
ASSERT(LOCK_OK == locker2.lock(resId, MODE_S));
// Try upgrading locker 1, which should block and timeout
- ASSERT(LOCK_TIMEOUT == locker1.lock(resId, MODE_X, 1));
+ ASSERT(LOCK_TIMEOUT == locker1.lock(resId, MODE_X, Milliseconds(1)));
locker1.unlockGlobal();
locker2.unlockGlobal();
@@ -276,13 +276,14 @@ TEST(LockerImpl, CanceledDeadlockUnblocks) {
ASSERT(LOCK_WAITING == locker3.lockBegin(db1, MODE_S));
// Detect deadlock, canceling our request
- ASSERT(LOCK_DEADLOCK == locker2.lockComplete(db1, MODE_X, 1, /*checkDeadlock*/ true));
+ ASSERT(LOCK_DEADLOCK ==
+ locker2.lockComplete(db1, MODE_X, Milliseconds(1), /*checkDeadlock*/ true));
// Now locker3 must be able to complete its request
- ASSERT(LOCK_OK == locker3.lockComplete(db1, MODE_S, 1, /*checkDeadlock*/ false));
+ ASSERT(LOCK_OK == locker3.lockComplete(db1, MODE_S, Milliseconds(1), /*checkDeadlock*/ false));
// Locker1 still can't complete its request
- ASSERT(LOCK_TIMEOUT == locker1.lockComplete(db2, MODE_X, 1, false));
+ ASSERT(LOCK_TIMEOUT == locker1.lockComplete(db2, MODE_X, Milliseconds(1), false));
// Check ownership for db1
ASSERT(locker1.getLockMode(db1) == MODE_S);
@@ -373,10 +374,10 @@ TEST(LockerImpl, GetLockerInfoShouldReportPendingLocks) {
ASSERT(successfulLocker.unlock(dbId));
ASSERT(successfulLocker.unlockGlobal());
- const unsigned timeoutMs = 0;
+ const Milliseconds timeout = Milliseconds(0);
const bool checkDeadlock = false;
ASSERT_EQ(LOCK_OK,
- conflictingLocker.lockComplete(collectionId, MODE_IS, timeoutMs, checkDeadlock));
+ conflictingLocker.lockComplete(collectionId, MODE_IS, timeout, checkDeadlock));
conflictingLocker.getLockerInfo(&lockerInfo);
ASSERT_FALSE(lockerInfo.waitingResource.isValid());
diff --git a/src/mongo/db/concurrency/lock_stats_test.cpp b/src/mongo/db/concurrency/lock_stats_test.cpp
index 8ae6eb3c010..2dc06744c88 100644
--- a/src/mongo/db/concurrency/lock_stats_test.cpp
+++ b/src/mongo/db/concurrency/lock_stats_test.cpp
@@ -66,7 +66,8 @@ TEST(LockStats, Wait) {
ASSERT_EQUALS(LOCK_WAITING, lockerConflict.lockBegin(resId, MODE_S));
// Sleep 1 millisecond so the wait time passes
- ASSERT_EQUALS(LOCK_TIMEOUT, lockerConflict.lockComplete(resId, MODE_S, 1, false));
+ ASSERT_EQUALS(LOCK_TIMEOUT,
+ lockerConflict.lockComplete(resId, MODE_S, Milliseconds(1), false));
}
// Make sure that the waits/blocks are non-zero
diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h
index e00882a498d..37af569124f 100644
--- a/src/mongo/db/concurrency/locker.h
+++ b/src/mongo/db/concurrency/locker.h
@@ -105,22 +105,22 @@ public:
*
* @param mode Mode in which the global lock should be acquired. Also indicates the intent
* of the operation.
- * @param timeoutMs How long to wait for the global lock (and the flush lock, for the MMAP
- * V1 engine) to be acquired.
*
* @return LOCK_OK, if the global lock (and the flush lock, for the MMAP V1 engine) were
* acquired within the specified time bound. Otherwise, the respective failure
* code and neither lock will be acquired.
*/
- virtual LockResult lockGlobal(LockMode mode, unsigned timeoutMs = UINT_MAX) = 0;
+ virtual LockResult lockGlobal(LockMode mode) = 0;
/**
* Requests the global lock to be acquired in the specified mode.
*
* See the comments for lockBegin/Complete for more information on the semantics.
+ * The timeout for lockGlobalBegin bounds the wait for a ticket from the TicketHolder,
+ * if one is configured; the timeout for lockGlobalComplete bounds the wait for the
+ * global lock grant itself.
*/
- virtual LockResult lockGlobalBegin(LockMode mode) = 0;
- virtual LockResult lockGlobalComplete(unsigned timeoutMs) = 0;
+ virtual LockResult lockGlobalBegin(LockMode mode, Milliseconds timeout) = 0;
+ virtual LockResult lockGlobalComplete(Milliseconds timeout) = 0;
/**
* This method is used only in the MMAP V1 storage engine, otherwise it is a no-op. See the
@@ -171,9 +171,9 @@ public:
*
* @param resId Id of the resource to be locked.
* @param mode Mode in which the resource should be locked. Lock upgrades are allowed.
- * @param timeoutMs How many milliseconds to wait for the lock to be granted, before
- * returning LOCK_TIMEOUT. This parameter defaults to UINT_MAX, which means
- * wait infinitely. If 0 is passed, the request will return immediately, if
+ * @param timeout How long to wait for the lock to be granted, before
+ * returning LOCK_TIMEOUT. This parameter defaults to an infinite timeout.
+ * If Milliseconds(0) is passed, the request will return immediately, if
* the request could not be granted right away.
* @param checkDeadlock Whether to enable deadlock detection for this acquisition. This
* parameter is put in place until we can handle deadlocks at all places,
@@ -183,7 +183,7 @@ public:
*/
virtual LockResult lock(ResourceId resId,
LockMode mode,
- unsigned timeoutMs = UINT_MAX,
+ Milliseconds timeout = Milliseconds::max(),
bool checkDeadlock = false) = 0;
/**
diff --git a/src/mongo/db/concurrency/locker_noop.h b/src/mongo/db/concurrency/locker_noop.h
index 24450ceeecc..2de30224d83 100644
--- a/src/mongo/db/concurrency/locker_noop.h
+++ b/src/mongo/db/concurrency/locker_noop.h
@@ -57,15 +57,15 @@ public:
invariant(false);
}
- virtual LockResult lockGlobal(LockMode mode, unsigned timeoutMs) {
+ virtual LockResult lockGlobal(LockMode mode) {
invariant(false);
}
- virtual LockResult lockGlobalBegin(LockMode mode) {
+ virtual LockResult lockGlobalBegin(LockMode mode, Milliseconds timeout) {
invariant(false);
}
- virtual LockResult lockGlobalComplete(unsigned timeoutMs) {
+ virtual LockResult lockGlobalComplete(Milliseconds timeout) {
invariant(false);
}
@@ -91,7 +91,7 @@ public:
virtual LockResult lock(ResourceId resId,
LockMode mode,
- unsigned timeoutMs,
+ Milliseconds timeout,
bool checkDeadlock) {
invariant(false);
}
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 3c05f89ea64..2dbb2b1065e 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -1125,9 +1125,9 @@ static void shutdownTask() {
// of this function to prevent any operations from running that need a lock.
//
DefaultLockerImpl* globalLocker = new DefaultLockerImpl();
- LockResult result = globalLocker->lockGlobalBegin(MODE_X);
+ LockResult result = globalLocker->lockGlobalBegin(MODE_X, Milliseconds::max());
if (result == LOCK_WAITING) {
- result = globalLocker->lockGlobalComplete(UINT_MAX);
+ result = globalLocker->lockGlobalComplete(Milliseconds::max());
}
invariant(LOCK_OK == result);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 51044301e09..8632eddc42a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1770,7 +1770,10 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn,
return {ErrorCodes::NotMaster, "not primary so can't step down"};
}
- Lock::GlobalLock globalReadLock(txn->lockState(), MODE_S, Lock::GlobalLock::EnqueueOnly());
+ Lock::GlobalLock globalReadLock(txn->lockState(),
+ MODE_S,
+ durationCount<Milliseconds>(stepdownTime),
+ Lock::GlobalLock::EnqueueOnly());
// We've requested the global shared lock which will stop new writes from coming in,
// but existing writes could take a long time to finish, so kill all user operations
diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript
index 78a29709ea1..a250f9c20e0 100644
--- a/src/mongo/util/concurrency/SConscript
+++ b/src/mongo/util/concurrency/SConscript
@@ -38,6 +38,15 @@ env.Library('ticketholder',
LIBDEPS=['$BUILD_DIR/mongo/base',
'$BUILD_DIR/third_party/shim_boost'])
+
+env.CppUnitTest(
+ target='ticketholder_test',
+ source=['ticketholder_test.cpp'],
+ LIBDEPS=[
+ 'ticketholder',
+ '$BUILD_DIR/mongo/unittest/unittest',
+ ])
+
env.Library(
target='spin_lock',
source=[
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp
index 3a99d643419..27eee5f84cc 100644
--- a/src/mongo/util/concurrency/ticketholder.cpp
+++ b/src/mongo/util/concurrency/ticketholder.cpp
@@ -59,27 +59,35 @@ TicketHolder::~TicketHolder() {
bool TicketHolder::tryAcquire() {
while (0 != sem_trywait(&_sem)) {
- switch (errno) {
- case EAGAIN:
- return false;
- case EINTR:
- break;
- default:
- _check(-1);
- }
+ if (errno == EAGAIN)
+ return false;
+ if (errno != EINTR)
+ _check(-1);
}
return true;
}
void TicketHolder::waitForTicket() {
while (0 != sem_wait(&_sem)) {
- switch (errno) {
- case EINTR:
- break;
- default:
- _check(-1);
- }
+ if (errno != EINTR)
+ _check(-1);
+ }
+}
+
+bool TicketHolder::waitForTicketUntil(Date_t until) {
+ const long long millisSinceEpoch = until.toMillisSinceEpoch();
+ struct timespec ts;
+
+ ts.tv_sec = millisSinceEpoch / 1000;
+ ts.tv_nsec = (millisSinceEpoch % 1000) * (1000 * 1000);
+ while (0 != sem_timedwait(&_sem, &ts)) {
+ if (errno == ETIMEDOUT)
+ return false;
+
+ if (errno != EINTR)
+ _check(-1);
}
+ return true;
}
void TicketHolder::release() {
@@ -146,6 +154,12 @@ void TicketHolder::waitForTicket() {
}
}
+bool TicketHolder::waitForTicketUntil(Date_t until) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ return _newTicket.wait_until(lk, until.toSystemTimePoint(), [this] { return _tryAcquire(); });
+}
+
void TicketHolder::release() {
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
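
The two pairs of definitions above correspond to ticketholder.cpp's two builds: a POSIX-semaphore implementation and a condition-variable fallback. The semaphore path hinges on sem_timedwait taking an absolute CLOCK_REALTIME deadline, which is why a Date_t (milliseconds since the Unix epoch) converts directly to a timespec. A standalone sketch of that conversion and error handling (plain POSIX, outside the class; the function name is hypothetical):

#include <cerrno>
#include <ctime>
#include <semaphore.h>

// Returns true if the semaphore was acquired before the absolute deadline.
bool waitUntilMillisSinceEpoch(sem_t* sem, long long millisSinceEpoch) {
    timespec ts;
    ts.tv_sec = millisSinceEpoch / 1000;
    ts.tv_nsec = (millisSinceEpoch % 1000) * 1000 * 1000;
    while (sem_timedwait(sem, &ts) != 0) {
        if (errno == ETIMEDOUT)
            return false;  // deadline passed without acquiring
        if (errno != EINTR)
            return false;  // real error; the class above fasserts via _check(-1)
    }
    return true;  // interrupted waits (EINTR) were simply retried
}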
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
index 6906dafa897..7a8b34083f0 100644
--- a/src/mongo/util/concurrency/ticketholder.h
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -34,6 +34,7 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/mutex.h"
+#include "mongo/util/time_support.h"
namespace mongo {
@@ -48,6 +49,8 @@ public:
void waitForTicket();
+ bool waitForTicketUntil(Date_t until);
+
void release();
Status resize(int newSize);
diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp
new file mode 100644
index 00000000000..7f5afcbe0cb
--- /dev/null
+++ b/src/mongo/util/concurrency/ticketholder_test.cpp
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/ticketholder.h"
+
+namespace {
+using namespace mongo;
+
+TEST(TicketholderTest, BasicTimeout) {
+ TicketHolder holder(1);
+ ASSERT_EQ(holder.used(), 0);
+ ASSERT_EQ(holder.available(), 1);
+ ASSERT_EQ(holder.outof(), 1);
+
+ {
+ ScopedTicket ticket(&holder);
+ ASSERT_EQ(holder.used(), 1);
+ ASSERT_EQ(holder.available(), 0);
+ ASSERT_EQ(holder.outof(), 1);
+
+ ASSERT_FALSE(holder.tryAcquire());
+ ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now()));
+ ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now() + Milliseconds(1)));
+ ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now() + Milliseconds(42)));
+ }
+
+ ASSERT_EQ(holder.used(), 0);
+ ASSERT_EQ(holder.available(), 1);
+ ASSERT_EQ(holder.outof(), 1);
+
+ ASSERT(holder.waitForTicketUntil(Date_t::now()));
+ holder.release();
+
+ ASSERT_EQ(holder.used(), 0);
+
+ ASSERT(holder.waitForTicketUntil(Date_t::now() + Milliseconds(20)));
+ ASSERT_EQ(holder.used(), 1);
+
+ ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now() + Milliseconds(2)));
+ holder.release();
+ ASSERT_EQ(holder.used(), 0);
+}
+} // namespace
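
As a closing note, a sketch of how the pieces connect end to end (assumed wiring, mirroring the UseGlobalThrottling helper in d_concurrency_test.cpp; the holder sizes are illustrative only): install a TicketHolder pair on the locker, after which global lock acquisitions with a finite timeout can return LOCK_TIMEOUT, and detach the holders before they are destroyed.

#include "mongo/db/concurrency/lock_state.h"
#include "mongo/util/concurrency/ticketholder.h"

void runThrottled(mongo::Locker* locker) {
    mongo::TicketHolder readTickets(128);   // limits concurrent shared-mode holders
    mongo::TicketHolder writeTickets(128);  // limits concurrent exclusive-mode holders
    locker->setGlobalThrottling(&readTickets, &writeTickets);
    // ... global lock acquisitions are now throttled and may LOCK_TIMEOUT ...
    locker->setGlobalThrottling(nullptr, nullptr);  // detach before the holders go away
}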