summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Exchange.h1
-rw-r--r--cpp/src/qpid/broker/RecoverableExchange.h3
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp1
-rw-r--r--cpp/src/qpid/ha/AlternateExchangeSetter.h2
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp84
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.h8
-rw-r--r--cpp/src/qpid/ha/QueueGuard.cpp6
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp1
-rw-r--r--cpp/src/qpid/ha/ReplicationTest.cpp6
-rw-r--r--cpp/src/qpid/ha/ReplicationTest.h2
-rw-r--r--cpp/src/tests/Makefile.am2
-rw-r--r--cpp/src/tests/brokertest.py4
-rwxr-xr-xcpp/src/tests/ha_store_tests.py130
-rwxr-xr-xcpp/src/tests/ha_test.py258
-rwxr-xr-xcpp/src/tests/ha_tests.py224
15 files changed, 471 insertions, 261 deletions
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 2b2f7db934..26ee5b0054 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -174,6 +174,7 @@ public:
const std::string& getName() const { return name; }
bool isDurable() { return durable; }
qpid::framing::FieldTable& getArgs() { return args; }
+ const qpid::framing::FieldTable& getArgs() const { return args; }
QPID_BROKER_EXTERN Exchange::shared_ptr getAlternate() { return alternate; }
QPID_BROKER_EXTERN void setAlternate(Exchange::shared_ptr _alternate);
diff --git a/cpp/src/qpid/broker/RecoverableExchange.h b/cpp/src/qpid/broker/RecoverableExchange.h
index ca6cc1541e..f8c08b2989 100644
--- a/cpp/src/qpid/broker/RecoverableExchange.h
+++ b/cpp/src/qpid/broker/RecoverableExchange.h
@@ -43,6 +43,9 @@ public:
virtual void bind(const std::string& queue,
const std::string& routingKey,
qpid::framing::FieldTable& args) = 0;
+
+ virtual std::string getName() const = 0;
+
virtual ~RecoverableExchange() {};
};
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 7deeba5e65..b04d7c34e0 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -82,6 +82,7 @@ public:
RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {}
void setPersistenceId(uint64_t id);
void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args);
+ string getName() const { return exchange->getName(); }
};
class RecoverableConfigImpl : public RecoverableConfig
diff --git a/cpp/src/qpid/ha/AlternateExchangeSetter.h b/cpp/src/qpid/ha/AlternateExchangeSetter.h
index 08690e68bc..1878939aad 100644
--- a/cpp/src/qpid/ha/AlternateExchangeSetter.h
+++ b/cpp/src/qpid/ha/AlternateExchangeSetter.h
@@ -43,12 +43,14 @@ class AlternateExchangeSetter
AlternateExchangeSetter(broker::ExchangeRegistry& er) : exchanges(er) {}
+ /** If altEx is already known, call setter(altEx) now else save for later */
void setAlternate(const std::string& altEx, const SetFunction& setter) {
broker::Exchange::shared_ptr ex = exchanges.find(altEx);
if (ex) setter(ex); // Set immediately.
else setters.insert(Setters::value_type(altEx, setter)); // Save for later.
}
+ /** Add an exchange and call any setters that are waiting for it. */
void addExchange(boost::shared_ptr<broker::Exchange> exchange) {
// Update the setters for this exchange
std::pair<Setters::iterator, Setters::iterator> range = setters.equal_range(exchange->getName());
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index 0b6280da6d..2f0d304686 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -41,6 +41,7 @@
#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
+#include <boost/bind.hpp>
#include <algorithm>
#include <sstream>
#include <iostream>
@@ -313,11 +314,11 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
// The queue was definitely created on the primary.
if (broker.getQueues().find(name)) {
QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name);
- broker.getQueues().destroy(name);
+ broker.deleteQueue(name, userId, remoteHost);
stopQueueReplicator(name);
}
settings.populate(args, settings.storeSettings);
- std::pair<boost::shared_ptr<Queue>, bool> result =
+ CreateQueueResult result =
broker.createQueue(
name,
settings,
@@ -325,7 +326,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
values[ALTEX].asString(),
userId,
remoteHost);
- assert(result.second); // Should be true since we destroyed existing queue above
+ assert(result.second);
startQueueReplicator(result.first);
}
}
@@ -361,12 +362,14 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
// If we already have a exchange with this name, replace it.
// The exchange was definitely created on the primary.
if (broker.getExchanges().find(name)) {
- broker.getExchanges().destroy(name);
+ broker.deleteExchange(name, userId, remoteHost);
QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name);
}
- boost::shared_ptr<Exchange> exchange =
- createExchange(name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, values[ALTEX].asString());
- assert(exchange);
+ CreateExchangeResult result = createExchange(
+ name, values[EXTYPE].asString(), values[DURABLE].asBool(), args,
+ values[ALTEX].asString());
+ replicatedExchanges.insert(name);
+ assert(result.second);
}
}
@@ -380,6 +383,7 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
} else {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
broker.deleteExchange(name, userId, remoteHost);
+ replicatedExchanges.erase(name);
}
}
@@ -399,7 +403,7 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
- exchange->bind(queue, key, &args);
+ queue->bind(exchange, key, args);
}
}
@@ -454,14 +458,20 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
return;
string name(values[NAME].asString());
QPID_LOG(debug, logPrefix << "Queue response: " << name);
+ if (broker.getQueues().find(name)) { // Already exists
+ if (findQueueReplicator(name))
+ return; // Already replicated
+ else {
+ QPID_LOG(debug, logPrefix << "Deleting queue to make way for replica: " << name);
+ broker.deleteQueue(name, userId, remoteHost);
+ }
+ }
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
- boost::shared_ptr<Queue> queue =
+ CreateQueueResult result =
createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
- // It is normal for the queue to already exist if we are failing over.
- if (queue) startQueueReplicator(queue);
- else QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
+ if (result.second) startQueueReplicator(result.first);
}
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
@@ -469,13 +479,22 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
if (!replicationTest.replicateLevel(argsMap)) return;
string name = values[NAME].asString();
QPID_LOG(debug, logPrefix << "Exchange response: " << name);
+ if (broker.getExchanges().find(name)) {
+ if (replicatedExchanges.find(name) != replicatedExchanges.end())
+ return; // Already replicated
+ else {
+ QPID_LOG(debug, logPrefix << "Deleting exchange to make way for replica: "
+ << name);
+ broker.deleteExchange(name, userId, remoteHost);
+ }
+ }
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
- boost::shared_ptr<Exchange> exchange = createExchange(
+ CreateExchangeResult result = createExchange(
name, values[TYPE].asString(), values[DURABLE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
- // It is normal for the exchange to already exist if we are failing over.
- QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name);
+ assert(result.second);
+ replicatedExchanges.insert(name);
}
namespace {
@@ -563,7 +582,7 @@ void BrokerReplicator::stopQueueReplicator(const std::string& name) {
}
}
-boost::shared_ptr<Queue> BrokerReplicator::createQueue(
+BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue(
const std::string& name,
bool durable,
bool autodelete,
@@ -572,7 +591,7 @@ boost::shared_ptr<Queue> BrokerReplicator::createQueue(
{
QueueSettings settings(durable, autodelete);
settings.populate(arguments, settings.storeSettings);
- std::pair<boost::shared_ptr<Queue>, bool> result =
+ CreateQueueResult result =
broker.createQueue(
name,
settings,
@@ -580,24 +599,22 @@ boost::shared_ptr<Queue> BrokerReplicator::createQueue(
string(), // Set alternate exchange below
userId,
remoteHost);
- if (result.second) {
- if (!alternateExchange.empty()) {
- alternates.setAlternate(
- alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
- }
- return result.first;
+
+ if (!alternateExchange.empty()) {
+ alternates.setAlternate(
+ alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
}
- else return boost::shared_ptr<Queue>();
+ return result;
}
-boost::shared_ptr<Exchange> BrokerReplicator::createExchange(
+BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange(
const std::string& name,
const std::string& type,
bool durable,
const qpid::framing::FieldTable& args,
const std::string& alternateExchange)
{
- std::pair<boost::shared_ptr<Exchange>, bool> result =
+ CreateExchangeResult result =
broker.createExchange(
name,
type,
@@ -606,15 +623,13 @@ boost::shared_ptr<Exchange> BrokerReplicator::createExchange(
args,
userId,
remoteHost);
- if (result.second) {
- alternates.addExchange(result.first);
- if (!alternateExchange.empty()) {
- alternates.setAlternate(
- alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1));
- }
- return result.first;
+
+ alternates.addExchange(result.first);
+ if (!alternateExchange.empty()) {
+ alternates.setAlternate(
+ alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1));
}
- else return boost::shared_ptr<Exchange>();
+ return result;
}
bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
@@ -623,4 +638,5 @@ bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, co
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
+
}} // namespace broker
diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h
index 69653b876a..5c8a983d45 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/cpp/src/qpid/ha/BrokerReplicator.h
@@ -78,6 +78,8 @@ class BrokerReplicator : public broker::Exchange,
private:
typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
+ typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult;
+ typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult;
void initializeBridge(broker::Bridge&, broker::SessionHandler&);
@@ -98,14 +100,14 @@ class BrokerReplicator : public broker::Exchange,
void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
void stopQueueReplicator(const std::string& name);
- boost::shared_ptr<broker::Queue> createQueue(
+ CreateQueueResult createQueue(
const std::string& name,
bool durable,
bool autodelete,
const qpid::framing::FieldTable& arguments,
const std::string& alternateExchange);
- boost::shared_ptr<broker::Exchange> createExchange(
+ CreateExchangeResult createExchange(
const std::string& name,
const std::string& type,
bool durable,
@@ -121,6 +123,8 @@ class BrokerReplicator : public broker::Exchange,
bool initialized;
AlternateExchangeSetter alternates;
qpid::Address primary;
+ typedef std::set<std::string> StringSet;
+ StringSet replicatedExchanges; // exchanges that have been replicated.
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/ha/QueueGuard.cpp b/cpp/src/qpid/ha/QueueGuard.cpp
index b0ef167176..8852554d31 100644
--- a/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/cpp/src/qpid/ha/QueueGuard.cpp
@@ -61,7 +61,10 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
range = QueueRange(q);
}
-QueueGuard::~QueueGuard() { cancel(); }
+QueueGuard::~QueueGuard() {
+ QPID_LOG(debug, logPrefix << "Cancelled");
+ cancel();
+}
// NOTE: Called with message lock held.
void QueueGuard::enqueued(const Message& m) {
@@ -97,7 +100,6 @@ void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) {
}
void QueueGuard::cancel() {
- QPID_LOG(debug, logPrefix << "Cancelled");
queue.removeObserver(observer);
Delayed removed;
{
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index 82daad9d9c..07c94ad261 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -67,6 +67,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
logPrefix("Backup queue "+q->getName()+": "),
queue(q), link(l), brokerInfo(hb.getBrokerInfo())
{
+ args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
}
diff --git a/cpp/src/qpid/ha/ReplicationTest.cpp b/cpp/src/qpid/ha/ReplicationTest.cpp
index 88a969dbfd..1dd32262a0 100644
--- a/cpp/src/qpid/ha/ReplicationTest.cpp
+++ b/cpp/src/qpid/ha/ReplicationTest.cpp
@@ -20,6 +20,7 @@
*/
#include "ReplicationTest.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
namespace qpid {
@@ -71,5 +72,10 @@ bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Queue& q)
return isReplicated(level, q.getSettings().storeSettings, q.isAutoDelete(), q.hasExclusiveOwner());
}
+bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Exchange& ex)
+{
+ return replicateLevel(ex.getArgs()) >= level;
+}
+
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/ReplicationTest.h b/cpp/src/qpid/ha/ReplicationTest.h
index 9f6976a8e4..ab6b1a6bcc 100644
--- a/cpp/src/qpid/ha/ReplicationTest.h
+++ b/cpp/src/qpid/ha/ReplicationTest.h
@@ -30,6 +30,7 @@ namespace qpid {
namespace broker {
class Queue;
+class Exchange;
}
namespace framing {
@@ -59,6 +60,7 @@ class ReplicationTest
bool isReplicated(ReplicateLevel level,
const framing::FieldTable& args, bool autodelete, bool exclusive);
bool isReplicated(ReplicateLevel level, const broker::Queue&);
+ bool isReplicated(ReplicateLevel level, const broker::Exchange&);
private:
ReplicateLevel replicateDefault;
};
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 2a66aefc03..fb47aea875 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -348,7 +348,9 @@ EXTRA_DIST += \
run_msg_group_tests \
ipv6_test \
run_ha_tests \
+ ha_test.py \
ha_tests.py \
+ ha_store_tests.py \
test_env.ps1.in
check_LTLIBRARIES += libdlclose_noop.la
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py
index dd09e8aa27..0ab0d13424 100644
--- a/cpp/src/tests/brokertest.py
+++ b/cpp/src/tests/brokertest.py
@@ -425,8 +425,8 @@ class Cluster:
self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd))
return self._brokers[-1]
- def ready(self):
- for b in self: b.ready()
+ def ready(self, timeout=30, **kwargs):
+ for b in self: b.ready(**kwargs)
def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False):
for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd)
diff --git a/cpp/src/tests/ha_store_tests.py b/cpp/src/tests/ha_store_tests.py
new file mode 100755
index 0000000000..d1eaca1b87
--- /dev/null
+++ b/cpp/src/tests/ha_store_tests.py
@@ -0,0 +1,130 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+This module contains tests for HA functionality that requires a store.
+It is not included as part of "make check" since it will not function
+without a store. Currently it can be run from a build of the message
+store.
+"""
+
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
+import traceback
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
+from qpid.datatypes import uuid4
+from brokertest import *
+from ha_test import *
+from threading import Thread, Lock, Condition
+from logging import getLogger, WARN, ERROR, DEBUG, INFO
+from qpidtoollibs import BrokerAgent
+from uuid import UUID
+
+
+class StoreTests(BrokerTest):
+ """Test for HA with persistence."""
+
+ def test_store_recovery(self):
+ """Verify basic store and recover functionality"""
+ cluster = HaCluster(self, 2)
+ sn = cluster[0].connect().session()
+ s = sn.sender("qq;{create:always,node:{durable:true}}")
+ sk = sn.sender("xx/k;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:xx,key:k,queue:qq}]}}")
+ s.send(Message("foo", durable=True))
+ s.send(Message("bar", durable=True))
+ sk.send(Message("baz", durable=True))
+ r = cluster[0].connect().session().receiver("qq")
+ self.assertEqual(r.fetch().content, "foo")
+ r.session.acknowledge()
+ # FIXME aconway 2012-09-21: sending this message is an ugly hack to flush
+ # the dequeue operation on qq.
+ s.send(Message("flush", durable=True))
+
+ def verify(broker, x_count):
+ sn = broker.connect().session()
+ assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count)*["x"])
+ sn.sender("xx/k").send(Message("x", durable=True))
+ assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count+1)*["x"])
+
+ verify(cluster[0], 0)
+ cluster.bounce(0, promote_next=False)
+ cluster[0].promote()
+ cluster[0].wait_status("active")
+ verify(cluster[0], 1)
+ cluster.kill(0, promote_next=False)
+ cluster[1].promote()
+ cluster[1].wait_status("active")
+ verify(cluster[1], 2)
+ cluster.bounce(1, promote_next=False)
+ cluster[1].promote()
+ cluster[1].wait_status("active")
+ verify(cluster[1], 3)
+
+ def test_catchup_store(self):
+ """Verify that a backup erases queue data from store recovery before
+ doing catch-up from the primary."""
+ cluster = HaCluster(self, 2)
+ sn = cluster[0].connect().session()
+ s1 = sn.sender("q1;{create:always,node:{durable:true}}")
+ for m in ["foo","bar"]: s1.send(Message(m, durable=True))
+ s2 = sn.sender("q2;{create:always,node:{durable:true}}")
+ sk2 = sn.sender("ex/k2;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:ex,key:k2,queue:q2}]}}")
+ sk2.send(Message("hello", durable=True))
+ # Wait for backup to catch up.
+ cluster[1].assert_browse_backup("q1", ["foo","bar"])
+ cluster[1].assert_browse_backup("q2", ["hello"])
+
+ # Make changes that the backup doesn't see
+ cluster.kill(1, promote_next=False)
+ time.sleep(1) # FIXME aconway 2012-09-25:
+ r1 = cluster[0].connect().session().receiver("q1")
+ for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m)
+ r1.session.acknowledge()
+ for m in ["x","y","z"]: s1.send(Message(m, durable=True))
+ # Use old connection to unbind
+ us = cluster[0].connect_old().session(str(uuid4()))
+ us.exchange_unbind(exchange="ex", binding_key="k2", queue="q2")
+ us.exchange_bind(exchange="ex", binding_key="k1", queue="q1")
+ # Restart both brokers from store to get inconsistent sequence numbering.
+ cluster.bounce(0, promote_next=False)
+ cluster[0].promote()
+ cluster[0].wait_status("active")
+ cluster.restart(1)
+ cluster[1].wait_status("ready")
+
+ # Verify state
+ cluster[0].assert_browse("q1", ["x","y","z"])
+ cluster[1].assert_browse_backup("q1", ["x","y","z"])
+ sn = cluster[0].connect().session() # FIXME aconway 2012-09-25: should fail over!
+ sn.sender("ex/k1").send("boo")
+ cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"])
+ cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"])
+ sn.sender("ex/k2").send("hoo") # q2 was unbound so this should be dropped.
+ sn.sender("q2").send("end") # mark the end of the queue for assert_browse
+ cluster[0].assert_browse("q2", ["hello", "end"])
+ cluster[1].assert_browse_backup("q2", ["hello", "end"])
+
+if __name__ == "__main__":
+ shutil.rmtree("brokertest.tmp", True)
+ qpid_ha = os.getenv("QPID_HA_EXEC")
+ if qpid_ha and os.path.exists(qpid_ha):
+ os.execvp("qpid-python-test",
+ ["qpid-python-test", "-m", "ha_store_tests"] + sys.argv[1:])
+ else:
+ print "Skipping ha_store_tests, %s not available"%(qpid_ha)
diff --git a/cpp/src/tests/ha_test.py b/cpp/src/tests/ha_test.py
new file mode 100755
index 0000000000..18a969a07b
--- /dev/null
+++ b/cpp/src/tests/ha_test.py
@@ -0,0 +1,258 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
+import traceback
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
+from qpid.datatypes import uuid4
+from brokertest import *
+from threading import Thread, Lock, Condition
+from logging import getLogger, WARN, ERROR, DEBUG, INFO
+from qpidtoollibs import BrokerAgent
+from uuid import UUID
+
+log = getLogger(__name__)
+
+class QmfAgent(object):
+ """Access to a QMF broker agent."""
+ def __init__(self, address, **kwargs):
+ self._connection = Connection.establish(
+ address, client_properties={"qpid.ha-admin":1}, **kwargs)
+ self._agent = BrokerAgent(self._connection)
+
+ def __getattr__(self, name):
+ a = getattr(self._agent, name)
+ return a
+
+class Credentials(object):
+ """SASL credentials: username, password, and mechanism"""
+ def __init__(self, username, password, mechanism):
+ (self.username, self.password, self.mechanism) = (username, password, mechanism)
+
+ def __str__(self): return "Credentials%s"%(self.tuple(),)
+
+ def tuple(self): return (self.username, self.password, self.mechanism)
+
+ def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url)
+
+class HaBroker(Broker):
+ """Start a broker with HA enabled
+ @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker.
+ """
+ def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all",
+ client_credentials=None, **kwargs):
+ assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
+ args = copy(args)
+ args += ["--load-module", BrokerTest.ha_lib,
+ "--log-enable=debug+:ha::",
+ # FIXME aconway 2012-02-13: workaround slow link failover.
+ "--link-maintenace-interval=0.1",
+ "--ha-cluster=%s"%ha_cluster]
+ if ha_replicate is not None:
+ args += [ "--ha-replicate=%s"%ha_replicate ]
+ if brokers_url: args += [ "--ha-brokers-url", brokers_url ]
+ Broker.__init__(self, test, args, **kwargs)
+ self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha")
+ assert os.path.exists(self.qpid_ha_path)
+ self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
+ assert os.path.exists(self.qpid_config_path)
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+ self.qpid_ha_script=import_script(self.qpid_ha_path)
+ self._agent = None
+ self.client_credentials = client_credentials
+
+ def __str__(self): return Broker.__str__(self)
+
+ def qpid_ha(self, args):
+ cred = self.client_credentials
+ url = self.host_port()
+ if cred:
+ url =cred.add_user(url)
+ args = args + ["--sasl-mechanism", cred.mechanism]
+ self.qpid_ha_script.main_except(["", "-b", url]+args)
+
+ def promote(self): self.ready(); self.qpid_ha(["promote"])
+ def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
+ def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
+ def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
+
+ def agent(self):
+ if not self._agent:
+ cred = self.client_credentials
+ if cred:
+ self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism)
+ else:
+ self._agent = QmfAgent(self.host_port())
+ return self._agent
+
+ def ha_status(self):
+ hb = self.agent().getHaBroker()
+ hb.update()
+ return hb.status
+
+ def wait_status(self, status):
+ def try_get_status():
+ self._status = "<unknown>"
+ # Ignore ConnectionError, the broker may not be up yet.
+ try:
+ self._status = self.ha_status()
+ return self._status == status;
+ except ConnectionError: return False
+ assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%(
+ self, status, self._status)
+
+ # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
+ def qpid_config(self, args):
+ assert subprocess.call(
+ [self.qpid_config_path, "--broker", self.host_port()]+args) == 0
+
+ def config_replicate(self, from_broker, queue):
+ self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
+
+ def config_declare(self, queue, replication):
+ self.qpid_config(["add", "queue", queue, "--replicate", replication])
+
+ def connect_admin(self, **kwargs):
+ cred = self.client_credentials
+ if cred:
+ return Broker.connect(
+ self, client_properties={"qpid.ha-admin":1},
+ username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism,
+ **kwargs)
+ else:
+ return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
+
+ def wait_backup(self, address):
+ """Wait for address to become valid on a backup broker."""
+ bs = self.connect_admin().session()
+ try: wait_address(bs, address)
+ finally: bs.connection.close()
+
+ def assert_browse(self, queue, expected, **kwargs):
+ """Verify queue contents by browsing."""
+ bs = self.connect().session()
+ try:
+ wait_address(bs, queue)
+ assert_browse_retry(bs, queue, expected, **kwargs)
+ finally: bs.connection.close()
+
+ def assert_browse_backup(self, queue, expected, **kwargs):
+ """Combines wait_backup and assert_browse_retry."""
+ bs = self.connect_admin().session()
+ try:
+ wait_address(bs, queue)
+ assert_browse_retry(bs, queue, expected, **kwargs)
+ finally: bs.connection.close()
+
+ def assert_connect_fail(self):
+ try:
+ self.connect()
+ self.test.fail("Expected ConnectionError")
+ except ConnectionError: pass
+
+ def try_connect(self):
+ try: return self.connect()
+ except ConnectionError: return None
+
+ def ready(self):
+ return Broker.ready(self, client_properties={"qpid.ha-admin":1})
+
+
+class HaCluster(object):
+ _cluster_count = 0
+
+ def __init__(self, test, n, promote=True, **kwargs):
+ """Start a cluster of n brokers"""
+ self.test = test
+ self.kwargs = kwargs
+ self._brokers = []
+ self.id = HaCluster._cluster_count
+ self.broker_id = 0
+ HaCluster._cluster_count += 1
+ for i in xrange(n): self.start(False)
+ self.update_urls()
+ self[0].promote()
+
+ def next_name(self):
+ name="cluster%s-%s"%(self.id, self.broker_id)
+ self.broker_id += 1
+ return name
+
+ def start(self, update_urls=True, args=[]):
+ """Start a new broker in the cluster"""
+ b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
+ self._brokers.append(b)
+ if update_urls: self.update_urls()
+ return b
+
+ def update_urls(self):
+ self.url = ",".join([b.host_port() for b in self])
+ if len(self) > 1: # No failover addresses on a 1 cluster.
+ for b in self: b.set_brokers_url(self.url)
+
+ def connect(self, i):
+ """Connect with reconnect_urls"""
+ return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
+
+ def kill(self, i, promote_next=True):
+ """Kill broker i, promote broker i+1"""
+ self[i].expect = EXPECT_EXIT_FAIL
+ self[i].kill()
+ if promote_next: self[(i+1) % len(self)].promote()
+
+ def restart(self, i):
+ """Start a broker with the same port, name and data directory. It will get
+ a separate log file: foo.n.log"""
+ b = self._brokers[i]
+ self._brokers[i] = HaBroker(
+ self.test, name=b.name, port=b.port(), brokers_url=self.url,
+ **self.kwargs)
+
+ def bounce(self, i, promote_next=True):
+ """Stop and restart a broker in a cluster."""
+ if (len(self) == 1):
+ self.kill(i, promote_next=False)
+ self.restart(i)
+ self[i].ready()
+ if promote_next: self[i].promote()
+ else:
+ self.kill(i, promote_next)
+ self.restart(i)
+
+ # Behave like a list of brokers.
+ def __len__(self): return len(self._brokers)
+ def __getitem__(self,index): return self._brokers[index]
+ def __iter__(self): return self._brokers.__iter__()
+
+def wait_address(session, address):
+ """Wait for an address to become valid."""
+ def check():
+ try:
+ session.sender(address)
+ return True
+ except NotFound: return False
+ assert retry(check), "Timed out waiting for address %s"%(address)
+
+def valid_address(session, address):
+ """Test if an address is valid"""
+ try:
+ session.receiver(address)
+ return True
+ except NotFound: return False
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index 31142de293..de87c49d21 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -23,230 +23,12 @@ import traceback
from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
from qpid.datatypes import uuid4
from brokertest import *
+from ha_test import *
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG, INFO
from qpidtoollibs import BrokerAgent
from uuid import UUID
-log = getLogger(__name__)
-
-class QmfAgent(object):
- """Access to a QMF broker agent."""
- def __init__(self, address, **kwargs):
- self._connection = Connection.establish(
- address, client_properties={"qpid.ha-admin":1}, **kwargs)
- self._agent = BrokerAgent(self._connection)
-
- def __getattr__(self, name):
- a = getattr(self._agent, name)
- return a
-
-class Credentials(object):
- """SASL credentials: username, password, and mechanism"""
- def __init__(self, username, password, mechanism):
- (self.username, self.password, self.mechanism) = (username, password, mechanism)
-
- def __str__(self): return "Credentials%s"%(self.tuple(),)
-
- def tuple(self): return (self.username, self.password, self.mechanism)
-
- def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url)
-
-class HaBroker(Broker):
- """Start a broker with HA enabled
- @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker.
- """
- def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all",
- client_credentials=None, **kwargs):
- assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
- args = copy(args)
- args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=debug+:ha::",
- # FIXME aconway 2012-02-13: workaround slow link failover.
- "--link-maintenace-interval=0.1",
- "--ha-cluster=%s"%ha_cluster]
- if ha_replicate is not None:
- args += [ "--ha-replicate=%s"%ha_replicate ]
- if brokers_url: args += [ "--ha-brokers-url", brokers_url ]
- Broker.__init__(self, test, args, **kwargs)
- self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha")
- assert os.path.exists(self.qpid_ha_path)
- self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
- assert os.path.exists(self.qpid_config_path)
- getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
- self.qpid_ha_script=import_script(self.qpid_ha_path)
- self._agent = None
- self.client_credentials = client_credentials
-
- def __str__(self): return Broker.__str__(self)
-
- def qpid_ha(self, args):
- cred = self.client_credentials
- url = self.host_port()
- if cred:
- url =cred.add_user(url)
- args = args + ["--sasl-mechanism", cred.mechanism]
- self.qpid_ha_script.main_except(["", "-b", url]+args)
-
- def promote(self): self.qpid_ha(["promote"])
- def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
- def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
- def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
-
- def agent(self):
- if not self._agent:
- cred = self.client_credentials
- if cred:
- self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism)
- else:
- self._agent = QmfAgent(self.host_port())
- return self._agent
-
- def ha_status(self):
- hb = self.agent().getHaBroker()
- hb.update()
- return hb.status
-
- def wait_status(self, status):
- def try_get_status():
- self._status = self.ha_status()
- # Ignore ConnectionError, the broker may not be up yet.
- try:
- self._status = self.ha_status()
- return self._status == status;
- except ConnectionError: return False
- assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%(
- self, status, self._status)
-
- # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
- def qpid_config(self, args):
- assert subprocess.call(
- [self.qpid_config_path, "--broker", self.host_port()]+args) == 0
-
- def config_replicate(self, from_broker, queue):
- self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
-
- def config_declare(self, queue, replication):
- self.qpid_config(["add", "queue", queue, "--replicate", replication])
-
- def connect_admin(self, **kwargs):
- cred = self.client_credentials
- if cred:
- return Broker.connect(
- self, client_properties={"qpid.ha-admin":1},
- username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism,
- **kwargs)
- else:
- return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
-
- def wait_backup(self, address):
- """Wait for address to become valid on a backup broker."""
- bs = self.connect_admin().session()
- try: wait_address(bs, address)
- finally: bs.connection.close()
-
- def assert_browse(self, queue, expected, **kwargs):
- """Verify queue contents by browsing."""
- bs = self.connect().session()
- try:
- wait_address(bs, queue)
- assert_browse_retry(bs, queue, expected, **kwargs)
- finally: bs.connection.close()
-
- def assert_browse_backup(self, queue, expected, **kwargs):
- """Combines wait_backup and assert_browse_retry."""
- bs = self.connect_admin().session()
- try:
- wait_address(bs, queue)
- assert_browse_retry(bs, queue, expected, **kwargs)
- finally: bs.connection.close()
-
- def assert_connect_fail(self):
- try:
- self.connect()
- self.test.fail("Expected ConnectionError")
- except ConnectionError: pass
-
- def try_connect(self):
- try: return self.connect()
- except ConnectionError: return None
-
-class HaCluster(object):
- _cluster_count = 0
-
- def __init__(self, test, n, promote=True, **kwargs):
- """Start a cluster of n brokers"""
- self.test = test
- self.kwargs = kwargs
- self._brokers = []
- self.id = HaCluster._cluster_count
- self.broker_id = 0
- HaCluster._cluster_count += 1
- for i in xrange(n): self.start(False)
- self.update_urls()
- self[0].promote()
-
- def next_name(self):
- name="cluster%s-%s"%(self.id, self.broker_id)
- self.broker_id += 1
- return name
-
- def start(self, update_urls=True, args=[]):
- """Start a new broker in the cluster"""
- b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
- self._brokers.append(b)
- if update_urls: self.update_urls()
- return b
-
- def update_urls(self):
- self.url = ",".join([b.host_port() for b in self])
- if len(self) > 1: # No failover addresses on a 1 cluster.
- for b in self: b.set_brokers_url(self.url)
-
- def connect(self, i):
- """Connect with reconnect_urls"""
- return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
-
- def kill(self, i, promote_next=True):
- """Kill broker i, promote broker i+1"""
- self[i].expect = EXPECT_EXIT_FAIL
- self[i].kill()
- if promote_next: self[(i+1) % len(self)].promote()
-
- def restart(self, i):
- """Start a broker with the same port, name and data directory. It will get
- a separate log file: foo.n.log"""
- b = self._brokers[i]
- self._brokers[i] = HaBroker(
- self.test, name=b.name, port=b.port(), brokers_url=self.url,
- **self.kwargs)
-
- def bounce(self, i, promote_next=True):
- """Stop and restart a broker in a cluster."""
- self.kill(i, promote_next)
- self.restart(i)
-
- # Behave like a list of brokers.
- def __len__(self): return len(self._brokers)
- def __getitem__(self,index): return self._brokers[index]
- def __iter__(self): return self._brokers.__iter__()
-
-def wait_address(session, address):
- """Wait for an address to become valid."""
- def check():
- try:
- session.sender(address)
- return True
- except NotFound: return False
- assert retry(check), "Timed out waiting for address %s"%(address)
-
-def valid_address(session, address):
- """Test if an address is valid"""
- try:
- session.receiver(address)
- return True
- except NotFound: return False
-
class ReplicationTests(BrokerTest):
"""Correctness tests for HA replication."""
@@ -927,7 +709,7 @@ class LongTests(BrokerTest):
if dead is not None:
brokers.restart(dead) # Restart backup
- brokers[dead].ready(client_properties={"qpid.ha-admin":1})
+ brokers[dead].ready()
dead = None
i += 1
except:
@@ -1031,5 +813,5 @@ if __name__ == "__main__":
os.execvp("qpid-python-test",
["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
else:
- print "Skipping ha_tests, qpid_ha not available"
+ print "Skipping ha_tests, %s not available"%(qpid_ha)