diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueueEvents.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueEvents.cpp | 41 |
1 files changed, 32 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.cpp b/qpid/cpp/src/qpid/broker/QueueEvents.cpp index 6df869673d..bba054b0b8 100644 --- a/qpid/cpp/src/qpid/broker/QueueEvents.cpp +++ b/qpid/cpp/src/qpid/broker/QueueEvents.cpp @@ -25,25 +25,41 @@ namespace qpid { namespace broker { -QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) : - eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true) +QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync) : + eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true), sync(isSync) { - eventQueue.start(); + if (!sync) eventQueue.start(); } QueueEvents::~QueueEvents() { - eventQueue.stop(); + if (!sync) eventQueue.stop(); } void QueueEvents::enqueued(const QueuedMessage& m) { - if (enabled) eventQueue.push(Event(ENQUEUE, m)); + if (enabled) { + Event enq(ENQUEUE, m); + if (sync) { + for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) + j->second(enq); + } else { + eventQueue.push(enq); + } + } } void QueueEvents::dequeued(const QueuedMessage& m) { - if (enabled) eventQueue.push(Event(DEQUEUE, m)); + if (enabled) { + Event deq(DEQUEUE, m); + if (sync) { + for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) + j->second(deq); + } else { + eventQueue.push(Event(DEQUEUE, m)); + } + } } void QueueEvents::registerListener(const std::string& id, const EventListener& listener) @@ -70,15 +86,16 @@ QueueEvents::EventQueue::Batch::const_iterator QueueEvents::handle(const EventQueue::Batch& events) { qpid::sys::Mutex::ScopedLock l(lock); for (EventQueue::Batch::const_iterator i = events.begin(); i != events.end(); ++i) { - for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) - j->second(*i); + for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) { + j->second(*i); + } } return events.end(); } void QueueEvents::shutdown() { - if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown(); + if (!sync && !eventQueue.empty() && !listeners.empty()) eventQueue.shutdown(); } void QueueEvents::enable() @@ -93,6 +110,12 @@ void QueueEvents::disable() QPID_LOG(debug, "Queue events disabled"); } +bool QueueEvents::isSync() +{ + return sync; +} + + QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {} |