summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWaley Chen <waleycz@gmail.com>2016-03-30 17:36:37 -0400
committerWaley Chen <waleycz@gmail.com>2016-03-30 21:13:50 -0400
commitdfabadb09387a4236ee7675cd02d39b17affaf39 (patch)
treefca4ba78cd6bb9b96810f712b2b1abf64c55cfe2
parente45aba42f783cf2e2974b7ea6944a6c81b0a6be4 (diff)
downloadmongo-dfabadb09387a4236ee7675cd02d39b17affaf39.tar.gz
SERVER-21170 NetworkInterface::startCommand should be able to reject requests due to shutdown
-rw-r--r--src/mongo/executor/SConscript11
-rw-r--r--src/mongo/executor/network_interface.h21
-rw-r--r--src/mongo/executor/network_interface_asio.cpp19
-rw-r--r--src/mongo/executor/network_interface_asio.h11
-rw-r--r--src/mongo/executor/network_interface_asio_test.cpp37
-rw-r--r--src/mongo/executor/network_interface_impl.cpp38
-rw-r--r--src/mongo/executor/network_interface_impl.h11
-rw-r--r--src/mongo/executor/network_interface_impl_test.cpp76
-rw-r--r--src/mongo/executor/network_interface_mock.cpp41
-rw-r--r--src/mongo/executor/network_interface_mock.h11
-rw-r--r--src/mongo/executor/network_interface_mock_test.cpp94
11 files changed, 282 insertions, 88 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index 7ca9d050f97..9c2b65e5135 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -68,6 +68,17 @@ env.Library(target='network_interface_impl', # TODO: rename to thread_pool_netwo
'task_executor_interface',
])
+env.CppUnitTest(
+ target='network_interface_impl_test',
+ source=[
+ 'network_interface_impl_test.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ 'network_interface_impl',
+ ],
+)
+
env.Library('network_interface_mock',
[
'network_interface_mock.cpp',
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index e35f8ed9c52..d230e2eb8a6 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -86,6 +86,11 @@ public:
virtual void shutdown() = 0;
/**
+ * Returns true if shutdown has been called, false otherwise.
+ */
+ virtual bool inShutdown() const = 0;
+
+ /**
* Blocks the current thread (presumably the executor thread) until the network interface
* knows of work for the executor to perform.
*/
@@ -114,10 +119,14 @@ public:
/**
* Starts asynchronous execution of the command described by "request".
+ *
+ * Returns ErrorCodes::ShutdownInProgress if NetworkInterface::shutdown has already started
+ * and Status::OK() otherwise. If it returns Status::OK(), then the onFinish argument will be
+ * executed by NetworkInterface eventually; otherwise, it will not.
*/
- virtual void startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) = 0;
+ virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const RemoteCommandRequest& request,
+ const RemoteCommandCompletionFn& onFinish) = 0;
/**
* Requests cancelation of the network activity associated with "cbHandle" if it has not yet
@@ -133,13 +142,17 @@ public:
/**
* Sets an alarm, which schedules "action" to run no sooner than "when".
*
+ * Returns ErrorCodes::ShutdownInProgress if NetworkInterface::shutdown has already started
+ * and true otherwise. If it returns Status::OK(), then the action will be executed by
+ * NetworkInterface eventually; otherwise, it will not.
+ *
* "action" should not do anything that requires a lot of computation, or that might block for a
* long time, as it may execute in a network thread.
*
* Any callbacks invoked from setAlarm must observe onNetworkThread to
* return true. See that method for why.
*/
- virtual void setAlarm(Date_t when, const stdx::function<void()>& action) = 0;
+ virtual Status setAlarm(Date_t when, const stdx::function<void()>& action) = 0;
/**
* Returns true if called from a thread dedicated to networking. I.e. not a
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp
index cf5d82c98b3..27e971f3eb4 100644
--- a/src/mongo/executor/network_interface_asio.cpp
+++ b/src/mongo/executor/network_interface_asio.cpp
@@ -228,9 +228,9 @@ Date_t NetworkInterfaceASIO::now() {
return Date_t::now();
}
-void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) {
+Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const RemoteCommandRequest& request,
+ const RemoteCommandCompletionFn& onFinish) {
MONGO_ASIO_INVARIANT(onFinish, "Invalid completion function");
{
stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
@@ -239,6 +239,10 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa
MONGO_ASIO_INVARIANT_INLOCK(insertResult.second, "Same CallbackHandle added twice");
}
+ if (inShutdown()) {
+ return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceASIO shutdown in progress"};
+ }
+
LOG(2) << "startCommand: " << request.toString();
auto getConnectionStartTime = now();
@@ -369,6 +373,7 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa
};
_connectionPool.get(request.target, request.timeout, nextStep);
+ return Status::OK();
}
void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
@@ -408,7 +413,11 @@ void NetworkInterfaceASIO::cancelAllCommands() {
}
}
-void NetworkInterfaceASIO::setAlarm(Date_t when, const stdx::function<void()>& action) {
+Status NetworkInterfaceASIO::setAlarm(Date_t when, const stdx::function<void()>& action) {
+ if (inShutdown()) {
+ return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceASIO shutdown in progress"};
+ }
+
// "alarm" must stay alive until it expires, hence the shared_ptr.
auto alarm = std::make_shared<asio::system_timer>(_io_service, when.toSystemTimePoint());
alarm->async_wait([alarm, this, action](std::error_code ec) {
@@ -420,6 +429,8 @@ void NetworkInterfaceASIO::setAlarm(Date_t when, const stdx::function<void()>& a
warning() << "setAlarm() received an error: " << ec.message();
}
});
+
+ return Status::OK();
};
bool NetworkInterfaceASIO::inShutdown() const {
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h
index a38e778470a..62e441d7d92 100644
--- a/src/mongo/executor/network_interface_asio.h
+++ b/src/mongo/executor/network_interface_asio.h
@@ -130,21 +130,20 @@ public:
std::string getHostName() override;
void startup() override;
void shutdown() override;
+ bool inShutdown() const override;
void waitForWork() override;
void waitForWorkUntil(Date_t when) override;
void signalWorkAvailable() override;
Date_t now() override;
- void startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) override;
+ Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const RemoteCommandRequest& request,
+ const RemoteCommandCompletionFn& onFinish) override;
void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override;
void cancelAllCommands() override;
- void setAlarm(Date_t when, const stdx::function<void()>& action) override;
+ Status setAlarm(Date_t when, const stdx::function<void()>& action) override;
bool onNetworkThread() override;
- bool inShutdown() const;
-
private:
using ResponseStatus = TaskExecutor::ResponseStatus;
using NetworkInterface::RemoteCommandCompletionFn;
diff --git a/src/mongo/executor/network_interface_asio_test.cpp b/src/mongo/executor/network_interface_asio_test.cpp
index 2e95e0383ef..7462070bad9 100644
--- a/src/mongo/executor/network_interface_asio_test.cpp
+++ b/src/mongo/executor/network_interface_asio_test.cpp
@@ -104,11 +104,12 @@ public:
Deferred<StatusWith<RemoteCommandResponse>> startCommand(
const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request) {
Deferred<StatusWith<RemoteCommandResponse>> deferredResponse;
- net().startCommand(cbHandle,
- request,
- [deferredResponse](StatusWith<RemoteCommandResponse> response) mutable {
- deferredResponse.emplace(std::move(response));
- });
+ ASSERT_OK(net().startCommand(
+ cbHandle,
+ request,
+ [deferredResponse](StatusWith<RemoteCommandResponse> response) mutable {
+ deferredResponse.emplace(std::move(response));
+ }));
return deferredResponse;
}
@@ -422,6 +423,19 @@ TEST_F(NetworkInterfaceASIOTest, StartCommand) {
assertNumOps(0u, 0u, 0u, 1u);
}
+TEST_F(NetworkInterfaceASIOTest, InShutdown) {
+ ASSERT_FALSE(net().inShutdown());
+ net().shutdown();
+ ASSERT(net().inShutdown());
+}
+
+TEST_F(NetworkInterfaceASIOTest, StartCommandReturnsNotOKIfShutdownHasStarted) {
+ net().shutdown();
+ ASSERT_NOT_OK(net().startCommand(makeCallbackHandle(),
+ RemoteCommandRequest{},
+ [&](StatusWith<RemoteCommandResponse> resp) {}));
+}
+
class MalformedMessageTest : public NetworkInterfaceASIOTest {
public:
using MessageHook = stdx::function<void(MsgData::View)>;
@@ -779,13 +793,13 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, HandleReplyReturnsError) {
assertNumOps(0u, 0u, 1u, 0u);
}
-TEST_F(NetworkInterfaceASIOTest, setAlarm) {
+TEST_F(NetworkInterfaceASIOTest, SetAlarm) {
// set a first alarm, to execute after "expiration"
Date_t expiration = net().now() + Milliseconds(100);
Deferred<Date_t> deferred;
- net().setAlarm(expiration,
- [this, expiration, deferred]() mutable { deferred.emplace(net().now()); });
+ ASSERT_OK(net().setAlarm(
+ expiration, [this, expiration, deferred]() mutable { deferred.emplace(net().now()); }));
// Get our timer factory
auto& factory = timerFactory();
@@ -799,12 +813,17 @@ TEST_F(NetworkInterfaceASIOTest, setAlarm) {
expiration = net().now() + Milliseconds(99999999);
Deferred<bool> deferred2;
- net().setAlarm(expiration, [this, deferred2]() mutable { deferred2.emplace(true); });
+ ASSERT_OK(net().setAlarm(expiration, [this, deferred2]() mutable { deferred2.emplace(true); }));
net().shutdown();
ASSERT(!deferred2.hasCompleted());
}
+TEST_F(NetworkInterfaceASIOTest, SetAlarmReturnsNotOKIfShutdownHasStarted) {
+ net().shutdown();
+ ASSERT_NOT_OK(net().setAlarm(net().now() + Milliseconds(100), [] {}));
+}
+
class NetworkInterfaceASIOMetadataTest : public NetworkInterfaceASIOTest {
protected:
void setUp() override {}
diff --git a/src/mongo/executor/network_interface_impl.cpp b/src/mongo/executor/network_interface_impl.cpp
index 38037d056c0..3a025332175 100644
--- a/src/mongo/executor/network_interface_impl.cpp
+++ b/src/mongo/executor/network_interface_impl.cpp
@@ -69,6 +69,7 @@ NetworkInterfaceImpl::NetworkInterfaceImpl() : NetworkInterfaceImpl(nullptr){};
NetworkInterfaceImpl::NetworkInterfaceImpl(std::unique_ptr<NetworkConnectionHook> hook)
: NetworkInterface(),
_pool(makeOptions()),
+ _inShutdown(false),
_commandRunner(kMessagingPortKeepOpen, std::move(hook)) {}
NetworkInterfaceImpl::~NetworkInterfaceImpl() {}
@@ -79,7 +80,7 @@ std::string NetworkInterfaceImpl::getDiagnosticString() {
str::stream output;
output << "NetworkImpl";
output << " threads:" << poolStats.numThreads;
- output << " inShutdown:" << _inShutdown;
+ output << " inShutdown:" << inShutdown();
output << " active:" << _numActiveNetworkRequests;
output << " pending:" << _pending.size();
output << " execRunable:" << _isExecutorRunnable;
@@ -89,9 +90,7 @@ std::string NetworkInterfaceImpl::getDiagnosticString() {
void NetworkInterfaceImpl::appendConnectionStats(ConnectionPoolStats* stats) const {}
void NetworkInterfaceImpl::startup() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(!_inShutdown);
- lk.unlock();
+ invariant(!inShutdown());
_commandRunner.startup();
_pool.startup();
@@ -100,7 +99,7 @@ void NetworkInterfaceImpl::startup() {
void NetworkInterfaceImpl::shutdown() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _inShutdown = true;
+ _inShutdown.store(true);
_hasPending.notify_all();
_newAlarmReady.notify_all();
lk.unlock();
@@ -110,6 +109,10 @@ void NetworkInterfaceImpl::shutdown() {
_commandRunner.shutdown();
}
+bool NetworkInterfaceImpl::inShutdown() const {
+ return _inShutdown.load();
+}
+
void NetworkInterfaceImpl::signalWorkAvailable() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_signalWorkAvailable_inlock();
@@ -161,17 +164,25 @@ void NetworkInterfaceImpl::_runOneCommand() {
_signalWorkAvailable_inlock();
}
-void NetworkInterfaceImpl::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) {
+Status NetworkInterfaceImpl::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const RemoteCommandRequest& request,
+ const RemoteCommandCompletionFn& onFinish) {
+ if (inShutdown()) {
+ return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceImpl shutdown in progress"};
+ }
+
LOG(2) << "Scheduling " << request.cmdObj.firstElementFieldName() << " to " << request.target;
+
stdx::lock_guard<stdx::mutex> lk(_mutex);
+
_pending.push_back(CommandData());
CommandData& cd = _pending.back();
cd.cbHandle = cbHandle;
cd.request = request;
cd.onFinish = onFinish;
fassert(28730, _pool.schedule([this]() -> void { _runOneCommand(); }));
+
+ return Status::OK();
}
void NetworkInterfaceImpl::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
@@ -203,13 +214,20 @@ std::string NetworkInterfaceImpl::getHostName() {
return getHostNameCached();
}
-void NetworkInterfaceImpl::setAlarm(Date_t when, const stdx::function<void()>& action) {
+Status NetworkInterfaceImpl::setAlarm(Date_t when, const stdx::function<void()>& action) {
+ if (inShutdown()) {
+ return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceImpl shutdown in progress"};
+ }
+
stdx::unique_lock<stdx::mutex> lk(_mutex);
+
const bool notify = _alarms.empty() || _alarms.top().when > when;
_alarms.emplace(when, action);
if (notify) {
_newAlarmReady.notify_all();
}
+
+ return Status::OK();
}
bool NetworkInterfaceImpl::onNetworkThread() {
@@ -218,7 +236,7 @@ bool NetworkInterfaceImpl::onNetworkThread() {
void NetworkInterfaceImpl::_processAlarms() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (!_inShutdown) {
+ while (!inShutdown()) {
if (_alarms.empty()) {
_newAlarmReady.wait(lk);
} else if (now() < _alarms.top().when) {
diff --git a/src/mongo/executor/network_interface_impl.h b/src/mongo/executor/network_interface_impl.h
index 5915d2d99d6..55c25fe8e57 100644
--- a/src/mongo/executor/network_interface_impl.h
+++ b/src/mongo/executor/network_interface_impl.h
@@ -81,20 +81,21 @@ public:
void appendConnectionStats(ConnectionPoolStats* stats) const override;
void startup() override;
void shutdown() override;
+ bool inShutdown() const override;
void waitForWork() override;
void waitForWorkUntil(Date_t when) override;
void signalWorkAvailable() override;
Date_t now() override;
std::string getHostName() override;
- void startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) override;
+ Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const RemoteCommandRequest& request,
+ const RemoteCommandCompletionFn& onFinish) override;
void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override;
/**
* Not implemented.
*/
void cancelAllCommands() override {}
- void setAlarm(Date_t when, const stdx::function<void()>& action) override;
+ Status setAlarm(Date_t when, const stdx::function<void()>& action) override;
bool onNetworkThread() override;
private:
@@ -159,7 +160,7 @@ private:
bool _isExecutorRunnable = false;
// Flag indicating when this interface is being shut down (because shutdown() has executed).
- bool _inShutdown = false;
+ std::atomic<bool> _inShutdown;
// Interface for running remote commands
RemoteCommandRunnerImpl _commandRunner;
diff --git a/src/mongo/executor/network_interface_impl_test.cpp b/src/mongo/executor/network_interface_impl_test.cpp
new file mode 100644
index 00000000000..019aa929fdb
--- /dev/null
+++ b/src/mongo/executor/network_interface_impl_test.cpp
@@ -0,0 +1,76 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/executor/network_interface_impl.h"
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/wire_version.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/stdx/memory.h"
+
+namespace mongo {
+namespace executor {
+namespace {
+
+class NetworkInterfaceImplTest : public mongo::unittest::Test {
+public:
+ void setUp() override {
+ _net = stdx::make_unique<NetworkInterfaceImpl>();
+ _net->startup();
+ }
+
+ NetworkInterfaceImpl& net() {
+ return *_net;
+ }
+
+private:
+ std::unique_ptr<NetworkInterfaceImpl> _net;
+};
+
+TEST_F(NetworkInterfaceImplTest, InShutdown) {
+ ASSERT_FALSE(net().inShutdown());
+ net().shutdown();
+ ASSERT(net().inShutdown());
+}
+
+TEST_F(NetworkInterfaceImplTest, StartCommandReturnsNotOKIfShutdownHasStarted) {
+ TaskExecutor::CallbackHandle cb{};
+ net().shutdown();
+ ASSERT_NOT_OK(net().startCommand(
+ cb, RemoteCommandRequest{}, [&](StatusWith<RemoteCommandResponse> resp) {}));
+}
+
+TEST_F(NetworkInterfaceImplTest, SetAlarmReturnsNotOKIfShutdownHasStarted) {
+ net().shutdown();
+ ASSERT_NOT_OK(net().setAlarm(net().now() + Milliseconds(100), []() {}));
+}
+
+} // namespace
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 34b6a2d4fc6..0e5f09c877e 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -54,7 +54,7 @@ NetworkInterfaceMock::NetworkInterfaceMock()
NetworkInterfaceMock::~NetworkInterfaceMock() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(!_hasStarted || _inShutdown);
+ invariant(!_hasStarted || inShutdown());
invariant(_scheduled.empty());
invariant(_blackHoled.empty());
}
@@ -75,11 +75,15 @@ std::string NetworkInterfaceMock::getHostName() {
return "thisisourhostname";
}
-void NetworkInterfaceMock::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) {
+Status NetworkInterfaceMock::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const RemoteCommandRequest& request,
+ const RemoteCommandCompletionFn& onFinish) {
+ if (inShutdown()) {
+ return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
+ }
+
stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(!_inShutdown);
+
const Date_t now = _now_inlock();
auto op = NetworkOperation(cbHandle, request, now, onFinish);
@@ -89,6 +93,8 @@ void NetworkInterfaceMock::startCommand(const TaskExecutor::CallbackHandle& cbHa
} else {
_connectThenEnqueueOperation_inlock(request.target, std::move(op));
}
+
+ return Status::OK();
}
void NetworkInterfaceMock::setHandshakeReplyForHost(
@@ -121,8 +127,9 @@ static bool findAndCancelIf(
}
void NetworkInterfaceMock::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
+ invariant(!inShutdown());
+
stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(!_inShutdown);
stdx::function<bool(const NetworkOperation&)> matchesHandle =
stdx::bind(&NetworkOperation::isForCallback, stdx::placeholders::_1, cbHandle);
const Date_t now = _now_inlock();
@@ -138,14 +145,21 @@ void NetworkInterfaceMock::cancelCommand(const TaskExecutor::CallbackHandle& cbH
// No not-in-progress network command matched cbHandle. Oh, well.
}
-void NetworkInterfaceMock::setAlarm(const Date_t when, const stdx::function<void()>& action) {
+Status NetworkInterfaceMock::setAlarm(const Date_t when, const stdx::function<void()>& action) {
+ if (inShutdown()) {
+ return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
+ }
+
stdx::unique_lock<stdx::mutex> lk(_mutex);
+
if (when <= _now_inlock()) {
lk.unlock();
action();
- return;
+ return Status::OK();
}
_alarms.emplace(when, action);
+
+ return Status::OK();
}
bool NetworkInterfaceMock::onNetworkThread() {
@@ -156,16 +170,17 @@ void NetworkInterfaceMock::startup() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(!_hasStarted);
_hasStarted = true;
- _inShutdown = false;
+ _inShutdown.store(false);
invariant(_currentlyRunning == kNoThread);
_currentlyRunning = kExecutorThread;
}
void NetworkInterfaceMock::shutdown() {
+ invariant(!inShutdown());
+
stdx::unique_lock<stdx::mutex> lk(_mutex);
invariant(_hasStarted);
- invariant(!_inShutdown);
- _inShutdown = true;
+ _inShutdown.store(true);
NetworkOperationList todo;
todo.splice(todo.end(), _scheduled);
todo.splice(todo.end(), _unscheduled);
@@ -188,6 +203,10 @@ void NetworkInterfaceMock::shutdown() {
_shouldWakeNetworkCondition.notify_one();
}
+bool NetworkInterfaceMock::inShutdown() const {
+ return _inShutdown.load();
+}
+
void NetworkInterfaceMock::enterNetwork() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
while (!_isNetworkThreadRunnable_inlock()) {
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 439b33e5caf..b785ece5a97 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -86,21 +86,22 @@ public:
virtual void startup();
virtual void shutdown();
+ virtual bool inShutdown() const;
virtual void waitForWork();
virtual void waitForWorkUntil(Date_t when);
virtual void setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook);
virtual void signalWorkAvailable();
virtual Date_t now();
virtual std::string getHostName();
- virtual void startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish);
+ virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const RemoteCommandRequest& request,
+ const RemoteCommandCompletionFn& onFinish);
virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle);
/**
* Not implemented.
*/
void cancelAllCommands() override {}
- virtual void setAlarm(Date_t when, const stdx::function<void()>& action);
+ virtual Status setAlarm(Date_t when, const stdx::function<void()>& action);
virtual bool onNetworkThread();
@@ -290,7 +291,7 @@ private:
bool _hasStarted; // (M)
// Set to true by "shutDown()".
- bool _inShutdown; // (M)
+ std::atomic<bool> _inShutdown; // (M)
// Next date that the executor expects to wake up at (due to a scheduleWorkAt() call).
Date_t _executorNextWakeupDate; // (M)
diff --git a/src/mongo/executor/network_interface_mock_test.cpp b/src/mongo/executor/network_interface_mock_test.cpp
index 283ec71fe59..f8166db2c2e 100644
--- a/src/mongo/executor/network_interface_mock_test.cpp
+++ b/src/mongo/executor/network_interface_mock_test.cpp
@@ -74,7 +74,9 @@ public:
// Wake up sleeping executor threads so they clean up.
net().signalWorkAvailable();
executor().join();
- net().shutdown();
+ if (!net().inShutdown()) {
+ net().shutdown();
+ }
}
private:
@@ -152,15 +154,16 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHook) {
BSONObj(),
Milliseconds(0)};
- net().startCommand(cb,
- actualCommandExpected,
- [&](StatusWith<RemoteCommandResponse> resp) {
- commandFinished = true;
- if (resp.isOK()) {
- gotCorrectCommandReply = (actualResponseExpected.toString() ==
- resp.getValue().toString());
- }
- });
+ ASSERT_OK(net().startCommand(cb,
+ actualCommandExpected,
+ [&](StatusWith<RemoteCommandResponse> resp) {
+ commandFinished = true;
+ if (resp.isOK()) {
+ gotCorrectCommandReply =
+ (actualResponseExpected.toString() ==
+ resp.getValue().toString());
+ }
+ }));
// At this point validate and makeRequest should have been called.
ASSERT(validateCalled);
@@ -223,14 +226,14 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookFailedValidation) {
bool commandFinished = false;
bool statusPropagated = false;
- net().startCommand(cb,
- RemoteCommandRequest{},
- [&](StatusWith<RemoteCommandResponse> resp) {
- commandFinished = true;
+ ASSERT_OK(net().startCommand(cb,
+ RemoteCommandRequest{},
+ [&](StatusWith<RemoteCommandResponse> resp) {
+ commandFinished = true;
- statusPropagated = resp.getStatus().code() ==
- ErrorCodes::ConflictingOperationInProgress;
- });
+ statusPropagated = resp.getStatus().code() ==
+ ErrorCodes::ConflictingOperationInProgress;
+ }));
{
net().enterNetwork();
@@ -263,9 +266,10 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookNoRequest) {
bool commandFinished = false;
- net().startCommand(cb,
- RemoteCommandRequest{},
- [&](StatusWith<RemoteCommandResponse> resp) { commandFinished = true; });
+ ASSERT_OK(net().startCommand(
+ cb,
+ RemoteCommandRequest{},
+ [&](StatusWith<RemoteCommandResponse> resp) { commandFinished = true; }));
{
net().enterNetwork();
@@ -298,13 +302,13 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookMakeRequestFails) {
bool commandFinished = false;
bool errorPropagated = false;
- net().startCommand(cb,
- RemoteCommandRequest{},
- [&](StatusWith<RemoteCommandResponse> resp) {
- commandFinished = true;
- errorPropagated =
- resp.getStatus().code() == ErrorCodes::InvalidSyncSource;
- });
+ ASSERT_OK(net().startCommand(cb,
+ RemoteCommandRequest{},
+ [&](StatusWith<RemoteCommandResponse> resp) {
+ commandFinished = true;
+ errorPropagated =
+ resp.getStatus().code() == ErrorCodes::InvalidSyncSource;
+ }));
{
net().enterNetwork();
@@ -336,13 +340,13 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookHandleReplyFails) {
bool commandFinished = false;
bool errorPropagated = false;
- net().startCommand(cb,
- RemoteCommandRequest{},
- [&](StatusWith<RemoteCommandResponse> resp) {
- commandFinished = true;
- errorPropagated =
- resp.getStatus().code() == ErrorCodes::CappedPositionLost;
- });
+ ASSERT_OK(net().startCommand(cb,
+ RemoteCommandRequest{},
+ [&](StatusWith<RemoteCommandResponse> resp) {
+ commandFinished = true;
+ errorPropagated =
+ resp.getStatus().code() == ErrorCodes::CappedPositionLost;
+ }));
ASSERT(!handleReplyCalled);
@@ -360,6 +364,28 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookHandleReplyFails) {
ASSERT(errorPropagated);
}
+TEST_F(NetworkInterfaceMockTest, InShutdown) {
+ startNetwork();
+ ASSERT_FALSE(net().inShutdown());
+ net().shutdown();
+ ASSERT(net().inShutdown());
+}
+
+TEST_F(NetworkInterfaceMockTest, StartCommandReturnsNotOKIfShutdownHasStarted) {
+ startNetwork();
+ net().shutdown();
+
+ TaskExecutor::CallbackHandle cb{};
+ ASSERT_NOT_OK(net().startCommand(
+ cb, RemoteCommandRequest{}, [](StatusWith<RemoteCommandResponse> resp) {}));
+}
+
+TEST_F(NetworkInterfaceMockTest, SetAlarmReturnsNotOKIfShutdownHasStarted) {
+ startNetwork();
+ net().shutdown();
+ ASSERT_NOT_OK(net().setAlarm(net().now() + Milliseconds(100), [] {}));
+}
+
} // namespace
} // namespace executor
} // namespace mongo