diff options
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 148 |
1 files changed, 101 insertions, 47 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 4242850192..9c1c4e0735 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -1,4 +1,4 @@ -/* + /* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -20,6 +20,7 @@ */ #include "ConnectionImpl.h" #include "SessionImpl.h" +#include "SimpleUrlParser.h" #include "qpid/messaging/Session.h" #include "qpid/client/PrivateImplRef.h" #include "qpid/framing/Uuid.h" @@ -33,13 +34,42 @@ namespace amqp0_10 { using qpid::messaging::Variant; using qpid::framing::Uuid; -using namespace qpid::sys; -template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value) +void convert(const Variant::List& from, std::vector<std::string>& to) +{ + for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) { + to.push_back(i->asString()); + } +} + +template <class T> bool setIfFound(const Variant::Map& map, const std::string& key, T& value) { Variant::Map::const_iterator i = map.find(key); if (i != map.end()) { value = (T) i->second; + QPID_LOG(debug, "option " << key << " specified as " << i->second); + return true; + } else { + QPID_LOG(debug, "option " << key << " not specified"); + return false; + } +} + +template <> +bool setIfFound< std::vector<std::string> >(const Variant::Map& map, + const std::string& key, + std::vector<std::string>& value) +{ + Variant::Map::const_iterator i = map.find(key); + if (i != map.end()) { + if (i->second.getType() == qpid::messaging::VAR_LIST) { + convert(i->second.asList(), value); + } else { + value.push_back(i->second.asString()); + } + return true; + } else { + return false; } } @@ -59,24 +89,47 @@ void convert(const Variant::Map& from, ConnectionSettings& to) setIfFound(from, "max-channels", to.maxChannels); setIfFound(from, "max-frame-size", to.maxFrameSize); setIfFound(from, "bounds", to.bounds); + + setIfFound(from, "protocol", to.protocol); } ConnectionImpl::ConnectionImpl(const Variant::Map& options) : - reconnectionEnabled(true), timeout(-1), - minRetryInterval(1), maxRetryInterval(30) + reconnect(true), timeout(-1), limit(-1), + minReconnectInterval(3), maxReconnectInterval(60), + retries(0) +{ + QPID_LOG(debug, "Created connection with " << options); + setOptions(options); +} + +void ConnectionImpl::setOptions(const Variant::Map& options) { - QPID_LOG(debug, "Opening connection to " << url << " with " << options); convert(options, settings); - setIfFound(options, "reconnection-enabled", reconnectionEnabled); - setIfFound(options, "reconnection-timeout", timeout); - setIfFound(options, "min-retry-interval", minRetryInterval); - setIfFound(options, "max-retry-interval", maxRetryInterval); + setIfFound(options, "reconnect", reconnect); + setIfFound(options, "reconnect-timeout", timeout); + setIfFound(options, "reconnect-limit", limit); + int64_t reconnectInterval; + if (setIfFound(options, "reconnect-interval", reconnectInterval)) { + minReconnectInterval = maxReconnectInterval = reconnectInterval; + } else { + setIfFound(options, "min-reconnect-interval", minReconnectInterval); + setIfFound(options, "max-reconnect-interval", maxReconnectInterval); + } + setIfFound(options, "urls", urls); +} + +void ConnectionImpl::setOption(const std::string& name, const Variant& value) +{ + Variant::Map options; + options[name] = value; + setOptions(options); + QPID_LOG(debug, "Set " << name << " to " << value); } void ConnectionImpl::open(const std::string& u) { - url = u; - connection.open(url, settings); + urls.push_back(u); + connect(); } void ConnectionImpl::close() @@ -134,64 +187,65 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st try { getImplPtr(impl)->setSession(connection.newSession(name)); } catch (const TransportFailure&) { - reconnect(); + connect(); } return impl; } -void ConnectionImpl::reconnect() +void ConnectionImpl::connect() { - AbsTime start = now(); - ScopedLock<Semaphore> l(semaphore); + qpid::sys::AbsTime start = qpid::sys::now(); + qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore); if (!connection.isOpen()) connect(start); } -bool expired(const AbsTime& start, int timeout) +bool expired(const qpid::sys::AbsTime& start, int64_t timeout) { if (timeout == 0) return true; if (timeout < 0) return false; - Duration used(start, now()); - Duration allowed = timeout * TIME_SEC; - return allowed > used; + qpid::sys::Duration used(start, qpid::sys::now()); + qpid::sys::Duration allowed = timeout * qpid::sys::TIME_SEC; + return allowed < used; } -void ConnectionImpl::connect(const AbsTime& started) +void ConnectionImpl::connect(const qpid::sys::AbsTime& started) { - for (int i = minRetryInterval; !tryConnect(); i = std::min(i * 2, maxRetryInterval)) { - if (expired(started, timeout)) throw TransportFailure(); + for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) { + if (!reconnect) throw TransportFailure("Failed to connect (reconnect disabled)"); + if (limit >= 0 && retries++ >= limit) throw TransportFailure("Failed to connect within reconnect limit"); + if (expired(started, timeout)) throw TransportFailure("Failed to connect within reconnect timeout"); else qpid::sys::sleep(i); } + retries = 0; } bool ConnectionImpl::tryConnect() { - if (tryConnect(url) || - (failoverListener.get() && tryConnect(failoverListener->getKnownBrokers()))) - { - return resetSessions(); - } else { - return false; - } + if (tryConnect(urls)) return resetSessions(); + else return false; } -bool ConnectionImpl::tryConnect(const Url& u) +bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls) { - try { - QPID_LOG(info, "Trying to connect to " << url << "..."); - connection.open(u, settings); - failoverListener.reset(new FailoverListener(connection)); - return true; - } catch (const Exception& e) { - //TODO: need to fix timeout on open so that it throws TransportFailure - QPID_LOG(info, "Failed to connect to " << u << ": " << e.what()); - } - return false; -} - -bool ConnectionImpl::tryConnect(const std::vector<Url>& urls) -{ - for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) { - if (tryConnect(*i)) return true; + for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) { + try { + QPID_LOG(info, "Trying to connect to " << *i << "..."); + //TODO: when url support is more complete can avoid this test here + if (i->find("amqp:") == 0) { + Url url(*i); + connection.open(url, settings); + } else { + SimpleUrlParser::parse(*i, settings); + connection.open(settings); + } + QPID_LOG(info, "Connected to " << *i); + return true; + } catch (const Exception& e) { + //TODO: need to fix timeout on + //qpid::client::Connection::open() so that it throws + //TransportFailure rather than a ConnectionException + QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what()); + } } return false; } |