From fed050c3b5e7156787035ac33e9953e64de89a21 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 17 Feb 2012 14:13:40 +0000 Subject: QPID-3603: Change replication level names, update doc notes. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-7@1245530 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/design_docs/new-ha-design.txt | 56 ++++++++++++++++++++++--------- qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 28 ++++++++-------- qpid/cpp/src/qpid/ha/BrokerReplicator.h | 2 +- qpid/cpp/src/tests/ha_tests.py | 34 +++++++++---------- 4 files changed, 72 insertions(+), 48 deletions(-) diff --git a/qpid/cpp/design_docs/new-ha-design.txt b/qpid/cpp/design_docs/new-ha-design.txt index 9b6d7d676c..053dd7227d 100644 --- a/qpid/cpp/design_docs/new-ha-design.txt +++ b/qpid/cpp/design_docs/new-ha-design.txt @@ -327,25 +327,49 @@ Notes to seed initial user documentation. Loosely tracking the implementation, some points mentioned in the doc may not be implemented yet. ** High Availability Overview -Explain basic concepts: hot standby, primary/backup, replicated queue/exchange. -Network topology: backup links, corosync, separate client/cluster networks. -Describe failover mechanisms. -- Client view: URLs, failover, exclusion & discovery. -- Broker view: similar. -Role of rmganager & corosync. -** Client view. -Clients use multi-address URL in base case. -Clients can't connect to backups, retry till they find primary. -Only qpid.cluster-admin can connect to backup, must not mess with replicated queues. -Note connection known-hosts returns client URL, as does amq.failover exchange. +HA is implemented using a 'hot standby' approach. Clients are directed +to a single "primary" broker. The primary executes client requests and +also replicates them to one or more "backup" brokers. If the primary +fails, one of the backups takes over the role of primary carrying on +from where the primary left off. Clients will fail over to the new +primary automatically and continue their work. + +TODO: at least once, deduplication. + +** Enabling replication on the client. + +To enable replication set the qpid.replicate argument when creating a +queue or exchange. + +This can have one of 3 values +- none: the object is not replicated +- configuration: queues, exchanges and bindings are replicated but messages are not. +- messages: configuration and messages are replicated. + +TODO: examples +TODO: more options for default value of qpid.replicate -Creating replicated queues & exchanges: -- qpid.replicate argument, -- examples using addressing and qpid-config) +A HA client connection has multiple addresses, one for each broker. If +the it fails to connect to an address, or the connection breaks, +it will automatically fail-over to another address. -** Configuring corosync -Must be on same network as backup links. +Only the primary broker accepts connections, the backup brokers abort +connection attempts. That ensures clients connect to the primary only. + +TODO: using multiple-address connections, examples c++, python, java. + +TODO: dynamic cluster addressing? + +TODO: need de-duplication. + +** Enabling replication on the broker. + +Network topology: backup links, separate client/broker networks. +Describe failover mechanisms. +- Client view: URLs, failover, exclusion & discovery. +- Broker view: similar. +Role of rmganager ** Configuring rgmanager diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index bbf3f4ae73..a8f05c1fe3 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -55,7 +55,7 @@ using namespace broker; namespace { -const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator"); +const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator"); const string QPID_REPLICATE("qpid.replicate"); const string CLASS_NAME("_class_name"); @@ -113,16 +113,16 @@ template bool match(Variant::Map& schema) { return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); } -enum ReplicateLevel { RL_NONE=0, RL_WIRING, RL_ALL }; +enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_MESSAGES }; const string S_NONE="none"; -const string S_WIRING="wiring"; -const string S_ALL="all"; +const string S_CONFIGURATION="configuration"; +const string S_MESSAGES="messages"; ReplicateLevel replicateLevel(const string& level) { - ReplicateLevel rl = RL_NONE; - if (level == S_WIRING) rl = RL_WIRING; - else if (level == S_ALL) rl = RL_ALL; - return rl; + if (level == S_NONE) return RL_NONE; + if (level == S_CONFIGURATION) return RL_CONFIGURATION; + if (level == S_MESSAGES) return RL_MESSAGES; + throw Exception("Invalid value for "+QPID_REPLICATE+": "+level); } ReplicateLevel replicateLevel(const framing::FieldTable& f) { @@ -184,15 +184,15 @@ Variant::Map asMapVoid(const Variant& value) { BrokerReplicator::~BrokerReplicator() {} BrokerReplicator::BrokerReplicator(const boost::shared_ptr& l) - : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l) + : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l) { QPID_LOG(info, "HA: Backup replicating from " << link->getTransport() << ":" << link->getHost() << ":" << link->getPort()); broker.getLinks().declare( link->getHost(), link->getPort(), false, // durable - QPID_WIRING_REPLICATOR, // src - QPID_WIRING_REPLICATOR, // dest + QPID_CONFIGURATION_REPLICATOR, // src + QPID_CONFIGURATION_REPLICATOR, // dest "", // key false, // isQueue false, // isLocal @@ -222,7 +222,7 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH sendQuery(QUEUE, queueName, sessionHandler); sendQuery(EXCHANGE, queueName, sessionHandler); sendQuery(BINDING, queueName, sessionHandler); - QPID_LOG(debug, "HA: Backup activated wiring bridge: " << queueName); + QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName); } // FIXME aconway 2011-12-02: error handling in route. @@ -481,7 +481,7 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { } void BrokerReplicator::startQueueReplicator(const boost::shared_ptr& queue) { - if (replicateLevel(queue->getSettings()) == RL_ALL) { + if (replicateLevel(queue->getSettings()) == RL_MESSAGES) { boost::shared_ptr qr(new QueueReplicator(queue, link)); broker.getExchanges().registerExchange(qr); qr->activate(); @@ -492,6 +492,6 @@ bool BrokerReplicator::bind(boost::shared_ptr, const string&, const frami bool BrokerReplicator::unbind(boost::shared_ptr, const string&, const framing::FieldTable*) { return false; } bool BrokerReplicator::isBound(boost::shared_ptr, const string* const, const framing::FieldTable* const) { return false; } -string BrokerReplicator::getType() const { return QPID_WIRING_REPLICATOR; } +string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } }} // namespace broker diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index f0b9e0b599..cfb6cf9a28 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -38,7 +38,7 @@ class SessionHandler; namespace ha { /** - * Replicate wiring on a backup broker. + * Replicate configuration on a backup broker. * * Implemented as an exchange that subscribes to receive QMF * configuration events from the primary. It configures local queues diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 6d5dc19aa6..7ccea16a08 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -39,7 +39,7 @@ class ShortTests(BrokerTest): ] + args, **kwargs) - # FIXME aconway 2011-11-15: work around async wiring replication. + # FIXME aconway 2011-11-15: work around async configuration replication. # Wait for an address to become valid. def wait(self, session, address): def check(): @@ -70,7 +70,7 @@ class ShortTests(BrokerTest): return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs) def test_replication(self): - """Test basic replication of wiring and messages before and + """Test basic replication of configuration and messages before and after backup has connected""" def queue(name, replicate): @@ -80,30 +80,30 @@ class ShortTests(BrokerTest): return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq) def setup(p, prefix, primary): """Create config, send messages on the primary p""" - s = p.sender(queue(prefix+"q1", "all")) + s = p.sender(queue(prefix+"q1", "messages")) for m in ["a", "b", "1"]: s.send(Message(m)) # Test replication of dequeue self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a") p.acknowledge() - p.sender(queue(prefix+"q2", "wiring")).send(Message("2")) + p.sender(queue(prefix+"q2", "configuration")).send(Message("2")) p.sender(queue(prefix+"q3", "none")).send(Message("3")) - p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4")) - p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5")) + p.sender(exchange(prefix+"e1", "messages", prefix+"q1")).send(Message("4")) + p.sender(exchange(prefix+"e2", "messages", prefix+"q2")).send(Message("5")) # Test unbind - p.sender(queue(prefix+"q4", "all")).send(Message("6")) - s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4")) + p.sender(queue(prefix+"q4", "messages")).send(Message("6")) + s3 = p.sender(exchange(prefix+"e4", "messages", prefix+"q4")) s3.send(Message("7")) # Use old connection to unbind us = primary.connect_old().session(str(qpid.datatypes.uuid4())) us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4") p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped # FIXME aconway 2011-11-24: need a marker so we can wait till sync is done. - p.sender(queue(prefix+"x", "wiring")) + p.sender(queue(prefix+"x", "configuration")) def verify(b, prefix, p): """Verify setup was replicated to backup b""" - # FIXME aconway 2011-11-21: wait for wiring to replicate. + # FIXME aconway 2011-11-21: wait for configuration to replicate. self.wait(b, prefix+"x"); # FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication. self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"]) @@ -112,11 +112,11 @@ class ShortTests(BrokerTest): p.acknowledge() self.assert_browse_retry(b, prefix+"q1", ["1", "4"]) - self.assert_browse_retry(b, prefix+"q2", []) # wiring only + self.assert_browse_retry(b, prefix+"q2", []) # configuration only self.assert_missing(b, prefix+"q3") b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"]) - b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=wiring + b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"]) b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind. @@ -136,7 +136,7 @@ class ShortTests(BrokerTest): verify(b, "1", p) verify(b, "2", p) # Test a series of messages, enqueue all then dequeue all. - s = p.sender(queue("foo","all")) + s = p.sender(queue("foo","messages")) self.wait(b, "foo") msgs = [str(i) for i in range(10)] for m in msgs: s.send(Message(m)) @@ -158,7 +158,7 @@ class ShortTests(BrokerTest): self.assert_browse_retry(p, "foo", msgs[i+1:]) self.assert_browse_retry(b, "foo", msgs[i+1:]) - def qpid_replicate(self, value="all"): + def qpid_replicate(self, value="messages"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value def test_sync(self): @@ -166,7 +166,7 @@ class ShortTests(BrokerTest): return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate)) primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary p = primary.connect().session() - s = p.sender(queue("q","all")) + s = p.sender(queue("q","messages")) for m in [str(i) for i in range(0,10)]: s.send(m) s.sync() backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port()) @@ -192,14 +192,14 @@ class ShortTests(BrokerTest): sender = self.popen( ["qpid-send", "--broker", primary.host_port(), - "--address", "q;{create:always,%s}"%(self.qpid_replicate("all")), + "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")), "--messages=1000", "--content-string=x" ]) receiver = self.popen( ["qpid-receive", "--broker", primary.host_port(), - "--address", "q;{create:always,%s}"%(self.qpid_replicate("all")), + "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")), "--messages=990", "--timeout=10" ]) -- cgit v1.2.1