summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-02-19 15:03:16 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-02-19 15:03:16 +0000
commit81584c84fadc886b0ad53dceb479073e56bf8cdd (patch)
treef48206d10d52fdbb5a4ce93ec8068f0de4fbc9f5 /cpp/src/qpid/broker/Queue.cpp
parentccd0e27fdf0c5a90a7f85099dac4f63dbd7a5d15 (diff)
downloadqpid-python-81584c84fadc886b0ad53dceb479073e56bf8cdd.tar.gz
QPID-2935: merge producer flow control (C++ broker).
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1072356 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp25
1 files changed, 16 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 40cb80010c..d18b0fcda3 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -31,6 +31,7 @@
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/ThresholdAlerts.h"
#include "qpid/StringUtils.h"
@@ -163,13 +164,8 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
//drop message
QPID_LOG(info, "Dropping excluded message from " << getName());
} else {
- // if no store then mark as enqueued
- if (!enqueue(0, msg)){
- push(msg);
- msg->enqueueComplete();
- }else {
- push(msg);
- }
+ enqueue(0, msg);
+ push(msg);
QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
}
}
@@ -546,7 +542,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
{
- if (message.payload->isEnqueueComplete()) (*result)++;
+ if (message.payload->isIngressComplete()) (*result)++;
}
/** function only provided for unit tests, or code not in critical message path */
@@ -819,11 +815,14 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
if (autoDeleteTimeout)
QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
- if (mgmtObject != 0)
+ if (mgmtObject != 0) {
mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
+ }
if ( isDurable() && ! getPersistenceId() && ! recovering )
store->create(*this, _settings);
+
+ QueueFlowLimit::observe(*this, _settings);
}
void Queue::destroyed()
@@ -1176,6 +1175,7 @@ void Queue::flush()
if (u.acquired && store) store->flush(*this);
}
+
bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
const qpid::framing::FieldTable& arguments)
{
@@ -1190,6 +1190,13 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
}
}
+
+const Broker* Queue::getBroker()
+{
+ return broker;
+}
+
+
Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
bool Queue::UsageBarrier::acquire()