diff options
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 38 |
1 files changed, 31 insertions, 7 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index a1c161b596..01ff8b6bfa 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -23,6 +23,7 @@ #include "qpid/sys/Socket.h" #include "qpid/sys/SocketAddress.h" #include "qpid/sys/Poller.h" +#include "qpid/sys/Probes.h" #include "qpid/sys/DispatchHandle.h" #include "qpid/sys/Time.h" #include "qpid/log/Statement.h" @@ -40,7 +41,9 @@ #include <boost/bind.hpp> #include <boost/lexical_cast.hpp> -using namespace qpid::sys; +namespace qpid { +namespace sys { +namespace posix { namespace { @@ -70,10 +73,6 @@ __thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms /* * Asynch Acceptor */ -namespace qpid { -namespace sys { -namespace posix { - class AsynchAcceptor : public qpid::sys::AsynchAcceptor { public: AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback); @@ -423,9 +422,12 @@ AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { void AsynchIO::readable(DispatchHandle& h) { if (readingStopped) { // We have been flow controlled. + QPID_PROBE1(asynchio_read_flowcontrolled, &h); return; } AbsTime readStartTime = AbsTime::now(); + size_t total = 0; + int readCalls = 0; do { // (Try to) get a buffer if (!bufferQueue.empty()) { @@ -436,23 +438,29 @@ void AsynchIO::readable(DispatchHandle& h) { errno = 0; int readCount = buff->byteCount-buff->dataCount; int rc = socket.read(buff->bytes + buff->dataCount, readCount); + int64_t duration = Duration(readStartTime, AbsTime::now()); + ++readCalls; if (rc > 0) { buff->dataCount += rc; threadReadTotal += rc; + total += rc; readCallback(*this, buff); if (readingStopped) { // We have been flow controlled. + QPID_PROBE4(asynchio_read_finished_flowcontrolled, &h, duration, total, readCalls); break; } if (rc != readCount) { // If we didn't fill the read buffer then time to stop reading + QPID_PROBE4(asynchio_read_finished_done, &h, duration, total, readCalls); break; } // Stop reading if we've overrun our timeslot - if (Duration(readStartTime, AbsTime::now()) > threadMaxIoTimeNs) { + if ( duration > threadMaxIoTimeNs) { + QPID_PROBE4(asynchio_read_finished_maxtime, &h, duration, total, readCalls); break; } @@ -461,6 +469,7 @@ void AsynchIO::readable(DispatchHandle& h) { bufferQueue.push_front(buff); assert(buff); + QPID_PROBE5(asynchio_read_finished_error, &h, duration, total, readCalls, errno); // Eof or other side has gone away if (rc == 0 || errno == ECONNRESET) { eofCallback(*this); @@ -486,6 +495,7 @@ void AsynchIO::readable(DispatchHandle& h) { // If we still have no buffers we can't do anything more if (bufferQueue.empty()) { h.unwatchRead(); + QPID_PROBE4(asynchio_read_finished_nobuffers, &h, Duration(readStartTime, AbsTime::now()), total, readCalls); break; } @@ -501,6 +511,8 @@ void AsynchIO::readable(DispatchHandle& h) { */ void AsynchIO::writeable(DispatchHandle& h) { AbsTime writeStartTime = AbsTime::now(); + size_t total = 0; + int writeCalls = 0; do { // See if we've got something to write if (!writeQueue.empty()) { @@ -510,14 +522,18 @@ void AsynchIO::writeable(DispatchHandle& h) { errno = 0; assert(buff->dataStart+buff->dataCount <= buff->byteCount); int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount); + int64_t duration = Duration(writeStartTime, AbsTime::now()); + ++writeCalls; if (rc >= 0) { threadWriteTotal += rc; + total += rc; // If we didn't write full buffer put rest back if (rc != buff->dataCount) { buff->dataStart += rc; buff->dataCount -= rc; writeQueue.push_back(buff); + QPID_PROBE4(asynchio_write_finished_done, &h, duration, total, writeCalls); break; } @@ -525,12 +541,15 @@ void AsynchIO::writeable(DispatchHandle& h) { queueReadBuffer(buff); // Stop writing if we've overrun our timeslot - if (Duration(writeStartTime, AbsTime::now()) > threadMaxIoTimeNs) { + if (duration > threadMaxIoTimeNs) { + QPID_PROBE4(asynchio_write_finished_maxtime, &h, duration, total, writeCalls); break; } } else { // Put buffer back writeQueue.push_back(buff); + QPID_PROBE5(asynchio_write_finished_error, &h, duration, total, writeCalls, errno); + if (errno == ECONNRESET || errno == EPIPE) { // Just stop watching for write here - we'll get a // disconnect callback soon enough @@ -548,9 +567,13 @@ void AsynchIO::writeable(DispatchHandle& h) { } } } else { + int64_t duration = Duration(writeStartTime, AbsTime::now()); + (void) duration; // force duration to be used if no probes are compiled + // If we're waiting to close the socket then can do it now as there is nothing to write if (queuedClose) { close(h); + QPID_PROBE4(asynchio_write_finished_closed, &h, duration, total, writeCalls); break; } // Fd is writable, but nothing to write @@ -567,6 +590,7 @@ void AsynchIO::writeable(DispatchHandle& h) { // desired rewatchWrite so we correct that here if (writePending) h.rewatchWrite(); + QPID_PROBE4(asynchio_write_finished_nodata, &h, duration, total, writeCalls); break; } } |