diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 78 |
1 files changed, 14 insertions, 64 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 3eaab30394..74d23fdabf 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -195,6 +195,7 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, persistenceId(0), settings(b ? merge(_settings, b->getOptions()) : _settings), eventMode(0), + observers(name, messageLock), broker(b), deleted(false), barrier(*this), @@ -988,68 +989,38 @@ void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock& lock, Sc { current -= QueueDepth(1, msg.getMessageSize()); mgntDeqStats(msg, mgmtObject, brokerMgmtObject); - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { - try{ - (*i)->dequeued(msg); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what()); - } - } + observers.dequeued(msg, lock); if (autodelete && isEmpty(lock)) autodelete->check(lock); } /** updates queue observers when a message has become unavailable for transfer. * Requires messageLock be held by caller. */ -void Queue::observeAcquire(const Message& msg, const Mutex::ScopedLock&) +void Queue::observeAcquire(const Message& msg, const Mutex::ScopedLock& l) { - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { - try{ - (*i)->acquired(msg); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of message removal for queue " << getName() << ": " << e.what()); - } - } + observers.acquired(msg, l); } /** updates queue observers when a message has become re-available for transfer * Requires messageLock be held by caller. */ -void Queue::observeRequeue(const Message& msg, const Mutex::ScopedLock&) +void Queue::observeRequeue(const Message& msg, const Mutex::ScopedLock& l) { - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { - try{ - (*i)->requeued(msg); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what()); - } - } + observers.requeued(msg, l); } /** updates queue observers when a new consumer has subscribed to this queue. */ -void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock&) +void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock& l) { - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { - try{ - (*i)->consumerAdded(c); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what()); - } - } + observers.consumerAdded(c, l); } /** updates queue observers when a consumer has unsubscribed from this queue. */ -void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock&) +void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock& l) { - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { - try{ - (*i)->consumerRemoved(c); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what()); - } - } + observers.consumerRemoved(c, l); } @@ -1133,12 +1104,9 @@ void Queue::destroyed() if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); notifyDeleted(); { - Mutex::ScopedLock lock(messageLock); - for_each(observers.begin(), observers.end(), - boost::bind(&QueueObserver::destroy, _1)); - observers.clear(); + Mutex::ScopedLock l(messageLock); + observers.destroy(l); } - if (mgmtObject != 0) { mgmtObject->resourceDestroy(); if (brokerMgmtObject) @@ -1513,15 +1481,9 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges) /** updates queue observers and state when a message has become available for transfer * Requires messageLock be held by caller. */ -void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock&) +void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock& l) { - for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) { - try { - (*i)->enqueued(m); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what()); - } - } + observers.enqueued(m, l); mgntEnqStats(m, mgmtObject, brokerMgmtObject); } @@ -1538,18 +1500,6 @@ bool Queue::isDeleted() const return deleted; } -void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) -{ - Mutex::ScopedLock lock(messageLock); - observers.insert(observer); -} - -void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer) -{ - Mutex::ScopedLock lock(messageLock); - observers.erase(observer); -} - void Queue::flush() { ScopedUse u(barrier); |
