summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2020-01-17 14:13:34 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-04-28 17:48:48 +0000
commit00b2ba7b26021582bf5d26adb2223568d3362fe9 (patch)
tree23cfb75fb35d1b8d95ce5cf07d7ff5382a5d1879
parentfe855c5d116316c50905cf5900c23801fd0c6f68 (diff)
downloadmongo-00b2ba7b26021582bf5d26adb2223568d3362fe9.tar.gz
SERVER-45117 SERVER-47351 SERVER-47507 Guard NetworkInterfaceTL shutdown for alarms and commands
These three tickets were originally committed separately to v4.4 and master. However, they build upon each other to throughly shutdown our NetworkInterfaces upon shutdown.
-rw-r--r--src/mongo/executor/network_interface_tl.cpp98
-rw-r--r--src/mongo/executor/network_interface_tl.h6
2 files changed, 92 insertions, 12 deletions
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 1dfbf7b3300..e8cd15f3e60 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -44,6 +44,11 @@
namespace mongo {
namespace executor {
+namespace {
+const Status kNetworkInterfaceShutdownInProgress = {ErrorCodes::ShutdownInProgress,
+ "NetworkInterface shutdown in progress"};
+}
+
NetworkInterfaceTL::NetworkInterfaceTL(std::string instanceName,
ConnectionPool::Options connPoolOpts,
ServiceContext* svcCtx,
@@ -77,6 +82,17 @@ NetworkInterfaceTL::NetworkInterfaceTL(std::string instanceName,
std::move(typeFactory), std::string("NetworkInterfaceTL-") + _instanceName, _connPoolOpts);
}
+NetworkInterfaceTL::~NetworkInterfaceTL() {
+ if (!inShutdown()) {
+ shutdown();
+ }
+
+ // Because we quick exit on shutdown, these invariants are usually checked only in ASAN builds
+ // and integration/unit tests.
+ invariant(_inProgress.empty());
+ invariant(_inProgressAlarms.empty());
+}
+
std::string NetworkInterfaceTL::getDiagnosticString() {
return "DEPRECATED: getDiagnosticString is deprecated in NetworkInterfaceTL";
}
@@ -134,10 +150,29 @@ void NetworkInterfaceTL::shutdown() {
LOG(2) << "Shutting down network interface.";
+ // Cancel any remaining commands. Any attempt to register new commands will throw.
+ auto inProgress = [&] {
+ stdx::lock_guard lk(_inProgressMutex);
+ return std::exchange(_inProgress, {});
+ }();
+
+ for (auto& [_, weakCmdState] : inProgress) {
+ auto cmdState = weakCmdState.lock();
+ if (!cmdState) {
+ continue;
+ }
+
+ if (cmdState->done.swap(true)) {
+ continue;
+ }
+
+ cmdState->promise.setError(kNetworkInterfaceShutdownInProgress);
+ }
+
// Stop the reactor/thread first so that nothing runs on a partially dtor'd pool.
_reactor->stop();
- _cancelAllAlarms();
+ _shutdownAllAlarms();
_ioThread.join();
}
@@ -195,6 +230,11 @@ auto NetworkInterfaceTL::CommandState::make(NetworkInterfaceTL* interface,
{
stdx::lock_guard lk(interface->_inProgressMutex);
+ if (interface->inShutdown()) {
+ // If we're in shutdown, we can't add a new command.
+ uassertStatusOK(kNetworkInterfaceShutdownInProgress);
+ }
+
interface->_inProgress.insert({cbHandle, state});
}
@@ -213,9 +253,9 @@ NetworkInterfaceTL::CommandState::~CommandState() {
Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequestOnAny& request,
RemoteCommandCompletionFn&& onFinish,
- const BatonHandle& baton) {
+ const BatonHandle& baton) try {
if (inShutdown()) {
- return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
+ return kNetworkInterfaceShutdownInProgress;
}
LOG(3) << "startCommand: " << redact(request.toString());
@@ -362,6 +402,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
}
return Status::OK();
+} catch (const DBException& ex) {
+ return ex.toStatus();
}
// This is only called from within a then() callback on a future, so throwing is equivalent to
@@ -506,7 +548,7 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
Status NetworkInterfaceTL::schedule(unique_function<void(Status)> action) {
if (inShutdown()) {
- return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
+ return kNetworkInterfaceShutdownInProgress;
}
_reactor->schedule([action = std::move(action)](auto status) { action(status); });
@@ -517,7 +559,8 @@ Status NetworkInterfaceTL::setAlarm(const TaskExecutor::CallbackHandle& cbHandle
Date_t when,
unique_function<void(Status)> action) {
if (inShutdown()) {
- return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
+ // Pessimistically check if we're in shutdown and save some work
+ return kNetworkInterfaceShutdownInProgress;
}
if (when <= now()) {
@@ -531,19 +574,31 @@ Status NetworkInterfaceTL::setAlarm(const TaskExecutor::CallbackHandle& cbHandle
auto alarmState =
std::make_shared<AlarmState>(when, cbHandle, _reactor->makeTimer(), std::move(pf.promise));
+ auto weakAlarmState = std::weak_ptr<AlarmState>(alarmState);
+
{
stdx::lock_guard<Latch> lk(_inProgressMutex);
+ if (_inProgressAlarmsInShutdown) {
+ // Check that we've won any possible race with _shutdownAllAlarms();
+ return kNetworkInterfaceShutdownInProgress;
+ }
+
// If a user has already scheduled an alarm with a handle, make sure they intentionally
// override it by canceling and setting a new one.
- auto alarmPair = std::make_pair(cbHandle, std::shared_ptr<AlarmState>(alarmState));
- auto&& [_, wasInserted] = _inProgressAlarms.insert(std::move(alarmPair));
+ auto&& [_, wasInserted] = _inProgressAlarms.emplace(cbHandle, alarmState);
invariant(wasInserted);
}
alarmState->timer->waitUntil(alarmState->when, nullptr)
- .getAsync([this, state = std::move(alarmState)](Status status) mutable {
- _answerAlarm(status, state);
+ .getAsync([this, weakAlarmState](Status status) mutable {
+ auto state = weakAlarmState.lock();
+ if (!state) {
+ LOG(4) << "AlarmState destroyed before timer callback finished";
+ return;
+ }
+
+ _answerAlarm(status, std::move(state));
});
return Status::OK();
@@ -564,17 +619,29 @@ void NetworkInterfaceTL::cancelAlarm(const TaskExecutor::CallbackHandle& cbHandl
lk.unlock();
+ if (alarmState->done.swap(true)) {
+ return;
+ }
+
alarmState->timer->cancel();
alarmState->promise.setError(Status(ErrorCodes::CallbackCanceled, "Alarm cancelled"));
}
-void NetworkInterfaceTL::_cancelAllAlarms() {
+void NetworkInterfaceTL::_shutdownAllAlarms() {
auto alarms = [&] {
stdx::unique_lock<Latch> lk(_inProgressMutex);
+
+ // Prevent any more alarms from registering
+ _inProgressAlarmsInShutdown = true;
+
return std::exchange(_inProgressAlarms, {});
}();
for (auto&& [cbHandle, state] : alarms) {
+ if (state->done.swap(true)) {
+ continue;
+ }
+
state->timer->cancel();
state->promise.setError(Status(ErrorCodes::CallbackCanceled, "Alarm cancelled"));
}
@@ -584,7 +651,12 @@ void NetworkInterfaceTL::_answerAlarm(Status status, std::shared_ptr<AlarmState>
// Since the lock is released before canceling the timer, this thread can win the race with
// cancelAlarm(). Thus if status is CallbackCanceled, then this alarm is already removed from
// _inProgressAlarms.
- if (status == ErrorCodes::CallbackCanceled) {
+ if (ErrorCodes::isCancelationError(status.code())) {
+ return;
+ }
+
+ if (inShutdown()) {
+ // No alarms get processed in shutdown
return;
}
@@ -613,6 +685,10 @@ void NetworkInterfaceTL::_answerAlarm(Status status, std::shared_ptr<AlarmState>
_inProgressAlarms.erase(iter);
}
+ if (state->done.swap(true)) {
+ return;
+ }
+
// A not OK status here means the timer experienced a system error.
// It is not reasonable to complete the promise on a reactor thread because there is likely no
// properly functioning reactor.
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 80e510a015f..9ea0ea9fc99 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -53,6 +53,7 @@ public:
ServiceContext* ctx,
std::unique_ptr<NetworkConnectionHook> onConnectHook,
std::unique_ptr<rpc::EgressMetadataHook> metadataHook);
+ ~NetworkInterfaceTL();
std::string getDiagnosticString() override;
void appendConnectionStats(ConnectionPoolStats* stats) const override;
@@ -130,10 +131,11 @@ private:
Date_t when;
std::unique_ptr<transport::ReactorTimer> timer;
+ AtomicWord<bool> done;
Promise<void> promise;
};
- void _cancelAllAlarms();
+ void _shutdownAllAlarms();
void _answerAlarm(Status status, std::shared_ptr<AlarmState> state);
void _run();
@@ -170,6 +172,8 @@ private:
Mutex _inProgressMutex =
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "NetworkInterfaceTL::_inProgressMutex");
stdx::unordered_map<TaskExecutor::CallbackHandle, std::weak_ptr<CommandState>> _inProgress;
+
+ bool _inProgressAlarmsInShutdown = false;
stdx::unordered_map<TaskExecutor::CallbackHandle, std::shared_ptr<AlarmState>>
_inProgressAlarms;