/** * Copyright (C) 2016 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. 
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor #include "mongo/platform/basic.h" #include #include #include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/stdx/memory.h" #include "mongo/util/assert_util.h" #include "mongo/util/destructor_guard.h" #include "mongo/util/log.h" namespace mongo { namespace { class RetryPolicyImpl : public RemoteCommandRetryScheduler::RetryPolicy { public: RetryPolicyImpl(std::size_t maximumAttempts, Milliseconds maximumResponseElapsedTotal, const std::initializer_list& retryableErrors); std::size_t getMaximumAttempts() const override; Milliseconds getMaximumResponseElapsedTotal() const override; bool shouldRetryOnError(ErrorCodes::Error error) const override; std::string toString() const override; private: std::size_t _maximumAttempts; Milliseconds _maximumResponseElapsedTotal; std::vector _retryableErrors; }; RetryPolicyImpl::RetryPolicyImpl(std::size_t maximumAttempts, Milliseconds maximumResponseElapsedTotal, const std::initializer_list& retryableErrors) : _maximumAttempts(maximumAttempts), _maximumResponseElapsedTotal(maximumResponseElapsedTotal), _retryableErrors(retryableErrors) { std::sort(_retryableErrors.begin(), _retryableErrors.end()); } std::string RetryPolicyImpl::toString() const { str::stream output; output << "RetryPolicyImpl"; output << " maxAttempts: " << _maximumAttempts; output << " maxTimeMillis: " << _maximumResponseElapsedTotal; if (_retryableErrors.size() > 0) { output << "Retryable Errors: "; for (auto error : _retryableErrors) { output << error; } } return output; } std::size_t RetryPolicyImpl::getMaximumAttempts() const { return _maximumAttempts; } Milliseconds RetryPolicyImpl::getMaximumResponseElapsedTotal() const { return _maximumResponseElapsedTotal; } bool RetryPolicyImpl::shouldRetryOnError(ErrorCodes::Error error) const { return std::binary_search(_retryableErrors.cbegin(), _retryableErrors.cend(), error); } } // namespace const std::initializer_list 
RemoteCommandRetryScheduler::kNotMasterErrors{ ErrorCodes::NotMaster, ErrorCodes::NotMasterNoSlaveOk, ErrorCodes::NotMasterOrSecondary}; const std::initializer_list RemoteCommandRetryScheduler::kAllRetriableErrors{ ErrorCodes::NotMaster, ErrorCodes::NotMasterNoSlaveOk, ErrorCodes::NotMasterOrSecondary, // If write concern failed to be satisfied on the remote server, this most probably means that // some of the secondary nodes were unreachable or otherwise unresponsive, so the call is safe // to be retried if idempotency can be guaranteed. ErrorCodes::WriteConcernFailed, ErrorCodes::HostUnreachable, ErrorCodes::HostNotFound, ErrorCodes::NetworkTimeout, ErrorCodes::PrimarySteppedDown, ErrorCodes::InterruptedDueToReplStateChange, ErrorCodes::BalancerInterrupted}; std::unique_ptr RemoteCommandRetryScheduler::makeNoRetryPolicy() { return makeRetryPolicy(1U, executor::RemoteCommandRequest::kNoTimeout, {}); } std::unique_ptr RemoteCommandRetryScheduler::makeRetryPolicy( std::size_t maxAttempts, Milliseconds maxResponseElapsedTotal, const std::initializer_list& retryableErrors) { std::unique_ptr policy = stdx::make_unique(maxAttempts, maxResponseElapsedTotal, retryableErrors); return policy; } RemoteCommandRetryScheduler::RemoteCommandRetryScheduler( executor::TaskExecutor* executor, const executor::RemoteCommandRequest& request, const executor::TaskExecutor::RemoteCommandCallbackFn& callback, std::unique_ptr retryPolicy) : _executor(executor), _request(request), _callback(callback), _retryPolicy(std::move(retryPolicy)) { uassert(ErrorCodes::BadValue, "task executor cannot be null", executor); uassert(ErrorCodes::BadValue, "source in remote command request cannot be empty", !request.target.empty()); uassert(ErrorCodes::BadValue, "database name in remote command request cannot be empty", !request.dbname.empty()); uassert(ErrorCodes::BadValue, "command object in remote command request cannot be empty", !request.cmdObj.isEmpty()); uassert(ErrorCodes::BadValue, "remote command 
callback function cannot be null", callback); uassert(ErrorCodes::BadValue, "retry policy cannot be null", _retryPolicy); uassert(ErrorCodes::BadValue, "policy max attempts cannot be zero", _retryPolicy->getMaximumAttempts() != 0); uassert(ErrorCodes::BadValue, "policy max response elapsed total cannot be negative", !(_retryPolicy->getMaximumResponseElapsedTotal() != executor::RemoteCommandRequest::kNoTimeout && _retryPolicy->getMaximumResponseElapsedTotal() < Milliseconds(0))); } RemoteCommandRetryScheduler::~RemoteCommandRetryScheduler() { DESTRUCTOR_GUARD(shutdown(); join();); } bool RemoteCommandRetryScheduler::isActive() const { stdx::lock_guard lock(_mutex); return _active; } Status RemoteCommandRetryScheduler::startup() { stdx::lock_guard lock(_mutex); if (_active) { return Status(ErrorCodes::IllegalOperation, "fetcher already scheduled"); } auto scheduleStatus = _schedule_inlock(); if (!scheduleStatus.isOK()) { return scheduleStatus; } return Status::OK(); } void RemoteCommandRetryScheduler::shutdown() { executor::TaskExecutor::CallbackHandle remoteCommandCallbackHandle; { stdx::lock_guard lock(_mutex); if (!_active) { return; } remoteCommandCallbackHandle = _remoteCommandCallbackHandle; } invariant(remoteCommandCallbackHandle.isValid()); _executor->cancel(remoteCommandCallbackHandle); } void RemoteCommandRetryScheduler::join() { stdx::unique_lock lock(_mutex); _condition.wait(lock, [this]() { return !_active; }); } std::string RemoteCommandRetryScheduler::toString() const { stdx::lock_guard lock(_mutex); str::stream output; output << "RemoteCommandRetryScheduler"; output << " request: " << _request.toString(); output << " active: " << _active; if (_remoteCommandCallbackHandle.isValid()) { output << " callbackHandle.valid: " << _remoteCommandCallbackHandle.isValid(); output << " callbackHandle.cancelled: " << _remoteCommandCallbackHandle.isCanceled(); } output << " attempt: " << _currentAttempt; output << " retryPolicy: " << _retryPolicy->toString(); return 
output; } Status RemoteCommandRetryScheduler::_schedule_inlock() { ++_currentAttempt; auto scheduleResult = _executor->scheduleRemoteCommand( _request, stdx::bind( &RemoteCommandRetryScheduler::_remoteCommandCallback, this, stdx::placeholders::_1)); if (!scheduleResult.isOK()) { return scheduleResult.getStatus(); } _remoteCommandCallbackHandle = scheduleResult.getValue(); _active = true; return Status::OK(); } void RemoteCommandRetryScheduler::_remoteCommandCallback( const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba) { auto status = rcba.response.status; auto currentAttempt = _currentAttempt; { stdx::lock_guard lock(_mutex); currentAttempt = _currentAttempt; } if (status.isOK() || status == ErrorCodes::CallbackCanceled || currentAttempt == _retryPolicy->getMaximumAttempts() || !_retryPolicy->shouldRetryOnError(status.code())) { _onComplete(rcba); return; } // TODO(benety): Check cumulative elapsed time of failed responses received against retry // policy. Requires SERVER-24067. auto scheduleStatus = _schedule_inlock(); if (!scheduleStatus.isOK()) { _onComplete({rcba.executor, rcba.myHandle, rcba.request, scheduleStatus}); return; } } void RemoteCommandRetryScheduler::_onComplete( const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba) { _callback(rcba); stdx::lock_guard lock(_mutex); invariant(_active); _active = false; _condition.notify_all(); } } // namespace mongo