diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 25 |
1 files changed, 16 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 40cb80010c..d18b0fcda3 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -31,6 +31,7 @@ #include "qpid/broker/MessageStore.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/ThresholdAlerts.h" #include "qpid/StringUtils.h" @@ -163,13 +164,8 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ //drop message QPID_LOG(info, "Dropping excluded message from " << getName()); } else { - // if no store then mark as enqueued - if (!enqueue(0, msg)){ - push(msg); - msg->enqueueComplete(); - }else { - push(msg); - } + enqueue(0, msg); + push(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name); } } @@ -546,7 +542,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ void isEnqueueComplete(uint32_t* result, const QueuedMessage& message) { - if (message.payload->isEnqueueComplete()) (*result)++; + if (message.payload->isIngressComplete()) (*result)++; } /** function only provided for unit tests, or code not in critical message path */ @@ -819,11 +815,14 @@ void Queue::configure(const FieldTable& _settings, bool recovering) if (autoDeleteTimeout) QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); - if (mgmtObject != 0) + if (mgmtObject != 0) { mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); + } if ( isDurable() && ! getPersistenceId() && ! recovering ) store->create(*this, _settings); + + QueueFlowLimit::observe(*this, _settings); } void Queue::destroyed() @@ -1176,6 +1175,7 @@ void Queue::flush() if (u.acquired && store) store->flush(*this); } + bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key, const qpid::framing::FieldTable& arguments) { @@ -1190,6 +1190,13 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key, } } + +const Broker* Queue::getBroker() +{ + return broker; +} + + Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} bool Queue::UsageBarrier::acquire() |