summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-10-05 12:59:55 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-10-05 12:59:55 +0000
commitbfbc8b09403cf94a83b5aea177a000c4c7f0381c (patch)
tree3d248db144084565353894d6b638c5d949ed0020
parentb0b12b6cd16c5997a2ca77d48f14d8e932b8e7ea (diff)
downloadqpid-python-bfbc8b09403cf94a83b5aea177a000c4c7f0381c.tar.gz
QPID-3346: enhance the configuration UI for msg groups
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1179208 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.h1
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp2
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py4
-rwxr-xr-xqpid/cpp/src/tests/run_msg_group_tests4
-rwxr-xr-xqpid/cpp/src/tests/run_msg_group_tests_soak2
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py48
-rwxr-xr-xqpid/tools/src/py/qpid-config22
8 files changed, 68 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
index f576a866fc..9c97a294a9 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -39,6 +39,7 @@ namespace {
const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key");
+const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group");
const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group"); /** @todo KAG: make configurable in Broker options */
@@ -307,6 +308,12 @@ boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const std::s
if (settings.isSet(qpidMessageGroupKey)) {
+ // @todo: remove once "sticky" consumers are supported - see QPID-3347
+ if (!settings.isSet(qpidSharedGroup)) {
+ QPID_LOG( error, "Only shared groups are supported in this version of the broker. Use '--shared-groups' in qpid-config." );
+ return empty;
+ }
+
std::string headerKey = settings.getAsString(qpidMessageGroupKey);
if (headerKey.empty()) {
QPID_LOG( error, "A Message Group header key must be configured, queue=" << qName);
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
index 8c93cd6815..58464c484d 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
@@ -63,6 +63,7 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
//Consumers consumers; // index: consumer name
static const std::string qpidMessageGroupKey;
+ static const std::string qpidSharedGroup; // if specified, one group can be consumed by multiple receivers
static const std::string qpidMessageGroupTimestamp;
static const std::string qpidMessageGroupDefault;
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 5274f2370d..7bf061ff54 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -727,6 +727,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
FieldTable args;
Queue::shared_ptr queue(new Queue("my_queue", true));
args.setString("qpid.group_header_key", "GROUP-ID");
+ args.setInt("qpid.shared_msg_group", 1);
queue->configure(args);
std::string groups[] = { std::string("a"), std::string("a"), std::string("a"),
@@ -918,6 +919,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
FieldTable args;
Queue::shared_ptr queue(new Queue("my_queue", true));
args.setString("qpid.group_header_key", "GROUP-ID");
+ args.setInt("qpid.shared_msg_group", 1);
queue->configure(args);
for (int i = 0; i < 3; ++i) {
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 879dcdaeaf..d217f9fbde 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -1463,8 +1463,8 @@ class LongTests(BrokerTest):
# create a queue with rather draconian flow control settings
ssn0 = cluster[0].connect().session()
- s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.group_header_key':'group-id'}}}}")
-
+ q_args = "{'qpid.group_header_key':'group-id', 'qpid.shared_msg_group':1}"
+ s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:%s}}}" % q_args)
# Kill original brokers, start new ones for the duration.
endtime = time.time() + self.duration();
diff --git a/qpid/cpp/src/tests/run_msg_group_tests b/qpid/cpp/src/tests/run_msg_group_tests
index ee4f4bef77..8423022521 100755
--- a/qpid/cpp/src/tests/run_msg_group_tests
+++ b/qpid/cpp/src/tests/run_msg_group_tests
@@ -43,10 +43,10 @@ run_test() {
declare -i i=0
declare -a tests
-tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --argument=qpid.group_header_key=${GROUP_KEY}"
+tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_KEY} --shared-groups"
"msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 3"
"msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 7 --randomize-group-size"
- "qpid-config -a $BROKER_URL add queue ${QUEUE_NAME}-two --argument=qpid.group_header_key=${GROUP_KEY}"
+ "qpid-config -a $BROKER_URL add queue ${QUEUE_NAME}-two --group-header=${GROUP_KEY} --shared-groups"
"msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 3 --randomize-group-size"
"msg_group_test -b $BROKER_URL -a ${QUEUE_NAME}-two --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 5"
"msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 5 --receivers 2 --senders 3 --capacity 1 --ack-frequency 3 --randomize-group-size"
diff --git a/qpid/cpp/src/tests/run_msg_group_tests_soak b/qpid/cpp/src/tests/run_msg_group_tests_soak
index 4d288758bd..5231f74755 100755
--- a/qpid/cpp/src/tests/run_msg_group_tests_soak
+++ b/qpid/cpp/src/tests/run_msg_group_tests_soak
@@ -43,7 +43,7 @@ run_test() {
declare -i i=0
declare -a tests
-tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --argument=qpid.group_header_key=${GROUP_KEY}"
+tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_KEY} --shared-groups"
"msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 47 --ack-frequency 97"
"msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 79 --ack-frequency 79"
"msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 97 --ack-frequency 47"
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
index e49a997a54..1e7570f15b 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
@@ -48,7 +48,8 @@ class MultiConsumerMsgGroupTests(Base):
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'}}}}")
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
groups = ["A","A","A","B","B","B","C","C","C"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -172,7 +173,8 @@ class MultiConsumerMsgGroupTests(Base):
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'}}}}")
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
groups = ["A","B","A","B","C"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -228,7 +230,8 @@ class MultiConsumerMsgGroupTests(Base):
"""
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'}}}}")
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
groups = ["A","A","B","B"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -265,7 +268,8 @@ class MultiConsumerMsgGroupTests(Base):
"""
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'}}}}")
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
groups = ["A","A","B","B"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -301,7 +305,8 @@ class MultiConsumerMsgGroupTests(Base):
"""
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'}}}}")
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
groups = ["A","A","B","B"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -366,7 +371,8 @@ class MultiConsumerMsgGroupTests(Base):
"""
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'}}}}")
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
groups = ["A","A","B","B","A","B"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -476,7 +482,8 @@ class MultiConsumerMsgGroupTests(Base):
"""
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'}}}}")
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
groups = ["A","B","C","A","B","C","A"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -534,7 +541,8 @@ class MultiConsumerMsgGroupTests(Base):
"""
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'}}}}")
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
groups = ["A","B","A","B","C","A"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -578,7 +586,8 @@ class MultiConsumerMsgGroupTests(Base):
"""
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'}}}}")
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
groups = ["A","B","A","B","C","A"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -630,7 +639,8 @@ class MultiConsumerMsgGroupTests(Base):
"""
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'}}}}")
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
groups = ["A","B","A","B","C","A"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -685,7 +695,8 @@ class MultiConsumerMsgGroupTests(Base):
"""
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'}}}}")
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
groups = ["A","B","A","B","C","A"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -698,7 +709,8 @@ class MultiConsumerMsgGroupTests(Base):
# set up destination queue
rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
" node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'}}}}")
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
# acquire group "A"
s1 = self.setup_session()
@@ -754,7 +766,8 @@ class MultiConsumerMsgGroupTests(Base):
"""
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'}}}}")
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
groups = ["A","B","A","B","C","A"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -767,7 +780,8 @@ class MultiConsumerMsgGroupTests(Base):
# set up destination queue
rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
" node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'}}}}")
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
# now setup a QMF session, so we can move group B
self.qmf_session = qmf.console.Session()
@@ -815,7 +829,8 @@ class MultiConsumerMsgGroupTests(Base):
"""
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'}}}}")
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
groups = ["A","B","A","B","C","A"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -886,7 +901,8 @@ class MultiConsumerMsgGroupTests(Base):
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'}}}}")
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
groups = ["A","B","A","B","C"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config
index cd80e26a1e..1ee35da8c3 100755
--- a/qpid/tools/src/py/qpid-config
+++ b/qpid/tools/src/py/qpid-config
@@ -96,6 +96,8 @@ class Config:
self._flowResumeCount = None
self._flowStopSize = None
self._flowResumeSize = None
+ self._msgGroupHeader = None
+ self._sharedMsgGroup = False
self._extra_arguments = []
self._returnCode = 0
@@ -116,13 +118,16 @@ FLOW_STOP_COUNT = "qpid.flow_stop_count"
FLOW_RESUME_COUNT = "qpid.flow_resume_count"
FLOW_STOP_SIZE = "qpid.flow_stop_size"
FLOW_RESUME_SIZE = "qpid.flow_resume_size"
+MSG_GROUP_HDR_KEY = "qpid.group_header_key"
+SHARED_MSG_GROUP = "qpid.shared_msg_group"
#There are various arguments to declare that have specific program
#options in this utility. However there is now a generic mechanism for
#passing arguments as well. The SPECIAL_ARGS list contains the
#arguments for which there are specific program options defined
#i.e. the arguments for which there is special processing on add and
#list
-SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE]
+SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
+ MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP]
class JHelpFormatter(IndentedHelpFormatter):
"""Format usage and description without stripping newlines from usage strings
@@ -182,6 +187,10 @@ def OptionsAndArguments(argv):
help="Turn on sender flow control when the number of queued messages exceeds this value.")
group3.add_option("--flow-resume-count", action="store", type="int", metavar="<n>",
help="Turn off sender flow control when the number of queued messages drops below this value.")
+ group3.add_option("--group-header", action="store", type="string", metavar="<header-name>",
+ help="Enable message groups. Specify name of header that holds group identifier.")
+ group3.add_option("--shared-groups", action="store_true",
+ help="Allow message group consumption across multiple consumers.")
group3.add_option("--argument", dest="extra_arguments", action="append", default=[],
metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments")
# no option for declaring an exclusive queue - which can only be used by the session that creates it.
@@ -263,6 +272,10 @@ def OptionsAndArguments(argv):
config._flowStopCount = opts.flow_stop_count
if opts.flow_resume_count:
config._flowResumeCount = opts.flow_resume_count
+ if opts.group_header:
+ config._msgGroupHeader = opts.group_header
+ if opts.shared_groups:
+ config._sharedMsgGroup = True
if opts.extra_arguments:
config._extra_arguments = opts.extra_arguments
return args
@@ -442,6 +455,8 @@ class BrokerManager:
if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE],
if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT],
if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%s" % args[FLOW_RESUME_COUNT],
+ if MSG_GROUP_HDR_KEY in args: print "--group-header=%s" % args[MSG_GROUP_HDR_KEY],
+ if SHARED_MSG_GROUP in args and args[SHARED_MSG_GROUP] == 1: print "--shared-groups",
print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS])
def QueueListRecurse(self, filter):
@@ -534,6 +549,11 @@ class BrokerManager:
if config._flowResumeCount:
declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount
+ if config._msgGroupHeader:
+ declArgs[MSG_GROUP_HDR_KEY] = config._msgGroupHeader
+ if config._sharedMsgGroup:
+ declArgs[SHARED_MSG_GROUP] = 1
+
if config._altern_ex != None:
self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs)
else: