summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/posix/AsynchIO.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp253
1 files changed, 222 insertions, 31 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 7598eefe83..67b5cf0534 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -21,13 +21,16 @@
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/DispatchHandle.h"
#include "qpid/sys/Time.h"
#include "qpid/log/Statement.h"
-#include "check.h"
+#include "qpid/sys/posix/check.h"
-// TODO The basic algorithm here is not really POSIX specific and with a bit more abstraction
-// could (should) be promoted to be platform portable
+// TODO The basic algorithm here is not really POSIX specific and with a
+// bit more abstraction could (should) be promoted to be platform portable
#include <unistd.h>
#include <sys/socket.h>
#include <signal.h>
@@ -35,18 +38,21 @@
#include <string.h>
#include <boost/bind.hpp>
+#include <boost/lexical_cast.hpp>
using namespace qpid::sys;
namespace {
-/*
- * Make *process* not generate SIGPIPE when writing to closed
- * pipe/socket (necessary as default action is to terminate process)
- */
-void ignoreSigpipe() {
- ::signal(SIGPIPE, SIG_IGN);
-}
+struct StaticInit {
+ StaticInit() {
+ /**
+ * Make *process* not generate SIGPIPE when writing to closed
+ * pipe/socket (necessary as default action is to terminate process)
+ */
+ ::signal(SIGPIPE, SIG_IGN);
+ };
+} init;
/*
* We keep per thread state to avoid locking overhead. The assumption is that
@@ -65,14 +71,37 @@ __thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms
/*
* Asynch Acceptor
*/
+namespace qpid {
+namespace sys {
+namespace posix {
+
+class AsynchAcceptor : public qpid::sys::AsynchAcceptor {
+public:
+ AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback);
+ ~AsynchAcceptor();
+ void start(Poller::shared_ptr poller);
+
+private:
+ void readable(DispatchHandle& handle);
+
+private:
+ AsynchAcceptor::Callback acceptedCallback;
+ DispatchHandle handle;
+ const Socket& socket;
-AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) :
+};
+
+AsynchAcceptor::AsynchAcceptor(const Socket& s,
+ AsynchAcceptor::Callback callback) :
acceptedCallback(callback),
handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0),
socket(s) {
s.setNonblocking();
- ignoreSigpipe();
+}
+
+AsynchAcceptor::~AsynchAcceptor() {
+ handle.stopWatch();
}
void AsynchAcceptor::start(Poller::shared_ptr poller) {
@@ -89,7 +118,7 @@ void AsynchAcceptor::readable(DispatchHandle& h) {
// TODO: Currently we ignore the peers address, perhaps we should
// log it or use it for connection acceptance.
try {
- s = socket.accept(0, 0);
+ s = socket.accept();
if (s) {
acceptedCallback(*s);
} else {
@@ -97,6 +126,7 @@ void AsynchAcceptor::readable(DispatchHandle& h) {
}
} catch (const std::exception& e) {
QPID_LOG(error, "Could not accept socket: " << e.what());
+ break;
}
} while (true);
@@ -104,8 +134,32 @@ void AsynchAcceptor::readable(DispatchHandle& h) {
}
/*
- * Asynch Connector
+ * POSIX version of AsynchIO TCP socket connector.
+ *
+ * The class is implemented in terms of DispatchHandle to allow it to be
+ * deleted by deleting the contained DispatchHandle.
*/
+class AsynchConnector : public qpid::sys::AsynchConnector,
+ private DispatchHandle {
+
+private:
+ void connComplete(DispatchHandle& handle);
+ void failure(int, const std::string&);
+
+private:
+ ConnectedCallback connCallback;
+ FailedCallback failCallback;
+ std::string errMsg;
+ const Socket& socket;
+
+public:
+ AsynchConnector(const Socket& socket,
+ Poller::shared_ptr poller,
+ std::string hostname,
+ uint16_t port,
+ ConnectedCallback connCb,
+ FailedCallback failCb);
+};
AsynchConnector::AsynchConnector(const Socket& s,
Poller::shared_ptr poller,
@@ -122,12 +176,17 @@ AsynchConnector::AsynchConnector(const Socket& s,
socket(s)
{
socket.setNonblocking();
+ SocketAddress sa(hostname, boost::lexical_cast<std::string>(port));
try {
- socket.connect(hostname, port);
- startWatch(poller);
+ socket.connect(sa);
} catch(std::exception& e) {
- failure(-1, std::string(e.what()));
+ // Defer reporting failure
+ startWatch(poller);
+ errMsg = e.what();
+ DispatchHandle::call(boost::bind(&AsynchConnector::failure, this, -1, errMsg));
+ return;
}
+ startWatch(poller);
}
void AsynchConnector::connComplete(DispatchHandle& h)
@@ -139,25 +198,87 @@ void AsynchConnector::connComplete(DispatchHandle& h)
connCallback(socket);
DispatchHandle::doDelete();
} else {
- // TODO: This need to be fixed as strerror isn't thread safe
- failure(errCode, std::string(::strerror(errCode)));
+ failure(errCode, strError(errCode));
}
}
-void AsynchConnector::failure(int errCode, std::string message)
+void AsynchConnector::failure(int errCode, const std::string& message)
{
- if (failCallback)
- failCallback(errCode, message);
-
- socket.close();
- delete &socket;
+ failCallback(socket, errCode, message);
DispatchHandle::doDelete();
}
/*
- * Asynch reader/writer
+ * POSIX version of AsynchIO reader/writer
+ *
+ * The class is implemented in terms of DispatchHandle to allow it to be
+ * deleted by deleting the contained DispatchHandle.
*/
+class AsynchIO : public qpid::sys::AsynchIO, private DispatchHandle {
+
+public:
+ AsynchIO(const Socket& s,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb = 0,
+ BuffersEmptyCallback eCb = 0,
+ IdleCallback iCb = 0);
+
+ // Methods inherited from qpid::sys::AsynchIO
+
+ virtual void queueForDeletion();
+
+ virtual void start(Poller::shared_ptr poller);
+ virtual void queueReadBuffer(BufferBase* buff);
+ virtual void unread(BufferBase* buff);
+ virtual void queueWrite(BufferBase* buff);
+ virtual void notifyPendingWrite();
+ virtual void queueWriteClose();
+ virtual bool writeQueueEmpty();
+ virtual void startReading();
+ virtual void stopReading();
+ virtual void requestCallback(RequestCallback);
+ virtual BufferBase* getQueuedBuffer();
+
+private:
+ ~AsynchIO();
+
+ // Methods that are callback targets from Dispatcher.
+ void readable(DispatchHandle& handle);
+ void writeable(DispatchHandle& handle);
+ void disconnected(DispatchHandle& handle);
+ void requestedCall(RequestCallback);
+ void close(DispatchHandle& handle);
+
+private:
+ ReadCallback readCallback;
+ EofCallback eofCallback;
+ DisconnectCallback disCallback;
+ ClosedCallback closedCallback;
+ BuffersEmptyCallback emptyCallback;
+ IdleCallback idleCallback;
+ const Socket& socket;
+ std::deque<BufferBase*> bufferQueue;
+ std::deque<BufferBase*> writeQueue;
+ bool queuedClose;
+ /**
+ * This flag is used to detect and handle concurrency between
+ * calls to notifyPendingWrite() (which can be made from any thread) and
+ * the execution of the writeable() method (which is always on the
+ * thread processing this handle.
+ */
+ volatile bool writePending;
+ /**
+ * This records whether we've been reading is flow controlled:
+ * it's safe as a simple boolean as the only way to be stopped
+ * is in calls only allowed in the callback context, the only calls
+ * checking it are also in calls only allowed in callback context.
+ */
+ volatile bool readingStopped;
+};
+
AsynchIO::AsynchIO(const Socket& s,
ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
@@ -174,7 +295,8 @@ AsynchIO::AsynchIO(const Socket& s,
idleCallback(iCb),
socket(s),
queuedClose(false),
- writePending(false) {
+ writePending(false),
+ readingStopped(false) {
s.setNonblocking();
}
@@ -202,8 +324,11 @@ void AsynchIO::queueReadBuffer(BufferBase* buff) {
assert(buff);
buff->dataStart = 0;
buff->dataCount = 0;
+
+ bool queueWasEmpty = bufferQueue.empty();
bufferQueue.push_back(buff);
- DispatchHandle::rewatchRead();
+ if (queueWasEmpty && !readingStopped)
+ DispatchHandle::rewatchRead();
}
void AsynchIO::unread(BufferBase* buff) {
@@ -212,15 +337,18 @@ void AsynchIO::unread(BufferBase* buff) {
memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
buff->dataStart = 0;
}
+
+ bool queueWasEmpty = bufferQueue.empty();
bufferQueue.push_front(buff);
- DispatchHandle::rewatchRead();
+ if (queueWasEmpty && !readingStopped)
+ DispatchHandle::rewatchRead();
}
void AsynchIO::queueWrite(BufferBase* buff) {
assert(buff);
// If we've already closed the socket then throw the write away
if (queuedClose) {
- bufferQueue.push_front(buff);
+ queueReadBuffer(buff);
return;
} else {
writeQueue.push_front(buff);
@@ -229,6 +357,7 @@ void AsynchIO::queueWrite(BufferBase* buff) {
DispatchHandle::rewatchWrite();
}
+// This can happen outside the callback context
void AsynchIO::notifyPendingWrite() {
writePending = true;
DispatchHandle::rewatchWrite();
@@ -239,6 +368,33 @@ void AsynchIO::queueWriteClose() {
DispatchHandle::rewatchWrite();
}
+bool AsynchIO::writeQueueEmpty() {
+ return writeQueue.empty();
+}
+
+// This can happen outside the callback context
+void AsynchIO::startReading() {
+ readingStopped = false;
+ DispatchHandle::rewatchRead();
+}
+
+void AsynchIO::stopReading() {
+ readingStopped = true;
+ DispatchHandle::unwatchRead();
+}
+
+void AsynchIO::requestCallback(RequestCallback callback) {
+ // TODO creating a function object every time isn't all that
+ // efficient - if this becomes heavily used do something better (what?)
+ assert(callback);
+ DispatchHandle::call(boost::bind(&AsynchIO::requestedCall, this, callback));
+}
+
+void AsynchIO::requestedCall(RequestCallback callback) {
+ assert(callback);
+ callback(*this);
+}
+
/** Return a queued buffer if there are enough
* to spare
*/
@@ -277,6 +433,11 @@ void AsynchIO::readable(DispatchHandle& h) {
readTotal += rc;
readCallback(*this, buff);
+ if (readingStopped) {
+ // We have been flow controlled.
+ break;
+ }
+
if (rc != readCount) {
// If we didn't fill the read buffer then time to stop reading
break;
@@ -303,7 +464,7 @@ void AsynchIO::readable(DispatchHandle& h) {
break;
} else {
// Report error then just treat as a socket disconnect
- QPID_LOG(error, "Error reading socket: " << qpid::sys::strError(rc) << "(" << rc << ")" );
+ QPID_LOG(error, "Error reading socket: " << qpid::sys::strError(errno) << "(" << errno << ")" );
eofCallback(*this);
h.unwatchRead();
break;
@@ -427,3 +588,33 @@ void AsynchIO::close(DispatchHandle& h) {
}
}
+} // namespace posix
+
+AsynchAcceptor* AsynchAcceptor::create(const Socket& s,
+ Callback callback)
+{
+ return new posix::AsynchAcceptor(s, callback);
+}
+
+AsynchConnector* AsynchConnector::create(const Socket& s,
+ Poller::shared_ptr poller,
+ std::string hostname,
+ uint16_t port,
+ ConnectedCallback connCb,
+ FailedCallback failCb)
+{
+ return new posix::AsynchConnector(s, poller, hostname, port, connCb, failCb);
+}
+
+AsynchIO* AsynchIO::create(const Socket& s,
+ AsynchIO::ReadCallback rCb,
+ AsynchIO::EofCallback eofCb,
+ AsynchIO::DisconnectCallback disCb,
+ AsynchIO::ClosedCallback cCb,
+ AsynchIO::BuffersEmptyCallback eCb,
+ AsynchIO::IdleCallback iCb)
+{
+ return new posix::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb);
+}
+
+}} // namespace qpid::sys