summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp114
1 files changed, 108 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 9f738731e2..3a735b5698 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -21,14 +21,16 @@
#include "ConnectionImpl.h"
#include "SessionImpl.h"
#include "qpid/messaging/Session.h"
-#include "qpid/client/ConnectionSettings.h"
+#include "qpid/client/PrivateImplRef.h"
#include "qpid/log/Statement.h"
+#include <boost/intrusive_ptr.hpp>
namespace qpid {
namespace client {
namespace amqp0_10 {
using qpid::messaging::Variant;
+using namespace qpid::sys;
template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value)
{
@@ -56,24 +58,124 @@ void convert(const Variant::Map& from, ConnectionSettings& to)
setIfFound(from, "bounds", to.bounds);
}
-ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options)
+ConnectionImpl::ConnectionImpl(const std::string& u, const Variant::Map& options) :
+ url(u), reconnectionEnabled(true), timeout(-1),
+ minRetryInterval(1), maxRetryInterval(30)
{
QPID_LOG(debug, "Opening connection to " << url << " with " << options);
- Url u(url);
- ConnectionSettings settings;
convert(options, settings);
- connection.open(u, settings);
+ setIfFound(options, "reconnection-enabled", reconnectionEnabled);
+ setIfFound(options, "reconnection-timeout", timeout);
+ setIfFound(options, "min-retry-interval", minRetryInterval);
+ setIfFound(options, "max-retry-interval", maxRetryInterval);
+ connection.open(url, settings);
}
void ConnectionImpl::close()
{
+ qpid::sys::Mutex::ScopedLock l(lock);
connection.close();
}
+boost::intrusive_ptr<SessionImpl> getImplPtr(qpid::messaging::Session& session)
+{
+ return boost::dynamic_pointer_cast<SessionImpl>(
+ qpid::client::PrivateImplRef<qpid::messaging::Session>::get(session)
+ );
+}
+
+void ConnectionImpl::closed(SessionImpl& s)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ if (getImplPtr(*i).get() == &s) {
+ sessions.erase(i);
+ break;
+ }
+ }
+}
+
qpid::messaging::Session ConnectionImpl::newSession()
{
- qpid::messaging::Session impl(new SessionImpl(connection.newSession()));
+ qpid::messaging::Session impl(new SessionImpl(*this));
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ sessions.push_back(impl);
+ }
+ try {
+ getImplPtr(impl)->setSession(connection.newSession());
+ } catch (const TransportFailure&) {
+ reconnect();
+ }
return impl;
}
+void ConnectionImpl::reconnect()
+{
+ AbsTime start = now();
+ ScopedLock<Semaphore> l(semaphore);
+ if (!connection.isOpen()) connect(start);
+}
+
+bool expired(const AbsTime& start, int timeout)
+{
+ if (timeout == 0) return true;
+ if (timeout < 0) return false;
+ Duration used(start, now());
+ Duration allowed = timeout * TIME_SEC;
+ return allowed > used;
+}
+
+void ConnectionImpl::connect(const AbsTime& started)
+{
+ for (int i = minRetryInterval; !tryConnect(); i = std::min(i * 2, maxRetryInterval)) {
+ if (expired(started, timeout)) throw TransportFailure();
+ else qpid::sys::sleep(i);
+ }
+}
+
+bool ConnectionImpl::tryConnect()
+{
+ if (tryConnect(url) || tryConnect(connection.getKnownBrokers())) {
+ return resetSessions();
+ } else {
+ return false;
+ }
+}
+
+bool ConnectionImpl::tryConnect(const Url& u)
+{
+ try {
+ QPID_LOG(info, "Trying to connect to " << url << "...");
+ connection.open(u, settings);
+ return true;
+ } catch (const Exception& e) {
+ //TODO: need to fix timeout on open so that it throws TransportFailure
+ QPID_LOG(info, "Failed to connect to " << u << ": " << e.what());
+ }
+ return false;
+}
+
+bool ConnectionImpl::tryConnect(const std::vector<Url>& urls)
+{
+ for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
+ if (tryConnect(*i)) return true;
+ }
+ return false;
+}
+
+bool ConnectionImpl::resetSessions()
+{
+ try {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ getImplPtr(*i)->setSession(connection.newSession());
+ }
+ return true;
+ } catch (const TransportFailure&) {
+ QPID_LOG(debug, "Connection failed while re-inialising sessions");
+ return false;
+ }
+}
+
}}} // namespace qpid::client::amqp0_10