diff options
| author | Alan Conway <aconway@apache.org> | 2012-02-29 23:38:00 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-02-29 23:38:00 +0000 |
| commit | 3fb61f5354df0b77325e4fc88aa59213d3000a8e (patch) | |
| tree | 2fe1900c3e715586e75f1d7ba2687b2a7e3f5547 /qpid/tools/src | |
| parent | c71af5478c87527b4bd0eb9e0e4e37a9b151ea92 (diff) | |
| download | qpid-python-3fb61f5354df0b77325e4fc88aa59213d3000a8e.tar.gz | |
QPID-3603: HA support for stand-alone replication.
- New management method HaBroker.replicate to enable replication.
- qpid-ha tool can enable replication of queues.
- qpid-config tool can create queues with replication enabled.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1295339 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/tools/src')
| -rwxr-xr-x | qpid/tools/src/py/qpid-config | 38 | ||||
| -rwxr-xr-x | qpid/tools/src/py/qpid-ha | 4 |
2 files changed, 24 insertions, 18 deletions
diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config index ee1c365fbf..367fd0574e 100755 --- a/qpid/tools/src/py/qpid-config +++ b/qpid/tools/src/py/qpid-config @@ -77,7 +77,7 @@ Replication levels: messages - replicate configuration and messages """ -REPLICATE_LEVELS= ["none", "configuration", "messages"] +REPLICATION_LEVELS= ["none", "configuration", "messages"] class Config: def __init__(self): @@ -87,7 +87,7 @@ class Config: self._ignoreDefault = False self._altern_ex = None self._durable = False - self._replicate = None + self._replication = None self._ha_admin = False self._clusterDurable = False self._if_empty = True @@ -110,6 +110,7 @@ class Config: self._msgGroupHeader = None self._sharedMsgGroup = False self._extra_arguments = [] + self._replicate_from = None self._returnCode = 0 config = Config() @@ -130,7 +131,7 @@ FLOW_STOP_SIZE = "qpid.flow_stop_size" FLOW_RESUME_SIZE = "qpid.flow_resume_size" MSG_GROUP_HDR_KEY = "qpid.group_header_key" SHARED_MSG_GROUP = "qpid.shared_msg_group" -REPLICATE = "qpid.replicate" +REPLICATION = "qpid.replicate" #There are various arguments to declare that have specific program #options in this utility. However there is now a generic mechanism for #passing arguments as well. The SPECIAL_ARGS list contains the @@ -141,7 +142,7 @@ SPECIAL_ARGS=[ FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE, LVQ_KEY,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION, FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE, - MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP,REPLICATE] + MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP,REPLICATION] class JHelpFormatter(IndentedHelpFormatter): """Format usage and description without stripping newlines from usage strings @@ -185,7 +186,7 @@ def OptionsAndArguments(argv): group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues") group2.add_option("--alternate-exchange", action="store", type="string", metavar="<aexname>", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.") group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.") - group2.add_option("--replicate", action="store", metavar="<level>", help="Replication level for the new queue or exchange (none, configuration or messages).") + group2.add_option("--replication", action="store", metavar="<level>", help="Enable automatic replication in a HA cluster. <level> is 'none', 'configuration' or 'messages').") group2.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.") parser.add_option_group(group2) @@ -212,6 +213,7 @@ def OptionsAndArguments(argv): help="Allow message group consumption across multiple consumers.") group3.add_option("--argument", dest="extra_arguments", action="append", default=[], metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments") + group3.add_option("--replicate-from", metavar="<broker-url>", help="Replicate from the same-named queue at <broker-url>") # no option for declaring an exclusive queue - which can only be used by the session that creates it. parser.add_option_group(group3) @@ -252,10 +254,10 @@ def OptionsAndArguments(argv): config._altern_ex = opts.alternate_exchange if opts.durable: config._durable = True - if opts.replicate: - if not opts.replicate in REPLICATE_LEVELS: - raise Exception("Invalid replicate level '%s', should be one of: %s" % (opts.replicate, ", ".join(REPLICATE_LEVELS))) - config._replicate = opts.replicate + if opts.replication: + if not opts.replication in REPLICATION_LEVELS: + raise Exception("Invalid replication level '%s', should be one of: %s" % (opts.replication, ", ".join(REPLICATION_LEVELS))) + config._replication = opts.replication if opts.ha_admin: config._ha_admin = True if opts.cluster_durable: config._clusterDurable = True @@ -302,6 +304,8 @@ def OptionsAndArguments(argv): config._sharedMsgGroup = True if opts.extra_arguments: config._extra_arguments = opts.extra_arguments + if opts.replicate_from: + config._replicate_from = opts.replicate_from return args @@ -464,7 +468,7 @@ class BrokerManager: args = q.arguments if not args: args = {} if q.durable: print "--durable", - if REPLICATE in args: print "--replicate=%s" % args[REPLICATE], + if REPLICATION in args: print "--replication=%s" % args[REPLICATION], if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable", if q.autoDelete: print "auto-del", if q.exclusive: print "excl", @@ -526,8 +530,8 @@ class BrokerManager: declArgs['alternate-exchange'] = config._altern_ex if config._durable: declArgs['durable'] = 1 - if config._replicate: - declArgs[REPLICATE] = config._replicate + if config._replication: + declArgs[REPLICATION] = config._replication self.broker.addExchange(etype, ename, declArgs) @@ -594,11 +598,11 @@ class BrokerManager: declArgs['alternate-exchange'] = config._altern_ex if config._durable: declArgs['durable'] = 1 - if config._replicate: - declArgs[REPLICATE] = config._replicate - + if config._replication: + declArgs[REPLICATION] = config._replication self.broker.addQueue(qname, declArgs) - + if config._replicate_from: # Start replication + self.broker._method("replicate", {"broker":config._replicate_from, "queue":qname}, "org.apache.qpid.ha:habroker:ha-broker") def DelQueue(self, args): if len(args) < 1: @@ -751,9 +755,9 @@ def main(argv=None): if e.__class__.__name__ != "Timeout": print "Failed: %s: %s" % (e.__class__.__name__, e) return 1 - return config._returnCode + if __name__ == "__main__": sys.exit(main()) diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha index 13b4055f71..e6ae6c4884 100755 --- a/qpid/tools/src/py/qpid-ha +++ b/qpid/tools/src/py/qpid-ha @@ -83,7 +83,9 @@ ReadyCmd() class ReplicateCmd(Command): def __init__(self): - Command.__init__(self, "replicate", "Replicate <queue> from broker <primary> to the current broker.", ["<queue>", "<primary>"]) + Command.__init__(self, "replicate", "Set up replication from <queue> on <remote-broker> to <queue> on the current broker.", ["<queue>", "<remote-broker>"]) + def do_execute(self, qmf_broker, opts, args): + qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER) ReplicateCmd() class SetCmd(Command): |
