summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/posix/AsynchIO.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp38
1 files changed, 31 insertions, 7 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index a1c161b596..01ff8b6bfa 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -23,6 +23,7 @@
#include "qpid/sys/Socket.h"
#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/Poller.h"
+#include "qpid/sys/Probes.h"
#include "qpid/sys/DispatchHandle.h"
#include "qpid/sys/Time.h"
#include "qpid/log/Statement.h"
@@ -40,7 +41,9 @@
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
-using namespace qpid::sys;
+namespace qpid {
+namespace sys {
+namespace posix {
namespace {
@@ -70,10 +73,6 @@ __thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms
/*
* Asynch Acceptor
*/
-namespace qpid {
-namespace sys {
-namespace posix {
-
class AsynchAcceptor : public qpid::sys::AsynchAcceptor {
public:
AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback);
@@ -423,9 +422,12 @@ AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
void AsynchIO::readable(DispatchHandle& h) {
if (readingStopped) {
// We have been flow controlled.
+ QPID_PROBE1(asynchio_read_flowcontrolled, &h);
return;
}
AbsTime readStartTime = AbsTime::now();
+ size_t total = 0;
+ int readCalls = 0;
do {
// (Try to) get a buffer
if (!bufferQueue.empty()) {
@@ -436,23 +438,29 @@ void AsynchIO::readable(DispatchHandle& h) {
errno = 0;
int readCount = buff->byteCount-buff->dataCount;
int rc = socket.read(buff->bytes + buff->dataCount, readCount);
+ int64_t duration = Duration(readStartTime, AbsTime::now());
+ ++readCalls;
if (rc > 0) {
buff->dataCount += rc;
threadReadTotal += rc;
+ total += rc;
readCallback(*this, buff);
if (readingStopped) {
// We have been flow controlled.
+ QPID_PROBE4(asynchio_read_finished_flowcontrolled, &h, duration, total, readCalls);
break;
}
if (rc != readCount) {
// If we didn't fill the read buffer then time to stop reading
+ QPID_PROBE4(asynchio_read_finished_done, &h, duration, total, readCalls);
break;
}
// Stop reading if we've overrun our timeslot
- if (Duration(readStartTime, AbsTime::now()) > threadMaxIoTimeNs) {
+ if ( duration > threadMaxIoTimeNs) {
+ QPID_PROBE4(asynchio_read_finished_maxtime, &h, duration, total, readCalls);
break;
}
@@ -461,6 +469,7 @@ void AsynchIO::readable(DispatchHandle& h) {
bufferQueue.push_front(buff);
assert(buff);
+ QPID_PROBE5(asynchio_read_finished_error, &h, duration, total, readCalls, errno);
// Eof or other side has gone away
if (rc == 0 || errno == ECONNRESET) {
eofCallback(*this);
@@ -486,6 +495,7 @@ void AsynchIO::readable(DispatchHandle& h) {
// If we still have no buffers we can't do anything more
if (bufferQueue.empty()) {
h.unwatchRead();
+ QPID_PROBE4(asynchio_read_finished_nobuffers, &h, Duration(readStartTime, AbsTime::now()), total, readCalls);
break;
}
@@ -501,6 +511,8 @@ void AsynchIO::readable(DispatchHandle& h) {
*/
void AsynchIO::writeable(DispatchHandle& h) {
AbsTime writeStartTime = AbsTime::now();
+ size_t total = 0;
+ int writeCalls = 0;
do {
// See if we've got something to write
if (!writeQueue.empty()) {
@@ -510,14 +522,18 @@ void AsynchIO::writeable(DispatchHandle& h) {
errno = 0;
assert(buff->dataStart+buff->dataCount <= buff->byteCount);
int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount);
+ int64_t duration = Duration(writeStartTime, AbsTime::now());
+ ++writeCalls;
if (rc >= 0) {
threadWriteTotal += rc;
+ total += rc;
// If we didn't write full buffer put rest back
if (rc != buff->dataCount) {
buff->dataStart += rc;
buff->dataCount -= rc;
writeQueue.push_back(buff);
+ QPID_PROBE4(asynchio_write_finished_done, &h, duration, total, writeCalls);
break;
}
@@ -525,12 +541,15 @@ void AsynchIO::writeable(DispatchHandle& h) {
queueReadBuffer(buff);
// Stop writing if we've overrun our timeslot
- if (Duration(writeStartTime, AbsTime::now()) > threadMaxIoTimeNs) {
+ if (duration > threadMaxIoTimeNs) {
+ QPID_PROBE4(asynchio_write_finished_maxtime, &h, duration, total, writeCalls);
break;
}
} else {
// Put buffer back
writeQueue.push_back(buff);
+ QPID_PROBE5(asynchio_write_finished_error, &h, duration, total, writeCalls, errno);
+
if (errno == ECONNRESET || errno == EPIPE) {
// Just stop watching for write here - we'll get a
// disconnect callback soon enough
@@ -548,9 +567,13 @@ void AsynchIO::writeable(DispatchHandle& h) {
}
}
} else {
+ int64_t duration = Duration(writeStartTime, AbsTime::now());
+ (void) duration; // force duration to be used if no probes are compiled
+
// If we're waiting to close the socket then can do it now as there is nothing to write
if (queuedClose) {
close(h);
+ QPID_PROBE4(asynchio_write_finished_closed, &h, duration, total, writeCalls);
break;
}
// Fd is writable, but nothing to write
@@ -567,6 +590,7 @@ void AsynchIO::writeable(DispatchHandle& h) {
// desired rewatchWrite so we correct that here
if (writePending)
h.rewatchWrite();
+ QPID_PROBE4(asynchio_write_finished_nodata, &h, duration, total, writeCalls);
break;
}
}