summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/TxReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/TxReplicator.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.cpp58
1 files changed, 35 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
index 63301a92f5..95afdb9759 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
@@ -26,6 +26,7 @@
#include "BrokerReplicator.h"
#include "Event.h"
#include "HaBroker.h"
+#include "ReplicatingSubscription.h"
#include "types.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
@@ -53,10 +54,7 @@ using qpid::broker::amqp_0_10::MessageTransfer;
using qpid::types::Uuid;
namespace {
-const string QPID_HA(QPID_HA_PREFIX);
-const string TYPE_NAME(QPID_HA+"tx-replicator");
const string PREFIX(TRANSACTION_REPLICATOR_PREFIX);
-
} // namespace
@@ -70,17 +68,16 @@ string TxReplicator::getTxId(const string& q) {
return q.substr(PREFIX.size());
}
-string TxReplicator::getType() const { return TYPE_NAME; }
+string TxReplicator::getType() const { return ReplicatingSubscription::QPID_TX_REPLICATOR; }
TxReplicator::TxReplicator(
HaBroker& hb,
const boost::shared_ptr<broker::Queue>& txQueue,
const boost::shared_ptr<broker::Link>& link) :
QueueReplicator(hb, txQueue, link),
- txBuffer(new broker::TxBuffer),
store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
channel(link->nextChannel()),
- complete(false), ignore(false),
+ ended(false),
dequeueState(hb.getBroker().getQueues())
{
string id(getTxId(txQueue->getName()));
@@ -100,8 +97,8 @@ TxReplicator::TxReplicator(
boost::bind(&TxReplicator::commit, this, _1, _2);
dispatch[TxRollbackEvent::KEY] =
boost::bind(&TxReplicator::rollback, this, _1, _2);
- dispatch[TxMembersEvent::KEY] =
- boost::bind(&TxReplicator::members, this, _1, _2);
+ dispatch[TxBackupsEvent::KEY] =
+ boost::bind(&TxReplicator::backups, this, _1, _2);
}
TxReplicator::~TxReplicator() {
@@ -121,11 +118,12 @@ void TxReplicator::sendMessage(const broker::Message& msg, sys::Mutex::ScopedLoc
}
void TxReplicator::route(broker::Deliverable& deliverable) {
- if (!ignore) QueueReplicator::route(deliverable);
+ QueueReplicator::route(deliverable);
}
void TxReplicator::deliver(const broker::Message& m_) {
sys::Mutex::ScopedLock l(lock);
+ if (!txBuffer) return;
// Deliver message to the target queue, not the tx-queue.
broker::Message m(m_);
m.setReplicationId(enq.id); // Use replicated id.
@@ -138,6 +136,7 @@ void TxReplicator::deliver(const broker::Message& m_) {
void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
sys::Mutex::ScopedLock l(lock);
+ if (!txBuffer) return;
TxEnqueueEvent e;
decodeStr(data, e);
QPID_LOG(trace, logPrefix << "Enqueue: " << e);
@@ -146,6 +145,7 @@ void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) {
sys::Mutex::ScopedLock l(lock);
+ if (!txBuffer) return;
TxDequeueEvent e;
decodeStr(data, e);
QPID_LOG(trace, logPrefix << "Dequeue: " << e);
@@ -195,18 +195,20 @@ boost::shared_ptr<TxAccept> TxReplicator::DequeueState::makeAccept() {
}
void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) {
+ if (!txBuffer) return;
txBuffer->enlist(dequeueState.makeAccept());
context = store->begin();
if (txBuffer->prepare(context.get())) {
- QPID_LOG(debug, logPrefix << "Prepared OK");
+ QPID_LOG(debug, logPrefix << "Local prepare OK");
sendMessage(TxPrepareOkEvent(haBroker.getSystemId()).message(queue->getName()), l);
} else {
- QPID_LOG(debug, logPrefix << "Prepare failed");
+ QPID_LOG(debug, logPrefix << "Local prepare failed");
sendMessage(TxPrepareFailEvent(haBroker.getSystemId()).message(queue->getName()), l);
}
}
void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) {
+ if (!txBuffer) return;
QPID_LOG(debug, logPrefix << "Commit");
if (context.get()) store->commit(*context);
txBuffer->commit();
@@ -214,34 +216,44 @@ void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) {
}
void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) {
+ if (!txBuffer) return;
QPID_LOG(debug, logPrefix << "Rollback");
if (context.get()) store->abort(*context);
txBuffer->rollback();
end(l);
}
-void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) {
- TxMembersEvent e;
+void TxReplicator::backups(const string& data, sys::Mutex::ScopedLock& l) {
+ TxBackupsEvent e;
decodeStr(data, e);
- QPID_LOG(debug, logPrefix << "Members: " << e.members);
- if (!e.members.count(haBroker.getMembership().getSelf().getSystemId())) {
+ if (!e.backups.count(haBroker.getMembership().getSelf().getSystemId())) {
QPID_LOG(info, logPrefix << "Not participating in transaction");
- ignore = true;
+ end(l);
+ } else {
+ QPID_LOG(debug, logPrefix << "Backups: " << e.backups);
+ txBuffer = new broker::TxBuffer;
}
}
void TxReplicator::end(sys::Mutex::ScopedLock&) {
- complete = true;
- if (!getQueue()) return; // Already destroyed
- // Destroy will cancel the subscription to the primary tx-queue which
- // informs the primary that we have completed the transaction.
- destroy();
+ ended = true;
+ txBuffer.reset();
+ // QueueReplicator::destroy cancels subscription to the primary tx-queue
+ // which allows the primary to clean up resources.
+ sys::Mutex::ScopedUnlock u(lock);
+ QueueReplicator::destroy();
}
+// Called when the tx queue is deleted.
void TxReplicator::destroy() {
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (!ended) {
+ QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback.");
+ rollback(string(), l);
+ }
+ }
QueueReplicator::destroy();
- sys::Mutex::ScopedLock l(lock);
- if (!ignore && !complete) rollback(string(), l);
}
}} // namespace qpid::ha