summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp145
1 files changed, 82 insertions, 63 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 969d510e26..0e822d3d4a 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -240,7 +240,7 @@ void Queue::requeue(const QueuedMessage& msg){
}
mgntDeqStats(msg.payload);
} else {
- messages->reinsert(msg);
+ messages->release(msg);
listeners.populate(copy);
// for persistLastNode - don't force a message twice to disk, but force it if no force before
@@ -306,7 +306,7 @@ void Queue::notifyListener()
bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
{
- checkNotDeleted();
+ checkNotDeleted(c);
if (c->preAcquires()) {
switch (consumeNextMessage(m, c)) {
case CONSUMED:
@@ -327,48 +327,43 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
while (true) {
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg;
+ if (allocator->nextConsumableMessage(c, msg)) {
+ if (msg.payload->hasExpired()) {
+ QPID_LOG(debug, "Message expired from queue '" << name << "'");
+ c->setPosition(msg.position);
+ dequeue(0, msg);
+ if (mgmtObject) {
+ mgmtObject->inc_discardsTtl();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsTtl();
+ }
- if (!allocator->nextConsumableMessage(c, msg)) { // no next available
- QPID_LOG(debug, "No messages available to dispatch to consumer " <<
- c->getName() << " on queue '" << name << "'");
- listeners.addListener(c);
- return NO_MESSAGES;
- }
-
- if (msg.payload->hasExpired()) {
- QPID_LOG(debug, "Message expired from queue '" << name << "'");
- c->setPosition(msg.position);
- acquire( msg.position, msg, locker);
- dequeue( 0, msg );
- if (mgmtObject) {
- mgmtObject->inc_discardsTtl();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsTtl();
+ continue;
}
- continue;
- }
- // a message is available for this consumer - can the consumer use it?
-
- if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
- bool ok = allocator->allocate( c->getName(), msg ); // inform allocator
- (void) ok; assert(ok);
- ok = acquire( msg.position, msg, locker);
- (void) ok; assert(ok);
- m = msg;
- c->setPosition(m.position);
- return CONSUMED;
+ if (c->filter(msg.payload)) {
+ if (c->accept(msg.payload)) {
+ bool ok = allocator->allocate( c->getName(), msg ); // inform allocator
+ (void) ok; assert(ok);
+ observeAcquire(msg, locker);
+ m = msg;
+ return CONSUMED;
+ } else {
+ //message(s) are available but consumer hasn't got enough credit
+ QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ messages->release(msg);
+ return CANT_CONSUME;
+ }
} else {
- //message(s) are available but consumer hasn't got enough credit
- QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ //consumer will never want this message
+ QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+ messages->release(msg);
return CANT_CONSUME;
}
} else {
- //consumer will never want this message
- QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
- c->setPosition(msg.position);
- return CANT_CONSUME;
+ QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ listeners.addListener(c);
+ return NO_MESSAGES;
}
}
}
@@ -431,7 +426,6 @@ bool Queue::dispatch(Consumer::shared_ptr c)
}
bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const {
-
Mutex::ScopedLock locker(messageLock);
if (messages->find(pos, msg))
return true;
@@ -493,7 +487,7 @@ void Queue::cancel(Consumer::shared_ptr c){
QueuedMessage Queue::get(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
- if (messages->pop(msg))
+ if (messages->consume(msg))
observeAcquire(msg, locker);
return msg;
}
@@ -687,6 +681,7 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange>
// Update observers and message state:
observeAcquire(*qmsg, locker);
dequeue(0, *qmsg);
+ QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName());
// now reroute if necessary
if (dest.get()) {
assert(qmsg->payload);
@@ -718,24 +713,11 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
return c.matches.size();
}
-/** Acquire the front (oldest) message from the in-memory queue.
- * assumes messageLock held by caller
- */
-void Queue::pop(const Mutex::ScopedLock& locker)
-{
- assertClusterSafe();
- QueuedMessage msg;
- if (messages->pop(msg)) {
- observeAcquire(msg, locker);
- ++dequeueSincePurge;
- }
-}
-
/** Acquire the message at the given position, return true and msg if acquire succeeds */
bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
const Mutex::ScopedLock& locker)
{
- if (messages->remove(position, msg)) {
+ if (messages->acquire(position, msg)) {
observeAcquire(msg, locker);
++dequeueSincePurge;
return true;
@@ -952,12 +934,14 @@ void Queue::dequeueCommitted(const QueuedMessage& msg)
* Removes the first (oldest) message from the in-memory delivery queue as well dequeing
* it from the logical (and persistent if applicable) queue
*/
-void Queue::popAndDequeue(const Mutex::ScopedLock& held)
+bool Queue::popAndDequeue(QueuedMessage& msg, const Mutex::ScopedLock& locker)
{
- if (!messages->empty()) {
- QueuedMessage msg = messages->front();
- pop(held);
+ if (messages->consume(msg)) {
+ observeAcquire(msg, locker);
dequeue(0, msg);
+ return true;
+ } else {
+ return false;
}
}
@@ -969,6 +953,7 @@ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
{
mgntDeqStats(msg.payload);
if (policy.get()) policy->dequeued(msg);
+ messages->deleted(msg);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->dequeued(msg);
@@ -1167,8 +1152,9 @@ void Queue::destroyed()
unbind(broker->getExchanges());
{
Mutex::ScopedLock locker(messageLock);
- while(!messages->empty()){
- DeliverableMessage msg(messages->front().payload);
+ QueuedMessage m;
+ while(popAndDequeue(m, locker)) {
+ DeliverableMessage msg(m.payload);
if (alternateExchange.get()) {
if (brokerMgmtObject)
brokerMgmtObject->inc_abandonedViaAlt();
@@ -1177,7 +1163,6 @@ void Queue::destroyed()
if (brokerMgmtObject)
brokerMgmtObject->inc_abandoned();
}
- popAndDequeue(locker);
}
if (alternateExchange.get())
alternateExchange->decAlternateUsers();
@@ -1191,6 +1176,10 @@ void Queue::destroyed()
}
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
+ {
+ Mutex::ScopedLock locker(messageLock);
+ observers.clear();
+ }
}
void Queue::notifyDeleted()
@@ -1477,6 +1466,7 @@ void Queue::query(qpid::types::Variant::Map& results) const
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
sequence = n;
+ QPID_LOG(trace, "Set position to " << sequence << " on " << getName());
}
SequenceNumber Queue::getPosition() {
@@ -1549,9 +1539,9 @@ QueueListeners& Queue::getListeners() { return listeners; }
Messages& Queue::getMessages() { return *messages; }
const Messages& Queue::getMessages() const { return *messages; }
-void Queue::checkNotDeleted()
+void Queue::checkNotDeleted(const Consumer::shared_ptr& c)
{
- if (deleted) {
+ if (deleted && !c->hideDeletedError()) {
throw ResourceDeletedException(QPID_MSG("Queue " << getName() << " has been deleted."));
}
}
@@ -1562,6 +1552,12 @@ void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
observers.insert(observer);
}
+void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer)
+{
+ Mutex::ScopedLock locker(messageLock);
+ observers.erase(observer);
+}
+
void Queue::flush()
{
ScopedUse u(barrier);
@@ -1584,7 +1580,7 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
}
-const Broker* Queue::getBroker()
+Broker* Queue::getBroker()
{
return broker;
}
@@ -1593,6 +1589,29 @@ void Queue::setDequeueSincePurge(uint32_t value) {
dequeueSincePurge = value;
}
+namespace{
+class FindLowest
+{
+ public:
+ FindLowest() : init(false) {}
+ void process(const QueuedMessage& message) {
+ QPID_LOG(debug, "FindLowest processing: " << message.position);
+ if (!init || message.position < lowest) lowest = message.position;
+ init = true;
+ }
+ bool getLowest(qpid::framing::SequenceNumber& result) {
+ if (init) {
+ result = lowest;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ private:
+ bool init;
+ qpid::framing::SequenceNumber lowest;
+};
+}
Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}