diff options
Diffstat (limited to 'cpp/src/qpid/sys/AsynchIO.h')
-rw-r--r-- | cpp/src/qpid/sys/AsynchIO.h | 125 |
1 files changed, 55 insertions, 70 deletions
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h index ff7823e00d..f5c4607992 100644 --- a/cpp/src/qpid/sys/AsynchIO.h +++ b/cpp/src/qpid/sys/AsynchIO.h @@ -21,7 +21,8 @@ * */ -#include "Dispatcher.h" +// @@TODO: TAKE THIS OUT... SHould be in posix version. +#include "DispatchHandle.h" #include <boost/function.hpp> #include <deque> @@ -35,48 +36,45 @@ class Socket; * Asynchronous acceptor: accepts connections then does a callback with the * accepted fd */ +class AsynchAcceptorPrivate; class AsynchAcceptor { public: typedef boost::function1<void, const Socket&> Callback; private: - Callback acceptedCallback; - DispatchHandle handle; - const Socket& socket; + AsynchAcceptorPrivate* impl; public: AsynchAcceptor(const Socket& s, Callback callback); + ~AsynchAcceptor(); void start(Poller::shared_ptr poller); - -private: - void readable(DispatchHandle& handle); }; /* * Asynchronous connector: starts the process of initiating a connection and * invokes a callback when completed or failed. */ -class AsynchConnector : private DispatchHandle { +class AsynchConnector { public: typedef boost::function1<void, const Socket&> ConnectedCallback; typedef boost::function2<void, int, std::string> FailedCallback; -private: - ConnectedCallback connCallback; - FailedCallback failCallback; - const Socket& socket; - -public: - AsynchConnector(const Socket& socket, - Poller::shared_ptr poller, - std::string hostname, - uint16_t port, - ConnectedCallback connCb, - FailedCallback failCb = 0); - -private: - void connComplete(DispatchHandle& handle); - void failure(int, std::string); + // Call create() to allocate a new AsynchConnector object with the + // specified poller, addressing, and callbacks. + // This method is implemented in platform-specific code to + // create a correctly typed object. The platform code also manages + // deletes. To correctly manage heaps when needed, the allocate and + // delete should both be done from the same class/library. + static AsynchConnector* create(const Socket& s, + Poller::shared_ptr poller, + std::string hostname, + uint16_t port, + ConnectedCallback connCb, + FailedCallback failCb = 0); + +protected: + AsynchConnector() {} + virtual ~AsynchConnector() {} }; struct AsynchIOBufferBase { @@ -99,16 +97,14 @@ struct AsynchIOBufferBase { /* * Asychronous reader/writer: * Reader accepts buffers to read into; reads into the provided buffers - * and then does a callback with the buffer and amount read. Optionally it can callback - * when there is something to read but no buffer to read it into. + * and then does a callback with the buffer and amount read. Optionally it + * can callback when there is something to read but no buffer to read it into. * * Writer accepts a buffer and queues it for writing; can also be given - * a callback for when writing is "idle" (ie fd is writable, but nothing to write) - * - * The class is implemented in terms of DispatchHandle to allow it to be deleted by deleting - * the contained DispatchHandle + * a callback for when writing is "idle" (ie fd is writable, but nothing + * to write). */ -class AsynchIO : private DispatchHandle { +class AsynchIO { public: typedef AsynchIOBufferBase BufferBase; @@ -119,46 +115,35 @@ public: typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback; typedef boost::function1<void, AsynchIO&> IdleCallback; -private: - ReadCallback readCallback; - EofCallback eofCallback; - DisconnectCallback disCallback; - ClosedCallback closedCallback; - BuffersEmptyCallback emptyCallback; - IdleCallback idleCallback; - const Socket& socket; - std::deque<BufferBase*> bufferQueue; - std::deque<BufferBase*> writeQueue; - bool queuedClose; - /** - * This flag is used to detect and handle concurrency between - * calls to notifyPendingWrite() (which can be made from any thread) and - * the execution of the writeable() method (which is always on the - * thread processing this handle. - */ - volatile bool writePending; - + // Call create() to allocate a new AsynchIO object with the specified + // callbacks. This method is implemented in platform-specific code to + // create a correctly typed object. The platform code also manages + // deletes. To correctly manage heaps when needed, the allocate and + // delete should both be done from the same class/library. + static AsynchIO* create(const Socket& s, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb = 0, + BuffersEmptyCallback eCb = 0, + IdleCallback iCb = 0); public: - AsynchIO(const Socket& s, - ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, - ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0); - void queueForDeletion(); - - void start(Poller::shared_ptr poller); - void queueReadBuffer(BufferBase* buff); - void unread(BufferBase* buff); - void queueWrite(BufferBase* buff); - void notifyPendingWrite(); - void queueWriteClose(); - bool writeQueueEmpty() { return writeQueue.empty(); } - BufferBase* getQueuedBuffer(); - -private: - ~AsynchIO(); - void readable(DispatchHandle& handle); - void writeable(DispatchHandle& handle); - void disconnected(DispatchHandle& handle); - void close(DispatchHandle& handle); + virtual void queueForDeletion() = 0; + + virtual void start(Poller::shared_ptr poller) = 0; + virtual void queueReadBuffer(BufferBase* buff) = 0; + virtual void unread(BufferBase* buff) = 0; + virtual void queueWrite(BufferBase* buff) = 0; + virtual void notifyPendingWrite() = 0; + virtual void queueWriteClose() = 0; + virtual bool writeQueueEmpty() = 0; + virtual BufferBase* getQueuedBuffer() = 0; + +protected: + // Derived class manages lifetime; must be constructed using the + // static create() method. Deletes not allowed from outside. + AsynchIO() {} + virtual ~AsynchIO() {} }; }} |