summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-05-12 13:41:41 +0000
committerAlan Conway <aconway@apache.org>2010-05-12 13:41:41 +0000
commit0fa302cbc61335e930bc78ed619973e1c0fcce83 (patch)
treeac515ac9ad2ae9fe33dc4a5b535a6708f9449559 /cpp/src
parentd806f791323dd101705c86db17e6956aafb026f3 (diff)
downloadqpid-python-0fa302cbc61335e930bc78ed619973e1c0fcce83.tar.gz
Fixes to new API impl to support failover.
- client/SslConnector.cpp: throw ConnectionException for errors during connection. - client/amqp0_10/ConnectionImpl.cpp: translate unknown exceptions to ConnectionError. - client/amqp0_10/FailoverUpdates.cpp: interrupt receiver by closing session in dtor. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@943489 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/SslConnector.cpp2
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp6
-rw-r--r--cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp28
3 files changed, 20 insertions, 16 deletions
diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp
index 4be8fcaa04..e82fc2f8da 100644
--- a/cpp/src/qpid/client/SslConnector.cpp
+++ b/cpp/src/qpid/client/SslConnector.cpp
@@ -194,7 +194,7 @@ void SslConnector::connect(const std::string& host, int port){
socket.connect(host, port);
} catch (const std::exception& e) {
socket.close();
- throw;
+ throw ConnectionException(framing::connection::CLOSE_CODE_FRAMING_ERROR, e.what());
}
identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index a1f285f5a9..2c581e9d41 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -211,7 +211,11 @@ void ConnectionImpl::open()
{
qpid::sys::AbsTime start = qpid::sys::now();
qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore);
- if (!connection.isOpen()) connect(start);
+ try {
+ if (!connection.isOpen()) connect(start);
+ }
+ catch (const types::Exception&) { throw; }
+ catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); }
}
bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
diff --git a/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp b/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp
index 2ae882cee9..fc3900f917 100644
--- a/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp
+++ b/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp
@@ -42,42 +42,42 @@ struct FailoverUpdatesImpl : qpid::sys::Runnable
Session session;
Receiver receiver;
qpid::sys::Thread thread;
- volatile bool quit;
- FailoverUpdatesImpl(Connection& c) : connection(c), quit(false)
+ FailoverUpdatesImpl(Connection& c) : connection(c)
{
session = connection.createSession("failover-updates");
receiver = session.createReceiver("amq.failover");
thread = qpid::sys::Thread(*this);
}
+ ~FailoverUpdatesImpl() {
+ receiver.close();
+ session.close();
+ thread.join();
+ }
+
void run()
{
try {
Message message;
- while (!quit && receiver.fetch(message)) {
+ while (receiver.fetch(message)) {
connection.setOption("reconnect-urls", message.getProperties()["amq.failover"]);
QPID_LOG(debug, "Set reconnect-urls to " << message.getProperties()["amq.failover"]);
session.acknowledge();
}
- } catch (const qpid::TransportFailure& e) {
+ }
+ catch (const ClosedException&) {}
+ catch (const qpid::TransportFailure& e) {
QPID_LOG(warning, "Failover updates stopped on loss of connection. " << e.what());
- } catch (const std::exception& e) {
+ }
+ catch (const std::exception& e) {
QPID_LOG(warning, "Failover updates stopped due to exception: " << e.what());
}
- receiver.close();
- session.close();
- }
-
- void wait()
- {
- quit = true;
- thread.join();
}
};
FailoverUpdates::FailoverUpdates(Connection& connection) : impl(new FailoverUpdatesImpl(connection)) {}
-FailoverUpdates::~FailoverUpdates() { if (impl) { impl->wait(); delete impl; } }
+FailoverUpdates::~FailoverUpdates() { if (impl) { delete impl; } }
FailoverUpdates::FailoverUpdates(const FailoverUpdates&) : impl(0) {}
FailoverUpdates& FailoverUpdates::operator=(const FailoverUpdates&) { return *this; }