diff options
author | Andrew Stitcher <astitcher@apache.org> | 2008-04-15 15:41:21 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2008-04-15 15:41:21 +0000 |
commit | dd53b33c3badd538d2d25a35146d9ab032573cc0 (patch) | |
tree | 305a9f3e6cdc5d88d6c78638c75dda9d3ddb9831 /cpp/src/qpid/sys | |
parent | 8ac8e19e4805e78c3adcab66f1aab2ef5190f48e (diff) | |
download | qpid-python-dd53b33c3badd538d2d25a35146d9ab032573cc0.tar.gz |
Refactored the IO framework that sits on top of Poller so that it uses a generalised IOHandle.
This means that you can define new classes derived from IOHandle (other than Socket) that
can also be added to a Poller and waited for.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@648288 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r-- | cpp/src/qpid/sys/AsynchIO.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 22 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Dispatcher.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/sys/IOHandle.h | 45 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Poller.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Socket.h | 17 | ||||
-rw-r--r-- | cpp/src/qpid/sys/epoll/EpollPoller.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/IOHandle.cpp | 42 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/PrivatePosix.h | 14 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/Socket.cpp | 51 |
11 files changed, 153 insertions, 83 deletions
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h index ca34d82741..3bcee8ba22 100644 --- a/cpp/src/qpid/sys/AsynchIO.h +++ b/cpp/src/qpid/sys/AsynchIO.h @@ -40,6 +40,7 @@ public: private: Callback acceptedCallback; DispatchHandle handle; + const Socket& socket; public: AsynchAcceptor(const Socket& s, Callback callback); @@ -94,6 +95,7 @@ private: ClosedCallback closedCallback; BuffersEmptyCallback emptyCallback; IdleCallback idleCallback; + const Socket& socket; std::deque<BufferBase*> bufferQueue; std::deque<BufferBase*> writeQueue; bool queuedClose; @@ -119,7 +121,6 @@ public: void queueWriteClose(); bool writeQueueEmpty() { return writeQueue.empty(); } BufferBase* getQueuedBuffer(); - const Socket& getSocket() const { return DispatchHandle::getSocket(); } private: ~AsynchIO(); diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 5c784912b3..43fbfdf7be 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -84,19 +84,20 @@ struct Buff : public AsynchIO::BufferBase { }; class AsynchIOHandler : public OutputControl { + std::string identifier; AsynchIO* aio; ConnectionCodec::Factory* factory; ConnectionCodec* codec; bool readError; - std::string identifier; bool isClient; void write(const framing::ProtocolInitiation&); public: - AsynchIOHandler() : + AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) : + identifier(id), aio(0), - factory(0), + factory(f), codec(0), readError(false), isClient(false) @@ -110,11 +111,8 @@ class AsynchIOHandler : public OutputControl { void setClient() { isClient = true; } - void init(AsynchIO* a, ConnectionCodec::Factory* f) { + void init(AsynchIO* a) { aio = a; - factory = f; - identifier = aio->getSocket().getPeerAddress(); - } // Output side @@ -133,7 +131,7 @@ class AsynchIOHandler : public OutputControl { }; void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) { - AsynchIOHandler* async = new AsynchIOHandler; + AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f); AsynchIO* aio = new AsynchIO(s, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), @@ -141,7 +139,8 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, f); + async->init(aio); + // Give connection some buffers to use for (int i = 0; i < 4; i++) { aio->queueReadBuffer(new Buff); @@ -185,7 +184,7 @@ void AsynchIOAcceptor::connect( { Socket* socket = new Socket();//Should be deleted by handle when socket closes socket->connect(host, port); - AsynchIOHandler* async = new AsynchIOHandler; + AsynchIOHandler* async = new AsynchIOHandler(socket->getPeerAddress(), f); async->setClient(); AsynchIO* aio = new AsynchIO(*socket, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), @@ -194,7 +193,8 @@ void AsynchIOAcceptor::connect( boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, f); + async->init(aio); + // Give connection some buffers to use for (int i = 0; i < 4; i++) { aio->queueReadBuffer(new Buff); diff --git a/cpp/src/qpid/sys/Dispatcher.h b/cpp/src/qpid/sys/Dispatcher.h index 7cc4873068..cd7a0bdb66 100644 --- a/cpp/src/qpid/sys/Dispatcher.h +++ b/cpp/src/qpid/sys/Dispatcher.h @@ -55,8 +55,8 @@ private: } state; public: - DispatchHandle(const Socket& s, Callback rCb, Callback wCb, Callback dCb) : - PollerHandle(s), + DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) : + PollerHandle(h), readableCallback(rCb), writableCallback(wCb), disconnectedCallback(dCb), diff --git a/cpp/src/qpid/sys/IOHandle.h b/cpp/src/qpid/sys/IOHandle.h new file mode 100644 index 0000000000..d06512da58 --- /dev/null +++ b/cpp/src/qpid/sys/IOHandle.h @@ -0,0 +1,45 @@ +#ifndef _sys_IOHandle_h +#define _sys_IOHandle_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +namespace qpid { +namespace sys { + +/** + * This is a class intended to abstract the Unix concept of file descriptor or the Windows concept of HANDLE + */ +class PollerHandle; +class IOHandlePrivate; +class IOHandle { + friend class PollerHandle; + +protected: + IOHandlePrivate* const impl; + + IOHandle(IOHandlePrivate*); + virtual ~IOHandle(); +}; + +}} + +#endif // _sys_IOHandle_h diff --git a/cpp/src/qpid/sys/Poller.h b/cpp/src/qpid/sys/Poller.h index 0d6b4f9308..dccc12479a 100644 --- a/cpp/src/qpid/sys/Poller.h +++ b/cpp/src/qpid/sys/Poller.h @@ -35,16 +35,16 @@ namespace sys { /** * Handle class to use for polling */ +class IOHandle; class Poller; class PollerHandlePrivate; class PollerHandle { friend class Poller; PollerHandlePrivate* const impl; - const Socket& socket; public: - PollerHandle(const Socket& s); + PollerHandle(const IOHandle& h); // Usual way to delete (will defer deletion until we // can't be returned from a Poller::wait any more) @@ -52,8 +52,6 @@ public: // Class clients shouldn't ever use this virtual ~PollerHandle(); - - const Socket& getSocket() const {return socket;} }; /** diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h index 0ebfc0c330..cab95654ad 100644 --- a/cpp/src/qpid/sys/Socket.h +++ b/cpp/src/qpid/sys/Socket.h @@ -21,25 +21,22 @@ * under the License. * */ +#include "IOHandle.h" + #include <string> -#include "qpid/sys/Time.h" struct sockaddr; namespace qpid { namespace sys { -class SocketPrivate; -class Socket -{ - friend class Poller; - - SocketPrivate* const impl; +class Duration; +class Socket : public IOHandle +{ public: /** Create a socket wrapper for descriptor. */ Socket(); - ~Socket(); /** Create an initialized TCP socket */ void createTcp() const; @@ -106,10 +103,8 @@ public: int read(void *buf, size_t count) const; int write(const void *buf, size_t count) const; - int toFd() const; - private: - Socket(SocketPrivate*); + Socket(IOHandlePrivate*); }; }} diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp index 8936251f94..44b84c4239 100644 --- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -20,6 +20,7 @@ */ #include "qpid/sys/Poller.h" +#include "qpid/sys/IOHandle.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/DeletionManager.h" #include "qpid/sys/posix/check.h" @@ -54,11 +55,13 @@ class PollerHandlePrivate { MONITORED_HUNGUP }; + int fd; ::__uint32_t events; FDStat stat; Mutex lock; - PollerHandlePrivate() : + PollerHandlePrivate(int f) : + fd(f), events(0), stat(ABSENT) { } @@ -97,9 +100,8 @@ class PollerHandlePrivate { } }; -PollerHandle::PollerHandle(const Socket& s) : - impl(new PollerHandlePrivate), - socket(s) +PollerHandle::PollerHandle(const IOHandle& h) : + impl(new PollerHandlePrivate(toFd(h.impl))) {} PollerHandle::~PollerHandle() { @@ -201,7 +203,7 @@ void Poller::addFd(PollerHandle& handle, Direction dir) { } epe.data.ptr = &handle; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, toFd(handle.socket.impl), &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe)); // Record monitoring state of this fd eh.events = epe.events; @@ -212,7 +214,7 @@ void Poller::delFd(PollerHandle& handle) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); assert(!eh.isIdle()); - int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, toFd(handle.socket.impl), 0); + int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0); // Ignore EBADF since deleting a nonexistent fd has the overall required result! // And allows the case where a sloppy program closes the fd and then does the delFd() if (rc == -1 && errno != EBADF) { @@ -231,7 +233,7 @@ void Poller::modFd(PollerHandle& handle, Direction dir) { epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; epe.data.ptr = &handle; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, toFd(handle.socket.impl), &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); // Record monitoring state of this fd eh.events = epe.events; @@ -247,7 +249,7 @@ void Poller::rearmFd(PollerHandle& handle) { epe.events = eh.events; epe.data.ptr = &handle; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, toFd(handle.socket.impl), &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); eh.setActive(); } diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 94c68bd5d0..cedad5c011 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -65,7 +65,8 @@ __thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : acceptedCallback(callback), - handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) { + handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0), + socket(s) { s.setNonblocking(); ignoreSigpipe(); @@ -84,7 +85,7 @@ void AsynchAcceptor::readable(DispatchHandle& h) { errno = 0; // TODO: Currently we ignore the peers address, perhaps we should // log it or use it for connection acceptance. - s = h.getSocket().accept(0, 0); + s = socket.accept(0, 0); if (s) { acceptedCallback(*s); } else { @@ -112,6 +113,7 @@ AsynchIO::AsynchIO(const Socket& s, closedCallback(cCb), emptyCallback(eCb), idleCallback(iCb), + socket(s), queuedClose(false), writePending(false) { @@ -209,7 +211,7 @@ void AsynchIO::readable(DispatchHandle& h) { bufferQueue.pop_front(); errno = 0; int readCount = buff->byteCount-buff->dataCount; - int rc = h.getSocket().read(buff->bytes + buff->dataCount, readCount); + int rc = socket.read(buff->bytes + buff->dataCount, readCount); if (rc > 0) { buff->dataCount += rc; threadReadTotal += rc; @@ -276,7 +278,7 @@ void AsynchIO::writeable(DispatchHandle& h) { writeQueue.pop_back(); errno = 0; assert(buff->dataStart+buff->dataCount <= buff->byteCount); - int rc = h.getSocket().write(buff->bytes+buff->dataStart, buff->dataCount); + int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount); if (rc >= 0) { threadWriteTotal += rc; writeTotal += rc; @@ -356,9 +358,9 @@ void AsynchIO::disconnected(DispatchHandle& h) { */ void AsynchIO::close(DispatchHandle& h) { h.stopWatch(); - h.getSocket().close(); + socket.close(); if (closedCallback) { - closedCallback(*this, getSocket()); + closedCallback(*this, socket); } } diff --git a/cpp/src/qpid/sys/posix/IOHandle.cpp b/cpp/src/qpid/sys/posix/IOHandle.cpp new file mode 100644 index 0000000000..80b487eadc --- /dev/null +++ b/cpp/src/qpid/sys/posix/IOHandle.cpp @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/IOHandle.h" + +#include "PrivatePosix.h" + +namespace qpid { +namespace sys { + +int toFd(const IOHandlePrivate* h) +{ + return h->fd; +} + +IOHandle::IOHandle(IOHandlePrivate* h) : + impl(h) +{} + +IOHandle::~IOHandle() { + delete impl; +} + +}} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/PrivatePosix.h b/cpp/src/qpid/sys/posix/PrivatePosix.h index 9ec9770cab..33c0cd81bc 100644 --- a/cpp/src/qpid/sys/posix/PrivatePosix.h +++ b/cpp/src/qpid/sys/posix/PrivatePosix.h @@ -35,9 +35,17 @@ struct timespec& toTimespec(struct timespec& ts, const Duration& t); struct timeval& toTimeval(struct timeval& tv, const Duration& t); Duration toTime(const struct timespec& ts); -// Private socket related implementation details -class SocketPrivate; -int toFd(const SocketPrivate* s); +// Private fd related implementation details +class IOHandlePrivate { +public: + IOHandlePrivate(int f = -1) : + fd(f) + {} + + int fd; +}; + +int toFd(const IOHandlePrivate* h); }} diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index c286ebce27..99cf7210b6 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -38,19 +38,8 @@ namespace qpid { namespace sys { -class SocketPrivate { -public: - SocketPrivate(int f = -1) : - fd(f) - {} - - int fd; - - std::string getName(bool local, bool includeService = false) const; - std::string getService(bool local) const; -}; - -std::string SocketPrivate::getName(bool local, bool includeService) const +namespace { +std::string getName(int fd, bool local, bool includeService = false) { ::sockaddr_storage name; // big enough for any socket address ::socklen_t namelen = sizeof(name); @@ -80,7 +69,7 @@ std::string SocketPrivate::getName(bool local, bool includeService) const } } -std::string SocketPrivate::getService(bool local) const +std::string getService(int fd, bool local) { ::sockaddr_storage name; // big enough for any socket address ::socklen_t namelen = sizeof(name); @@ -101,21 +90,18 @@ std::string SocketPrivate::getService(bool local) const throw QPID_POSIX_ERROR(rc); return servName; } +} Socket::Socket() : - impl(new SocketPrivate) + IOHandle(new IOHandlePrivate) { createTcp(); } -Socket::Socket(SocketPrivate* sp) : - impl(sp) +Socket::Socket(IOHandlePrivate* h) : + IOHandle(h) {} -Socket::~Socket() { - delete impl; -} - void Socket::createTcp() const { int& socket = impl->fd; @@ -225,7 +211,7 @@ Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const { int afd = ::accept(impl->fd, addr, addrlen); if ( afd >= 0) - return new Socket(new SocketPrivate(afd)); + return new Socket(new IOHandlePrivate(afd)); else if (errno == EAGAIN) return 0; else throw QPID_POSIX_ERROR(errno); @@ -243,41 +229,32 @@ int Socket::write(const void *buf, size_t count) const std::string Socket::getSockname() const { - return impl->getName(true); + return getName(impl->fd, true); } std::string Socket::getPeername() const { - return impl->getName(false); + return getName(impl->fd, false); } std::string Socket::getPeerAddress() const { - return impl->getName(false, true); + return getName(impl->fd, false, true); } std::string Socket::getLocalAddress() const { - return impl->getName(true, true); + return getName(impl->fd, true, true); } uint16_t Socket::getLocalPort() const { - return atoi(impl->getService(true).c_str()); + return atoi(getService(impl->fd, true).c_str()); } uint16_t Socket::getRemotePort() const { - return atoi(impl->getService(true).c_str()); -} - -int Socket::toFd() const { - return impl->fd; -} - -int toFd(const SocketPrivate* s) -{ - return s->fd; + return atoi(getService(impl->fd, true).c_str()); } }} // namespace qpid::sys |