summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-18 18:08:09 +0000
committerAlan Conway <aconway@apache.org>2012-06-18 18:08:09 +0000
commit81c4f3d48f66fddfb6b0b74f1f768cd7ee245ef7 (patch)
tree59f0b06393891e199efa667c552eb62152fa9c20
parent97596a0be663c723c69a525b2cd40aa3ea9bfde4 (diff)
downloadqpid-python-81c4f3d48f66fddfb6b0b74f1f768cd7ee245ef7.tar.gz
QPID-3603: Minor cleanup and test improvements in HA code.
- Enabled 10 queue failover test - Minor cleanup in types.h - Rewording, adding comments. - Detect and reject invalid replication values. - Cleaned up some unnecessary #includes git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1351434 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp7
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp11
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h1
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicationTest.cpp6
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicationTest.h4
-rw-r--r--qpid/cpp/src/qpid/ha/types.cpp13
-rw-r--r--qpid/cpp/src/qpid/ha/types.h22
-rw-r--r--qpid/cpp/src/tests/brokertest.py4
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py14
12 files changed, 55 insertions, 41 deletions
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index dd30dd08d4..da03a04013 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -23,6 +23,7 @@
#include "ConnectionObserver.h"
#include "HaBroker.h"
#include "Primary.h"
+#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
#include "qpid/amqp_0_10/Codecs.h"
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 05dfb539be..a874559655 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -21,6 +21,7 @@
#include "Backup.h"
#include "HaBroker.h"
#include "Primary.h"
+#include "ReplicationTest.h"
#include "ReplicatingSubscription.h"
#include "RemoteBackup.h"
#include "ConnectionObserver.h"
@@ -72,6 +73,9 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
QPID_LOG(debug, logPrefix << "Expected backups: none");
}
else {
+ // NOTE: RemoteBackups must be created before we set the ConfigurationObserver
+ // orr ConnectionObserver so that there is no client activity while
+ // the QueueGuards are created.
QPID_LOG(debug, logPrefix << "Expected backups: " << expect);
for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
bool guard = true; // Create queue guards immediately for expected backups.
@@ -126,6 +130,8 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) {
}
void Primary::queueCreate(const QueuePtr& q) {
+ // Throw if there is an invalid replication level in the queue settings.
+ haBroker.getReplicationTest().replicateLevel(q->getSettings());
Mutex::ScopedLock l(lock);
for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
i->second->queueCreate(q);
@@ -147,7 +153,7 @@ void Primary::opened(broker::Connection& connection) {
BackupMap::iterator i = backups.find(info.getSystemId());
if (i == backups.end()) {
QPID_LOG(debug, logPrefix << "New backup connected: " << info);
- bool guard = false; // Lazy-create queue guards, pre-creating them here could cause deadlock.
+ bool guard = false; // Lazy-create guards for new backups. Creating them here could deadlock.
backups[info.getSystemId()].reset(
new RemoteBackup(info, haBroker.getBroker(), haBroker.getReplicationTest(), guard));
}
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index 93694af671..b391a5257b 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -51,13 +51,15 @@ class QueueGuard::QueueObserver : public broker::QueueObserver
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
: queue(q), subscription(0)
{
+ // NOTE: There is no activity on the queue while QueueGuard constructor is
+ // running It is called either from Primary before client connections are
+ // allowed or from ConfigurationObserver::queueCreate before the queue is
+ // visible.
std::ostringstream os;
os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": ";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
- // Once we call addObserver we can get calls to enqueued and dequeued
queue.addObserver(observer);
- // Must set after addObserver so we don't miss any enqueues.
firstSafe = queue.getPosition(); // FIXME aconway 2012-06-13: fencepost error
}
@@ -95,6 +97,7 @@ void QueueGuard::cancel() {
Mutex::ScopedLock l(lock);
if (delayed.empty()) return; // No need if no delayed messages.
}
+ // FIXME aconway 2012-06-15: optimize, only messages in delayed set.
queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1));
}
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index bcf7be2297..443c5b0315 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -50,8 +50,10 @@ bool RemoteBackup::isReady() {
}
void RemoteBackup::initialQueue(const QueuePtr& q) {
- if (replicationTest.isReplicated(ALL, *q)) initialQueues.insert(q);
- queueCreate(q);
+ if (replicationTest.isReplicated(ALL, *q)) {
+ initialQueues.insert(q);
+ queueCreate(q);
+ }
}
RemoteBackup::GuardPtr RemoteBackup::guard(const QueuePtr& q) {
@@ -81,11 +83,12 @@ std::ostream& operator<<(std::ostream& o, const QueueSetPrinter& qp) {
void RemoteBackup::ready(const QueuePtr& q) {
initialQueues.erase(q);
- QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName() << " remaining unready: " << QueueSetPrinter(initialQueues));
+ QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName()
+ << " remaining unready: " << QueueSetPrinter(initialQueues));
if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready");
}
-// Called via ConfigurationObserver
+// Called via ConfigurationObserver and from initialQueue
void RemoteBackup::queueCreate(const QueuePtr& q) {
if (createGuards && replicationTest.isReplicated(ALL, *q))
guards[q].reset(new QueueGuard(*q, brokerInfo));
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 8213d6f5d5..40ede938a4 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -20,6 +20,7 @@
*/
#include "QueueGuard.h"
+#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
#include "Primary.h"
#include "qpid/broker/Queue.h"
@@ -161,7 +162,7 @@ struct QueueRange {
QueueRange(const framing::FieldTable args) {
back = args.getAsInt(ReplicatingSubscription::QPID_BACK);
- front = back+1;
+ front = back+1; // Assume empty
empty = !args.isSet(ReplicatingSubscription::QPID_FRONT);
if (!empty) {
front = args.getAsInt(ReplicatingSubscription::QPID_FRONT);
@@ -223,7 +224,7 @@ ReplicatingSubscription::ReplicatingSubscription(
// We can re-use some backup messages if backup and primary queues
// overlap and the backup is not missing messages at the front of the queue.
- // FIXME aconway 2012-06-10: disable re-use of backup queue till stall problem is solved.
+
/* if (!primary.empty && // Primary not empty
!backup.empty && // Backup not empty
primary.front >= backup.front && // Not missing messages at the front
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index c25749c6b0..7e46abf2ae 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -22,7 +22,6 @@
*
*/
-#include "QueueReplicator.h" // For DEQUEUE_EVENT_KEY
#include "BrokerInfo.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/QueueObserver.h"
diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
index 613ac0a8c6..18e0953930 100644
--- a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
@@ -28,9 +28,9 @@ namespace ha {
using types::Variant;
ReplicateLevel ReplicationTest::replicateLevel(const std::string& str) {
- Enum<ReplicateLevel> rl;
- if (rl.parseNoThrow(str)) return ReplicateLevel(rl.get());
- else return replicateDefault;
+ Enum<ReplicateLevel> rl(replicateDefault);
+ if (!str.empty()) rl.parse(str);
+ return rl.get();
}
ReplicateLevel ReplicationTest::replicateLevel(const framing::FieldTable& f) {
diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.h b/qpid/cpp/src/qpid/ha/ReplicationTest.h
index 50a41ccbc3..9f6976a8e4 100644
--- a/qpid/cpp/src/qpid/ha/ReplicationTest.h
+++ b/qpid/cpp/src/qpid/ha/ReplicationTest.h
@@ -52,8 +52,8 @@ class ReplicationTest
ReplicateLevel replicateLevel(const framing::FieldTable& f);
ReplicateLevel replicateLevel(const types::Variant::Map& m);
- // Return true if replication for a queue is enabled at level or
- // higher, taking account of all settings.
+ // Return true if replication for a queue is enabled at level or higher,
+ // taking account of default level and queue settings.
bool isReplicated(ReplicateLevel level,
const types::Variant::Map& args, bool autodelete, bool exclusive);
bool isReplicated(ReplicateLevel level,
diff --git a/qpid/cpp/src/qpid/ha/types.cpp b/qpid/cpp/src/qpid/ha/types.cpp
index 92acd76fca..53e2056213 100644
--- a/qpid/cpp/src/qpid/ha/types.cpp
+++ b/qpid/cpp/src/qpid/ha/types.cpp
@@ -41,7 +41,7 @@ string EnumBase::str() const {
void EnumBase::parse(const string& s) {
if (!parseNoThrow(s))
- throw Exception(QPID_MSG("Invalid " << names[count] << " value: " << s));
+ throw Exception(QPID_MSG("Invalid " << name << " value: " << s));
}
bool EnumBase::parseNoThrow(const string& s) {
@@ -50,16 +50,15 @@ bool EnumBase::parseNoThrow(const string& s) {
return value < count;
}
-template <> const char* Enum<ReplicateLevel>::NAMES[] = {
- "none", "configuration", "all", "replication"
-};
+template <> const char* Enum<ReplicateLevel>::NAME = "replication";
+template <> const char* Enum<ReplicateLevel>::NAMES[] = { "none", "configuration", "all" };
template <> const size_t Enum<ReplicateLevel>::N = 3;
+template <> const char* Enum<BrokerStatus>::NAME = "HA broker status";
template <> const char* Enum<BrokerStatus>::NAMES[] = {
- "joining", "catchup", "ready", "recovering", "active",
- "standalone", "broker status"
+ "joining", "catchup", "ready", "recovering", "active", "standalone"
};
-template <> const size_t Enum<BrokerStatus>::N = 7;
+template <> const size_t Enum<BrokerStatus>::N = 6;
ostream& operator<<(ostream& o, EnumBase e) {
return o << e.str();
diff --git a/qpid/cpp/src/qpid/ha/types.h b/qpid/cpp/src/qpid/ha/types.h
index e12c039c79..35faf9f624 100644
--- a/qpid/cpp/src/qpid/ha/types.h
+++ b/qpid/cpp/src/qpid/ha/types.h
@@ -39,8 +39,8 @@ namespace ha {
/** Base class for enums with string conversion */
class EnumBase {
public:
- EnumBase(const char* names_[], size_t count_, unsigned value)
- : value(value), names(names_), count(count_) {}
+ EnumBase(const char* name_, const char* names_[], size_t count_, unsigned value)
+ : name(name_), names(names_), count(count_), value(value) {}
/** Convert to string */
std::string str() const;
@@ -50,9 +50,10 @@ class EnumBase {
bool parseNoThrow(const std::string&);
protected:
- unsigned value;
+ const char* name;
const char** names;
size_t count;
+ unsigned value;
};
std::ostream& operator<<(std::ostream&, EnumBase);
@@ -61,12 +62,14 @@ std::istream& operator>>(std::istream&, EnumBase&);
/** Wrapper template for enums with string conversion */
template <class T> class Enum : public EnumBase {
public:
- Enum(T x=T()) : EnumBase(NAMES, N, x) {}
+ Enum(T x=T()) : EnumBase(NAME, NAMES, N, x) {}
T get() const { return T(value); }
void operator=(T x) { value = x; }
+
private:
- static const size_t N;
- static const char* NAMES[];
+ static const size_t N; // Number of enum values.
+ static const char* NAMES[]; // Names of enum values.
+ static const char* NAME; // Descriptive name for the enum type.
};
/** To print an enum x: o << printable(x) */
@@ -94,13 +97,10 @@ inline bool isPrimary(BrokerStatus s) {
inline bool isBackup(BrokerStatus s) { return !isPrimary(s); }
+// String constants.
extern const std::string QPID_REPLICATE;
-// FIXME aconway 2012-06-04: rename types.h->types.h
-
-/**
- * Define IdSet type, not a typedef so we can overload operator <<
- */
+/** Define IdSet type, not a typedef so we can overload operator << */
class IdSet : public std::set<types::Uuid> {};
std::ostream& operator<<(std::ostream& o, const IdSet& ids);
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 1dac3d579c..93b3868907 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -76,7 +76,7 @@ def error_line(filename, n=1):
except: return ""
return ":\n" + "".join(result)
-def retry(function, timeout=3, delay=.01):
+def retry(function, timeout=10, delay=.01):
"""Call function until it returns a true value or timeout expires.
Double the delay for each retry. Returns what function returns if
true, None if timeout expires."""
@@ -243,7 +243,7 @@ class Broker(Popen):
_broker_count = 0
_log_count = 0
- def __str__(self): return "Broker<%s %s :%d>"%(self.name, self.pname, self.port())
+ def __str__(self): return "Broker<%s %s :%d>"%(self.log, self.pname, self.port())
def find_log(self):
self.log = "%03d:%s.log" % (Broker._log_count, self.name)
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 06b2aec3cd..0f9efdf80a 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -610,11 +610,13 @@ class ReplicationTests(BrokerTest):
self.fail("Excpected no-such-queue exception")
except NotFound: pass
- def test_invalid_default(self):
- """Verify that a queue with an invalid qpid.replicate gets default treatment"""
- cluster = HaCluster(self, 2, ha_replicate="all")
- c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
- cluster[1].wait_backup("q")
+ def test_invalid_replication(self):
+ """Verify that we reject an attempt to declare a queue with invalid replication value."""
+ cluster = HaCluster(self, 1, ha_replicate="all")
+ try:
+ c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
+ self.fail("Expected ConnectionError")
+ except ConnectionError: pass
def test_exclusive_queue(self):
"""Ensure that we can back-up exclusive queues, i.e. the replicating
@@ -720,7 +722,7 @@ class LongTests(BrokerTest):
brokers = HaCluster(self, 3)
# Start sender and receiver threads
- n = 1; # FIXME aconway 2012-06-10: n = 10
+ n = 10;
senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False,
queue="test%s"%(i)) for i in xrange(n)]
receivers = [NumberedReceiver(brokers[0], sender=senders[i],