summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp667
1 files changed, 438 insertions, 229 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 015957927f..e7305c021d 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -19,8 +19,9 @@
*
*/
-#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
+
+#include "qpid/broker/Broker.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Fairshare.h"
@@ -41,6 +42,7 @@
#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
@@ -56,7 +58,9 @@
#include <boost/intrusive_ptr.hpp>
-using namespace qpid::broker;
+namespace qpid {
+namespace broker {
+
using namespace qpid::sys;
using namespace qpid::framing;
using qpid::management::ManagementAgent;
@@ -88,8 +92,57 @@ const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers");
const int ENQUEUE_ONLY=1;
const int ENQUEUE_AND_DEQUEUE=2;
+
+inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg,
+ _qmf::Queue* mgmtObject,
+ _qmf::Broker* brokerMgmtObject)
+{
+ if (mgmtObject != 0) {
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+
+ uint64_t contentSize = msg->contentSize();
+ qStats->msgTotalEnqueues +=1;
+ bStats->msgTotalEnqueues += 1;
+ qStats->byteTotalEnqueues += contentSize;
+ bStats->byteTotalEnqueues += contentSize;
+ if (msg->isPersistent ()) {
+ qStats->msgPersistEnqueues += 1;
+ bStats->msgPersistEnqueues += 1;
+ qStats->bytePersistEnqueues += contentSize;
+ bStats->bytePersistEnqueues += contentSize;
+ }
+ mgmtObject->statisticsUpdated();
+ brokerMgmtObject->statisticsUpdated();
+ }
+}
+
+inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg,
+ _qmf::Queue* mgmtObject,
+ _qmf::Broker* brokerMgmtObject)
+{
+ if (mgmtObject != 0){
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+ uint64_t contentSize = msg->contentSize();
+
+ qStats->msgTotalDequeues += 1;
+ bStats->msgTotalDequeues += 1;
+ qStats->byteTotalDequeues += contentSize;
+ bStats->byteTotalDequeues += contentSize;
+ if (msg->isPersistent ()){
+ qStats->msgPersistDequeues += 1;
+ bStats->msgPersistDequeues += 1;
+ qStats->bytePersistDequeues += contentSize;
+ bStats->bytePersistDequeues += contentSize;
+ }
+ mgmtObject->statisticsUpdated();
+ brokerMgmtObject->statisticsUpdated();
+ }
}
+} // namespace
+
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
const OwnershipToken* const _owner,
@@ -101,6 +154,7 @@ Queue::Queue(const string& _name, bool _autodelete,
store(_store),
owner(_owner),
consumerCount(0),
+ browserCount(0),
exclusive(0),
noLocal(false),
persistLastNode(false),
@@ -166,7 +220,7 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
- alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders());
+ alternateExchange->route(deliverable);
}
} else if (isLocal(msg)) {
//drop message
@@ -183,11 +237,16 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
void Queue::recoverPrepared(boost::intrusive_ptr<Message>& msg)
{
+ Mutex::ScopedLock locker(messageLock);
if (policy.get()) policy->recoverEnqueued(msg);
}
-void Queue::recover(boost::intrusive_ptr<Message>& msg){
- if (policy.get()) policy->recoverEnqueued(msg);
+void Queue::recover(boost::intrusive_ptr<Message>& msg)
+{
+ {
+ Mutex::ScopedLock locker(messageLock);
+ if (policy.get()) policy->recoverEnqueued(msg);
+ }
push(msg, true);
if (store){
@@ -209,11 +268,16 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
if (mgmtObject != 0){
- mgmtObject->inc_msgTxnEnqueues ();
- mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ const uint64_t contentSize = msg->contentSize();
+ qStats->msgTxnEnqueues += 1;
+ qStats->byteTxnEnqueues += contentSize;
+ mgmtObject->statisticsUpdated();
if (brokerMgmtObject) {
- brokerMgmtObject->inc_msgTxnEnqueues ();
- brokerMgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+ bStats->msgTxnEnqueues += 1;
+ bStats->byteTxnEnqueues += contentSize;
+ brokerMgmtObject->statisticsUpdated();
}
}
}
@@ -222,7 +286,6 @@ void Queue::requeue(const QueuedMessage& msg){
assertClusterSafe();
QueueListeners::NotificationSet copy;
{
- Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
if (deleted) {
//
@@ -238,10 +301,20 @@ void Queue::requeue(const QueuedMessage& msg){
if (brokerMgmtObject)
brokerMgmtObject->inc_abandoned();
}
- mgntDeqStats(msg.payload);
+ mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
} else {
- messages->release(msg);
- listeners.populate(copy);
+ {
+ Mutex::ScopedLock locker(messageLock);
+ messages->release(msg);
+ observeRequeue(msg, locker);
+ listeners.populate(copy);
+ }
+
+ if (mgmtObject) {
+ mgmtObject->inc_releases();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_releases();
+ }
// for persistLastNode - don't force a message twice to disk, but force it if no force before
if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
@@ -251,7 +324,6 @@ void Queue::requeue(const QueuedMessage& msg){
enqueue(0, payload);
}
}
- observeRequeue(msg, locker);
}
}
copy.notify();
@@ -259,10 +331,9 @@ void Queue::requeue(const QueuedMessage& msg){
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
- Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
- if (acquire(position, message, locker)) {
+ if (acquire(position, message)) {
QPID_LOG(debug, "Acquired message at " << position << " from " << name);
return true;
} else {
@@ -273,17 +344,20 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
{
- Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position);
-
- if (!allocator->allocate( consumer, msg )) {
+ bool ok;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ ok = allocator->allocate( consumer, msg );
+ }
+ if (!ok) {
QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name);
return false;
}
QueuedMessage copy(msg);
- if (acquire( msg.position, copy, locker)) {
+ if (acquire( msg.position, copy)) {
QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name);
return true;
}
@@ -325,59 +399,73 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
{
while (true) {
- Mutex::ScopedLock locker(messageLock);
QueuedMessage msg;
- if (allocator->nextConsumableMessage(c, msg)) {
- if (msg.payload->hasExpired()) {
- QPID_LOG(debug, "Message expired from queue '" << name << "'");
- c->setPosition(msg.position);
- dequeue(0, msg);
- if (mgmtObject) {
- mgmtObject->inc_discardsTtl();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsTtl();
- }
+ bool found;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ found = allocator->nextConsumableMessage(c, msg);
+ if (!found) listeners.addListener(c);
+ }
+ if (!found) {
+ QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ return NO_MESSAGES;
+ }
- continue;
+ if (msg.payload->hasExpired()) {
+ QPID_LOG(debug, "Message expired from queue '" << name << "'");
+ c->setPosition(msg.position);
+ dequeue(0, msg);
+ if (mgmtObject) {
+ mgmtObject->inc_discardsTtl();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsTtl();
}
+ continue;
+ }
- if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
+ if (c->filter(msg.payload)) {
+ if (c->accept(msg.payload)) {
+ {
+ Mutex::ScopedLock locker(messageLock);
bool ok = allocator->allocate( c->getName(), msg ); // inform allocator
(void) ok; assert(ok);
observeAcquire(msg, locker);
- m = msg;
- return CONSUMED;
- } else {
- //message(s) are available but consumer hasn't got enough credit
- QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
- messages->release(msg);
- return CANT_CONSUME;
}
+ if (mgmtObject) {
+ mgmtObject->inc_acquires();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires();
+ }
+ m = msg;
+ return CONSUMED;
} else {
- //consumer will never want this message
- QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
- messages->release(msg);
- return CANT_CONSUME;
+ //message(s) are available but consumer hasn't got enough credit
+ QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
}
} else {
- QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
- listeners.addListener(c);
- return NO_MESSAGES;
+ //consumer will never want this message
+ QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
}
+
+ Mutex::ScopedLock locker(messageLock);
+ messages->release(msg);
+ return CANT_CONSUME;
}
}
bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
{
while (true) {
- Mutex::ScopedLock locker(messageLock);
QueuedMessage msg;
-
- if (!allocator->nextBrowsableMessage(c, msg)) { // no next available
+ bool found;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ found = allocator->nextBrowsableMessage(c, msg);
+ if (!found) listeners.addListener(c);
+ }
+ if (!found) { // no next available
QPID_LOG(debug, "No browsable messages available for consumer " <<
c->getName() << " on queue '" << name << "'");
- listeners.addListener(c);
return false;
}
@@ -435,60 +523,67 @@ bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const {
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
assertClusterSafe();
{
- Mutex::ScopedLock locker(consumerLock);
- if(exclusive) {
- throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
- } else if(requestExclusive) {
- if(consumerCount) {
+ Mutex::ScopedLock locker(messageLock);
+ // NOTE: consumerCount is actually a count of all
+ // subscriptions, both acquiring and non-acquiring (browsers).
+ // Check for exclusivity of acquiring consumers.
+ size_t acquiringConsumers = consumerCount - browserCount;
+ if (c->preAcquires()) {
+ if(exclusive) {
throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
- } else {
- exclusive = c->getSession();
+ QPID_MSG("Queue " << getName()
+ << " has an exclusive consumer. No more consumers allowed."));
+ } else if(requestExclusive) {
+ if(acquiringConsumers) {
+ throw ResourceLockedException(
+ QPID_MSG("Queue " << getName()
+ << " already has consumers. Exclusive access denied."));
+ } else {
+ exclusive = c->getSession();
+ }
}
}
+ else
+ browserCount++;
consumerCount++;
- if (mgmtObject != 0)
- mgmtObject->inc_consumerCount ();
//reset auto deletion timer if necessary
if (autoDeleteTimeout && autoDeleteTask) {
autoDeleteTask->cancel();
}
+ observeConsumerAdd(*c, locker);
}
- Mutex::ScopedLock locker(messageLock);
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->consumerAdded(*c);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
- }
- }
+ if (mgmtObject != 0)
+ mgmtObject->inc_consumerCount ();
}
void Queue::cancel(Consumer::shared_ptr c){
removeListener(c);
{
- Mutex::ScopedLock locker(consumerLock);
+ Mutex::ScopedLock locker(messageLock);
consumerCount--;
+ if (!c->preAcquires()) browserCount--;
if(exclusive) exclusive = 0;
- if (mgmtObject != 0)
- mgmtObject->dec_consumerCount ();
- }
- Mutex::ScopedLock locker(messageLock);
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->consumerRemoved(*c);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
- }
+ observeConsumerRemove(*c, locker);
}
+ if (mgmtObject != 0)
+ mgmtObject->dec_consumerCount ();
}
QueuedMessage Queue::get(){
- Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
- if (messages->consume(msg))
- observeAcquire(msg, locker);
+ bool ok;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ ok = messages->consume(msg);
+ if (ok) observeAcquire(msg, locker);
+ }
+
+ if (ok && mgmtObject) {
+ mgmtObject->inc_acquires();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires();
+ }
+
return msg;
}
@@ -520,22 +615,26 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
}
- //
- // Report the count of discarded-by-ttl messages
- //
- if (mgmtObject && !expired.empty()) {
- mgmtObject->inc_discardsTtl(expired.size());
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsTtl(expired.size());
- }
+ if (!expired.empty()) {
+ if (mgmtObject) {
+ mgmtObject->inc_acquires(expired.size());
+ mgmtObject->inc_discardsTtl(expired.size());
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_acquires(expired.size());
+ brokerMgmtObject->inc_discardsTtl(expired.size());
+ }
+ }
- for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
- i != expired.end(); ++i) {
- {
- Mutex::ScopedLock locker(messageLock);
- observeAcquire(*i, locker);
+ for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
+ i != expired.end(); ++i) {
+ {
+ // KAG: should be safe to retake lock after the removeIf, since
+ // no other thread can touch these messages after the removeIf() call
+ Mutex::ScopedLock locker(messageLock);
+ observeAcquire(*i, locker);
+ }
+ dequeue( 0, *i );
}
- dequeue( 0, *i );
}
}
}
@@ -661,32 +760,46 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange>
std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
Collector c(*mf.get(), purge_request);
- Mutex::ScopedLock locker(messageLock);
- messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+ {
+ Mutex::ScopedLock locker(messageLock);
+ messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+ }
- if (mgmtObject && !c.matches.empty()) {
- if (dest.get()) {
- mgmtObject->inc_reroutes(c.matches.size());
- if (brokerMgmtObject)
- brokerMgmtObject->inc_reroutes(c.matches.size());
- } else {
- mgmtObject->inc_discardsPurge(c.matches.size());
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsPurge(c.matches.size());
+ if (!c.matches.empty()) {
+ if (mgmtObject) {
+ mgmtObject->inc_acquires(c.matches.size());
+ if (dest.get()) {
+ mgmtObject->inc_reroutes(c.matches.size());
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_acquires(c.matches.size());
+ brokerMgmtObject->inc_reroutes(c.matches.size());
+ }
+ } else {
+ mgmtObject->inc_discardsPurge(c.matches.size());
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_acquires(c.matches.size());
+ brokerMgmtObject->inc_discardsPurge(c.matches.size());
+ }
+ }
}
- }
- for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
- qmsg != c.matches.end(); ++qmsg) {
- // Update observers and message state:
- observeAcquire(*qmsg, locker);
- dequeue(0, *qmsg);
- QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName());
- // now reroute if necessary
- if (dest.get()) {
- assert(qmsg->payload);
- DeliverableMessage dmsg(qmsg->payload);
- dest->routeWithAlternate(dmsg);
+ for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+ qmsg != c.matches.end(); ++qmsg) {
+
+ {
+ // KAG: should be safe to retake lock after the removeIf, since
+ // no other thread can touch these messages after the removeIf call
+ Mutex::ScopedLock locker(messageLock);
+ observeAcquire(*qmsg, locker);
+ }
+ dequeue(0, *qmsg);
+ QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName());
+ // now reroute if necessary
+ if (dest.get()) {
+ assert(qmsg->payload);
+ DeliverableMessage dmsg(qmsg->payload);
+ dest->routeWithAlternate(dmsg);
+ }
}
}
return c.matches.size();
@@ -698,27 +811,51 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
Collector c(*mf.get(), qty);
- Mutex::ScopedLock locker(messageLock);
- messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+ {
+ Mutex::ScopedLock locker(messageLock);
+ messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+ }
+
- for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
- qmsg != c.matches.end(); ++qmsg) {
+ if (!c.matches.empty()) {
// Update observers and message state:
- observeAcquire(*qmsg, locker);
- dequeue(0, *qmsg);
- // and move to destination Queue.
- assert(qmsg->payload);
- destq->deliver(qmsg->payload);
+
+ if (mgmtObject) {
+ mgmtObject->inc_acquires(c.matches.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires(c.matches.size());
+ }
+
+ for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+ qmsg != c.matches.end(); ++qmsg) {
+ {
+ Mutex::ScopedLock locker(messageLock);
+ observeAcquire(*qmsg, locker);
+ }
+ dequeue(0, *qmsg);
+ // and move to destination Queue.
+ assert(qmsg->payload);
+ destq->deliver(qmsg->payload);
+ }
}
return c.matches.size();
}
/** Acquire the message at the given position, return true and msg if acquire succeeds */
-bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
- const Mutex::ScopedLock& locker)
+bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg)
{
- if (messages->acquire(position, msg)) {
- observeAcquire(msg, locker);
+ bool ok;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ ok = messages->acquire(position, msg);
+ if (ok) observeAcquire(msg, locker);
+ }
+ if (ok) {
+ if (mgmtObject) {
+ mgmtObject->inc_acquires();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires();
+ }
++dequeueSincePurge;
return true;
}
@@ -728,35 +865,43 @@ bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
QueueListeners::NotificationSet copy;
- QueuedMessage removed;
+ QueuedMessage removed, qm(this, msg);
bool dequeueRequired = false;
{
Mutex::ScopedLock locker(messageLock);
- QueuedMessage qm(this, msg, ++sequence);
- if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
-
- dequeueRequired = messages->push(qm, removed);
- if (dequeueRequired) {
+ qm.position = ++sequence;
+ if (messages->push(qm, removed)) {
+ dequeueRequired = true;
observeAcquire(removed, locker);
- if (mgmtObject) {
- mgmtObject->inc_discardsLvq();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsLvq();
- }
}
- listeners.populate(copy);
observeEnqueue(qm, locker);
+ if (policy.get()) {
+ policy->enqueued(qm);
+ }
+ listeners.populate(copy);
}
- copy.notify();
+ if (insertSeqNo) msg->insertCustomProperty(seqNoKey, qm.position);
+
+ mgntEnqStats(msg, mgmtObject, brokerMgmtObject);
+
if (dequeueRequired) {
+ if (mgmtObject) {
+ mgmtObject->inc_acquires();
+ mgmtObject->inc_discardsLvq();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires();
+ brokerMgmtObject->inc_discardsLvq();
+ }
if (isRecovery) {
//can't issue new requests for the store until
//recovery is complete
+ Mutex::ScopedLock locker(messageLock);
pendingDequeues.push_back(removed);
} else {
dequeue(0, removed);
}
}
+ copy.notify();
}
void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
@@ -767,8 +912,8 @@ void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
/** function only provided for unit tests, or code not in critical message path */
uint32_t Queue::getEnqueueCompleteMessageCount() const
{
- Mutex::ScopedLock locker(messageLock);
uint32_t count = 0;
+ Mutex::ScopedLock locker(messageLock);
messages->foreach(boost::bind(&isEnqueueComplete, &count, _1));
return count;
}
@@ -781,13 +926,13 @@ uint32_t Queue::getMessageCount() const
uint32_t Queue::getConsumerCount() const
{
- Mutex::ScopedLock locker(consumerLock);
+ Mutex::ScopedLock locker(messageLock);
return consumerCount;
}
bool Queue::canAutoDelete() const
{
- Mutex::ScopedLock locker(consumerLock);
+ Mutex::ScopedLock locker(messageLock);
return autodelete && !consumerCount && !owner;
}
@@ -894,14 +1039,20 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
ScopedUse u(barrier);
if (!u.acquired) return false;
-
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
if (!ctxt) {
+ if (policy.get()) policy->dequeued(msg);
+ messages->deleted(msg);
observeDequeue(msg, locker);
}
}
+
+ if (!ctxt) {
+ mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
+ }
+
// This check prevents messages which have been forced persistent on one queue from dequeuing
// from another on which no forcing has taken place and thus causing a store error.
bool fp = msg.payload->isForcedPersistent();
@@ -918,14 +1069,24 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
- Mutex::ScopedLock locker(messageLock);
- observeDequeue(msg, locker);
+ {
+ Mutex::ScopedLock locker(messageLock);
+ if (policy.get()) policy->dequeued(msg);
+ messages->deleted(msg);
+ observeDequeue(msg, locker);
+ }
+ mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
if (mgmtObject != 0) {
- mgmtObject->inc_msgTxnDequeues();
- mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ const uint64_t contentSize = msg.payload->contentSize();
+ qStats->msgTxnDequeues += 1;
+ qStats->byteTxnDequeues += contentSize;
+ mgmtObject->statisticsUpdated();
if (brokerMgmtObject) {
- brokerMgmtObject->inc_msgTxnDequeues();
- brokerMgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+ bStats->msgTxnDequeues += 1;
+ bStats->byteTxnDequeues += contentSize;
+ brokerMgmtObject->statisticsUpdated();
}
}
}
@@ -934,10 +1095,20 @@ void Queue::dequeueCommitted(const QueuedMessage& msg)
* Removes the first (oldest) message from the in-memory delivery queue as well dequeing
* it from the logical (and persistent if applicable) queue
*/
-bool Queue::popAndDequeue(QueuedMessage& msg, const Mutex::ScopedLock& locker)
+bool Queue::popAndDequeue(QueuedMessage& msg)
{
- if (messages->consume(msg)) {
- observeAcquire(msg, locker);
+ bool popped;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ popped = messages->consume(msg);
+ if (popped) observeAcquire(msg, locker);
+ }
+ if (popped) {
+ if (mgmtObject) {
+ mgmtObject->inc_acquires();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires();
+ }
dequeue(0, msg);
return true;
} else {
@@ -947,13 +1118,10 @@ bool Queue::popAndDequeue(QueuedMessage& msg, const Mutex::ScopedLock& locker)
/**
* Updates policy and management when a message has been dequeued,
- * expects messageLock to be held
+ * Requires messageLock be held by caller.
*/
-void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
+void Queue::observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
{
- mgntDeqStats(msg.payload);
- if (policy.get()) policy->dequeued(msg);
- messages->deleted(msg);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->dequeued(msg);
@@ -963,17 +1131,11 @@ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
}
}
-/** updates queue observers when a message has become unavailable for transfer,
- * expects messageLock to be held
+/** updates queue observers when a message has become unavailable for transfer.
+ * Requires messageLock be held by caller.
*/
-void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
+void Queue::observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
{
- if (mgmtObject) {
- mgmtObject->inc_acquires();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_acquires();
- }
-
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->acquired(msg);
@@ -983,17 +1145,11 @@ void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
}
}
-/** updates queue observers when a message has become re-available for transfer,
- * expects messageLock to be held
+/** updates queue observers when a message has become re-available for transfer
+ * Requires messageLock be held by caller.
*/
-void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
+void Queue::observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
{
- if (mgmtObject) {
- mgmtObject->inc_releases();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_releases();
- }
-
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->requeued(msg);
@@ -1003,6 +1159,33 @@ void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
}
}
+/** updates queue observers when a new consumer has subscribed to this queue.
+ */
+void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
+{
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->consumerAdded(c);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
+ }
+ }
+}
+
+/** updates queue observers when a consumer has unsubscribed from this queue.
+ */
+void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
+{
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->consumerRemoved(c);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
+ }
+ }
+}
+
+
void Queue::create(const FieldTable& _settings)
{
settings = _settings;
@@ -1150,23 +1333,21 @@ void Queue::configureImpl(const FieldTable& _settings)
void Queue::destroyed()
{
unbind(broker->getExchanges());
- {
- Mutex::ScopedLock locker(messageLock);
- QueuedMessage m;
- while(popAndDequeue(m, locker)) {
- DeliverableMessage msg(m.payload);
- if (alternateExchange.get()) {
- if (brokerMgmtObject)
- brokerMgmtObject->inc_abandonedViaAlt();
- alternateExchange->routeWithAlternate(msg);
- } else {
- if (brokerMgmtObject)
- brokerMgmtObject->inc_abandoned();
- }
+
+ QueuedMessage m;
+ while(popAndDequeue(m)) {
+ DeliverableMessage msg(m.payload);
+ if (alternateExchange.get()) {
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandonedViaAlt();
+ alternateExchange->routeWithAlternate(msg);
+ } else {
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandoned();
}
- if (alternateExchange.get())
- alternateExchange->decAlternateUsers();
}
+ if (alternateExchange.get())
+ alternateExchange->decAlternateUsers();
if (store) {
barrier.destroy();
@@ -1177,7 +1358,7 @@ void Queue::destroyed()
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
{
- Mutex::ScopedLock locker(messageLock);
+ Mutex::ScopedLock lock(messageLock);
observers.clear();
}
}
@@ -1187,8 +1368,8 @@ void Queue::notifyDeleted()
QueueListeners::ListenerSet set;
{
Mutex::ScopedLock locker(messageLock);
- listeners.snapshot(set);
deleted = true;
+ listeners.snapshot(set);
}
set.notifyAll();
}
@@ -1206,6 +1387,7 @@ void Queue::unbind(ExchangeRegistry& exchanges)
void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
{
+ Mutex::ScopedLock locker(messageLock);
policy = _policy;
if (policy.get())
policy->setQueue(this);
@@ -1213,6 +1395,7 @@ void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
const QueuePolicy* Queue::getPolicy()
{
+ Mutex::ScopedLock locker(messageLock);
return policy.get();
}
@@ -1302,7 +1485,7 @@ struct AutoDeleteTask : qpid::sys::TimerTask
Queue::shared_ptr queue;
AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
- : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
+ : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q) {}
void fire()
{
@@ -1388,11 +1571,15 @@ void Queue::countRejected() const
void Queue::countFlowedToDisk(uint64_t size) const
{
if (mgmtObject) {
- mgmtObject->inc_msgFtdEnqueues();
- mgmtObject->inc_byteFtdEnqueues(size);
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ qStats->msgFtdEnqueues += 1;
+ qStats->byteFtdEnqueues += size;
+ mgmtObject->statisticsUpdated();
if (brokerMgmtObject) {
- brokerMgmtObject->inc_msgFtdEnqueues();
- brokerMgmtObject->inc_byteFtdEnqueues(size);
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+ bStats->msgFtdEnqueues += 1;
+ bStats->byteFtdEnqueues += size;
+ brokerMgmtObject->statisticsUpdated();
}
}
}
@@ -1400,11 +1587,15 @@ void Queue::countFlowedToDisk(uint64_t size) const
void Queue::countLoadedFromDisk(uint64_t size) const
{
if (mgmtObject) {
- mgmtObject->inc_msgFtdDequeues();
- mgmtObject->inc_byteFtdDequeues(size);
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ qStats->msgFtdDequeues += 1;
+ qStats->byteFtdDequeues += size;
+ mgmtObject->statisticsUpdated();
if (brokerMgmtObject) {
- brokerMgmtObject->inc_msgFtdDequeues();
- brokerMgmtObject->inc_byteFtdDequeues(size);
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+ bStats->msgFtdDequeues += 1;
+ bStats->byteFtdDequeues += size;
+ brokerMgmtObject->statisticsUpdated();
}
}
}
@@ -1434,9 +1625,14 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
{
_qmf::ArgsQueueReroute& rerouteArgs = (_qmf::ArgsQueueReroute&) args;
boost::shared_ptr<Exchange> dest;
- if (rerouteArgs.i_useAltExchange)
+ if (rerouteArgs.i_useAltExchange) {
+ if (!alternateExchange) {
+ status = Manageable::STATUS_PARAMETER_INVALID;
+ etext = "No alternate-exchange defined";
+ break;
+ }
dest = alternateExchange;
- else {
+ } else {
try {
dest = broker->getExchanges().get(rerouteArgs.i_exchange);
} catch(const std::exception&) {
@@ -1486,8 +1682,12 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges)
<< "\": exchange does not exist.");
}
//process any pending dequeues
- for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
- pendingDequeues.clear();
+ std::deque<QueuedMessage> pd;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ pendingDequeues.swap(pd);
+ }
+ for_each(pd.begin(), pd.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
void Queue::insertSequenceNumbers(const std::string& key)
@@ -1497,10 +1697,10 @@ void Queue::insertSequenceNumbers(const std::string& key)
QPID_LOG(debug, "Inserting sequence numbers as " << key);
}
-/** updates queue observers and state when a message has become available for transfer,
- * expects messageLock to be held
+/** updates queue observers and state when a message has become available for transfer
+ * Requires messageLock be held by caller.
*/
-void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&)
+void Queue::observeEnqueue(const QueuedMessage& m, const qpid::sys::Mutex::ScopedLock&)
{
for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
try {
@@ -1509,10 +1709,6 @@ void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&)
QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
}
}
- if (policy.get()) {
- policy->enqueued(m);
- }
- mgntEnqStats(m.payload);
}
void Queue::updateEnqueued(const QueuedMessage& m)
@@ -1520,12 +1716,16 @@ void Queue::updateEnqueued(const QueuedMessage& m)
if (m.payload) {
boost::intrusive_ptr<Message> payload = m.payload;
enqueue(0, payload, true);
- messages->updateAcquired(m);
- if (policy.get()) {
- policy->recoverEnqueued(payload);
+ {
+ Mutex::ScopedLock locker(messageLock);
+ messages->updateAcquired(m);
+ observeEnqueue(m, locker);
+ if (policy.get()) {
+ policy->recoverEnqueued(payload);
+ policy->enqueued(m);
+ }
}
- Mutex::ScopedLock locker(messageLock);
- observeEnqueue(m, locker);
+ mgntEnqStats(m.payload, mgmtObject, brokerMgmtObject);
} else {
QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
}
@@ -1533,10 +1733,16 @@ void Queue::updateEnqueued(const QueuedMessage& m)
bool Queue::isEnqueued(const QueuedMessage& msg)
{
+ Mutex::ScopedLock locker(messageLock);
return !policy.get() || policy->isEnqueued(msg);
}
+// Note: accessing listeners outside of lock is dangerous. Caller must ensure the queue's
+// state is not changed while listeners is referenced.
QueueListeners& Queue::getListeners() { return listeners; }
+
+// Note: accessing messages outside of lock is dangerous. Caller must ensure the queue's
+// state is not changed while messages is referenced.
Messages& Queue::getMessages() { return *messages; }
const Messages& Queue::getMessages() const { return *messages; }
@@ -1549,13 +1755,13 @@ void Queue::checkNotDeleted(const Consumer::shared_ptr& c)
void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
{
- Mutex::ScopedLock locker(messageLock);
+ Mutex::ScopedLock lock(messageLock);
observers.insert(observer);
}
void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer)
{
- Mutex::ScopedLock locker(messageLock);
+ Mutex::ScopedLock lock(messageLock);
observers.erase(observer);
}
@@ -1618,7 +1824,7 @@ Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
bool Queue::UsageBarrier::acquire()
{
- Monitor::ScopedLock l(parent.messageLock);
+ Monitor::ScopedLock l(parent.messageLock); /** @todo: use a dedicated lock instead of messageLock */
if (parent.deleted) {
return false;
} else {
@@ -1639,3 +1845,6 @@ void Queue::UsageBarrier::destroy()
parent.deleted = true;
while (count) parent.messageLock.wait();
}
+
+}}
+