diff options
author | Andrew Stitcher <astitcher@apache.org> | 2011-11-11 16:06:08 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2011-11-11 16:06:08 +0000 |
commit | da70d70963bfccaace1af1c5dd208f86b6f01fa9 (patch) | |
tree | e40009a4ad8b5b9d9e2dde2217c9cf68192cd003 /cpp/src | |
parent | 4316192ae867d6ae7f6ad8e87594572fa807572c (diff) | |
download | qpid-python-da70d70963bfccaace1af1c5dd208f86b6f01fa9.tar.gz |
QPID-3608: Improve C++ broker consume performance
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1200925 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ssl/SslIo.cpp | 31 |
2 files changed, 26 insertions, 36 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index dab8bd09c6..a1c161b596 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -61,11 +61,10 @@ struct StaticInit { * case we could rebalance the info occasionally. */ __thread int threadReadTotal = 0; -__thread int threadMaxRead = 0; __thread int threadReadCount = 0; __thread int threadWriteTotal = 0; __thread int threadWriteCount = 0; -__thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms +__thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms } /* @@ -426,7 +425,6 @@ void AsynchIO::readable(DispatchHandle& h) { // We have been flow controlled. return; } - int readTotal = 0; AbsTime readStartTime = AbsTime::now(); do { // (Try to) get a buffer @@ -441,7 +439,6 @@ void AsynchIO::readable(DispatchHandle& h) { if (rc > 0) { buff->dataCount += rc; threadReadTotal += rc; - readTotal += rc; readCallback(*this, buff); if (readingStopped) { @@ -453,17 +450,17 @@ void AsynchIO::readable(DispatchHandle& h) { // If we didn't fill the read buffer then time to stop reading break; } - + // Stop reading if we've overrun our timeslot - if (Duration(readStartTime, AbsTime::now()) > threadMaxReadTimeNs) { + if (Duration(readStartTime, AbsTime::now()) > threadMaxIoTimeNs) { break; } - + } else { // Put buffer back (at front so it doesn't interfere with unread buffers) bufferQueue.push_front(buff); assert(buff); - + // Eof or other side has gone away if (rc == 0 || errno == ECONNRESET) { eofCallback(*this); @@ -491,12 +488,11 @@ void AsynchIO::readable(DispatchHandle& h) { h.unwatchRead(); break; } - + } } while (true); ++threadReadCount; - threadMaxRead = std::max(threadMaxRead, readTotal); return; } @@ -504,7 +500,7 @@ void AsynchIO::readable(DispatchHandle& h) { * We carry on writing whilst we have data to write and we can write */ void AsynchIO::writeable(DispatchHandle& h) { - int writeTotal = 0; + AbsTime writeStartTime = AbsTime::now(); do { // See if we've got something to write if (!writeQueue.empty()) { @@ -516,7 +512,6 @@ void AsynchIO::writeable(DispatchHandle& h) { int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount); if (rc >= 0) { threadWriteTotal += rc; - writeTotal += rc; // If we didn't write full buffer put rest back if (rc != buff->dataCount) { @@ -525,14 +520,14 @@ void AsynchIO::writeable(DispatchHandle& h) { writeQueue.push_back(buff); break; } - + // Recycle the buffer queueReadBuffer(buff); - - // If we've already written more than the max for reading then stop - // (this is to stop writes dominating reads) - if (writeTotal > threadMaxRead) + + // Stop writing if we've overrun our timeslot + if (Duration(writeStartTime, AbsTime::now()) > threadMaxIoTimeNs) { break; + } } else { // Put buffer back writeQueue.push_back(buff); @@ -580,7 +575,7 @@ void AsynchIO::writeable(DispatchHandle& h) { ++threadWriteCount; return; } - + void AsynchIO::disconnected(DispatchHandle& h) { // If we have not already queued close then call disconnected callback before closing if (!queuedClose && disCallback) disCallback(*this); diff --git a/cpp/src/qpid/sys/ssl/SslIo.cpp b/cpp/src/qpid/sys/ssl/SslIo.cpp index 4a59819183..73f15617dc 100644 --- a/cpp/src/qpid/sys/ssl/SslIo.cpp +++ b/cpp/src/qpid/sys/ssl/SslIo.cpp @@ -57,11 +57,10 @@ void ignoreSigpipe() { * case we could rebalance the info occasionally. */ __thread int threadReadTotal = 0; -__thread int threadMaxRead = 0; __thread int threadReadCount = 0; __thread int threadWriteTotal = 0; __thread int threadWriteCount = 0; -__thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms +__thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms } /* @@ -277,7 +276,6 @@ SslIO::BufferBase* SslIO::getQueuedBuffer() { * it in */ void SslIO::readable(DispatchHandle& h) { - int readTotal = 0; AbsTime readStartTime = AbsTime::now(); do { // (Try to) get a buffer @@ -292,24 +290,23 @@ void SslIO::readable(DispatchHandle& h) { if (rc > 0) { buff->dataCount += rc; threadReadTotal += rc; - readTotal += rc; readCallback(*this, buff); if (rc != readCount) { // If we didn't fill the read buffer then time to stop reading break; } - + // Stop reading if we've overrun our timeslot - if (Duration(readStartTime, AbsTime::now()) > threadMaxReadTimeNs) { + if (Duration(readStartTime, AbsTime::now()) > threadMaxIoTimeNs) { break; } - + } else { // Put buffer back (at front so it doesn't interfere with unread buffers) bufferQueue.push_front(buff); assert(buff); - + // Eof or other side has gone away if (rc == 0 || errno == ECONNRESET) { eofCallback(*this); @@ -337,12 +334,11 @@ void SslIO::readable(DispatchHandle& h) { h.unwatchRead(); break; } - + } } while (true); ++threadReadCount; - threadMaxRead = std::max(threadMaxRead, readTotal); return; } @@ -350,7 +346,7 @@ void SslIO::readable(DispatchHandle& h) { * We carry on writing whilst we have data to write and we can write */ void SslIO::writeable(DispatchHandle& h) { - int writeTotal = 0; + AbsTime writeStartTime = AbsTime::now(); do { // See if we've got something to write if (!writeQueue.empty()) { @@ -362,7 +358,6 @@ void SslIO::writeable(DispatchHandle& h) { int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount); if (rc >= 0) { threadWriteTotal += rc; - writeTotal += rc; // If we didn't write full buffer put rest back if (rc != buff->dataCount) { @@ -371,14 +366,14 @@ void SslIO::writeable(DispatchHandle& h) { writeQueue.push_back(buff); break; } - + // Recycle the buffer queueReadBuffer(buff); - - // If we've already written more than the max for reading then stop - // (this is to stop writes dominating reads) - if (writeTotal > threadMaxRead) + + // Stop writing if we've overrun our timeslot + if (Duration(writeStartTime, AbsTime::now()) > threadMaxIoTimeNs) { break; + } } else { // Put buffer back writeQueue.push_back(buff); @@ -425,7 +420,7 @@ void SslIO::writeable(DispatchHandle& h) { ++threadWriteCount; return; } - + void SslIO::disconnected(DispatchHandle& h) { // If we've already queued close do it instead of disconnected callback if (queuedClose) { |