summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-25 22:46:54 +0000
committerAlan Conway <aconway@apache.org>2012-01-25 22:46:54 +0000
commit03ffe85d335d0e8f66f5afa4eb151417f297c85f (patch)
treee2082023d9a6c981ebf3083c48048cfbd8d2f300
parentb7ad9087a63f885f5fd475b0fd13a784ab0b9303 (diff)
downloadqpid-python-03ffe85d335d0e8f66f5afa4eb151417f297c85f.tar.gz
QPID-3603: Change replication level names, update doc notes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1235976 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/design_docs/new-ha-design.txt56
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp28
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h2
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py34
4 files changed, 72 insertions, 48 deletions
diff --git a/qpid/cpp/design_docs/new-ha-design.txt b/qpid/cpp/design_docs/new-ha-design.txt
index 9b6d7d676c..053dd7227d 100644
--- a/qpid/cpp/design_docs/new-ha-design.txt
+++ b/qpid/cpp/design_docs/new-ha-design.txt
@@ -327,25 +327,49 @@ Notes to seed initial user documentation. Loosely tracking the implementation,
some points mentioned in the doc may not be implemented yet.
** High Availability Overview
-Explain basic concepts: hot standby, primary/backup, replicated queue/exchange.
-Network topology: backup links, corosync, separate client/cluster networks.
-Describe failover mechanisms.
-- Client view: URLs, failover, exclusion & discovery.
-- Broker view: similar.
-Role of rmganager & corosync.
-** Client view.
-Clients use multi-address URL in base case.
-Clients can't connect to backups, retry till they find primary.
-Only qpid.cluster-admin can connect to backup, must not mess with replicated queues.
-Note connection known-hosts returns client URL, as does amq.failover exchange.
+HA is implemented using a 'hot standby' approach. Clients are directed
+to a single "primary" broker. The primary executes client requests and
+also replicates them to one or more "backup" brokers. If the primary
+fails, one of the backups takes over the role of primary carrying on
+from where the primary left off. Clients will fail over to the new
+primary automatically and continue their work.
+
+TODO: at least once, deduplication.
+
+** Enabling replication on the client.
+
+To enable replication set the qpid.replicate argument when creating a
+queue or exchange.
+
+This can have one of 3 values
+- none: the object is not replicated
+- configuration: queues, exchanges and bindings are replicated but messages are not.
+- messages: configuration and messages are replicated.
+
+TODO: examples
+TODO: more options for default value of qpid.replicate
-Creating replicated queues & exchanges:
-- qpid.replicate argument,
-- examples using addressing and qpid-config)
+A HA client connection has multiple addresses, one for each broker. If
+the it fails to connect to an address, or the connection breaks,
+it will automatically fail-over to another address.
-** Configuring corosync
-Must be on same network as backup links.
+Only the primary broker accepts connections, the backup brokers abort
+connection attempts. That ensures clients connect to the primary only.
+
+TODO: using multiple-address connections, examples c++, python, java.
+
+TODO: dynamic cluster addressing?
+
+TODO: need de-duplication.
+
+** Enabling replication on the broker.
+
+Network topology: backup links, separate client/broker networks.
+Describe failover mechanisms.
+- Client view: URLs, failover, exclusion & discovery.
+- Broker view: similar.
+Role of rmganager
** Configuring rgmanager
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index bbf3f4ae73..a8f05c1fe3 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -55,7 +55,7 @@ using namespace broker;
namespace {
-const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
+const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator");
const string QPID_REPLICATE("qpid.replicate");
const string CLASS_NAME("_class_name");
@@ -113,16 +113,16 @@ template <class T> bool match(Variant::Map& schema) {
return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
}
-enum ReplicateLevel { RL_NONE=0, RL_WIRING, RL_ALL };
+enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_MESSAGES };
const string S_NONE="none";
-const string S_WIRING="wiring";
-const string S_ALL="all";
+const string S_CONFIGURATION="configuration";
+const string S_MESSAGES="messages";
ReplicateLevel replicateLevel(const string& level) {
- ReplicateLevel rl = RL_NONE;
- if (level == S_WIRING) rl = RL_WIRING;
- else if (level == S_ALL) rl = RL_ALL;
- return rl;
+ if (level == S_NONE) return RL_NONE;
+ if (level == S_CONFIGURATION) return RL_CONFIGURATION;
+ if (level == S_MESSAGES) return RL_MESSAGES;
+ throw Exception("Invalid value for "+QPID_REPLICATE+": "+level);
}
ReplicateLevel replicateLevel(const framing::FieldTable& f) {
@@ -184,15 +184,15 @@ Variant::Map asMapVoid(const Variant& value) {
BrokerReplicator::~BrokerReplicator() {}
BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l)
- : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l)
+ : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l)
{
QPID_LOG(info, "HA: Backup replicating from " <<
link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
broker.getLinks().declare(
link->getHost(), link->getPort(),
false, // durable
- QPID_WIRING_REPLICATOR, // src
- QPID_WIRING_REPLICATOR, // dest
+ QPID_CONFIGURATION_REPLICATOR, // src
+ QPID_CONFIGURATION_REPLICATOR, // dest
"", // key
false, // isQueue
false, // isLocal
@@ -222,7 +222,7 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
sendQuery(QUEUE, queueName, sessionHandler);
sendQuery(EXCHANGE, queueName, sessionHandler);
sendQuery(BINDING, queueName, sessionHandler);
- QPID_LOG(debug, "HA: Backup activated wiring bridge: " << queueName);
+ QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName);
}
// FIXME aconway 2011-12-02: error handling in route.
@@ -481,7 +481,7 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
}
void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
- if (replicateLevel(queue->getSettings()) == RL_ALL) {
+ if (replicateLevel(queue->getSettings()) == RL_MESSAGES) {
boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
broker.getExchanges().registerExchange(qr);
qr->activate();
@@ -492,6 +492,6 @@ bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const frami
bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
-string BrokerReplicator::getType() const { return QPID_WIRING_REPLICATOR; }
+string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
}} // namespace broker
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index f0b9e0b599..cfb6cf9a28 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -38,7 +38,7 @@ class SessionHandler;
namespace ha {
/**
- * Replicate wiring on a backup broker.
+ * Replicate configuration on a backup broker.
*
* Implemented as an exchange that subscribes to receive QMF
* configuration events from the primary. It configures local queues
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 6d5dc19aa6..7ccea16a08 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -39,7 +39,7 @@ class ShortTests(BrokerTest):
] + args,
**kwargs)
- # FIXME aconway 2011-11-15: work around async wiring replication.
+ # FIXME aconway 2011-11-15: work around async configuration replication.
# Wait for an address to become valid.
def wait(self, session, address):
def check():
@@ -70,7 +70,7 @@ class ShortTests(BrokerTest):
return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs)
def test_replication(self):
- """Test basic replication of wiring and messages before and
+ """Test basic replication of configuration and messages before and
after backup has connected"""
def queue(name, replicate):
@@ -80,30 +80,30 @@ class ShortTests(BrokerTest):
return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq)
def setup(p, prefix, primary):
"""Create config, send messages on the primary p"""
- s = p.sender(queue(prefix+"q1", "all"))
+ s = p.sender(queue(prefix+"q1", "messages"))
for m in ["a", "b", "1"]: s.send(Message(m))
# Test replication of dequeue
self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a")
p.acknowledge()
- p.sender(queue(prefix+"q2", "wiring")).send(Message("2"))
+ p.sender(queue(prefix+"q2", "configuration")).send(Message("2"))
p.sender(queue(prefix+"q3", "none")).send(Message("3"))
- p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4"))
- p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5"))
+ p.sender(exchange(prefix+"e1", "messages", prefix+"q1")).send(Message("4"))
+ p.sender(exchange(prefix+"e2", "messages", prefix+"q2")).send(Message("5"))
# Test unbind
- p.sender(queue(prefix+"q4", "all")).send(Message("6"))
- s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4"))
+ p.sender(queue(prefix+"q4", "messages")).send(Message("6"))
+ s3 = p.sender(exchange(prefix+"e4", "messages", prefix+"q4"))
s3.send(Message("7"))
# Use old connection to unbind
us = primary.connect_old().session(str(qpid.datatypes.uuid4()))
us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4")
p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped
# FIXME aconway 2011-11-24: need a marker so we can wait till sync is done.
- p.sender(queue(prefix+"x", "wiring"))
+ p.sender(queue(prefix+"x", "configuration"))
def verify(b, prefix, p):
"""Verify setup was replicated to backup b"""
- # FIXME aconway 2011-11-21: wait for wiring to replicate.
+ # FIXME aconway 2011-11-21: wait for configuration to replicate.
self.wait(b, prefix+"x");
# FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication.
self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
@@ -112,11 +112,11 @@ class ShortTests(BrokerTest):
p.acknowledge()
self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
- self.assert_browse_retry(b, prefix+"q2", []) # wiring only
+ self.assert_browse_retry(b, prefix+"q2", []) # configuration only
self.assert_missing(b, prefix+"q3")
b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
- b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=wiring
+ b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration
self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind.
@@ -136,7 +136,7 @@ class ShortTests(BrokerTest):
verify(b, "1", p)
verify(b, "2", p)
# Test a series of messages, enqueue all then dequeue all.
- s = p.sender(queue("foo","all"))
+ s = p.sender(queue("foo","messages"))
self.wait(b, "foo")
msgs = [str(i) for i in range(10)]
for m in msgs: s.send(Message(m))
@@ -158,7 +158,7 @@ class ShortTests(BrokerTest):
self.assert_browse_retry(p, "foo", msgs[i+1:])
self.assert_browse_retry(b, "foo", msgs[i+1:])
- def qpid_replicate(self, value="all"):
+ def qpid_replicate(self, value="messages"):
return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
def test_sync(self):
@@ -166,7 +166,7 @@ class ShortTests(BrokerTest):
return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate))
primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
p = primary.connect().session()
- s = p.sender(queue("q","all"))
+ s = p.sender(queue("q","messages"))
for m in [str(i) for i in range(0,10)]: s.send(m)
s.sync()
backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port())
@@ -192,14 +192,14 @@ class ShortTests(BrokerTest):
sender = self.popen(
["qpid-send",
"--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(self.qpid_replicate("all")),
+ "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")),
"--messages=1000",
"--content-string=x"
])
receiver = self.popen(
["qpid-receive",
"--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(self.qpid_replicate("all")),
+ "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")),
"--messages=990",
"--timeout=10"
])