summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-08-30 14:43:06 +0000
committerAlan Conway <aconway@apache.org>2013-08-30 14:43:06 +0000
commit54cdb4dcada8cfeb23d756e4980e701ebb382c13 (patch)
treef9ce23279cffe298d1a3953489355214b827e530 /qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
parent27d31ba355acfef3ec66c23e48864e88a358014b (diff)
downloadqpid-python-54cdb4dcada8cfeb23d756e4980e701ebb382c13.tar.gz
QPID-4327: HA clean up transaction artifacts at end of TX.
- Backups delete transactions on failover. - TxReplicator cancel subscriptions when transaction is finished. - TxReplicator rollback if destroyed prematurely. - Handle special case of no backups for a tx. - ha_tests.py: new and modified tests to cover the new functionality. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1518982 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/BrokerReplicator.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp80
1 files changed, 49 insertions, 31 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 684f408c4b..36bf89fb81 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -229,8 +229,8 @@ class BrokerReplicator::UpdateTracker {
typedef boost::function<void (const std::string&)> CleanFn;
UpdateTracker(const std::string& type_, // "queue" or "exchange"
- CleanFn f, const ReplicationTest& rt)
- : type(type_), cleanFn(f), repTest(rt) {}
+ CleanFn f)
+ : type(type_), cleanFn(f) {}
/** Destructor cleans up remaining initial queues. */
~UpdateTracker() {
@@ -245,16 +245,10 @@ class BrokerReplicator::UpdateTracker {
}
/** Add an exchange name */
- void addExchange(Exchange::shared_ptr ex) {
- if (repTest.getLevel(*ex))
- initial.insert(ex->getName());
- }
+ void addExchange(Exchange::shared_ptr ex) { initial.insert(ex->getName()); }
/** Add a queue name. */
- void addQueue(Queue::shared_ptr q) {
- if (repTest.getLevel(*q))
- initial.insert(q->getName());
- }
+ void addQueue(Queue::shared_ptr q) { initial.insert(q->getName()); }
/** Received an event for name */
void event(const std::string& name) {
@@ -281,7 +275,6 @@ class BrokerReplicator::UpdateTracker {
std::string type;
Names initial, events;
CleanFn cleanFn;
- ReplicationTest repTest;
};
namespace {
@@ -349,7 +342,8 @@ BrokerReplicator::~BrokerReplicator() { shutdown(); }
namespace {
void collectQueueReplicators(
- const boost::shared_ptr<Exchange> ex, set<boost::shared_ptr<QueueReplicator> >& collect)
+ const boost::shared_ptr<Exchange>& ex,
+ set<boost::shared_ptr<QueueReplicator> >& collect)
{
boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
if (qr) collect.insert(qr);
@@ -390,16 +384,13 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler)
exchangeTracker.reset(
new UpdateTracker("exchange",
- boost::bind(&BrokerReplicator::deleteExchange, this, _1),
- replicationTest));
- exchanges.eachExchange(
- boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1));
+ boost::bind(&BrokerReplicator::deleteExchange, this, _1)));
+ exchanges.eachExchange(boost::bind(&BrokerReplicator::existingExchange, this, _1));
queueTracker.reset(
new UpdateTracker("queue",
- boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
- replicationTest));
- queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1));
+ boost::bind(&BrokerReplicator::deleteQueue, this, _1, true)));
+ queues.eachQueue(boost::bind(&BrokerReplicator::existingQueue, this, _1));
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
@@ -428,6 +419,21 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler)
sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
}
+// Called for each queue in existence when the backup connects to a primary.
+void BrokerReplicator::existingQueue(const boost::shared_ptr<Queue>& q) {
+ if (replicationTest.getLevel(*q)) {
+ QPID_LOG(debug, "Existing queue: " << q->getName());
+ queueTracker->addQueue(q);
+ }
+}
+
+void BrokerReplicator::existingExchange(const boost::shared_ptr<Exchange>& ex) {
+ if (replicationTest.getLevel(*ex)) {
+ QPID_LOG(debug, "Existing exchange: " << ex->getName());
+ exchangeTracker->addExchange(ex);
+ }
+}
+
void BrokerReplicator::route(Deliverable& msg) {
// We transition from JOINING->CATCHUP on the first message received from the primary.
// Until now we couldn't be sure if we had a good connection to the primary.
@@ -890,24 +896,36 @@ void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) {
}
}
-// Callback function for accumulating exchange candidates
-namespace {
- void exchangeAccumulatorCallback(vector<boost::shared_ptr<Exchange> >& c, const Exchange::shared_ptr& i) {
- c.push_back(i);
- }
-}
+typedef vector<boost::shared_ptr<Exchange> > ExchangeVector;
+typedef vector<boost::shared_ptr<Queue> > QueueVector;
// Called by ConnectionObserver::disconnected, disconnected from the network side.
void BrokerReplicator::disconnected() {
QPID_LOG(info, logPrefix << "Disconnected from primary " << primary);
connection = 0;
- // Clean up auto-delete queues
- vector<boost::shared_ptr<Exchange> > collect;
- // Make a copy so we can work outside the ExchangeRegistry lock
- exchanges.eachExchange(
- boost::bind(&exchangeAccumulatorCallback, boost::ref(collect), _1));
- for_each(collect.begin(), collect.end(),
+
+ // Make copys of queues & exchanges so we can work outside the registry lock.
+
+ ExchangeVector exs;
+ exchanges.eachExchange(boost::bind(&ExchangeVector::push_back, &exs, _1));
+ for_each(exs.begin(), exs.end(),
boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1));
+
+ QueueVector qs;
+ queues.eachQueue(boost::bind(&QueueVector::push_back, &qs, _1));
+ for_each(qs.begin(), qs.end(),
+ boost::bind(&BrokerReplicator::disconnectedQueue, this, _1));
+}
+
+// Called for queues existing when the backup is disconnected.
+void BrokerReplicator::disconnectedQueue(const boost::shared_ptr<Queue>& q) {
+ QPID_LOG(critical, "BrokerReplicator::disconnectedQueue" << q->getName());
+ boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(q->getName());
+ if (qr) {
+ qr->disconnect();
+ if (TxReplicator::isTxQueue(q->getName()))
+ deleteQueue(q->getName());
+ }
}
void BrokerReplicator::setMembership(const Variant::List& brokers) {