diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 45 |
1 files changed, 27 insertions, 18 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index cdee87dfcd..a255acfc1f 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -62,14 +62,14 @@ NoOpConnectionOutputHandler Connection::discardHandler; Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, ConnectionId myId) : cluster(c), self(myId), catchUp(false), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId) + connection(&output, cluster.getBroker(), wrappedId), readCredit(0) { init(); } // Local connections Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, MemberId myId, bool isCatchUp) : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId) + connection(&output, cluster.getBroker(), wrappedId), readCredit(0) { init(); } void Connection::init() { @@ -135,17 +135,35 @@ bool Connection::checkUnsupported(const AMQBody& body) { return !message.empty(); } +// Decode buffer and put frames on frameq. +void Connection::deliveredEvent(const Event& e, EventFrameQueue& frameq) { + assert(!catchUp); + Buffer buf(e); + // Set read credit on the last frame. + ++readCredit; // One credit per buffer. + if (!mcastDecoder.decode(buf)) return; + AMQFrame frame(mcastDecoder.frame); + while (mcastDecoder.decode(buf)) { + frameq.push(EventFrame(this, getId().getMember(), frame)); + frame = mcastDecoder.frame; + } + frameq.push(EventFrame(this, getId().getMember(), frame, readCredit)); + readCredit = 0; +} + + // Delivered from cluster. -void Connection::delivered(framing::AMQFrame& f) { - QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f); - QPID_LATENCY_INIT(f); +void Connection::deliveredFrame(const EventFrame& f) { + QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f.frame); assert(!catchUp); - currentChannel = f.getChannel(); - if (!framing::invoke(*this, *f.getBody()).wasHandled() // Connection contol. - && !checkUnsupported(*f.getBody())) // Unsupported operation. + currentChannel = f.frame.getChannel(); + if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. + && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. { - connection.received(f); // Pass to broker connection. + connection.received(const_cast<AMQFrame&>(f.frame)); // Pass to broker connection. } + if (cluster.getReadMax() && f.readCredit) + output.giveReadCredit(f.readCredit); } // A local connection is closed by the network layer. @@ -200,15 +218,6 @@ size_t Connection::decode(const char* buffer, size_t size) { return size; } -void Connection::deliverBuffer(Buffer& buf) { - assert(!catchUp); - ++deliverSeq; - while (mcastDecoder.decode(buf)) - delivered(mcastDecoder.frame); - if (cluster.getReadMax()) - output.giveReadCredit(1); -} - broker::SessionState& Connection::sessionState() { return *connection.getChannel(currentChannel).getSession(); } |