diff options
author | Alan Conway <aconway@apache.org> | 2013-08-01 20:27:26 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-08-01 20:27:26 +0000 |
commit | 014f0f39d9cfb6242bea173eadbc0f8229ba5f7f (patch) | |
tree | 74c5eba8fe21abb8d8ab00014663b1239e82fdde /qpid/cpp/src/qpid/ha/Primary.cpp | |
parent | dcbe820e4ebdbd4919ebd07b56790e15de3013e9 (diff) | |
download | qpid-python-014f0f39d9cfb6242bea173eadbc0f8229ba5f7f.tar.gz |
QPID-4327: HA TX transactions: basic replication.
On primary a PrimaryTxObserver observes a transaction's TxBuffer and generates
transaction events on a tx-replication-queue. On the backup a TxReplicator
receives the events and constructs a TxBuffer equivalent to the one in the
primary.
Unfinished:
- Primary does not wait for backups to prepare() before committing.
- All connected backups are assumed to be in the transaction, there are race
conditions around brokers joining/leavinv where this assumption is invalid.
- Need more tests.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1509423 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Primary.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 41 |
1 files changed, 39 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index bae651a3fc..5fd7814d62 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -27,6 +27,7 @@ #include "RemoteBackup.h" #include "ConnectionObserver.h" #include "QueueReplicator.h" +#include "PrimaryTxObserver.h" #include "qpid/assert.h" #include "qpid/broker/Broker.h" #include "qpid/broker/BrokerObserver.h" @@ -34,16 +35,19 @@ #include "qpid/broker/Queue.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" -#include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" +#include "qpid/types/Uuid.h" #include "qpid/sys/Timer.h" #include <boost/bind.hpp> +#include <boost/make_shared.hpp> +#include <boost/shared_ptr.hpp> namespace qpid { namespace ha { using sys::Mutex; using boost::shared_ptr; +using boost::make_shared; using namespace std; using namespace framing; @@ -67,7 +71,10 @@ class PrimaryBrokerObserver : public broker::BrokerObserver void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); } void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); } void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); } - private: + void startTx(const shared_ptr<broker::TxBuffer>& tx) { primary.startTx(tx); } + void startDtx(const shared_ptr<broker::DtxBuffer>& dtx) { primary.startDtx(dtx); } + + private: Primary& primary; }; @@ -208,6 +215,26 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) { if (backup) checkReady(backup); } +void Primary::addReplica(ReplicatingSubscription& rs) { + sys::Mutex::ScopedLock l(lock); + replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs; +} + +void Primary::skip( + const types::Uuid& backup, + const boost::shared_ptr<broker::Queue>& queue, + const ReplicationIdSet& ids) +{ + sys::Mutex::ScopedLock l(lock); + ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue)); + if (i != replicas.end()) i->second->addSkip(ids); +} + +void Primary::removeReplica(const ReplicatingSubscription& rs) { + sys::Mutex::ScopedLock l(lock); + replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())); +} + // NOTE: Called with queue registry lock held. void Primary::queueCreate(const QueuePtr& q) { // Set replication argument. @@ -361,4 +388,14 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards) backup->startCatchup(); } +void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) { + QPID_LOG(trace, logPrefix << "Started TX transaction"); + tx->setObserver(make_shared<PrimaryTxObserver>(boost::ref(haBroker))); +} + +void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) { + QPID_LOG(trace, logPrefix << "Started DTX transaction"); + dtx->setObserver(make_shared<PrimaryTxObserver>(boost::ref(haBroker))); +} + }} // namespace qpid::ha |