diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-01-31 22:16:53 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-01-31 22:16:53 +0000 |
commit | 91d7819eb5d0e9657195ebabf89e6454f7eb89b5 (patch) | |
tree | 2a0af20e7d0758e081a01d7e73a14bb754dfb686 | |
parent | 14bb9643b6e62b80452c15802bc28687e717d3e0 (diff) | |
download | qpid-python-91d7819eb5d0e9657195ebabf89e6454f7eb89b5.tar.gz |
add mgmt configuration support
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1065831 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 28 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 32 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.h | 9 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueFlowLimitTest.cpp | 17 | ||||
-rw-r--r-- | qpid/specs/management-schema.xml | 6 | ||||
-rwxr-xr-x | qpid/tools/src/py/qpid-config | 38 |
6 files changed, 103 insertions, 27 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 8f7f275fe6..ae3f84008a 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -645,7 +645,11 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ if (policy.get()) { policy->enqueued(qm); } - if (flowLimit.get()) flowLimit->consume(qm); + if (flowLimit.get()) { + bool fc = flowLimit->consume(qm); + if (fc && mgmtObject) + mgmtObject->set_flowStopped(true); + } } copy.notify(); } @@ -844,7 +848,11 @@ void Queue::popAndDequeue() void Queue::dequeued(const QueuedMessage& msg) { if (policy.get()) policy->dequeued(msg); - if (flowLimit.get()) flowLimit->replenish(msg); + if (flowLimit.get()) { + bool fc = flowLimit->replenish(msg); + if (fc && mgmtObject) + mgmtObject->set_flowStopped(false); + } mgntDeqStats(msg.payload); if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { eventMgr->dequeued(msg); @@ -908,8 +916,16 @@ void Queue::configure(const FieldTable& _settings, bool recovering) flowLimit = QueueFlowLimit::createQueueFlowLimit(this, _settings); - if (mgmtObject != 0) + if (mgmtObject != 0) { mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); + if (flowLimit.get()) { + mgmtObject->set_flowStopCount(flowLimit->getFlowStopCount()); + mgmtObject->set_flowResumeCount(flowLimit->getFlowResumeCount()); + mgmtObject->set_flowStopSize(flowLimit->getFlowStopSize()); + mgmtObject->set_flowResumeSize(flowLimit->getFlowResumeSize()); + mgmtObject->set_flowStopped(flowLimit->isFlowControlActive()); + } + } if ( isDurable() && ! getPersistenceId() && ! recovering ) store->create(*this, _settings); @@ -1184,7 +1200,11 @@ void Queue::enqueued(const QueuedMessage& m) policy->recoverEnqueued(m.payload); policy->enqueued(m); } - if (flowLimit.get()) flowLimit->consume(m); + if (flowLimit.get()) { + bool fc = flowLimit->consume(m); + if (fc && mgmtObject) + mgmtObject->set_flowStopped(true); + } mgntEnqStats(m.payload); boost::intrusive_ptr<Message> payload = m.payload; enqueue ( 0, payload, true ); diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index f824b809d3..31dd5987b1 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -86,16 +86,17 @@ namespace { QueueFlowLimit::QueueFlowLimit(Queue *_queue, uint32_t _flowStopCount, uint32_t _flowResumeCount, uint64_t _flowStopSize, uint64_t _flowResumeSize) - : queueName("<unknown>"), flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), + : queue(_queue), queueName("<unknown>"), + flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), flowStopped(false), count(0), size(0) { uint32_t maxCount(0); uint64_t maxSize(0); - if (_queue) { + if (queue) { queueName = _queue->getName(); - if (_queue->getPolicy()) { + if (queue->getPolicy()) { maxSize = _queue->getPolicy()->getMaxSize(); maxCount = _queue->getPolicy()->getMaxCount(); } @@ -109,9 +110,11 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, -void QueueFlowLimit::consume(const QueuedMessage& msg) +bool QueueFlowLimit::consume(const QueuedMessage& msg) { - if (!msg.payload) return; + bool flowChanged(false); + + if (!msg.payload) return false; sys::Mutex::ScopedLock l(pendingFlowLock); @@ -119,32 +122,36 @@ void QueueFlowLimit::consume(const QueuedMessage& msg) size += msg.payload->contentSize(); if (flowStopCount && !flowStopped && count > flowStopCount) { - flowStopped = true; + flowChanged = flowStopped = true; QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." ); } if (flowStopSize && !flowStopped && size > flowStopSize) { - flowStopped = true; + flowChanged = flowStopped = true; QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." ); } - // KAG: test + // KAG: test - REMOVE ONCE STABLE if (index.find(msg.payload) != index.end()) { QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg twice: " << msg.position); } - + if (flowStopped || !pendingFlow.empty()) { msg.payload->getReceiveCompletion().startCompleter(); // don't complete until flow resumes pendingFlow.push_back(msg.payload); index.insert(msg.payload); } + + return flowChanged; } -void QueueFlowLimit::replenish(const QueuedMessage& msg) +bool QueueFlowLimit::replenish(const QueuedMessage& msg) { - if (!msg.payload) return; + bool flowChanged(false); + + if (!msg.payload) return false; sys::Mutex::ScopedLock l(pendingFlowLock); @@ -165,6 +172,7 @@ void QueueFlowLimit::replenish(const QueuedMessage& msg) (flowResumeSize == 0 || size < flowResumeSize) && (flowResumeCount == 0 || count < flowResumeCount)) { flowStopped = false; + flowChanged = true; QPID_LOG(info, "Queue \"" << queueName << "\": has drained below the flow control resume level. Producer flow control deactivated." ); } @@ -197,6 +205,8 @@ void QueueFlowLimit::replenish(const QueuedMessage& msg) pendingFlow.pop_front(); } } + + return flowChanged; } diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h index bd54f18a2b..48c8095470 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h @@ -43,6 +43,7 @@ namespace broker { */ class QueueFlowLimit { + Queue *queue; std::string queueName; uint32_t flowStopCount; @@ -63,10 +64,10 @@ class QueueFlowLimit virtual ~QueueFlowLimit() {} - /** the queue has added QueuedMessage */ - void consume(const QueuedMessage&); - /** the queue has removed QueuedMessage */ - void replenish(const QueuedMessage&); + /** the queue has added QueuedMessage. Returns true if flow state changes */ + bool consume(const QueuedMessage&); + /** the queue has removed QueuedMessage. Returns true if flow state changes */ + bool replenish(const QueuedMessage&); uint32_t getFlowStopCount() const { return flowStopCount; } uint32_t getFlowResumeCount() const { return flowResumeCount; } diff --git a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp index 5899986ee9..a01907f4ba 100644 --- a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp +++ b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp @@ -61,6 +61,7 @@ QPID_AUTO_TEST_CASE(testFlowCount) BOOST_CHECK(!flow->isFlowControlActive()); BOOST_CHECK(flow->monitorFlowControl()); + bool fc; std::deque<QueuedMessage> msgs; for (size_t i = 0; i < 6; i++) { msgs.push_back(createMessage(10)); @@ -72,11 +73,11 @@ QPID_AUTO_TEST_CASE(testFlowCount) flow->consume(msgs.back()); BOOST_CHECK(!flow->isFlowControlActive()); // 7 on queue msgs.push_back(createMessage(10)); - flow->consume(msgs.back()); - BOOST_CHECK(flow->isFlowControlActive()); // 8 on queue, ON + fc = flow->consume(msgs.back()); + BOOST_CHECK(fc && flow->isFlowControlActive()); // 8 on queue, ON msgs.push_back(createMessage(10)); - flow->consume(msgs.back()); - BOOST_CHECK(flow->isFlowControlActive()); // 9 on queue + fc = flow->consume(msgs.back()); + BOOST_CHECK(!fc && flow->isFlowControlActive()); // 9 on queue, no change to flow control flow->replenish(msgs.front()); msgs.pop_front(); @@ -87,13 +88,13 @@ QPID_AUTO_TEST_CASE(testFlowCount) flow->replenish(msgs.front()); msgs.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); // 6 on queue - flow->replenish(msgs.front()); + fc = flow->replenish(msgs.front()); msgs.pop_front(); - BOOST_CHECK(flow->isFlowControlActive()); // 5 on queue + BOOST_CHECK(!fc && flow->isFlowControlActive()); // 5 on queue, no change - flow->replenish(msgs.front()); + fc = flow->replenish(msgs.front()); msgs.pop_front(); - BOOST_CHECK(!flow->isFlowControlActive()); // 4 on queue, OFF + BOOST_CHECK(fc && !flow->isFlowControlActive()); // 4 on queue, OFF } diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index b59f4c79d1..6d17357f00 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -143,6 +143,11 @@ <property name="exclusive" type="bool" access="RC"/> <property name="arguments" type="map" access="RO" desc="Arguments supplied in queue.declare"/> <property name="altExchange" type="objId" references="Exchange" access="RO" optional="y"/> + <property name="flowStopCount" type="uint32" access="RO" optional="y" desc="Flow control: # messages to force flow control."/> + <property name="flowResumeCount" type="uint32" access="RO" optional="y" desc="Flow control: # messages to release flow control."/> + <property name="flowStopSize" type="uint64" access="RO" optional="y" desc="Flow control: # enqueued bytes to force flow control."/> + <property name="flowResumeSize" type="uint64" access="RO" optional="y" desc="Flow control: # enqueued bytes to release flow control."/> + <statistic name="msgTotalEnqueues" type="count64" unit="message" desc="Total messages enqueued"/> <statistic name="msgTotalDequeues" type="count64" unit="message" desc="Total messages dequeued"/> @@ -162,6 +167,7 @@ <statistic name="bindingCount" type="hilo32" unit="binding" desc="Current bindings"/> <statistic name="unackedMessages" type="hilo32" unit="message" desc="Messages consumed but not yet acked"/> <statistic name="messageLatency" type="mmaTime" unit="nanosecond" desc="Broker latency through this queue"/> + <statistic name="flowStopped" type="bool" desc="Flow control active."/> <method name="purge" desc="Discard all or some messages on a queue"> <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/> diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config index cdbe72baa0..6686431730 100755 --- a/qpid/tools/src/py/qpid-config +++ b/qpid/tools/src/py/qpid-config @@ -47,6 +47,10 @@ class Config: self._eventGeneration = None self._file = None self._sasl_mechanism = None + self._flowStopCount = None + self._flowResumeCount = None + self._flowStopSize = None + self._flowResumeSize = None config = Config() @@ -61,6 +65,10 @@ LVQNB = "qpid.last_value_queue_no_browse" MSG_SEQUENCE = "qpid.msg_sequence" IVE = "qpid.ive" QUEUE_EVENT_GENERATION = "qpid.queue_event_generation" +FLOW_STOP_COUNT = "qpid.flow_stop_count" +FLOW_RESUME_COUNT = "qpid.flow_resume_count" +FLOW_STOP_SIZE = "qpid.flow_stop_size" +FLOW_RESUME_SIZE = "qpid.flow_resume_size" class JHelpFormatter(IndentedHelpFormatter): """Format usage and description without stripping newlines from usage strings @@ -160,6 +168,14 @@ def OptionsAndArguments(argv): group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="<policy>", help="Action to take when queue limit is reached") group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="<ordering>", help="Queue ordering policy") group3.add_option("--generate-queue-events", action="store", type="int", metavar="<n>", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.") + group3.add_option("--flow-stop-size", action="store", type="int", metavar="<n>", + help="Turn on sender flow control when the number of queued bytes exceeds this value.") + group3.add_option("--flow-resume-size", action="store", type="int", metavar="<n>", + help="Turn off sender flow control when the number of queued bytes drops below this value.") + group3.add_option("--flow-stop-count", action="store", type="int", metavar="<n>", + help="Turn on sender flow control when the number of queued messages exceeds this value.") + group3.add_option("--flow-resume-count", action="store", type="int", metavar="<n>", + help="Turn off sender flow control when the number of queued messages drops below this value.") # no option for declaring an exclusive queue - which can only be used by the session that creates it. parser.add_option_group(group3) @@ -231,6 +247,14 @@ def OptionsAndArguments(argv): config._if_unused = False if opts.sasl_mechanism: config._sasl_mechanism = opts.sasl_mechanism + if opts.flow_stop_size: + config._flowStopSize = opts.flow_stop_size + if opts.flow_resume_size: + config._flowResumeSize = opts.flow_resume_size + if opts.flow_stop_count: + config._flowStopCount = opts.flow_stop_count + if opts.flow_resume_count: + config._flowResumeCount = opts.flow_resume_count return args @@ -389,6 +413,10 @@ class BrokerManager: if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION], if q.altExchange: print "--alternate-exchange=%s" % q._altExchange_.name, + if FLOW_STOP_SIZE in args: print "--flow-stop-size=%d" % args[FLOW_STOP_SIZE], + if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%d" % args[FLOW_RESUME_SIZE], + if FLOW_STOP_COUNT in args: print "--flow-stop-count=%d" % args[FLOW_STOP_COUNT], + if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%d" % args[FLOW_RESUME_COUNT], print def QueueListRecurse(self, filter): @@ -466,11 +494,21 @@ class BrokerManager: if config._eventGeneration: declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration + if config._flowStopSize: + declArgs[FLOW_STOP_SIZE] = config._flowStopSize + if config._flowResumeSize: + declArgs[FLOW_RESUME_SIZE] = config._flowResumeSize + if config._flowStopCount: + declArgs[FLOW_STOP_COUNT] = config._flowStopCount + if config._flowResumeCount: + declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount + if config._altern_ex != None: self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs) else: self.broker.getAmqpSession().queue_declare(queue=qname, passive=config._passive, durable=config._durable, arguments=declArgs) + def DelQueue(self, args): if len(args) < 1: Usage() |