diff options
-rw-r--r-- | qpid/cpp/include/qpid/messaging/Connection.h | 60 | ||||
-rw-r--r-- | qpid/cpp/include/qpid/types/Variant.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 47 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/types/Variant.cpp | 42 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 6 |
6 files changed, 99 insertions, 64 deletions
diff --git a/qpid/cpp/include/qpid/messaging/Connection.h b/qpid/cpp/include/qpid/messaging/Connection.h index 165573e2ef..1fc5847f74 100644 --- a/qpid/cpp/include/qpid/messaging/Connection.h +++ b/qpid/cpp/include/qpid/messaging/Connection.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -38,7 +38,7 @@ template <class> class PrivateImplRef; class ConnectionImpl; class Session; -/** \ingroup messaging +/** \ingroup messaging * A connection represents a network connection to a remote endpoint. */ @@ -48,40 +48,42 @@ class QPID_MESSAGING_CLASS_EXTERN Connection : public qpid::messaging::Handle<Co QPID_MESSAGING_EXTERN Connection(ConnectionImpl* impl); QPID_MESSAGING_EXTERN Connection(const Connection&); QPID_MESSAGING_EXTERN Connection(); - /** + /** * Current implementation supports the following options: - * - * username - * password - * heartbeat - * tcp_nodelay - * sasl_mechanisms - * sasl_service - * sasl_min_ssf - * sasl_max_ssf - * transport - * + * + * - username + * - password + * - heartbeat + * - tcp_nodelay + * - sasl_mechanisms + * - sasl_service + * - sasl_min_ssf + * - sasl_max_ssf + * - transport + * * Reconnect behaviour can be controlled through the following options: - * - * reconnect: true/false (enables/disables reconnect entirely) - * reconnect_timeout: number of seconds (give up and report failure after specified time) - * reconnect_limit: n (give up and report failure after specified number of attempts) - * reconnect_interval_min: number of seconds (initial delay between failed reconnection attempts) - * reconnect_interval_max: number of seconds (maximum delay between failed reconnection attempts) - * reconnect_interval: shorthand for setting the same reconnect_interval_min/max - * reconnect_urls: list of alternate urls to try when connecting * - * The reconnect_interval is the time that the client waits - * for after a failed attempt to reconnect before retrying. It - * starts at the value of the min_retry_interval and is - * doubled every failure until the value of max_retry_interval - * is reached. + * - reconnect: true/false (enables/disables reconnect entirely) + * - reconnect_timeout: seconds (give up and report failure after specified time) + * - reconnect_limit: n (give up and report failure after specified number of attempts) + * - reconnect_interval_min: seconds (initial delay between failed reconnection attempts) + * - reconnect_interval_max: seconds (maximum delay between failed reconnection attempts) + * - reconnect_interval: shorthand for setting the same reconnect_interval_min/max + * - reconnect_urls: list of alternate urls to try when connecting + * + * The reconnect_interval is the time that the client waits for + * after a failed attempt to reconnect before retrying. It starts + * at the value of the min_retry_interval and is doubled every + * failure until the value of max_retry_interval is reached. + * + * Values in seconds can be fractional, for example 0.001 is a + * millisecond delay. */ QPID_MESSAGING_EXTERN Connection(const std::string& url, const qpid::types::Variant::Map& options = qpid::types::Variant::Map()); /** * Creates a connection using an option string of the form * {name:value,name2:value2...}, see above for options supported. - * + * * @exception InvalidOptionString if the string does not match the correct syntax */ QPID_MESSAGING_EXTERN Connection(const std::string& url, const std::string& options); diff --git a/qpid/cpp/include/qpid/types/Variant.h b/qpid/cpp/include/qpid/types/Variant.h index 4459fc4123..3feba4a0ec 100644 --- a/qpid/cpp/include/qpid/types/Variant.h +++ b/qpid/cpp/include/qpid/types/Variant.h @@ -62,6 +62,8 @@ enum VariantType { std::string getTypeName(VariantType type); +bool isIntegerType(VariantType type); + class VariantImpl; /** diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index cc6e9b9ab2..a20babdeb4 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -29,6 +29,7 @@ #include <boost/intrusive_ptr.hpp> #include <vector> #include <sstream> +#include <limits> namespace qpid { namespace client { @@ -39,6 +40,16 @@ using qpid::types::VAR_LIST; using qpid::framing::Uuid; namespace { + +double FOREVER(std::numeric_limits<double>::max()); + +// Time values in seconds can be specified as integer or floating point values. +double timeValue(const Variant& value) { + if (types::isIntegerType(value.getType())) + return double(value.asInt64()); + return value.asDouble(); +} + void merge(const std::string& value, std::vector<std::string>& list) { if (std::find(list.begin(), list.end(), value) == list.end()) list.push_back(value); @@ -60,11 +71,21 @@ std::string asString(const std::vector<std::string>& v) { os << "]"; return os.str(); } + +bool expired(const sys::AbsTime& start, double timeout) +{ + if (timeout == 0) return true; + if (timeout == FOREVER) return false; + sys::Duration used(start, sys::now()); + sys::Duration allowed(int64_t(timeout*sys::TIME_SEC)); + return allowed < used; } +} // namespace + ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) : - replaceUrls(false), reconnect(false), timeout(-1), limit(-1), - minReconnectInterval(3), maxReconnectInterval(60), + replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1), + minReconnectInterval(0.001), maxReconnectInterval(2), retries(0), reconnectOnLimitExceeded(true) { setOptions(options); @@ -85,15 +106,15 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value) if (name == "reconnect") { reconnect = value; } else if (name == "reconnect-timeout" || name == "reconnect_timeout") { - timeout = value; + timeout = timeValue(value); } else if (name == "reconnect-limit" || name == "reconnect_limit") { limit = value; } else if (name == "reconnect-interval" || name == "reconnect_interval") { - maxReconnectInterval = minReconnectInterval = value; + maxReconnectInterval = minReconnectInterval = timeValue(value); } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") { - minReconnectInterval = value; + minReconnectInterval = timeValue(value); } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") { - maxReconnectInterval = value; + maxReconnectInterval = timeValue(value); } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") { replaceUrls = value.asBool(); } else if (name == "reconnect-urls" || name == "reconnect_urls") { @@ -236,18 +257,9 @@ void ConnectionImpl::reopen() } -bool expired(const qpid::sys::AbsTime& start, int64_t timeout) -{ - if (timeout == 0) return true; - if (timeout < 0) return false; - qpid::sys::Duration used(start, qpid::sys::now()); - qpid::sys::Duration allowed = timeout * qpid::sys::TIME_SEC; - return allowed < used; -} - void ConnectionImpl::connect(const qpid::sys::AbsTime& started) { - for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) { + for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) { if (!reconnect) { throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); } @@ -257,7 +269,7 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started) if (expired(started, timeout)) { throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout"); } - else qpid::sys::sleep(i); + qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds. } retries = 0; } @@ -320,6 +332,7 @@ bool ConnectionImpl::backoff() return false; } } + std::string ConnectionImpl::getAuthenticatedUsername() { return connection.getNegotiatedSettings().username; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index 1b58cbbe3e..d1ac4533d5 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -64,10 +64,10 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl std::vector<std::string> urls; qpid::client::ConnectionSettings settings; bool reconnect; - int64_t timeout; + double timeout; int32_t limit; - int64_t minReconnectInterval; - int64_t maxReconnectInterval; + double minReconnectInterval; + double maxReconnectInterval; int32_t retries; bool reconnectOnLimitExceeded; diff --git a/qpid/cpp/src/qpid/types/Variant.cpp b/qpid/cpp/src/qpid/types/Variant.cpp index f563d5de5b..6af06ede5d 100644 --- a/qpid/cpp/src/qpid/types/Variant.cpp +++ b/qpid/cpp/src/qpid/types/Variant.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -88,7 +88,7 @@ class VariantImpl bool isEqualTo(VariantImpl&) const; bool isEquivalentTo(VariantImpl&) const; - static VariantImpl* create(const Variant&); + static VariantImpl* create(const Variant&); private: const VariantType type; union { @@ -150,7 +150,7 @@ VariantImpl::VariantImpl(const Variant::Map& m) : type(VAR_MAP) { value.v = new VariantImpl::VariantImpl(const Variant::List& l) : type(VAR_LIST) { value.v = new Variant::List(l); } VariantImpl::VariantImpl(const Uuid& u) : type(VAR_UUID) { value.v = new Uuid(u); } -VariantImpl::~VariantImpl() { +VariantImpl::~VariantImpl() { switch (type) { case VAR_STRING: delete reinterpret_cast<std::string*>(value.v); @@ -173,7 +173,7 @@ VariantType VariantImpl::getType() const { return type; } namespace { -bool same_char(char a, char b) +bool same_char(char a, char b) { return toupper(a) == toupper(b); } @@ -191,7 +191,7 @@ bool toBool(const std::string& s) if (caseInsensitiveMatch(s, TRUE)) return true; if (caseInsensitiveMatch(s, FALSE)) return false; try { return boost::lexical_cast<int>(s); } catch(const boost::bad_lexical_cast&) {} - throw InvalidConversion(QPID_MSG("Cannot convert " << s << " to bool")); + throw InvalidConversion(QPID_MSG("Cannot convert " << s << " to bool")); } template <class T> std::string toString(const T& t) @@ -531,9 +531,9 @@ bool VariantImpl::isEqualTo(VariantImpl& other) const case VAR_INT64: return value.i64 == other.value.i64; case VAR_DOUBLE: return value.d == other.value.d; case VAR_FLOAT: return value.f == other.value.f; - case VAR_STRING: return *reinterpret_cast<std::string*>(value.v) + case VAR_STRING: return *reinterpret_cast<std::string*>(value.v) == *reinterpret_cast<std::string*>(other.value.v); - case VAR_UUID: return *reinterpret_cast<Uuid*>(value.v) + case VAR_UUID: return *reinterpret_cast<Uuid*>(value.v) == *reinterpret_cast<Uuid*>(other.value.v); case VAR_LIST: return equal(asList(), other.asList()); case VAR_MAP: return equal(asMap(), other.asMap()); @@ -616,7 +616,25 @@ std::string getTypeName(VariantType type) return "<unknown>";//should never happen } -VariantImpl* VariantImpl::create(const Variant& v) +bool isIntegerType(VariantType type) +{ + switch (type) { + case VAR_BOOL: + case VAR_UINT8: + case VAR_UINT16: + case VAR_UINT32: + case VAR_UINT64: + case VAR_INT8: + case VAR_INT16: + case VAR_INT32: + case VAR_INT64: + return true; + default: + return false; + } +} + +VariantImpl* VariantImpl::create(const Variant& v) { switch (v.getType()) { case VAR_BOOL: return new VariantImpl(v.asBool()); @@ -815,9 +833,9 @@ const Variant::List& Variant::asList() const { if (!impl) throw InvalidConversio Variant::List& Variant::asList() { if (!impl) throw InvalidConversion("Can't convert VOID to LIST"); return impl->asList(); } const std::string& Variant::getString() const { if (!impl) throw InvalidConversion("Can't convert VOID to STRING"); return impl->getString(); } std::string& Variant::getString() { if (!impl) throw InvalidConversion("Can't convert VOID to STRING"); return impl->getString(); } -void Variant::setEncoding(const std::string& s) { +void Variant::setEncoding(const std::string& s) { if (!impl) impl = new VariantImpl(); - impl->setEncoding(s); + impl->setEncoding(s); } const std::string& Variant::getEncoding() const { return impl ? impl->getEncoding() : EMPTY; } @@ -873,7 +891,7 @@ std::ostream& operator<<(std::ostream& out, const Variant& value) out << value.asString(); break; } - return out; + return out; } bool operator==(const Variant& a, const Variant& b) diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index f28b31dd03..e38538a36f 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -236,7 +236,7 @@ class ShortTests(BrokerTest): print self.browse(self.connect_admin(backup2).session(), "q", transform=sn) raise - def test_failover(self): + def test_failover_python(self): """Verify that backups rejects connections and that fail-over works in python client""" getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover messages primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) @@ -263,6 +263,7 @@ class ShortTests(BrokerTest): c.close() def test_failover_cpp(self): + """Verify that failover works in the C++ client.""" primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) @@ -281,8 +282,7 @@ class ShortTests(BrokerTest): assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die backup.promote() n = receiver.received # Make sure we are still running - # FIXME aconway 2012-02-01: c++ client has 1 sec min retry, hence long timeout - assert retry(lambda: receiver.received > n + 10, timeout=5) + assert retry(lambda: receiver.received > n + 10) sender.stop() receiver.stop() |