diff options
author | samantharitter <samantha.ritter@10gen.com> | 2016-08-31 17:35:11 -0400 |
---|---|---|
committer | samantharitter <samantha.ritter@10gen.com> | 2016-09-12 17:19:25 -0400 |
commit | d614dda4286e31680f2773f511eeda997ae66e38 (patch) | |
tree | 3a8710baf2f1c0caf0d567b91237a6b185fe9e4d | |
parent | 710eea36a153c9d3e0cfe820b39bd99137520aea (diff) | |
download | mongo-d614dda4286e31680f2773f511eeda997ae66e38.tar.gz |
SERVER-25918 Operations on closed sessions should return a TransportSessionClosed status
-rw-r--r-- | src/mongo/db/service_entry_point_mongod.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/service_entry_point_mongos.cpp | 5 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point_test_suite.cpp | 16 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point_test_suite.h | 4 | ||||
-rw-r--r-- | src/mongo/transport/session.cpp | 19 | ||||
-rw-r--r-- | src/mongo/transport/session.h | 14 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer.cpp | 3 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer.h | 10 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_legacy.cpp | 37 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_legacy.h | 12 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_manager.cpp | 4 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_manager.h | 2 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_mock.cpp | 20 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_mock.h | 3 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_mock_test.cpp | 19 |
15 files changed, 97 insertions, 76 deletions
diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp index 8135afbec6b..310c939c074 100644 --- a/src/mongo/db/service_entry_point_mongod.cpp +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -117,6 +117,11 @@ void ServiceEntryPointMongod::_sessionLoop(Session* session) { break; } + // Our session may have been closed internally. + if (status == TransportLayer::TicketSessionClosedStatus) { + break; + } + uassertStatusOK(status); } diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp index d7d40e4beee..427152c8adb 100644 --- a/src/mongo/s/service_entry_point_mongos.cpp +++ b/src/mongo/s/service_entry_point_mongos.cpp @@ -97,6 +97,11 @@ void ServiceEntryPointMongos::_sessionLoop(Session* session) { break; } + // Our session may have been closed internally. + if (status == TransportLayer::TicketSessionClosedStatus) { + break; + } + uassertStatusOK(status); } diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp index 04dd760db4a..1a07ee31132 100644 --- a/src/mongo/transport/service_entry_point_test_suite.cpp +++ b/src/mongo/transport/service_entry_point_test_suite.cpp @@ -91,6 +91,7 @@ void setPingCommand(Message* m) { // Some default method implementations const auto kDefaultEnd = [](const Session& session) { return; }; +const auto kDefaultDestroyHook = [](Session& session) { return; }; const auto kDefaultAsyncWait = [](Ticket, TicketCallback cb) { cb(Status::OK()); }; const auto kNoopFunction = [] { return; }; @@ -218,6 +219,7 @@ void ServiceEntryPointTestSuite::MockTLHarness::_resetHooks() { _wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultWait, this, _1); _asyncWait = kDefaultAsyncWait; _end = kDefaultEnd; + _destroy_hook = kDefaultDestroyHook; } ServiceEntryPointTestSuite::MockTicket* ServiceEntryPointTestSuite::MockTLHarness::getMockTicket( @@ -225,6 +227,10 @@ ServiceEntryPointTestSuite::MockTicket* ServiceEntryPointTestSuite::MockTLHarnes return dynamic_cast<ServiceEntryPointTestSuite::MockTicket*>(getTicketImpl(ticket)); } +void ServiceEntryPointTestSuite::MockTLHarness::_destroy(Session& session) { + return _destroy_hook(session); +} + void ServiceEntryPointTestSuite::setUp() { _tl = stdx::make_unique<MockTLHarness>(); } @@ -245,7 +251,7 @@ void ServiceEntryPointTestSuite::noLifeCycleTest() { _tl->_wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitError, _tl.get(), _1); // Step 3: SEP destroys the session, which calls end() - _tl->_end = [&testComplete](const Session&) { testComplete.set_value(); }; + _tl->_destroy_hook = [&testComplete](const Session&) { testComplete.set_value(); }; // Kick off the SEP Session s(HostAndPort(), HostAndPort(), _tl.get()); @@ -274,7 +280,7 @@ void ServiceEntryPointTestSuite::halfLifeCycleTest() { }; // Step 5: SEP destroys the session, which calls end() - _tl->_end = [&testComplete](const Session&) { testComplete.set_value(); }; + _tl->_destroy_hook = [&testComplete](const Session&) { testComplete.set_value(); }; // Kick off the SEP Session s(HostAndPort(), HostAndPort(), _tl.get()); @@ -300,7 +306,7 @@ void ServiceEntryPointTestSuite::fullLifeCycleTest() { // Step 5: SEP gets a ticket to source a Message // Step 6: SEP calls wait() on the ticket and receives and error // Step 7: SEP destroys the session, which calls end() - _tl->_end = [&testComplete](const Session& session) { testComplete.set_value(); }; + _tl->_destroy_hook = [&testComplete](const Session& session) { testComplete.set_value(); }; // Kick off the SEP Session s(HostAndPort(), HostAndPort(), _tl.get()); @@ -354,7 +360,7 @@ void ServiceEntryPointTestSuite::interruptingSessionTest() { // Step 7: SEP calls sourceMessage() for B, gets tB3 // Step 8: SEP calls wait() for tB3, gets an error // Step 9: SEP calls end(B) - _tl->_end = [this, idA, idB, &resumeA, &testComplete](const Session& session) { + _tl->_destroy_hook = [this, idA, idB, &resumeA, &testComplete](const Session& session) { // When end(B) is called, time to resume session A if (session.id() == idB) { @@ -442,7 +448,7 @@ void ServiceEntryPointTestSuite::burstStressTest(int numSessions, }; // When we end the last session, end the test. - _tl->_end = [&allSessionsComplete, numSessions, &ended](const Session& session) { + _tl->_destroy_hook = [&allSessionsComplete, numSessions, &ended](const Session& session) { if (ended.fetchAndAdd(1) == (numSessions - 1)) { allSessionsComplete.set_value(); } diff --git a/src/mongo/transport/service_entry_point_test_suite.h b/src/mongo/transport/service_entry_point_test_suite.h index c9b9c20702e..ba1d60fd027 100644 --- a/src/mongo/transport/service_entry_point_test_suite.h +++ b/src/mongo/transport/service_entry_point_test_suite.h @@ -146,6 +146,7 @@ private: stdx::function<Status(transport::Ticket)> _wait; stdx::function<void(transport::Ticket, TicketCallback)> _asyncWait; stdx::function<void(const transport::Session&)> _end; + stdx::function<void(transport::Session& session)> _destroy_hook; stdx::function<void(transport::Session::TagMask tags)> _endAllSessions = [](transport::Session::TagMask tags) {}; stdx::function<Status(void)> _start = [] { return Status::OK(); }; @@ -162,6 +163,9 @@ private: // Reset all hooks to their defaults void _resetHooks(); + + private: + void _destroy(transport::Session& session) override; }; std::unique_ptr<MockTLHarness> _tl; diff --git a/src/mongo/transport/session.cpp b/src/mongo/transport/session.cpp index 851f348edc0..b32c42609b8 100644 --- a/src/mongo/transport/session.cpp +++ b/src/mongo/transport/session.cpp @@ -43,20 +43,16 @@ AtomicUInt64 sessionIdCounter(0); } // namespace -const Status Session::ClosedStatus = - Status(ErrorCodes::TransportSessionClosed, "Session is closed."); - Session::Session(HostAndPort remote, HostAndPort local, TransportLayer* tl) - : _ended(false), - _id(sessionIdCounter.addAndFetch(1)), + : _id(sessionIdCounter.addAndFetch(1)), _remote(std::move(remote)), _local(std::move(local)), _tags(kEmptyTagMask), _tl(tl) {} Session::~Session() { - if (_tl != nullptr && !_ended) { - _tl->end(*this); + if (_tl != nullptr) { + _tl->_destroy(*this); } } @@ -65,7 +61,7 @@ Session::Session(Session&& other) _remote(std::move(other._remote)), _local(std::move(other._local)), _tl(other._tl) { - // We do not want to call tl->end() on moved-from Sessions. + // We do not want to call tl->destroy() on moved-from Sessions. other._tl = nullptr; } @@ -100,12 +96,5 @@ SSLPeerInfo Session::getX509PeerInfo() const { return _tl->getX509PeerInfo(*this); } -void Session::end() { - if (!_ended) { - _ended = true; - _tl->end(*this); - } -} - } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h index 76c2a975478..026a3445bb4 100644 --- a/src/mongo/transport/session.h +++ b/src/mongo/transport/session.h @@ -144,25 +144,11 @@ public: return _tl; } - /* - * End the session. - */ - void end(); - - /* - * Return true if the session ended, false otherwise. - */ - bool ended() const { - return _ended; - } - MessageCompressorManager& getCompressorManager() { return _messageCompressorManager; } private: - bool _ended = false; - Id _id; HostAndPort _remote; diff --git a/src/mongo/transport/transport_layer.cpp b/src/mongo/transport/transport_layer.cpp index 877070191b4..9ae229c6559 100644 --- a/src/mongo/transport/transport_layer.cpp +++ b/src/mongo/transport/transport_layer.cpp @@ -43,5 +43,8 @@ const Status TransportLayer::ShutdownStatus = const Status TransportLayer::TicketSessionUnknownStatus = Status( ErrorCodes::TransportSessionUnknown, "TransportLayer does not own the Ticket's Session."); +const Status TransportLayer::TicketSessionClosedStatus = Status( + ErrorCodes::TransportSessionClosed, "Operation attempted on a closed transport Session."); + } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h index b79921c38c3..3313a68108d 100644 --- a/src/mongo/transport/transport_layer.h +++ b/src/mongo/transport/transport_layer.h @@ -60,6 +60,9 @@ public: static const Status SessionUnknownStatus; static const Status ShutdownStatus; static const Status TicketSessionUnknownStatus; + static const Status TicketSessionClosedStatus; + + friend class Session; /** * Stats for sessions open in the Transport Layer. @@ -218,6 +221,13 @@ protected: TransportLayer* getTicketTransportLayer(const Ticket& ticket) { return ticket._tl; } + +private: + /** + * Destroys any information linked to this Session in the TransportLayer. + * This should only be called from within Session's destructor. + */ + virtual void _destroy(Session& session) = 0; }; } // namespace transport diff --git a/src/mongo/transport/transport_layer_legacy.cpp b/src/mongo/transport/transport_layer_legacy.cpp index d43ea95e476..3ad4fa682cd 100644 --- a/src/mongo/transport/transport_layer_legacy.cpp +++ b/src/mongo/transport/transport_layer_legacy.cpp @@ -197,18 +197,9 @@ void TransportLayerLegacy::registerTags(const Session& session) { void TransportLayerLegacy::_endSession_inlock( decltype(TransportLayerLegacy::_connections.begin()) conn) { + conn->second.ended = true; conn->second.amp->shutdown(); - - // If the amp is in use it means that we're in the middle of fulfilling the ticket in _runTicket - // and can't erase it immediately. - // - // In that case we'll rely on _runTicket to do the removal for us later - if (conn->second.inUse) { - conn->second.ended = true; - } else { - Listener::globalTicketHolder.release(); - _connections.erase(conn); - } + Listener::globalTicketHolder.release(); } void TransportLayerLegacy::endAllSessions(Session::TagMask tags) { @@ -239,6 +230,19 @@ void TransportLayerLegacy::shutdown() { endAllSessions(Session::kEmptyTagMask); } +void TransportLayerLegacy::_destroy(Session& session) { + stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + auto conn = _connections.find(session.id()); + + invariant(conn != _connections.end()); + if (!conn->second.ended) { + _endSession_inlock(conn); + } + + invariant(!conn->second.inUse); + _connections.erase(conn); +} + Status TransportLayerLegacy::_runTicket(Ticket ticket) { if (!_running.load()) { return TransportLayer::ShutdownStatus; @@ -253,11 +257,17 @@ Status TransportLayerLegacy::_runTicket(Ticket ticket) { { stdx::lock_guard<stdx::mutex> lk(_connectionsMutex); + // Error if we cannot find the session. auto conn = _connections.find(ticket.sessionId()); if (conn == _connections.end()) { return TransportLayer::TicketSessionUnknownStatus; } + // Error if we find the session but its connection is closed. + if (conn->second.ended) { + return TransportLayer::TicketSessionClosedStatus; + } + // "check out" the port conn->second.inUse = true; amp = conn->second.amp.get(); @@ -282,11 +292,6 @@ Status TransportLayerLegacy::_runTicket(Ticket ticket) { } #endif conn->second.inUse = false; - - if (conn->second.ended) { - Listener::globalTicketHolder.release(); - _connections.erase(conn); - } } return res; diff --git a/src/mongo/transport/transport_layer_legacy.h b/src/mongo/transport/transport_layer_legacy.h index 90078f2cf53..8135f559070 100644 --- a/src/mongo/transport/transport_layer_legacy.h +++ b/src/mongo/transport/transport_layer_legacy.h @@ -88,6 +88,8 @@ public: void shutdown() override; private: + void _destroy(Session& session) override; + void _handleNewConnection(std::unique_ptr<AbstractMessagingPort> amp); Status _runTicket(Ticket ticket); @@ -139,11 +141,7 @@ private: */ struct Connection { Connection(std::unique_ptr<AbstractMessagingPort> port, bool ended, Session::TagMask tags) - : amp(std::move(port)), - connectionId(amp->connectionId()), - tags(tags), - inUse(false), - ended(false) {} + : amp(std::move(port)), connectionId(amp->connectionId()), tags(tags) {} std::unique_ptr<AbstractMessagingPort> amp; @@ -151,8 +149,8 @@ private: boost::optional<SSLPeerInfo> sslPeerInfo; Session::TagMask tags; - bool inUse; - bool ended; + bool inUse = false; + bool ended = false; }; ServiceEntryPoint* _sep; diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp index 6fc191bf49f..2c12fe98c25 100644 --- a/src/mongo/transport/transport_layer_manager.cpp +++ b/src/mongo/transport/transport_layer_manager.cpp @@ -124,5 +124,9 @@ Status TransportLayerManager::addAndStartTransportLayer(std::unique_ptr<Transpor return ptr->start(); } +void TransportLayerManager::_destroy(Session& session) { + MONGO_UNREACHABLE; +} + } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h index a83aa7bd4ac..bfdf7046da1 100644 --- a/src/mongo/transport/transport_layer_manager.h +++ b/src/mongo/transport/transport_layer_manager.h @@ -78,6 +78,8 @@ public: Status addAndStartTransportLayer(std::unique_ptr<TransportLayer> tl); private: + void _destroy(Session& session) override; + template <typename Callable> void _foreach(Callable&& cb); diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp index 749ecdaf4d3..623980a19fc 100644 --- a/src/mongo/transport/transport_layer_mock.cpp +++ b/src/mongo/transport/transport_layer_mock.cpp @@ -69,8 +69,8 @@ Ticket TransportLayerMock::sourceMessage(Session& session, Message* message, Dat return Ticket(TransportLayer::ShutdownStatus); } else if (!owns(session.id())) { return Ticket(TransportLayer::SessionUnknownStatus); - } else if (session.ended()) { - return Ticket(Session::ClosedStatus); + } else if (_sessions[session.id()].ended) { + return Ticket(TransportLayer::TicketSessionClosedStatus); } return Ticket(this, @@ -84,8 +84,8 @@ Ticket TransportLayerMock::sinkMessage(Session& session, return Ticket(TransportLayer::ShutdownStatus); } else if (!owns(session.id())) { return Ticket(TransportLayer::SessionUnknownStatus); - } else if (session.ended()) { - return Ticket(Session::ClosedStatus); + } else if (_sessions[session.id()].ended) { + return Ticket(TransportLayer::TicketSessionClosedStatus); } return Ticket(this, stdx::make_unique<TransportLayerMock::TicketMock>(&session, expiration)); @@ -98,8 +98,8 @@ Status TransportLayerMock::wait(Ticket&& ticket) { return ticket.status(); } else if (!owns(ticket.sessionId())) { return TicketSessionUnknownStatus; - } else if (get(ticket.sessionId())->ended()) { - return Ticket::SessionClosedStatus; + } else if (_sessions[ticket.sessionId()].ended) { + return TransportLayer::TicketSessionClosedStatus; } return Status::OK(); @@ -129,7 +129,7 @@ Session* TransportLayerMock::createSession() { stdx::make_unique<Session>(HostAndPort(), HostAndPort(), this); Session::Id sessionId = session->id(); - _sessions[sessionId] = Connection{std::move(session), SSLPeerInfo()}; + _sessions[sessionId] = Connection{false, std::move(session), SSLPeerInfo()}; return _sessions[sessionId].session.get(); } @@ -146,7 +146,9 @@ bool TransportLayerMock::owns(Session::Id id) { } void TransportLayerMock::end(Session& session) { - session.end(); + if (!owns(session.id())) + return; + _sessions[session.id()].ended = true; } void TransportLayerMock::endAllSessions(Session::TagMask tags) { @@ -176,5 +178,7 @@ TransportLayerMock::~TransportLayerMock() { shutdown(); } +void TransportLayerMock::_destroy(Session& session) {} + } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h index 73a14006dbb..a8d37ab7a0d 100644 --- a/src/mongo/transport/transport_layer_mock.h +++ b/src/mongo/transport/transport_layer_mock.h @@ -103,7 +103,10 @@ public: bool inShutdown() const; private: + void _destroy(Session& session) override; + struct Connection { + bool ended; std::unique_ptr<Session> session; SSLPeerInfo peerInfo; }; diff --git a/src/mongo/transport/transport_layer_mock_test.cpp b/src/mongo/transport/transport_layer_mock_test.cpp index 2a6952989a1..a86874b4b00 100644 --- a/src/mongo/transport/transport_layer_mock_test.cpp +++ b/src/mongo/transport/transport_layer_mock_test.cpp @@ -228,13 +228,6 @@ TEST_F(TransportLayerMockTest, WaitTLShutdown) { ASSERT_EQUALS(status.code(), ErrorCodes::ShutdownInProgress); } -// end() closes the session -TEST_F(TransportLayerMockTest, EndSession) { - Session* session = tl()->createSession(); - tl()->end(*session); - ASSERT(session->ended()); -} - std::vector<Session*> createSessions(TransportLayerMock* tl) { int numSessions = 10; std::vector<Session*> sessions; @@ -245,9 +238,13 @@ std::vector<Session*> createSessions(TransportLayerMock* tl) { return sessions; } -void assertEnded(std::vector<Session*> sessions) { +void assertEnded(TransportLayer* tl, + std::vector<Session*> sessions, + ErrorCodes::Error code = ErrorCodes::TransportSessionClosed) { for (auto session : sessions) { - ASSERT(session->ended()); + Ticket ticket = Ticket(tl, stdx::make_unique<TransportLayerMock::TicketMock>(session)); + Status status = tl->wait(std::move(ticket)); + ASSERT_EQUALS(status.code(), code); } } @@ -255,14 +252,14 @@ void assertEnded(std::vector<Session*> sessions) { TEST_F(TransportLayerMockTest, EndAllSessions) { std::vector<Session*> sessions = createSessions(tl()); tl()->endAllSessions(Session::kEmptyTagMask); - assertEnded(sessions); + assertEnded(tl(), sessions); } // shutdown() ends all sessions and shuts down TEST_F(TransportLayerMockTest, Shutdown) { std::vector<Session*> sessions = createSessions(tl()); tl()->shutdown(); - assertEnded(sessions); + assertEnded(tl(), sessions, ErrorCodes::ShutdownInProgress); ASSERT(tl()->inShutdown()); } |