summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2007-07-05 16:19:05 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2007-07-05 16:19:05 +0000
commita48c9eddce4cbc56521fca7fd64582d9dafe1d40 (patch)
tree923b6677edb1754a4a9a89c5ff0b9e2a8cc28439 /cpp/src
parent18fc5427470dbe4bfd0e9927beb7b0cc5fe01bfc (diff)
downloadqpid-python-a48c9eddce4cbc56521fca7fd64582d9dafe1d40.tar.gz
- Added RW lock
- Updated all exchanges to us RW lock - Updated all registries to us RW lock - Still need to do (client, channel, message and queues) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@553549 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp6
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h2
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp6
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.h2
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp6
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h2
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp6
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h2
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp6
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h4
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp6
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h2
-rw-r--r--cpp/src/qpid/sys/Mutex.h20
-rw-r--r--cpp/src/qpid/sys/posix/Mutex.h84
14 files changed, 124 insertions, 30 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 7b516414c0..87f363ff3d 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -30,7 +30,7 @@ DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {}
DirectExchange::DirectExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
- Mutex::ScopedLock l(lock);
+ RWlock::ScopedWlock l(lock);
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
if (i == queues.end()) {
@@ -42,7 +42,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
}
bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
- Mutex::ScopedLock l(lock);
+ RWlock::ScopedWlock l(lock);
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
@@ -58,7 +58,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
}
void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
- Mutex::ScopedLock l(lock);
+ RWlock::ScopedRlock l(lock);
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
int count(0);
for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index 059f69a9ad..554be295bf 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -35,7 +35,7 @@ namespace broker {
typedef std::vector<Queue::shared_ptr> Queues;
typedef std::map<string, Queues > Bindings;
Bindings bindings;
- qpid::sys::Mutex lock;
+ qpid::sys::RWlock lock;
public:
static const std::string typeName;
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index 9555fa43a4..732d45dc44 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -38,7 +38,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
bool durable, const FieldTable& args)
throw(UnknownExchangeTypeException){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedWlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
if (i == exchanges.end()) {
Exchange::shared_ptr exchange;
@@ -62,7 +62,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
}
void ExchangeRegistry::destroy(const string& name){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedWlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
if (i != exchanges.end()) {
exchanges.erase(i);
@@ -70,7 +70,7 @@ void ExchangeRegistry::destroy(const string& name){
}
Exchange::shared_ptr ExchangeRegistry::get(const string& name){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedRlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
if (i == exchanges.end())
throw ChannelException(404, "Exchange not found:" + name);
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h
index 59fe51691b..5505a4074a 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -35,7 +35,7 @@ namespace broker {
class ExchangeRegistry{
typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
ExchangeMap exchanges;
- qpid::sys::Mutex lock;
+ qpid::sys::RWlock lock;
public:
std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type)
throw(UnknownExchangeTypeException);
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index 79702394ad..ea2327c788 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -29,7 +29,7 @@ FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedWlock locker(lock);
// Add if not already present.
Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
if (i == bindings.end()) {
@@ -41,7 +41,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/,
}
bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedWlock locker(lock);
Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
if (i != bindings.end()) {
bindings.erase(i);
@@ -52,7 +52,7 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*
}
void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedRlock locker(lock);
for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
msg.deliverTo(*i);
}
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index 4ebd10d3cf..3cbffc6f2f 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -34,7 +34,7 @@ namespace broker {
class FanOutExchange : public virtual Exchange {
std::vector<Queue::shared_ptr> bindings;
- qpid::sys::Mutex lock;
+ qpid::sys::RWlock lock;
public:
static const std::string typeName;
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 36f7330433..955b975ffb 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -44,7 +44,7 @@ HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedWlock locker(lock);
std::string what = args->getString("x-match");
if (what != all && what != any) {
THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange.");
@@ -61,7 +61,7 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/
}
bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedWlock locker(lock);
Bindings::iterator i =
std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
if (i != bindings.end()) {
@@ -74,7 +74,7 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey
void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
- Mutex::ScopedLock locker(lock);;
+ RWlock::ScopedRlock locker(lock);;
for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (match(i->first, *args)) msg.deliverTo(i->second);
}
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index 51aa30f56d..a99cc1c92c 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -37,7 +37,7 @@ class HeadersExchange : public virtual Exchange {
typedef std::vector<Binding> Bindings;
Bindings bindings;
- qpid::sys::Mutex lock;
+ qpid::sys::RWlock lock;
public:
static const std::string typeName;
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index e309594aa9..ef1fb982e1 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -33,7 +33,7 @@ std::pair<Queue::shared_ptr, bool>
QueueRegistry::declare(const string& declareName, bool durable,
bool autoDelete, const ConnectionToken* owner)
{
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedWlock locker(lock);
string name = declareName.empty() ? generateName() : declareName;
assert(!name.empty());
QueueMap::iterator i = queues.find(name);
@@ -47,12 +47,12 @@ QueueRegistry::declare(const string& declareName, bool durable,
}
void QueueRegistry::destroy(const string& name){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedWlock locker(lock);
queues.erase(name);
}
Queue::shared_ptr QueueRegistry::find(const string& name){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedRlock locker(lock);
QueueMap::iterator i = queues.find(name);
if (i == queues.end()) {
return Queue::shared_ptr();
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index cec2a11e68..22a89d7825 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -64,7 +64,7 @@ class QueueRegistry{
void destroy(const string& name);
template <class Test> void destroyIf(const string& name, Test test)
{
- qpid::sys::Mutex::ScopedLock locker(lock);
+ qpid::sys::RWlock::ScopedWlock locker(lock);
if (test()) {
queues.erase(name);
}
@@ -88,7 +88,7 @@ class QueueRegistry{
private:
typedef std::map<string, Queue::shared_ptr> QueueMap;
QueueMap queues;
- qpid::sys::Mutex lock;
+ qpid::sys::RWlock lock;
int counter;
MessageStore* const store;
};
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index 80c207b56a..99c0d381c5 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -120,7 +120,7 @@ TopicExchange::TopicExchange(const std::string& _name, bool _durable, const Fiel
bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
- Monitor::ScopedLock l(lock);
+ RWlock::ScopedWlock l(lock);
TopicPattern routingPattern(routingKey);
if (isBound(queue, routingPattern)) {
return false;
@@ -131,7 +131,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
}
bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
- Monitor::ScopedLock l(lock);
+ RWlock::ScopedWlock l(lock);
BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
Queue::vector& qv(bi->second);
if (bi == bindings.end()) return false;
@@ -151,7 +151,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern)
}
void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
- Monitor::ScopedLock l(lock);
+ RWlock::ScopedRlock l(lock);
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (i->first.match(routingKey)) {
Queue::vector& qv(i->second);
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index 4b294de5ae..6536a7c4ce 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -74,7 +74,7 @@ class TopicPattern : public Tokens
class TopicExchange : public virtual Exchange{
typedef std::map<TopicPattern, Queue::vector> BindingMap;
BindingMap bindings;
- qpid::sys::Mutex lock;
+ qpid::sys::RWlock lock;
bool isBound(Queue::shared_ptr queue, TopicPattern& pattern);
public:
diff --git a/cpp/src/qpid/sys/Mutex.h b/cpp/src/qpid/sys/Mutex.h
index 4eff6078ae..b8545d4449 100644
--- a/cpp/src/qpid/sys/Mutex.h
+++ b/cpp/src/qpid/sys/Mutex.h
@@ -46,6 +46,26 @@ class ScopedUnlock
L& mutex;
};
+template <class L>
+class ScopedRlock
+{
+ public:
+ ScopedRlock(L& l) : mutex(l) { l.rlock(); }
+ ~ScopedRlock() { mutex.unlock(); }
+ private:
+ L& mutex;
+};
+
+template <class L>
+class ScopedWlock
+{
+ public:
+ ScopedWlock(L& l) : mutex(l) { l.wlock(); }
+ ~ScopedWlock() { mutex.unlock(); }
+ private:
+ L& mutex;
+};
+
}}
#ifdef USE_APR_PLATFORM
diff --git a/cpp/src/qpid/sys/posix/Mutex.h b/cpp/src/qpid/sys/posix/Mutex.h
index b278c6b14a..b29219235d 100644
--- a/cpp/src/qpid/sys/posix/Mutex.h
+++ b/cpp/src/qpid/sys/posix/Mutex.h
@@ -38,29 +38,58 @@ class Mutex : private boost::noncopyable {
public:
typedef ScopedLock<Mutex> ScopedLock;
typedef ScopedUnlock<Mutex> ScopedUnlock;
-
+
inline Mutex();
inline ~Mutex();
- inline void lock();
+ inline void lock();
inline void unlock();
- inline void trylock();
+ inline void trylock();
+
protected:
pthread_mutex_t mutex;
};
/**
+ * RW lock.
+ */
+class RWlock : private boost::noncopyable {
+ friend class Condition;
+
+public:
+ typedef ScopedRlock<RWlock> ScopedRlock;
+ typedef ScopedWlock<RWlock> ScopedWlock;
+
+ inline RWlock();
+ inline ~RWlock();
+ inline void wlock(); // will write-lock
+ inline void rlock(); // will read-lock
+ inline void unlock();
+ inline void trywlock(); // will write-try
+ inline void tryrlock(); // will read-try
+
+protected:
+ pthread_rwlock_t rwlock;
+};
+
+
+/**
* Initialise a recursive mutex attr for use in creating mutexes later
* (we use pthread_once to make sure it is initialised exactly once)
*/
namespace {
pthread_once_t onceControl = PTHREAD_ONCE_INIT;
+ pthread_rwlockattr_t rwlockattr;
pthread_mutexattr_t mutexattr;
void initMutexattr() {
pthread_mutexattr_init(&mutexattr);
pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE);
}
+
+ void initRWlockattr() {
+ pthread_rwlockattr_init(&rwlockattr);
+ }
struct RecursiveMutexattr {
RecursiveMutexattr() {
@@ -71,8 +100,21 @@ namespace {
return &mutexattr;
}
};
+ struct RecursiveRWlockattr {
+ RecursiveRWlockattr() {
+ pthread_once(&onceControl, initRWlockattr);
+ }
+
+ operator const pthread_rwlockattr_t*() const {
+ return &rwlockattr;
+ }
+ };
RecursiveMutexattr recursiveMutexattr;
+ RecursiveRWlockattr recursiveRWlockattr;
+
+
+
}
/**
@@ -83,9 +125,9 @@ struct PODMutex
{
typedef ScopedLock<PODMutex> ScopedLock;
- inline void lock();
+ inline void lock();
inline void unlock();
- inline void trylock();
+ inline void trylock();
// Must be public to be a POD:
pthread_mutex_t mutex;
@@ -96,6 +138,7 @@ struct PODMutex
void PODMutex::lock() {
QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
}
+
void PODMutex::unlock() {
QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
}
@@ -115,6 +158,7 @@ Mutex::~Mutex(){
void Mutex::lock() {
QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
}
+
void Mutex::unlock() {
QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
}
@@ -123,5 +167,35 @@ void Mutex::trylock() {
QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex));
}
+
+RWlock::RWlock() {
+ QPID_POSIX_THROW_IF(pthread_rwlock_init(&rwlock, recursiveRWlockattr));
+}
+
+RWlock::~RWlock(){
+ QPID_POSIX_THROW_IF(pthread_rwlock_destroy(&rwlock));
+}
+
+void RWlock::wlock() {
+ QPID_POSIX_THROW_IF(pthread_rwlock_wrlock(&rwlock));
+}
+
+void RWlock::rlock() {
+ QPID_POSIX_THROW_IF(pthread_rwlock_rdlock(&rwlock));
+}
+
+void RWlock::unlock() {
+ QPID_POSIX_THROW_IF(pthread_rwlock_unlock(&rwlock));
+}
+
+void RWlock::trywlock() {
+ QPID_POSIX_THROW_IF(pthread_rwlock_trywrlock(&rwlock));
+}
+
+void RWlock::tryrlock() {
+ QPID_POSIX_THROW_IF(pthread_rwlock_tryrdlock(&rwlock));
+}
+
+
}}
#endif /*!_sys_posix_Mutex_h*/