summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2009-10-12 05:53:27 +0000
committerAndrew Stitcher <astitcher@apache.org>2009-10-12 05:53:27 +0000
commit56e7fca9735459c62b5cca4a35376faf64a992f2 (patch)
tree93a52dadb94ef80243e844814be4fcbd2f75acfe /qpid
parentb2524e1641ea2d424545162c5fdb1629edbb5aba (diff)
downloadqpid-python-56e7fca9735459c62b5cca4a35376faf64a992f2.tar.gz
Refactored Socket to allow for IPv6 and unix domain socket
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@824237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/src/qpid/sys/Socket.h11
-rw-r--r--qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp24
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Socket.cpp34
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/Socket.cpp13
4 files changed, 48 insertions, 34 deletions
diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h
index d108402682..4a8cf0dd50 100644
--- a/qpid/cpp/src/qpid/sys/Socket.h
+++ b/qpid/cpp/src/qpid/sys/Socket.h
@@ -39,12 +39,9 @@ public:
/** Create a socket wrapper for descriptor. */
QPID_COMMON_EXTERN Socket();
- /** Create an initialized TCP socket */
- void createTcp() const;
-
/** Set timeout for read and write */
void setTimeout(const Duration& interval) const;
-
+
/** Set socket non blocking */
void setNonblocking() const;
@@ -59,7 +56,7 @@ public:
*@return The bound port.
*/
QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const;
-
+
/** Returns the "socket name" ie the address bound to
* the near end of the socket
*/
@@ -102,8 +99,12 @@ public:
QPID_COMMON_EXTERN void setTcpNoDelay(bool nodelay) const;
private:
+ /** Create socket */
+ void createSocket(const SocketAddress&) const;
+
Socket(IOHandlePrivate*);
mutable std::string connectname;
+ mutable bool nonblocking;
};
}}
diff --git a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
index 9fd0602ce9..fd9a4b3468 100644
--- a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -60,19 +60,23 @@ class PollerHandlePrivate {
DELETED
};
- int fd;
::__uint32_t events;
+ const IOHandlePrivate* ioHandle;
PollerHandle* pollerHandle;
FDStat stat;
Mutex lock;
- PollerHandlePrivate(int f, PollerHandle* p) :
- fd(f),
+ PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) :
events(0),
+ ioHandle(h),
pollerHandle(p),
stat(ABSENT) {
}
+ int fd() const {
+ return toFd(ioHandle);
+ }
+
bool isActive() const {
return stat == MONITORED || stat == MONITORED_HUNGUP;
}
@@ -131,7 +135,7 @@ class PollerHandlePrivate {
};
PollerHandle::PollerHandle(const IOHandle& h) :
- impl(new PollerHandlePrivate(toFd(h.impl), this))
+ impl(new PollerHandlePrivate(h.impl, this))
{}
PollerHandle::~PollerHandle() {
@@ -303,7 +307,7 @@ void Poller::registerHandle(PollerHandle& handle) {
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd(), &epe));
eh.setActive();
}
@@ -313,7 +317,7 @@ void Poller::unregisterHandle(PollerHandle& handle) {
ScopedLock<Mutex> l(eh.lock);
assert(!eh.isIdle());
- int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0);
+ int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd(), 0);
// Ignore EBADF since deleting a nonexistent fd has the overall required result!
// And allows the case where a sloppy program closes the fd and then does the delFd()
if (rc == -1 && errno != EBADF) {
@@ -344,7 +348,7 @@ void PollerPrivate::resetMode(PollerHandlePrivate& eh) {
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
eh.setActive();
return;
@@ -382,7 +386,7 @@ void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
}
void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
@@ -408,7 +412,7 @@ void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
}
void Poller::shutdown() {
@@ -443,7 +447,7 @@ bool Poller::interrupt(PollerHandle& handle) {
epe.events = 0;
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
if (eh.isInactive()) {
eh.setInterrupted();
diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
index 02004b1999..be26027405 100644
--- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
@@ -97,22 +97,30 @@ std::string getService(int fd, bool local)
}
Socket::Socket() :
- IOHandle(new IOHandlePrivate)
-{
- createTcp();
-}
+ IOHandle(new IOHandlePrivate),
+ nonblocking(false)
+{}
Socket::Socket(IOHandlePrivate* h) :
- IOHandle(h)
+ IOHandle(h),
+ nonblocking(false)
{}
-void Socket::createTcp() const
+void Socket::createSocket(const SocketAddress& sa) const
{
int& socket = impl->fd;
if (socket != -1) Socket::close();
- int s = ::socket (AF_INET, SOCK_STREAM, 0);
+ int s = ::socket(getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, 0);
if (s < 0) throw QPID_POSIX_ERROR(errno);
socket = s;
+
+ try {
+ if (nonblocking) setNonblocking();
+ } catch (std::exception&) {
+ ::close(s);
+ socket = -1;
+ throw;
+ }
}
void Socket::setTimeout(const Duration& interval) const
@@ -125,7 +133,9 @@ void Socket::setTimeout(const Duration& interval) const
}
void Socket::setNonblocking() const {
- QPID_POSIX_CHECK(::fcntl(impl->fd, F_SETFL, O_NONBLOCK));
+ int& socket = impl->fd;
+ if (socket != -1) QPID_POSIX_CHECK(::fcntl(socket, F_SETFL, O_NONBLOCK));
+ nonblocking = true;
}
void Socket::connect(const std::string& host, uint16_t port) const
@@ -138,8 +148,9 @@ void Socket::connect(const SocketAddress& addr) const
{
connectname = addr.asString();
- const int& socket = impl->fd;
+ createSocket(addr);
+ const int& socket = impl->fd;
// TODO the correct thing to do here is loop on failure until you've used all the returned addresses
if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) &&
(errno != EINPROGRESS)) {
@@ -158,11 +169,14 @@ Socket::close() const
int Socket::listen(uint16_t port, int backlog) const
{
+ SocketAddress sa("", boost::lexical_cast<std::string>(port));
+
+ createSocket(sa);
+
const int& socket = impl->fd;
int yes=1;
QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
- SocketAddress sa("", boost::lexical_cast<std::string>(port));
if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0)
throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno)));
if (::listen(socket, backlog) < 0)
diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp
index 18fa7c3b1c..a89b0dff1b 100755
--- a/qpid/cpp/src/qpid/sys/windows/Socket.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/Socket.cpp
@@ -138,15 +138,6 @@ std::string getService(SOCKET fd, bool local)
Socket::Socket() :
IOHandle(new IOHandlePrivate)
{
- createTcp();
-}
-
-Socket::Socket(IOHandlePrivate* h) :
- IOHandle(h)
-{}
-
-void Socket::createTcp() const
-{
SOCKET& socket = impl->fd;
if (socket != INVALID_SOCKET) Socket::close();
SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0);
@@ -154,6 +145,10 @@ void Socket::createTcp() const
socket = s;
}
+Socket::Socket(IOHandlePrivate* h) :
+ IOHandle(h)
+{}
+
void Socket::setTimeout(const Duration& interval) const
{
const SOCKET& socket = impl->fd;