diff options
author | Alan Conway <aconway@apache.org> | 2009-05-26 21:41:52 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-05-26 21:41:52 +0000 |
commit | b9ce6e056f3ec10fc0efc710a29e2b9d60657c27 (patch) | |
tree | 7402007454c505975f11f9b889631e5d92db9cf5 /qpid/cpp/src | |
parent | e6e47a46b03da1a275cf6646614139f5d9abf513 (diff) | |
download | qpid-python-b9ce6e056f3ec10fc0efc710a29e2b9d60657c27.tar.gz |
Improved doOutput algorithm.
Simpler & more robust algorithm based on message count rather than byte size.
Self-tuning, removes 2 hard-to-explain cluster options.
Similar or marginally better performance.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@778896 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ClusterSettings.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp | 73 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/OutputInterceptor.h | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/PollableQueue.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 3 |
8 files changed, 50 insertions, 74 deletions
diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp index daed608eb8..c2c07c052a 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -70,13 +70,8 @@ struct ClusterOptions : public Options { #if HAVE_LIBCMAN_H ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.") #endif - ("cluster-read-max", optValue(settings.readMax,"N"), - "Experimental: Limit per-client-connection queue of read buffers. 0=no limit.") - ("cluster-write-estimate", optValue(settings.writeEstimate, "Kb"), - "Experimental: initial estimate for write rate per multicast cycle") - ("cluster-write-min", optValue(settings.writeMin, "Kb"), - "Experimental: minimum estimate for write rate per multicast cycle") - // FIXME aconway 2009-05-20: temporary + ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.") + // FIXME aconway 2009-05-20: temporary ("cluster-check-errors", optValue(settings.checkErrors, "yes|no"), "Enable/disable cluster error checks. Normally should be enabled.") ; } diff --git a/qpid/cpp/src/qpid/cluster/ClusterSettings.h b/qpid/cpp/src/qpid/cluster/ClusterSettings.h index 8ae77cd807..c82be9d227 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterSettings.h +++ b/qpid/cpp/src/qpid/cluster/ClusterSettings.h @@ -32,11 +32,11 @@ struct ClusterSettings { std::string name; std::string url; bool quorum; - size_t readMax, writeEstimate, writeMin; + size_t readMax; std::string username, password, mechanism; bool checkErrors; - ClusterSettings() : quorum(false), readMax(10), writeEstimate(1), writeMin(1), + ClusterSettings() : quorum(false), readMax(10), checkErrors(true) // FIXME aconway 2009-05-20: temporary {} diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index cc0af77029..dda4b5435b 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -113,14 +113,6 @@ bool Connection::doOutput() { return output.doOutput(); } -// Delivery of doOutput allows us to run the real connection doOutput() -// which stocks up the write buffers with data. -// -void Connection::deliverDoOutput(uint32_t requested) { - assert(!catchUp); - output.deliverDoOutput(requested); -} - // Received from a directly connected client. void Connection::received(framing::AMQFrame& f) { QPID_LOG(trace, cluster << " RECV " << *this << ": " << f); @@ -279,7 +271,7 @@ void Connection::sessionState( QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); } -void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment) { +void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) { ConnectionId shadowId = ConnectionId(memberId, connectionId); QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); self = shadowId; @@ -287,6 +279,7 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str // OK to use decoder here because cluster is stalled for update. cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size()); connection.setErrorListener(this); + output.setSendMax(sendMax); } void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) { diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 969d191bd7..a0be2203e4 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -115,7 +115,7 @@ class Connection : const framing::SequenceNumber& received, const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); - void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment); + void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax); void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq); @@ -150,6 +150,8 @@ class Connection : void deliverClose(); + OutputInterceptor& getOutput() { return output; } + private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} @@ -164,8 +166,7 @@ class Connection : void init(); bool checkUnsupported(const framing::AMQBody& body); - void deliverDoOutput(uint32_t requested); - void sendDoOutput(); + void deliverDoOutput(uint32_t limit) { output.deliverDoOutput(limit); } boost::shared_ptr<broker::Queue> findQueue(const std::string& qname); broker::SessionState& sessionState(); diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp index a7ec82128b..f898957351 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -32,17 +32,17 @@ namespace qpid { namespace cluster { using namespace framing; +using namespace std; NoOpConnectionOutputHandler OutputInterceptor::discardHandler; OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h) - : parent(p), closing(false), next(&h), sent(), - estimate(p.getCluster().getSettings().writeEstimate*1024), - minimum(p.getCluster().getSettings().writeMin*1024), - moreOutput(), doingOutput() + : parent(p), closing(false), next(&h), sendMax(1), sent(0), sentDoOutput(false) {} -LATENCY_TRACK(extern sys::LatencyTracker<const AMQBody*> doOutputTracker;) +#if defined QPID_LATENCY_TRACKER +extern sys::LatencyTracker<const AMQBody*> doOutputTracker; +#endif void OutputInterceptor::send(framing::AMQFrame& f) { LATENCY_TRACK(doOutputTracker.finish(f.getBody())); @@ -53,8 +53,6 @@ void OutputInterceptor::send(framing::AMQFrame& f) { sys::Mutex::ScopedLock l(lock); next->send(f); } - if (!parent.isCatchUp()) - sent += f.encodedSize(); } void OutputInterceptor::activateOutput() { @@ -62,11 +60,8 @@ void OutputInterceptor::activateOutput() { sys::Mutex::ScopedLock l(lock); next->activateOutput(); } - else if (!closing) { // Don't send do ouput after output stopped. - QPID_LOG(trace, parent << " activateOutput - sending doOutput"); - moreOutput = true; - sendDoOutput(estimate); - } + else + sendDoOutput(sendMax); } void OutputInterceptor::giveReadCredit(int32_t credit) { @@ -77,43 +72,33 @@ void OutputInterceptor::giveReadCredit(int32_t credit) { // Called in write thread when the IO layer has no more data to write. // We do nothing in the write thread, we run doOutput only on delivery // of doOutput requests. -bool OutputInterceptor::doOutput() { return false; } - -// Delivery of doOutput allows us to run the real connection doOutput() -// which tranfers frames to the codec for writing. -// -void OutputInterceptor::deliverDoOutput(size_t requested) { - size_t buf = getBuffered(); - if (parent.isLocal()) { // Adjust estimate for next sendDoOutput - sent = sent > buf ? sent - buf : 0; // Buffered data was not sent. - if (buf > 0) // Wrote to capacity, move estimate towards sent. - estimate = (estimate + sent) /2; - else if (sent >= estimate) // Last estimate was too small, increase it. - estimate *= 2; - if (estimate < minimum) estimate = minimum; - } - // Run the real doOutput() till we have added the requested data - // or there's nothing to output. Record how much we send. - sent = 0; - do { - moreOutput = parent.getBrokerConnection().doOutput(); - } while (sent < requested && moreOutput); - sent += buf; // Include data previously in the buffer +bool OutputInterceptor::doOutput() { return false; } +// Send output up to limit, calculate new limit. +void OutputInterceptor::deliverDoOutput(uint32_t limit) { + sentDoOutput = false; + sendMax = limit; + size_t newLimit = limit; if (parent.isLocal()) { - // Send the next doOutput request - doingOutput = false; - sendDoOutput(estimate); // FIXME aconway 2009-04-28: account for data in buffer? + size_t buffered = getBuffered(); + if (buffered == 0 && sent == sendMax) // Could have sent more, increase the limit. + newLimit = sendMax*2; + else if (buffered > 0 && sent > 1) // Data left unsent, reduce the limit. + newLimit = sent-1; } + sent = 0; + while (sent < limit && parent.getBrokerConnection().doOutput()) + ++sent; + if (sent == limit) sendDoOutput(newLimit); } -// Send a doOutput request if one is not already in flight. -void OutputInterceptor::sendDoOutput(size_t request) { - if (!parent.isLocal() || doingOutput || !moreOutput) return; - doingOutput = true; - parent.getCluster().getMulticast().mcastControl( - ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), estimate), parent.getId()); - QPID_LOG(trace, parent << "Send doOutput request for " << request); +void OutputInterceptor::sendDoOutput(size_t newLimit) { + if (parent.isLocal() && !sentDoOutput && !closing) { + sentDoOutput = true; + parent.getCluster().getMulticast().mcastControl( + ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), newLimit), + parent.getId()); + } } void OutputInterceptor::closeOutput() { diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h index ea603dbe06..7d6c718b82 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h @@ -49,28 +49,28 @@ class OutputInterceptor : public sys::ConnectionOutputHandler { size_t getBuffered() const; // Delivery point for doOutput requests. - void deliverDoOutput(size_t requested); + void deliverDoOutput(uint32_t limit); // Intercept doOutput requests on Connection. bool doOutput(); void closeOutput(); + uint32_t getSendMax() const { return sendMax; } + void setSendMax(uint32_t sendMax_) { sendMax=sendMax_; } + cluster::Connection& parent; private: typedef sys::Mutex::ScopedLock Locker; - void sendDoOutput(size_t); + void sendDoOutput(size_t newLimit); mutable sys::Mutex lock; bool closing; sys::ConnectionOutputHandler* next; - size_t sent; - size_t estimate; - size_t minimum; - bool moreOutput; - bool doingOutput; static NoOpConnectionOutputHandler discardHandler; + uint32_t sendMax, sent; + bool sentDoOutput; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/PollableQueue.h b/qpid/cpp/src/qpid/cluster/PollableQueue.h index db98faa9da..65f615d8b6 100644 --- a/qpid/cpp/src/qpid/cluster/PollableQueue.h +++ b/qpid/cpp/src/qpid/cluster/PollableQueue.h @@ -41,7 +41,8 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> { const boost::shared_ptr<sys::Poller>& poller) : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1), poller), - callback(f), error(err), message(msg) {} + callback(f), error(err), message(msg) + {} typename sys::PollableQueue<T>::Batch::const_iterator handleBatch(const typename sys::PollableQueue<T>::Batch& values) { diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index bb4df8890a..edd83463d2 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -253,7 +253,8 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda updateConnection->getId().getMember(), updateConnection->getId().getNumber(), bc.getUserId(), - string(fragment.first, fragment.second) + string(fragment.first, fragment.second), + updateConnection->getOutput().getSendMax() ); shadowConnection.close(); QPID_LOG(debug, updaterId << " updated connection " << *updateConnection); |