summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-07-31 16:11:44 +0000
committerAlan Conway <aconway@apache.org>2012-07-31 16:11:44 +0000
commit01bcc91ada7b79f16a65995c620d436c21f827be (patch)
tree83f44565d5e59d4f80a9446a5854778d98cbd3e4
parent221cd0fcbe49acbda7e2d434e759f186afc3afff (diff)
downloadqpid-python-01bcc91ada7b79f16a65995c620d436c21f827be.tar.gz
QPID-4176: HA Error handling
Additional error handling and logging for ConnectionObserver, Primary and ReplicatingSubscription. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1367649 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.cpp44
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp42
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp22
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h1
4 files changed, 66 insertions, 43 deletions
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
index 553f50d802..3f7a1710d9 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
@@ -52,28 +52,40 @@ ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() {
}
void ConnectionObserver::opened(broker::Connection& connection) {
- if (connection.isLink()) return; // Allow outgoing links.
- if (connection.getClientProperties().isSet(ADMIN_TAG)) {
- QPID_LOG(debug, logPrefix << "Allowing admin connection: "
- << connection.getMgmtId());
- return; // No need to call observer, always allow admins.
- }
- BrokerInfo info; // Avoid self connections.
- if (getBrokerInfo(connection, info)) {
- if (info.getSystemId() == self) {
- QPID_LOG(debug, "HA broker rejected self connection "+connection.getMgmtId());
- connection.abort();
+ try {
+ if (connection.isLink()) return; // Allow outgoing links.
+ if (connection.getClientProperties().isSet(ADMIN_TAG)) {
+ QPID_LOG(debug, logPrefix << "Allowing admin connection: "
+ << connection.getMgmtId());
+ return; // No need to call observer, always allow admins.
}
+ BrokerInfo info; // Avoid self connections.
+ if (getBrokerInfo(connection, info)) {
+ if (info.getSystemId() == self) {
+ QPID_LOG(debug, "HA broker rejected self connection "+connection.getMgmtId());
+ connection.abort();
+ }
+ }
+ ObserverPtr o(getObserver());
+ if (o) o->opened(connection);
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, logPrefix << "Open error: " << e.what());
+ throw;
}
- ObserverPtr o(getObserver());
- if (o) o->opened(connection);
}
void ConnectionObserver::closed(broker::Connection& connection) {
- BrokerInfo info;
- ObserverPtr o(getObserver());
- if (o) o->closed(connection);
+ try {
+ BrokerInfo info;
+ ObserverPtr o(getObserver());
+ if (o) o->closed(connection);
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, logPrefix << "Close error: " << e.what());
+ throw;
+ }
}
const std::string ConnectionObserver::ADMIN_TAG="qpid.ha-admin";
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index e7aa4858be..69c94bfc7d 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -141,26 +141,32 @@ void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) {
}
void Primary::timeoutExpectedBackups() {
- sys::Mutex::ScopedLock l(lock);
- if (active) return; // Already activated
- // Remove records for any expectedBackups that are not yet connected
- // Allow backups that are connected to continue becoming ready.
- for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end();)
- {
- boost::shared_ptr<RemoteBackup> rb = *i;
- if (!rb->isConnected()) {
- BrokerInfo info = rb->getBrokerInfo();
- QPID_LOG(error, logPrefix << "Expected backup timed out: " << info);
- expectedBackups.erase(i++);
- backups.erase(info.getSystemId());
- rb->cancel();
- // Downgrade the broker to CATCHUP
- info.setStatus(CATCHUP);
- haBroker.addBroker(info);
+ try {
+ sys::Mutex::ScopedLock l(lock);
+ if (active) return; // Already activated
+ // Remove records for any expectedBackups that are not yet connected
+ // Allow backups that are connected to continue becoming ready.
+ for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end();)
+ {
+ boost::shared_ptr<RemoteBackup> rb = *i;
+ if (!rb->isConnected()) {
+ BrokerInfo info = rb->getBrokerInfo();
+ QPID_LOG(error, logPrefix << "Expected backup timed out: " << info);
+ expectedBackups.erase(i++);
+ backups.erase(info.getSystemId());
+ rb->cancel();
+ // Downgrade the broker to CATCHUP
+ info.setStatus(CATCHUP);
+ haBroker.addBroker(info);
+ }
+ else ++i;
}
- else ++i;
+ checkReady(l);
+ }
+ catch(const std::exception& e) {
+ QPID_LOG(error, logPrefix << "Error timing out backups: " << e.what());
+ // No-where for this exception to go.
}
- checkReady(l);
}
void Primary::readyReplica(const ReplicatingSubscription& rs) {
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 08d8877e77..381744b271 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -228,8 +228,8 @@ ReplicatingSubscription::ReplicatingSubscription(
if (guard->subscriptionStart(position)) setReady();
}
catch (const std::exception& e) {
- throw InvalidArgumentException(QPID_MSG(logPrefix << e.what()
- << ": arguments=" << arguments));
+ QPID_LOG(error, logPrefix << "Creation error: " << e.what()
+ << ": arguments=" << getArguments());
}
}
@@ -242,13 +242,19 @@ ReplicatingSubscription::~ReplicatingSubscription() {
// shared_from_this
//
void ReplicatingSubscription::initialize() {
- Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
+ try {
+ Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
- // Send initial dequeues and position to the backup.
- // There must be a shared_ptr(this) when sending.
- sendDequeueEvent(l);
- sendPositionEvent(position, l);
- backupPosition = position;
+ // Send initial dequeues and position to the backup.
+ // There must be a shared_ptr(this) when sending.
+ sendDequeueEvent(l);
+ sendPositionEvent(position, l);
+ backupPosition = position;
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, logPrefix << "Initialization error: " << e.what()
+ << ": arguments=" << getArguments());
+ }
}
// Message is delivered in the subscription's connection thread.
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index afa503d8cc..a80141a6c2 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -24,7 +24,6 @@
#include "BrokerInfo.h"
#include "qpid/broker/SemanticState.h"
-#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/ConsumerFactory.h"
#include "qpid/types/Uuid.h"
#include <iosfwd>