diff options
author | Alan Conway <aconway@apache.org> | 2009-02-17 20:18:38 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-02-17 20:18:38 +0000 |
commit | 99d4449d13756dc8cbf403dd61b2b08cda655cb7 (patch) | |
tree | 442d51a588c69c81f00a3901b9778076ddf39db6 | |
parent | 93c5936810bf818cb6f6fcd8bed046135e21bf20 (diff) | |
download | qpid-python-99d4449d13756dc8cbf403dd61b2b08cda655cb7.tar.gz |
Minor fixes.
client/SubscriptionManager: made it thread safe, was causing latencytest to crash with --rate and --time-limit.
cluster/Cluster.cpp: don't call cpg_leave during shutdown. Not required and a problem if shutdown was caused by a cpg error.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@745226 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/client/SubscriptionManager.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SubscriptionManager.h | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 5 |
3 files changed, 11 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/client/SubscriptionManager.cpp b/qpid/cpp/src/qpid/client/SubscriptionManager.cpp index 1d1d23056e..b016109ead 100644 --- a/qpid/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/qpid/cpp/src/qpid/client/SubscriptionManager.cpp @@ -41,6 +41,7 @@ SubscriptionManager::SubscriptionManager(const Session& s) Subscription SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n) { + sys::Mutex::ScopedLock l(lock); std::string name=n.empty() ? q:n; boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, &listener); dispatcher.listen(si); @@ -52,6 +53,7 @@ Subscription SubscriptionManager::subscribe( Subscription SubscriptionManager::subscribe( LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n) { + sys::Mutex::ScopedLock l(lock); std::string name=n.empty() ? q:n; boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0); lq.queue=si->divert(); @@ -74,13 +76,14 @@ Subscription SubscriptionManager::subscribe( void SubscriptionManager::cancel(const std::string& dest) { + sys::Mutex::ScopedLock l(lock); std::map<std::string, Subscription>::iterator i = subscriptions.find(dest); if (i != subscriptions.end()) { sync(session).messageCancel(dest); dispatcher.cancel(dest); Subscription s = i->second; - if (s.isValid()) subscriptions[dest].impl->cancelDiversion(); - subscriptions.erase(dest); + if (s.isValid()) s.impl->cancelDiversion(); + subscriptions.erase(i); } } @@ -131,6 +134,7 @@ Message SubscriptionManager::get(const std::string& queue, sys::Duration timeout Session SubscriptionManager::getSession() const { return session; } Subscription SubscriptionManager::getSubscription(const std::string& name) const { + sys::Mutex::ScopedLock l(lock); std::map<std::string, Subscription>::const_iterator i = subscriptions.find(name); if (i == subscriptions.end()) throw Exception(QPID_MSG("Subscription not found: " << name)); diff --git a/qpid/cpp/src/qpid/client/SubscriptionManager.h b/qpid/cpp/src/qpid/client/SubscriptionManager.h index 89823a11bc..6b45092931 100644 --- a/qpid/cpp/src/qpid/client/SubscriptionManager.h +++ b/qpid/cpp/src/qpid/client/SubscriptionManager.h @@ -95,14 +95,6 @@ namespace client { */ class SubscriptionManager : public sys::Runnable { - typedef sys::Mutex::ScopedLock Lock; - typedef sys::Mutex::ScopedUnlock Unlock; - - qpid::client::Dispatcher dispatcher; - qpid::client::AsyncSession session; - bool autoStop; - SubscriptionSettings defaultSettings; - public: /** Create a new SubscriptionManager associated with a session */ SubscriptionManager(const Session& session); @@ -271,6 +263,11 @@ class SubscriptionManager : public sys::Runnable Session getSession() const; private: + mutable sys::Mutex lock; + qpid::client::Dispatcher dispatcher; + qpid::client::AsyncSession session; + bool autoStop; + SubscriptionSettings defaultSettings; std::map<std::string, Subscription> subscriptions; }; diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index f845492dbc..6221b0054c 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -193,10 +193,6 @@ void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); - try { cpg.leave(); } - catch (const std::exception& e) { - QPID_LOG(critical, *this << " error leaving process group: " << e.what()); - } connections.clear(); try { broker.shutdown(); } catch (const std::exception& e) { @@ -371,7 +367,6 @@ void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { // callbacks will be invoked. // void Cluster::brokerShutdown() { - QPID_LOG(notice, *this << " shutting down "); if (state != LEFT) { try { cpg.shutdown(); } catch (const std::exception& e) { |