diff options
author | Alan Conway <aconway@apache.org> | 2012-03-01 19:47:08 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-03-01 19:47:08 +0000 |
commit | c837c0fb65a67f84e03839390c00d2a56c69169a (patch) | |
tree | 82600afd447829fd6480e0f498774eebe1fca49f | |
parent | 247adf7d2c9a20ac4c3c23702e24a4171ba40089 (diff) | |
download | qpid-python-c837c0fb65a67f84e03839390c00d2a56c69169a.tar.gz |
QPID-3603: Added "ready" command to qpid-ha, minor improvements.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1295759 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 16 | ||||
-rwxr-xr-x | qpid/tools/src/py/qpid-ha | 82 | ||||
-rw-r--r-- | qpid/tools/src/py/qpidtoollibs/broker.py | 22 |
3 files changed, 73 insertions, 47 deletions
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index f909aca44f..ad6719f207 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -44,8 +44,10 @@ using namespace std; namespace { -const std::string PRIMARY="primary"; +const std::string STANDALONE="standalone"; +const std::string CATCH_UP="catch-up"; const std::string BACKUP="backup"; +const std::string PRIMARY="primary"; } // namespace @@ -65,12 +67,12 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) ManagementAgent* ma = broker.getManagementAgent(); if (!ma) throw Exception("Cannot start HA: management is disabled"); - if (ma) { - _qmf::Package packageInit(ma); - mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker"); - mgmtObject->set_status(BACKUP); - ma->addObject(mgmtObject); - } + _qmf::Package packageInit(ma); + mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker"); + // FIXME aconway 2012-03-01: should start in catch-up state and move to backup + // only when caught up. + mgmtObject->set_status(BACKUP); + ma->addObject(mgmtObject); sys::Mutex::ScopedLock l(lock); if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l); if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l); diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha index e6ae6c4884..029ecd0c41 100755 --- a/qpid/tools/src/py/qpid-ha +++ b/qpid/tools/src/py/qpid-ha @@ -19,7 +19,7 @@ # under the License. # -import qmf.console, optparse, sys +import qmf.console, optparse, sys, time from qpid.management import managementChannel, managementClient from qpid.messaging import Connection from qpid.messaging import Message as QpidMessage @@ -44,32 +44,26 @@ class Command: self.op=optparse.OptionParser(usage) self.op.add_option("-b", "--broker", metavar="<url>", help="Connect to broker at <url>") - def execute(self, command): - opts, args = self.op.parse_args(command) + def execute(self): + opts, args = self.op.parse_args() if len(args) != len(self.args)+1: self.op.print_help() - print "Error: wrong number of arguments" - return + raise Exception("Wrong number of arguments") broker = opts.broker or "localhost:5672" connection = Connection.establish(broker, client_properties={"qpid.ha-admin":1}) - try: self.do_execute(BrokerAgent(connection), opts, args) + qmf_broker = BrokerAgent(connection) + ha_broker = qmf_broker.getHaBroker() + if not ha_broker: raise Exception("HA module is not loaded on broker at %s"%broker) + try: return self.do_execute(qmf_broker, ha_broker, opts, args) finally: connection.close() def do_execute(self, qmf_broker, opts, args): raise Exception("Command '%s' is not yet implemented"%self.name) -def print_all_help(name): - print "usage: %s <command> [<arguments>]\n\nCommands are:\n"%name - for c in Command.commands: - help = Command.commands[c].help - print " %-12s %s."%(c, help.split(".")[0]) - print "\nFor help with a command: %s <command> --help\n"%name - - class PromoteCmd(Command): def __init__(self): Command.__init__(self, "promote","Promote broker from backup to primary") - def do_execute(self, qmf_broker, opts, args): + def do_execute(self, qmf_broker, ha_broker, opts, args): qmf_broker._method("promote", {}, HA_BROKER) PromoteCmd() @@ -77,14 +71,27 @@ class ReadyCmd(Command): def __init__(self): Command.__init__(self, "ready", "Test if a backup broker is ready.\nReturn 0 if broker is a ready backup, non-0 otherwise.") self.op.add_option( - "--wait", type="int", metavar="<seconds>", + "--wait", type="int", metavar="<seconds>", default=None, help="Wait up to <seconds> for broker to be ready. 0 means wait forever.") + def do_execute(self, qmf_broker, ha_broker, opts, args): + if (ha_broker.status == "backup"): return + if (ha_broker.status != "catch-up"): + raise Exception("Broker is not a backup, status is '%s'"%ha_broker.status) + if (opts.wait is None): return 1 + delay = 0.1 + timeout = time.time() + opts.wait + while opts.wait == 0 or time.time() < timeout: + time.sleep(delay) + delay = min(2*delay, 1) + ha_broker = qmf_broker.getHaBroker() + if (ha_broker.status == "backup"): return + return 1 ReadyCmd() class ReplicateCmd(Command): def __init__(self): Command.__init__(self, "replicate", "Set up replication from <queue> on <remote-broker> to <queue> on the current broker.", ["<queue>", "<remote-broker>"]) - def do_execute(self, qmf_broker, opts, args): + def do_execute(self, qmf_broker, ha_broker, opts, args): qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER) ReplicateCmd() @@ -95,38 +102,55 @@ class SetCmd(Command): self.op.add_option(optname, metavar=metavar, type=type, help=help, action="store") add("--brokers", "<url>", "string", "HA brokers use <url> to connect to each other") add("--public-brokers", "<url>", "string", "Clients use <url> to connect to HA brokers") - add("--backups", "<n>", "int", "Expect <n> backups to be running") + add("--backups", "<n>", "int", "Expect <n> backups to be running"), - def do_execute(self, qmf_broker, opts, args): + def do_execute(self, qmf_broker, ha_broker, opts, args): if (opts.brokers): qmf_broker._method("setBrokers", {"url":opts.brokers}, HA_BROKER) if (opts.public_brokers): qmf_broker._method("setPublicBrokers", {"url":opts.public_brokers}, HA_BROKER) if (opts.backups): qmf_broker._method("setExpectedBackups", {"expectedBackups":opts.backups}, HA_BROKER) + SetCmd() class QueryCmd(Command): def __init__(self): Command.__init__(self, "query", "Print HA configuration settings") - def do_execute(self, qmf_broker, opts, args): - hb = qmf_broker.getHaBroker() + def do_execute(self, qmf_broker, ha_broker, opts, args): + hb = ha_broker for x in [("Status:", hb.status), ("Brokers URL:", hb.brokers), - ("Public URL:", hb.publicBrokers)]: - print "%-16s%s"%(x[0], x[1]) - + ("Public URL:", hb.publicBrokers), + ("Expected Backups:", hb.expectedBackups) + ]: + print "%-20s %s"%(x[0], x[1]) QueryCmd() +def print_usage(name): + print "usage: %s <command> [<arguments>]\n\nCommands are:\n"%name + for name, command in Command.commands.iteritems(): + help = command.help + print " %-12s %s."%(name, help.split(".")[0]) + print "\nFor help with a command: %s <command> --help\n"%name + +def find_command(args): + """Find a command among the arguments and options""" + for arg in args: + if arg in Command.commands: + return Command.commands[arg] + return None + def main(argv): try: - command=argv[1:] - if command and command[0] == "--help-all": + args=argv[1:] + if args and args[0] == "--help-all": for c in Command.commands.itervalues(): c.op.print_help(); print return 1 - if not command or not command[0] in Command.commands: - print_all_help(argv[0]); + command = find_command(args) + if not command: + print_usage(argv[0]); return 1; - Command.commands[command[0]].execute(command) + if command.execute(): return 1 except Exception, e: print e return 1 diff --git a/qpid/tools/src/py/qpidtoollibs/broker.py b/qpid/tools/src/py/qpidtoollibs/broker.py index 95a186a61f..98c1bfaa32 100644 --- a/qpid/tools/src/py/qpidtoollibs/broker.py +++ b/qpid/tools/src/py/qpidtoollibs/broker.py @@ -122,13 +122,18 @@ class BrokerAgent(object): for item in items: objs.append(cls(self, item)) return objs - + def _getBrokerObject(self, cls, name): obj = self._doNameQuery(cls.__name__.lower(), name) if obj: return cls(self, obj) return None + def _getSingleObject(self, cls): + objects = self._getAllBrokerObjects(cls) + if objects: return objects[0] + return None + def getBroker(self): """ Get the Broker object that contains broker-scope statistics and operations. @@ -138,16 +143,14 @@ class BrokerAgent(object): # of a bug that used to be in the broker whereby by-name queries did not return the # object timestamps. # - brokers = self._getAllBrokerObjects(Broker) - if brokers: - return brokers[0] - return None + return self._getSingleObject(Broker) + def getCluster(self): - return self._getAllBrokerObjects(Cluster)[0] + return self._getSingleObject(Cluster) def getHaBroker(self): - return self._getAllBrokerObjects(HaBroker)[0] + return self._getSingleObject(HaBroker) def getAllConnections(self): return self._getAllBrokerObjects(Connection) @@ -186,10 +189,7 @@ class BrokerAgent(object): return self._getAllBrokerObjects(Link) def getAcl(self): - objects = self._getAllBrokerObjects(Acl) - if len(objects) > 0: - return objects[0] - return None # Acl module not loaded + return self._getSingleObject(Acl) def echo(self, sequence, body): """Request a response to test the path to the management broker""" |