diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-05 12:59:55 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-05 12:59:55 +0000 |
commit | bfbc8b09403cf94a83b5aea177a000c4c7f0381c (patch) | |
tree | 3d248db144084565353894d6b638c5d949ed0020 | |
parent | b0b12b6cd16c5997a2ca77d48f14d8e932b8e7ea (diff) | |
download | qpid-python-bfbc8b09403cf94a83b5aea177a000c4c7f0381c.tar.gz |
QPID-3346: enhance the configuration UI for msg groups
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1179208 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageGroupManager.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageGroupManager.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 4 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/run_msg_group_tests | 4 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/run_msg_group_tests_soak | 2 | ||||
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py | 48 | ||||
-rwxr-xr-x | qpid/tools/src/py/qpid-config | 22 |
8 files changed, 68 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp index f576a866fc..9c97a294a9 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -39,6 +39,7 @@ namespace { const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key"); +const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group"); const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group"); /** @todo KAG: make configurable in Broker options */ @@ -307,6 +308,12 @@ boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const std::s if (settings.isSet(qpidMessageGroupKey)) { + // @todo: remove once "sticky" consumers are supported - see QPID-3347 + if (!settings.isSet(qpidSharedGroup)) { + QPID_LOG( error, "Only shared groups are supported in this version of the broker. Use '--shared-groups' in qpid-config." ); + return empty; + } + std::string headerKey = settings.getAsString(qpidMessageGroupKey); if (headerKey.empty()) { QPID_LOG( error, "A Message Group header key must be configured, queue=" << qName); diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h index 8c93cd6815..58464c484d 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h @@ -63,6 +63,7 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu //Consumers consumers; // index: consumer name static const std::string qpidMessageGroupKey; + static const std::string qpidSharedGroup; // if specified, one group can be consumed by multiple receivers static const std::string qpidMessageGroupTimestamp; static const std::string qpidMessageGroupDefault; diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 5274f2370d..7bf061ff54 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -727,6 +727,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { FieldTable args; Queue::shared_ptr queue(new Queue("my_queue", true)); args.setString("qpid.group_header_key", "GROUP-ID"); + args.setInt("qpid.shared_msg_group", 1); queue->configure(args); std::string groups[] = { std::string("a"), std::string("a"), std::string("a"), @@ -918,6 +919,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) { FieldTable args; Queue::shared_ptr queue(new Queue("my_queue", true)); args.setString("qpid.group_header_key", "GROUP-ID"); + args.setInt("qpid.shared_msg_group", 1); queue->configure(args); for (int i = 0; i < 3; ++i) { diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 879dcdaeaf..d217f9fbde 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -1463,8 +1463,8 @@ class LongTests(BrokerTest): # create a queue with rather draconian flow control settings ssn0 = cluster[0].connect().session() - s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.group_header_key':'group-id'}}}}") - + q_args = "{'qpid.group_header_key':'group-id', 'qpid.shared_msg_group':1}" + s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:%s}}}" % q_args) # Kill original brokers, start new ones for the duration. endtime = time.time() + self.duration(); diff --git a/qpid/cpp/src/tests/run_msg_group_tests b/qpid/cpp/src/tests/run_msg_group_tests index ee4f4bef77..8423022521 100755 --- a/qpid/cpp/src/tests/run_msg_group_tests +++ b/qpid/cpp/src/tests/run_msg_group_tests @@ -43,10 +43,10 @@ run_test() { declare -i i=0 declare -a tests -tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --argument=qpid.group_header_key=${GROUP_KEY}" +tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_KEY} --shared-groups" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 3" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 7 --randomize-group-size" - "qpid-config -a $BROKER_URL add queue ${QUEUE_NAME}-two --argument=qpid.group_header_key=${GROUP_KEY}" + "qpid-config -a $BROKER_URL add queue ${QUEUE_NAME}-two --group-header=${GROUP_KEY} --shared-groups" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 3 --randomize-group-size" "msg_group_test -b $BROKER_URL -a ${QUEUE_NAME}-two --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 5" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 5 --receivers 2 --senders 3 --capacity 1 --ack-frequency 3 --randomize-group-size" diff --git a/qpid/cpp/src/tests/run_msg_group_tests_soak b/qpid/cpp/src/tests/run_msg_group_tests_soak index 4d288758bd..5231f74755 100755 --- a/qpid/cpp/src/tests/run_msg_group_tests_soak +++ b/qpid/cpp/src/tests/run_msg_group_tests_soak @@ -43,7 +43,7 @@ run_test() { declare -i i=0 declare -a tests -tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --argument=qpid.group_header_key=${GROUP_KEY}" +tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_KEY} --shared-groups" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 47 --ack-frequency 97" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 79 --ack-frequency 79" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 97 --ack-frequency 47" diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py index e49a997a54..1e7570f15b 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py @@ -48,7 +48,8 @@ class MultiConsumerMsgGroupTests(Base): snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'}}}}") + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") groups = ["A","A","A","B","B","B","C","C","C"] messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] @@ -172,7 +173,8 @@ class MultiConsumerMsgGroupTests(Base): snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'}}}}") + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") groups = ["A","B","A","B","C"] messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] @@ -228,7 +230,8 @@ class MultiConsumerMsgGroupTests(Base): """ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'}}}}") + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") groups = ["A","A","B","B"] messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] @@ -265,7 +268,8 @@ class MultiConsumerMsgGroupTests(Base): """ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'}}}}") + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") groups = ["A","A","B","B"] messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] @@ -301,7 +305,8 @@ class MultiConsumerMsgGroupTests(Base): """ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'}}}}") + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") groups = ["A","A","B","B"] messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] @@ -366,7 +371,8 @@ class MultiConsumerMsgGroupTests(Base): """ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'}}}}") + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") groups = ["A","A","B","B","A","B"] messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] @@ -476,7 +482,8 @@ class MultiConsumerMsgGroupTests(Base): """ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'}}}}") + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") groups = ["A","B","C","A","B","C","A"] messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] @@ -534,7 +541,8 @@ class MultiConsumerMsgGroupTests(Base): """ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'}}}}") + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") groups = ["A","B","A","B","C","A"] messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] @@ -578,7 +586,8 @@ class MultiConsumerMsgGroupTests(Base): """ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'}}}}") + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") groups = ["A","B","A","B","C","A"] messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] @@ -630,7 +639,8 @@ class MultiConsumerMsgGroupTests(Base): """ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'}}}}") + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") groups = ["A","B","A","B","C","A"] messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] @@ -685,7 +695,8 @@ class MultiConsumerMsgGroupTests(Base): """ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'}}}}") + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") groups = ["A","B","A","B","C","A"] messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] @@ -698,7 +709,8 @@ class MultiConsumerMsgGroupTests(Base): # set up destination queue rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," + " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'}}}}") + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") # acquire group "A" s1 = self.setup_session() @@ -754,7 +766,8 @@ class MultiConsumerMsgGroupTests(Base): """ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'}}}}") + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") groups = ["A","B","A","B","C","A"] messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] @@ -767,7 +780,8 @@ class MultiConsumerMsgGroupTests(Base): # set up destination queue rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," + " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'}}}}") + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") # now setup a QMF session, so we can move group B self.qmf_session = qmf.console.Session() @@ -815,7 +829,8 @@ class MultiConsumerMsgGroupTests(Base): """ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'}}}}") + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") groups = ["A","B","A","B","C","A"] messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] @@ -886,7 +901,8 @@ class MultiConsumerMsgGroupTests(Base): snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + " node: {x-declare: {arguments:" + - " {'qpid.group_header_key':'THE-GROUP'}}}}") + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") groups = ["A","B","A","B","C"] messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config index cd80e26a1e..1ee35da8c3 100755 --- a/qpid/tools/src/py/qpid-config +++ b/qpid/tools/src/py/qpid-config @@ -96,6 +96,8 @@ class Config: self._flowResumeCount = None self._flowStopSize = None self._flowResumeSize = None + self._msgGroupHeader = None + self._sharedMsgGroup = False self._extra_arguments = [] self._returnCode = 0 @@ -116,13 +118,16 @@ FLOW_STOP_COUNT = "qpid.flow_stop_count" FLOW_RESUME_COUNT = "qpid.flow_resume_count" FLOW_STOP_SIZE = "qpid.flow_stop_size" FLOW_RESUME_SIZE = "qpid.flow_resume_size" +MSG_GROUP_HDR_KEY = "qpid.group_header_key" +SHARED_MSG_GROUP = "qpid.shared_msg_group" #There are various arguments to declare that have specific program #options in this utility. However there is now a generic mechanism for #passing arguments as well. The SPECIAL_ARGS list contains the #arguments for which there are specific program options defined #i.e. the arguments for which there is special processing on add and #list -SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE] +SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE, + MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP] class JHelpFormatter(IndentedHelpFormatter): """Format usage and description without stripping newlines from usage strings @@ -182,6 +187,10 @@ def OptionsAndArguments(argv): help="Turn on sender flow control when the number of queued messages exceeds this value.") group3.add_option("--flow-resume-count", action="store", type="int", metavar="<n>", help="Turn off sender flow control when the number of queued messages drops below this value.") + group3.add_option("--group-header", action="store", type="string", metavar="<header-name>", + help="Enable message groups. Specify name of header that holds group identifier.") + group3.add_option("--shared-groups", action="store_true", + help="Allow message group consumption across multiple consumers.") group3.add_option("--argument", dest="extra_arguments", action="append", default=[], metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments") # no option for declaring an exclusive queue - which can only be used by the session that creates it. @@ -263,6 +272,10 @@ def OptionsAndArguments(argv): config._flowStopCount = opts.flow_stop_count if opts.flow_resume_count: config._flowResumeCount = opts.flow_resume_count + if opts.group_header: + config._msgGroupHeader = opts.group_header + if opts.shared_groups: + config._sharedMsgGroup = True if opts.extra_arguments: config._extra_arguments = opts.extra_arguments return args @@ -442,6 +455,8 @@ class BrokerManager: if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE], if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT], if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%s" % args[FLOW_RESUME_COUNT], + if MSG_GROUP_HDR_KEY in args: print "--group-header=%s" % args[MSG_GROUP_HDR_KEY], + if SHARED_MSG_GROUP in args and args[SHARED_MSG_GROUP] == 1: print "--shared-groups", print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS]) def QueueListRecurse(self, filter): @@ -534,6 +549,11 @@ class BrokerManager: if config._flowResumeCount: declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount + if config._msgGroupHeader: + declArgs[MSG_GROUP_HDR_KEY] = config._msgGroupHeader + if config._sharedMsgGroup: + declArgs[SHARED_MSG_GROUP] = 1 + if config._altern_ex != None: self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs) else: |