summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp78
1 files changed, 14 insertions, 64 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 3eaab30394..74d23fdabf 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -195,6 +195,7 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
persistenceId(0),
settings(b ? merge(_settings, b->getOptions()) : _settings),
eventMode(0),
+ observers(name, messageLock),
broker(b),
deleted(false),
barrier(*this),
@@ -988,68 +989,38 @@ void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock& lock, Sc
{
current -= QueueDepth(1, msg.getMessageSize());
mgntDeqStats(msg, mgmtObject, brokerMgmtObject);
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->dequeued(msg);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what());
- }
- }
+ observers.dequeued(msg, lock);
if (autodelete && isEmpty(lock)) autodelete->check(lock);
}
/** updates queue observers when a message has become unavailable for transfer.
* Requires messageLock be held by caller.
*/
-void Queue::observeAcquire(const Message& msg, const Mutex::ScopedLock&)
+void Queue::observeAcquire(const Message& msg, const Mutex::ScopedLock& l)
{
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->acquired(msg);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of message removal for queue " << getName() << ": " << e.what());
- }
- }
+ observers.acquired(msg, l);
}
/** updates queue observers when a message has become re-available for transfer
* Requires messageLock be held by caller.
*/
-void Queue::observeRequeue(const Message& msg, const Mutex::ScopedLock&)
+void Queue::observeRequeue(const Message& msg, const Mutex::ScopedLock& l)
{
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->requeued(msg);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what());
- }
- }
+ observers.requeued(msg, l);
}
/** updates queue observers when a new consumer has subscribed to this queue.
*/
-void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock& l)
{
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->consumerAdded(c);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
- }
- }
+ observers.consumerAdded(c, l);
}
/** updates queue observers when a consumer has unsubscribed from this queue.
*/
-void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock& l)
{
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->consumerRemoved(c);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
- }
- }
+ observers.consumerRemoved(c, l);
}
@@ -1133,12 +1104,9 @@ void Queue::destroyed()
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
{
- Mutex::ScopedLock lock(messageLock);
- for_each(observers.begin(), observers.end(),
- boost::bind(&QueueObserver::destroy, _1));
- observers.clear();
+ Mutex::ScopedLock l(messageLock);
+ observers.destroy(l);
}
-
if (mgmtObject != 0) {
mgmtObject->resourceDestroy();
if (brokerMgmtObject)
@@ -1513,15 +1481,9 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges)
/** updates queue observers and state when a message has become available for transfer
* Requires messageLock be held by caller.
*/
-void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock&)
+void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock& l)
{
- for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
- try {
- (*i)->enqueued(m);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
- }
- }
+ observers.enqueued(m, l);
mgntEnqStats(m, mgmtObject, brokerMgmtObject);
}
@@ -1538,18 +1500,6 @@ bool Queue::isDeleted() const
return deleted;
}
-void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
-{
- Mutex::ScopedLock lock(messageLock);
- observers.insert(observer);
-}
-
-void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer)
-{
- Mutex::ScopedLock lock(messageLock);
- observers.erase(observer);
-}
-
void Queue::flush()
{
ScopedUse u(barrier);