diff options
author | Alan Conway <aconway@apache.org> | 2013-08-30 14:43:06 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-08-30 14:43:06 +0000 |
commit | 54cdb4dcada8cfeb23d756e4980e701ebb382c13 (patch) | |
tree | f9ce23279cffe298d1a3953489355214b827e530 /qpid/cpp | |
parent | 27d31ba355acfef3ec66c23e48864e88a358014b (diff) | |
download | qpid-python-54cdb4dcada8cfeb23d756e4980e701ebb382c13.tar.gz |
QPID-4327: HA clean up transaction artifacts at end of TX.
- Backups delete transactions on failover.
- TxReplicator cancel subscriptions when transaction is finished.
- TxReplicator rollback if destroyed prematurely.
- Handle special case of no backups for a tx.
- ha_tests.py: new and modified tests to cover the new functionality.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1518982 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 80 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 21 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | 46 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.h | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/TxReplicator.cpp | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/TxReplicator.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 4 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 17 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 79 |
13 files changed, 218 insertions, 95 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 684f408c4b..36bf89fb81 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -229,8 +229,8 @@ class BrokerReplicator::UpdateTracker { typedef boost::function<void (const std::string&)> CleanFn; UpdateTracker(const std::string& type_, // "queue" or "exchange" - CleanFn f, const ReplicationTest& rt) - : type(type_), cleanFn(f), repTest(rt) {} + CleanFn f) + : type(type_), cleanFn(f) {} /** Destructor cleans up remaining initial queues. */ ~UpdateTracker() { @@ -245,16 +245,10 @@ class BrokerReplicator::UpdateTracker { } /** Add an exchange name */ - void addExchange(Exchange::shared_ptr ex) { - if (repTest.getLevel(*ex)) - initial.insert(ex->getName()); - } + void addExchange(Exchange::shared_ptr ex) { initial.insert(ex->getName()); } /** Add a queue name. */ - void addQueue(Queue::shared_ptr q) { - if (repTest.getLevel(*q)) - initial.insert(q->getName()); - } + void addQueue(Queue::shared_ptr q) { initial.insert(q->getName()); } /** Received an event for name */ void event(const std::string& name) { @@ -281,7 +275,6 @@ class BrokerReplicator::UpdateTracker { std::string type; Names initial, events; CleanFn cleanFn; - ReplicationTest repTest; }; namespace { @@ -349,7 +342,8 @@ BrokerReplicator::~BrokerReplicator() { shutdown(); } namespace { void collectQueueReplicators( - const boost::shared_ptr<Exchange> ex, set<boost::shared_ptr<QueueReplicator> >& collect) + const boost::shared_ptr<Exchange>& ex, + set<boost::shared_ptr<QueueReplicator> >& collect) { boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); if (qr) collect.insert(qr); @@ -390,16 +384,13 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler) exchangeTracker.reset( new UpdateTracker("exchange", - boost::bind(&BrokerReplicator::deleteExchange, this, _1), - replicationTest)); - exchanges.eachExchange( - boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1)); + boost::bind(&BrokerReplicator::deleteExchange, this, _1))); + exchanges.eachExchange(boost::bind(&BrokerReplicator::existingExchange, this, _1)); queueTracker.reset( new UpdateTracker("queue", - boost::bind(&BrokerReplicator::deleteQueue, this, _1, true), - replicationTest)); - queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1)); + boost::bind(&BrokerReplicator::deleteQueue, this, _1, true))); + queues.eachQueue(boost::bind(&BrokerReplicator::existingQueue, this, _1)); framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); @@ -428,6 +419,21 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler) sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler); } +// Called for each queue in existence when the backup connects to a primary. +void BrokerReplicator::existingQueue(const boost::shared_ptr<Queue>& q) { + if (replicationTest.getLevel(*q)) { + QPID_LOG(debug, "Existing queue: " << q->getName()); + queueTracker->addQueue(q); + } +} + +void BrokerReplicator::existingExchange(const boost::shared_ptr<Exchange>& ex) { + if (replicationTest.getLevel(*ex)) { + QPID_LOG(debug, "Existing exchange: " << ex->getName()); + exchangeTracker->addExchange(ex); + } +} + void BrokerReplicator::route(Deliverable& msg) { // We transition from JOINING->CATCHUP on the first message received from the primary. // Until now we couldn't be sure if we had a good connection to the primary. @@ -890,24 +896,36 @@ void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) { } } -// Callback function for accumulating exchange candidates -namespace { - void exchangeAccumulatorCallback(vector<boost::shared_ptr<Exchange> >& c, const Exchange::shared_ptr& i) { - c.push_back(i); - } -} +typedef vector<boost::shared_ptr<Exchange> > ExchangeVector; +typedef vector<boost::shared_ptr<Queue> > QueueVector; // Called by ConnectionObserver::disconnected, disconnected from the network side. void BrokerReplicator::disconnected() { QPID_LOG(info, logPrefix << "Disconnected from primary " << primary); connection = 0; - // Clean up auto-delete queues - vector<boost::shared_ptr<Exchange> > collect; - // Make a copy so we can work outside the ExchangeRegistry lock - exchanges.eachExchange( - boost::bind(&exchangeAccumulatorCallback, boost::ref(collect), _1)); - for_each(collect.begin(), collect.end(), + + // Make copys of queues & exchanges so we can work outside the registry lock. + + ExchangeVector exs; + exchanges.eachExchange(boost::bind(&ExchangeVector::push_back, &exs, _1)); + for_each(exs.begin(), exs.end(), boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1)); + + QueueVector qs; + queues.eachQueue(boost::bind(&QueueVector::push_back, &qs, _1)); + for_each(qs.begin(), qs.end(), + boost::bind(&BrokerReplicator::disconnectedQueue, this, _1)); +} + +// Called for queues existing when the backup is disconnected. +void BrokerReplicator::disconnectedQueue(const boost::shared_ptr<Queue>& q) { + QPID_LOG(critical, "BrokerReplicator::disconnectedQueue" << q->getName()); + boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(q->getName()); + if (qr) { + qr->disconnect(); + if (TxReplicator::isTxQueue(q->getName())) + deleteQueue(q->getName()); + } } void BrokerReplicator::setMembership(const Variant::List& brokers) { diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index c36aa352f3..8045c2a91f 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -102,6 +102,9 @@ class BrokerReplicator : public broker::Exchange, class ConnectionObserver; void connected(broker::Bridge&, broker::SessionHandler&); + void existingQueue(const boost::shared_ptr<broker::Queue>&); + void existingExchange(const boost::shared_ptr<broker::Exchange>&); + void disconnectedQueue(const boost::shared_ptr<broker::Queue>&); void doEventQueueDeclare(types::Variant::Map& values); void doEventQueueDelete(types::Variant::Map& values); diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index c71342cbc6..606e6452d3 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -214,6 +214,8 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) { } void Primary::addReplica(ReplicatingSubscription& rs) { + // Note this is called before the ReplicatingSubscription has been activated + // on the queue. sys::Mutex::ScopedLock l(lock); replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs; } @@ -231,6 +233,12 @@ void Primary::skip( void Primary::removeReplica(const ReplicatingSubscription& rs) { sys::Mutex::ScopedLock l(lock); replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())); + + TxMap::const_iterator i = txMap.find(rs.getQueue()->getName()); + if (i != txMap.end()) { + boost::shared_ptr<PrimaryTxObserver> tx = i->second.lock(); + if (tx) tx->cancel(rs); + } } // NOTE: Called with queue registry lock held. @@ -387,16 +395,19 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards) backup->startCatchup(); } -void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) { +shared_ptr<PrimaryTxObserver> Primary::makeTxObserver() { shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker)); observer->initialize(); - tx->setObserver(observer); + txMap[observer->getTxQueue()->getName()] = observer; + return observer; +} + +void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) { + tx->setObserver(makeTxObserver()); } void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) { - shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker)); - observer->initialize(); - dtx->setObserver(observer); + dtx->setObserver(makeTxObserver()); } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index 0affc49508..d1350ab261 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -30,6 +30,7 @@ #include "qpid/sys/Mutex.h" #include "qpid/sys/unordered_map.h" #include <boost/shared_ptr.hpp> +#include <boost/weak_ptr.hpp> #include <boost/intrusive_ptr.hpp> #include <string> @@ -54,6 +55,7 @@ class ReplicatingSubscription; class RemoteBackup; class QueueGuard; class Membership; +class PrimaryTxObserver; /** * State associated with a primary broker: @@ -113,6 +115,9 @@ class Primary : public Role typedef sys::unordered_map<UuidQueue, ReplicatingSubscription*, Hasher<UuidQueue> > ReplicaMap; + // Map of PrimaryTxObservers by tx-queue name + typedef sys::unordered_map<std::string, boost::weak_ptr<PrimaryTxObserver> > TxMap; + RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&); void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&); @@ -121,6 +126,7 @@ class Primary : public Role void checkReady(RemoteBackupPtr); void setCatchupQueues(const RemoteBackupPtr&, bool createGuards); void deduplicate(); + boost::shared_ptr<PrimaryTxObserver> makeTxObserver(); mutable sys::Mutex lock; HaBroker& haBroker; @@ -143,6 +149,7 @@ class Primary : public Role boost::shared_ptr<broker::BrokerObserver> brokerObserver; boost::intrusive_ptr<sys::TimerTask> timerTask; ReplicaMap replicas; + TxMap txMap; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index 7307f15fbe..f13edfb31e 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -87,12 +87,13 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : // Latecomers that have replicated the transaction will be rolled back // when the tx-queue is deleted. // - BrokerInfo::Set infoSet(haBroker.getMembership().otherBackups()); - std::transform(infoSet.begin(), infoSet.end(), inserter(members, members.begin()), + BrokerInfo::Set backups(haBroker.getMembership().otherBackups()); + std::transform(backups.begin(), backups.end(), inserter(members, members.begin()), boost::bind(&BrokerInfo::getSystemId, _1)); QPID_LOG(debug, logPrefix << "Started TX " << id); QPID_LOG(debug, logPrefix << "Members: " << members); + unprepared = unfinished = members; pair<QueuePtr, bool> result = broker.getQueues().declare( @@ -102,8 +103,6 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : txQueue = result.first; txQueue->deliver(TxMembersEvent(members).message()); // Do this last, it will start concurrent callbacks. - haBroker.getMembership().addCallback( - boost::bind(&PrimaryTxObserver::membership, this, _1)); } PrimaryTxObserver::~PrimaryTxObserver() {} @@ -146,8 +145,7 @@ bool PrimaryTxObserver::prepare() { QPID_LOG(debug, logPrefix << "Prepare"); deduplicate(l); txQueue->deliver(TxPrepareEvent().message()); - while (prepared != members && !failed) - lock.wait(); + while (!unprepared.empty() && !failed) lock.wait(); return !failed; } @@ -155,27 +153,30 @@ void PrimaryTxObserver::commit() { sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Commit"); txQueue->deliver(TxCommitEvent().message()); - destroy(); + end(l); } void PrimaryTxObserver::rollback() { sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Rollback"); txQueue->deliver(TxRollbackEvent().message()); - destroy(); + end(l); } -void PrimaryTxObserver::destroy() { - // Destroying the queue will result in destruction of this when - // the queues observer references are cleared. - haBroker.deleteQueue(txQueue->getName()); +void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) { + // Don't destroy the tx-queue if there are connected subscriptions. + if (unfinished.empty()) { + // Destroying the queue will result in destruction of this when + // the queues observer references are cleared. + haBroker.deleteQueue(txQueue->getName()); + } } void PrimaryTxObserver::txPrepareOkEvent(const string& data) { sys::Mutex::ScopedLock l(lock); types::Uuid backup = decodeStr<TxPrepareOkEvent>(data).broker; QPID_LOG(debug, logPrefix << "Backup prepared ok: " << backup); - prepared.insert(backup); + unprepared.erase(backup); lock.notify(); } @@ -183,20 +184,21 @@ void PrimaryTxObserver::txPrepareFailEvent(const string& data) { sys::Mutex::ScopedLock l(lock); types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker; QPID_LOG(error, logPrefix << "Backup prepare failed: " << backup); - prepared.insert(backup); + unprepared.erase(backup); failed = true; lock.notify(); } -void PrimaryTxObserver::membership(const BrokerInfo::Map& update) { +void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) { sys::Mutex::ScopedLock l(lock); - for (UuidSet::iterator i = members.begin(); i != members.end(); ++i) - if (!update.count(*i)) { // A broker is down - failed = true; - lock.notify(); - return; - } + types::Uuid backup = rs.getBrokerInfo().getSystemId(); + if (unprepared.find(backup) != unprepared.end()) { + failed = true; // Canceled before prepared. + unprepared.erase(backup); // Consider it prepared-fail + } + unfinished.erase(backup); + lock.notify(); + end(l); } - }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h index 4efdf06c42..f75c56448a 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h @@ -41,6 +41,7 @@ class Consumer; namespace ha { class HaBroker; +class ReplicatingSubscription; /** * Observe events in the lifecycle of a transaction. @@ -74,6 +75,10 @@ class PrimaryTxObserver : public broker::TransactionObserver, void rollback(); types::Uuid getId() const { return id; } + QueuePtr getTxQueue() const { return txQueue; } + + // Notify that a backup subscription has been cancelled. + void cancel(const ReplicatingSubscription&); private: class Exchange; @@ -82,11 +87,10 @@ class PrimaryTxObserver : public broker::TransactionObserver, void membership(const BrokerInfo::Map&); void deduplicate(sys::Mutex::ScopedLock&); + void end(sys::Mutex::ScopedLock&); void txPrepareOkEvent(const std::string& data); void txPrepareFailEvent(const std::string& data); - void consumerRemoved(const broker::Consumer&); - bool isPrepared(sys::Mutex::ScopedLock&); - void destroy(); + sys::Monitor lock; std::string logPrefix; @@ -96,8 +100,10 @@ class PrimaryTxObserver : public broker::TransactionObserver, QueuePtr txQueue; QueueIdsMap enqueues; bool failed; - UuidSet members; - UuidSet prepared; + + UuidSet members; // All members of transaction. + UuidSet unprepared; // Members that have not yet responded to prepare. + UuidSet unfinished; // Members that have not yet disconnected. }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index ff32dfef16..b4bbb3a0c4 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -169,6 +169,11 @@ void QueueReplicator::activate() { boost::shared_ptr<QueueObserver>(new QueueObserver(shared_from_this()))); } +void QueueReplicator::disconnect() { + Mutex::ScopedLock l(lock); + sessionHandler = 0; +} + QueueReplicator::~QueueReplicator() {} // Called from Queue::destroyed() @@ -220,6 +225,14 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments); } +void QueueReplicator::cancel(Mutex::ScopedLock&) { + if (sessionHandler) { + // Cancel the replicating subscription. + AMQP_ServerProxy peer(sessionHandler->out); + peer.getMessage().cancel(getName()); + } +} + namespace { template <class T> T decodeContent(Message& m) { std::string content = m.getContent(); diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index cc6c7ed2a7..06248d32aa 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -68,7 +68,8 @@ class QueueReplicator : public broker::Exchange, ~QueueReplicator(); - void activate(); // Must be called immediately after constructor. + void activate(); // Must be called immediately after constructor. + void disconnect(); // Called when we are disconnected from the primary. std::string getType() const; @@ -93,6 +94,7 @@ class QueueReplicator : public broker::Exchange, virtual void deliver(const broker::Message&); virtual void destroy(); // Called when the queue is destroyed. + void cancel(sys::Mutex::ScopedLock&); sys::Mutex lock; HaBroker& haBroker; diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp index c59b54386d..75ad554cb9 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp @@ -79,6 +79,7 @@ TxReplicator::TxReplicator( txBuffer(new broker::TxBuffer), store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0), channel(link->nextChannel()), + complete(false), dequeueState(hb.getBroker().getQueues()) { string id(getTxId(txQueue->getName())); @@ -200,16 +201,18 @@ void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) { } } -void TxReplicator::commit(const string&, sys::Mutex::ScopedLock&) { +void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) { QPID_LOG(debug, logPrefix << "Commit"); if (context.get()) store->commit(*context); txBuffer->commit(); + end(l); } -void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock&) { +void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) { QPID_LOG(debug, logPrefix << "Rollback"); if (context.get()) store->abort(*context); txBuffer->rollback(); + end(l); } void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) { @@ -223,4 +226,15 @@ void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) { } } +void TxReplicator::end(sys::Mutex::ScopedLock& l) { + complete = true; + cancel(l); +} + +void TxReplicator::destroy() { + QueueReplicator::destroy(); + sys::Mutex::ScopedLock l(lock); + if (!complete) rollback(string(), l); +} + }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h index 10d7466d0c..4d2eb2f242 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.h +++ b/qpid/cpp/src/qpid/ha/TxReplicator.h @@ -62,6 +62,9 @@ class TxReplicator : public QueueReplicator { std::string getType() const; + // QueueReplicator overrides + void destroy(); + protected: void deliver(const broker::Message&); @@ -80,6 +83,7 @@ class TxReplicator : public QueueReplicator { void commit(const std::string& data, sys::Mutex::ScopedLock&); void rollback(const std::string& data, sys::Mutex::ScopedLock&); void members(const std::string& data, sys::Mutex::ScopedLock&); + void end(sys::Mutex::ScopedLock&); std::string logPrefix; TxEnqueueEvent enq; // Enqueue data for next deliver. @@ -87,6 +91,7 @@ class TxReplicator : public QueueReplicator { broker::MessageStore* store; std::auto_ptr<broker::TransactionContext> context; framing::ChannelId channel; // Channel to send prepare-complete. + bool complete; // Class to process dequeues and create DeliveryRecords to populate a // TxAccept. diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index b07a5b5d11..a282f59b13 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -415,6 +415,10 @@ class BrokerTest(TestCase): Provides a well-known working directory for each test. """ + def __init__(self, *args, **kwargs): + self.longMessage = True # Enable long messages for assert*(..., msg=xxx) + TestCase.__init__(self, *args, **kwargs) + # Environment settings. qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC")) ha_lib = os.getenv("HA_LIB") diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index ab63602655..4a7b538edd 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -48,9 +48,24 @@ class QmfAgent(object): address, client_properties={"qpid.ha-admin":1}, **kwargs) self._agent = BrokerAgent(self._connection) - def get_queues(self): + def queues(self): return [q.values['name'] for q in self._agent.getAllQueues()] + def repsub_queue(self, sub): + """If QMF subscription sub is a replicating subscription return + the name of the replicated queue, else return None""" + session_name = self.getSession(sub.sessionRef).name + m = re.search("qpid.ha-q:(.*)\.", session_name) + return m and m.group(1) + + def repsub_queues(self): + """Return queue names for all replicating subscriptions""" + return filter(None, [self.repsub_queue(s) for s in self.getAllSubscriptions()]) + + def tx_queues(self): + """Return names of all tx-queues""" + return [q for q in self.queues() if q.startswith("qpid.ha-tx")] + def __getattr__(self, name): a = getattr(self._agent, name) return a diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index c3577ca626..9a39bdf979 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1320,11 +1320,18 @@ class TransactionTests(BrokerTest): sb.send(m) return tx + def tx_subscriptions(self, broker): + """Return list of queue names for tx subscriptions""" + return [q for q in broker.agent().repsub_queues() + if q.startswith("qpid.ha-tx")] + def test_tx_simple_commit(self): cluster = HaCluster(self, 2, test_store=True) tx = self.tx_simple_setup(cluster[0]) tx.sync() + self.assertEqual(1, len(self.tx_subscriptions(cluster[0]))) # One backup of the transaction + # NOTE: backup does not process transactional dequeues until prepare cluster[1].assert_browse_backup("a", ["x","y","z"]) cluster[1].assert_browse_backup("b", ['0', '1', '2']) @@ -1333,10 +1340,12 @@ class TransactionTests(BrokerTest): tx.commit() tx.sync() - for b in cluster: - b.assert_browse_backup("a", [], msg=b) - b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b) + for b in cluster: self.assert_simple_commit_outcome(b) + self.assertEqual(0, len(self.tx_subscriptions(cluster[0]))) # Backup tx subscription cancelled. + def assert_simple_commit_outcome(self, b): + b.assert_browse_backup("a", [], msg=b) + b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b) # Check for expected actions on the store expect = """<enqueue a x> <enqueue a y> @@ -1347,42 +1356,59 @@ class TransactionTests(BrokerTest): <dequeue a z tx=1> <commit tx=1> """ - self.assertEqual(expect, open_read(cluster[0].store_log)) - self.assertEqual(expect, open_read(cluster[1].store_log)) + self.assertEqual(expect, open_read(b.store_log), msg=b) + # Check that transaction artifacts are cleaned up. + self.assertEqual([], b.agent().tx_queues(), msg=b) def test_tx_simple_rollback(self): cluster = HaCluster(self, 2, test_store=True) tx = self.tx_simple_setup(cluster[0]) tx.acknowledge() tx.rollback() - for b in cluster: - b.assert_browse_backup("a", ["x","y","z"], msg=b) - b.assert_browse_backup("b", ['0', '1', '2'], msg=b) + for b in cluster: self.assert_simple_rollback_outcome(b) + + def assert_simple_rollback_outcome(self, b): + b.assert_browse_backup("a", ["x","y","z"], msg=b) + b.assert_browse_backup("b", ['0', '1', '2'], msg=b) # Check for expected actions on the store expect = """<enqueue a x> <enqueue a y> <enqueue a z> """ - self.assertEqual(open_read(cluster[0].store_log), expect) - self.assertEqual(open_read(cluster[1].store_log), expect) + self.assertEqual(open_read(b.store_log), expect, msg=b) + # Check that transaction artifacts are cleaned up. + self.assertEqual([], b.agent().tx_queues(), msg=b) def test_tx_simple_failover(self): - cluster = HaCluster(self, 2, test_store=True) + cluster = HaCluster(self, 3, test_store=True) tx = self.tx_simple_setup(cluster[0]) + tx.sync() tx.acknowledge() cluster.bounce(0) # Should cause roll-back - cluster[0].wait_status("ready") - for b in cluster: - b.assert_browse_backup("a", ["x","y","z"], msg=b) - b.assert_browse_backup("b", ['0', '1', '2'], msg=b) + cluster[0].wait_status("ready") # Restarted. + cluster[1].wait_status("active") # Promoted. + cluster[2].wait_status("ready") # Failed over. + for b in cluster: self.assert_simple_rollback_outcome(b) + + def test_tx_no_backups(self): + """Test the special case of a TX where there are no backups""" + + # Test commit + cluster = HaCluster(self, 1, test_store=True) + tx = self.tx_simple_setup(cluster[0]) + tx.acknowledge() + tx.commit() + tx.sync() + self.assert_simple_commit_outcome(cluster[0]) + + # Test rollback + cluster = HaCluster(self, 1, test_store=True) + tx = self.tx_simple_setup(cluster[0]) + tx.acknowledge() + tx.rollback() + tx.sync() + self.assert_simple_rollback_outcome(cluster[0]) - # Check for expected actions on the store - expect = """<enqueue a x> -<enqueue a y> -<enqueue a z> -""" - self.assertEqual(open_read(cluster[0].store_log), expect) - self.assertEqual(open_read(cluster[1].store_log), expect) def test_tx_backup_fail(self): cluster = HaCluster( @@ -1400,26 +1426,23 @@ class TransactionTests(BrokerTest): """Test cluster members joining/leaving cluster. Also check that tx-queues are cleaned up at end of transaction.""" - def tx_queues(broker): - return [q for q in broker.agent().get_queues() if q.startswith("qpid.ha-tx")] - cluster = HaCluster(self, 3) # Leaving tx = cluster[0].connect().session(transactional=True) s = tx.sender("q;{create:always}") s.send("a", sync=True) - self.assertEqual([1,1,1], [len(tx_queues(b)) for b in cluster]) + self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster]) cluster[1].kill(final=False) s.send("b") self.assertRaises(ServerError, tx.commit) - self.assertEqual([[],[]], [tx_queues(b) for b in [cluster[0],cluster[2]]]) + self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]]) # Joining tx = cluster[0].connect().session(transactional=True) s = tx.sender("q;{create:always}") s.send("foo") - tx_q = tx_queues(cluster[0])[0] + tx_q = cluster[0].agent().tx_queues()[0] cluster.restart(1) # Verify the new member should not be in the transaction. # but should receive the result of the transaction via normal replication. |