summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-01-31 22:16:53 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-01-31 22:16:53 +0000
commit91d7819eb5d0e9657195ebabf89e6454f7eb89b5 (patch)
tree2a0af20e7d0758e081a01d7e73a14bb754dfb686
parent14bb9643b6e62b80452c15802bc28687e717d3e0 (diff)
downloadqpid-python-91d7819eb5d0e9657195ebabf89e6454f7eb89b5.tar.gz
add mgmt configuration support
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1065831 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp28
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp32
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.h9
-rw-r--r--qpid/cpp/src/tests/QueueFlowLimitTest.cpp17
-rw-r--r--qpid/specs/management-schema.xml6
-rwxr-xr-xqpid/tools/src/py/qpid-config38
6 files changed, 103 insertions, 27 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 8f7f275fe6..ae3f84008a 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -645,7 +645,11 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
if (policy.get()) {
policy->enqueued(qm);
}
- if (flowLimit.get()) flowLimit->consume(qm);
+ if (flowLimit.get()) {
+ bool fc = flowLimit->consume(qm);
+ if (fc && mgmtObject)
+ mgmtObject->set_flowStopped(true);
+ }
}
copy.notify();
}
@@ -844,7 +848,11 @@ void Queue::popAndDequeue()
void Queue::dequeued(const QueuedMessage& msg)
{
if (policy.get()) policy->dequeued(msg);
- if (flowLimit.get()) flowLimit->replenish(msg);
+ if (flowLimit.get()) {
+ bool fc = flowLimit->replenish(msg);
+ if (fc && mgmtObject)
+ mgmtObject->set_flowStopped(false);
+ }
mgntDeqStats(msg.payload);
if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
eventMgr->dequeued(msg);
@@ -908,8 +916,16 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
flowLimit = QueueFlowLimit::createQueueFlowLimit(this, _settings);
- if (mgmtObject != 0)
+ if (mgmtObject != 0) {
mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
+ if (flowLimit.get()) {
+ mgmtObject->set_flowStopCount(flowLimit->getFlowStopCount());
+ mgmtObject->set_flowResumeCount(flowLimit->getFlowResumeCount());
+ mgmtObject->set_flowStopSize(flowLimit->getFlowStopSize());
+ mgmtObject->set_flowResumeSize(flowLimit->getFlowResumeSize());
+ mgmtObject->set_flowStopped(flowLimit->isFlowControlActive());
+ }
+ }
if ( isDurable() && ! getPersistenceId() && ! recovering )
store->create(*this, _settings);
@@ -1184,7 +1200,11 @@ void Queue::enqueued(const QueuedMessage& m)
policy->recoverEnqueued(m.payload);
policy->enqueued(m);
}
- if (flowLimit.get()) flowLimit->consume(m);
+ if (flowLimit.get()) {
+ bool fc = flowLimit->consume(m);
+ if (fc && mgmtObject)
+ mgmtObject->set_flowStopped(true);
+ }
mgntEnqStats(m.payload);
boost::intrusive_ptr<Message> payload = m.payload;
enqueue ( 0, payload, true );
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index f824b809d3..31dd5987b1 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -86,16 +86,17 @@ namespace {
QueueFlowLimit::QueueFlowLimit(Queue *_queue,
uint32_t _flowStopCount, uint32_t _flowResumeCount,
uint64_t _flowStopSize, uint64_t _flowResumeSize)
- : queueName("<unknown>"), flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
+ : queue(_queue), queueName("<unknown>"),
+ flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
flowStopped(false), count(0), size(0)
{
uint32_t maxCount(0);
uint64_t maxSize(0);
- if (_queue) {
+ if (queue) {
queueName = _queue->getName();
- if (_queue->getPolicy()) {
+ if (queue->getPolicy()) {
maxSize = _queue->getPolicy()->getMaxSize();
maxCount = _queue->getPolicy()->getMaxCount();
}
@@ -109,9 +110,11 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue,
-void QueueFlowLimit::consume(const QueuedMessage& msg)
+bool QueueFlowLimit::consume(const QueuedMessage& msg)
{
- if (!msg.payload) return;
+ bool flowChanged(false);
+
+ if (!msg.payload) return false;
sys::Mutex::ScopedLock l(pendingFlowLock);
@@ -119,32 +122,36 @@ void QueueFlowLimit::consume(const QueuedMessage& msg)
size += msg.payload->contentSize();
if (flowStopCount && !flowStopped && count > flowStopCount) {
- flowStopped = true;
+ flowChanged = flowStopped = true;
QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." );
}
if (flowStopSize && !flowStopped && size > flowStopSize) {
- flowStopped = true;
+ flowChanged = flowStopped = true;
QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." );
}
- // KAG: test
+ // KAG: test - REMOVE ONCE STABLE
if (index.find(msg.payload) != index.end()) {
QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg twice: " << msg.position);
}
-
+
if (flowStopped || !pendingFlow.empty()) {
msg.payload->getReceiveCompletion().startCompleter(); // don't complete until flow resumes
pendingFlow.push_back(msg.payload);
index.insert(msg.payload);
}
+
+ return flowChanged;
}
-void QueueFlowLimit::replenish(const QueuedMessage& msg)
+bool QueueFlowLimit::replenish(const QueuedMessage& msg)
{
- if (!msg.payload) return;
+ bool flowChanged(false);
+
+ if (!msg.payload) return false;
sys::Mutex::ScopedLock l(pendingFlowLock);
@@ -165,6 +172,7 @@ void QueueFlowLimit::replenish(const QueuedMessage& msg)
(flowResumeSize == 0 || size < flowResumeSize) &&
(flowResumeCount == 0 || count < flowResumeCount)) {
flowStopped = false;
+ flowChanged = true;
QPID_LOG(info, "Queue \"" << queueName << "\": has drained below the flow control resume level. Producer flow control deactivated." );
}
@@ -197,6 +205,8 @@ void QueueFlowLimit::replenish(const QueuedMessage& msg)
pendingFlow.pop_front();
}
}
+
+ return flowChanged;
}
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
index bd54f18a2b..48c8095470 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
@@ -43,6 +43,7 @@ namespace broker {
*/
class QueueFlowLimit
{
+ Queue *queue;
std::string queueName;
uint32_t flowStopCount;
@@ -63,10 +64,10 @@ class QueueFlowLimit
virtual ~QueueFlowLimit() {}
- /** the queue has added QueuedMessage */
- void consume(const QueuedMessage&);
- /** the queue has removed QueuedMessage */
- void replenish(const QueuedMessage&);
+ /** the queue has added QueuedMessage. Returns true if flow state changes */
+ bool consume(const QueuedMessage&);
+ /** the queue has removed QueuedMessage. Returns true if flow state changes */
+ bool replenish(const QueuedMessage&);
uint32_t getFlowStopCount() const { return flowStopCount; }
uint32_t getFlowResumeCount() const { return flowResumeCount; }
diff --git a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
index 5899986ee9..a01907f4ba 100644
--- a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
+++ b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
@@ -61,6 +61,7 @@ QPID_AUTO_TEST_CASE(testFlowCount)
BOOST_CHECK(!flow->isFlowControlActive());
BOOST_CHECK(flow->monitorFlowControl());
+ bool fc;
std::deque<QueuedMessage> msgs;
for (size_t i = 0; i < 6; i++) {
msgs.push_back(createMessage(10));
@@ -72,11 +73,11 @@ QPID_AUTO_TEST_CASE(testFlowCount)
flow->consume(msgs.back());
BOOST_CHECK(!flow->isFlowControlActive()); // 7 on queue
msgs.push_back(createMessage(10));
- flow->consume(msgs.back());
- BOOST_CHECK(flow->isFlowControlActive()); // 8 on queue, ON
+ fc = flow->consume(msgs.back());
+ BOOST_CHECK(fc && flow->isFlowControlActive()); // 8 on queue, ON
msgs.push_back(createMessage(10));
- flow->consume(msgs.back());
- BOOST_CHECK(flow->isFlowControlActive()); // 9 on queue
+ fc = flow->consume(msgs.back());
+ BOOST_CHECK(!fc && flow->isFlowControlActive()); // 9 on queue, no change to flow control
flow->replenish(msgs.front());
msgs.pop_front();
@@ -87,13 +88,13 @@ QPID_AUTO_TEST_CASE(testFlowCount)
flow->replenish(msgs.front());
msgs.pop_front();
BOOST_CHECK(flow->isFlowControlActive()); // 6 on queue
- flow->replenish(msgs.front());
+ fc = flow->replenish(msgs.front());
msgs.pop_front();
- BOOST_CHECK(flow->isFlowControlActive()); // 5 on queue
+ BOOST_CHECK(!fc && flow->isFlowControlActive()); // 5 on queue, no change
- flow->replenish(msgs.front());
+ fc = flow->replenish(msgs.front());
msgs.pop_front();
- BOOST_CHECK(!flow->isFlowControlActive()); // 4 on queue, OFF
+ BOOST_CHECK(fc && !flow->isFlowControlActive()); // 4 on queue, OFF
}
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index b59f4c79d1..6d17357f00 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -143,6 +143,11 @@
<property name="exclusive" type="bool" access="RC"/>
<property name="arguments" type="map" access="RO" desc="Arguments supplied in queue.declare"/>
<property name="altExchange" type="objId" references="Exchange" access="RO" optional="y"/>
+ <property name="flowStopCount" type="uint32" access="RO" optional="y" desc="Flow control: # messages to force flow control."/>
+ <property name="flowResumeCount" type="uint32" access="RO" optional="y" desc="Flow control: # messages to release flow control."/>
+ <property name="flowStopSize" type="uint64" access="RO" optional="y" desc="Flow control: # enqueued bytes to force flow control."/>
+ <property name="flowResumeSize" type="uint64" access="RO" optional="y" desc="Flow control: # enqueued bytes to release flow control."/>
+
<statistic name="msgTotalEnqueues" type="count64" unit="message" desc="Total messages enqueued"/>
<statistic name="msgTotalDequeues" type="count64" unit="message" desc="Total messages dequeued"/>
@@ -162,6 +167,7 @@
<statistic name="bindingCount" type="hilo32" unit="binding" desc="Current bindings"/>
<statistic name="unackedMessages" type="hilo32" unit="message" desc="Messages consumed but not yet acked"/>
<statistic name="messageLatency" type="mmaTime" unit="nanosecond" desc="Broker latency through this queue"/>
+ <statistic name="flowStopped" type="bool" desc="Flow control active."/>
<method name="purge" desc="Discard all or some messages on a queue">
<arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config
index cdbe72baa0..6686431730 100755
--- a/qpid/tools/src/py/qpid-config
+++ b/qpid/tools/src/py/qpid-config
@@ -47,6 +47,10 @@ class Config:
self._eventGeneration = None
self._file = None
self._sasl_mechanism = None
+ self._flowStopCount = None
+ self._flowResumeCount = None
+ self._flowStopSize = None
+ self._flowResumeSize = None
config = Config()
@@ -61,6 +65,10 @@ LVQNB = "qpid.last_value_queue_no_browse"
MSG_SEQUENCE = "qpid.msg_sequence"
IVE = "qpid.ive"
QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"
+FLOW_STOP_COUNT = "qpid.flow_stop_count"
+FLOW_RESUME_COUNT = "qpid.flow_resume_count"
+FLOW_STOP_SIZE = "qpid.flow_stop_size"
+FLOW_RESUME_SIZE = "qpid.flow_resume_size"
class JHelpFormatter(IndentedHelpFormatter):
"""Format usage and description without stripping newlines from usage strings
@@ -160,6 +168,14 @@ def OptionsAndArguments(argv):
group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="<policy>", help="Action to take when queue limit is reached")
group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="<ordering>", help="Queue ordering policy")
group3.add_option("--generate-queue-events", action="store", type="int", metavar="<n>", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.")
+ group3.add_option("--flow-stop-size", action="store", type="int", metavar="<n>",
+ help="Turn on sender flow control when the number of queued bytes exceeds this value.")
+ group3.add_option("--flow-resume-size", action="store", type="int", metavar="<n>",
+ help="Turn off sender flow control when the number of queued bytes drops below this value.")
+ group3.add_option("--flow-stop-count", action="store", type="int", metavar="<n>",
+ help="Turn on sender flow control when the number of queued messages exceeds this value.")
+ group3.add_option("--flow-resume-count", action="store", type="int", metavar="<n>",
+ help="Turn off sender flow control when the number of queued messages drops below this value.")
# no option for declaring an exclusive queue - which can only be used by the session that creates it.
parser.add_option_group(group3)
@@ -231,6 +247,14 @@ def OptionsAndArguments(argv):
config._if_unused = False
if opts.sasl_mechanism:
config._sasl_mechanism = opts.sasl_mechanism
+ if opts.flow_stop_size:
+ config._flowStopSize = opts.flow_stop_size
+ if opts.flow_resume_size:
+ config._flowResumeSize = opts.flow_resume_size
+ if opts.flow_stop_count:
+ config._flowStopCount = opts.flow_stop_count
+ if opts.flow_resume_count:
+ config._flowResumeCount = opts.flow_resume_count
return args
@@ -389,6 +413,10 @@ class BrokerManager:
if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION],
if q.altExchange:
print "--alternate-exchange=%s" % q._altExchange_.name,
+ if FLOW_STOP_SIZE in args: print "--flow-stop-size=%d" % args[FLOW_STOP_SIZE],
+ if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%d" % args[FLOW_RESUME_SIZE],
+ if FLOW_STOP_COUNT in args: print "--flow-stop-count=%d" % args[FLOW_STOP_COUNT],
+ if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%d" % args[FLOW_RESUME_COUNT],
print
def QueueListRecurse(self, filter):
@@ -466,11 +494,21 @@ class BrokerManager:
if config._eventGeneration:
declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration
+ if config._flowStopSize:
+ declArgs[FLOW_STOP_SIZE] = config._flowStopSize
+ if config._flowResumeSize:
+ declArgs[FLOW_RESUME_SIZE] = config._flowResumeSize
+ if config._flowStopCount:
+ declArgs[FLOW_STOP_COUNT] = config._flowStopCount
+ if config._flowResumeCount:
+ declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount
+
if config._altern_ex != None:
self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs)
else:
self.broker.getAmqpSession().queue_declare(queue=qname, passive=config._passive, durable=config._durable, arguments=declArgs)
+
def DelQueue(self, args):
if len(args) < 1:
Usage()