summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp148
1 files changed, 101 insertions, 47 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 4242850192..9c1c4e0735 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -1,4 +1,4 @@
-/*
+ /*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -20,6 +20,7 @@
*/
#include "ConnectionImpl.h"
#include "SessionImpl.h"
+#include "SimpleUrlParser.h"
#include "qpid/messaging/Session.h"
#include "qpid/client/PrivateImplRef.h"
#include "qpid/framing/Uuid.h"
@@ -33,13 +34,42 @@ namespace amqp0_10 {
using qpid::messaging::Variant;
using qpid::framing::Uuid;
-using namespace qpid::sys;
-template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value)
+void convert(const Variant::List& from, std::vector<std::string>& to)
+{
+ for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) {
+ to.push_back(i->asString());
+ }
+}
+
+template <class T> bool setIfFound(const Variant::Map& map, const std::string& key, T& value)
{
Variant::Map::const_iterator i = map.find(key);
if (i != map.end()) {
value = (T) i->second;
+ QPID_LOG(debug, "option " << key << " specified as " << i->second);
+ return true;
+ } else {
+ QPID_LOG(debug, "option " << key << " not specified");
+ return false;
+ }
+}
+
+template <>
+bool setIfFound< std::vector<std::string> >(const Variant::Map& map,
+ const std::string& key,
+ std::vector<std::string>& value)
+{
+ Variant::Map::const_iterator i = map.find(key);
+ if (i != map.end()) {
+ if (i->second.getType() == qpid::messaging::VAR_LIST) {
+ convert(i->second.asList(), value);
+ } else {
+ value.push_back(i->second.asString());
+ }
+ return true;
+ } else {
+ return false;
}
}
@@ -59,24 +89,47 @@ void convert(const Variant::Map& from, ConnectionSettings& to)
setIfFound(from, "max-channels", to.maxChannels);
setIfFound(from, "max-frame-size", to.maxFrameSize);
setIfFound(from, "bounds", to.bounds);
+
+ setIfFound(from, "protocol", to.protocol);
}
ConnectionImpl::ConnectionImpl(const Variant::Map& options) :
- reconnectionEnabled(true), timeout(-1),
- minRetryInterval(1), maxRetryInterval(30)
+ reconnect(true), timeout(-1), limit(-1),
+ minReconnectInterval(3), maxReconnectInterval(60),
+ retries(0)
+{
+ QPID_LOG(debug, "Created connection with " << options);
+ setOptions(options);
+}
+
+void ConnectionImpl::setOptions(const Variant::Map& options)
{
- QPID_LOG(debug, "Opening connection to " << url << " with " << options);
convert(options, settings);
- setIfFound(options, "reconnection-enabled", reconnectionEnabled);
- setIfFound(options, "reconnection-timeout", timeout);
- setIfFound(options, "min-retry-interval", minRetryInterval);
- setIfFound(options, "max-retry-interval", maxRetryInterval);
+ setIfFound(options, "reconnect", reconnect);
+ setIfFound(options, "reconnect-timeout", timeout);
+ setIfFound(options, "reconnect-limit", limit);
+ int64_t reconnectInterval;
+ if (setIfFound(options, "reconnect-interval", reconnectInterval)) {
+ minReconnectInterval = maxReconnectInterval = reconnectInterval;
+ } else {
+ setIfFound(options, "min-reconnect-interval", minReconnectInterval);
+ setIfFound(options, "max-reconnect-interval", maxReconnectInterval);
+ }
+ setIfFound(options, "urls", urls);
+}
+
+void ConnectionImpl::setOption(const std::string& name, const Variant& value)
+{
+ Variant::Map options;
+ options[name] = value;
+ setOptions(options);
+ QPID_LOG(debug, "Set " << name << " to " << value);
}
void ConnectionImpl::open(const std::string& u)
{
- url = u;
- connection.open(url, settings);
+ urls.push_back(u);
+ connect();
}
void ConnectionImpl::close()
@@ -134,64 +187,65 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st
try {
getImplPtr(impl)->setSession(connection.newSession(name));
} catch (const TransportFailure&) {
- reconnect();
+ connect();
}
return impl;
}
-void ConnectionImpl::reconnect()
+void ConnectionImpl::connect()
{
- AbsTime start = now();
- ScopedLock<Semaphore> l(semaphore);
+ qpid::sys::AbsTime start = qpid::sys::now();
+ qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore);
if (!connection.isOpen()) connect(start);
}
-bool expired(const AbsTime& start, int timeout)
+bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
{
if (timeout == 0) return true;
if (timeout < 0) return false;
- Duration used(start, now());
- Duration allowed = timeout * TIME_SEC;
- return allowed > used;
+ qpid::sys::Duration used(start, qpid::sys::now());
+ qpid::sys::Duration allowed = timeout * qpid::sys::TIME_SEC;
+ return allowed < used;
}
-void ConnectionImpl::connect(const AbsTime& started)
+void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
{
- for (int i = minRetryInterval; !tryConnect(); i = std::min(i * 2, maxRetryInterval)) {
- if (expired(started, timeout)) throw TransportFailure();
+ for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) {
+ if (!reconnect) throw TransportFailure("Failed to connect (reconnect disabled)");
+ if (limit >= 0 && retries++ >= limit) throw TransportFailure("Failed to connect within reconnect limit");
+ if (expired(started, timeout)) throw TransportFailure("Failed to connect within reconnect timeout");
else qpid::sys::sleep(i);
}
+ retries = 0;
}
bool ConnectionImpl::tryConnect()
{
- if (tryConnect(url) ||
- (failoverListener.get() && tryConnect(failoverListener->getKnownBrokers())))
- {
- return resetSessions();
- } else {
- return false;
- }
+ if (tryConnect(urls)) return resetSessions();
+ else return false;
}
-bool ConnectionImpl::tryConnect(const Url& u)
+bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls)
{
- try {
- QPID_LOG(info, "Trying to connect to " << url << "...");
- connection.open(u, settings);
- failoverListener.reset(new FailoverListener(connection));
- return true;
- } catch (const Exception& e) {
- //TODO: need to fix timeout on open so that it throws TransportFailure
- QPID_LOG(info, "Failed to connect to " << u << ": " << e.what());
- }
- return false;
-}
-
-bool ConnectionImpl::tryConnect(const std::vector<Url>& urls)
-{
- for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
- if (tryConnect(*i)) return true;
+ for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
+ try {
+ QPID_LOG(info, "Trying to connect to " << *i << "...");
+ //TODO: when url support is more complete can avoid this test here
+ if (i->find("amqp:") == 0) {
+ Url url(*i);
+ connection.open(url, settings);
+ } else {
+ SimpleUrlParser::parse(*i, settings);
+ connection.open(settings);
+ }
+ QPID_LOG(info, "Connected to " << *i);
+ return true;
+ } catch (const Exception& e) {
+ //TODO: need to fix timeout on
+ //qpid::client::Connection::open() so that it throws
+ //TransportFailure rather than a ConnectionException
+ QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
+ }
}
return false;
}