diff options
Diffstat (limited to 'qpid/cpp/src/qpid')
-rw-r--r-- | qpid/cpp/src/qpid/sys/Socket.h | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/Socket.cpp | 42 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/qpid/sys/windows/Socket.cpp | 62 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp | 70 |
6 files changed, 153 insertions, 59 deletions
diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h index d108402682..76b993fd63 100644 --- a/qpid/cpp/src/qpid/sys/Socket.h +++ b/qpid/cpp/src/qpid/sys/Socket.h @@ -39,12 +39,9 @@ public: /** Create a socket wrapper for descriptor. */ QPID_COMMON_EXTERN Socket(); - /** Create an initialized TCP socket */ - void createTcp() const; - /** Set timeout for read and write */ void setTimeout(const Duration& interval) const; - + /** Set socket non blocking */ void setNonblocking() const; @@ -59,7 +56,8 @@ public: *@return The bound port. */ QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const; - + QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const; + /** Returns the "socket name" ie the address bound to * the near end of the socket */ @@ -102,8 +100,12 @@ public: QPID_COMMON_EXTERN void setTcpNoDelay(bool nodelay) const; private: + /** Create socket */ + void createSocket(const SocketAddress&) const; + Socket(IOHandlePrivate*); mutable std::string connectname; + mutable bool nonblocking; }; }} diff --git a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp index 9fd0602ce9..fd9a4b3468 100644 --- a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -60,19 +60,23 @@ class PollerHandlePrivate { DELETED }; - int fd; ::__uint32_t events; + const IOHandlePrivate* ioHandle; PollerHandle* pollerHandle; FDStat stat; Mutex lock; - PollerHandlePrivate(int f, PollerHandle* p) : - fd(f), + PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) : events(0), + ioHandle(h), pollerHandle(p), stat(ABSENT) { } + int fd() const { + return toFd(ioHandle); + } + bool isActive() const { return stat == MONITORED || stat == MONITORED_HUNGUP; } @@ -131,7 +135,7 @@ class PollerHandlePrivate { }; PollerHandle::PollerHandle(const IOHandle& h) : - impl(new PollerHandlePrivate(toFd(h.impl), this)) + impl(new PollerHandlePrivate(h.impl, this)) {} PollerHandle::~PollerHandle() { @@ -303,7 +307,7 @@ void Poller::registerHandle(PollerHandle& handle) { epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd, &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd(), &epe)); eh.setActive(); } @@ -313,7 +317,7 @@ void Poller::unregisterHandle(PollerHandle& handle) { ScopedLock<Mutex> l(eh.lock); assert(!eh.isIdle()); - int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0); + int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd(), 0); // Ignore EBADF since deleting a nonexistent fd has the overall required result! // And allows the case where a sloppy program closes the fd and then does the delFd() if (rc == -1 && errno != EBADF) { @@ -344,7 +348,7 @@ void PollerPrivate::resetMode(PollerHandlePrivate& eh) { epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; - QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe)); eh.setActive(); return; @@ -382,7 +386,7 @@ void Poller::monitorHandle(PollerHandle& handle, Direction dir) { epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe)); } void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) { @@ -408,7 +412,7 @@ void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) { epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe)); } void Poller::shutdown() { @@ -443,7 +447,7 @@ bool Poller::interrupt(PollerHandle& handle) { epe.events = 0; epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe)); if (eh.isInactive()) { eh.setInterrupted(); diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp index 02004b1999..481aa6c88e 100644 --- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp +++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp @@ -97,22 +97,30 @@ std::string getService(int fd, bool local) } Socket::Socket() : - IOHandle(new IOHandlePrivate) -{ - createTcp(); -} + IOHandle(new IOHandlePrivate), + nonblocking(false) +{} Socket::Socket(IOHandlePrivate* h) : - IOHandle(h) + IOHandle(h), + nonblocking(false) {} -void Socket::createTcp() const +void Socket::createSocket(const SocketAddress& sa) const { int& socket = impl->fd; if (socket != -1) Socket::close(); - int s = ::socket (AF_INET, SOCK_STREAM, 0); + int s = ::socket(getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, 0); if (s < 0) throw QPID_POSIX_ERROR(errno); socket = s; + + try { + if (nonblocking) setNonblocking(); + } catch (std::exception&) { + ::close(s); + socket = -1; + throw; + } } void Socket::setTimeout(const Duration& interval) const @@ -125,7 +133,9 @@ void Socket::setTimeout(const Duration& interval) const } void Socket::setNonblocking() const { - QPID_POSIX_CHECK(::fcntl(impl->fd, F_SETFL, O_NONBLOCK)); + int& socket = impl->fd; + if (socket != -1) QPID_POSIX_CHECK(::fcntl(socket, F_SETFL, O_NONBLOCK)); + nonblocking = true; } void Socket::connect(const std::string& host, uint16_t port) const @@ -138,8 +148,9 @@ void Socket::connect(const SocketAddress& addr) const { connectname = addr.asString(); - const int& socket = impl->fd; + createSocket(addr); + const int& socket = impl->fd; // TODO the correct thing to do here is loop on failure until you've used all the returned addresses if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) && (errno != EINPROGRESS)) { @@ -158,15 +169,22 @@ Socket::close() const int Socket::listen(uint16_t port, int backlog) const { + SocketAddress sa("", boost::lexical_cast<std::string>(port)); + + createSocket(sa); + return listen(sa, backlog); +} + +int Socket::listen(const SocketAddress& sa, int backlog) const +{ const int& socket = impl->fd; int yes=1; QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); - SocketAddress sa("", boost::lexical_cast<std::string>(port)); if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0) - throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno))); + throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno))); if (::listen(socket, backlog) < 0) - throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(errno))); + throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno))); struct sockaddr_in name; socklen_t namelen = sizeof(name); diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp index 475b18600d..971f0bb665 100644 --- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -216,7 +216,7 @@ AsynchConnector::AsynchConnector(const Socket& sock, connCallback(socket); } catch(std::exception& e) { if (failCallback) - failCallback(-1, std::string(e.what())); + failCallback(socket, -1, std::string(e.what())); socket.close(); delete &socket; } diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp index 18fa7c3b1c..8e6233bbf8 100755 --- a/qpid/cpp/src/qpid/sys/windows/Socket.cpp +++ b/qpid/cpp/src/qpid/sys/windows/Socket.cpp @@ -20,19 +20,18 @@ */ #include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" #include "qpid/sys/windows/IoHandlePrivate.h" #include "qpid/sys/windows/check.h" #include "qpid/sys/Time.h" #include <cstdlib> #include <string.h> -#include <iostream> -#include <memory.h> #include <winsock2.h> -#include <ws2tcpip.h> #include <boost/format.hpp> +#include <boost/lexical_cast.hpp> // Need to initialize WinSock. Ideally, this would be a singleton or embedded // in some one-time initialization function. I tried boost singleton and could @@ -138,20 +137,36 @@ std::string getService(SOCKET fd, bool local) Socket::Socket() : IOHandle(new IOHandlePrivate) { - createTcp(); + SOCKET& socket = impl->fd; + if (socket != INVALID_SOCKET) Socket::close(); + SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0); + if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); + socket = s; } Socket::Socket(IOHandlePrivate* h) : IOHandle(h) {} -void Socket::createTcp() const +void +Socket::createSocket(const SocketAddress& sa) const { SOCKET& socket = impl->fd; if (socket != INVALID_SOCKET) Socket::close(); - SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0); + + SOCKET s = ::socket (getAddrInfo(sa).ai_family, + getAddrInfo(sa).ai_socktype, + 0); if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); socket = s; + + try { + if (nonblocking) setNonblocking(); + } catch (std::exception&) { + closesocket(s); + socket = INVALID_SOCKET; + throw; + } } void Socket::setTimeout(const Duration& interval) const @@ -175,41 +190,26 @@ void Socket::setNonblocking() const { void Socket::connect(const std::string& host, uint16_t port) const { - std::stringstream portstream; - portstream << port << std::ends; - std::string portstr = portstream.str(); - std::stringstream namestream; - namestream << host << ":" << port; - connectname = namestream.str(); + SocketAddress sa(host, boost::lexical_cast<std::string>(port)); + connect(sa); +} +void +Socket::connect(const SocketAddress& addr) const +{ const SOCKET& socket = impl->fd; - // TODO: Be good to make this work for IPv6 as well as IPv4. Would require - // other changes, such as waiting to create the socket until after we - // have the address family. Maybe unbundle the translation of names here; - // use TcpAddress to resolve things and make this class take a TcpAddress - // and grab its address family to create the socket. - struct addrinfo hints; - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_INET; // We always creating AF_INET-only sockets. - hints.ai_socktype = SOCK_STREAM; // We always do TCP - addrinfo *addrs; - int status = getaddrinfo(host.c_str(), portstr.c_str(), &hints, &addrs); - if (status != 0) - throw Exception(QPID_MSG("Cannot resolve " << host << ": " << - gai_strerror(status))); - addrinfo *addr = addrs; + const addrinfo *addrs = &(getAddrInfo(addr)); int error = 0; WSASetLastError(0); - while (addr != 0) { - if ((::connect(socket, addr->ai_addr, addr->ai_addrlen) == 0) || + while (addrs != 0) { + if ((::connect(socket, addrs->ai_addr, addrs->ai_addrlen) == 0) || (WSAGetLastError() == WSAEWOULDBLOCK)) break; // Error... save this error code and see if there are other address // to try before throwing the exception. error = WSAGetLastError(); - addr = addr->ai_next; + addrs = addrs->ai_next; } - freeaddrinfo(addrs); if (error) throw qpid::Exception(QPID_MSG(strError(error) << ": " << connectname)); } diff --git a/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp new file mode 100644 index 0000000000..a3e03c9be8 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/SocketAddress.h" + +#include "qpid/sys/windows/check.h" + +#include <ws2tcpip.h> +#include <string.h> + +namespace qpid { +namespace sys { + +SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) : + host(host0), + port(port0), + addrInfo(0) +{ + ::addrinfo hints; + ::memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well + hints.ai_socktype = SOCK_STREAM; + + const char* node = 0; + if (host.empty()) { + hints.ai_flags |= AI_PASSIVE; + } else { + node = host.c_str(); + } + const char* service = port.empty() ? "0" : port.c_str(); + + int n = ::getaddrinfo(node, service, &hints, &addrInfo); + if (n != 0) + throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n))); +} + +SocketAddress::~SocketAddress() +{ + ::freeaddrinfo(addrInfo); +} + +std::string SocketAddress::asString() const +{ + return host + ":" + port; +} + +const ::addrinfo& getAddrInfo(const SocketAddress& sa) +{ + return *sa.addrInfo; +} + +}} |