summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-10-12 18:38:53 +0000
committerAlan Conway <aconway@apache.org>2012-10-12 18:38:53 +0000
commitd2f7c788bd3cb2910b8ead1da6643b30367d0569 (patch)
tree8c043a8ce41d251974af43ffc03a35bf172aed4c
parentd8fa6da3799f8dcf17aa224f46a7c840f0f884d4 (diff)
downloadqpid-python-d2f7c788bd3cb2910b8ead1da6643b30367d0569.tar.gz
QPID-4369: HA backup brokers core dump in benchmark test.
Was seeing core dumps with QueueReplicator::queue == 0. Caused by race conditions when calling QueueReplicator::deactivate. Renamed deactivate to destroy and call it only when the broker::Queue is destroyed. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1397676 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp11
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp33
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.h2
-rw-r--r--cpp/src/qpid/ha/StatusCheck.cpp2
-rwxr-xr-xcpp/src/tests/qpid-cluster-benchmark2
5 files changed, 26 insertions, 24 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index c9b9664821..48d5b71134 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -277,14 +277,7 @@ void collectQueueReplicators(
}
} // namespace
-void BrokerReplicator::shutdown() {
- QPID_LOG(debug, logPrefix << "BrokerReplicator shutting down.");
- set<boost::shared_ptr<QueueReplicator> > collect;
- broker.getExchanges().eachExchange(
- boost::bind(&collectQueueReplicators, _1, boost::ref(collect)));
- for_each(collect.begin(), collect.end(),
- boost::bind(&QueueReplicator::deactivate, _1));
-}
+void BrokerReplicator::shutdown() {}
// This is called in the connection IO thread when the bridge is started.
void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
@@ -672,8 +665,6 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
}
void BrokerReplicator::deleteQueue(const std::string& name, bool purge) {
- boost::shared_ptr<QueueReplicator> qr(findQueueReplicator(name));
- if (qr) qr->deactivate();
Queue::shared_ptr queue = broker.getQueues().find(name);
if (queue) {
// Purge before deleting to ensure that we don't reroute any
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index 5b9993bd90..6d30a5c10c 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -47,6 +47,7 @@ namespace ha {
using namespace broker;
using namespace framing;
using namespace std;
+using sys::Mutex;
const std::string QPID_HA_EVENT_PREFIX("qpid.ha-");
const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue");
@@ -94,7 +95,8 @@ class QueueReplicator::QueueObserver : public broker::QueueObserver {
void requeued(const Message&) {}
void consumerAdded( const Consumer& ) {}
void consumerRemoved( const Consumer& ) {}
- void destroy() { queueReplicator->deactivate(); }
+ // Queue observer is destroyed when the queue is.
+ void destroy() { queueReplicator->destroy(); }
private:
boost::shared_ptr<QueueReplicator> queueReplicator;
};
@@ -115,7 +117,8 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
// This must be separate from the constructor so we can call shared_from_this.
void QueueReplicator::activate() {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ if (!queue) return; // Already destroyed
std::pair<Bridge::shared_ptr, bool> result =
queue->getBroker()->getLinks().declare(
bridgeName,
@@ -141,12 +144,14 @@ void QueueReplicator::activate() {
queue->addObserver(observer);
}
-QueueReplicator::~QueueReplicator() { deactivate(); }
+QueueReplicator::~QueueReplicator() {}
-void QueueReplicator::deactivate() {
- QPID_LOG(debug, logPrefix << "Deactivated");
- sys::Mutex::ScopedLock l(lock);
- if (bridge) bridge->close();
+void QueueReplicator::destroy() {
+ // Called from Queue::destroyed()
+ Mutex::ScopedLock l(lock);
+ if (!bridge) return;
+ QPID_LOG(debug, logPrefix << "Destroyed.");
+ bridge->close();
// Need to drop shared pointers to avoid pointer cycles keeping this in memory.
queue.reset();
link.reset();
@@ -156,7 +161,8 @@ void QueueReplicator::deactivate() {
// Called in a broker connection thread when the bridge is created.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ if (!queue) return; // Already destroyed
AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
FieldTable settings;
@@ -197,7 +203,13 @@ template <class T> T decodeContent(Message& m) {
}
}
-void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) {
+void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) {
+ boost::shared_ptr<Queue> q;
+ {
+ Mutex::ScopedLock l(lock);
+ if (!queue) return; // Already destroyed
+ q = queue;
+ }
// Thread safe: only calls thread safe Queue functions.
queue->dequeueMessageAt(n);
}
@@ -218,7 +230,8 @@ void QueueReplicator::route(Deliverable& msg)
{
try {
const std::string& key = msg.getMessage().getRoutingKey();
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ if (!queue) return; // Already destroyed
if (!isEventKey(key)) {
msg.deliverTo(queue);
// We are on a backup so the queue is not modified except via this.
diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h
index b302162286..5fdc022cb1 100644
--- a/cpp/src/qpid/ha/QueueReplicator.h
+++ b/cpp/src/qpid/ha/QueueReplicator.h
@@ -70,7 +70,6 @@ class QueueReplicator : public broker::Exchange,
~QueueReplicator();
void activate(); // Call after ctor
- void deactivate(); // Call before dtor
std::string getType() const;
bool bind(boost::shared_ptr<broker::Queue
@@ -90,6 +89,7 @@ class QueueReplicator : public broker::Exchange,
class QueueObserver;
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
+ void destroy(); // Called when the queue is destroyed.
void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&);
HaBroker& haBroker;
diff --git a/cpp/src/qpid/ha/StatusCheck.cpp b/cpp/src/qpid/ha/StatusCheck.cpp
index e4597a5a45..01fceb7783 100644
--- a/cpp/src/qpid/ha/StatusCheck.cpp
+++ b/cpp/src/qpid/ha/StatusCheck.cpp
@@ -93,7 +93,7 @@ void StatusCheckThread::run() {
QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status);
}
} catch(const exception& error) {
- QPID_LOG(warning, "Error checking status of " << url << ": " << error.what());
+ QPID_LOG(info, "Checking status of " << url << ": " << error.what());
}
delete this;
}
diff --git a/cpp/src/tests/qpid-cluster-benchmark b/cpp/src/tests/qpid-cluster-benchmark
index 610beacebd..3e6b805692 100755
--- a/cpp/src/tests/qpid-cluster-benchmark
+++ b/cpp/src/tests/qpid-cluster-benchmark
@@ -55,12 +55,10 @@ done
shift $(($OPTIND-1))
CONNECTION_OPTIONS="--connection-options {tcp-nodelay:$TCP_NODELAY,reconnect:$RECONNECT,heartbeat:$HEARTBEAT}"
-CREATE_OPTIONS="node:{x-declare:{arguments:{'qpid.replicate':all}}}"
BROKER=$(echo $BROKERS | sed s/,.*//)
run_test() { echo $*; shift; "$@"; echo; echo; echo; }
OPTS="$OPTS $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $CONNECTION_OPTIONS $NO_DELETE"
-OPTS="$OPTS --create-option $CREATE_OPTIONS"
run_test "Benchmark:" qpid-cpp-benchmark $OPTS "$@"