summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/QueueEvents.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueueEvents.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/QueueEvents.cpp41
1 files changed, 32 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.cpp b/qpid/cpp/src/qpid/broker/QueueEvents.cpp
index 6df869673d..bba054b0b8 100644
--- a/qpid/cpp/src/qpid/broker/QueueEvents.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueEvents.cpp
@@ -25,25 +25,41 @@
namespace qpid {
namespace broker {
-QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) :
- eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true)
+QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync) :
+ eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true), sync(isSync)
{
- eventQueue.start();
+ if (!sync) eventQueue.start();
}
QueueEvents::~QueueEvents()
{
- eventQueue.stop();
+ if (!sync) eventQueue.stop();
}
void QueueEvents::enqueued(const QueuedMessage& m)
{
- if (enabled) eventQueue.push(Event(ENQUEUE, m));
+ if (enabled) {
+ Event enq(ENQUEUE, m);
+ if (sync) {
+ for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++)
+ j->second(enq);
+ } else {
+ eventQueue.push(enq);
+ }
+ }
}
void QueueEvents::dequeued(const QueuedMessage& m)
{
- if (enabled) eventQueue.push(Event(DEQUEUE, m));
+ if (enabled) {
+ Event deq(DEQUEUE, m);
+ if (sync) {
+ for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++)
+ j->second(deq);
+ } else {
+ eventQueue.push(Event(DEQUEUE, m));
+ }
+ }
}
void QueueEvents::registerListener(const std::string& id, const EventListener& listener)
@@ -70,15 +86,16 @@ QueueEvents::EventQueue::Batch::const_iterator
QueueEvents::handle(const EventQueue::Batch& events) {
qpid::sys::Mutex::ScopedLock l(lock);
for (EventQueue::Batch::const_iterator i = events.begin(); i != events.end(); ++i) {
- for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++)
- j->second(*i);
+ for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) {
+ j->second(*i);
+ }
}
return events.end();
}
void QueueEvents::shutdown()
{
- if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown();
+ if (!sync && !eventQueue.empty() && !listeners.empty()) eventQueue.shutdown();
}
void QueueEvents::enable()
@@ -93,6 +110,12 @@ void QueueEvents::disable()
QPID_LOG(debug, "Queue events disabled");
}
+bool QueueEvents::isSync()
+{
+ return sync;
+}
+
+
QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {}