/** * Copyright (C) 2014 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include #include #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/locker.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/storage/recovery_unit.h" #include "mongo/db/storage/storage_options.h" #include "mongo/db/storage/write_unit_of_work.h" #include "mongo/db/write_concern_options.h" #include "mongo/platform/atomic_word.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/util/decorable.h" #include "mongo/util/interruptible.h" #include "mongo/util/time_support.h" #include "mongo/util/timer.h" namespace mongo { class Client; class CurOp; class ProgressMeter; class ServiceContext; class StringData; namespace repl { class UnreplicatedWritesBlock; } // namespace repl /** * This class encompasses the state required by an operation and lives from the time a network * operation is dispatched until its execution is finished. Note that each "getmore" on a cursor * is a separate operation. On construction, an OperationContext associates itself with the * current client, and only on destruction it deassociates itself. At any time a client can be * associated with at most one OperationContext. Each OperationContext has a RecoveryUnit * associated with it, though the lifetime is not necesarily the same, see releaseRecoveryUnit * and setRecoveryUnit. The operation context also keeps track of some transaction state * (RecoveryUnitState) to reduce complexity and duplication in the storage-engine specific * RecoveryUnit and to allow better invariant checking. */ class OperationContext : public Interruptible, public Decorable { MONGO_DISALLOW_COPYING(OperationContext); public: OperationContext(Client* client, unsigned int opId); virtual ~OperationContext() = default; /** * Interface for durability. Caller DOES NOT own pointer. */ RecoveryUnit* recoveryUnit() const { return _recoveryUnit.get(); } /** * Returns the RecoveryUnit (same return value as recoveryUnit()) but the caller takes * ownership of the returned RecoveryUnit, and the OperationContext instance relinquishes * ownership. Sets the RecoveryUnit to NULL. * * Used to transfer ownership of storage engine state from OperationContext * to ClientCursor for getMore-able queries. * * Note that we don't allow the top-level locks to be stored across getMore. * We rely on active cursors being killed when collections or databases are dropped, * or when collection metadata changes. */ std::unique_ptr releaseRecoveryUnit(); /** * Associates the OperatingContext with a different RecoveryUnit for getMore or * subtransactions, see RecoveryUnitSwap. The new state is passed and the old state is * returned separately even though the state logically belongs to the RecoveryUnit, * as it is managed by the OperationContext. */ WriteUnitOfWork::RecoveryUnitState setRecoveryUnit(std::unique_ptr unit, WriteUnitOfWork::RecoveryUnitState state); /** * Interface for locking. Caller DOES NOT own pointer. */ Locker* lockState() const { return _locker.get(); } /** * Sets the locker for use by this OperationContext. Call during OperationContext * initialization, only. */ void setLockState(std::unique_ptr locker); /** * Swaps the locker, releasing the old locker to the caller. */ std::unique_ptr swapLockState(std::unique_ptr locker); /** * Returns Status::OK() unless this operation is in a killed state. */ Status checkForInterruptNoAssert() noexcept override; /** * Returns the service context under which this operation context runs, or nullptr if there is * no such service context. */ ServiceContext* getServiceContext() const { if (!_client) { return nullptr; } return _client->getServiceContext(); } /** * Returns the client under which this context runs. */ Client* getClient() const { return _client; } /** * Returns the operation ID associated with this operation. */ unsigned int getOpID() const { return _opId; } /** * Returns the session ID associated with this operation, if there is one. */ boost::optional getLogicalSessionId() const { return _lsid; } /** * Associates a logical session id with this operation context. May only be called once for the * lifetime of the operation. */ void setLogicalSessionId(LogicalSessionId lsid); /** * Returns the transaction number associated with thes operation. The combination of logical * session id + transaction number is what constitutes the operation transaction id. */ boost::optional getTxnNumber() const { return _txnNumber; } /** * Sets a transport Baton on the operation. This will trigger the Baton on markKilled. */ void setBaton(const transport::BatonHandle& baton) { _baton = baton; } /** * Retrieves the baton associated with the operation. */ const transport::BatonHandle& getBaton() const { return _baton; } /** * Associates a transaction number with this operation context. May only be called once for the * lifetime of the operation and the operation must have a logical session id assigned. */ void setTxnNumber(TxnNumber txnNumber); /** * Returns the top-level WriteUnitOfWork associated with this operation context, if any. */ WriteUnitOfWork* getWriteUnitOfWork() { return _writeUnitOfWork.get(); } /** * Sets a top-level WriteUnitOfWork for this operation context, to be held for the duration * of the given network operation. */ void setWriteUnitOfWork(std::unique_ptr writeUnitOfWork) { invariant(writeUnitOfWork || _writeUnitOfWork); invariant(!(writeUnitOfWork && _writeUnitOfWork)); _writeUnitOfWork = std::move(writeUnitOfWork); } /** * Returns WriteConcernOptions of the current operation */ const WriteConcernOptions& getWriteConcern() const { return _writeConcern; } void setWriteConcern(const WriteConcernOptions& writeConcern) { _writeConcern = writeConcern; } /** * Returns true if operations should generate oplog entries. */ bool writesAreReplicated() const { return _writesAreReplicated; } /** * Marks this operation as killed so that subsequent calls to checkForInterrupt and * checkForInterruptNoAssert by the thread executing the operation will start returning the * specified error code. * * If multiple threads kill the same operation with different codes, only the first code * will be preserved. * * May be called by any thread that has locked the Client owning this operation context, or * by the thread executing this on behalf of this OperationContext. */ void markKilled(ErrorCodes::Error killCode = ErrorCodes::Interrupted); /** * Returns the code passed to markKilled if this operation context has been killed previously * or ErrorCodes::OK otherwise. * * May be called by any thread that has locked the Client owning this operation context, or * without lock by the thread executing on behalf of this operation context. */ ErrorCodes::Error getKillStatus() const { if (_ignoreInterrupts) { return ErrorCodes::OK; } return _killCode.loadRelaxed(); } /** * Shortcut method, which checks whether getKillStatus returns a non-OK value. Has the same * concurrency rules as getKillStatus. */ bool isKillPending() const { return getKillStatus() != ErrorCodes::OK; } /** * Returns the amount of time since the operation was constructed. Uses the system's most * precise tick source, and may not be cheap to call in a tight loop. */ Microseconds getElapsedTime() const { return _elapsedTime.elapsed(); } /** * Sets the deadline for this operation to the given point in time. * * To remove a deadline, pass in Date_t::max(). */ void setDeadlineByDate(Date_t when, ErrorCodes::Error timeoutError); /** * Sets the deadline for this operation to the maxTime plus the current time reported * by the ServiceContext's fast clock source. */ void setDeadlineAfterNowBy(Microseconds maxTime, ErrorCodes::Error timeoutError); template void setDeadlineAfterNowBy(D maxTime, ErrorCodes::Error timeoutError) { if (maxTime <= D::zero()) { maxTime = D::zero(); } if (maxTime <= Microseconds::max()) { setDeadlineAfterNowBy(duration_cast(maxTime), timeoutError); } else { setDeadlineByDate(Date_t::max(), timeoutError); } } /** * Returns the deadline for this operation, or Date_t::max() if there is no deadline. */ Date_t getDeadline() const override { return _deadline; } /** * Returns the number of milliseconds remaining for this operation's time limit or * Milliseconds::max() if the operation has no time limit. */ Milliseconds getRemainingMaxTimeMillis() const; /** * NOTE: This is a legacy "max time" method for controlling operation deadlines and it should * not be used in new code. Use getRemainingMaxTimeMillis instead. * * Returns the number of microseconds remaining for this operation's time limit, or the special * value Microseconds::max() if the operation has no time limit. */ Microseconds getRemainingMaxTimeMicros() const; StatusWith waitForConditionOrInterruptNoAssertUntil( stdx::condition_variable& cv, stdx::unique_lock& m, Date_t deadline) noexcept override; private: IgnoreInterruptsState pushIgnoreInterrupts() override { IgnoreInterruptsState iis{_ignoreInterrupts, {_deadline, _timeoutError, _hasArtificialDeadline}}; _hasArtificialDeadline = true; setDeadlineByDate(Date_t::max(), ErrorCodes::ExceededTimeLimit); _ignoreInterrupts = true; return iis; } void popIgnoreInterrupts(IgnoreInterruptsState iis) override { _ignoreInterrupts = iis.ignoreInterrupts; setDeadlineByDate(iis.deadline.deadline, iis.deadline.error); _hasArtificialDeadline = iis.deadline.hasArtificialDeadline; _markKilledIfDeadlineRequires(); } DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) override { DeadlineState ds{_deadline, _timeoutError, _hasArtificialDeadline}; _hasArtificialDeadline = true; setDeadlineByDate(std::min(_deadline, deadline), error); return ds; } void popArtificialDeadline(DeadlineState ds) override { setDeadlineByDate(ds.deadline, ds.error); _hasArtificialDeadline = ds.hasArtificialDeadline; _markKilledIfDeadlineRequires(); } void _markKilledIfDeadlineRequires() { if (!_ignoreInterrupts && !_hasArtificialDeadline && hasDeadlineExpired() && !isKillPending()) { markKilled(_timeoutError); } } /** * Returns true if this operation has a deadline and it has passed according to the fast clock * on ServiceContext. */ bool hasDeadlineExpired() const; /** * Sets the deadline and maxTime as described. It is up to the caller to ensure that * these correctly correspond. */ void setDeadlineAndMaxTime(Date_t when, Microseconds maxTime, ErrorCodes::Error timeoutError); /** * Compute maxTime based on the given deadline. */ Microseconds computeMaxTimeFromDeadline(Date_t when); /** * Returns the timepoint that is "waitFor" ms after now according to the * ServiceContext's precise clock. */ Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) override; /** * Set whether or not operations should generate oplog entries. */ void setReplicatedWrites(bool writesAreReplicated = true) { _writesAreReplicated = writesAreReplicated; } friend class WriteUnitOfWork; friend class repl::UnreplicatedWritesBlock; Client* const _client; const unsigned int _opId; boost::optional _lsid; boost::optional _txnNumber; std::unique_ptr _locker; std::unique_ptr _recoveryUnit; WriteUnitOfWork::RecoveryUnitState _ruState = WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork; // Operations run within a transaction will hold a WriteUnitOfWork for the duration in order // to maintain two-phase locking. std::unique_ptr _writeUnitOfWork; // Follows the values of ErrorCodes::Error. The default value is 0 (OK), which means the // operation is not killed. If killed, it will contain a specific code. This value changes only // once from OK to some kill code. AtomicWord _killCode{ErrorCodes::OK}; // A transport Baton associated with the operation. The presence of this object implies that a // client thread is doing it's own async networking by blocking on it's own thread. transport::BatonHandle _baton; // If non-null, _waitMutex and _waitCV are the (mutex, condition variable) pair that the // operation is currently waiting on inside a call to waitForConditionOrInterrupt...(). // All access guarded by the Client's lock. stdx::mutex* _waitMutex = nullptr; stdx::condition_variable* _waitCV = nullptr; // If _waitMutex and _waitCV are non-null, this is the number of threads in a call to markKilled // actively attempting to kill the operation. If this value is non-zero, the operation is inside // waitForConditionOrInterrupt...() and must stay there until _numKillers reaches 0. // // All access guarded by the Client's lock. int _numKillers = 0; WriteConcernOptions _writeConcern; Date_t _deadline = Date_t::max(); // The timepoint at which this operation exceeds its time limit. ErrorCodes::Error _timeoutError = ErrorCodes::ExceededTimeLimit; bool _ignoreInterrupts = false; bool _hasArtificialDeadline = false; // Max operation time requested by the user or by the cursor in the case of a getMore with no // user-specified maxTime. This is tracked with microsecond granularity for the purpose of // assigning unused execution time back to a cursor at the end of an operation, only. The // _deadline and the service context's fast clock are the only values consulted for determining // if the operation's timelimit has been exceeded. Microseconds _maxTime = Microseconds::max(); // Timer counting the elapsed time since the construction of this OperationContext. Timer _elapsedTime; bool _writesAreReplicated = true; }; namespace repl { /** * RAII-style class to turn off replicated writes. Writes do not create oplog entries while the * object is in scope. */ class UnreplicatedWritesBlock { MONGO_DISALLOW_COPYING(UnreplicatedWritesBlock); public: UnreplicatedWritesBlock(OperationContext* opCtx) : _opCtx(opCtx), _shouldReplicateWrites(opCtx->writesAreReplicated()) { opCtx->setReplicatedWrites(false); } ~UnreplicatedWritesBlock() { _opCtx->setReplicatedWrites(_shouldReplicateWrites); } private: OperationContext* _opCtx; const bool _shouldReplicateWrites; }; } // namespace repl } // namespace mongo