/**
 *    Copyright (C) 2015 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO #include "mongo/platform/basic.h" #include "mongo/executor/network_interface_asio.h" #include #include #include "mongo/executor/async_stream_factory.h" #include "mongo/executor/async_stream_interface.h" #include "mongo/executor/async_timer_asio.h" #include "mongo/executor/async_timer_interface.h" #include "mongo/executor/async_timer_mock.h" #include "mongo/executor/connection_pool_asio.h" #include "mongo/executor/connection_pool_stats.h" #include "mongo/rpc/metadata/metadata_hook.h" #include "mongo/stdx/chrono.h" #include "mongo/stdx/memory.h" #include "mongo/util/concurrency/idle_thread_block.h" #include "mongo/util/concurrency/thread_name.h" #include "mongo/util/log.h" #include "mongo/util/net/sock.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/table_formatter.h" #include "mongo/util/time_support.h" namespace mongo { namespace executor { namespace { const std::size_t kIOServiceWorkers = 1; } // namespace NetworkInterfaceASIO::Options::Options() = default; NetworkInterfaceASIO::NetworkInterfaceASIO(Options options) : _options(std::move(options)), _io_service(), _metadataHook(std::move(_options.metadataHook)), _hook(std::move(_options.networkConnectionHook)), _state(State::kReady), _timerFactory(std::move(_options.timerFactory)), _streamFactory(std::move(_options.streamFactory)), _connectionPool(stdx::make_unique(this), _options.instanceName, _options.connectionPoolOptions), _isExecutorRunnable(false), _strand(_io_service) { invariant(_timerFactory); } std::string NetworkInterfaceASIO::getDiagnosticString() { stdx::lock_guard lk(_inProgressMutex); return _getDiagnosticString_inlock(nullptr); } std::string NetworkInterfaceASIO::_getDiagnosticString_inlock(AsyncOp* currentOp) { str::stream output; std::vector rows; output << "\nNetworkInterfaceASIO Operations' Diagnostic:\n"; rows.push_back({"Operation:", "Count:"}); rows.push_back({"Connecting", 
std::to_string(_inGetConnection.size())}); rows.push_back({"In Progress", std::to_string(_inProgress.size())}); rows.push_back({"Succeeded", std::to_string(getNumSucceededOps())}); rows.push_back({"Canceled", std::to_string(getNumCanceledOps())}); rows.push_back({"Failed", std::to_string(getNumFailedOps())}); rows.push_back({"Timed Out", std::to_string(getNumTimedOutOps())}); output << toTable(rows); if (_inProgress.size() > 0) { rows.clear(); rows.push_back(AsyncOp::kFieldLabels); // Push AsyncOps for (auto&& kv : _inProgress) { auto row = kv.first->getStringFields(); if (currentOp) { // If this is the AsyncOp we blew up on, mark with an asterisk if (*currentOp == *(kv.first)) { row[0] = "*"; } } rows.push_back(row); } // Format as a table output << "\n" << toTable(rows); } output << "\n"; return output; } uint64_t NetworkInterfaceASIO::getNumCanceledOps() { return _numCanceledOps.load(); } uint64_t NetworkInterfaceASIO::getNumFailedOps() { return _numFailedOps.load(); } uint64_t NetworkInterfaceASIO::getNumSucceededOps() { return _numSucceededOps.load(); } uint64_t NetworkInterfaceASIO::getNumTimedOutOps() { return _numTimedOutOps.load(); } void NetworkInterfaceASIO::appendConnectionStats(ConnectionPoolStats* stats) const { _connectionPool.appendConnectionStats(stats); } std::string NetworkInterfaceASIO::getHostName() { return getHostNameCached(); } void NetworkInterfaceASIO::startup() { _serviceRunners.resize(kIOServiceWorkers); for (std::size_t i = 0; i < kIOServiceWorkers; ++i) { _serviceRunners[i] = stdx::thread([this, i]() { setThreadName(_options.instanceName + "-" + std::to_string(i)); try { LOG(2) << "The NetworkInterfaceASIO worker thread is spinning up"; asio::io_service::work work(_io_service); std::error_code ec; _io_service.run(ec); if (ec) { severe() << "Failure in _io_service.run(): " << ec.message(); fassertFailed(40335); } } catch (...) 
{ severe() << "Uncaught exception in NetworkInterfaceASIO IO " "worker thread of type: " << exceptionToStatus(); fassertFailed(28820); } }); }; _state.store(State::kRunning); } void NetworkInterfaceASIO::shutdown() { _state.store(State::kShutdown); _io_service.stop(); for (auto&& worker : _serviceRunners) { worker.join(); } LOG(2) << "NetworkInterfaceASIO shutdown successfully"; } void NetworkInterfaceASIO::waitForWork() { stdx::unique_lock lk(_executorMutex); // TODO: This can be restructured with a lambda. while (!_isExecutorRunnable) { MONGO_IDLE_THREAD_BLOCK; _isExecutorRunnableCondition.wait(lk); } _isExecutorRunnable = false; } void NetworkInterfaceASIO::waitForWorkUntil(Date_t when) { stdx::unique_lock lk(_executorMutex); // TODO: This can be restructured with a lambda. while (!_isExecutorRunnable) { const Milliseconds waitTime(when - now()); if (waitTime <= Milliseconds(0)) { break; } MONGO_IDLE_THREAD_BLOCK; _isExecutorRunnableCondition.wait_for(lk, waitTime.toSystemDuration()); } _isExecutorRunnable = false; } void NetworkInterfaceASIO::signalWorkAvailable() { stdx::unique_lock lk(_executorMutex); _signalWorkAvailable_inlock(); } void NetworkInterfaceASIO::_signalWorkAvailable_inlock() { if (!_isExecutorRunnable) { _isExecutorRunnable = true; _isExecutorRunnableCondition.notify_one(); } } Date_t NetworkInterfaceASIO::now() { return _timerFactory->now(); } namespace { Status attachMetadataIfNeeded(RemoteCommandRequest& request, rpc::EgressMetadataHook* metadataHook) { // Append the metadata of the request with metadata from the metadata hook // if a hook is installed if (metadataHook) { BSONObjBuilder augmentedBob(std::move(request.metadata)); auto writeStatus = callNoexcept(*metadataHook, &rpc::EgressMetadataHook::writeRequestMetadata, request.opCtx, &augmentedBob); if (!writeStatus.isOK()) { return writeStatus; } request.metadata = augmentedBob.obj(); } return Status::OK(); } } // namespace Status NetworkInterfaceASIO::startCommand(const 
TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish) { MONGO_ASIO_INVARIANT(onFinish, "Invalid completion function"); { stdx::lock_guard lk(_inProgressMutex); const auto insertResult = _inGetConnection.emplace(cbHandle); // We should never see the same CallbackHandle added twice MONGO_ASIO_INVARIANT_INLOCK(insertResult.second, "Same CallbackHandle added twice"); } if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceASIO shutdown in progress"}; } LOG(2) << "startCommand: " << redact(request.toString()); auto getConnectionStartTime = now(); auto statusMetadata = attachMetadataIfNeeded(request, _metadataHook.get()); if (!statusMetadata.isOK()) { return statusMetadata; } auto nextStep = [this, getConnectionStartTime, cbHandle, request, onFinish]( StatusWith swConn) { if (!swConn.isOK()) { LOG(2) << "Failed to get connection from pool for request " << request.id << ": " << swConn.getStatus(); bool wasPreviouslyCanceled = false; { stdx::lock_guard lk(_inProgressMutex); wasPreviouslyCanceled = _inGetConnection.erase(cbHandle) == 0; } Status status = wasPreviouslyCanceled ? Status(ErrorCodes::CallbackCanceled, "Callback canceled") : swConn.getStatus(); if (ErrorCodes::isExceededTimeLimitError(status.code())) { _numTimedOutOps.fetchAndAdd(1); } if (status.code() != ErrorCodes::CallbackCanceled) { _numFailedOps.fetchAndAdd(1); } onFinish({status, now() - getConnectionStartTime}); signalWorkAvailable(); return; } auto conn = static_cast(swConn.getValue().get()); AsyncOp* op = nullptr; stdx::unique_lock lk(_inProgressMutex); const auto eraseCount = _inGetConnection.erase(cbHandle); // If we didn't find the request, we've been canceled if (eraseCount == 0) { lk.unlock(); onFinish({ErrorCodes::CallbackCanceled, "Callback canceled", now() - getConnectionStartTime}); // Though we were canceled, we know that the stream is fine, so indicate success. 
conn->indicateSuccess(); signalWorkAvailable(); return; } // We can't release the AsyncOp until we know we were not canceled. auto ownedOp = conn->releaseAsyncOp(); op = ownedOp.get(); // This AsyncOp may be recycled. We expect timeout and canceled to be clean. // If this op was most recently used to connect, its state transitions won't have been // reset, so we do that here. MONGO_ASIO_INVARIANT_INLOCK(!op->canceled(), "AsyncOp has dirty canceled flag", op); MONGO_ASIO_INVARIANT_INLOCK(!op->timedOut(), "AsyncOp has dirty timeout flag", op); op->clearStateTransitions(); // Now that we're inProgress, an external cancel can touch our op, but // not until we release the inProgressMutex. _inProgress.emplace(op, std::move(ownedOp)); op->_cbHandle = std::move(cbHandle); op->_request = std::move(request); op->_onFinish = std::move(onFinish); op->_connectionPoolHandle = std::move(swConn.getValue()); op->startProgress(getConnectionStartTime); // This ditches the lock and gets us onto the strand (so we're // threadsafe) op->_strand.post([this, op, getConnectionStartTime] { const auto timeout = op->_request.timeout; // Set timeout now that we have the correct request object if (timeout != RemoteCommandRequest::kNoTimeout) { // Subtract the time it took to get the connection from the pool from the request // timeout. auto getConnectionDuration = now() - getConnectionStartTime; if (getConnectionDuration >= timeout) { // We only assume that the request timer is guaranteed to fire *after* the // timeout duration - but make no stronger assumption. It is thus possible that // we have already exceeded the timeout. In this case we timeout the operation // manually. 
std::stringstream msg; msg << "Remote command timed out while waiting to get a connection from the " << "pool, took " << getConnectionDuration << ", timeout was set to " << timeout; auto rs = ResponseStatus(ErrorCodes::NetworkInterfaceExceededTimeLimit, msg.str(), getConnectionDuration); return _completeOperation(op, rs); } // The above conditional guarantees that the adjusted timeout will never underflow. MONGO_ASIO_INVARIANT(timeout > getConnectionDuration, "timeout underflowed", op); const auto adjustedTimeout = timeout - getConnectionDuration; const auto requestId = op->_request.id; try { op->_timeoutAlarm = op->_owner->_timerFactory->make(&op->_strand, adjustedTimeout); } catch (std::system_error& e) { severe() << "Failed to construct timer for AsyncOp: " << e.what(); fassertFailed(40334); } std::shared_ptr access; std::size_t generation; { stdx::lock_guard lk(op->_access->mutex); access = op->_access; generation = access->id; } op->_timeoutAlarm->asyncWait([op, access, generation, requestId, adjustedTimeout]( std::error_code ec) { // We must pass a check for safe access before using op inside the // callback or we may attempt access on an invalid pointer. stdx::lock_guard lk(access->mutex); if (generation != access->id) { // The operation has been cleaned up, do not access. 
return; } if (!ec) { LOG(2) << "Request " << requestId << " timed out" << ", adjusted timeout after getting connection from pool was " << adjustedTimeout << ", op was " << redact(op->toString()); op->timeOut_inlock(); } else { LOG(2) << "Failed to time request " << requestId << "out: " << ec.message() << ", op was " << redact(op->toString()); } }); } _beginCommunication(op); }); }; _connectionPool.get(request.target, request.timeout, nextStep); return Status::OK(); } void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) { stdx::lock_guard lk(_inProgressMutex); // If we found a matching cbHandle in _inGetConnection, then // simply removing it has the same effect as cancelling it, so we // can just return. if (_inGetConnection.erase(cbHandle) != 0) { _numCanceledOps.fetchAndAdd(1); return; } // TODO: This linear scan is unfortunate. It is here because our // primary data structure is to keep the AsyncOps in an // unordered_map by pointer, but here we only have the // callback. We could keep two data structures at the risk of // having them diverge. for (auto&& kv : _inProgress) { if (kv.first->cbHandle() == cbHandle) { kv.first->cancel(); _numCanceledOps.fetchAndAdd(1); break; } } } Status NetworkInterfaceASIO::setAlarm(Date_t when, const stdx::function& action) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceASIO shutdown in progress"}; } std::shared_ptr alarm; try { auto timeLeft = when - now(); // "alarm" must stay alive until it expires, hence the shared_ptr. alarm = std::make_shared(_io_service, timeLeft.toSystemDuration()); } catch (...) { return exceptionToStatus(); } alarm->async_wait([alarm, this, action, when](std::error_code ec) { const auto nowValue = now(); if (nowValue < when) { warning() << "ASIO alarm returned early. 
Expected at: " << when << ", fired at: " << nowValue; const auto status = setAlarm(when, action); if ((!status.isOK()) && (status.code() != ErrorCodes::ShutdownInProgress)) { fassertFailedWithStatus(40383, status); } return; } if (!ec) { return action(); } else if (ec != asio::error::operation_aborted) { // When the network interface is shut down, it will cancel all pending // alarms, raising an "operation_aborted" error here, which we ignore. warning() << "setAlarm() received an error: " << ec.message(); } }); return Status::OK(); }; bool NetworkInterfaceASIO::inShutdown() const { return (_state.load() == State::kShutdown); } bool NetworkInterfaceASIO::onNetworkThread() { auto id = stdx::this_thread::get_id(); return std::any_of(_serviceRunners.begin(), _serviceRunners.end(), [id](const stdx::thread& thread) { return id == thread.get_id(); }); } void NetworkInterfaceASIO::_failWithInfo(const char* file, int line, std::string error, AsyncOp* op) { stdx::lock_guard lk(_inProgressMutex); _failWithInfo_inlock(file, line, error, op); } void NetworkInterfaceASIO::_failWithInfo_inlock(const char* file, int line, std::string error, AsyncOp* op) { std::stringstream ss; ss << "Invariant failure at " << file << ":" << line << ": " << error; ss << _getDiagnosticString_inlock(op); Status status{ErrorCodes::InternalError, ss.str()}; fassertFailedWithStatus(34429, status); } void NetworkInterfaceASIO::dropConnections(const HostAndPort& hostAndPort) { _connectionPool.dropConnections(hostAndPort); } } // namespace executor } // namespace mongo