diff options
author | Alan Conway <aconway@apache.org> | 2012-06-18 18:08:09 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-06-18 18:08:09 +0000 |
commit | 81c4f3d48f66fddfb6b0b74f1f768cd7ee245ef7 (patch) | |
tree | 59f0b06393891e199efa667c552eb62152fa9c20 | |
parent | 97596a0be663c723c69a525b2cd40aa3ea9bfde4 (diff) | |
download | qpid-python-81c4f3d48f66fddfb6b0b74f1f768cd7ee245ef7.tar.gz |
QPID-3603: Minor cleanup and test improvements in HA code.
- Enabled 10 queue failover test
- Minor cleanup in types.h
- Rewording, adding comments.
- Detect and reject invalid replication values.
- Cleaned up some unnecessary #includes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1351434 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicationTest.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicationTest.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/types.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/types.h | 22 | ||||
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 4 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 14 |
12 files changed, 55 insertions, 41 deletions
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index dd30dd08d4..da03a04013 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -23,6 +23,7 @@ #include "ConnectionObserver.h" #include "HaBroker.h" #include "Primary.h" +#include "QueueReplicator.h" #include "ReplicatingSubscription.h" #include "Settings.h" #include "qpid/amqp_0_10/Codecs.h" diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 05dfb539be..a874559655 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -21,6 +21,7 @@ #include "Backup.h" #include "HaBroker.h" #include "Primary.h" +#include "ReplicationTest.h" #include "ReplicatingSubscription.h" #include "RemoteBackup.h" #include "ConnectionObserver.h" @@ -72,6 +73,9 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : QPID_LOG(debug, logPrefix << "Expected backups: none"); } else { + // NOTE: RemoteBackups must be created before we set the ConfigurationObserver + // orr ConnectionObserver so that there is no client activity while + // the QueueGuards are created. QPID_LOG(debug, logPrefix << "Expected backups: " << expect); for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) { bool guard = true; // Create queue guards immediately for expected backups. @@ -126,6 +130,8 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) { } void Primary::queueCreate(const QueuePtr& q) { + // Throw if there is an invalid replication level in the queue settings. + haBroker.getReplicationTest().replicateLevel(q->getSettings()); Mutex::ScopedLock l(lock); for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) { i->second->queueCreate(q); @@ -147,7 +153,7 @@ void Primary::opened(broker::Connection& connection) { BackupMap::iterator i = backups.find(info.getSystemId()); if (i == backups.end()) { QPID_LOG(debug, logPrefix << "New backup connected: " << info); - bool guard = false; // Lazy-create queue guards, pre-creating them here could cause deadlock. + bool guard = false; // Lazy-create guards for new backups. Creating them here could deadlock. backups[info.getSystemId()].reset( new RemoteBackup(info, haBroker.getBroker(), haBroker.getReplicationTest(), guard)); } diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp index 93694af671..b391a5257b 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp +++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp @@ -51,13 +51,15 @@ class QueueGuard::QueueObserver : public broker::QueueObserver QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) : queue(q), subscription(0) { + // NOTE: There is no activity on the queue while QueueGuard constructor is + // running It is called either from Primary before client connections are + // allowed or from ConfigurationObserver::queueCreate before the queue is + // visible. std::ostringstream os; os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": "; logPrefix = os.str(); observer.reset(new QueueObserver(*this)); - // Once we call addObserver we can get calls to enqueued and dequeued queue.addObserver(observer); - // Must set after addObserver so we don't miss any enqueues. firstSafe = queue.getPosition(); // FIXME aconway 2012-06-13: fencepost error } @@ -95,6 +97,7 @@ void QueueGuard::cancel() { Mutex::ScopedLock l(lock); if (delayed.empty()) return; // No need if no delayed messages. } + // FIXME aconway 2012-06-15: optimize, only messages in delayed set. queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1)); } diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index bcf7be2297..443c5b0315 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -50,8 +50,10 @@ bool RemoteBackup::isReady() { } void RemoteBackup::initialQueue(const QueuePtr& q) { - if (replicationTest.isReplicated(ALL, *q)) initialQueues.insert(q); - queueCreate(q); + if (replicationTest.isReplicated(ALL, *q)) { + initialQueues.insert(q); + queueCreate(q); + } } RemoteBackup::GuardPtr RemoteBackup::guard(const QueuePtr& q) { @@ -81,11 +83,12 @@ std::ostream& operator<<(std::ostream& o, const QueueSetPrinter& qp) { void RemoteBackup::ready(const QueuePtr& q) { initialQueues.erase(q); - QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName() << " remaining unready: " << QueueSetPrinter(initialQueues)); + QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName() + << " remaining unready: " << QueueSetPrinter(initialQueues)); if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready"); } -// Called via ConfigurationObserver +// Called via ConfigurationObserver and from initialQueue void RemoteBackup::queueCreate(const QueuePtr& q) { if (createGuards && replicationTest.isReplicated(ALL, *q)) guards[q].reset(new QueueGuard(*q, brokerInfo)); diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 8213d6f5d5..40ede938a4 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -20,6 +20,7 @@ */ #include "QueueGuard.h" +#include "QueueReplicator.h" #include "ReplicatingSubscription.h" #include "Primary.h" #include "qpid/broker/Queue.h" @@ -161,7 +162,7 @@ struct QueueRange { QueueRange(const framing::FieldTable args) { back = args.getAsInt(ReplicatingSubscription::QPID_BACK); - front = back+1; + front = back+1; // Assume empty empty = !args.isSet(ReplicatingSubscription::QPID_FRONT); if (!empty) { front = args.getAsInt(ReplicatingSubscription::QPID_FRONT); @@ -223,7 +224,7 @@ ReplicatingSubscription::ReplicatingSubscription( // We can re-use some backup messages if backup and primary queues // overlap and the backup is not missing messages at the front of the queue. - // FIXME aconway 2012-06-10: disable re-use of backup queue till stall problem is solved. + /* if (!primary.empty && // Primary not empty !backup.empty && // Backup not empty primary.front >= backup.front && // Not missing messages at the front diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index c25749c6b0..7e46abf2ae 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -22,7 +22,6 @@ * */ -#include "QueueReplicator.h" // For DEQUEUE_EVENT_KEY #include "BrokerInfo.h" #include "qpid/broker/SemanticState.h" #include "qpid/broker/QueueObserver.h" diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp index 613ac0a8c6..18e0953930 100644 --- a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp @@ -28,9 +28,9 @@ namespace ha { using types::Variant; ReplicateLevel ReplicationTest::replicateLevel(const std::string& str) { - Enum<ReplicateLevel> rl; - if (rl.parseNoThrow(str)) return ReplicateLevel(rl.get()); - else return replicateDefault; + Enum<ReplicateLevel> rl(replicateDefault); + if (!str.empty()) rl.parse(str); + return rl.get(); } ReplicateLevel ReplicationTest::replicateLevel(const framing::FieldTable& f) { diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.h b/qpid/cpp/src/qpid/ha/ReplicationTest.h index 50a41ccbc3..9f6976a8e4 100644 --- a/qpid/cpp/src/qpid/ha/ReplicationTest.h +++ b/qpid/cpp/src/qpid/ha/ReplicationTest.h @@ -52,8 +52,8 @@ class ReplicationTest ReplicateLevel replicateLevel(const framing::FieldTable& f); ReplicateLevel replicateLevel(const types::Variant::Map& m); - // Return true if replication for a queue is enabled at level or - // higher, taking account of all settings. + // Return true if replication for a queue is enabled at level or higher, + // taking account of default level and queue settings. bool isReplicated(ReplicateLevel level, const types::Variant::Map& args, bool autodelete, bool exclusive); bool isReplicated(ReplicateLevel level, diff --git a/qpid/cpp/src/qpid/ha/types.cpp b/qpid/cpp/src/qpid/ha/types.cpp index 92acd76fca..53e2056213 100644 --- a/qpid/cpp/src/qpid/ha/types.cpp +++ b/qpid/cpp/src/qpid/ha/types.cpp @@ -41,7 +41,7 @@ string EnumBase::str() const { void EnumBase::parse(const string& s) { if (!parseNoThrow(s)) - throw Exception(QPID_MSG("Invalid " << names[count] << " value: " << s)); + throw Exception(QPID_MSG("Invalid " << name << " value: " << s)); } bool EnumBase::parseNoThrow(const string& s) { @@ -50,16 +50,15 @@ bool EnumBase::parseNoThrow(const string& s) { return value < count; } -template <> const char* Enum<ReplicateLevel>::NAMES[] = { - "none", "configuration", "all", "replication" -}; +template <> const char* Enum<ReplicateLevel>::NAME = "replication"; +template <> const char* Enum<ReplicateLevel>::NAMES[] = { "none", "configuration", "all" }; template <> const size_t Enum<ReplicateLevel>::N = 3; +template <> const char* Enum<BrokerStatus>::NAME = "HA broker status"; template <> const char* Enum<BrokerStatus>::NAMES[] = { - "joining", "catchup", "ready", "recovering", "active", - "standalone", "broker status" + "joining", "catchup", "ready", "recovering", "active", "standalone" }; -template <> const size_t Enum<BrokerStatus>::N = 7; +template <> const size_t Enum<BrokerStatus>::N = 6; ostream& operator<<(ostream& o, EnumBase e) { return o << e.str(); diff --git a/qpid/cpp/src/qpid/ha/types.h b/qpid/cpp/src/qpid/ha/types.h index e12c039c79..35faf9f624 100644 --- a/qpid/cpp/src/qpid/ha/types.h +++ b/qpid/cpp/src/qpid/ha/types.h @@ -39,8 +39,8 @@ namespace ha { /** Base class for enums with string conversion */ class EnumBase { public: - EnumBase(const char* names_[], size_t count_, unsigned value) - : value(value), names(names_), count(count_) {} + EnumBase(const char* name_, const char* names_[], size_t count_, unsigned value) + : name(name_), names(names_), count(count_), value(value) {} /** Convert to string */ std::string str() const; @@ -50,9 +50,10 @@ class EnumBase { bool parseNoThrow(const std::string&); protected: - unsigned value; + const char* name; const char** names; size_t count; + unsigned value; }; std::ostream& operator<<(std::ostream&, EnumBase); @@ -61,12 +62,14 @@ std::istream& operator>>(std::istream&, EnumBase&); /** Wrapper template for enums with string conversion */ template <class T> class Enum : public EnumBase { public: - Enum(T x=T()) : EnumBase(NAMES, N, x) {} + Enum(T x=T()) : EnumBase(NAME, NAMES, N, x) {} T get() const { return T(value); } void operator=(T x) { value = x; } + private: - static const size_t N; - static const char* NAMES[]; + static const size_t N; // Number of enum values. + static const char* NAMES[]; // Names of enum values. + static const char* NAME; // Descriptive name for the enum type. }; /** To print an enum x: o << printable(x) */ @@ -94,13 +97,10 @@ inline bool isPrimary(BrokerStatus s) { inline bool isBackup(BrokerStatus s) { return !isPrimary(s); } +// String constants. extern const std::string QPID_REPLICATE; -// FIXME aconway 2012-06-04: rename types.h->types.h - -/** - * Define IdSet type, not a typedef so we can overload operator << - */ +/** Define IdSet type, not a typedef so we can overload operator << */ class IdSet : public std::set<types::Uuid> {}; std::ostream& operator<<(std::ostream& o, const IdSet& ids); diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 1dac3d579c..93b3868907 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -76,7 +76,7 @@ def error_line(filename, n=1): except: return "" return ":\n" + "".join(result) -def retry(function, timeout=3, delay=.01): +def retry(function, timeout=10, delay=.01): """Call function until it returns a true value or timeout expires. Double the delay for each retry. Returns what function returns if true, None if timeout expires.""" @@ -243,7 +243,7 @@ class Broker(Popen): _broker_count = 0 _log_count = 0 - def __str__(self): return "Broker<%s %s :%d>"%(self.name, self.pname, self.port()) + def __str__(self): return "Broker<%s %s :%d>"%(self.log, self.pname, self.port()) def find_log(self): self.log = "%03d:%s.log" % (Broker._log_count, self.name) diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 06b2aec3cd..0f9efdf80a 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -610,11 +610,13 @@ class ReplicationTests(BrokerTest): self.fail("Excpected no-such-queue exception") except NotFound: pass - def test_invalid_default(self): - """Verify that a queue with an invalid qpid.replicate gets default treatment""" - cluster = HaCluster(self, 2, ha_replicate="all") - c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}") - cluster[1].wait_backup("q") + def test_invalid_replication(self): + """Verify that we reject an attempt to declare a queue with invalid replication value.""" + cluster = HaCluster(self, 1, ha_replicate="all") + try: + c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}") + self.fail("Expected ConnectionError") + except ConnectionError: pass def test_exclusive_queue(self): """Ensure that we can back-up exclusive queues, i.e. the replicating @@ -720,7 +722,7 @@ class LongTests(BrokerTest): brokers = HaCluster(self, 3) # Start sender and receiver threads - n = 1; # FIXME aconway 2012-06-10: n = 10 + n = 10; senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False, queue="test%s"%(i)) for i in xrange(n)] receivers = [NumberedReceiver(brokers[0], sender=senders[i], |