summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/posix/AsynchIO.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp167
1 files changed, 161 insertions, 6 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 7598eefe83..c8a8b3d0f1 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -26,8 +26,8 @@
#include "check.h"
-// TODO The basic algorithm here is not really POSIX specific and with a bit more abstraction
-// could (should) be promoted to be platform portable
+// TODO The basic algorithm here is not really POSIX specific and with a
+// bit more abstraction could (should) be promoted to be platform portable
#include <unistd.h>
#include <sys/socket.h>
#include <signal.h>
@@ -65,24 +65,55 @@ __thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms
/*
* Asynch Acceptor
*/
+namespace qpid {
+namespace sys {
+
+class AsynchAcceptorPrivate {
+public:
+ AsynchAcceptorPrivate(const Socket& s, AsynchAcceptor::Callback callback);
+ void start(Poller::shared_ptr poller);
+
+private:
+ void readable(DispatchHandle& handle);
+
+private:
+ AsynchAcceptor::Callback acceptedCallback;
+ DispatchHandle handle;
+ const Socket& socket;
+
+};
+
+}} // namespace qpid::sys
AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) :
+ impl(new AsynchAcceptorPrivate(s, callback))
+{}
+
+AsynchAcceptor::~AsynchAcceptor()
+{ delete impl;}
+
+void AsynchAcceptor::start(Poller::shared_ptr poller) {
+ impl->start(poller);
+}
+
+AsynchAcceptorPrivate::AsynchAcceptorPrivate(const Socket& s,
+ AsynchAcceptor::Callback callback) :
acceptedCallback(callback),
- handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0),
+ handle(s, boost::bind(&AsynchAcceptorPrivate::readable, this, _1), 0, 0),
socket(s) {
s.setNonblocking();
ignoreSigpipe();
}
-void AsynchAcceptor::start(Poller::shared_ptr poller) {
+void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) {
handle.startWatch(poller);
}
/*
* We keep on accepting as long as there is something to accept
*/
-void AsynchAcceptor::readable(DispatchHandle& h) {
+void AsynchAcceptorPrivate::readable(DispatchHandle& h) {
Socket* s;
do {
errno = 0;
@@ -106,6 +137,36 @@ void AsynchAcceptor::readable(DispatchHandle& h) {
/*
* Asynch Connector
*/
+namespace qpid {
+namespace sys {
+namespace posix {
+
+/*
+ * POSIX version of AsynchIO TCP socket connector.
+ *
+ * The class is implemented in terms of DispatchHandle to allow it to be
+ * deleted by deleting the contained DispatchHandle.
+ */
+class AsynchConnector : public qpid::sys::AsynchConnector,
+ private DispatchHandle {
+
+private:
+ void connComplete(DispatchHandle& handle);
+ void failure(int, std::string);
+
+private:
+ ConnectedCallback connCallback;
+ FailedCallback failCallback;
+ const Socket& socket;
+
+public:
+ AsynchConnector(const Socket& socket,
+ Poller::shared_ptr poller,
+ std::string hostname,
+ uint16_t port,
+ ConnectedCallback connCb,
+ FailedCallback failCb = 0);
+};
AsynchConnector::AsynchConnector(const Socket& s,
Poller::shared_ptr poller,
@@ -155,9 +216,85 @@ void AsynchConnector::failure(int errCode, std::string message)
DispatchHandle::doDelete();
}
+} // namespace posix
+
+
+AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s,
+ Poller::shared_ptr poller,
+ std::string hostname,
+ uint16_t port,
+ ConnectedCallback connCb,
+ FailedCallback failCb)
+{
+ return new qpid::sys::posix::AsynchConnector(s,
+ poller,
+ hostname,
+ port,
+ connCb,
+ failCb);
+}
+
/*
- * Asynch reader/writer
+ * POSIX version of AsynchIO reader/writer
+ *
+ * The class is implemented in terms of DispatchHandle to allow it to be
+ * deleted by deleting the contained DispatchHandle.
*/
+namespace posix {
+
+class AsynchIO : public qpid::sys::AsynchIO, private DispatchHandle {
+
+public:
+ AsynchIO(const Socket& s,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb = 0,
+ BuffersEmptyCallback eCb = 0,
+ IdleCallback iCb = 0);
+
+ // Methods inherited from qpid::sys::AsynchIO
+
+ virtual void queueForDeletion();
+
+ virtual void start(Poller::shared_ptr poller);
+ virtual void queueReadBuffer(BufferBase* buff);
+ virtual void unread(BufferBase* buff);
+ virtual void queueWrite(BufferBase* buff);
+ virtual void notifyPendingWrite();
+ virtual void queueWriteClose();
+ virtual bool writeQueueEmpty();
+ virtual BufferBase* getQueuedBuffer();
+
+private:
+ ~AsynchIO();
+
+ // Methods that are callback targets from Dispatcher.
+ void readable(DispatchHandle& handle);
+ void writeable(DispatchHandle& handle);
+ void disconnected(DispatchHandle& handle);
+ void close(DispatchHandle& handle);
+
+private:
+ ReadCallback readCallback;
+ EofCallback eofCallback;
+ DisconnectCallback disCallback;
+ ClosedCallback closedCallback;
+ BuffersEmptyCallback emptyCallback;
+ IdleCallback idleCallback;
+ const Socket& socket;
+ std::deque<BufferBase*> bufferQueue;
+ std::deque<BufferBase*> writeQueue;
+ bool queuedClose;
+ /**
+ * This flag is used to detect and handle concurrency between
+ * calls to notifyPendingWrite() (which can be made from any thread) and
+ * the execution of the writeable() method (which is always on the
+ * thread processing this handle.
+ */
+ volatile bool writePending;
+};
+
AsynchIO::AsynchIO(const Socket& s,
ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
@@ -239,6 +376,10 @@ void AsynchIO::queueWriteClose() {
DispatchHandle::rewatchWrite();
}
+bool AsynchIO::writeQueueEmpty() {
+ return writeQueue.empty();
+}
+
/** Return a queued buffer if there are enough
* to spare
*/
@@ -427,3 +568,17 @@ void AsynchIO::close(DispatchHandle& h) {
}
}
+} // namespace posix
+
+AsynchIO* qpid::sys::AsynchIO::create(const Socket& s,
+ AsynchIO::ReadCallback rCb,
+ AsynchIO::EofCallback eofCb,
+ AsynchIO::DisconnectCallback disCb,
+ AsynchIO::ClosedCallback cCb,
+ AsynchIO::BuffersEmptyCallback eCb,
+ AsynchIO::IdleCallback iCb)
+{
+ return new qpid::sys::posix::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb);
+}
+
+}} // namespace qpid::sys