summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-07-31 13:58:36 +0000
committerAlan Conway <aconway@apache.org>2012-07-31 13:58:36 +0000
commit221cd0fcbe49acbda7e2d434e759f186afc3afff (patch)
tree171a800961fcc238d1414f9c840bafafd15d0580 /qpid
parenta295ca2b7294353f86405398f7644f73f06ccae4 (diff)
downloadqpid-python-221cd0fcbe49acbda7e2d434e759f186afc3afff.tar.gz
QPID-4176: HA Error handling
Fix error handling so that backup brokers shut down on replication errors. Previously replication errors were being thrown to the primary, breaking the replication session. This would put the primary into an endless futile reconnect attempt. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1367554 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp7
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h4
4 files changed, 12 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 91a4bf242b..6a17555b6f 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -299,10 +299,12 @@ void BrokerReplicator::route(Deliverable& msg) {
} catch (const std::exception& e) {
QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what()
<< ": while handling: " << list);
+ haBroker.shutdown();
throw;
}
}
+
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
Variant::Map argsMap = asMapVoid(values[ARGS]);
bool autoDel = values[AUTODEL].asBool();
@@ -542,7 +544,7 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu
{
if (replicationTest.replicateLevel(queue->getSettings()) == ALL) {
boost::shared_ptr<QueueReplicator> qr(
- new QueueReplicator(haBroker.getBrokerInfo(), queue, link));
+ new QueueReplicator(haBroker, queue, link));
if (!broker.getExchanges().registerExchange(qr))
throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index fc74ce633a..d126639813 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -193,7 +193,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
link->setUrl(url);
// Create a queue replicator
boost::shared_ptr<QueueReplicator> qr(
- new QueueReplicator(brokerInfo, queue, link));
+ new QueueReplicator(*this, queue, link));
qr->activate();
broker.getExchanges().registerExchange(qr);
break;
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index a55fa00562..be910a087f 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -19,6 +19,7 @@
*
*/
+#include "HaBroker.h"
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
#include "qpid/broker/Bridge.h"
@@ -58,12 +59,13 @@ bool QueueReplicator::isEventKey(const std::string key) {
return ret;
}
-QueueReplicator::QueueReplicator(const BrokerInfo& info,
+QueueReplicator::QueueReplicator(HaBroker& hb,
boost::shared_ptr<Queue> q,
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
+ haBroker(hb),
logPrefix("Backup queue "+q->getName()+": "),
- queue(q), link(l), brokerInfo(info)
+ queue(q), link(l), brokerInfo(hb.getBrokerInfo())
{
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -183,6 +185,7 @@ void QueueReplicator::route(Deliverable& msg)
}
catch (const std::exception& e) {
QPID_LOG(critical, logPrefix << "Replication failed: " << e.what());
+ haBroker.shutdown();
throw;
}
}
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 6754174076..f8a68ea38f 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -40,6 +40,7 @@ class Deliverable;
}
namespace ha {
+class HaBroker;
/**
* Exchange created on a backup broker to replicate a queue on the primary.
@@ -60,7 +61,7 @@ class QueueReplicator : public broker::Exchange,
/** Test if a string is an event key */
static bool isEventKey(const std::string key);
- QueueReplicator(const BrokerInfo&,
+ QueueReplicator(HaBroker&,
boost::shared_ptr<broker::Queue> q,
boost::shared_ptr<broker::Link> l);
@@ -80,6 +81,7 @@ class QueueReplicator : public broker::Exchange,
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&);
+ HaBroker& haBroker;
std::string logPrefix;
std::string bridgeName;
sys::Mutex lock;