summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-04-13 14:06:48 +0000
committerAlan Conway <aconway@apache.org>2011-04-13 14:06:48 +0000
commit3ff288653b2a0bde0849a0a0438a2bc6b7ad00b2 (patch)
treef4a6282fa2973db3bf632f822b192bd761dfa224
parentf914dd107474b33fd900f9e7008d5db27f3563dc (diff)
downloadqpid-python-3ff288653b2a0bde0849a0a0438a2bc6b7ad00b2.tar.gz
QPID-3202: Clustered brokers shut down with "unknown connection" error.
This error is an assertion recently introduced in r1091097, the bug has probably been there for a while. Two fixes: - Connections that close before they are announced no longer send a deliver-close. - Fixed race in OutputInterceptor that sent a deliver-do-output after deliver-close. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1091790 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp12
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp22
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.h10
4 files changed, 27 insertions, 18 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index c1304c2b75..f2ea466a9b 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -78,7 +78,7 @@ const std::string shadowPrefix("[shadow]");
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& mgmtId,
const ConnectionId& id, const qpid::sys::SecuritySettings& external)
- : cluster(c), self(id), catchUp(false), output(*this, out),
+ : cluster(c), self(id), catchUp(false), announced(false), output(*this, out),
connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true),
expectProtocolHeader(false),
mcastFrameHandler(cluster.getMulticast(), self),
@@ -90,7 +90,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& mgmtId, MemberId member,
bool isCatchUp, bool isLink, const qpid::sys::SecuritySettings& external
-) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
+) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), announced(false), output(*this, out),
connectionCtor(&output, cluster.getBroker(),
mgmtId,
external,
@@ -255,7 +255,7 @@ void Connection::deliveredFrame(const EventFrame& f) {
}
}
-// A local connection is closed by the network layer.
+// A local connection is closed by the network layer. Called in the connection thread.
void Connection::closed() {
try {
if (isUpdated()) {
@@ -272,8 +272,9 @@ void Connection::closed() {
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
output.closeOutput();
- cluster.getMulticast().mcastControl(
- ClusterConnectionDeliverCloseBody(), self);
+ if (announced)
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionDeliverCloseBody(), self);
}
}
catch (const std::exception& e) {
@@ -384,6 +385,7 @@ void Connection::processInitialFrames(const char*& ptr, size_t size) {
connection->getUserId(),
initialFrames),
getId());
+ announced = true;
initialFrames.clear();
}
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 8a9891deb6..a4436e84a8 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -250,6 +250,7 @@ class Connection :
Cluster& cluster;
ConnectionId self;
bool catchUp;
+ bool announced;
OutputInterceptor output;
framing::FrameDecoder localDecoder;
ConnectionCtor connectionCtor;
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 13e95d1ec2..4bf03eefa2 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -45,12 +45,11 @@ void OutputInterceptor::send(framing::AMQFrame& f) {
}
void OutputInterceptor::activateOutput() {
- if (parent.isCatchUp()) {
- sys::Mutex::ScopedLock l(lock);
+ sys::Mutex::ScopedLock l(lock);
+ if (parent.isCatchUp())
next->activateOutput();
- }
else
- sendDoOutput(sendMax);
+ sendDoOutput(sendMax, l);
}
void OutputInterceptor::abort() {
@@ -75,23 +74,29 @@ bool OutputInterceptor::doOutput() {
// Send output up to limit, calculate new limit.
void OutputInterceptor::deliverDoOutput(uint32_t limit) {
+ sys::Mutex::ScopedLock l(lock);
sentDoOutput = false;
sendMax = limit;
size_t newLimit = limit;
if (parent.isLocal()) {
- size_t buffered = getBuffered();
+ size_t buffered = next->getBuffered();
if (buffered == 0 && sent == sendMax) // Could have sent more, increase the limit.
newLimit = sendMax*2;
else if (buffered > 0 && sent > 1) // Data left unsent, reduce the limit.
newLimit = (sendMax + sent) / 2;
}
sent = 0;
- while (sent < limit && parent.getBrokerConnection()->doOutput())
+ while (sent < limit) {
+ {
+ sys::Mutex::ScopedUnlock u(lock);
+ if (!parent.getBrokerConnection()->doOutput()) break;
+ }
++sent;
- if (sent == limit) sendDoOutput(newLimit);
+ }
+ if (sent == limit) sendDoOutput(newLimit, l);
}
-void OutputInterceptor::sendDoOutput(size_t newLimit) {
+void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&) {
if (parent.isLocal() && !sentDoOutput && !closing) {
sentDoOutput = true;
parent.getCluster().getMulticast().mcastControl(
@@ -100,6 +105,7 @@ void OutputInterceptor::sendDoOutput(size_t newLimit) {
}
}
+// Called in connection thread when local connection closes.
void OutputInterceptor::closeOutput() {
sys::Mutex::ScopedLock l(lock);
closing = true;
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
index 65bd82a4fc..3abf5273a0 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -58,13 +58,13 @@ class OutputInterceptor : public sys::ConnectionOutputHandler {
uint32_t getSendMax() const { return sendMax; }
void setSendMax(uint32_t sendMax_) { sendMax=sendMax_; }
-
+
cluster::Connection& parent;
-
+
private:
typedef sys::Mutex::ScopedLock Locker;
- void sendDoOutput(size_t newLimit);
+ void sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&);
mutable sys::Mutex lock;
bool closing;