diff options
Diffstat (limited to 'qpid/cpp/src/tests/SocketProxy.h')
-rw-r--r-- | qpid/cpp/src/tests/SocketProxy.h | 111 |
1 files changed, 70 insertions, 41 deletions
diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index d2a93c902b..ccce3c8842 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -21,45 +21,58 @@ * */ +#include "qpid/sys/IOHandle.h" +#ifdef _WIN32 +# include "qpid/sys/windows/IoHandlePrivate.h" + typedef SOCKET FdType; +#else +# include "qpid/sys/posix/PrivatePosix.h" + typedef int FdType; +#endif #include "qpid/sys/Socket.h" -#include "qpid/sys/Poller.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Mutex.h" -#include "qpid/client/Connection.h" #include "qpid/log/Statement.h" -#include <algorithm> - /** * A simple socket proxy that forwards to another socket. * Used between client & local broker to simulate network failures. */ class SocketProxy : private qpid::sys::Runnable { + // Need a Socket we can get the fd from + class LowSocket : public qpid::sys::Socket { + public: + FdType getFd() { return toFd(impl); } + }; + public: /** Connect to connectPort on host, start a forwarding thread. * Listen for connection on getPort(). */ SocketProxy(int connectPort, const std::string host="localhost") - : closed(false), port(listener.listen()), dropClient(), dropServer() + : closed(false), joined(true), + port(listener.listen()), dropClient(), dropServer() { client.connect(host, connectPort); + joined = false; thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this)); } - ~SocketProxy() { close(); } + ~SocketProxy() { close(); if (!joined) thread.join(); } /** Simulate a network disconnect. */ void close() { { qpid::sys::Mutex::ScopedLock l(lock); - if (closed) return; + if (closed) { return; } closed=true; } - poller.shutdown(); - if (thread.id() != qpid::sys::Thread::current().id()) - thread.join(); + if (thread.id() != qpid::sys::Thread::current().id()) { + thread.join(); + joined = true; + } client.close(); } @@ -85,56 +98,72 @@ class SocketProxy : private qpid::sys::Runnable } void run() { - std::auto_ptr<qpid::sys::Socket> server; + std::auto_ptr<LowSocket> server; try { - qpid::sys::PollerHandle listenerHandle(listener); - poller.addFd(listenerHandle, qpid::sys::Poller::INPUT); - qpid::sys::Poller::Event event = poller.wait(); - throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()"); - throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "SocketProxy: Accept failed"); - - poller.delFd(listenerHandle); - server.reset(listener.accept()); - - // Pump data between client & server sockets - qpid::sys::PollerHandle clientHandle(client); - qpid::sys::PollerHandle serverHandle(*server); - poller.addFd(clientHandle, qpid::sys::Poller::INPUT); - poller.addFd(serverHandle, qpid::sys::Poller::INPUT); + fd_set socks; + FdType maxFd = listener.getFd(); + struct timeval tmo; + for (;;) { + FD_ZERO(&socks); + FD_SET(maxFd, &socks); + tmo.tv_sec = 0; + tmo.tv_usec = 500 * 1000; + if (select(maxFd+1, &socks, 0, 0, &tmo) == 0) { + qpid::sys::Mutex::ScopedLock l(lock); + throwIf(closed, "SocketProxy: Closed by close()"); + continue; + } + throwIf(!FD_ISSET(maxFd, &socks), "SocketProxy: Accept failed"); + break; // Accept ready... go to next step + } + server.reset(reinterpret_cast<LowSocket *>(listener.accept())); + maxFd = server->getFd(); + if (client.getFd() > maxFd) + maxFd = client.getFd(); char buffer[1024]; for (;;) { - qpid::sys::Poller::Event event = poller.wait(); - throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()"); - throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "SocketProxy: client/server disconnected"); - if (event.handle == &serverHandle) { + FD_ZERO(&socks); + tmo.tv_sec = 0; + tmo.tv_usec = 500 * 1000; + FD_SET(client.getFd(), &socks); + FD_SET(server->getFd(), &socks); + if (select(maxFd+1, &socks, 0, 0, &tmo) == 0) { + qpid::sys::Mutex::ScopedLock l(lock); + throwIf(closed, "SocketProxy: Closed by close()"); + continue; + } + // Something is set; relay data as needed until something closes + if (FD_ISSET(server->getFd(), &socks)) { ssize_t n = server->read(buffer, sizeof(buffer)); + throwIf(n <= 0, "SocketProxy: server disconnected"); if (!dropServer) client.write(buffer, n); - poller.rearmFd(serverHandle); - } else if (event.handle == &clientHandle) { + } + if (FD_ISSET(client.getFd(), &socks)) { ssize_t n = client.read(buffer, sizeof(buffer)); - if (!dropClient) server->write(buffer, n); - poller.rearmFd(clientHandle); - } else { - throwIf(true, "SocketProxy: No handle ready"); + throwIf(n <= 0, "SocketProxy: client disconnected"); + if (!dropServer) server->write(buffer, n); } + if (!FD_ISSET(client.getFd(), &socks) && + !FD_ISSET(server->getFd(), &socks)) + throwIf(true, "SocketProxy: No handle ready"); } } catch (const std::exception& e) { QPID_LOG(debug, "SocketProxy::run exception: " << e.what()); } try { - if (server.get()) server->close(); - close(); - } + if (server.get()) server->close(); + close(); + } catch (const std::exception& e) { QPID_LOG(debug, "SocketProxy::run exception in client/server close()" << e.what()); } } mutable qpid::sys::Mutex lock; - bool closed; - qpid::sys::Poller poller; - qpid::sys::Socket client, listener; + mutable bool closed; + bool joined; + LowSocket client, listener; uint16_t port; qpid::sys::Thread thread; bool dropClient, dropServer; |