summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-02-10 21:42:10 +0000
committerAlan Conway <aconway@apache.org>2009-02-10 21:42:10 +0000
commit3029ec8cffc18061dc7bb3a0b6e944e30d7198fa (patch)
tree9854d670046d1660e7974973833b0a4b80aa643c
parent83c75e56fe17323a86c65cd7594fdb3be170f834 (diff)
downloadqpid-python-3029ec8cffc18061dc7bb3a0b6e944e30d7198fa.tar.gz
Fix cluster flow control bug: hang with large messages (>frame-max) and low --cluster-read-max.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@743114 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/UnknownType.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp20
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionDecoder.h5
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionMap.cpp6
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionMap.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/Decoder.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/Decoder.h5
-rw-r--r--qpid/cpp/src/qpid/cluster/types.h3
-rw-r--r--qpid/cpp/src/tests/latencytest.cpp3
12 files changed, 40 insertions, 20 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h b/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h
index 1e4aa04bf4..77498871b3 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h
@@ -21,9 +21,9 @@
* under the License.
*
*/
+#include "qpid/sys/IntegerTypes.h"
#include <vector>
#include <iosfwd>
-#include <stdint.h>
namespace qpid {
namespace amqp_0_10 {
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index be04eebc57..d9a5125760 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -103,7 +103,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
"Error delivering frames",
poller),
connections(*this),
- decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)),
+ decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1), connections),
expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())),
frameId(0),
initialized(false),
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 295705e967..3f331978ca 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -62,14 +62,14 @@ NoOpConnectionOutputHandler Connection::discardHandler;
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
: cluster(c), self(myId), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), readCredit(0), expectProtocolHeader(false)
+ connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false)
{ init(); }
// Local connections
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
: cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), readCredit(0),
+ connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0),
expectProtocolHeader(isLink)
{ init(); }
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index a6e9aa65f9..4923ef2920 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -146,6 +146,8 @@ class Connection :
// Encoded queue/exchange replication.
void queue(const std::string& encoded);
void exchange(const std::string& encoded);
+
+ void giveReadCredit(int credit) { output.giveReadCredit(credit); }
private:
void init();
@@ -171,7 +173,6 @@ class Connection :
framing::SequenceNumber deliverSeq;
framing::ChannelId currentChannel;
boost::shared_ptr<broker::TxBuffer> txBuffer;
- int readCredit;
bool expectProtocolHeader;
static qpid::sys::AtomicValue<uint64_t> catchUpId;
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp b/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp
index 1500b6a743..cb958758b8 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp
+++ b/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp
@@ -21,28 +21,34 @@
#include "ConnectionDecoder.h"
#include "EventFrame.h"
+#include "ConnectionMap.h"
namespace qpid {
namespace cluster {
using namespace framing;
-ConnectionDecoder::ConnectionDecoder(const Handler& h) : handler(h), readCredit(0) {}
+ConnectionDecoder::ConnectionDecoder(const Handler& h) : handler(h) {}
-void ConnectionDecoder::decode(const EventHeader& eh, const void* data) {
+void ConnectionDecoder::decode(const EventHeader& eh, const void* data, ConnectionMap& map) {
assert(eh.getType() == DATA); // Only handle connection data events.
const char* cp = static_cast<const char*>(data);
Buffer buf(const_cast<char*>(cp), eh.getSize());
- // Set read credit on the last frame in the event.
- ++readCredit; // One credit per event = connection read buffer.
- if (decoder.decode(buf)) { // Decoded a frame
+ if (decoder.decode(buf)) { // Decoded a frame
AMQFrame frame(decoder.frame);
while (decoder.decode(buf)) {
handler(EventFrame(eh, frame));
frame = decoder.frame;
}
- handler(EventFrame(eh, frame, readCredit));
- readCredit = 0; // Reset credit for next event.
+ handler(EventFrame(eh, frame, 1)); // Set read-credit on the last frame.
+ }
+ else {
+ // We must give 1 unit read credit per event.
+ // This event does not contain any complete frames so
+ // we must give read credit directly.
+ ConnectionPtr connection = map.getLocal(eh.getConnectionId());
+ if (connection)
+ connection->giveReadCredit(1);
}
}
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h b/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h
index 5f139b23e9..449387c1cc 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h
+++ b/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h
@@ -30,6 +30,8 @@ namespace cluster {
class EventHeader;
class EventFrame;
+class ConnectionMap;
+
/**
* Decodes delivered connection data Event's as EventFrame's for a
* connection replica, local or shadow. Manages state for frame
@@ -47,12 +49,11 @@ class ConnectionDecoder
/** Takes EventHeader + data rather than Event so that the caller can
* pass a pointer to connection data or a CPG buffer directly without copy.
*/
- void decode(const EventHeader& eh, const void* data);
+ void decode(const EventHeader& eh, const void* data, ConnectionMap& connections);
private:
Handler handler;
framing::FrameDecoder decoder;
- int readCredit;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp b/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
index 1a49a4d663..ed2fa94412 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
+++ b/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
@@ -62,6 +62,12 @@ ConnectionMap::ConnectionPtr ConnectionMap::get(const ConnectionId& id) {
return i->second;
}
+ConnectionMap::ConnectionPtr ConnectionMap::getLocal(const ConnectionId& id) {
+ if (id.getMember() != cluster.getId()) return 0;
+ Map::const_iterator i = map.find(id);
+ return i == map.end() ? 0 : i->second;
+}
+
ConnectionMap::Vector ConnectionMap::values() const {
Vector result(map.size());
std::transform(map.begin(), map.end(), result.begin(),
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionMap.h b/qpid/cpp/src/qpid/cluster/ConnectionMap.h
index c5437eb84a..f8aa663339 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionMap.h
+++ b/qpid/cpp/src/qpid/cluster/ConnectionMap.h
@@ -60,6 +60,9 @@ class ConnectionMap {
*/
ConnectionPtr get(const ConnectionId& id);
+ /** If ID is a local connection and in the map return it, else return 0 */
+ ConnectionPtr getLocal(const ConnectionId& id);
+
/** Get connections for sending an update. */
Vector values() const;
diff --git a/qpid/cpp/src/qpid/cluster/Decoder.cpp b/qpid/cpp/src/qpid/cluster/Decoder.cpp
index b2ab7c8d0f..54d0224db1 100644
--- a/qpid/cpp/src/qpid/cluster/Decoder.cpp
+++ b/qpid/cpp/src/qpid/cluster/Decoder.cpp
@@ -29,12 +29,12 @@ namespace cluster {
using namespace framing;
-Decoder::Decoder(const Handler& h) : handler(h) {}
+Decoder::Decoder(const Handler& h, ConnectionMap& cm) : handler(h), connections(cm) {}
void Decoder::decode(const EventHeader& eh, const void* data) {
ConnectionId id = eh.getConnectionId();
std::pair<Map::iterator, bool> ib = map.insert(id, new ConnectionDecoder(handler));
- ptr_map_ptr(ib.first)->decode(eh, data);
+ ptr_map_ptr(ib.first)->decode(eh, data, connections);
}
void Decoder::erase(const ConnectionId& c) {
diff --git a/qpid/cpp/src/qpid/cluster/Decoder.h b/qpid/cpp/src/qpid/cluster/Decoder.h
index dffd6c8f75..50f6afa491 100644
--- a/qpid/cpp/src/qpid/cluster/Decoder.h
+++ b/qpid/cpp/src/qpid/cluster/Decoder.h
@@ -30,6 +30,7 @@ namespace qpid {
namespace cluster {
class EventHeader;
+class ConnectionMap;
/**
* Holds a map of ConnectionDecoders. Decodes Events into EventFrames
@@ -42,7 +43,7 @@ class Decoder
public:
typedef boost::function<void(const EventFrame&)> Handler;
- Decoder(const Handler& h);
+ Decoder(const Handler& h, ConnectionMap&);
/** Takes EventHeader + data rather than Event so that the caller can
* pass a pointer to connection data or a CPG buffer directly without copy.
@@ -56,7 +57,9 @@ class Decoder
typedef boost::ptr_map<ConnectionId, ConnectionDecoder> Map;
Handler handler;
Map map;
+ ConnectionMap& connections;
};
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_DECODER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h
index 5e0d3d20e3..d1d6fdc427 100644
--- a/qpid/cpp/src/qpid/cluster/types.h
+++ b/qpid/cpp/src/qpid/cluster/types.h
@@ -24,11 +24,12 @@
#include "config.h"
#include "qpid/Url.h"
+#include "qpid/sys/IntegerTypes.h"
#include <boost/intrusive_ptr.hpp>
#include <utility>
#include <iosfwd>
#include <string>
-#include <stdint.h>
+
extern "C" {
#if defined (HAVE_OPENAIS_CPG_H)
diff --git a/qpid/cpp/src/tests/latencytest.cpp b/qpid/cpp/src/tests/latencytest.cpp
index 29bafb1a68..81ac610001 100644
--- a/qpid/cpp/src/tests/latencytest.cpp
+++ b/qpid/cpp/src/tests/latencytest.cpp
@@ -223,6 +223,7 @@ Receiver::Receiver(const string& q, Stats& s) : Client(q), mgr(session), count(0
if (msgCount) {
std::cout << "Warning: found " << msgCount << " msgs on " << queue << ". Purging..." << std::endl;
session.queuePurge(arg::queue=queue);
+ session.sync();
}
SubscriptionSettings settings;
if (opts.prefetch) {
@@ -245,10 +246,8 @@ void Receiver::received(Message& msg)
{
++count;
uint64_t sentAt = msg.getDeliveryProperties().getTimestamp();
- //uint64_t sentAt = msg.getHeaders().getTimestamp("sent-at");// TODO: add support for uint64_t as a field table type
uint64_t receivedAt = current_time();
- //std::cerr << "Latency: " << (receivedAt - sentAt) << std::endl;
stats.update(((double) (receivedAt - sentAt)) / TIME_MSEC);
if (!opts.rate && count >= opts.count) {