summaryrefslogtreecommitdiff
path: root/cpp/src/tests/SocketProxy.h
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2008-04-15 15:41:21 +0000
committerAndrew Stitcher <astitcher@apache.org>2008-04-15 15:41:21 +0000
commitdd53b33c3badd538d2d25a35146d9ab032573cc0 (patch)
tree305a9f3e6cdc5d88d6c78638c75dda9d3ddb9831 /cpp/src/tests/SocketProxy.h
parent8ac8e19e4805e78c3adcab66f1aab2ef5190f48e (diff)
downloadqpid-python-dd53b33c3badd538d2d25a35146d9ab032573cc0.tar.gz
Refactored the IO framework that sits on top of Poller so that it uses a generalised IOHandle.
This means that you can define new classes derived from IOHandle (other than Socket) that can also be added to a Poller and waited for. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@648288 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/SocketProxy.h')
-rw-r--r--cpp/src/tests/SocketProxy.h84
1 files changed, 24 insertions, 60 deletions
diff --git a/cpp/src/tests/SocketProxy.h b/cpp/src/tests/SocketProxy.h
index a37c1f2c3e..3263652fe2 100644
--- a/cpp/src/tests/SocketProxy.h
+++ b/cpp/src/tests/SocketProxy.h
@@ -22,6 +22,7 @@
*/
#include "qpid/sys/Socket.h"
+#include "qpid/sys/Poller.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Mutex.h"
@@ -43,8 +44,6 @@ class SocketProxy : private qpid::sys::Runnable
SocketProxy(int connectPort, const std::string host="localhost")
: closed(false), port(listener.listen())
{
- int r=::pipe(closePipe);
- if (r<0) throwErrno(QPID_MSG("::pipe returned " << r));
client.connect(host, connectPort);
thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this));
}
@@ -58,11 +57,9 @@ class SocketProxy : private qpid::sys::Runnable
if (closed) return;
closed=true;
}
- write(closePipe[1], this, 1); // Random byte to closePipe
+ poller.shutdown();
thread.join();
client.close();
- ::close(closePipe[0]);
- ::close(closePipe[1]);
}
bool isClosed() const {
@@ -79,71 +76,38 @@ class SocketProxy : private qpid::sys::Runnable
static void throwIf(bool condition, const std::string& msg) {
if (condition) throw qpid::Exception(msg);
}
-
- struct FdSet : fd_set {
- FdSet() : maxFd(0) { clear(); }
- void clear() { FD_ZERO(this); }
- void set(int fd) { FD_SET(fd, this); maxFd = std::max(maxFd, fd); }
- bool isSet(int fd) const { return FD_ISSET(fd, this); }
- bool operator[](int fd) const { return isSet(fd); }
-
- int maxFd;
- };
-
- enum { RD=1, WR=2, ER=4 };
-
- struct Selector {
- FdSet rd, wr, er;
-
- void set(int fd, int sets) {
- if (sets & RD) rd.set(fd);
- if (sets & WR) wr.set(fd);
- if (sets & ER) er.set(fd);
- }
-
- int select() {
- for (;;) {
- int maxFd = std::max(rd.maxFd, std::max(wr.maxFd, er.maxFd));
- int r = ::select(maxFd + 1, &rd, &wr, &er, NULL);
- if (r == -1 && errno == EINTR) continue;
- if (r < 0) throwErrno(QPID_MSG("select returned " <<r));
- return r;
- }
- }
- };
void run() {
std::auto_ptr<qpid::sys::Socket> server;
try {
- // Accept incoming connections, watch closePipe.
- Selector accept;
- accept.set(listener.toFd(), RD|ER);
- accept.set(closePipe[0], RD|ER);
- accept.select();
- throwIf(accept.rd[closePipe[0]], "Closed by close()");
- throwIf(!accept.rd[listener.toFd()],"Accept failed");
+ qpid::sys::PollerHandle listenerHandle(listener);
+ poller.addFd(listenerHandle, qpid::sys::Poller::IN);
+ qpid::sys::Poller::Event event = poller.wait();
+ throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by close()");
+ throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "Accept failed");
+
+ poller.delFd(listenerHandle);
server.reset(listener.accept(0, 0));
- // Pump data between client & server sockets, watch closePipe.
+ // Pump data between client & server sockets
+ qpid::sys::PollerHandle clientHandle(client);
+ qpid::sys::PollerHandle serverHandle(*server);
+ poller.addFd(clientHandle, qpid::sys::Poller::IN);
+ poller.addFd(serverHandle, qpid::sys::Poller::IN);
char buffer[1024];
for (;;) {
- Selector select;
- select.set(server->toFd(), RD|ER);
- select.set(client.toFd(), RD|ER);
- select.set(closePipe[0], RD|ER);
- select.select();
- throwIf(select.rd[closePipe[0]], "Closed by close()");
- // Read even if fd is in error to throw a useful exception.
- bool gotData=false;
- if (select.rd[server->toFd()] || select.er[server->toFd()]) {
+ qpid::sys::Poller::Event event = poller.wait();
+ throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by close()");
+ throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "client/server disconnected");
+ if (event.handle == &serverHandle) {
client.write(buffer, server->read(buffer, sizeof(buffer)));
- gotData=true;
- }
- if (select.rd[client.toFd()] || select.er[client.toFd()]) {
+ poller.rearmFd(serverHandle);
+ } else if (event.handle == &clientHandle) {
server->write(buffer, client.read(buffer, sizeof(buffer)));
- gotData=true;
+ poller.rearmFd(clientHandle);
+ } else {
+ throwIf(true, "No handle ready");
}
- throwIf(!gotData, "No data from select()");
}
}
catch (const std::exception& e) {
@@ -155,9 +119,9 @@ class SocketProxy : private qpid::sys::Runnable
mutable qpid::sys::Mutex lock;
bool closed;
+ qpid::sys::Poller poller;
qpid::sys::Socket client, listener;
uint16_t port;
- int closePipe[2];
qpid::sys::Thread thread;
};