summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-12-17 20:08:46 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-12-17 20:08:46 +0000
commita1643ff7395fec6c46d2cd08c5fda7a8d0cfa99a (patch)
tree4a3618d1a23515366f8c3c46220b28026d40fd17 /cpp/src
parentaea3779357db3028a721974d2d39a29d9ded2dd6 (diff)
downloadqpid-python-a1643ff7395fec6c46d2cd08c5fda7a8d0cfa99a.tar.gz
* Limited time allowed for reading on a single connection in a single go to 2ms
* Limit bytes allowed to be written on a connection on a single go to the max ever read * Small performance fix for appending to FrameSets git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@604983 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/framing/FrameSet.cpp2
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp71
-rw-r--r--cpp/src/qpid/sys/posix/Time.cpp2
3 files changed, 59 insertions, 16 deletions
diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp
index 9d9b7bc8f8..0e1c9da922 100644
--- a/cpp/src/qpid/framing/FrameSet.cpp
+++ b/cpp/src/qpid/framing/FrameSet.cpp
@@ -28,7 +28,7 @@
using namespace qpid::framing;
using namespace boost;
-FrameSet::FrameSet(const SequenceNumber& _id) : id(_id) {}
+FrameSet::FrameSet(const SequenceNumber& _id) : id(_id) {parts.reserve(4);}
void FrameSet::append(AMQFrame& part)
{
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index e73bbc03ca..f8aaa38cf5 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -20,9 +20,12 @@
*/
#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Time.h"
#include "check.h"
+// TODO The basic algorithm here is not really POSIX specific and with a bit more abstraction
+// could (should) be promoted to be platform portable
#include <unistd.h>
#include <sys/socket.h>
#include <signal.h>
@@ -42,6 +45,18 @@ void ignoreSigpipe() {
::signal(SIGPIPE, SIG_IGN);
}
+/*
+ * We keep per thread state to avoid locking overhead. The assumption is that
+ * on average all the connections are serviced by all the threads so the state
+ * recorded in each thread is about the same. If this turns out not to be the
+ * case we could rebalance the info occasionally.
+ */
+__thread int threadReadTotal = 0;
+__thread int threadMaxRead = 0;
+__thread int threadReadCount = 0;
+__thread int threadWriteTotal = 0;
+__thread int threadWriteCount = 0;
+__thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms
}
/*
@@ -182,6 +197,8 @@ AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
* it in
*/
void AsynchIO::readable(DispatchHandle& h) {
+ int readTotal = 0;
+ AbsTime readStartTime = AbsTime::now();
do {
// (Try to) get a buffer
if (!bufferQueue.empty()) {
@@ -193,11 +210,20 @@ void AsynchIO::readable(DispatchHandle& h) {
int rc = h.getSocket().read(buff->bytes + buff->dataCount, readCount);
if (rc > 0) {
buff->dataCount += rc;
+ threadReadTotal += rc;
+ readTotal += rc;
+
readCallback(*this, buff);
if (rc != readCount) {
// If we didn't fill the read buffer then time to stop reading
- return;
+ break;
+ }
+
+ // Stop reading if we've overrun our timeslot
+ if (Duration(readStartTime, AbsTime::now()) > threadMaxReadTimeNs) {
+ break;
}
+
} else {
// Put buffer back (at front so it doesn't interfere with unread buffers)
bufferQueue.push_front(buff);
@@ -206,11 +232,11 @@ void AsynchIO::readable(DispatchHandle& h) {
if (rc == 0 || errno == ECONNRESET) {
eofCallback(*this);
h.unwatchRead();
- return;
+ break;
} else if (errno == EAGAIN) {
// We have just put a buffer back so we know
// we can carry on watching for reads
- return;
+ break;
} else {
QPID_POSIX_CHECK(rc);
}
@@ -223,17 +249,22 @@ void AsynchIO::readable(DispatchHandle& h) {
// If we still have no buffers we can't do anything more
if (bufferQueue.empty()) {
h.unwatchRead();
- return;
+ break;
}
}
} while (true);
+
+ ++threadReadCount;
+ threadMaxRead = std::max(threadMaxRead, readTotal);
+ return;
}
/*
* We carry on writing whilst we have data to write and we can write
*/
void AsynchIO::writeable(DispatchHandle& h) {
+ int writeTotal = 0;
do {
// See if we've got something to write
if (!writeQueue.empty()) {
@@ -244,16 +275,24 @@ void AsynchIO::writeable(DispatchHandle& h) {
assert(buff->dataStart+buff->dataCount <= buff->byteCount);
int rc = h.getSocket().write(buff->bytes+buff->dataStart, buff->dataCount);
if (rc >= 0) {
+ threadWriteTotal += rc;
+ writeTotal += rc;
+
// If we didn't write full buffer put rest back
if (rc != buff->dataCount) {
buff->dataStart += rc;
buff->dataCount -= rc;
writeQueue.push_back(buff);
- return;
+ break;
}
// Recycle the buffer
queueReadBuffer(buff);
+
+ // If we've already written more than the max for reading then stop
+ // (this is to stop writes dominating reads)
+ if (writeTotal > threadMaxRead)
+ break;
} else {
// Put buffer back
writeQueue.push_back(buff);
@@ -261,11 +300,11 @@ void AsynchIO::writeable(DispatchHandle& h) {
// Just stop watching for write here - we'll get a
// disconnect callback soon enough
h.unwatchWrite();
- return;
+ break;
} else if (errno == EAGAIN) {
// We have just put a buffer back so we know
// we can carry on watching for writes
- return;
+ break;
} else {
QPID_POSIX_CHECK(rc);
}
@@ -274,7 +313,7 @@ void AsynchIO::writeable(DispatchHandle& h) {
// If we're waiting to close the socket then can do it now as there is nothing to write
if (queuedClose) {
close(h);
- return;
+ break;
}
// Fd is writable, but nothing to write
if (idleCallback) {
@@ -284,15 +323,19 @@ void AsynchIO::writeable(DispatchHandle& h) {
// If we still have no buffers to write we can't do anything more
if (writeQueue.empty() && !writePending && !queuedClose) {
h.unwatchWrite();
- //the following handles the case where writePending is
- //set to true after the test above; in this case its
- //possible that the unwatchWrite overwrites the
- //desired rewatchWrite so we correct that here
- if (writePending) h.rewatchWrite();
- return;
+ // The following handles the case where writePending is
+ // set to true after the test above; in this case its
+ // possible that the unwatchWrite overwrites the
+ // desired rewatchWrite so we correct that here
+ if (writePending)
+ h.rewatchWrite();
+ break;
}
}
} while (true);
+
+ ++threadWriteCount;
+ return;
}
void AsynchIO::disconnected(DispatchHandle& h) {
diff --git a/cpp/src/qpid/sys/posix/Time.cpp b/cpp/src/qpid/sys/posix/Time.cpp
index 2228caea58..ac63a3d038 100644
--- a/cpp/src/qpid/sys/posix/Time.cpp
+++ b/cpp/src/qpid/sys/posix/Time.cpp
@@ -31,7 +31,7 @@ namespace sys {
AbsTime AbsTime::now() {
struct timespec ts;
- clock_gettime(CLOCK_REALTIME, &ts);
+ ::clock_gettime(CLOCK_REALTIME, &ts);
AbsTime time_now;
time_now.time_ns = toTime(ts).nanosecs;
return time_now;