diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/framing/FrameSet.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 71 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/Time.cpp | 2 |
3 files changed, 59 insertions, 16 deletions
diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp index 9d9b7bc8f8..0e1c9da922 100644 --- a/cpp/src/qpid/framing/FrameSet.cpp +++ b/cpp/src/qpid/framing/FrameSet.cpp @@ -28,7 +28,7 @@ using namespace qpid::framing; using namespace boost; -FrameSet::FrameSet(const SequenceNumber& _id) : id(_id) {} +FrameSet::FrameSet(const SequenceNumber& _id) : id(_id) {parts.reserve(4);} void FrameSet::append(AMQFrame& part) { diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index e73bbc03ca..f8aaa38cf5 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -20,9 +20,12 @@ */ #include "qpid/sys/AsynchIO.h" +#include "qpid/sys/Time.h" #include "check.h" +// TODO The basic algorithm here is not really POSIX specific and with a bit more abstraction +// could (should) be promoted to be platform portable #include <unistd.h> #include <sys/socket.h> #include <signal.h> @@ -42,6 +45,18 @@ void ignoreSigpipe() { ::signal(SIGPIPE, SIG_IGN); } +/* + * We keep per thread state to avoid locking overhead. The assumption is that + * on average all the connections are serviced by all the threads so the state + * recorded in each thread is about the same. If this turns out not to be the + * case we could rebalance the info occasionally. + */ +__thread int threadReadTotal = 0; +__thread int threadMaxRead = 0; +__thread int threadReadCount = 0; +__thread int threadWriteTotal = 0; +__thread int threadWriteCount = 0; +__thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms } /* @@ -182,6 +197,8 @@ AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { * it in */ void AsynchIO::readable(DispatchHandle& h) { + int readTotal = 0; + AbsTime readStartTime = AbsTime::now(); do { // (Try to) get a buffer if (!bufferQueue.empty()) { @@ -193,11 +210,20 @@ void AsynchIO::readable(DispatchHandle& h) { int rc = h.getSocket().read(buff->bytes + buff->dataCount, readCount); if (rc > 0) { buff->dataCount += rc; + threadReadTotal += rc; + readTotal += rc; + readCallback(*this, buff); if (rc != readCount) { // If we didn't fill the read buffer then time to stop reading - return; + break; + } + + // Stop reading if we've overrun our timeslot + if (Duration(readStartTime, AbsTime::now()) > threadMaxReadTimeNs) { + break; } + } else { // Put buffer back (at front so it doesn't interfere with unread buffers) bufferQueue.push_front(buff); @@ -206,11 +232,11 @@ void AsynchIO::readable(DispatchHandle& h) { if (rc == 0 || errno == ECONNRESET) { eofCallback(*this); h.unwatchRead(); - return; + break; } else if (errno == EAGAIN) { // We have just put a buffer back so we know // we can carry on watching for reads - return; + break; } else { QPID_POSIX_CHECK(rc); } @@ -223,17 +249,22 @@ void AsynchIO::readable(DispatchHandle& h) { // If we still have no buffers we can't do anything more if (bufferQueue.empty()) { h.unwatchRead(); - return; + break; } } } while (true); + + ++threadReadCount; + threadMaxRead = std::max(threadMaxRead, readTotal); + return; } /* * We carry on writing whilst we have data to write and we can write */ void AsynchIO::writeable(DispatchHandle& h) { + int writeTotal = 0; do { // See if we've got something to write if (!writeQueue.empty()) { @@ -244,16 +275,24 @@ void AsynchIO::writeable(DispatchHandle& h) { assert(buff->dataStart+buff->dataCount <= buff->byteCount); int rc = h.getSocket().write(buff->bytes+buff->dataStart, buff->dataCount); if (rc >= 0) { + threadWriteTotal += rc; + writeTotal += rc; + // If we didn't write full buffer put rest back if (rc != buff->dataCount) { buff->dataStart += rc; buff->dataCount -= rc; writeQueue.push_back(buff); - return; + break; } // Recycle the buffer queueReadBuffer(buff); + + // If we've already written more than the max for reading then stop + // (this is to stop writes dominating reads) + if (writeTotal > threadMaxRead) + break; } else { // Put buffer back writeQueue.push_back(buff); @@ -261,11 +300,11 @@ void AsynchIO::writeable(DispatchHandle& h) { // Just stop watching for write here - we'll get a // disconnect callback soon enough h.unwatchWrite(); - return; + break; } else if (errno == EAGAIN) { // We have just put a buffer back so we know // we can carry on watching for writes - return; + break; } else { QPID_POSIX_CHECK(rc); } @@ -274,7 +313,7 @@ void AsynchIO::writeable(DispatchHandle& h) { // If we're waiting to close the socket then can do it now as there is nothing to write if (queuedClose) { close(h); - return; + break; } // Fd is writable, but nothing to write if (idleCallback) { @@ -284,15 +323,19 @@ void AsynchIO::writeable(DispatchHandle& h) { // If we still have no buffers to write we can't do anything more if (writeQueue.empty() && !writePending && !queuedClose) { h.unwatchWrite(); - //the following handles the case where writePending is - //set to true after the test above; in this case its - //possible that the unwatchWrite overwrites the - //desired rewatchWrite so we correct that here - if (writePending) h.rewatchWrite(); - return; + // The following handles the case where writePending is + // set to true after the test above; in this case its + // possible that the unwatchWrite overwrites the + // desired rewatchWrite so we correct that here + if (writePending) + h.rewatchWrite(); + break; } } } while (true); + + ++threadWriteCount; + return; } void AsynchIO::disconnected(DispatchHandle& h) { diff --git a/cpp/src/qpid/sys/posix/Time.cpp b/cpp/src/qpid/sys/posix/Time.cpp index 2228caea58..ac63a3d038 100644 --- a/cpp/src/qpid/sys/posix/Time.cpp +++ b/cpp/src/qpid/sys/posix/Time.cpp @@ -31,7 +31,7 @@ namespace sys { AbsTime AbsTime::now() { struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); + ::clock_gettime(CLOCK_REALTIME, &ts); AbsTime time_now; time_now.time_ns = toTime(ts).nanosecs; return time_now; |