diff options
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 51 |
1 files changed, 34 insertions, 17 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index cc6e9b9ab2..3cfd2e37f2 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -29,6 +29,7 @@ #include <boost/intrusive_ptr.hpp> #include <vector> #include <sstream> +#include <limits> namespace qpid { namespace client { @@ -39,6 +40,16 @@ using qpid::types::VAR_LIST; using qpid::framing::Uuid; namespace { + +double FOREVER(std::numeric_limits<double>::max()); + +// Time values in seconds can be specified as integer or floating point values. +double timeValue(const Variant& value) { + if (types::isIntegerType(value.getType())) + return double(value.asInt64()); + return value.asDouble(); +} + void merge(const std::string& value, std::vector<std::string>& list) { if (std::find(list.begin(), list.end(), value) == list.end()) list.push_back(value); @@ -60,11 +71,21 @@ std::string asString(const std::vector<std::string>& v) { os << "]"; return os.str(); } + +bool expired(const sys::AbsTime& start, double timeout) +{ + if (timeout == 0) return true; + if (timeout == FOREVER) return false; + sys::Duration used(start, sys::now()); + sys::Duration allowed(int64_t(timeout*sys::TIME_SEC)); + return allowed < used; } +} // namespace + ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) : - replaceUrls(false), reconnect(false), timeout(-1), limit(-1), - minReconnectInterval(3), maxReconnectInterval(60), + replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1), + minReconnectInterval(0.001), maxReconnectInterval(2), retries(0), reconnectOnLimitExceeded(true) { setOptions(options); @@ -85,15 +106,15 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value) if (name == "reconnect") { reconnect = value; } else if (name == "reconnect-timeout" || name == "reconnect_timeout") { - timeout = value; + timeout = timeValue(value); } else if (name == "reconnect-limit" || name == "reconnect_limit") { limit = value; } else if (name == "reconnect-interval" || name == "reconnect_interval") { - maxReconnectInterval = minReconnectInterval = value; + maxReconnectInterval = minReconnectInterval = timeValue(value); } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") { - minReconnectInterval = value; + minReconnectInterval = timeValue(value); } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") { - maxReconnectInterval = value; + maxReconnectInterval = timeValue(value); } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") { replaceUrls = value.asBool(); } else if (name == "reconnect-urls" || name == "reconnect_urls") { @@ -236,18 +257,10 @@ void ConnectionImpl::reopen() } -bool expired(const qpid::sys::AbsTime& start, int64_t timeout) -{ - if (timeout == 0) return true; - if (timeout < 0) return false; - qpid::sys::Duration used(start, qpid::sys::now()); - qpid::sys::Duration allowed = timeout * qpid::sys::TIME_SEC; - return allowed < used; -} - void ConnectionImpl::connect(const qpid::sys::AbsTime& started) { - for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) { + QPID_LOG(debug, "Starting connection, urls=" << asString(urls)); + for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) { if (!reconnect) { throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); } @@ -257,8 +270,11 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started) if (expired(started, timeout)) { throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout"); } - else qpid::sys::sleep(i); + QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds, urls=" + << asString(urls)); + qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds. } + QPID_LOG(debug, "Connection successful, urls=" << asString(urls)); retries = 0; } @@ -320,6 +336,7 @@ bool ConnectionImpl::backoff() return false; } } + std::string ConnectionImpl::getAuthenticatedUsername() { return connection.getNegotiatedSettings().username; |