summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/sys/SocketAddress.h5
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp48
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp24
-rw-r--r--cpp/src/qpid/sys/posix/Socket.cpp30
-rw-r--r--cpp/src/qpid/sys/posix/SocketAddress.cpp28
-rw-r--r--[-rwxr-xr-x]cpp/src/qpid/sys/windows/Socket.cpp21
-rw-r--r--cpp/src/qpid/sys/windows/SocketAddress.cpp7
-rwxr-xr-xcpp/src/tests/cluster_test_logs.py1
8 files changed, 127 insertions, 37 deletions
diff --git a/cpp/src/qpid/sys/SocketAddress.h b/cpp/src/qpid/sys/SocketAddress.h
index c2120338cf..481beab747 100644
--- a/cpp/src/qpid/sys/SocketAddress.h
+++ b/cpp/src/qpid/sys/SocketAddress.h
@@ -41,12 +41,15 @@ public:
QPID_COMMON_EXTERN SocketAddress& operator=(const SocketAddress&);
QPID_COMMON_EXTERN ~SocketAddress();
- std::string asString(bool numeric=true) const;
+ QPID_COMMON_EXTERN bool nextAddress();
+ QPID_COMMON_EXTERN std::string asString(bool numeric=true) const;
+ QPID_COMMON_EXTERN void setAddrInfoPort(uint16_t port);
private:
std::string host;
std::string port;
mutable ::addrinfo* addrInfo;
+ mutable ::addrinfo* currentAddrInfo;
};
}}
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index 34338ce434..c5cc86c813 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -25,21 +25,22 @@
#include "qpid/Plugin.h"
#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/Poller.h"
#include "qpid/broker/Broker.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
-#include <memory>
+#include <boost/ptr_container/ptr_vector.hpp>
namespace qpid {
namespace sys {
class AsynchIOProtocolFactory : public ProtocolFactory {
const bool tcpNoDelay;
- Socket listener;
- const uint16_t listeningPort;
- std::auto_ptr<AsynchAcceptor> acceptor;
+ boost::ptr_vector<Socket> listeners;
+ boost::ptr_vector<AsynchAcceptor> acceptors;
+ uint16_t listeningPort;
public:
AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay);
@@ -71,15 +72,38 @@ static class TCPIOPlugin : public Plugin {
"", boost::lexical_cast<std::string>(opts.port),
opts.connectionBacklog,
opts.tcpNoDelay));
- QPID_LOG(notice, "Listening on TCP port " << protocolt->getPort());
+ QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
broker->registerProtocolFactory("tcp", protocolt);
}
}
} tcpPlugin;
AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) :
- tcpNoDelay(nodelay), listeningPort(listener.listen(host, port, backlog))
-{}
+ tcpNoDelay(nodelay)
+{
+ SocketAddress sa(host, port);
+
+ // We must have at least one resolved address
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = new Socket;
+ uint16_t lport = s->listen(sa, backlog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+
+ listeningPort = lport;
+
+ // Try any other resolved addresses
+ while (sa.nextAddress()) {
+ // Hack to ensure that all listening connections are on the same port
+ sa.setAddrInfoPort(listeningPort);
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = new Socket;
+ uint16_t lport = s->listen(sa, backlog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+ }
+
+}
void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
ConnectionCodec::Factory* f, bool isClient) {
@@ -111,10 +135,12 @@ uint16_t AsynchIOProtocolFactory::getPort() const {
void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
ConnectionCodec::Factory* fact) {
- acceptor.reset(
- AsynchAcceptor::create(listener,
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
- acceptor->start(poller);
+ for (unsigned i = 0; i<listeners.size(); ++i) {
+ acceptors.push_back(
+ AsynchAcceptor::create(listeners[i],
+ boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
+ acceptors[i].start(poller);
+ }
}
void AsynchIOProtocolFactory::connectFailed(
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index b5a0b0bf32..dab8bd09c6 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -149,6 +149,7 @@ private:
ConnectedCallback connCallback;
FailedCallback failCallback;
const Socket& socket;
+ SocketAddress sa;
public:
AsynchConnector(const Socket& socket,
@@ -171,11 +172,13 @@ AsynchConnector::AsynchConnector(const Socket& s,
boost::bind(&AsynchConnector::connComplete, this, _1)),
connCallback(connCb),
failCallback(failCb),
- socket(s)
+ socket(s),
+ sa(hostname, port)
{
socket.setNonblocking();
- SocketAddress sa(hostname, port);
+
// Note, not catching any exceptions here, also has effect of destructing
+ QPID_LOG(info, "Connecting: " << sa.asString());
socket.connect(sa);
}
@@ -191,11 +194,26 @@ void AsynchConnector::stop()
void AsynchConnector::connComplete(DispatchHandle& h)
{
- h.stopWatch();
int errCode = socket.getError();
if (errCode == 0) {
+ h.stopWatch();
connCallback(socket);
} else {
+ // Retry while we cause an immediate exception
+ // (asynch failure will be handled by re-entering here at the top)
+ while (sa.nextAddress()) {
+ try {
+ // Try next address without deleting ourselves
+ QPID_LOG(debug, "Ignored socket connect error: " << strError(errCode));
+ QPID_LOG(info, "Retrying connect: " << sa.asString());
+ socket.connect(sa);
+ return;
+ } catch (const std::exception& e) {
+ QPID_LOG(debug, "Ignored socket connect exception: " << e.what());
+ }
+ errCode = socket.getError();
+ }
+ h.stopWatch();
failCallback(socket, errCode, strError(errCode));
}
DispatchHandle::doDelete();
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp
index aa25f8062d..b6f3b7addd 100644
--- a/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/cpp/src/qpid/sys/posix/Socket.cpp
@@ -63,6 +63,20 @@ std::string getName(int fd, bool local)
throw QPID_POSIX_ERROR(rc);
return std::string(dispName) + ":" + std::string(servName);
}
+
+uint16_t getLocalPort(int fd)
+{
+ ::sockaddr_storage name;
+ ::socklen_t namelen = sizeof(name);
+ if (::getsockname(fd, (::sockaddr*)&name, &namelen) < 0)
+ throw QPID_POSIX_ERROR(errno);
+
+ switch (name.ss_family) {
+ case AF_INET: return ntohs(((::sockaddr_in&)name).sin_port);
+ case AF_INET6: return ntohs(((::sockaddr_in6&)name).sin6_port);
+ default:throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+}
}
Socket::Socket() :
@@ -88,6 +102,11 @@ void Socket::createSocket(const SocketAddress& sa) const
try {
if (nonblocking) setNonblocking();
if (nodelay) setTcpNoDelay();
+ if (getAddrInfo(sa).ai_family == AF_INET6) {
+ int flag = 1;
+ int result = ::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag));
+ QPID_POSIX_CHECK(result);
+ }
} catch (std::exception&) {
::close(s);
socket = -1;
@@ -109,7 +128,7 @@ void Socket::setTcpNoDelay() const
nodelay = true;
if (socket != -1) {
int flag = 1;
- int result = setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
+ int result = ::setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
QPID_POSIX_CHECK(result);
}
}
@@ -179,19 +198,14 @@ int Socket::listen(const SocketAddress& sa, int backlog) const
const int& socket = impl->fd;
int yes=1;
- QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
+ QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0)
throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno)));
if (::listen(socket, backlog) < 0)
throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno)));
- struct sockaddr_in name;
- socklen_t namelen = sizeof(name);
- if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
- throw QPID_POSIX_ERROR(errno);
-
- return ntohs(name.sin_port);
+ return getLocalPort(socket);
}
Socket* Socket::accept() const
diff --git a/cpp/src/qpid/sys/posix/SocketAddress.cpp b/cpp/src/qpid/sys/posix/SocketAddress.cpp
index 10f1c8a563..67438c0d89 100644
--- a/cpp/src/qpid/sys/posix/SocketAddress.cpp
+++ b/cpp/src/qpid/sys/posix/SocketAddress.cpp
@@ -27,8 +27,6 @@
#include <string.h>
#include <netdb.h>
-#include <algorithm>
-
namespace qpid {
namespace sys {
@@ -73,19 +71,38 @@ std::string SocketAddress::asString(bool numeric) const
dispName, sizeof(dispName),
servName, sizeof(servName),
NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw QPID_POSIX_ERROR(rc);
+ throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
std::string s(dispName);
s += ":";
s += servName;
return s;
}
+bool SocketAddress::nextAddress() {
+ bool r = currentAddrInfo->ai_next != 0;
+ if (r)
+ currentAddrInfo = currentAddrInfo->ai_next;
+ return r;
+}
+
+void SocketAddress::setAddrInfoPort(uint16_t port) {
+ if (!currentAddrInfo) return;
+
+ ::addrinfo& ai = *currentAddrInfo;
+ switch (ai.ai_family) {
+ case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return;
+ case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return;
+ default: throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+}
+
const ::addrinfo& getAddrInfo(const SocketAddress& sa)
{
if (!sa.addrInfo) {
::addrinfo hints;
::memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_INET; // Change this to support IPv6
+ hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for
+ hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6
hints.ai_socktype = SOCK_STREAM;
const char* node = 0;
@@ -99,9 +116,10 @@ const ::addrinfo& getAddrInfo(const SocketAddress& sa)
int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo);
if (n != 0)
throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n)));
+ sa.currentAddrInfo = sa.addrInfo;
}
- return *sa.addrInfo;
+ return *sa.currentAddrInfo;
}
}}
diff --git a/cpp/src/qpid/sys/windows/Socket.cpp b/cpp/src/qpid/sys/windows/Socket.cpp
index baa80f04e0..1ba3831b9f 100755..100644
--- a/cpp/src/qpid/sys/windows/Socket.cpp
+++ b/cpp/src/qpid/sys/windows/Socket.cpp
@@ -211,21 +211,24 @@ int Socket::read(void *buf, size_t count) const
return received;
}
-int Socket::listen(const std::string&, const std::string& port, int backlog) const
+int Socket::listen(const std::string& host, const std::string& port, int backlog) const
+{
+ SocketAddress sa(host, port);
+ return listen(sa, backlog);
+}
+
+int Socket::listen(const SocketAddress& addr, int backlog) const
{
const SOCKET& socket = impl->fd;
BOOL yes=1;
QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes)));
- struct sockaddr_in name;
- memset(&name, 0, sizeof(name));
- name.sin_family = AF_INET;
- name.sin_port = htons(boost::lexical_cast<uint16_t>(port));
- name.sin_addr.s_addr = 0;
- if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR)
- throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError())));
+
+ if (::bind(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) == SOCKET_ERROR)
+ throw Exception(QPID_MSG("Can't bind to " << addr.asString() << ": " << strError(WSAGetLastError())));
if (::listen(socket, backlog) == SOCKET_ERROR)
- throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(WSAGetLastError())));
+ throw Exception(QPID_MSG("Can't listen on " <<addr.asString() << ": " << strError(WSAGetLastError())));
+ struct sockaddr_in name;
socklen_t namelen = sizeof(name);
QPID_WINSOCK_CHECK(::getsockname(socket, (struct sockaddr*)&name, &namelen));
return ntohs(name.sin_port);
diff --git a/cpp/src/qpid/sys/windows/SocketAddress.cpp b/cpp/src/qpid/sys/windows/SocketAddress.cpp
index ac43cd2d23..b040cc5f14 100644
--- a/cpp/src/qpid/sys/windows/SocketAddress.cpp
+++ b/cpp/src/qpid/sys/windows/SocketAddress.cpp
@@ -68,6 +68,13 @@ std::string SocketAddress::asString(bool) const
return host + ":" + port;
}
+bool SocketAddress::nextAddress() {
+ bool r = currentAddrInfo->ai_next != 0;
+ if (r)
+ currentAddrInfo = currentAddrInfo->ai_next;
+ return r;
+}
+
const ::addrinfo& getAddrInfo(const SocketAddress& sa)
{
return *sa.addrInfo;
diff --git a/cpp/src/tests/cluster_test_logs.py b/cpp/src/tests/cluster_test_logs.py
index a0ce8fb9c3..3c7e8e8020 100755
--- a/cpp/src/tests/cluster_test_logs.py
+++ b/cpp/src/tests/cluster_test_logs.py
@@ -53,6 +53,7 @@ def filter_log(log):
'stall for update|unstall, ignore update|cancelled offer .* unstall',
'caught up',
'active for links|Passivating links|Activating links',
+ 'info Connecting: .*', # UpdateClient connection
'info Connection.* connected to', # UpdateClient connection
'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection
'warning Broker closed connection: 200, OK',