diff options
author | Andrew Stitcher <astitcher@apache.org> | 2007-07-12 01:48:13 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2007-07-12 01:48:13 +0000 |
commit | ef1d54d57f354765e6ea1ae73807a4a633c73998 (patch) | |
tree | 8ce4fada81e4a844b8bc7b523d657dd84e79857c /cpp/src | |
parent | fa267e310c233cfc9fc3b67b3c080adb4911f69f (diff) | |
download | qpid-python-ef1d54d57f354765e6ea1ae73807a4a633c73998.tar.gz |
* Add libuuid to libcommon link (for when apr goes away)
* Latest version of AsynchIO code
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@555455 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AsynchIO.h | 43 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Dispatcher.cpp | 256 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Dispatcher.h | 21 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Poller.h | 21 | ||||
-rw-r--r-- | cpp/src/qpid/sys/epoll/EpollPoller.cpp | 92 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 120 |
7 files changed, 431 insertions, 123 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index bbb1c6655e..3d97436db6 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -121,6 +121,7 @@ libLogger_la_CXXFLAGS=$(AM_CXXFLAGS) -Wno-unused-parameter libqpidcommon_la_LIBADD = \ -lboost_program_options \ + -luuid \ libLogger.la \ $(APR_LIBS) \ $(LIB_DLOPEN) \ diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h index b346c11706..7accde17b0 100644 --- a/cpp/src/qpid/sys/AsynchIO.h +++ b/cpp/src/qpid/sys/AsynchIO.h @@ -57,42 +57,61 @@ private: * * Writer accepts a buffer and queues it for writing; can also be given * a callback for when writing is "idle" (ie fd is writable, but nothing to write) + * + * The class is implemented in terms of DispatchHandle to allow it to be deleted by deleting + * the contained DispatchHandle */ -class AsynchIO { +class AsynchIO : private DispatchHandle { public: struct Buffer { + typedef boost::function1<void, const Buffer&> RecycleStorage; + char* const bytes; const int32_t byteCount; + int32_t dataStart; + int32_t dataCount; Buffer(char* const b, const int32_t s) : bytes(b), - byteCount(s) + byteCount(s), + dataStart(0), + dataCount(s) + {} + + virtual ~Buffer() {} }; - typedef boost::function2<void, const Buffer&, int32_t> ReadCallback; - typedef boost::function0<void> EofCallback; - typedef boost::function0<void> BuffersEmptyCallback; - typedef boost::function1<void, int> IdleCallback; + typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback; + typedef boost::function1<void, AsynchIO&> EofCallback; + typedef boost::function1<void, AsynchIO&> DisconnectCallback; + typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback; + typedef boost::function1<void, AsynchIO&> IdleCallback; private: ReadCallback readCallback; EofCallback eofCallback; + DisconnectCallback disCallback; BuffersEmptyCallback emptyCallback; IdleCallback idleCallback; - DispatchHandle handle; - std::deque<Buffer> bufferQueue; - std::deque<Buffer> writeQueue; + std::deque<Buffer*> bufferQueue; + std::deque<Buffer*> writeQueue; public: - AsynchIO(int fd, ReadCallback rCb, EofCallback eofCb, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0); + AsynchIO(int fd, + ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, + BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0); + void queueForDeletion(); + void start(Poller::shared_ptr poller); - void QueueReadBuffer(const Buffer& buff); - void QueueWrite(const Buffer& buff); + void queueReadBuffer(Buffer* buff); + void queueWrite(Buffer* buff); private: + ~AsynchIO(); void readable(DispatchHandle& handle); void writeable(DispatchHandle& handle); + void disconnected(DispatchHandle& handle); }; }} diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp index 9a20e2c3bc..3a1da13bd0 100644 --- a/cpp/src/qpid/sys/Dispatcher.cpp +++ b/cpp/src/qpid/sys/Dispatcher.cpp @@ -40,10 +40,10 @@ void Dispatcher::run() { // If can read/write then dispatch appropriate callbacks if (h) { - h->dispatchCallbacks(event.dir); + h->dispatchCallbacks(event.type); } else { // Handle shutdown - switch (event.dir) { + switch (event.type) { case Poller::SHUTDOWN: goto dispatcher_shutdown; default: @@ -57,7 +57,11 @@ dispatcher_shutdown: ; } -void DispatchHandle::watch(Poller::shared_ptr poller0) { +DispatchHandle::~DispatchHandle() { + stopWatch(); +} + +void DispatchHandle::startWatch(Poller::shared_ptr poller0) { bool r = readableCallback; bool w = writableCallback; @@ -84,25 +88,26 @@ void DispatchHandle::watch(Poller::shared_ptr poller0) { } void DispatchHandle::rewatch() { - assert(poller); bool r = readableCallback; bool w = writableCallback; ScopedLock<Mutex> lock(stateLock); switch(state) { - case DispatchHandle::IDLE: - assert(false); + case IDLE: break; - case DispatchHandle::DELAYED_R: - case DispatchHandle::DELAYED_W: - case DispatchHandle::CALLBACK: + case DELAYED_R: + case DELAYED_W: + case CALLBACK: state = r ? (w ? DELAYED_RW : DELAYED_R) : DELAYED_W; break; - case DispatchHandle::INACTIVE: - case DispatchHandle::ACTIVE_R: - case DispatchHandle::ACTIVE_W: { + case DELAYED_DELETE: + break; + case INACTIVE: + case ACTIVE_R: + case ACTIVE_W: { + assert(poller); Poller::Direction d = r ? (w ? Poller::INOUT : Poller::IN) : Poller::OUT; @@ -112,42 +117,43 @@ void DispatchHandle::rewatch() { ACTIVE_W; break; } - case DispatchHandle::DELAYED_RW: - case DispatchHandle::ACTIVE_RW: + case DELAYED_RW: + case ACTIVE_RW: // Don't need to do anything already waiting for readable/writable break; } } void DispatchHandle::rewatchRead() { - assert(poller); if (!readableCallback) { return; } ScopedLock<Mutex> lock(stateLock); switch(state) { - case DispatchHandle::IDLE: - assert(false); + case IDLE: break; - case DispatchHandle::DELAYED_R: - case DispatchHandle::DELAYED_RW: + case DELAYED_R: + case DELAYED_RW: + case DELAYED_DELETE: break; - case DispatchHandle::DELAYED_W: + case DELAYED_W: state = DELAYED_RW; break; - case DispatchHandle::CALLBACK: + case CALLBACK: state = DELAYED_R; break; - case DispatchHandle::ACTIVE_R: - case DispatchHandle::ACTIVE_RW: - // Nothing to do: already wating for readable + case ACTIVE_R: + case ACTIVE_RW: + // Nothing to do: already waiting for readable break; - case DispatchHandle::INACTIVE: + case INACTIVE: + assert(poller); poller->modFd(*this, Poller::IN); state = ACTIVE_R; break; - case DispatchHandle::ACTIVE_W: + case ACTIVE_W: + assert(poller); poller->modFd(*this, Poller::INOUT); state = ACTIVE_RW; break; @@ -155,101 +161,245 @@ void DispatchHandle::rewatchRead() { } void DispatchHandle::rewatchWrite() { - assert(poller); if (!writableCallback) { return; } ScopedLock<Mutex> lock(stateLock); switch(state) { - case DispatchHandle::IDLE: - assert(false); + case IDLE: break; - case DispatchHandle::DELAYED_W: - case DispatchHandle::DELAYED_RW: + case DELAYED_W: + case DELAYED_RW: + case DELAYED_DELETE: break; - case DispatchHandle::DELAYED_R: + case DELAYED_R: state = DELAYED_RW; break; - case DispatchHandle::CALLBACK: + case CALLBACK: state = DELAYED_W; break; - case DispatchHandle::INACTIVE: + case INACTIVE: + assert(poller); poller->modFd(*this, Poller::OUT); state = ACTIVE_W; break; - case DispatchHandle::ACTIVE_R: + case ACTIVE_R: + assert(poller); poller->modFd(*this, Poller::INOUT); state = ACTIVE_RW; break; - case DispatchHandle::ACTIVE_W: - case DispatchHandle::ACTIVE_RW: + case ACTIVE_W: + case ACTIVE_RW: // Nothing to do: already waiting for writable break; } } +void DispatchHandle::unwatchRead() { + if (!readableCallback) { + return; + } + + ScopedLock<Mutex> lock(stateLock); + switch(state) { + case IDLE: + break; + case DELAYED_R: + state = CALLBACK; + break; + case DELAYED_RW: + state = DELAYED_W; + break; + case DELAYED_W: + case CALLBACK: + case DELAYED_DELETE: + break; + case ACTIVE_R: + assert(poller); + poller->modFd(*this, Poller::NONE); + state = INACTIVE; + break; + case ACTIVE_RW: + assert(poller); + poller->modFd(*this, Poller::OUT); + state = ACTIVE_W; + break; + case ACTIVE_W: + case INACTIVE: + break; + } +} + +void DispatchHandle::unwatchWrite() { + if (!writableCallback) { + return; + } + + ScopedLock<Mutex> lock(stateLock); + switch(state) { + case IDLE: + break; + case DELAYED_W: + state = CALLBACK; + break; + case DELAYED_RW: + state = DELAYED_R; + break; + case DELAYED_R: + case CALLBACK: + case DELAYED_DELETE: + break; + case ACTIVE_W: + assert(poller); + poller->modFd(*this, Poller::NONE); + state = INACTIVE; + break; + case ACTIVE_RW: + assert(poller); + poller->modFd(*this, Poller::IN); + state = ACTIVE_R; + break; + case ACTIVE_R: + case INACTIVE: + break; + } +} + void DispatchHandle::unwatch() { - assert(poller); ScopedLock<Mutex> lock(stateLock); + switch (state) { + case IDLE: + break; + case DELAYED_R: + case DELAYED_W: + case DELAYED_RW: + case CALLBACK: + state = CALLBACK; + break; + case DELAYED_DELETE: + break; + default: + assert(poller); + poller->modFd(*this, Poller::NONE); + state = INACTIVE; + break; + } +} + +void DispatchHandle::stopWatch() { + ScopedLock<Mutex> lock(stateLock); + if ( state == IDLE) { + return; + } + assert(poller); poller->delFd(*this); poller.reset(); state = IDLE; } -void DispatchHandle::dispatchCallbacks(Poller::Direction dir) { - // Note that we are now doing the callbacks +// The slightly strange switch structure +// is to ensure that the lock is released before +// we do the delete +void DispatchHandle::doDelete() { + // If we're in the middle of a callback defer the delete { ScopedLock<Mutex> lock(stateLock); - assert( - state == ACTIVE_R || - state == ACTIVE_W || - state == ACTIVE_RW); + switch (state) { + case DELAYED_R: + case DELAYED_W: + case DELAYED_RW: + case CALLBACK: + case DELAYED_DELETE: + state = DELAYED_DELETE; + return; + default: + break; + } + } + // If we're not then do it right away + delete this; +} - state = CALLBACK; +void DispatchHandle::dispatchCallbacks(Poller::EventType type) { + // Note that we are now doing the callbacks + { + ScopedLock<Mutex> lock(stateLock); + + // Set up to wait for same events next time unless reset + switch(state) { + case ACTIVE_R: + state = DELAYED_R; + break; + case ACTIVE_W: + state = DELAYED_W; + break; + case ACTIVE_RW: + state = DELAYED_RW; + break; + default: + assert(false); + } } // Do callbacks - whilst we are doing the callbacks we are prevented from processing // the same handle until we re-enable it. To avoid rentering the callbacks for a single // handle re-enabling in the callbacks is actually deferred until they are complete. - switch (dir) { - case Poller::IN: + switch (type) { + case Poller::READABLE: readableCallback(*this); break; - case Poller::OUT: + case Poller::WRITABLE: writableCallback(*this); break; - case Poller::INOUT: + case Poller::READ_WRITABLE: readableCallback(*this); writableCallback(*this); break; + case Poller::DISCONNECTED: + { + ScopedLock<Mutex> lock(stateLock); + state = CALLBACK; + } + if (disconnectedCallback) { + disconnectedCallback(*this); + } + break; default: assert(false); } // If any of the callbacks re-enabled reading/writing then actually // do it now + { ScopedLock<Mutex> lock(stateLock); switch (state) { case DELAYED_R: poller->modFd(*this, Poller::IN); state = ACTIVE_R; - break; + return; case DELAYED_W: poller->modFd(*this, Poller::OUT); state = ACTIVE_W; - break; + return; case DELAYED_RW: poller->modFd(*this, Poller::INOUT); state = ACTIVE_RW; - break; + return; case CALLBACK: state = INACTIVE; - break; + return; + case IDLE: + return; default: // This should be impossible assert(false); + return; + case DELAYED_DELETE: + break; + } } + delete this; } }} diff --git a/cpp/src/qpid/sys/Dispatcher.h b/cpp/src/qpid/sys/Dispatcher.h index 3e43ca3bc1..60e9522bc1 100644 --- a/cpp/src/qpid/sys/Dispatcher.h +++ b/cpp/src/qpid/sys/Dispatcher.h @@ -44,26 +44,39 @@ public: private: Callback readableCallback; Callback writableCallback; + Callback disconnectedCallback; Poller::shared_ptr poller; Mutex stateLock; - enum { IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW, CALLBACK, DELAYED_R, DELAYED_W, DELAYED_RW} state; + enum { + IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW, + CALLBACK, DELAYED_R, DELAYED_W, DELAYED_RW, DELAYED_DELETE + } state; public: - DispatchHandle(int fd, Callback rCb, Callback wCb) : + DispatchHandle(int fd, Callback rCb, Callback wCb, Callback dCb) : PollerHandle(fd), readableCallback(rCb), writableCallback(wCb), + disconnectedCallback(dCb), state(IDLE) {} - void watch(Poller::shared_ptr poller); + ~DispatchHandle(); + + void startWatch(Poller::shared_ptr poller); void rewatch(); void rewatchRead(); void rewatchWrite(); void unwatch(); + void unwatchRead(); + void unwatchWrite(); + void stopWatch(); + +protected: + void doDelete(); private: - void dispatchCallbacks(Poller::Direction dir); + void dispatchCallbacks(Poller::EventType dir); }; class Dispatcher : public Runnable { diff --git a/cpp/src/qpid/sys/Poller.h b/cpp/src/qpid/sys/Poller.h index 6fedd669a0..55fead55aa 100644 --- a/cpp/src/qpid/sys/Poller.h +++ b/cpp/src/qpid/sys/Poller.h @@ -61,20 +61,29 @@ public: typedef boost::shared_ptr<Poller> shared_ptr; enum Direction { - NONE, + NONE = 0, IN, OUT, - INOUT, - SHUTDOWN + INOUT + }; + + enum EventType { + INVALID = 0, + READABLE, + WRITABLE, + READ_WRITABLE, + DISCONNECTED, + SHUTDOWN, + TIMEOUT }; struct Event { PollerHandle* handle; - Direction dir; + EventType type; - Event(PollerHandle* handle0, Direction dir0) : + Event(PollerHandle* handle0, EventType type0) : handle(handle0), - dir(dir0) { + type(type0) { } }; diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp index 65b2255023..8c3bdbc7d5 100644 --- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -40,7 +40,9 @@ class PollerHandlePrivate { enum FDStat { ABSENT, MONITORED, - INACTIVE + INACTIVE, + HUNGUP, + MONITORED_HUNGUP }; ::__uint32_t events; @@ -51,6 +53,39 @@ class PollerHandlePrivate { events(0), stat(ABSENT) { } + + bool isActive() const { + return stat == MONITORED || stat == MONITORED_HUNGUP; + } + + void setActive() { + stat = (stat == HUNGUP) ? MONITORED_HUNGUP : MONITORED; + } + + bool isInactive() const { + return stat == INACTIVE || stat == HUNGUP; + } + + void setInactive() { + stat = INACTIVE; + } + + bool isIdle() const { + return stat == ABSENT; + } + + void setIdle() { + stat = ABSENT; + } + + bool isHungup() const { + return stat == MONITORED_HUNGUP || stat == HUNGUP; + } + + void setHungup() { + assert(stat == MONITORED); + stat = HUNGUP; + } }; PollerHandle::PollerHandle(int fd0) : @@ -108,13 +143,16 @@ class PollerPrivate { } } - static Poller::Direction epollToDirection(::__uint32_t events) { + static Poller::EventType epollToDirection(::__uint32_t events) { + // POLLOUT & POLLHUP are mutually exclusive really, but at least socketpairs + // can give you both! + events = (events & ::EPOLLHUP) ? events & ~::EPOLLOUT : events; ::__uint32_t e = events & (::EPOLLIN | ::EPOLLOUT); switch (e) { - case ::EPOLLIN: return Poller::IN; - case ::EPOLLOUT: return Poller::OUT; - case ::EPOLLIN | ::EPOLLOUT: return Poller::INOUT; - default: return Poller::NONE; + case ::EPOLLIN: return Poller::READABLE; + case ::EPOLLOUT: return Poller::WRITABLE; + case ::EPOLLIN | ::EPOLLOUT: return Poller::READ_WRITABLE; + default: return (events & ::EPOLLHUP) ? Poller::DISCONNECTED : Poller::INVALID; } } @@ -138,11 +176,11 @@ void Poller::addFd(PollerHandle& handle, Direction dir) { ::epoll_event epe; int op; - if (eh.stat == PollerHandlePrivate::ABSENT) { + if (eh.isIdle()) { op = EPOLL_CTL_ADD; epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; } else { - assert(eh.stat == PollerHandlePrivate::MONITORED); + assert(eh.isActive()); op = EPOLL_CTL_MOD; epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir); } @@ -152,22 +190,27 @@ void Poller::addFd(PollerHandle& handle, Direction dir) { // Record monitoring state of this fd eh.events = epe.events; - eh.stat = PollerHandlePrivate::MONITORED; + eh.setActive(); } void Poller::delFd(PollerHandle& handle) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); - assert(eh.stat != PollerHandlePrivate::ABSENT); - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0)); - eh.stat = PollerHandlePrivate::ABSENT; + assert(!eh.isIdle()); + int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0); + // Ignore EBADF since deleting a nonexistent fd has the overall required result! + // And allows the case where a sloppy program closes the fd and then does the delFd() + if (rc == -1 && errno != EBADF) { + QPID_POSIX_CHECK(rc); + } + eh.setIdle(); } // modFd is equivalent to delFd followed by addFd void Poller::modFd(PollerHandle& handle, Direction dir) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); - assert(eh.stat != PollerHandlePrivate::ABSENT); + assert(!eh.isIdle()); ::epoll_event epe; epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; @@ -177,13 +220,13 @@ void Poller::modFd(PollerHandle& handle, Direction dir) { // Record monitoring state of this fd eh.events = epe.events; - eh.stat = PollerHandlePrivate::MONITORED; + eh.setActive(); } void Poller::rearmFd(PollerHandle& handle) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); - assert(eh.stat == PollerHandlePrivate::INACTIVE); + assert(eh.isInactive()); ::epoll_event epe; epe.events = eh.events; @@ -191,7 +234,7 @@ void Poller::rearmFd(PollerHandle& handle) { QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe)); - eh.stat = PollerHandlePrivate::MONITORED; + eh.setActive(); } void Poller::shutdown() { @@ -229,8 +272,17 @@ Poller::Event Poller::wait(Duration timeout) { ScopedLock<Mutex> l(eh.lock); // the handle could have gone inactive since we left the epoll_wait - if (eh.stat == PollerHandlePrivate::MONITORED) { - eh.stat = PollerHandlePrivate::INACTIVE; + if (eh.isActive()) { + // If the connection has been hungup we could still be readable + // (just not writable), allow us to readable until we get here again + if (epe.events & ::EPOLLHUP) { + if (eh.isHungup()) { + return Event(handle, DISCONNECTED); + } + eh.setHungup(); + } else { + eh.setInactive(); + } return Event(handle, PollerPrivate::epollToDirection(epe.events)); } } @@ -245,8 +297,8 @@ Poller::Event Poller::wait(Duration timeout) { // If the wait wasn't indefinite, but we were interrupted then we have to return // with a timeout as we don't know how long we've waited so far and so we can't // continue the wait. - if (rc == 0 || timeoutMs == -1) { - return Event(0, NONE); + if (rc == 0 || timeoutMs != -1) { + return Event(0, TIMEOUT); } } while (true); } diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 400c2080b2..473ef7936f 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -27,6 +27,7 @@ #include <fcntl.h> #include <sys/types.h> #include <sys/socket.h> +#include <signal.h> #include <errno.h> #include <boost/bind.hpp> @@ -42,6 +43,14 @@ void nonblocking(int fd) { QPID_POSIX_CHECK(::fcntl(fd, F_SETFL, O_NONBLOCK)); } +/* + * Make *process* not generate SIGPIPE when writing to closed + * pipe/socket (necessary as default action is to terminate process) + */ +void ignoreSigpipe() { + ::signal(SIGPIPE, SIG_IGN); +} + } /* @@ -50,13 +59,14 @@ void nonblocking(int fd) { AsynchAcceptor::AsynchAcceptor(int fd, Callback callback) : acceptedCallback(callback), - handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0) { + handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) { nonblocking(fd); + ignoreSigpipe(); } void AsynchAcceptor::start(Poller::shared_ptr poller) { - handle.watch(poller); + handle.startWatch(poller); } /* @@ -84,28 +94,50 @@ void AsynchAcceptor::readable(DispatchHandle& h) { /* * Asynch reader/writer */ -AsynchIO::AsynchIO(int fd, ReadCallback rCb, EofCallback eofCb, BuffersEmptyCallback eCb, IdleCallback iCb) : +AsynchIO::AsynchIO(int fd, + ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, + BuffersEmptyCallback eCb, IdleCallback iCb) : + + DispatchHandle(fd, + boost::bind(&AsynchIO::readable, this, _1), + boost::bind(&AsynchIO::writeable, this, _1), + boost::bind(&AsynchIO::disconnected, this, _1)), readCallback(rCb), eofCallback(eofCb), + disCallback(disCb), emptyCallback(eCb), - idleCallback(iCb), - handle(fd, boost::bind(&AsynchIO::readable, this, _1), boost::bind(&AsynchIO::writeable, this, _1)) { + idleCallback(iCb) { nonblocking(fd); } +struct deleter +{ + template <typename T> + void operator()(T *ptr){ delete ptr;} +}; + +AsynchIO::~AsynchIO() { + std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter()); + std::for_each( writeQueue.begin(), writeQueue.end(), deleter()); +} + +void AsynchIO::queueForDeletion() { + DispatchHandle::doDelete(); +} + void AsynchIO::start(Poller::shared_ptr poller) { - handle.watch(poller); + DispatchHandle::startWatch(poller); } -void AsynchIO::QueueReadBuffer(const Buffer& buff) { +void AsynchIO::queueReadBuffer(Buffer* buff) { bufferQueue.push_front(buff); - handle.rewatchRead(); + DispatchHandle::rewatchRead(); } -void AsynchIO::QueueWrite(const Buffer& buff) { +void AsynchIO::queueWrite(Buffer* buff) { writeQueue.push_front(buff); - handle.rewatchWrite(); + DispatchHandle::rewatchWrite(); } /* @@ -117,22 +149,34 @@ void AsynchIO::readable(DispatchHandle& h) { // (Try to) get a buffer if (!bufferQueue.empty()) { // Read into buffer - Buffer buff = bufferQueue.back(); + Buffer* buff = bufferQueue.back(); bufferQueue.pop_back(); errno = 0; - int rc = ::read(h.getFD(), buff.bytes, buff.byteCount); + int rc = ::read(h.getFD(), buff->bytes, buff->byteCount); if (rc == 0) { - eofCallback(); + eofCallback(*this); + h.unwatchRead(); + return; } else if (rc > 0) { - readCallback(buff, rc); + buff->dataStart = 0; + buff->dataCount = rc; + readCallback(*this, buff); + if (rc != buff->byteCount) { + // If we didn't fill the read buffer then time to stop reading + return; + } } else { // Put buffer back bufferQueue.push_back(buff); - if (errno == EAGAIN) { - // We must have just put a buffer back so we know - // we can do this - h.rewatchRead(); + // This is effectively the same as eof + if (errno == ECONNRESET) { + eofCallback(*this); + h.unwatchRead(); + return; + } else if (errno == EAGAIN) { + // We have just put a buffer back so we know + // we can carry on watching for reads return; } else { QPID_POSIX_CHECK(rc); @@ -141,10 +185,11 @@ void AsynchIO::readable(DispatchHandle& h) { } else { // Something to read but no buffer if (emptyCallback) { - emptyCallback(); + emptyCallback(*this); } // If we still have no buffers we can't do anything more if (bufferQueue.empty()) { + h.unwatchRead(); return; } @@ -160,36 +205,55 @@ void AsynchIO::writeable(DispatchHandle& h) { // See if we've got something to write if (!writeQueue.empty()) { // Write buffer - Buffer buff = writeQueue.back(); + Buffer* buff = writeQueue.back(); writeQueue.pop_back(); errno = 0; - int rc = ::write(h.getFD(), buff.bytes, buff.byteCount); + assert(buff->dataStart+buff->dataCount <= buff->byteCount); + int rc = ::write(h.getFD(), buff->bytes+buff->dataStart, buff->dataCount); if (rc >= 0) { + // If we didn't write full buffer put rest back + if (rc != buff->dataCount) { + buff->dataStart += rc; + buff->dataCount -= rc; + writeQueue.push_back(buff); + return; + } + // Recycle the buffer - QueueReadBuffer(buff); + queueReadBuffer(buff); } else { // Put buffer back writeQueue.push_back(buff); - - if (errno == EAGAIN) { + if (errno == ECONNRESET || errno == EPIPE) { + // Just stop watching for write here - we'll get a + // disconnect callback soon enough + h.unwatchWrite(); + return; + } else if (errno == EAGAIN) { // We have just put a buffer back so we know - // we can do this - h.rewatchWrite(); + // we can carry on watching for writes return; } else { QPID_POSIX_CHECK(rc); } } } else { - // Something to read but no buffer + // Fd is writable, but nothing to write if (idleCallback) { - idleCallback(h.getFD()); + idleCallback(*this); } // If we still have no buffers to write we can't do anything more if (writeQueue.empty()) { + h.unwatchWrite(); return; } } } while (true); } +void AsynchIO::disconnected(DispatchHandle& h) { + if (disCallback) { + disCallback(*this); + h.unwatch(); + } +} |