diff options
author | Alan Conway <aconway@apache.org> | 2009-02-10 21:42:10 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-02-10 21:42:10 +0000 |
commit | 3029ec8cffc18061dc7bb3a0b6e944e30d7198fa (patch) | |
tree | 9854d670046d1660e7974973833b0a4b80aa643c | |
parent | 83c75e56fe17323a86c65cd7594fdb3be170f834 (diff) | |
download | qpid-python-3029ec8cffc18061dc7bb3a0b6e944e30d7198fa.tar.gz |
Fix cluster flow control bug: hang with large messages (>frame-max) and low --cluster-read-max.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@743114 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/UnknownType.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp | 20 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ConnectionDecoder.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ConnectionMap.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ConnectionMap.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Decoder.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Decoder.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/types.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/tests/latencytest.cpp | 3 |
12 files changed, 40 insertions, 20 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h b/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h index 1e4aa04bf4..77498871b3 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h +++ b/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h @@ -21,9 +21,9 @@ * under the License. * */ +#include "qpid/sys/IntegerTypes.h" #include <vector> #include <iosfwd> -#include <stdint.h> namespace qpid { namespace amqp_0_10 { diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index be04eebc57..d9a5125760 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -103,7 +103,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b "Error delivering frames", poller), connections(*this), - decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)), + decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1), connections), expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())), frameId(0), initialized(false), diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 295705e967..3f331978ca 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -62,14 +62,14 @@ NoOpConnectionOutputHandler Connection::discardHandler; Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, ConnectionId myId) : cluster(c), self(myId), catchUp(false), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), readCredit(0), expectProtocolHeader(false) + connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false) { init(); } // Local connections Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink) : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), readCredit(0), + connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), expectProtocolHeader(isLink) { init(); } diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index a6e9aa65f9..4923ef2920 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -146,6 +146,8 @@ class Connection : // Encoded queue/exchange replication. void queue(const std::string& encoded); void exchange(const std::string& encoded); + + void giveReadCredit(int credit) { output.giveReadCredit(credit); } private: void init(); @@ -171,7 +173,6 @@ class Connection : framing::SequenceNumber deliverSeq; framing::ChannelId currentChannel; boost::shared_ptr<broker::TxBuffer> txBuffer; - int readCredit; bool expectProtocolHeader; static qpid::sys::AtomicValue<uint64_t> catchUpId; diff --git a/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp b/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp index 1500b6a743..cb958758b8 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp +++ b/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp @@ -21,28 +21,34 @@ #include "ConnectionDecoder.h" #include "EventFrame.h" +#include "ConnectionMap.h" namespace qpid { namespace cluster { using namespace framing; -ConnectionDecoder::ConnectionDecoder(const Handler& h) : handler(h), readCredit(0) {} +ConnectionDecoder::ConnectionDecoder(const Handler& h) : handler(h) {} -void ConnectionDecoder::decode(const EventHeader& eh, const void* data) { +void ConnectionDecoder::decode(const EventHeader& eh, const void* data, ConnectionMap& map) { assert(eh.getType() == DATA); // Only handle connection data events. const char* cp = static_cast<const char*>(data); Buffer buf(const_cast<char*>(cp), eh.getSize()); - // Set read credit on the last frame in the event. - ++readCredit; // One credit per event = connection read buffer. - if (decoder.decode(buf)) { // Decoded a frame + if (decoder.decode(buf)) { // Decoded a frame AMQFrame frame(decoder.frame); while (decoder.decode(buf)) { handler(EventFrame(eh, frame)); frame = decoder.frame; } - handler(EventFrame(eh, frame, readCredit)); - readCredit = 0; // Reset credit for next event. + handler(EventFrame(eh, frame, 1)); // Set read-credit on the last frame. + } + else { + // We must give 1 unit read credit per event. + // This event does not contain any complete frames so + // we must give read credit directly. + ConnectionPtr connection = map.getLocal(eh.getConnectionId()); + if (connection) + connection->giveReadCredit(1); } } diff --git a/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h b/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h index 5f139b23e9..449387c1cc 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h +++ b/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h @@ -30,6 +30,8 @@ namespace cluster { class EventHeader; class EventFrame; +class ConnectionMap; + /** * Decodes delivered connection data Event's as EventFrame's for a * connection replica, local or shadow. Manages state for frame @@ -47,12 +49,11 @@ class ConnectionDecoder /** Takes EventHeader + data rather than Event so that the caller can * pass a pointer to connection data or a CPG buffer directly without copy. */ - void decode(const EventHeader& eh, const void* data); + void decode(const EventHeader& eh, const void* data, ConnectionMap& connections); private: Handler handler; framing::FrameDecoder decoder; - int readCredit; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp b/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp index 1a49a4d663..ed2fa94412 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp +++ b/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp @@ -62,6 +62,12 @@ ConnectionMap::ConnectionPtr ConnectionMap::get(const ConnectionId& id) { return i->second; } +ConnectionMap::ConnectionPtr ConnectionMap::getLocal(const ConnectionId& id) { + if (id.getMember() != cluster.getId()) return 0; + Map::const_iterator i = map.find(id); + return i == map.end() ? 0 : i->second; +} + ConnectionMap::Vector ConnectionMap::values() const { Vector result(map.size()); std::transform(map.begin(), map.end(), result.begin(), diff --git a/qpid/cpp/src/qpid/cluster/ConnectionMap.h b/qpid/cpp/src/qpid/cluster/ConnectionMap.h index c5437eb84a..f8aa663339 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionMap.h +++ b/qpid/cpp/src/qpid/cluster/ConnectionMap.h @@ -60,6 +60,9 @@ class ConnectionMap { */ ConnectionPtr get(const ConnectionId& id); + /** If ID is a local connection and in the map return it, else return 0 */ + ConnectionPtr getLocal(const ConnectionId& id); + /** Get connections for sending an update. */ Vector values() const; diff --git a/qpid/cpp/src/qpid/cluster/Decoder.cpp b/qpid/cpp/src/qpid/cluster/Decoder.cpp index b2ab7c8d0f..54d0224db1 100644 --- a/qpid/cpp/src/qpid/cluster/Decoder.cpp +++ b/qpid/cpp/src/qpid/cluster/Decoder.cpp @@ -29,12 +29,12 @@ namespace cluster { using namespace framing; -Decoder::Decoder(const Handler& h) : handler(h) {} +Decoder::Decoder(const Handler& h, ConnectionMap& cm) : handler(h), connections(cm) {} void Decoder::decode(const EventHeader& eh, const void* data) { ConnectionId id = eh.getConnectionId(); std::pair<Map::iterator, bool> ib = map.insert(id, new ConnectionDecoder(handler)); - ptr_map_ptr(ib.first)->decode(eh, data); + ptr_map_ptr(ib.first)->decode(eh, data, connections); } void Decoder::erase(const ConnectionId& c) { diff --git a/qpid/cpp/src/qpid/cluster/Decoder.h b/qpid/cpp/src/qpid/cluster/Decoder.h index dffd6c8f75..50f6afa491 100644 --- a/qpid/cpp/src/qpid/cluster/Decoder.h +++ b/qpid/cpp/src/qpid/cluster/Decoder.h @@ -30,6 +30,7 @@ namespace qpid { namespace cluster { class EventHeader; +class ConnectionMap; /** * Holds a map of ConnectionDecoders. Decodes Events into EventFrames @@ -42,7 +43,7 @@ class Decoder public: typedef boost::function<void(const EventFrame&)> Handler; - Decoder(const Handler& h); + Decoder(const Handler& h, ConnectionMap&); /** Takes EventHeader + data rather than Event so that the caller can * pass a pointer to connection data or a CPG buffer directly without copy. @@ -56,7 +57,9 @@ class Decoder typedef boost::ptr_map<ConnectionId, ConnectionDecoder> Map; Handler handler; Map map; + ConnectionMap& connections; }; + }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_DECODER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h index 5e0d3d20e3..d1d6fdc427 100644 --- a/qpid/cpp/src/qpid/cluster/types.h +++ b/qpid/cpp/src/qpid/cluster/types.h @@ -24,11 +24,12 @@ #include "config.h" #include "qpid/Url.h" +#include "qpid/sys/IntegerTypes.h" #include <boost/intrusive_ptr.hpp> #include <utility> #include <iosfwd> #include <string> -#include <stdint.h> + extern "C" { #if defined (HAVE_OPENAIS_CPG_H) diff --git a/qpid/cpp/src/tests/latencytest.cpp b/qpid/cpp/src/tests/latencytest.cpp index 29bafb1a68..81ac610001 100644 --- a/qpid/cpp/src/tests/latencytest.cpp +++ b/qpid/cpp/src/tests/latencytest.cpp @@ -223,6 +223,7 @@ Receiver::Receiver(const string& q, Stats& s) : Client(q), mgr(session), count(0 if (msgCount) { std::cout << "Warning: found " << msgCount << " msgs on " << queue << ". Purging..." << std::endl; session.queuePurge(arg::queue=queue); + session.sync(); } SubscriptionSettings settings; if (opts.prefetch) { @@ -245,10 +246,8 @@ void Receiver::received(Message& msg) { ++count; uint64_t sentAt = msg.getDeliveryProperties().getTimestamp(); - //uint64_t sentAt = msg.getHeaders().getTimestamp("sent-at");// TODO: add support for uint64_t as a field table type uint64_t receivedAt = current_time(); - //std::cerr << "Latency: " << (receivedAt - sentAt) << std::endl; stats.update(((double) (receivedAt - sentAt)) / TIME_MSEC); if (!opts.rate && count >= opts.count) { |