diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-19 15:03:16 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-19 15:03:16 +0000 |
commit | 81584c84fadc886b0ad53dceb479073e56bf8cdd (patch) | |
tree | f48206d10d52fdbb5a4ce93ec8068f0de4fbc9f5 /cpp/src/qpid/broker/Queue.cpp | |
parent | ccd0e27fdf0c5a90a7f85099dac4f63dbd7a5d15 (diff) | |
download | qpid-python-81584c84fadc886b0ad53dceb479073e56bf8cdd.tar.gz |
QPID-2935: merge producer flow control (C++ broker).
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1072356 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 25 |
1 files changed, 16 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 40cb80010c..d18b0fcda3 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -31,6 +31,7 @@ #include "qpid/broker/MessageStore.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/ThresholdAlerts.h" #include "qpid/StringUtils.h" @@ -163,13 +164,8 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ //drop message QPID_LOG(info, "Dropping excluded message from " << getName()); } else { - // if no store then mark as enqueued - if (!enqueue(0, msg)){ - push(msg); - msg->enqueueComplete(); - }else { - push(msg); - } + enqueue(0, msg); + push(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name); } } @@ -546,7 +542,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ void isEnqueueComplete(uint32_t* result, const QueuedMessage& message) { - if (message.payload->isEnqueueComplete()) (*result)++; + if (message.payload->isIngressComplete()) (*result)++; } /** function only provided for unit tests, or code not in critical message path */ @@ -819,11 +815,14 @@ void Queue::configure(const FieldTable& _settings, bool recovering) if (autoDeleteTimeout) QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); - if (mgmtObject != 0) + if (mgmtObject != 0) { mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); + } if ( isDurable() && ! getPersistenceId() && ! recovering ) store->create(*this, _settings); + + QueueFlowLimit::observe(*this, _settings); } void Queue::destroyed() @@ -1176,6 +1175,7 @@ void Queue::flush() if (u.acquired && store) store->flush(*this); } + bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key, const qpid::framing::FieldTable& arguments) { @@ -1190,6 +1190,13 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key, } } + +const Broker* Queue::getBroker() +{ + return broker; +} + + Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} bool Queue::UsageBarrier::acquire() |