summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2011-11-11 16:06:08 +0000
committerAndrew Stitcher <astitcher@apache.org>2011-11-11 16:06:08 +0000
commitda70d70963bfccaace1af1c5dd208f86b6f01fa9 (patch)
treee40009a4ad8b5b9d9e2dde2217c9cf68192cd003 /cpp/src
parent4316192ae867d6ae7f6ad8e87594572fa807572c (diff)
downloadqpid-python-da70d70963bfccaace1af1c5dd208f86b6f01fa9.tar.gz
QPID-3608: Improve C++ broker consume performance
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1200925 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp31
-rw-r--r--cpp/src/qpid/sys/ssl/SslIo.cpp31
2 files changed, 26 insertions, 36 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index dab8bd09c6..a1c161b596 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -61,11 +61,10 @@ struct StaticInit {
* case we could rebalance the info occasionally.
*/
__thread int threadReadTotal = 0;
-__thread int threadMaxRead = 0;
__thread int threadReadCount = 0;
__thread int threadWriteTotal = 0;
__thread int threadWriteCount = 0;
-__thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms
+__thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms
}
/*
@@ -426,7 +425,6 @@ void AsynchIO::readable(DispatchHandle& h) {
// We have been flow controlled.
return;
}
- int readTotal = 0;
AbsTime readStartTime = AbsTime::now();
do {
// (Try to) get a buffer
@@ -441,7 +439,6 @@ void AsynchIO::readable(DispatchHandle& h) {
if (rc > 0) {
buff->dataCount += rc;
threadReadTotal += rc;
- readTotal += rc;
readCallback(*this, buff);
if (readingStopped) {
@@ -453,17 +450,17 @@ void AsynchIO::readable(DispatchHandle& h) {
// If we didn't fill the read buffer then time to stop reading
break;
}
-
+
// Stop reading if we've overrun our timeslot
- if (Duration(readStartTime, AbsTime::now()) > threadMaxReadTimeNs) {
+ if (Duration(readStartTime, AbsTime::now()) > threadMaxIoTimeNs) {
break;
}
-
+
} else {
// Put buffer back (at front so it doesn't interfere with unread buffers)
bufferQueue.push_front(buff);
assert(buff);
-
+
// Eof or other side has gone away
if (rc == 0 || errno == ECONNRESET) {
eofCallback(*this);
@@ -491,12 +488,11 @@ void AsynchIO::readable(DispatchHandle& h) {
h.unwatchRead();
break;
}
-
+
}
} while (true);
++threadReadCount;
- threadMaxRead = std::max(threadMaxRead, readTotal);
return;
}
@@ -504,7 +500,7 @@ void AsynchIO::readable(DispatchHandle& h) {
* We carry on writing whilst we have data to write and we can write
*/
void AsynchIO::writeable(DispatchHandle& h) {
- int writeTotal = 0;
+ AbsTime writeStartTime = AbsTime::now();
do {
// See if we've got something to write
if (!writeQueue.empty()) {
@@ -516,7 +512,6 @@ void AsynchIO::writeable(DispatchHandle& h) {
int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount);
if (rc >= 0) {
threadWriteTotal += rc;
- writeTotal += rc;
// If we didn't write full buffer put rest back
if (rc != buff->dataCount) {
@@ -525,14 +520,14 @@ void AsynchIO::writeable(DispatchHandle& h) {
writeQueue.push_back(buff);
break;
}
-
+
// Recycle the buffer
queueReadBuffer(buff);
-
- // If we've already written more than the max for reading then stop
- // (this is to stop writes dominating reads)
- if (writeTotal > threadMaxRead)
+
+ // Stop writing if we've overrun our timeslot
+ if (Duration(writeStartTime, AbsTime::now()) > threadMaxIoTimeNs) {
break;
+ }
} else {
// Put buffer back
writeQueue.push_back(buff);
@@ -580,7 +575,7 @@ void AsynchIO::writeable(DispatchHandle& h) {
++threadWriteCount;
return;
}
-
+
void AsynchIO::disconnected(DispatchHandle& h) {
// If we have not already queued close then call disconnected callback before closing
if (!queuedClose && disCallback) disCallback(*this);
diff --git a/cpp/src/qpid/sys/ssl/SslIo.cpp b/cpp/src/qpid/sys/ssl/SslIo.cpp
index 4a59819183..73f15617dc 100644
--- a/cpp/src/qpid/sys/ssl/SslIo.cpp
+++ b/cpp/src/qpid/sys/ssl/SslIo.cpp
@@ -57,11 +57,10 @@ void ignoreSigpipe() {
* case we could rebalance the info occasionally.
*/
__thread int threadReadTotal = 0;
-__thread int threadMaxRead = 0;
__thread int threadReadCount = 0;
__thread int threadWriteTotal = 0;
__thread int threadWriteCount = 0;
-__thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms
+__thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms
}
/*
@@ -277,7 +276,6 @@ SslIO::BufferBase* SslIO::getQueuedBuffer() {
* it in
*/
void SslIO::readable(DispatchHandle& h) {
- int readTotal = 0;
AbsTime readStartTime = AbsTime::now();
do {
// (Try to) get a buffer
@@ -292,24 +290,23 @@ void SslIO::readable(DispatchHandle& h) {
if (rc > 0) {
buff->dataCount += rc;
threadReadTotal += rc;
- readTotal += rc;
readCallback(*this, buff);
if (rc != readCount) {
// If we didn't fill the read buffer then time to stop reading
break;
}
-
+
// Stop reading if we've overrun our timeslot
- if (Duration(readStartTime, AbsTime::now()) > threadMaxReadTimeNs) {
+ if (Duration(readStartTime, AbsTime::now()) > threadMaxIoTimeNs) {
break;
}
-
+
} else {
// Put buffer back (at front so it doesn't interfere with unread buffers)
bufferQueue.push_front(buff);
assert(buff);
-
+
// Eof or other side has gone away
if (rc == 0 || errno == ECONNRESET) {
eofCallback(*this);
@@ -337,12 +334,11 @@ void SslIO::readable(DispatchHandle& h) {
h.unwatchRead();
break;
}
-
+
}
} while (true);
++threadReadCount;
- threadMaxRead = std::max(threadMaxRead, readTotal);
return;
}
@@ -350,7 +346,7 @@ void SslIO::readable(DispatchHandle& h) {
* We carry on writing whilst we have data to write and we can write
*/
void SslIO::writeable(DispatchHandle& h) {
- int writeTotal = 0;
+ AbsTime writeStartTime = AbsTime::now();
do {
// See if we've got something to write
if (!writeQueue.empty()) {
@@ -362,7 +358,6 @@ void SslIO::writeable(DispatchHandle& h) {
int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount);
if (rc >= 0) {
threadWriteTotal += rc;
- writeTotal += rc;
// If we didn't write full buffer put rest back
if (rc != buff->dataCount) {
@@ -371,14 +366,14 @@ void SslIO::writeable(DispatchHandle& h) {
writeQueue.push_back(buff);
break;
}
-
+
// Recycle the buffer
queueReadBuffer(buff);
-
- // If we've already written more than the max for reading then stop
- // (this is to stop writes dominating reads)
- if (writeTotal > threadMaxRead)
+
+ // Stop writing if we've overrun our timeslot
+ if (Duration(writeStartTime, AbsTime::now()) > threadMaxIoTimeNs) {
break;
+ }
} else {
// Put buffer back
writeQueue.push_back(buff);
@@ -425,7 +420,7 @@ void SslIO::writeable(DispatchHandle& h) {
++threadWriteCount;
return;
}
-
+
void SslIO::disconnected(DispatchHandle& h) {
// If we've already queued close do it instead of disconnected callback
if (queuedClose) {