diff options
Diffstat (limited to 'src/mongo/db/operation_context_impl.cpp')
-rw-r--r-- | src/mongo/db/operation_context_impl.cpp | 285 |
1 files changed, 142 insertions, 143 deletions
diff --git a/src/mongo/db/operation_context_impl.cpp b/src/mongo/db/operation_context_impl.cpp index 5c677c44e41..28ff6585620 100644 --- a/src/mongo/db/operation_context_impl.cpp +++ b/src/mongo/db/operation_context_impl.cpp @@ -49,176 +49,175 @@ namespace mongo { namespace { - std::unique_ptr<Locker> newLocker() { - if (isMMAPV1()) return stdx::make_unique<MMAPV1LockerImpl>(); - return stdx::make_unique<DefaultLockerImpl>(); - } - - class ClientOperationInfo { - public: - Locker* getLocker() { - if (!_locker) { - _locker = newLocker(); - } - return _locker.get(); +std::unique_ptr<Locker> newLocker() { + if (isMMAPV1()) + return stdx::make_unique<MMAPV1LockerImpl>(); + return stdx::make_unique<DefaultLockerImpl>(); +} + +class ClientOperationInfo { +public: + Locker* getLocker() { + if (!_locker) { + _locker = newLocker(); } + return _locker.get(); + } - private: - std::unique_ptr<Locker> _locker; - }; +private: + std::unique_ptr<Locker> _locker; +}; - const auto clientOperationInfoDecoration = Client::declareDecoration<ClientOperationInfo>(); +const auto clientOperationInfoDecoration = Client::declareDecoration<ClientOperationInfo>(); - AtomicUInt32 nextOpId{1}; +AtomicUInt32 nextOpId{1}; } // namespace - using std::string; +using std::string; + +OperationContextImpl::OperationContextImpl() + : OperationContext( + &cc(), nextOpId.fetchAndAdd(1), clientOperationInfoDecoration(cc()).getLocker()), + _writesAreReplicated(true) { + StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); + _recovery.reset(storageEngine->newRecoveryUnit()); + + auto client = getClient(); + stdx::lock_guard<Client> lk(*client); + client->setOperationContext(this); +} + +OperationContextImpl::~OperationContextImpl() { + lockState()->assertEmptyAndReset(); + auto client = getClient(); + stdx::lock_guard<Client> lk(*client); + client->resetOperationContext(); +} + +RecoveryUnit* OperationContextImpl::recoveryUnit() const { + return _recovery.get(); +} + +RecoveryUnit* OperationContextImpl::releaseRecoveryUnit() { + if (_recovery.get()) + _recovery->beingReleasedFromOperationContext(); + return _recovery.release(); +} + +OperationContext::RecoveryUnitState OperationContextImpl::setRecoveryUnit(RecoveryUnit* unit, + RecoveryUnitState state) { + _recovery.reset(unit); + RecoveryUnitState oldState = _ruState; + _ruState = state; + if (unit) + unit->beingSetOnOperationContext(); + return oldState; +} + +ProgressMeter* OperationContextImpl::setMessage_inlock(const char* msg, + const std::string& name, + unsigned long long progressMeterTotal, + int secondsBetween) { + return &CurOp::get(this)->setMessage_inlock(msg, name, progressMeterTotal, secondsBetween); +} + +string OperationContextImpl::getNS() const { + return CurOp::get(this)->getNS(); +} + +uint64_t OperationContextImpl::getRemainingMaxTimeMicros() const { + return CurOp::get(this)->getRemainingMaxTimeMicros(); +} + +// Enabling the checkForInterruptFail fail point will start a game of random chance on the +// connection specified in the fail point data, generating an interrupt with a given fixed +// probability. Example invocation: +// +// {configureFailPoint: "checkForInterruptFail", +// mode: "alwaysOn", +// data: {conn: 17, chance: .01, allowNested: true}} +// +// All three data fields must be specified. In the above example, all interrupt points on +// connection 17 will generate a kill on the current operation with probability p(.01), +// including interrupt points of nested operations. If "allowNested" is false, nested +// operations are not targeted. "chance" must be a double between 0 and 1, inclusive. +MONGO_FP_DECLARE(checkForInterruptFail); - OperationContextImpl::OperationContextImpl() - : OperationContext(&cc(), - nextOpId.fetchAndAdd(1), - clientOperationInfoDecoration(cc()).getLocker()), - _writesAreReplicated(true) { +namespace { - StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); - _recovery.reset(storageEngine->newRecoveryUnit()); +// Global state for checkForInterrupt fail point. +PseudoRandom checkForInterruptPRNG(static_cast<int64_t>(time(NULL))); - auto client = getClient(); - stdx::lock_guard<Client> lk(*client); - client->setOperationContext(this); +// Helper function for checkForInterrupt fail point. Decides whether the operation currently +// being run by the given Client meet the (probabilistic) conditions for interruption as +// specified in the fail point info. +bool opShouldFail(const OperationContextImpl* opCtx, const BSONObj& failPointInfo) { + // Only target the client with the specified connection number. + if (opCtx->getClient()->getConnectionId() != failPointInfo["conn"].safeNumberLong()) { + return false; } - OperationContextImpl::~OperationContextImpl() { - lockState()->assertEmptyAndReset(); - auto client = getClient(); - stdx::lock_guard<Client> lk(*client); - client->resetOperationContext(); + // Only target nested operations if requested. + if (!failPointInfo["allowNested"].trueValue() && CurOp::get(opCtx)->parent() != NULL) { + return false; } - RecoveryUnit* OperationContextImpl::recoveryUnit() const { - return _recovery.get(); + // Return true with (approx) probability p = "chance". Recall: 0 <= chance <= 1. + double next = static_cast<double>(std::abs(checkForInterruptPRNG.nextInt64())); + double upperBound = + std::numeric_limits<int64_t>::max() * failPointInfo["chance"].numberDouble(); + if (next > upperBound) { + return false; } + return true; +} - RecoveryUnit* OperationContextImpl::releaseRecoveryUnit() { - if ( _recovery.get() ) - _recovery->beingReleasedFromOperationContext(); - return _recovery.release(); - } +} // namespace - OperationContext::RecoveryUnitState OperationContextImpl::setRecoveryUnit(RecoveryUnit* unit, - RecoveryUnitState state) { - _recovery.reset(unit); - RecoveryUnitState oldState = _ruState; - _ruState = state; - if ( unit ) - unit->beingSetOnOperationContext(); - return oldState; - } +void OperationContextImpl::checkForInterrupt() { + // We cannot interrupt operation, while it's inside of a write unit of work, because logOp + // cannot handle being iterrupted. + if (lockState()->inAWriteUnitOfWork()) + return; - ProgressMeter* OperationContextImpl::setMessage_inlock(const char * msg, - const std::string &name, - unsigned long long progressMeterTotal, - int secondsBetween) { - return &CurOp::get(this)->setMessage_inlock(msg, name, progressMeterTotal, secondsBetween); - } + uassertStatusOK(checkForInterruptNoAssert()); +} - string OperationContextImpl::getNS() const { - return CurOp::get(this)->getNS(); +Status OperationContextImpl::checkForInterruptNoAssert() { + if (getGlobalServiceContext()->getKillAllOperations()) { + return Status(ErrorCodes::InterruptedAtShutdown, "interrupted at shutdown"); } - uint64_t OperationContextImpl::getRemainingMaxTimeMicros() const { - return CurOp::get(this)->getRemainingMaxTimeMicros(); + CurOp* curOp = CurOp::get(this); + if (curOp->maxTimeHasExpired()) { + markKilled(); + return Status(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit"); } - // Enabling the checkForInterruptFail fail point will start a game of random chance on the - // connection specified in the fail point data, generating an interrupt with a given fixed - // probability. Example invocation: - // - // {configureFailPoint: "checkForInterruptFail", - // mode: "alwaysOn", - // data: {conn: 17, chance: .01, allowNested: true}} - // - // All three data fields must be specified. In the above example, all interrupt points on - // connection 17 will generate a kill on the current operation with probability p(.01), - // including interrupt points of nested operations. If "allowNested" is false, nested - // operations are not targeted. "chance" must be a double between 0 and 1, inclusive. - MONGO_FP_DECLARE(checkForInterruptFail); - - namespace { - - // Global state for checkForInterrupt fail point. - PseudoRandom checkForInterruptPRNG(static_cast<int64_t>(time(NULL))); - - // Helper function for checkForInterrupt fail point. Decides whether the operation currently - // being run by the given Client meet the (probabilistic) conditions for interruption as - // specified in the fail point info. - bool opShouldFail(const OperationContextImpl* opCtx, const BSONObj& failPointInfo) { - // Only target the client with the specified connection number. - if (opCtx->getClient()->getConnectionId() != failPointInfo["conn"].safeNumberLong()) { - return false; - } - - // Only target nested operations if requested. - if (!failPointInfo["allowNested"].trueValue() && CurOp::get(opCtx)->parent() != NULL) { - return false; - } - - // Return true with (approx) probability p = "chance". Recall: 0 <= chance <= 1. - double next = static_cast<double>(std::abs(checkForInterruptPRNG.nextInt64())); - double upperBound = - std::numeric_limits<int64_t>::max() * failPointInfo["chance"].numberDouble(); - if (next > upperBound) { - return false; - } - return true; - } - - } // namespace - - void OperationContextImpl::checkForInterrupt() { - // We cannot interrupt operation, while it's inside of a write unit of work, because logOp - // cannot handle being iterrupted. - if (lockState()->inAWriteUnitOfWork()) return; - - uassertStatusOK(checkForInterruptNoAssert()); - } - - Status OperationContextImpl::checkForInterruptNoAssert() { - if (getGlobalServiceContext()->getKillAllOperations()) { - return Status(ErrorCodes::InterruptedAtShutdown, "interrupted at shutdown"); - } - - CurOp* curOp = CurOp::get(this); - if (curOp->maxTimeHasExpired()) { + MONGO_FAIL_POINT_BLOCK(checkForInterruptFail, scopedFailPoint) { + if (opShouldFail(this, scopedFailPoint.getData())) { + log() << "set pending kill on " << (curOp->parent() ? "nested" : "top-level") << " op " + << getOpID() << ", for checkForInterruptFail"; markKilled(); - return Status(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit"); - } - - MONGO_FAIL_POINT_BLOCK(checkForInterruptFail, scopedFailPoint) { - if (opShouldFail(this, scopedFailPoint.getData())) { - log() << "set pending kill on " - << (curOp->parent() ? "nested" : "top-level") - << " op " << getOpID() << ", for checkForInterruptFail"; - markKilled(); - } - } - - if (isKillPending()) { - return Status(ErrorCodes::Interrupted, "operation was interrupted"); } - - return Status::OK(); } - bool OperationContextImpl::isPrimaryFor( StringData ns ) { - return repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(NamespaceString(ns)); + if (isKillPending()) { + return Status(ErrorCodes::Interrupted, "operation was interrupted"); } - void OperationContextImpl::setReplicatedWrites(bool writesAreReplicated) { - _writesAreReplicated = writesAreReplicated; - } + return Status::OK(); +} - bool OperationContextImpl::writesAreReplicated() const { - return _writesAreReplicated; - } +bool OperationContextImpl::isPrimaryFor(StringData ns) { + return repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(NamespaceString(ns)); +} + +void OperationContextImpl::setReplicatedWrites(bool writesAreReplicated) { + _writesAreReplicated = writesAreReplicated; +} + +bool OperationContextImpl::writesAreReplicated() const { + return _writesAreReplicated; +} } // namespace mongo |