diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoverableExchange.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/ha/AlternateExchangeSetter.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 84 | ||||
-rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/ha/QueueGuard.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/ha/ReplicationTest.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/ha/ReplicationTest.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/tests/brokertest.py | 4 | ||||
-rwxr-xr-x | cpp/src/tests/ha_store_tests.py | 130 | ||||
-rwxr-xr-x | cpp/src/tests/ha_test.py | 258 | ||||
-rwxr-xr-x | cpp/src/tests/ha_tests.py | 224 |
15 files changed, 471 insertions, 261 deletions
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 2b2f7db934..26ee5b0054 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -174,6 +174,7 @@ public: const std::string& getName() const { return name; } bool isDurable() { return durable; } qpid::framing::FieldTable& getArgs() { return args; } + const qpid::framing::FieldTable& getArgs() const { return args; } QPID_BROKER_EXTERN Exchange::shared_ptr getAlternate() { return alternate; } QPID_BROKER_EXTERN void setAlternate(Exchange::shared_ptr _alternate); diff --git a/cpp/src/qpid/broker/RecoverableExchange.h b/cpp/src/qpid/broker/RecoverableExchange.h index ca6cc1541e..f8c08b2989 100644 --- a/cpp/src/qpid/broker/RecoverableExchange.h +++ b/cpp/src/qpid/broker/RecoverableExchange.h @@ -43,6 +43,9 @@ public: virtual void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args) = 0; + + virtual std::string getName() const = 0; + virtual ~RecoverableExchange() {}; }; diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 7deeba5e65..b04d7c34e0 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -82,6 +82,7 @@ public: RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {} void setPersistenceId(uint64_t id); void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args); + string getName() const { return exchange->getName(); } }; class RecoverableConfigImpl : public RecoverableConfig diff --git a/cpp/src/qpid/ha/AlternateExchangeSetter.h b/cpp/src/qpid/ha/AlternateExchangeSetter.h index 08690e68bc..1878939aad 100644 --- a/cpp/src/qpid/ha/AlternateExchangeSetter.h +++ b/cpp/src/qpid/ha/AlternateExchangeSetter.h @@ -43,12 +43,14 @@ class AlternateExchangeSetter AlternateExchangeSetter(broker::ExchangeRegistry& er) : exchanges(er) {} + /** If altEx is already known, call setter(altEx) now else save for later */ void setAlternate(const std::string& altEx, const SetFunction& setter) { broker::Exchange::shared_ptr ex = exchanges.find(altEx); if (ex) setter(ex); // Set immediately. else setters.insert(Setters::value_type(altEx, setter)); // Save for later. } + /** Add an exchange and call any setters that are waiting for it. */ void addExchange(boost::shared_ptr<broker::Exchange> exchange) { // Update the setters for this exchange std::pair<Setters::iterator, Setters::iterator> range = setters.equal_range(exchange->getName()); diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index 0b6280da6d..2f0d304686 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -41,6 +41,7 @@ #include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include "qmf/org/apache/qpid/broker/EventSubscribe.h" #include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" +#include <boost/bind.hpp> #include <algorithm> #include <sstream> #include <iostream> @@ -313,11 +314,11 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { // The queue was definitely created on the primary. if (broker.getQueues().find(name)) { QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name); - broker.getQueues().destroy(name); + broker.deleteQueue(name, userId, remoteHost); stopQueueReplicator(name); } settings.populate(args, settings.storeSettings); - std::pair<boost::shared_ptr<Queue>, bool> result = + CreateQueueResult result = broker.createQueue( name, settings, @@ -325,7 +326,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { values[ALTEX].asString(), userId, remoteHost); - assert(result.second); // Should be true since we destroyed existing queue above + assert(result.second); startQueueReplicator(result.first); } } @@ -361,12 +362,14 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { // If we already have a exchange with this name, replace it. // The exchange was definitely created on the primary. if (broker.getExchanges().find(name)) { - broker.getExchanges().destroy(name); + broker.deleteExchange(name, userId, remoteHost); QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name); } - boost::shared_ptr<Exchange> exchange = - createExchange(name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, values[ALTEX].asString()); - assert(exchange); + CreateExchangeResult result = createExchange( + name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, + values[ALTEX].asString()); + replicatedExchanges.insert(name); + assert(result.second); } } @@ -380,6 +383,7 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { } else { QPID_LOG(debug, logPrefix << "Exchange delete event:" << name); broker.deleteExchange(name, userId, remoteHost); + replicatedExchanges.erase(name); } } @@ -399,7 +403,7 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); - exchange->bind(queue, key, &args); + queue->bind(exchange, key, args); } } @@ -454,14 +458,20 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { return; string name(values[NAME].asString()); QPID_LOG(debug, logPrefix << "Queue response: " << name); + if (broker.getQueues().find(name)) { // Already exists + if (findQueueReplicator(name)) + return; // Already replicated + else { + QPID_LOG(debug, logPrefix << "Deleting queue to make way for replica: " << name); + broker.deleteQueue(name, userId, remoteHost); + } + } framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); - boost::shared_ptr<Queue> queue = + CreateQueueResult result = createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args, getAltExchange(values[ALTEXCHANGE])); - // It is normal for the queue to already exist if we are failing over. - if (queue) startQueueReplicator(queue); - else QPID_LOG(debug, logPrefix << "Queue already replicated: " << name); + if (result.second) startQueueReplicator(result.first); } void BrokerReplicator::doResponseExchange(Variant::Map& values) { @@ -469,13 +479,22 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { if (!replicationTest.replicateLevel(argsMap)) return; string name = values[NAME].asString(); QPID_LOG(debug, logPrefix << "Exchange response: " << name); + if (broker.getExchanges().find(name)) { + if (replicatedExchanges.find(name) != replicatedExchanges.end()) + return; // Already replicated + else { + QPID_LOG(debug, logPrefix << "Deleting exchange to make way for replica: " + << name); + broker.deleteExchange(name, userId, remoteHost); + } + } framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); - boost::shared_ptr<Exchange> exchange = createExchange( + CreateExchangeResult result = createExchange( name, values[TYPE].asString(), values[DURABLE].asBool(), args, getAltExchange(values[ALTEXCHANGE])); - // It is normal for the exchange to already exist if we are failing over. - QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name); + assert(result.second); + replicatedExchanges.insert(name); } namespace { @@ -563,7 +582,7 @@ void BrokerReplicator::stopQueueReplicator(const std::string& name) { } } -boost::shared_ptr<Queue> BrokerReplicator::createQueue( +BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue( const std::string& name, bool durable, bool autodelete, @@ -572,7 +591,7 @@ boost::shared_ptr<Queue> BrokerReplicator::createQueue( { QueueSettings settings(durable, autodelete); settings.populate(arguments, settings.storeSettings); - std::pair<boost::shared_ptr<Queue>, bool> result = + CreateQueueResult result = broker.createQueue( name, settings, @@ -580,24 +599,22 @@ boost::shared_ptr<Queue> BrokerReplicator::createQueue( string(), // Set alternate exchange below userId, remoteHost); - if (result.second) { - if (!alternateExchange.empty()) { - alternates.setAlternate( - alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1)); - } - return result.first; + + if (!alternateExchange.empty()) { + alternates.setAlternate( + alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1)); } - else return boost::shared_ptr<Queue>(); + return result; } -boost::shared_ptr<Exchange> BrokerReplicator::createExchange( +BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange( const std::string& name, const std::string& type, bool durable, const qpid::framing::FieldTable& args, const std::string& alternateExchange) { - std::pair<boost::shared_ptr<Exchange>, bool> result = + CreateExchangeResult result = broker.createExchange( name, type, @@ -606,15 +623,13 @@ boost::shared_ptr<Exchange> BrokerReplicator::createExchange( args, userId, remoteHost); - if (result.second) { - alternates.addExchange(result.first); - if (!alternateExchange.empty()) { - alternates.setAlternate( - alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1)); - } - return result.first; + + alternates.addExchange(result.first); + if (!alternateExchange.empty()) { + alternates.setAlternate( + alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1)); } - else return boost::shared_ptr<Exchange>(); + return result; } bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } @@ -623,4 +638,5 @@ bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, co string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } + }} // namespace broker diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h index 69653b876a..5c8a983d45 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.h +++ b/cpp/src/qpid/ha/BrokerReplicator.h @@ -78,6 +78,8 @@ class BrokerReplicator : public broker::Exchange, private: typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr; + typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult; + typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult; void initializeBridge(broker::Bridge&, broker::SessionHandler&); @@ -98,14 +100,14 @@ class BrokerReplicator : public broker::Exchange, void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); void stopQueueReplicator(const std::string& name); - boost::shared_ptr<broker::Queue> createQueue( + CreateQueueResult createQueue( const std::string& name, bool durable, bool autodelete, const qpid::framing::FieldTable& arguments, const std::string& alternateExchange); - boost::shared_ptr<broker::Exchange> createExchange( + CreateExchangeResult createExchange( const std::string& name, const std::string& type, bool durable, @@ -121,6 +123,8 @@ class BrokerReplicator : public broker::Exchange, bool initialized; AlternateExchangeSetter alternates; qpid::Address primary; + typedef std::set<std::string> StringSet; + StringSet replicatedExchanges; // exchanges that have been replicated. }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/ha/QueueGuard.cpp b/cpp/src/qpid/ha/QueueGuard.cpp index b0ef167176..8852554d31 100644 --- a/cpp/src/qpid/ha/QueueGuard.cpp +++ b/cpp/src/qpid/ha/QueueGuard.cpp @@ -61,7 +61,10 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) range = QueueRange(q); } -QueueGuard::~QueueGuard() { cancel(); } +QueueGuard::~QueueGuard() { + QPID_LOG(debug, logPrefix << "Cancelled"); + cancel(); +} // NOTE: Called with message lock held. void QueueGuard::enqueued(const Message& m) { @@ -97,7 +100,6 @@ void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) { } void QueueGuard::cancel() { - QPID_LOG(debug, logPrefix << "Cancelled"); queue.removeObserver(observer); Delayed removed; { diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index 82daad9d9c..07c94ad261 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -67,6 +67,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb, logPrefix("Backup queue "+q->getName()+": "), queue(q), link(l), brokerInfo(hb.getBrokerInfo()) { + args.setString(QPID_REPLICATE, printable(NONE).str()); Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); } diff --git a/cpp/src/qpid/ha/ReplicationTest.cpp b/cpp/src/qpid/ha/ReplicationTest.cpp index 88a969dbfd..1dd32262a0 100644 --- a/cpp/src/qpid/ha/ReplicationTest.cpp +++ b/cpp/src/qpid/ha/ReplicationTest.cpp @@ -20,6 +20,7 @@ */ #include "ReplicationTest.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/Exchange.h" #include "qpid/framing/FieldTable.h" namespace qpid { @@ -71,5 +72,10 @@ bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Queue& q) return isReplicated(level, q.getSettings().storeSettings, q.isAutoDelete(), q.hasExclusiveOwner()); } +bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Exchange& ex) +{ + return replicateLevel(ex.getArgs()) >= level; +} + }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/ReplicationTest.h b/cpp/src/qpid/ha/ReplicationTest.h index 9f6976a8e4..ab6b1a6bcc 100644 --- a/cpp/src/qpid/ha/ReplicationTest.h +++ b/cpp/src/qpid/ha/ReplicationTest.h @@ -30,6 +30,7 @@ namespace qpid { namespace broker { class Queue; +class Exchange; } namespace framing { @@ -59,6 +60,7 @@ class ReplicationTest bool isReplicated(ReplicateLevel level, const framing::FieldTable& args, bool autodelete, bool exclusive); bool isReplicated(ReplicateLevel level, const broker::Queue&); + bool isReplicated(ReplicateLevel level, const broker::Exchange&); private: ReplicateLevel replicateDefault; }; diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 2a66aefc03..fb47aea875 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -348,7 +348,9 @@ EXTRA_DIST += \ run_msg_group_tests \ ipv6_test \ run_ha_tests \ + ha_test.py \ ha_tests.py \ + ha_store_tests.py \ test_env.ps1.in check_LTLIBRARIES += libdlclose_noop.la diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py index dd09e8aa27..0ab0d13424 100644 --- a/cpp/src/tests/brokertest.py +++ b/cpp/src/tests/brokertest.py @@ -425,8 +425,8 @@ class Cluster: self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd)) return self._brokers[-1] - def ready(self): - for b in self: b.ready() + def ready(self, timeout=30, **kwargs): + for b in self: b.ready(**kwargs) def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False): for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd) diff --git a/cpp/src/tests/ha_store_tests.py b/cpp/src/tests/ha_store_tests.py new file mode 100755 index 0000000000..d1eaca1b87 --- /dev/null +++ b/cpp/src/tests/ha_store_tests.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +This module contains tests for HA functionality that requires a store. +It is not included as part of "make check" since it will not function +without a store. Currently it can be run from a build of the message +store. +""" + +import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random +import traceback +from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty +from qpid.datatypes import uuid4 +from brokertest import * +from ha_test import * +from threading import Thread, Lock, Condition +from logging import getLogger, WARN, ERROR, DEBUG, INFO +from qpidtoollibs import BrokerAgent +from uuid import UUID + + +class StoreTests(BrokerTest): + """Test for HA with persistence.""" + + def test_store_recovery(self): + """Verify basic store and recover functionality""" + cluster = HaCluster(self, 2) + sn = cluster[0].connect().session() + s = sn.sender("qq;{create:always,node:{durable:true}}") + sk = sn.sender("xx/k;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:xx,key:k,queue:qq}]}}") + s.send(Message("foo", durable=True)) + s.send(Message("bar", durable=True)) + sk.send(Message("baz", durable=True)) + r = cluster[0].connect().session().receiver("qq") + self.assertEqual(r.fetch().content, "foo") + r.session.acknowledge() + # FIXME aconway 2012-09-21: sending this message is an ugly hack to flush + # the dequeue operation on qq. + s.send(Message("flush", durable=True)) + + def verify(broker, x_count): + sn = broker.connect().session() + assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count)*["x"]) + sn.sender("xx/k").send(Message("x", durable=True)) + assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count+1)*["x"]) + + verify(cluster[0], 0) + cluster.bounce(0, promote_next=False) + cluster[0].promote() + cluster[0].wait_status("active") + verify(cluster[0], 1) + cluster.kill(0, promote_next=False) + cluster[1].promote() + cluster[1].wait_status("active") + verify(cluster[1], 2) + cluster.bounce(1, promote_next=False) + cluster[1].promote() + cluster[1].wait_status("active") + verify(cluster[1], 3) + + def test_catchup_store(self): + """Verify that a backup erases queue data from store recovery before + doing catch-up from the primary.""" + cluster = HaCluster(self, 2) + sn = cluster[0].connect().session() + s1 = sn.sender("q1;{create:always,node:{durable:true}}") + for m in ["foo","bar"]: s1.send(Message(m, durable=True)) + s2 = sn.sender("q2;{create:always,node:{durable:true}}") + sk2 = sn.sender("ex/k2;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:ex,key:k2,queue:q2}]}}") + sk2.send(Message("hello", durable=True)) + # Wait for backup to catch up. + cluster[1].assert_browse_backup("q1", ["foo","bar"]) + cluster[1].assert_browse_backup("q2", ["hello"]) + + # Make changes that the backup doesn't see + cluster.kill(1, promote_next=False) + time.sleep(1) # FIXME aconway 2012-09-25: + r1 = cluster[0].connect().session().receiver("q1") + for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m) + r1.session.acknowledge() + for m in ["x","y","z"]: s1.send(Message(m, durable=True)) + # Use old connection to unbind + us = cluster[0].connect_old().session(str(uuid4())) + us.exchange_unbind(exchange="ex", binding_key="k2", queue="q2") + us.exchange_bind(exchange="ex", binding_key="k1", queue="q1") + # Restart both brokers from store to get inconsistent sequence numbering. + cluster.bounce(0, promote_next=False) + cluster[0].promote() + cluster[0].wait_status("active") + cluster.restart(1) + cluster[1].wait_status("ready") + + # Verify state + cluster[0].assert_browse("q1", ["x","y","z"]) + cluster[1].assert_browse_backup("q1", ["x","y","z"]) + sn = cluster[0].connect().session() # FIXME aconway 2012-09-25: should fail over! + sn.sender("ex/k1").send("boo") + cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"]) + cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"]) + sn.sender("ex/k2").send("hoo") # q2 was unbound so this should be dropped. + sn.sender("q2").send("end") # mark the end of the queue for assert_browse + cluster[0].assert_browse("q2", ["hello", "end"]) + cluster[1].assert_browse_backup("q2", ["hello", "end"]) + +if __name__ == "__main__": + shutil.rmtree("brokertest.tmp", True) + qpid_ha = os.getenv("QPID_HA_EXEC") + if qpid_ha and os.path.exists(qpid_ha): + os.execvp("qpid-python-test", + ["qpid-python-test", "-m", "ha_store_tests"] + sys.argv[1:]) + else: + print "Skipping ha_store_tests, %s not available"%(qpid_ha) diff --git a/cpp/src/tests/ha_test.py b/cpp/src/tests/ha_test.py new file mode 100755 index 0000000000..18a969a07b --- /dev/null +++ b/cpp/src/tests/ha_test.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random +import traceback +from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty +from qpid.datatypes import uuid4 +from brokertest import * +from threading import Thread, Lock, Condition +from logging import getLogger, WARN, ERROR, DEBUG, INFO +from qpidtoollibs import BrokerAgent +from uuid import UUID + +log = getLogger(__name__) + +class QmfAgent(object): + """Access to a QMF broker agent.""" + def __init__(self, address, **kwargs): + self._connection = Connection.establish( + address, client_properties={"qpid.ha-admin":1}, **kwargs) + self._agent = BrokerAgent(self._connection) + + def __getattr__(self, name): + a = getattr(self._agent, name) + return a + +class Credentials(object): + """SASL credentials: username, password, and mechanism""" + def __init__(self, username, password, mechanism): + (self.username, self.password, self.mechanism) = (username, password, mechanism) + + def __str__(self): return "Credentials%s"%(self.tuple(),) + + def tuple(self): return (self.username, self.password, self.mechanism) + + def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url) + +class HaBroker(Broker): + """Start a broker with HA enabled + @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker. + """ + def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all", + client_credentials=None, **kwargs): + assert BrokerTest.ha_lib, "Cannot locate HA plug-in" + args = copy(args) + args += ["--load-module", BrokerTest.ha_lib, + "--log-enable=debug+:ha::", + # FIXME aconway 2012-02-13: workaround slow link failover. + "--link-maintenace-interval=0.1", + "--ha-cluster=%s"%ha_cluster] + if ha_replicate is not None: + args += [ "--ha-replicate=%s"%ha_replicate ] + if brokers_url: args += [ "--ha-brokers-url", brokers_url ] + Broker.__init__(self, test, args, **kwargs) + self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha") + assert os.path.exists(self.qpid_ha_path) + self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config") + assert os.path.exists(self.qpid_config_path) + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. + self.qpid_ha_script=import_script(self.qpid_ha_path) + self._agent = None + self.client_credentials = client_credentials + + def __str__(self): return Broker.__str__(self) + + def qpid_ha(self, args): + cred = self.client_credentials + url = self.host_port() + if cred: + url =cred.add_user(url) + args = args + ["--sasl-mechanism", cred.mechanism] + self.qpid_ha_script.main_except(["", "-b", url]+args) + + def promote(self): self.ready(); self.qpid_ha(["promote"]) + def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url]) + def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]) + def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) + + def agent(self): + if not self._agent: + cred = self.client_credentials + if cred: + self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism) + else: + self._agent = QmfAgent(self.host_port()) + return self._agent + + def ha_status(self): + hb = self.agent().getHaBroker() + hb.update() + return hb.status + + def wait_status(self, status): + def try_get_status(): + self._status = "<unknown>" + # Ignore ConnectionError, the broker may not be up yet. + try: + self._status = self.ha_status() + return self._status == status; + except ConnectionError: return False + assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%( + self, status, self._status) + + # FIXME aconway 2012-05-01: do direct python call to qpid-config code. + def qpid_config(self, args): + assert subprocess.call( + [self.qpid_config_path, "--broker", self.host_port()]+args) == 0 + + def config_replicate(self, from_broker, queue): + self.qpid_config(["add", "queue", "--start-replica", from_broker, queue]) + + def config_declare(self, queue, replication): + self.qpid_config(["add", "queue", queue, "--replicate", replication]) + + def connect_admin(self, **kwargs): + cred = self.client_credentials + if cred: + return Broker.connect( + self, client_properties={"qpid.ha-admin":1}, + username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism, + **kwargs) + else: + return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) + + def wait_backup(self, address): + """Wait for address to become valid on a backup broker.""" + bs = self.connect_admin().session() + try: wait_address(bs, address) + finally: bs.connection.close() + + def assert_browse(self, queue, expected, **kwargs): + """Verify queue contents by browsing.""" + bs = self.connect().session() + try: + wait_address(bs, queue) + assert_browse_retry(bs, queue, expected, **kwargs) + finally: bs.connection.close() + + def assert_browse_backup(self, queue, expected, **kwargs): + """Combines wait_backup and assert_browse_retry.""" + bs = self.connect_admin().session() + try: + wait_address(bs, queue) + assert_browse_retry(bs, queue, expected, **kwargs) + finally: bs.connection.close() + + def assert_connect_fail(self): + try: + self.connect() + self.test.fail("Expected ConnectionError") + except ConnectionError: pass + + def try_connect(self): + try: return self.connect() + except ConnectionError: return None + + def ready(self): + return Broker.ready(self, client_properties={"qpid.ha-admin":1}) + + +class HaCluster(object): + _cluster_count = 0 + + def __init__(self, test, n, promote=True, **kwargs): + """Start a cluster of n brokers""" + self.test = test + self.kwargs = kwargs + self._brokers = [] + self.id = HaCluster._cluster_count + self.broker_id = 0 + HaCluster._cluster_count += 1 + for i in xrange(n): self.start(False) + self.update_urls() + self[0].promote() + + def next_name(self): + name="cluster%s-%s"%(self.id, self.broker_id) + self.broker_id += 1 + return name + + def start(self, update_urls=True, args=[]): + """Start a new broker in the cluster""" + b = HaBroker(self.test, name=self.next_name(), **self.kwargs) + self._brokers.append(b) + if update_urls: self.update_urls() + return b + + def update_urls(self): + self.url = ",".join([b.host_port() for b in self]) + if len(self) > 1: # No failover addresses on a 1 cluster. + for b in self: b.set_brokers_url(self.url) + + def connect(self, i): + """Connect with reconnect_urls""" + return self[i].connect(reconnect=True, reconnect_urls=self.url.split(",")) + + def kill(self, i, promote_next=True): + """Kill broker i, promote broker i+1""" + self[i].expect = EXPECT_EXIT_FAIL + self[i].kill() + if promote_next: self[(i+1) % len(self)].promote() + + def restart(self, i): + """Start a broker with the same port, name and data directory. It will get + a separate log file: foo.n.log""" + b = self._brokers[i] + self._brokers[i] = HaBroker( + self.test, name=b.name, port=b.port(), brokers_url=self.url, + **self.kwargs) + + def bounce(self, i, promote_next=True): + """Stop and restart a broker in a cluster.""" + if (len(self) == 1): + self.kill(i, promote_next=False) + self.restart(i) + self[i].ready() + if promote_next: self[i].promote() + else: + self.kill(i, promote_next) + self.restart(i) + + # Behave like a list of brokers. + def __len__(self): return len(self._brokers) + def __getitem__(self,index): return self._brokers[index] + def __iter__(self): return self._brokers.__iter__() + +def wait_address(session, address): + """Wait for an address to become valid.""" + def check(): + try: + session.sender(address) + return True + except NotFound: return False + assert retry(check), "Timed out waiting for address %s"%(address) + +def valid_address(session, address): + """Test if an address is valid""" + try: + session.receiver(address) + return True + except NotFound: return False diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index 31142de293..de87c49d21 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -23,230 +23,12 @@ import traceback from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty from qpid.datatypes import uuid4 from brokertest import * +from ha_test import * from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG, INFO from qpidtoollibs import BrokerAgent from uuid import UUID -log = getLogger(__name__) - -class QmfAgent(object): - """Access to a QMF broker agent.""" - def __init__(self, address, **kwargs): - self._connection = Connection.establish( - address, client_properties={"qpid.ha-admin":1}, **kwargs) - self._agent = BrokerAgent(self._connection) - - def __getattr__(self, name): - a = getattr(self._agent, name) - return a - -class Credentials(object): - """SASL credentials: username, password, and mechanism""" - def __init__(self, username, password, mechanism): - (self.username, self.password, self.mechanism) = (username, password, mechanism) - - def __str__(self): return "Credentials%s"%(self.tuple(),) - - def tuple(self): return (self.username, self.password, self.mechanism) - - def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url) - -class HaBroker(Broker): - """Start a broker with HA enabled - @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker. - """ - def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all", - client_credentials=None, **kwargs): - assert BrokerTest.ha_lib, "Cannot locate HA plug-in" - args = copy(args) - args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=debug+:ha::", - # FIXME aconway 2012-02-13: workaround slow link failover. - "--link-maintenace-interval=0.1", - "--ha-cluster=%s"%ha_cluster] - if ha_replicate is not None: - args += [ "--ha-replicate=%s"%ha_replicate ] - if brokers_url: args += [ "--ha-brokers-url", brokers_url ] - Broker.__init__(self, test, args, **kwargs) - self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha") - assert os.path.exists(self.qpid_ha_path) - self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config") - assert os.path.exists(self.qpid_config_path) - getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. - self.qpid_ha_script=import_script(self.qpid_ha_path) - self._agent = None - self.client_credentials = client_credentials - - def __str__(self): return Broker.__str__(self) - - def qpid_ha(self, args): - cred = self.client_credentials - url = self.host_port() - if cred: - url =cred.add_user(url) - args = args + ["--sasl-mechanism", cred.mechanism] - self.qpid_ha_script.main_except(["", "-b", url]+args) - - def promote(self): self.qpid_ha(["promote"]) - def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url]) - def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]) - def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) - - def agent(self): - if not self._agent: - cred = self.client_credentials - if cred: - self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism) - else: - self._agent = QmfAgent(self.host_port()) - return self._agent - - def ha_status(self): - hb = self.agent().getHaBroker() - hb.update() - return hb.status - - def wait_status(self, status): - def try_get_status(): - self._status = self.ha_status() - # Ignore ConnectionError, the broker may not be up yet. - try: - self._status = self.ha_status() - return self._status == status; - except ConnectionError: return False - assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%( - self, status, self._status) - - # FIXME aconway 2012-05-01: do direct python call to qpid-config code. - def qpid_config(self, args): - assert subprocess.call( - [self.qpid_config_path, "--broker", self.host_port()]+args) == 0 - - def config_replicate(self, from_broker, queue): - self.qpid_config(["add", "queue", "--start-replica", from_broker, queue]) - - def config_declare(self, queue, replication): - self.qpid_config(["add", "queue", queue, "--replicate", replication]) - - def connect_admin(self, **kwargs): - cred = self.client_credentials - if cred: - return Broker.connect( - self, client_properties={"qpid.ha-admin":1}, - username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism, - **kwargs) - else: - return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) - - def wait_backup(self, address): - """Wait for address to become valid on a backup broker.""" - bs = self.connect_admin().session() - try: wait_address(bs, address) - finally: bs.connection.close() - - def assert_browse(self, queue, expected, **kwargs): - """Verify queue contents by browsing.""" - bs = self.connect().session() - try: - wait_address(bs, queue) - assert_browse_retry(bs, queue, expected, **kwargs) - finally: bs.connection.close() - - def assert_browse_backup(self, queue, expected, **kwargs): - """Combines wait_backup and assert_browse_retry.""" - bs = self.connect_admin().session() - try: - wait_address(bs, queue) - assert_browse_retry(bs, queue, expected, **kwargs) - finally: bs.connection.close() - - def assert_connect_fail(self): - try: - self.connect() - self.test.fail("Expected ConnectionError") - except ConnectionError: pass - - def try_connect(self): - try: return self.connect() - except ConnectionError: return None - -class HaCluster(object): - _cluster_count = 0 - - def __init__(self, test, n, promote=True, **kwargs): - """Start a cluster of n brokers""" - self.test = test - self.kwargs = kwargs - self._brokers = [] - self.id = HaCluster._cluster_count - self.broker_id = 0 - HaCluster._cluster_count += 1 - for i in xrange(n): self.start(False) - self.update_urls() - self[0].promote() - - def next_name(self): - name="cluster%s-%s"%(self.id, self.broker_id) - self.broker_id += 1 - return name - - def start(self, update_urls=True, args=[]): - """Start a new broker in the cluster""" - b = HaBroker(self.test, name=self.next_name(), **self.kwargs) - self._brokers.append(b) - if update_urls: self.update_urls() - return b - - def update_urls(self): - self.url = ",".join([b.host_port() for b in self]) - if len(self) > 1: # No failover addresses on a 1 cluster. - for b in self: b.set_brokers_url(self.url) - - def connect(self, i): - """Connect with reconnect_urls""" - return self[i].connect(reconnect=True, reconnect_urls=self.url.split(",")) - - def kill(self, i, promote_next=True): - """Kill broker i, promote broker i+1""" - self[i].expect = EXPECT_EXIT_FAIL - self[i].kill() - if promote_next: self[(i+1) % len(self)].promote() - - def restart(self, i): - """Start a broker with the same port, name and data directory. It will get - a separate log file: foo.n.log""" - b = self._brokers[i] - self._brokers[i] = HaBroker( - self.test, name=b.name, port=b.port(), brokers_url=self.url, - **self.kwargs) - - def bounce(self, i, promote_next=True): - """Stop and restart a broker in a cluster.""" - self.kill(i, promote_next) - self.restart(i) - - # Behave like a list of brokers. - def __len__(self): return len(self._brokers) - def __getitem__(self,index): return self._brokers[index] - def __iter__(self): return self._brokers.__iter__() - -def wait_address(session, address): - """Wait for an address to become valid.""" - def check(): - try: - session.sender(address) - return True - except NotFound: return False - assert retry(check), "Timed out waiting for address %s"%(address) - -def valid_address(session, address): - """Test if an address is valid""" - try: - session.receiver(address) - return True - except NotFound: return False - class ReplicationTests(BrokerTest): """Correctness tests for HA replication.""" @@ -927,7 +709,7 @@ class LongTests(BrokerTest): if dead is not None: brokers.restart(dead) # Restart backup - brokers[dead].ready(client_properties={"qpid.ha-admin":1}) + brokers[dead].ready() dead = None i += 1 except: @@ -1031,5 +813,5 @@ if __name__ == "__main__": os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:]) else: - print "Skipping ha_tests, qpid_ha not available" + print "Skipping ha_tests, %s not available"%(qpid_ha) |