summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-03-01 19:47:08 +0000
committerAlan Conway <aconway@apache.org>2012-03-01 19:47:08 +0000
commitc837c0fb65a67f84e03839390c00d2a56c69169a (patch)
tree82600afd447829fd6480e0f498774eebe1fca49f
parent247adf7d2c9a20ac4c3c23702e24a4171ba40089 (diff)
downloadqpid-python-c837c0fb65a67f84e03839390c00d2a56c69169a.tar.gz
QPID-3603: Added "ready" command to qpid-ha, minor improvements.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1295759 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp16
-rwxr-xr-xqpid/tools/src/py/qpid-ha82
-rw-r--r--qpid/tools/src/py/qpidtoollibs/broker.py22
3 files changed, 73 insertions, 47 deletions
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index f909aca44f..ad6719f207 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -44,8 +44,10 @@ using namespace std;
namespace {
-const std::string PRIMARY="primary";
+const std::string STANDALONE="standalone";
+const std::string CATCH_UP="catch-up";
const std::string BACKUP="backup";
+const std::string PRIMARY="primary";
} // namespace
@@ -65,12 +67,12 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
ManagementAgent* ma = broker.getManagementAgent();
if (!ma)
throw Exception("Cannot start HA: management is disabled");
- if (ma) {
- _qmf::Package packageInit(ma);
- mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
- mgmtObject->set_status(BACKUP);
- ma->addObject(mgmtObject);
- }
+ _qmf::Package packageInit(ma);
+ mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
+ // FIXME aconway 2012-03-01: should start in catch-up state and move to backup
+ // only when caught up.
+ mgmtObject->set_status(BACKUP);
+ ma->addObject(mgmtObject);
sys::Mutex::ScopedLock l(lock);
if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha
index e6ae6c4884..029ecd0c41 100755
--- a/qpid/tools/src/py/qpid-ha
+++ b/qpid/tools/src/py/qpid-ha
@@ -19,7 +19,7 @@
# under the License.
#
-import qmf.console, optparse, sys
+import qmf.console, optparse, sys, time
from qpid.management import managementChannel, managementClient
from qpid.messaging import Connection
from qpid.messaging import Message as QpidMessage
@@ -44,32 +44,26 @@ class Command:
self.op=optparse.OptionParser(usage)
self.op.add_option("-b", "--broker", metavar="<url>", help="Connect to broker at <url>")
- def execute(self, command):
- opts, args = self.op.parse_args(command)
+ def execute(self):
+ opts, args = self.op.parse_args()
if len(args) != len(self.args)+1:
self.op.print_help()
- print "Error: wrong number of arguments"
- return
+ raise Exception("Wrong number of arguments")
broker = opts.broker or "localhost:5672"
connection = Connection.establish(broker, client_properties={"qpid.ha-admin":1})
- try: self.do_execute(BrokerAgent(connection), opts, args)
+ qmf_broker = BrokerAgent(connection)
+ ha_broker = qmf_broker.getHaBroker()
+ if not ha_broker: raise Exception("HA module is not loaded on broker at %s"%broker)
+ try: return self.do_execute(qmf_broker, ha_broker, opts, args)
finally: connection.close()
def do_execute(self, qmf_broker, opts, args):
raise Exception("Command '%s' is not yet implemented"%self.name)
-def print_all_help(name):
- print "usage: %s <command> [<arguments>]\n\nCommands are:\n"%name
- for c in Command.commands:
- help = Command.commands[c].help
- print " %-12s %s."%(c, help.split(".")[0])
- print "\nFor help with a command: %s <command> --help\n"%name
-
-
class PromoteCmd(Command):
def __init__(self):
Command.__init__(self, "promote","Promote broker from backup to primary")
- def do_execute(self, qmf_broker, opts, args):
+ def do_execute(self, qmf_broker, ha_broker, opts, args):
qmf_broker._method("promote", {}, HA_BROKER)
PromoteCmd()
@@ -77,14 +71,27 @@ class ReadyCmd(Command):
def __init__(self):
Command.__init__(self, "ready", "Test if a backup broker is ready.\nReturn 0 if broker is a ready backup, non-0 otherwise.")
self.op.add_option(
- "--wait", type="int", metavar="<seconds>",
+ "--wait", type="int", metavar="<seconds>", default=None,
help="Wait up to <seconds> for broker to be ready. 0 means wait forever.")
+ def do_execute(self, qmf_broker, ha_broker, opts, args):
+ if (ha_broker.status == "backup"): return
+ if (ha_broker.status != "catch-up"):
+ raise Exception("Broker is not a backup, status is '%s'"%ha_broker.status)
+ if (opts.wait is None): return 1
+ delay = 0.1
+ timeout = time.time() + opts.wait
+ while opts.wait == 0 or time.time() < timeout:
+ time.sleep(delay)
+ delay = min(2*delay, 1)
+ ha_broker = qmf_broker.getHaBroker()
+ if (ha_broker.status == "backup"): return
+ return 1
ReadyCmd()
class ReplicateCmd(Command):
def __init__(self):
Command.__init__(self, "replicate", "Set up replication from <queue> on <remote-broker> to <queue> on the current broker.", ["<queue>", "<remote-broker>"])
- def do_execute(self, qmf_broker, opts, args):
+ def do_execute(self, qmf_broker, ha_broker, opts, args):
qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER)
ReplicateCmd()
@@ -95,38 +102,55 @@ class SetCmd(Command):
self.op.add_option(optname, metavar=metavar, type=type, help=help, action="store")
add("--brokers", "<url>", "string", "HA brokers use <url> to connect to each other")
add("--public-brokers", "<url>", "string", "Clients use <url> to connect to HA brokers")
- add("--backups", "<n>", "int", "Expect <n> backups to be running")
+ add("--backups", "<n>", "int", "Expect <n> backups to be running"),
- def do_execute(self, qmf_broker, opts, args):
+ def do_execute(self, qmf_broker, ha_broker, opts, args):
if (opts.brokers): qmf_broker._method("setBrokers", {"url":opts.brokers}, HA_BROKER)
if (opts.public_brokers): qmf_broker._method("setPublicBrokers", {"url":opts.public_brokers}, HA_BROKER)
if (opts.backups): qmf_broker._method("setExpectedBackups", {"expectedBackups":opts.backups}, HA_BROKER)
+
SetCmd()
class QueryCmd(Command):
def __init__(self):
Command.__init__(self, "query", "Print HA configuration settings")
- def do_execute(self, qmf_broker, opts, args):
- hb = qmf_broker.getHaBroker()
+ def do_execute(self, qmf_broker, ha_broker, opts, args):
+ hb = ha_broker
for x in [("Status:", hb.status),
("Brokers URL:", hb.brokers),
- ("Public URL:", hb.publicBrokers)]:
- print "%-16s%s"%(x[0], x[1])
-
+ ("Public URL:", hb.publicBrokers),
+ ("Expected Backups:", hb.expectedBackups)
+ ]:
+ print "%-20s %s"%(x[0], x[1])
QueryCmd()
+def print_usage(name):
+ print "usage: %s <command> [<arguments>]\n\nCommands are:\n"%name
+ for name, command in Command.commands.iteritems():
+ help = command.help
+ print " %-12s %s."%(name, help.split(".")[0])
+ print "\nFor help with a command: %s <command> --help\n"%name
+
+def find_command(args):
+ """Find a command among the arguments and options"""
+ for arg in args:
+ if arg in Command.commands:
+ return Command.commands[arg]
+ return None
+
def main(argv):
try:
- command=argv[1:]
- if command and command[0] == "--help-all":
+ args=argv[1:]
+ if args and args[0] == "--help-all":
for c in Command.commands.itervalues():
c.op.print_help(); print
return 1
- if not command or not command[0] in Command.commands:
- print_all_help(argv[0]);
+ command = find_command(args)
+ if not command:
+ print_usage(argv[0]);
return 1;
- Command.commands[command[0]].execute(command)
+ if command.execute(): return 1
except Exception, e:
print e
return 1
diff --git a/qpid/tools/src/py/qpidtoollibs/broker.py b/qpid/tools/src/py/qpidtoollibs/broker.py
index 95a186a61f..98c1bfaa32 100644
--- a/qpid/tools/src/py/qpidtoollibs/broker.py
+++ b/qpid/tools/src/py/qpidtoollibs/broker.py
@@ -122,13 +122,18 @@ class BrokerAgent(object):
for item in items:
objs.append(cls(self, item))
return objs
-
+
def _getBrokerObject(self, cls, name):
obj = self._doNameQuery(cls.__name__.lower(), name)
if obj:
return cls(self, obj)
return None
+ def _getSingleObject(self, cls):
+ objects = self._getAllBrokerObjects(cls)
+ if objects: return objects[0]
+ return None
+
def getBroker(self):
"""
Get the Broker object that contains broker-scope statistics and operations.
@@ -138,16 +143,14 @@ class BrokerAgent(object):
# of a bug that used to be in the broker whereby by-name queries did not return the
# object timestamps.
#
- brokers = self._getAllBrokerObjects(Broker)
- if brokers:
- return brokers[0]
- return None
+ return self._getSingleObject(Broker)
+
def getCluster(self):
- return self._getAllBrokerObjects(Cluster)[0]
+ return self._getSingleObject(Cluster)
def getHaBroker(self):
- return self._getAllBrokerObjects(HaBroker)[0]
+ return self._getSingleObject(HaBroker)
def getAllConnections(self):
return self._getAllBrokerObjects(Connection)
@@ -186,10 +189,7 @@ class BrokerAgent(object):
return self._getAllBrokerObjects(Link)
def getAcl(self):
- objects = self._getAllBrokerObjects(Acl)
- if len(objects) > 0:
- return objects[0]
- return None # Acl module not loaded
+ return self._getSingleObject(Acl)
def echo(self, sequence, body):
"""Request a response to test the path to the management broker"""