summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp45
1 files changed, 27 insertions, 18 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index cdee87dfcd..a255acfc1f 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -62,14 +62,14 @@ NoOpConnectionOutputHandler Connection::discardHandler;
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
: cluster(c), self(myId), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId)
+ connection(&output, cluster.getBroker(), wrappedId), readCredit(0)
{ init(); }
// Local connections
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, MemberId myId, bool isCatchUp)
: cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId)
+ connection(&output, cluster.getBroker(), wrappedId), readCredit(0)
{ init(); }
void Connection::init() {
@@ -135,17 +135,35 @@ bool Connection::checkUnsupported(const AMQBody& body) {
return !message.empty();
}
+// Decode buffer and put frames on frameq.
+void Connection::deliveredEvent(const Event& e, EventFrameQueue& frameq) {
+ assert(!catchUp);
+ Buffer buf(e);
+ // Set read credit on the last frame.
+ ++readCredit; // One credit per buffer.
+ if (!mcastDecoder.decode(buf)) return;
+ AMQFrame frame(mcastDecoder.frame);
+ while (mcastDecoder.decode(buf)) {
+ frameq.push(EventFrame(this, getId().getMember(), frame));
+ frame = mcastDecoder.frame;
+ }
+ frameq.push(EventFrame(this, getId().getMember(), frame, readCredit));
+ readCredit = 0;
+}
+
+
// Delivered from cluster.
-void Connection::delivered(framing::AMQFrame& f) {
- QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f);
- QPID_LATENCY_INIT(f);
+void Connection::deliveredFrame(const EventFrame& f) {
+ QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f.frame);
assert(!catchUp);
- currentChannel = f.getChannel();
- if (!framing::invoke(*this, *f.getBody()).wasHandled() // Connection contol.
- && !checkUnsupported(*f.getBody())) // Unsupported operation.
+ currentChannel = f.frame.getChannel();
+ if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
+ && !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
{
- connection.received(f); // Pass to broker connection.
+ connection.received(const_cast<AMQFrame&>(f.frame)); // Pass to broker connection.
}
+ if (cluster.getReadMax() && f.readCredit)
+ output.giveReadCredit(f.readCredit);
}
// A local connection is closed by the network layer.
@@ -200,15 +218,6 @@ size_t Connection::decode(const char* buffer, size_t size) {
return size;
}
-void Connection::deliverBuffer(Buffer& buf) {
- assert(!catchUp);
- ++deliverSeq;
- while (mcastDecoder.decode(buf))
- delivered(mcastDecoder.frame);
- if (cluster.getReadMax())
- output.giveReadCredit(1);
-}
-
broker::SessionState& Connection::sessionState() {
return *connection.getChannel(currentChannel).getSession();
}