diff options
author | Alan Conway <aconway@apache.org> | 2012-10-12 18:38:53 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-10-12 18:38:53 +0000 |
commit | d2f7c788bd3cb2910b8ead1da6643b30367d0569 (patch) | |
tree | 8c043a8ce41d251974af43ffc03a35bf172aed4c /cpp | |
parent | d8fa6da3799f8dcf17aa224f46a7c840f0f884d4 (diff) | |
download | qpid-python-d2f7c788bd3cb2910b8ead1da6643b30367d0569.tar.gz |
QPID-4369: HA backup brokers core dump in benchmark test.
Was seeing core dumps with QueueReplicator::queue == 0. Caused by race
conditions when calling QueueReplicator::deactivate. Renamed deactivate to
destroy and call it only when the broker::Queue is destroyed.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1397676 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 33 | ||||
-rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/ha/StatusCheck.cpp | 2 | ||||
-rwxr-xr-x | cpp/src/tests/qpid-cluster-benchmark | 2 |
5 files changed, 26 insertions, 24 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index c9b9664821..48d5b71134 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -277,14 +277,7 @@ void collectQueueReplicators( } } // namespace -void BrokerReplicator::shutdown() { - QPID_LOG(debug, logPrefix << "BrokerReplicator shutting down."); - set<boost::shared_ptr<QueueReplicator> > collect; - broker.getExchanges().eachExchange( - boost::bind(&collectQueueReplicators, _1, boost::ref(collect))); - for_each(collect.begin(), collect.end(), - boost::bind(&QueueReplicator::deactivate, _1)); -} +void BrokerReplicator::shutdown() {} // This is called in the connection IO thread when the bridge is started. void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { @@ -672,8 +665,6 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator( } void BrokerReplicator::deleteQueue(const std::string& name, bool purge) { - boost::shared_ptr<QueueReplicator> qr(findQueueReplicator(name)); - if (qr) qr->deactivate(); Queue::shared_ptr queue = broker.getQueues().find(name); if (queue) { // Purge before deleting to ensure that we don't reroute any diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index 5b9993bd90..6d30a5c10c 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -47,6 +47,7 @@ namespace ha { using namespace broker; using namespace framing; using namespace std; +using sys::Mutex; const std::string QPID_HA_EVENT_PREFIX("qpid.ha-"); const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue"); @@ -94,7 +95,8 @@ class QueueReplicator::QueueObserver : public broker::QueueObserver { void requeued(const Message&) {} void consumerAdded( const Consumer& ) {} void consumerRemoved( const Consumer& ) {} - void destroy() { queueReplicator->deactivate(); } + // Queue observer is destroyed when the queue is. + void destroy() { queueReplicator->destroy(); } private: boost::shared_ptr<QueueReplicator> queueReplicator; }; @@ -115,7 +117,8 @@ QueueReplicator::QueueReplicator(HaBroker& hb, // This must be separate from the constructor so we can call shared_from_this. void QueueReplicator::activate() { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed std::pair<Bridge::shared_ptr, bool> result = queue->getBroker()->getLinks().declare( bridgeName, @@ -141,12 +144,14 @@ void QueueReplicator::activate() { queue->addObserver(observer); } -QueueReplicator::~QueueReplicator() { deactivate(); } +QueueReplicator::~QueueReplicator() {} -void QueueReplicator::deactivate() { - QPID_LOG(debug, logPrefix << "Deactivated"); - sys::Mutex::ScopedLock l(lock); - if (bridge) bridge->close(); +void QueueReplicator::destroy() { + // Called from Queue::destroyed() + Mutex::ScopedLock l(lock); + if (!bridge) return; + QPID_LOG(debug, logPrefix << "Destroyed."); + bridge->close(); // Need to drop shared pointers to avoid pointer cycles keeping this in memory. queue.reset(); link.reset(); @@ -156,7 +161,8 @@ void QueueReplicator::deactivate() { // Called in a broker connection thread when the bridge is created. void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); FieldTable settings; @@ -197,7 +203,13 @@ template <class T> T decodeContent(Message& m) { } } -void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) { +void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) { + boost::shared_ptr<Queue> q; + { + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed + q = queue; + } // Thread safe: only calls thread safe Queue functions. queue->dequeueMessageAt(n); } @@ -218,7 +230,8 @@ void QueueReplicator::route(Deliverable& msg) { try { const std::string& key = msg.getMessage().getRoutingKey(); - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed if (!isEventKey(key)) { msg.deliverTo(queue); // We are on a backup so the queue is not modified except via this. diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h index b302162286..5fdc022cb1 100644 --- a/cpp/src/qpid/ha/QueueReplicator.h +++ b/cpp/src/qpid/ha/QueueReplicator.h @@ -70,7 +70,6 @@ class QueueReplicator : public broker::Exchange, ~QueueReplicator(); void activate(); // Call after ctor - void deactivate(); // Call before dtor std::string getType() const; bool bind(boost::shared_ptr<broker::Queue @@ -90,6 +89,7 @@ class QueueReplicator : public broker::Exchange, class QueueObserver; void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); + void destroy(); // Called when the queue is destroyed. void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&); HaBroker& haBroker; diff --git a/cpp/src/qpid/ha/StatusCheck.cpp b/cpp/src/qpid/ha/StatusCheck.cpp index e4597a5a45..01fceb7783 100644 --- a/cpp/src/qpid/ha/StatusCheck.cpp +++ b/cpp/src/qpid/ha/StatusCheck.cpp @@ -93,7 +93,7 @@ void StatusCheckThread::run() { QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status); } } catch(const exception& error) { - QPID_LOG(warning, "Error checking status of " << url << ": " << error.what()); + QPID_LOG(info, "Checking status of " << url << ": " << error.what()); } delete this; } diff --git a/cpp/src/tests/qpid-cluster-benchmark b/cpp/src/tests/qpid-cluster-benchmark index 610beacebd..3e6b805692 100755 --- a/cpp/src/tests/qpid-cluster-benchmark +++ b/cpp/src/tests/qpid-cluster-benchmark @@ -55,12 +55,10 @@ done shift $(($OPTIND-1)) CONNECTION_OPTIONS="--connection-options {tcp-nodelay:$TCP_NODELAY,reconnect:$RECONNECT,heartbeat:$HEARTBEAT}" -CREATE_OPTIONS="node:{x-declare:{arguments:{'qpid.replicate':all}}}" BROKER=$(echo $BROKERS | sed s/,.*//) run_test() { echo $*; shift; "$@"; echo; echo; echo; } OPTS="$OPTS $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $CONNECTION_OPTIONS $NO_DELETE" -OPTS="$OPTS --create-option $CREATE_OPTIONS" run_test "Benchmark:" qpid-cpp-benchmark $OPTS "$@" |