summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-08-05 19:33:35 +0000
committerAlan Conway <aconway@apache.org>2013-08-05 19:33:35 +0000
commite9734954a808abd4255e7c82e793d390fa78d7df (patch)
treec0b1ee7e738a301db93784134841d208d856bd65
parent6a75c2b40b0df9fab0ce675540557fff7b46f9de (diff)
downloadqpid-python-e9734954a808abd4255e7c82e793d390fa78d7df.tar.gz
QPID-4327: HA Handle brokers joining and leaving during a transaction.
During a transaction: - A broker leaving aborts the transaction. - A broker joining does not participate in the transaction - but does receive the results of the TX via normal replication. Clean up tx-queues when the transaction completes. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1510678 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp2
-rw-r--r--cpp/src/qpid/ha/Event.cpp9
-rw-r--r--cpp/src/qpid/ha/Event.h16
-rw-r--r--cpp/src/qpid/ha/HaBroker.cpp4
-rw-r--r--cpp/src/qpid/ha/HaBroker.h4
-rw-r--r--cpp/src/qpid/ha/Membership.cpp10
-rw-r--r--cpp/src/qpid/ha/Membership.h6
-rw-r--r--cpp/src/qpid/ha/Primary.cpp3
-rw-r--r--cpp/src/qpid/ha/PrimaryTxObserver.cpp40
-rw-r--r--cpp/src/qpid/ha/PrimaryTxObserver.h5
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp1
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.h2
-rw-r--r--cpp/src/qpid/ha/TxReplicator.cpp38
-rw-r--r--cpp/src/qpid/ha/TxReplicator.h4
-rw-r--r--cpp/src/qpid/ha/types.cpp18
-rw-r--r--cpp/src/qpid/ha/types.h9
-rwxr-xr-xcpp/src/tests/ha_test.py9
-rwxr-xr-xcpp/src/tests/ha_tests.py38
18 files changed, 165 insertions, 53 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index 47860b433f..684f408c4b 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -793,7 +793,7 @@ void BrokerReplicator::deleteQueue(const std::string& name, bool purge) {
// messages. Any reroutes will be done at the primary and
// replicated as normal.
if (purge) queue->purge(0, boost::shared_ptr<Exchange>());
- broker.deleteQueue(name, userId, remoteHost);
+ haBroker.deleteQueue(name, remoteHost);
QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
}
}
diff --git a/cpp/src/qpid/ha/Event.cpp b/cpp/src/qpid/ha/Event.cpp
index 25e7947267..8265a6edd3 100644
--- a/cpp/src/qpid/ha/Event.cpp
+++ b/cpp/src/qpid/ha/Event.cpp
@@ -44,13 +44,14 @@ bool isEventKey(const std::string& key) {
const string DequeueEvent::KEY(QPID_HA+"de");
const string IdEvent::KEY(QPID_HA+"id");
-const string TxEnqueueEvent::KEY(QPID_HA+"txen");
-const string TxDequeueEvent::KEY(QPID_HA+"txde");
-const string TxPrepareEvent::KEY(QPID_HA+"txpr");
-const string TxCommitEvent::KEY(QPID_HA+"txcm");
+const string TxEnqueueEvent::KEY(QPID_HA+"txenq");
+const string TxDequeueEvent::KEY(QPID_HA+"txdeq");
+const string TxPrepareEvent::KEY(QPID_HA+"txpre");
+const string TxCommitEvent::KEY(QPID_HA+"txcom");
const string TxRollbackEvent::KEY(QPID_HA+"txrb");
const string TxPrepareOkEvent::KEY(QPID_HA+"txok");
const string TxPrepareFailEvent::KEY(QPID_HA+"txno");
+const string TxMembersEvent::KEY(QPID_HA+"txmem");
broker::Message makeMessage(
const string& data, const string& destination, const string& routingKey)
diff --git a/cpp/src/qpid/ha/Event.h b/cpp/src/qpid/ha/Event.h
index daaa6eada3..f292499e7b 100644
--- a/cpp/src/qpid/ha/Event.h
+++ b/cpp/src/qpid/ha/Event.h
@@ -43,7 +43,8 @@ broker::Message makeMessage(
bool isEventKey(const std::string& key);
/** Base class for encodable events */
-struct Event {
+class Event {
+ public:
virtual ~Event() {}
virtual void encode(framing::Buffer& buffer) const = 0;
virtual void decode(framing::Buffer& buffer) = 0;
@@ -62,7 +63,8 @@ inline std::ostream& operator<<(std::ostream& o, const Event& e) {
}
/** Event base template */
-template <class Derived> struct EventBase : public Event {
+template <class Derived> class EventBase : public Event {
+ public:
std::string key() const { return Derived::KEY; }
};
@@ -176,6 +178,16 @@ struct TxPrepareFailEvent : public EventBase<TxPrepareFailEvent> {
void print(std::ostream& o) const { o << broker; }
};
+struct TxMembersEvent : public EventBase<TxMembersEvent> {
+ static const std::string KEY;
+ UuidSet members;
+ TxMembersEvent(const UuidSet& s=UuidSet()) : members(s) {}
+ void encode(framing::Buffer& b) const { b.put(members); }
+ void decode(framing::Buffer& b) { b.get(members); }
+ size_t encodedSize() const { return members.encodedSize(); }
+ void print(std::ostream& o) const { o << members; }
+};
+
}} // namespace qpid::ha
#endif /*!QPID_HA_EVENT_H*/
diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp
index 5896a5568a..635aa1f65c 100644
--- a/cpp/src/qpid/ha/HaBroker.cpp
+++ b/cpp/src/qpid/ha/HaBroker.cpp
@@ -221,4 +221,8 @@ boost::shared_ptr<QueueReplicator> HaBroker::findQueueReplicator(const std::stri
broker.getExchanges().find(QueueReplicator::replicatorName(queueName)));
}
+void HaBroker::deleteQueue(const string& name, const string& connectionId) {
+ broker.deleteQueue(name, settings.username, connectionId);
+}
+
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h
index 6084db3fc3..c18e44d949 100644
--- a/cpp/src/qpid/ha/HaBroker.h
+++ b/cpp/src/qpid/ha/HaBroker.h
@@ -102,6 +102,10 @@ class HaBroker : public management::Manageable
boost::shared_ptr<QueueReplicator> findQueueReplicator(const std::string& queueName);
+ /**@param connectionId optional, only available on backup */
+ void deleteQueue(const std::string& name,
+ const std::string& connectionId=std::string());
+
private:
void setPublicUrl(const Url&);
void setBrokerUrl(const Url&);
diff --git a/cpp/src/qpid/ha/Membership.cpp b/cpp/src/qpid/ha/Membership.cpp
index af57145cc2..ddb603ca14 100644
--- a/cpp/src/qpid/ha/Membership.cpp
+++ b/cpp/src/qpid/ha/Membership.cpp
@@ -144,9 +144,19 @@ bool checkTransition(BrokerStatus from, BrokerStatus to) {
}
} // namespace
+void Membership::addCallback(UpdateCallback cb) {
+ Mutex::ScopedLock l(lock);
+ callbacks.push_back(cb);
+ cb(brokers); // Give an initial update.
+}
void Membership::update(Mutex::ScopedLock& l) {
QPID_LOG(info, "Membership: " << brokers);
+
+ // Call callbacks
+ for_each(callbacks.begin(), callbacks.end(),
+ boost::bind<void>(&UpdateCallback::operator(), _1, boost::cref(brokers)));
+
// Update managment and send update event.
BrokerStatus newStatus = getStatus(l);
Variant::List brokerList = asList(l);
diff --git a/cpp/src/qpid/ha/Membership.h b/cpp/src/qpid/ha/Membership.h
index 828f9e8403..f8b7f7f8ad 100644
--- a/cpp/src/qpid/ha/Membership.h
+++ b/cpp/src/qpid/ha/Membership.h
@@ -61,6 +61,11 @@ class Membership
void setMgmtObject(boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker>);
+ /** Call callback when membership changes.
+ * NOTE: called with Membership lock held.
+ */
+ typedef boost::function<void(const BrokerInfo::Map&)> UpdateCallback;
+ void addCallback(UpdateCallback);
void clear(); ///< Clear all but self.
void add(const BrokerInfo& b);
void remove(const types::Uuid& id);
@@ -94,6 +99,7 @@ class Membership
const types::Uuid self;
BrokerInfo::Map brokers;
BrokerStatus oldStatus;
+ std::vector<UpdateCallback> callbacks;
};
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp
index 796de0341d..c71342cbc6 100644
--- a/cpp/src/qpid/ha/Primary.cpp
+++ b/cpp/src/qpid/ha/Primary.cpp
@@ -301,6 +301,7 @@ void Primary::backupDisconnect(shared_ptr<RemoteBackup> backup, Mutex::ScopedLoc
backup->cancel();
expectedBackups.erase(backup);
backups.erase(id);
+ membership.remove(id);
}
@@ -387,14 +388,12 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards)
}
void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) {
- QPID_LOG(debug, logPrefix << "Started TX transaction");
shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker));
observer->initialize();
tx->setObserver(observer);
}
void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) {
- QPID_LOG(debug, logPrefix << "Started DTX transaction");
shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker));
observer->initialize();
dtx->setObserver(observer);
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 96be5e552e..7307f15fbe 100644
--- a/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -31,6 +31,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include <boost/lexical_cast.hpp>
+#include <algorithm>
namespace qpid {
namespace framing {
@@ -87,9 +88,11 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
// when the tx-queue is deleted.
//
BrokerInfo::Set infoSet(haBroker.getMembership().otherBackups());
- std::transform(infoSet.begin(), infoSet.end(), inserter(backups, backups.begin()),
- boost::bind(&BrokerInfo::getSystemId, _1));
- QPID_LOG(debug, logPrefix << "Started on " << backups);
+ std::transform(infoSet.begin(), infoSet.end(), inserter(members, members.begin()),
+ boost::bind(&BrokerInfo::getSystemId, _1));
+
+ QPID_LOG(debug, logPrefix << "Started TX " << id);
+ QPID_LOG(debug, logPrefix << "Members: " << members);
pair<QueuePtr, bool> result =
broker.getQueues().declare(
@@ -97,8 +100,15 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
QueueSettings(/*durable*/false, /*autodelete*/true));
assert(result.second);
txQueue = result.first;
+ txQueue->deliver(TxMembersEvent(members).message());
+ // Do this last, it will start concurrent callbacks.
+ haBroker.getMembership().addCallback(
+ boost::bind(&PrimaryTxObserver::membership, this, _1));
}
+PrimaryTxObserver::~PrimaryTxObserver() {}
+
+
void PrimaryTxObserver::initialize() {
broker.getExchanges().registerExchange(
boost::shared_ptr<Exchange>(new Exchange(shared_from_this())));
@@ -125,7 +135,7 @@ void PrimaryTxObserver::deduplicate(sys::Mutex::ScopedLock&) {
boost::shared_ptr<Primary> primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()));
assert(primary);
// Tell replicating subscriptions to skip IDs in the transaction.
- for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b)
+ for (UuidSet::iterator b = members.begin(); b != members.end(); ++b)
for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
primary->skip(*b, q->first, q->second);
}
@@ -136,7 +146,8 @@ bool PrimaryTxObserver::prepare() {
QPID_LOG(debug, logPrefix << "Prepare");
deduplicate(l);
txQueue->deliver(TxPrepareEvent().message());
- while (!isPrepared(l)) lock.wait();
+ while (prepared != members && !failed)
+ lock.wait();
return !failed;
}
@@ -144,12 +155,20 @@ void PrimaryTxObserver::commit() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Commit");
txQueue->deliver(TxCommitEvent().message());
+ destroy();
}
void PrimaryTxObserver::rollback() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Rollback");
txQueue->deliver(TxRollbackEvent().message());
+ destroy();
+}
+
+void PrimaryTxObserver::destroy() {
+ // Destroying the queue will result in destruction of this when
+ // the queues observer references are cleared.
+ haBroker.deleteQueue(txQueue->getName());
}
void PrimaryTxObserver::txPrepareOkEvent(const string& data) {
@@ -169,8 +188,15 @@ void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
lock.notify();
}
-bool PrimaryTxObserver::isPrepared(sys::Mutex::ScopedLock&) {
- return (prepared == backups || failed);
+void PrimaryTxObserver::membership(const BrokerInfo::Map& update) {
+ sys::Mutex::ScopedLock l(lock);
+ for (UuidSet::iterator i = members.begin(); i != members.end(); ++i)
+ if (!update.count(*i)) { // A broker is down
+ failed = true;
+ lock.notify();
+ return;
+ }
}
+
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.h b/cpp/src/qpid/ha/PrimaryTxObserver.h
index 413c627d68..2c207c55fa 100644
--- a/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ b/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -63,6 +63,7 @@ class PrimaryTxObserver : public broker::TransactionObserver,
{
public:
PrimaryTxObserver(HaBroker&);
+ ~PrimaryTxObserver();
/** Call immediately after constructor, uses shared_from_this. */
void initialize();
@@ -80,11 +81,13 @@ class PrimaryTxObserver : public broker::TransactionObserver,
typedef qpid::sys::unordered_map<
QueuePtr, ReplicationIdSet, boost::hash<QueuePtr> > QueueIdsMap;
+ void membership(const BrokerInfo::Map&);
void deduplicate(sys::Mutex::ScopedLock&);
void txPrepareOkEvent(const std::string& data);
void txPrepareFailEvent(const std::string& data);
void consumerRemoved(const broker::Consumer&);
bool isPrepared(sys::Mutex::ScopedLock&);
+ void destroy();
sys::Monitor lock;
std::string logPrefix;
@@ -94,7 +97,7 @@ class PrimaryTxObserver : public broker::TransactionObserver,
QueuePtr txQueue;
QueueIdsMap enqueues;
bool failed;
- UuidSet backups;
+ UuidSet members;
UuidSet prepared;
};
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index a4dd656766..ff32dfef16 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -182,7 +182,6 @@ void QueueReplicator::destroy() {
bridge2 = bridge; // call close outside the lock.
// Need to drop shared pointers to avoid pointer cycles keeping this in memory.
queue.reset();
- link.reset();
bridge.reset();
getBroker()->getExchanges().destroy(getName());
}
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.h b/cpp/src/qpid/ha/ReplicatingSubscription.h
index bbc193ac78..71993bcb12 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -43,7 +43,7 @@ class Buffer;
namespace ha {
class QueueGuard;
class HaBroker;
-struct Event;
+class Event;
class Primary;
/**
diff --git a/cpp/src/qpid/ha/TxReplicator.cpp b/cpp/src/qpid/ha/TxReplicator.cpp
index 2c45752dcc..c59b54386d 100644
--- a/cpp/src/qpid/ha/TxReplicator.cpp
+++ b/cpp/src/qpid/ha/TxReplicator.cpp
@@ -81,13 +81,11 @@ TxReplicator::TxReplicator(
channel(link->nextChannel()),
dequeueState(hb.getBroker().getQueues())
{
- string shortId = getTxId(txQueue->getName()).substr(0, 8);
+ string id(getTxId(txQueue->getName()));
+ string shortId = id.substr(0, 8);
logPrefix = "Backup of transaction "+shortId+": ";
-
+ QPID_LOG(debug, logPrefix << "Started TX " << id);
if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded."));
- boost::shared_ptr<Backup> backup = boost::dynamic_pointer_cast<Backup>(hb.getRole());
- if (!backup) throw Exception(QPID_MSG(logPrefix << "Broker is not in backup mode."));
- brokerReplicator = backup->getBrokerReplicator();
// Dispatch transaction events.
dispatch[TxEnqueueEvent::KEY] =
@@ -100,6 +98,8 @@ TxReplicator::TxReplicator(
boost::bind(&TxReplicator::commit, this, _1, _2);
dispatch[TxRollbackEvent::KEY] =
boost::bind(&TxReplicator::rollback, this, _1, _2);
+ dispatch[TxMembersEvent::KEY] =
+ boost::bind(&TxReplicator::members, this, _1, _2);
}
TxReplicator::~TxReplicator() {
@@ -130,15 +130,6 @@ void TxReplicator::deliver(const broker::Message& m_) {
dm.deliverTo(queue);
}
-void TxReplicator::destroy() {
- {
- sys::Mutex::ScopedLock l(lock);
- if (context.get()) store->abort(*context);
- txBuffer->rollback();
- }
- QueueReplicator::destroy();
-}
-
void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
sys::Mutex::ScopedLock l(lock);
TxEnqueueEvent e;
@@ -209,24 +200,27 @@ void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) {
}
}
-void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) {
+void TxReplicator::commit(const string&, sys::Mutex::ScopedLock&) {
QPID_LOG(debug, logPrefix << "Commit");
if (context.get()) store->commit(*context);
txBuffer->commit();
- end(l);
}
-void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) {
+void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock&) {
QPID_LOG(debug, logPrefix << "Rollback");
if (context.get()) store->abort(*context);
txBuffer->rollback();
- end(l);
}
-void TxReplicator::end(sys::Mutex::ScopedLock&) {
- // Destroy the tx-queue, which will destroy this via QueueReplicator destroy.
- // FIXME aconway 2013-08-01: what about connection & user ID for destroy?
- haBroker.getBroker().getQueues().destroy(queue->getName());
+void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) {
+ TxMembersEvent e;
+ decodeStr(data, e);
+ QPID_LOG(debug, logPrefix << "Members: " << e.members);
+ if (!e.members.count(haBroker.getMembership().getSelf())) {
+ QPID_LOG(debug, logPrefix << "Not a member of transaction, terminating");
+ // Destroy the tx-queue, which will destroy this via QueueReplicator destroy.
+ haBroker.deleteQueue(getQueue()->getName());
+ }
}
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/TxReplicator.h b/cpp/src/qpid/ha/TxReplicator.h
index 41555ef094..10d7466d0c 100644
--- a/cpp/src/qpid/ha/TxReplicator.h
+++ b/cpp/src/qpid/ha/TxReplicator.h
@@ -65,7 +65,6 @@ class TxReplicator : public QueueReplicator {
protected:
void deliver(const broker::Message&);
- void destroy();
private:
@@ -80,13 +79,12 @@ class TxReplicator : public QueueReplicator {
void prepare(const std::string& data, sys::Mutex::ScopedLock&);
void commit(const std::string& data, sys::Mutex::ScopedLock&);
void rollback(const std::string& data, sys::Mutex::ScopedLock&);
- void end(sys::Mutex::ScopedLock&);
+ void members(const std::string& data, sys::Mutex::ScopedLock&);
std::string logPrefix;
TxEnqueueEvent enq; // Enqueue data for next deliver.
boost::shared_ptr<broker::TxBuffer> txBuffer;
broker::MessageStore* store;
- boost::shared_ptr<BrokerReplicator> brokerReplicator;
std::auto_ptr<broker::TransactionContext> context;
framing::ChannelId channel; // Channel to send prepare-complete.
diff --git a/cpp/src/qpid/ha/types.cpp b/cpp/src/qpid/ha/types.cpp
index 500a35051a..c02ae33470 100644
--- a/cpp/src/qpid/ha/types.cpp
+++ b/cpp/src/qpid/ha/types.cpp
@@ -108,6 +108,24 @@ std::ostream& operator<<(std::ostream& o, const LogMessageId& m) {
return o << m.queue << "[" << m.position << "]=" << m.replicationId;
}
+void UuidSet::encode(framing::Buffer& b) const {
+ b.putLong(size());
+ for (const_iterator i = begin(); i != end(); ++i)
+ b.putRawData(i->data(), i->size());
+}
+
+void UuidSet::decode(framing::Buffer& b) {
+ size_t n = b.getLong();
+ for ( ; n > 0; --n) {
+ types::Uuid id;
+ b.getRawData(const_cast<unsigned char*>(id.data()), id.size());
+ insert(id);
+ }
+}
+
+size_t UuidSet::encodedSize() const {
+ return sizeof(uint32_t) + size()*16;
+}
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/types.h b/cpp/src/qpid/ha/types.h
index 91e43137d2..92157d411b 100644
--- a/cpp/src/qpid/ha/types.h
+++ b/cpp/src/qpid/ha/types.h
@@ -116,8 +116,13 @@ extern const char* TRANSACTION_REPLICATOR_PREFIX;
bool startsWith(const std::string& name, const std::string& prefix);
-/** Define IdSet type, not a typedef so we can overload operator << */
-class UuidSet : public std::set<types::Uuid> {};
+/** Define IdSet type, not a typedef so we can overload operator << and add encoding.*/
+class UuidSet : public std::set<types::Uuid> {
+ public:
+ void encode(framing::Buffer&) const;
+ void decode(framing::Buffer&);
+ size_t encodedSize() const;
+};
std::ostream& operator<<(std::ostream& o, const UuidSet& ids);
diff --git a/cpp/src/tests/ha_test.py b/cpp/src/tests/ha_test.py
index cceb9795eb..ab63602655 100755
--- a/cpp/src/tests/ha_test.py
+++ b/cpp/src/tests/ha_test.py
@@ -48,6 +48,9 @@ class QmfAgent(object):
address, client_properties={"qpid.ha-admin":1}, **kwargs)
self._agent = BrokerAgent(self._connection)
+ def get_queues(self):
+ return [q.values['name'] for q in self._agent.getAllQueues()]
+
def __getattr__(self, name):
a = getattr(self._agent, name)
return a
@@ -107,7 +110,7 @@ class HaBroker(Broker):
ha_port = ha_port or HaPort(test)
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=trace+:ha::", # FIXME aconway 2013-07-29: debug
+ "--log-enable=debug+:ha::",
# Non-standard settings for faster tests.
"--link-maintenance-interval=0.1",
# Heartbeat and negotiate time are needed so that a broker wont
@@ -276,8 +279,8 @@ class HaCluster(object):
@s_args: args for specific brokers: s_args[i] for broker i.
"""
self.test = test
- self.args = args
- self.s_args = s_args
+ self.args = copy(args)
+ self.s_args = copy(s_args)
self.kwargs = kwargs
self._ports = [HaPort(test) for i in xrange(n)]
self._set_url()
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index e97614d785..e1ad5cc4fa 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -20,7 +20,7 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
import traceback
-from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
+from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty, ServerError
from qpid.datatypes import uuid4, UUID
from brokertest import *
from ha_test import *
@@ -1278,7 +1278,7 @@ class StoreTests(BrokerTest):
cluster[0].assert_browse("q1", ["x","y","z"])
cluster[1].assert_browse_backup("q1", ["x","y","z"])
- sn = cluster[0].connect(heartbeat=1).session() # FIXME aconway 2012-09-25: should fail over!
+ sn = cluster[0].connect(heartbeat=1).session()
sn.sender("ex/k1").send("boo")
cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"])
cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"])
@@ -1382,18 +1382,48 @@ class TransactionTests(BrokerTest):
self.assertEqual(open_read(cluster[1].store_log), expect)
def test_tx_backup_fail(self):
- # FIXME aconway 2013-07-31: check exception types, reduce timeout.
cluster = HaCluster(
self, 2, test_store=True, s_args=[[],["--test-store-throw=bang"]])
c = cluster[0].connect()
tx = c.session(transactional=True)
s = tx.sender("q;{create:always,node:{durable:true}}")
for m in ["foo","bang","bar"]: s.send(Message(m, durable=True))
- self.assertRaises(Exception, tx.commit)
+ self.assertRaises(ServerError, tx.commit)
for b in cluster: b.assert_browse_backup("q", [])
self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n")
self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n")
+ def test_tx_join_leave(self):
+ """Test cluster members joining/leaving cluster.
+ Also check that tx-queues are cleaned up at end of transaction."""
+
+ def tx_queues(broker):
+ return [q for q in broker.agent().get_queues() if q.startswith("qpid.ha-tx")]
+
+ cluster = HaCluster(self, 3)
+
+ # Leaving
+ tx = cluster[0].connect().session(transactional=True)
+ s = tx.sender("q;{create:always}")
+ s.send("a", sync=True)
+ self.assertEqual([1,1,1], [len(tx_queues(b)) for b in cluster])
+ cluster[1].kill(final=False)
+ s.send("b")
+ self.assertRaises(ServerError, tx.commit)
+ self.assertEqual([[],[]], [tx_queues(b) for b in [cluster[0],cluster[2]]])
+
+ # Joining
+ tx = cluster[0].connect().session(transactional=True)
+ s = tx.sender("q;{create:always}")
+ s.send("foo")
+ tx_q = tx_queues(cluster[0])[0]
+ cluster.restart(1)
+ # Verify the new member should not be in the transaction.
+ # but should receive the result of the transaction via normal replication.
+ cluster[1].wait_no_queue(tx_q)
+ tx.commit()
+ for b in cluster: b.assert_browse_backup("q", ["foo"])
+
if __name__ == "__main__":
outdir = "ha_tests.tmp"
shutil.rmtree(outdir, True)