summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2011-08-12 22:32:05 +0000
committerAndrew Stitcher <astitcher@apache.org>2011-08-12 22:32:05 +0000
commitee500880a0a69400913e3b4055c7796dcd25ad65 (patch)
tree17e31c3aa347ecc0cf59001f3c82120481f5f6bd /cpp
parent07b24b144736892d4df9b79bd4ae2e518ab93205 (diff)
downloadqpid-python-ee500880a0a69400913e3b4055c7796dcd25ad65.tar.gz
QPID-3405: IPv6 support for Unix C++ ports:
- On the Listen side we create separate listening sockets for IPv4 and IPv6 making sure to not allow the IPv6 socket to run dual stack. This makes the reported IPv4 addresses look "normal" and would allow us to turn control IPv4/IPv6 listening separately. - On the connect side we make sure to try all the addresses returned by getaddrinfo() in order until we either find one that connects or have tried all of them. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1157272 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/sys/SocketAddress.h5
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp48
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp24
-rw-r--r--cpp/src/qpid/sys/posix/Socket.cpp30
-rw-r--r--cpp/src/qpid/sys/posix/SocketAddress.cpp28
-rw-r--r--[-rwxr-xr-x]cpp/src/qpid/sys/windows/Socket.cpp21
-rw-r--r--cpp/src/qpid/sys/windows/SocketAddress.cpp7
-rwxr-xr-xcpp/src/tests/cluster_test_logs.py1
8 files changed, 127 insertions, 37 deletions
diff --git a/cpp/src/qpid/sys/SocketAddress.h b/cpp/src/qpid/sys/SocketAddress.h
index c2120338cf..481beab747 100644
--- a/cpp/src/qpid/sys/SocketAddress.h
+++ b/cpp/src/qpid/sys/SocketAddress.h
@@ -41,12 +41,15 @@ public:
QPID_COMMON_EXTERN SocketAddress& operator=(const SocketAddress&);
QPID_COMMON_EXTERN ~SocketAddress();
- std::string asString(bool numeric=true) const;
+ QPID_COMMON_EXTERN bool nextAddress();
+ QPID_COMMON_EXTERN std::string asString(bool numeric=true) const;
+ QPID_COMMON_EXTERN void setAddrInfoPort(uint16_t port);
private:
std::string host;
std::string port;
mutable ::addrinfo* addrInfo;
+ mutable ::addrinfo* currentAddrInfo;
};
}}
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index 34338ce434..c5cc86c813 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -25,21 +25,22 @@
#include "qpid/Plugin.h"
#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/Poller.h"
#include "qpid/broker/Broker.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
-#include <memory>
+#include <boost/ptr_container/ptr_vector.hpp>
namespace qpid {
namespace sys {
class AsynchIOProtocolFactory : public ProtocolFactory {
const bool tcpNoDelay;
- Socket listener;
- const uint16_t listeningPort;
- std::auto_ptr<AsynchAcceptor> acceptor;
+ boost::ptr_vector<Socket> listeners;
+ boost::ptr_vector<AsynchAcceptor> acceptors;
+ uint16_t listeningPort;
public:
AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay);
@@ -71,15 +72,38 @@ static class TCPIOPlugin : public Plugin {
"", boost::lexical_cast<std::string>(opts.port),
opts.connectionBacklog,
opts.tcpNoDelay));
- QPID_LOG(notice, "Listening on TCP port " << protocolt->getPort());
+ QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
broker->registerProtocolFactory("tcp", protocolt);
}
}
} tcpPlugin;
AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) :
- tcpNoDelay(nodelay), listeningPort(listener.listen(host, port, backlog))
-{}
+ tcpNoDelay(nodelay)
+{
+ SocketAddress sa(host, port);
+
+ // We must have at least one resolved address
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = new Socket;
+ uint16_t lport = s->listen(sa, backlog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+
+ listeningPort = lport;
+
+ // Try any other resolved addresses
+ while (sa.nextAddress()) {
+ // Hack to ensure that all listening connections are on the same port
+ sa.setAddrInfoPort(listeningPort);
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = new Socket;
+ uint16_t lport = s->listen(sa, backlog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+ }
+
+}
void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
ConnectionCodec::Factory* f, bool isClient) {
@@ -111,10 +135,12 @@ uint16_t AsynchIOProtocolFactory::getPort() const {
void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
ConnectionCodec::Factory* fact) {
- acceptor.reset(
- AsynchAcceptor::create(listener,
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
- acceptor->start(poller);
+ for (unsigned i = 0; i<listeners.size(); ++i) {
+ acceptors.push_back(
+ AsynchAcceptor::create(listeners[i],
+ boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
+ acceptors[i].start(poller);
+ }
}
void AsynchIOProtocolFactory::connectFailed(
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index b5a0b0bf32..dab8bd09c6 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -149,6 +149,7 @@ private:
ConnectedCallback connCallback;
FailedCallback failCallback;
const Socket& socket;
+ SocketAddress sa;
public:
AsynchConnector(const Socket& socket,
@@ -171,11 +172,13 @@ AsynchConnector::AsynchConnector(const Socket& s,
boost::bind(&AsynchConnector::connComplete, this, _1)),
connCallback(connCb),
failCallback(failCb),
- socket(s)
+ socket(s),
+ sa(hostname, port)
{
socket.setNonblocking();
- SocketAddress sa(hostname, port);
+
// Note, not catching any exceptions here, also has effect of destructing
+ QPID_LOG(info, "Connecting: " << sa.asString());
socket.connect(sa);
}
@@ -191,11 +194,26 @@ void AsynchConnector::stop()
void AsynchConnector::connComplete(DispatchHandle& h)
{
- h.stopWatch();
int errCode = socket.getError();
if (errCode == 0) {
+ h.stopWatch();
connCallback(socket);
} else {
+ // Retry while we cause an immediate exception
+ // (asynch failure will be handled by re-entering here at the top)
+ while (sa.nextAddress()) {
+ try {
+ // Try next address without deleting ourselves
+ QPID_LOG(debug, "Ignored socket connect error: " << strError(errCode));
+ QPID_LOG(info, "Retrying connect: " << sa.asString());
+ socket.connect(sa);
+ return;
+ } catch (const std::exception& e) {
+ QPID_LOG(debug, "Ignored socket connect exception: " << e.what());
+ }
+ errCode = socket.getError();
+ }
+ h.stopWatch();
failCallback(socket, errCode, strError(errCode));
}
DispatchHandle::doDelete();
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp
index aa25f8062d..b6f3b7addd 100644
--- a/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/cpp/src/qpid/sys/posix/Socket.cpp
@@ -63,6 +63,20 @@ std::string getName(int fd, bool local)
throw QPID_POSIX_ERROR(rc);
return std::string(dispName) + ":" + std::string(servName);
}
+
+uint16_t getLocalPort(int fd)
+{
+ ::sockaddr_storage name;
+ ::socklen_t namelen = sizeof(name);
+ if (::getsockname(fd, (::sockaddr*)&name, &namelen) < 0)
+ throw QPID_POSIX_ERROR(errno);
+
+ switch (name.ss_family) {
+ case AF_INET: return ntohs(((::sockaddr_in&)name).sin_port);
+ case AF_INET6: return ntohs(((::sockaddr_in6&)name).sin6_port);
+ default:throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+}
}
Socket::Socket() :
@@ -88,6 +102,11 @@ void Socket::createSocket(const SocketAddress& sa) const
try {
if (nonblocking) setNonblocking();
if (nodelay) setTcpNoDelay();
+ if (getAddrInfo(sa).ai_family == AF_INET6) {
+ int flag = 1;
+ int result = ::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag));
+ QPID_POSIX_CHECK(result);
+ }
} catch (std::exception&) {
::close(s);
socket = -1;
@@ -109,7 +128,7 @@ void Socket::setTcpNoDelay() const
nodelay = true;
if (socket != -1) {
int flag = 1;
- int result = setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
+ int result = ::setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
QPID_POSIX_CHECK(result);
}
}
@@ -179,19 +198,14 @@ int Socket::listen(const SocketAddress& sa, int backlog) const
const int& socket = impl->fd;
int yes=1;
- QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
+ QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0)
throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno)));
if (::listen(socket, backlog) < 0)
throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno)));
- struct sockaddr_in name;
- socklen_t namelen = sizeof(name);
- if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
- throw QPID_POSIX_ERROR(errno);
-
- return ntohs(name.sin_port);
+ return getLocalPort(socket);
}
Socket* Socket::accept() const
diff --git a/cpp/src/qpid/sys/posix/SocketAddress.cpp b/cpp/src/qpid/sys/posix/SocketAddress.cpp
index 10f1c8a563..67438c0d89 100644
--- a/cpp/src/qpid/sys/posix/SocketAddress.cpp
+++ b/cpp/src/qpid/sys/posix/SocketAddress.cpp
@@ -27,8 +27,6 @@
#include <string.h>
#include <netdb.h>
-#include <algorithm>
-
namespace qpid {
namespace sys {
@@ -73,19 +71,38 @@ std::string SocketAddress::asString(bool numeric) const
dispName, sizeof(dispName),
servName, sizeof(servName),
NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw QPID_POSIX_ERROR(rc);
+ throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
std::string s(dispName);
s += ":";
s += servName;
return s;
}
+bool SocketAddress::nextAddress() {
+ bool r = currentAddrInfo->ai_next != 0;
+ if (r)
+ currentAddrInfo = currentAddrInfo->ai_next;
+ return r;
+}
+
+void SocketAddress::setAddrInfoPort(uint16_t port) {
+ if (!currentAddrInfo) return;
+
+ ::addrinfo& ai = *currentAddrInfo;
+ switch (ai.ai_family) {
+ case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return;
+ case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return;
+ default: throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+}
+
const ::addrinfo& getAddrInfo(const SocketAddress& sa)
{
if (!sa.addrInfo) {
::addrinfo hints;
::memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_INET; // Change this to support IPv6
+ hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for
+ hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6
hints.ai_socktype = SOCK_STREAM;
const char* node = 0;
@@ -99,9 +116,10 @@ const ::addrinfo& getAddrInfo(const SocketAddress& sa)
int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo);
if (n != 0)
throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n)));
+ sa.currentAddrInfo = sa.addrInfo;
}
- return *sa.addrInfo;
+ return *sa.currentAddrInfo;
}
}}
diff --git a/cpp/src/qpid/sys/windows/Socket.cpp b/cpp/src/qpid/sys/windows/Socket.cpp
index baa80f04e0..1ba3831b9f 100755..100644
--- a/cpp/src/qpid/sys/windows/Socket.cpp
+++ b/cpp/src/qpid/sys/windows/Socket.cpp
@@ -211,21 +211,24 @@ int Socket::read(void *buf, size_t count) const
return received;
}
-int Socket::listen(const std::string&, const std::string& port, int backlog) const
+int Socket::listen(const std::string& host, const std::string& port, int backlog) const
+{
+ SocketAddress sa(host, port);
+ return listen(sa, backlog);
+}
+
+int Socket::listen(const SocketAddress& addr, int backlog) const
{
const SOCKET& socket = impl->fd;
BOOL yes=1;
QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes)));
- struct sockaddr_in name;
- memset(&name, 0, sizeof(name));
- name.sin_family = AF_INET;
- name.sin_port = htons(boost::lexical_cast<uint16_t>(port));
- name.sin_addr.s_addr = 0;
- if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR)
- throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError())));
+
+ if (::bind(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) == SOCKET_ERROR)
+ throw Exception(QPID_MSG("Can't bind to " << addr.asString() << ": " << strError(WSAGetLastError())));
if (::listen(socket, backlog) == SOCKET_ERROR)
- throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(WSAGetLastError())));
+ throw Exception(QPID_MSG("Can't listen on " <<addr.asString() << ": " << strError(WSAGetLastError())));
+ struct sockaddr_in name;
socklen_t namelen = sizeof(name);
QPID_WINSOCK_CHECK(::getsockname(socket, (struct sockaddr*)&name, &namelen));
return ntohs(name.sin_port);
diff --git a/cpp/src/qpid/sys/windows/SocketAddress.cpp b/cpp/src/qpid/sys/windows/SocketAddress.cpp
index ac43cd2d23..b040cc5f14 100644
--- a/cpp/src/qpid/sys/windows/SocketAddress.cpp
+++ b/cpp/src/qpid/sys/windows/SocketAddress.cpp
@@ -68,6 +68,13 @@ std::string SocketAddress::asString(bool) const
return host + ":" + port;
}
+bool SocketAddress::nextAddress() {
+ bool r = currentAddrInfo->ai_next != 0;
+ if (r)
+ currentAddrInfo = currentAddrInfo->ai_next;
+ return r;
+}
+
const ::addrinfo& getAddrInfo(const SocketAddress& sa)
{
return *sa.addrInfo;
diff --git a/cpp/src/tests/cluster_test_logs.py b/cpp/src/tests/cluster_test_logs.py
index a0ce8fb9c3..3c7e8e8020 100755
--- a/cpp/src/tests/cluster_test_logs.py
+++ b/cpp/src/tests/cluster_test_logs.py
@@ -53,6 +53,7 @@ def filter_log(log):
'stall for update|unstall, ignore update|cancelled offer .* unstall',
'caught up',
'active for links|Passivating links|Activating links',
+ 'info Connecting: .*', # UpdateClient connection
'info Connection.* connected to', # UpdateClient connection
'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection
'warning Broker closed connection: 200, OK',