diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/TxReplicator.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/TxReplicator.cpp | 58 |
1 files changed, 35 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp index 63301a92f5..95afdb9759 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp @@ -26,6 +26,7 @@ #include "BrokerReplicator.h" #include "Event.h" #include "HaBroker.h" +#include "ReplicatingSubscription.h" #include "types.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Link.h" @@ -53,10 +54,7 @@ using qpid::broker::amqp_0_10::MessageTransfer; using qpid::types::Uuid; namespace { -const string QPID_HA(QPID_HA_PREFIX); -const string TYPE_NAME(QPID_HA+"tx-replicator"); const string PREFIX(TRANSACTION_REPLICATOR_PREFIX); - } // namespace @@ -70,17 +68,16 @@ string TxReplicator::getTxId(const string& q) { return q.substr(PREFIX.size()); } -string TxReplicator::getType() const { return TYPE_NAME; } +string TxReplicator::getType() const { return ReplicatingSubscription::QPID_TX_REPLICATOR; } TxReplicator::TxReplicator( HaBroker& hb, const boost::shared_ptr<broker::Queue>& txQueue, const boost::shared_ptr<broker::Link>& link) : QueueReplicator(hb, txQueue, link), - txBuffer(new broker::TxBuffer), store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0), channel(link->nextChannel()), - complete(false), ignore(false), + ended(false), dequeueState(hb.getBroker().getQueues()) { string id(getTxId(txQueue->getName())); @@ -100,8 +97,8 @@ TxReplicator::TxReplicator( boost::bind(&TxReplicator::commit, this, _1, _2); dispatch[TxRollbackEvent::KEY] = boost::bind(&TxReplicator::rollback, this, _1, _2); - dispatch[TxMembersEvent::KEY] = - boost::bind(&TxReplicator::members, this, _1, _2); + dispatch[TxBackupsEvent::KEY] = + boost::bind(&TxReplicator::backups, this, _1, _2); } TxReplicator::~TxReplicator() { @@ -121,11 +118,12 @@ void TxReplicator::sendMessage(const broker::Message& msg, sys::Mutex::ScopedLoc } void TxReplicator::route(broker::Deliverable& deliverable) { - if (!ignore) QueueReplicator::route(deliverable); + QueueReplicator::route(deliverable); } void TxReplicator::deliver(const broker::Message& m_) { sys::Mutex::ScopedLock l(lock); + if (!txBuffer) return; // Deliver message to the target queue, not the tx-queue. broker::Message m(m_); m.setReplicationId(enq.id); // Use replicated id. @@ -138,6 +136,7 @@ void TxReplicator::deliver(const broker::Message& m_) { void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) { sys::Mutex::ScopedLock l(lock); + if (!txBuffer) return; TxEnqueueEvent e; decodeStr(data, e); QPID_LOG(trace, logPrefix << "Enqueue: " << e); @@ -146,6 +145,7 @@ void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) { void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) { sys::Mutex::ScopedLock l(lock); + if (!txBuffer) return; TxDequeueEvent e; decodeStr(data, e); QPID_LOG(trace, logPrefix << "Dequeue: " << e); @@ -195,18 +195,20 @@ boost::shared_ptr<TxAccept> TxReplicator::DequeueState::makeAccept() { } void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) { + if (!txBuffer) return; txBuffer->enlist(dequeueState.makeAccept()); context = store->begin(); if (txBuffer->prepare(context.get())) { - QPID_LOG(debug, logPrefix << "Prepared OK"); + QPID_LOG(debug, logPrefix << "Local prepare OK"); sendMessage(TxPrepareOkEvent(haBroker.getSystemId()).message(queue->getName()), l); } else { - QPID_LOG(debug, logPrefix << "Prepare failed"); + QPID_LOG(debug, logPrefix << "Local prepare failed"); sendMessage(TxPrepareFailEvent(haBroker.getSystemId()).message(queue->getName()), l); } } void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) { + if (!txBuffer) return; QPID_LOG(debug, logPrefix << "Commit"); if (context.get()) store->commit(*context); txBuffer->commit(); @@ -214,34 +216,44 @@ void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) { } void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) { + if (!txBuffer) return; QPID_LOG(debug, logPrefix << "Rollback"); if (context.get()) store->abort(*context); txBuffer->rollback(); end(l); } -void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) { - TxMembersEvent e; +void TxReplicator::backups(const string& data, sys::Mutex::ScopedLock& l) { + TxBackupsEvent e; decodeStr(data, e); - QPID_LOG(debug, logPrefix << "Members: " << e.members); - if (!e.members.count(haBroker.getMembership().getSelf().getSystemId())) { + if (!e.backups.count(haBroker.getMembership().getSelf().getSystemId())) { QPID_LOG(info, logPrefix << "Not participating in transaction"); - ignore = true; + end(l); + } else { + QPID_LOG(debug, logPrefix << "Backups: " << e.backups); + txBuffer = new broker::TxBuffer; } } void TxReplicator::end(sys::Mutex::ScopedLock&) { - complete = true; - if (!getQueue()) return; // Already destroyed - // Destroy will cancel the subscription to the primary tx-queue which - // informs the primary that we have completed the transaction. - destroy(); + ended = true; + txBuffer.reset(); + // QueueReplicator::destroy cancels subscription to the primary tx-queue + // which allows the primary to clean up resources. + sys::Mutex::ScopedUnlock u(lock); + QueueReplicator::destroy(); } +// Called when the tx queue is deleted. void TxReplicator::destroy() { + { + sys::Mutex::ScopedLock l(lock); + if (!ended) { + QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback."); + rollback(string(), l); + } + } QueueReplicator::destroy(); - sys::Mutex::ScopedLock l(lock); - if (!ignore && !complete) rollback(string(), l); } }} // namespace qpid::ha |