summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/ConnectionDecoder.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-02-10 21:42:10 +0000
committerAlan Conway <aconway@apache.org>2009-02-10 21:42:10 +0000
commitf6f6916d3a631240f08f9d9fedf5c3b5f71883aa (patch)
tree425e4f99c4355f15ff317dbe23d32a0ebafca2e5 /cpp/src/qpid/cluster/ConnectionDecoder.cpp
parent4fb52cd93157ba10d82e656ce87051c7867e25f0 (diff)
downloadqpid-python-f6f6916d3a631240f08f9d9fedf5c3b5f71883aa.tar.gz
Fix cluster flow control bug: hang with large messages (>frame-max) and low --cluster-read-max.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@743114 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/ConnectionDecoder.cpp')
-rw-r--r--cpp/src/qpid/cluster/ConnectionDecoder.cpp20
1 files changed, 13 insertions, 7 deletions
diff --git a/cpp/src/qpid/cluster/ConnectionDecoder.cpp b/cpp/src/qpid/cluster/ConnectionDecoder.cpp
index 1500b6a743..cb958758b8 100644
--- a/cpp/src/qpid/cluster/ConnectionDecoder.cpp
+++ b/cpp/src/qpid/cluster/ConnectionDecoder.cpp
@@ -21,28 +21,34 @@
#include "ConnectionDecoder.h"
#include "EventFrame.h"
+#include "ConnectionMap.h"
namespace qpid {
namespace cluster {
using namespace framing;
-ConnectionDecoder::ConnectionDecoder(const Handler& h) : handler(h), readCredit(0) {}
+ConnectionDecoder::ConnectionDecoder(const Handler& h) : handler(h) {}
-void ConnectionDecoder::decode(const EventHeader& eh, const void* data) {
+void ConnectionDecoder::decode(const EventHeader& eh, const void* data, ConnectionMap& map) {
assert(eh.getType() == DATA); // Only handle connection data events.
const char* cp = static_cast<const char*>(data);
Buffer buf(const_cast<char*>(cp), eh.getSize());
- // Set read credit on the last frame in the event.
- ++readCredit; // One credit per event = connection read buffer.
- if (decoder.decode(buf)) { // Decoded a frame
+ if (decoder.decode(buf)) { // Decoded a frame
AMQFrame frame(decoder.frame);
while (decoder.decode(buf)) {
handler(EventFrame(eh, frame));
frame = decoder.frame;
}
- handler(EventFrame(eh, frame, readCredit));
- readCredit = 0; // Reset credit for next event.
+ handler(EventFrame(eh, frame, 1)); // Set read-credit on the last frame.
+ }
+ else {
+ // We must give 1 unit read credit per event.
+ // This event does not contain any complete frames so
+ // we must give read credit directly.
+ ConnectionPtr connection = map.getLocal(eh.getConnectionId());
+ if (connection)
+ connection->giveReadCredit(1);
}
}