diff options
author | Geert Bosch <geert@mongodb.com> | 2017-04-21 18:09:06 -0400 |
---|---|---|
committer | Geert Bosch <geert@mongodb.com> | 2017-05-05 18:29:43 -0400 |
commit | aa8ab6611d27a6a4b014d82a37eb658760fa7425 (patch) | |
tree | 559e80bf696e2a62d6c979ae768f52cdb1ae6856 | |
parent | 02728d11d54f6bd43276ff3c76dea6d2484b4134 (diff) | |
download | mongo-aa8ab6611d27a6a4b014d82a37eb658760fa7425.tar.gz |
SERVER-28427 Implement timeouts for the TicketHolder
(cherry picked from commit 498df9ab853bb03514b8803b9b1f6c2b6900b533)
Conflicts:
src/mongo/db/concurrency/SConscript
src/mongo/db/concurrency/d_concurrency.cpp
src/mongo/db/concurrency/d_concurrency.h
src/mongo/db/concurrency/d_concurrency_test.cpp
src/mongo/db/repl/replication_coordinator_impl.cpp
-rw-r--r-- | src/mongo/db/concurrency/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/concurrency/d_concurrency.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/concurrency/d_concurrency.h | 4 | ||||
-rw-r--r-- | src/mongo/db/concurrency/d_concurrency_test.cpp | 78 | ||||
-rw-r--r-- | src/mongo/db/concurrency/lock_state.cpp | 64 | ||||
-rw-r--r-- | src/mongo/db/concurrency/lock_state.h | 22 | ||||
-rw-r--r-- | src/mongo/db/concurrency/lock_state_test.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/concurrency/lock_stats_test.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/concurrency/locker.h | 18 | ||||
-rw-r--r-- | src/mongo/db/concurrency/locker_noop.h | 8 | ||||
-rw-r--r-- | src/mongo/db/db.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 5 | ||||
-rw-r--r-- | src/mongo/util/concurrency/SConscript | 9 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.cpp | 42 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.h | 3 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder_test.cpp | 74 |
16 files changed, 279 insertions, 86 deletions
diff --git a/src/mongo/db/concurrency/SConscript b/src/mongo/db/concurrency/SConscript index c823280f415..942b7a4f7f1 100644 --- a/src/mongo/db/concurrency/SConscript +++ b/src/mongo/db/concurrency/SConscript @@ -42,6 +42,7 @@ env.CppUnitTest( 'lock_stats_test.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/service_context_noop_init', '$BUILD_DIR/mongo/util/progress_meter', 'lock_manager', ] diff --git a/src/mongo/db/concurrency/d_concurrency.cpp b/src/mongo/db/concurrency/d_concurrency.cpp index de670e0a49e..f7dabcc2248 100644 --- a/src/mongo/db/concurrency/d_concurrency.cpp +++ b/src/mongo/db/concurrency/d_concurrency.cpp @@ -63,26 +63,29 @@ AtomicWord<uint64_t> lastResourceMutexHash{0}; Lock::ResourceMutex::ResourceMutex() : _rid(RESOURCE_MUTEX, lastResourceMutexHash.fetchAndAdd(1)) {} Lock::GlobalLock::GlobalLock(Locker* locker, LockMode lockMode, unsigned timeoutMs) - : GlobalLock(locker, lockMode, EnqueueOnly()) { + : GlobalLock(locker, lockMode, timeoutMs, EnqueueOnly()) { waitForLock(timeoutMs); } -Lock::GlobalLock::GlobalLock(Locker* locker, LockMode lockMode, EnqueueOnly enqueueOnly) +Lock::GlobalLock::GlobalLock(Locker* locker, + LockMode lockMode, + unsigned timeoutMs, + EnqueueOnly enqueueOnly) : _locker(locker), _result(LOCK_INVALID), _pbwm(locker, resourceIdParallelBatchWriterMode) { - _enqueue(lockMode); + _enqueue(lockMode, timeoutMs); } -void Lock::GlobalLock::_enqueue(LockMode lockMode) { +void Lock::GlobalLock::_enqueue(LockMode lockMode, unsigned timeoutMs) { if (_locker->shouldConflictWithSecondaryBatchApplication()) { _pbwm.lock(MODE_IS); } - _result = _locker->lockGlobalBegin(lockMode); + _result = _locker->lockGlobalBegin(lockMode, Milliseconds(timeoutMs)); } void Lock::GlobalLock::waitForLock(unsigned timeoutMs) { if (_result == LOCK_WAITING) { - _result = _locker->lockGlobalComplete(timeoutMs); + _result = _locker->lockGlobalComplete(Milliseconds(timeoutMs)); } if (_result != LOCK_OK && _locker->shouldConflictWithSecondaryBatchApplication()) { diff --git a/src/mongo/db/concurrency/d_concurrency.h b/src/mongo/db/concurrency/d_concurrency.h index 3f909d92614..6274814ca1c 100644 --- a/src/mongo/db/concurrency/d_concurrency.h +++ b/src/mongo/db/concurrency/d_concurrency.h @@ -164,7 +164,7 @@ public: * Enqueues lock but does not block on lock acquisition. * Call waitForLock() to complete locking process. */ - GlobalLock(Locker* locker, LockMode lockMode, EnqueueOnly enqueueOnly); + GlobalLock(Locker* locker, LockMode lockMode, unsigned timeoutMs, EnqueueOnly enqueueOnly); ~GlobalLock() { _unlock(); @@ -180,7 +180,7 @@ public: } private: - void _enqueue(LockMode lockMode); + void _enqueue(LockMode lockMode, unsigned timeoutMs); void _unlock(); Locker* const _locker; diff --git a/src/mongo/db/concurrency/d_concurrency_test.cpp b/src/mongo/db/concurrency/d_concurrency_test.cpp index 34b97d0e909..22aae759650 100644 --- a/src/mongo/db/concurrency/d_concurrency_test.cpp +++ b/src/mongo/db/concurrency/d_concurrency_test.cpp @@ -35,12 +35,16 @@ #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/lock_manager_test_help.h" +#include "mongo/db/operation_context.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/debug_util.h" #include "mongo/util/log.h" #include "mongo/util/progress_meter.h" +#include "mongo/util/time_support.h" namespace mongo { @@ -70,6 +74,47 @@ private: }; /** + * A RAII object that instantiates a TicketHolder that limits number of allowed global lock + * acquisitions to numTickets. The opCtx must live as long as the UseGlobalThrottling instance. + */ +class UseGlobalThrottling { +public: + explicit UseGlobalThrottling(OperationContext* opCtx, int numTickets) + : _opCtx(opCtx), _holder(1) { + _opCtx->lockState()->setGlobalThrottling(&_holder, &_holder); + } + ~UseGlobalThrottling() { + // Reset the global setting as we're about to destroy the ticket holder. + _opCtx->lockState()->setGlobalThrottling(nullptr, nullptr); + } + +private: + OperationContext* _opCtx; + TicketHolder _holder; +}; + +/** + * Returns a vector of Clients of length 'k', each of which has an OperationContext with its + * lockState set to a DefaultLockerImpl. + */ +template <typename LockerType> +std::vector<std::pair<ServiceContext::UniqueClient, ServiceContext::UniqueOperationContext>> +makeKClientsWithLockers(int k) { + std::vector<std::pair<ServiceContext::UniqueClient, ServiceContext::UniqueOperationContext>> + clients; + clients.reserve(k); + for (int i = 0; i < k; ++i) { + auto client = + getGlobalServiceContext()->makeClient(str::stream() << "test client for thread " << i); + auto opCtx = client->makeOperationContext(); + opCtx->releaseLockState(); + opCtx->setLockState(stdx::make_unique<LockerType>()); + clients.emplace_back(std::move(client), std::move(opCtx)); + } + return clients; +} + +/** * Calls fn the given number of iterations, spread out over up to maxThreads threads. * The threadNr passed is an integer between 0 and maxThreads exclusive. Logs timing * statistics for for all power-of-two thread counts from 1 up to maxThreds. @@ -642,6 +687,39 @@ TEST(DConcurrency, StressPartitioned) { } } +TEST(DConcurrency, Throttling) { + auto clientOpctxPairs = makeKClientsWithLockers<DefaultLockerImpl>(2); + auto opctx1 = clientOpctxPairs[0].second.get(); + auto opctx2 = clientOpctxPairs[1].second.get(); + UseGlobalThrottling throttle(opctx1, 1); + + bool overlongWait; + int tries = 0; + const int maxTries = 15; + const int timeoutMillis = 42; + + do { + // Test that throttling will correctly handle timeouts. + Lock::GlobalRead R1(opctx1->lockState(), 0); + ASSERT(R1.isLocked()); + + Date_t t1 = Date_t::now(); + { + Lock::GlobalRead R2(opctx2->lockState(), timeoutMillis); + ASSERT(!R2.isLocked()); + } + Date_t t2 = Date_t::now(); + + // Test that the timeout did result in at least the requested wait. + ASSERT_GTE(t2 - t1, Milliseconds(timeoutMillis)); + + // Timeouts should be reasonably immediate. In maxTries attempts at least one test should be + // able to complete within a second, as the theoretical test duration is less than 50 ms. + overlongWait = t2 - t1 >= Seconds(1); + } while (overlongWait && ++tries < maxTries); + ASSERT(!overlongWait); +} + // These tests exercise single- and multi-threaded performance of uncontended lock acquisition. It // is neither practical nor useful to run them on debug builds. diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp index 898f9cd462e..45ea279c137 100644 --- a/src/mongo/db/concurrency/lock_state.cpp +++ b/src/mongo/db/concurrency/lock_state.cpp @@ -116,7 +116,7 @@ const ResourceId resourceIdMMAPV1Flush = ResourceId(RESOURCE_MMAPV1_FLUSH, ResourceId::SINGLETON_MMAPV1_FLUSH); // How often (in millis) to check for deadlock if a lock has not been granted for some time -const unsigned DeadlockTimeoutMs = 500; +const Milliseconds DeadlockTimeout = Milliseconds(500); // Dispenses unique LockerId identifiers AtomicUInt64 idCounter(0); @@ -231,18 +231,12 @@ void CondVarLockGrantNotification::clear() { _result = LOCK_INVALID; } -LockResult CondVarLockGrantNotification::wait(unsigned timeoutMs) { +LockResult CondVarLockGrantNotification::wait(Milliseconds timeout) { stdx::unique_lock<stdx::mutex> lock(_mutex); - while (_result == LOCK_INVALID) { - if (stdx::cv_status::timeout == - _cond.wait_for(lock, - Milliseconds(static_cast<int64_t>(timeoutMs)).toSystemDuration())) { - // Timeout - return LOCK_TIMEOUT; - } - } - - return _result; + return _cond.wait_for( + lock, timeout.toSystemDuration(), [this] { return _result != LOCK_INVALID; }) + ? _result + : LOCK_TIMEOUT; } void CondVarLockGrantNotification::notify(ResourceId resId, LockResult result) { @@ -298,10 +292,11 @@ Locker::ClientState LockerImpl<IsForMMAPV1>::getClientState() const { } template <bool IsForMMAPV1> -LockResult LockerImpl<IsForMMAPV1>::lockGlobal(LockMode mode, unsigned timeoutMs) { - LockResult result = lockGlobalBegin(mode); +LockResult LockerImpl<IsForMMAPV1>::lockGlobal(LockMode mode) { + LockResult result = _lockGlobalBegin(mode, Milliseconds::max()); + if (result == LOCK_WAITING) { - result = lockGlobalComplete(timeoutMs); + result = lockGlobalComplete(Milliseconds::max()); } if (result == LOCK_OK) { @@ -312,14 +307,19 @@ LockResult LockerImpl<IsForMMAPV1>::lockGlobal(LockMode mode, unsigned timeoutMs } template <bool IsForMMAPV1> -LockResult LockerImpl<IsForMMAPV1>::lockGlobalBegin(LockMode mode) { +LockResult LockerImpl<IsForMMAPV1>::_lockGlobalBegin(LockMode mode, Milliseconds timeout) { dassert(isLocked() == (_modeForTicket != MODE_NONE)); if (_modeForTicket == MODE_NONE) { const bool reader = isSharedLockMode(mode); auto holder = ticketHolders[mode]; if (holder) { _clientState.store(reader ? kQueuedReader : kQueuedWriter); - holder->waitForTicket(); + if (timeout == Milliseconds::max()) { + holder->waitForTicket(); + } else if (!holder->waitForTicketUntil(Date_t::now() + timeout)) { + _clientState.store(kInactive); + return LOCK_TIMEOUT; + } } _clientState.store(reader ? kActiveReader : kActiveWriter); _modeForTicket = mode; @@ -336,8 +336,8 @@ LockResult LockerImpl<IsForMMAPV1>::lockGlobalBegin(LockMode mode) { } template <bool IsForMMAPV1> -LockResult LockerImpl<IsForMMAPV1>::lockGlobalComplete(unsigned timeoutMs) { - return lockComplete(resourceIdGlobal, getLockMode(resourceIdGlobal), timeoutMs, false); +LockResult LockerImpl<IsForMMAPV1>::lockGlobalComplete(Milliseconds timeout) { + return lockComplete(resourceIdGlobal, getLockMode(resourceIdGlobal), timeout, false); } template <bool IsForMMAPV1> @@ -435,7 +435,7 @@ void LockerImpl<IsForMMAPV1>::endWriteUnitOfWork() { template <bool IsForMMAPV1> LockResult LockerImpl<IsForMMAPV1>::lock(ResourceId resId, LockMode mode, - unsigned timeoutMs, + Milliseconds timeout, bool checkDeadlock) { const LockResult result = lockBegin(resId, mode); @@ -447,7 +447,7 @@ LockResult LockerImpl<IsForMMAPV1>::lock(ResourceId resId, // unsuccessful result that the lock manager would return is LOCK_WAITING. invariant(result == LOCK_WAITING); - return lockComplete(resId, mode, timeoutMs, checkDeadlock); + return lockComplete(resId, mode, timeout, checkDeadlock); } template <bool IsForMMAPV1> @@ -723,7 +723,7 @@ LockResult LockerImpl<IsForMMAPV1>::lockBegin(ResourceId resId, LockMode mode) { template <bool IsForMMAPV1> LockResult LockerImpl<IsForMMAPV1>::lockComplete(ResourceId resId, LockMode mode, - unsigned timeoutMs, + Milliseconds timeout, bool checkDeadlock) { // Under MMAP V1 engine a deadlock can occur if a thread goes to sleep waiting on // DB lock, while holding the flush lock, so it has to be released. This is only @@ -739,14 +739,14 @@ LockResult LockerImpl<IsForMMAPV1>::lockComplete(ResourceId resId, // Don't go sleeping without bound in order to be able to report long waits or wake up for // deadlock detection. - unsigned waitTimeMs = std::min(timeoutMs, DeadlockTimeoutMs); + Milliseconds waitTime = std::min(timeout, DeadlockTimeout); const uint64_t startOfTotalWaitTime = curTimeMicros64(); uint64_t startOfCurrentWaitTime = startOfTotalWaitTime; while (true) { // It is OK if this call wakes up spuriously, because we re-evaluate the remaining // wait time anyways. - result = _notify.wait(waitTimeMs); + result = _notify.wait(waitTime); // Account for the time spent waiting on the notification object const uint64_t curTimeMicros = curTimeMicros64(); @@ -773,16 +773,16 @@ LockResult LockerImpl<IsForMMAPV1>::lockComplete(ResourceId resId, } // If infinite timeout was requested, just keep waiting - if (timeoutMs == UINT_MAX) { + if (timeout == Milliseconds::max()) { continue; } - const unsigned totalBlockTimeMs = (curTimeMicros - startOfTotalWaitTime) / 1000; - waitTimeMs = (totalBlockTimeMs < timeoutMs) - ? std::min(timeoutMs - totalBlockTimeMs, DeadlockTimeoutMs) - : 0; + const auto totalBlockTime = duration_cast<Milliseconds>( + Microseconds(int64_t(curTimeMicros - startOfTotalWaitTime))); + waitTime = (totalBlockTime < timeout) ? std::min(timeout - totalBlockTime, DeadlockTimeout) + : Milliseconds(0); - if (waitTimeMs == 0) { + if (waitTime == Milliseconds(0)) { break; } } @@ -882,7 +882,7 @@ AutoAcquireFlushLockForMMAPV1Commit::AutoAcquireFlushLockForMMAPV1Commit(Locker* // due to too much uncommitted in-memory journal, but won't have corruption. while (true) { - LockResult result = _locker->lock(resourceIdMMAPV1Flush, MODE_S, UINT_MAX, true); + LockResult result = _locker->lock(resourceIdMMAPV1Flush, MODE_S, Milliseconds::max(), true); if (result == LOCK_OK) { break; } @@ -899,7 +899,7 @@ void AutoAcquireFlushLockForMMAPV1Commit::upgradeFlushLockToExclusive() { // This should not be able to deadlock, since we already hold the S journal lock, which // means all writers are kicked out. Readers always yield the journal lock if they block // waiting on any other lock. - invariant(LOCK_OK == _locker->lock(resourceIdMMAPV1Flush, MODE_X, UINT_MAX, false)); + invariant(LOCK_OK == _locker->lock(resourceIdMMAPV1Flush, MODE_X, Milliseconds::max(), false)); // Lock bumps the recursive count. Drop it back down so that the destructor doesn't // complain. diff --git a/src/mongo/db/concurrency/lock_state.h b/src/mongo/db/concurrency/lock_state.h index 79f348d4ac6..2be9f3c9c87 100644 --- a/src/mongo/db/concurrency/lock_state.h +++ b/src/mongo/db/concurrency/lock_state.h @@ -55,9 +55,9 @@ public: /** * Uninterruptible blocking method, which waits for the notification to fire. * - * @param timeoutMs How many milliseconds to wait before returning LOCK_TIMEOUT. + * @param timeout How many milliseconds to wait before returning LOCK_TIMEOUT. */ - LockResult wait(unsigned timeoutMs); + LockResult wait(Milliseconds timeout); private: virtual void notify(ResourceId resId, LockResult result); @@ -101,9 +101,11 @@ public: stdx::thread::id getThreadId() const override; - virtual LockResult lockGlobal(LockMode mode, unsigned timeoutMs = UINT_MAX); - virtual LockResult lockGlobalBegin(LockMode mode); - virtual LockResult lockGlobalComplete(unsigned timeoutMs); + virtual LockResult lockGlobal(LockMode mode); + virtual LockResult lockGlobalBegin(LockMode mode, Milliseconds timeout) { + return _lockGlobalBegin(mode, timeout); + } + virtual LockResult lockGlobalComplete(Milliseconds timeout); virtual void lockMMAPV1Flush(); virtual void downgradeGlobalXtoSForMMAPV1(); @@ -118,7 +120,7 @@ public: virtual LockResult lock(ResourceId resId, LockMode mode, - unsigned timeoutMs = UINT_MAX, + Milliseconds timeout = Milliseconds::max(), bool checkDeadlock = false); virtual void downgrade(ResourceId resId, LockMode newMode); @@ -167,12 +169,12 @@ public: * * @param resId Resource id which was passed to an earlier lockBegin call. Must match. * @param mode Mode which was passed to an earlier lockBegin call. Must match. - * @param timeoutMs How long to wait for the lock acquisition to complete. + * @param timeout How long to wait for the lock acquisition to complete. * @param checkDeadlock whether to perform deadlock detection while waiting. */ LockResult lockComplete(ResourceId resId, LockMode mode, - unsigned timeoutMs, + Milliseconds timeout, bool checkDeadlock); private: @@ -180,6 +182,10 @@ private: typedef FastMapNoAlloc<ResourceId, LockRequest, 16> LockRequestsMap; + /** + * Like lockGlobalBegin, but accepts a timeout for acquiring a ticket. + */ + LockResult _lockGlobalBegin(LockMode, Milliseconds timeout); /** * The main functionality of the unlock method, except accepts iterator in order to avoid diff --git a/src/mongo/db/concurrency/lock_state_test.cpp b/src/mongo/db/concurrency/lock_state_test.cpp index 13bc6b58cbc..e96a460d61f 100644 --- a/src/mongo/db/concurrency/lock_state_test.cpp +++ b/src/mongo/db/concurrency/lock_state_test.cpp @@ -88,7 +88,7 @@ TEST(LockerImpl, ConflictWithTimeout) { DefaultLockerImpl locker2; ASSERT(LOCK_OK == locker2.lockGlobal(MODE_IX)); - ASSERT(LOCK_TIMEOUT == locker2.lock(resId, MODE_S, 0)); + ASSERT(LOCK_TIMEOUT == locker2.lock(resId, MODE_S, Milliseconds(0))); ASSERT(locker2.getLockMode(resId) == MODE_NONE); @@ -110,7 +110,7 @@ TEST(LockerImpl, ConflictUpgradeWithTimeout) { ASSERT(LOCK_OK == locker2.lock(resId, MODE_S)); // Try upgrading locker 1, which should block and timeout - ASSERT(LOCK_TIMEOUT == locker1.lock(resId, MODE_X, 1)); + ASSERT(LOCK_TIMEOUT == locker1.lock(resId, MODE_X, Milliseconds(1))); locker1.unlockGlobal(); locker2.unlockGlobal(); @@ -276,13 +276,14 @@ TEST(LockerImpl, CanceledDeadlockUnblocks) { ASSERT(LOCK_WAITING == locker3.lockBegin(db1, MODE_S)); // Detect deadlock, canceling our request - ASSERT(LOCK_DEADLOCK == locker2.lockComplete(db1, MODE_X, 1, /*checkDeadlock*/ true)); + ASSERT(LOCK_DEADLOCK == + locker2.lockComplete(db1, MODE_X, Milliseconds(1), /*checkDeadlock*/ true)); // Now locker3 must be able to complete its request - ASSERT(LOCK_OK == locker3.lockComplete(db1, MODE_S, 1, /*checkDeadlock*/ false)); + ASSERT(LOCK_OK == locker3.lockComplete(db1, MODE_S, Milliseconds(1), /*checkDeadlock*/ false)); // Locker1 still can't complete its request - ASSERT(LOCK_TIMEOUT == locker1.lockComplete(db2, MODE_X, 1, false)); + ASSERT(LOCK_TIMEOUT == locker1.lockComplete(db2, MODE_X, Milliseconds(1), false)); // Check ownership for db1 ASSERT(locker1.getLockMode(db1) == MODE_S); @@ -373,10 +374,10 @@ TEST(LockerImpl, GetLockerInfoShouldReportPendingLocks) { ASSERT(successfulLocker.unlock(dbId)); ASSERT(successfulLocker.unlockGlobal()); - const unsigned timeoutMs = 0; + const Milliseconds timeout = Milliseconds(0); const bool checkDeadlock = false; ASSERT_EQ(LOCK_OK, - conflictingLocker.lockComplete(collectionId, MODE_IS, timeoutMs, checkDeadlock)); + conflictingLocker.lockComplete(collectionId, MODE_IS, timeout, checkDeadlock)); conflictingLocker.getLockerInfo(&lockerInfo); ASSERT_FALSE(lockerInfo.waitingResource.isValid()); diff --git a/src/mongo/db/concurrency/lock_stats_test.cpp b/src/mongo/db/concurrency/lock_stats_test.cpp index 8ae6eb3c010..2dc06744c88 100644 --- a/src/mongo/db/concurrency/lock_stats_test.cpp +++ b/src/mongo/db/concurrency/lock_stats_test.cpp @@ -66,7 +66,8 @@ TEST(LockStats, Wait) { ASSERT_EQUALS(LOCK_WAITING, lockerConflict.lockBegin(resId, MODE_S)); // Sleep 1 millisecond so the wait time passes - ASSERT_EQUALS(LOCK_TIMEOUT, lockerConflict.lockComplete(resId, MODE_S, 1, false)); + ASSERT_EQUALS(LOCK_TIMEOUT, + lockerConflict.lockComplete(resId, MODE_S, Milliseconds(1), false)); } // Make sure that the waits/blocks are non-zero diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h index e00882a498d..37af569124f 100644 --- a/src/mongo/db/concurrency/locker.h +++ b/src/mongo/db/concurrency/locker.h @@ -105,22 +105,22 @@ public: * * @param mode Mode in which the global lock should be acquired. Also indicates the intent * of the operation. - * @param timeoutMs How long to wait for the global lock (and the flush lock, for the MMAP - * V1 engine) to be acquired. * * @return LOCK_OK, if the global lock (and the flush lock, for the MMAP V1 engine) were * acquired within the specified time bound. Otherwise, the respective failure * code and neither lock will be acquired. */ - virtual LockResult lockGlobal(LockMode mode, unsigned timeoutMs = UINT_MAX) = 0; + virtual LockResult lockGlobal(LockMode mode) = 0; /** * Requests the global lock to be acquired in the specified mode. * * See the comments for lockBegin/Complete for more information on the semantics. + * The timeout indicates how long to wait for the lock to be acquired. The lockGlobalBegin + * method has a timeout for use with the TicketHolder, if there is one. */ - virtual LockResult lockGlobalBegin(LockMode mode) = 0; - virtual LockResult lockGlobalComplete(unsigned timeoutMs) = 0; + virtual LockResult lockGlobalBegin(LockMode mode, Milliseconds timeout) = 0; + virtual LockResult lockGlobalComplete(Milliseconds timeout) = 0; /** * This method is used only in the MMAP V1 storage engine, otherwise it is a no-op. See the @@ -171,9 +171,9 @@ public: * * @param resId Id of the resource to be locked. * @param mode Mode in which the resource should be locked. Lock upgrades are allowed. - * @param timeoutMs How many milliseconds to wait for the lock to be granted, before - * returning LOCK_TIMEOUT. This parameter defaults to UINT_MAX, which means - * wait infinitely. If 0 is passed, the request will return immediately, if + * @param timeout How long to wait for the lock to be granted, before + * returning LOCK_TIMEOUT. This parameter defaults to an infinite timeout. + * If Milliseconds(0) is passed, the request will return immediately, if * the request could not be granted right away. * @param checkDeadlock Whether to enable deadlock detection for this acquisition. This * parameter is put in place until we can handle deadlocks at all places, @@ -183,7 +183,7 @@ public: */ virtual LockResult lock(ResourceId resId, LockMode mode, - unsigned timeoutMs = UINT_MAX, + Milliseconds timeout = Milliseconds::max(), bool checkDeadlock = false) = 0; /** diff --git a/src/mongo/db/concurrency/locker_noop.h b/src/mongo/db/concurrency/locker_noop.h index 24450ceeecc..2de30224d83 100644 --- a/src/mongo/db/concurrency/locker_noop.h +++ b/src/mongo/db/concurrency/locker_noop.h @@ -57,15 +57,15 @@ public: invariant(false); } - virtual LockResult lockGlobal(LockMode mode, unsigned timeoutMs) { + virtual LockResult lockGlobal(LockMode mode) { invariant(false); } - virtual LockResult lockGlobalBegin(LockMode mode) { + virtual LockResult lockGlobalBegin(LockMode mode, Milliseconds timeout) { invariant(false); } - virtual LockResult lockGlobalComplete(unsigned timeoutMs) { + virtual LockResult lockGlobalComplete(Milliseconds timeout) { invariant(false); } @@ -91,7 +91,7 @@ public: virtual LockResult lock(ResourceId resId, LockMode mode, - unsigned timeoutMs, + Milliseconds timeout, bool checkDeadlock) { invariant(false); } diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 3c05f89ea64..2dbb2b1065e 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -1125,9 +1125,9 @@ static void shutdownTask() { // of this function to prevent any operations from running that need a lock. // DefaultLockerImpl* globalLocker = new DefaultLockerImpl(); - LockResult result = globalLocker->lockGlobalBegin(MODE_X); + LockResult result = globalLocker->lockGlobalBegin(MODE_X, Milliseconds::max()); if (result == LOCK_WAITING) { - result = globalLocker->lockGlobalComplete(UINT_MAX); + result = globalLocker->lockGlobalComplete(Milliseconds::max()); } invariant(LOCK_OK == result); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 51044301e09..8632eddc42a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1770,7 +1770,10 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn, return {ErrorCodes::NotMaster, "not primary so can't step down"}; } - Lock::GlobalLock globalReadLock(txn->lockState(), MODE_S, Lock::GlobalLock::EnqueueOnly()); + Lock::GlobalLock globalReadLock(txn->lockState(), + MODE_S, + durationCount<Milliseconds>(stepdownTime), + Lock::GlobalLock::EnqueueOnly()); // We've requested the global shared lock which will stop new writes from coming in, // but existing writes could take a long time to finish, so kill all user operations diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript index 78a29709ea1..a250f9c20e0 100644 --- a/src/mongo/util/concurrency/SConscript +++ b/src/mongo/util/concurrency/SConscript @@ -38,6 +38,15 @@ env.Library('ticketholder', LIBDEPS=['$BUILD_DIR/mongo/base', '$BUILD_DIR/third_party/shim_boost']) + +env.CppUnitTest( + target='ticketholder_test', + source=['ticketholder_test.cpp'], + LIBDEPS=[ + 'ticketholder', + '$BUILD_DIR/mongo/unittest/unittest', + ]) + env.Library( target='spin_lock', source=[ diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp index 3a99d643419..27eee5f84cc 100644 --- a/src/mongo/util/concurrency/ticketholder.cpp +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -59,27 +59,35 @@ TicketHolder::~TicketHolder() { bool TicketHolder::tryAcquire() { while (0 != sem_trywait(&_sem)) { - switch (errno) { - case EAGAIN: - return false; - case EINTR: - break; - default: - _check(-1); - } + if (errno == EAGAIN) + return false; + if (errno != EINTR) + _check(-1); } return true; } void TicketHolder::waitForTicket() { while (0 != sem_wait(&_sem)) { - switch (errno) { - case EINTR: - break; - default: - _check(-1); - } + if (errno != EINTR) + _check(-1); + } +} + +bool TicketHolder::waitForTicketUntil(Date_t until) { + const long long millisSinceEpoch = until.toMillisSinceEpoch(); + struct timespec ts; + + ts.tv_sec = millisSinceEpoch / 1000; + ts.tv_nsec = (millisSinceEpoch % 1000) * (1000 * 1000); + while (0 != sem_timedwait(&_sem, &ts)) { + if (errno == ETIMEDOUT) + return false; + + if (errno != EINTR) + _check(-1); } + return true; } void TicketHolder::release() { @@ -146,6 +154,12 @@ void TicketHolder::waitForTicket() { } } +bool TicketHolder::waitForTicketUntil(Date_t until) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + + return _newTicket.wait_until(lk, until.toSystemTimePoint(), [this] { return _tryAcquire(); }); +} + void TicketHolder::release() { { stdx::lock_guard<stdx::mutex> lk(_mutex); diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index 6906dafa897..7a8b34083f0 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -34,6 +34,7 @@ #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/mutex.h" +#include "mongo/util/time_support.h" namespace mongo { @@ -48,6 +49,8 @@ public: void waitForTicket(); + bool waitForTicketUntil(Date_t until); + void release(); Status resize(int newSize); diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp new file mode 100644 index 00000000000..7f5afcbe0cb --- /dev/null +++ b/src/mongo/util/concurrency/ticketholder_test.cpp @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/ticketholder.h" + +namespace { +using namespace mongo; + +TEST(TicketholderTest, BasicTimeout) { + TicketHolder holder(1); + ASSERT_EQ(holder.used(), 0); + ASSERT_EQ(holder.available(), 1); + ASSERT_EQ(holder.outof(), 1); + + { + ScopedTicket ticket(&holder); + ASSERT_EQ(holder.used(), 1); + ASSERT_EQ(holder.available(), 0); + ASSERT_EQ(holder.outof(), 1); + + ASSERT_FALSE(holder.tryAcquire()); + ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now())); + ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now() + Milliseconds(1))); + ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now() + Milliseconds(42))); + } + + ASSERT_EQ(holder.used(), 0); + ASSERT_EQ(holder.available(), 1); + ASSERT_EQ(holder.outof(), 1); + + ASSERT(holder.waitForTicketUntil(Date_t::now())); + holder.release(); + + ASSERT_EQ(holder.used(), 0); + + ASSERT(holder.waitForTicketUntil(Date_t::now() + Milliseconds(20))); + ASSERT_EQ(holder.used(), 1); + + ASSERT_FALSE(holder.waitForTicketUntil(Date_t::now() + Milliseconds(2))); + holder.release(); + ASSERT_EQ(holder.used(), 0); +} +} // namespace |