summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-17 14:16:02 +0000
committerAlan Conway <aconway@apache.org>2012-02-17 14:16:02 +0000
commitfd205ac89a9146ef22b4dd057d83fa84fb12ece0 (patch)
tree07f78e1f331653ba76956285098d6898f661e216
parentbe96d978b785daeea813e794c7222755d7e57a36 (diff)
downloadqpid-python-fd205ac89a9146ef22b4dd057d83fa84fb12ece0.tar.gz
QPID-3603: c++ messaging API: allow floating point reconnect durations in seconds.
Allow sub-second intervals, e.g. reconnect_interval_min=0.001 for a millisecond interval. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-7@1245546 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qpid/messaging/Connection.h60
-rw-r--r--qpid/cpp/include/qpid/types/Variant.h2
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp47
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h6
-rw-r--r--qpid/cpp/src/qpid/types/Variant.cpp42
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py6
6 files changed, 99 insertions, 64 deletions
diff --git a/qpid/cpp/include/qpid/messaging/Connection.h b/qpid/cpp/include/qpid/messaging/Connection.h
index 165573e2ef..1fc5847f74 100644
--- a/qpid/cpp/include/qpid/messaging/Connection.h
+++ b/qpid/cpp/include/qpid/messaging/Connection.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,7 +38,7 @@ template <class> class PrivateImplRef;
class ConnectionImpl;
class Session;
-/** \ingroup messaging
+/** \ingroup messaging
* A connection represents a network connection to a remote endpoint.
*/
@@ -48,40 +48,42 @@ class QPID_MESSAGING_CLASS_EXTERN Connection : public qpid::messaging::Handle<Co
QPID_MESSAGING_EXTERN Connection(ConnectionImpl* impl);
QPID_MESSAGING_EXTERN Connection(const Connection&);
QPID_MESSAGING_EXTERN Connection();
- /**
+ /**
* Current implementation supports the following options:
- *
- * username
- * password
- * heartbeat
- * tcp_nodelay
- * sasl_mechanisms
- * sasl_service
- * sasl_min_ssf
- * sasl_max_ssf
- * transport
- *
+ *
+ * - username
+ * - password
+ * - heartbeat
+ * - tcp_nodelay
+ * - sasl_mechanisms
+ * - sasl_service
+ * - sasl_min_ssf
+ * - sasl_max_ssf
+ * - transport
+ *
* Reconnect behaviour can be controlled through the following options:
- *
- * reconnect: true/false (enables/disables reconnect entirely)
- * reconnect_timeout: number of seconds (give up and report failure after specified time)
- * reconnect_limit: n (give up and report failure after specified number of attempts)
- * reconnect_interval_min: number of seconds (initial delay between failed reconnection attempts)
- * reconnect_interval_max: number of seconds (maximum delay between failed reconnection attempts)
- * reconnect_interval: shorthand for setting the same reconnect_interval_min/max
- * reconnect_urls: list of alternate urls to try when connecting
*
- * The reconnect_interval is the time that the client waits
- * for after a failed attempt to reconnect before retrying. It
- * starts at the value of the min_retry_interval and is
- * doubled every failure until the value of max_retry_interval
- * is reached.
+ * - reconnect: true/false (enables/disables reconnect entirely)
+ * - reconnect_timeout: seconds (give up and report failure after specified time)
+ * - reconnect_limit: n (give up and report failure after specified number of attempts)
+ * - reconnect_interval_min: seconds (initial delay between failed reconnection attempts)
+ * - reconnect_interval_max: seconds (maximum delay between failed reconnection attempts)
+ * - reconnect_interval: shorthand for setting the same reconnect_interval_min/max
+ * - reconnect_urls: list of alternate urls to try when connecting
+ *
+ * The reconnect_interval is the time that the client waits for
+ * after a failed attempt to reconnect before retrying. It starts
+ * at the value of the min_retry_interval and is doubled every
+ * failure until the value of max_retry_interval is reached.
+ *
+ * Values in seconds can be fractional, for example 0.001 is a
+ * millisecond delay.
*/
QPID_MESSAGING_EXTERN Connection(const std::string& url, const qpid::types::Variant::Map& options = qpid::types::Variant::Map());
/**
* Creates a connection using an option string of the form
* {name:value,name2:value2...}, see above for options supported.
- *
+ *
* @exception InvalidOptionString if the string does not match the correct syntax
*/
QPID_MESSAGING_EXTERN Connection(const std::string& url, const std::string& options);
diff --git a/qpid/cpp/include/qpid/types/Variant.h b/qpid/cpp/include/qpid/types/Variant.h
index 4459fc4123..3feba4a0ec 100644
--- a/qpid/cpp/include/qpid/types/Variant.h
+++ b/qpid/cpp/include/qpid/types/Variant.h
@@ -62,6 +62,8 @@ enum VariantType {
std::string getTypeName(VariantType type);
+bool isIntegerType(VariantType type);
+
class VariantImpl;
/**
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index cc6e9b9ab2..a20babdeb4 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -29,6 +29,7 @@
#include <boost/intrusive_ptr.hpp>
#include <vector>
#include <sstream>
+#include <limits>
namespace qpid {
namespace client {
@@ -39,6 +40,16 @@ using qpid::types::VAR_LIST;
using qpid::framing::Uuid;
namespace {
+
+double FOREVER(std::numeric_limits<double>::max());
+
+// Time values in seconds can be specified as integer or floating point values.
+double timeValue(const Variant& value) {
+ if (types::isIntegerType(value.getType()))
+ return double(value.asInt64());
+ return value.asDouble();
+}
+
void merge(const std::string& value, std::vector<std::string>& list) {
if (std::find(list.begin(), list.end(), value) == list.end())
list.push_back(value);
@@ -60,11 +71,21 @@ std::string asString(const std::vector<std::string>& v) {
os << "]";
return os.str();
}
+
+bool expired(const sys::AbsTime& start, double timeout)
+{
+ if (timeout == 0) return true;
+ if (timeout == FOREVER) return false;
+ sys::Duration used(start, sys::now());
+ sys::Duration allowed(int64_t(timeout*sys::TIME_SEC));
+ return allowed < used;
}
+} // namespace
+
ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
- replaceUrls(false), reconnect(false), timeout(-1), limit(-1),
- minReconnectInterval(3), maxReconnectInterval(60),
+ replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1),
+ minReconnectInterval(0.001), maxReconnectInterval(2),
retries(0), reconnectOnLimitExceeded(true)
{
setOptions(options);
@@ -85,15 +106,15 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value)
if (name == "reconnect") {
reconnect = value;
} else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
- timeout = value;
+ timeout = timeValue(value);
} else if (name == "reconnect-limit" || name == "reconnect_limit") {
limit = value;
} else if (name == "reconnect-interval" || name == "reconnect_interval") {
- maxReconnectInterval = minReconnectInterval = value;
+ maxReconnectInterval = minReconnectInterval = timeValue(value);
} else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") {
- minReconnectInterval = value;
+ minReconnectInterval = timeValue(value);
} else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") {
- maxReconnectInterval = value;
+ maxReconnectInterval = timeValue(value);
} else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") {
replaceUrls = value.asBool();
} else if (name == "reconnect-urls" || name == "reconnect_urls") {
@@ -236,18 +257,9 @@ void ConnectionImpl::reopen()
}
-bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
-{
- if (timeout == 0) return true;
- if (timeout < 0) return false;
- qpid::sys::Duration used(start, qpid::sys::now());
- qpid::sys::Duration allowed = timeout * qpid::sys::TIME_SEC;
- return allowed < used;
-}
-
void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
{
- for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) {
+ for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) {
if (!reconnect) {
throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
}
@@ -257,7 +269,7 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
if (expired(started, timeout)) {
throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout");
}
- else qpid::sys::sleep(i);
+ qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds.
}
retries = 0;
}
@@ -320,6 +332,7 @@ bool ConnectionImpl::backoff()
return false;
}
}
+
std::string ConnectionImpl::getAuthenticatedUsername()
{
return connection.getNegotiatedSettings().username;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index 1b58cbbe3e..d1ac4533d5 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -64,10 +64,10 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
std::vector<std::string> urls;
qpid::client::ConnectionSettings settings;
bool reconnect;
- int64_t timeout;
+ double timeout;
int32_t limit;
- int64_t minReconnectInterval;
- int64_t maxReconnectInterval;
+ double minReconnectInterval;
+ double maxReconnectInterval;
int32_t retries;
bool reconnectOnLimitExceeded;
diff --git a/qpid/cpp/src/qpid/types/Variant.cpp b/qpid/cpp/src/qpid/types/Variant.cpp
index f563d5de5b..6af06ede5d 100644
--- a/qpid/cpp/src/qpid/types/Variant.cpp
+++ b/qpid/cpp/src/qpid/types/Variant.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -88,7 +88,7 @@ class VariantImpl
bool isEqualTo(VariantImpl&) const;
bool isEquivalentTo(VariantImpl&) const;
- static VariantImpl* create(const Variant&);
+ static VariantImpl* create(const Variant&);
private:
const VariantType type;
union {
@@ -150,7 +150,7 @@ VariantImpl::VariantImpl(const Variant::Map& m) : type(VAR_MAP) { value.v = new
VariantImpl::VariantImpl(const Variant::List& l) : type(VAR_LIST) { value.v = new Variant::List(l); }
VariantImpl::VariantImpl(const Uuid& u) : type(VAR_UUID) { value.v = new Uuid(u); }
-VariantImpl::~VariantImpl() {
+VariantImpl::~VariantImpl() {
switch (type) {
case VAR_STRING:
delete reinterpret_cast<std::string*>(value.v);
@@ -173,7 +173,7 @@ VariantType VariantImpl::getType() const { return type; }
namespace {
-bool same_char(char a, char b)
+bool same_char(char a, char b)
{
return toupper(a) == toupper(b);
}
@@ -191,7 +191,7 @@ bool toBool(const std::string& s)
if (caseInsensitiveMatch(s, TRUE)) return true;
if (caseInsensitiveMatch(s, FALSE)) return false;
try { return boost::lexical_cast<int>(s); } catch(const boost::bad_lexical_cast&) {}
- throw InvalidConversion(QPID_MSG("Cannot convert " << s << " to bool"));
+ throw InvalidConversion(QPID_MSG("Cannot convert " << s << " to bool"));
}
template <class T> std::string toString(const T& t)
@@ -531,9 +531,9 @@ bool VariantImpl::isEqualTo(VariantImpl& other) const
case VAR_INT64: return value.i64 == other.value.i64;
case VAR_DOUBLE: return value.d == other.value.d;
case VAR_FLOAT: return value.f == other.value.f;
- case VAR_STRING: return *reinterpret_cast<std::string*>(value.v)
+ case VAR_STRING: return *reinterpret_cast<std::string*>(value.v)
== *reinterpret_cast<std::string*>(other.value.v);
- case VAR_UUID: return *reinterpret_cast<Uuid*>(value.v)
+ case VAR_UUID: return *reinterpret_cast<Uuid*>(value.v)
== *reinterpret_cast<Uuid*>(other.value.v);
case VAR_LIST: return equal(asList(), other.asList());
case VAR_MAP: return equal(asMap(), other.asMap());
@@ -616,7 +616,25 @@ std::string getTypeName(VariantType type)
return "<unknown>";//should never happen
}
-VariantImpl* VariantImpl::create(const Variant& v)
+bool isIntegerType(VariantType type)
+{
+ switch (type) {
+ case VAR_BOOL:
+ case VAR_UINT8:
+ case VAR_UINT16:
+ case VAR_UINT32:
+ case VAR_UINT64:
+ case VAR_INT8:
+ case VAR_INT16:
+ case VAR_INT32:
+ case VAR_INT64:
+ return true;
+ default:
+ return false;
+ }
+}
+
+VariantImpl* VariantImpl::create(const Variant& v)
{
switch (v.getType()) {
case VAR_BOOL: return new VariantImpl(v.asBool());
@@ -815,9 +833,9 @@ const Variant::List& Variant::asList() const { if (!impl) throw InvalidConversio
Variant::List& Variant::asList() { if (!impl) throw InvalidConversion("Can't convert VOID to LIST"); return impl->asList(); }
const std::string& Variant::getString() const { if (!impl) throw InvalidConversion("Can't convert VOID to STRING"); return impl->getString(); }
std::string& Variant::getString() { if (!impl) throw InvalidConversion("Can't convert VOID to STRING"); return impl->getString(); }
-void Variant::setEncoding(const std::string& s) {
+void Variant::setEncoding(const std::string& s) {
if (!impl) impl = new VariantImpl();
- impl->setEncoding(s);
+ impl->setEncoding(s);
}
const std::string& Variant::getEncoding() const { return impl ? impl->getEncoding() : EMPTY; }
@@ -873,7 +891,7 @@ std::ostream& operator<<(std::ostream& out, const Variant& value)
out << value.asString();
break;
}
- return out;
+ return out;
}
bool operator==(const Variant& a, const Variant& b)
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index f28b31dd03..e38538a36f 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -236,7 +236,7 @@ class ShortTests(BrokerTest):
print self.browse(self.connect_admin(backup2).session(), "q", transform=sn)
raise
- def test_failover(self):
+ def test_failover_python(self):
"""Verify that backups rejects connections and that fail-over works in python client"""
getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover messages
primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
@@ -263,6 +263,7 @@ class ShortTests(BrokerTest):
c.close()
def test_failover_cpp(self):
+ """Verify that failover works in the C++ client."""
primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
@@ -281,8 +282,7 @@ class ShortTests(BrokerTest):
assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
backup.promote()
n = receiver.received # Make sure we are still running
- # FIXME aconway 2012-02-01: c++ client has 1 sec min retry, hence long timeout
- assert retry(lambda: receiver.received > n + 10, timeout=5)
+ assert retry(lambda: receiver.received > n + 10)
sender.stop()
receiver.stop()