/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
#include "mongo/platform/basic.h"
#include "mongo/executor/network_interface_asio.h"
#include "mongo/base/status_with.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/db/query/query_request.h"
#include "mongo/executor/async_stream_interface.h"
#include "mongo/executor/connection_pool_asio.h"
#include "mongo/executor/network_interface_asio.h"
#include "mongo/rpc/factory.h"
#include "mongo/rpc/metadata/metadata_hook.h"
#include "mongo/util/log.h"
#include "mongo/util/time_support.h"
#define MONGO_ASYNC_OP_INVARIANT(_Expression, _Error) \
do { \
if (MONGO_unlikely(!(_Expression))) { \
_failWithInfo(__FILE__, __LINE__, _Error); \
} \
} while (false)
namespace mongo {
namespace executor {
using asio::ip::tcp;
namespace {
// Used to generate unique identifiers for AsyncOps, the same AsyncOp may
// be used to run multiple distinct requests.
AtomicUInt64 kAsyncOpIdCounter(0);
} // namespace
const NetworkInterfaceASIO::TableRow NetworkInterfaceASIO::AsyncOp::kFieldLabels = {
"", "id", "states", "start_time", "request"};
NetworkInterfaceASIO::AsyncOp::AsyncOp(NetworkInterfaceASIO* const owner,
const TaskExecutor::CallbackHandle& cbHandle,
const RemoteCommandRequest& request,
const RemoteCommandCompletionFn& onFinish,
Date_t now)
: _owner(owner),
_cbHandle(cbHandle),
_request(request),
_onFinish(onFinish),
_start(now),
_resolver(owner->_io_service),
_id(kAsyncOpIdCounter.addAndFetch(1)),
_access(std::make_shared()),
_inSetup(true),
_inRefresh(false),
_strand(owner->_io_service) {
// No need to take lock when we aren't yet constructed.
_transitionToState_inlock(State::kUninitialized);
}
void NetworkInterfaceASIO::AsyncOp::cancel() {
LOG(2) << "Canceling operation; original request was: " << redact(request().toString());
stdx::lock_guard lk(_access->mutex);
auto access = _access;
auto generation = access->id;
// An operation may be in mid-flight when it is canceled, so we cancel any
// in-progress async ops but do not complete the operation now.
_strand.post([this, access, generation] {
stdx::lock_guard lk(access->mutex);
if (generation == access->id) {
_transitionToState_inlock(AsyncOp::State::kCanceled);
if (_connection) {
_connection->cancel();
}
}
});
}
bool NetworkInterfaceASIO::AsyncOp::canceled() const {
return _hasSeenState(State::kCanceled);
}
void NetworkInterfaceASIO::AsyncOp::timeOut_inlock() {
LOG(2) << "Operation timing out; original request was: " << redact(request().toString());
auto access = _access;
auto generation = access->id;
// An operation may be in mid-flight when it times out, so we cancel any
// in-progress stream operations but do not complete the operation now.
_strand.post([this, access, generation] {
stdx::lock_guard lk(access->mutex);
if (generation == access->id) {
_transitionToState_inlock(AsyncOp::State::kTimedOut);
if (_connection) {
_connection->cancel();
}
}
});
}
bool NetworkInterfaceASIO::AsyncOp::timedOut() const {
return _hasSeenState(State::kTimedOut);
}
const TaskExecutor::CallbackHandle& NetworkInterfaceASIO::AsyncOp::cbHandle() const {
return _cbHandle;
}
NetworkInterfaceASIO::AsyncConnection& NetworkInterfaceASIO::AsyncOp::connection() {
MONGO_ASYNC_OP_INVARIANT(_connection.is_initialized(), "Connection not yet initialized");
return *_connection;
}
void NetworkInterfaceASIO::AsyncOp::setConnection(AsyncConnection&& conn) {
MONGO_ASYNC_OP_INVARIANT(!_connection.is_initialized(), "Connection already initialized");
_connection = std::move(conn);
}
Status NetworkInterfaceASIO::AsyncOp::beginCommand(Message&& newCommand,
const HostAndPort& target) {
// NOTE: We operate based on the assumption that AsyncOp's
// AsyncConnection does not change over its lifetime.
MONGO_ASYNC_OP_INVARIANT(_connection.is_initialized(),
"Connection should not change over AsyncOp's lifetime");
auto swm = _connection->getCompressorManager().compressMessage(newCommand);
if (!swm.isOK())
return swm.getStatus();
// Construct a new AsyncCommand object for each command.
_command.emplace(_connection.get_ptr(), std::move(swm.getValue()), _owner->now(), target);
return Status::OK();
}
Status NetworkInterfaceASIO::AsyncOp::beginCommand(const RemoteCommandRequest& request) {
return beginCommand(
rpc::messageFromOpMsgRequest(
operationProtocol(),
OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj, request.metadata)),
request.target);
}
NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::command() {
MONGO_ASYNC_OP_INVARIANT(_command.is_initialized(), "Command is not yet initialized");
return *_command;
}
bool NetworkInterfaceASIO::AsyncOp::commandIsInitialized() const {
return _command.is_initialized();
}
void NetworkInterfaceASIO::AsyncOp::finish(ResponseStatus&& rs) {
// We never hold the access lock when we call finish from NetworkInterfaceASIO.
_transitionToState(AsyncOp::State::kFinished);
LOG(2) << "Request " << _request.id << " finished with response: "
<< redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
// Calling the completion handler may invalidate state in this op, so do it last.
_onFinish(rs);
}
const RemoteCommandRequest& NetworkInterfaceASIO::AsyncOp::request() const {
return _request;
}
void NetworkInterfaceASIO::AsyncOp::startProgress(Date_t startTime) {
_start = startTime;
// We never hold the access lock when we call startProgress from NetworkInterfaceASIO.
_transitionToState(AsyncOp::State::kInProgress);
}
Date_t NetworkInterfaceASIO::AsyncOp::start() const {
return _start;
}
rpc::Protocol NetworkInterfaceASIO::AsyncOp::operationProtocol() const {
MONGO_ASYNC_OP_INVARIANT(_operationProtocol.is_initialized(), "Protocol not yet set");
return *_operationProtocol;
}
void NetworkInterfaceASIO::AsyncOp::setOperationProtocol(rpc::Protocol proto) {
MONGO_ASYNC_OP_INVARIANT(!_operationProtocol.is_initialized(), "Protocol already set");
_operationProtocol = proto;
}
void NetworkInterfaceASIO::AsyncOp::setResponseMetadata(BSONObj m) {
_responseMetadata = m;
}
BSONObj NetworkInterfaceASIO::AsyncOp::getResponseMetadata() {
return _responseMetadata;
}
void NetworkInterfaceASIO::AsyncOp::reset() {
// We don't reset owner as it never changes
_cbHandle = {};
_request = {};
_onFinish = {};
_connectionPoolHandle = {};
// We don't reset _connection as we want to reuse it.
// Ditto for _operationProtocol.
_start = {};
_timeoutAlarm.reset();
// _id stays the same for the lifetime of this object.
_command = boost::none;
// _inSetup should always be false at this point.
// We never hold the access lock when we call this from NetworkInterfaceASIO.
clearStateTransitions();
}
void NetworkInterfaceASIO::AsyncOp::clearStateTransitions() {
_transitionToState(AsyncOp::State::kUninitialized);
}
void NetworkInterfaceASIO::AsyncOp::setOnFinish(RemoteCommandCompletionFn&& onFinish) {
_onFinish = std::move(onFinish);
}
// Return a string representation of the given state.
std::string NetworkInterfaceASIO::AsyncOp::_stateToString(AsyncOp::State state) const {
switch (state) {
case State::kUninitialized:
return "UNINITIALIZED";
case State::kInProgress:
return "IN_PROGRESS";
case State::kTimedOut:
return "TIMED_OUT";
case State::kCanceled:
return "CANCELED";
case State::kFinished:
return "DONE";
case State::kNoState:
return "---";
default:
MONGO_UNREACHABLE;
}
}
std::string NetworkInterfaceASIO::AsyncOp::_stateString() const {
str::stream s;
s << "[ ";
for (int i = 0; i < kMaxStateTransitions; i++) {
if (_states[i] == State::kNoState) {
break;
}
if (i != 0) {
s << ", ";
}
s << _stateToString(_states[i]);
}
s << " ]";
return s;
}
NetworkInterfaceASIO::TableRow NetworkInterfaceASIO::AsyncOp::getStringFields() const {
// We leave a placeholder for an asterisk
return {"", std::to_string(_id), _stateString(), _start.toString(), _request.toString()};
}
std::string NetworkInterfaceASIO::AsyncOp::toString() const {
str::stream s;
int fieldIdx = 1;
bool first = true;
for (auto field : getStringFields()) {
if (field != "") {
if (first) {
first = false;
} else {
s << ", ";
}
s << kFieldLabels[fieldIdx] << ": " << field;
fieldIdx++;
}
}
return s;
}
bool NetworkInterfaceASIO::AsyncOp::operator==(const AsyncOp& other) const {
return _id == other._id;
}
bool NetworkInterfaceASIO::AsyncOp::_hasSeenState(AsyncOp::State state) const {
return std::any_of(std::begin(_states), std::end(_states), [state](AsyncOp::State _state) {
return _state == state;
});
}
void NetworkInterfaceASIO::AsyncOp::_transitionToState(AsyncOp::State newState) {
stdx::lock_guard lk(_access->mutex);
_transitionToState_inlock(newState);
}
void NetworkInterfaceASIO::AsyncOp::_transitionToState_inlock(AsyncOp::State newState) {
if (newState == State::kUninitialized) {
_states[0] = State::kUninitialized;
for (int i = 1; i < kMaxStateTransitions; i++) {
_states[i] = State::kNoState;
}
return;
}
// We can transition to cancelled multiple times if cancel() is called
// multiple times. Ignore that transition if we're already cancelled.
if (newState == State::kCanceled) {
// Find the current state
auto iter = std::find_if_not(_states.rbegin(), _states.rend(), [](const State& state) {
return state == State::kNoState;
});
// If its cancelled, just return
if (iter != _states.rend() && *iter == State::kCanceled) {
return;
}
}
for (int i = 0; i < kMaxStateTransitions; i++) {
// We can't transition to the same state twice.
MONGO_ASYNC_OP_INVARIANT(_states[i] != newState,
"Cannot use the same state (" + _stateToString(newState) +
") twice");
if (_states[i] == State::kNoState) {
// Perform some validation before transitioning.
switch (newState) {
case State::kInProgress:
MONGO_ASYNC_OP_INVARIANT(i == 1,
"kInProgress must come directly after kUninitialized");
break;
case State::kTimedOut:
// During connection setup, it is possible to timeout before the stream is
// initialized, so we have to allow this transition.
break;
case State::kCanceled:
MONGO_ASYNC_OP_INVARIANT(
i > 1, _stateToString(newState) + " must come after kInProgress");
MONGO_ASYNC_OP_INVARIANT(_states[i - 1] != State::kUninitialized,
_stateToString(newState) +
" cannot come after kUninitialized");
break;
case State::kFinished:
MONGO_ASYNC_OP_INVARIANT(i > 0, "kFinished must come after kUninitialized");
break;
default:
MONGO_UNREACHABLE;
}
// Update state.
_states[i] = newState;
return;
}
}
// If we get here, we've already transitioned to the max allowed states, explode.
MONGO_UNREACHABLE;
}
void NetworkInterfaceASIO::AsyncOp::_failWithInfo(const char* file,
int line,
std::string error) const {
std::stringstream ss;
ss << "Invariant failure at " << file << ":" << line << ": " << error
<< ", Operation: " << toString();
Status status{ErrorCodes::InternalError, ss.str()};
fassertFailedWithStatus(34430, status);
}
} // namespace executor
} // namespace mongo