diff options
author | samantharitter <samantha.ritter@10gen.com> | 2016-11-04 14:45:32 -0400 |
---|---|---|
committer | samantharitter <samantha.ritter@10gen.com> | 2016-11-05 21:26:59 -0400 |
commit | 0ac04999faae1d2fc0e10972aaf21082a2e48c8f (patch) | |
tree | d9b74efcf36c5381469cc622c3aea4c0f8166398 /src/mongo/db | |
parent | 2d1dd9e07a40f314853e29bffb56b45bf21df940 (diff) | |
download | mongo-0ac04999faae1d2fc0e10972aaf21082a2e48c8f.tar.gz |
SERVER-26674 transport::Session objects should be shared_ptr managed
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/auth/authorization_manager_test.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/client.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/client.h | 25 | ||||
-rw-r--r-- | src/mongo/db/dbmessage.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/dbmessage.h | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_set_request_votes.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replset_commands.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/service_context.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/service_context.h | 4 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_mongod.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_mongod.h | 4 |
11 files changed, 57 insertions, 54 deletions
diff --git a/src/mongo/db/auth/authorization_manager_test.cpp b/src/mongo/db/auth/authorization_manager_test.cpp index 9e9312d0694..50f823f014a 100644 --- a/src/mongo/db/auth/authorization_manager_test.cpp +++ b/src/mongo/db/auth/authorization_manager_test.cpp @@ -244,9 +244,9 @@ TEST_F(AuthorizationManagerTest, testAcquireV2User) { TEST_F(AuthorizationManagerTest, testLocalX509Authorization) { ServiceContextNoop serviceContext; transport::TransportLayerMock transportLayer{}; - transport::Session* session = transportLayer.createSession(); + transport::SessionHandle session = transportLayer.createSession(); transportLayer.setX509PeerInfo( - *session, + session, SSLPeerInfo("CN=mongodb.com", {RoleName("read", "test"), RoleName("readWrite", "test")})); ServiceContext::UniqueClient client = serviceContext.makeClient("testClient", session); ServiceContext::UniqueOperationContext txn = client->makeOperationContext(); @@ -278,9 +278,9 @@ TEST_F(AuthorizationManagerTest, testLocalX509Authorization) { TEST_F(AuthorizationManagerTest, testLocalX509AuthorizationInvalidUser) { ServiceContextNoop serviceContext; transport::TransportLayerMock transportLayer{}; - transport::Session* session = transportLayer.createSession(); + transport::SessionHandle session = transportLayer.createSession(); transportLayer.setX509PeerInfo( - *session, + session, SSLPeerInfo("CN=mongodb.com", {RoleName("read", "test"), RoleName("write", "test")})); ServiceContext::UniqueClient client = serviceContext.makeClient("testClient", session); ServiceContext::UniqueOperationContext txn = client->makeOperationContext(); @@ -293,8 +293,8 @@ TEST_F(AuthorizationManagerTest, testLocalX509AuthorizationInvalidUser) { TEST_F(AuthorizationManagerTest, testLocalX509AuthenticationNoAuthorization) { ServiceContextNoop serviceContext; transport::TransportLayerMock transportLayer{}; - transport::Session* session = transportLayer.createSession(); - transportLayer.setX509PeerInfo(*session, {}); + transport::SessionHandle session = transportLayer.createSession(); + transportLayer.setX509PeerInfo(session, {}); ServiceContext::UniqueClient client = serviceContext.makeClient("testClient", session); ServiceContext::UniqueOperationContext txn = client->makeOperationContext(); diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp index 179b0938d5b..643a25fda05 100644 --- a/src/mongo/db/client.cpp +++ b/src/mongo/db/client.cpp @@ -44,7 +44,6 @@ #include "mongo/db/lasterror.h" #include "mongo/db/service_context.h" #include "mongo/stdx/thread.h" -#include "mongo/transport/session.h" #include "mongo/util/concurrency/thread_name.h" #include "mongo/util/exit.h" #include "mongo/util/mongoutils/str.h" @@ -64,11 +63,13 @@ void Client::initThreadIfNotAlready() { initThreadIfNotAlready(getThreadName().c_str()); } -void Client::initThread(const char* desc, transport::Session* session) { - initThread(desc, getGlobalServiceContext(), session); +void Client::initThread(const char* desc, transport::SessionHandle session) { + initThread(desc, getGlobalServiceContext(), std::move(session)); } -void Client::initThread(const char* desc, ServiceContext* service, transport::Session* session) { +void Client::initThread(const char* desc, + ServiceContext* service, + transport::SessionHandle session) { invariant(currentClient.getMake()->get() == nullptr); std::string fullDesc; @@ -81,7 +82,7 @@ void Client::initThread(const char* desc, ServiceContext* service, transport::Se setThreadName(fullDesc.c_str()); // Create the client obj, attach to thread - *currentClient.get() = service->makeClient(fullDesc, session); + *currentClient.get() = service->makeClient(fullDesc, std::move(session)); } void Client::destroy() { @@ -99,12 +100,12 @@ int64_t generateSeed(const std::string& desc) { } } // namespace -Client::Client(std::string desc, ServiceContext* serviceContext, transport::Session* session) +Client::Client(std::string desc, ServiceContext* serviceContext, transport::SessionHandle session) : _serviceContext(serviceContext), - _session(session), + _session(std::move(session)), _desc(std::move(desc)), _threadId(stdx::this_thread::get_id()), - _connectionId(session ? session->id() : 0), + _connectionId(_session ? _session->id() : 0), _prng(generateSeed(_desc)) {} void Client::reportState(BSONObjBuilder& builder) { diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h index 8e313379b9c..50d4f5de541 100644 --- a/src/mongo/db/client.h +++ b/src/mongo/db/client.h @@ -56,10 +56,6 @@ class AbstractMessagingPort; class Collection; class OperationContext; -namespace transport { -class Session; -} // namespace transport - typedef long long ConnectionId; /** @@ -73,13 +69,12 @@ public: * An unowned pointer to a transport::Session may optionally be provided. If 'session' * is non-null, then it will be used to augment the thread name, and for reporting purposes. * - * If provided, 'session' must outlive the newly-created Client object. Client::destroy() may - * be used to help enforce that the Client does not outlive 'session.' + * If provided, session's ref count will be bumped by this Client. */ - static void initThread(const char* desc, transport::Session* session = nullptr); + static void initThread(const char* desc, transport::SessionHandle session = nullptr); static void initThread(const char* desc, ServiceContext* serviceContext, - transport::Session* session); + transport::SessionHandle session); static Client* getCurrent(); @@ -91,7 +86,7 @@ public: } bool hasRemote() const { - return _session; + return (_session != nullptr); } HostAndPort getRemote() const { @@ -109,10 +104,14 @@ public: /** * Returns the Session to which this client is bound, if any. */ - transport::Session* session() const { + const transport::SessionHandle& session() const& { return _session; } + transport::SessionHandle session() && { + return std::move(_session); + } + /** * Inits a thread if that thread has not already been init'd, setting the thread name to * "desc". @@ -202,10 +201,12 @@ public: private: friend class ServiceContext; - Client(std::string desc, ServiceContext* serviceContext, transport::Session* session = nullptr); + explicit Client(std::string desc, + ServiceContext* serviceContext, + transport::SessionHandle session); ServiceContext* const _serviceContext; - transport::Session* const _session; + const transport::SessionHandle _session; // Description for the client (e.g. conn8) const std::string _desc; diff --git a/src/mongo/db/dbmessage.cpp b/src/mongo/db/dbmessage.cpp index 182d538764b..1b991041094 100644 --- a/src/mongo/db/dbmessage.cpp +++ b/src/mongo/db/dbmessage.cpp @@ -177,7 +177,7 @@ OpQueryReplyBuilder::OpQueryReplyBuilder() : _buffer(32768) { _buffer.skip(sizeof(QueryResult::Value)); } -void OpQueryReplyBuilder::send(transport::Session* session, +void OpQueryReplyBuilder::send(const transport::SessionHandle& session, int queryResultFlags, const Message& requestMsg, int nReturned, @@ -192,7 +192,8 @@ void OpQueryReplyBuilder::send(transport::Session* session, uassertStatusOK(session->sinkMessage(response).wait()); } -void OpQueryReplyBuilder::sendCommandReply(transport::Session* session, const Message& requestMsg) { +void OpQueryReplyBuilder::sendCommandReply(const transport::SessionHandle& session, + const Message& requestMsg) { send(session, /*queryFlags*/ 0, requestMsg, /*nReturned*/ 1); } @@ -209,7 +210,7 @@ void OpQueryReplyBuilder::putInMessage( } void replyToQuery(int queryResultFlags, - transport::Session* session, + const transport::SessionHandle& session, Message& requestMsg, const void* data, int size, @@ -222,7 +223,7 @@ void replyToQuery(int queryResultFlags, } void replyToQuery(int queryResultFlags, - transport::Session* session, + const transport::SessionHandle& session, Message& requestMsg, const BSONObj& responseObj) { replyToQuery(queryResultFlags, diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h index 63d3aaea350..405b96c35af 100644 --- a/src/mongo/db/dbmessage.h +++ b/src/mongo/db/dbmessage.h @@ -35,6 +35,7 @@ #include "mongo/client/constants.h" #include "mongo/db/jsobj.h" #include "mongo/db/server_options.h" +#include "mongo/transport/session.h" #include "mongo/util/net/abstract_message_port.h" #include "mongo/util/net/message.h" @@ -42,10 +43,6 @@ namespace mongo { class OperationContext; -namespace transport { -class Session; -} // namespace transport - /* db response format Query or GetMore: // see struct QueryResult @@ -363,7 +360,7 @@ public: /** * Finishes the reply and sends the message out to 'destination'. */ - void send(transport::Session* session, + void send(const transport::SessionHandle& session, int queryResultFlags, const Message& requestMsg, int nReturned, @@ -373,14 +370,14 @@ public: /** * Similar to send() but used for replying to a command. */ - void sendCommandReply(transport::Session* session, const Message& requestMsg); + void sendCommandReply(const transport::SessionHandle& session, const Message& requestMsg); private: BufBuilder _buffer; }; void replyToQuery(int queryResultFlags, - transport::Session* session, + const transport::SessionHandle& session, Message& requestMsg, const void* data, int size, @@ -390,7 +387,7 @@ void replyToQuery(int queryResultFlags, /* object reply helper. */ void replyToQuery(int queryResultFlags, - transport::Session* session, + const transport::SessionHandle& session, Message& requestMsg, const BSONObj& responseObj); diff --git a/src/mongo/db/repl/repl_set_request_votes.cpp b/src/mongo/db/repl/repl_set_request_votes.cpp index c01628466c0..02eb5311cb3 100644 --- a/src/mongo/db/repl/repl_set_request_votes.cpp +++ b/src/mongo/db/repl/repl_set_request_votes.cpp @@ -67,7 +67,7 @@ private: // We want to keep request vote connection open when relinquishing primary. // Tag it here. transport::Session::TagMask originalTag = 0; - transport::Session* session = txn->getClient()->session(); + auto session = txn->getClient()->session(); if (session) { originalTag = session->getTags(); session->replaceTags(originalTag | transport::Session::kKeepOpen); diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index f52b29d360a..528ce49dbea 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -740,7 +740,7 @@ public: /* we want to keep heartbeat connections open when relinquishing primary. tag them here. */ transport::Session::TagMask originalTag = 0; - transport::Session* session = txn->getClient()->session(); + auto session = txn->getClient()->session(); if (session) { originalTag = session->getTags(); session->replaceTags(originalTag | transport::Session::kKeepOpen); diff --git a/src/mongo/db/service_context.cpp b/src/mongo/db/service_context.cpp index ffaa0f7f128..09af7608dd0 100644 --- a/src/mongo/db/service_context.cpp +++ b/src/mongo/db/service_context.cpp @@ -130,8 +130,8 @@ ServiceContext::~ServiceContext() { } ServiceContext::UniqueClient ServiceContext::makeClient(std::string desc, - transport::Session* session) { - std::unique_ptr<Client> client(new Client(std::move(desc), this, session)); + transport::SessionHandle session) { + std::unique_ptr<Client> client(new Client(std::move(desc), this, std::move(session))); auto observer = _clientObservers.cbegin(); try { for (; observer != _clientObservers.cend(); ++observer) { diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h index 12b205dba0b..c1895bf4065 100644 --- a/src/mongo/db/service_context.h +++ b/src/mongo/db/service_context.h @@ -37,6 +37,7 @@ #include "mongo/platform/unordered_set.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" +#include "mongo/transport/session.h" #include "mongo/util/clock_source.h" #include "mongo/util/decorable.h" #include "mongo/util/tick_source.h" @@ -50,7 +51,6 @@ class OpObserver; class ServiceEntryPoint; namespace transport { -class Session; class TransportLayer; class TransportLayerManager; } // namespace transport @@ -216,7 +216,7 @@ public: * * If supplied, "session" is the transport::Session used for communicating with the client. */ - UniqueClient makeClient(std::string desc, transport::Session* session = nullptr); + UniqueClient makeClient(std::string desc, transport::SessionHandle session = nullptr); /** * Creates a new OperationContext on "client". diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp index 310c939c074..ac00e12d9ac 100644 --- a/src/mongo/db/service_entry_point_mongod.cpp +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -92,16 +92,19 @@ using transport::TransportLayer; ServiceEntryPointMongod::ServiceEntryPointMongod(TransportLayer* tl) : _tl(tl) {} -void ServiceEntryPointMongod::startSession(Session&& session) { - launchWrappedServiceEntryWorkerThread(std::move(session), [this](Session* session) { - _nWorkers.fetchAndAdd(1); - auto guard = MakeGuard([&] { _nWorkers.fetchAndSubtract(1); }); - - _sessionLoop(session); - }); +void ServiceEntryPointMongod::startSession(transport::SessionHandle session) { + // Pass ownership of the transport::SessionHandle into our worker thread. When this + // thread exits, the session will end. + launchWrappedServiceEntryWorkerThread( + std::move(session), [this](const transport::SessionHandle& session) { + _nWorkers.fetchAndAdd(1); + auto guard = MakeGuard([&] { _nWorkers.fetchAndSubtract(1); }); + + _sessionLoop(session); + }); } -void ServiceEntryPointMongod::_sessionLoop(Session* session) { +void ServiceEntryPointMongod::_sessionLoop(const transport::SessionHandle& session) { Message inMessage; bool inExhaust = false; int64_t counter = 0; diff --git a/src/mongo/db/service_entry_point_mongod.h b/src/mongo/db/service_entry_point_mongod.h index 95a24de92fa..4917430f02f 100644 --- a/src/mongo/db/service_entry_point_mongod.h +++ b/src/mongo/db/service_entry_point_mongod.h @@ -53,14 +53,14 @@ public: virtual ~ServiceEntryPointMongod() = default; - void startSession(transport::Session&& session) override; + void startSession(transport::SessionHandle session) override; std::size_t getNumberOfActiveWorkerThreads() const { return _nWorkers.load(); } private: - void _sessionLoop(transport::Session* session); + void _sessionLoop(const transport::SessionHandle& session); transport::TransportLayer* _tl; AtomicWord<std::size_t> _nWorkers; |