diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2016-04-14 09:58:12 -0400 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2016-05-23 10:27:00 -0400 |
commit | c9aac9d6eaba6ef2eb8903f07e997b594e88addc (patch) | |
tree | b8222b86f1de0a352048dd269f547fe31f4dc13c /src | |
parent | fb052c13742deab981c7ba729dad36e33bdd7393 (diff) | |
download | mongo-c9aac9d6eaba6ef2eb8903f07e997b594e88addc.tar.gz |
SERVER-18277 Track operation deadlines in OperationContext, not CurOp.
This also unifies the implementations of checkForInterrupt and checkForInterruptNoAssert.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/SConscript | 9 | ||||
-rw-r--r-- | src/mongo/db/commands/find_cmd.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/commands/parallel_collection_scan.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/commands/pipeline_command.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/curop.cpp | 121 | ||||
-rw-r--r-- | src/mongo/db/curop.h | 84 | ||||
-rw-r--r-- | src/mongo/db/curop_test.cpp | 106 | ||||
-rw-r--r-- | src/mongo/db/dbcommands.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/operation_context.cpp | 155 | ||||
-rw-r--r-- | src/mongo/db/operation_context.h | 77 | ||||
-rw-r--r-- | src/mongo/db/operation_context_impl.cpp | 82 | ||||
-rw-r--r-- | src/mongo/db/operation_context_impl.h | 5 | ||||
-rw-r--r-- | src/mongo/db/operation_context_noop.h | 9 | ||||
-rw-r--r-- | src/mongo/db/operation_context_test.cpp | 72 | ||||
-rw-r--r-- | src/mongo/db/query/find.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/operation_context_repl_mock.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/operation_context_repl_mock.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_test.cpp | 10 |
20 files changed, 324 insertions, 458 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 44acf010272..752b6c683fc 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -162,15 +162,15 @@ env.Library( ) env.CppUnitTest( - target='curop_test', + target='operation_context_test', source=[ - 'curop_test.cpp', + 'operation_context_test.cpp', ], LIBDEPS=[ - 'commands_test_crutch', - 'curop', + 'service_context', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/db/service_context_noop_init', + '$BUILD_DIR/mongo/util/clock_source_mock', ], ) @@ -468,6 +468,7 @@ env.Library( '$BUILD_DIR/mongo/util/clock_sources', '$BUILD_DIR/mongo/util/concurrency/spin_lock', '$BUILD_DIR/mongo/util/decorable', + '$BUILD_DIR/mongo/util/fail_point', '$BUILD_DIR/mongo/util/net/hostandport', ], ) diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 7441c60c01e..547ebc4bfa1 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -331,7 +331,7 @@ public: cursorExec->saveState(); cursorExec->detachFromOperationContext(); - cursor->setLeftoverMaxTimeMicros(CurOp::get(txn)->getRemainingMaxTimeMicros()); + cursor->setLeftoverMaxTimeMicros(txn->getRemainingMaxTimeMicros()); cursor->setPos(numResults); // Fill out curop based on the results. diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 341bc67b8ed..b75af0436dd 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -277,7 +277,7 @@ public: // Reset timeout timer on the cursor since the cursor is still in use. cursor->setIdleTime(0); - const bool hasOwnMaxTime = curOp->isMaxTimeSet(); + const bool hasOwnMaxTime = txn->isMaxTimeSet(); if (!hasOwnMaxTime) { // There is no time limit set directly on this getMore command. If the cursor is @@ -286,9 +286,9 @@ public: // applying it to this getMore. if (isCursorAwaitData(cursor)) { Seconds awaitDataTimeout(1); - curOp->setMaxTimeMicros(durationCount<Microseconds>(awaitDataTimeout)); + txn->setMaxTimeMicros(durationCount<Microseconds>(awaitDataTimeout)); } else { - curOp->setMaxTimeMicros(cursor->getLeftoverMaxTimeMicros()); + txn->setMaxTimeMicros(cursor->getLeftoverMaxTimeMicros()); } } txn->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. @@ -357,7 +357,7 @@ public: ctx.reset(); // Block waiting for data. - Microseconds timeout(static_cast<int64_t>(curOp->getRemainingMaxTimeMicros())); + Microseconds timeout(static_cast<int64_t>(txn->getRemainingMaxTimeMicros())); notifier->wait(notifierVersion, timeout); notifier.reset(); @@ -400,7 +400,7 @@ public: // from a previous find, then don't roll remaining micros over to the next // getMore. if (!hasOwnMaxTime) { - cursor->setLeftoverMaxTimeMicros(curOp->getRemainingMaxTimeMicros()); + cursor->setLeftoverMaxTimeMicros(txn->getRemainingMaxTimeMicros()); } cursor->incPos(numResults); diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp index bc280f4b937..425d5eb8791 100644 --- a/src/mongo/db/commands/parallel_collection_scan.cpp +++ b/src/mongo/db/commands/parallel_collection_scan.cpp @@ -149,7 +149,7 @@ public: exec.release(), ns.ns(), txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()); - cc->setLeftoverMaxTimeMicros(CurOp::get(txn)->getRemainingMaxTimeMicros()); + cc->setLeftoverMaxTimeMicros(txn->getRemainingMaxTimeMicros()); BSONObjBuilder threadResult; appendCursorResponseObject(cc->cursorid(), ns.ns(), BSONArray(), &threadResult); diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 5861d407828..c3154f96391 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -133,7 +133,7 @@ static bool handleCursorCommand(OperationContext* txn, if (cursor) { // If a time limit was set on the pipeline, remaining time is "rolled over" to the // cursor (for use by future getmore ops). - cursor->setLeftoverMaxTimeMicros(CurOp::get(txn)->getRemainingMaxTimeMicros()); + cursor->setLeftoverMaxTimeMicros(txn->getRemainingMaxTimeMicros()); CurOp::get(txn)->debug().cursorid = cursor->cursorid(); diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp index 0dd0c68d7ce..3c2a30cd932 100644 --- a/src/mongo/db/curop.cpp +++ b/src/mongo/db/curop.cpp @@ -41,7 +41,6 @@ #include "mongo/db/json.h" #include "mongo/db/query/getmore_request.h" #include "mongo/db/query/plan_summary_stats.h" -#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" namespace mongo { @@ -220,19 +219,6 @@ private: const OperationContext::Decoration<CurOp::CurOpStack> CurOp::_curopStack = OperationContext::declareDecoration<CurOp::CurOpStack>(); -// Enabling the maxTimeAlwaysTimeOut fail point will cause any query or command run with a -// valid non-zero max time to fail immediately. Any getmore operation on a cursor already -// created with a valid non-zero max time will also fail immediately. -// -// This fail point cannot be used with the maxTimeNeverTimeOut fail point. -MONGO_FP_DECLARE(maxTimeAlwaysTimeOut); - -// Enabling the maxTimeNeverTimeOut fail point will cause the server to never time out any -// query, command, or getmore operation, regardless of whether a max time is set. -// -// This fail point cannot be used with the maxTimeAlwaysTimeOut fail point. -MONGO_FP_DECLARE(maxTimeNeverTimeOut); - CurOp* CurOp::get(const OperationContext* opCtx) { return get(*opCtx); } @@ -244,10 +230,6 @@ CurOp* CurOp::get(const OperationContext& opCtx) { CurOp::CurOp(OperationContext* opCtx) : CurOp(opCtx, &_curopStack(opCtx)) {} CurOp::CurOp(OperationContext* opCtx, CurOpStack* stack) : _stack(stack) { - if (_stack->top()) { - // Child operation should inherit the previous operation's timeout. - setMaxTimeMicros(_stack->top()->getRemainingMaxTimeMicros()); - } if (opCtx) { _stack->push(opCtx, this); } else { @@ -284,13 +266,6 @@ void CurOp::setNS_inlock(StringData ns) { void CurOp::ensureStarted() { if (_start == 0) { _start = curTimeMicros64(); - - // If ensureStarted() is invoked after setMaxTimeMicros(), then time limit tracking will - // start here. This is because time limit tracking can only commence after the - // operation is assigned a start time. - if (_maxTimeMicros > 0) { - _maxTimeTracker.setTimeLimit(_start, _maxTimeMicros); - } } } @@ -391,102 +366,6 @@ void CurOp::reportState(BSONObjBuilder* builder) { builder->append("numYields", _numYields); } -void CurOp::setMaxTimeMicros(uint64_t maxTimeMicros) { - _maxTimeMicros = maxTimeMicros; - - if (_maxTimeMicros == 0) { - // 0 is "allow to run indefinitely". - return; - } - - // If the operation has a start time, then enable the tracker. - // - // If the operation has no start time yet, then ensureStarted() will take responsibility for - // enabling the tracker. - if (isStarted()) { - _maxTimeTracker.setTimeLimit(startTime(), _maxTimeMicros); - } -} - -bool CurOp::isMaxTimeSet() const { - return _maxTimeMicros != 0; -} - -bool CurOp::maxTimeHasExpired() { - if (MONGO_FAIL_POINT(maxTimeNeverTimeOut)) { - return false; - } - if (_maxTimeMicros > 0 && MONGO_FAIL_POINT(maxTimeAlwaysTimeOut)) { - return true; - } - return _maxTimeTracker.checkTimeLimit(); -} - -uint64_t CurOp::getRemainingMaxTimeMicros() const { - return _maxTimeTracker.getRemainingMicros(); -} - -void CurOp::MaxTimeTracker::setTimeLimit(uint64_t startEpochMicros, uint64_t durationMicros) { - dassert(durationMicros != 0); - - _enabled = true; - - _targetEpochMicros = startEpochMicros + durationMicros; - - uint64_t now = curTimeMicros64(); - // If our accurate time source thinks time is not up yet, calculate the next target for - // our approximate time source. - if (_targetEpochMicros > now) { - _approxTargetServerMillis = Listener::getElapsedTimeMillis() + - static_cast<int64_t>((_targetEpochMicros - now) / 1000); - } - // Otherwise, set our approximate time source target such that it thinks time is already - // up. - else { - _approxTargetServerMillis = Listener::getElapsedTimeMillis(); - } -} - -bool CurOp::MaxTimeTracker::checkTimeLimit() { - if (!_enabled) { - return false; - } - - // Does our approximate time source think time is not up yet? If so, return early. - if (_approxTargetServerMillis > Listener::getElapsedTimeMillis()) { - return false; - } - - uint64_t now = curTimeMicros64(); - // Does our accurate time source think time is not up yet? If so, readjust the target for - // our approximate time source and return early. - if (_targetEpochMicros > now) { - _approxTargetServerMillis = Listener::getElapsedTimeMillis() + - static_cast<int64_t>((_targetEpochMicros - now) / 1000); - return false; - } - - // Otherwise, time is up. - return true; -} - -uint64_t CurOp::MaxTimeTracker::getRemainingMicros() const { - if (!_enabled) { - // 0 is "allow to run indefinitely". - return 0; - } - - // Does our accurate time source think time is up? If so, claim there is 1 microsecond - // left for this operation. - uint64_t now = curTimeMicros64(); - if (_targetEpochMicros <= now) { - return 1; - } - - // Otherwise, calculate remaining time. - return _targetEpochMicros - now; -} - namespace { StringData getProtoString(int op) { if (op == dbQuery) { diff --git a/src/mongo/db/curop.h b/src/mongo/db/curop.h index 80606c276c8..7a357b42d9b 100644 --- a/src/mongo/db/curop.h +++ b/src/mongo/db/curop.h @@ -254,36 +254,6 @@ public: } // - // Methods for controlling CurOp "max time". - // - - /** - * Sets the amount of time operation this should be allowed to run, units of microseconds. - * The special value 0 is "allow to run indefinitely". - */ - void setMaxTimeMicros(uint64_t maxTimeMicros); - - /** - * Returns true if a time limit has been set on this operation, and false otherwise. - */ - bool isMaxTimeSet() const; - - /** - * Checks whether this operation has been running longer than its time limit. Returns - * false if not, or if the operation has no time limit. - */ - bool maxTimeHasExpired(); - - /** - * Returns the number of microseconds remaining for this operation's time limit, or the - * special value 0 if the operation has no time limit. - * - * Calling this method is more expensive than calling its sibling "maxTimeHasExpired()", - * since an accurate measure of remaining time needs to be calculated. - */ - uint64_t getRemainingMaxTimeMicros() const; - - // // Methods for getting/setting elapsed time. Note that the observed elapsed time may be // negative, if the system time has been reset during the course of this operation. // @@ -440,58 +410,6 @@ private: // so this should be 30000 in that case long long _expectedLatencyMs{0}; - // Time limit for this operation. 0 if the operation has no time limit. - uint64_t _maxTimeMicros{0u}; - std::string _planSummary; - - /** Nested class that implements tracking of a time limit for a CurOp object. */ - class MaxTimeTracker { - MONGO_DISALLOW_COPYING(MaxTimeTracker); - - public: - /** Newly-constructed MaxTimeTracker objects have the time limit disabled. */ - MaxTimeTracker() = default; - - /** Returns whether or not time tracking is enabled. */ - bool isEnabled() const { - return _enabled; - } - - /** - * Enables time tracking. The time limit is set to be "durationMicros" microseconds - * from "startEpochMicros" (units of microseconds since the epoch). - * - * "durationMicros" must be nonzero. - */ - void setTimeLimit(uint64_t startEpochMicros, uint64_t durationMicros); - - /** - * Checks whether the time limit has been hit. Returns false if not, or if time - * tracking is disabled. - */ - bool checkTimeLimit(); - - /** - * Returns the number of microseconds remaining for the time limit, or the special - * value 0 if time tracking is disabled. - * - * Calling this method is more expensive than calling its sibling "checkInterval()", - * since an accurate measure of remaining time needs to be calculated. - */ - uint64_t getRemainingMicros() const; - - private: - // Whether or not time tracking is enabled for this operation. - bool _enabled{false}; - - // Point in time at which the time limit is hit. Units of microseconds since the - // epoch. - uint64_t _targetEpochMicros{0}; - - // Approximate point in time at which the time limit is hit. Units of milliseconds - // since the server process was started. - int64_t _approxTargetServerMillis{0}; - } _maxTimeTracker; }; -} +} // namespace mongo diff --git a/src/mongo/db/curop_test.cpp b/src/mongo/db/curop_test.cpp deleted file mode 100644 index 84c85e3cef9..00000000000 --- a/src/mongo/db/curop_test.cpp +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Copyright (C) 2013 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/client.h" -#include "mongo/db/curop.h" -#include "mongo/db/operation_context_noop.h" -#include "mongo/db/service_context.h" -#include "mongo/db/service_context_noop.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/stdx/memory.h" -#include "mongo/stdx/thread.h" -#include "mongo/unittest/unittest.h" - -namespace mongo { - -namespace { - -const long long intervalLong = 2000 * 1000; // 2s in micros -const long long intervalShort = 10 * 1000; // 10ms in micros - - -class TestListener : public Listener { -public: - TestListener() : Listener("test", "", 0) {} // port 0 => any available high port - void accepted(AbstractMessagingPort* mp) override {} -}; - -AtomicUInt32 threadInitialized(0); - -class TestListenerFixtureClass : public mongo::unittest::Test { -protected: - /** - * This will start the global listener thread, t, if it hasn't been initialized yet. - */ - void setUp() { - if (threadInitialized.fetchAndAdd(1) > 0) { - return; - } - stdx::thread t = stdx::thread([this]() { - TestListener listener; - listener.setAsTimeTracker(); - listener.setupSockets(); - listener.initAndListen(); - }); - t.detach(); - // Wait for listener thread to start tracking time. - while (Listener::getElapsedTimeMillis() == 0) { - sleepmillis(10); - } - } -}; - -// Long operation + short timeout => time should expire. -TEST_F(TestListenerFixtureClass, TimeHasExpiredPosSimple) { - auto service = stdx::make_unique<ServiceContextNoop>(); - auto client = service->makeClient("CurOpTest"); - OperationContextNoop txn(client.get(), 100); - CurOp curOp(&txn); - curOp.setMaxTimeMicros(intervalShort); - curOp.ensureStarted(); - sleepmicros(intervalLong); - ASSERT_TRUE(curOp.maxTimeHasExpired()); -} - -// Short operation + long timeout => time should not expire. -TEST_F(TestListenerFixtureClass, TimeHasExpiredNegSimple) { - auto service = stdx::make_unique<ServiceContextNoop>(); - auto client = service->makeClient("CurOpTest"); - OperationContextNoop txn(client.get(), 100); - CurOp curOp(&txn); - curOp.setMaxTimeMicros(intervalLong); - curOp.ensureStarted(); - sleepmicros(intervalShort); - ASSERT_FALSE(curOp.maxTimeHasExpired()); -} - -} // namespace - -} // namespace mongo diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index f04a43d1b69..aaaf4f7de67 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -1352,7 +1352,7 @@ void Command::execCommand(OperationContext* txn, "no such command option $maxTimeMs; use maxTimeMS instead", extractedFields[kQueryOptionMaxTimeMSField].eoo()); - CurOp::get(txn)->setMaxTimeMicros(static_cast<unsigned long long>(maxTimeMS) * 1000); + txn->setMaxTimeMicros(static_cast<unsigned long long>(maxTimeMS) * 1000); // Operations are only versioned against the primary. We also make sure not to redo shard // version handling if this command was issued via the direct client. diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp index c8e4ad61137..2006c4f0470 100644 --- a/src/mongo/db/operation_context.cpp +++ b/src/mongo/db/operation_context.cpp @@ -26,15 +26,51 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + #include "mongo/platform/basic.h" -#include "mongo/db/client.h" #include "mongo/db/operation_context.h" +#include "mongo/db/client.h" +#include "mongo/db/service_context.h" +#include "mongo/platform/random.h" #include "mongo/util/assert_util.h" +#include "mongo/util/clock_source.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/log.h" namespace mongo { +namespace { +// Enabling the maxTimeAlwaysTimeOut fail point will cause any query or command run with a +// valid non-zero max time to fail immediately. Any getmore operation on a cursor already +// created with a valid non-zero max time will also fail immediately. +// +// This fail point cannot be used with the maxTimeNeverTimeOut fail point. +MONGO_FP_DECLARE(maxTimeAlwaysTimeOut); + +// Enabling the maxTimeNeverTimeOut fail point will cause the server to never time out any +// query, command, or getmore operation, regardless of whether a max time is set. +// +// This fail point cannot be used with the maxTimeAlwaysTimeOut fail point. +MONGO_FP_DECLARE(maxTimeNeverTimeOut); + +// Enabling the checkForInterruptFail fail point will start a game of random chance on the +// connection specified in the fail point data, generating an interrupt with a given fixed +// probability. Example invocation: +// +// {configureFailPoint: "checkForInterruptFail", +// mode: "alwaysOn", +// data: {conn: 17, chance: .01}} +// +// Both data fields must be specified. In the above example, all interrupt points on connection 17 +// will generate a kill on the current operation with probability p(.01), including interrupt points +// of nested operations. "chance" must be a double between 0 and 1, inclusive. +MONGO_FP_DECLARE(checkForInterruptFail); + +} // namespace + OperationContext::OperationContext(Client* client, unsigned int opId, Locker* locker) : _client(client), _opId(opId), _locker(locker) {} @@ -43,4 +79,121 @@ void OperationContext::markKilled(ErrorCodes::Error killCode) { _killCode.compareAndSwap(ErrorCodes::OK, killCode); } +void OperationContext::setDeadlineByDate(Date_t when) { + _deadline = when; +} + +void OperationContext::setDeadlineRelativeToNow(Milliseconds maxTimeMs) { + auto clock = getServiceContext()->getFastClockSource(); + setDeadlineByDate(clock->now() + clock->getPrecision() + maxTimeMs); +} + +bool OperationContext::hasDeadlineExpired() const { + if (MONGO_FAIL_POINT(maxTimeNeverTimeOut)) { + return false; + } + if (hasDeadline() && MONGO_FAIL_POINT(maxTimeAlwaysTimeOut)) { + return true; + } + + const auto now = getServiceContext()->getFastClockSource()->now(); + return now >= getDeadline(); +} + +Milliseconds OperationContext::getTimeUntilDeadline() const { + const auto now = getServiceContext()->getFastClockSource()->now(); + return getDeadline() - now; +} + +void OperationContext::setMaxTimeMicros(uint64_t maxTimeMicros) { + const auto maxTimeMicrosSigned = static_cast<int64_t>(maxTimeMicros); + if (maxTimeMicrosSigned <= 0) { + // Do not adjust the deadline if the user specified "0", meaning "forever", or + // if they chose a value too large to represent as a 64-bit signed integer. + return; + } + if (maxTimeMicrosSigned == 1) { + // This indicates that the time is already expired. Set the deadline to the epoch. + setDeadlineByDate(Date_t{}); + return; + } + setDeadlineRelativeToNow(Microseconds{maxTimeMicrosSigned}); +} + +bool OperationContext::isMaxTimeSet() const { + return hasDeadline(); +} + +uint64_t OperationContext::getRemainingMaxTimeMicros() const { + if (!hasDeadline()) { + return 0U; + } + const auto microsRemaining = durationCount<Microseconds>(getTimeUntilDeadline()); + if (microsRemaining <= 0) { + // If the operation deadline has passed, say there is 1 microsecond remaining, to + // distinguish from 0, which means infinity. + return 1U; + } + return static_cast<uint64_t>(microsRemaining); +} + +void OperationContext::checkForInterrupt() { + uassertStatusOK(checkForInterruptNoAssert()); +} + +namespace { + +// Helper function for checkForInterrupt fail point. Decides whether the operation currently +// being run by the given Client meet the (probabilistic) conditions for interruption as +// specified in the fail point info. +bool opShouldFail(const OperationContext* opCtx, const BSONObj& failPointInfo) { + // Only target the client with the specified connection number. + if (opCtx->getClient()->getConnectionId() != failPointInfo["conn"].safeNumberLong()) { + return false; + } + + // Return true with (approx) probability p = "chance". Recall: 0 <= chance <= 1. + double next = static_cast<double>(std::abs(opCtx->getClient()->getPrng().nextInt64())); + double upperBound = + std::numeric_limits<int64_t>::max() * failPointInfo["chance"].numberDouble(); + if (next > upperBound) { + return false; + } + return true; +} + +} // namespace + +Status OperationContext::checkForInterruptNoAssert() { + // TODO: Remove once all OperationContexts are properly connected to Clients and ServiceContexts + // in tests. + if (MONGO_unlikely(!getClient() || !getServiceContext() || + !getServiceContext()->getFastClockSource())) { + return Status::OK(); + } + + if (getServiceContext() && getServiceContext()->getKillAllOperations()) { + return Status(ErrorCodes::InterruptedAtShutdown, "interrupted at shutdown"); + } + + if (hasDeadlineExpired()) { + markKilled(); + return Status(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit"); + } + + MONGO_FAIL_POINT_BLOCK(checkForInterruptFail, scopedFailPoint) { + if (opShouldFail(this, scopedFailPoint.getData())) { + log() << "set pending kill on op " << getOpID() << ", for checkForInterruptFail"; + markKilled(); + } + } + + const auto killStatus = getKillStatus(); + if (killStatus != ErrorCodes::OK) { + return Status(killStatus, "operation was interrupted"); + } + + return Status::OK(); +} + } // namespace mongo diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 0c26404dae1..70fae5f4f1b 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -37,6 +37,7 @@ #include "mongo/db/write_concern_options.h" #include "mongo/platform/atomic_word.h" #include "mongo/util/decorable.h" +#include "mongo/util/time_support.h" namespace mongo { @@ -113,12 +114,12 @@ public: /** * Raises a UserAssertion if this operation is in a killed state. */ - virtual void checkForInterrupt() = 0; + void checkForInterrupt(); /** * Returns Status::OK() unless this operation is in a killed state. */ - virtual Status checkForInterruptNoAssert() = 0; + Status checkForInterruptNoAssert(); /** * Delegates to CurOp, but is included here to break dependencies. @@ -152,8 +153,6 @@ public: return _client; } - virtual uint64_t getRemainingMaxTimeMicros() const = 0; - /** * Returns the operation ID associated with this operation. */ @@ -218,12 +217,79 @@ public: return getKillStatus() != ErrorCodes::OK; } + /** + * Sets the deadline for this operation to the given point in time. + * + * To remove a deadline, pass in Date_t::max(). + */ + void setDeadlineByDate(Date_t when); + + /** + * Sets the deadline for this operation to the maxTimeMs plus the current time reported + * by the ServiceContext. + */ + void setDeadlineRelativeToNow(Milliseconds maxTimeMs); + template <typename D> + void setDeadlineRelativeToNow(D maxTime) { + setDeadlineRelativeToNow(duration_cast<Milliseconds>(maxTime)); + } + + /** + * Returns true if this operation has a deadline. + */ + bool hasDeadline() const { + return getDeadline() < Date_t::max(); + } + + /** + * Returns the deadline for this operation, or Date_t::max() if there is no deadline. + */ + Date_t getDeadline() const { + return _deadline; + } + + /** + * Returns the amount of time until the deadline, according to the fast clock on + * ServiceContext. If this value is less than zero, the deadline has passed. + */ + Milliseconds getTimeUntilDeadline() const; + + // + // Legacy "max time" methods for controlling operation deadlines. + // + + /** + * Sets the amount of time operation this should be allowed to run, units of microseconds. + * The special value 0 is "allow to run indefinitely". + */ + void setMaxTimeMicros(uint64_t maxTimeMicros); + + /** + * Returns true if a time limit has been set on this operation, and false otherwise. + */ + bool isMaxTimeSet() const; + + /** + * Returns the number of microseconds remaining for this operation's time limit, or the + * special value 0 if the operation has no time limit. + * + * This method is virtual because some subclasses used for tests override it. It should not + * remain virtual. + */ + virtual uint64_t getRemainingMaxTimeMicros() const; + protected: OperationContext(Client* client, unsigned int opId, Locker* locker); RecoveryUnitState _ruState = kNotInUnitOfWork; private: + /** + * Returns true if this operation has a deadline and it has passed according to the fast clock + * on ServiceContext. + */ + bool hasDeadlineExpired() const; + friend class WriteUnitOfWork; Client* const _client; const unsigned int _opId; @@ -237,6 +303,9 @@ private: AtomicWord<ErrorCodes::Error> _killCode{ErrorCodes::OK}; WriteConcernOptions _writeConcern; + + Date_t _deadline = + Date_t::max(); // The timepoint at which this operation exceeds its time limit. }; class WriteUnitOfWork { diff --git a/src/mongo/db/operation_context_impl.cpp b/src/mongo/db/operation_context_impl.cpp index 67483b4c961..4d984a9e754 100644 --- a/src/mongo/db/operation_context_impl.cpp +++ b/src/mongo/db/operation_context_impl.cpp @@ -26,8 +26,6 @@ * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault - #include "mongo/platform/basic.h" #include "mongo/db/operation_context_impl.h" @@ -41,10 +39,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/storage/storage_engine.h" -#include "mongo/platform/random.h" #include "mongo/stdx/memory.h" -#include "mongo/util/fail_point_service.h" -#include "mongo/util/log.h" namespace mongo { @@ -121,83 +116,6 @@ string OperationContextImpl::getNS() const { return CurOp::get(this)->getNS(); } -uint64_t OperationContextImpl::getRemainingMaxTimeMicros() const { - return CurOp::get(this)->getRemainingMaxTimeMicros(); -} - -// Enabling the checkForInterruptFail fail point will start a game of random chance on the -// connection specified in the fail point data, generating an interrupt with a given fixed -// probability. Example invocation: -// -// {configureFailPoint: "checkForInterruptFail", -// mode: "alwaysOn", -// data: {conn: 17, chance: .01, allowNested: true}} -// -// All three data fields must be specified. In the above example, all interrupt points on -// connection 17 will generate a kill on the current operation with probability p(.01), -// including interrupt points of nested operations. If "allowNested" is false, nested -// operations are not targeted. "chance" must be a double between 0 and 1, inclusive. -MONGO_FP_DECLARE(checkForInterruptFail); - -namespace { - -// Helper function for checkForInterrupt fail point. Decides whether the operation currently -// being run by the given Client meet the (probabilistic) conditions for interruption as -// specified in the fail point info. -bool opShouldFail(const OperationContextImpl* opCtx, const BSONObj& failPointInfo) { - // Only target the client with the specified connection number. - if (opCtx->getClient()->getConnectionId() != failPointInfo["conn"].safeNumberLong()) { - return false; - } - - // Only target nested operations if requested. - if (!failPointInfo["allowNested"].trueValue() && CurOp::get(opCtx)->parent() != NULL) { - return false; - } - - // Return true with (approx) probability p = "chance". Recall: 0 <= chance <= 1. - double next = static_cast<double>(std::abs(opCtx->getClient()->getPrng().nextInt64())); - double upperBound = - std::numeric_limits<int64_t>::max() * failPointInfo["chance"].numberDouble(); - if (next > upperBound) { - return false; - } - return true; -} - -} // namespace - -void OperationContextImpl::checkForInterrupt() { - uassertStatusOK(checkForInterruptNoAssert()); -} - -Status OperationContextImpl::checkForInterruptNoAssert() { - if (getServiceContext()->getKillAllOperations()) { - return Status(ErrorCodes::InterruptedAtShutdown, "interrupted at shutdown"); - } - - CurOp* curOp = CurOp::get(this); - if (curOp->maxTimeHasExpired()) { - markKilled(); - return Status(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit"); - } - - MONGO_FAIL_POINT_BLOCK(checkForInterruptFail, scopedFailPoint) { - if (opShouldFail(this, scopedFailPoint.getData())) { - log() << "set pending kill on " << (curOp->parent() ? "nested" : "top-level") << " op " - << getOpID() << ", for checkForInterruptFail"; - markKilled(); - } - } - - const auto killStatus = getKillStatus(); - if (killStatus != ErrorCodes::OK) { - return Status(killStatus, "operation was interrupted"); - } - - return Status::OK(); -} - bool OperationContextImpl::isPrimaryFor(StringData ns) { return repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(NamespaceString(ns)); } diff --git a/src/mongo/db/operation_context_impl.h b/src/mongo/db/operation_context_impl.h index 1ebee1a0f77..a2b53710c20 100644 --- a/src/mongo/db/operation_context_impl.h +++ b/src/mongo/db/operation_context_impl.h @@ -50,11 +50,6 @@ public: virtual std::string getNS() const override; - virtual uint64_t getRemainingMaxTimeMicros() const override; - - virtual void checkForInterrupt() override; - virtual Status checkForInterruptNoAssert() override; - virtual bool isPrimaryFor(StringData ns) override; virtual void setReplicatedWrites(bool writesAreReplicated = true) override; diff --git a/src/mongo/db/operation_context_noop.h b/src/mongo/db/operation_context_noop.h index 3d380605d58..0c83bdbeec8 100644 --- a/src/mongo/db/operation_context_noop.h +++ b/src/mongo/db/operation_context_noop.h @@ -92,11 +92,6 @@ public: return &_pm; } - virtual void checkForInterrupt() override {} - virtual Status checkForInterruptNoAssert() override { - return Status::OK(); - } - virtual bool isPrimaryFor(StringData ns) override { return true; } @@ -111,10 +106,6 @@ public: return false; } - virtual uint64_t getRemainingMaxTimeMicros() const override { - return 0; - } - private: std::unique_ptr<RecoveryUnit> _recoveryUnit; std::unique_ptr<Locker> _locker; diff --git a/src/mongo/db/operation_context_test.cpp b/src/mongo/db/operation_context_test.cpp new file mode 100644 index 00000000000..b1210afc358 --- /dev/null +++ b/src/mongo/db/operation_context_test.cpp @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/client.h" +#include "mongo/db/service_context.h" +#include "mongo/db/service_context_noop.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/clock_source_mock.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +namespace { + +class OperationDeadlineTests : public unittest::Test { +public: + void setUp() { + auto uniqueMockClock = stdx::make_unique<ClockSourceMock>(); + mockClock = uniqueMockClock.get(); + service = stdx::make_unique<ServiceContextNoop>(); + service->setFastClockSource(std::move(uniqueMockClock)); + } + + ClockSourceMock* mockClock; + std::unique_ptr<ServiceContext> service; +}; + +TEST_F(OperationDeadlineTests, OperationDeadlineExpiration) { + auto client = service->makeClient("CurOpTest"); + auto txn = client->makeOperationContext(); + txn->setMaxTimeMicros(durationCount<Microseconds>(Seconds{1})); + mockClock->advance(Milliseconds{500}); + ASSERT_OK(txn->checkForInterruptNoAssert()); + mockClock->advance(Milliseconds{499}); + ASSERT_OK(txn->checkForInterruptNoAssert()); + mockClock->advance(Milliseconds{1}); + ASSERT_EQ(ErrorCodes::ExceededTimeLimit, txn->checkForInterruptNoAssert()); + mockClock->advance(Milliseconds{1}); + ASSERT_EQ(ErrorCodes::ExceededTimeLimit, txn->checkForInterruptNoAssert()); +} + +} // namespace + +} // namespace mongo diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index 454a71ddfab..7d979e4a891 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -323,7 +323,7 @@ QueryResult::View getMore(OperationContext* txn, // If the operation that spawned this cursor had a time limit set, apply leftover // time to this getmore. - curOp.setMaxTimeMicros(cc->getLeftoverMaxTimeMicros()); + txn->setMaxTimeMicros(cc->getLeftoverMaxTimeMicros()); txn->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. // Ensure that the original query or command object is available in the slow query log, @@ -465,7 +465,7 @@ QueryResult::View getMore(OperationContext* txn, // If the getmore had a time limit, remaining time is "rolled over" back to the // cursor (for use by future getmore ops). - cc->setLeftoverMaxTimeMicros(curOp.getRemainingMaxTimeMicros()); + cc->setLeftoverMaxTimeMicros(txn->getRemainingMaxTimeMicros()); } } @@ -547,7 +547,7 @@ std::string runQuery(OperationContext* txn, } // Handle query option $maxTimeMS (not used with commands). - curOp.setMaxTimeMicros(static_cast<unsigned long long>(pq.getMaxTimeMS()) * 1000); + txn->setMaxTimeMicros(static_cast<unsigned long long>(pq.getMaxTimeMS()) * 1000); txn->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. // uassert if we are not on a primary, and not a secondary with SlaveOk query parameter set. @@ -664,7 +664,7 @@ std::string runQuery(OperationContext* txn, // If the query had a time limit, remaining time is "rolled over" to the cursor (for // use by future getmore ops). - cc->setLeftoverMaxTimeMicros(curOp.getRemainingMaxTimeMicros()); + cc->setLeftoverMaxTimeMicros(txn->getRemainingMaxTimeMicros()); endQueryOp(txn, collection, *cc->getExecutor(), numResults, ccId); } else { diff --git a/src/mongo/db/repl/operation_context_repl_mock.cpp b/src/mongo/db/repl/operation_context_repl_mock.cpp index 8b66c6b800f..b5d0660c1be 100644 --- a/src/mongo/db/repl/operation_context_repl_mock.cpp +++ b/src/mongo/db/repl/operation_context_repl_mock.cpp @@ -49,22 +49,6 @@ OperationContextReplMock::OperationContextReplMock(Client* client, unsigned int OperationContextReplMock::~OperationContextReplMock() = default; -void OperationContextReplMock::checkForInterrupt() { - uassertStatusOK(checkForInterruptNoAssert()); -} - -Status OperationContextReplMock::checkForInterruptNoAssert() { - if (!_checkForInterruptStatus.isOK()) { - return _checkForInterruptStatus; - } - - return Status::OK(); -} - -void OperationContextReplMock::setCheckForInterruptStatus(Status status) { - _checkForInterruptStatus = std::move(status); -} - uint64_t OperationContextReplMock::getRemainingMaxTimeMicros() const { return _maxTimeMicrosRemaining; } diff --git a/src/mongo/db/repl/operation_context_repl_mock.h b/src/mongo/db/repl/operation_context_repl_mock.h index c16b55a19bf..d28633e11dd 100644 --- a/src/mongo/db/repl/operation_context_repl_mock.h +++ b/src/mongo/db/repl/operation_context_repl_mock.h @@ -49,12 +49,6 @@ public: OperationContextReplMock(Client* client, unsigned int opNum); virtual ~OperationContextReplMock(); - virtual void checkForInterrupt() override; - - virtual Status checkForInterruptNoAssert() override; - - void setCheckForInterruptStatus(Status status); - virtual uint64_t getRemainingMaxTimeMicros() const override; void setRemainingMaxTimeMicros(uint64_t micros); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index f086e293cfa..01addb5cfac 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1095,7 +1095,7 @@ ReadConcernResponse ReplicationCoordinatorImpl::waitUntilOpTime(OperationContext // for a new snapshot. if (isMajorityReadConcern) { // Wait for a snapshot that meets our needs (< targetOpTime). - const auto waitTime = CurOp::get(txn)->isMaxTimeSet() + const auto waitTime = txn->isMaxTimeSet() ? Microseconds(static_cast<int64_t>(txn->getRemainingMaxTimeMicros())) : Microseconds{0}; const auto waitForever = waitTime == Microseconds{0}; @@ -1116,7 +1116,7 @@ ReadConcernResponse ReplicationCoordinatorImpl::waitUntilOpTime(OperationContext WaiterInfo waitInfo(&_opTimeWaiterList, txn->getOpID(), &targetOpTime, nullptr, &condVar); LOG(3) << "Waiting for OpTime: " << waitInfo; - if (CurOp::get(txn)->isMaxTimeSet()) { + if (txn->isMaxTimeSet()) { condVar.wait_for(lock, Microseconds(static_cast<int64_t>(txn->getRemainingMaxTimeMicros())) .toSystemDuration()); diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index fb92f037e45..bdc67171d11 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -79,8 +79,6 @@ using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs; -Status kInterruptedStatus(ErrorCodes::Interrupted, "operation was interrupted"); - // Helper class to wrap Timestamp as an OpTime with term 0. struct OpTimeWithTermZero { OpTimeWithTermZero(unsigned int sec, unsigned int i) : timestamp(sec, i) {} @@ -1361,7 +1359,7 @@ TEST_F(ReplCoordTest, ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time1)); - txn.setCheckForInterruptStatus(kInterruptedStatus); + txn.markKilled(ErrorCodes::Interrupted); getReplCoord()->interrupt(opID); ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult(); ASSERT_EQUALS(ErrorCodes::Interrupted, statusAndDur.status); @@ -1831,7 +1829,7 @@ TEST_F(StepDownTest, NodeReturnsInterruptedWhenInterruptedDuringStepDown) { ASSERT_TRUE(eventHandle); ASSERT_TRUE(txn.lockState()->isReadLocked()); - txn.setCheckForInterruptStatus(kInterruptedStatus); + txn.markKilled(ErrorCodes::Interrupted); getReplCoord()->interrupt(opID); getReplExec()->waitForEvent(eventHandle); @@ -3191,7 +3189,7 @@ TEST_F(ReplCoordTest, NodeReturnsInterruptedWhenWaitingUntilAnOpTimeIsInterrupte getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0)); getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0)); - txn.setCheckForInterruptStatus(Status(ErrorCodes::Interrupted, "test")); + txn.markKilled(ErrorCodes::Interrupted); auto result = getReplCoord()->waitUntilOpTime( &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); @@ -3313,7 +3311,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedInterrupted) { getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0)); getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(10, 0), 0)); - txn.setCheckForInterruptStatus(Status(ErrorCodes::Interrupted, "test")); + txn.markKilled(ErrorCodes::Interrupted); auto result = getReplCoord()->waitUntilOpTime( &txn, ReadConcernArgs(OpTime(Timestamp(50, 0), 0), ReadConcernLevel::kMajorityReadConcern)); |