diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 47 |
1 files changed, 43 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 9089ba0c54..5acc474aa1 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -21,6 +21,7 @@ #include "Broker.h" #include "Queue.h" +#include "QueueEvents.h" #include "Exchange.h" #include "DeliverableMessage.h" #include "MessageStore.h" @@ -64,8 +65,11 @@ const std::string qpidLastValueQueue("qpid.last_value_queue"); const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse"); const std::string qpidPersistLastNode("qpid.persist_last_node"); const std::string qpidVQMatchProperty("qpid.LVQ_key"); -} +const std::string qpidQueueEventGeneration("qpid.queue_event_generation"); +const int ENQUEUE_ONLY=1; +const int ENQUEUE_AND_DEQUEUE=2; +} Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, @@ -85,7 +89,9 @@ Queue::Queue(const string& _name, bool _autodelete, inLastNodeFailure(false), persistenceId(0), policyExceeded(false), - mgmtObject(0) + mgmtObject(0), + eventMode(0), + eventMgr(0) { if (parent != 0) { @@ -207,6 +213,25 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){ } } +bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) +{ + Mutex::ScopedLock locker(messageLock); + QPID_LOG(debug, "Attempting to acquire message at " << position); + for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { + if (i->position == position) { + message = *i; + if (lastValueQueue) { + clearLVQIndex(*i); + } + messages.erase(i); + QPID_LOG(debug, "Acquired message at " << i->position << " from " << name); + return true; + } + } + QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); + return false; +} + bool Queue::acquire(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); QPID_LOG(debug, "attempting to acquire " << msg.position); @@ -515,13 +540,15 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){ lvq[key] = msg; }else { i->second->setReplacementMessage(msg,this); - qm.payload = i->second; - dequeued(qm); + dequeued(QueuedMessage(qm.queue, i->second, qm.position)); } }else { messages.push_back(qm); listeners.populate(copy); } + if (eventMode && eventMgr) { + eventMgr->enqueued(qm); + } } copy.notify(); } @@ -659,6 +686,9 @@ void Queue::dequeued(const QueuedMessage& msg) { if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); + if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { + eventMgr->dequeued(msg); + } } @@ -698,6 +728,8 @@ void Queue::configure(const FieldTable& _settings) QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); + eventMode = _settings.getAsInt(qpidQueueEventGeneration); + if (mgmtObject != 0) mgmtObject->set_arguments (_settings); } @@ -898,3 +930,10 @@ void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); sequence = n; } + +int Queue::getEventMode() { return eventMode; } + +void Queue::setQueueEventManager(QueueEvents& mgr) +{ + eventMgr = &mgr; +} |