summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp465
1 files changed, 214 insertions, 251 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 3de93ed74e..cfb32749a0 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -23,11 +23,16 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Fairshare.h"
#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/LegacyLVQ.h"
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/MessageMap.h"
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/ThresholdAlerts.h"
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
@@ -67,11 +72,13 @@ const std::string qpidMaxCount("qpid.max_count");
const std::string qpidNoLocal("no-local");
const std::string qpidTraceIdentity("qpid.trace.id");
const std::string qpidTraceExclude("qpid.trace.exclude");
+const std::string qpidLastValueQueueKey("qpid.last_value_queue_key");
const std::string qpidLastValueQueue("qpid.last_value_queue");
const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
const std::string qpidPersistLastNode("qpid.persist_last_node");
const std::string qpidVQMatchProperty("qpid.LVQ_key");
const std::string qpidQueueEventGeneration("qpid.queue_event_generation");
+const std::string qpidAutoDeleteTimeout("qpid.auto_delete_timeout");
//following feature is not ready for general use as it doesn't handle
//the case where a message is enqueued on more than one queue well enough:
const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers");
@@ -93,19 +100,18 @@ Queue::Queue(const string& _name, bool _autodelete,
consumerCount(0),
exclusive(0),
noLocal(false),
- lastValueQueue(false),
- lastValueQueueNoBrowse(false),
persistLastNode(false),
inLastNodeFailure(false),
+ messages(new MessageDeque()),
persistenceId(0),
policyExceeded(false),
mgmtObject(0),
eventMode(0),
- eventMgr(0),
insertSeqNo(0),
broker(b),
deleted(false),
- barrier(*this)
+ barrier(*this),
+ autoDeleteTimeout(0)
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -160,7 +166,6 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
} else {
enqueue(0, msg);
push(msg);
- mgntEnqStats(msg);
QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
}
}
@@ -179,7 +184,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
msg->addToSyncList(shared_from_this(), store);
}
msg->enqueueComplete(); // mark the message as enqueued
- mgntEnqStats(msg);
if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
//content has not been loaded, need to ensure that lazy loading mode is set:
@@ -194,7 +198,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
- mgntEnqStats(msg);
if (mgmtObject != 0){
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
@@ -208,7 +211,7 @@ void Queue::requeue(const QueuedMessage& msg){
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
msg.payload->enqueueComplete(); // mark the message as enqueued
- messages.insert(lower_bound(messages.begin(), messages.end(), msg), msg);
+ messages->reinsert(msg);
listeners.populate(copy);
// for persistLastNode - don't force a message twice to disk, but force it if no force before
@@ -223,57 +226,23 @@ void Queue::requeue(const QueuedMessage& msg){
copy.notify();
}
-void Queue::clearLVQIndex(const QueuedMessage& msg){
- assertClusterSafe();
- const framing::FieldTable* ft = msg.payload ? msg.payload->getApplicationHeaders() : 0;
- if (lastValueQueue && ft){
- string key = ft->getAsString(qpidVQMatchProperty);
- lvq.erase(key);
- }
-}
-
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
-
- Messages::iterator i = findAt(position);
- if (i != messages.end() ) {
- message = *i;
- if (lastValueQueue) {
- clearLVQIndex(*i);
- }
- QPID_LOG(debug,
- "Acquired message at " << i->position << " from " << name);
- messages.erase(i);
+ if (messages->remove(position, message)) {
+ QPID_LOG(debug, "Acquired message at " << position << " from " << name);
return true;
- }
- QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
- return false;
+ } else {
+ QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
+ return false;
+ }
}
bool Queue::acquire(const QueuedMessage& msg) {
- Mutex::ScopedLock locker(messageLock);
- assertClusterSafe();
-
- QPID_LOG(debug, "attempting to acquire " << msg.position);
- Messages::iterator i = findAt(msg.position);
- if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set
- (!lastValueQueue ||
- (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
- ) {
-
- clearLVQIndex(msg);
- QPID_LOG(debug,
- "Match found, acquire succeeded: " <<
- i->position << " == " << msg.position);
- messages.erase(i);
- return true;
- }
-
- QPID_LOG(debug, "Acquire failed for " << msg.position);
- return false;
+ QueuedMessage copy = msg;
+ return acquireMessageAt(msg.position, copy);
}
void Queue::notifyListener()
@@ -282,7 +251,7 @@ void Queue::notifyListener()
QueueListeners::NotificationSet set;
{
Mutex::ScopedLock locker(messageLock);
- if (messages.size()) {
+ if (messages->size()) {
listeners.populate(set);
}
}
@@ -311,12 +280,12 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
{
while (true) {
Mutex::ScopedLock locker(messageLock);
- if (messages.empty()) {
+ if (messages->empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
} else {
- QueuedMessage msg = getFront();
+ QueuedMessage msg = messages->front();
if (msg.payload->hasExpired()) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
popAndDequeue();
@@ -326,7 +295,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
m = msg;
- popMsg(msg);
+ pop();
return CONSUMED;
} else {
//message(s) are available but consumer hasn't got enough credit
@@ -352,11 +321,6 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
//consumer wants the message
c->position = msg.position;
m = msg;
- if (!lastValueQueueNoBrowse) clearLVQIndex(msg);
- if (lastValueQueue) {
- boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
- if (replacement.get()) m.payload = replacement;
- }
return true;
} else {
//browser hasn't got enough credit for the message
@@ -378,7 +342,7 @@ void Queue::removeListener(Consumer::shared_ptr c)
{
Mutex::ScopedLock locker(messageLock);
listeners.removeListener(c);
- if (messages.size()) {
+ if (messages->size()) {
listeners.populate(set);
}
}
@@ -399,52 +363,20 @@ bool Queue::dispatch(Consumer::shared_ptr c)
// Find the next message
bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
Mutex::ScopedLock locker(messageLock);
- if (!messages.empty() && messages.back().position > c->position) {
- if (c->position < getFront().position) {
- msg = getFront();
- return true;
- } else {
- Messages::iterator pos = findAt(c->position);
- if (pos != messages.end() && pos+1 != messages.end()) {
- msg = *(pos+1);
- return true;
- }
- }
+ if (messages->next(c->position, msg)) {
+ return true;
+ } else {
+ listeners.addListener(c);
+ return false;
}
- listeners.addListener(c);
- return false;
}
-Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
-
- if(!messages.empty()){
- QueuedMessage compM;
- compM.position = pos;
- unsigned long diff = pos.getValue() - messages.front().position.getValue();
- long maxEnd = diff < messages.size()? diff : messages.size();
-
- Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
- if (i!= messages.end() && i->position == pos)
- return i;
- }
- return messages.end(); // no match found.
-}
-
-
QueuedMessage Queue::find(SequenceNumber pos) const {
Mutex::ScopedLock locker(messageLock);
- if(!messages.empty()){
- QueuedMessage compM;
- compM.position = pos;
- unsigned long diff = pos.getValue() - messages.front().position.getValue();
- long maxEnd = diff < messages.size()? diff : messages.size();
-
- Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
- if (i != messages.end())
- return *i;
- }
- return QueuedMessage();
+ QueuedMessage msg;
+ messages->find(pos, msg);
+ return msg;
}
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
@@ -464,6 +396,10 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
consumerCount++;
if (mgmtObject != 0)
mgmtObject->inc_consumerCount ();
+ //reset auto deletion timer if necessary
+ if (autoDeleteTimeout && autoDeleteTask) {
+ autoDeleteTask->cancel();
+ }
}
void Queue::cancel(Consumer::shared_ptr c){
@@ -478,12 +414,18 @@ void Queue::cancel(Consumer::shared_ptr c){
QueuedMessage Queue::get(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
+ messages->pop(msg);
+ return msg;
+}
- if(!messages.empty()){
- msg = getFront();
- popMsg(msg);
+bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message)
+{
+ if (message.payload->hasExpired()) {
+ expired.push_back(message);
+ return true;
+ } else {
+ return false;
}
- return msg;
}
void Queue::purgeExpired()
@@ -492,37 +434,11 @@ void Queue::purgeExpired()
//bother explicitly expiring if the rate of dequeues since last
//attempt is less than one per second.
- //Note: This method is currently called periodically on the timer
- //thread. In a clustered broker this means that the purging does
- //not occur on the cluster event dispatch thread and consequently
- //that is not totally ordered w.r.t other events (including
- //publication of messages). However the cluster does ensure that
- //the actual expiration of messages (as distinct from the removing
- //of those expired messages from the queue) *is* consistently
- //ordered w.r.t. cluster events. This means that delivery of
- //messages is in general consistent across the cluster inspite of
- //any non-determinism in the triggering of a purge. However at
- //present purging a last value queue could potentially cause
- //inconsistencies in the cluster (as the order w.r.t publications
- //can affect the order in which messages appear in the
- //queue). Consequently periodic purging of an LVQ is not enabled
- //(expired messages will be removed on delivery and consolidated
- //by key as part of normal LVQ operation).
-
- if (dequeueTracker.sampleRatePerSecond() < 1 && !lastValueQueue) {
- Messages expired;
+ if (dequeueTracker.sampleRatePerSecond() < 1) {
+ std::deque<QueuedMessage> expired;
{
Mutex::ScopedLock locker(messageLock);
- for (Messages::iterator i = messages.begin(); i != messages.end();) {
- //Re-introduce management of LVQ-specific state here
- //if purging is renabled for that case (see note above)
- if (i->payload->hasExpired()) {
- expired.push_back(*i);
- i = messages.erase(i);
- } else {
- ++i;
- }
- }
+ messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
}
for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
@@ -548,13 +464,13 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange>
uint32_t count = 0;
// Either purge them all or just the some (purge_count) while the queue isn't empty.
- while((!purge_request || purge_count--) && !messages.empty()) {
+ while((!purge_request || purge_count--) && !messages->empty()) {
if (dest.get()) {
//
// If there is a destination exchange, stage the messages onto a reroute queue
// so they don't wind up getting purged more than once.
//
- DeliverableMessage msg(getFront().payload);
+ DeliverableMessage msg(messages->front().payload);
rerouteQueue.push_back(msg);
}
popAndDequeue();
@@ -580,101 +496,53 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
uint32_t move_count = qty; // only comes into play if qty >0
uint32_t count = 0; // count how many were moved for returning
- while((!qty || move_count--) && !messages.empty()) {
- QueuedMessage qmsg = getFront();
+ while((!qty || move_count--) && !messages->empty()) {
+ QueuedMessage qmsg = messages->front();
boost::intrusive_ptr<Message> msg = qmsg.payload;
destq->deliver(msg); // deliver message to the destination queue
- popMsg(qmsg);
+ pop();
dequeue(0, qmsg);
count++;
}
return count;
}
-void Queue::popMsg(QueuedMessage& qmsg)
+void Queue::pop()
{
assertClusterSafe();
- const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
- if (lastValueQueue && ft){
- string key = ft->getAsString(qpidVQMatchProperty);
- lvq.erase(key);
- }
- messages.pop_front();
+ messages->pop();
++dequeueTracker;
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
QueueListeners::NotificationSet copy;
+ QueuedMessage removed;
+ bool dequeueRequired = false;
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
- LVQ::iterator i;
- const framing::FieldTable* ft = msg->getApplicationHeaders();
- if (lastValueQueue && ft){
- string key = ft->getAsString(qpidVQMatchProperty);
-
- i = lvq.find(key);
- if (i == lvq.end() || (broker && broker->isClusterUpdatee())) {
- messages.push_back(qm);
- listeners.populate(copy);
- lvq[key] = msg;
- }else {
- boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
- if (!old) old = i->second;
- i->second->setReplacementMessage(msg,this);
- if (isRecovery) {
- //can't issue new requests for the store until
- //recovery is complete
- pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
- } else {
- Mutex::ScopedUnlock u(messageLock);
- dequeue(0, QueuedMessage(qm.queue, old, qm.position));
- }
- }
- }else {
- messages.push_back(qm);
- listeners.populate(copy);
- }
- if (eventMode) {
- if (eventMgr) eventMgr->enqueued(qm);
- else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
- }
- if (policy.get()) {
- policy->enqueued(qm);
- }
- if (flowLimit.get())
- flowLimit->enqueued(qm);
+ dequeueRequired = messages->push(qm, removed);
+ listeners.populate(copy);
+ enqueued(qm);
}
copy.notify();
-}
-
-QueuedMessage Queue::getFront()
-{
- QueuedMessage msg = messages.front();
- if (lastValueQueue) {
- boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
- if (replacement.get()) msg.payload = replacement;
+ if (dequeueRequired) {
+ if (isRecovery) {
+ //can't issue new requests for the store until
+ //recovery is complete
+ pendingDequeues.push_back(removed);
+ } else {
+ dequeue(0, removed);
+ }
}
- return msg;
}
-QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg)
+void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
{
- boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
- if (replacement.get()) {
- const framing::FieldTable* ft = replacement->getApplicationHeaders();
- if (ft) {
- string key = ft->getAsString(qpidVQMatchProperty);
- if (lvq.find(key) != lvq.end()){
- lvq[key] = replacement;
- }
- }
- msg.payload = replacement;
- }
- return msg;
+ if (message.payload->isIngressComplete()) (*result)++;
}
/** function only provided for unit tests, or code not in critical message path */
@@ -682,20 +550,14 @@ uint32_t Queue::getEnqueueCompleteMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
uint32_t count = 0;
- for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
- //NOTE: don't need to use checkLvqReplace() here as it
- //is only relevant for LVQ which does not support persistence
- //so the enqueueComplete check has no effect
- if ( i->payload->isIngressComplete() ) count ++;
- }
-
+ messages->foreach(boost::bind(&isEnqueueComplete, &count, _1));
return count;
}
uint32_t Queue::getMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
- return messages.size();
+ return messages->size();
}
uint32_t Queue::getConsumerCount() const
@@ -707,7 +569,7 @@ uint32_t Queue::getConsumerCount() const
bool Queue::canAutoDelete() const
{
Mutex::ScopedLock locker(consumerLock);
- return autodelete && !consumerCount;
+ return autodelete && !consumerCount && !owner;
}
void Queue::clearLastNodeFailure()
@@ -715,21 +577,22 @@ void Queue::clearLastNodeFailure()
inLastNodeFailure = false;
}
+void Queue::forcePersistent(QueuedMessage& message)
+{
+ if(!message.payload->isStoredOnQueue(shared_from_this())) {
+ message.payload->forcePersistent();
+ if (message.payload->isForcedPersistent() ){
+ enqueue(0, message.payload);
+ }
+ }
+}
+
void Queue::setLastNodeFailure()
{
if (persistLastNode){
Mutex::ScopedLock locker(messageLock);
try {
- for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
- if (lastValueQueue) checkLvqReplace(*i);
- // don't force a message twice to disk.
- if(!i->payload->isStoredOnQueue(shared_from_this())) {
- i->payload->forcePersistent();
- if (i->payload->isForcedPersistent() ){
- enqueue(0, i->payload);
- }
- }
- }
+ messages->foreach(boost::bind(&Queue::forcePersistent, this, _1));
} catch (const std::exception& e) {
// Could not go into last node standing (for example journal not large enough)
QPID_LOG(error, "Unable to fail to last node standing for queue: " << name << " : " << e.what());
@@ -746,7 +609,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
if (!u.acquired) return false;
if (policy.get() && !suppressPolicyCheck) {
- Messages dequeues;
+ std::deque<QueuedMessage> dequeues;
{
Mutex::ScopedLock locker(messageLock);
policy->tryEnqueue(msg);
@@ -833,8 +696,8 @@ void Queue::dequeueCommitted(const QueuedMessage& msg)
*/
void Queue::popAndDequeue()
{
- QueuedMessage msg = getFront();
- popMsg(msg);
+ QueuedMessage msg = messages->front();
+ pop();
dequeue(0, msg);
}
@@ -845,11 +708,16 @@ void Queue::popAndDequeue()
void Queue::dequeued(const QueuedMessage& msg)
{
if (policy.get()) policy->dequeued(msg);
+ /** todo KAG make flowLimit an observer */
if (flowLimit.get())
flowLimit->dequeued(msg);
mgntDeqStats(msg.payload);
- if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
- eventMgr->dequeued(msg);
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->dequeued(msg);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what());
+ }
}
}
@@ -863,16 +731,41 @@ void Queue::create(const FieldTable& _settings)
configure(_settings);
}
+
+int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key)
+{
+ qpid::framing::FieldTable::ValuePtr v = settings.get(key);
+ if (!v) {
+ return 0;
+ } else if (v->convertsTo<int>()) {
+ return v->get<int>();
+ } else if (v->convertsTo<std::string>()){
+ std::string s = v->get<std::string>();
+ try {
+ return boost::lexical_cast<int>(s);
+ } catch(const boost::bad_lexical_cast&) {
+ QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
+ return 0;
+ }
+ } else {
+ QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v);
+ return 0;
+ }
+}
+
void Queue::configure(const FieldTable& _settings, bool recovering)
{
eventMode = _settings.getAsInt(qpidQueueEventGeneration);
+ if (eventMode && broker) {
+ broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
+ }
if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
- (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) {
+ (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
if ( NullMessageStore::isNullStore(store)) {
QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
- } else if (eventMgr && !eventMgr->isSync() ) {
+ } else if (broker && !(broker->getQueueEvents().isSync()) ) {
QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName());
}
FieldTable copy(_settings);
@@ -881,17 +774,30 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
} else {
setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
}
+ if (broker && broker->getManagementAgent()) {
+ ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings);
+ }
+
//set this regardless of owner to allow use of no-local with exclusive consumers also
noLocal = _settings.get(qpidNoLocal);
QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
- lastValueQueue= _settings.get(qpidLastValueQueue);
- if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName());
-
- lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse);
- if (lastValueQueueNoBrowse){
- QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName());
- lastValueQueue = lastValueQueueNoBrowse;
+ std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey);
+ if (lvqKey.size()) {
+ QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey);
+ messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
+ } else if (_settings.get(qpidLastValueQueueNoBrowse)) {
+ QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on");
+ messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
+ } else if (_settings.get(qpidLastValueQueue)) {
+ QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue");
+ messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
+ } else {
+ std::auto_ptr<Messages> m = Fairshare::create(_settings);
+ if (m.get()) {
+ messages = m;
+ QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
+ }
}
persistLastNode= _settings.get(qpidPersistLastNode);
@@ -910,6 +816,10 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
flowLimit = QueueFlowLimit::createQueueFlowLimit(this, _settings);
+ autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
+ if (autoDeleteTimeout)
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
+
if (mgmtObject != 0) {
mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
if (flowLimit.get())
@@ -924,8 +834,8 @@ void Queue::destroy()
{
if (alternateExchange.get()) {
Mutex::ScopedLock locker(messageLock);
- while(!messages.empty()){
- DeliverableMessage msg(getFront().payload);
+ while(!messages->empty()){
+ DeliverableMessage msg(messages->front().payload);
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
msg.getMessage().getApplicationHeaders());
popAndDequeue();
@@ -939,6 +849,7 @@ void Queue::destroy()
store->destroy(*this);
store = 0;//ensure we make no more calls to the store for this queue
}
+ if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
}
void Queue::notifyDeleted()
@@ -1043,15 +954,46 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
return alternateExchange;
}
-void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
{
if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
+ QPID_LOG(debug, "Auto-deleting " << queue->getName());
queue->unbind(broker.getExchanges(), queue);
queue->destroy();
}
}
+struct AutoDeleteTask : qpid::sys::TimerTask
+{
+ Broker& broker;
+ Queue::shared_ptr queue;
+
+ AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
+ : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
+
+ void fire()
+ {
+ //need to detect case where queue was used after the task was
+ //created, but then became unused again before the task fired;
+ //in this case ignore this request as there will have already
+ //been a later task added
+ tryAutoDeleteImpl(broker, queue);
+ }
+};
+
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+{
+ if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
+ AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
+ queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
+ broker.getClusterTimer().add(queue->autoDeleteTask);
+ QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
+ } else {
+ tryAutoDeleteImpl(broker, queue);
+ }
+}
+
bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
{
Mutex::ScopedLock locker(ownershipLock);
@@ -1066,6 +1008,10 @@ void Queue::releaseExclusiveOwnership()
bool Queue::setExclusiveOwner(const OwnershipToken* const o)
{
+ //reset auto deletion timer if necessary
+ if (autoDeleteTimeout && autoDeleteTask) {
+ autoDeleteTask->cancel();
+ }
Mutex::ScopedLock locker(ownershipLock);
if (owner) {
return false;
@@ -1154,11 +1100,6 @@ SequenceNumber Queue::getPosition() {
int Queue::getEventMode() { return eventMode; }
-void Queue::setQueueEventManager(QueueEvents& mgr)
-{
- eventMgr = &mgr;
-}
-
void Queue::recoveryComplete(ExchangeRegistry& exchanges)
{
// set the alternate exchange
@@ -1184,16 +1125,31 @@ void Queue::insertSequenceNumbers(const std::string& key)
void Queue::enqueued(const QueuedMessage& m)
{
- if (m.payload) {
- if (policy.get()) {
- policy->recoverEnqueued(m.payload);
- policy->enqueued(m);
+ for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
+ try {
+ (*i)->enqueued(m);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
}
- if (flowLimit.get())
- flowLimit->enqueued(m);
- mgntEnqStats(m.payload);
+ }
+ if (policy.get()) {
+ policy->enqueued(m);
+ }
+ /** todo make flowlimit an observer */
+ if (flowLimit.get())
+ flowLimit->enqueued(m);
+ mgntEnqStats(m.payload);
+}
+
+void Queue::updateEnqueued(const QueuedMessage& m)
+{
+ if (m.payload) {
boost::intrusive_ptr<Message> payload = m.payload;
enqueue ( 0, payload, true );
+ if (policy.get()) {
+ policy->recoverEnqueued(payload);
+ }
+ enqueued(m);
} else {
QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
}
@@ -1205,6 +1161,8 @@ bool Queue::isEnqueued(const QueuedMessage& msg)
}
QueueListeners& Queue::getListeners() { return listeners; }
+Messages& Queue::getMessages() { return *messages; }
+const Messages& Queue::getMessages() const { return *messages; }
void Queue::checkNotDeleted()
{
@@ -1213,6 +1171,11 @@ void Queue::checkNotDeleted()
}
}
+void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
+{
+ observers.insert(observer);
+}
+
void Queue::flush()
{
ScopedUse u(barrier);