diff options
author | Alan Conway <aconway@apache.org> | 2009-02-03 21:28:14 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-02-03 21:28:14 +0000 |
commit | 729e9ce65125154cfdd2877abc8f7a901ad7caa2 (patch) | |
tree | 433e563454062f0ab10705cb1913b0de53558168 /cpp/src/qpid/cluster/Connection.cpp | |
parent | 779753f10d0ff1295d1282e367a3973f283ab34f (diff) | |
download | qpid-python-729e9ce65125154cfdd2877abc8f7a901ad7caa2.tar.gz |
Fix for race conditions in cluster join.
- ConnectionDecoder: separated from Connection.
- cluster/PollableQueue: stop processing frames if PollableQueue is stopped.
- move state checks in event-queue handler to frame-queue handler.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@740459 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 33 |
1 files changed, 8 insertions, 25 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 9016e812be..a71950ef1d 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -75,7 +75,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, void Connection::init() { QPID_LOG(debug, cluster << " new connection: " << *this); - if (isLocal() && !isCatchUp() && cluster.getReadMax()) { + if (isLocalClient()) { + cluster.addLocalConnection(this); + if (cluster.getReadMax()) output.giveReadCredit(cluster.getReadMax()); } } @@ -99,17 +101,15 @@ void Connection::deliverDoOutput(uint32_t requested) { // Received from a directly connected client. void Connection::received(framing::AMQFrame& f) { QPID_LOG(trace, cluster << " RECV " << *this << ": " << f); - if (isLocal()) { + if (isLocal()) { // Local catch-up connection. currentChannel = f.getChannel(); if (!framing::invoke(*this, *f.getBody()).wasHandled()) connection.received(f); } - else { // Shadow or updated ex catch-up connection. + else { // Shadow or updated catch-up connection. if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { - if (isShadow()) { - QPID_LOG(debug, cluster << " inserting connection " << *this); - cluster.insert(boost::intrusive_ptr<Connection>(this)); - } + if (isShadow()) + cluster.addShadowConnection(this); AMQFrame ok((ConnectionCloseOkBody())); connection.getOutput().send(ok); output.setOutputHandler(discardHandler); @@ -136,24 +136,7 @@ bool Connection::checkUnsupported(const AMQBody& body) { return !message.empty(); } -// Decode buffer and put frames on frameq. -void Connection::deliveredEvent(const Event& e, PollableFrameQueue& frameq) { - assert(!catchUp); - Buffer buf(e); - // Set read credit on the last frame. - ++readCredit; // One credit per buffer. - if (!mcastDecoder.decode(buf)) return; - AMQFrame frame(mcastDecoder.frame); - while (mcastDecoder.decode(buf)) { - frameq.push(EventFrame(this, e, frame)); - frame = mcastDecoder.frame; - } - frameq.push(EventFrame(this, e, frame, readCredit)); - readCredit = 0; -} - - -// Delivered from cluster. +// Called in delivery thread, in cluster order. void Connection::deliveredFrame(const EventFrame& f) { assert(!catchUp); currentChannel = f.frame.getChannel(); |