summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/AsynchIO.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/AsynchIO.h')
-rw-r--r--cpp/src/qpid/sys/AsynchIO.h125
1 files changed, 55 insertions, 70 deletions
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h
index ff7823e00d..f5c4607992 100644
--- a/cpp/src/qpid/sys/AsynchIO.h
+++ b/cpp/src/qpid/sys/AsynchIO.h
@@ -21,7 +21,8 @@
*
*/
-#include "Dispatcher.h"
+// @@TODO: TAKE THIS OUT... SHould be in posix version.
+#include "DispatchHandle.h"
#include <boost/function.hpp>
#include <deque>
@@ -35,48 +36,45 @@ class Socket;
* Asynchronous acceptor: accepts connections then does a callback with the
* accepted fd
*/
+class AsynchAcceptorPrivate;
class AsynchAcceptor {
public:
typedef boost::function1<void, const Socket&> Callback;
private:
- Callback acceptedCallback;
- DispatchHandle handle;
- const Socket& socket;
+ AsynchAcceptorPrivate* impl;
public:
AsynchAcceptor(const Socket& s, Callback callback);
+ ~AsynchAcceptor();
void start(Poller::shared_ptr poller);
-
-private:
- void readable(DispatchHandle& handle);
};
/*
* Asynchronous connector: starts the process of initiating a connection and
* invokes a callback when completed or failed.
*/
-class AsynchConnector : private DispatchHandle {
+class AsynchConnector {
public:
typedef boost::function1<void, const Socket&> ConnectedCallback;
typedef boost::function2<void, int, std::string> FailedCallback;
-private:
- ConnectedCallback connCallback;
- FailedCallback failCallback;
- const Socket& socket;
-
-public:
- AsynchConnector(const Socket& socket,
- Poller::shared_ptr poller,
- std::string hostname,
- uint16_t port,
- ConnectedCallback connCb,
- FailedCallback failCb = 0);
-
-private:
- void connComplete(DispatchHandle& handle);
- void failure(int, std::string);
+ // Call create() to allocate a new AsynchConnector object with the
+ // specified poller, addressing, and callbacks.
+ // This method is implemented in platform-specific code to
+ // create a correctly typed object. The platform code also manages
+ // deletes. To correctly manage heaps when needed, the allocate and
+ // delete should both be done from the same class/library.
+ static AsynchConnector* create(const Socket& s,
+ Poller::shared_ptr poller,
+ std::string hostname,
+ uint16_t port,
+ ConnectedCallback connCb,
+ FailedCallback failCb = 0);
+
+protected:
+ AsynchConnector() {}
+ virtual ~AsynchConnector() {}
};
struct AsynchIOBufferBase {
@@ -99,16 +97,14 @@ struct AsynchIOBufferBase {
/*
* Asychronous reader/writer:
* Reader accepts buffers to read into; reads into the provided buffers
- * and then does a callback with the buffer and amount read. Optionally it can callback
- * when there is something to read but no buffer to read it into.
+ * and then does a callback with the buffer and amount read. Optionally it
+ * can callback when there is something to read but no buffer to read it into.
*
* Writer accepts a buffer and queues it for writing; can also be given
- * a callback for when writing is "idle" (ie fd is writable, but nothing to write)
- *
- * The class is implemented in terms of DispatchHandle to allow it to be deleted by deleting
- * the contained DispatchHandle
+ * a callback for when writing is "idle" (ie fd is writable, but nothing
+ * to write).
*/
-class AsynchIO : private DispatchHandle {
+class AsynchIO {
public:
typedef AsynchIOBufferBase BufferBase;
@@ -119,46 +115,35 @@ public:
typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback;
typedef boost::function1<void, AsynchIO&> IdleCallback;
-private:
- ReadCallback readCallback;
- EofCallback eofCallback;
- DisconnectCallback disCallback;
- ClosedCallback closedCallback;
- BuffersEmptyCallback emptyCallback;
- IdleCallback idleCallback;
- const Socket& socket;
- std::deque<BufferBase*> bufferQueue;
- std::deque<BufferBase*> writeQueue;
- bool queuedClose;
- /**
- * This flag is used to detect and handle concurrency between
- * calls to notifyPendingWrite() (which can be made from any thread) and
- * the execution of the writeable() method (which is always on the
- * thread processing this handle.
- */
- volatile bool writePending;
-
+ // Call create() to allocate a new AsynchIO object with the specified
+ // callbacks. This method is implemented in platform-specific code to
+ // create a correctly typed object. The platform code also manages
+ // deletes. To correctly manage heaps when needed, the allocate and
+ // delete should both be done from the same class/library.
+ static AsynchIO* create(const Socket& s,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb = 0,
+ BuffersEmptyCallback eCb = 0,
+ IdleCallback iCb = 0);
public:
- AsynchIO(const Socket& s,
- ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
- ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
- void queueForDeletion();
-
- void start(Poller::shared_ptr poller);
- void queueReadBuffer(BufferBase* buff);
- void unread(BufferBase* buff);
- void queueWrite(BufferBase* buff);
- void notifyPendingWrite();
- void queueWriteClose();
- bool writeQueueEmpty() { return writeQueue.empty(); }
- BufferBase* getQueuedBuffer();
-
-private:
- ~AsynchIO();
- void readable(DispatchHandle& handle);
- void writeable(DispatchHandle& handle);
- void disconnected(DispatchHandle& handle);
- void close(DispatchHandle& handle);
+ virtual void queueForDeletion() = 0;
+
+ virtual void start(Poller::shared_ptr poller) = 0;
+ virtual void queueReadBuffer(BufferBase* buff) = 0;
+ virtual void unread(BufferBase* buff) = 0;
+ virtual void queueWrite(BufferBase* buff) = 0;
+ virtual void notifyPendingWrite() = 0;
+ virtual void queueWriteClose() = 0;
+ virtual bool writeQueueEmpty() = 0;
+ virtual BufferBase* getQueuedBuffer() = 0;
+
+protected:
+ // Derived class manages lifetime; must be constructed using the
+ // static create() method. Deletes not allowed from outside.
+ AsynchIO() {}
+ virtual ~AsynchIO() {}
};
}}