summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp51
1 files changed, 34 insertions, 17 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index cc6e9b9ab2..3cfd2e37f2 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -29,6 +29,7 @@
#include <boost/intrusive_ptr.hpp>
#include <vector>
#include <sstream>
+#include <limits>
namespace qpid {
namespace client {
@@ -39,6 +40,16 @@ using qpid::types::VAR_LIST;
using qpid::framing::Uuid;
namespace {
+
+double FOREVER(std::numeric_limits<double>::max());
+
+// Time values in seconds can be specified as integer or floating point values.
+double timeValue(const Variant& value) {
+ if (types::isIntegerType(value.getType()))
+ return double(value.asInt64());
+ return value.asDouble();
+}
+
void merge(const std::string& value, std::vector<std::string>& list) {
if (std::find(list.begin(), list.end(), value) == list.end())
list.push_back(value);
@@ -60,11 +71,21 @@ std::string asString(const std::vector<std::string>& v) {
os << "]";
return os.str();
}
+
+bool expired(const sys::AbsTime& start, double timeout)
+{
+ if (timeout == 0) return true;
+ if (timeout == FOREVER) return false;
+ sys::Duration used(start, sys::now());
+ sys::Duration allowed(int64_t(timeout*sys::TIME_SEC));
+ return allowed < used;
}
+} // namespace
+
ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
- replaceUrls(false), reconnect(false), timeout(-1), limit(-1),
- minReconnectInterval(3), maxReconnectInterval(60),
+ replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1),
+ minReconnectInterval(0.001), maxReconnectInterval(2),
retries(0), reconnectOnLimitExceeded(true)
{
setOptions(options);
@@ -85,15 +106,15 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value)
if (name == "reconnect") {
reconnect = value;
} else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
- timeout = value;
+ timeout = timeValue(value);
} else if (name == "reconnect-limit" || name == "reconnect_limit") {
limit = value;
} else if (name == "reconnect-interval" || name == "reconnect_interval") {
- maxReconnectInterval = minReconnectInterval = value;
+ maxReconnectInterval = minReconnectInterval = timeValue(value);
} else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") {
- minReconnectInterval = value;
+ minReconnectInterval = timeValue(value);
} else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") {
- maxReconnectInterval = value;
+ maxReconnectInterval = timeValue(value);
} else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") {
replaceUrls = value.asBool();
} else if (name == "reconnect-urls" || name == "reconnect_urls") {
@@ -236,18 +257,10 @@ void ConnectionImpl::reopen()
}
-bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
-{
- if (timeout == 0) return true;
- if (timeout < 0) return false;
- qpid::sys::Duration used(start, qpid::sys::now());
- qpid::sys::Duration allowed = timeout * qpid::sys::TIME_SEC;
- return allowed < used;
-}
-
void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
{
- for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) {
+ QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
+ for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) {
if (!reconnect) {
throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
}
@@ -257,8 +270,11 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
if (expired(started, timeout)) {
throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout");
}
- else qpid::sys::sleep(i);
+ QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds, urls="
+ << asString(urls));
+ qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds.
}
+ QPID_LOG(debug, "Connection successful, urls=" << asString(urls));
retries = 0;
}
@@ -320,6 +336,7 @@ bool ConnectionImpl::backoff()
return false;
}
}
+
std::string ConnectionImpl::getAuthenticatedUsername()
{
return connection.getNegotiatedSettings().username;