diff options
author | Andrew Stitcher <astitcher@apache.org> | 2008-04-15 15:41:21 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2008-04-15 15:41:21 +0000 |
commit | dd53b33c3badd538d2d25a35146d9ab032573cc0 (patch) | |
tree | 305a9f3e6cdc5d88d6c78638c75dda9d3ddb9831 /cpp/src/tests/SocketProxy.h | |
parent | 8ac8e19e4805e78c3adcab66f1aab2ef5190f48e (diff) | |
download | qpid-python-dd53b33c3badd538d2d25a35146d9ab032573cc0.tar.gz |
Refactored the IO framework that sits on top of Poller so that it uses a generalised IOHandle.
This means that you can define new classes derived from IOHandle (other than Socket) that
can also be added to a Poller and waited for.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@648288 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/SocketProxy.h')
-rw-r--r-- | cpp/src/tests/SocketProxy.h | 84 |
1 files changed, 24 insertions, 60 deletions
diff --git a/cpp/src/tests/SocketProxy.h b/cpp/src/tests/SocketProxy.h index a37c1f2c3e..3263652fe2 100644 --- a/cpp/src/tests/SocketProxy.h +++ b/cpp/src/tests/SocketProxy.h @@ -22,6 +22,7 @@ */ #include "qpid/sys/Socket.h" +#include "qpid/sys/Poller.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Mutex.h" @@ -43,8 +44,6 @@ class SocketProxy : private qpid::sys::Runnable SocketProxy(int connectPort, const std::string host="localhost") : closed(false), port(listener.listen()) { - int r=::pipe(closePipe); - if (r<0) throwErrno(QPID_MSG("::pipe returned " << r)); client.connect(host, connectPort); thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this)); } @@ -58,11 +57,9 @@ class SocketProxy : private qpid::sys::Runnable if (closed) return; closed=true; } - write(closePipe[1], this, 1); // Random byte to closePipe + poller.shutdown(); thread.join(); client.close(); - ::close(closePipe[0]); - ::close(closePipe[1]); } bool isClosed() const { @@ -79,71 +76,38 @@ class SocketProxy : private qpid::sys::Runnable static void throwIf(bool condition, const std::string& msg) { if (condition) throw qpid::Exception(msg); } - - struct FdSet : fd_set { - FdSet() : maxFd(0) { clear(); } - void clear() { FD_ZERO(this); } - void set(int fd) { FD_SET(fd, this); maxFd = std::max(maxFd, fd); } - bool isSet(int fd) const { return FD_ISSET(fd, this); } - bool operator[](int fd) const { return isSet(fd); } - - int maxFd; - }; - - enum { RD=1, WR=2, ER=4 }; - - struct Selector { - FdSet rd, wr, er; - - void set(int fd, int sets) { - if (sets & RD) rd.set(fd); - if (sets & WR) wr.set(fd); - if (sets & ER) er.set(fd); - } - - int select() { - for (;;) { - int maxFd = std::max(rd.maxFd, std::max(wr.maxFd, er.maxFd)); - int r = ::select(maxFd + 1, &rd, &wr, &er, NULL); - if (r == -1 && errno == EINTR) continue; - if (r < 0) throwErrno(QPID_MSG("select returned " <<r)); - return r; - } - } - }; void run() { std::auto_ptr<qpid::sys::Socket> server; try { - // Accept incoming connections, watch closePipe. - Selector accept; - accept.set(listener.toFd(), RD|ER); - accept.set(closePipe[0], RD|ER); - accept.select(); - throwIf(accept.rd[closePipe[0]], "Closed by close()"); - throwIf(!accept.rd[listener.toFd()],"Accept failed"); + qpid::sys::PollerHandle listenerHandle(listener); + poller.addFd(listenerHandle, qpid::sys::Poller::IN); + qpid::sys::Poller::Event event = poller.wait(); + throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by close()"); + throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "Accept failed"); + + poller.delFd(listenerHandle); server.reset(listener.accept(0, 0)); - // Pump data between client & server sockets, watch closePipe. + // Pump data between client & server sockets + qpid::sys::PollerHandle clientHandle(client); + qpid::sys::PollerHandle serverHandle(*server); + poller.addFd(clientHandle, qpid::sys::Poller::IN); + poller.addFd(serverHandle, qpid::sys::Poller::IN); char buffer[1024]; for (;;) { - Selector select; - select.set(server->toFd(), RD|ER); - select.set(client.toFd(), RD|ER); - select.set(closePipe[0], RD|ER); - select.select(); - throwIf(select.rd[closePipe[0]], "Closed by close()"); - // Read even if fd is in error to throw a useful exception. - bool gotData=false; - if (select.rd[server->toFd()] || select.er[server->toFd()]) { + qpid::sys::Poller::Event event = poller.wait(); + throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by close()"); + throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "client/server disconnected"); + if (event.handle == &serverHandle) { client.write(buffer, server->read(buffer, sizeof(buffer))); - gotData=true; - } - if (select.rd[client.toFd()] || select.er[client.toFd()]) { + poller.rearmFd(serverHandle); + } else if (event.handle == &clientHandle) { server->write(buffer, client.read(buffer, sizeof(buffer))); - gotData=true; + poller.rearmFd(clientHandle); + } else { + throwIf(true, "No handle ready"); } - throwIf(!gotData, "No data from select()"); } } catch (const std::exception& e) { @@ -155,9 +119,9 @@ class SocketProxy : private qpid::sys::Runnable mutable qpid::sys::Mutex lock; bool closed; + qpid::sys::Poller poller; qpid::sys::Socket client, listener; uint16_t port; - int closePipe[2]; qpid::sys::Thread thread; }; |