summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp323
1 files changed, 323 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
new file mode 100644
index 0000000000..a87a8dea67
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -0,0 +1,323 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionImpl.h"
+#include "SessionImpl.h"
+#include "SimpleUrlParser.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/PrivateImplRef.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Url.h"
+#include <boost/intrusive_ptr.hpp>
+#include <vector>
+#include <sstream>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::types::Variant;
+using qpid::types::VAR_LIST;
+using qpid::framing::Uuid;
+
+namespace {
+void convert(const Variant::List& from, std::vector<std::string>& to)
+{
+ for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) {
+ to.push_back(i->asString());
+ }
+}
+
+std::string asString(const std::vector<std::string>& v) {
+ std::stringstream os;
+ os << "[";
+ for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) {
+ if (i != v.begin()) os << ", ";
+ os << *i;
+ }
+ os << "]";
+ return os.str();
+}
+}
+
+ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
+ reconnect(false), timeout(-1), limit(-1),
+ minReconnectInterval(3), maxReconnectInterval(60),
+ retries(0), reconnectOnLimitExceeded(true)
+{
+ setOptions(options);
+ urls.insert(urls.begin(), url);
+ QPID_LOG(debug, "Created connection " << url << " with " << options);
+}
+
+void ConnectionImpl::setOptions(const Variant::Map& options)
+{
+ for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) {
+ setOption(i->first, i->second);
+ }
+}
+
+void ConnectionImpl::setOption(const std::string& name, const Variant& value)
+{
+ sys::Mutex::ScopedLock l(lock);
+ if (name == "reconnect") {
+ reconnect = value;
+ } else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
+ timeout = value;
+ } else if (name == "reconnect-limit" || name == "reconnect_limit") {
+ limit = value;
+ } else if (name == "reconnect-interval" || name == "reconnect_interval") {
+ maxReconnectInterval = minReconnectInterval = value;
+ } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") {
+ minReconnectInterval = value;
+ } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") {
+ maxReconnectInterval = value;
+ } else if (name == "reconnect-urls" || name == "reconnect_urls") {
+ if (value.getType() == VAR_LIST) {
+ convert(value.asList(), urls);
+ } else {
+ urls.push_back(value.asString());
+ }
+ } else if (name == "username") {
+ settings.username = value.asString();
+ } else if (name == "password") {
+ settings.password = value.asString();
+ } else if (name == "sasl-mechanism" || name == "sasl_mechanism" ||
+ name == "sasl-mechanisms" || name == "sasl_mechanisms") {
+ settings.mechanism = value.asString();
+ } else if (name == "sasl-service" || name == "sasl_service") {
+ settings.service = value.asString();
+ } else if (name == "sasl-min-ssf" || name == "sasl_min_ssf") {
+ settings.minSsf = value;
+ } else if (name == "sasl-max-ssf" || name == "sasl_max_ssf") {
+ settings.maxSsf = value;
+ } else if (name == "heartbeat") {
+ settings.heartbeat = value;
+ } else if (name == "tcp-nodelay" || name == "tcp_nodelay") {
+ settings.tcpNoDelay = value;
+ } else if (name == "locale") {
+ settings.locale = value.asString();
+ } else if (name == "max-channels" || name == "max_channels") {
+ settings.maxChannels = value;
+ } else if (name == "max-frame-size" || name == "max_frame_size") {
+ settings.maxFrameSize = value;
+ } else if (name == "bounds") {
+ settings.bounds = value;
+ } else if (name == "transport") {
+ settings.protocol = value.asString();
+ } else if (name == "ssl-cert-name" || name == "ssl_cert_name") {
+ settings.sslCertName = value.asString();
+ } else {
+ throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
+ }
+}
+
+
+void ConnectionImpl::close()
+{
+ while(true) {
+ messaging::Session session;
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (sessions.empty()) break;
+ session = sessions.begin()->second;
+ }
+ session.close();
+ }
+ detach();
+}
+
+void ConnectionImpl::detach()
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ connection.close();
+}
+
+bool ConnectionImpl::isOpen() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return connection.isOpen();
+}
+
+boost::intrusive_ptr<SessionImpl> getImplPtr(qpid::messaging::Session& session)
+{
+ return boost::dynamic_pointer_cast<SessionImpl>(
+ qpid::messaging::PrivateImplRef<qpid::messaging::Session>::get(session)
+ );
+}
+
+void ConnectionImpl::closed(SessionImpl& s)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ if (getImplPtr(i->second).get() == &s) {
+ sessions.erase(i);
+ break;
+ }
+ }
+}
+
+qpid::messaging::Session ConnectionImpl::getSession(const std::string& name) const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ Sessions::const_iterator i = sessions.find(name);
+ if (i == sessions.end()) {
+ throw qpid::messaging::KeyError("No such session: " + name);
+ } else {
+ return i->second;
+ }
+}
+
+qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const std::string& n)
+{
+ std::string name = n.empty() ? Uuid(true).str() : n;
+ qpid::messaging::Session impl(new SessionImpl(*this, transactional));
+ while (true) {
+ try {
+ getImplPtr(impl)->setSession(connection.newSession(name));
+ qpid::sys::Mutex::ScopedLock l(lock);
+ sessions[name] = impl;
+ break;
+ } catch (const qpid::TransportFailure&) {
+ open();
+ } catch (const qpid::SessionException& e) {
+ throw qpid::messaging::SessionError(e.what());
+ } catch (const std::exception& e) {
+ throw qpid::messaging::MessagingException(e.what());
+ }
+ }
+ return impl;
+}
+
+void ConnectionImpl::open()
+{
+ qpid::sys::AbsTime start = qpid::sys::now();
+ qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore);
+ try {
+ if (!connection.isOpen()) connect(start);
+ }
+ catch (const types::Exception&) { throw; }
+ catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); }
+}
+
+bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
+{
+ if (timeout == 0) return true;
+ if (timeout < 0) return false;
+ qpid::sys::Duration used(start, qpid::sys::now());
+ qpid::sys::Duration allowed = timeout * qpid::sys::TIME_SEC;
+ return allowed < used;
+}
+
+void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
+{
+ for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) {
+ if (!reconnect) {
+ throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
+ }
+ if (limit >= 0 && retries++ >= limit) {
+ throw qpid::messaging::TransportFailure("Failed to connect within reconnect limit");
+ }
+ if (expired(started, timeout)) {
+ throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout");
+ }
+ else qpid::sys::sleep(i);
+ }
+ retries = 0;
+}
+
+void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) {
+ if (more.size()) {
+ for (size_t i = 0; i < more.size(); ++i) {
+ if (std::find(urls.begin(), urls.end(), more[i].str()) == urls.end()) {
+ urls.push_back(more[i].str());
+ }
+ }
+ QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
+ }
+}
+
+bool ConnectionImpl::tryConnect()
+{
+ sys::Mutex::ScopedLock l(lock);
+ for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
+ try {
+ QPID_LOG(info, "Trying to connect to " << *i << "...");
+ //TODO: when url support is more complete can avoid this test here
+ if (i->find("amqp:") == 0) {
+ Url url(*i);
+ connection.open(url, settings);
+ } else {
+ SimpleUrlParser::parse(*i, settings);
+ connection.open(settings);
+ }
+ QPID_LOG(info, "Connected to " << *i);
+ mergeUrls(connection.getInitialBrokers(), l);
+ return resetSessions(l);
+ } catch (const qpid::ConnectionException& e) {
+ //TODO: need to fix timeout on
+ //qpid::client::Connection::open() so that it throws
+ //TransportFailure rather than a ConnectionException
+ QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
+ }
+ }
+ return false;
+}
+
+bool ConnectionImpl::resetSessions(const sys::Mutex::ScopedLock& )
+{
+ try {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ getImplPtr(i->second)->setSession(connection.newSession(i->first));
+ }
+ return true;
+ } catch (const qpid::TransportFailure&) {
+ QPID_LOG(debug, "Connection failed while re-initialising sessions");
+ return false;
+ } catch (const qpid::framing::ResourceLimitExceededException& e) {
+ if (reconnectOnLimitExceeded) {
+ QPID_LOG(debug, "Detaching and reconnecting due to: " << e.what());
+ detach();
+ return false;
+ } else {
+ throw qpid::messaging::TargetCapacityExceeded(e.what());
+ }
+ }
+}
+
+bool ConnectionImpl::backoff()
+{
+ if (reconnectOnLimitExceeded) {
+ detach();
+ open();
+ return true;
+ } else {
+ return false;
+ }
+}
+std::string ConnectionImpl::getAuthenticatedUsername()
+{
+ return connection.getNegotiatedSettings().username;
+}
+
+}}} // namespace qpid::client::amqp0_10