summaryrefslogtreecommitdiff
path: root/qpid/tools/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-29 23:38:00 +0000
committerAlan Conway <aconway@apache.org>2012-02-29 23:38:00 +0000
commit3fb61f5354df0b77325e4fc88aa59213d3000a8e (patch)
tree2fe1900c3e715586e75f1d7ba2687b2a7e3f5547 /qpid/tools/src
parentc71af5478c87527b4bd0eb9e0e4e37a9b151ea92 (diff)
downloadqpid-python-3fb61f5354df0b77325e4fc88aa59213d3000a8e.tar.gz
QPID-3603: HA support for stand-alone replication.
- New management method HaBroker.replicate to enable replication. - qpid-ha tool can enable replication of queues. - qpid-config tool can create queues with replication enabled. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1295339 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/tools/src')
-rwxr-xr-xqpid/tools/src/py/qpid-config38
-rwxr-xr-xqpid/tools/src/py/qpid-ha4
2 files changed, 24 insertions, 18 deletions
diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config
index ee1c365fbf..367fd0574e 100755
--- a/qpid/tools/src/py/qpid-config
+++ b/qpid/tools/src/py/qpid-config
@@ -77,7 +77,7 @@ Replication levels:
messages - replicate configuration and messages
"""
-REPLICATE_LEVELS= ["none", "configuration", "messages"]
+REPLICATION_LEVELS= ["none", "configuration", "messages"]
class Config:
def __init__(self):
@@ -87,7 +87,7 @@ class Config:
self._ignoreDefault = False
self._altern_ex = None
self._durable = False
- self._replicate = None
+ self._replication = None
self._ha_admin = False
self._clusterDurable = False
self._if_empty = True
@@ -110,6 +110,7 @@ class Config:
self._msgGroupHeader = None
self._sharedMsgGroup = False
self._extra_arguments = []
+ self._replicate_from = None
self._returnCode = 0
config = Config()
@@ -130,7 +131,7 @@ FLOW_STOP_SIZE = "qpid.flow_stop_size"
FLOW_RESUME_SIZE = "qpid.flow_resume_size"
MSG_GROUP_HDR_KEY = "qpid.group_header_key"
SHARED_MSG_GROUP = "qpid.shared_msg_group"
-REPLICATE = "qpid.replicate"
+REPLICATION = "qpid.replicate"
#There are various arguments to declare that have specific program
#options in this utility. However there is now a generic mechanism for
#passing arguments as well. The SPECIAL_ARGS list contains the
@@ -141,7 +142,7 @@ SPECIAL_ARGS=[
FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,
LVQ_KEY,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,
FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
- MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP,REPLICATE]
+ MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP,REPLICATION]
class JHelpFormatter(IndentedHelpFormatter):
"""Format usage and description without stripping newlines from usage strings
@@ -185,7 +186,7 @@ def OptionsAndArguments(argv):
group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues")
group2.add_option("--alternate-exchange", action="store", type="string", metavar="<aexname>", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.")
group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.")
- group2.add_option("--replicate", action="store", metavar="<level>", help="Replication level for the new queue or exchange (none, configuration or messages).")
+ group2.add_option("--replication", action="store", metavar="<level>", help="Enable automatic replication in a HA cluster. <level> is 'none', 'configuration' or 'messages').")
group2.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
parser.add_option_group(group2)
@@ -212,6 +213,7 @@ def OptionsAndArguments(argv):
help="Allow message group consumption across multiple consumers.")
group3.add_option("--argument", dest="extra_arguments", action="append", default=[],
metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments")
+ group3.add_option("--replicate-from", metavar="<broker-url>", help="Replicate from the same-named queue at <broker-url>")
# no option for declaring an exclusive queue - which can only be used by the session that creates it.
parser.add_option_group(group3)
@@ -252,10 +254,10 @@ def OptionsAndArguments(argv):
config._altern_ex = opts.alternate_exchange
if opts.durable:
config._durable = True
- if opts.replicate:
- if not opts.replicate in REPLICATE_LEVELS:
- raise Exception("Invalid replicate level '%s', should be one of: %s" % (opts.replicate, ", ".join(REPLICATE_LEVELS)))
- config._replicate = opts.replicate
+ if opts.replication:
+ if not opts.replication in REPLICATION_LEVELS:
+ raise Exception("Invalid replication level '%s', should be one of: %s" % (opts.replication, ", ".join(REPLICATION_LEVELS)))
+ config._replication = opts.replication
if opts.ha_admin: config._ha_admin = True
if opts.cluster_durable:
config._clusterDurable = True
@@ -302,6 +304,8 @@ def OptionsAndArguments(argv):
config._sharedMsgGroup = True
if opts.extra_arguments:
config._extra_arguments = opts.extra_arguments
+ if opts.replicate_from:
+ config._replicate_from = opts.replicate_from
return args
@@ -464,7 +468,7 @@ class BrokerManager:
args = q.arguments
if not args: args = {}
if q.durable: print "--durable",
- if REPLICATE in args: print "--replicate=%s" % args[REPLICATE],
+ if REPLICATION in args: print "--replication=%s" % args[REPLICATION],
if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable",
if q.autoDelete: print "auto-del",
if q.exclusive: print "excl",
@@ -526,8 +530,8 @@ class BrokerManager:
declArgs['alternate-exchange'] = config._altern_ex
if config._durable:
declArgs['durable'] = 1
- if config._replicate:
- declArgs[REPLICATE] = config._replicate
+ if config._replication:
+ declArgs[REPLICATION] = config._replication
self.broker.addExchange(etype, ename, declArgs)
@@ -594,11 +598,11 @@ class BrokerManager:
declArgs['alternate-exchange'] = config._altern_ex
if config._durable:
declArgs['durable'] = 1
- if config._replicate:
- declArgs[REPLICATE] = config._replicate
-
+ if config._replication:
+ declArgs[REPLICATION] = config._replication
self.broker.addQueue(qname, declArgs)
-
+ if config._replicate_from: # Start replication
+ self.broker._method("replicate", {"broker":config._replicate_from, "queue":qname}, "org.apache.qpid.ha:habroker:ha-broker")
def DelQueue(self, args):
if len(args) < 1:
@@ -751,9 +755,9 @@ def main(argv=None):
if e.__class__.__name__ != "Timeout":
print "Failed: %s: %s" % (e.__class__.__name__, e)
return 1
-
return config._returnCode
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha
index 13b4055f71..e6ae6c4884 100755
--- a/qpid/tools/src/py/qpid-ha
+++ b/qpid/tools/src/py/qpid-ha
@@ -83,7 +83,9 @@ ReadyCmd()
class ReplicateCmd(Command):
def __init__(self):
- Command.__init__(self, "replicate", "Replicate <queue> from broker <primary> to the current broker.", ["<queue>", "<primary>"])
+ Command.__init__(self, "replicate", "Set up replication from <queue> on <remote-broker> to <queue> on the current broker.", ["<queue>", "<remote-broker>"])
+ def do_execute(self, qmf_broker, opts, args):
+ qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER)
ReplicateCmd()
class SetCmd(Command):