summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/Primary.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-08-01 20:27:26 +0000
committerAlan Conway <aconway@apache.org>2013-08-01 20:27:26 +0000
commit014f0f39d9cfb6242bea173eadbc0f8229ba5f7f (patch)
tree74c5eba8fe21abb8d8ab00014663b1239e82fdde /qpid/cpp/src/qpid/ha/Primary.cpp
parentdcbe820e4ebdbd4919ebd07b56790e15de3013e9 (diff)
downloadqpid-python-014f0f39d9cfb6242bea173eadbc0f8229ba5f7f.tar.gz
QPID-4327: HA TX transactions: basic replication.
On primary a PrimaryTxObserver observes a transaction's TxBuffer and generates transaction events on a tx-replication-queue. On the backup a TxReplicator receives the events and constructs a TxBuffer equivalent to the one in the primary. Unfinished: - Primary does not wait for backups to prepare() before committing. - All connected backups are assumed to be in the transaction, there are race conditions around brokers joining/leavinv where this assumption is invalid. - Need more tests. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1509423 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Primary.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp41
1 files changed, 39 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index bae651a3fc..5fd7814d62 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -27,6 +27,7 @@
#include "RemoteBackup.h"
#include "ConnectionObserver.h"
#include "QueueReplicator.h"
+#include "PrimaryTxObserver.h"
#include "qpid/assert.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/BrokerObserver.h"
@@ -34,16 +35,19 @@
#include "qpid/broker/Queue.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
-#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
+#include "qpid/types/Uuid.h"
#include "qpid/sys/Timer.h"
#include <boost/bind.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace ha {
using sys::Mutex;
using boost::shared_ptr;
+using boost::make_shared;
using namespace std;
using namespace framing;
@@ -67,7 +71,10 @@ class PrimaryBrokerObserver : public broker::BrokerObserver
void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); }
void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); }
void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); }
- private:
+ void startTx(const shared_ptr<broker::TxBuffer>& tx) { primary.startTx(tx); }
+ void startDtx(const shared_ptr<broker::DtxBuffer>& dtx) { primary.startDtx(dtx); }
+
+ private:
Primary& primary;
};
@@ -208,6 +215,26 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) {
if (backup) checkReady(backup);
}
+void Primary::addReplica(ReplicatingSubscription& rs) {
+ sys::Mutex::ScopedLock l(lock);
+ replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs;
+}
+
+void Primary::skip(
+ const types::Uuid& backup,
+ const boost::shared_ptr<broker::Queue>& queue,
+ const ReplicationIdSet& ids)
+{
+ sys::Mutex::ScopedLock l(lock);
+ ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue));
+ if (i != replicas.end()) i->second->addSkip(ids);
+}
+
+void Primary::removeReplica(const ReplicatingSubscription& rs) {
+ sys::Mutex::ScopedLock l(lock);
+ replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue()));
+}
+
// NOTE: Called with queue registry lock held.
void Primary::queueCreate(const QueuePtr& q) {
// Set replication argument.
@@ -361,4 +388,14 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards)
backup->startCatchup();
}
+void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) {
+ QPID_LOG(trace, logPrefix << "Started TX transaction");
+ tx->setObserver(make_shared<PrimaryTxObserver>(boost::ref(haBroker)));
+}
+
+void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) {
+ QPID_LOG(trace, logPrefix << "Started DTX transaction");
+ dtx->setObserver(make_shared<PrimaryTxObserver>(boost::ref(haBroker)));
+}
+
}} // namespace qpid::ha