From dfabadb09387a4236ee7675cd02d39b17affaf39 Mon Sep 17 00:00:00 2001 From: Waley Chen Date: Wed, 30 Mar 2016 17:36:37 -0400 Subject: SERVER-21170 NetworkInterface::startCommand should be able to reject requests due to shutdown --- src/mongo/executor/SConscript | 11 +++ src/mongo/executor/network_interface.h | 21 ++++- src/mongo/executor/network_interface_asio.cpp | 19 ++++- src/mongo/executor/network_interface_asio.h | 11 ++- src/mongo/executor/network_interface_asio_test.cpp | 37 ++++++--- src/mongo/executor/network_interface_impl.cpp | 38 ++++++--- src/mongo/executor/network_interface_impl.h | 11 +-- src/mongo/executor/network_interface_impl_test.cpp | 76 +++++++++++++++++ src/mongo/executor/network_interface_mock.cpp | 41 +++++++--- src/mongo/executor/network_interface_mock.h | 11 +-- src/mongo/executor/network_interface_mock_test.cpp | 94 ++++++++++++++-------- 11 files changed, 282 insertions(+), 88 deletions(-) create mode 100644 src/mongo/executor/network_interface_impl_test.cpp diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index 7ca9d050f97..9c2b65e5135 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -68,6 +68,17 @@ env.Library(target='network_interface_impl', # TODO: rename to thread_pool_netwo 'task_executor_interface', ]) +env.CppUnitTest( + target='network_interface_impl_test', + source=[ + 'network_interface_impl_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + 'network_interface_impl', + ], +) + env.Library('network_interface_mock', [ 'network_interface_mock.cpp', diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index e35f8ed9c52..d230e2eb8a6 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -85,6 +85,11 @@ public: */ virtual void shutdown() = 0; + /** + * Returns true if shutdown has been called, false otherwise. + */ + virtual bool inShutdown() const = 0; + /** * Blocks the current thread (presumably the executor thread) until the network interface * knows of work for the executor to perform. @@ -114,10 +119,14 @@ public: /** * Starts asynchronous execution of the command described by "request". + * + * Returns ErrorCodes::ShutdownInProgress if NetworkInterface::shutdown has already started + * and Status::OK() otherwise. If it returns Status::OK(), then the onFinish argument will be + * executed by NetworkInterface eventually; otherwise, it will not. */ - virtual void startCommand(const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish) = 0; + virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, + const RemoteCommandRequest& request, + const RemoteCommandCompletionFn& onFinish) = 0; /** * Requests cancelation of the network activity associated with "cbHandle" if it has not yet @@ -133,13 +142,17 @@ public: /** * Sets an alarm, which schedules "action" to run no sooner than "when". * + * Returns ErrorCodes::ShutdownInProgress if NetworkInterface::shutdown has already started + * and true otherwise. If it returns Status::OK(), then the action will be executed by + * NetworkInterface eventually; otherwise, it will not. + * * "action" should not do anything that requires a lot of computation, or that might block for a * long time, as it may execute in a network thread. * * Any callbacks invoked from setAlarm must observe onNetworkThread to * return true. See that method for why. */ - virtual void setAlarm(Date_t when, const stdx::function& action) = 0; + virtual Status setAlarm(Date_t when, const stdx::function& action) = 0; /** * Returns true if called from a thread dedicated to networking. I.e. not a diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp index cf5d82c98b3..27e971f3eb4 100644 --- a/src/mongo/executor/network_interface_asio.cpp +++ b/src/mongo/executor/network_interface_asio.cpp @@ -228,9 +228,9 @@ Date_t NetworkInterfaceASIO::now() { return Date_t::now(); } -void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish) { +Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHandle, + const RemoteCommandRequest& request, + const RemoteCommandCompletionFn& onFinish) { MONGO_ASIO_INVARIANT(onFinish, "Invalid completion function"); { stdx::lock_guard lk(_inProgressMutex); @@ -239,6 +239,10 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa MONGO_ASIO_INVARIANT_INLOCK(insertResult.second, "Same CallbackHandle added twice"); } + if (inShutdown()) { + return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceASIO shutdown in progress"}; + } + LOG(2) << "startCommand: " << request.toString(); auto getConnectionStartTime = now(); @@ -369,6 +373,7 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa }; _connectionPool.get(request.target, request.timeout, nextStep); + return Status::OK(); } void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) { @@ -408,7 +413,11 @@ void NetworkInterfaceASIO::cancelAllCommands() { } } -void NetworkInterfaceASIO::setAlarm(Date_t when, const stdx::function& action) { +Status NetworkInterfaceASIO::setAlarm(Date_t when, const stdx::function& action) { + if (inShutdown()) { + return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceASIO shutdown in progress"}; + } + // "alarm" must stay alive until it expires, hence the shared_ptr. auto alarm = std::make_shared(_io_service, when.toSystemTimePoint()); alarm->async_wait([alarm, this, action](std::error_code ec) { @@ -420,6 +429,8 @@ void NetworkInterfaceASIO::setAlarm(Date_t when, const stdx::function& a warning() << "setAlarm() received an error: " << ec.message(); } }); + + return Status::OK(); }; bool NetworkInterfaceASIO::inShutdown() const { diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h index a38e778470a..62e441d7d92 100644 --- a/src/mongo/executor/network_interface_asio.h +++ b/src/mongo/executor/network_interface_asio.h @@ -130,21 +130,20 @@ public: std::string getHostName() override; void startup() override; void shutdown() override; + bool inShutdown() const override; void waitForWork() override; void waitForWorkUntil(Date_t when) override; void signalWorkAvailable() override; Date_t now() override; - void startCommand(const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish) override; + Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, + const RemoteCommandRequest& request, + const RemoteCommandCompletionFn& onFinish) override; void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override; void cancelAllCommands() override; - void setAlarm(Date_t when, const stdx::function& action) override; + Status setAlarm(Date_t when, const stdx::function& action) override; bool onNetworkThread() override; - bool inShutdown() const; - private: using ResponseStatus = TaskExecutor::ResponseStatus; using NetworkInterface::RemoteCommandCompletionFn; diff --git a/src/mongo/executor/network_interface_asio_test.cpp b/src/mongo/executor/network_interface_asio_test.cpp index 2e95e0383ef..7462070bad9 100644 --- a/src/mongo/executor/network_interface_asio_test.cpp +++ b/src/mongo/executor/network_interface_asio_test.cpp @@ -104,11 +104,12 @@ public: Deferred> startCommand( const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request) { Deferred> deferredResponse; - net().startCommand(cbHandle, - request, - [deferredResponse](StatusWith response) mutable { - deferredResponse.emplace(std::move(response)); - }); + ASSERT_OK(net().startCommand( + cbHandle, + request, + [deferredResponse](StatusWith response) mutable { + deferredResponse.emplace(std::move(response)); + })); return deferredResponse; } @@ -422,6 +423,19 @@ TEST_F(NetworkInterfaceASIOTest, StartCommand) { assertNumOps(0u, 0u, 0u, 1u); } +TEST_F(NetworkInterfaceASIOTest, InShutdown) { + ASSERT_FALSE(net().inShutdown()); + net().shutdown(); + ASSERT(net().inShutdown()); +} + +TEST_F(NetworkInterfaceASIOTest, StartCommandReturnsNotOKIfShutdownHasStarted) { + net().shutdown(); + ASSERT_NOT_OK(net().startCommand(makeCallbackHandle(), + RemoteCommandRequest{}, + [&](StatusWith resp) {})); +} + class MalformedMessageTest : public NetworkInterfaceASIOTest { public: using MessageHook = stdx::function; @@ -779,13 +793,13 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, HandleReplyReturnsError) { assertNumOps(0u, 0u, 1u, 0u); } -TEST_F(NetworkInterfaceASIOTest, setAlarm) { +TEST_F(NetworkInterfaceASIOTest, SetAlarm) { // set a first alarm, to execute after "expiration" Date_t expiration = net().now() + Milliseconds(100); Deferred deferred; - net().setAlarm(expiration, - [this, expiration, deferred]() mutable { deferred.emplace(net().now()); }); + ASSERT_OK(net().setAlarm( + expiration, [this, expiration, deferred]() mutable { deferred.emplace(net().now()); })); // Get our timer factory auto& factory = timerFactory(); @@ -799,12 +813,17 @@ TEST_F(NetworkInterfaceASIOTest, setAlarm) { expiration = net().now() + Milliseconds(99999999); Deferred deferred2; - net().setAlarm(expiration, [this, deferred2]() mutable { deferred2.emplace(true); }); + ASSERT_OK(net().setAlarm(expiration, [this, deferred2]() mutable { deferred2.emplace(true); })); net().shutdown(); ASSERT(!deferred2.hasCompleted()); } +TEST_F(NetworkInterfaceASIOTest, SetAlarmReturnsNotOKIfShutdownHasStarted) { + net().shutdown(); + ASSERT_NOT_OK(net().setAlarm(net().now() + Milliseconds(100), [] {})); +} + class NetworkInterfaceASIOMetadataTest : public NetworkInterfaceASIOTest { protected: void setUp() override {} diff --git a/src/mongo/executor/network_interface_impl.cpp b/src/mongo/executor/network_interface_impl.cpp index 38037d056c0..3a025332175 100644 --- a/src/mongo/executor/network_interface_impl.cpp +++ b/src/mongo/executor/network_interface_impl.cpp @@ -69,6 +69,7 @@ NetworkInterfaceImpl::NetworkInterfaceImpl() : NetworkInterfaceImpl(nullptr){}; NetworkInterfaceImpl::NetworkInterfaceImpl(std::unique_ptr hook) : NetworkInterface(), _pool(makeOptions()), + _inShutdown(false), _commandRunner(kMessagingPortKeepOpen, std::move(hook)) {} NetworkInterfaceImpl::~NetworkInterfaceImpl() {} @@ -79,7 +80,7 @@ std::string NetworkInterfaceImpl::getDiagnosticString() { str::stream output; output << "NetworkImpl"; output << " threads:" << poolStats.numThreads; - output << " inShutdown:" << _inShutdown; + output << " inShutdown:" << inShutdown(); output << " active:" << _numActiveNetworkRequests; output << " pending:" << _pending.size(); output << " execRunable:" << _isExecutorRunnable; @@ -89,9 +90,7 @@ std::string NetworkInterfaceImpl::getDiagnosticString() { void NetworkInterfaceImpl::appendConnectionStats(ConnectionPoolStats* stats) const {} void NetworkInterfaceImpl::startup() { - stdx::unique_lock lk(_mutex); - invariant(!_inShutdown); - lk.unlock(); + invariant(!inShutdown()); _commandRunner.startup(); _pool.startup(); @@ -100,7 +99,7 @@ void NetworkInterfaceImpl::startup() { void NetworkInterfaceImpl::shutdown() { stdx::unique_lock lk(_mutex); - _inShutdown = true; + _inShutdown.store(true); _hasPending.notify_all(); _newAlarmReady.notify_all(); lk.unlock(); @@ -110,6 +109,10 @@ void NetworkInterfaceImpl::shutdown() { _commandRunner.shutdown(); } +bool NetworkInterfaceImpl::inShutdown() const { + return _inShutdown.load(); +} + void NetworkInterfaceImpl::signalWorkAvailable() { stdx::lock_guard lk(_mutex); _signalWorkAvailable_inlock(); @@ -161,17 +164,25 @@ void NetworkInterfaceImpl::_runOneCommand() { _signalWorkAvailable_inlock(); } -void NetworkInterfaceImpl::startCommand(const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish) { +Status NetworkInterfaceImpl::startCommand(const TaskExecutor::CallbackHandle& cbHandle, + const RemoteCommandRequest& request, + const RemoteCommandCompletionFn& onFinish) { + if (inShutdown()) { + return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceImpl shutdown in progress"}; + } + LOG(2) << "Scheduling " << request.cmdObj.firstElementFieldName() << " to " << request.target; + stdx::lock_guard lk(_mutex); + _pending.push_back(CommandData()); CommandData& cd = _pending.back(); cd.cbHandle = cbHandle; cd.request = request; cd.onFinish = onFinish; fassert(28730, _pool.schedule([this]() -> void { _runOneCommand(); })); + + return Status::OK(); } void NetworkInterfaceImpl::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) { @@ -203,13 +214,20 @@ std::string NetworkInterfaceImpl::getHostName() { return getHostNameCached(); } -void NetworkInterfaceImpl::setAlarm(Date_t when, const stdx::function& action) { +Status NetworkInterfaceImpl::setAlarm(Date_t when, const stdx::function& action) { + if (inShutdown()) { + return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceImpl shutdown in progress"}; + } + stdx::unique_lock lk(_mutex); + const bool notify = _alarms.empty() || _alarms.top().when > when; _alarms.emplace(when, action); if (notify) { _newAlarmReady.notify_all(); } + + return Status::OK(); } bool NetworkInterfaceImpl::onNetworkThread() { @@ -218,7 +236,7 @@ bool NetworkInterfaceImpl::onNetworkThread() { void NetworkInterfaceImpl::_processAlarms() { stdx::unique_lock lk(_mutex); - while (!_inShutdown) { + while (!inShutdown()) { if (_alarms.empty()) { _newAlarmReady.wait(lk); } else if (now() < _alarms.top().when) { diff --git a/src/mongo/executor/network_interface_impl.h b/src/mongo/executor/network_interface_impl.h index 5915d2d99d6..55c25fe8e57 100644 --- a/src/mongo/executor/network_interface_impl.h +++ b/src/mongo/executor/network_interface_impl.h @@ -81,20 +81,21 @@ public: void appendConnectionStats(ConnectionPoolStats* stats) const override; void startup() override; void shutdown() override; + bool inShutdown() const override; void waitForWork() override; void waitForWorkUntil(Date_t when) override; void signalWorkAvailable() override; Date_t now() override; std::string getHostName() override; - void startCommand(const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish) override; + Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, + const RemoteCommandRequest& request, + const RemoteCommandCompletionFn& onFinish) override; void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override; /** * Not implemented. */ void cancelAllCommands() override {} - void setAlarm(Date_t when, const stdx::function& action) override; + Status setAlarm(Date_t when, const stdx::function& action) override; bool onNetworkThread() override; private: @@ -159,7 +160,7 @@ private: bool _isExecutorRunnable = false; // Flag indicating when this interface is being shut down (because shutdown() has executed). - bool _inShutdown = false; + std::atomic _inShutdown; // Interface for running remote commands RemoteCommandRunnerImpl _commandRunner; diff --git a/src/mongo/executor/network_interface_impl_test.cpp b/src/mongo/executor/network_interface_impl_test.cpp new file mode 100644 index 00000000000..019aa929fdb --- /dev/null +++ b/src/mongo/executor/network_interface_impl_test.cpp @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/executor/network_interface_impl.h" + +#include "mongo/platform/basic.h" + +#include "mongo/db/wire_version.h" +#include "mongo/unittest/unittest.h" +#include "mongo/stdx/memory.h" + +namespace mongo { +namespace executor { +namespace { + +class NetworkInterfaceImplTest : public mongo::unittest::Test { +public: + void setUp() override { + _net = stdx::make_unique(); + _net->startup(); + } + + NetworkInterfaceImpl& net() { + return *_net; + } + +private: + std::unique_ptr _net; +}; + +TEST_F(NetworkInterfaceImplTest, InShutdown) { + ASSERT_FALSE(net().inShutdown()); + net().shutdown(); + ASSERT(net().inShutdown()); +} + +TEST_F(NetworkInterfaceImplTest, StartCommandReturnsNotOKIfShutdownHasStarted) { + TaskExecutor::CallbackHandle cb{}; + net().shutdown(); + ASSERT_NOT_OK(net().startCommand( + cb, RemoteCommandRequest{}, [&](StatusWith resp) {})); +} + +TEST_F(NetworkInterfaceImplTest, SetAlarmReturnsNotOKIfShutdownHasStarted) { + net().shutdown(); + ASSERT_NOT_OK(net().setAlarm(net().now() + Milliseconds(100), []() {})); +} + +} // namespace +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 34b6a2d4fc6..0e5f09c877e 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -54,7 +54,7 @@ NetworkInterfaceMock::NetworkInterfaceMock() NetworkInterfaceMock::~NetworkInterfaceMock() { stdx::unique_lock lk(_mutex); - invariant(!_hasStarted || _inShutdown); + invariant(!_hasStarted || inShutdown()); invariant(_scheduled.empty()); invariant(_blackHoled.empty()); } @@ -75,11 +75,15 @@ std::string NetworkInterfaceMock::getHostName() { return "thisisourhostname"; } -void NetworkInterfaceMock::startCommand(const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish) { +Status NetworkInterfaceMock::startCommand(const TaskExecutor::CallbackHandle& cbHandle, + const RemoteCommandRequest& request, + const RemoteCommandCompletionFn& onFinish) { + if (inShutdown()) { + return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"}; + } + stdx::lock_guard lk(_mutex); - invariant(!_inShutdown); + const Date_t now = _now_inlock(); auto op = NetworkOperation(cbHandle, request, now, onFinish); @@ -89,6 +93,8 @@ void NetworkInterfaceMock::startCommand(const TaskExecutor::CallbackHandle& cbHa } else { _connectThenEnqueueOperation_inlock(request.target, std::move(op)); } + + return Status::OK(); } void NetworkInterfaceMock::setHandshakeReplyForHost( @@ -121,8 +127,9 @@ static bool findAndCancelIf( } void NetworkInterfaceMock::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) { + invariant(!inShutdown()); + stdx::lock_guard lk(_mutex); - invariant(!_inShutdown); stdx::function matchesHandle = stdx::bind(&NetworkOperation::isForCallback, stdx::placeholders::_1, cbHandle); const Date_t now = _now_inlock(); @@ -138,14 +145,21 @@ void NetworkInterfaceMock::cancelCommand(const TaskExecutor::CallbackHandle& cbH // No not-in-progress network command matched cbHandle. Oh, well. } -void NetworkInterfaceMock::setAlarm(const Date_t when, const stdx::function& action) { +Status NetworkInterfaceMock::setAlarm(const Date_t when, const stdx::function& action) { + if (inShutdown()) { + return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"}; + } + stdx::unique_lock lk(_mutex); + if (when <= _now_inlock()) { lk.unlock(); action(); - return; + return Status::OK(); } _alarms.emplace(when, action); + + return Status::OK(); } bool NetworkInterfaceMock::onNetworkThread() { @@ -156,16 +170,17 @@ void NetworkInterfaceMock::startup() { stdx::lock_guard lk(_mutex); invariant(!_hasStarted); _hasStarted = true; - _inShutdown = false; + _inShutdown.store(false); invariant(_currentlyRunning == kNoThread); _currentlyRunning = kExecutorThread; } void NetworkInterfaceMock::shutdown() { + invariant(!inShutdown()); + stdx::unique_lock lk(_mutex); invariant(_hasStarted); - invariant(!_inShutdown); - _inShutdown = true; + _inShutdown.store(true); NetworkOperationList todo; todo.splice(todo.end(), _scheduled); todo.splice(todo.end(), _unscheduled); @@ -188,6 +203,10 @@ void NetworkInterfaceMock::shutdown() { _shouldWakeNetworkCondition.notify_one(); } +bool NetworkInterfaceMock::inShutdown() const { + return _inShutdown.load(); +} + void NetworkInterfaceMock::enterNetwork() { stdx::unique_lock lk(_mutex); while (!_isNetworkThreadRunnable_inlock()) { diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index 439b33e5caf..b785ece5a97 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -86,21 +86,22 @@ public: virtual void startup(); virtual void shutdown(); + virtual bool inShutdown() const; virtual void waitForWork(); virtual void waitForWorkUntil(Date_t when); virtual void setConnectionHook(std::unique_ptr hook); virtual void signalWorkAvailable(); virtual Date_t now(); virtual std::string getHostName(); - virtual void startCommand(const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish); + virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, + const RemoteCommandRequest& request, + const RemoteCommandCompletionFn& onFinish); virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle); /** * Not implemented. */ void cancelAllCommands() override {} - virtual void setAlarm(Date_t when, const stdx::function& action); + virtual Status setAlarm(Date_t when, const stdx::function& action); virtual bool onNetworkThread(); @@ -290,7 +291,7 @@ private: bool _hasStarted; // (M) // Set to true by "shutDown()". - bool _inShutdown; // (M) + std::atomic _inShutdown; // (M) // Next date that the executor expects to wake up at (due to a scheduleWorkAt() call). Date_t _executorNextWakeupDate; // (M) diff --git a/src/mongo/executor/network_interface_mock_test.cpp b/src/mongo/executor/network_interface_mock_test.cpp index 283ec71fe59..f8166db2c2e 100644 --- a/src/mongo/executor/network_interface_mock_test.cpp +++ b/src/mongo/executor/network_interface_mock_test.cpp @@ -74,7 +74,9 @@ public: // Wake up sleeping executor threads so they clean up. net().signalWorkAvailable(); executor().join(); - net().shutdown(); + if (!net().inShutdown()) { + net().shutdown(); + } } private: @@ -152,15 +154,16 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHook) { BSONObj(), Milliseconds(0)}; - net().startCommand(cb, - actualCommandExpected, - [&](StatusWith resp) { - commandFinished = true; - if (resp.isOK()) { - gotCorrectCommandReply = (actualResponseExpected.toString() == - resp.getValue().toString()); - } - }); + ASSERT_OK(net().startCommand(cb, + actualCommandExpected, + [&](StatusWith resp) { + commandFinished = true; + if (resp.isOK()) { + gotCorrectCommandReply = + (actualResponseExpected.toString() == + resp.getValue().toString()); + } + })); // At this point validate and makeRequest should have been called. ASSERT(validateCalled); @@ -223,14 +226,14 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookFailedValidation) { bool commandFinished = false; bool statusPropagated = false; - net().startCommand(cb, - RemoteCommandRequest{}, - [&](StatusWith resp) { - commandFinished = true; + ASSERT_OK(net().startCommand(cb, + RemoteCommandRequest{}, + [&](StatusWith resp) { + commandFinished = true; - statusPropagated = resp.getStatus().code() == - ErrorCodes::ConflictingOperationInProgress; - }); + statusPropagated = resp.getStatus().code() == + ErrorCodes::ConflictingOperationInProgress; + })); { net().enterNetwork(); @@ -263,9 +266,10 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookNoRequest) { bool commandFinished = false; - net().startCommand(cb, - RemoteCommandRequest{}, - [&](StatusWith resp) { commandFinished = true; }); + ASSERT_OK(net().startCommand( + cb, + RemoteCommandRequest{}, + [&](StatusWith resp) { commandFinished = true; })); { net().enterNetwork(); @@ -298,13 +302,13 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookMakeRequestFails) { bool commandFinished = false; bool errorPropagated = false; - net().startCommand(cb, - RemoteCommandRequest{}, - [&](StatusWith resp) { - commandFinished = true; - errorPropagated = - resp.getStatus().code() == ErrorCodes::InvalidSyncSource; - }); + ASSERT_OK(net().startCommand(cb, + RemoteCommandRequest{}, + [&](StatusWith resp) { + commandFinished = true; + errorPropagated = + resp.getStatus().code() == ErrorCodes::InvalidSyncSource; + })); { net().enterNetwork(); @@ -336,13 +340,13 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookHandleReplyFails) { bool commandFinished = false; bool errorPropagated = false; - net().startCommand(cb, - RemoteCommandRequest{}, - [&](StatusWith resp) { - commandFinished = true; - errorPropagated = - resp.getStatus().code() == ErrorCodes::CappedPositionLost; - }); + ASSERT_OK(net().startCommand(cb, + RemoteCommandRequest{}, + [&](StatusWith resp) { + commandFinished = true; + errorPropagated = + resp.getStatus().code() == ErrorCodes::CappedPositionLost; + })); ASSERT(!handleReplyCalled); @@ -360,6 +364,28 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookHandleReplyFails) { ASSERT(errorPropagated); } +TEST_F(NetworkInterfaceMockTest, InShutdown) { + startNetwork(); + ASSERT_FALSE(net().inShutdown()); + net().shutdown(); + ASSERT(net().inShutdown()); +} + +TEST_F(NetworkInterfaceMockTest, StartCommandReturnsNotOKIfShutdownHasStarted) { + startNetwork(); + net().shutdown(); + + TaskExecutor::CallbackHandle cb{}; + ASSERT_NOT_OK(net().startCommand( + cb, RemoteCommandRequest{}, [](StatusWith resp) {})); +} + +TEST_F(NetworkInterfaceMockTest, SetAlarmReturnsNotOKIfShutdownHasStarted) { + startNetwork(); + net().shutdown(); + ASSERT_NOT_OK(net().setAlarm(net().now() + Milliseconds(100), [] {})); +} + } // namespace } // namespace executor } // namespace mongo -- cgit v1.2.1