diff options
author | Alan Conway <aconway@apache.org> | 2012-07-31 16:11:44 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-07-31 16:11:44 +0000 |
commit | 01bcc91ada7b79f16a65995c620d436c21f827be (patch) | |
tree | 83f44565d5e59d4f80a9446a5854778d98cbd3e4 | |
parent | 221cd0fcbe49acbda7e2d434e759f186afc3afff (diff) | |
download | qpid-python-01bcc91ada7b79f16a65995c620d436c21f827be.tar.gz |
QPID-4176: HA Error handling
Additional error handling and logging for ConnectionObserver, Primary and
ReplicatingSubscription.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1367649 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/ConnectionObserver.cpp | 44 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 42 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 22 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 1 |
4 files changed, 66 insertions, 43 deletions
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp index 553f50d802..3f7a1710d9 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp +++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp @@ -52,28 +52,40 @@ ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() { } void ConnectionObserver::opened(broker::Connection& connection) { - if (connection.isLink()) return; // Allow outgoing links. - if (connection.getClientProperties().isSet(ADMIN_TAG)) { - QPID_LOG(debug, logPrefix << "Allowing admin connection: " - << connection.getMgmtId()); - return; // No need to call observer, always allow admins. - } - BrokerInfo info; // Avoid self connections. - if (getBrokerInfo(connection, info)) { - if (info.getSystemId() == self) { - QPID_LOG(debug, "HA broker rejected self connection "+connection.getMgmtId()); - connection.abort(); + try { + if (connection.isLink()) return; // Allow outgoing links. + if (connection.getClientProperties().isSet(ADMIN_TAG)) { + QPID_LOG(debug, logPrefix << "Allowing admin connection: " + << connection.getMgmtId()); + return; // No need to call observer, always allow admins. } + BrokerInfo info; // Avoid self connections. + if (getBrokerInfo(connection, info)) { + if (info.getSystemId() == self) { + QPID_LOG(debug, "HA broker rejected self connection "+connection.getMgmtId()); + connection.abort(); + } + } + ObserverPtr o(getObserver()); + if (o) o->opened(connection); + } + catch (const std::exception& e) { + QPID_LOG(error, logPrefix << "Open error: " << e.what()); + throw; } - ObserverPtr o(getObserver()); - if (o) o->opened(connection); } void ConnectionObserver::closed(broker::Connection& connection) { - BrokerInfo info; - ObserverPtr o(getObserver()); - if (o) o->closed(connection); + try { + BrokerInfo info; + ObserverPtr o(getObserver()); + if (o) o->closed(connection); + } + catch (const std::exception& e) { + QPID_LOG(error, logPrefix << "Close error: " << e.what()); + throw; + } } const std::string ConnectionObserver::ADMIN_TAG="qpid.ha-admin"; diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index e7aa4858be..69c94bfc7d 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -141,26 +141,32 @@ void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) { } void Primary::timeoutExpectedBackups() { - sys::Mutex::ScopedLock l(lock); - if (active) return; // Already activated - // Remove records for any expectedBackups that are not yet connected - // Allow backups that are connected to continue becoming ready. - for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end();) - { - boost::shared_ptr<RemoteBackup> rb = *i; - if (!rb->isConnected()) { - BrokerInfo info = rb->getBrokerInfo(); - QPID_LOG(error, logPrefix << "Expected backup timed out: " << info); - expectedBackups.erase(i++); - backups.erase(info.getSystemId()); - rb->cancel(); - // Downgrade the broker to CATCHUP - info.setStatus(CATCHUP); - haBroker.addBroker(info); + try { + sys::Mutex::ScopedLock l(lock); + if (active) return; // Already activated + // Remove records for any expectedBackups that are not yet connected + // Allow backups that are connected to continue becoming ready. + for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end();) + { + boost::shared_ptr<RemoteBackup> rb = *i; + if (!rb->isConnected()) { + BrokerInfo info = rb->getBrokerInfo(); + QPID_LOG(error, logPrefix << "Expected backup timed out: " << info); + expectedBackups.erase(i++); + backups.erase(info.getSystemId()); + rb->cancel(); + // Downgrade the broker to CATCHUP + info.setStatus(CATCHUP); + haBroker.addBroker(info); + } + else ++i; } - else ++i; + checkReady(l); + } + catch(const std::exception& e) { + QPID_LOG(error, logPrefix << "Error timing out backups: " << e.what()); + // No-where for this exception to go. } - checkReady(l); } void Primary::readyReplica(const ReplicatingSubscription& rs) { diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 08d8877e77..381744b271 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -228,8 +228,8 @@ ReplicatingSubscription::ReplicatingSubscription( if (guard->subscriptionStart(position)) setReady(); } catch (const std::exception& e) { - throw InvalidArgumentException(QPID_MSG(logPrefix << e.what() - << ": arguments=" << arguments)); + QPID_LOG(error, logPrefix << "Creation error: " << e.what() + << ": arguments=" << getArguments()); } } @@ -242,13 +242,19 @@ ReplicatingSubscription::~ReplicatingSubscription() { // shared_from_this // void ReplicatingSubscription::initialize() { - Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently. + try { + Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently. - // Send initial dequeues and position to the backup. - // There must be a shared_ptr(this) when sending. - sendDequeueEvent(l); - sendPositionEvent(position, l); - backupPosition = position; + // Send initial dequeues and position to the backup. + // There must be a shared_ptr(this) when sending. + sendDequeueEvent(l); + sendPositionEvent(position, l); + backupPosition = position; + } + catch (const std::exception& e) { + QPID_LOG(error, logPrefix << "Initialization error: " << e.what() + << ": arguments=" << getArguments()); + } } // Message is delivered in the subscription's connection thread. diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index afa503d8cc..a80141a6c2 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -24,7 +24,6 @@ #include "BrokerInfo.h" #include "qpid/broker/SemanticState.h" -#include "qpid/broker/QueueObserver.h" #include "qpid/broker/ConsumerFactory.h" #include "qpid/types/Uuid.h" #include <iosfwd> |