diff options
author | Alan Conway <aconway@apache.org> | 2013-08-05 19:33:35 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-08-05 19:33:35 +0000 |
commit | 8b4ce07e63b3ed12b43ae82fc487657c6e18f5e4 (patch) | |
tree | 7961e97040227d8253bb8cdc6bf7de11ec92d8de | |
parent | 866bf57249e41266cd713a5f73e3d18d216fad31 (diff) | |
download | qpid-python-8b4ce07e63b3ed12b43ae82fc487657c6e18f5e4.tar.gz |
QPID-4327: HA Handle brokers joining and leaving during a transaction.
During a transaction:
- A broker leaving aborts the transaction.
- A broker joining does not participate in the transaction
- but does receive the results of the TX via normal replication.
Clean up tx-queues when the transaction completes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1510678 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Event.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Event.h | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Membership.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Membership.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | 40 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/TxReplicator.cpp | 38 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/TxReplicator.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/types.cpp | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/types.h | 9 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 9 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 38 |
18 files changed, 165 insertions, 53 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 47860b433f..684f408c4b 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -793,7 +793,7 @@ void BrokerReplicator::deleteQueue(const std::string& name, bool purge) { // messages. Any reroutes will be done at the primary and // replicated as normal. if (purge) queue->purge(0, boost::shared_ptr<Exchange>()); - broker.deleteQueue(name, userId, remoteHost); + haBroker.deleteQueue(name, remoteHost); QPID_LOG(debug, logPrefix << "Queue deleted: " << name); } } diff --git a/qpid/cpp/src/qpid/ha/Event.cpp b/qpid/cpp/src/qpid/ha/Event.cpp index 25e7947267..8265a6edd3 100644 --- a/qpid/cpp/src/qpid/ha/Event.cpp +++ b/qpid/cpp/src/qpid/ha/Event.cpp @@ -44,13 +44,14 @@ bool isEventKey(const std::string& key) { const string DequeueEvent::KEY(QPID_HA+"de"); const string IdEvent::KEY(QPID_HA+"id"); -const string TxEnqueueEvent::KEY(QPID_HA+"txen"); -const string TxDequeueEvent::KEY(QPID_HA+"txde"); -const string TxPrepareEvent::KEY(QPID_HA+"txpr"); -const string TxCommitEvent::KEY(QPID_HA+"txcm"); +const string TxEnqueueEvent::KEY(QPID_HA+"txenq"); +const string TxDequeueEvent::KEY(QPID_HA+"txdeq"); +const string TxPrepareEvent::KEY(QPID_HA+"txpre"); +const string TxCommitEvent::KEY(QPID_HA+"txcom"); const string TxRollbackEvent::KEY(QPID_HA+"txrb"); const string TxPrepareOkEvent::KEY(QPID_HA+"txok"); const string TxPrepareFailEvent::KEY(QPID_HA+"txno"); +const string TxMembersEvent::KEY(QPID_HA+"txmem"); broker::Message makeMessage( const string& data, const string& destination, const string& routingKey) diff --git a/qpid/cpp/src/qpid/ha/Event.h b/qpid/cpp/src/qpid/ha/Event.h index daaa6eada3..f292499e7b 100644 --- a/qpid/cpp/src/qpid/ha/Event.h +++ b/qpid/cpp/src/qpid/ha/Event.h @@ -43,7 +43,8 @@ broker::Message makeMessage( bool isEventKey(const std::string& key); /** Base class for encodable events */ -struct Event { +class Event { + public: virtual ~Event() {} virtual void encode(framing::Buffer& buffer) const = 0; virtual void decode(framing::Buffer& buffer) = 0; @@ -62,7 +63,8 @@ inline std::ostream& operator<<(std::ostream& o, const Event& e) { } /** Event base template */ -template <class Derived> struct EventBase : public Event { +template <class Derived> class EventBase : public Event { + public: std::string key() const { return Derived::KEY; } }; @@ -176,6 +178,16 @@ struct TxPrepareFailEvent : public EventBase<TxPrepareFailEvent> { void print(std::ostream& o) const { o << broker; } }; +struct TxMembersEvent : public EventBase<TxMembersEvent> { + static const std::string KEY; + UuidSet members; + TxMembersEvent(const UuidSet& s=UuidSet()) : members(s) {} + void encode(framing::Buffer& b) const { b.put(members); } + void decode(framing::Buffer& b) { b.get(members); } + size_t encodedSize() const { return members.encodedSize(); } + void print(std::ostream& o) const { o << members; } +}; + }} // namespace qpid::ha #endif /*!QPID_HA_EVENT_H*/ diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 5896a5568a..635aa1f65c 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -221,4 +221,8 @@ boost::shared_ptr<QueueReplicator> HaBroker::findQueueReplicator(const std::stri broker.getExchanges().find(QueueReplicator::replicatorName(queueName))); } +void HaBroker::deleteQueue(const string& name, const string& connectionId) { + broker.deleteQueue(name, settings.username, connectionId); +} + }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 6084db3fc3..c18e44d949 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -102,6 +102,10 @@ class HaBroker : public management::Manageable boost::shared_ptr<QueueReplicator> findQueueReplicator(const std::string& queueName); + /**@param connectionId optional, only available on backup */ + void deleteQueue(const std::string& name, + const std::string& connectionId=std::string()); + private: void setPublicUrl(const Url&); void setBrokerUrl(const Url&); diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp index af57145cc2..ddb603ca14 100644 --- a/qpid/cpp/src/qpid/ha/Membership.cpp +++ b/qpid/cpp/src/qpid/ha/Membership.cpp @@ -144,9 +144,19 @@ bool checkTransition(BrokerStatus from, BrokerStatus to) { } } // namespace +void Membership::addCallback(UpdateCallback cb) { + Mutex::ScopedLock l(lock); + callbacks.push_back(cb); + cb(brokers); // Give an initial update. +} void Membership::update(Mutex::ScopedLock& l) { QPID_LOG(info, "Membership: " << brokers); + + // Call callbacks + for_each(callbacks.begin(), callbacks.end(), + boost::bind<void>(&UpdateCallback::operator(), _1, boost::cref(brokers))); + // Update managment and send update event. BrokerStatus newStatus = getStatus(l); Variant::List brokerList = asList(l); diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h index 828f9e8403..f8b7f7f8ad 100644 --- a/qpid/cpp/src/qpid/ha/Membership.h +++ b/qpid/cpp/src/qpid/ha/Membership.h @@ -61,6 +61,11 @@ class Membership void setMgmtObject(boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker>); + /** Call callback when membership changes. + * NOTE: called with Membership lock held. + */ + typedef boost::function<void(const BrokerInfo::Map&)> UpdateCallback; + void addCallback(UpdateCallback); void clear(); ///< Clear all but self. void add(const BrokerInfo& b); void remove(const types::Uuid& id); @@ -94,6 +99,7 @@ class Membership const types::Uuid self; BrokerInfo::Map brokers; BrokerStatus oldStatus; + std::vector<UpdateCallback> callbacks; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 796de0341d..c71342cbc6 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -301,6 +301,7 @@ void Primary::backupDisconnect(shared_ptr<RemoteBackup> backup, Mutex::ScopedLoc backup->cancel(); expectedBackups.erase(backup); backups.erase(id); + membership.remove(id); } @@ -387,14 +388,12 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards) } void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) { - QPID_LOG(debug, logPrefix << "Started TX transaction"); shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker)); observer->initialize(); tx->setObserver(observer); } void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) { - QPID_LOG(debug, logPrefix << "Started DTX transaction"); shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker)); observer->initialize(); dtx->setObserver(observer); diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index 96be5e552e..7307f15fbe 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -31,6 +31,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include <boost/lexical_cast.hpp> +#include <algorithm> namespace qpid { namespace framing { @@ -87,9 +88,11 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : // when the tx-queue is deleted. // BrokerInfo::Set infoSet(haBroker.getMembership().otherBackups()); - std::transform(infoSet.begin(), infoSet.end(), inserter(backups, backups.begin()), - boost::bind(&BrokerInfo::getSystemId, _1)); - QPID_LOG(debug, logPrefix << "Started on " << backups); + std::transform(infoSet.begin(), infoSet.end(), inserter(members, members.begin()), + boost::bind(&BrokerInfo::getSystemId, _1)); + + QPID_LOG(debug, logPrefix << "Started TX " << id); + QPID_LOG(debug, logPrefix << "Members: " << members); pair<QueuePtr, bool> result = broker.getQueues().declare( @@ -97,8 +100,15 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : QueueSettings(/*durable*/false, /*autodelete*/true)); assert(result.second); txQueue = result.first; + txQueue->deliver(TxMembersEvent(members).message()); + // Do this last, it will start concurrent callbacks. + haBroker.getMembership().addCallback( + boost::bind(&PrimaryTxObserver::membership, this, _1)); } +PrimaryTxObserver::~PrimaryTxObserver() {} + + void PrimaryTxObserver::initialize() { broker.getExchanges().registerExchange( boost::shared_ptr<Exchange>(new Exchange(shared_from_this()))); @@ -125,7 +135,7 @@ void PrimaryTxObserver::deduplicate(sys::Mutex::ScopedLock&) { boost::shared_ptr<Primary> primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole())); assert(primary); // Tell replicating subscriptions to skip IDs in the transaction. - for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b) + for (UuidSet::iterator b = members.begin(); b != members.end(); ++b) for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q) primary->skip(*b, q->first, q->second); } @@ -136,7 +146,8 @@ bool PrimaryTxObserver::prepare() { QPID_LOG(debug, logPrefix << "Prepare"); deduplicate(l); txQueue->deliver(TxPrepareEvent().message()); - while (!isPrepared(l)) lock.wait(); + while (prepared != members && !failed) + lock.wait(); return !failed; } @@ -144,12 +155,20 @@ void PrimaryTxObserver::commit() { sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Commit"); txQueue->deliver(TxCommitEvent().message()); + destroy(); } void PrimaryTxObserver::rollback() { sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Rollback"); txQueue->deliver(TxRollbackEvent().message()); + destroy(); +} + +void PrimaryTxObserver::destroy() { + // Destroying the queue will result in destruction of this when + // the queues observer references are cleared. + haBroker.deleteQueue(txQueue->getName()); } void PrimaryTxObserver::txPrepareOkEvent(const string& data) { @@ -169,8 +188,15 @@ void PrimaryTxObserver::txPrepareFailEvent(const string& data) { lock.notify(); } -bool PrimaryTxObserver::isPrepared(sys::Mutex::ScopedLock&) { - return (prepared == backups || failed); +void PrimaryTxObserver::membership(const BrokerInfo::Map& update) { + sys::Mutex::ScopedLock l(lock); + for (UuidSet::iterator i = members.begin(); i != members.end(); ++i) + if (!update.count(*i)) { // A broker is down + failed = true; + lock.notify(); + return; + } } + }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h index 413c627d68..2c207c55fa 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h @@ -63,6 +63,7 @@ class PrimaryTxObserver : public broker::TransactionObserver, { public: PrimaryTxObserver(HaBroker&); + ~PrimaryTxObserver(); /** Call immediately after constructor, uses shared_from_this. */ void initialize(); @@ -80,11 +81,13 @@ class PrimaryTxObserver : public broker::TransactionObserver, typedef qpid::sys::unordered_map< QueuePtr, ReplicationIdSet, boost::hash<QueuePtr> > QueueIdsMap; + void membership(const BrokerInfo::Map&); void deduplicate(sys::Mutex::ScopedLock&); void txPrepareOkEvent(const std::string& data); void txPrepareFailEvent(const std::string& data); void consumerRemoved(const broker::Consumer&); bool isPrepared(sys::Mutex::ScopedLock&); + void destroy(); sys::Monitor lock; std::string logPrefix; @@ -94,7 +97,7 @@ class PrimaryTxObserver : public broker::TransactionObserver, QueuePtr txQueue; QueueIdsMap enqueues; bool failed; - UuidSet backups; + UuidSet members; UuidSet prepared; }; diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index a4dd656766..ff32dfef16 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -182,7 +182,6 @@ void QueueReplicator::destroy() { bridge2 = bridge; // call close outside the lock. // Need to drop shared pointers to avoid pointer cycles keeping this in memory. queue.reset(); - link.reset(); bridge.reset(); getBroker()->getExchanges().destroy(getName()); } diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index bbc193ac78..71993bcb12 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -43,7 +43,7 @@ class Buffer; namespace ha { class QueueGuard; class HaBroker; -struct Event; +class Event; class Primary; /** diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp index 2c45752dcc..c59b54386d 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp @@ -81,13 +81,11 @@ TxReplicator::TxReplicator( channel(link->nextChannel()), dequeueState(hb.getBroker().getQueues()) { - string shortId = getTxId(txQueue->getName()).substr(0, 8); + string id(getTxId(txQueue->getName())); + string shortId = id.substr(0, 8); logPrefix = "Backup of transaction "+shortId+": "; - + QPID_LOG(debug, logPrefix << "Started TX " << id); if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded.")); - boost::shared_ptr<Backup> backup = boost::dynamic_pointer_cast<Backup>(hb.getRole()); - if (!backup) throw Exception(QPID_MSG(logPrefix << "Broker is not in backup mode.")); - brokerReplicator = backup->getBrokerReplicator(); // Dispatch transaction events. dispatch[TxEnqueueEvent::KEY] = @@ -100,6 +98,8 @@ TxReplicator::TxReplicator( boost::bind(&TxReplicator::commit, this, _1, _2); dispatch[TxRollbackEvent::KEY] = boost::bind(&TxReplicator::rollback, this, _1, _2); + dispatch[TxMembersEvent::KEY] = + boost::bind(&TxReplicator::members, this, _1, _2); } TxReplicator::~TxReplicator() { @@ -130,15 +130,6 @@ void TxReplicator::deliver(const broker::Message& m_) { dm.deliverTo(queue); } -void TxReplicator::destroy() { - { - sys::Mutex::ScopedLock l(lock); - if (context.get()) store->abort(*context); - txBuffer->rollback(); - } - QueueReplicator::destroy(); -} - void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) { sys::Mutex::ScopedLock l(lock); TxEnqueueEvent e; @@ -209,24 +200,27 @@ void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) { } } -void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) { +void TxReplicator::commit(const string&, sys::Mutex::ScopedLock&) { QPID_LOG(debug, logPrefix << "Commit"); if (context.get()) store->commit(*context); txBuffer->commit(); - end(l); } -void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) { +void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock&) { QPID_LOG(debug, logPrefix << "Rollback"); if (context.get()) store->abort(*context); txBuffer->rollback(); - end(l); } -void TxReplicator::end(sys::Mutex::ScopedLock&) { - // Destroy the tx-queue, which will destroy this via QueueReplicator destroy. - // FIXME aconway 2013-08-01: what about connection & user ID for destroy? - haBroker.getBroker().getQueues().destroy(queue->getName()); +void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) { + TxMembersEvent e; + decodeStr(data, e); + QPID_LOG(debug, logPrefix << "Members: " << e.members); + if (!e.members.count(haBroker.getMembership().getSelf())) { + QPID_LOG(debug, logPrefix << "Not a member of transaction, terminating"); + // Destroy the tx-queue, which will destroy this via QueueReplicator destroy. + haBroker.deleteQueue(getQueue()->getName()); + } } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h index 41555ef094..10d7466d0c 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.h +++ b/qpid/cpp/src/qpid/ha/TxReplicator.h @@ -65,7 +65,6 @@ class TxReplicator : public QueueReplicator { protected: void deliver(const broker::Message&); - void destroy(); private: @@ -80,13 +79,12 @@ class TxReplicator : public QueueReplicator { void prepare(const std::string& data, sys::Mutex::ScopedLock&); void commit(const std::string& data, sys::Mutex::ScopedLock&); void rollback(const std::string& data, sys::Mutex::ScopedLock&); - void end(sys::Mutex::ScopedLock&); + void members(const std::string& data, sys::Mutex::ScopedLock&); std::string logPrefix; TxEnqueueEvent enq; // Enqueue data for next deliver. boost::shared_ptr<broker::TxBuffer> txBuffer; broker::MessageStore* store; - boost::shared_ptr<BrokerReplicator> brokerReplicator; std::auto_ptr<broker::TransactionContext> context; framing::ChannelId channel; // Channel to send prepare-complete. diff --git a/qpid/cpp/src/qpid/ha/types.cpp b/qpid/cpp/src/qpid/ha/types.cpp index 500a35051a..c02ae33470 100644 --- a/qpid/cpp/src/qpid/ha/types.cpp +++ b/qpid/cpp/src/qpid/ha/types.cpp @@ -108,6 +108,24 @@ std::ostream& operator<<(std::ostream& o, const LogMessageId& m) { return o << m.queue << "[" << m.position << "]=" << m.replicationId; } +void UuidSet::encode(framing::Buffer& b) const { + b.putLong(size()); + for (const_iterator i = begin(); i != end(); ++i) + b.putRawData(i->data(), i->size()); +} + +void UuidSet::decode(framing::Buffer& b) { + size_t n = b.getLong(); + for ( ; n > 0; --n) { + types::Uuid id; + b.getRawData(const_cast<unsigned char*>(id.data()), id.size()); + insert(id); + } +} + +size_t UuidSet::encodedSize() const { + return sizeof(uint32_t) + size()*16; +} }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/types.h b/qpid/cpp/src/qpid/ha/types.h index 91e43137d2..92157d411b 100644 --- a/qpid/cpp/src/qpid/ha/types.h +++ b/qpid/cpp/src/qpid/ha/types.h @@ -116,8 +116,13 @@ extern const char* TRANSACTION_REPLICATOR_PREFIX; bool startsWith(const std::string& name, const std::string& prefix); -/** Define IdSet type, not a typedef so we can overload operator << */ -class UuidSet : public std::set<types::Uuid> {}; +/** Define IdSet type, not a typedef so we can overload operator << and add encoding.*/ +class UuidSet : public std::set<types::Uuid> { + public: + void encode(framing::Buffer&) const; + void decode(framing::Buffer&); + size_t encodedSize() const; +}; std::ostream& operator<<(std::ostream& o, const UuidSet& ids); diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index cceb9795eb..ab63602655 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -48,6 +48,9 @@ class QmfAgent(object): address, client_properties={"qpid.ha-admin":1}, **kwargs) self._agent = BrokerAgent(self._connection) + def get_queues(self): + return [q.values['name'] for q in self._agent.getAllQueues()] + def __getattr__(self, name): a = getattr(self._agent, name) return a @@ -107,7 +110,7 @@ class HaBroker(Broker): ha_port = ha_port or HaPort(test) args = copy(args) args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=trace+:ha::", # FIXME aconway 2013-07-29: debug + "--log-enable=debug+:ha::", # Non-standard settings for faster tests. "--link-maintenance-interval=0.1", # Heartbeat and negotiate time are needed so that a broker wont @@ -276,8 +279,8 @@ class HaCluster(object): @s_args: args for specific brokers: s_args[i] for broker i. """ self.test = test - self.args = args - self.s_args = s_args + self.args = copy(args) + self.s_args = copy(s_args) self.kwargs = kwargs self._ports = [HaPort(test) for i in xrange(n)] self._set_url() diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index e97614d785..e1ad5cc4fa 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -20,7 +20,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest import traceback -from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty +from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty, ServerError from qpid.datatypes import uuid4, UUID from brokertest import * from ha_test import * @@ -1278,7 +1278,7 @@ class StoreTests(BrokerTest): cluster[0].assert_browse("q1", ["x","y","z"]) cluster[1].assert_browse_backup("q1", ["x","y","z"]) - sn = cluster[0].connect(heartbeat=1).session() # FIXME aconway 2012-09-25: should fail over! + sn = cluster[0].connect(heartbeat=1).session() sn.sender("ex/k1").send("boo") cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"]) cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"]) @@ -1382,18 +1382,48 @@ class TransactionTests(BrokerTest): self.assertEqual(open_read(cluster[1].store_log), expect) def test_tx_backup_fail(self): - # FIXME aconway 2013-07-31: check exception types, reduce timeout. cluster = HaCluster( self, 2, test_store=True, s_args=[[],["--test-store-throw=bang"]]) c = cluster[0].connect() tx = c.session(transactional=True) s = tx.sender("q;{create:always,node:{durable:true}}") for m in ["foo","bang","bar"]: s.send(Message(m, durable=True)) - self.assertRaises(Exception, tx.commit) + self.assertRaises(ServerError, tx.commit) for b in cluster: b.assert_browse_backup("q", []) self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n") self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n") + def test_tx_join_leave(self): + """Test cluster members joining/leaving cluster. + Also check that tx-queues are cleaned up at end of transaction.""" + + def tx_queues(broker): + return [q for q in broker.agent().get_queues() if q.startswith("qpid.ha-tx")] + + cluster = HaCluster(self, 3) + + # Leaving + tx = cluster[0].connect().session(transactional=True) + s = tx.sender("q;{create:always}") + s.send("a", sync=True) + self.assertEqual([1,1,1], [len(tx_queues(b)) for b in cluster]) + cluster[1].kill(final=False) + s.send("b") + self.assertRaises(ServerError, tx.commit) + self.assertEqual([[],[]], [tx_queues(b) for b in [cluster[0],cluster[2]]]) + + # Joining + tx = cluster[0].connect().session(transactional=True) + s = tx.sender("q;{create:always}") + s.send("foo") + tx_q = tx_queues(cluster[0])[0] + cluster.restart(1) + # Verify the new member should not be in the transaction. + # but should receive the result of the transaction via normal replication. + cluster[1].wait_no_queue(tx_q) + tx.commit() + for b in cluster: b.assert_browse_backup("q", ["foo"]) + if __name__ == "__main__": outdir = "ha_tests.tmp" shutil.rmtree(outdir, True) |