diff options
author | Alan Conway <aconway@apache.org> | 2011-10-06 20:38:09 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-10-06 20:38:09 +0000 |
commit | b39d5c14e2c78d9d924b7abf895e92a0adb54dff (patch) | |
tree | f3f2c0aebd82b5f237e2f906fe7541e959a31f4e | |
parent | 7dec096a62550ac85415564a58dba25d0627011b (diff) | |
download | qpid-python-b39d5c14e2c78d9d924b7abf895e92a0adb54dff.tar.gz |
QPID-2920: Configurable connection output prefetch.
Allow connections to collect more output than they can write immediately.
Improves performance in a cluster: while a broker has the consume lock
connections can collect extra output data to keep them busy while
waiting for the consume lock to return.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1179839 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/Connection.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/Connection.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionFactory.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp | 7 |
7 files changed, 32 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp index bf2e7d5713..f15a06a4ff 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -30,9 +30,10 @@ namespace amqp_0_10 { using sys::Mutex; -Connection::Connection(sys::OutputControl& o, const std::string& id, bool _isClient) +Connection::Connection( + sys::OutputControl& o, const std::string& id, bool _isClient, size_t prefetch_) : pushClosed(false), popClosed(false), output(o), identifier(id), initialized(false), - isClient(_isClient), buffered(0), version(0,10) + isClient(_isClient), buffered(0), version(0,10), prefetch(prefetch_) {} void Connection::setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c) { @@ -90,6 +91,7 @@ size_t Connection::encode(const char* buffer, size_t size) { } size_t frameSize=0; size_t encoded=0; + // Encode as much as possible into the IO buffer while (!workQueue.empty() && ((frameSize=workQueue.front().encodedSize()) <= out.available())) { workQueue.front().encode(out); QPID_LOG(trace, "SENT [" << identifier << "]: " << workQueue.front()); @@ -108,6 +110,12 @@ size_t Connection::encode(const char* buffer, size_t size) { workQueue.clear(); if (frameQueue.empty() && pushClosed) popClosed = true; + // Prefetch frames to be encoded on the next call. + bool more = true; + while (buffered < prefetch && more) { + Mutex::ScopedUnlock u(frameQueueLock); + more = connection->doOutput(); + } } return out.getPosition(); } diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/amqp_0_10/Connection.h index 995d824796..39c30eb207 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.h +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.h @@ -56,9 +56,11 @@ class Connection : public sys::ConnectionCodec, bool isClient; size_t buffered; framing::ProtocolVersion version; - + size_t prefetch; + public: - QPID_BROKER_EXTERN Connection(sys::OutputControl&, const std::string& id, bool isClient); + QPID_BROKER_EXTERN Connection( + sys::OutputControl&, const std::string& id, bool isClient, size_t prefetch); QPID_BROKER_EXTERN void setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c); size_t decode(const char* buffer, size_t size); size_t encode(const char* buffer, size_t size); diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 1004fa2bcd..8c8c9f752d 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -123,7 +123,8 @@ Broker::Options::Options(const std::string& name) : qmf1Support(true), queueFlowStopRatio(80), queueFlowResumeRatio(70), - queueThresholdEventRatio(80) + queueThresholdEventRatio(80), + outputPrefetch(0) { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -159,7 +160,9 @@ Broker::Options::Options(const std::string& name) : ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication") ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.") ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.") - ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised"); + ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised") + // FIXME aconway 2011-10-06: in or out? Needs a bettter name & description. + ("output-prefetch", optValue(outputPrefetch, "BYTES"), "Experimental: Pre fetch limit for connection output in bytes"); } const std::string empty; diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 76d049df75..18be8c1fec 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -122,6 +122,7 @@ public: uint queueFlowStopRatio; // producer flow control: on uint queueFlowResumeRatio; // producer flow control: off uint16_t queueThresholdEventRatio; + size_t outputPrefetch; private: std::string getHome(); diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp index 9e0020812b..1428e04b34 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -46,7 +46,8 @@ ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std: return 0; } if (v == ProtocolVersion(0, 10)) { - ConnectionPtr c(new amqp_0_10::Connection(out, id, false)); + ConnectionPtr c( + new amqp_0_10::Connection(out, id, false, broker.getOptions().outputPrefetch)); c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, external, false))); return c.release(); } @@ -57,10 +58,10 @@ sys::ConnectionCodec* ConnectionFactory::create(sys::OutputControl& out, const std::string& id, const SecuritySettings& external) { // used to create connections from one broker to another - ConnectionPtr c(new amqp_0_10::Connection(out, id, true)); + ConnectionPtr c( + new amqp_0_10::Connection(out, id, true, broker.getOptions().outputPrefetch)); c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, external, true))); return c.release(); } - }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp index 754b443c22..383a29479e 100644 --- a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp +++ b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp @@ -48,7 +48,8 @@ SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, cons } if (v == ProtocolVersion(0, 10)) { SecureConnectionPtr sc(new SecureConnection()); - CodecPtr c(new amqp_0_10::Connection(out, id, false)); + CodecPtr c(new amqp_0_10::Connection( + out, id, false, broker.getOptions().outputPrefetch)); ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, false)); i->setSecureConnection(sc.get()); c->setInputHandler(InputPtr(i.release())); @@ -63,7 +64,8 @@ SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id, const SecuritySettings& external) { // used to create connections from one broker to another SecureConnectionPtr sc(new SecureConnection()); - CodecPtr c(new amqp_0_10::Connection(out, id, true)); + CodecPtr c( + new amqp_0_10::Connection(out, id, true, broker.getOptions().outputPrefetch)); ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, true )); i->setSecureConnection(sc.get()); c->setInputHandler(InputPtr(i.release())); diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp index d0ba8abfb3..5c9c07f106 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -50,7 +50,7 @@ ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, if (v == ProtocolVersion(0, 10)) return new ConnectionCodec(v, out, id, cluster, false, false, external); else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection - return new ConnectionCodec(v, out, id, cluster, true, false, external); + return new ConnectionCodec(v, out, id, cluster, true, false, external); return 0; } @@ -63,8 +63,9 @@ ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& log ConnectionCodec::ConnectionCodec( const ProtocolVersion& v, sys::OutputControl& out, - const std::string& logId, Cluster& cluster, bool catchUp, bool isLink, const qpid::sys::SecuritySettings& external -) : codec(out, logId, isLink), + const std::string& logId, Cluster& cluster, bool catchUp, + bool isLink, const qpid::sys::SecuritySettings& external +) : codec(out, logId, isLink, cluster.getBroker().getOptions().outputPrefetch), interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink, external)) { cluster.addLocalConnection(interceptor); |