diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2007-07-05 16:19:05 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2007-07-05 16:19:05 +0000 |
commit | a48c9eddce4cbc56521fca7fd64582d9dafe1d40 (patch) | |
tree | 923b6677edb1754a4a9a89c5ff0b9e2a8cc28439 /cpp/src | |
parent | 18fc5427470dbe4bfd0e9927beb7b0cc5fe01bfc (diff) | |
download | qpid-python-a48c9eddce4cbc56521fca7fd64582d9dafe1d40.tar.gz |
- Added RW lock
- Updated all exchanges to us RW lock
- Updated all registries to us RW lock
- Still need to do (client, channel, message and queues)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@553549 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/DirectExchange.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DirectExchange.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ExchangeRegistry.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ExchangeRegistry.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TopicExchange.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Mutex.h | 20 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/Mutex.h | 84 |
14 files changed, 124 insertions, 30 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 7b516414c0..87f363ff3d 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -30,7 +30,7 @@ DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {} DirectExchange::DirectExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {} bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){ - Mutex::ScopedLock l(lock); + RWlock::ScopedWlock l(lock); std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); if (i == queues.end()) { @@ -42,7 +42,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con } bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ - Mutex::ScopedLock l(lock); + RWlock::ScopedWlock l(lock); std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); @@ -58,7 +58,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c } void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ - Mutex::ScopedLock l(lock); + RWlock::ScopedRlock l(lock); std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); int count(0); for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){ diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index 059f69a9ad..554be295bf 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -35,7 +35,7 @@ namespace broker { typedef std::vector<Queue::shared_ptr> Queues; typedef std::map<string, Queues > Bindings; Bindings bindings; - qpid::sys::Mutex lock; + qpid::sys::RWlock lock; public: static const std::string typeName; diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index 9555fa43a4..732d45dc44 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -38,7 +38,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type, bool durable, const FieldTable& args) throw(UnknownExchangeTypeException){ - Mutex::ScopedLock locker(lock); + RWlock::ScopedWlock locker(lock); ExchangeMap::iterator i = exchanges.find(name); if (i == exchanges.end()) { Exchange::shared_ptr exchange; @@ -62,7 +62,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c } void ExchangeRegistry::destroy(const string& name){ - Mutex::ScopedLock locker(lock); + RWlock::ScopedWlock locker(lock); ExchangeMap::iterator i = exchanges.find(name); if (i != exchanges.end()) { exchanges.erase(i); @@ -70,7 +70,7 @@ void ExchangeRegistry::destroy(const string& name){ } Exchange::shared_ptr ExchangeRegistry::get(const string& name){ - Mutex::ScopedLock locker(lock); + RWlock::ScopedRlock locker(lock); ExchangeMap::iterator i = exchanges.find(name); if (i == exchanges.end()) throw ChannelException(404, "Exchange not found:" + name); diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h index 59fe51691b..5505a4074a 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/cpp/src/qpid/broker/ExchangeRegistry.h @@ -35,7 +35,7 @@ namespace broker { class ExchangeRegistry{ typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap; ExchangeMap exchanges; - qpid::sys::Mutex lock; + qpid::sys::RWlock lock; public: std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type) throw(UnknownExchangeTypeException); diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 79702394ad..ea2327c788 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -29,7 +29,7 @@ FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {} FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {} bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ - Mutex::ScopedLock locker(lock); + RWlock::ScopedWlock locker(lock); // Add if not already present. Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); if (i == bindings.end()) { @@ -41,7 +41,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, } bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ - Mutex::ScopedLock locker(lock); + RWlock::ScopedWlock locker(lock); Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); if (i != bindings.end()) { bindings.erase(i); @@ -52,7 +52,7 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey* } void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ - Mutex::ScopedLock locker(lock); + RWlock::ScopedRlock locker(lock); for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){ msg.deliverTo(*i); } diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index 4ebd10d3cf..3cbffc6f2f 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -34,7 +34,7 @@ namespace broker { class FanOutExchange : public virtual Exchange { std::vector<Queue::shared_ptr> bindings; - qpid::sys::Mutex lock; + qpid::sys::RWlock lock; public: static const std::string typeName; diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 36f7330433..955b975ffb 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -44,7 +44,7 @@ HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { } HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {} bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ - Mutex::ScopedLock locker(lock); + RWlock::ScopedWlock locker(lock); std::string what = args->getString("x-match"); if (what != all && what != any) { THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange."); @@ -61,7 +61,7 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/ } bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ - Mutex::ScopedLock locker(lock); + RWlock::ScopedWlock locker(lock); Bindings::iterator i = std::find(bindings.begin(),bindings.end(), Binding(*args, queue)); if (i != bindings.end()) { @@ -74,7 +74,7 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){ - Mutex::ScopedLock locker(lock);; + RWlock::ScopedRlock locker(lock);; for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { if (match(i->first, *args)) msg.deliverTo(i->second); } diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index 51aa30f56d..a99cc1c92c 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -37,7 +37,7 @@ class HeadersExchange : public virtual Exchange { typedef std::vector<Binding> Bindings; Bindings bindings; - qpid::sys::Mutex lock; + qpid::sys::RWlock lock; public: static const std::string typeName; diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index e309594aa9..ef1fb982e1 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -33,7 +33,7 @@ std::pair<Queue::shared_ptr, bool> QueueRegistry::declare(const string& declareName, bool durable, bool autoDelete, const ConnectionToken* owner) { - Mutex::ScopedLock locker(lock); + RWlock::ScopedWlock locker(lock); string name = declareName.empty() ? generateName() : declareName; assert(!name.empty()); QueueMap::iterator i = queues.find(name); @@ -47,12 +47,12 @@ QueueRegistry::declare(const string& declareName, bool durable, } void QueueRegistry::destroy(const string& name){ - Mutex::ScopedLock locker(lock); + RWlock::ScopedWlock locker(lock); queues.erase(name); } Queue::shared_ptr QueueRegistry::find(const string& name){ - Mutex::ScopedLock locker(lock); + RWlock::ScopedRlock locker(lock); QueueMap::iterator i = queues.find(name); if (i == queues.end()) { return Queue::shared_ptr(); diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index cec2a11e68..22a89d7825 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -64,7 +64,7 @@ class QueueRegistry{ void destroy(const string& name); template <class Test> void destroyIf(const string& name, Test test) { - qpid::sys::Mutex::ScopedLock locker(lock); + qpid::sys::RWlock::ScopedWlock locker(lock); if (test()) { queues.erase(name); } @@ -88,7 +88,7 @@ class QueueRegistry{ private: typedef std::map<string, Queue::shared_ptr> QueueMap; QueueMap queues; - qpid::sys::Mutex lock; + qpid::sys::RWlock lock; int counter; MessageStore* const store; }; diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 80c207b56a..99c0d381c5 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -120,7 +120,7 @@ TopicExchange::TopicExchange(const std::string& _name, bool _durable, const Fiel bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ - Monitor::ScopedLock l(lock); + RWlock::ScopedWlock l(lock); TopicPattern routingPattern(routingKey); if (isBound(queue, routingPattern)) { return false; @@ -131,7 +131,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons } bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ - Monitor::ScopedLock l(lock); + RWlock::ScopedWlock l(lock); BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); Queue::vector& qv(bi->second); if (bi == bindings.end()) return false; @@ -151,7 +151,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern) } void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ - Monitor::ScopedLock l(lock); + RWlock::ScopedRlock l(lock); for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { if (i->first.match(routingKey)) { Queue::vector& qv(i->second); diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index 4b294de5ae..6536a7c4ce 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -74,7 +74,7 @@ class TopicPattern : public Tokens class TopicExchange : public virtual Exchange{ typedef std::map<TopicPattern, Queue::vector> BindingMap; BindingMap bindings; - qpid::sys::Mutex lock; + qpid::sys::RWlock lock; bool isBound(Queue::shared_ptr queue, TopicPattern& pattern); public: diff --git a/cpp/src/qpid/sys/Mutex.h b/cpp/src/qpid/sys/Mutex.h index 4eff6078ae..b8545d4449 100644 --- a/cpp/src/qpid/sys/Mutex.h +++ b/cpp/src/qpid/sys/Mutex.h @@ -46,6 +46,26 @@ class ScopedUnlock L& mutex; }; +template <class L> +class ScopedRlock +{ + public: + ScopedRlock(L& l) : mutex(l) { l.rlock(); } + ~ScopedRlock() { mutex.unlock(); } + private: + L& mutex; +}; + +template <class L> +class ScopedWlock +{ + public: + ScopedWlock(L& l) : mutex(l) { l.wlock(); } + ~ScopedWlock() { mutex.unlock(); } + private: + L& mutex; +}; + }} #ifdef USE_APR_PLATFORM diff --git a/cpp/src/qpid/sys/posix/Mutex.h b/cpp/src/qpid/sys/posix/Mutex.h index b278c6b14a..b29219235d 100644 --- a/cpp/src/qpid/sys/posix/Mutex.h +++ b/cpp/src/qpid/sys/posix/Mutex.h @@ -38,29 +38,58 @@ class Mutex : private boost::noncopyable { public: typedef ScopedLock<Mutex> ScopedLock; typedef ScopedUnlock<Mutex> ScopedUnlock; - + inline Mutex(); inline ~Mutex(); - inline void lock(); + inline void lock(); inline void unlock(); - inline void trylock(); + inline void trylock(); + protected: pthread_mutex_t mutex; }; /** + * RW lock. + */ +class RWlock : private boost::noncopyable { + friend class Condition; + +public: + typedef ScopedRlock<RWlock> ScopedRlock; + typedef ScopedWlock<RWlock> ScopedWlock; + + inline RWlock(); + inline ~RWlock(); + inline void wlock(); // will write-lock + inline void rlock(); // will read-lock + inline void unlock(); + inline void trywlock(); // will write-try + inline void tryrlock(); // will read-try + +protected: + pthread_rwlock_t rwlock; +}; + + +/** * Initialise a recursive mutex attr for use in creating mutexes later * (we use pthread_once to make sure it is initialised exactly once) */ namespace { pthread_once_t onceControl = PTHREAD_ONCE_INIT; + pthread_rwlockattr_t rwlockattr; pthread_mutexattr_t mutexattr; void initMutexattr() { pthread_mutexattr_init(&mutexattr); pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE); } + + void initRWlockattr() { + pthread_rwlockattr_init(&rwlockattr); + } struct RecursiveMutexattr { RecursiveMutexattr() { @@ -71,8 +100,21 @@ namespace { return &mutexattr; } }; + struct RecursiveRWlockattr { + RecursiveRWlockattr() { + pthread_once(&onceControl, initRWlockattr); + } + + operator const pthread_rwlockattr_t*() const { + return &rwlockattr; + } + }; RecursiveMutexattr recursiveMutexattr; + RecursiveRWlockattr recursiveRWlockattr; + + + } /** @@ -83,9 +125,9 @@ struct PODMutex { typedef ScopedLock<PODMutex> ScopedLock; - inline void lock(); + inline void lock(); inline void unlock(); - inline void trylock(); + inline void trylock(); // Must be public to be a POD: pthread_mutex_t mutex; @@ -96,6 +138,7 @@ struct PODMutex void PODMutex::lock() { QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); } + void PODMutex::unlock() { QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); } @@ -115,6 +158,7 @@ Mutex::~Mutex(){ void Mutex::lock() { QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); } + void Mutex::unlock() { QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); } @@ -123,5 +167,35 @@ void Mutex::trylock() { QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex)); } + +RWlock::RWlock() { + QPID_POSIX_THROW_IF(pthread_rwlock_init(&rwlock, recursiveRWlockattr)); +} + +RWlock::~RWlock(){ + QPID_POSIX_THROW_IF(pthread_rwlock_destroy(&rwlock)); +} + +void RWlock::wlock() { + QPID_POSIX_THROW_IF(pthread_rwlock_wrlock(&rwlock)); +} + +void RWlock::rlock() { + QPID_POSIX_THROW_IF(pthread_rwlock_rdlock(&rwlock)); +} + +void RWlock::unlock() { + QPID_POSIX_THROW_IF(pthread_rwlock_unlock(&rwlock)); +} + +void RWlock::trywlock() { + QPID_POSIX_THROW_IF(pthread_rwlock_trywrlock(&rwlock)); +} + +void RWlock::tryrlock() { + QPID_POSIX_THROW_IF(pthread_rwlock_tryrdlock(&rwlock)); +} + + }} #endif /*!_sys_posix_Mutex_h*/ |