summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-03-19 20:07:37 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-03-19 20:07:37 +0000
commit841377c0309773dad4db14af13002fff5cc6d236 (patch)
treefc73eadae55f4110af88888d4d79acb988de5c6b
parent59ee66dabcb09c5a64f7338b5d4917f8a2522610 (diff)
downloadqpid-python-841377c0309773dad4db14af13002fff5cc6d236.tar.gz
QPID-3890: revert changes to observer methods interfaces
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3890@1302629 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp48
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h12
2 files changed, 32 insertions, 28 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 111c47a861..3bd9233791 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -301,7 +301,7 @@ void Queue::requeue(const QueuedMessage& msg){
{
Mutex::ScopedLock locker(messageLock);
messages->release(msg);
- observeRequeueLH(msg);
+ observeRequeue(msg, locker);
listeners.populate(copy);
}
@@ -424,7 +424,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
Mutex::ScopedLock locker(messageLock);
bool ok = allocator->allocate( c->getName(), msg ); // inform allocator
(void) ok; assert(ok);
- observeAcquireLH(msg);
+ observeAcquire(msg, locker);
}
if (mgmtObject) {
mgmtObject->inc_acquires();
@@ -535,7 +535,7 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
if (autoDeleteTimeout && autoDeleteTask) {
autoDeleteTask->cancel();
}
- observeConsumerAddLH(*c);
+ observeConsumerAdd(*c, locker);
}
if (mgmtObject != 0)
mgmtObject->inc_consumerCount ();
@@ -547,7 +547,7 @@ void Queue::cancel(Consumer::shared_ptr c){
Mutex::ScopedLock locker(messageLock);
consumerCount--;
if(exclusive) exclusive = 0;
- observeConsumerRemoveLH(*c);
+ observeConsumerRemove(*c, locker);
}
if (mgmtObject != 0)
mgmtObject->dec_consumerCount ();
@@ -559,7 +559,7 @@ QueuedMessage Queue::get(){
{
Mutex::ScopedLock locker(messageLock);
ok = messages->consume(msg);
- if (ok) observeAcquireLH(msg);
+ if (ok) observeAcquire(msg, locker);
}
if (ok && mgmtObject) {
@@ -615,7 +615,7 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
// KAG: should be safe to retake lock after the removeIf, since
// no other thread can touch these messages after the removeIf() call
Mutex::ScopedLock locker(messageLock);
- observeAcquireLH(*i);
+ observeAcquire(*i, locker);
}
dequeue( 0, *i );
}
@@ -774,7 +774,7 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange>
// KAG: should be safe to retake lock after the removeIf, since
// no other thread can touch these messages after the removeIf call
Mutex::ScopedLock locker(messageLock);
- observeAcquireLH(*qmsg);
+ observeAcquire(*qmsg, locker);
}
dequeue(0, *qmsg);
QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName());
@@ -814,7 +814,7 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
qmsg != c.matches.end(); ++qmsg) {
{
Mutex::ScopedLock locker(messageLock);
- observeAcquireLH(*qmsg);
+ observeAcquire(*qmsg, locker);
}
dequeue(0, *qmsg);
// and move to destination Queue.
@@ -832,7 +832,7 @@ bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage
{
Mutex::ScopedLock locker(messageLock);
ok = messages->acquire(position, msg);
- if (ok) observeAcquireLH(msg);
+ if (ok) observeAcquire(msg, locker);
}
if (ok) {
if (mgmtObject) {
@@ -856,9 +856,9 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
qm.position = ++sequence;
if (messages->push(qm, removed)) {
dequeueRequired = true;
- observeAcquireLH(removed);
+ observeAcquire(removed, locker);
}
- observeEnqueueLH(qm);
+ observeEnqueue(qm, locker);
if (policy.get()) {
policy->enqueued(qm);
}
@@ -1029,7 +1029,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
if (!ctxt) {
if (policy.get()) policy->dequeued(msg);
messages->deleted(msg);
- observeDequeueLH(msg);
+ observeDequeue(msg, locker);
}
}
@@ -1057,7 +1057,7 @@ void Queue::dequeueCommitted(const QueuedMessage& msg)
Mutex::ScopedLock locker(messageLock);
if (policy.get()) policy->dequeued(msg);
messages->deleted(msg);
- observeDequeueLH(msg);
+ observeDequeue(msg, locker);
}
mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
if (mgmtObject != 0) {
@@ -1085,7 +1085,7 @@ bool Queue::popAndDequeue(QueuedMessage& msg)
{
Mutex::ScopedLock locker(messageLock);
popped = messages->consume(msg);
- if (popped) observeAcquireLH(msg);
+ if (popped) observeAcquire(msg, locker);
}
if (popped) {
if (mgmtObject) {
@@ -1102,8 +1102,9 @@ bool Queue::popAndDequeue(QueuedMessage& msg)
/**
* Updates policy and management when a message has been dequeued,
+ * Requires messageLock be held by caller.
*/
-void Queue::observeDequeueLH(const QueuedMessage& msg)
+void Queue::observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
{
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -1114,9 +1115,10 @@ void Queue::observeDequeueLH(const QueuedMessage& msg)
}
}
-/** updates queue observers when a message has become unavailable for transfer
+/** updates queue observers when a message has become unavailable for transfer.
+ * Requires messageLock be held by caller.
*/
-void Queue::observeAcquireLH(const QueuedMessage& msg)
+void Queue::observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
{
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -1128,8 +1130,9 @@ void Queue::observeAcquireLH(const QueuedMessage& msg)
}
/** updates queue observers when a message has become re-available for transfer
+ * Requires messageLock be held by caller.
*/
-void Queue::observeRequeueLH(const QueuedMessage& msg)
+void Queue::observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
{
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -1142,7 +1145,7 @@ void Queue::observeRequeueLH(const QueuedMessage& msg)
/** updates queue observers when a new consumer has subscribed to this queue.
*/
-void Queue::observeConsumerAddLH( const Consumer& c)
+void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
{
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -1155,7 +1158,7 @@ void Queue::observeConsumerAddLH( const Consumer& c)
/** updates queue observers when a consumer has unsubscribed from this queue.
*/
-void Queue::observeConsumerRemoveLH( const Consumer& c)
+void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
{
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -1674,8 +1677,9 @@ void Queue::insertSequenceNumbers(const std::string& key)
}
/** updates queue observers and state when a message has become available for transfer
+ * Requires messageLock be held by caller.
*/
-void Queue::observeEnqueueLH(const QueuedMessage& m)
+void Queue::observeEnqueue(const QueuedMessage& m, const qpid::sys::Mutex::ScopedLock&)
{
for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
try {
@@ -1694,7 +1698,7 @@ void Queue::updateEnqueued(const QueuedMessage& m)
{
Mutex::ScopedLock locker(messageLock);
messages->updateAcquired(m);
- observeEnqueueLH(m);
+ observeEnqueue(m, locker);
if (policy.get()) {
policy->recoverEnqueued(payload);
policy->enqueued(m);
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 9cdfb9846e..4e24740730 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -144,12 +144,12 @@ class Queue : public boost::enable_shared_from_this<Queue>,
/** update queue observers, stats, policy, etc when the messages' state changes.
* messageLock is held by caller */
- void observeEnqueueLH(const QueuedMessage& msg);
- void observeAcquireLH(const QueuedMessage& msg);
- void observeRequeueLH(const QueuedMessage& msg);
- void observeDequeueLH(const QueuedMessage& msg);
- void observeConsumerAddLH( const Consumer& );
- void observeConsumerRemoveLH( const Consumer& );
+ void observeEnqueue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&);
+ void observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&);
+ void observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&);
+ void observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&);
+ void observeConsumerAdd( const Consumer&, const qpid::sys::Mutex::ScopedLock&);
+ void observeConsumerRemove( const Consumer&, const qpid::sys::Mutex::ScopedLock&);
bool popAndDequeue(QueuedMessage&);
bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg);