summaryrefslogtreecommitdiff
path: root/src/mongo/transport
diff options
context:
space:
mode:
authorHenrik Edin <henrik.edin@mongodb.com>2017-09-25 14:24:24 -0400
committerHenrik Edin <henrik.edin@mongodb.com>2017-10-04 09:08:02 -0400
commita73cff087d5142c378dd05e6fad3f8f6825c5401 (patch)
tree955ac8bede1fe71d105403d5023d10feecd36ecd /src/mongo/transport
parent939f2bcda18db41b774bbe3cf16e1d4928c58e5d (diff)
downloadmongo-a73cff087d5142c378dd05e6fad3f8f6825c5401.tar.gz
SERVER-31265 Mark new sessions as pending so terminateIfTagsDontMatch don't terminate sessions that haven't had their tags set yet. Clear this flag when we get the first set of tags.
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--src/mongo/transport/service_entry_point_impl.cpp10
-rw-r--r--src/mongo/transport/service_state_machine.cpp15
-rw-r--r--src/mongo/transport/service_state_machine.h9
-rw-r--r--src/mongo/transport/session.cpp8
-rw-r--r--src/mongo/transport/session.h4
-rw-r--r--src/mongo/transport/transport_layer_legacy.cpp1
-rw-r--r--src/mongo/transport/transport_layer_legacy.h2
7 files changed, 37 insertions, 12 deletions
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index 1716e69e18e..df02f234f11 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -146,8 +146,13 @@ void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
using logger::LogComponent;
- // Request that all sessions end.
- endAllSessions(transport::Session::kEmptyTagMask);
+ stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex);
+
+ // Request that all sessions end, while holding the _sesionsMutex, loop over all the current
+ // connections and terminate them
+ for (auto& ssm : _sessions) {
+ ssm->terminate();
+ }
// Close all sockets and then wait for the number of active connections to reach zero with a
// condition_variable that notifies in the session cleanup hook. If we haven't closed drained
@@ -157,7 +162,6 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
const auto checkInterval = std::min(Milliseconds(250), timeout);
auto noWorkersLeft = [this] { return numOpenSessions() == 0; };
- stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex);
while (timeSpent < timeout &&
!_shutdownCondition.wait_for(lk, checkInterval.toSystemDuration(), noWorkersLeft)) {
log(LogComponent::kNetwork) << "shutdown: still waiting on " << numOpenSessions()
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 35694c390ba..47764de2444 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -429,11 +429,22 @@ void ServiceStateMachine::scheduleNext(ServiceExecutor::ScheduleFlags flags) {
_scheduleFunc([this] { runNext(); }, flags);
}
+void ServiceStateMachine::terminate() {
+ if (state() == State::Ended)
+ return;
+
+ _session()->getTransportLayer()->end(_session());
+}
+
void ServiceStateMachine::terminateIfTagsDontMatch(transport::Session::TagMask tags) {
if (state() == State::Ended)
return;
- if (_session()->getTags() & tags) {
+ auto sessionTags = _session()->getTags();
+
+ // If terminateIfTagsDontMatch gets called when we still are 'pending' where no tags have been
+ // set, then skip the termination check.
+ if ((sessionTags & tags) || (sessionTags & transport::Session::kPending)) {
log() << "Skip closing connection for connection # " << _session()->id();
return;
}
@@ -453,7 +464,7 @@ ServiceStateMachine::State ServiceStateMachine::state() {
void ServiceStateMachine::_terminateAndLogIfError(Status status) {
if (!status.isOK()) {
warning(logger::LogComponent::kExecutor) << "Terminating session due to error: " << status;
- terminateIfTagsDontMatch(transport::Session::kEmptyTagMask);
+ terminate();
}
}
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index b89b3a34d24..35812b86940 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -119,7 +119,16 @@ public:
State state();
/*
+ * Terminates the associated transport Session, regardless of tags.
+ *
+ * This will not block on the session terminating cleaning itself up, it returns immediately.
+ */
+ void terminate();
+
+ /*
* Terminates the associated transport Session if its tags don't match the supplied tags.
+ * If the session is in a pending state, before any tags have been set, it will not be
+ * terminated.
*
* This will not block on the session terminating cleaning itself up, it returns immediately.
*/
diff --git a/src/mongo/transport/session.cpp b/src/mongo/transport/session.cpp
index 3e4c7a7fc22..5cf18eeb035 100644
--- a/src/mongo/transport/session.cpp
+++ b/src/mongo/transport/session.cpp
@@ -43,7 +43,7 @@ AtomicUInt64 sessionIdCounter(0);
} // namespace
-Session::Session() : _id(sessionIdCounter.addAndFetch(1)), _tags(kExternalClientKeepOpen) {}
+Session::Session() : _id(sessionIdCounter.addAndFetch(1)), _tags(kPending) {}
Ticket Session::sourceMessage(Message* message, Date_t expiration) {
return getTransportLayer()->sourceMessage(shared_from_this(), message, expiration);
@@ -54,11 +54,13 @@ Ticket Session::sinkMessage(const Message& message, Date_t expiration) {
}
void Session::replaceTags(TagMask tags) {
- _tags = tags;
+ // Don't allow explicit assignment of the pending tag, it's only used to describe a new session
+ // until the tags gets assigned.
+ _tags.store(tags & ~kPending);
}
Session::TagMask Session::getTags() const {
- return _tags;
+ return _tags.load();
}
} // namespace transport
diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h
index 2cfae3815f2..c5844ade1b4 100644
--- a/src/mongo/transport/session.h
+++ b/src/mongo/transport/session.h
@@ -31,6 +31,7 @@
#include <memory>
#include "mongo/base/disallow_copying.h"
+#include "mongo/platform/atomic_word.h"
#include "mongo/transport/session_id.h"
#include "mongo/transport/ticket.h"
#include "mongo/util/decorable.h"
@@ -72,6 +73,7 @@ public:
static constexpr TagMask kInternalClient = 2;
static constexpr TagMask kLatestVersionInternalClientKeepOpen = 4;
static constexpr TagMask kExternalClientKeepOpen = 8;
+ static constexpr TagMask kPending = 1 << 31;
/**
* Destroys a session, calling end() for this session in its TransportLayer.
@@ -136,7 +138,7 @@ protected:
private:
const Id _id;
- TagMask _tags;
+ AtomicWord<TagMask> _tags;
};
} // namespace transport
diff --git a/src/mongo/transport/transport_layer_legacy.cpp b/src/mongo/transport/transport_layer_legacy.cpp
index 06e8c00969b..a974313b721 100644
--- a/src/mongo/transport/transport_layer_legacy.cpp
+++ b/src/mongo/transport/transport_layer_legacy.cpp
@@ -83,7 +83,6 @@ TransportLayerLegacy::LegacySession::LegacySession(std::unique_ptr<AbstractMessa
: _remote(amp->remoteAddr()),
_local(amp->localAddr()),
_tl(tl),
- _tags(kEmptyTagMask),
_connection(stdx::make_unique<Connection>(std::move(amp))) {}
TransportLayerLegacy::LegacySession::~LegacySession() {
diff --git a/src/mongo/transport/transport_layer_legacy.h b/src/mongo/transport/transport_layer_legacy.h
index 796d47de1f4..47a97b6050d 100644
--- a/src/mongo/transport/transport_layer_legacy.h
+++ b/src/mongo/transport/transport_layer_legacy.h
@@ -157,8 +157,6 @@ private:
TransportLayerLegacy* _tl;
- TagMask _tags;
-
std::unique_ptr<Connection> _connection;
};