author     Alan Conway <aconway@apache.org>  2012-02-29 23:38:00 +0000
committer  Alan Conway <aconway@apache.org>  2012-02-29 23:38:00 +0000
commit     3fb61f5354df0b77325e4fc88aa59213d3000a8e (patch)
tree       2fe1900c3e715586e75f1d7ba2687b2a7e3f5547
parent     c71af5478c87527b4bd0eb9e0e4e37a9b151ea92 (diff)
download   qpid-python-3fb61f5354df0b77325e4fc88aa59213d3000a8e.tar.gz
QPID-3603: HA support for stand-alone replication.
- New management method HaBroker.replicate to enable replication.
- qpid-ha tool can enable replication of queues.
- qpid-config tool can create queues with replication enabled.

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1295339 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--  qpid/cpp/src/qpid/broker/Link.cpp             3
-rw-r--r--  qpid/cpp/src/qpid/ha/HaBroker.cpp            33
-rw-r--r--  qpid/cpp/src/qpid/ha/HaPlugin.cpp             8
-rw-r--r--  qpid/cpp/src/qpid/ha/Settings.h               4
-rw-r--r--  qpid/cpp/src/qpid/ha/management-schema.xml    5
-rwxr-xr-x  qpid/cpp/src/tests/ha_tests.py              170
-rwxr-xr-x  qpid/tools/src/py/qpid-config                38
-rwxr-xr-x  qpid/tools/src/py/qpid-ha                     4
8 files changed, 199 insertions(+), 66 deletions(-)
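
For orientation, the new tooling is exercised from the shell roughly as in the tests below. This is a sketch only; the broker addresses and queue name are placeholders:

    # Replicate an individual queue from a remote broker to this (backup) broker.
    qpid-ha replicate -b backup-host:5672 primary-host:5672 myqueue

    # Create a queue on the backup and start replicating it from the primary.
    qpid-config --broker=backup-host:5672 add queue --replicate-from primary-host:5672 myqueue

    # Create a queue with automatic HA-cluster replication of messages enabled.
    qpid-config --broker=primary-host:5672 add queue myqueue --replication messages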
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 4af1e6d6bd..56a90e7fb7 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -206,11 +206,9 @@ void Link::closed(int, std::string text)
QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
connection = 0;
-
if (state == STATE_OPERATIONAL) {
stringstream addr;
addr << host << ":" << port;
- QPID_LOG(warning, "Inter-broker link disconnected from " << addr.str());
if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
@@ -405,7 +403,6 @@ uint Link::nextChannel()
void Link::notifyConnectionForced(const string text)
{
Mutex::ScopedLock mutex(lock);
-
setStateLH(STATE_FAILED);
if (!hideManagement())
mgmtObject->set_lastError(text);
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index d92749abeb..f909aca44f 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -25,8 +25,11 @@
#include "ReplicatingSubscription.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Queue.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/ha/Package.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokers.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicBrokers.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h"
@@ -50,7 +53,6 @@ const std::string BACKUP="backup";
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: broker(b),
settings(s),
- backup(new Backup(b, s)),
mgmtObject(0)
{
// Register a factory for replicating subscriptions.
@@ -72,6 +74,9 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
sys::Mutex::ScopedLock l(lock);
if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
+
+ // If we are in a cluster, we start in backup mode.
+ if (settings.cluster) backup.reset(new Backup(b, s));
}
HaBroker::~HaBroker() {}
@@ -81,8 +86,8 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
switch (methodId) {
case _qmf::HaBroker::METHOD_PROMOTE: {
if (backup.get()) { // I am a backup
- // FIXME aconway 2012-01-26: create primary state before resetting backup
- // as that allows client connections.
+ // NOTE: resetting backup allows client connections, so any
+ // primary state should be set up here before backup.reset()
backup.reset();
QPID_LOG(notice, "HA: Primary promoted from backup");
mgmtObject->set_status(PRIMARY);
@@ -100,7 +105,27 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
case _qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: {
setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l);
break;
- }
+ }
+ case _qmf::HaBroker::METHOD_REPLICATE: {
+ _qmf::ArgsHaBrokerReplicate& bq_args =
+ dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
+ QPID_LOG(debug, "HA replicating individual queue "<< bq_args.i_queue << " from " << bq_args.i_broker);
+
+ boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
+ Url url(bq_args.i_broker);
+ string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare(
+ url[0].host, url[0].port, protocol,
+ false, // durable
+ settings.mechanism, settings.username, settings.password);
+ boost::shared_ptr<broker::Link> link = result.first;
+ link->setUrl(url);
+ // Create a queue replicator
+ boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+ broker.getExchanges().registerExchange(qr);
+ qr->activate();
+ break;
+ }
default:
return Manageable::STATUS_UNKNOWN_METHOD;
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index b3080330fb..6a43b591b0 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -31,7 +31,7 @@ struct Options : public qpid::Options {
Settings& settings;
Options(Settings& s) : qpid::Options("HA Options"), settings(s) {
addOptions()
- ("ha-cluster", optValue(settings.enabled, "yes|no"),
+ ("ha-cluster", optValue(settings.cluster, "yes|no"),
"Join a HA active/passive cluster.")
("ha-brokers", optValue(settings.brokerUrl,"URL"),
"URL that backup brokers use to connect and fail over.")
@@ -63,11 +63,7 @@ struct HaPlugin : public Plugin {
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- if (broker && settings.enabled) {
- QPID_LOG(notice, "HA: Enabled");
- haBroker.reset(new ha::HaBroker(*broker, settings));
- } else
- QPID_LOG(notice, "HA: Disabled");
+ if (broker) haBroker.reset(new ha::HaBroker(*broker, settings));
}
};
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
index 52a64c8330..7df18b4ef4 100644
--- a/qpid/cpp/src/qpid/ha/Settings.h
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -33,8 +33,8 @@ namespace ha {
class Settings
{
public:
- Settings() : enabled(false), expectedBackups(0) {}
- bool enabled;
+ Settings() : cluster(false), expectedBackups(0) {}
+ bool cluster; // True if we are a cluster member.
std::string clientUrl;
std::string brokerUrl;
size_t expectedBackups;
diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml
index 05ed5f02ce..9a815b346c 100644
--- a/qpid/cpp/src/qpid/ha/management-schema.xml
+++ b/qpid/cpp/src/qpid/ha/management-schema.xml
@@ -47,6 +47,11 @@
<method name="setExpectedBackups" desc="Set number of backups expected">
<arg name="expectedBackups" type="uint16" dir="I"/>
</method>
+
+ <method name="replicate" desc="Replicate from a remote queue to the local broker.">
+ <arg name="broker" type="sstr" dir="I"/>
+ <arg name="queue" type="sstr" dir="I"/>
+ </method>
</class>
</schema>
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 264a636f29..18f47b17c5 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -24,34 +24,81 @@ from qpid.datatypes import uuid4
from brokertest import *
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG
-
+from qpidtoollibs.broker import BrokerAgent
log = getLogger("qpid.ha-tests")
class HaBroker(Broker):
- def __init__(self, test, args=[], broker_url=None, **kwargs):
+ def __init__(self, test, args=[], broker_url=None, ha_cluster=True, **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
- args=["--load-module", BrokerTest.ha_lib,
- # FIXME aconway 2012-02-13: workaround slow link failover.
- "--link-maintenace-interval=0.1",
- "--ha-cluster=yes"]
- if broker_url: args += [ "--ha-brokers", broker_url ]
+ args = copy(args)
+ args.extend(["--load-module", BrokerTest.ha_lib,
+ # FIXME aconway 2012-02-13: workaround slow link failover.
+ "--link-maintenace-interval=0.1",
+ "--ha-cluster=%s"%ha_cluster])
+ if broker_url: args.extend([ "--ha-brokers", broker_url ])
Broker.__init__(self, test, args, **kwargs)
+ self.commands=os.getenv("PYTHON_COMMANDS")
+ assert os.path.isdir(self.commands)
def promote(self):
- assert os.system("$QPID_HA_EXEC promote -b %s"%(self.host_port())) == 0
+ assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0
def set_client_url(self, url):
assert os.system(
- "$QPID_HA_EXEC set --public-brokers=%s -b %s"%(url,self.host_port())) == 0
+ "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0
def set_broker_url(self, url):
assert os.system(
- "$QPID_HA_EXEC set --brokers=%s -b %s"%(url, self.host_port())) == 0
+ "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0
+
+ def replicate(self, from_broker, queue):
+ assert os.system(
+ "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+
+ def config_replicate(self, from_broker, queue):
+ assert os.system(
+ "%s/qpid-config --broker=%s add queue --replicate-from %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+
+ def config_declare(self, queue, replication):
+ assert os.system(
+ "%s/qpid-config --broker=%s add queue %s --replication %s"%(self.commands, self.host_port(), queue, replication)) == 0
+
+class HaCluster(object):
+ _cluster_count = 0
+
+ def __init__(self, test, n, **kwargs):
+ """Start a cluster of n brokers"""
+ self.test = test
+ self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)]
+ HaCluster._cluster_count += 1
+ self[0].promote()
+ self.url = ",".join([b.host_port() for b in self])
+ for b in self: b.set_broker_url(self.url)
+
+ def connect(self, i):
+ """Connect with reconnect_urls"""
+ return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
-def set_broker_urls(brokers):
- url = ",".join([b.host_port() for b in brokers])
- for b in brokers: b.set_broker_url(url)
+ def kill(self, i):
+ """Kill broker i, promote broker i+1"""
+ self[i].kill()
+ self[i].expect = EXPECT_EXIT_FAIL
+ self[(i+1) % len(self)].promote()
+
+ def bounce(self, i):
+ """Stop and restart a broker in a cluster."""
+ self.kill(i)
+ b = self[i]
+ self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url)
+
+ # Behave like a list of brokers.
+ def __len__(self): return len(self._brokers)
+ def __getitem__(self,index): return self._brokers[index]
+ def __iter__(self): return self._brokers.__iter__()
+
+
+def qr_node(value="messages"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
class ShortTests(BrokerTest):
"""Short HA functionality tests."""
@@ -92,6 +139,8 @@ class ShortTests(BrokerTest):
"""Test basic replication of configuration and messages before and
after backup has connected"""
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+
def queue(name, replicate):
return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
@@ -177,12 +226,9 @@ class ShortTests(BrokerTest):
self.assert_browse_retry(p, "foo", msgs[i+1:])
self.assert_browse_retry(b, "foo", msgs[i+1:])
- def qpid_replicate(self, value="messages"):
- return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
-
def test_sync(self):
def queue(name, replicate):
- return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate))
+ return "%s;{create:always,%s}"%(name, qr_node(replicate))
primary = HaBroker(self, name="primary")
primary.promote()
p = primary.connect().session()
@@ -206,6 +252,7 @@ class ShortTests(BrokerTest):
def test_send_receive(self):
"""Verify sequence numbers of messages sent by qpid-send"""
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
primary = HaBroker(self, name="primary")
primary.promote()
backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
@@ -213,14 +260,14 @@ class ShortTests(BrokerTest):
sender = self.popen(
["qpid-send",
"--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")),
+ "--address", "q;{create:always,%s}"%(qr_node("messages")),
"--messages=1000",
"--content-string=x"
])
receiver = self.popen(
["qpid-receive",
"--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")),
+ "--address", "q;{create:always,%s}"%(qr_node("messages")),
"--messages=990",
"--timeout=10"
])
@@ -239,7 +286,7 @@ class ShortTests(BrokerTest):
def test_failover_python(self):
"""Verify that backups rejects connections and that fail-over works in python client"""
- getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover messages
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
@@ -254,7 +301,7 @@ class ShortTests(BrokerTest):
# Test discovery: should connect to primary after reject by backup
c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
s = c.session()
- sender = s.sender("q;{create:always,%s}"%(self.qpid_replicate()))
+ sender = s.sender("q;{create:always,%s}"%(qr_node()))
self.wait_backup(backup, "q")
sender.send("foo")
primary.kill()
@@ -269,7 +316,7 @@ class ShortTests(BrokerTest):
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
url="%s,%s"%(primary.host_port(), backup.host_port())
- primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate()))
+ primary.connect().session().sender("q;{create:always,%s}"%(qr_node()))
self.wait_backup(backup, "q")
sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
@@ -288,19 +335,75 @@ class ShortTests(BrokerTest):
receiver.stop()
def test_backup_failover(self):
- brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
- for name in ["a","b","c"] ]
- url = ",".join([b.host_port() for b in brokers])
- for b in brokers: b.set_broker_url(url)
- brokers[0].promote()
+ """Verify that a backup broker fails over and recovers queue state"""
+ brokers = HaCluster(self, 3)
brokers[0].connect().session().sender(
- "q;{create:always,%s}"%(self.qpid_replicate())).send("a")
+ "q;{create:always,%s}"%(qr_node())).send("a")
for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
- brokers[0].kill()
- brokers[2].promote() # c must fail over to b.
- brokers[2].connect().session().sender("q").send("b")
- self.assert_browse_backup(brokers[1], "q", ["a","b"])
- for b in brokers[1:]: b.kill()
+ brokers[0].expect = EXPECT_EXIT_FAIL
+ brokers.kill(0)
+ brokers[1].connect().session().sender("q").send("b")
+ self.assert_browse_backup(brokers[2], "q", ["a","b"])
+ s = brokers[1].connect().session()
+ self.assertEqual("a", s.receiver("q").fetch().content)
+ s.acknowledge()
+ self.assert_browse_backup(brokers[2], "q", ["b"])
+
+ def test_qpid_config_replication(self):
+ """Set up replication via qpid-config"""
+ brokers = HaCluster(self,2)
+ brokers[0].config_declare("q","messages")
+ brokers[0].connect().session().sender("q").send("foo")
+ self.assert_browse_backup(brokers[1], "q", ["foo"])
+
+ def test_standalone_queue_replica(self):
+ """Test replication of individual queues outside of cluster mode"""
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+ primary = HaBroker(self, name="primary", ha_cluster=False, args=["--log-enable=debug+"])
+ pc = primary.connect()
+ ps = pc.session().sender("q;{create:always}")
+ pr = pc.session().receiver("q;{create:always}")
+ backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"])
+ br = backup.connect().session().receiver("q;{create:always}")
+
+ # Set up replication with qpid-ha
+ backup.replicate(primary.host_port(), "q")
+ ps.send("a")
+ self.assert_browse_backup(backup, "q", ["a"])
+ ps.send("b")
+ self.assert_browse_backup(backup, "q", ["a", "b"])
+ self.assertEqual("a", pr.fetch().content)
+ pr.session.acknowledge()
+ self.assert_browse_backup(backup, "q", ["b"])
+
+ # Set up replication with qpid-config
+ ps2 = pc.session().sender("q2;{create:always}")
+ backup.config_replicate(primary.host_port(), "q2");
+ ps2.send("x")
+ self.assert_browse_backup(backup, "q2", ["x"])
+
+
+ def test_queue_replica_failover(self):
+ """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over."""
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+ cluster = HaCluster(self, 2)
+ primary = cluster[0]
+ pc = cluster.connect(0)
+ ps = pc.session().sender("q;{create:always,%s}"%qr_node("messages"))
+ pr = pc.session().receiver("q;{create:always,%s}"%qr_node("messages"))
+ backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"])
+ br = backup.connect().session().receiver("q;{create:always}")
+ backup.replicate(cluster.url, "q")
+ ps.send("a")
+ self.assert_browse_backup(backup, "q", ["a"])
+ cluster.bounce(0)
+ self.assert_browse_backup(backup, "q", ["a"])
+ ps.send("b")
+ self.assert_browse_backup(backup, "q", ["a", "b"])
+ cluster.bounce(1)
+ self.assertEqual("a", pr.fetch().content)
+ pr.session.acknowledge()
+ self.assert_browse_backup(backup, "q", ["b"])
def test_lvq(self):
"""Verify that we replicate to an LVQ correctly"""
@@ -328,6 +431,7 @@ class ShortTests(BrokerTest):
self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)])
def test_reject(self):
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
primary = HaBroker(self, name="primary")
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config
index ee1c365fbf..367fd0574e 100755
--- a/qpid/tools/src/py/qpid-config
+++ b/qpid/tools/src/py/qpid-config
@@ -77,7 +77,7 @@ Replication levels:
messages - replicate configuration and messages
"""
-REPLICATE_LEVELS= ["none", "configuration", "messages"]
+REPLICATION_LEVELS= ["none", "configuration", "messages"]
class Config:
def __init__(self):
@@ -87,7 +87,7 @@ class Config:
self._ignoreDefault = False
self._altern_ex = None
self._durable = False
- self._replicate = None
+ self._replication = None
self._ha_admin = False
self._clusterDurable = False
self._if_empty = True
@@ -110,6 +110,7 @@ class Config:
self._msgGroupHeader = None
self._sharedMsgGroup = False
self._extra_arguments = []
+ self._replicate_from = None
self._returnCode = 0
config = Config()
@@ -130,7 +131,7 @@ FLOW_STOP_SIZE = "qpid.flow_stop_size"
FLOW_RESUME_SIZE = "qpid.flow_resume_size"
MSG_GROUP_HDR_KEY = "qpid.group_header_key"
SHARED_MSG_GROUP = "qpid.shared_msg_group"
-REPLICATE = "qpid.replicate"
+REPLICATION = "qpid.replicate"
#There are various arguments to declare that have specific program
#options in this utility. However there is now a generic mechanism for
#passing arguments as well. The SPECIAL_ARGS list contains the
@@ -141,7 +142,7 @@ SPECIAL_ARGS=[
FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,
LVQ_KEY,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,
FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
- MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP,REPLICATE]
+ MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP,REPLICATION]
class JHelpFormatter(IndentedHelpFormatter):
"""Format usage and description without stripping newlines from usage strings
@@ -185,7 +186,7 @@ def OptionsAndArguments(argv):
group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues")
group2.add_option("--alternate-exchange", action="store", type="string", metavar="<aexname>", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.")
group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.")
- group2.add_option("--replicate", action="store", metavar="<level>", help="Replication level for the new queue or exchange (none, configuration or messages).")
+ group2.add_option("--replication", action="store", metavar="<level>", help="Enable automatic replication in a HA cluster. <level> is 'none', 'configuration' or 'messages').")
group2.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
parser.add_option_group(group2)
@@ -212,6 +213,7 @@ def OptionsAndArguments(argv):
help="Allow message group consumption across multiple consumers.")
group3.add_option("--argument", dest="extra_arguments", action="append", default=[],
metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments")
+ group3.add_option("--replicate-from", metavar="<broker-url>", help="Replicate from the same-named queue at <broker-url>")
# no option for declaring an exclusive queue - which can only be used by the session that creates it.
parser.add_option_group(group3)
@@ -252,10 +254,10 @@ def OptionsAndArguments(argv):
config._altern_ex = opts.alternate_exchange
if opts.durable:
config._durable = True
- if opts.replicate:
- if not opts.replicate in REPLICATE_LEVELS:
- raise Exception("Invalid replicate level '%s', should be one of: %s" % (opts.replicate, ", ".join(REPLICATE_LEVELS)))
- config._replicate = opts.replicate
+ if opts.replication:
+ if not opts.replication in REPLICATION_LEVELS:
+ raise Exception("Invalid replication level '%s', should be one of: %s" % (opts.replication, ", ".join(REPLICATION_LEVELS)))
+ config._replication = opts.replication
if opts.ha_admin: config._ha_admin = True
if opts.cluster_durable:
config._clusterDurable = True
@@ -302,6 +304,8 @@ def OptionsAndArguments(argv):
config._sharedMsgGroup = True
if opts.extra_arguments:
config._extra_arguments = opts.extra_arguments
+ if opts.replicate_from:
+ config._replicate_from = opts.replicate_from
return args
@@ -464,7 +468,7 @@ class BrokerManager:
args = q.arguments
if not args: args = {}
if q.durable: print "--durable",
- if REPLICATE in args: print "--replicate=%s" % args[REPLICATE],
+ if REPLICATION in args: print "--replication=%s" % args[REPLICATION],
if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable",
if q.autoDelete: print "auto-del",
if q.exclusive: print "excl",
@@ -526,8 +530,8 @@ class BrokerManager:
declArgs['alternate-exchange'] = config._altern_ex
if config._durable:
declArgs['durable'] = 1
- if config._replicate:
- declArgs[REPLICATE] = config._replicate
+ if config._replication:
+ declArgs[REPLICATION] = config._replication
self.broker.addExchange(etype, ename, declArgs)
@@ -594,11 +598,11 @@ class BrokerManager:
declArgs['alternate-exchange'] = config._altern_ex
if config._durable:
declArgs['durable'] = 1
- if config._replicate:
- declArgs[REPLICATE] = config._replicate
-
+ if config._replication:
+ declArgs[REPLICATION] = config._replication
self.broker.addQueue(qname, declArgs)
-
+ if config._replicate_from: # Start replication
+ self.broker._method("replicate", {"broker":config._replicate_from, "queue":qname}, "org.apache.qpid.ha:habroker:ha-broker")
def DelQueue(self, args):
if len(args) < 1:
@@ -751,9 +755,9 @@ def main(argv=None):
if e.__class__.__name__ != "Timeout":
print "Failed: %s: %s" % (e.__class__.__name__, e)
return 1
-
return config._returnCode
+
if __name__ == "__main__":
sys.exit(main())
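
The qpid-config change above drives the new HaBroker.replicate management method through the QMF agent. A minimal standalone sketch of the same call, assuming a broker reachable at localhost:5672 and the qpid.messaging/qpidtoollibs APIs the tools already use (addresses and the queue name are placeholders):

    from qpid.messaging import Connection
    from qpidtoollibs.broker import BrokerAgent

    conn = Connection.establish("localhost:5672")   # backup broker to configure
    agent = BrokerAgent(conn)
    # Same call qpid-config makes in AddQueue() above: start replicating "myqueue"
    # from the named remote broker onto the local broker.
    agent._method("replicate",
                  {"broker": "primary-host:5672", "queue": "myqueue"},
                  "org.apache.qpid.ha:habroker:ha-broker")
    conn.close()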
diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha
index 13b4055f71..e6ae6c4884 100755
--- a/qpid/tools/src/py/qpid-ha
+++ b/qpid/tools/src/py/qpid-ha
@@ -83,7 +83,9 @@ ReadyCmd()
class ReplicateCmd(Command):
def __init__(self):
- Command.__init__(self, "replicate", "Replicate <queue> from broker <primary> to the current broker.", ["<queue>", "<primary>"])
+ Command.__init__(self, "replicate", "Set up replication from <queue> on <remote-broker> to <queue> on the current broker.", ["<queue>", "<remote-broker>"])
+ def do_execute(self, qmf_broker, opts, args):
+ qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER)
ReplicateCmd()
class SetCmd(Command):