summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp1293
1 files changed, 492 insertions, 801 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index d5267c78dc..0dd4cb7b10 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -20,23 +20,23 @@
*/
#include "qpid/broker/Queue.h"
-
#include "qpid/broker/Broker.h"
-#include "qpid/broker/QueueEvents.h"
+#include "qpid/broker/QueueCursor.h"
+#include "qpid/broker/QueueDepth.h"
+#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/Exchange.h"
-#include "qpid/broker/Fairshare.h"
#include "qpid/broker/DeliverableMessage.h"
-#include "qpid/broker/LegacyLVQ.h"
-#include "qpid/broker/MessageDeque.h"
-#include "qpid/broker/MessageMap.h"
#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/MessageDistributor.h"
+#include "qpid/broker/FifoDistributor.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/QueueFlowLimit.h"
-#include "qpid/broker/ThresholdAlerts.h"
-#include "qpid/broker/FifoDistributor.h"
-#include "qpid/broker/MessageGroupManager.h"
+//TODO: get rid of this
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+
+#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
@@ -76,26 +76,8 @@ namespace _qmf = qmf::org::apache::qpid::broker;
namespace
{
-const std::string qpidMaxSize("qpid.max_size");
-const std::string qpidMaxCount("qpid.max_count");
-const std::string qpidNoLocal("no-local");
-const std::string qpidTraceIdentity("qpid.trace.id");
-const std::string qpidTraceExclude("qpid.trace.exclude");
-const std::string qpidLastValueQueueKey("qpid.last_value_queue_key");
-const std::string qpidLastValueQueue("qpid.last_value_queue");
-const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
-const std::string qpidPersistLastNode("qpid.persist_last_node");
-const std::string qpidVQMatchProperty("qpid.LVQ_key");
-const std::string qpidQueueEventGeneration("qpid.queue_event_generation");
-const std::string qpidAutoDeleteTimeout("qpid.auto_delete_timeout");
-//following feature is not ready for general use as it doesn't handle
-//the case where a message is enqueued on more than one queue well enough:
-const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers");
-
-const int ENQUEUE_ONLY=1;
-const int ENQUEUE_AND_DEQUEUE=2;
-
-inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg,
+
+inline void mgntEnqStats(const Message& msg,
_qmf::Queue* mgmtObject,
_qmf::Broker* brokerMgmtObject)
{
@@ -103,12 +85,12 @@ inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg,
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
- uint64_t contentSize = msg->contentSize();
+ uint64_t contentSize = msg.getContentSize();
qStats->msgTotalEnqueues +=1;
bStats->msgTotalEnqueues += 1;
qStats->byteTotalEnqueues += contentSize;
bStats->byteTotalEnqueues += contentSize;
- if (msg->isPersistent ()) {
+ if (msg.isPersistent ()) {
qStats->msgPersistEnqueues += 1;
bStats->msgPersistEnqueues += 1;
qStats->bytePersistEnqueues += contentSize;
@@ -119,20 +101,20 @@ inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg,
}
}
-inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg,
+inline void mgntDeqStats(const Message& msg,
_qmf::Queue* mgmtObject,
_qmf::Broker* brokerMgmtObject)
{
if (mgmtObject != 0){
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
- uint64_t contentSize = msg->contentSize();
+ uint64_t contentSize = msg.getContentSize();
qStats->msgTotalDequeues += 1;
bStats->msgTotalDequeues += 1;
qStats->byteTotalDequeues += contentSize;
bStats->byteTotalDequeues += contentSize;
- if (msg->isPersistent ()){
+ if (msg.isPersistent ()){
qStats->msgPersistDequeues += 1;
bStats->msgPersistDequeues += 1;
qStats->bytePersistDequeues += contentSize;
@@ -143,43 +125,81 @@ inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg,
}
}
-} // namespace
+QueueSettings merge(const QueueSettings& inputs, const Broker::Options& globalOptions)
+{
+ QueueSettings settings(inputs);
+ if (!settings.maxDepth.hasSize() && globalOptions.queueLimit) {
+ settings.maxDepth.setSize(globalOptions.queueLimit);
+ }
+ return settings;
+}
+
+}
-Queue::Queue(const string& _name, bool _autodelete,
+Queue::TxPublish::TxPublish(const Message& m, boost::shared_ptr<Queue> q) : message(m), queue(q), prepared(false) {}
+bool Queue::TxPublish::prepare(TransactionContext* ctxt) throw()
+{
+ try {
+ prepared = queue->enqueue(ctxt, message);
+ return true;
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to prepare: " << e.what());
+ return false;
+ }
+}
+void Queue::TxPublish::commit() throw()
+{
+ try {
+ if (prepared) queue->process(message);
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to commit: " << e.what());
+ }
+}
+void Queue::TxPublish::rollback() throw()
+{
+ try {
+ if (prepared) queue->enqueueAborted(message);
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to rollback: " << e.what());
+ }
+}
+
+Queue::Queue(const string& _name, const QueueSettings& _settings,
MessageStore* const _store,
- const OwnershipToken* const _owner,
Manageable* parent,
Broker* b) :
name(_name),
- autodelete(_autodelete),
store(_store),
- owner(_owner),
+ owner(0),
consumerCount(0),
browserCount(0),
exclusive(0),
- noLocal(false),
persistLastNode(false),
inLastNodeFailure(false),
messages(new MessageDeque()),
persistenceId(0),
- policyExceeded(false),
+ settings(b ? merge(_settings, b->getOptions()) : _settings),
mgmtObject(0),
brokerMgmtObject(0),
eventMode(0),
- insertSeqNo(0),
broker(b),
deleted(false),
barrier(*this),
- autoDeleteTimeout(0),
allocator(new FifoDistributor( *messages ))
{
+ if (settings.maxDepth.hasCount()) current.setCount(0);
+ if (settings.maxDepth.hasSize()) current.setSize(0);
+ if (settings.traceExcludes.size()) {
+ split(traceExclude, settings.traceExcludes, ", ");
+ }
+ qpid::amqp_0_10::translate(settings.asMap(), encodableSettings);
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0) {
- mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete);
- mgmtObject->set_exclusive(_owner != 0);
+ mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete);
+ mgmtObject->set_arguments(settings.asMap());
agent->addObject(mgmtObject, 0, store != 0);
brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
if (brokerMgmtObject)
@@ -197,32 +217,36 @@ Queue::~Queue()
}
}
-bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
+bool isLocalTo(const OwnershipToken* token, const Message& msg)
{
- return token && token->isLocal(msg->getPublisher());
+ return token && token->isLocal(msg.getPublisher());
}
-bool Queue::isLocal(boost::intrusive_ptr<Message>& msg)
+bool Queue::isLocal(const Message& msg)
{
//message is considered local if it was published on the same
//connection as that of the session which declared this queue
//exclusive (owner) or which has an exclusive subscription
//(exclusive)
- return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg));
+ return settings.noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg));
}
-bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg)
+bool Queue::isExcluded(const Message& msg)
{
- return traceExclude.size() && msg->isExcluded(traceExclude);
+ return traceExclude.size() && msg.isExcluded(traceExclude);
}
-void Queue::deliver(boost::intrusive_ptr<Message> msg){
+void Queue::deliver(Message msg, TxBuffer* txn){
+ //TODO: move some of this out of the queue and into the publishing
+ //'link' for whatever protocol is used; that would let protocol
+ //specific stuff be kept out the queue
+
// Check for deferred delivery in a cluster.
if (broker && broker->deferDelivery(name, msg))
return;
- if (msg->isImmediate() && getConsumerCount() == 0) {
+ if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg) && getConsumerCount() == 0) {
if (alternateExchange) {
- DeliverableMessage deliverable(msg);
+ DeliverableMessage deliverable(msg, 0);
alternateExchange->route(deliverable);
}
} else if (isLocal(msg)) {
@@ -232,47 +256,38 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
//drop message
QPID_LOG(info, "Dropping excluded message from " << getName());
} else {
- enqueue(0, msg);
- push(msg);
- QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
+ if (txn) {
+ TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
+ txn->enlist(op);
+ } else {
+ if (enqueue(0, msg)) {
+ push(msg);
+ QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
+ } else {
+ QPID_LOG(debug, "Message " << msg << " dropped from " << name);
+ }
+ }
}
}
-void Queue::recoverPrepared(boost::intrusive_ptr<Message>& msg)
+void Queue::recoverPrepared(const Message& msg)
{
Mutex::ScopedLock locker(messageLock);
- if (policy.get()) policy->recoverEnqueued(msg);
+ current += QueueDepth(1, msg.getContentSize());
}
-void Queue::recover(boost::intrusive_ptr<Message>& msg)
+void Queue::recover(Message& msg)
{
- {
- Mutex::ScopedLock locker(messageLock);
- if (policy.get()) policy->recoverEnqueued(msg);
- }
-
+ recoverPrepared(msg);
push(msg, true);
- if (store){
- // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
- msg->addToSyncList(shared_from_this(), store);
- }
-
- if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
- //content has not been loaded, need to ensure that lazy loading mode is set:
- //TODO: find a nicer way to do this
- msg->releaseContent(store);
- // NOTE: The log message in this section are used for flow-to-disk testing (which checks the log for the
- // presence of this message). Do not change this without also checking these tests.
- QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
- std::hex << msg->getPersistenceId() << std::dec << ": Content released after recovery");
- }
}
-void Queue::process(boost::intrusive_ptr<Message>& msg){
+void Queue::process(Message& msg)
+{
push(msg);
if (mgmtObject != 0){
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
- const uint64_t contentSize = msg->contentSize();
+ const uint64_t contentSize = msg.getContentSize();
qStats->msgTxnEnqueues += 1;
qStats->byteTxnEnqueues += contentSize;
mgmtObject->statisticsUpdated();
@@ -285,46 +300,22 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
}
}
-void Queue::requeue(const QueuedMessage& msg){
+void Queue::release(const QueueCursor& position, bool markRedelivered)
+{
assertClusterSafe();
QueueListeners::NotificationSet copy;
{
- if (!isEnqueued(msg)) return;
- if (deleted) {
- //
- // If the queue has been deleted, requeued messages must be sent to the alternate exchange
- // if one is configured.
- //
- if (alternateExchange.get()) {
- DeliverableMessage dmsg(msg.payload);
- alternateExchange->routeWithAlternate(dmsg);
- if (brokerMgmtObject)
- brokerMgmtObject->inc_abandonedViaAlt();
- } else {
- if (brokerMgmtObject)
- brokerMgmtObject->inc_abandoned();
- }
- mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
- } else {
- {
- Mutex::ScopedLock locker(messageLock);
- messages->release(msg);
- observeRequeue(msg, locker);
+ Mutex::ScopedLock locker(messageLock);
+ if (!deleted) {
+ Message* message = messages->release(position);
+ if (message) {
+ if (!markRedelivered) message->undeliver();
listeners.populate(copy);
- }
-
- if (mgmtObject) {
- mgmtObject->inc_releases();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_releases();
- }
-
- // for persistLastNode - don't force a message twice to disk, but force it if no force before
- if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
- msg.payload->forcePersistent();
- if (msg.payload->isForcedPersistent() ){
- boost::intrusive_ptr<Message> payload = msg.payload;
- enqueue(0, payload);
+ observeRequeue(*message, locker);
+ if (mgmtObject) {
+ mgmtObject->inc_releases();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_releases();
}
}
}
@@ -332,163 +323,118 @@ void Queue::requeue(const QueuedMessage& msg){
copy.notify();
}
-bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
+bool Queue::dequeueMessageAt(const SequenceNumber& position)
{
- assertClusterSafe();
- QPID_LOG(debug, "Attempting to acquire message at " << position);
- if (acquire(position, message)) {
- QPID_LOG(debug, "Acquired message at " << position << " from " << name);
- return true;
- } else {
- QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
- return false;
- }
-}
-
-bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
-{
- assertClusterSafe();
- QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position);
- bool ok;
+ boost::intrusive_ptr<PersistableMessage> pmsg;
{
Mutex::ScopedLock locker(messageLock);
- ok = allocator->allocate( consumer, msg );
- }
- if (!ok) {
- QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name);
- return false;
- }
-
- QueuedMessage copy(msg);
- if (acquire( msg.position, copy)) {
- QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name);
- return true;
+ assertClusterSafe();
+ QPID_LOG(debug, "Attempting to dequeue message at " << position);
+ QueueCursor cursor;
+ Message* msg = messages->find(position, &cursor);
+ if (msg) {
+ if (msg->isPersistent()) pmsg = msg->getPersistentContext();
+ observeDequeue(*msg, locker);
+ messages->deleted(cursor);
+ } else {
+ QPID_LOG(debug, "Could not dequeue message at " << position << "; no such message");
+ return false;
+ }
}
- QPID_LOG(debug, "Could not acquire message at " << msg.position << " from " << name << "; no message at that position");
- return false;
+ dequeueFromStore(pmsg);
+ return true;
}
-void Queue::notifyListener()
+bool Queue::acquire(const QueueCursor& position, const std::string& consumer)
{
+ Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
- QueueListeners::NotificationSet set;
- {
- Mutex::ScopedLock locker(messageLock);
- if (messages->size()) {
- listeners.populate(set);
- }
- }
- set.notify();
-}
+ Message* msg;
-bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
-{
- checkNotDeleted(c);
- if (c->preAcquires()) {
- switch (consumeNextMessage(m, c)) {
- case CONSUMED:
- return true;
- case CANT_CONSUME:
- notifyListener();//let someone else try
- case NO_MESSAGES:
- default:
+ msg = messages->find(position);
+ if (msg) {
+ QPID_LOG(debug, consumer << " attempting to acquire message at " << msg->getSequence());
+ if (!allocator->acquire(consumer, *msg)) {
+ QPID_LOG(debug, "Not permitted to acquire msg at " << msg->getSequence() << " from '" << name);
return false;
+ } else {
+ observeAcquire(*msg, locker);
+ QPID_LOG(debug, "Acquired message at " << msg->getSequence() << " from " << name);
+ return true;
}
} else {
- return browseNextMessage(m, c);
+ QPID_LOG(debug, "Failed to acquire message which no longer exists on " << name);
+ return false;
}
}
-Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
+bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
{
+ checkNotDeleted(c);
+ QueueListeners::NotificationSet set;
while (true) {
- QueuedMessage msg;
- bool found;
- {
- Mutex::ScopedLock locker(messageLock);
- found = allocator->nextConsumableMessage(c, msg);
- if (!found) listeners.addListener(c);
- }
- if (!found) {
- QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
- return NO_MESSAGES;
- }
-
- if (msg.payload->hasExpired()) {
- QPID_LOG(debug, "Message expired from queue '" << name << "'");
- c->setPosition(msg.position);
- dequeue(0, msg);
- if (mgmtObject) {
- mgmtObject->inc_discardsTtl();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsTtl();
- }
- continue;
- }
-
- if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
- {
- Mutex::ScopedLock locker(messageLock);
- bool ok = allocator->allocate( c->getName(), msg ); // inform allocator
- (void) ok; assert(ok);
- observeAcquire(msg, locker);
- }
+ //TODO: reduce lock scope
+ Mutex::ScopedLock locker(messageLock);
+ Message* msg = messages->next(*c);
+ if (msg) {
+ if (msg->hasExpired()) {
+ QPID_LOG(debug, "Message expired from queue '" << name << "'");
+ observeDequeue(*msg, locker);
+ //ERROR: don't hold lock across call to store!!
+ if (msg->isPersistent()) dequeueFromStore(msg->getPersistentContext());
if (mgmtObject) {
- mgmtObject->inc_acquires();
+ mgmtObject->inc_discardsTtl();
if (brokerMgmtObject)
- brokerMgmtObject->inc_acquires();
+ brokerMgmtObject->inc_discardsTtl();
}
- m = msg;
- return CONSUMED;
- } else {
- //message(s) are available but consumer hasn't got enough credit
- QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ messages->deleted(*c);
+ continue;
}
- } else {
- //consumer will never want this message
- QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
- }
-
- Mutex::ScopedLock locker(messageLock);
- messages->release(msg);
- return CANT_CONSUME;
- }
-}
-bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
-{
- while (true) {
- QueuedMessage msg;
- bool found;
- {
- Mutex::ScopedLock locker(messageLock);
- found = allocator->nextBrowsableMessage(c, msg);
- if (!found) listeners.addListener(c);
- }
- if (!found) { // no next available
- QPID_LOG(debug, "No browsable messages available for consumer " <<
- c->getName() << " on queue '" << name << "'");
- return false;
- }
-
- if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
- if (c->accept(msg.payload)) {
- //consumer wants the message
- c->setPosition(msg.position);
- m = msg;
- return true;
+ if (c->filter(*msg)) {
+ if (c->accept(*msg)) {
+ if (c->preAcquires()) {
+ QPID_LOG(debug, "Attempting to acquire message " << msg << " from '" << name << "' with state " << msg->getState());
+ if (allocator->acquire(c->getName(), *msg)) {
+ if (mgmtObject) {
+ mgmtObject->inc_acquires();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires();
+ }
+ observeAcquire(*msg, locker);
+ msg->deliver();
+ } else {
+ QPID_LOG(debug, "Could not acquire message from '" << name << "'");
+ continue; //try another message
+ }
+ }
+ QPID_LOG(debug, "Message retrieved from '" << name << "'");
+ m = *msg;
+ return true;
+ } else {
+ //message(s) are available but consumer hasn't got enough credit
+ QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ if (c->preAcquires()) {
+ //let someone else try
+ listeners.populate(set);
+ }
+ break;
+ }
} else {
- //browser hasn't got enough credit for the message
- QPID_LOG(debug, "Browser can't currently accept message from '" << name << "'");
- return false;
+ //consumer will never want this message, try another one
+ QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+ if (c->preAcquires()) {
+ //let someone else try to take this one
+ listeners.populate(set);
+ }
}
} else {
- //consumer will never want this message, continue seeking
- QPID_LOG(debug, "Browser skipping message from '" << name << "'");
- c->setPosition(msg.position);
+ QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ listeners.addListener(c);
+ return false;
}
}
+ set.notify();
return false;
}
@@ -507,23 +453,28 @@ void Queue::removeListener(Consumer::shared_ptr c)
bool Queue::dispatch(Consumer::shared_ptr c)
{
- QueuedMessage msg(this);
+ Message msg;
if (getNextMessage(msg, c)) {
- c->deliver(msg);
+ c->deliver(*c, msg);
return true;
} else {
return false;
}
}
-bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const {
+bool Queue::find(SequenceNumber pos, Message& msg) const
+{
Mutex::ScopedLock locker(messageLock);
- if (messages->find(pos, msg))
+ Message* ptr = messages->find(pos, 0);
+ if (ptr) {
+ msg = *ptr;
return true;
+ }
return false;
}
-void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
+{
assertClusterSafe();
{
Mutex::ScopedLock locker(messageLock);
@@ -550,7 +501,7 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
browserCount++;
consumerCount++;
//reset auto deletion timer if necessary
- if (autoDeleteTimeout && autoDeleteTask) {
+ if (settings.autoDeleteDelay && autoDeleteTask) {
autoDeleteTask->cancel();
}
observeConsumerAdd(*c, locker);
@@ -559,7 +510,8 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
mgmtObject->inc_consumerCount ();
}
-void Queue::cancel(Consumer::shared_ptr c){
+void Queue::cancel(Consumer::shared_ptr c)
+{
removeListener(c);
{
Mutex::ScopedLock locker(messageLock);
@@ -572,65 +524,6 @@ void Queue::cancel(Consumer::shared_ptr c){
mgmtObject->dec_consumerCount ();
}
-QueuedMessage Queue::get(){
- QueuedMessage msg(this);
- bool ok;
- {
- Mutex::ScopedLock locker(messageLock);
- ok = messages->consume(msg);
- if (ok) observeAcquire(msg, locker);
- }
-
- if (ok && mgmtObject) {
- mgmtObject->inc_acquires();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_acquires();
- }
-
- return msg;
-}
-
-namespace {
-bool collectIf(QueuedMessage& qm, Messages::Predicate predicate,
- std::deque<QueuedMessage>& collection)
-{
- if (predicate(qm)) {
- collection.push_back(qm);
- return true;
- } else {
- return false;
- }
-}
-
-bool isExpired(const QueuedMessage& qm) { return qm.payload->hasExpired(); }
-} // namespace
-
-void Queue::dequeueIf(Messages::Predicate predicate,
- std::deque<QueuedMessage>& dequeued)
-{
- {
- Mutex::ScopedLock locker(messageLock);
- messages->removeIf(boost::bind(&collectIf, _1, predicate, boost::ref(dequeued)));
- }
- if (!dequeued.empty()) {
- if (mgmtObject) {
- mgmtObject->inc_acquires(dequeued.size());
- if (brokerMgmtObject)
- brokerMgmtObject->inc_acquires(dequeued.size());
- }
- for (std::deque<QueuedMessage>::const_iterator i = dequeued.begin();
- i != dequeued.end(); ++i) {
- {
- // KAG: should be safe to retake lock after the removeIf, since
- // no other thread can touch these messages after the removeIf() call
- Mutex::ScopedLock locker(messageLock);
- observeAcquire(*i, locker);
- }
- dequeue( 0, *i );
- }
- }
-}
-
/**
*@param lapse: time since the last purgeExpired
*/
@@ -642,13 +535,17 @@ void Queue::purgeExpired(sys::Duration lapse) {
dequeueSincePurge -= count;
int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
if (seconds == 0 || count / seconds < 1) {
- std::deque<QueuedMessage> dequeued;
- dequeueIf(boost::bind(&isExpired, _1), dequeued);
- if (dequeued.size()) {
- if (mgmtObject) {
- mgmtObject->inc_discardsTtl(dequeued.size());
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsTtl(dequeued.size());
+ uint32_t count = remove(0, boost::bind(&Message::hasExpired, _1), 0, CONSUMER);
+ QPID_LOG(debug, "Purged " << count << " expired messages from " << getName());
+ //
+ // Report the count of discarded-by-ttl messages
+ //
+ if (mgmtObject && count) {
+ mgmtObject->inc_acquires(count);
+ mgmtObject->inc_discardsTtl(count);
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_acquires(count);
+ brokerMgmtObject->inc_discardsTtl(count);
}
}
}
@@ -663,7 +560,7 @@ namespace {
static const std::string typeKey;
static const std::string paramsKey;
static MessageFilter *create( const ::qpid::types::Variant::Map *filter );
- virtual bool match( const QueuedMessage& ) const { return true; }
+ virtual bool match( const Message& ) const { return true; }
virtual ~MessageFilter() {}
protected:
MessageFilter() {};
@@ -687,13 +584,9 @@ namespace {
static const std::string valueKey;
HeaderMatchFilter( const std::string& _header, const std::string& _value )
: MessageFilter (), header(_header), value(_value) {}
- bool match( const QueuedMessage& msg ) const
+ bool match( const Message& msg ) const
{
- const qpid::framing::FieldTable* headers = msg.payload->getApplicationHeaders();
- if (!headers) return false;
- FieldTable::ValuePtr h = headers->get(header);
- if (!h || !h->convertsTo<std::string>()) return false;
- return h->get<std::string>() == value;
+ return msg.getPropertyAsString(header) == value;
}
private:
const std::string header;
@@ -730,36 +623,68 @@ namespace {
return new MessageFilter();
}
- // used by removeIf() to collect all messages matching a filter, maximum match count is
- // optional.
- struct Collector {
- const uint32_t maxMatches;
- MessageFilter& filter;
- std::deque<QueuedMessage> matches;
- Collector(MessageFilter& filter, uint32_t max)
- : maxMatches(max), filter(filter) {}
- bool operator() (QueuedMessage& qm)
- {
- if (maxMatches == 0 || matches.size() < maxMatches) {
- if (filter.match( qm )) {
- matches.push_back(qm);
- return true;
- }
- }
+ bool reroute(boost::shared_ptr<Exchange> e, const Message& m)
+ {
+ if (e) {
+ DeliverableMessage d(m, 0);
+ d.getMessage().clearTrace();
+ e->routeWithAlternate(d);
+ return true;
+ } else {
return false;
}
- };
-
+ }
+ void moveTo(boost::shared_ptr<Queue> q, Message& m)
+ {
+ if (q) {
+ q->deliver(m);
+ }
+ }
} // end namespace
+uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type)
+{
+ std::deque<Message> removed;
+ {
+ QueueCursor c(type);
+ uint32_t count(0);
+ Mutex::ScopedLock locker(messageLock);
+ Message* m = messages->next(c);
+ while (m){
+ if (!p || p(*m)) {
+ if (!maxCount || count++ < maxCount) {
+ if (m->getState() == AVAILABLE) {
+ //don't actually acquire, just act as if we did
+ observeAcquire(*m, locker);
+ }
+ observeDequeue(*m, locker);
+ removed.push_back(*m);//takes a copy of the message
+ if (!messages->deleted(c)) {
+ QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!");
+ assert(false);
+ }
+ } else {
+ break;
+ }
+ }
+ m = messages->next(c);
+ }
+ }
+ for (std::deque<Message>::iterator i = removed.begin(); i != removed.end(); ++i) {
+ if (f) f(*i);//ERROR? need to clear old persistent context?
+ if (i->isPersistent()) dequeueFromStore(i->getPersistentContext());//do this outside of lock and after any re-routing
+ }
+ return removed.size();
+}
+
/**
* purge - for purging all or some messages on a queue
* depending on the purge_request
*
- * purge_request == 0 then purge all messages
- * == N then purge N messages from queue
- * Sometimes purge_request == 1 to unblock the top of queue
+ * qty == 0 then purge all messages
+ * == N then purge N messages from queue
+ * Sometimes qty == 1 to unblock the top of queue
*
* The dest exchange may be supplied to re-route messages through the exchange.
* It is safe to re-route messages such that they arrive back on the same queue,
@@ -768,172 +693,53 @@ namespace {
* An optional filter can be supplied that will be applied against each message. The
* message is purged only if the filter matches. See MessageDistributor for more detail.
*/
-uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest,
+uint32_t Queue::purge(const uint32_t qty, boost::shared_ptr<Exchange> dest,
const qpid::types::Variant::Map *filter)
{
std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
- Collector c(*mf.get(), purge_request);
-
- {
- Mutex::ScopedLock locker(messageLock);
- messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
- }
+ uint32_t count = remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&reroute, dest, _1), CONSUMER/*?*/);
- if (!c.matches.empty()) {
- if (mgmtObject) {
- mgmtObject->inc_acquires(c.matches.size());
- if (dest.get()) {
- mgmtObject->inc_reroutes(c.matches.size());
- if (brokerMgmtObject) {
- brokerMgmtObject->inc_acquires(c.matches.size());
- brokerMgmtObject->inc_reroutes(c.matches.size());
- }
- } else {
- mgmtObject->inc_discardsPurge(c.matches.size());
- if (brokerMgmtObject) {
- brokerMgmtObject->inc_acquires(c.matches.size());
- brokerMgmtObject->inc_discardsPurge(c.matches.size());
- }
- }
- }
-
- for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
- qmsg != c.matches.end(); ++qmsg) {
-
- {
- // KAG: should be safe to retake lock after the removeIf, since
- // no other thread can touch these messages after the removeIf call
- Mutex::ScopedLock locker(messageLock);
- observeAcquire(*qmsg, locker);
+ if (mgmtObject && count) {
+ mgmtObject->inc_acquires(count);
+ if (dest.get()) {
+ mgmtObject->inc_reroutes(count);
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_acquires(count);
+ brokerMgmtObject->inc_reroutes(count);
}
- dequeue(0, *qmsg);
- QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName());
- // now reroute if necessary
- if (dest.get()) {
- assert(qmsg->payload);
- qmsg->payload->clearTrace();
- DeliverableMessage dmsg(qmsg->payload);
- dest->routeWithAlternate(dmsg);
+ } else {
+ mgmtObject->inc_discardsPurge(count);
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_acquires(count);
+ brokerMgmtObject->inc_discardsPurge(count);
}
}
}
- return c.matches.size();
+
+ return count;
}
uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
const qpid::types::Variant::Map *filter)
{
std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
- Collector c(*mf.get(), qty);
-
- {
- Mutex::ScopedLock locker(messageLock);
- messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
- }
-
-
- if (!c.matches.empty()) {
- // Update observers and message state:
-
- if (mgmtObject) {
- mgmtObject->inc_acquires(c.matches.size());
- if (brokerMgmtObject)
- brokerMgmtObject->inc_acquires(c.matches.size());
- }
-
- for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
- qmsg != c.matches.end(); ++qmsg) {
- {
- Mutex::ScopedLock locker(messageLock);
- observeAcquire(*qmsg, locker);
- }
- dequeue(0, *qmsg);
- // and move to destination Queue.
- assert(qmsg->payload);
- destq->deliver(qmsg->payload);
- }
- }
- return c.matches.size();
+ return remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&moveTo, destq, _1), CONSUMER/*?*/);
}
-/** Acquire the message at the given position, return true and msg if acquire succeeds */
-bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg)
+void Queue::push(Message& message, bool /*isRecovery*/)
{
- bool ok;
- {
- Mutex::ScopedLock locker(messageLock);
- ok = messages->acquire(position, msg);
- if (ok) observeAcquire(msg, locker);
- }
- if (ok) {
- if (mgmtObject) {
- mgmtObject->inc_acquires();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_acquires();
- }
- ++dequeueSincePurge;
- return true;
- }
- return false;
-}
-
-void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
QueueListeners::NotificationSet copy;
- QueuedMessage removed, qm(this, msg);
- bool dequeueRequired = false;
{
Mutex::ScopedLock locker(messageLock);
- qm.position = ++sequence;
- if (messages->push(qm, removed)) {
- dequeueRequired = true;
- observeAcquire(removed, locker);
- }
- observeEnqueue(qm, locker);
- if (policy.get()) {
- policy->enqueued(qm);
- }
+ message.setSequence(++sequence);
+ messages->publish(message);
listeners.populate(copy);
- }
- if (insertSeqNo) msg->insertCustomProperty(seqNoKey, qm.position);
-
- mgntEnqStats(msg, mgmtObject, brokerMgmtObject);
-
- if (dequeueRequired) {
- if (mgmtObject) {
- mgmtObject->inc_acquires();
- mgmtObject->inc_discardsLvq();
- if (brokerMgmtObject) {
- brokerMgmtObject->inc_acquires();
- brokerMgmtObject->inc_discardsLvq();
- }
- }
- if (isRecovery) {
- //can't issue new requests for the store until
- //recovery is complete
- Mutex::ScopedLock locker(messageLock);
- pendingDequeues.push_back(removed);
- } else {
- dequeue(0, removed);
- }
+ observeEnqueue(message, locker);
}
copy.notify();
}
-void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
-{
- if (message.payload->isIngressComplete()) (*result)++;
-}
-
-/** function only provided for unit tests, or code not in critical message path */
-uint32_t Queue::getEnqueueCompleteMessageCount() const
-{
- uint32_t count = 0;
- Mutex::ScopedLock locker(messageLock);
- messages->foreach(boost::bind(&isEnqueueComplete, &count, _1));
- return count;
-}
-
uint32_t Queue::getMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
@@ -949,7 +755,7 @@ uint32_t Queue::getConsumerCount() const
bool Queue::canAutoDelete() const
{
Mutex::ScopedLock locker(messageLock);
- return autodelete && !consumerCount && !owner;
+ return settings.autodelete && !consumerCount && !owner;
}
void Queue::clearLastNodeFailure()
@@ -957,14 +763,9 @@ void Queue::clearLastNodeFailure()
inLastNodeFailure = false;
}
-void Queue::forcePersistent(QueuedMessage& message)
+void Queue::forcePersistent(const Message& /*message*/)
{
- if(!message.payload->isStoredOnQueue(shared_from_this())) {
- message.payload->forcePersistent();
- if (message.payload->isForcedPersistent() ){
- enqueue(0, message.payload);
- }
- }
+ //TODO
}
void Queue::setLastNodeFailure()
@@ -982,153 +783,129 @@ void Queue::setLastNodeFailure()
}
-// return true if store exists,
-bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck)
+/*
+ * return true if enqueue succeeded and message should be made
+ * available; returning false will result in the message being dropped
+ */
+bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
{
ScopedUse u(barrier);
if (!u.acquired) return false;
- if (policy.get() && !suppressPolicyCheck) {
- std::deque<QueuedMessage> dequeues;
- {
- Mutex::ScopedLock locker(messageLock);
- try {
- policy->tryEnqueue(msg);
- } catch(ResourceLimitExceededException&) {
- if (mgmtObject) {
- mgmtObject->inc_discardsOverflow();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsOverflow();
- }
- throw;
- }
- policy->getPendingDequeues(dequeues);
- }
- //depending on policy, may have some dequeues that need to performed without holding the lock
-
- //
- // Count the dequeues as ring-discards. We know that these aren't rejects because
- // policy->tryEnqueue would have thrown an exception.
- //
- if (mgmtObject && !dequeues.empty()) {
- mgmtObject->inc_discardsRing(dequeues.size());
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsRing(dequeues.size());
+ {
+ Mutex::ScopedLock locker(messageLock);
+ if (!checkDepth(QueueDepth(1, msg.getContentSize()), msg)) {
+ return false;
}
-
- for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
if (inLastNodeFailure && persistLastNode){
- msg->forcePersistent();
+ forcePersistent(msg);
}
- if (traceId.size()) {
- msg->addTraceId(traceId);
+ if (settings.traceId.size()) {
+ msg.addTraceId(settings.traceId);
}
- if ((msg->isPersistent() || msg->checkContentReleasable()) && store) {
+ if (msg.isPersistent() && store) {
// mark the message as being enqueued - the store MUST CALL msg->enqueueComplete()
// when it considers the message stored.
- msg->enqueueAsync(shared_from_this(), store);
- boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
+ boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext();
+ assert(pmsg);
+ pmsg->enqueueAsync(shared_from_this(), store);
store->enqueue(ctxt, pmsg, *this);
- return true;
}
- if (!store) {
- //Messages enqueued on a transient queue should be prevented
- //from having their content released as it may not be
- //recoverable by these queue for delivery
- msg->blockContentRelease();
- }
- return false;
+ return true;
}
-void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
+void Queue::enqueueAborted(const Message& msg)
{
+ //Called when any transactional enqueue is aborted (including but
+ //not limited to a recovered dtx transaction)
Mutex::ScopedLock locker(messageLock);
- if (policy.get()) policy->enqueueAborted(msg);
+ current -= QueueDepth(1, msg.getContentSize());
}
-// return true if store exists,
-bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
+void Queue::enqueueCommited(Message& msg)
{
- ScopedUse u(barrier);
- if (!u.acquired) return false;
- {
- Mutex::ScopedLock locker(messageLock);
- if (!isEnqueued(msg)) return false;
- if (!ctxt) {
- if (policy.get()) policy->dequeued(msg);
- messages->deleted(msg);
- observeDequeue(msg, locker);
- }
+ //called when a recovered dtx enqueue operation is committed; the
+ //message is already on disk and space has been reserved in policy
+ //but it should now be made available
+ process(msg);
+}
+void Queue::dequeueAborted(Message& msg)
+{
+ //called when a recovered dtx dequeue operation is aborted; the
+ //message should be added back to the queue
+ push(msg);
+}
+void Queue::dequeueCommited(const Message& msg)
+{
+ //called when a recovered dtx dequeue operation is committed; the
+ //message will at this point have already been removed from the
+ //store and will not be available for delivery. The only action
+ //required is to ensure the observers are notified and the
+ //management stats are correctly decremented
+ Mutex::ScopedLock locker(messageLock);
+ observeDequeue(msg, locker);
+ if (mgmtObject != 0) {
+ mgmtObject->inc_msgTxnDequeues();
+ mgmtObject->inc_byteTxnDequeues(msg.getContentSize());
}
+}
- if (!ctxt) {
- mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
- }
- // This check prevents messages which have been forced persistent on one queue from dequeuing
- // from another on which no forcing has taken place and thus causing a store error.
- bool fp = msg.payload->isForcedPersistent();
- if (!fp || (fp && msg.payload->isStoredOnQueue(shared_from_this()))) {
- if ((msg.payload->isPersistent() || msg.payload->checkContentReleasable()) && store) {
- msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
- boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
- store->dequeue(ctxt, pmsg, *this);
- return true;
- }
+void Queue::dequeueFromStore(boost::intrusive_ptr<PersistableMessage> msg)
+{
+ ScopedUse u(barrier);
+ if (u.acquired && msg && store) {
+ store->dequeue(0, msg, *this);
}
- return false;
}
-void Queue::dequeueCommitted(const QueuedMessage& msg)
+void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor)
{
+ ScopedUse u(barrier);
+ if (!u.acquired) return;
+ boost::intrusive_ptr<PersistableMessage> pmsg;
{
Mutex::ScopedLock locker(messageLock);
- if (policy.get()) policy->dequeued(msg);
- messages->deleted(msg);
- observeDequeue(msg, locker);
+ Message* msg = messages->find(cursor);
+ if (msg) {
+ if (msg->isPersistent()) pmsg = msg->getPersistentContext();
+ if (!ctxt) {
+ observeDequeue(*msg, locker);
+ messages->deleted(cursor);//message pointer not valid after this
+ }
+ } else {
+ return;
+ }
}
- mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
- if (mgmtObject != 0) {
- _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
- const uint64_t contentSize = msg.payload->contentSize();
- qStats->msgTxnDequeues += 1;
- qStats->byteTxnDequeues += contentSize;
- mgmtObject->statisticsUpdated();
+ if (store && pmsg) {
+ store->dequeue(ctxt, pmsg, *this);
+ }
+}
+
+void Queue::dequeueCommitted(const QueueCursor& cursor)
+{
+ Mutex::ScopedLock locker(messageLock);
+ Message* msg = messages->find(cursor);
+ if (msg) {
+ const uint64_t contentSize = msg->getContentSize();
+ observeDequeue(*msg, locker);
+ if (mgmtObject != 0) {
+ mgmtObject->inc_msgTxnDequeues();
+ mgmtObject->inc_byteTxnDequeues(contentSize);
+ }
if (brokerMgmtObject) {
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
bStats->msgTxnDequeues += 1;
bStats->byteTxnDequeues += contentSize;
brokerMgmtObject->statisticsUpdated();
}
- }
-}
-
-/**
- * Removes the first (oldest) message from the in-memory delivery queue as well dequeing
- * it from the logical (and persistent if applicable) queue
- */
-bool Queue::popAndDequeue(QueuedMessage& msg)
-{
- bool popped;
- {
- Mutex::ScopedLock locker(messageLock);
- popped = messages->consume(msg);
- if (popped) observeAcquire(msg, locker);
- }
- if (popped) {
- if (mgmtObject) {
- mgmtObject->inc_acquires();
- if (brokerMgmtObject)
- brokerMgmtObject->inc_acquires();
- }
- dequeue(0, msg);
- return true;
+ messages->deleted(cursor);
} else {
- return false;
+ QPID_LOG(error, "Could not find dequeued message on commit");
}
}
@@ -1136,8 +913,10 @@ bool Queue::popAndDequeue(QueuedMessage& msg)
* Updates policy and management when a message has been dequeued,
* Requires messageLock be held by caller.
*/
-void Queue::observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock&)
{
+ current -= QueueDepth(1, msg.getContentSize());
+ mgntDeqStats(msg, mgmtObject, brokerMgmtObject);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->dequeued(msg);
@@ -1150,7 +929,7 @@ void Queue::observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::Sco
/** updates queue observers when a message has become unavailable for transfer.
* Requires messageLock be held by caller.
*/
-void Queue::observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeAcquire(const Message& msg, const Mutex::ScopedLock&)
{
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -1164,7 +943,7 @@ void Queue::observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::Sco
/** updates queue observers when a message has become re-available for transfer
* Requires messageLock be held by caller.
*/
-void Queue::observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeRequeue(const Message& msg, const Mutex::ScopedLock&)
{
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -1202,13 +981,11 @@ void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::Sc
}
-void Queue::create(const FieldTable& _settings)
+void Queue::create()
{
- settings = _settings;
if (store) {
- store->create(*this, _settings);
+ store->create(*this, settings.storeSettings);
}
- configureImpl(_settings);
}
@@ -1258,112 +1035,21 @@ bool getBoolSetting(const qpid::framing::FieldTable& settings, const std::string
}
}
-void Queue::configure(const FieldTable& _settings)
+void Queue::abandoned(const Message& message)
{
- settings = _settings;
- configureImpl(settings);
-}
-
-void Queue::configureImpl(const FieldTable& _settings)
-{
- eventMode = _settings.getAsInt(qpidQueueEventGeneration);
- if (eventMode && broker) {
- broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
- }
-
- if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
- (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
- if ( NullMessageStore::isNullStore(store)) {
- QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
- } else if (broker && !(broker->getQueueEvents().isSync()) ) {
- QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName());
- }
- FieldTable copy(_settings);
- copy.erase(QueuePolicy::typeKey);
- setPolicy(QueuePolicy::createQueuePolicy(getName(), copy));
- } else {
- setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
- }
- if (broker && broker->getManagementAgent()) {
- ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings, broker->getOptions().queueThresholdEventRatio);
- }
-
- //set this regardless of owner to allow use of no-local with exclusive consumers also
- noLocal = getBoolSetting(_settings, qpidNoLocal);
- QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
-
- std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey);
- if (lvqKey.size()) {
- QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey);
- messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
- allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
- } else if (getBoolSetting(_settings, qpidLastValueQueueNoBrowse)) {
- QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on");
- messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
- allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
- } else if (getBoolSetting(_settings, qpidLastValueQueue)) {
- QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue");
- messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
- allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
- } else {
- std::auto_ptr<Messages> m = Fairshare::create(_settings);
- if (m.get()) {
- messages = m;
- allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
- QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
- } else { // default (FIFO) queue type
- // override default message allocator if message groups configured.
- boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( getName(), *messages, _settings));
- if (mgm) {
- allocator = mgm;
- addObserver(mgm);
- }
- }
- }
-
- persistLastNode = getBoolSetting(_settings, qpidPersistLastNode);
- if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
-
- traceId = _settings.getAsString(qpidTraceIdentity);
- std::string excludeList = _settings.getAsString(qpidTraceExclude);
- if (excludeList.size()) {
- split(traceExclude, excludeList, ", ");
- }
- QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
- << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
-
- FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
- if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
-
- autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
- if (autoDeleteTimeout)
- QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
-
- if (mgmtObject != 0) {
- mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
- }
-
- QueueFlowLimit::observe(*this, _settings);
+ if (reroute(alternateExchange, message) && brokerMgmtObject)
+ brokerMgmtObject->inc_abandonedViaAlt();
+ else if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandoned();
}
void Queue::destroyed()
{
unbind(broker->getExchanges());
-
- QueuedMessage m;
- while(popAndDequeue(m)) {
- DeliverableMessage msg(m.payload);
- if (alternateExchange.get()) {
- if (brokerMgmtObject)
- brokerMgmtObject->inc_abandonedViaAlt();
- alternateExchange->routeWithAlternate(msg);
- } else {
- if (brokerMgmtObject)
- brokerMgmtObject->inc_abandoned();
- }
- }
- if (alternateExchange.get())
+ remove(0, 0, boost::bind(&Queue::abandoned, this, _1), REPLICATOR/*even acquired message are treated as abandoned*/);
+ if (alternateExchange.get()) {
alternateExchange->decAlternateUsers();
+ }
if (store) {
barrier.destroy();
@@ -1401,20 +1087,6 @@ void Queue::unbind(ExchangeRegistry& exchanges)
bindings.unbind(exchanges, shared_from_this());
}
-void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
-{
- Mutex::ScopedLock locker(messageLock);
- policy = _policy;
- if (policy.get())
- policy->setQueue(this);
-}
-
-const QueuePolicy* Queue::getPolicy()
-{
- Mutex::ScopedLock locker(messageLock);
- return policy.get();
-}
-
uint64_t Queue::getPersistenceId() const
{
return persistenceId;
@@ -1434,10 +1106,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const
void Queue::encode(Buffer& buffer) const
{
buffer.putShortString(name);
- buffer.put(settings);
- if (policy.get()) {
- buffer.put(*policy);
- }
+ buffer.put(encodableSettings);
buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
}
@@ -1445,21 +1114,19 @@ uint32_t Queue::encodedSize() const
{
return name.size() + 1/*short string size octet*/
+ (alternateExchange.get() ? alternateExchange->getName().size() : 0) + 1 /* short string */
- + settings.encodedSize()
- + (policy.get() ? (*policy).encodedSize() : 0);
+ + encodableSettings.encodedSize();
}
Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer )
{
string name;
buffer.getShortString(name);
- FieldTable settings;
- buffer.get(settings);
+ FieldTable ft;
+ buffer.get(ft);
boost::shared_ptr<Exchange> alternate;
- std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true, false, 0, alternate, settings, true);
- if (result.first->policy.get() && buffer.available() >= result.first->policy->encodedSize()) {
- buffer.get ( *(result.first->policy) );
- }
+ QueueSettings settings(true, false);
+ settings.populate(ft, settings.storeSettings);
+ std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate, true);
if (buffer.available()) {
string altExch;
buffer.getShortString(altExch);
@@ -1523,8 +1190,8 @@ struct AutoDeleteTask : qpid::sys::TimerTask
void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId)
{
- if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
- AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
+ if (queue->settings.autoDeleteDelay && queue->canAutoDelete()) {
+ AbsTime time(now(), Duration(queue->settings.autoDeleteDelay * TIME_SEC));
queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, connectionId, userId, time));
broker.getClusterTimer().add(queue->autoDeleteTask);
QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
@@ -1543,12 +1210,15 @@ void Queue::releaseExclusiveOwnership()
{
Mutex::ScopedLock locker(ownershipLock);
owner = 0;
+ if (mgmtObject) {
+ mgmtObject->set_exclusive(false);
+ }
}
bool Queue::setExclusiveOwner(const OwnershipToken* const o)
{
//reset auto deletion timer if necessary
- if (autoDeleteTimeout && autoDeleteTask) {
+ if (settings.autoDeleteDelay && autoDeleteTask) {
autoDeleteTask->cancel();
}
Mutex::ScopedLock locker(ownershipLock);
@@ -1556,6 +1226,9 @@ bool Queue::setExclusiveOwner(const OwnershipToken* const o)
return false;
} else {
owner = o;
+ if (mgmtObject) {
+ mgmtObject->set_exclusive(true);
+ }
return true;
}
}
@@ -1687,7 +1360,7 @@ namespace {
struct After {
framing::SequenceNumber seq;
After(framing::SequenceNumber s) : seq(s) {}
- bool operator()(const QueuedMessage& qm) { return qm.position > seq; }
+ bool operator()(const Message& m) { return m.getSequence() > seq; }
};
} // namespace
@@ -1695,12 +1368,10 @@ struct After {
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
if (n < sequence) {
- std::deque<QueuedMessage> dequeued;
- dequeueIf(After(n), dequeued);
- messages->setPosition(n);
+ remove(0, After(n), MessagePredicate(), BROWSER);
}
sequence = n;
- QPID_LOG(trace, "Set position to " << sequence << " on " << getName());
+ QPID_LOG(debug, "Set position to " << sequence << " on " << getName());
}
SequenceNumber Queue::getPosition() {
@@ -1721,25 +1392,16 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges)
<< "\": exchange does not exist.");
}
//process any pending dequeues
- std::deque<QueuedMessage> pd;
- {
- Mutex::ScopedLock locker(messageLock);
- pendingDequeues.swap(pd);
+ for (std::vector<Message>::iterator i = pendingDequeues.begin(); i != pendingDequeues.end(); ++i) {
+ dequeueFromStore(i->getPersistentContext());
}
- for_each(pd.begin(), pd.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
-}
-
-void Queue::insertSequenceNumbers(const std::string& key)
-{
- seqNoKey = key;
- insertSeqNo = !seqNoKey.empty();
- QPID_LOG(debug, "Inserting sequence numbers as " << key);
+ pendingDequeues.clear();
}
/** updates queue observers and state when a message has become available for transfer
* Requires messageLock be held by caller.
*/
-void Queue::observeEnqueue(const QueuedMessage& m, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock&)
{
for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
try {
@@ -1748,32 +1410,7 @@ void Queue::observeEnqueue(const QueuedMessage& m, const qpid::sys::Mutex::Scope
QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
}
}
-}
-
-void Queue::updateEnqueued(const QueuedMessage& m)
-{
- if (m.payload) {
- boost::intrusive_ptr<Message> payload = m.payload;
- enqueue(0, payload, true);
- {
- Mutex::ScopedLock locker(messageLock);
- messages->updateAcquired(m);
- observeEnqueue(m, locker);
- if (policy.get()) {
- policy->recoverEnqueued(payload);
- policy->enqueued(m);
- }
- }
- mgntEnqStats(m.payload, mgmtObject, brokerMgmtObject);
- } else {
- QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
- }
-}
-
-bool Queue::isEnqueued(const QueuedMessage& msg)
-{
- Mutex::ScopedLock locker(messageLock);
- return !policy.get() || policy->isEnqueued(msg);
+ mgntEnqStats(m, mgmtObject, brokerMgmtObject);
}
// Note: accessing listeners outside of lock is dangerous. Caller must ensure the queue's
@@ -1835,28 +1472,82 @@ void Queue::setDequeueSincePurge(uint32_t value) {
dequeueSincePurge = value;
}
-namespace{
-class FindLowest
+void Queue::reject(const QueueCursor& cursor)
{
- public:
- FindLowest() : init(false) {}
- void process(const QueuedMessage& message) {
- QPID_LOG(debug, "FindLowest processing: " << message.position);
- if (!init || message.position < lowest) lowest = message.position;
- init = true;
- }
- bool getLowest(qpid::framing::SequenceNumber& result) {
- if (init) {
- result = lowest;
- return true;
+ Exchange::shared_ptr alternate = getAlternateExchange();
+ Message copy;
+ boost::intrusive_ptr<PersistableMessage> pmsg;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ Message* message = messages->find(cursor);
+ if (message) {
+ if (alternate) copy = *message;
+ if (message->isPersistent()) pmsg = message->getPersistentContext();
+ countRejected();
+ observeDequeue(*message, locker);
+ messages->deleted(cursor);
} else {
- return false;
+ return;
}
}
- private:
- bool init;
- qpid::framing::SequenceNumber lowest;
-};
+ if (alternate) {
+ copy.resetDeliveryCount();
+ DeliverableMessage delivery(copy, 0);
+ alternate->routeWithAlternate(delivery);
+ QPID_LOG(info, "Routed rejected message from " << getName() << " to "
+ << alternate->getName());
+ } else {
+ //just drop it
+ QPID_LOG(info, "Dropping rejected message from " << getName());
+ }
+ dequeueFromStore(pmsg);
+}
+
+bool Queue::checkDepth(const QueueDepth& increment, const Message&)
+{
+ if (current && (settings.maxDepth - current < increment)) {
+ if (mgmtObject) {
+ mgmtObject->inc_discardsOverflow();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsOverflow();
+ }
+ throw ResourceLimitExceededException(QPID_MSG("Maximum depth exceeded on " << name << ": current=[" << current << "], max=[" << settings.maxDepth << "]"));
+ } else {
+ current += increment;
+ return true;
+ }
+}
+
+bool Queue::seek(QueueCursor& cursor, MessagePredicate predicate)
+{
+ Mutex::ScopedLock locker(messageLock);
+ //hold lock across calls to predicate, or take copy of message?
+ //currently hold lock, may want to revise depending on any new use
+ //cases
+ Message* message = messages->next(cursor);
+ while (message && (predicate && !predicate(*message))) {
+ message = messages->next(cursor);
+ }
+ return message != 0;
+}
+
+bool Queue::seek(QueueCursor& cursor, MessagePredicate predicate, qpid::framing::SequenceNumber start)
+{
+ Mutex::ScopedLock locker(messageLock);
+ //hold lock across calls to predicate, or take copy of message?
+ //currently hold lock, may want to revise depending on any new use
+ //cases
+ Message* message;
+ message = messages->find(start, &cursor);
+ if (message && (!predicate || predicate(*message))) return true;
+
+ return seek(cursor, predicate);
+}
+
+bool Queue::seek(QueueCursor& cursor, qpid::framing::SequenceNumber start)
+{
+ Mutex::ScopedLock locker(messageLock);
+ return messages->find(start, &cursor);
}
Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}