summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2008-04-15 15:41:21 +0000
committerAndrew Stitcher <astitcher@apache.org>2008-04-15 15:41:21 +0000
commit7bf3ed2b4bca4706e3837126d597ae5d2ee11537 (patch)
tree018e7046950bec77038ccde2ef3ed22ae190ed8d
parentdf53cdeef2ceea78010708a1135c64830aa3049e (diff)
downloadqpid-python-7bf3ed2b4bca4706e3837126d597ae5d2ee11537.tar.gz
Refactored the IO framework that sits on top of Poller so that it uses a generalised IOHandle.
This means that you can define new classes derived from IOHandle (other than Socket) that can also be added to a Poller and waited for. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@648288 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am6
-rw-r--r--qpid/cpp/src/qpid/client/Connector.cpp15
-rw-r--r--qpid/cpp/src/qpid/client/Connector.h2
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIO.h3
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp22
-rw-r--r--qpid/cpp/src/qpid/sys/Dispatcher.h4
-rw-r--r--qpid/cpp/src/qpid/sys/IOHandle.h45
-rw-r--r--qpid/cpp/src/qpid/sys/Poller.h6
-rw-r--r--qpid/cpp/src/qpid/sys/Socket.h17
-rw-r--r--qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp18
-rw-r--r--qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp14
-rw-r--r--qpid/cpp/src/qpid/sys/posix/IOHandle.cpp42
-rw-r--r--qpid/cpp/src/qpid/sys/posix/PrivatePosix.h14
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Socket.cpp51
-rw-r--r--qpid/cpp/src/tests/SocketProxy.h84
15 files changed, 190 insertions, 153 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 4a43a9cc55..3177d280a6 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -62,7 +62,7 @@ qpidd_SOURCES = qpidd.cpp
posix_plat_src = \
qpid/sys/epoll/EpollPoller.cpp \
- qpid/sys/DeletionManager.h \
+ qpid/sys/posix/IOHandle.cpp \
qpid/sys/posix/Socket.cpp \
qpid/sys/posix/AsynchIO.cpp \
qpid/sys/posix/Time.cpp \
@@ -469,15 +469,17 @@ nobase_include_HEADERS = \
qpid/sys/AtomicCount.h \
qpid/sys/BlockingQueue.h \
qpid/sys/Condition.h \
+ qpid/sys/ConnectionCodec.h \
qpid/sys/ConnectionInputHandler.h \
qpid/sys/ConnectionInputHandlerFactory.h \
qpid/sys/ConnectionOutputHandler.h \
+ qpid/sys/DeletionManager.h \
qpid/sys/Dispatcher.h \
+ qpid/sys/IOHandle.h \
qpid/sys/Module.h \
qpid/sys/Monitor.h \
qpid/sys/Mutex.h \
qpid/sys/OutputControl.h \
- qpid/sys/ConnectionCodec.h \
qpid/sys/OutputTask.h \
qpid/sys/Poller.h \
qpid/sys/Runnable.h \
diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp
index a0be05fbbc..11aff6184b 100644
--- a/qpid/cpp/src/qpid/client/Connector.cpp
+++ b/qpid/cpp/src/qpid/client/Connector.cpp
@@ -18,16 +18,17 @@
* under the License.
*
*/
-#include <iostream>
+#include "Connector.h"
+
#include "qpid/log/Statement.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
-#include "Connector.h"
-
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
#include "qpid/Msg.h"
+
+#include <iostream>
#include <boost/bind.hpp>
#include <boost/format.hpp>
@@ -62,7 +63,7 @@ void Connector::connect(const std::string& host, int port){
Mutex::ScopedLock l(closedLock);
assert(closed);
socket.connect(host, port);
- identifier=str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
+ identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
closed = false;
poller = Poller::shared_ptr(new Poller);
aio = new AsynchIO(socket,
@@ -72,7 +73,7 @@ void Connector::connect(const std::string& host, int port){
0, // closed
0, // nobuffs
boost::bind(&Connector::writebuff, this, _1));
- writer.setAio(aio);
+ writer.init(identifier, aio);
}
void Connector::init(){
@@ -184,11 +185,11 @@ Connector::Writer::Writer() : aio(0), buffer(0), lastEof(0)
Connector::Writer::~Writer() { delete buffer; }
-void Connector::Writer::setAio(sys::AsynchIO* a) {
+void Connector::Writer::init(std::string id, sys::AsynchIO* a) {
Mutex::ScopedLock l(lock);
+ identifier = id;
aio = a;
newBuffer(l);
- identifier = str(format("[%1% %2%]") % aio->getSocket().getLocalPort() % aio->getSocket().getPeerAddress());
}
void Connector::Writer::handle(framing::AMQFrame& frame) {
diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h
index ffddbfd1be..78aad0b60a 100644
--- a/qpid/cpp/src/qpid/client/Connector.h
+++ b/qpid/cpp/src/qpid/client/Connector.h
@@ -68,7 +68,7 @@ class Connector : public framing::OutputHandler,
Writer();
~Writer();
- void setAio(sys::AsynchIO*);
+ void init(std::string id, sys::AsynchIO*);
void handle(framing::AMQFrame&);
void write(sys::AsynchIO&);
};
diff --git a/qpid/cpp/src/qpid/sys/AsynchIO.h b/qpid/cpp/src/qpid/sys/AsynchIO.h
index ca34d82741..3bcee8ba22 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIO.h
+++ b/qpid/cpp/src/qpid/sys/AsynchIO.h
@@ -40,6 +40,7 @@ public:
private:
Callback acceptedCallback;
DispatchHandle handle;
+ const Socket& socket;
public:
AsynchAcceptor(const Socket& s, Callback callback);
@@ -94,6 +95,7 @@ private:
ClosedCallback closedCallback;
BuffersEmptyCallback emptyCallback;
IdleCallback idleCallback;
+ const Socket& socket;
std::deque<BufferBase*> bufferQueue;
std::deque<BufferBase*> writeQueue;
bool queuedClose;
@@ -119,7 +121,6 @@ public:
void queueWriteClose();
bool writeQueueEmpty() { return writeQueue.empty(); }
BufferBase* getQueuedBuffer();
- const Socket& getSocket() const { return DispatchHandle::getSocket(); }
private:
~AsynchIO();
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 5c784912b3..43fbfdf7be 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -84,19 +84,20 @@ struct Buff : public AsynchIO::BufferBase {
};
class AsynchIOHandler : public OutputControl {
+ std::string identifier;
AsynchIO* aio;
ConnectionCodec::Factory* factory;
ConnectionCodec* codec;
bool readError;
- std::string identifier;
bool isClient;
void write(const framing::ProtocolInitiation&);
public:
- AsynchIOHandler() :
+ AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
+ identifier(id),
aio(0),
- factory(0),
+ factory(f),
codec(0),
readError(false),
isClient(false)
@@ -110,11 +111,8 @@ class AsynchIOHandler : public OutputControl {
void setClient() { isClient = true; }
- void init(AsynchIO* a, ConnectionCodec::Factory* f) {
+ void init(AsynchIO* a) {
aio = a;
- factory = f;
- identifier = aio->getSocket().getPeerAddress();
-
}
// Output side
@@ -133,7 +131,7 @@ class AsynchIOHandler : public OutputControl {
};
void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) {
- AsynchIOHandler* async = new AsynchIOHandler;
+ AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f);
AsynchIO* aio = new AsynchIO(s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
boost::bind(&AsynchIOHandler::eof, async, _1),
@@ -141,7 +139,8 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn
boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, f);
+ async->init(aio);
+
// Give connection some buffers to use
for (int i = 0; i < 4; i++) {
aio->queueReadBuffer(new Buff);
@@ -185,7 +184,7 @@ void AsynchIOAcceptor::connect(
{
Socket* socket = new Socket();//Should be deleted by handle when socket closes
socket->connect(host, port);
- AsynchIOHandler* async = new AsynchIOHandler;
+ AsynchIOHandler* async = new AsynchIOHandler(socket->getPeerAddress(), f);
async->setClient();
AsynchIO* aio = new AsynchIO(*socket,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
@@ -194,7 +193,8 @@ void AsynchIOAcceptor::connect(
boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, f);
+ async->init(aio);
+
// Give connection some buffers to use
for (int i = 0; i < 4; i++) {
aio->queueReadBuffer(new Buff);
diff --git a/qpid/cpp/src/qpid/sys/Dispatcher.h b/qpid/cpp/src/qpid/sys/Dispatcher.h
index 7cc4873068..cd7a0bdb66 100644
--- a/qpid/cpp/src/qpid/sys/Dispatcher.h
+++ b/qpid/cpp/src/qpid/sys/Dispatcher.h
@@ -55,8 +55,8 @@ private:
} state;
public:
- DispatchHandle(const Socket& s, Callback rCb, Callback wCb, Callback dCb) :
- PollerHandle(s),
+ DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) :
+ PollerHandle(h),
readableCallback(rCb),
writableCallback(wCb),
disconnectedCallback(dCb),
diff --git a/qpid/cpp/src/qpid/sys/IOHandle.h b/qpid/cpp/src/qpid/sys/IOHandle.h
new file mode 100644
index 0000000000..d06512da58
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/IOHandle.h
@@ -0,0 +1,45 @@
+#ifndef _sys_IOHandle_h
+#define _sys_IOHandle_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qpid {
+namespace sys {
+
+/**
+ * This is a class intended to abstract the Unix concept of file descriptor or the Windows concept of HANDLE
+ */
+class PollerHandle;
+class IOHandlePrivate;
+class IOHandle {
+ friend class PollerHandle;
+
+protected:
+ IOHandlePrivate* const impl;
+
+ IOHandle(IOHandlePrivate*);
+ virtual ~IOHandle();
+};
+
+}}
+
+#endif // _sys_IOHandle_h
diff --git a/qpid/cpp/src/qpid/sys/Poller.h b/qpid/cpp/src/qpid/sys/Poller.h
index 0d6b4f9308..dccc12479a 100644
--- a/qpid/cpp/src/qpid/sys/Poller.h
+++ b/qpid/cpp/src/qpid/sys/Poller.h
@@ -35,16 +35,16 @@ namespace sys {
/**
* Handle class to use for polling
*/
+class IOHandle;
class Poller;
class PollerHandlePrivate;
class PollerHandle {
friend class Poller;
PollerHandlePrivate* const impl;
- const Socket& socket;
public:
- PollerHandle(const Socket& s);
+ PollerHandle(const IOHandle& h);
// Usual way to delete (will defer deletion until we
// can't be returned from a Poller::wait any more)
@@ -52,8 +52,6 @@ public:
// Class clients shouldn't ever use this
virtual ~PollerHandle();
-
- const Socket& getSocket() const {return socket;}
};
/**
diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h
index 0ebfc0c330..cab95654ad 100644
--- a/qpid/cpp/src/qpid/sys/Socket.h
+++ b/qpid/cpp/src/qpid/sys/Socket.h
@@ -21,25 +21,22 @@
* under the License.
*
*/
+#include "IOHandle.h"
+
#include <string>
-#include "qpid/sys/Time.h"
struct sockaddr;
namespace qpid {
namespace sys {
-class SocketPrivate;
-class Socket
-{
- friend class Poller;
-
- SocketPrivate* const impl;
+class Duration;
+class Socket : public IOHandle
+{
public:
/** Create a socket wrapper for descriptor. */
Socket();
- ~Socket();
/** Create an initialized TCP socket */
void createTcp() const;
@@ -106,10 +103,8 @@ public:
int read(void *buf, size_t count) const;
int write(const void *buf, size_t count) const;
- int toFd() const;
-
private:
- Socket(SocketPrivate*);
+ Socket(IOHandlePrivate*);
};
}}
diff --git a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
index 8936251f94..44b84c4239 100644
--- a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/sys/Poller.h"
+#include "qpid/sys/IOHandle.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/DeletionManager.h"
#include "qpid/sys/posix/check.h"
@@ -54,11 +55,13 @@ class PollerHandlePrivate {
MONITORED_HUNGUP
};
+ int fd;
::__uint32_t events;
FDStat stat;
Mutex lock;
- PollerHandlePrivate() :
+ PollerHandlePrivate(int f) :
+ fd(f),
events(0),
stat(ABSENT) {
}
@@ -97,9 +100,8 @@ class PollerHandlePrivate {
}
};
-PollerHandle::PollerHandle(const Socket& s) :
- impl(new PollerHandlePrivate),
- socket(s)
+PollerHandle::PollerHandle(const IOHandle& h) :
+ impl(new PollerHandlePrivate(toFd(h.impl)))
{}
PollerHandle::~PollerHandle() {
@@ -201,7 +203,7 @@ void Poller::addFd(PollerHandle& handle, Direction dir) {
}
epe.data.ptr = &handle;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, toFd(handle.socket.impl), &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe));
// Record monitoring state of this fd
eh.events = epe.events;
@@ -212,7 +214,7 @@ void Poller::delFd(PollerHandle& handle) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
assert(!eh.isIdle());
- int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, toFd(handle.socket.impl), 0);
+ int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0);
// Ignore EBADF since deleting a nonexistent fd has the overall required result!
// And allows the case where a sloppy program closes the fd and then does the delFd()
if (rc == -1 && errno != EBADF) {
@@ -231,7 +233,7 @@ void Poller::modFd(PollerHandle& handle, Direction dir) {
epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
epe.data.ptr = &handle;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, toFd(handle.socket.impl), &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
// Record monitoring state of this fd
eh.events = epe.events;
@@ -247,7 +249,7 @@ void Poller::rearmFd(PollerHandle& handle) {
epe.events = eh.events;
epe.data.ptr = &handle;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, toFd(handle.socket.impl), &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
eh.setActive();
}
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 94c68bd5d0..cedad5c011 100644
--- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -65,7 +65,8 @@ __thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms
AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) :
acceptedCallback(callback),
- handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) {
+ handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0),
+ socket(s) {
s.setNonblocking();
ignoreSigpipe();
@@ -84,7 +85,7 @@ void AsynchAcceptor::readable(DispatchHandle& h) {
errno = 0;
// TODO: Currently we ignore the peers address, perhaps we should
// log it or use it for connection acceptance.
- s = h.getSocket().accept(0, 0);
+ s = socket.accept(0, 0);
if (s) {
acceptedCallback(*s);
} else {
@@ -112,6 +113,7 @@ AsynchIO::AsynchIO(const Socket& s,
closedCallback(cCb),
emptyCallback(eCb),
idleCallback(iCb),
+ socket(s),
queuedClose(false),
writePending(false) {
@@ -209,7 +211,7 @@ void AsynchIO::readable(DispatchHandle& h) {
bufferQueue.pop_front();
errno = 0;
int readCount = buff->byteCount-buff->dataCount;
- int rc = h.getSocket().read(buff->bytes + buff->dataCount, readCount);
+ int rc = socket.read(buff->bytes + buff->dataCount, readCount);
if (rc > 0) {
buff->dataCount += rc;
threadReadTotal += rc;
@@ -276,7 +278,7 @@ void AsynchIO::writeable(DispatchHandle& h) {
writeQueue.pop_back();
errno = 0;
assert(buff->dataStart+buff->dataCount <= buff->byteCount);
- int rc = h.getSocket().write(buff->bytes+buff->dataStart, buff->dataCount);
+ int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount);
if (rc >= 0) {
threadWriteTotal += rc;
writeTotal += rc;
@@ -356,9 +358,9 @@ void AsynchIO::disconnected(DispatchHandle& h) {
*/
void AsynchIO::close(DispatchHandle& h) {
h.stopWatch();
- h.getSocket().close();
+ socket.close();
if (closedCallback) {
- closedCallback(*this, getSocket());
+ closedCallback(*this, socket);
}
}
diff --git a/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp b/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp
new file mode 100644
index 0000000000..80b487eadc
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IOHandle.h"
+
+#include "PrivatePosix.h"
+
+namespace qpid {
+namespace sys {
+
+int toFd(const IOHandlePrivate* h)
+{
+ return h->fd;
+}
+
+IOHandle::IOHandle(IOHandlePrivate* h) :
+ impl(h)
+{}
+
+IOHandle::~IOHandle() {
+ delete impl;
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h b/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h
index 9ec9770cab..33c0cd81bc 100644
--- a/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h
+++ b/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h
@@ -35,9 +35,17 @@ struct timespec& toTimespec(struct timespec& ts, const Duration& t);
struct timeval& toTimeval(struct timeval& tv, const Duration& t);
Duration toTime(const struct timespec& ts);
-// Private socket related implementation details
-class SocketPrivate;
-int toFd(const SocketPrivate* s);
+// Private fd related implementation details
+class IOHandlePrivate {
+public:
+ IOHandlePrivate(int f = -1) :
+ fd(f)
+ {}
+
+ int fd;
+};
+
+int toFd(const IOHandlePrivate* h);
}}
diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
index c286ebce27..99cf7210b6 100644
--- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
@@ -38,19 +38,8 @@
namespace qpid {
namespace sys {
-class SocketPrivate {
-public:
- SocketPrivate(int f = -1) :
- fd(f)
- {}
-
- int fd;
-
- std::string getName(bool local, bool includeService = false) const;
- std::string getService(bool local) const;
-};
-
-std::string SocketPrivate::getName(bool local, bool includeService) const
+namespace {
+std::string getName(int fd, bool local, bool includeService = false)
{
::sockaddr_storage name; // big enough for any socket address
::socklen_t namelen = sizeof(name);
@@ -80,7 +69,7 @@ std::string SocketPrivate::getName(bool local, bool includeService) const
}
}
-std::string SocketPrivate::getService(bool local) const
+std::string getService(int fd, bool local)
{
::sockaddr_storage name; // big enough for any socket address
::socklen_t namelen = sizeof(name);
@@ -101,21 +90,18 @@ std::string SocketPrivate::getService(bool local) const
throw QPID_POSIX_ERROR(rc);
return servName;
}
+}
Socket::Socket() :
- impl(new SocketPrivate)
+ IOHandle(new IOHandlePrivate)
{
createTcp();
}
-Socket::Socket(SocketPrivate* sp) :
- impl(sp)
+Socket::Socket(IOHandlePrivate* h) :
+ IOHandle(h)
{}
-Socket::~Socket() {
- delete impl;
-}
-
void Socket::createTcp() const
{
int& socket = impl->fd;
@@ -225,7 +211,7 @@ Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const
{
int afd = ::accept(impl->fd, addr, addrlen);
if ( afd >= 0)
- return new Socket(new SocketPrivate(afd));
+ return new Socket(new IOHandlePrivate(afd));
else if (errno == EAGAIN)
return 0;
else throw QPID_POSIX_ERROR(errno);
@@ -243,41 +229,32 @@ int Socket::write(const void *buf, size_t count) const
std::string Socket::getSockname() const
{
- return impl->getName(true);
+ return getName(impl->fd, true);
}
std::string Socket::getPeername() const
{
- return impl->getName(false);
+ return getName(impl->fd, false);
}
std::string Socket::getPeerAddress() const
{
- return impl->getName(false, true);
+ return getName(impl->fd, false, true);
}
std::string Socket::getLocalAddress() const
{
- return impl->getName(true, true);
+ return getName(impl->fd, true, true);
}
uint16_t Socket::getLocalPort() const
{
- return atoi(impl->getService(true).c_str());
+ return atoi(getService(impl->fd, true).c_str());
}
uint16_t Socket::getRemotePort() const
{
- return atoi(impl->getService(true).c_str());
-}
-
-int Socket::toFd() const {
- return impl->fd;
-}
-
-int toFd(const SocketPrivate* s)
-{
- return s->fd;
+ return atoi(getService(impl->fd, true).c_str());
}
}} // namespace qpid::sys
diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h
index a37c1f2c3e..3263652fe2 100644
--- a/qpid/cpp/src/tests/SocketProxy.h
+++ b/qpid/cpp/src/tests/SocketProxy.h
@@ -22,6 +22,7 @@
*/
#include "qpid/sys/Socket.h"
+#include "qpid/sys/Poller.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Mutex.h"
@@ -43,8 +44,6 @@ class SocketProxy : private qpid::sys::Runnable
SocketProxy(int connectPort, const std::string host="localhost")
: closed(false), port(listener.listen())
{
- int r=::pipe(closePipe);
- if (r<0) throwErrno(QPID_MSG("::pipe returned " << r));
client.connect(host, connectPort);
thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this));
}
@@ -58,11 +57,9 @@ class SocketProxy : private qpid::sys::Runnable
if (closed) return;
closed=true;
}
- write(closePipe[1], this, 1); // Random byte to closePipe
+ poller.shutdown();
thread.join();
client.close();
- ::close(closePipe[0]);
- ::close(closePipe[1]);
}
bool isClosed() const {
@@ -79,71 +76,38 @@ class SocketProxy : private qpid::sys::Runnable
static void throwIf(bool condition, const std::string& msg) {
if (condition) throw qpid::Exception(msg);
}
-
- struct FdSet : fd_set {
- FdSet() : maxFd(0) { clear(); }
- void clear() { FD_ZERO(this); }
- void set(int fd) { FD_SET(fd, this); maxFd = std::max(maxFd, fd); }
- bool isSet(int fd) const { return FD_ISSET(fd, this); }
- bool operator[](int fd) const { return isSet(fd); }
-
- int maxFd;
- };
-
- enum { RD=1, WR=2, ER=4 };
-
- struct Selector {
- FdSet rd, wr, er;
-
- void set(int fd, int sets) {
- if (sets & RD) rd.set(fd);
- if (sets & WR) wr.set(fd);
- if (sets & ER) er.set(fd);
- }
-
- int select() {
- for (;;) {
- int maxFd = std::max(rd.maxFd, std::max(wr.maxFd, er.maxFd));
- int r = ::select(maxFd + 1, &rd, &wr, &er, NULL);
- if (r == -1 && errno == EINTR) continue;
- if (r < 0) throwErrno(QPID_MSG("select returned " <<r));
- return r;
- }
- }
- };
void run() {
std::auto_ptr<qpid::sys::Socket> server;
try {
- // Accept incoming connections, watch closePipe.
- Selector accept;
- accept.set(listener.toFd(), RD|ER);
- accept.set(closePipe[0], RD|ER);
- accept.select();
- throwIf(accept.rd[closePipe[0]], "Closed by close()");
- throwIf(!accept.rd[listener.toFd()],"Accept failed");
+ qpid::sys::PollerHandle listenerHandle(listener);
+ poller.addFd(listenerHandle, qpid::sys::Poller::IN);
+ qpid::sys::Poller::Event event = poller.wait();
+ throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by close()");
+ throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "Accept failed");
+
+ poller.delFd(listenerHandle);
server.reset(listener.accept(0, 0));
- // Pump data between client & server sockets, watch closePipe.
+ // Pump data between client & server sockets
+ qpid::sys::PollerHandle clientHandle(client);
+ qpid::sys::PollerHandle serverHandle(*server);
+ poller.addFd(clientHandle, qpid::sys::Poller::IN);
+ poller.addFd(serverHandle, qpid::sys::Poller::IN);
char buffer[1024];
for (;;) {
- Selector select;
- select.set(server->toFd(), RD|ER);
- select.set(client.toFd(), RD|ER);
- select.set(closePipe[0], RD|ER);
- select.select();
- throwIf(select.rd[closePipe[0]], "Closed by close()");
- // Read even if fd is in error to throw a useful exception.
- bool gotData=false;
- if (select.rd[server->toFd()] || select.er[server->toFd()]) {
+ qpid::sys::Poller::Event event = poller.wait();
+ throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by close()");
+ throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "client/server disconnected");
+ if (event.handle == &serverHandle) {
client.write(buffer, server->read(buffer, sizeof(buffer)));
- gotData=true;
- }
- if (select.rd[client.toFd()] || select.er[client.toFd()]) {
+ poller.rearmFd(serverHandle);
+ } else if (event.handle == &clientHandle) {
server->write(buffer, client.read(buffer, sizeof(buffer)));
- gotData=true;
+ poller.rearmFd(clientHandle);
+ } else {
+ throwIf(true, "No handle ready");
}
- throwIf(!gotData, "No data from select()");
}
}
catch (const std::exception& e) {
@@ -155,9 +119,9 @@ class SocketProxy : private qpid::sys::Runnable
mutable qpid::sys::Mutex lock;
bool closed;
+ qpid::sys::Poller poller;
qpid::sys::Socket client, listener;
uint16_t port;
- int closePipe[2];
qpid::sys::Thread thread;
};