/** * Copyright (C) 2014 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork #include "mongo/platform/basic.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/network_connection_hook.h" #include #include #include "mongo/executor/connection_pool_stats.h" #include "mongo/stdx/functional.h" #include "mongo/util/log.h" #include "mongo/util/time_support.h" namespace mongo { namespace executor { NetworkInterfaceMock::NetworkInterfaceMock() : _waitingToRunMask(0), _currentlyRunning(kNoThread), _now(fassertStatusOK(18653, dateFromISOString("2014-08-01T00:00:00Z"))), _hasStarted(false), _inShutdown(false), _executorNextWakeupDate(Date_t::max()) {} NetworkInterfaceMock::~NetworkInterfaceMock() { stdx::unique_lock lk(_mutex); invariant(!_hasStarted || _inShutdown); invariant(_scheduled.empty()); invariant(_blackHoled.empty()); } std::string NetworkInterfaceMock::getDiagnosticString() { // TODO something better. return "NetworkInterfaceMock diagnostics here"; } void NetworkInterfaceMock::appendConnectionStats(ConnectionPoolStats* stats) const {} Date_t NetworkInterfaceMock::now() { stdx::lock_guard lk(_mutex); return _now_inlock(); } std::string NetworkInterfaceMock::getHostName() { return "thisisourhostname"; } void NetworkInterfaceMock::startCommand(const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish) { stdx::lock_guard lk(_mutex); invariant(!_inShutdown); const Date_t now = _now_inlock(); auto op = NetworkOperation(cbHandle, request, now, onFinish); // If we don't have a hook, or we have already 'connected' to this host, enqueue the op. if (!_hook || _connections.count(request.target)) { _enqueueOperation_inlock(std::move(op)); } else { _connectThenEnqueueOperation_inlock(request.target, std::move(op)); } } void NetworkInterfaceMock::setHandshakeReplyForHost( const mongo::HostAndPort& host, mongo::executor::RemoteCommandResponse&& reply) { stdx::lock_guard lk(_mutex); auto it = _handshakeReplies.find(host); if (it == std::end(_handshakeReplies)) { auto res = _handshakeReplies.emplace(host, std::move(reply)); invariant(res.second); } else { it->second = std::move(reply); } } static bool findAndCancelIf( const stdx::function& matchFn, NetworkInterfaceMock::NetworkOperationList* other, NetworkInterfaceMock::NetworkOperationList* scheduled, const Date_t now) { const NetworkInterfaceMock::NetworkOperationIterator noi = std::find_if(other->begin(), other->end(), matchFn); if (noi == other->end()) { return false; } scheduled->splice(scheduled->begin(), *other, noi); noi->setResponse( now, TaskExecutor::ResponseStatus(ErrorCodes::CallbackCanceled, "Network operation canceled")); return true; } void NetworkInterfaceMock::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) { stdx::lock_guard lk(_mutex); invariant(!_inShutdown); stdx::function matchesHandle = stdx::bind(&NetworkOperation::isForCallback, stdx::placeholders::_1, cbHandle); const Date_t now = _now_inlock(); if (findAndCancelIf(matchesHandle, &_unscheduled, &_scheduled, now)) { return; } if (findAndCancelIf(matchesHandle, &_blackHoled, &_scheduled, now)) { return; } if (findAndCancelIf(matchesHandle, &_scheduled, &_scheduled, now)) { return; } // No not-in-progress network command matched cbHandle. Oh, well. } void NetworkInterfaceMock::setAlarm(const Date_t when, const stdx::function& action) { stdx::unique_lock lk(_mutex); if (when <= _now_inlock()) { lk.unlock(); action(); return; } _alarms.emplace(when, action); } bool NetworkInterfaceMock::onNetworkThread() { return _currentlyRunning == kNetworkThread; } void NetworkInterfaceMock::startup() { stdx::lock_guard lk(_mutex); invariant(!_hasStarted); _hasStarted = true; _inShutdown = false; invariant(_currentlyRunning == kNoThread); _currentlyRunning = kExecutorThread; } void NetworkInterfaceMock::shutdown() { stdx::unique_lock lk(_mutex); invariant(_hasStarted); invariant(!_inShutdown); _inShutdown = true; NetworkOperationList todo; todo.splice(todo.end(), _scheduled); todo.splice(todo.end(), _unscheduled); todo.splice(todo.end(), _processing); todo.splice(todo.end(), _blackHoled); const Date_t now = _now_inlock(); _waitingToRunMask |= kExecutorThread; // Prevents network thread from scheduling. lk.unlock(); for (NetworkOperationIterator iter = todo.begin(); iter != todo.end(); ++iter) { iter->setResponse(now, TaskExecutor::ResponseStatus(ErrorCodes::ShutdownInProgress, "Shutting down mock network")); iter->finishResponse(); } lk.lock(); invariant(_currentlyRunning == kExecutorThread); _currentlyRunning = kNoThread; _waitingToRunMask = kNetworkThread; _shouldWakeNetworkCondition.notify_one(); } void NetworkInterfaceMock::enterNetwork() { stdx::unique_lock lk(_mutex); while (!_isNetworkThreadRunnable_inlock()) { _shouldWakeNetworkCondition.wait(lk); } _currentlyRunning = kNetworkThread; _waitingToRunMask &= ~kNetworkThread; } void NetworkInterfaceMock::exitNetwork() { stdx::lock_guard lk(_mutex); if (_currentlyRunning != kNetworkThread) { return; } _currentlyRunning = kNoThread; if (_isExecutorThreadRunnable_inlock()) { _shouldWakeExecutorCondition.notify_one(); } _waitingToRunMask |= kNetworkThread; } bool NetworkInterfaceMock::hasReadyRequests() { stdx::lock_guard lk(_mutex); invariant(_currentlyRunning == kNetworkThread); return _hasReadyRequests_inlock(); } bool NetworkInterfaceMock::_hasReadyRequests_inlock() { if (_unscheduled.empty()) return false; if (_unscheduled.front().getNextConsiderationDate() > _now_inlock()) { return false; } return true; } NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getNextReadyRequest() { stdx::unique_lock lk(_mutex); invariant(_currentlyRunning == kNetworkThread); while (!_hasReadyRequests_inlock()) { _waitingToRunMask |= kExecutorThread; _runReadyNetworkOperations_inlock(&lk); } invariant(_hasReadyRequests_inlock()); _processing.splice(_processing.begin(), _unscheduled, _unscheduled.begin()); return _processing.begin(); } NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getFrontOfUnscheduledQueue() { stdx::unique_lock lk(_mutex); invariant(_currentlyRunning == kNetworkThread); invariant(_hasReadyRequests_inlock()); return _unscheduled.begin(); } void NetworkInterfaceMock::scheduleResponse(NetworkOperationIterator noi, Date_t when, const TaskExecutor::ResponseStatus& response) { stdx::lock_guard lk(_mutex); invariant(_currentlyRunning == kNetworkThread); NetworkOperationIterator insertBefore = _scheduled.begin(); while ((insertBefore != _scheduled.end()) && (insertBefore->getResponseDate() <= when)) { ++insertBefore; } noi->setResponse(when, response); _scheduled.splice(insertBefore, _processing, noi); } void NetworkInterfaceMock::blackHole(NetworkOperationIterator noi) { stdx::lock_guard lk(_mutex); invariant(_currentlyRunning == kNetworkThread); _blackHoled.splice(_blackHoled.end(), _processing, noi); } void NetworkInterfaceMock::requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil) { stdx::lock_guard lk(_mutex); invariant(_currentlyRunning == kNetworkThread); invariant(noi->getNextConsiderationDate() < dontAskUntil); invariant(_now_inlock() < dontAskUntil); NetworkOperationIterator insertBefore = _unscheduled.begin(); for (; insertBefore != _unscheduled.end(); ++insertBefore) { if (insertBefore->getNextConsiderationDate() >= dontAskUntil) { break; } } noi->setNextConsiderationDate(dontAskUntil); _unscheduled.splice(insertBefore, _processing, noi); } Date_t NetworkInterfaceMock::runUntil(Date_t until) { stdx::unique_lock lk(_mutex); invariant(_currentlyRunning == kNetworkThread); invariant(until > _now_inlock()); while (until > _now_inlock()) { _runReadyNetworkOperations_inlock(&lk); if (_hasReadyRequests_inlock()) { break; } Date_t newNow = _executorNextWakeupDate; if (!_alarms.empty() && _alarms.top().when < newNow) { newNow = _alarms.top().when; } if (!_scheduled.empty() && _scheduled.front().getResponseDate() < newNow) { newNow = _scheduled.front().getResponseDate(); } if (until < newNow) { newNow = until; } invariant(_now_inlock() <= newNow); _now = newNow; _waitingToRunMask |= kExecutorThread; } _runReadyNetworkOperations_inlock(&lk); return _now_inlock(); } void NetworkInterfaceMock::runReadyNetworkOperations() { stdx::unique_lock lk(_mutex); invariant(_currentlyRunning == kNetworkThread); _runReadyNetworkOperations_inlock(&lk); } void NetworkInterfaceMock::waitForWork() { stdx::unique_lock lk(_mutex); invariant(_currentlyRunning == kExecutorThread); _waitForWork_inlock(&lk); } void NetworkInterfaceMock::waitForWorkUntil(Date_t when) { stdx::unique_lock lk(_mutex); invariant(_currentlyRunning == kExecutorThread); _executorNextWakeupDate = when; if (_executorNextWakeupDate <= _now_inlock()) { return; } _waitForWork_inlock(&lk); } void NetworkInterfaceMock::_enqueueOperation_inlock( mongo::executor::NetworkInterfaceMock::NetworkOperation&& op) { auto insertBefore = std::upper_bound(std::begin(_unscheduled), std::end(_unscheduled), op, [](const NetworkOperation& a, const NetworkOperation& b) { return a.getNextConsiderationDate() < b.getNextConsiderationDate(); }); _unscheduled.emplace(insertBefore, std::move(op)); } void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort& target, NetworkOperation&& op) { invariant(_hook); // if there is no hook, we shouldn't even hit this codepath invariant(!_connections.count(target)); auto handshakeReplyIter = _handshakeReplies.find(target); auto handshakeReply = (handshakeReplyIter != std::end(_handshakeReplies)) ? handshakeReplyIter->second : RemoteCommandResponse(BSONObj(), BSONObj(), Milliseconds(0)); auto valid = _hook->validateHost(target, handshakeReply); if (!valid.isOK()) { op.setResponse(_now_inlock(), valid); op.finishResponse(); return; } auto swHookPostconnectCommand = _hook->makeRequest(target); if (!swHookPostconnectCommand.isOK()) { op.setResponse(_now_inlock(), swHookPostconnectCommand.getStatus()); op.finishResponse(); return; } boost::optional hookPostconnectCommand = std::move(swHookPostconnectCommand.getValue()); if (!hookPostconnectCommand) { // If we don't have a post connect command, enqueue the actual command. _enqueueOperation_inlock(std::move(op)); _connections.emplace(op.getRequest().target); return; } // The completion handler for the postconnect command schedules the original command. auto postconnectCompletionHandler = [this, op](StatusWith response) mutable { stdx::lock_guard lk(_mutex); if (!response.isOK()) { op.setResponse(_now_inlock(), response.getStatus()); op.finishResponse(); return; } auto handleStatus = _hook->handleReply(op.getRequest().target, std::move(response.getValue())); if (!handleStatus.isOK()) { op.setResponse(_now_inlock(), handleStatus); op.finishResponse(); return; } _enqueueOperation_inlock(std::move(op)); _connections.emplace(op.getRequest().target); }; auto postconnectOp = NetworkOperation(op.getCallbackHandle(), std::move(*hookPostconnectCommand), _now_inlock(), std::move(postconnectCompletionHandler)); _enqueueOperation_inlock(std::move(postconnectOp)); } void NetworkInterfaceMock::setConnectionHook(std::unique_ptr hook) { stdx::lock_guard lk(_mutex); invariant(!_hasStarted); invariant(!_hook); _hook = std::move(hook); } void NetworkInterfaceMock::signalWorkAvailable() { stdx::lock_guard lk(_mutex); _waitingToRunMask |= kExecutorThread; if (_currentlyRunning == kNoThread) { _shouldWakeExecutorCondition.notify_one(); } } void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock* lk) { while (!_alarms.empty() && _now_inlock() >= _alarms.top().when) { auto fn = _alarms.top().action; _alarms.pop(); lk->unlock(); fn(); lk->lock(); } while (!_scheduled.empty() && _scheduled.front().getResponseDate() <= _now_inlock()) { invariant(_currentlyRunning == kNetworkThread); NetworkOperation op = _scheduled.front(); _scheduled.pop_front(); _waitingToRunMask |= kExecutorThread; lk->unlock(); op.finishResponse(); lk->lock(); } invariant(_currentlyRunning == kNetworkThread); if (!(_waitingToRunMask & kExecutorThread)) { return; } _shouldWakeExecutorCondition.notify_one(); _currentlyRunning = kNoThread; while (!_isNetworkThreadRunnable_inlock()) { _shouldWakeNetworkCondition.wait(*lk); } _currentlyRunning = kNetworkThread; _waitingToRunMask &= ~kNetworkThread; } void NetworkInterfaceMock::_waitForWork_inlock(stdx::unique_lock* lk) { if (_waitingToRunMask & kExecutorThread) { _waitingToRunMask &= ~kExecutorThread; return; } _currentlyRunning = kNoThread; while (!_isExecutorThreadRunnable_inlock()) { _waitingToRunMask |= kNetworkThread; _shouldWakeNetworkCondition.notify_one(); _shouldWakeExecutorCondition.wait(*lk); } _currentlyRunning = kExecutorThread; _waitingToRunMask &= ~kExecutorThread; } bool NetworkInterfaceMock::_isNetworkThreadRunnable_inlock() { if (_currentlyRunning != kNoThread) { return false; } if (_waitingToRunMask != kNetworkThread) { return false; } return true; } bool NetworkInterfaceMock::_isExecutorThreadRunnable_inlock() { if (_currentlyRunning != kNoThread) { return false; } return _waitingToRunMask & kExecutorThread; } static const StatusWith kUnsetResponse( ErrorCodes::InternalError, "NetworkOperation::_response never set"); NetworkInterfaceMock::NetworkOperation::NetworkOperation() : _requestDate(), _nextConsiderationDate(), _responseDate(), _request(), _response(kUnsetResponse), _onFinish() {} NetworkInterfaceMock::NetworkOperation::NetworkOperation( const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& theRequest, Date_t theRequestDate, const RemoteCommandCompletionFn& onFinish) : _requestDate(theRequestDate), _nextConsiderationDate(theRequestDate), _responseDate(), _cbHandle(cbHandle), _request(theRequest), _response(kUnsetResponse), _onFinish(onFinish) {} NetworkInterfaceMock::NetworkOperation::~NetworkOperation() {} void NetworkInterfaceMock::NetworkOperation::setNextConsiderationDate( Date_t nextConsiderationDate) { invariant(nextConsiderationDate > _nextConsiderationDate); _nextConsiderationDate = nextConsiderationDate; } void NetworkInterfaceMock::NetworkOperation::setResponse( Date_t responseDate, const TaskExecutor::ResponseStatus& response) { invariant(responseDate >= _requestDate); _responseDate = responseDate; _response = response; } void NetworkInterfaceMock::NetworkOperation::finishResponse() { invariant(_onFinish); _onFinish(_response); _onFinish = RemoteCommandCompletionFn(); } } // namespace executor } // namespace mongo