diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 114 |
1 files changed, 108 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 9f738731e2..3a735b5698 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -21,14 +21,16 @@ #include "ConnectionImpl.h" #include "SessionImpl.h" #include "qpid/messaging/Session.h" -#include "qpid/client/ConnectionSettings.h" +#include "qpid/client/PrivateImplRef.h" #include "qpid/log/Statement.h" +#include <boost/intrusive_ptr.hpp> namespace qpid { namespace client { namespace amqp0_10 { using qpid::messaging::Variant; +using namespace qpid::sys; template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value) { @@ -56,24 +58,124 @@ void convert(const Variant::Map& from, ConnectionSettings& to) setIfFound(from, "bounds", to.bounds); } -ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) +ConnectionImpl::ConnectionImpl(const std::string& u, const Variant::Map& options) : + url(u), reconnectionEnabled(true), timeout(-1), + minRetryInterval(1), maxRetryInterval(30) { QPID_LOG(debug, "Opening connection to " << url << " with " << options); - Url u(url); - ConnectionSettings settings; convert(options, settings); - connection.open(u, settings); + setIfFound(options, "reconnection-enabled", reconnectionEnabled); + setIfFound(options, "reconnection-timeout", timeout); + setIfFound(options, "min-retry-interval", minRetryInterval); + setIfFound(options, "max-retry-interval", maxRetryInterval); + connection.open(url, settings); } void ConnectionImpl::close() { + qpid::sys::Mutex::ScopedLock l(lock); connection.close(); } +boost::intrusive_ptr<SessionImpl> getImplPtr(qpid::messaging::Session& session) +{ + return boost::dynamic_pointer_cast<SessionImpl>( + qpid::client::PrivateImplRef<qpid::messaging::Session>::get(session) + ); +} + +void ConnectionImpl::closed(SessionImpl& s) +{ + qpid::sys::Mutex::ScopedLock l(lock); + for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { + if (getImplPtr(*i).get() == &s) { + sessions.erase(i); + break; + } + } +} + qpid::messaging::Session ConnectionImpl::newSession() { - qpid::messaging::Session impl(new SessionImpl(connection.newSession())); + qpid::messaging::Session impl(new SessionImpl(*this)); + { + qpid::sys::Mutex::ScopedLock l(lock); + sessions.push_back(impl); + } + try { + getImplPtr(impl)->setSession(connection.newSession()); + } catch (const TransportFailure&) { + reconnect(); + } return impl; } +void ConnectionImpl::reconnect() +{ + AbsTime start = now(); + ScopedLock<Semaphore> l(semaphore); + if (!connection.isOpen()) connect(start); +} + +bool expired(const AbsTime& start, int timeout) +{ + if (timeout == 0) return true; + if (timeout < 0) return false; + Duration used(start, now()); + Duration allowed = timeout * TIME_SEC; + return allowed > used; +} + +void ConnectionImpl::connect(const AbsTime& started) +{ + for (int i = minRetryInterval; !tryConnect(); i = std::min(i * 2, maxRetryInterval)) { + if (expired(started, timeout)) throw TransportFailure(); + else qpid::sys::sleep(i); + } +} + +bool ConnectionImpl::tryConnect() +{ + if (tryConnect(url) || tryConnect(connection.getKnownBrokers())) { + return resetSessions(); + } else { + return false; + } +} + +bool ConnectionImpl::tryConnect(const Url& u) +{ + try { + QPID_LOG(info, "Trying to connect to " << url << "..."); + connection.open(u, settings); + return true; + } catch (const Exception& e) { + //TODO: need to fix timeout on open so that it throws TransportFailure + QPID_LOG(info, "Failed to connect to " << u << ": " << e.what()); + } + return false; +} + +bool ConnectionImpl::tryConnect(const std::vector<Url>& urls) +{ + for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) { + if (tryConnect(*i)) return true; + } + return false; +} + +bool ConnectionImpl::resetSessions() +{ + try { + qpid::sys::Mutex::ScopedLock l(lock); + for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { + getImplPtr(*i)->setSession(connection.newSession()); + } + return true; + } catch (const TransportFailure&) { + QPID_LOG(debug, "Connection failed while re-inialising sessions"); + return false; + } +} + }}} // namespace qpid::client::amqp0_10 |