summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsamantharitter <samantha.ritter@10gen.com>2016-08-31 17:35:11 -0400
committersamantharitter <samantha.ritter@10gen.com>2016-09-12 17:19:25 -0400
commitd614dda4286e31680f2773f511eeda997ae66e38 (patch)
tree3a8710baf2f1c0caf0d567b91237a6b185fe9e4d
parent710eea36a153c9d3e0cfe820b39bd99137520aea (diff)
downloadmongo-d614dda4286e31680f2773f511eeda997ae66e38.tar.gz
SERVER-25918 Operations on closed sessions should return a TransportSessionClosed status
-rw-r--r--src/mongo/db/service_entry_point_mongod.cpp5
-rw-r--r--src/mongo/s/service_entry_point_mongos.cpp5
-rw-r--r--src/mongo/transport/service_entry_point_test_suite.cpp16
-rw-r--r--src/mongo/transport/service_entry_point_test_suite.h4
-rw-r--r--src/mongo/transport/session.cpp19
-rw-r--r--src/mongo/transport/session.h14
-rw-r--r--src/mongo/transport/transport_layer.cpp3
-rw-r--r--src/mongo/transport/transport_layer.h10
-rw-r--r--src/mongo/transport/transport_layer_legacy.cpp37
-rw-r--r--src/mongo/transport/transport_layer_legacy.h12
-rw-r--r--src/mongo/transport/transport_layer_manager.cpp4
-rw-r--r--src/mongo/transport/transport_layer_manager.h2
-rw-r--r--src/mongo/transport/transport_layer_mock.cpp20
-rw-r--r--src/mongo/transport/transport_layer_mock.h3
-rw-r--r--src/mongo/transport/transport_layer_mock_test.cpp19
15 files changed, 97 insertions, 76 deletions
diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp
index 8135afbec6b..310c939c074 100644
--- a/src/mongo/db/service_entry_point_mongod.cpp
+++ b/src/mongo/db/service_entry_point_mongod.cpp
@@ -117,6 +117,11 @@ void ServiceEntryPointMongod::_sessionLoop(Session* session) {
break;
}
+ // Our session may have been closed internally.
+ if (status == TransportLayer::TicketSessionClosedStatus) {
+ break;
+ }
+
uassertStatusOK(status);
}
diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp
index d7d40e4beee..427152c8adb 100644
--- a/src/mongo/s/service_entry_point_mongos.cpp
+++ b/src/mongo/s/service_entry_point_mongos.cpp
@@ -97,6 +97,11 @@ void ServiceEntryPointMongos::_sessionLoop(Session* session) {
break;
}
+ // Our session may have been closed internally.
+ if (status == TransportLayer::TicketSessionClosedStatus) {
+ break;
+ }
+
uassertStatusOK(status);
}
diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp
index 04dd760db4a..1a07ee31132 100644
--- a/src/mongo/transport/service_entry_point_test_suite.cpp
+++ b/src/mongo/transport/service_entry_point_test_suite.cpp
@@ -91,6 +91,7 @@ void setPingCommand(Message* m) {
// Some default method implementations
const auto kDefaultEnd = [](const Session& session) { return; };
+const auto kDefaultDestroyHook = [](Session& session) { return; };
const auto kDefaultAsyncWait = [](Ticket, TicketCallback cb) { cb(Status::OK()); };
const auto kNoopFunction = [] { return; };
@@ -218,6 +219,7 @@ void ServiceEntryPointTestSuite::MockTLHarness::_resetHooks() {
_wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultWait, this, _1);
_asyncWait = kDefaultAsyncWait;
_end = kDefaultEnd;
+ _destroy_hook = kDefaultDestroyHook;
}
ServiceEntryPointTestSuite::MockTicket* ServiceEntryPointTestSuite::MockTLHarness::getMockTicket(
@@ -225,6 +227,10 @@ ServiceEntryPointTestSuite::MockTicket* ServiceEntryPointTestSuite::MockTLHarnes
return dynamic_cast<ServiceEntryPointTestSuite::MockTicket*>(getTicketImpl(ticket));
}
+void ServiceEntryPointTestSuite::MockTLHarness::_destroy(Session& session) {
+ return _destroy_hook(session);
+}
+
void ServiceEntryPointTestSuite::setUp() {
_tl = stdx::make_unique<MockTLHarness>();
}
@@ -245,7 +251,7 @@ void ServiceEntryPointTestSuite::noLifeCycleTest() {
_tl->_wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitError, _tl.get(), _1);
// Step 3: SEP destroys the session, which calls end()
- _tl->_end = [&testComplete](const Session&) { testComplete.set_value(); };
+ _tl->_destroy_hook = [&testComplete](const Session&) { testComplete.set_value(); };
// Kick off the SEP
Session s(HostAndPort(), HostAndPort(), _tl.get());
@@ -274,7 +280,7 @@ void ServiceEntryPointTestSuite::halfLifeCycleTest() {
};
// Step 5: SEP destroys the session, which calls end()
- _tl->_end = [&testComplete](const Session&) { testComplete.set_value(); };
+ _tl->_destroy_hook = [&testComplete](const Session&) { testComplete.set_value(); };
// Kick off the SEP
Session s(HostAndPort(), HostAndPort(), _tl.get());
@@ -300,7 +306,7 @@ void ServiceEntryPointTestSuite::fullLifeCycleTest() {
// Step 5: SEP gets a ticket to source a Message
// Step 6: SEP calls wait() on the ticket and receives and error
// Step 7: SEP destroys the session, which calls end()
- _tl->_end = [&testComplete](const Session& session) { testComplete.set_value(); };
+ _tl->_destroy_hook = [&testComplete](const Session& session) { testComplete.set_value(); };
// Kick off the SEP
Session s(HostAndPort(), HostAndPort(), _tl.get());
@@ -354,7 +360,7 @@ void ServiceEntryPointTestSuite::interruptingSessionTest() {
// Step 7: SEP calls sourceMessage() for B, gets tB3
// Step 8: SEP calls wait() for tB3, gets an error
// Step 9: SEP calls end(B)
- _tl->_end = [this, idA, idB, &resumeA, &testComplete](const Session& session) {
+ _tl->_destroy_hook = [this, idA, idB, &resumeA, &testComplete](const Session& session) {
// When end(B) is called, time to resume session A
if (session.id() == idB) {
@@ -442,7 +448,7 @@ void ServiceEntryPointTestSuite::burstStressTest(int numSessions,
};
// When we end the last session, end the test.
- _tl->_end = [&allSessionsComplete, numSessions, &ended](const Session& session) {
+ _tl->_destroy_hook = [&allSessionsComplete, numSessions, &ended](const Session& session) {
if (ended.fetchAndAdd(1) == (numSessions - 1)) {
allSessionsComplete.set_value();
}
diff --git a/src/mongo/transport/service_entry_point_test_suite.h b/src/mongo/transport/service_entry_point_test_suite.h
index c9b9c20702e..ba1d60fd027 100644
--- a/src/mongo/transport/service_entry_point_test_suite.h
+++ b/src/mongo/transport/service_entry_point_test_suite.h
@@ -146,6 +146,7 @@ private:
stdx::function<Status(transport::Ticket)> _wait;
stdx::function<void(transport::Ticket, TicketCallback)> _asyncWait;
stdx::function<void(const transport::Session&)> _end;
+ stdx::function<void(transport::Session& session)> _destroy_hook;
stdx::function<void(transport::Session::TagMask tags)> _endAllSessions =
[](transport::Session::TagMask tags) {};
stdx::function<Status(void)> _start = [] { return Status::OK(); };
@@ -162,6 +163,9 @@ private:
// Reset all hooks to their defaults
void _resetHooks();
+
+ private:
+ void _destroy(transport::Session& session) override;
};
std::unique_ptr<MockTLHarness> _tl;
diff --git a/src/mongo/transport/session.cpp b/src/mongo/transport/session.cpp
index 851f348edc0..b32c42609b8 100644
--- a/src/mongo/transport/session.cpp
+++ b/src/mongo/transport/session.cpp
@@ -43,20 +43,16 @@ AtomicUInt64 sessionIdCounter(0);
} // namespace
-const Status Session::ClosedStatus =
- Status(ErrorCodes::TransportSessionClosed, "Session is closed.");
-
Session::Session(HostAndPort remote, HostAndPort local, TransportLayer* tl)
- : _ended(false),
- _id(sessionIdCounter.addAndFetch(1)),
+ : _id(sessionIdCounter.addAndFetch(1)),
_remote(std::move(remote)),
_local(std::move(local)),
_tags(kEmptyTagMask),
_tl(tl) {}
Session::~Session() {
- if (_tl != nullptr && !_ended) {
- _tl->end(*this);
+ if (_tl != nullptr) {
+ _tl->_destroy(*this);
}
}
@@ -65,7 +61,7 @@ Session::Session(Session&& other)
_remote(std::move(other._remote)),
_local(std::move(other._local)),
_tl(other._tl) {
- // We do not want to call tl->end() on moved-from Sessions.
+ // We do not want to call tl->destroy() on moved-from Sessions.
other._tl = nullptr;
}
@@ -100,12 +96,5 @@ SSLPeerInfo Session::getX509PeerInfo() const {
return _tl->getX509PeerInfo(*this);
}
-void Session::end() {
- if (!_ended) {
- _ended = true;
- _tl->end(*this);
- }
-}
-
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h
index 76c2a975478..026a3445bb4 100644
--- a/src/mongo/transport/session.h
+++ b/src/mongo/transport/session.h
@@ -144,25 +144,11 @@ public:
return _tl;
}
- /*
- * End the session.
- */
- void end();
-
- /*
- * Return true if the session ended, false otherwise.
- */
- bool ended() const {
- return _ended;
- }
-
MessageCompressorManager& getCompressorManager() {
return _messageCompressorManager;
}
private:
- bool _ended = false;
-
Id _id;
HostAndPort _remote;
diff --git a/src/mongo/transport/transport_layer.cpp b/src/mongo/transport/transport_layer.cpp
index 877070191b4..9ae229c6559 100644
--- a/src/mongo/transport/transport_layer.cpp
+++ b/src/mongo/transport/transport_layer.cpp
@@ -43,5 +43,8 @@ const Status TransportLayer::ShutdownStatus =
const Status TransportLayer::TicketSessionUnknownStatus = Status(
ErrorCodes::TransportSessionUnknown, "TransportLayer does not own the Ticket's Session.");
+const Status TransportLayer::TicketSessionClosedStatus = Status(
+ ErrorCodes::TransportSessionClosed, "Operation attempted on a closed transport Session.");
+
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index b79921c38c3..3313a68108d 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -60,6 +60,9 @@ public:
static const Status SessionUnknownStatus;
static const Status ShutdownStatus;
static const Status TicketSessionUnknownStatus;
+ static const Status TicketSessionClosedStatus;
+
+ friend class Session;
/**
* Stats for sessions open in the Transport Layer.
@@ -218,6 +221,13 @@ protected:
TransportLayer* getTicketTransportLayer(const Ticket& ticket) {
return ticket._tl;
}
+
+private:
+ /**
+ * Destroys any information linked to this Session in the TransportLayer.
+ * This should only be called from within Session's destructor.
+ */
+ virtual void _destroy(Session& session) = 0;
};
} // namespace transport
diff --git a/src/mongo/transport/transport_layer_legacy.cpp b/src/mongo/transport/transport_layer_legacy.cpp
index d43ea95e476..3ad4fa682cd 100644
--- a/src/mongo/transport/transport_layer_legacy.cpp
+++ b/src/mongo/transport/transport_layer_legacy.cpp
@@ -197,18 +197,9 @@ void TransportLayerLegacy::registerTags(const Session& session) {
void TransportLayerLegacy::_endSession_inlock(
decltype(TransportLayerLegacy::_connections.begin()) conn) {
+ conn->second.ended = true;
conn->second.amp->shutdown();
-
- // If the amp is in use it means that we're in the middle of fulfilling the ticket in _runTicket
- // and can't erase it immediately.
- //
- // In that case we'll rely on _runTicket to do the removal for us later
- if (conn->second.inUse) {
- conn->second.ended = true;
- } else {
- Listener::globalTicketHolder.release();
- _connections.erase(conn);
- }
+ Listener::globalTicketHolder.release();
}
void TransportLayerLegacy::endAllSessions(Session::TagMask tags) {
@@ -239,6 +230,19 @@ void TransportLayerLegacy::shutdown() {
endAllSessions(Session::kEmptyTagMask);
}
+void TransportLayerLegacy::_destroy(Session& session) {
+ stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
+ auto conn = _connections.find(session.id());
+
+ invariant(conn != _connections.end());
+ if (!conn->second.ended) {
+ _endSession_inlock(conn);
+ }
+
+ invariant(!conn->second.inUse);
+ _connections.erase(conn);
+}
+
Status TransportLayerLegacy::_runTicket(Ticket ticket) {
if (!_running.load()) {
return TransportLayer::ShutdownStatus;
@@ -253,11 +257,17 @@ Status TransportLayerLegacy::_runTicket(Ticket ticket) {
{
stdx::lock_guard<stdx::mutex> lk(_connectionsMutex);
+ // Error if we cannot find the session.
auto conn = _connections.find(ticket.sessionId());
if (conn == _connections.end()) {
return TransportLayer::TicketSessionUnknownStatus;
}
+ // Error if we find the session but its connection is closed.
+ if (conn->second.ended) {
+ return TransportLayer::TicketSessionClosedStatus;
+ }
+
// "check out" the port
conn->second.inUse = true;
amp = conn->second.amp.get();
@@ -282,11 +292,6 @@ Status TransportLayerLegacy::_runTicket(Ticket ticket) {
}
#endif
conn->second.inUse = false;
-
- if (conn->second.ended) {
- Listener::globalTicketHolder.release();
- _connections.erase(conn);
- }
}
return res;
diff --git a/src/mongo/transport/transport_layer_legacy.h b/src/mongo/transport/transport_layer_legacy.h
index 90078f2cf53..8135f559070 100644
--- a/src/mongo/transport/transport_layer_legacy.h
+++ b/src/mongo/transport/transport_layer_legacy.h
@@ -88,6 +88,8 @@ public:
void shutdown() override;
private:
+ void _destroy(Session& session) override;
+
void _handleNewConnection(std::unique_ptr<AbstractMessagingPort> amp);
Status _runTicket(Ticket ticket);
@@ -139,11 +141,7 @@ private:
*/
struct Connection {
Connection(std::unique_ptr<AbstractMessagingPort> port, bool ended, Session::TagMask tags)
- : amp(std::move(port)),
- connectionId(amp->connectionId()),
- tags(tags),
- inUse(false),
- ended(false) {}
+ : amp(std::move(port)), connectionId(amp->connectionId()), tags(tags) {}
std::unique_ptr<AbstractMessagingPort> amp;
@@ -151,8 +149,8 @@ private:
boost::optional<SSLPeerInfo> sslPeerInfo;
Session::TagMask tags;
- bool inUse;
- bool ended;
+ bool inUse = false;
+ bool ended = false;
};
ServiceEntryPoint* _sep;
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
index 6fc191bf49f..2c12fe98c25 100644
--- a/src/mongo/transport/transport_layer_manager.cpp
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -124,5 +124,9 @@ Status TransportLayerManager::addAndStartTransportLayer(std::unique_ptr<Transpor
return ptr->start();
}
+void TransportLayerManager::_destroy(Session& session) {
+ MONGO_UNREACHABLE;
+}
+
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h
index a83aa7bd4ac..bfdf7046da1 100644
--- a/src/mongo/transport/transport_layer_manager.h
+++ b/src/mongo/transport/transport_layer_manager.h
@@ -78,6 +78,8 @@ public:
Status addAndStartTransportLayer(std::unique_ptr<TransportLayer> tl);
private:
+ void _destroy(Session& session) override;
+
template <typename Callable>
void _foreach(Callable&& cb);
diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp
index 749ecdaf4d3..623980a19fc 100644
--- a/src/mongo/transport/transport_layer_mock.cpp
+++ b/src/mongo/transport/transport_layer_mock.cpp
@@ -69,8 +69,8 @@ Ticket TransportLayerMock::sourceMessage(Session& session, Message* message, Dat
return Ticket(TransportLayer::ShutdownStatus);
} else if (!owns(session.id())) {
return Ticket(TransportLayer::SessionUnknownStatus);
- } else if (session.ended()) {
- return Ticket(Session::ClosedStatus);
+ } else if (_sessions[session.id()].ended) {
+ return Ticket(TransportLayer::TicketSessionClosedStatus);
}
return Ticket(this,
@@ -84,8 +84,8 @@ Ticket TransportLayerMock::sinkMessage(Session& session,
return Ticket(TransportLayer::ShutdownStatus);
} else if (!owns(session.id())) {
return Ticket(TransportLayer::SessionUnknownStatus);
- } else if (session.ended()) {
- return Ticket(Session::ClosedStatus);
+ } else if (_sessions[session.id()].ended) {
+ return Ticket(TransportLayer::TicketSessionClosedStatus);
}
return Ticket(this, stdx::make_unique<TransportLayerMock::TicketMock>(&session, expiration));
@@ -98,8 +98,8 @@ Status TransportLayerMock::wait(Ticket&& ticket) {
return ticket.status();
} else if (!owns(ticket.sessionId())) {
return TicketSessionUnknownStatus;
- } else if (get(ticket.sessionId())->ended()) {
- return Ticket::SessionClosedStatus;
+ } else if (_sessions[ticket.sessionId()].ended) {
+ return TransportLayer::TicketSessionClosedStatus;
}
return Status::OK();
@@ -129,7 +129,7 @@ Session* TransportLayerMock::createSession() {
stdx::make_unique<Session>(HostAndPort(), HostAndPort(), this);
Session::Id sessionId = session->id();
- _sessions[sessionId] = Connection{std::move(session), SSLPeerInfo()};
+ _sessions[sessionId] = Connection{false, std::move(session), SSLPeerInfo()};
return _sessions[sessionId].session.get();
}
@@ -146,7 +146,9 @@ bool TransportLayerMock::owns(Session::Id id) {
}
void TransportLayerMock::end(Session& session) {
- session.end();
+ if (!owns(session.id()))
+ return;
+ _sessions[session.id()].ended = true;
}
void TransportLayerMock::endAllSessions(Session::TagMask tags) {
@@ -176,5 +178,7 @@ TransportLayerMock::~TransportLayerMock() {
shutdown();
}
+void TransportLayerMock::_destroy(Session& session) {}
+
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h
index 73a14006dbb..a8d37ab7a0d 100644
--- a/src/mongo/transport/transport_layer_mock.h
+++ b/src/mongo/transport/transport_layer_mock.h
@@ -103,7 +103,10 @@ public:
bool inShutdown() const;
private:
+ void _destroy(Session& session) override;
+
struct Connection {
+ bool ended;
std::unique_ptr<Session> session;
SSLPeerInfo peerInfo;
};
diff --git a/src/mongo/transport/transport_layer_mock_test.cpp b/src/mongo/transport/transport_layer_mock_test.cpp
index 2a6952989a1..a86874b4b00 100644
--- a/src/mongo/transport/transport_layer_mock_test.cpp
+++ b/src/mongo/transport/transport_layer_mock_test.cpp
@@ -228,13 +228,6 @@ TEST_F(TransportLayerMockTest, WaitTLShutdown) {
ASSERT_EQUALS(status.code(), ErrorCodes::ShutdownInProgress);
}
-// end() closes the session
-TEST_F(TransportLayerMockTest, EndSession) {
- Session* session = tl()->createSession();
- tl()->end(*session);
- ASSERT(session->ended());
-}
-
std::vector<Session*> createSessions(TransportLayerMock* tl) {
int numSessions = 10;
std::vector<Session*> sessions;
@@ -245,9 +238,13 @@ std::vector<Session*> createSessions(TransportLayerMock* tl) {
return sessions;
}
-void assertEnded(std::vector<Session*> sessions) {
+void assertEnded(TransportLayer* tl,
+ std::vector<Session*> sessions,
+ ErrorCodes::Error code = ErrorCodes::TransportSessionClosed) {
for (auto session : sessions) {
- ASSERT(session->ended());
+ Ticket ticket = Ticket(tl, stdx::make_unique<TransportLayerMock::TicketMock>(session));
+ Status status = tl->wait(std::move(ticket));
+ ASSERT_EQUALS(status.code(), code);
}
}
@@ -255,14 +252,14 @@ void assertEnded(std::vector<Session*> sessions) {
TEST_F(TransportLayerMockTest, EndAllSessions) {
std::vector<Session*> sessions = createSessions(tl());
tl()->endAllSessions(Session::kEmptyTagMask);
- assertEnded(sessions);
+ assertEnded(tl(), sessions);
}
// shutdown() ends all sessions and shuts down
TEST_F(TransportLayerMockTest, Shutdown) {
std::vector<Session*> sessions = createSessions(tl());
tl()->shutdown();
- assertEnded(sessions);
+ assertEnded(tl(), sessions, ErrorCodes::ShutdownInProgress);
ASSERT(tl()->inShutdown());
}