diff options
author | Alan Conway <aconway@apache.org> | 2012-09-25 20:49:33 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-09-25 20:49:33 +0000 |
commit | 7356e5f508658bdba40c865c4a29b54c8566d560 (patch) | |
tree | 922953738515964656ea6e8ab83c91012841412f /cpp/src | |
parent | 4635ead8e8ff61dc5861e16476c16c75e7270240 (diff) | |
download | qpid-python-7356e5f508658bdba40c865c4a29b54c8566d560.tar.gz |
QPID-4325: HA Starting from persistent store
When re-starting a persistent HA cluster, the broker that becomes primary should
keep its store data while all the backup brokers should discard their store data
and catch up from the primary. Backups cannot simply use their own stores
because sequence numbers of stored messages will not match on all brokers. The
backup erases individual queues and exchanges as the catch-up process gets to
them.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1390123 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoverableExchange.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/ha/AlternateExchangeSetter.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 84 | ||||
-rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/ha/QueueGuard.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/ha/ReplicationTest.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/ha/ReplicationTest.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/tests/brokertest.py | 4 | ||||
-rwxr-xr-x | cpp/src/tests/ha_store_tests.py | 130 | ||||
-rwxr-xr-x | cpp/src/tests/ha_test.py | 258 | ||||
-rwxr-xr-x | cpp/src/tests/ha_tests.py | 224 |
15 files changed, 471 insertions, 261 deletions
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 2b2f7db934..26ee5b0054 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -174,6 +174,7 @@ public: const std::string& getName() const { return name; } bool isDurable() { return durable; } qpid::framing::FieldTable& getArgs() { return args; } + const qpid::framing::FieldTable& getArgs() const { return args; } QPID_BROKER_EXTERN Exchange::shared_ptr getAlternate() { return alternate; } QPID_BROKER_EXTERN void setAlternate(Exchange::shared_ptr _alternate); diff --git a/cpp/src/qpid/broker/RecoverableExchange.h b/cpp/src/qpid/broker/RecoverableExchange.h index ca6cc1541e..f8c08b2989 100644 --- a/cpp/src/qpid/broker/RecoverableExchange.h +++ b/cpp/src/qpid/broker/RecoverableExchange.h @@ -43,6 +43,9 @@ public: virtual void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args) = 0; + + virtual std::string getName() const = 0; + virtual ~RecoverableExchange() {}; }; diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 7deeba5e65..b04d7c34e0 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -82,6 +82,7 @@ public: RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {} void setPersistenceId(uint64_t id); void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args); + string getName() const { return exchange->getName(); } }; class RecoverableConfigImpl : public RecoverableConfig diff --git a/cpp/src/qpid/ha/AlternateExchangeSetter.h b/cpp/src/qpid/ha/AlternateExchangeSetter.h index 08690e68bc..1878939aad 100644 --- a/cpp/src/qpid/ha/AlternateExchangeSetter.h +++ b/cpp/src/qpid/ha/AlternateExchangeSetter.h @@ -43,12 +43,14 @@ class AlternateExchangeSetter AlternateExchangeSetter(broker::ExchangeRegistry& er) : exchanges(er) {} + /** If altEx is already known, call setter(altEx) now else save for later */ void setAlternate(const std::string& altEx, const SetFunction& setter) { broker::Exchange::shared_ptr ex = exchanges.find(altEx); if (ex) setter(ex); // Set immediately. else setters.insert(Setters::value_type(altEx, setter)); // Save for later. } + /** Add an exchange and call any setters that are waiting for it. */ void addExchange(boost::shared_ptr<broker::Exchange> exchange) { // Update the setters for this exchange std::pair<Setters::iterator, Setters::iterator> range = setters.equal_range(exchange->getName()); diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index 0b6280da6d..2f0d304686 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -41,6 +41,7 @@ #include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include "qmf/org/apache/qpid/broker/EventSubscribe.h" #include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" +#include <boost/bind.hpp> #include <algorithm> #include <sstream> #include <iostream> @@ -313,11 +314,11 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { // The queue was definitely created on the primary. if (broker.getQueues().find(name)) { QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name); - broker.getQueues().destroy(name); + broker.deleteQueue(name, userId, remoteHost); stopQueueReplicator(name); } settings.populate(args, settings.storeSettings); - std::pair<boost::shared_ptr<Queue>, bool> result = + CreateQueueResult result = broker.createQueue( name, settings, @@ -325,7 +326,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { values[ALTEX].asString(), userId, remoteHost); - assert(result.second); // Should be true since we destroyed existing queue above + assert(result.second); startQueueReplicator(result.first); } } @@ -361,12 +362,14 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { // If we already have a exchange with this name, replace it. // The exchange was definitely created on the primary. if (broker.getExchanges().find(name)) { - broker.getExchanges().destroy(name); + broker.deleteExchange(name, userId, remoteHost); QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name); } - boost::shared_ptr<Exchange> exchange = - createExchange(name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, values[ALTEX].asString()); - assert(exchange); + CreateExchangeResult result = createExchange( + name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, + values[ALTEX].asString()); + replicatedExchanges.insert(name); + assert(result.second); } } @@ -380,6 +383,7 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { } else { QPID_LOG(debug, logPrefix << "Exchange delete event:" << name); broker.deleteExchange(name, userId, remoteHost); + replicatedExchanges.erase(name); } } @@ -399,7 +403,7 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); - exchange->bind(queue, key, &args); + queue->bind(exchange, key, args); } } @@ -454,14 +458,20 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { return; string name(values[NAME].asString()); QPID_LOG(debug, logPrefix << "Queue response: " << name); + if (broker.getQueues().find(name)) { // Already exists + if (findQueueReplicator(name)) + return; // Already replicated + else { + QPID_LOG(debug, logPrefix << "Deleting queue to make way for replica: " << name); + broker.deleteQueue(name, userId, remoteHost); + } + } framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); - boost::shared_ptr<Queue> queue = + CreateQueueResult result = createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args, getAltExchange(values[ALTEXCHANGE])); - // It is normal for the queue to already exist if we are failing over. - if (queue) startQueueReplicator(queue); - else QPID_LOG(debug, logPrefix << "Queue already replicated: " << name); + if (result.second) startQueueReplicator(result.first); } void BrokerReplicator::doResponseExchange(Variant::Map& values) { @@ -469,13 +479,22 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { if (!replicationTest.replicateLevel(argsMap)) return; string name = values[NAME].asString(); QPID_LOG(debug, logPrefix << "Exchange response: " << name); + if (broker.getExchanges().find(name)) { + if (replicatedExchanges.find(name) != replicatedExchanges.end()) + return; // Already replicated + else { + QPID_LOG(debug, logPrefix << "Deleting exchange to make way for replica: " + << name); + broker.deleteExchange(name, userId, remoteHost); + } + } framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); - boost::shared_ptr<Exchange> exchange = createExchange( + CreateExchangeResult result = createExchange( name, values[TYPE].asString(), values[DURABLE].asBool(), args, getAltExchange(values[ALTEXCHANGE])); - // It is normal for the exchange to already exist if we are failing over. - QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name); + assert(result.second); + replicatedExchanges.insert(name); } namespace { @@ -563,7 +582,7 @@ void BrokerReplicator::stopQueueReplicator(const std::string& name) { } } -boost::shared_ptr<Queue> BrokerReplicator::createQueue( +BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue( const std::string& name, bool durable, bool autodelete, @@ -572,7 +591,7 @@ boost::shared_ptr<Queue> BrokerReplicator::createQueue( { QueueSettings settings(durable, autodelete); settings.populate(arguments, settings.storeSettings); - std::pair<boost::shared_ptr<Queue>, bool> result = + CreateQueueResult result = broker.createQueue( name, settings, @@ -580,24 +599,22 @@ boost::shared_ptr<Queue> BrokerReplicator::createQueue( string(), // Set alternate exchange below userId, remoteHost); - if (result.second) { - if (!alternateExchange.empty()) { - alternates.setAlternate( - alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1)); - } - return result.first; + + if (!alternateExchange.empty()) { + alternates.setAlternate( + alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1)); } - else return boost::shared_ptr<Queue>(); + return result; } -boost::shared_ptr<Exchange> BrokerReplicator::createExchange( +BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange( const std::string& name, const std::string& type, bool durable, const qpid::framing::FieldTable& args, const std::string& alternateExchange) { - std::pair<boost::shared_ptr<Exchange>, bool> result = + CreateExchangeResult result = broker.createExchange( name, type, @@ -606,15 +623,13 @@ boost::shared_ptr<Exchange> BrokerReplicator::createExchange( args, userId, remoteHost); - if (result.second) { - alternates.addExchange(result.first); - if (!alternateExchange.empty()) { - alternates.setAlternate( - alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1)); - } - return result.first; + + alternates.addExchange(result.first); + if (!alternateExchange.empty()) { + alternates.setAlternate( + alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1)); } - else return boost::shared_ptr<Exchange>(); + return result; } bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } @@ -623,4 +638,5 @@ bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, co string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } + }} // namespace broker diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h index 69653b876a..5c8a983d45 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.h +++ b/cpp/src/qpid/ha/BrokerReplicator.h @@ -78,6 +78,8 @@ class BrokerReplicator : public broker::Exchange, private: typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr; + typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult; + typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult; void initializeBridge(broker::Bridge&, broker::SessionHandler&); @@ -98,14 +100,14 @@ class BrokerReplicator : public broker::Exchange, void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); void stopQueueReplicator(const std::string& name); - boost::shared_ptr<broker::Queue> createQueue( + CreateQueueResult createQueue( const std::string& name, bool durable, bool autodelete, const qpid::framing::FieldTable& arguments, const std::string& alternateExchange); - boost::shared_ptr<broker::Exchange> createExchange( + CreateExchangeResult createExchange( const std::string& name, const std::string& type, bool durable, @@ -121,6 +123,8 @@ class BrokerReplicator : public broker::Exchange, bool initialized; AlternateExchangeSetter alternates; qpid::Address primary; + typedef std::set<std::string> StringSet; + StringSet replicatedExchanges; // exchanges that have been replicated. }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/ha/QueueGuard.cpp b/cpp/src/qpid/ha/QueueGuard.cpp index b0ef167176..8852554d31 100644 --- a/cpp/src/qpid/ha/QueueGuard.cpp +++ b/cpp/src/qpid/ha/QueueGuard.cpp @@ -61,7 +61,10 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) range = QueueRange(q); } -QueueGuard::~QueueGuard() { cancel(); } +QueueGuard::~QueueGuard() { + QPID_LOG(debug, logPrefix << "Cancelled"); + cancel(); +} // NOTE: Called with message lock held. void QueueGuard::enqueued(const Message& m) { @@ -97,7 +100,6 @@ void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) { } void QueueGuard::cancel() { - QPID_LOG(debug, logPrefix << "Cancelled"); queue.removeObserver(observer); Delayed removed; { diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index 82daad9d9c..07c94ad261 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -67,6 +67,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb, logPrefix("Backup queue "+q->getName()+": "), queue(q), link(l), brokerInfo(hb.getBrokerInfo()) { + args.setString(QPID_REPLICATE, printable(NONE).str()); Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); } diff --git a/cpp/src/qpid/ha/ReplicationTest.cpp b/cpp/src/qpid/ha/ReplicationTest.cpp index 88a969dbfd..1dd32262a0 100644 --- a/cpp/src/qpid/ha/ReplicationTest.cpp +++ b/cpp/src/qpid/ha/ReplicationTest.cpp @@ -20,6 +20,7 @@ */ #include "ReplicationTest.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/Exchange.h" #include "qpid/framing/FieldTable.h" namespace qpid { @@ -71,5 +72,10 @@ bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Queue& q) return isReplicated(level, q.getSettings().storeSettings, q.isAutoDelete(), q.hasExclusiveOwner()); } +bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Exchange& ex) +{ + return replicateLevel(ex.getArgs()) >= level; +} + }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/ReplicationTest.h b/cpp/src/qpid/ha/ReplicationTest.h index 9f6976a8e4..ab6b1a6bcc 100644 --- a/cpp/src/qpid/ha/ReplicationTest.h +++ b/cpp/src/qpid/ha/ReplicationTest.h @@ -30,6 +30,7 @@ namespace qpid { namespace broker { class Queue; +class Exchange; } namespace framing { @@ -59,6 +60,7 @@ class ReplicationTest bool isReplicated(ReplicateLevel level, const framing::FieldTable& args, bool autodelete, bool exclusive); bool isReplicated(ReplicateLevel level, const broker::Queue&); + bool isReplicated(ReplicateLevel level, const broker::Exchange&); private: ReplicateLevel replicateDefault; }; diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 2a66aefc03..fb47aea875 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -348,7 +348,9 @@ EXTRA_DIST += \ run_msg_group_tests \ ipv6_test \ run_ha_tests \ + ha_test.py \ ha_tests.py \ + ha_store_tests.py \ test_env.ps1.in check_LTLIBRARIES += libdlclose_noop.la diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py index dd09e8aa27..0ab0d13424 100644 --- a/cpp/src/tests/brokertest.py +++ b/cpp/src/tests/brokertest.py @@ -425,8 +425,8 @@ class Cluster: self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd)) return self._brokers[-1] - def ready(self): - for b in self: b.ready() + def ready(self, timeout=30, **kwargs): + for b in self: b.ready(**kwargs) def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False): for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd) diff --git a/cpp/src/tests/ha_store_tests.py b/cpp/src/tests/ha_store_tests.py new file mode 100755 index 0000000000..d1eaca1b87 --- /dev/null +++ b/cpp/src/tests/ha_store_tests.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +This module contains tests for HA functionality that requires a store. +It is not included as part of "make check" since it will not function +without a store. Currently it can be run from a build of the message +store. +""" + +import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random +import traceback +from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty +from qpid.datatypes import uuid4 +from brokertest import * +from ha_test import * +from threading import Thread, Lock, Condition +from logging import getLogger, WARN, ERROR, DEBUG, INFO +from qpidtoollibs import BrokerAgent +from uuid import UUID + + +class StoreTests(BrokerTest): + """Test for HA with persistence.""" + + def test_store_recovery(self): + """Verify basic store and recover functionality""" + cluster = HaCluster(self, 2) + sn = cluster[0].connect().session() + s = sn.sender("qq;{create:always,node:{durable:true}}") + sk = sn.sender("xx/k;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:xx,key:k,queue:qq}]}}") + s.send(Message("foo", durable=True)) + s.send(Message("bar", durable=True)) + sk.send(Message("baz", durable=True)) + r = cluster[0].connect().session().receiver("qq") + self.assertEqual(r.fetch().content, "foo") + r.session.acknowledge() + # FIXME aconway 2012-09-21: sending this message is an ugly hack to flush + # the dequeue operation on qq. + s.send(Message("flush", durable=True)) + + def verify(broker, x_count): + sn = broker.connect().session() + assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count)*["x"]) + sn.sender("xx/k").send(Message("x", durable=True)) + assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count+1)*["x"]) + + verify(cluster[0], 0) + cluster.bounce(0, promote_next=False) + cluster[0].promote() + cluster[0].wait_status("active") + verify(cluster[0], 1) + cluster.kill(0, promote_next=False) + cluster[1].promote() + cluster[1].wait_status("active") + verify(cluster[1], 2) + cluster.bounce(1, promote_next=False) + cluster[1].promote() + cluster[1].wait_status("active") + verify(cluster[1], 3) + + def test_catchup_store(self): + """Verify that a backup erases queue data from store recovery before + doing catch-up from the primary.""" + cluster = HaCluster(self, 2) + sn = cluster[0].connect().session() + s1 = sn.sender("q1;{create:always,node:{durable:true}}") + for m in ["foo","bar"]: s1.send(Message(m, durable=True)) + s2 = sn.sender("q2;{create:always,node:{durable:true}}") + sk2 = sn.sender("ex/k2;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:ex,key:k2,queue:q2}]}}") + sk2.send(Message("hello", durable=True)) + # Wait for backup to catch up. + cluster[1].assert_browse_backup("q1", ["foo","bar"]) + cluster[1].assert_browse_backup("q2", ["hello"]) + + # Make changes that the backup doesn't see + cluster.kill(1, promote_next=False) + time.sleep(1) # FIXME aconway 2012-09-25: + r1 = cluster[0].connect().session().receiver("q1") + for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m) + r1.session.acknowledge() + for m in ["x","y","z"]: s1.send(Message(m, durable=True)) + # Use old connection to unbind + us = cluster[0].connect_old().session(str(uuid4())) + us.exchange_unbind(exchange="ex", binding_key="k2", queue="q2") + us.exchange_bind(exchange="ex", binding_key="k1", queue="q1") + # Restart both brokers from store to get inconsistent sequence numbering. + cluster.bounce(0, promote_next=False) + cluster[0].promote() + cluster[0].wait_status("active") + cluster.restart(1) + cluster[1].wait_status("ready") + + # Verify state + cluster[0].assert_browse("q1", ["x","y","z"]) + cluster[1].assert_browse_backup("q1", ["x","y","z"]) + sn = cluster[0].connect().session() # FIXME aconway 2012-09-25: should fail over! + sn.sender("ex/k1").send("boo") + cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"]) + cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"]) + sn.sender("ex/k2").send("hoo") # q2 was unbound so this should be dropped. + sn.sender("q2").send("end") # mark the end of the queue for assert_browse + cluster[0].assert_browse("q2", ["hello", "end"]) + cluster[1].assert_browse_backup("q2", ["hello", "end"]) + +if __name__ == "__main__": + shutil.rmtree("brokertest.tmp", True) + qpid_ha = os.getenv("QPID_HA_EXEC") + if qpid_ha and os.path.exists(qpid_ha): + os.execvp("qpid-python-test", + ["qpid-python-test", "-m", "ha_store_tests"] + sys.argv[1:]) + else: + print "Skipping ha_store_tests, %s not available"%(qpid_ha) diff --git a/cpp/src/tests/ha_test.py b/cpp/src/tests/ha_test.py new file mode 100755 index 0000000000..18a969a07b --- /dev/null +++ b/cpp/src/tests/ha_test.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random +import traceback +from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty +from qpid.datatypes import uuid4 +from brokertest import * +from threading import Thread, Lock, Condition +from logging import getLogger, WARN, ERROR, DEBUG, INFO +from qpidtoollibs import BrokerAgent +from uuid import UUID + +log = getLogger(__name__) + +class QmfAgent(object): + """Access to a QMF broker agent.""" + def __init__(self, address, **kwargs): + self._connection = Connection.establish( + address, client_properties={"qpid.ha-admin":1}, **kwargs) + self._agent = BrokerAgent(self._connection) + + def __getattr__(self, name): + a = getattr(self._agent, name) + return a + +class Credentials(object): + """SASL credentials: username, password, and mechanism""" + def __init__(self, username, password, mechanism): + (self.username, self.password, self.mechanism) = (username, password, mechanism) + + def __str__(self): return "Credentials%s"%(self.tuple(),) + + def tuple(self): return (self.username, self.password, self.mechanism) + + def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url) + +class HaBroker(Broker): + """Start a broker with HA enabled + @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker. + """ + def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all", + client_credentials=None, **kwargs): + assert BrokerTest.ha_lib, "Cannot locate HA plug-in" + args = copy(args) + args += ["--load-module", BrokerTest.ha_lib, + "--log-enable=debug+:ha::", + # FIXME aconway 2012-02-13: workaround slow link failover. + "--link-maintenace-interval=0.1", + "--ha-cluster=%s"%ha_cluster] + if ha_replicate is not None: + args += [ "--ha-replicate=%s"%ha_replicate ] + if brokers_url: args += [ "--ha-brokers-url", brokers_url ] + Broker.__init__(self, test, args, **kwargs) + self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha") + assert os.path.exists(self.qpid_ha_path) + self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config") + assert os.path.exists(self.qpid_config_path) + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. + self.qpid_ha_script=import_script(self.qpid_ha_path) + self._agent = None + self.client_credentials = client_credentials + + def __str__(self): return Broker.__str__(self) + + def qpid_ha(self, args): + cred = self.client_credentials + url = self.host_port() + if cred: + url =cred.add_user(url) + args = args + ["--sasl-mechanism", cred.mechanism] + self.qpid_ha_script.main_except(["", "-b", url]+args) + + def promote(self): self.ready(); self.qpid_ha(["promote"]) + def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url]) + def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]) + def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) + + def agent(self): + if not self._agent: + cred = self.client_credentials + if cred: + self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism) + else: + self._agent = QmfAgent(self.host_port()) + return self._agent + + def ha_status(self): + hb = self.agent().getHaBroker() + hb.update() + return hb.status + + def wait_status(self, status): + def try_get_status(): + self._status = "<unknown>" + # Ignore ConnectionError, the broker may not be up yet. + try: + self._status = self.ha_status() + return self._status == status; + except ConnectionError: return False + assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%( + self, status, self._status) + + # FIXME aconway 2012-05-01: do direct python call to qpid-config code. + def qpid_config(self, args): + assert subprocess.call( + [self.qpid_config_path, "--broker", self.host_port()]+args) == 0 + + def config_replicate(self, from_broker, queue): + self.qpid_config(["add", "queue", "--start-replica", from_broker, queue]) + + def config_declare(self, queue, replication): + self.qpid_config(["add", "queue", queue, "--replicate", replication]) + + def connect_admin(self, **kwargs): + cred = self.client_credentials + if cred: + return Broker.connect( + self, client_properties={"qpid.ha-admin":1}, + username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism, + **kwargs) + else: + return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) + + def wait_backup(self, address): + """Wait for address to become valid on a backup broker.""" + bs = self.connect_admin().session() + try: wait_address(bs, address) + finally: bs.connection.close() + + def assert_browse(self, queue, expected, **kwargs): + """Verify queue contents by browsing.""" + bs = self.connect().session() + try: + wait_address(bs, queue) + assert_browse_retry(bs, queue, expected, **kwargs) + finally: bs.connection.close() + + def assert_browse_backup(self, queue, expected, **kwargs): + """Combines wait_backup and assert_browse_retry.""" + bs = self.connect_admin().session() + try: + wait_address(bs, queue) + assert_browse_retry(bs, queue, expected, **kwargs) + finally: bs.connection.close() + + def assert_connect_fail(self): + try: + self.connect() + self.test.fail("Expected ConnectionError") + except ConnectionError: pass + + def try_connect(self): + try: return self.connect() + except ConnectionError: return None + + def ready(self): + return Broker.ready(self, client_properties={"qpid.ha-admin":1}) + + +class HaCluster(object): + _cluster_count = 0 + + def __init__(self, test, n, promote=True, **kwargs): + """Start a cluster of n brokers""" + self.test = test + self.kwargs = kwargs + self._brokers = [] + self.id = HaCluster._cluster_count + self.broker_id = 0 + HaCluster._cluster_count += 1 + for i in xrange(n): self.start(False) + self.update_urls() + self[0].promote() + + def next_name(self): + name="cluster%s-%s"%(self.id, self.broker_id) + self.broker_id += 1 + return name + + def start(self, update_urls=True, args=[]): + """Start a new broker in the cluster""" + b = HaBroker(self.test, name=self.next_name(), **self.kwargs) + self._brokers.append(b) + if update_urls: self.update_urls() + return b + + def update_urls(self): + self.url = ",".join([b.host_port() for b in self]) + if len(self) > 1: # No failover addresses on a 1 cluster. + for b in self: b.set_brokers_url(self.url) + + def connect(self, i): + """Connect with reconnect_urls""" + return self[i].connect(reconnect=True, reconnect_urls=self.url.split(",")) + + def kill(self, i, promote_next=True): + """Kill broker i, promote broker i+1""" + self[i].expect = EXPECT_EXIT_FAIL + self[i].kill() + if promote_next: self[(i+1) % len(self)].promote() + + def restart(self, i): + """Start a broker with the same port, name and data directory. It will get + a separate log file: foo.n.log""" + b = self._brokers[i] + self._brokers[i] = HaBroker( + self.test, name=b.name, port=b.port(), brokers_url=self.url, + **self.kwargs) + + def bounce(self, i, promote_next=True): + """Stop and restart a broker in a cluster.""" + if (len(self) == 1): + self.kill(i, promote_next=False) + self.restart(i) + self[i].ready() + if promote_next: self[i].promote() + else: + self.kill(i, promote_next) + self.restart(i) + + # Behave like a list of brokers. + def __len__(self): return len(self._brokers) + def __getitem__(self,index): return self._brokers[index] + def __iter__(self): return self._brokers.__iter__() + +def wait_address(session, address): + """Wait for an address to become valid.""" + def check(): + try: + session.sender(address) + return True + except NotFound: return False + assert retry(check), "Timed out waiting for address %s"%(address) + +def valid_address(session, address): + """Test if an address is valid""" + try: + session.receiver(address) + return True + except NotFound: return False diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index 31142de293..de87c49d21 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -23,230 +23,12 @@ import traceback from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty from qpid.datatypes import uuid4 from brokertest import * +from ha_test import * from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG, INFO from qpidtoollibs import BrokerAgent from uuid import UUID -log = getLogger(__name__) - -class QmfAgent(object): - """Access to a QMF broker agent.""" - def __init__(self, address, **kwargs): - self._connection = Connection.establish( - address, client_properties={"qpid.ha-admin":1}, **kwargs) - self._agent = BrokerAgent(self._connection) - - def __getattr__(self, name): - a = getattr(self._agent, name) - return a - -class Credentials(object): - """SASL credentials: username, password, and mechanism""" - def __init__(self, username, password, mechanism): - (self.username, self.password, self.mechanism) = (username, password, mechanism) - - def __str__(self): return "Credentials%s"%(self.tuple(),) - - def tuple(self): return (self.username, self.password, self.mechanism) - - def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url) - -class HaBroker(Broker): - """Start a broker with HA enabled - @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker. - """ - def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all", - client_credentials=None, **kwargs): - assert BrokerTest.ha_lib, "Cannot locate HA plug-in" - args = copy(args) - args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=debug+:ha::", - # FIXME aconway 2012-02-13: workaround slow link failover. - "--link-maintenace-interval=0.1", - "--ha-cluster=%s"%ha_cluster] - if ha_replicate is not None: - args += [ "--ha-replicate=%s"%ha_replicate ] - if brokers_url: args += [ "--ha-brokers-url", brokers_url ] - Broker.__init__(self, test, args, **kwargs) - self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha") - assert os.path.exists(self.qpid_ha_path) - self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config") - assert os.path.exists(self.qpid_config_path) - getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. - self.qpid_ha_script=import_script(self.qpid_ha_path) - self._agent = None - self.client_credentials = client_credentials - - def __str__(self): return Broker.__str__(self) - - def qpid_ha(self, args): - cred = self.client_credentials - url = self.host_port() - if cred: - url =cred.add_user(url) - args = args + ["--sasl-mechanism", cred.mechanism] - self.qpid_ha_script.main_except(["", "-b", url]+args) - - def promote(self): self.qpid_ha(["promote"]) - def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url]) - def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]) - def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) - - def agent(self): - if not self._agent: - cred = self.client_credentials - if cred: - self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism) - else: - self._agent = QmfAgent(self.host_port()) - return self._agent - - def ha_status(self): - hb = self.agent().getHaBroker() - hb.update() - return hb.status - - def wait_status(self, status): - def try_get_status(): - self._status = self.ha_status() - # Ignore ConnectionError, the broker may not be up yet. - try: - self._status = self.ha_status() - return self._status == status; - except ConnectionError: return False - assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%( - self, status, self._status) - - # FIXME aconway 2012-05-01: do direct python call to qpid-config code. - def qpid_config(self, args): - assert subprocess.call( - [self.qpid_config_path, "--broker", self.host_port()]+args) == 0 - - def config_replicate(self, from_broker, queue): - self.qpid_config(["add", "queue", "--start-replica", from_broker, queue]) - - def config_declare(self, queue, replication): - self.qpid_config(["add", "queue", queue, "--replicate", replication]) - - def connect_admin(self, **kwargs): - cred = self.client_credentials - if cred: - return Broker.connect( - self, client_properties={"qpid.ha-admin":1}, - username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism, - **kwargs) - else: - return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) - - def wait_backup(self, address): - """Wait for address to become valid on a backup broker.""" - bs = self.connect_admin().session() - try: wait_address(bs, address) - finally: bs.connection.close() - - def assert_browse(self, queue, expected, **kwargs): - """Verify queue contents by browsing.""" - bs = self.connect().session() - try: - wait_address(bs, queue) - assert_browse_retry(bs, queue, expected, **kwargs) - finally: bs.connection.close() - - def assert_browse_backup(self, queue, expected, **kwargs): - """Combines wait_backup and assert_browse_retry.""" - bs = self.connect_admin().session() - try: - wait_address(bs, queue) - assert_browse_retry(bs, queue, expected, **kwargs) - finally: bs.connection.close() - - def assert_connect_fail(self): - try: - self.connect() - self.test.fail("Expected ConnectionError") - except ConnectionError: pass - - def try_connect(self): - try: return self.connect() - except ConnectionError: return None - -class HaCluster(object): - _cluster_count = 0 - - def __init__(self, test, n, promote=True, **kwargs): - """Start a cluster of n brokers""" - self.test = test - self.kwargs = kwargs - self._brokers = [] - self.id = HaCluster._cluster_count - self.broker_id = 0 - HaCluster._cluster_count += 1 - for i in xrange(n): self.start(False) - self.update_urls() - self[0].promote() - - def next_name(self): - name="cluster%s-%s"%(self.id, self.broker_id) - self.broker_id += 1 - return name - - def start(self, update_urls=True, args=[]): - """Start a new broker in the cluster""" - b = HaBroker(self.test, name=self.next_name(), **self.kwargs) - self._brokers.append(b) - if update_urls: self.update_urls() - return b - - def update_urls(self): - self.url = ",".join([b.host_port() for b in self]) - if len(self) > 1: # No failover addresses on a 1 cluster. - for b in self: b.set_brokers_url(self.url) - - def connect(self, i): - """Connect with reconnect_urls""" - return self[i].connect(reconnect=True, reconnect_urls=self.url.split(",")) - - def kill(self, i, promote_next=True): - """Kill broker i, promote broker i+1""" - self[i].expect = EXPECT_EXIT_FAIL - self[i].kill() - if promote_next: self[(i+1) % len(self)].promote() - - def restart(self, i): - """Start a broker with the same port, name and data directory. It will get - a separate log file: foo.n.log""" - b = self._brokers[i] - self._brokers[i] = HaBroker( - self.test, name=b.name, port=b.port(), brokers_url=self.url, - **self.kwargs) - - def bounce(self, i, promote_next=True): - """Stop and restart a broker in a cluster.""" - self.kill(i, promote_next) - self.restart(i) - - # Behave like a list of brokers. - def __len__(self): return len(self._brokers) - def __getitem__(self,index): return self._brokers[index] - def __iter__(self): return self._brokers.__iter__() - -def wait_address(session, address): - """Wait for an address to become valid.""" - def check(): - try: - session.sender(address) - return True - except NotFound: return False - assert retry(check), "Timed out waiting for address %s"%(address) - -def valid_address(session, address): - """Test if an address is valid""" - try: - session.receiver(address) - return True - except NotFound: return False - class ReplicationTests(BrokerTest): """Correctness tests for HA replication.""" @@ -927,7 +709,7 @@ class LongTests(BrokerTest): if dead is not None: brokers.restart(dead) # Restart backup - brokers[dead].ready(client_properties={"qpid.ha-admin":1}) + brokers[dead].ready() dead = None i += 1 except: @@ -1031,5 +813,5 @@ if __name__ == "__main__": os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:]) else: - print "Skipping ha_tests, qpid_ha not available" + print "Skipping ha_tests, %s not available"%(qpid_ha) |