diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/sys/SocketAddress.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/sys/TCPIOPlugin.cpp | 48 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 24 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/Socket.cpp | 30 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/SocketAddress.cpp | 28 | ||||
-rw-r--r--[-rwxr-xr-x] | cpp/src/qpid/sys/windows/Socket.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/SocketAddress.cpp | 7 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_test_logs.py | 1 |
8 files changed, 127 insertions, 37 deletions
diff --git a/cpp/src/qpid/sys/SocketAddress.h b/cpp/src/qpid/sys/SocketAddress.h index c2120338cf..481beab747 100644 --- a/cpp/src/qpid/sys/SocketAddress.h +++ b/cpp/src/qpid/sys/SocketAddress.h @@ -41,12 +41,15 @@ public: QPID_COMMON_EXTERN SocketAddress& operator=(const SocketAddress&); QPID_COMMON_EXTERN ~SocketAddress(); - std::string asString(bool numeric=true) const; + QPID_COMMON_EXTERN bool nextAddress(); + QPID_COMMON_EXTERN std::string asString(bool numeric=true) const; + QPID_COMMON_EXTERN void setAddrInfoPort(uint16_t port); private: std::string host; std::string port; mutable ::addrinfo* addrInfo; + mutable ::addrinfo* currentAddrInfo; }; }} diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index 34338ce434..c5cc86c813 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -25,21 +25,22 @@ #include "qpid/Plugin.h" #include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" #include "qpid/sys/Poller.h" #include "qpid/broker/Broker.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> -#include <memory> +#include <boost/ptr_container/ptr_vector.hpp> namespace qpid { namespace sys { class AsynchIOProtocolFactory : public ProtocolFactory { const bool tcpNoDelay; - Socket listener; - const uint16_t listeningPort; - std::auto_ptr<AsynchAcceptor> acceptor; + boost::ptr_vector<Socket> listeners; + boost::ptr_vector<AsynchAcceptor> acceptors; + uint16_t listeningPort; public: AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay); @@ -71,15 +72,38 @@ static class TCPIOPlugin : public Plugin { "", boost::lexical_cast<std::string>(opts.port), opts.connectionBacklog, opts.tcpNoDelay)); - QPID_LOG(notice, "Listening on TCP port " << protocolt->getPort()); + QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort()); broker->registerProtocolFactory("tcp", protocolt); } } } tcpPlugin; AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) : - tcpNoDelay(nodelay), listeningPort(listener.listen(host, port, backlog)) -{} + tcpNoDelay(nodelay) +{ + SocketAddress sa(host, port); + + // We must have at least one resolved address + QPID_LOG(info, "Listening to: " << sa.asString()) + Socket* s = new Socket; + uint16_t lport = s->listen(sa, backlog); + QPID_LOG(debug, "Listened to: " << lport); + listeners.push_back(s); + + listeningPort = lport; + + // Try any other resolved addresses + while (sa.nextAddress()) { + // Hack to ensure that all listening connections are on the same port + sa.setAddrInfoPort(listeningPort); + QPID_LOG(info, "Listening to: " << sa.asString()) + Socket* s = new Socket; + uint16_t lport = s->listen(sa, backlog); + QPID_LOG(debug, "Listened to: " << lport); + listeners.push_back(s); + } + +} void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f, bool isClient) { @@ -111,10 +135,12 @@ uint16_t AsynchIOProtocolFactory::getPort() const { void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { - acceptor.reset( - AsynchAcceptor::create(listener, - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); - acceptor->start(poller); + for (unsigned i = 0; i<listeners.size(); ++i) { + acceptors.push_back( + AsynchAcceptor::create(listeners[i], + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); + acceptors[i].start(poller); + } } void AsynchIOProtocolFactory::connectFailed( diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index b5a0b0bf32..dab8bd09c6 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -149,6 +149,7 @@ private: ConnectedCallback connCallback; FailedCallback failCallback; const Socket& socket; + SocketAddress sa; public: AsynchConnector(const Socket& socket, @@ -171,11 +172,13 @@ AsynchConnector::AsynchConnector(const Socket& s, boost::bind(&AsynchConnector::connComplete, this, _1)), connCallback(connCb), failCallback(failCb), - socket(s) + socket(s), + sa(hostname, port) { socket.setNonblocking(); - SocketAddress sa(hostname, port); + // Note, not catching any exceptions here, also has effect of destructing + QPID_LOG(info, "Connecting: " << sa.asString()); socket.connect(sa); } @@ -191,11 +194,26 @@ void AsynchConnector::stop() void AsynchConnector::connComplete(DispatchHandle& h) { - h.stopWatch(); int errCode = socket.getError(); if (errCode == 0) { + h.stopWatch(); connCallback(socket); } else { + // Retry while we cause an immediate exception + // (asynch failure will be handled by re-entering here at the top) + while (sa.nextAddress()) { + try { + // Try next address without deleting ourselves + QPID_LOG(debug, "Ignored socket connect error: " << strError(errCode)); + QPID_LOG(info, "Retrying connect: " << sa.asString()); + socket.connect(sa); + return; + } catch (const std::exception& e) { + QPID_LOG(debug, "Ignored socket connect exception: " << e.what()); + } + errCode = socket.getError(); + } + h.stopWatch(); failCallback(socket, errCode, strError(errCode)); } DispatchHandle::doDelete(); diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index aa25f8062d..b6f3b7addd 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -63,6 +63,20 @@ std::string getName(int fd, bool local) throw QPID_POSIX_ERROR(rc); return std::string(dispName) + ":" + std::string(servName); } + +uint16_t getLocalPort(int fd) +{ + ::sockaddr_storage name; + ::socklen_t namelen = sizeof(name); + if (::getsockname(fd, (::sockaddr*)&name, &namelen) < 0) + throw QPID_POSIX_ERROR(errno); + + switch (name.ss_family) { + case AF_INET: return ntohs(((::sockaddr_in&)name).sin_port); + case AF_INET6: return ntohs(((::sockaddr_in6&)name).sin6_port); + default:throw Exception(QPID_MSG("Unexpected socket type")); + } +} } Socket::Socket() : @@ -88,6 +102,11 @@ void Socket::createSocket(const SocketAddress& sa) const try { if (nonblocking) setNonblocking(); if (nodelay) setTcpNoDelay(); + if (getAddrInfo(sa).ai_family == AF_INET6) { + int flag = 1; + int result = ::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag)); + QPID_POSIX_CHECK(result); + } } catch (std::exception&) { ::close(s); socket = -1; @@ -109,7 +128,7 @@ void Socket::setTcpNoDelay() const nodelay = true; if (socket != -1) { int flag = 1; - int result = setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); + int result = ::setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); QPID_POSIX_CHECK(result); } } @@ -179,19 +198,14 @@ int Socket::listen(const SocketAddress& sa, int backlog) const const int& socket = impl->fd; int yes=1; - QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); + QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0) throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno))); if (::listen(socket, backlog) < 0) throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno))); - struct sockaddr_in name; - socklen_t namelen = sizeof(name); - if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0) - throw QPID_POSIX_ERROR(errno); - - return ntohs(name.sin_port); + return getLocalPort(socket); } Socket* Socket::accept() const diff --git a/cpp/src/qpid/sys/posix/SocketAddress.cpp b/cpp/src/qpid/sys/posix/SocketAddress.cpp index 10f1c8a563..67438c0d89 100644 --- a/cpp/src/qpid/sys/posix/SocketAddress.cpp +++ b/cpp/src/qpid/sys/posix/SocketAddress.cpp @@ -27,8 +27,6 @@ #include <string.h> #include <netdb.h> -#include <algorithm> - namespace qpid { namespace sys { @@ -73,19 +71,38 @@ std::string SocketAddress::asString(bool numeric) const dispName, sizeof(dispName), servName, sizeof(servName), NI_NUMERICHOST | NI_NUMERICSERV) != 0) - throw QPID_POSIX_ERROR(rc); + throw qpid::Exception(QPID_MSG(gai_strerror(rc))); std::string s(dispName); s += ":"; s += servName; return s; } +bool SocketAddress::nextAddress() { + bool r = currentAddrInfo->ai_next != 0; + if (r) + currentAddrInfo = currentAddrInfo->ai_next; + return r; +} + +void SocketAddress::setAddrInfoPort(uint16_t port) { + if (!currentAddrInfo) return; + + ::addrinfo& ai = *currentAddrInfo; + switch (ai.ai_family) { + case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return; + case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return; + default: throw Exception(QPID_MSG("Unexpected socket type")); + } +} + const ::addrinfo& getAddrInfo(const SocketAddress& sa) { if (!sa.addrInfo) { ::addrinfo hints; ::memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_INET; // Change this to support IPv6 + hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for + hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6 hints.ai_socktype = SOCK_STREAM; const char* node = 0; @@ -99,9 +116,10 @@ const ::addrinfo& getAddrInfo(const SocketAddress& sa) int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo); if (n != 0) throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n))); + sa.currentAddrInfo = sa.addrInfo; } - return *sa.addrInfo; + return *sa.currentAddrInfo; } }} diff --git a/cpp/src/qpid/sys/windows/Socket.cpp b/cpp/src/qpid/sys/windows/Socket.cpp index baa80f04e0..1ba3831b9f 100755..100644 --- a/cpp/src/qpid/sys/windows/Socket.cpp +++ b/cpp/src/qpid/sys/windows/Socket.cpp @@ -211,21 +211,24 @@ int Socket::read(void *buf, size_t count) const return received; } -int Socket::listen(const std::string&, const std::string& port, int backlog) const +int Socket::listen(const std::string& host, const std::string& port, int backlog) const +{ + SocketAddress sa(host, port); + return listen(sa, backlog); +} + +int Socket::listen(const SocketAddress& addr, int backlog) const { const SOCKET& socket = impl->fd; BOOL yes=1; QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes))); - struct sockaddr_in name; - memset(&name, 0, sizeof(name)); - name.sin_family = AF_INET; - name.sin_port = htons(boost::lexical_cast<uint16_t>(port)); - name.sin_addr.s_addr = 0; - if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR) - throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError()))); + + if (::bind(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) == SOCKET_ERROR) + throw Exception(QPID_MSG("Can't bind to " << addr.asString() << ": " << strError(WSAGetLastError()))); if (::listen(socket, backlog) == SOCKET_ERROR) - throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(WSAGetLastError()))); + throw Exception(QPID_MSG("Can't listen on " <<addr.asString() << ": " << strError(WSAGetLastError()))); + struct sockaddr_in name; socklen_t namelen = sizeof(name); QPID_WINSOCK_CHECK(::getsockname(socket, (struct sockaddr*)&name, &namelen)); return ntohs(name.sin_port); diff --git a/cpp/src/qpid/sys/windows/SocketAddress.cpp b/cpp/src/qpid/sys/windows/SocketAddress.cpp index ac43cd2d23..b040cc5f14 100644 --- a/cpp/src/qpid/sys/windows/SocketAddress.cpp +++ b/cpp/src/qpid/sys/windows/SocketAddress.cpp @@ -68,6 +68,13 @@ std::string SocketAddress::asString(bool) const return host + ":" + port; } +bool SocketAddress::nextAddress() { + bool r = currentAddrInfo->ai_next != 0; + if (r) + currentAddrInfo = currentAddrInfo->ai_next; + return r; +} + const ::addrinfo& getAddrInfo(const SocketAddress& sa) { return *sa.addrInfo; diff --git a/cpp/src/tests/cluster_test_logs.py b/cpp/src/tests/cluster_test_logs.py index a0ce8fb9c3..3c7e8e8020 100755 --- a/cpp/src/tests/cluster_test_logs.py +++ b/cpp/src/tests/cluster_test_logs.py @@ -53,6 +53,7 @@ def filter_log(log): 'stall for update|unstall, ignore update|cancelled offer .* unstall', 'caught up', 'active for links|Passivating links|Activating links', + 'info Connecting: .*', # UpdateClient connection 'info Connection.* connected to', # UpdateClient connection 'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection 'warning Broker closed connection: 200, OK', |