diff options
Diffstat (limited to 'qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp | 55 |
1 files changed, 44 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp index 34338ce434..85d8c1db87 100644 --- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -25,21 +25,22 @@ #include "qpid/Plugin.h" #include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" #include "qpid/sys/Poller.h" #include "qpid/broker/Broker.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> -#include <memory> +#include <boost/ptr_container/ptr_vector.hpp> namespace qpid { namespace sys { class AsynchIOProtocolFactory : public ProtocolFactory { const bool tcpNoDelay; - Socket listener; - const uint16_t listeningPort; - std::auto_ptr<AsynchAcceptor> acceptor; + boost::ptr_vector<Socket> listeners; + boost::ptr_vector<AsynchAcceptor> acceptors; + uint16_t listeningPort; public: AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay); @@ -71,15 +72,38 @@ static class TCPIOPlugin : public Plugin { "", boost::lexical_cast<std::string>(opts.port), opts.connectionBacklog, opts.tcpNoDelay)); - QPID_LOG(notice, "Listening on TCP port " << protocolt->getPort()); + QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort()); broker->registerProtocolFactory("tcp", protocolt); } } } tcpPlugin; AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) : - tcpNoDelay(nodelay), listeningPort(listener.listen(host, port, backlog)) -{} + tcpNoDelay(nodelay) +{ + SocketAddress sa(host, port); + + // We must have at least one resolved address + QPID_LOG(info, "Listening to: " << sa.asString()) + Socket* s = new Socket; + uint16_t lport = s->listen(sa, backlog); + QPID_LOG(debug, "Listened to: " << lport); + listeners.push_back(s); + + listeningPort = lport; + + // Try any other resolved addresses + while (sa.nextAddress()) { + // Hack to ensure that all listening connections are on the same port + sa.setAddrInfoPort(listeningPort); + QPID_LOG(info, "Listening to: " << sa.asString()) + Socket* s = new Socket; + uint16_t lport = s->listen(sa, backlog); + QPID_LOG(debug, "Listened to: " << lport); + listeners.push_back(s); + } + +} void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f, bool isClient) { @@ -111,10 +135,12 @@ uint16_t AsynchIOProtocolFactory::getPort() const { void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { - acceptor.reset( - AsynchAcceptor::create(listener, - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); - acceptor->start(poller); + for (unsigned i = 0; i<listeners.size(); ++i) { + acceptors.push_back( + AsynchAcceptor::create(listeners[i], + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); + acceptors[i].start(poller); + } } void AsynchIOProtocolFactory::connectFailed( @@ -138,6 +164,7 @@ void AsynchIOProtocolFactory::connect( // shutdown. The allocated AsynchConnector frees itself when it // is no longer needed. Socket* socket = new Socket(); + try { AsynchConnector* c = AsynchConnector::create( *socket, host, @@ -147,6 +174,12 @@ void AsynchIOProtocolFactory::connect( boost::bind(&AsynchIOProtocolFactory::connectFailed, this, _1, _2, _3, failed)); c->start(poller); + } catch (std::exception&) { + // TODO: Design question - should we do the error callback and also throw? + int errCode = socket->getError(); + connectFailed(*socket, errCode, strError(errCode), failed); + throw; + } } }} // namespace qpid::sys |