diff options
Diffstat (limited to 'cpp/src/qpid/sys/TCPIOPlugin.cpp')
-rw-r--r-- | cpp/src/qpid/sys/TCPIOPlugin.cpp | 80 |
1 files changed, 25 insertions, 55 deletions
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index 85d8c1db87..a6528f9ad9 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -25,31 +25,31 @@ #include "qpid/Plugin.h" #include "qpid/sys/Socket.h" -#include "qpid/sys/SocketAddress.h" #include "qpid/sys/Poller.h" #include "qpid/broker/Broker.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> -#include <boost/ptr_container/ptr_vector.hpp> +#include <memory> namespace qpid { namespace sys { class AsynchIOProtocolFactory : public ProtocolFactory { const bool tcpNoDelay; - boost::ptr_vector<Socket> listeners; - boost::ptr_vector<AsynchAcceptor> acceptors; - uint16_t listeningPort; + Socket listener; + const uint16_t listeningPort; + std::auto_ptr<AsynchAcceptor> acceptor; public: - AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay); + AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& host, const std::string& port, + void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*, ConnectFailedCallback); uint16_t getPort() const; + std::string getHost() const; private: void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, @@ -61,49 +61,23 @@ class AsynchIOProtocolFactory : public ProtocolFactory { static class TCPIOPlugin : public Plugin { void earlyInitialize(Target&) { } - + void initialize(Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); // Only provide to a Broker if (broker) { const broker::Broker::Options& opts = broker->getOptions(); - ProtocolFactory::shared_ptr protocolt( - new AsynchIOProtocolFactory( - "", boost::lexical_cast<std::string>(opts.port), - opts.connectionBacklog, - opts.tcpNoDelay)); - QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort()); - broker->registerProtocolFactory("tcp", protocolt); + ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog, + opts.tcpNoDelay)); + QPID_LOG(notice, "Listening on TCP port " << protocol->getPort()); + broker->registerProtocolFactory("tcp", protocol); } } } tcpPlugin; -AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) : - tcpNoDelay(nodelay) -{ - SocketAddress sa(host, port); - - // We must have at least one resolved address - QPID_LOG(info, "Listening to: " << sa.asString()) - Socket* s = new Socket; - uint16_t lport = s->listen(sa, backlog); - QPID_LOG(debug, "Listened to: " << lport); - listeners.push_back(s); - - listeningPort = lport; - - // Try any other resolved addresses - while (sa.nextAddress()) { - // Hack to ensure that all listening connections are on the same port - sa.setAddrInfoPort(listeningPort); - QPID_LOG(info, "Listening to: " << sa.asString()) - Socket* s = new Socket; - uint16_t lport = s->listen(sa, backlog); - QPID_LOG(debug, "Listened to: " << lport); - listeners.push_back(s); - } - -} +AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay) : + tcpNoDelay(nodelay), listeningPort(listener.listen(port, backlog)) +{} void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f, bool isClient) { @@ -133,14 +107,16 @@ uint16_t AsynchIOProtocolFactory::getPort() const { return listeningPort; // Immutable no need for lock. } +std::string AsynchIOProtocolFactory::getHost() const { + return listener.getSockname(); +} + void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { - for (unsigned i = 0; i<listeners.size(); ++i) { - acceptors.push_back( - AsynchAcceptor::create(listeners[i], - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); - acceptors[i].start(poller); - } + acceptor.reset( + AsynchAcceptor::create(listener, + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); + acceptor->start(poller); } void AsynchIOProtocolFactory::connectFailed( @@ -154,7 +130,7 @@ void AsynchIOProtocolFactory::connectFailed( void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, - const std::string& host, const std::string& port, + const std::string& host, int16_t port, ConnectionCodec::Factory* fact, ConnectFailedCallback failed) { @@ -163,8 +139,8 @@ void AsynchIOProtocolFactory::connect( // upon connection failure or by the AsynchIO upon connection // shutdown. The allocated AsynchConnector frees itself when it // is no longer needed. + Socket* socket = new Socket(); - try { AsynchConnector* c = AsynchConnector::create( *socket, host, @@ -174,12 +150,6 @@ void AsynchIOProtocolFactory::connect( boost::bind(&AsynchIOProtocolFactory::connectFailed, this, _1, _2, _3, failed)); c->start(poller); - } catch (std::exception&) { - // TODO: Design question - should we do the error callback and also throw? - int errCode = socket->getError(); - connectFailed(*socket, errCode, strError(errCode), failed); - throw; - } } }} // namespace qpid::sys |