summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-08-30 14:43:06 +0000
committerAlan Conway <aconway@apache.org>2013-08-30 14:43:06 +0000
commit54cdb4dcada8cfeb23d756e4980e701ebb382c13 (patch)
treef9ce23279cffe298d1a3953489355214b827e530 /qpid/cpp/src
parent27d31ba355acfef3ec66c23e48864e88a358014b (diff)
downloadqpid-python-54cdb4dcada8cfeb23d756e4980e701ebb382c13.tar.gz
QPID-4327: HA clean up transaction artifacts at end of TX.
- Backups delete transactions on failover. - TxReplicator cancel subscriptions when transaction is finished. - TxReplicator rollback if destroyed prematurely. - Handle special case of no backups for a tx. - ha_tests.py: new and modified tests to cover the new functionality. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1518982 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp80
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h3
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp21
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h7
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp46
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.h16
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp13
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h4
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.cpp18
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.h5
-rw-r--r--qpid/cpp/src/tests/brokertest.py4
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py17
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py79
13 files changed, 218 insertions, 95 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 684f408c4b..36bf89fb81 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -229,8 +229,8 @@ class BrokerReplicator::UpdateTracker {
typedef boost::function<void (const std::string&)> CleanFn;
UpdateTracker(const std::string& type_, // "queue" or "exchange"
- CleanFn f, const ReplicationTest& rt)
- : type(type_), cleanFn(f), repTest(rt) {}
+ CleanFn f)
+ : type(type_), cleanFn(f) {}
/** Destructor cleans up remaining initial queues. */
~UpdateTracker() {
@@ -245,16 +245,10 @@ class BrokerReplicator::UpdateTracker {
}
/** Add an exchange name */
- void addExchange(Exchange::shared_ptr ex) {
- if (repTest.getLevel(*ex))
- initial.insert(ex->getName());
- }
+ void addExchange(Exchange::shared_ptr ex) { initial.insert(ex->getName()); }
/** Add a queue name. */
- void addQueue(Queue::shared_ptr q) {
- if (repTest.getLevel(*q))
- initial.insert(q->getName());
- }
+ void addQueue(Queue::shared_ptr q) { initial.insert(q->getName()); }
/** Received an event for name */
void event(const std::string& name) {
@@ -281,7 +275,6 @@ class BrokerReplicator::UpdateTracker {
std::string type;
Names initial, events;
CleanFn cleanFn;
- ReplicationTest repTest;
};
namespace {
@@ -349,7 +342,8 @@ BrokerReplicator::~BrokerReplicator() { shutdown(); }
namespace {
void collectQueueReplicators(
- const boost::shared_ptr<Exchange> ex, set<boost::shared_ptr<QueueReplicator> >& collect)
+ const boost::shared_ptr<Exchange>& ex,
+ set<boost::shared_ptr<QueueReplicator> >& collect)
{
boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
if (qr) collect.insert(qr);
@@ -390,16 +384,13 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler)
exchangeTracker.reset(
new UpdateTracker("exchange",
- boost::bind(&BrokerReplicator::deleteExchange, this, _1),
- replicationTest));
- exchanges.eachExchange(
- boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1));
+ boost::bind(&BrokerReplicator::deleteExchange, this, _1)));
+ exchanges.eachExchange(boost::bind(&BrokerReplicator::existingExchange, this, _1));
queueTracker.reset(
new UpdateTracker("queue",
- boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
- replicationTest));
- queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1));
+ boost::bind(&BrokerReplicator::deleteQueue, this, _1, true)));
+ queues.eachQueue(boost::bind(&BrokerReplicator::existingQueue, this, _1));
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
@@ -428,6 +419,21 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler)
sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
}
+// Called for each queue in existence when the backup connects to a primary.
+void BrokerReplicator::existingQueue(const boost::shared_ptr<Queue>& q) {
+ if (replicationTest.getLevel(*q)) {
+ QPID_LOG(debug, "Existing queue: " << q->getName());
+ queueTracker->addQueue(q);
+ }
+}
+
+void BrokerReplicator::existingExchange(const boost::shared_ptr<Exchange>& ex) {
+ if (replicationTest.getLevel(*ex)) {
+ QPID_LOG(debug, "Existing exchange: " << ex->getName());
+ exchangeTracker->addExchange(ex);
+ }
+}
+
void BrokerReplicator::route(Deliverable& msg) {
// We transition from JOINING->CATCHUP on the first message received from the primary.
// Until now we couldn't be sure if we had a good connection to the primary.
@@ -890,24 +896,36 @@ void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) {
}
}
-// Callback function for accumulating exchange candidates
-namespace {
- void exchangeAccumulatorCallback(vector<boost::shared_ptr<Exchange> >& c, const Exchange::shared_ptr& i) {
- c.push_back(i);
- }
-}
+typedef vector<boost::shared_ptr<Exchange> > ExchangeVector;
+typedef vector<boost::shared_ptr<Queue> > QueueVector;
// Called by ConnectionObserver::disconnected, disconnected from the network side.
void BrokerReplicator::disconnected() {
QPID_LOG(info, logPrefix << "Disconnected from primary " << primary);
connection = 0;
- // Clean up auto-delete queues
- vector<boost::shared_ptr<Exchange> > collect;
- // Make a copy so we can work outside the ExchangeRegistry lock
- exchanges.eachExchange(
- boost::bind(&exchangeAccumulatorCallback, boost::ref(collect), _1));
- for_each(collect.begin(), collect.end(),
+
+ // Make copys of queues & exchanges so we can work outside the registry lock.
+
+ ExchangeVector exs;
+ exchanges.eachExchange(boost::bind(&ExchangeVector::push_back, &exs, _1));
+ for_each(exs.begin(), exs.end(),
boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1));
+
+ QueueVector qs;
+ queues.eachQueue(boost::bind(&QueueVector::push_back, &qs, _1));
+ for_each(qs.begin(), qs.end(),
+ boost::bind(&BrokerReplicator::disconnectedQueue, this, _1));
+}
+
+// Called for queues existing when the backup is disconnected.
+void BrokerReplicator::disconnectedQueue(const boost::shared_ptr<Queue>& q) {
+ QPID_LOG(critical, "BrokerReplicator::disconnectedQueue" << q->getName());
+ boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(q->getName());
+ if (qr) {
+ qr->disconnect();
+ if (TxReplicator::isTxQueue(q->getName()))
+ deleteQueue(q->getName());
+ }
}
void BrokerReplicator::setMembership(const Variant::List& brokers) {
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index c36aa352f3..8045c2a91f 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -102,6 +102,9 @@ class BrokerReplicator : public broker::Exchange,
class ConnectionObserver;
void connected(broker::Bridge&, broker::SessionHandler&);
+ void existingQueue(const boost::shared_ptr<broker::Queue>&);
+ void existingExchange(const boost::shared_ptr<broker::Exchange>&);
+ void disconnectedQueue(const boost::shared_ptr<broker::Queue>&);
void doEventQueueDeclare(types::Variant::Map& values);
void doEventQueueDelete(types::Variant::Map& values);
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index c71342cbc6..606e6452d3 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -214,6 +214,8 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) {
}
void Primary::addReplica(ReplicatingSubscription& rs) {
+ // Note this is called before the ReplicatingSubscription has been activated
+ // on the queue.
sys::Mutex::ScopedLock l(lock);
replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs;
}
@@ -231,6 +233,12 @@ void Primary::skip(
void Primary::removeReplica(const ReplicatingSubscription& rs) {
sys::Mutex::ScopedLock l(lock);
replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue()));
+
+ TxMap::const_iterator i = txMap.find(rs.getQueue()->getName());
+ if (i != txMap.end()) {
+ boost::shared_ptr<PrimaryTxObserver> tx = i->second.lock();
+ if (tx) tx->cancel(rs);
+ }
}
// NOTE: Called with queue registry lock held.
@@ -387,16 +395,19 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards)
backup->startCatchup();
}
-void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) {
+shared_ptr<PrimaryTxObserver> Primary::makeTxObserver() {
shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker));
observer->initialize();
- tx->setObserver(observer);
+ txMap[observer->getTxQueue()->getName()] = observer;
+ return observer;
+}
+
+void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) {
+ tx->setObserver(makeTxObserver());
}
void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) {
- shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker));
- observer->initialize();
- dtx->setObserver(observer);
+ dtx->setObserver(makeTxObserver());
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 0affc49508..d1350ab261 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -30,6 +30,7 @@
#include "qpid/sys/Mutex.h"
#include "qpid/sys/unordered_map.h"
#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
#include <string>
@@ -54,6 +55,7 @@ class ReplicatingSubscription;
class RemoteBackup;
class QueueGuard;
class Membership;
+class PrimaryTxObserver;
/**
* State associated with a primary broker:
@@ -113,6 +115,9 @@ class Primary : public Role
typedef sys::unordered_map<UuidQueue, ReplicatingSubscription*,
Hasher<UuidQueue> > ReplicaMap;
+ // Map of PrimaryTxObservers by tx-queue name
+ typedef sys::unordered_map<std::string, boost::weak_ptr<PrimaryTxObserver> > TxMap;
+
RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&);
void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&);
@@ -121,6 +126,7 @@ class Primary : public Role
void checkReady(RemoteBackupPtr);
void setCatchupQueues(const RemoteBackupPtr&, bool createGuards);
void deduplicate();
+ boost::shared_ptr<PrimaryTxObserver> makeTxObserver();
mutable sys::Mutex lock;
HaBroker& haBroker;
@@ -143,6 +149,7 @@ class Primary : public Role
boost::shared_ptr<broker::BrokerObserver> brokerObserver;
boost::intrusive_ptr<sys::TimerTask> timerTask;
ReplicaMap replicas;
+ TxMap txMap;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 7307f15fbe..f13edfb31e 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -87,12 +87,13 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
// Latecomers that have replicated the transaction will be rolled back
// when the tx-queue is deleted.
//
- BrokerInfo::Set infoSet(haBroker.getMembership().otherBackups());
- std::transform(infoSet.begin(), infoSet.end(), inserter(members, members.begin()),
+ BrokerInfo::Set backups(haBroker.getMembership().otherBackups());
+ std::transform(backups.begin(), backups.end(), inserter(members, members.begin()),
boost::bind(&BrokerInfo::getSystemId, _1));
QPID_LOG(debug, logPrefix << "Started TX " << id);
QPID_LOG(debug, logPrefix << "Members: " << members);
+ unprepared = unfinished = members;
pair<QueuePtr, bool> result =
broker.getQueues().declare(
@@ -102,8 +103,6 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
txQueue = result.first;
txQueue->deliver(TxMembersEvent(members).message());
// Do this last, it will start concurrent callbacks.
- haBroker.getMembership().addCallback(
- boost::bind(&PrimaryTxObserver::membership, this, _1));
}
PrimaryTxObserver::~PrimaryTxObserver() {}
@@ -146,8 +145,7 @@ bool PrimaryTxObserver::prepare() {
QPID_LOG(debug, logPrefix << "Prepare");
deduplicate(l);
txQueue->deliver(TxPrepareEvent().message());
- while (prepared != members && !failed)
- lock.wait();
+ while (!unprepared.empty() && !failed) lock.wait();
return !failed;
}
@@ -155,27 +153,30 @@ void PrimaryTxObserver::commit() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Commit");
txQueue->deliver(TxCommitEvent().message());
- destroy();
+ end(l);
}
void PrimaryTxObserver::rollback() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Rollback");
txQueue->deliver(TxRollbackEvent().message());
- destroy();
+ end(l);
}
-void PrimaryTxObserver::destroy() {
- // Destroying the queue will result in destruction of this when
- // the queues observer references are cleared.
- haBroker.deleteQueue(txQueue->getName());
+void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
+ // Don't destroy the tx-queue if there are connected subscriptions.
+ if (unfinished.empty()) {
+ // Destroying the queue will result in destruction of this when
+ // the queues observer references are cleared.
+ haBroker.deleteQueue(txQueue->getName());
+ }
}
void PrimaryTxObserver::txPrepareOkEvent(const string& data) {
sys::Mutex::ScopedLock l(lock);
types::Uuid backup = decodeStr<TxPrepareOkEvent>(data).broker;
QPID_LOG(debug, logPrefix << "Backup prepared ok: " << backup);
- prepared.insert(backup);
+ unprepared.erase(backup);
lock.notify();
}
@@ -183,20 +184,21 @@ void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
sys::Mutex::ScopedLock l(lock);
types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker;
QPID_LOG(error, logPrefix << "Backup prepare failed: " << backup);
- prepared.insert(backup);
+ unprepared.erase(backup);
failed = true;
lock.notify();
}
-void PrimaryTxObserver::membership(const BrokerInfo::Map& update) {
+void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) {
sys::Mutex::ScopedLock l(lock);
- for (UuidSet::iterator i = members.begin(); i != members.end(); ++i)
- if (!update.count(*i)) { // A broker is down
- failed = true;
- lock.notify();
- return;
- }
+ types::Uuid backup = rs.getBrokerInfo().getSystemId();
+ if (unprepared.find(backup) != unprepared.end()) {
+ failed = true; // Canceled before prepared.
+ unprepared.erase(backup); // Consider it prepared-fail
+ }
+ unfinished.erase(backup);
+ lock.notify();
+ end(l);
}
-
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
index 4efdf06c42..f75c56448a 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -41,6 +41,7 @@ class Consumer;
namespace ha {
class HaBroker;
+class ReplicatingSubscription;
/**
* Observe events in the lifecycle of a transaction.
@@ -74,6 +75,10 @@ class PrimaryTxObserver : public broker::TransactionObserver,
void rollback();
types::Uuid getId() const { return id; }
+ QueuePtr getTxQueue() const { return txQueue; }
+
+ // Notify that a backup subscription has been cancelled.
+ void cancel(const ReplicatingSubscription&);
private:
class Exchange;
@@ -82,11 +87,10 @@ class PrimaryTxObserver : public broker::TransactionObserver,
void membership(const BrokerInfo::Map&);
void deduplicate(sys::Mutex::ScopedLock&);
+ void end(sys::Mutex::ScopedLock&);
void txPrepareOkEvent(const std::string& data);
void txPrepareFailEvent(const std::string& data);
- void consumerRemoved(const broker::Consumer&);
- bool isPrepared(sys::Mutex::ScopedLock&);
- void destroy();
+
sys::Monitor lock;
std::string logPrefix;
@@ -96,8 +100,10 @@ class PrimaryTxObserver : public broker::TransactionObserver,
QueuePtr txQueue;
QueueIdsMap enqueues;
bool failed;
- UuidSet members;
- UuidSet prepared;
+
+ UuidSet members; // All members of transaction.
+ UuidSet unprepared; // Members that have not yet responded to prepare.
+ UuidSet unfinished; // Members that have not yet disconnected.
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index ff32dfef16..b4bbb3a0c4 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -169,6 +169,11 @@ void QueueReplicator::activate() {
boost::shared_ptr<QueueObserver>(new QueueObserver(shared_from_this())));
}
+void QueueReplicator::disconnect() {
+ Mutex::ScopedLock l(lock);
+ sessionHandler = 0;
+}
+
QueueReplicator::~QueueReplicator() {}
// Called from Queue::destroyed()
@@ -220,6 +225,14 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments);
}
+void QueueReplicator::cancel(Mutex::ScopedLock&) {
+ if (sessionHandler) {
+ // Cancel the replicating subscription.
+ AMQP_ServerProxy peer(sessionHandler->out);
+ peer.getMessage().cancel(getName());
+ }
+}
+
namespace {
template <class T> T decodeContent(Message& m) {
std::string content = m.getContent();
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index cc6c7ed2a7..06248d32aa 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -68,7 +68,8 @@ class QueueReplicator : public broker::Exchange,
~QueueReplicator();
- void activate(); // Must be called immediately after constructor.
+ void activate(); // Must be called immediately after constructor.
+ void disconnect(); // Called when we are disconnected from the primary.
std::string getType() const;
@@ -93,6 +94,7 @@ class QueueReplicator : public broker::Exchange,
virtual void deliver(const broker::Message&);
virtual void destroy(); // Called when the queue is destroyed.
+ void cancel(sys::Mutex::ScopedLock&);
sys::Mutex lock;
HaBroker& haBroker;
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
index c59b54386d..75ad554cb9 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
@@ -79,6 +79,7 @@ TxReplicator::TxReplicator(
txBuffer(new broker::TxBuffer),
store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
channel(link->nextChannel()),
+ complete(false),
dequeueState(hb.getBroker().getQueues())
{
string id(getTxId(txQueue->getName()));
@@ -200,16 +201,18 @@ void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) {
}
}
-void TxReplicator::commit(const string&, sys::Mutex::ScopedLock&) {
+void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) {
QPID_LOG(debug, logPrefix << "Commit");
if (context.get()) store->commit(*context);
txBuffer->commit();
+ end(l);
}
-void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock&) {
+void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) {
QPID_LOG(debug, logPrefix << "Rollback");
if (context.get()) store->abort(*context);
txBuffer->rollback();
+ end(l);
}
void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) {
@@ -223,4 +226,15 @@ void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) {
}
}
+void TxReplicator::end(sys::Mutex::ScopedLock& l) {
+ complete = true;
+ cancel(l);
+}
+
+void TxReplicator::destroy() {
+ QueueReplicator::destroy();
+ sys::Mutex::ScopedLock l(lock);
+ if (!complete) rollback(string(), l);
+}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h
index 10d7466d0c..4d2eb2f242 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.h
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.h
@@ -62,6 +62,9 @@ class TxReplicator : public QueueReplicator {
std::string getType() const;
+ // QueueReplicator overrides
+ void destroy();
+
protected:
void deliver(const broker::Message&);
@@ -80,6 +83,7 @@ class TxReplicator : public QueueReplicator {
void commit(const std::string& data, sys::Mutex::ScopedLock&);
void rollback(const std::string& data, sys::Mutex::ScopedLock&);
void members(const std::string& data, sys::Mutex::ScopedLock&);
+ void end(sys::Mutex::ScopedLock&);
std::string logPrefix;
TxEnqueueEvent enq; // Enqueue data for next deliver.
@@ -87,6 +91,7 @@ class TxReplicator : public QueueReplicator {
broker::MessageStore* store;
std::auto_ptr<broker::TransactionContext> context;
framing::ChannelId channel; // Channel to send prepare-complete.
+ bool complete;
// Class to process dequeues and create DeliveryRecords to populate a
// TxAccept.
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index b07a5b5d11..a282f59b13 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -415,6 +415,10 @@ class BrokerTest(TestCase):
Provides a well-known working directory for each test.
"""
+ def __init__(self, *args, **kwargs):
+ self.longMessage = True # Enable long messages for assert*(..., msg=xxx)
+ TestCase.__init__(self, *args, **kwargs)
+
# Environment settings.
qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
ha_lib = os.getenv("HA_LIB")
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index ab63602655..4a7b538edd 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -48,9 +48,24 @@ class QmfAgent(object):
address, client_properties={"qpid.ha-admin":1}, **kwargs)
self._agent = BrokerAgent(self._connection)
- def get_queues(self):
+ def queues(self):
return [q.values['name'] for q in self._agent.getAllQueues()]
+ def repsub_queue(self, sub):
+ """If QMF subscription sub is a replicating subscription return
+ the name of the replicated queue, else return None"""
+ session_name = self.getSession(sub.sessionRef).name
+ m = re.search("qpid.ha-q:(.*)\.", session_name)
+ return m and m.group(1)
+
+ def repsub_queues(self):
+ """Return queue names for all replicating subscriptions"""
+ return filter(None, [self.repsub_queue(s) for s in self.getAllSubscriptions()])
+
+ def tx_queues(self):
+ """Return names of all tx-queues"""
+ return [q for q in self.queues() if q.startswith("qpid.ha-tx")]
+
def __getattr__(self, name):
a = getattr(self._agent, name)
return a
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index c3577ca626..9a39bdf979 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1320,11 +1320,18 @@ class TransactionTests(BrokerTest):
sb.send(m)
return tx
+ def tx_subscriptions(self, broker):
+ """Return list of queue names for tx subscriptions"""
+ return [q for q in broker.agent().repsub_queues()
+ if q.startswith("qpid.ha-tx")]
+
def test_tx_simple_commit(self):
cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
tx.sync()
+ self.assertEqual(1, len(self.tx_subscriptions(cluster[0]))) # One backup of the transaction
+
# NOTE: backup does not process transactional dequeues until prepare
cluster[1].assert_browse_backup("a", ["x","y","z"])
cluster[1].assert_browse_backup("b", ['0', '1', '2'])
@@ -1333,10 +1340,12 @@ class TransactionTests(BrokerTest):
tx.commit()
tx.sync()
- for b in cluster:
- b.assert_browse_backup("a", [], msg=b)
- b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
+ for b in cluster: self.assert_simple_commit_outcome(b)
+ self.assertEqual(0, len(self.tx_subscriptions(cluster[0]))) # Backup tx subscription cancelled.
+ def assert_simple_commit_outcome(self, b):
+ b.assert_browse_backup("a", [], msg=b)
+ b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
# Check for expected actions on the store
expect = """<enqueue a x>
<enqueue a y>
@@ -1347,42 +1356,59 @@ class TransactionTests(BrokerTest):
<dequeue a z tx=1>
<commit tx=1>
"""
- self.assertEqual(expect, open_read(cluster[0].store_log))
- self.assertEqual(expect, open_read(cluster[1].store_log))
+ self.assertEqual(expect, open_read(b.store_log), msg=b)
+ # Check that transaction artifacts are cleaned up.
+ self.assertEqual([], b.agent().tx_queues(), msg=b)
def test_tx_simple_rollback(self):
cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
tx.acknowledge()
tx.rollback()
- for b in cluster:
- b.assert_browse_backup("a", ["x","y","z"], msg=b)
- b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
+ for b in cluster: self.assert_simple_rollback_outcome(b)
+
+ def assert_simple_rollback_outcome(self, b):
+ b.assert_browse_backup("a", ["x","y","z"], msg=b)
+ b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
# Check for expected actions on the store
expect = """<enqueue a x>
<enqueue a y>
<enqueue a z>
"""
- self.assertEqual(open_read(cluster[0].store_log), expect)
- self.assertEqual(open_read(cluster[1].store_log), expect)
+ self.assertEqual(open_read(b.store_log), expect, msg=b)
+ # Check that transaction artifacts are cleaned up.
+ self.assertEqual([], b.agent().tx_queues(), msg=b)
def test_tx_simple_failover(self):
- cluster = HaCluster(self, 2, test_store=True)
+ cluster = HaCluster(self, 3, test_store=True)
tx = self.tx_simple_setup(cluster[0])
+ tx.sync()
tx.acknowledge()
cluster.bounce(0) # Should cause roll-back
- cluster[0].wait_status("ready")
- for b in cluster:
- b.assert_browse_backup("a", ["x","y","z"], msg=b)
- b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
+ cluster[0].wait_status("ready") # Restarted.
+ cluster[1].wait_status("active") # Promoted.
+ cluster[2].wait_status("ready") # Failed over.
+ for b in cluster: self.assert_simple_rollback_outcome(b)
+
+ def test_tx_no_backups(self):
+ """Test the special case of a TX where there are no backups"""
+
+ # Test commit
+ cluster = HaCluster(self, 1, test_store=True)
+ tx = self.tx_simple_setup(cluster[0])
+ tx.acknowledge()
+ tx.commit()
+ tx.sync()
+ self.assert_simple_commit_outcome(cluster[0])
+
+ # Test rollback
+ cluster = HaCluster(self, 1, test_store=True)
+ tx = self.tx_simple_setup(cluster[0])
+ tx.acknowledge()
+ tx.rollback()
+ tx.sync()
+ self.assert_simple_rollback_outcome(cluster[0])
- # Check for expected actions on the store
- expect = """<enqueue a x>
-<enqueue a y>
-<enqueue a z>
-"""
- self.assertEqual(open_read(cluster[0].store_log), expect)
- self.assertEqual(open_read(cluster[1].store_log), expect)
def test_tx_backup_fail(self):
cluster = HaCluster(
@@ -1400,26 +1426,23 @@ class TransactionTests(BrokerTest):
"""Test cluster members joining/leaving cluster.
Also check that tx-queues are cleaned up at end of transaction."""
- def tx_queues(broker):
- return [q for q in broker.agent().get_queues() if q.startswith("qpid.ha-tx")]
-
cluster = HaCluster(self, 3)
# Leaving
tx = cluster[0].connect().session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("a", sync=True)
- self.assertEqual([1,1,1], [len(tx_queues(b)) for b in cluster])
+ self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster])
cluster[1].kill(final=False)
s.send("b")
self.assertRaises(ServerError, tx.commit)
- self.assertEqual([[],[]], [tx_queues(b) for b in [cluster[0],cluster[2]]])
+ self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]])
# Joining
tx = cluster[0].connect().session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("foo")
- tx_q = tx_queues(cluster[0])[0]
+ tx_q = cluster[0].agent().tx_queues()[0]
cluster.restart(1)
# Verify the new member should not be in the transaction.
# but should receive the result of the transaction via normal replication.