diff options
author | Andrew Stitcher <astitcher@apache.org> | 2008-04-15 15:41:21 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2008-04-15 15:41:21 +0000 |
commit | 7bf3ed2b4bca4706e3837126d597ae5d2ee11537 (patch) | |
tree | 018e7046950bec77038ccde2ef3ed22ae190ed8d | |
parent | df53cdeef2ceea78010708a1135c64830aa3049e (diff) | |
download | qpid-python-7bf3ed2b4bca4706e3837126d597ae5d2ee11537.tar.gz |
Refactored the IO framework that sits on top of Poller so that it uses a generalised IOHandle.
This means that you can define new classes derived from IOHandle (other than Socket) that
can also be added to a Poller and waited for.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@648288 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/Makefile.am | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Connector.cpp | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Connector.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/AsynchIO.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 22 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/Dispatcher.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/IOHandle.h | 45 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/Poller.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/Socket.h | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/IOHandle.cpp | 42 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/PrivatePosix.h | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/Socket.cpp | 51 | ||||
-rw-r--r-- | qpid/cpp/src/tests/SocketProxy.h | 84 |
15 files changed, 190 insertions, 153 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 4a43a9cc55..3177d280a6 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -62,7 +62,7 @@ qpidd_SOURCES = qpidd.cpp posix_plat_src = \ qpid/sys/epoll/EpollPoller.cpp \ - qpid/sys/DeletionManager.h \ + qpid/sys/posix/IOHandle.cpp \ qpid/sys/posix/Socket.cpp \ qpid/sys/posix/AsynchIO.cpp \ qpid/sys/posix/Time.cpp \ @@ -469,15 +469,17 @@ nobase_include_HEADERS = \ qpid/sys/AtomicCount.h \ qpid/sys/BlockingQueue.h \ qpid/sys/Condition.h \ + qpid/sys/ConnectionCodec.h \ qpid/sys/ConnectionInputHandler.h \ qpid/sys/ConnectionInputHandlerFactory.h \ qpid/sys/ConnectionOutputHandler.h \ + qpid/sys/DeletionManager.h \ qpid/sys/Dispatcher.h \ + qpid/sys/IOHandle.h \ qpid/sys/Module.h \ qpid/sys/Monitor.h \ qpid/sys/Mutex.h \ qpid/sys/OutputControl.h \ - qpid/sys/ConnectionCodec.h \ qpid/sys/OutputTask.h \ qpid/sys/Poller.h \ qpid/sys/Runnable.h \ diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp index a0be05fbbc..11aff6184b 100644 --- a/qpid/cpp/src/qpid/client/Connector.cpp +++ b/qpid/cpp/src/qpid/client/Connector.cpp @@ -18,16 +18,17 @@ * under the License. * */ -#include <iostream> +#include "Connector.h" + #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" -#include "Connector.h" - #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" #include "qpid/Msg.h" + +#include <iostream> #include <boost/bind.hpp> #include <boost/format.hpp> @@ -62,7 +63,7 @@ void Connector::connect(const std::string& host, int port){ Mutex::ScopedLock l(closedLock); assert(closed); socket.connect(host, port); - identifier=str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); + identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); closed = false; poller = Poller::shared_ptr(new Poller); aio = new AsynchIO(socket, @@ -72,7 +73,7 @@ void Connector::connect(const std::string& host, int port){ 0, // closed 0, // nobuffs boost::bind(&Connector::writebuff, this, _1)); - writer.setAio(aio); + writer.init(identifier, aio); } void Connector::init(){ @@ -184,11 +185,11 @@ Connector::Writer::Writer() : aio(0), buffer(0), lastEof(0) Connector::Writer::~Writer() { delete buffer; } -void Connector::Writer::setAio(sys::AsynchIO* a) { +void Connector::Writer::init(std::string id, sys::AsynchIO* a) { Mutex::ScopedLock l(lock); + identifier = id; aio = a; newBuffer(l); - identifier = str(format("[%1% %2%]") % aio->getSocket().getLocalPort() % aio->getSocket().getPeerAddress()); } void Connector::Writer::handle(framing::AMQFrame& frame) { diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h index ffddbfd1be..78aad0b60a 100644 --- a/qpid/cpp/src/qpid/client/Connector.h +++ b/qpid/cpp/src/qpid/client/Connector.h @@ -68,7 +68,7 @@ class Connector : public framing::OutputHandler, Writer(); ~Writer(); - void setAio(sys::AsynchIO*); + void init(std::string id, sys::AsynchIO*); void handle(framing::AMQFrame&); void write(sys::AsynchIO&); }; diff --git a/qpid/cpp/src/qpid/sys/AsynchIO.h b/qpid/cpp/src/qpid/sys/AsynchIO.h index ca34d82741..3bcee8ba22 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIO.h +++ b/qpid/cpp/src/qpid/sys/AsynchIO.h @@ -40,6 +40,7 @@ public: private: Callback acceptedCallback; DispatchHandle handle; + const Socket& socket; public: AsynchAcceptor(const Socket& s, Callback callback); @@ -94,6 +95,7 @@ private: ClosedCallback closedCallback; BuffersEmptyCallback emptyCallback; IdleCallback idleCallback; + const Socket& socket; std::deque<BufferBase*> bufferQueue; std::deque<BufferBase*> writeQueue; bool queuedClose; @@ -119,7 +121,6 @@ public: void queueWriteClose(); bool writeQueueEmpty() { return writeQueue.empty(); } BufferBase* getQueuedBuffer(); - const Socket& getSocket() const { return DispatchHandle::getSocket(); } private: ~AsynchIO(); diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 5c784912b3..43fbfdf7be 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -84,19 +84,20 @@ struct Buff : public AsynchIO::BufferBase { }; class AsynchIOHandler : public OutputControl { + std::string identifier; AsynchIO* aio; ConnectionCodec::Factory* factory; ConnectionCodec* codec; bool readError; - std::string identifier; bool isClient; void write(const framing::ProtocolInitiation&); public: - AsynchIOHandler() : + AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) : + identifier(id), aio(0), - factory(0), + factory(f), codec(0), readError(false), isClient(false) @@ -110,11 +111,8 @@ class AsynchIOHandler : public OutputControl { void setClient() { isClient = true; } - void init(AsynchIO* a, ConnectionCodec::Factory* f) { + void init(AsynchIO* a) { aio = a; - factory = f; - identifier = aio->getSocket().getPeerAddress(); - } // Output side @@ -133,7 +131,7 @@ class AsynchIOHandler : public OutputControl { }; void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) { - AsynchIOHandler* async = new AsynchIOHandler; + AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f); AsynchIO* aio = new AsynchIO(s, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), @@ -141,7 +139,8 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, f); + async->init(aio); + // Give connection some buffers to use for (int i = 0; i < 4; i++) { aio->queueReadBuffer(new Buff); @@ -185,7 +184,7 @@ void AsynchIOAcceptor::connect( { Socket* socket = new Socket();//Should be deleted by handle when socket closes socket->connect(host, port); - AsynchIOHandler* async = new AsynchIOHandler; + AsynchIOHandler* async = new AsynchIOHandler(socket->getPeerAddress(), f); async->setClient(); AsynchIO* aio = new AsynchIO(*socket, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), @@ -194,7 +193,8 @@ void AsynchIOAcceptor::connect( boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, f); + async->init(aio); + // Give connection some buffers to use for (int i = 0; i < 4; i++) { aio->queueReadBuffer(new Buff); diff --git a/qpid/cpp/src/qpid/sys/Dispatcher.h b/qpid/cpp/src/qpid/sys/Dispatcher.h index 7cc4873068..cd7a0bdb66 100644 --- a/qpid/cpp/src/qpid/sys/Dispatcher.h +++ b/qpid/cpp/src/qpid/sys/Dispatcher.h @@ -55,8 +55,8 @@ private: } state; public: - DispatchHandle(const Socket& s, Callback rCb, Callback wCb, Callback dCb) : - PollerHandle(s), + DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) : + PollerHandle(h), readableCallback(rCb), writableCallback(wCb), disconnectedCallback(dCb), diff --git a/qpid/cpp/src/qpid/sys/IOHandle.h b/qpid/cpp/src/qpid/sys/IOHandle.h new file mode 100644 index 0000000000..d06512da58 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/IOHandle.h @@ -0,0 +1,45 @@ +#ifndef _sys_IOHandle_h +#define _sys_IOHandle_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +namespace qpid { +namespace sys { + +/** + * This is a class intended to abstract the Unix concept of file descriptor or the Windows concept of HANDLE + */ +class PollerHandle; +class IOHandlePrivate; +class IOHandle { + friend class PollerHandle; + +protected: + IOHandlePrivate* const impl; + + IOHandle(IOHandlePrivate*); + virtual ~IOHandle(); +}; + +}} + +#endif // _sys_IOHandle_h diff --git a/qpid/cpp/src/qpid/sys/Poller.h b/qpid/cpp/src/qpid/sys/Poller.h index 0d6b4f9308..dccc12479a 100644 --- a/qpid/cpp/src/qpid/sys/Poller.h +++ b/qpid/cpp/src/qpid/sys/Poller.h @@ -35,16 +35,16 @@ namespace sys { /** * Handle class to use for polling */ +class IOHandle; class Poller; class PollerHandlePrivate; class PollerHandle { friend class Poller; PollerHandlePrivate* const impl; - const Socket& socket; public: - PollerHandle(const Socket& s); + PollerHandle(const IOHandle& h); // Usual way to delete (will defer deletion until we // can't be returned from a Poller::wait any more) @@ -52,8 +52,6 @@ public: // Class clients shouldn't ever use this virtual ~PollerHandle(); - - const Socket& getSocket() const {return socket;} }; /** diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h index 0ebfc0c330..cab95654ad 100644 --- a/qpid/cpp/src/qpid/sys/Socket.h +++ b/qpid/cpp/src/qpid/sys/Socket.h @@ -21,25 +21,22 @@ * under the License. * */ +#include "IOHandle.h" + #include <string> -#include "qpid/sys/Time.h" struct sockaddr; namespace qpid { namespace sys { -class SocketPrivate; -class Socket -{ - friend class Poller; - - SocketPrivate* const impl; +class Duration; +class Socket : public IOHandle +{ public: /** Create a socket wrapper for descriptor. */ Socket(); - ~Socket(); /** Create an initialized TCP socket */ void createTcp() const; @@ -106,10 +103,8 @@ public: int read(void *buf, size_t count) const; int write(const void *buf, size_t count) const; - int toFd() const; - private: - Socket(SocketPrivate*); + Socket(IOHandlePrivate*); }; }} diff --git a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp index 8936251f94..44b84c4239 100644 --- a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -20,6 +20,7 @@ */ #include "qpid/sys/Poller.h" +#include "qpid/sys/IOHandle.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/DeletionManager.h" #include "qpid/sys/posix/check.h" @@ -54,11 +55,13 @@ class PollerHandlePrivate { MONITORED_HUNGUP }; + int fd; ::__uint32_t events; FDStat stat; Mutex lock; - PollerHandlePrivate() : + PollerHandlePrivate(int f) : + fd(f), events(0), stat(ABSENT) { } @@ -97,9 +100,8 @@ class PollerHandlePrivate { } }; -PollerHandle::PollerHandle(const Socket& s) : - impl(new PollerHandlePrivate), - socket(s) +PollerHandle::PollerHandle(const IOHandle& h) : + impl(new PollerHandlePrivate(toFd(h.impl))) {} PollerHandle::~PollerHandle() { @@ -201,7 +203,7 @@ void Poller::addFd(PollerHandle& handle, Direction dir) { } epe.data.ptr = &handle; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, toFd(handle.socket.impl), &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe)); // Record monitoring state of this fd eh.events = epe.events; @@ -212,7 +214,7 @@ void Poller::delFd(PollerHandle& handle) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); assert(!eh.isIdle()); - int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, toFd(handle.socket.impl), 0); + int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0); // Ignore EBADF since deleting a nonexistent fd has the overall required result! // And allows the case where a sloppy program closes the fd and then does the delFd() if (rc == -1 && errno != EBADF) { @@ -231,7 +233,7 @@ void Poller::modFd(PollerHandle& handle, Direction dir) { epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; epe.data.ptr = &handle; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, toFd(handle.socket.impl), &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); // Record monitoring state of this fd eh.events = epe.events; @@ -247,7 +249,7 @@ void Poller::rearmFd(PollerHandle& handle) { epe.events = eh.events; epe.data.ptr = &handle; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, toFd(handle.socket.impl), &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); eh.setActive(); } diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp index 94c68bd5d0..cedad5c011 100644 --- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -65,7 +65,8 @@ __thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : acceptedCallback(callback), - handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) { + handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0), + socket(s) { s.setNonblocking(); ignoreSigpipe(); @@ -84,7 +85,7 @@ void AsynchAcceptor::readable(DispatchHandle& h) { errno = 0; // TODO: Currently we ignore the peers address, perhaps we should // log it or use it for connection acceptance. - s = h.getSocket().accept(0, 0); + s = socket.accept(0, 0); if (s) { acceptedCallback(*s); } else { @@ -112,6 +113,7 @@ AsynchIO::AsynchIO(const Socket& s, closedCallback(cCb), emptyCallback(eCb), idleCallback(iCb), + socket(s), queuedClose(false), writePending(false) { @@ -209,7 +211,7 @@ void AsynchIO::readable(DispatchHandle& h) { bufferQueue.pop_front(); errno = 0; int readCount = buff->byteCount-buff->dataCount; - int rc = h.getSocket().read(buff->bytes + buff->dataCount, readCount); + int rc = socket.read(buff->bytes + buff->dataCount, readCount); if (rc > 0) { buff->dataCount += rc; threadReadTotal += rc; @@ -276,7 +278,7 @@ void AsynchIO::writeable(DispatchHandle& h) { writeQueue.pop_back(); errno = 0; assert(buff->dataStart+buff->dataCount <= buff->byteCount); - int rc = h.getSocket().write(buff->bytes+buff->dataStart, buff->dataCount); + int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount); if (rc >= 0) { threadWriteTotal += rc; writeTotal += rc; @@ -356,9 +358,9 @@ void AsynchIO::disconnected(DispatchHandle& h) { */ void AsynchIO::close(DispatchHandle& h) { h.stopWatch(); - h.getSocket().close(); + socket.close(); if (closedCallback) { - closedCallback(*this, getSocket()); + closedCallback(*this, socket); } } diff --git a/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp b/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp new file mode 100644 index 0000000000..80b487eadc --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/IOHandle.h" + +#include "PrivatePosix.h" + +namespace qpid { +namespace sys { + +int toFd(const IOHandlePrivate* h) +{ + return h->fd; +} + +IOHandle::IOHandle(IOHandlePrivate* h) : + impl(h) +{} + +IOHandle::~IOHandle() { + delete impl; +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h b/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h index 9ec9770cab..33c0cd81bc 100644 --- a/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h +++ b/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h @@ -35,9 +35,17 @@ struct timespec& toTimespec(struct timespec& ts, const Duration& t); struct timeval& toTimeval(struct timeval& tv, const Duration& t); Duration toTime(const struct timespec& ts); -// Private socket related implementation details -class SocketPrivate; -int toFd(const SocketPrivate* s); +// Private fd related implementation details +class IOHandlePrivate { +public: + IOHandlePrivate(int f = -1) : + fd(f) + {} + + int fd; +}; + +int toFd(const IOHandlePrivate* h); }} diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp index c286ebce27..99cf7210b6 100644 --- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp +++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp @@ -38,19 +38,8 @@ namespace qpid { namespace sys { -class SocketPrivate { -public: - SocketPrivate(int f = -1) : - fd(f) - {} - - int fd; - - std::string getName(bool local, bool includeService = false) const; - std::string getService(bool local) const; -}; - -std::string SocketPrivate::getName(bool local, bool includeService) const +namespace { +std::string getName(int fd, bool local, bool includeService = false) { ::sockaddr_storage name; // big enough for any socket address ::socklen_t namelen = sizeof(name); @@ -80,7 +69,7 @@ std::string SocketPrivate::getName(bool local, bool includeService) const } } -std::string SocketPrivate::getService(bool local) const +std::string getService(int fd, bool local) { ::sockaddr_storage name; // big enough for any socket address ::socklen_t namelen = sizeof(name); @@ -101,21 +90,18 @@ std::string SocketPrivate::getService(bool local) const throw QPID_POSIX_ERROR(rc); return servName; } +} Socket::Socket() : - impl(new SocketPrivate) + IOHandle(new IOHandlePrivate) { createTcp(); } -Socket::Socket(SocketPrivate* sp) : - impl(sp) +Socket::Socket(IOHandlePrivate* h) : + IOHandle(h) {} -Socket::~Socket() { - delete impl; -} - void Socket::createTcp() const { int& socket = impl->fd; @@ -225,7 +211,7 @@ Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const { int afd = ::accept(impl->fd, addr, addrlen); if ( afd >= 0) - return new Socket(new SocketPrivate(afd)); + return new Socket(new IOHandlePrivate(afd)); else if (errno == EAGAIN) return 0; else throw QPID_POSIX_ERROR(errno); @@ -243,41 +229,32 @@ int Socket::write(const void *buf, size_t count) const std::string Socket::getSockname() const { - return impl->getName(true); + return getName(impl->fd, true); } std::string Socket::getPeername() const { - return impl->getName(false); + return getName(impl->fd, false); } std::string Socket::getPeerAddress() const { - return impl->getName(false, true); + return getName(impl->fd, false, true); } std::string Socket::getLocalAddress() const { - return impl->getName(true, true); + return getName(impl->fd, true, true); } uint16_t Socket::getLocalPort() const { - return atoi(impl->getService(true).c_str()); + return atoi(getService(impl->fd, true).c_str()); } uint16_t Socket::getRemotePort() const { - return atoi(impl->getService(true).c_str()); -} - -int Socket::toFd() const { - return impl->fd; -} - -int toFd(const SocketPrivate* s) -{ - return s->fd; + return atoi(getService(impl->fd, true).c_str()); } }} // namespace qpid::sys diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index a37c1f2c3e..3263652fe2 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -22,6 +22,7 @@ */ #include "qpid/sys/Socket.h" +#include "qpid/sys/Poller.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Mutex.h" @@ -43,8 +44,6 @@ class SocketProxy : private qpid::sys::Runnable SocketProxy(int connectPort, const std::string host="localhost") : closed(false), port(listener.listen()) { - int r=::pipe(closePipe); - if (r<0) throwErrno(QPID_MSG("::pipe returned " << r)); client.connect(host, connectPort); thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this)); } @@ -58,11 +57,9 @@ class SocketProxy : private qpid::sys::Runnable if (closed) return; closed=true; } - write(closePipe[1], this, 1); // Random byte to closePipe + poller.shutdown(); thread.join(); client.close(); - ::close(closePipe[0]); - ::close(closePipe[1]); } bool isClosed() const { @@ -79,71 +76,38 @@ class SocketProxy : private qpid::sys::Runnable static void throwIf(bool condition, const std::string& msg) { if (condition) throw qpid::Exception(msg); } - - struct FdSet : fd_set { - FdSet() : maxFd(0) { clear(); } - void clear() { FD_ZERO(this); } - void set(int fd) { FD_SET(fd, this); maxFd = std::max(maxFd, fd); } - bool isSet(int fd) const { return FD_ISSET(fd, this); } - bool operator[](int fd) const { return isSet(fd); } - - int maxFd; - }; - - enum { RD=1, WR=2, ER=4 }; - - struct Selector { - FdSet rd, wr, er; - - void set(int fd, int sets) { - if (sets & RD) rd.set(fd); - if (sets & WR) wr.set(fd); - if (sets & ER) er.set(fd); - } - - int select() { - for (;;) { - int maxFd = std::max(rd.maxFd, std::max(wr.maxFd, er.maxFd)); - int r = ::select(maxFd + 1, &rd, &wr, &er, NULL); - if (r == -1 && errno == EINTR) continue; - if (r < 0) throwErrno(QPID_MSG("select returned " <<r)); - return r; - } - } - }; void run() { std::auto_ptr<qpid::sys::Socket> server; try { - // Accept incoming connections, watch closePipe. - Selector accept; - accept.set(listener.toFd(), RD|ER); - accept.set(closePipe[0], RD|ER); - accept.select(); - throwIf(accept.rd[closePipe[0]], "Closed by close()"); - throwIf(!accept.rd[listener.toFd()],"Accept failed"); + qpid::sys::PollerHandle listenerHandle(listener); + poller.addFd(listenerHandle, qpid::sys::Poller::IN); + qpid::sys::Poller::Event event = poller.wait(); + throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by close()"); + throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "Accept failed"); + + poller.delFd(listenerHandle); server.reset(listener.accept(0, 0)); - // Pump data between client & server sockets, watch closePipe. + // Pump data between client & server sockets + qpid::sys::PollerHandle clientHandle(client); + qpid::sys::PollerHandle serverHandle(*server); + poller.addFd(clientHandle, qpid::sys::Poller::IN); + poller.addFd(serverHandle, qpid::sys::Poller::IN); char buffer[1024]; for (;;) { - Selector select; - select.set(server->toFd(), RD|ER); - select.set(client.toFd(), RD|ER); - select.set(closePipe[0], RD|ER); - select.select(); - throwIf(select.rd[closePipe[0]], "Closed by close()"); - // Read even if fd is in error to throw a useful exception. - bool gotData=false; - if (select.rd[server->toFd()] || select.er[server->toFd()]) { + qpid::sys::Poller::Event event = poller.wait(); + throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by close()"); + throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "client/server disconnected"); + if (event.handle == &serverHandle) { client.write(buffer, server->read(buffer, sizeof(buffer))); - gotData=true; - } - if (select.rd[client.toFd()] || select.er[client.toFd()]) { + poller.rearmFd(serverHandle); + } else if (event.handle == &clientHandle) { server->write(buffer, client.read(buffer, sizeof(buffer))); - gotData=true; + poller.rearmFd(clientHandle); + } else { + throwIf(true, "No handle ready"); } - throwIf(!gotData, "No data from select()"); } } catch (const std::exception& e) { @@ -155,9 +119,9 @@ class SocketProxy : private qpid::sys::Runnable mutable qpid::sys::Mutex lock; bool closed; + qpid::sys::Poller poller; qpid::sys::Socket client, listener; uint16_t port; - int closePipe[2]; qpid::sys::Thread thread; }; |