summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-07-31 13:44:20 +0000
committerAlan Conway <aconway@apache.org>2012-07-31 13:44:20 +0000
commit0840ee923304ad68c36bcd4aa33c8b614d5a24ba (patch)
tree4276f2fa5485b813872cef9725fac650fc547d41
parentabb12c5103fa33efb763e32800a4cfd72117ed7b (diff)
downloadqpid-python-0840ee923304ad68c36bcd4aa33c8b614d5a24ba.tar.gz
WIP: BrokerReplicator, QueueReplicator error handling.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1367543 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp7
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h4
4 files changed, 12 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 9214bc2f87..f590fd8c95 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -300,10 +300,12 @@ void BrokerReplicator::route(Deliverable& msg) {
} catch (const std::exception& e) {
QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what()
<< ": while handling: " << list);
+ haBroker.shutdown();
throw;
}
}
+
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
Variant::Map argsMap = asMapVoid(values[ARGS]);
bool autoDel = values[AUTODEL].asBool();
@@ -543,7 +545,7 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu
{
if (replicationTest.replicateLevel(queue->getSettings()) == ALL) {
boost::shared_ptr<QueueReplicator> qr(
- new QueueReplicator(haBroker.getBrokerInfo(), queue, link));
+ new QueueReplicator(haBroker, queue, link));
if (!broker.getExchanges().registerExchange(qr))
throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 7b5d70bab4..fcb47e9d8f 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -192,7 +192,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
link->setUrl(url);
// Create a queue replicator
boost::shared_ptr<QueueReplicator> qr(
- new QueueReplicator(brokerInfo, queue, link));
+ new QueueReplicator(*this, queue, link));
qr->activate();
broker.getExchanges().registerExchange(qr);
break;
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index a55fa00562..be910a087f 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -19,6 +19,7 @@
*
*/
+#include "HaBroker.h"
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
#include "qpid/broker/Bridge.h"
@@ -58,12 +59,13 @@ bool QueueReplicator::isEventKey(const std::string key) {
return ret;
}
-QueueReplicator::QueueReplicator(const BrokerInfo& info,
+QueueReplicator::QueueReplicator(HaBroker& hb,
boost::shared_ptr<Queue> q,
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
+ haBroker(hb),
logPrefix("Backup queue "+q->getName()+": "),
- queue(q), link(l), brokerInfo(info)
+ queue(q), link(l), brokerInfo(hb.getBrokerInfo())
{
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -183,6 +185,7 @@ void QueueReplicator::route(Deliverable& msg)
}
catch (const std::exception& e) {
QPID_LOG(critical, logPrefix << "Replication failed: " << e.what());
+ haBroker.shutdown();
throw;
}
}
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 6754174076..f8a68ea38f 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -40,6 +40,7 @@ class Deliverable;
}
namespace ha {
+class HaBroker;
/**
* Exchange created on a backup broker to replicate a queue on the primary.
@@ -60,7 +61,7 @@ class QueueReplicator : public broker::Exchange,
/** Test if a string is an event key */
static bool isEventKey(const std::string key);
- QueueReplicator(const BrokerInfo&,
+ QueueReplicator(HaBroker&,
boost::shared_ptr<broker::Queue> q,
boost::shared_ptr<broker::Link> l);
@@ -80,6 +81,7 @@ class QueueReplicator : public broker::Exchange,
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&);
+ HaBroker& haBroker;
std::string logPrefix;
std::string bridgeName;
sys::Mutex lock;