summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2020-01-17 14:13:34 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-26 20:10:42 +0000
commit57f6385025adf630a410f9d658f61b5afd140121 (patch)
tree16cf2b9a393cf513526fd2ebf9c8a9c8eddd165b /src
parent416e276e3c1f5b814a73efeb287256a747653ba1 (diff)
downloadmongo-57f6385025adf630a410f9d658f61b5afd140121.tar.gz
SERVER-45117 Guard NetworkInterfaceTL::setAlarm() more aggressively
The underlying issue is that NetworkInterfaceTL::_state and NetworkInterfaceTL::_inProgressMutex do not synchronize with each other. This is at best a temporary fix. Reasoning about alarm ordering in the context of the NetworkInterface itself is messy. This approximates what a proper composible AlarmScheudler type looks like. Hopefully, there will be more consistent work under SERVER-46419.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/executor/network_interface_tl.cpp59
-rw-r--r--src/mongo/executor/network_interface_tl.h5
2 files changed, 53 insertions, 11 deletions
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 0d070a6c8e7..c78b1602528 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -90,6 +90,11 @@ private:
Counters _data;
};
+namespace {
+const Status kNetworkInterfaceShutdownInProgress = {ErrorCodes::ShutdownInProgress,
+ "NetworkInterface shutdown in progress"};
+}
+
NetworkInterfaceTL::NetworkInterfaceTL(std::string instanceName,
ConnectionPool::Options connPoolOpts,
ServiceContext* svcCtx,
@@ -183,7 +188,7 @@ void NetworkInterfaceTL::shutdown() {
// Stop the reactor/thread first so that nothing runs on a partially dtor'd pool.
_reactor->stop();
- _cancelAllAlarms();
+ _shutdownAllAlarms();
_ioThread.join();
}
@@ -386,7 +391,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
RemoteCommandCompletionFn&& onFinish,
const BatonHandle& baton) {
if (inShutdown()) {
- return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
+ return kNetworkInterfaceShutdownInProgress;
}
LOGV2_DEBUG(22596,
@@ -866,7 +871,7 @@ void NetworkInterfaceTL::_killOperation(std::shared_ptr<RequestState> requestSta
Status NetworkInterfaceTL::schedule(unique_function<void(Status)> action) {
if (inShutdown()) {
- return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
+ return kNetworkInterfaceShutdownInProgress;
}
_reactor->schedule([action = std::move(action)](auto status) { action(status); });
@@ -877,7 +882,8 @@ Status NetworkInterfaceTL::setAlarm(const TaskExecutor::CallbackHandle& cbHandle
Date_t when,
unique_function<void(Status)> action) {
if (inShutdown()) {
- return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
+ // Pessimistically check if we're in shutdown and save some work
+ return kNetworkInterfaceShutdownInProgress;
}
if (when <= now()) {
@@ -891,19 +897,31 @@ Status NetworkInterfaceTL::setAlarm(const TaskExecutor::CallbackHandle& cbHandle
auto alarmState =
std::make_shared<AlarmState>(when, cbHandle, _reactor->makeTimer(), std::move(pf.promise));
+ auto weakAlarmState = std::weak_ptr(alarmState);
+
{
stdx::lock_guard<Latch> lk(_inProgressMutex);
+ if (_inProgressAlarmsInShutdown) {
+ // Check that we've won any possible race with _shutdownAllAlarms();
+ return kNetworkInterfaceShutdownInProgress;
+ }
+
// If a user has already scheduled an alarm with a handle, make sure they intentionally
// override it by canceling and setting a new one.
- auto alarmPair = std::make_pair(cbHandle, std::shared_ptr<AlarmState>(alarmState));
- auto&& [_, wasInserted] = _inProgressAlarms.insert(std::move(alarmPair));
+ auto&& [_, wasInserted] = _inProgressAlarms.emplace(cbHandle, alarmState);
invariant(wasInserted);
}
alarmState->timer->waitUntil(alarmState->when, nullptr)
- .getAsync([this, state = std::move(alarmState)](Status status) mutable {
- _answerAlarm(status, state);
+ .getAsync([this, weakAlarmState](Status status) mutable {
+ auto state = weakAlarmState.lock();
+ if (!state) {
+ LOGV2_DEBUG(4511701, 4, "AlarmState destroyed before timer callback finished");
+ return;
+ }
+
+ _answerAlarm(status, std::move(state));
});
return Status::OK();
@@ -924,17 +942,29 @@ void NetworkInterfaceTL::cancelAlarm(const TaskExecutor::CallbackHandle& cbHandl
lk.unlock();
+ if (alarmState->done.swap(true)) {
+ return;
+ }
+
alarmState->timer->cancel();
alarmState->promise.setError(Status(ErrorCodes::CallbackCanceled, "Alarm cancelled"));
}
-void NetworkInterfaceTL::_cancelAllAlarms() {
+void NetworkInterfaceTL::_shutdownAllAlarms() {
auto alarms = [&] {
stdx::unique_lock<Latch> lk(_inProgressMutex);
+
+ // Prevent any more alarms from registering
+ _inProgressAlarmsInShutdown = true;
+
return std::exchange(_inProgressAlarms, {});
}();
for (auto&& [cbHandle, state] : alarms) {
+ if (state->done.swap(true)) {
+ continue;
+ }
+
state->timer->cancel();
state->promise.setError(Status(ErrorCodes::CallbackCanceled, "Alarm cancelled"));
}
@@ -944,7 +974,12 @@ void NetworkInterfaceTL::_answerAlarm(Status status, std::shared_ptr<AlarmState>
// Since the lock is released before canceling the timer, this thread can win the race with
// cancelAlarm(). Thus if status is CallbackCanceled, then this alarm is already removed from
// _inProgressAlarms.
- if (status == ErrorCodes::CallbackCanceled) {
+ if (ErrorCodes::isCancelationError(status)) {
+ return;
+ }
+
+ if (inShutdown()) {
+ // No alarms get processed in shutdown
return;
}
@@ -976,6 +1011,10 @@ void NetworkInterfaceTL::_answerAlarm(Status status, std::shared_ptr<AlarmState>
_inProgressAlarms.erase(iter);
}
+ if (state->done.swap(true)) {
+ return;
+ }
+
// A not OK status here means the timer experienced a system error.
// It is not reasonable to complete the promise on a reactor thread because there is likely no
// properly functioning reactor.
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index d89489b1b65..bdc73701dc4 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -269,10 +269,11 @@ private:
Date_t when;
std::unique_ptr<transport::ReactorTimer> timer;
+ AtomicWord<bool> done;
Promise<void> promise;
};
- void _cancelAllAlarms();
+ void _shutdownAllAlarms();
void _answerAlarm(Status status, std::shared_ptr<AlarmState> state);
void _run();
@@ -310,6 +311,8 @@ private:
Mutex _inProgressMutex =
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "NetworkInterfaceTL::_inProgressMutex");
stdx::unordered_map<TaskExecutor::CallbackHandle, std::weak_ptr<CommandStateBase>> _inProgress;
+
+ bool _inProgressAlarmsInShutdown = false;
stdx::unordered_map<TaskExecutor::CallbackHandle, std::shared_ptr<AlarmState>>
_inProgressAlarms;