diff options
| author | Alan Conway <aconway@apache.org> | 2013-08-05 19:33:35 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-08-05 19:33:35 +0000 |
| commit | 66084bcbe57e598b857edcfab669849b9c7ebc9d (patch) | |
| tree | c0b1ee7e738a301db93784134841d208d856bd65 /cpp/src/qpid | |
| parent | bddb4f43f3592d741ef2574adfb7fa7ee7ee5c11 (diff) | |
| download | qpid-python-66084bcbe57e598b857edcfab669849b9c7ebc9d.tar.gz | |
QPID-4327: HA Handle brokers joining and leaving during a transaction.
During a transaction:
- A broker leaving aborts the transaction.
- A broker joining does not participate in the transaction,
  but does receive the results of the TX via normal replication.
Clean up tx-queues when the transaction completes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1510678 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/Event.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/Event.h | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/HaBroker.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/HaBroker.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/Membership.cpp | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/Membership.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/Primary.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/PrimaryTxObserver.cpp | 40 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/PrimaryTxObserver.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/TxReplicator.cpp | 38 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/TxReplicator.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/types.cpp | 18 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/types.h | 9 |
16 files changed, 125 insertions, 46 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index 47860b433f..684f408c4b 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -793,7 +793,7 @@ void BrokerReplicator::deleteQueue(const std::string& name, bool purge) { // messages. Any reroutes will be done at the primary and // replicated as normal. if (purge) queue->purge(0, boost::shared_ptr<Exchange>()); - broker.deleteQueue(name, userId, remoteHost); + haBroker.deleteQueue(name, remoteHost); QPID_LOG(debug, logPrefix << "Queue deleted: " << name); } } diff --git a/cpp/src/qpid/ha/Event.cpp b/cpp/src/qpid/ha/Event.cpp index 25e7947267..8265a6edd3 100644 --- a/cpp/src/qpid/ha/Event.cpp +++ b/cpp/src/qpid/ha/Event.cpp @@ -44,13 +44,14 @@ bool isEventKey(const std::string& key) { const string DequeueEvent::KEY(QPID_HA+"de"); const string IdEvent::KEY(QPID_HA+"id"); -const string TxEnqueueEvent::KEY(QPID_HA+"txen"); -const string TxDequeueEvent::KEY(QPID_HA+"txde"); -const string TxPrepareEvent::KEY(QPID_HA+"txpr"); -const string TxCommitEvent::KEY(QPID_HA+"txcm"); +const string TxEnqueueEvent::KEY(QPID_HA+"txenq"); +const string TxDequeueEvent::KEY(QPID_HA+"txdeq"); +const string TxPrepareEvent::KEY(QPID_HA+"txpre"); +const string TxCommitEvent::KEY(QPID_HA+"txcom"); const string TxRollbackEvent::KEY(QPID_HA+"txrb"); const string TxPrepareOkEvent::KEY(QPID_HA+"txok"); const string TxPrepareFailEvent::KEY(QPID_HA+"txno"); +const string TxMembersEvent::KEY(QPID_HA+"txmem"); broker::Message makeMessage( const string& data, const string& destination, const string& routingKey) diff --git a/cpp/src/qpid/ha/Event.h b/cpp/src/qpid/ha/Event.h index daaa6eada3..f292499e7b 100644 --- a/cpp/src/qpid/ha/Event.h +++ b/cpp/src/qpid/ha/Event.h @@ -43,7 +43,8 @@ broker::Message makeMessage( bool isEventKey(const std::string& key); /** Base class for encodable events */ -struct Event { +class Event { + public: virtual ~Event() {} virtual void 
encode(framing::Buffer& buffer) const = 0; virtual void decode(framing::Buffer& buffer) = 0; @@ -62,7 +63,8 @@ inline std::ostream& operator<<(std::ostream& o, const Event& e) { } /** Event base template */ -template <class Derived> struct EventBase : public Event { +template <class Derived> class EventBase : public Event { + public: std::string key() const { return Derived::KEY; } }; @@ -176,6 +178,16 @@ struct TxPrepareFailEvent : public EventBase<TxPrepareFailEvent> { void print(std::ostream& o) const { o << broker; } }; +struct TxMembersEvent : public EventBase<TxMembersEvent> { + static const std::string KEY; + UuidSet members; + TxMembersEvent(const UuidSet& s=UuidSet()) : members(s) {} + void encode(framing::Buffer& b) const { b.put(members); } + void decode(framing::Buffer& b) { b.get(members); } + size_t encodedSize() const { return members.encodedSize(); } + void print(std::ostream& o) const { o << members; } +}; + }} // namespace qpid::ha #endif /*!QPID_HA_EVENT_H*/ diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp index 5896a5568a..635aa1f65c 100644 --- a/cpp/src/qpid/ha/HaBroker.cpp +++ b/cpp/src/qpid/ha/HaBroker.cpp @@ -221,4 +221,8 @@ boost::shared_ptr<QueueReplicator> HaBroker::findQueueReplicator(const std::stri broker.getExchanges().find(QueueReplicator::replicatorName(queueName))); } +void HaBroker::deleteQueue(const string& name, const string& connectionId) { + broker.deleteQueue(name, settings.username, connectionId); +} + }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h index 6084db3fc3..c18e44d949 100644 --- a/cpp/src/qpid/ha/HaBroker.h +++ b/cpp/src/qpid/ha/HaBroker.h @@ -102,6 +102,10 @@ class HaBroker : public management::Manageable boost::shared_ptr<QueueReplicator> findQueueReplicator(const std::string& queueName); + /**@param connectionId optional, only available on backup */ + void deleteQueue(const std::string& name, + const std::string& connectionId=std::string()); + 
private: void setPublicUrl(const Url&); void setBrokerUrl(const Url&); diff --git a/cpp/src/qpid/ha/Membership.cpp b/cpp/src/qpid/ha/Membership.cpp index af57145cc2..ddb603ca14 100644 --- a/cpp/src/qpid/ha/Membership.cpp +++ b/cpp/src/qpid/ha/Membership.cpp @@ -144,9 +144,19 @@ bool checkTransition(BrokerStatus from, BrokerStatus to) { } } // namespace +void Membership::addCallback(UpdateCallback cb) { + Mutex::ScopedLock l(lock); + callbacks.push_back(cb); + cb(brokers); // Give an initial update. +} void Membership::update(Mutex::ScopedLock& l) { QPID_LOG(info, "Membership: " << brokers); + + // Call callbacks + for_each(callbacks.begin(), callbacks.end(), + boost::bind<void>(&UpdateCallback::operator(), _1, boost::cref(brokers))); + // Update managment and send update event. BrokerStatus newStatus = getStatus(l); Variant::List brokerList = asList(l); diff --git a/cpp/src/qpid/ha/Membership.h b/cpp/src/qpid/ha/Membership.h index 828f9e8403..f8b7f7f8ad 100644 --- a/cpp/src/qpid/ha/Membership.h +++ b/cpp/src/qpid/ha/Membership.h @@ -61,6 +61,11 @@ class Membership void setMgmtObject(boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker>); + /** Call callback when membership changes. + * NOTE: called with Membership lock held. + */ + typedef boost::function<void(const BrokerInfo::Map&)> UpdateCallback; + void addCallback(UpdateCallback); void clear(); ///< Clear all but self. 
void add(const BrokerInfo& b); void remove(const types::Uuid& id); @@ -94,6 +99,7 @@ class Membership const types::Uuid self; BrokerInfo::Map brokers; BrokerStatus oldStatus; + std::vector<UpdateCallback> callbacks; }; }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp index 796de0341d..c71342cbc6 100644 --- a/cpp/src/qpid/ha/Primary.cpp +++ b/cpp/src/qpid/ha/Primary.cpp @@ -301,6 +301,7 @@ void Primary::backupDisconnect(shared_ptr<RemoteBackup> backup, Mutex::ScopedLoc backup->cancel(); expectedBackups.erase(backup); backups.erase(id); + membership.remove(id); } @@ -387,14 +388,12 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards) } void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) { - QPID_LOG(debug, logPrefix << "Started TX transaction"); shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker)); observer->initialize(); tx->setObserver(observer); } void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) { - QPID_LOG(debug, logPrefix << "Started DTX transaction"); shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker)); observer->initialize(); dtx->setObserver(observer); diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/cpp/src/qpid/ha/PrimaryTxObserver.cpp index 96be5e552e..7307f15fbe 100644 --- a/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -31,6 +31,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include <boost/lexical_cast.hpp> +#include <algorithm> namespace qpid { namespace framing { @@ -87,9 +88,11 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : // when the tx-queue is deleted. 
// BrokerInfo::Set infoSet(haBroker.getMembership().otherBackups()); - std::transform(infoSet.begin(), infoSet.end(), inserter(backups, backups.begin()), - boost::bind(&BrokerInfo::getSystemId, _1)); - QPID_LOG(debug, logPrefix << "Started on " << backups); + std::transform(infoSet.begin(), infoSet.end(), inserter(members, members.begin()), + boost::bind(&BrokerInfo::getSystemId, _1)); + + QPID_LOG(debug, logPrefix << "Started TX " << id); + QPID_LOG(debug, logPrefix << "Members: " << members); pair<QueuePtr, bool> result = broker.getQueues().declare( @@ -97,8 +100,15 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : QueueSettings(/*durable*/false, /*autodelete*/true)); assert(result.second); txQueue = result.first; + txQueue->deliver(TxMembersEvent(members).message()); + // Do this last, it will start concurrent callbacks. + haBroker.getMembership().addCallback( + boost::bind(&PrimaryTxObserver::membership, this, _1)); } +PrimaryTxObserver::~PrimaryTxObserver() {} + + void PrimaryTxObserver::initialize() { broker.getExchanges().registerExchange( boost::shared_ptr<Exchange>(new Exchange(shared_from_this()))); @@ -125,7 +135,7 @@ void PrimaryTxObserver::deduplicate(sys::Mutex::ScopedLock&) { boost::shared_ptr<Primary> primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole())); assert(primary); // Tell replicating subscriptions to skip IDs in the transaction. 
- for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b) + for (UuidSet::iterator b = members.begin(); b != members.end(); ++b) for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q) primary->skip(*b, q->first, q->second); } @@ -136,7 +146,8 @@ bool PrimaryTxObserver::prepare() { QPID_LOG(debug, logPrefix << "Prepare"); deduplicate(l); txQueue->deliver(TxPrepareEvent().message()); - while (!isPrepared(l)) lock.wait(); + while (prepared != members && !failed) + lock.wait(); return !failed; } @@ -144,12 +155,20 @@ void PrimaryTxObserver::commit() { sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Commit"); txQueue->deliver(TxCommitEvent().message()); + destroy(); } void PrimaryTxObserver::rollback() { sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Rollback"); txQueue->deliver(TxRollbackEvent().message()); + destroy(); +} + +void PrimaryTxObserver::destroy() { + // Destroying the queue will result in destruction of this when + // the queues observer references are cleared. 
+ haBroker.deleteQueue(txQueue->getName()); } void PrimaryTxObserver::txPrepareOkEvent(const string& data) { @@ -169,8 +188,15 @@ void PrimaryTxObserver::txPrepareFailEvent(const string& data) { lock.notify(); } -bool PrimaryTxObserver::isPrepared(sys::Mutex::ScopedLock&) { - return (prepared == backups || failed); +void PrimaryTxObserver::membership(const BrokerInfo::Map& update) { + sys::Mutex::ScopedLock l(lock); + for (UuidSet::iterator i = members.begin(); i != members.end(); ++i) + if (!update.count(*i)) { // A broker is down + failed = true; + lock.notify(); + return; + } } + }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.h b/cpp/src/qpid/ha/PrimaryTxObserver.h index 413c627d68..2c207c55fa 100644 --- a/cpp/src/qpid/ha/PrimaryTxObserver.h +++ b/cpp/src/qpid/ha/PrimaryTxObserver.h @@ -63,6 +63,7 @@ class PrimaryTxObserver : public broker::TransactionObserver, { public: PrimaryTxObserver(HaBroker&); + ~PrimaryTxObserver(); /** Call immediately after constructor, uses shared_from_this. 
*/ void initialize(); @@ -80,11 +81,13 @@ class PrimaryTxObserver : public broker::TransactionObserver, typedef qpid::sys::unordered_map< QueuePtr, ReplicationIdSet, boost::hash<QueuePtr> > QueueIdsMap; + void membership(const BrokerInfo::Map&); void deduplicate(sys::Mutex::ScopedLock&); void txPrepareOkEvent(const std::string& data); void txPrepareFailEvent(const std::string& data); void consumerRemoved(const broker::Consumer&); bool isPrepared(sys::Mutex::ScopedLock&); + void destroy(); sys::Monitor lock; std::string logPrefix; @@ -94,7 +97,7 @@ class PrimaryTxObserver : public broker::TransactionObserver, QueuePtr txQueue; QueueIdsMap enqueues; bool failed; - UuidSet backups; + UuidSet members; UuidSet prepared; }; diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index a4dd656766..ff32dfef16 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -182,7 +182,6 @@ void QueueReplicator::destroy() { bridge2 = bridge; // call close outside the lock. // Need to drop shared pointers to avoid pointer cycles keeping this in memory. 
queue.reset(); - link.reset(); bridge.reset(); getBroker()->getExchanges().destroy(getName()); } diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.h b/cpp/src/qpid/ha/ReplicatingSubscription.h index bbc193ac78..71993bcb12 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -43,7 +43,7 @@ class Buffer; namespace ha { class QueueGuard; class HaBroker; -struct Event; +class Event; class Primary; /** diff --git a/cpp/src/qpid/ha/TxReplicator.cpp b/cpp/src/qpid/ha/TxReplicator.cpp index 2c45752dcc..c59b54386d 100644 --- a/cpp/src/qpid/ha/TxReplicator.cpp +++ b/cpp/src/qpid/ha/TxReplicator.cpp @@ -81,13 +81,11 @@ TxReplicator::TxReplicator( channel(link->nextChannel()), dequeueState(hb.getBroker().getQueues()) { - string shortId = getTxId(txQueue->getName()).substr(0, 8); + string id(getTxId(txQueue->getName())); + string shortId = id.substr(0, 8); logPrefix = "Backup of transaction "+shortId+": "; - + QPID_LOG(debug, logPrefix << "Started TX " << id); if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded.")); - boost::shared_ptr<Backup> backup = boost::dynamic_pointer_cast<Backup>(hb.getRole()); - if (!backup) throw Exception(QPID_MSG(logPrefix << "Broker is not in backup mode.")); - brokerReplicator = backup->getBrokerReplicator(); // Dispatch transaction events. 
dispatch[TxEnqueueEvent::KEY] = @@ -100,6 +98,8 @@ TxReplicator::TxReplicator( boost::bind(&TxReplicator::commit, this, _1, _2); dispatch[TxRollbackEvent::KEY] = boost::bind(&TxReplicator::rollback, this, _1, _2); + dispatch[TxMembersEvent::KEY] = + boost::bind(&TxReplicator::members, this, _1, _2); } TxReplicator::~TxReplicator() { @@ -130,15 +130,6 @@ void TxReplicator::deliver(const broker::Message& m_) { dm.deliverTo(queue); } -void TxReplicator::destroy() { - { - sys::Mutex::ScopedLock l(lock); - if (context.get()) store->abort(*context); - txBuffer->rollback(); - } - QueueReplicator::destroy(); -} - void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) { sys::Mutex::ScopedLock l(lock); TxEnqueueEvent e; @@ -209,24 +200,27 @@ void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) { } } -void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) { +void TxReplicator::commit(const string&, sys::Mutex::ScopedLock&) { QPID_LOG(debug, logPrefix << "Commit"); if (context.get()) store->commit(*context); txBuffer->commit(); - end(l); } -void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) { +void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock&) { QPID_LOG(debug, logPrefix << "Rollback"); if (context.get()) store->abort(*context); txBuffer->rollback(); - end(l); } -void TxReplicator::end(sys::Mutex::ScopedLock&) { - // Destroy the tx-queue, which will destroy this via QueueReplicator destroy. - // FIXME aconway 2013-08-01: what about connection & user ID for destroy? 
- haBroker.getBroker().getQueues().destroy(queue->getName()); +void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) { + TxMembersEvent e; + decodeStr(data, e); + QPID_LOG(debug, logPrefix << "Members: " << e.members); + if (!e.members.count(haBroker.getMembership().getSelf())) { + QPID_LOG(debug, logPrefix << "Not a member of transaction, terminating"); + // Destroy the tx-queue, which will destroy this via QueueReplicator destroy. + haBroker.deleteQueue(getQueue()->getName()); + } } }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/TxReplicator.h b/cpp/src/qpid/ha/TxReplicator.h index 41555ef094..10d7466d0c 100644 --- a/cpp/src/qpid/ha/TxReplicator.h +++ b/cpp/src/qpid/ha/TxReplicator.h @@ -65,7 +65,6 @@ class TxReplicator : public QueueReplicator { protected: void deliver(const broker::Message&); - void destroy(); private: @@ -80,13 +79,12 @@ class TxReplicator : public QueueReplicator { void prepare(const std::string& data, sys::Mutex::ScopedLock&); void commit(const std::string& data, sys::Mutex::ScopedLock&); void rollback(const std::string& data, sys::Mutex::ScopedLock&); - void end(sys::Mutex::ScopedLock&); + void members(const std::string& data, sys::Mutex::ScopedLock&); std::string logPrefix; TxEnqueueEvent enq; // Enqueue data for next deliver. boost::shared_ptr<broker::TxBuffer> txBuffer; broker::MessageStore* store; - boost::shared_ptr<BrokerReplicator> brokerReplicator; std::auto_ptr<broker::TransactionContext> context; framing::ChannelId channel; // Channel to send prepare-complete. 
diff --git a/cpp/src/qpid/ha/types.cpp b/cpp/src/qpid/ha/types.cpp index 500a35051a..c02ae33470 100644 --- a/cpp/src/qpid/ha/types.cpp +++ b/cpp/src/qpid/ha/types.cpp @@ -108,6 +108,24 @@ std::ostream& operator<<(std::ostream& o, const LogMessageId& m) { return o << m.queue << "[" << m.position << "]=" << m.replicationId; } +void UuidSet::encode(framing::Buffer& b) const { + b.putLong(size()); + for (const_iterator i = begin(); i != end(); ++i) + b.putRawData(i->data(), i->size()); +} + +void UuidSet::decode(framing::Buffer& b) { + size_t n = b.getLong(); + for ( ; n > 0; --n) { + types::Uuid id; + b.getRawData(const_cast<unsigned char*>(id.data()), id.size()); + insert(id); + } +} + +size_t UuidSet::encodedSize() const { + return sizeof(uint32_t) + size()*16; +} }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/types.h b/cpp/src/qpid/ha/types.h index 91e43137d2..92157d411b 100644 --- a/cpp/src/qpid/ha/types.h +++ b/cpp/src/qpid/ha/types.h @@ -116,8 +116,13 @@ extern const char* TRANSACTION_REPLICATOR_PREFIX; bool startsWith(const std::string& name, const std::string& prefix); -/** Define IdSet type, not a typedef so we can overload operator << */ -class UuidSet : public std::set<types::Uuid> {}; +/** Define IdSet type, not a typedef so we can overload operator << and add encoding.*/ +class UuidSet : public std::set<types::Uuid> { + public: + void encode(framing::Buffer&) const; + void decode(framing::Buffer&); + size_t encodedSize() const; +}; std::ostream& operator<<(std::ostream& o, const UuidSet& ids); |
