diff options
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 167 |
1 files changed, 161 insertions, 6 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 7598eefe83..c8a8b3d0f1 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -26,8 +26,8 @@ #include "check.h" -// TODO The basic algorithm here is not really POSIX specific and with a bit more abstraction -// could (should) be promoted to be platform portable +// TODO The basic algorithm here is not really POSIX specific and with a +// bit more abstraction could (should) be promoted to be platform portable #include <unistd.h> #include <sys/socket.h> #include <signal.h> @@ -65,24 +65,55 @@ __thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms /* * Asynch Acceptor */ +namespace qpid { +namespace sys { + +class AsynchAcceptorPrivate { +public: + AsynchAcceptorPrivate(const Socket& s, AsynchAcceptor::Callback callback); + void start(Poller::shared_ptr poller); + +private: + void readable(DispatchHandle& handle); + +private: + AsynchAcceptor::Callback acceptedCallback; + DispatchHandle handle; + const Socket& socket; + +}; + +}} // namespace qpid::sys AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : + impl(new AsynchAcceptorPrivate(s, callback)) +{} + +AsynchAcceptor::~AsynchAcceptor() +{ delete impl;} + +void AsynchAcceptor::start(Poller::shared_ptr poller) { + impl->start(poller); +} + +AsynchAcceptorPrivate::AsynchAcceptorPrivate(const Socket& s, + AsynchAcceptor::Callback callback) : acceptedCallback(callback), - handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0), + handle(s, boost::bind(&AsynchAcceptorPrivate::readable, this, _1), 0, 0), socket(s) { s.setNonblocking(); ignoreSigpipe(); } -void AsynchAcceptor::start(Poller::shared_ptr poller) { +void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) { handle.startWatch(poller); } /* * We keep on accepting as long as there is something to accept */ -void AsynchAcceptor::readable(DispatchHandle& h) { +void AsynchAcceptorPrivate::readable(DispatchHandle& h) { Socket* s; do { errno = 0; @@ -106,6 +137,36 @@ void AsynchAcceptor::readable(DispatchHandle& h) { /* * Asynch Connector */ +namespace qpid { +namespace sys { +namespace posix { + +/* + * POSIX version of AsynchIO TCP socket connector. + * + * The class is implemented in terms of DispatchHandle to allow it to be + * deleted by deleting the contained DispatchHandle. + */ +class AsynchConnector : public qpid::sys::AsynchConnector, + private DispatchHandle { + +private: + void connComplete(DispatchHandle& handle); + void failure(int, std::string); + +private: + ConnectedCallback connCallback; + FailedCallback failCallback; + const Socket& socket; + +public: + AsynchConnector(const Socket& socket, + Poller::shared_ptr poller, + std::string hostname, + uint16_t port, + ConnectedCallback connCb, + FailedCallback failCb = 0); +}; AsynchConnector::AsynchConnector(const Socket& s, Poller::shared_ptr poller, @@ -155,9 +216,85 @@ void AsynchConnector::failure(int errCode, std::string message) DispatchHandle::doDelete(); } +} // namespace posix + + +AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s, + Poller::shared_ptr poller, + std::string hostname, + uint16_t port, + ConnectedCallback connCb, + FailedCallback failCb) +{ + return new qpid::sys::posix::AsynchConnector(s, + poller, + hostname, + port, + connCb, + failCb); +} + /* - * Asynch reader/writer + * POSIX version of AsynchIO reader/writer + * + * The class is implemented in terms of DispatchHandle to allow it to be + * deleted by deleting the contained DispatchHandle. */ +namespace posix { + +class AsynchIO : public qpid::sys::AsynchIO, private DispatchHandle { + +public: + AsynchIO(const Socket& s, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb = 0, + BuffersEmptyCallback eCb = 0, + IdleCallback iCb = 0); + + // Methods inherited from qpid::sys::AsynchIO + + virtual void queueForDeletion(); + + virtual void start(Poller::shared_ptr poller); + virtual void queueReadBuffer(BufferBase* buff); + virtual void unread(BufferBase* buff); + virtual void queueWrite(BufferBase* buff); + virtual void notifyPendingWrite(); + virtual void queueWriteClose(); + virtual bool writeQueueEmpty(); + virtual BufferBase* getQueuedBuffer(); + +private: + ~AsynchIO(); + + // Methods that are callback targets from Dispatcher. + void readable(DispatchHandle& handle); + void writeable(DispatchHandle& handle); + void disconnected(DispatchHandle& handle); + void close(DispatchHandle& handle); + +private: + ReadCallback readCallback; + EofCallback eofCallback; + DisconnectCallback disCallback; + ClosedCallback closedCallback; + BuffersEmptyCallback emptyCallback; + IdleCallback idleCallback; + const Socket& socket; + std::deque<BufferBase*> bufferQueue; + std::deque<BufferBase*> writeQueue; + bool queuedClose; + /** + * This flag is used to detect and handle concurrency between + * calls to notifyPendingWrite() (which can be made from any thread) and + * the execution of the writeable() method (which is always on the + * thread processing this handle. + */ + volatile bool writePending; +}; + AsynchIO::AsynchIO(const Socket& s, ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) : @@ -239,6 +376,10 @@ void AsynchIO::queueWriteClose() { DispatchHandle::rewatchWrite(); } +bool AsynchIO::writeQueueEmpty() { + return writeQueue.empty(); +} + /** Return a queued buffer if there are enough * to spare */ @@ -427,3 +568,17 @@ void AsynchIO::close(DispatchHandle& h) { } } +} // namespace posix + +AsynchIO* qpid::sys::AsynchIO::create(const Socket& s, + AsynchIO::ReadCallback rCb, + AsynchIO::EofCallback eofCb, + AsynchIO::DisconnectCallback disCb, + AsynchIO::ClosedCallback cCb, + AsynchIO::BuffersEmptyCallback eCb, + AsynchIO::IdleCallback iCb) +{ + return new qpid::sys::posix::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb); +} + +}} // namespace qpid::sys |