summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-08-05 19:33:35 +0000
committerAlan Conway <aconway@apache.org>2013-08-05 19:33:35 +0000
commit66084bcbe57e598b857edcfab669849b9c7ebc9d (patch)
treec0b1ee7e738a301db93784134841d208d856bd65 /cpp/src/qpid
parentbddb4f43f3592d741ef2574adfb7fa7ee7ee5c11 (diff)
downloadqpid-python-66084bcbe57e598b857edcfab669849b9c7ebc9d.tar.gz
QPID-4327: HA Handle brokers joining and leaving during a transaction.
During a transaction: - A broker leaving aborts the transaction. - A broker joining does not participate in the transaction - but does receive the results of the TX via normal replication. Clean up tx-queues when the transaction completes. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1510678 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp2
-rw-r--r--cpp/src/qpid/ha/Event.cpp9
-rw-r--r--cpp/src/qpid/ha/Event.h16
-rw-r--r--cpp/src/qpid/ha/HaBroker.cpp4
-rw-r--r--cpp/src/qpid/ha/HaBroker.h4
-rw-r--r--cpp/src/qpid/ha/Membership.cpp10
-rw-r--r--cpp/src/qpid/ha/Membership.h6
-rw-r--r--cpp/src/qpid/ha/Primary.cpp3
-rw-r--r--cpp/src/qpid/ha/PrimaryTxObserver.cpp40
-rw-r--r--cpp/src/qpid/ha/PrimaryTxObserver.h5
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp1
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.h2
-rw-r--r--cpp/src/qpid/ha/TxReplicator.cpp38
-rw-r--r--cpp/src/qpid/ha/TxReplicator.h4
-rw-r--r--cpp/src/qpid/ha/types.cpp18
-rw-r--r--cpp/src/qpid/ha/types.h9
16 files changed, 125 insertions, 46 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index 47860b433f..684f408c4b 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -793,7 +793,7 @@ void BrokerReplicator::deleteQueue(const std::string& name, bool purge) {
// messages. Any reroutes will be done at the primary and
// replicated as normal.
if (purge) queue->purge(0, boost::shared_ptr<Exchange>());
- broker.deleteQueue(name, userId, remoteHost);
+ haBroker.deleteQueue(name, remoteHost);
QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
}
}
diff --git a/cpp/src/qpid/ha/Event.cpp b/cpp/src/qpid/ha/Event.cpp
index 25e7947267..8265a6edd3 100644
--- a/cpp/src/qpid/ha/Event.cpp
+++ b/cpp/src/qpid/ha/Event.cpp
@@ -44,13 +44,14 @@ bool isEventKey(const std::string& key) {
const string DequeueEvent::KEY(QPID_HA+"de");
const string IdEvent::KEY(QPID_HA+"id");
-const string TxEnqueueEvent::KEY(QPID_HA+"txen");
-const string TxDequeueEvent::KEY(QPID_HA+"txde");
-const string TxPrepareEvent::KEY(QPID_HA+"txpr");
-const string TxCommitEvent::KEY(QPID_HA+"txcm");
+const string TxEnqueueEvent::KEY(QPID_HA+"txenq");
+const string TxDequeueEvent::KEY(QPID_HA+"txdeq");
+const string TxPrepareEvent::KEY(QPID_HA+"txpre");
+const string TxCommitEvent::KEY(QPID_HA+"txcom");
const string TxRollbackEvent::KEY(QPID_HA+"txrb");
const string TxPrepareOkEvent::KEY(QPID_HA+"txok");
const string TxPrepareFailEvent::KEY(QPID_HA+"txno");
+const string TxMembersEvent::KEY(QPID_HA+"txmem");
broker::Message makeMessage(
const string& data, const string& destination, const string& routingKey)
diff --git a/cpp/src/qpid/ha/Event.h b/cpp/src/qpid/ha/Event.h
index daaa6eada3..f292499e7b 100644
--- a/cpp/src/qpid/ha/Event.h
+++ b/cpp/src/qpid/ha/Event.h
@@ -43,7 +43,8 @@ broker::Message makeMessage(
bool isEventKey(const std::string& key);
/** Base class for encodable events */
-struct Event {
+class Event {
+ public:
virtual ~Event() {}
virtual void encode(framing::Buffer& buffer) const = 0;
virtual void decode(framing::Buffer& buffer) = 0;
@@ -62,7 +63,8 @@ inline std::ostream& operator<<(std::ostream& o, const Event& e) {
}
/** Event base template */
-template <class Derived> struct EventBase : public Event {
+template <class Derived> class EventBase : public Event {
+ public:
std::string key() const { return Derived::KEY; }
};
@@ -176,6 +178,16 @@ struct TxPrepareFailEvent : public EventBase<TxPrepareFailEvent> {
void print(std::ostream& o) const { o << broker; }
};
+struct TxMembersEvent : public EventBase<TxMembersEvent> {
+ static const std::string KEY;
+ UuidSet members;
+ TxMembersEvent(const UuidSet& s=UuidSet()) : members(s) {}
+ void encode(framing::Buffer& b) const { b.put(members); }
+ void decode(framing::Buffer& b) { b.get(members); }
+ size_t encodedSize() const { return members.encodedSize(); }
+ void print(std::ostream& o) const { o << members; }
+};
+
}} // namespace qpid::ha
#endif /*!QPID_HA_EVENT_H*/
diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp
index 5896a5568a..635aa1f65c 100644
--- a/cpp/src/qpid/ha/HaBroker.cpp
+++ b/cpp/src/qpid/ha/HaBroker.cpp
@@ -221,4 +221,8 @@ boost::shared_ptr<QueueReplicator> HaBroker::findQueueReplicator(const std::stri
broker.getExchanges().find(QueueReplicator::replicatorName(queueName)));
}
+void HaBroker::deleteQueue(const string& name, const string& connectionId) {
+ broker.deleteQueue(name, settings.username, connectionId);
+}
+
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h
index 6084db3fc3..c18e44d949 100644
--- a/cpp/src/qpid/ha/HaBroker.h
+++ b/cpp/src/qpid/ha/HaBroker.h
@@ -102,6 +102,10 @@ class HaBroker : public management::Manageable
boost::shared_ptr<QueueReplicator> findQueueReplicator(const std::string& queueName);
+ /**@param connectionId optional, only available on backup */
+ void deleteQueue(const std::string& name,
+ const std::string& connectionId=std::string());
+
private:
void setPublicUrl(const Url&);
void setBrokerUrl(const Url&);
diff --git a/cpp/src/qpid/ha/Membership.cpp b/cpp/src/qpid/ha/Membership.cpp
index af57145cc2..ddb603ca14 100644
--- a/cpp/src/qpid/ha/Membership.cpp
+++ b/cpp/src/qpid/ha/Membership.cpp
@@ -144,9 +144,19 @@ bool checkTransition(BrokerStatus from, BrokerStatus to) {
}
} // namespace
+void Membership::addCallback(UpdateCallback cb) {
+ Mutex::ScopedLock l(lock);
+ callbacks.push_back(cb);
+ cb(brokers); // Give an initial update.
+}
void Membership::update(Mutex::ScopedLock& l) {
QPID_LOG(info, "Membership: " << brokers);
+
+ // Call callbacks
+ for_each(callbacks.begin(), callbacks.end(),
+ boost::bind<void>(&UpdateCallback::operator(), _1, boost::cref(brokers)));
+
// Update managment and send update event.
BrokerStatus newStatus = getStatus(l);
Variant::List brokerList = asList(l);
diff --git a/cpp/src/qpid/ha/Membership.h b/cpp/src/qpid/ha/Membership.h
index 828f9e8403..f8b7f7f8ad 100644
--- a/cpp/src/qpid/ha/Membership.h
+++ b/cpp/src/qpid/ha/Membership.h
@@ -61,6 +61,11 @@ class Membership
void setMgmtObject(boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker>);
+ /** Call callback when membership changes.
+ * NOTE: called with Membership lock held.
+ */
+ typedef boost::function<void(const BrokerInfo::Map&)> UpdateCallback;
+ void addCallback(UpdateCallback);
void clear(); ///< Clear all but self.
void add(const BrokerInfo& b);
void remove(const types::Uuid& id);
@@ -94,6 +99,7 @@ class Membership
const types::Uuid self;
BrokerInfo::Map brokers;
BrokerStatus oldStatus;
+ std::vector<UpdateCallback> callbacks;
};
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp
index 796de0341d..c71342cbc6 100644
--- a/cpp/src/qpid/ha/Primary.cpp
+++ b/cpp/src/qpid/ha/Primary.cpp
@@ -301,6 +301,7 @@ void Primary::backupDisconnect(shared_ptr<RemoteBackup> backup, Mutex::ScopedLoc
backup->cancel();
expectedBackups.erase(backup);
backups.erase(id);
+ membership.remove(id);
}
@@ -387,14 +388,12 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards)
}
void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) {
- QPID_LOG(debug, logPrefix << "Started TX transaction");
shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker));
observer->initialize();
tx->setObserver(observer);
}
void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) {
- QPID_LOG(debug, logPrefix << "Started DTX transaction");
shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker));
observer->initialize();
dtx->setObserver(observer);
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 96be5e552e..7307f15fbe 100644
--- a/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -31,6 +31,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include <boost/lexical_cast.hpp>
+#include <algorithm>
namespace qpid {
namespace framing {
@@ -87,9 +88,11 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
// when the tx-queue is deleted.
//
BrokerInfo::Set infoSet(haBroker.getMembership().otherBackups());
- std::transform(infoSet.begin(), infoSet.end(), inserter(backups, backups.begin()),
- boost::bind(&BrokerInfo::getSystemId, _1));
- QPID_LOG(debug, logPrefix << "Started on " << backups);
+ std::transform(infoSet.begin(), infoSet.end(), inserter(members, members.begin()),
+ boost::bind(&BrokerInfo::getSystemId, _1));
+
+ QPID_LOG(debug, logPrefix << "Started TX " << id);
+ QPID_LOG(debug, logPrefix << "Members: " << members);
pair<QueuePtr, bool> result =
broker.getQueues().declare(
@@ -97,8 +100,15 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
QueueSettings(/*durable*/false, /*autodelete*/true));
assert(result.second);
txQueue = result.first;
+ txQueue->deliver(TxMembersEvent(members).message());
+ // Do this last, it will start concurrent callbacks.
+ haBroker.getMembership().addCallback(
+ boost::bind(&PrimaryTxObserver::membership, this, _1));
}
+PrimaryTxObserver::~PrimaryTxObserver() {}
+
+
void PrimaryTxObserver::initialize() {
broker.getExchanges().registerExchange(
boost::shared_ptr<Exchange>(new Exchange(shared_from_this())));
@@ -125,7 +135,7 @@ void PrimaryTxObserver::deduplicate(sys::Mutex::ScopedLock&) {
boost::shared_ptr<Primary> primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()));
assert(primary);
// Tell replicating subscriptions to skip IDs in the transaction.
- for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b)
+ for (UuidSet::iterator b = members.begin(); b != members.end(); ++b)
for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
primary->skip(*b, q->first, q->second);
}
@@ -136,7 +146,8 @@ bool PrimaryTxObserver::prepare() {
QPID_LOG(debug, logPrefix << "Prepare");
deduplicate(l);
txQueue->deliver(TxPrepareEvent().message());
- while (!isPrepared(l)) lock.wait();
+ while (prepared != members && !failed)
+ lock.wait();
return !failed;
}
@@ -144,12 +155,20 @@ void PrimaryTxObserver::commit() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Commit");
txQueue->deliver(TxCommitEvent().message());
+ destroy();
}
void PrimaryTxObserver::rollback() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Rollback");
txQueue->deliver(TxRollbackEvent().message());
+ destroy();
+}
+
+void PrimaryTxObserver::destroy() {
+ // Destroying the queue will result in destruction of this when
+ // the queues observer references are cleared.
+ haBroker.deleteQueue(txQueue->getName());
}
void PrimaryTxObserver::txPrepareOkEvent(const string& data) {
@@ -169,8 +188,15 @@ void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
lock.notify();
}
-bool PrimaryTxObserver::isPrepared(sys::Mutex::ScopedLock&) {
- return (prepared == backups || failed);
+void PrimaryTxObserver::membership(const BrokerInfo::Map& update) {
+ sys::Mutex::ScopedLock l(lock);
+ for (UuidSet::iterator i = members.begin(); i != members.end(); ++i)
+ if (!update.count(*i)) { // A broker is down
+ failed = true;
+ lock.notify();
+ return;
+ }
}
+
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.h b/cpp/src/qpid/ha/PrimaryTxObserver.h
index 413c627d68..2c207c55fa 100644
--- a/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ b/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -63,6 +63,7 @@ class PrimaryTxObserver : public broker::TransactionObserver,
{
public:
PrimaryTxObserver(HaBroker&);
+ ~PrimaryTxObserver();
/** Call immediately after constructor, uses shared_from_this. */
void initialize();
@@ -80,11 +81,13 @@ class PrimaryTxObserver : public broker::TransactionObserver,
typedef qpid::sys::unordered_map<
QueuePtr, ReplicationIdSet, boost::hash<QueuePtr> > QueueIdsMap;
+ void membership(const BrokerInfo::Map&);
void deduplicate(sys::Mutex::ScopedLock&);
void txPrepareOkEvent(const std::string& data);
void txPrepareFailEvent(const std::string& data);
void consumerRemoved(const broker::Consumer&);
bool isPrepared(sys::Mutex::ScopedLock&);
+ void destroy();
sys::Monitor lock;
std::string logPrefix;
@@ -94,7 +97,7 @@ class PrimaryTxObserver : public broker::TransactionObserver,
QueuePtr txQueue;
QueueIdsMap enqueues;
bool failed;
- UuidSet backups;
+ UuidSet members;
UuidSet prepared;
};
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index a4dd656766..ff32dfef16 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -182,7 +182,6 @@ void QueueReplicator::destroy() {
bridge2 = bridge; // call close outside the lock.
// Need to drop shared pointers to avoid pointer cycles keeping this in memory.
queue.reset();
- link.reset();
bridge.reset();
getBroker()->getExchanges().destroy(getName());
}
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.h b/cpp/src/qpid/ha/ReplicatingSubscription.h
index bbc193ac78..71993bcb12 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -43,7 +43,7 @@ class Buffer;
namespace ha {
class QueueGuard;
class HaBroker;
-struct Event;
+class Event;
class Primary;
/**
diff --git a/cpp/src/qpid/ha/TxReplicator.cpp b/cpp/src/qpid/ha/TxReplicator.cpp
index 2c45752dcc..c59b54386d 100644
--- a/cpp/src/qpid/ha/TxReplicator.cpp
+++ b/cpp/src/qpid/ha/TxReplicator.cpp
@@ -81,13 +81,11 @@ TxReplicator::TxReplicator(
channel(link->nextChannel()),
dequeueState(hb.getBroker().getQueues())
{
- string shortId = getTxId(txQueue->getName()).substr(0, 8);
+ string id(getTxId(txQueue->getName()));
+ string shortId = id.substr(0, 8);
logPrefix = "Backup of transaction "+shortId+": ";
-
+ QPID_LOG(debug, logPrefix << "Started TX " << id);
if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded."));
- boost::shared_ptr<Backup> backup = boost::dynamic_pointer_cast<Backup>(hb.getRole());
- if (!backup) throw Exception(QPID_MSG(logPrefix << "Broker is not in backup mode."));
- brokerReplicator = backup->getBrokerReplicator();
// Dispatch transaction events.
dispatch[TxEnqueueEvent::KEY] =
@@ -100,6 +98,8 @@ TxReplicator::TxReplicator(
boost::bind(&TxReplicator::commit, this, _1, _2);
dispatch[TxRollbackEvent::KEY] =
boost::bind(&TxReplicator::rollback, this, _1, _2);
+ dispatch[TxMembersEvent::KEY] =
+ boost::bind(&TxReplicator::members, this, _1, _2);
}
TxReplicator::~TxReplicator() {
@@ -130,15 +130,6 @@ void TxReplicator::deliver(const broker::Message& m_) {
dm.deliverTo(queue);
}
-void TxReplicator::destroy() {
- {
- sys::Mutex::ScopedLock l(lock);
- if (context.get()) store->abort(*context);
- txBuffer->rollback();
- }
- QueueReplicator::destroy();
-}
-
void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
sys::Mutex::ScopedLock l(lock);
TxEnqueueEvent e;
@@ -209,24 +200,27 @@ void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) {
}
}
-void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) {
+void TxReplicator::commit(const string&, sys::Mutex::ScopedLock&) {
QPID_LOG(debug, logPrefix << "Commit");
if (context.get()) store->commit(*context);
txBuffer->commit();
- end(l);
}
-void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) {
+void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock&) {
QPID_LOG(debug, logPrefix << "Rollback");
if (context.get()) store->abort(*context);
txBuffer->rollback();
- end(l);
}
-void TxReplicator::end(sys::Mutex::ScopedLock&) {
- // Destroy the tx-queue, which will destroy this via QueueReplicator destroy.
- // FIXME aconway 2013-08-01: what about connection & user ID for destroy?
- haBroker.getBroker().getQueues().destroy(queue->getName());
+void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) {
+ TxMembersEvent e;
+ decodeStr(data, e);
+ QPID_LOG(debug, logPrefix << "Members: " << e.members);
+ if (!e.members.count(haBroker.getMembership().getSelf())) {
+ QPID_LOG(debug, logPrefix << "Not a member of transaction, terminating");
+ // Destroy the tx-queue, which will destroy this via QueueReplicator destroy.
+ haBroker.deleteQueue(getQueue()->getName());
+ }
}
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/TxReplicator.h b/cpp/src/qpid/ha/TxReplicator.h
index 41555ef094..10d7466d0c 100644
--- a/cpp/src/qpid/ha/TxReplicator.h
+++ b/cpp/src/qpid/ha/TxReplicator.h
@@ -65,7 +65,6 @@ class TxReplicator : public QueueReplicator {
protected:
void deliver(const broker::Message&);
- void destroy();
private:
@@ -80,13 +79,12 @@ class TxReplicator : public QueueReplicator {
void prepare(const std::string& data, sys::Mutex::ScopedLock&);
void commit(const std::string& data, sys::Mutex::ScopedLock&);
void rollback(const std::string& data, sys::Mutex::ScopedLock&);
- void end(sys::Mutex::ScopedLock&);
+ void members(const std::string& data, sys::Mutex::ScopedLock&);
std::string logPrefix;
TxEnqueueEvent enq; // Enqueue data for next deliver.
boost::shared_ptr<broker::TxBuffer> txBuffer;
broker::MessageStore* store;
- boost::shared_ptr<BrokerReplicator> brokerReplicator;
std::auto_ptr<broker::TransactionContext> context;
framing::ChannelId channel; // Channel to send prepare-complete.
diff --git a/cpp/src/qpid/ha/types.cpp b/cpp/src/qpid/ha/types.cpp
index 500a35051a..c02ae33470 100644
--- a/cpp/src/qpid/ha/types.cpp
+++ b/cpp/src/qpid/ha/types.cpp
@@ -108,6 +108,24 @@ std::ostream& operator<<(std::ostream& o, const LogMessageId& m) {
return o << m.queue << "[" << m.position << "]=" << m.replicationId;
}
+void UuidSet::encode(framing::Buffer& b) const {
+ b.putLong(size());
+ for (const_iterator i = begin(); i != end(); ++i)
+ b.putRawData(i->data(), i->size());
+}
+
+void UuidSet::decode(framing::Buffer& b) {
+ size_t n = b.getLong();
+ for ( ; n > 0; --n) {
+ types::Uuid id;
+ b.getRawData(const_cast<unsigned char*>(id.data()), id.size());
+ insert(id);
+ }
+}
+
+size_t UuidSet::encodedSize() const {
+ return sizeof(uint32_t) + size()*16;
+}
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/types.h b/cpp/src/qpid/ha/types.h
index 91e43137d2..92157d411b 100644
--- a/cpp/src/qpid/ha/types.h
+++ b/cpp/src/qpid/ha/types.h
@@ -116,8 +116,13 @@ extern const char* TRANSACTION_REPLICATOR_PREFIX;
bool startsWith(const std::string& name, const std::string& prefix);
-/** Define IdSet type, not a typedef so we can overload operator << */
-class UuidSet : public std::set<types::Uuid> {};
+/** Define IdSet type, not a typedef so we can overload operator << and add encoding.*/
+class UuidSet : public std::set<types::Uuid> {
+ public:
+ void encode(framing::Buffer&) const;
+ void decode(framing::Buffer&);
+ size_t encodedSize() const;
+};
std::ostream& operator<<(std::ostream& o, const UuidSet& ids);