summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-02-03 21:28:14 +0000
committerAlan Conway <aconway@apache.org>2009-02-03 21:28:14 +0000
commit729e9ce65125154cfdd2877abc8f7a901ad7caa2 (patch)
tree433e563454062f0ab10705cb1913b0de53558168 /cpp/src/qpid/cluster/Connection.cpp
parent779753f10d0ff1295d1282e367a3973f283ab34f (diff)
downloadqpid-python-729e9ce65125154cfdd2877abc8f7a901ad7caa2.tar.gz
Fix for race conditions in cluster join.
- ConnectionDecoder: separated from Connection. - cluster/PollableQueue: stop processing frames if PollableQueue is stopped. - move state checks in event-queue handler to frame-queue handler. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@740459 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp33
1 files changed, 8 insertions, 25 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 9016e812be..a71950ef1d 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -75,7 +75,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
void Connection::init() {
QPID_LOG(debug, cluster << " new connection: " << *this);
- if (isLocal() && !isCatchUp() && cluster.getReadMax()) {
+ if (isLocalClient()) {
+ cluster.addLocalConnection(this);
+ if (cluster.getReadMax())
output.giveReadCredit(cluster.getReadMax());
}
}
@@ -99,17 +101,15 @@ void Connection::deliverDoOutput(uint32_t requested) {
// Received from a directly connected client.
void Connection::received(framing::AMQFrame& f) {
QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
- if (isLocal()) {
+ if (isLocal()) { // Local catch-up connection.
currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled())
connection.received(f);
}
- else { // Shadow or updated ex catch-up connection.
+ else { // Shadow or updated catch-up connection.
if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
- if (isShadow()) {
- QPID_LOG(debug, cluster << " inserting connection " << *this);
- cluster.insert(boost::intrusive_ptr<Connection>(this));
- }
+ if (isShadow())
+ cluster.addShadowConnection(this);
AMQFrame ok((ConnectionCloseOkBody()));
connection.getOutput().send(ok);
output.setOutputHandler(discardHandler);
@@ -136,24 +136,7 @@ bool Connection::checkUnsupported(const AMQBody& body) {
return !message.empty();
}
-// Decode buffer and put frames on frameq.
-void Connection::deliveredEvent(const Event& e, PollableFrameQueue& frameq) {
- assert(!catchUp);
- Buffer buf(e);
- // Set read credit on the last frame.
- ++readCredit; // One credit per buffer.
- if (!mcastDecoder.decode(buf)) return;
- AMQFrame frame(mcastDecoder.frame);
- while (mcastDecoder.decode(buf)) {
- frameq.push(EventFrame(this, e, frame));
- frame = mcastDecoder.frame;
- }
- frameq.push(EventFrame(this, e, frame, readCredit));
- readCredit = 0;
-}
-
-
-// Delivered from cluster.
+// Called in delivery thread, in cluster order.
void Connection::deliveredFrame(const EventFrame& f) {
assert(!catchUp);
currentChannel = f.frame.getChannel();