diff options
author | Andrew Stitcher <astitcher@apache.org> | 2011-08-12 22:32:25 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2011-08-12 22:32:25 +0000 |
commit | 2ad73b9bb5ec8461e58dbea9d123761a7bf308dc (patch) | |
tree | 8ace2518fea6dd55cb87eab7c43d39c47edca2df /cpp/src | |
parent | cde4e55e47ae031af23746b45981fd0fd3a40cad (diff) | |
download | qpid-python-2ad73b9bb5ec8461e58dbea9d123761a7bf308dc.tar.gz |
QPID-3406: Get IPv6 working on Windows port
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1157274 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/windows/SslProtocolFactory.cpp | 44 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Socket.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/Socket.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/AsynchIO.cpp | 47 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/AsynchIoResult.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/Socket.cpp | 70 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/SocketAddress.cpp | 85 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/SslAsynchIO.h | 3 |
8 files changed, 187 insertions, 85 deletions
diff --git a/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp b/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp index 676074a590..1dff1ddc8f 100644 --- a/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp +++ b/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp @@ -27,10 +27,14 @@ #include "qpid/sys/AsynchIOHandler.h" #include "qpid/sys/ConnectionCodec.h" #include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" #include "qpid/sys/SystemInfo.h" #include "qpid/sys/windows/SslAsynchIO.h" + #include <boost/bind.hpp> +#include <boost/ptr_container/ptr_vector.hpp> #include <memory> + // security.h needs to see this to distinguish from kernel use. #define SECURITY_WIN32 #include <security.h> @@ -68,9 +72,10 @@ struct SslServerOptions : qpid::Options }; class SslProtocolFactory : public qpid::sys::ProtocolFactory { - qpid::sys::Socket listener; const bool tcpNoDelay; - const uint16_t listeningPort; + boost::ptr_vector<Socket> listeners; + boost::ptr_vector<AsynchAcceptor> acceptors; + uint16_t listeningPort; std::string brokerHost; const bool clientAuthSelected; std::auto_ptr<qpid::sys::AsynchAcceptor> acceptor; @@ -78,7 +83,7 @@ class SslProtocolFactory : public qpid::sys::ProtocolFactory { CredHandle credHandle; public: - SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay); + SslProtocolFactory(const SslServerOptions&, const std::string& host, const std::string& port, int backlog, bool nodelay); ~SslProtocolFactory(); void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*); void connect(sys::Poller::shared_ptr, const std::string& host, const std::string& port, @@ -114,6 +119,7 @@ static struct SslPlugin : public Plugin { try { const broker::Broker::Options& opts = broker->getOptions(); ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options, + "", boost::lexical_cast<std::string>(options.port), opts.connectionBacklog, opts.tcpNoDelay)); QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort()); @@ -126,12 +132,13 @@ static struct SslPlugin : public Plugin { } sslPlugin; SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, - int backlog, + const std::string& host, const std::string& port, int backlog, bool nodelay) : tcpNoDelay(nodelay), - listeningPort(listener.listen("", boost::lexical_cast<std::string>(options.port), backlog)), clientAuthSelected(options.clientAuth) { + // Make sure that certificate store is good before listening to sockets + // to avoid having open and listening sockets when there is no cert store SecInvalidateHandle(&credHandle); // Get the certificate for this server. @@ -176,6 +183,23 @@ SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, throw QPID_WINDOWS_ERROR(status); ::CertFreeCertificateContext(certContext); ::CertCloseStore(certStoreHandle, 0); + + // Listen to socket(s) + SocketAddress sa(host, port); + + // We must have at least one resolved address + QPID_LOG(info, "SSL Listening to: " << sa.asString()) + Socket* s = new Socket; + listeningPort = s->listen(sa, backlog); + listeners.push_back(s); + + // Try any other resolved addresses + while (sa.nextAddress()) { + QPID_LOG(info, "SSL Listening to: " << sa.asString()) + Socket* s = new Socket; + s->listen(sa, backlog); + listeners.push_back(s); + } } SslProtocolFactory::~SslProtocolFactory() { @@ -238,10 +262,12 @@ uint16_t SslProtocolFactory::getPort() const { void SslProtocolFactory::accept(sys::Poller::shared_ptr poller, sys::ConnectionCodec::Factory* fact) { - acceptor.reset( - AsynchAcceptor::create(listener, - boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false))); - acceptor->start(poller); + for (unsigned i = 0; i<listeners.size(); ++i) { + acceptors.push_back( + AsynchAcceptor::create(listeners[i], + boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false))); + acceptors[i].start(poller); + } } void SslProtocolFactory::connect(sys::Poller::shared_ptr poller, diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h index 9f62f3be1c..15cd388f60 100644 --- a/cpp/src/qpid/sys/Socket.h +++ b/cpp/src/qpid/sys/Socket.h @@ -39,6 +39,9 @@ public: /** Create a socket wrapper for descriptor. */ QPID_COMMON_EXTERN Socket(); + /** Create a new Socket which is the same address family as this one */ + QPID_COMMON_EXTERN Socket* createSameTypeSocket() const; + /** Set socket non blocking */ void setNonblocking() const; @@ -92,7 +95,9 @@ private: /** Create socket */ void createSocket(const SocketAddress&) const; + /** Construct socket with existing handle */ Socket(IOHandlePrivate*); + mutable std::string localname; mutable std::string peername; mutable bool nonblocking; diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index b6f3b7addd..0ec5688d07 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -114,6 +114,20 @@ void Socket::createSocket(const SocketAddress& sa) const } } +Socket* Socket::createSameTypeSocket() const { + int& socket = impl->fd; + // Socket currently has no actual socket attached + if (socket == -1) + return new Socket; + + ::sockaddr_storage sa; + ::socklen_t salen = sizeof(sa); + QPID_POSIX_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); + int s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM + if (s < 0) throw QPID_POSIX_ERROR(errno); + return new Socket(new IOHandlePrivate(s)); +} + void Socket::setNonblocking() const { int& socket = impl->fd; nonblocking = true; diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp index 8d84fdb7b2..30378d4c5f 100644 --- a/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -47,16 +47,13 @@ namespace { /* * The function pointers for AcceptEx and ConnectEx need to be looked up - * at run time. Make sure this is done only once. + * at run time. */ -boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT; -LPFN_ACCEPTEX fnAcceptEx = 0; -typedef void (*lookUpFunc)(const qpid::sys::Socket &); - -void lookUpAcceptEx() { - SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); +const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) { + SOCKET h = toSocketHandle(s); GUID guidAcceptEx = WSAID_ACCEPTEX; DWORD dwBytes = 0; + LPFN_ACCEPTEX fnAcceptEx; WSAIoctl(h, SIO_GET_EXTENSION_FUNCTION_POINTER, &guidAcceptEx, @@ -66,9 +63,9 @@ void lookUpAcceptEx() { &dwBytes, NULL, NULL); - closesocket(h); if (fnAcceptEx == 0) throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx")); + return fnAcceptEx; } } @@ -95,18 +92,15 @@ private: AsynchAcceptor::Callback acceptedCallback; const Socket& socket; + const LPFN_ACCEPTEX fnAcceptEx; }; AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : acceptedCallback(callback), - socket(s) { + socket(s), + fnAcceptEx(lookUpAcceptEx(s)) { s.setNonblocking(); -#if (BOOST_VERSION >= 103500) /* boost 1.35 or later reversed the args */ - boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx); -#else - boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce); -#endif } AsynchAcceptor::~AsynchAcceptor() @@ -124,25 +118,26 @@ void AsynchAcceptor::restart(void) { DWORD bytesReceived = 0; // Not used, needed for AcceptEx API AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback, this, - toSocketHandle(socket)); + socket); BOOL status; - status = ::fnAcceptEx(toSocketHandle(socket), - toSocketHandle(*result->newSocket), - result->addressBuffer, - 0, - AsynchAcceptResult::SOCKADDRMAXLEN, - AsynchAcceptResult::SOCKADDRMAXLEN, - &bytesReceived, - result->overlapped()); + status = fnAcceptEx(toSocketHandle(socket), + toSocketHandle(*result->newSocket), + result->addressBuffer, + 0, + AsynchAcceptResult::SOCKADDRMAXLEN, + AsynchAcceptResult::SOCKADDRMAXLEN, + &bytesReceived, + result->overlapped()); QPID_WINDOWS_CHECK_ASYNC_START(status); } AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb, AsynchAcceptor *acceptor, - SOCKET listener) - : callback(cb), acceptor(acceptor), listener(listener) { - newSocket.reset (new Socket()); + const Socket& listener) + : callback(cb), acceptor(acceptor), + listener(toSocketHandle(listener)), + newSocket(listener.createSameTypeSocket()) { } void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { diff --git a/cpp/src/qpid/sys/windows/AsynchIoResult.h b/cpp/src/qpid/sys/windows/AsynchIoResult.h index b11324918b..27e4c22138 100755 --- a/cpp/src/qpid/sys/windows/AsynchIoResult.h +++ b/cpp/src/qpid/sys/windows/AsynchIoResult.h @@ -83,17 +83,17 @@ class AsynchAcceptResult : public AsynchResult { public: AsynchAcceptResult(qpid::sys::AsynchAcceptor::Callback cb, AsynchAcceptor *acceptor, - SOCKET listener); + const qpid::sys::Socket& listener); virtual void success (size_t bytesTransferred); virtual void failure (int error); private: virtual void complete(void) {} // No-op for this class. - std::auto_ptr<qpid::sys::Socket> newSocket; qpid::sys::AsynchAcceptor::Callback callback; AsynchAcceptor *acceptor; SOCKET listener; + std::auto_ptr<qpid::sys::Socket> newSocket; // AcceptEx needs a place to write the local and remote addresses // when accepting the connection. Place those here; get enough for diff --git a/cpp/src/qpid/sys/windows/Socket.cpp b/cpp/src/qpid/sys/windows/Socket.cpp index 1ba3831b9f..8736656e19 100644 --- a/cpp/src/qpid/sys/windows/Socket.cpp +++ b/cpp/src/qpid/sys/windows/Socket.cpp @@ -91,7 +91,7 @@ namespace { std::string getName(SOCKET fd, bool local) { - sockaddr_in name; // big enough for any socket address + sockaddr_storage name; // big enough for any socket address socklen_t namelen = sizeof(name); if (local) { QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen)); @@ -108,19 +108,26 @@ std::string getName(SOCKET fd, bool local) throw qpid::Exception(QPID_MSG(gai_strerror(rc))); return std::string(dispName) + ":" + std::string(servName); } + +uint16_t getLocalPort(int fd) +{ + ::sockaddr_storage name; + ::socklen_t namelen = sizeof(name); + QPID_WINSOCK_CHECK(::getsockname(fd, (::sockaddr*)&name, &namelen)); + + switch (name.ss_family) { + case AF_INET: return ntohs(((::sockaddr_in&)name).sin_port); + case AF_INET6: return ntohs(((::sockaddr_in6&)name).sin6_port); + default:throw Exception(QPID_MSG("Unexpected socket type")); + } +} } // namespace Socket::Socket() : IOHandle(new IOHandlePrivate), nonblocking(false), nodelay(false) -{ - SOCKET& socket = impl->fd; - if (socket != INVALID_SOCKET) Socket::close(); - SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0); - if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); - socket = s; -} +{} Socket::Socket(IOHandlePrivate* h) : IOHandle(h), @@ -143,6 +150,11 @@ Socket::createSocket(const SocketAddress& sa) const try { if (nonblocking) setNonblocking(); if (nodelay) setTcpNoDelay(); + if (getAddrInfo(sa).ai_family == AF_INET6) { + int flag = 1; + int result = setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag)); + QPID_WINSOCK_CHECK(result); + } } catch (std::exception&) { closesocket(s); socket = INVALID_SOCKET; @@ -150,6 +162,20 @@ Socket::createSocket(const SocketAddress& sa) const } } +Socket* Socket::createSameTypeSocket() const { + SOCKET& socket = impl->fd; + // Socket currently has no actual socket attached + if (socket == INVALID_SOCKET) + return new Socket; + + ::sockaddr_storage sa; + ::socklen_t salen = sizeof(sa); + QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); + SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM + if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); + return new Socket(new IOHandlePrivate(s)); +} + void Socket::setNonblocking() const { u_long nonblock = 1; QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock)); @@ -166,21 +192,14 @@ Socket::connect(const SocketAddress& addr) const { peername = addr.asString(false); + createSocket(addr); + const SOCKET& socket = impl->fd; - const addrinfo *addrs = &(getAddrInfo(addr)); - int error = 0; + int err; WSASetLastError(0); - while (addrs != 0) { - if ((::connect(socket, addrs->ai_addr, addrs->ai_addrlen) == 0) || - (WSAGetLastError() == WSAEWOULDBLOCK)) - break; - // Error... save this error code and see if there are other address - // to try before throwing the exception. - error = WSAGetLastError(); - addrs = addrs->ai_next; - } - if (error) - throw qpid::Exception(QPID_MSG(strError(error) << ": " << peername)); + if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) != 0) && + ((err = ::WSAGetLastError()) != WSAEWOULDBLOCK)) + throw qpid::Exception(QPID_MSG(strError(err) << ": " << peername)); } void @@ -219,6 +238,8 @@ int Socket::listen(const std::string& host, const std::string& port, int backlog int Socket::listen(const SocketAddress& addr, int backlog) const { + createSocket(addr); + const SOCKET& socket = impl->fd; BOOL yes=1; QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes))); @@ -227,11 +248,8 @@ int Socket::listen(const SocketAddress& addr, int backlog) const throw Exception(QPID_MSG("Can't bind to " << addr.asString() << ": " << strError(WSAGetLastError()))); if (::listen(socket, backlog) == SOCKET_ERROR) throw Exception(QPID_MSG("Can't listen on " <<addr.asString() << ": " << strError(WSAGetLastError()))); - - struct sockaddr_in name; - socklen_t namelen = sizeof(name); - QPID_WINSOCK_CHECK(::getsockname(socket, (struct sockaddr*)&name, &namelen)); - return ntohs(name.sin_port); + + return getLocalPort(socket); } Socket* Socket::accept() const diff --git a/cpp/src/qpid/sys/windows/SocketAddress.cpp b/cpp/src/qpid/sys/windows/SocketAddress.cpp index b040cc5f14..438c31f001 100644 --- a/cpp/src/qpid/sys/windows/SocketAddress.cpp +++ b/cpp/src/qpid/sys/windows/SocketAddress.cpp @@ -40,32 +40,47 @@ SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) port(port0), addrInfo(0) { - ::addrinfo hints; - ::memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well - hints.ai_socktype = SOCK_STREAM; - - const char* node = 0; - if (host.empty()) { - hints.ai_flags |= AI_PASSIVE; - } else { - node = host.c_str(); - } - const char* service = port.empty() ? "0" : port.c_str(); +} - int n = ::getaddrinfo(node, service, &hints, &addrInfo); - if (n != 0) - throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n))); +SocketAddress::SocketAddress(const SocketAddress& sa) : + host(sa.host), + port(sa.port), + addrInfo(0) +{ +} + +SocketAddress& SocketAddress::operator=(const SocketAddress& sa) +{ + SocketAddress temp(sa); + + std::swap(temp, *this); + return *this; } SocketAddress::~SocketAddress() { - ::freeaddrinfo(addrInfo); + if (addrInfo) { + ::freeaddrinfo(addrInfo); + } } -std::string SocketAddress::asString(bool) const +std::string SocketAddress::asString(bool numeric) const { - return host + ":" + port; + if (!numeric) + return host + ":" + port; + // Canonicalise into numeric id + const ::addrinfo& ai = getAddrInfo(*this); + char servName[NI_MAXSERV]; + char dispName[NI_MAXHOST]; + if (int rc=::getnameinfo(ai.ai_addr, ai.ai_addrlen, + dispName, sizeof(dispName), + servName, sizeof(servName), + NI_NUMERICHOST | NI_NUMERICSERV) != 0) + throw qpid::Exception(QPID_MSG(gai_strerror(rc))); + std::string s(dispName); + s += ":"; + s += servName; + return s; } bool SocketAddress::nextAddress() { @@ -75,9 +90,41 @@ bool SocketAddress::nextAddress() { return r; } +void SocketAddress::setAddrInfoPort(uint16_t port) { + if (!currentAddrInfo) return; + + ::addrinfo& ai = *currentAddrInfo; + switch (ai.ai_family) { + case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return; + case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return; + default: throw Exception(QPID_MSG("Unexpected socket type")); + } +} + const ::addrinfo& getAddrInfo(const SocketAddress& sa) { - return *sa.addrInfo; + if (!sa.addrInfo) { + ::addrinfo hints; + ::memset(&hints, 0, sizeof(hints)); + hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for + hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6 + hints.ai_socktype = SOCK_STREAM; + + const char* node = 0; + if (sa.host.empty()) { + hints.ai_flags |= AI_PASSIVE; + } else { + node = sa.host.c_str(); + } + const char* service = sa.port.empty() ? "0" : sa.port.c_str(); + + int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo); + if (n != 0) + throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n))); + sa.currentAddrInfo = sa.addrInfo; + } + + return *sa.currentAddrInfo; } }} diff --git a/cpp/src/qpid/sys/windows/SslAsynchIO.h b/cpp/src/qpid/sys/windows/SslAsynchIO.h index 3cdf2c8f08..edec081ced 100644 --- a/cpp/src/qpid/sys/windows/SslAsynchIO.h +++ b/cpp/src/qpid/sys/windows/SslAsynchIO.h @@ -39,9 +39,6 @@ namespace qpid { namespace sys { namespace windows { -class Socket; -class Poller; - /* * SSL/Schannel shim between the frame-handling and AsynchIO layers. * SslAsynchIO creates a regular AsynchIO object to handle I/O and this class |