diff options
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 253 |
1 files changed, 222 insertions, 31 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 7598eefe83..67b5cf0534 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -21,13 +21,16 @@ #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/DispatchHandle.h" #include "qpid/sys/Time.h" #include "qpid/log/Statement.h" -#include "check.h" +#include "qpid/sys/posix/check.h" -// TODO The basic algorithm here is not really POSIX specific and with a bit more abstraction -// could (should) be promoted to be platform portable +// TODO The basic algorithm here is not really POSIX specific and with a +// bit more abstraction could (should) be promoted to be platform portable #include <unistd.h> #include <sys/socket.h> #include <signal.h> @@ -35,18 +38,21 @@ #include <string.h> #include <boost/bind.hpp> +#include <boost/lexical_cast.hpp> using namespace qpid::sys; namespace { -/* - * Make *process* not generate SIGPIPE when writing to closed - * pipe/socket (necessary as default action is to terminate process) - */ -void ignoreSigpipe() { - ::signal(SIGPIPE, SIG_IGN); -} +struct StaticInit { + StaticInit() { + /** + * Make *process* not generate SIGPIPE when writing to closed + * pipe/socket (necessary as default action is to terminate process) + */ + ::signal(SIGPIPE, SIG_IGN); + }; +} init; /* * We keep per thread state to avoid locking overhead. The assumption is that @@ -65,14 +71,37 @@ __thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms /* * Asynch Acceptor */ +namespace qpid { +namespace sys { +namespace posix { + +class AsynchAcceptor : public qpid::sys::AsynchAcceptor { +public: + AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback); + ~AsynchAcceptor(); + void start(Poller::shared_ptr poller); + +private: + void readable(DispatchHandle& handle); + +private: + AsynchAcceptor::Callback acceptedCallback; + DispatchHandle handle; + const Socket& socket; -AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : +}; + +AsynchAcceptor::AsynchAcceptor(const Socket& s, + AsynchAcceptor::Callback callback) : acceptedCallback(callback), handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0), socket(s) { s.setNonblocking(); - ignoreSigpipe(); +} + +AsynchAcceptor::~AsynchAcceptor() { + handle.stopWatch(); } void AsynchAcceptor::start(Poller::shared_ptr poller) { @@ -89,7 +118,7 @@ void AsynchAcceptor::readable(DispatchHandle& h) { // TODO: Currently we ignore the peers address, perhaps we should // log it or use it for connection acceptance. try { - s = socket.accept(0, 0); + s = socket.accept(); if (s) { acceptedCallback(*s); } else { @@ -97,6 +126,7 @@ void AsynchAcceptor::readable(DispatchHandle& h) { } } catch (const std::exception& e) { QPID_LOG(error, "Could not accept socket: " << e.what()); + break; } } while (true); @@ -104,8 +134,32 @@ void AsynchAcceptor::readable(DispatchHandle& h) { } /* - * Asynch Connector + * POSIX version of AsynchIO TCP socket connector. + * + * The class is implemented in terms of DispatchHandle to allow it to be + * deleted by deleting the contained DispatchHandle. */ +class AsynchConnector : public qpid::sys::AsynchConnector, + private DispatchHandle { + +private: + void connComplete(DispatchHandle& handle); + void failure(int, const std::string&); + +private: + ConnectedCallback connCallback; + FailedCallback failCallback; + std::string errMsg; + const Socket& socket; + +public: + AsynchConnector(const Socket& socket, + Poller::shared_ptr poller, + std::string hostname, + uint16_t port, + ConnectedCallback connCb, + FailedCallback failCb); +}; AsynchConnector::AsynchConnector(const Socket& s, Poller::shared_ptr poller, @@ -122,12 +176,17 @@ AsynchConnector::AsynchConnector(const Socket& s, socket(s) { socket.setNonblocking(); + SocketAddress sa(hostname, boost::lexical_cast<std::string>(port)); try { - socket.connect(hostname, port); - startWatch(poller); + socket.connect(sa); } catch(std::exception& e) { - failure(-1, std::string(e.what())); + // Defer reporting failure + startWatch(poller); + errMsg = e.what(); + DispatchHandle::call(boost::bind(&AsynchConnector::failure, this, -1, errMsg)); + return; } + startWatch(poller); } void AsynchConnector::connComplete(DispatchHandle& h) @@ -139,25 +198,87 @@ void AsynchConnector::connComplete(DispatchHandle& h) connCallback(socket); DispatchHandle::doDelete(); } else { - // TODO: This need to be fixed as strerror isn't thread safe - failure(errCode, std::string(::strerror(errCode))); + failure(errCode, strError(errCode)); } } -void AsynchConnector::failure(int errCode, std::string message) +void AsynchConnector::failure(int errCode, const std::string& message) { - if (failCallback) - failCallback(errCode, message); - - socket.close(); - delete &socket; + failCallback(socket, errCode, message); DispatchHandle::doDelete(); } /* - * Asynch reader/writer + * POSIX version of AsynchIO reader/writer + * + * The class is implemented in terms of DispatchHandle to allow it to be + * deleted by deleting the contained DispatchHandle. */ +class AsynchIO : public qpid::sys::AsynchIO, private DispatchHandle { + +public: + AsynchIO(const Socket& s, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb = 0, + BuffersEmptyCallback eCb = 0, + IdleCallback iCb = 0); + + // Methods inherited from qpid::sys::AsynchIO + + virtual void queueForDeletion(); + + virtual void start(Poller::shared_ptr poller); + virtual void queueReadBuffer(BufferBase* buff); + virtual void unread(BufferBase* buff); + virtual void queueWrite(BufferBase* buff); + virtual void notifyPendingWrite(); + virtual void queueWriteClose(); + virtual bool writeQueueEmpty(); + virtual void startReading(); + virtual void stopReading(); + virtual void requestCallback(RequestCallback); + virtual BufferBase* getQueuedBuffer(); + +private: + ~AsynchIO(); + + // Methods that are callback targets from Dispatcher. + void readable(DispatchHandle& handle); + void writeable(DispatchHandle& handle); + void disconnected(DispatchHandle& handle); + void requestedCall(RequestCallback); + void close(DispatchHandle& handle); + +private: + ReadCallback readCallback; + EofCallback eofCallback; + DisconnectCallback disCallback; + ClosedCallback closedCallback; + BuffersEmptyCallback emptyCallback; + IdleCallback idleCallback; + const Socket& socket; + std::deque<BufferBase*> bufferQueue; + std::deque<BufferBase*> writeQueue; + bool queuedClose; + /** + * This flag is used to detect and handle concurrency between + * calls to notifyPendingWrite() (which can be made from any thread) and + * the execution of the writeable() method (which is always on the + * thread processing this handle. + */ + volatile bool writePending; + /** + * This records whether we've been reading is flow controlled: + * it's safe as a simple boolean as the only way to be stopped + * is in calls only allowed in the callback context, the only calls + * checking it are also in calls only allowed in callback context. + */ + volatile bool readingStopped; +}; + AsynchIO::AsynchIO(const Socket& s, ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) : @@ -174,7 +295,8 @@ AsynchIO::AsynchIO(const Socket& s, idleCallback(iCb), socket(s), queuedClose(false), - writePending(false) { + writePending(false), + readingStopped(false) { s.setNonblocking(); } @@ -202,8 +324,11 @@ void AsynchIO::queueReadBuffer(BufferBase* buff) { assert(buff); buff->dataStart = 0; buff->dataCount = 0; + + bool queueWasEmpty = bufferQueue.empty(); bufferQueue.push_back(buff); - DispatchHandle::rewatchRead(); + if (queueWasEmpty && !readingStopped) + DispatchHandle::rewatchRead(); } void AsynchIO::unread(BufferBase* buff) { @@ -212,15 +337,18 @@ void AsynchIO::unread(BufferBase* buff) { memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount); buff->dataStart = 0; } + + bool queueWasEmpty = bufferQueue.empty(); bufferQueue.push_front(buff); - DispatchHandle::rewatchRead(); + if (queueWasEmpty && !readingStopped) + DispatchHandle::rewatchRead(); } void AsynchIO::queueWrite(BufferBase* buff) { assert(buff); // If we've already closed the socket then throw the write away if (queuedClose) { - bufferQueue.push_front(buff); + queueReadBuffer(buff); return; } else { writeQueue.push_front(buff); @@ -229,6 +357,7 @@ void AsynchIO::queueWrite(BufferBase* buff) { DispatchHandle::rewatchWrite(); } +// This can happen outside the callback context void AsynchIO::notifyPendingWrite() { writePending = true; DispatchHandle::rewatchWrite(); @@ -239,6 +368,33 @@ void AsynchIO::queueWriteClose() { DispatchHandle::rewatchWrite(); } +bool AsynchIO::writeQueueEmpty() { + return writeQueue.empty(); +} + +// This can happen outside the callback context +void AsynchIO::startReading() { + readingStopped = false; + DispatchHandle::rewatchRead(); +} + +void AsynchIO::stopReading() { + readingStopped = true; + DispatchHandle::unwatchRead(); +} + +void AsynchIO::requestCallback(RequestCallback callback) { + // TODO creating a function object every time isn't all that + // efficient - if this becomes heavily used do something better (what?) + assert(callback); + DispatchHandle::call(boost::bind(&AsynchIO::requestedCall, this, callback)); +} + +void AsynchIO::requestedCall(RequestCallback callback) { + assert(callback); + callback(*this); +} + /** Return a queued buffer if there are enough * to spare */ @@ -277,6 +433,11 @@ void AsynchIO::readable(DispatchHandle& h) { readTotal += rc; readCallback(*this, buff); + if (readingStopped) { + // We have been flow controlled. + break; + } + if (rc != readCount) { // If we didn't fill the read buffer then time to stop reading break; @@ -303,7 +464,7 @@ void AsynchIO::readable(DispatchHandle& h) { break; } else { // Report error then just treat as a socket disconnect - QPID_LOG(error, "Error reading socket: " << qpid::sys::strError(rc) << "(" << rc << ")" ); + QPID_LOG(error, "Error reading socket: " << qpid::sys::strError(errno) << "(" << errno << ")" ); eofCallback(*this); h.unwatchRead(); break; @@ -427,3 +588,33 @@ void AsynchIO::close(DispatchHandle& h) { } } +} // namespace posix + +AsynchAcceptor* AsynchAcceptor::create(const Socket& s, + Callback callback) +{ + return new posix::AsynchAcceptor(s, callback); +} + +AsynchConnector* AsynchConnector::create(const Socket& s, + Poller::shared_ptr poller, + std::string hostname, + uint16_t port, + ConnectedCallback connCb, + FailedCallback failCb) +{ + return new posix::AsynchConnector(s, poller, hostname, port, connCb, failCb); +} + +AsynchIO* AsynchIO::create(const Socket& s, + AsynchIO::ReadCallback rCb, + AsynchIO::EofCallback eofCb, + AsynchIO::DisconnectCallback disCb, + AsynchIO::ClosedCallback cCb, + AsynchIO::BuffersEmptyCallback eCb, + AsynchIO::IdleCallback iCb) +{ + return new posix::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb); +} + +}} // namespace qpid::sys |