summaryrefslogtreecommitdiff
path: root/tools/src/py/qpid-config
diff options
context:
space:
mode:
Diffstat (limited to 'tools/src/py/qpid-config')
-rwxr-xr-xtools/src/py/qpid-config145
1 files changed, 121 insertions, 24 deletions
diff --git a/tools/src/py/qpid-config b/tools/src/py/qpid-config
index 04b31e98ed..bb49b9d7c9 100755
--- a/tools/src/py/qpid-config
+++ b/tools/src/py/qpid-config
@@ -39,17 +39,12 @@ Usage: qpid-config [OPTIONS]
qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]"""
description = """
-ADDRESS syntax:
-
- [username/password@] hostname
- ip-address [:<port>]
-
Examples:
$ qpid-config add queue q
-$ qpid-config add exchange direct d localhost:5672
-$ qpid-config exchanges 10.1.1.7:10000
-$ qpid-config queues guest/guest@broker-host:10000
+$ qpid-config add exchange direct d -a localhost:5672
+$ qpid-config exchanges -a 10.1.1.7:10000
+$ qpid-config queues -a guest/guest@broker-host:10000
Add Exchange <type> values:
@@ -80,6 +75,7 @@ class Config:
self._recursive = False
self._host = "localhost"
self._connTimeout = 10
+ self._ignoreDefault = False
self._altern_ex = None
self._passive = False
self._durable = False
@@ -97,6 +93,14 @@ class Config:
self._eventGeneration = None
self._file = None
self._sasl_mechanism = None
+ self._flowStopCount = None
+ self._flowResumeCount = None
+ self._flowStopSize = None
+ self._flowResumeSize = None
+ self._msgGroupHeader = None
+ self._sharedMsgGroup = False
+ self._extra_arguments = []
+ self._returnCode = 0
config = Config()
@@ -111,6 +115,20 @@ LVQNB = "qpid.last_value_queue_no_browse"
MSG_SEQUENCE = "qpid.msg_sequence"
IVE = "qpid.ive"
QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"
+FLOW_STOP_COUNT = "qpid.flow_stop_count"
+FLOW_RESUME_COUNT = "qpid.flow_resume_count"
+FLOW_STOP_SIZE = "qpid.flow_stop_size"
+FLOW_RESUME_SIZE = "qpid.flow_resume_size"
+MSG_GROUP_HDR_KEY = "qpid.group_header_key"
+SHARED_MSG_GROUP = "qpid.shared_msg_group"
+#There are various arguments to declare that have specific program
+#options in this utility. However there is now a generic mechanism for
+#passing arguments as well. The SPECIAL_ARGS list contains the
+#arguments for which there are specific program options defined
+#i.e. the arguments for which there is special processing on add and
+#list
+SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
+ MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP]
class JHelpFormatter(IndentedHelpFormatter):
"""Format usage and description without stripping newlines from usage strings
@@ -143,10 +161,14 @@ def OptionsAndArguments(argv):
group1 = OptionGroup(parser, "General Options")
group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)")
group1.add_option("-b", "--bindings", action="store_true", help="Show bindings in queue or exchange list")
- group1.add_option("-a", "--broker-addr", action="store", type="string", default="localhost:5672", metavar="<address>", help="Maximum time to wait for broker connection (in seconds)")
+ group1.add_option("-a", "--broker-addr", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]")
group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
parser.add_option_group(group1)
+ group_ls = OptionGroup(parser, "Options for Listing Exchanges and Queues")
+ group_ls.add_option("--ignore-default", action="store_true", help="Ignore the default exchange in exchange or queue list")
+ parser.add_option_group(group_ls)
+
group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues")
group2.add_option("--alternate-exchange", action="store", type="string", metavar="<aexname>", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.")
group2.add_option("--passive", "--dry-run", action="store_true", help="Do not actually add the exchange or queue, ensure that all parameters and permissions are correct and would allow it to be created.")
@@ -156,12 +178,26 @@ def OptionsAndArguments(argv):
group3 = OptionGroup(parser, "Options for Adding Queues")
group3.add_option("--cluster-durable", action="store_true", help="The new queue becomes durable if there is only one functioning cluster node")
group3.add_option("--file-count", action="store", type="int", default=8, metavar="<n>", help="Number of files in queue's persistence journal")
- group3.add_option("--file-size", action="store", type="int", default=24, metavar="<n>", help="File size in pages (64Kib/page)")
- group3.add_option("--max-queue-size", action="store", type="int", metavar="<n>", help="Number of files in queue's persistence journal")
- group3.add_option("--max-queue-count", action="store", type="int", metavar="<n>", help="Number of files in queue's persistence journal")
+ group3.add_option("--file-size", action="store", type="int", default=24, metavar="<n>", help="File size in pages (64KiB/page)")
+ group3.add_option("--max-queue-size", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as bytes")
+ group3.add_option("--max-queue-count", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as a number of messages")
group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="<policy>", help="Action to take when queue limit is reached")
group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="<ordering>", help="Queue ordering policy")
group3.add_option("--generate-queue-events", action="store", type="int", metavar="<n>", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.")
+ group3.add_option("--flow-stop-size", action="store", type="int", metavar="<n>",
+ help="Turn on sender flow control when the number of queued bytes exceeds this value.")
+ group3.add_option("--flow-resume-size", action="store", type="int", metavar="<n>",
+ help="Turn off sender flow control when the number of queued bytes drops below this value.")
+ group3.add_option("--flow-stop-count", action="store", type="int", metavar="<n>",
+ help="Turn on sender flow control when the number of queued messages exceeds this value.")
+ group3.add_option("--flow-resume-count", action="store", type="int", metavar="<n>",
+ help="Turn off sender flow control when the number of queued messages drops below this value.")
+ group3.add_option("--group-header", action="store", type="string", metavar="<header-name>",
+ help="Enable message groups. Specify name of header that holds group identifier.")
+ group3.add_option("--shared-groups", action="store_true",
+ help="Allow message group consumption across multiple consumers.")
+ group3.add_option("--argument", dest="extra_arguments", action="append", default=[],
+ metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments")
# no option for declaring an exclusive queue - which can only be used by the session that creates it.
parser.add_option_group(group3)
@@ -173,7 +209,7 @@ def OptionsAndArguments(argv):
group5 = OptionGroup(parser, "Options for Deleting Queues")
group5.add_option("--force", action="store_true", help="Force delete of queue even if it's currently used or it's not empty")
group5.add_option("--force-if-not-empty", action="store_true", help="Force delete of queue even if it's not empty")
- group5.add_option("--force-if-not-used", action="store_true", help="Force delete of queue even if it's currently used")
+ group5.add_option("--force-if-used", action="store_true", help="Force delete of queue even if it's currently used")
parser.add_option_group(group5)
group6 = OptionGroup(parser, "Options for Declaring Bindings")
@@ -196,6 +232,8 @@ def OptionsAndArguments(argv):
config._connTimeout = opts.timeout
if config._connTimeout == 0:
config._connTimeout = None
+ if opts.ignore_default:
+ config._ignoreDefault = True
if opts.alternate_exchange:
config._altern_ex = opts.alternate_exchange
if opts.passive:
@@ -210,7 +248,7 @@ def OptionsAndArguments(argv):
config._fileCount = opts.file_count
if opts.file_size:
config._fileSize = opts.file_size
- if opts.max_queue_size:
+ if opts.max_queue_size != None:
config._maxQueueSize = opts.max_queue_size
if opts.max_queue_count:
config._maxQueueCount = opts.max_queue_count
@@ -229,10 +267,24 @@ def OptionsAndArguments(argv):
config._if_unused = False
if opts.force_if_not_empty:
config._if_empty = False
- if opts.force_if_not_used:
+ if opts.force_if_used:
config._if_unused = False
if opts.sasl_mechanism:
config._sasl_mechanism = opts.sasl_mechanism
+ if opts.flow_stop_size:
+ config._flowStopSize = opts.flow_stop_size
+ if opts.flow_resume_size:
+ config._flowResumeSize = opts.flow_resume_size
+ if opts.flow_stop_count:
+ config._flowStopCount = opts.flow_stop_count
+ if opts.flow_resume_count:
+ config._flowResumeCount = opts.flow_resume_count
+ if opts.group_header:
+ config._msgGroupHeader = opts.group_header
+ if opts.shared_groups:
+ config._sharedMsgGroup = True
+ if opts.extra_arguments:
+ config._extra_arguments = opts.extra_arguments
return args
@@ -323,9 +375,16 @@ class BrokerManager:
caption1 = "Type "
caption2 = "Exchange Name"
maxNameLen = len(caption2)
+ found = False
for ex in exchanges:
if self.match(ex.name, filter):
if len(ex.name) > maxNameLen: maxNameLen = len(ex.name)
+ found = True
+ if not found:
+ global config
+ config._returnCode = 1
+ return
+
print "%s%-*s Attributes" % (caption1, maxNameLen, caption2)
line = ""
for i in range(((maxNameLen + len(caption1)) / 5) + 5):
@@ -333,9 +392,11 @@ class BrokerManager:
print line
for ex in exchanges:
+ if config._ignoreDefault and not ex.name: continue
if self.match(ex.name, filter):
print "%-10s%-*s " % (ex.type, maxNameLen, ex.name),
args = ex.arguments
+ if not args: args = {}
if ex.durable: print "--durable",
if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence",
if IVE in args and args[IVE] == 1: print "--ive",
@@ -348,6 +409,7 @@ class BrokerManager:
bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent)
queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
for ex in exchanges:
+ if config._ignoreDefault and not ex.name: continue
if self.match(ex.name, filter):
print "Exchange '%s' (%s)" % (ex.name, ex.type)
for bind in bindings:
@@ -361,12 +423,18 @@ class BrokerManager:
def QueueList(self, filter):
queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
-
caption = "Queue Name"
maxNameLen = len(caption)
+ found = False
for q in queues:
if self.match(q.name, filter):
if len(q.name) > maxNameLen: maxNameLen = len(q.name)
+ found = True
+ if not found:
+ global config
+ config._returnCode = 1
+ return
+
print "%-*s Attributes" % (maxNameLen, caption)
line = ""
for i in range((maxNameLen / 5) + 5):
@@ -377,21 +445,28 @@ class BrokerManager:
if self.match(q.name, filter):
print "%-*s " % (maxNameLen, q.name),
args = q.arguments
+ if not args: args = {}
if q.durable: print "--durable",
if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable",
if q.autoDelete: print "auto-del",
if q.exclusive: print "excl",
- if FILESIZE in args: print "--file-size=%d" % args[FILESIZE],
- if FILECOUNT in args: print "--file-count=%d" % args[FILECOUNT],
- if MAX_QUEUE_SIZE in args: print "--max-queue-size=%d" % args[MAX_QUEUE_SIZE],
- if MAX_QUEUE_COUNT in args: print "--max-queue-count=%d" % args[MAX_QUEUE_COUNT],
+ if FILESIZE in args: print "--file-size=%s" % args[FILESIZE],
+ if FILECOUNT in args: print "--file-count=%s" % args[FILECOUNT],
+ if MAX_QUEUE_SIZE in args: print "--max-queue-size=%s" % args[MAX_QUEUE_SIZE],
+ if MAX_QUEUE_COUNT in args: print "--max-queue-count=%s" % args[MAX_QUEUE_COUNT],
if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"),
if LVQ in args and args[LVQ] == 1: print "--order lvq",
if LVQNB in args and args[LVQNB] == 1: print "--order lvq-no-browse",
- if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION],
+ if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%s" % args[QUEUE_EVENT_GENERATION],
if q.altExchange:
print "--alternate-exchange=%s" % q._altExchange_.name,
- print
+ if FLOW_STOP_SIZE in args: print "--flow-stop-size=%s" % args[FLOW_STOP_SIZE],
+ if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE],
+ if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT],
+ if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%s" % args[FLOW_RESUME_COUNT],
+ if MSG_GROUP_HDR_KEY in args: print "--group-header=%s" % args[MSG_GROUP_HDR_KEY],
+ if SHARED_MSG_GROUP in args and args[SHARED_MSG_GROUP] == 1: print "--shared-groups",
+ print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS])
def QueueListRecurse(self, filter):
exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
@@ -407,6 +482,7 @@ class BrokerManager:
if ex != None:
ename = ex.name
if ename == "":
+ if config._ignoreDefault: continue
ename = "''"
print " bind [%s] => %s" % (bind.bindingKey, ename)
@@ -436,11 +512,17 @@ class BrokerManager:
Usage()
qname = args[0]
declArgs = {}
+ for a in config._extra_arguments:
+ r = a.split("=", 1)
+ if len(r) == 2: value = r[1]
+ else: value = None
+ declArgs[r[0]] = value
+
if config._durable:
declArgs[FILECOUNT] = config._fileCount
declArgs[FILESIZE] = config._fileSize
- if config._maxQueueSize:
+ if config._maxQueueSize != None:
declArgs[MAX_QUEUE_SIZE] = config._maxQueueSize
if config._maxQueueCount:
declArgs[MAX_QUEUE_COUNT] = config._maxQueueCount
@@ -468,11 +550,26 @@ class BrokerManager:
if config._eventGeneration:
declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration
+ if config._flowStopSize:
+ declArgs[FLOW_STOP_SIZE] = config._flowStopSize
+ if config._flowResumeSize:
+ declArgs[FLOW_RESUME_SIZE] = config._flowResumeSize
+ if config._flowStopCount:
+ declArgs[FLOW_STOP_COUNT] = config._flowStopCount
+ if config._flowResumeCount:
+ declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount
+
+ if config._msgGroupHeader:
+ declArgs[MSG_GROUP_HDR_KEY] = config._msgGroupHeader
+ if config._sharedMsgGroup:
+ declArgs[SHARED_MSG_GROUP] = 1
+
if config._altern_ex != None:
self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs)
else:
self.broker.getAmqpSession().queue_declare(queue=qname, passive=config._passive, durable=config._durable, arguments=declArgs)
+
def DelQueue(self, args):
if len(args) < 1:
Usage()
@@ -617,7 +714,7 @@ def main(argv=None):
print "Failed: %s: %s" % (e.__class__.__name__, e)
return 1
- return 0
+ return config._returnCode
if __name__ == "__main__":
sys.exit(main())