summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid')
-rw-r--r--qpid/cpp/src/qpid/sys/Socket.h12
-rw-r--r--qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp24
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Socket.cpp42
-rw-r--r--qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp2
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/Socket.cpp62
-rw-r--r--qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp70
6 files changed, 153 insertions, 59 deletions
diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h
index d108402682..76b993fd63 100644
--- a/qpid/cpp/src/qpid/sys/Socket.h
+++ b/qpid/cpp/src/qpid/sys/Socket.h
@@ -39,12 +39,9 @@ public:
/** Create a socket wrapper for descriptor. */
QPID_COMMON_EXTERN Socket();
- /** Create an initialized TCP socket */
- void createTcp() const;
-
/** Set timeout for read and write */
void setTimeout(const Duration& interval) const;
-
+
/** Set socket non blocking */
void setNonblocking() const;
@@ -59,7 +56,8 @@ public:
*@return The bound port.
*/
QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const;
-
+ QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const;
+
/** Returns the "socket name" ie the address bound to
* the near end of the socket
*/
@@ -102,8 +100,12 @@ public:
QPID_COMMON_EXTERN void setTcpNoDelay(bool nodelay) const;
private:
+ /** Create socket */
+ void createSocket(const SocketAddress&) const;
+
Socket(IOHandlePrivate*);
mutable std::string connectname;
+ mutable bool nonblocking;
};
}}
diff --git a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
index 9fd0602ce9..fd9a4b3468 100644
--- a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -60,19 +60,23 @@ class PollerHandlePrivate {
DELETED
};
- int fd;
::__uint32_t events;
+ const IOHandlePrivate* ioHandle;
PollerHandle* pollerHandle;
FDStat stat;
Mutex lock;
- PollerHandlePrivate(int f, PollerHandle* p) :
- fd(f),
+ PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) :
events(0),
+ ioHandle(h),
pollerHandle(p),
stat(ABSENT) {
}
+ int fd() const {
+ return toFd(ioHandle);
+ }
+
bool isActive() const {
return stat == MONITORED || stat == MONITORED_HUNGUP;
}
@@ -131,7 +135,7 @@ class PollerHandlePrivate {
};
PollerHandle::PollerHandle(const IOHandle& h) :
- impl(new PollerHandlePrivate(toFd(h.impl), this))
+ impl(new PollerHandlePrivate(h.impl, this))
{}
PollerHandle::~PollerHandle() {
@@ -303,7 +307,7 @@ void Poller::registerHandle(PollerHandle& handle) {
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd(), &epe));
eh.setActive();
}
@@ -313,7 +317,7 @@ void Poller::unregisterHandle(PollerHandle& handle) {
ScopedLock<Mutex> l(eh.lock);
assert(!eh.isIdle());
- int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0);
+ int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd(), 0);
// Ignore EBADF since deleting a nonexistent fd has the overall required result!
// And allows the case where a sloppy program closes the fd and then does the delFd()
if (rc == -1 && errno != EBADF) {
@@ -344,7 +348,7 @@ void PollerPrivate::resetMode(PollerHandlePrivate& eh) {
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
eh.setActive();
return;
@@ -382,7 +386,7 @@ void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
}
void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
@@ -408,7 +412,7 @@ void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
}
void Poller::shutdown() {
@@ -443,7 +447,7 @@ bool Poller::interrupt(PollerHandle& handle) {
epe.events = 0;
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
if (eh.isInactive()) {
eh.setInterrupted();
diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
index 02004b1999..481aa6c88e 100644
--- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
@@ -97,22 +97,30 @@ std::string getService(int fd, bool local)
}
Socket::Socket() :
- IOHandle(new IOHandlePrivate)
-{
- createTcp();
-}
+ IOHandle(new IOHandlePrivate),
+ nonblocking(false)
+{}
Socket::Socket(IOHandlePrivate* h) :
- IOHandle(h)
+ IOHandle(h),
+ nonblocking(false)
{}
-void Socket::createTcp() const
+void Socket::createSocket(const SocketAddress& sa) const
{
int& socket = impl->fd;
if (socket != -1) Socket::close();
- int s = ::socket (AF_INET, SOCK_STREAM, 0);
+ int s = ::socket(getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, 0);
if (s < 0) throw QPID_POSIX_ERROR(errno);
socket = s;
+
+ try {
+ if (nonblocking) setNonblocking();
+ } catch (std::exception&) {
+ ::close(s);
+ socket = -1;
+ throw;
+ }
}
void Socket::setTimeout(const Duration& interval) const
@@ -125,7 +133,9 @@ void Socket::setTimeout(const Duration& interval) const
}
void Socket::setNonblocking() const {
- QPID_POSIX_CHECK(::fcntl(impl->fd, F_SETFL, O_NONBLOCK));
+ int& socket = impl->fd;
+ if (socket != -1) QPID_POSIX_CHECK(::fcntl(socket, F_SETFL, O_NONBLOCK));
+ nonblocking = true;
}
void Socket::connect(const std::string& host, uint16_t port) const
@@ -138,8 +148,9 @@ void Socket::connect(const SocketAddress& addr) const
{
connectname = addr.asString();
- const int& socket = impl->fd;
+ createSocket(addr);
+ const int& socket = impl->fd;
// TODO the correct thing to do here is loop on failure until you've used all the returned addresses
if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) &&
(errno != EINPROGRESS)) {
@@ -158,15 +169,22 @@ Socket::close() const
int Socket::listen(uint16_t port, int backlog) const
{
+ SocketAddress sa("", boost::lexical_cast<std::string>(port));
+
+ createSocket(sa);
+ return listen(sa, backlog);
+}
+
+int Socket::listen(const SocketAddress& sa, int backlog) const
+{
const int& socket = impl->fd;
int yes=1;
QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
- SocketAddress sa("", boost::lexical_cast<std::string>(port));
if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0)
- throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno)));
+ throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno)));
if (::listen(socket, backlog) < 0)
- throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(errno)));
+ throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno)));
struct sockaddr_in name;
socklen_t namelen = sizeof(name);
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 475b18600d..971f0bb665 100644
--- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -216,7 +216,7 @@ AsynchConnector::AsynchConnector(const Socket& sock,
connCallback(socket);
} catch(std::exception& e) {
if (failCallback)
- failCallback(-1, std::string(e.what()));
+ failCallback(socket, -1, std::string(e.what()));
socket.close();
delete &socket;
}
diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp
index 18fa7c3b1c..8e6233bbf8 100755
--- a/qpid/cpp/src/qpid/sys/windows/Socket.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/Socket.cpp
@@ -20,19 +20,18 @@
*/
#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/windows/IoHandlePrivate.h"
#include "qpid/sys/windows/check.h"
#include "qpid/sys/Time.h"
#include <cstdlib>
#include <string.h>
-#include <iostream>
-#include <memory.h>
#include <winsock2.h>
-#include <ws2tcpip.h>
#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
// Need to initialize WinSock. Ideally, this would be a singleton or embedded
// in some one-time initialization function. I tried boost singleton and could
@@ -138,20 +137,36 @@ std::string getService(SOCKET fd, bool local)
Socket::Socket() :
IOHandle(new IOHandlePrivate)
{
- createTcp();
+ SOCKET& socket = impl->fd;
+ if (socket != INVALID_SOCKET) Socket::close();
+ SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0);
+ if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ socket = s;
}
Socket::Socket(IOHandlePrivate* h) :
IOHandle(h)
{}
-void Socket::createTcp() const
+void
+Socket::createSocket(const SocketAddress& sa) const
{
SOCKET& socket = impl->fd;
if (socket != INVALID_SOCKET) Socket::close();
- SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0);
+
+ SOCKET s = ::socket (getAddrInfo(sa).ai_family,
+ getAddrInfo(sa).ai_socktype,
+ 0);
if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
socket = s;
+
+ try {
+ if (nonblocking) setNonblocking();
+ } catch (std::exception&) {
+ closesocket(s);
+ socket = INVALID_SOCKET;
+ throw;
+ }
}
void Socket::setTimeout(const Duration& interval) const
@@ -175,41 +190,26 @@ void Socket::setNonblocking() const {
void Socket::connect(const std::string& host, uint16_t port) const
{
- std::stringstream portstream;
- portstream << port << std::ends;
- std::string portstr = portstream.str();
- std::stringstream namestream;
- namestream << host << ":" << port;
- connectname = namestream.str();
+ SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+ connect(sa);
+}
+void
+Socket::connect(const SocketAddress& addr) const
+{
const SOCKET& socket = impl->fd;
- // TODO: Be good to make this work for IPv6 as well as IPv4. Would require
- // other changes, such as waiting to create the socket until after we
- // have the address family. Maybe unbundle the translation of names here;
- // use TcpAddress to resolve things and make this class take a TcpAddress
- // and grab its address family to create the socket.
- struct addrinfo hints;
- memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_INET; // We always creating AF_INET-only sockets.
- hints.ai_socktype = SOCK_STREAM; // We always do TCP
- addrinfo *addrs;
- int status = getaddrinfo(host.c_str(), portstr.c_str(), &hints, &addrs);
- if (status != 0)
- throw Exception(QPID_MSG("Cannot resolve " << host << ": " <<
- gai_strerror(status)));
- addrinfo *addr = addrs;
+ const addrinfo *addrs = &(getAddrInfo(addr));
int error = 0;
WSASetLastError(0);
- while (addr != 0) {
- if ((::connect(socket, addr->ai_addr, addr->ai_addrlen) == 0) ||
+ while (addrs != 0) {
+ if ((::connect(socket, addrs->ai_addr, addrs->ai_addrlen) == 0) ||
(WSAGetLastError() == WSAEWOULDBLOCK))
break;
// Error... save this error code and see if there are other address
// to try before throwing the exception.
error = WSAGetLastError();
- addr = addr->ai_next;
+ addrs = addrs->ai_next;
}
- freeaddrinfo(addrs);
if (error)
throw qpid::Exception(QPID_MSG(strError(error) << ": " << connectname));
}
diff --git a/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp
new file mode 100644
index 0000000000..a3e03c9be8
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/SocketAddress.h"
+
+#include "qpid/sys/windows/check.h"
+
+#include <ws2tcpip.h>
+#include <string.h>
+
+namespace qpid {
+namespace sys {
+
+SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) :
+ host(host0),
+ port(port0),
+ addrInfo(0)
+{
+ ::addrinfo hints;
+ ::memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well
+ hints.ai_socktype = SOCK_STREAM;
+
+ const char* node = 0;
+ if (host.empty()) {
+ hints.ai_flags |= AI_PASSIVE;
+ } else {
+ node = host.c_str();
+ }
+ const char* service = port.empty() ? "0" : port.c_str();
+
+ int n = ::getaddrinfo(node, service, &hints, &addrInfo);
+ if (n != 0)
+ throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+}
+
+SocketAddress::~SocketAddress()
+{
+ ::freeaddrinfo(addrInfo);
+}
+
+std::string SocketAddress::asString() const
+{
+ return host + ":" + port;
+}
+
+const ::addrinfo& getAddrInfo(const SocketAddress& sa)
+{
+ return *sa.addrInfo;
+}
+
+}}