diff options
-rw-r--r-- | jstests/noPassthrough/index_commit_currentop_slow.js | 1 | ||||
-rw-r--r-- | jstests/noPassthrough/libs/index_build.js | 4 | ||||
-rw-r--r-- | jstests/noPassthrough/transaction_coordinator_curop_info.js | 4 | ||||
-rw-r--r-- | jstests/sharding/txn_two_phase_commit_killop.js | 11 | ||||
-rw-r--r-- | src/mongo/db/client.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/client.h | 6 | ||||
-rw-r--r-- | src/mongo/db/curop.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/operation_context_test.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/pipeline/mongo_process_common.cpp | 3 | ||||
-rw-r--r-- | src/mongo/util/interruptible.h | 33 |
10 files changed, 97 insertions, 11 deletions
diff --git a/jstests/noPassthrough/index_commit_currentop_slow.js b/jstests/noPassthrough/index_commit_currentop_slow.js index b52e20032a2..dba290d768b 100644 --- a/jstests/noPassthrough/index_commit_currentop_slow.js +++ b/jstests/noPassthrough/index_commit_currentop_slow.js @@ -48,6 +48,7 @@ assert.soon(function() { const filter = { 'command.commitIndexBuild': {$exists: true}, 'waitingForLatch.captureName': 'FutureResolution', + '$all': true, }; const result = assert.commandWorked(secondaryDB.currentOp(filter)); assert.lte( diff --git a/jstests/noPassthrough/libs/index_build.js b/jstests/noPassthrough/libs/index_build.js index e60dabeb72c..eb8d640ffb7 100644 --- a/jstests/noPassthrough/libs/index_build.js +++ b/jstests/noPassthrough/libs/index_build.js @@ -19,7 +19,7 @@ class IndexBuildTest { * Accepts optional filter that can be used to customize the db.currentOp() query. */ static getIndexBuildOpId(database, collectionName, indexName, filter) { - let pipeline = [{$currentOp: {allUsers: true}}]; + let pipeline = [{$currentOp: {allUsers: true, idleConnections: true}}]; if (filter) { pipeline.push({$match: filter}); } @@ -101,7 +101,7 @@ class IndexBuildTest { * An optional 'onOperationFn' callback accepts an operation to perform any additional checks. */ static assertIndexBuildCurrentOpContents(database, opId, onOperationFn) { - const inprog = database.currentOp({opid: opId}).inprog; + const inprog = database.currentOp({opid: opId, "$all": true}).inprog; assert.eq(1, inprog.length, 'unable to find opid ' + opId + diff --git a/jstests/noPassthrough/transaction_coordinator_curop_info.js b/jstests/noPassthrough/transaction_coordinator_curop_info.js index 881bc2b6aaa..0348eff8b8b 100644 --- a/jstests/noPassthrough/transaction_coordinator_curop_info.js +++ b/jstests/noPassthrough/transaction_coordinator_curop_info.js @@ -36,7 +36,8 @@ function curOpAfterFailpoint(failPoint, filter, timesEntered = 1) { } jsTest.log(`Running curOp operation after '${failPoint.failPointName}' failpoint.`); - let result = adminDB.aggregate([{$currentOp: {}}, {$match: filter}]).toArray(); + let result = + adminDB.aggregate([{$currentOp: {'idleConnections': true}}, {$match: filter}]).toArray(); jsTest.log(`${result.length} matching curOp entries after '${failPoint.failPointName}':\n${ tojson(result)}`); @@ -49,7 +50,6 @@ function curOpAfterFailpoint(failPoint, filter, timesEntered = 1) { function makeWorkerFilterWithAction(session, action, txnNumber) { return { - active: true, 'twoPhaseCommitCoordinator.lsid.id': session.getSessionId().id, 'twoPhaseCommitCoordinator.txnNumber': NumberLong(txnNumber), 'twoPhaseCommitCoordinator.action': action, diff --git a/jstests/sharding/txn_two_phase_commit_killop.js b/jstests/sharding/txn_two_phase_commit_killop.js index f541b602601..66a2a7895ba 100644 --- a/jstests/sharding/txn_two_phase_commit_killop.js +++ b/jstests/sharding/txn_two_phase_commit_killop.js @@ -121,11 +121,12 @@ const testCommitProtocol = function(shouldCommit, failpointData) { failpointData.numTimesShouldBeHit); jsTest.log("Going to find coordinator opCtx ids"); - let coordinatorOps = - coordinator.getDB("admin") - .aggregate( - [{$currentOp: {'allUsers': true}}, {$match: {desc: "TransactionCoordinator"}}]) - .toArray(); + let coordinatorOps = coordinator.getDB("admin") + .aggregate([ + {$currentOp: {'allUsers': true, 'idleConnections': true}}, + {$match: {desc: "TransactionCoordinator"}} + ]) + .toArray(); // Use "greater than or equal to" since, for failpoints that pause the coordinator while // it's sending prepare or sending the decision, there might be one additional thread that's diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp index a24e7a06453..ec0283e0468 100644 --- a/src/mongo/db/client.cpp +++ b/src/mongo/db/client.cpp @@ -171,6 +171,18 @@ void Client::setCurrent(ServiceContext::UniqueClient client) { currentClient = std::move(client); } +/** + * User connections are listed active so long as they are associated with an opCtx. + * Non-user connections are listed active if they have an opCtx and not waiting on a condvar. + */ +bool Client::hasAnyActiveCurrentOp() const { + if (!_opCtx) + return false; + if (isFromUserConnection() || !_opCtx->isWaitingForConditionOrInterrupt()) + return true; + return false; +} + ThreadClient::ThreadClient(ServiceContext* serviceContext) : ThreadClient(getThreadName(), serviceContext, nullptr) {} diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h index 64c0e7407c7..f98d17d5252 100644 --- a/src/mongo/db/client.h +++ b/src/mongo/db/client.h @@ -239,6 +239,12 @@ public: */ std::unique_ptr<Locker> swapLockState(std::unique_ptr<Locker> locker); + /** + * Checks if there is an active currentOp associated with this client. + * The definition of active varies between User and System connections. + * Note that the caller must hold the client lock. + */ + bool hasAnyActiveCurrentOp() const; private: friend class ServiceContext; diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp index 50e53f04b12..a043ef72d0d 100644 --- a/src/mongo/db/curop.cpp +++ b/src/mongo/db/curop.cpp @@ -261,7 +261,7 @@ void CurOp::reportCurrentOpForClient(OperationContext* opCtx, } // Fill out the rest of the BSONObj with opCtx specific details. - infoBuilder->appendBool("active", static_cast<bool>(clientOpCtx)); + infoBuilder->appendBool("active", client->hasAnyActiveCurrentOp()); infoBuilder->append("currentOpTime", opCtx->getServiceContext()->getPreciseClockSource()->now().toString()); diff --git a/src/mongo/db/operation_context_test.cpp b/src/mongo/db/operation_context_test.cpp index cd2c741f51f..27a8cd8bfd9 100644 --- a/src/mongo/db/operation_context_test.cpp +++ b/src/mongo/db/operation_context_test.cpp @@ -978,6 +978,38 @@ TEST(OperationContextTest, TestWaitForConditionOrInterruptUntilAPI) { ErrorCodes::MaxTimeMSExpired); } +TEST(OperationContextTest, TestIsWaitingForConditionOrInterrupt) { + auto serviceCtx = ServiceContext::make(); + auto client = serviceCtx->makeClient("OperationContextTest"); + auto optCtx = client->makeOperationContext(); + + // Case (1) must return false (immediately after initialization) + ASSERT_FALSE(optCtx->isWaitingForConditionOrInterrupt()); + + // Case (2) must return true while waiting for the condition + + unittest::Barrier barrier(2); + + stdx::thread worker([&] { + auto mutex = MONGO_MAKE_LATCH(); + stdx::condition_variable cv; + stdx::unique_lock<Latch> lk(mutex); + Date_t deadline = Date_t::now() + Milliseconds(300); + optCtx->waitForConditionOrInterruptUntil(cv, lk, deadline, [&, i = 0]() mutable { + if (i++ == 0) { + barrier.countDownAndWait(); + } + return false; + }); + }); + + barrier.countDownAndWait(); + ASSERT_TRUE(optCtx->isWaitingForConditionOrInterrupt()); + + worker.join(); + ASSERT_FALSE(optCtx->isWaitingForConditionOrInterrupt()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/mongo_process_common.cpp b/src/mongo/db/pipeline/mongo_process_common.cpp index a46305a259a..bb5466b9f8f 100644 --- a/src/mongo/db/pipeline/mongo_process_common.cpp +++ b/src/mongo/db/pipeline/mongo_process_common.cpp @@ -80,7 +80,8 @@ std::vector<BSONObj> MongoProcessCommon::getCurrentOps( } // Ignore inactive connections unless 'idleConnections' is true. - if (!client->getOperationContext() && connMode == CurrentOpConnectionsMode::kExcludeIdle) { + if (connMode == CurrentOpConnectionsMode::kExcludeIdle && + !client->hasAnyActiveCurrentOp()) { continue; } diff --git a/src/mongo/util/interruptible.h b/src/mongo/util/interruptible.h index 3a036f55201..5eb321d2931 100644 --- a/src/mongo/util/interruptible.h +++ b/src/mongo/util/interruptible.h @@ -129,6 +129,24 @@ protected: class Interruptible : public InterruptibleBase { private: /** + * Helper class to properly set _isWaiting for Interruptible instances. + * Every call sequence that waits on a condition/interrupt must hold an instance of WaitContext. + */ + class WaitContext { + public: + WaitContext(Interruptible* interruptible) : _interruptible(interruptible) { + _interruptible->_isWaiting.store(true); + } + + ~WaitContext() { + _interruptible->_isWaiting.store(false); + } + + private: + Interruptible* const _interruptible; + }; + + /** * A deadline guard provides a subsidiary deadline to the parent. */ class DeadlineGuard { @@ -207,6 +225,17 @@ private: public: class WaitListener; + Interruptible() : _isWaiting(false) {} + + /** + * Returns true if currently waiting for a condition/interrupt. + * This function relies on instances of WaitContext to properly set _isWaiting. + * Note that _isWaiting remains true until waitForConditionOrInterruptUntil() returns. + */ + bool isWaitingForConditionOrInterrupt() const { + return _isWaiting.loadRelaxed(); + } + /** * Enum to convey why an Interruptible woke up */ @@ -320,6 +349,7 @@ public: LockT& m, Date_t finalDeadline, PredicateT pred) { + WaitContext waitContext(this); auto latchName = getLatchName(m); auto waitUntil = [&](Date_t deadline, WakeSpeed speed) -> boost::optional<WakeReason> { @@ -447,6 +477,9 @@ protected: static State state; return state; } + +private: + AtomicWord<bool> _isWaiting; }; /** |