diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 33 |
1 files changed, 8 insertions, 25 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 9016e812be..a71950ef1d 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -75,7 +75,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, void Connection::init() { QPID_LOG(debug, cluster << " new connection: " << *this); - if (isLocal() && !isCatchUp() && cluster.getReadMax()) { + if (isLocalClient()) { + cluster.addLocalConnection(this); + if (cluster.getReadMax()) output.giveReadCredit(cluster.getReadMax()); } } @@ -99,17 +101,15 @@ void Connection::deliverDoOutput(uint32_t requested) { // Received from a directly connected client. void Connection::received(framing::AMQFrame& f) { QPID_LOG(trace, cluster << " RECV " << *this << ": " << f); - if (isLocal()) { + if (isLocal()) { // Local catch-up connection. currentChannel = f.getChannel(); if (!framing::invoke(*this, *f.getBody()).wasHandled()) connection.received(f); } - else { // Shadow or updated ex catch-up connection. + else { // Shadow or updated catch-up connection. if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { - if (isShadow()) { - QPID_LOG(debug, cluster << " inserting connection " << *this); - cluster.insert(boost::intrusive_ptr<Connection>(this)); - } + if (isShadow()) + cluster.addShadowConnection(this); AMQFrame ok((ConnectionCloseOkBody())); connection.getOutput().send(ok); output.setOutputHandler(discardHandler); @@ -136,24 +136,7 @@ bool Connection::checkUnsupported(const AMQBody& body) { return !message.empty(); } -// Decode buffer and put frames on frameq. -void Connection::deliveredEvent(const Event& e, PollableFrameQueue& frameq) { - assert(!catchUp); - Buffer buf(e); - // Set read credit on the last frame. - ++readCredit; // One credit per buffer. - if (!mcastDecoder.decode(buf)) return; - AMQFrame frame(mcastDecoder.frame); - while (mcastDecoder.decode(buf)) { - frameq.push(EventFrame(this, e, frame)); - frame = mcastDecoder.frame; - } - frameq.push(EventFrame(this, e, frame, readCredit)); - readCredit = 0; -} - - -// Delivered from cluster. +// Called in delivery thread, in cluster order. void Connection::deliveredFrame(const EventFrame& f) { assert(!catchUp); currentChannel = f.frame.getChannel(); |