diff options
Diffstat (limited to 'cpp/src/tests/SocketProxy.h')
-rw-r--r-- | cpp/src/tests/SocketProxy.h | 158 |
1 files changed, 120 insertions, 38 deletions
diff --git a/cpp/src/tests/SocketProxy.h b/cpp/src/tests/SocketProxy.h index b985ded175..a37c1f2c3e 100644 --- a/cpp/src/tests/SocketProxy.h +++ b/cpp/src/tests/SocketProxy.h @@ -24,59 +24,141 @@ #include "qpid/sys/Socket.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" +#include "qpid/sys/Mutex.h" +#include "qpid/client/Connection.h" +#include "qpid/log/Statement.h" + +#include <algorithm> /** - * A simple socket proxy that forwards to another socket. Used between - * client & broker to simulate network failures. + * A simple socket proxy that forwards to another socket. + * Used between client & local broker to simulate network failures. */ -struct SocketProxy : public qpid::sys::Runnable +class SocketProxy : private qpid::sys::Runnable { - int port; // Port bound to server socket. - qpid::sys::Socket client, server; // Client & server sockets. + public: + /** Connect to connectPort on host, start a forwarding thread. + * Listen for connection on getPort(). + */ + SocketProxy(int connectPort, const std::string host="localhost") + : closed(false), port(listener.listen()) + { + int r=::pipe(closePipe); + if (r<0) throwErrno(QPID_MSG("::pipe returned " << r)); + client.connect(host, connectPort); + thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this)); + } + + ~SocketProxy() { close(); } - SocketProxy(const std::string& host, int port) { init(host,port); } - SocketProxy(int port) { init("localhost",port); } + /** Simulate a network disconnect. */ + void close() { + { + qpid::sys::Mutex::ScopedLock l(lock); + if (closed) return; + closed=true; + } + write(closePipe[1], this, 1); // Random byte to closePipe + thread.join(); + client.close(); + ::close(closePipe[0]); + ::close(closePipe[1]); + } - ~SocketProxy() { client.close(); server.close(); thread.join(); } + bool isClosed() const { + qpid::sys::Mutex::ScopedLock l(lock); + return closed; + } + + uint16_t getPort() const { return port; } private: - - void init(const std::string& host, int connectPort) { - client.connect(host, connectPort); - port = server.listen(); - thread=qpid::sys::Thread(this); + static void throwErrno(const std::string& msg) { + throw qpid::Exception(msg+":"+qpid::strError(errno)); } - - void run() { - try { - do { - ssize_t recv = server.recv(buffer, sizeof(buffer)); - if (recv <= 0) return; - ssize_t sent=client.send(buffer, recv); - if (sent < 0) return; - assert(sent == recv); // Assumes we can send as we receive. - } while (true); - } catch(...) {} + static void throwIf(bool condition, const std::string& msg) { + if (condition) throw qpid::Exception(msg); } + + struct FdSet : fd_set { + FdSet() : maxFd(0) { clear(); } + void clear() { FD_ZERO(this); } + void set(int fd) { FD_SET(fd, this); maxFd = std::max(maxFd, fd); } + bool isSet(int fd) const { return FD_ISSET(fd, this); } + bool operator[](int fd) const { return isSet(fd); } - qpid::sys::Thread thread; - char buffer[64*1024]; -}; + int maxFd; + }; -/** A local client connection via a socket proxy. */ -struct ProxyConnection : public qpid::client::Connection { - SocketProxy proxy; - qpid::client::Session_0_10 session; + enum { RD=1, WR=2, ER=4 }; - ProxyConnection(const std::string& host, int port) : proxy(port) { - open(host, proxy.port); - session=newSession(); - } + struct Selector { + FdSet rd, wr, er; + + void set(int fd, int sets) { + if (sets & RD) rd.set(fd); + if (sets & WR) wr.set(fd); + if (sets & ER) er.set(fd); + } + + int select() { + for (;;) { + int maxFd = std::max(rd.maxFd, std::max(wr.maxFd, er.maxFd)); + int r = ::select(maxFd + 1, &rd, &wr, &er, NULL); + if (r == -1 && errno == EINTR) continue; + if (r < 0) throwErrno(QPID_MSG("select returned " <<r)); + return r; + } + } + }; - ProxyConnection(int port) : proxy(port) { - open("localhost", proxy.port); - session=newSession(); + void run() { + std::auto_ptr<qpid::sys::Socket> server; + try { + // Accept incoming connections, watch closePipe. + Selector accept; + accept.set(listener.toFd(), RD|ER); + accept.set(closePipe[0], RD|ER); + accept.select(); + throwIf(accept.rd[closePipe[0]], "Closed by close()"); + throwIf(!accept.rd[listener.toFd()],"Accept failed"); + server.reset(listener.accept(0, 0)); + + // Pump data between client & server sockets, watch closePipe. + char buffer[1024]; + for (;;) { + Selector select; + select.set(server->toFd(), RD|ER); + select.set(client.toFd(), RD|ER); + select.set(closePipe[0], RD|ER); + select.select(); + throwIf(select.rd[closePipe[0]], "Closed by close()"); + // Read even if fd is in error to throw a useful exception. + bool gotData=false; + if (select.rd[server->toFd()] || select.er[server->toFd()]) { + client.write(buffer, server->read(buffer, sizeof(buffer))); + gotData=true; + } + if (select.rd[client.toFd()] || select.er[client.toFd()]) { + server->write(buffer, client.read(buffer, sizeof(buffer))); + gotData=true; + } + throwIf(!gotData, "No data from select()"); + } + } + catch (const std::exception& e) { + QPID_LOG(debug, "SocketProxy::run exiting: " << e.what()); + } + if (server.get()) server->close(); + close(); } + + mutable qpid::sys::Mutex lock; + bool closed; + qpid::sys::Socket client, listener; + uint16_t port; + int closePipe[2]; + qpid::sys::Thread thread; }; #endif |