summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp47
1 files changed, 43 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 9089ba0c54..5acc474aa1 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -21,6 +21,7 @@
#include "Broker.h"
#include "Queue.h"
+#include "QueueEvents.h"
#include "Exchange.h"
#include "DeliverableMessage.h"
#include "MessageStore.h"
@@ -64,8 +65,11 @@ const std::string qpidLastValueQueue("qpid.last_value_queue");
const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
const std::string qpidPersistLastNode("qpid.persist_last_node");
const std::string qpidVQMatchProperty("qpid.LVQ_key");
-}
+const std::string qpidQueueEventGeneration("qpid.queue_event_generation");
+const int ENQUEUE_ONLY=1;
+const int ENQUEUE_AND_DEQUEUE=2;
+}
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
@@ -85,7 +89,9 @@ Queue::Queue(const string& _name, bool _autodelete,
inLastNodeFailure(false),
persistenceId(0),
policyExceeded(false),
- mgmtObject(0)
+ mgmtObject(0),
+ eventMode(0),
+ eventMgr(0)
{
if (parent != 0)
{
@@ -207,6 +213,25 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){
}
}
+bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
+{
+ Mutex::ScopedLock locker(messageLock);
+ QPID_LOG(debug, "Attempting to acquire message at " << position);
+ for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
+ if (i->position == position) {
+ message = *i;
+ if (lastValueQueue) {
+ clearLVQIndex(*i);
+ }
+ messages.erase(i);
+ QPID_LOG(debug, "Acquired message at " << i->position << " from " << name);
+ return true;
+ }
+ }
+ QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
+ return false;
+}
+
bool Queue::acquire(const QueuedMessage& msg) {
Mutex::ScopedLock locker(messageLock);
QPID_LOG(debug, "attempting to acquire " << msg.position);
@@ -515,13 +540,15 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){
lvq[key] = msg;
}else {
i->second->setReplacementMessage(msg,this);
- qm.payload = i->second;
- dequeued(qm);
+ dequeued(QueuedMessage(qm.queue, i->second, qm.position));
}
}else {
messages.push_back(qm);
listeners.populate(copy);
}
+ if (eventMode && eventMgr) {
+ eventMgr->enqueued(qm);
+ }
}
copy.notify();
}
@@ -659,6 +686,9 @@ void Queue::dequeued(const QueuedMessage& msg)
{
if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
+ if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
+ eventMgr->dequeued(msg);
+ }
}
@@ -698,6 +728,8 @@ void Queue::configure(const FieldTable& _settings)
QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
<< "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
+ eventMode = _settings.getAsInt(qpidQueueEventGeneration);
+
if (mgmtObject != 0)
mgmtObject->set_arguments (_settings);
}
@@ -898,3 +930,10 @@ void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
sequence = n;
}
+
+int Queue::getEventMode() { return eventMode; }
+
+void Queue::setQueueEventManager(QueueEvents& mgr)
+{
+ eventMgr = &mgr;
+}