diff options
author | Alan Conway <aconway@apache.org> | 2010-05-12 13:41:41 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-05-12 13:41:41 +0000 |
commit | 0fa302cbc61335e930bc78ed619973e1c0fcce83 (patch) | |
tree | ac515ac9ad2ae9fe33dc4a5b535a6708f9449559 /cpp/src | |
parent | d806f791323dd101705c86db17e6956aafb026f3 (diff) | |
download | qpid-python-0fa302cbc61335e930bc78ed619973e1c0fcce83.tar.gz |
Fixes to new API impl to support failover.
- client/SslConnector.cpp: throw ConnectionException for errors during connection.
- client/amqp0_10/ConnectionImpl.cpp: translate unknown exceptions to ConnectionError.
- client/amqp0_10/FailoverUpdates.cpp: interrupt receiver by closing session in dtor.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@943489 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/SslConnector.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp | 28 |
3 files changed, 20 insertions, 16 deletions
diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp index 4be8fcaa04..e82fc2f8da 100644 --- a/cpp/src/qpid/client/SslConnector.cpp +++ b/cpp/src/qpid/client/SslConnector.cpp @@ -194,7 +194,7 @@ void SslConnector::connect(const std::string& host, int port){ socket.connect(host, port); } catch (const std::exception& e) { socket.close(); - throw; + throw ConnectionException(framing::connection::CLOSE_CODE_FRAMING_ERROR, e.what()); } identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index a1f285f5a9..2c581e9d41 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -211,7 +211,11 @@ void ConnectionImpl::open() { qpid::sys::AbsTime start = qpid::sys::now(); qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore); - if (!connection.isOpen()) connect(start); + try { + if (!connection.isOpen()) connect(start); + } + catch (const types::Exception&) { throw; } + catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); } } bool expired(const qpid::sys::AbsTime& start, int64_t timeout) diff --git a/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp b/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp index 2ae882cee9..fc3900f917 100644 --- a/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp +++ b/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp @@ -42,42 +42,42 @@ struct FailoverUpdatesImpl : qpid::sys::Runnable Session session; Receiver receiver; qpid::sys::Thread thread; - volatile bool quit; - FailoverUpdatesImpl(Connection& c) : connection(c), quit(false) + FailoverUpdatesImpl(Connection& c) : connection(c) { session = connection.createSession("failover-updates"); receiver = session.createReceiver("amq.failover"); thread = qpid::sys::Thread(*this); } + ~FailoverUpdatesImpl() { + receiver.close(); + session.close(); + thread.join(); + } + void run() { try { Message message; - while (!quit && receiver.fetch(message)) { + while (receiver.fetch(message)) { connection.setOption("reconnect-urls", message.getProperties()["amq.failover"]); QPID_LOG(debug, "Set reconnect-urls to " << message.getProperties()["amq.failover"]); session.acknowledge(); } - } catch (const qpid::TransportFailure& e) { + } + catch (const ClosedException&) {} + catch (const qpid::TransportFailure& e) { QPID_LOG(warning, "Failover updates stopped on loss of connection. " << e.what()); - } catch (const std::exception& e) { + } + catch (const std::exception& e) { QPID_LOG(warning, "Failover updates stopped due to exception: " << e.what()); } - receiver.close(); - session.close(); - } - - void wait() - { - quit = true; - thread.join(); } }; FailoverUpdates::FailoverUpdates(Connection& connection) : impl(new FailoverUpdatesImpl(connection)) {} -FailoverUpdates::~FailoverUpdates() { if (impl) { impl->wait(); delete impl; } } +FailoverUpdates::~FailoverUpdates() { if (impl) { delete impl; } } FailoverUpdates::FailoverUpdates(const FailoverUpdates&) : impl(0) {} FailoverUpdates& FailoverUpdates::operator=(const FailoverUpdates&) { return *this; } |