diff options
author | Daniel Gottlieb <daniel.gottlieb@mongodb.com> | 2019-04-09 16:26:57 -0400 |
---|---|---|
committer | Daniel Gottlieb <daniel.gottlieb@mongodb.com> | 2019-04-09 22:58:09 -0400 |
commit | 28a6425d9e773c3a0d6879f65b4e2ee47c96fd21 (patch) | |
tree | e113f72d31cda3ab08c915545037cd3f540737d3 /src/mongo | |
parent | 408888993045ebf65e05f3fe6070d58580b774f8 (diff) | |
download | mongo-28a6425d9e773c3a0d6879f65b4e2ee47c96fd21.tar.gz |
SERVER-39848: Add flow control diagnostics.
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/concurrency/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/concurrency/d_concurrency.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/concurrency/flow_control_ticketholder.cpp | 59 | ||||
-rw-r--r-- | src/mongo/db/concurrency/flow_control_ticketholder.h | 46 | ||||
-rw-r--r-- | src/mongo/db/concurrency/lock_state.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/concurrency/lock_state.h | 9 | ||||
-rw-r--r-- | src/mongo/db/concurrency/locker.h | 16 | ||||
-rw-r--r-- | src/mongo/db/curop.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/curop.h | 11 | ||||
-rw-r--r-- | src/mongo/db/introspect.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/operation_context_test.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface_standalone.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/storage/flow_control.cpp | 3 |
14 files changed, 223 insertions, 33 deletions
diff --git a/src/mongo/db/concurrency/SConscript b/src/mongo/db/concurrency/SConscript index b2364a65afe..cd43fc4fb44 100644 --- a/src/mongo/db/concurrency/SConscript +++ b/src/mongo/db/concurrency/SConscript @@ -48,8 +48,8 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/base', - '$BUILD_DIR/mongo/util/background_job', '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/util/background_job', '$BUILD_DIR/mongo/util/concurrency/spin_lock', '$BUILD_DIR/mongo/util/concurrency/ticketholder', '$BUILD_DIR/third_party/shim_boost', diff --git a/src/mongo/db/concurrency/d_concurrency.cpp b/src/mongo/db/concurrency/d_concurrency.cpp index d15c7a1cafc..df7064ea79c 100644 --- a/src/mongo/db/concurrency/d_concurrency.cpp +++ b/src/mongo/db/concurrency/d_concurrency.cpp @@ -169,12 +169,7 @@ Lock::GlobalLock::GlobalLock(GlobalLock&& otherLock) } void Lock::GlobalLock::_enqueue(LockMode lockMode, Date_t deadline) { - if (lockMode == LockMode::MODE_IX) { - auto ticketholder = FlowControlTicketholder::get(_opCtx); - if (ticketholder) { - ticketholder->getTicket(_opCtx); - } - } + _opCtx->lockState()->getFlowControlTicket(_opCtx, lockMode); try { if (_opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()) { diff --git a/src/mongo/db/concurrency/flow_control_ticketholder.cpp b/src/mongo/db/concurrency/flow_control_ticketholder.cpp index 3c0c5b4a5f4..4439f07b945 100644 --- a/src/mongo/db/concurrency/flow_control_ticketholder.cpp +++ b/src/mongo/db/concurrency/flow_control_ticketholder.cpp @@ -33,17 +33,33 @@ #include "mongo/db/concurrency/flow_control_ticketholder.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" +#include "mongo/db/operation_context.h" #include "mongo/util/log.h" +#include "mongo/util/time_support.h" namespace mongo { - namespace { const auto getFlowControlTicketholder = ServiceContext::declareDecoration<std::unique_ptr<FlowControlTicketholder>>(); } // namespace +void FlowControlTicketholder::CurOp::writeToBuilder(BSONObjBuilder& infoBuilder) { + infoBuilder.append("waitingForFlowControl", waiting); + BSONObjBuilder flowControl(infoBuilder.subobjStart("flowControlStats")); + if (ticketsAcquired > 0) { + flowControl.append("acquireCount", ticketsAcquired); + } + + if (acquireWaitCount) { + flowControl.append("acquireWaitCount", acquireWaitCount); + } + + if (timeAcquiringMicros) { + flowControl.append("timeAcquiringMicros", timeAcquiringMicros); + } + flowControl.done(); +} + FlowControlTicketholder* FlowControlTicketholder::get(ServiceContext* service) { return getFlowControlTicketholder(service).get(); } @@ -52,8 +68,8 @@ FlowControlTicketholder* FlowControlTicketholder::get(ServiceContext& service) { return getFlowControlTicketholder(service).get(); } -FlowControlTicketholder* FlowControlTicketholder::get(OperationContext* ctx) { - return get(ctx->getClient()->getServiceContext()); +FlowControlTicketholder* FlowControlTicketholder::get(OperationContext* opCtx) { + return get(opCtx->getClient()->getServiceContext()); } void FlowControlTicketholder::set(ServiceContext* service, @@ -70,14 +86,39 @@ void FlowControlTicketholder::refreshTo(int numTickets) { _cv.notify_all(); } -void FlowControlTicketholder::getTicket(OperationContext* opCtx) { +void FlowControlTicketholder::getTicket(OperationContext* opCtx, + FlowControlTicketholder::CurOp* stats) { stdx::unique_lock<stdx::mutex> lk(_mutex); LOG(4) << "Taking ticket. Available: " << _tickets; + if (_tickets == 0) { + ++stats->acquireWaitCount; + } + while (_tickets == 0) { - auto code = uassertStatusOK( - opCtx->waitForConditionOrInterruptNoAssertUntil(_cv, lk, Date_t::max())); - invariant(code != stdx::cv_status::timeout); + stats->waiting = true; + const std::uint64_t startWaitTime = curTimeMicros64(); + + // This method will wait forever for a ticket. However, it will wake up every so often to + // update the time spent waiting on the ticket. + auto waitDeadline = Date_t::now() + Milliseconds(500); + StatusWith<stdx::cv_status> swCondStatus = + opCtx->waitForConditionOrInterruptNoAssertUntil(_cv, lk, waitDeadline); + + auto waitTime = curTimeMicros64() - startWaitTime; + _totalTimeAcquiringMicros.fetchAndAddRelaxed(waitTime); + stats->timeAcquiringMicros += waitTime; + + // If the operation context state interrupted this wait, the StatusWith result will contain + // the error. If the `waitDeadline` expired, the Status variable will be OK, and the + // `cv_status` value will be `cv_status::timeout`. In either case where Status::OK is + // returned, the loop must re-check the predicate. If the operation context is interrupted + // (and an error status is returned), the intended behavior is to bubble an exception up to + // the user. + uassertStatusOK(swCondStatus); } + stats->waiting = false; + ++stats->ticketsAcquired; + --_tickets; } diff --git a/src/mongo/db/concurrency/flow_control_ticketholder.h b/src/mongo/db/concurrency/flow_control_ticketholder.h index c8f2e7aca8f..f8b09b863f0 100644 --- a/src/mongo/db/concurrency/flow_control_ticketholder.h +++ b/src/mongo/db/concurrency/flow_control_ticketholder.h @@ -29,36 +29,68 @@ #pragma once -#include "mongo/db/operation_context.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" namespace mongo { +class OperationContext; +class ServiceContext; + /** * This class is fundamentally a semaphore, but allows a caller to increment by X in constant time. * - * It's expected usage maybe differs from a classic resource management protocol. Typically a client + * Its expected usage maybe differs from a classic resource management protocol. Typically a client * would acquire a ticket when it begins an operation and release the ticket to the pool when the * operation is completed. * * In the context of flow control, clients take a ticket and do not return them to the pool. There - * is an external client that calculates the maximum number of tickets that should be allotted for - * the next second. The consumers will call `getTicket` and the producer will call `refreshTo`. + * is an external service that calculates the maximum number of tickets that should be allotted for + * the next time period (one second). The consumers will call `getTicket` and the producer will call + * `refreshTo`. */ class FlowControlTicketholder { public: - FlowControlTicketholder(int startTickets) : _tickets(startTickets) {} + /** + * A structure to accommodate curop reporting. + */ + struct CurOp { + bool waiting = false; + long long ticketsAcquired = 0; + long long acquireWaitCount = 0; + long long timeAcquiringMicros = 0; + + /** + * Create a sub-object "flowControlStats" on the input builder and write in the structure's + * fields. + */ + void writeToBuilder(BSONObjBuilder& infoBuilder); + }; + + FlowControlTicketholder(int startTickets) : _tickets(startTickets) { + _totalTimeAcquiringMicros.store(0); + } static FlowControlTicketholder* get(ServiceContext* service); static FlowControlTicketholder* get(ServiceContext& service); - static FlowControlTicketholder* get(OperationContext* ctx); + static FlowControlTicketholder* get(OperationContext* opCtx); static void set(ServiceContext* service, std::unique_ptr<FlowControlTicketholder> flowControl); void refreshTo(int numTickets); - void getTicket(OperationContext* opCtx); + void getTicket(OperationContext* opCtx, FlowControlTicketholder::CurOp* stats); + + std::int64_t totalTimeAcquiringMicros() const { + return _totalTimeAcquiringMicros.load(); + } private: + // Use an int64_t as this is serialized to bson which does not support unsigned 64-bit numbers. + AtomicWord<std::int64_t> _totalTimeAcquiringMicros; + stdx::mutex _mutex; stdx::condition_variable _cv; int _tickets; diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp index e6d85f0953a..77fdd470462 100644 --- a/src/mongo/db/concurrency/lock_state.cpp +++ b/src/mongo/db/concurrency/lock_state.cpp @@ -711,11 +711,8 @@ void LockerImpl::restoreLockState(OperationContext* opCtx, const Locker::LockSna invariant(!inAWriteUnitOfWork()); invariant(_modeForTicket == MODE_NONE); - if (opCtx && (state.globalMode == LockMode::MODE_IX)) { - auto ticketholder = FlowControlTicketholder::get(opCtx); - if (ticketholder) { - ticketholder->getTicket(opCtx); - } + if (opCtx) { + getFlowControlTicket(opCtx, state.globalMode); } std::vector<OneLock>::const_iterator it = state.locks.begin(); @@ -899,6 +896,19 @@ void LockerImpl::lockComplete(OperationContext* opCtx, unlockOnErrorGuard.dismiss(); } +void LockerImpl::getFlowControlTicket(OperationContext* opCtx, LockMode lockMode) { + auto ticketholder = FlowControlTicketholder::get(opCtx); + if (ticketholder && lockMode == LockMode::MODE_IX) { + // FlowControl only acts when a MODE_IX global lock is being taken. The clientState + // necessarily should only swap to being a queued writer followed by becoming an active + // writer. This will report clients waiting on flow control inside serverStatus' + // `currentQueue` metrics. + _clientState.store(kQueuedWriter); + ticketholder->getTicket(opCtx, &_flowControlStats); + _clientState.store(kActiveWriter); + } +} + LockResult LockerImpl::lockRSTLBegin(OperationContext* opCtx, LockMode mode) { return lockBegin(opCtx, resourceIdReplicationStateTransitionLock, mode); } diff --git a/src/mongo/db/concurrency/lock_state.h b/src/mongo/db/concurrency/lock_state.h index f7659dc9b4b..4a0654cc2b8 100644 --- a/src/mongo/db/concurrency/lock_state.h +++ b/src/mongo/db/concurrency/lock_state.h @@ -249,6 +249,12 @@ public: lockComplete(nullptr, resId, mode, deadline); } + void getFlowControlTicket(OperationContext* opCtx, LockMode lockMode) override; + + FlowControlTicketholder::CurOp getFlowControlStats() const override { + return _flowControlStats; + } + /** * This function is for unit testing only. */ @@ -348,6 +354,9 @@ private: // available. Note this will be ineffective if uninterruptible lock guard is set. boost::optional<Milliseconds> _maxLockTimeout; + // A structure for accumulating time spent getting flow control tickets. + FlowControlTicketholder::CurOp _flowControlStats; + ////////////////////////////////////////////////////////////////////////////////////////// // // Methods merged from LockState, which should eventually be removed or changed to methods diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h index 24f9dfbd532..8585cf0bb1c 100644 --- a/src/mongo/db/concurrency/locker.h +++ b/src/mongo/db/concurrency/locker.h @@ -32,6 +32,7 @@ #include <climits> // For UINT_MAX #include <vector> +#include "mongo/db/concurrency/flow_control_ticketholder.h" #include "mongo/db/concurrency/lock_manager.h" #include "mongo/db/concurrency/lock_stats.h" #include "mongo/db/operation_context.h" @@ -464,6 +465,21 @@ public: bool shouldAcquireTicket() const { return _shouldAcquireTicket; } + + /** + * Acquire a flow control admission ticket into the system. Flow control is used as a + * backpressure mechanism to limit replication majority point lag. + */ + virtual void getFlowControlTicket(OperationContext* opCtx, LockMode lockMode) {} + + /** + * If tracked by an implementation, returns statistics on effort spent acquiring a flow control + * ticket. + */ + virtual FlowControlTicketholder::CurOp getFlowControlStats() const { + return FlowControlTicketholder::CurOp(); + } + /** * This function is for unit testing only. */ diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp index cb18faa1337..068de93ec44 100644 --- a/src/mongo/db/curop.cpp +++ b/src/mongo/db/curop.cpp @@ -433,7 +433,10 @@ bool CurOp::completeAndLogOperation(OperationContext* opCtx, << "Interrupted while trying to gather storage statistics for a slow operation"; } } - log(component) << _debug.report(client, *this, (lockerInfo ? &lockerInfo->stats : nullptr)); + log(component) << _debug.report(client, + *this, + (lockerInfo ? &lockerInfo->stats : nullptr), + opCtx->lockState()->getFlowControlStats()); } // Return 'true' if this operation should also be added to the profiler. @@ -598,7 +601,8 @@ StringData getProtoString(int op) { string OpDebug::report(Client* client, const CurOp& curop, - const SingleThreadedLockStats* lockStats) const { + const SingleThreadedLockStats* lockStats, + FlowControlTicketholder::CurOp flowControlStats) const { StringBuilder s; if (iscommand) s << "command "; @@ -699,6 +703,11 @@ string OpDebug::report(Client* client, s << " locks:" << locks.obj().toString(); } + BSONObj flowControlObj = makeFlowControlObject(flowControlStats); + if (flowControlObj.nFields() > 0) { + s << " flowControl:" << flowControlObj.toString(); + } + if (storageStats) { s << " storage:" << storageStats->toBSON().toString(); } @@ -724,6 +733,7 @@ string OpDebug::report(Client* client, void OpDebug::append(const CurOp& curop, const SingleThreadedLockStats& lockStats, + FlowControlTicketholder::CurOp flowControlStats, BSONObjBuilder& b) const { const size_t maxElementSize = 50 * 1024; @@ -776,6 +786,12 @@ void OpDebug::append(const CurOp& curop, lockStats.report(&locks); } + { + BSONObj flowControlMetrics = makeFlowControlObject(flowControlStats); + BSONObjBuilder flowControlBuilder(b.subobjStart("flowControl")); + flowControlBuilder.appendElements(flowControlMetrics); + } + if (storageStats) { b.append("storage", storageStats->toBSON()); } @@ -813,6 +829,24 @@ void OpDebug::setPlanSummaryMetrics(const PlanSummaryStats& planSummaryStats) { replanned = planSummaryStats.replanned; } +BSONObj OpDebug::makeFlowControlObject(FlowControlTicketholder::CurOp stats) const { + BSONObjBuilder builder; + if (stats.ticketsAcquired > 0) { + builder.append("acquireCount", stats.ticketsAcquired); + } + + if (stats.acquireWaitCount > 0) { + builder.append("acquireWaitCount", stats.acquireWaitCount); + } + + if (stats.timeAcquiringMicros > 0) { + builder.append("timeAcquiringMicros", stats.timeAcquiringMicros); + } + + return builder.obj(); +} + + namespace { /** diff --git a/src/mongo/db/curop.h b/src/mongo/db/curop.h index b450928814b..a2466ffeeb2 100644 --- a/src/mongo/db/curop.h +++ b/src/mongo/db/curop.h @@ -122,7 +122,8 @@ public: std::string report(Client* client, const CurOp& curop, - const SingleThreadedLockStats* lockStats) const; + const SingleThreadedLockStats* lockStats, + FlowControlTicketholder::CurOp flowControlStats) const; /** * Appends information about the current operation to "builder" @@ -132,6 +133,7 @@ public: */ void append(const CurOp& curop, const SingleThreadedLockStats& lockStats, + FlowControlTicketholder::CurOp flowControlStats, BSONObjBuilder& builder) const; /** @@ -139,6 +141,11 @@ public: */ void setPlanSummaryMetrics(const PlanSummaryStats& planSummaryStats); + /** + * The resulting object has zeros omitted. As is typical in this file. + */ + BSONObj makeFlowControlObject(FlowControlTicketholder::CurOp flowControlStats) const; + // ------------------- // basic options @@ -196,6 +203,8 @@ public: // Stores storage statistics. std::shared_ptr<StorageStats> storageStats; + + bool waitingForFlowControl{false}; }; /** diff --git a/src/mongo/db/introspect.cpp b/src/mongo/db/introspect.cpp index febe8742954..bff47300a08 100644 --- a/src/mongo/db/introspect.cpp +++ b/src/mongo/db/introspect.cpp @@ -92,7 +92,8 @@ void profile(OperationContext* opCtx, NetworkOp op) { { Locker::LockerInfo lockerInfo; opCtx->lockState()->getLockerInfo(&lockerInfo, CurOp::get(opCtx)->getLockStatsBase()); - CurOp::get(opCtx)->debug().append(*CurOp::get(opCtx), lockerInfo.stats, b); + CurOp::get(opCtx)->debug().append( + *CurOp::get(opCtx), lockerInfo.stats, opCtx->lockState()->getFlowControlStats(), b); } b.appendDate("ts", jsTime()); diff --git a/src/mongo/db/operation_context_test.cpp b/src/mongo/db/operation_context_test.cpp index 998084a74b3..4dd0e8ad91d 100644 --- a/src/mongo/db/operation_context_test.cpp +++ b/src/mongo/db/operation_context_test.cpp @@ -957,6 +957,44 @@ TEST_F(ThreadedOperationDeadlineTests, SleepForWithExpiredForDoesNotBlock) { ASSERT_FALSE(waiterResult.get()); } +TEST(OperationContextTest, TestWaitForConditionOrInterruptNoAssertUntilAPI) { + // `waitForConditionOrInterruptNoAssertUntil` can have three outcomes: + // + // 1) The condition is satisfied before any timeouts. + // 2) The explicit `deadline` function argument is triggered. + // 3) The operation context implicitly times out, or is interrupted from a killOp command or + // shutdown, etc. + // + // Case (1) must return a Status::OK with a value of `cv_status::no_timeout`. Case (2) must also + // return a Status::OK with a value of `cv_status::timeout`. Case (3) must return an error + // status. The error status returned is otherwise configurable. + // + // Case (1) is the hardest to test. The condition variable must be notified by a second thread + // when the client is waiting on it. Case (1) is also the least in need of having the API + // tested, thus it's omitted from being tested here. + auto serviceCtx = ServiceContext::make(); + auto client = serviceCtx->makeClient("OperationContextTest"); + auto opCtx = client->makeOperationContext(); + + stdx::mutex mutex; + stdx::condition_variable cv; + stdx::unique_lock<stdx::mutex> lk(mutex); + + // Case (2). Expect a Status::OK with a cv_status::timeout. + Date_t deadline = Date_t::now() + Milliseconds(500); + StatusWith<stdx::cv_status> ret = + opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, deadline); + ASSERT_OK(ret.getStatus()); + ASSERT(ret.getValue() == stdx::cv_status::timeout); + + // Case (3). Expect an error of `MaxTimeMSExpired`. + opCtx->setDeadlineByDate(Date_t::now(), ErrorCodes::MaxTimeMSExpired); + deadline = Date_t::now() + Seconds(500); + ret = opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, deadline); + ASSERT_FALSE(ret.isOK()); + ASSERT_EQUALS(ErrorCodes::MaxTimeMSExpired, ret.getStatus().code()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 19c0dce4418..d8cd37fe5a3 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -315,6 +315,7 @@ env.Library( 'mongo_process_common', ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/concurrency/flow_control_ticketholder', '$BUILD_DIR/mongo/db/session_catalog', '$BUILD_DIR/mongo/db/storage/backup_cursor_hooks', '$BUILD_DIR/mongo/db/transaction', diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index 58237a83689..ca8f42f7d16 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -573,6 +573,9 @@ BSONObj MongoInterfaceStandalone::_reportCurrentOpForClient( CurOp::get(*clientOpCtx)->getLockStatsBase())) { fillLockerInfo(*lockerInfo, builder); } + + auto flowControlStats = clientOpCtx->lockState()->getFlowControlStats(); + flowControlStats.writeToBuilder(builder); } return builder.obj(); diff --git a/src/mongo/db/storage/flow_control.cpp b/src/mongo/db/storage/flow_control.cpp index 8eb1c8022aa..b600bdb3117 100644 --- a/src/mongo/db/storage/flow_control.cpp +++ b/src/mongo/db/storage/flow_control.cpp @@ -127,7 +127,8 @@ BSONObj FlowControl::generateSection(OperationContext* opCtx, BSONObjBuilder bob; bob.append("targetRateLimit", _lastTargetTicketsPermitted.load()); - bob.append("timeAcquiringMicros", 0); + bob.append("timeAcquiringMicros", + FlowControlTicketholder::get(opCtx)->totalTimeAcquiringMicros()); bob.append("locksPerOp", _lastLocksPerOp.load()); bob.append("sustainerRate", _lastSustainerAppliedCount.load()); bob.append("isLagged", lagSecs >= gFlowControlTargetLagSeconds.load()); |