summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp11
-rw-r--r--cpp/src/qpid/management/ManagementObject.h4
2 files changed, 11 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 08af97eb48..becca8dfcf 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -127,15 +127,17 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
} else {
// if no store then mark as enqueued
if (!enqueue(0, msg)){
- push(msg);
- msg->enqueueComplete();
if (mgmtObject.get() != 0) {
+ Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
}
+ push(msg);
+ msg->enqueueComplete();
}else {
if (mgmtObject.get() != 0) {
+ Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
@@ -153,6 +155,7 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
if (mgmtObject.get() != 0) {
+ Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgPersistEnqueues ();
@@ -170,6 +173,7 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
if (mgmtObject.get() != 0) {
+ Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgTxnEnqueues ();
@@ -359,6 +363,7 @@ void Queue::consume(Consumer& c, bool requestExclusive){
consumerCount++;
if (mgmtObject.get() != 0){
+ Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_consumerCount ();
}
}
@@ -369,6 +374,7 @@ void Queue::cancel(Consumer& c){
consumerCount--;
if(exclusive) exclusive = 0;
if (mgmtObject.get() != 0){
+ Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->dec_consumerCount ();
}
}
@@ -407,6 +413,7 @@ void Queue::pop(){
if (policy.get()) policy->dequeued(msg.payload->contentSize());
if (mgmtObject.get() != 0){
+ Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalDequeues ();
mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
mgmtObject->dec_msgDepth ();
diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h
index 38f72710de..cf2da13b09 100644
--- a/cpp/src/qpid/management/ManagementObject.h
+++ b/cpp/src/qpid/management/ManagementObject.h
@@ -112,8 +112,8 @@ class ManagementObject
destroyTime = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
deleted = true;
}
- bool isDeleted (void) { return deleted; }
- sys::Mutex& getLock() { return accessLock; }
+ inline bool isDeleted (void) { return deleted; }
+ inline sys::Mutex& getLock() { return accessLock; }
};
typedef std::map<uint64_t,ManagementObject::shared_ptr> ManagementObjectMap;