summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-10-06 20:38:09 +0000
committerAlan Conway <aconway@apache.org>2011-10-06 20:38:09 +0000
commitb39d5c14e2c78d9d924b7abf895e92a0adb54dff (patch)
treef3f2c0aebd82b5f237e2f906fe7541e959a31f4e
parent7dec096a62550ac85415564a58dba25d0627011b (diff)
downloadqpid-python-b39d5c14e2c78d9d924b7abf895e92a0adb54dff.tar.gz
QPID-2920: Configurable connection output prefetch.
Allow connections to collect more output than they can write immediately. Improves performance in a cluster: while a broker has the consume lock connections can collect extra output data to keep them busy while waiting for the consume lock to return. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1179839 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Connection.cpp12
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Connection.h6
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h1
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionFactory.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp6
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp7
7 files changed, 32 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
index bf2e7d5713..f15a06a4ff 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -30,9 +30,10 @@ namespace amqp_0_10 {
using sys::Mutex;
-Connection::Connection(sys::OutputControl& o, const std::string& id, bool _isClient)
+Connection::Connection(
+ sys::OutputControl& o, const std::string& id, bool _isClient, size_t prefetch_)
: pushClosed(false), popClosed(false), output(o), identifier(id), initialized(false),
- isClient(_isClient), buffered(0), version(0,10)
+ isClient(_isClient), buffered(0), version(0,10), prefetch(prefetch_)
{}
void Connection::setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c) {
@@ -90,6 +91,7 @@ size_t Connection::encode(const char* buffer, size_t size) {
}
size_t frameSize=0;
size_t encoded=0;
+ // Encode as much as possible into the IO buffer
while (!workQueue.empty() && ((frameSize=workQueue.front().encodedSize()) <= out.available())) {
workQueue.front().encode(out);
QPID_LOG(trace, "SENT [" << identifier << "]: " << workQueue.front());
@@ -108,6 +110,12 @@ size_t Connection::encode(const char* buffer, size_t size) {
workQueue.clear();
if (frameQueue.empty() && pushClosed)
popClosed = true;
+ // Prefetch frames to be encoded on the next call.
+ bool more = true;
+ while (buffered < prefetch && more) {
+ Mutex::ScopedUnlock u(frameQueueLock);
+ more = connection->doOutput();
+ }
}
return out.getPosition();
}
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/amqp_0_10/Connection.h
index 995d824796..39c30eb207 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Connection.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.h
@@ -56,9 +56,11 @@ class Connection : public sys::ConnectionCodec,
bool isClient;
size_t buffered;
framing::ProtocolVersion version;
-
+ size_t prefetch;
+
public:
- QPID_BROKER_EXTERN Connection(sys::OutputControl&, const std::string& id, bool isClient);
+ QPID_BROKER_EXTERN Connection(
+ sys::OutputControl&, const std::string& id, bool isClient, size_t prefetch);
QPID_BROKER_EXTERN void setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c);
size_t decode(const char* buffer, size_t size);
size_t encode(const char* buffer, size_t size);
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 1004fa2bcd..8c8c9f752d 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -123,7 +123,8 @@ Broker::Options::Options(const std::string& name) :
qmf1Support(true),
queueFlowStopRatio(80),
queueFlowResumeRatio(70),
- queueThresholdEventRatio(80)
+ queueThresholdEventRatio(80),
+ outputPrefetch(0)
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -159,7 +160,9 @@ Broker::Options::Options(const std::string& name) :
("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication")
("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.")
("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.")
- ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised");
+ ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
+ // FIXME aconway 2011-10-06: in or out? Needs a bettter name & description.
+ ("output-prefetch", optValue(outputPrefetch, "BYTES"), "Experimental: Pre fetch limit for connection output in bytes");
}
const std::string empty;
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 76d049df75..18be8c1fec 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -122,6 +122,7 @@ public:
uint queueFlowStopRatio; // producer flow control: on
uint queueFlowResumeRatio; // producer flow control: off
uint16_t queueThresholdEventRatio;
+ size_t outputPrefetch;
private:
std::string getHome();
diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
index 9e0020812b..1428e04b34 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -46,7 +46,8 @@ ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std:
return 0;
}
if (v == ProtocolVersion(0, 10)) {
- ConnectionPtr c(new amqp_0_10::Connection(out, id, false));
+ ConnectionPtr c(
+ new amqp_0_10::Connection(out, id, false, broker.getOptions().outputPrefetch));
c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, external, false)));
return c.release();
}
@@ -57,10 +58,10 @@ sys::ConnectionCodec*
ConnectionFactory::create(sys::OutputControl& out, const std::string& id,
const SecuritySettings& external) {
// used to create connections from one broker to another
- ConnectionPtr c(new amqp_0_10::Connection(out, id, true));
+ ConnectionPtr c(
+ new amqp_0_10::Connection(out, id, true, broker.getOptions().outputPrefetch));
c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, external, true)));
return c.release();
}
-
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
index 754b443c22..383a29479e 100644
--- a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
+++ b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
@@ -48,7 +48,8 @@ SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, cons
}
if (v == ProtocolVersion(0, 10)) {
SecureConnectionPtr sc(new SecureConnection());
- CodecPtr c(new amqp_0_10::Connection(out, id, false));
+ CodecPtr c(new amqp_0_10::Connection(
+ out, id, false, broker.getOptions().outputPrefetch));
ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, false));
i->setSecureConnection(sc.get());
c->setInputHandler(InputPtr(i.release()));
@@ -63,7 +64,8 @@ SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id,
const SecuritySettings& external) {
// used to create connections from one broker to another
SecureConnectionPtr sc(new SecureConnection());
- CodecPtr c(new amqp_0_10::Connection(out, id, true));
+ CodecPtr c(
+ new amqp_0_10::Connection(out, id, true, broker.getOptions().outputPrefetch));
ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, true ));
i->setSecureConnection(sc.get());
c->setInputHandler(InputPtr(i.release()));
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
index d0ba8abfb3..5c9c07f106 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
+++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -50,7 +50,7 @@ ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out,
if (v == ProtocolVersion(0, 10))
return new ConnectionCodec(v, out, id, cluster, false, false, external);
else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection
- return new ConnectionCodec(v, out, id, cluster, true, false, external);
+ return new ConnectionCodec(v, out, id, cluster, true, false, external);
return 0;
}
@@ -63,8 +63,9 @@ ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& log
ConnectionCodec::ConnectionCodec(
const ProtocolVersion& v, sys::OutputControl& out,
- const std::string& logId, Cluster& cluster, bool catchUp, bool isLink, const qpid::sys::SecuritySettings& external
-) : codec(out, logId, isLink),
+ const std::string& logId, Cluster& cluster, bool catchUp,
+ bool isLink, const qpid::sys::SecuritySettings& external
+) : codec(out, logId, isLink, cluster.getBroker().getOptions().outputPrefetch),
interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink, external))
{
cluster.addLocalConnection(interceptor);