diff options
author | Gordon Sim <gsim@apache.org> | 2015-06-17 17:08:55 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2015-06-17 17:08:55 +0000 |
commit | 57b80ff868420d23f80d36a1a9eaf630bee48734 (patch) | |
tree | 13750bb1fe96d5915e3cc45f3a093ab84d3d6972 /qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp | |
parent | b997784b3fc015206cf51c0ca11e9af66893b156 (diff) | |
download | qpid-python-57b80ff868420d23f80d36a1a9eaf630bee48734.tar.gz |
QPID-6256: Improved handling of protocol version incompatibilities
* 0-10 path no longer hans on open when connecting to a broker not
supporting that version
* the 'protocol' connection option now supports specifying multiple
protocols to try in order (as a coma separated list)
* the protocol defaults, i.e. the value assumed if the 'protocol'
connection option is not specified, can now be set via the client
config file (e.g. protocol-defaults=amqp1.0,amqp0-10) or an
environment variable (e.g QPID_PROTOCOL_DEFAULTS=amqp1.0,amqp0-10)
* if neither the connection option nor the defaults are specified
all valid versions will be tried (currently amqp0-10, then amqp1.0
but this may change in a future version)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1686078 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp | 141 |
1 files changed, 134 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp index 9e1e5f23fe..dbb0d6dfc2 100644 --- a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp +++ b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp @@ -22,14 +22,116 @@ #include "qpid/messaging/exceptions.h" #include "qpid/client/amqp0_10/ConnectionImpl.h" #include "qpid/client/LoadPlugins.h" +#include "qpid/log/Statement.h" +#include "qpid/Options.h" +#include "qpid/StringUtils.h" +#include "config.h" #include <map> +#include <sstream> +#include <boost/bind.hpp> using qpid::types::Variant; namespace qpid { namespace messaging { namespace { -typedef std::map<std::string, ProtocolRegistry::Factory*> Registry; +struct ProtocolOptions : qpid::Options +{ + std::string protocolDefaults; + + ProtocolOptions() : qpid::Options("Protocol Settings") + { + addOptions() + ("protocol-defaults", optValue(protocolDefaults, "PROTOCOLS"), "Protocols to use when none are specified"); + } +}; +const std::string SEPARATOR(", "); +const std::string EMPTY; +std::string join(const std::vector<std::string>& in, const std::string& base=EMPTY, const std::string& separator = SEPARATOR) +{ + std::stringstream out; + if (!base.empty()) out << base; + for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) { + if (i != in.begin()) out << separator; + out << *i; + } + return out.str(); +} + +typedef std::map<std::string, ProtocolRegistry::Factory*> Factories; + +ConnectionImpl* create_0_10(const std::string& url, const qpid::types::Variant::Map& options) +{ + return new qpid::client::amqp0_10::ConnectionImpl(url, options); +} + +class Registry +{ + public: + Registry() + { + factories["amqp0-10"] = &create_0_10; + CommonOptions common("", "", QPIDC_CONF_FILE); + ProtocolOptions options; + try { + common.parse(0, 0, common.clientConfig, true); + options.parse (0, 0, common.clientConfig, true); + } catch (const std::exception& e) { + throw qpid::types::Exception(QPID_MSG("Failed to parse options while initialising Protocol Registry: " << e.what())); + } + QPID_LOG(debug, "Protocol defaults: " << options.protocolDefaults); + if (!options.protocolDefaults.empty()) { + split(versions, options.protocolDefaults, ", "); + } + } + ProtocolRegistry::Factory* find(const std::string& name) const + { + Factories::const_iterator i = factories.find(name); + if (i == factories.end()) { + std::stringstream error; + error << "Unsupported protocol: " << name; + error << " (valid values are " << getNames() << ")"; + throw MessagingException(error.str()); + } else { + return i->second; + } + } + void add(const std::string& name, ProtocolRegistry::Factory* factory) + { + factories[name] = factory; + } + std::string getNames() const + { + std::stringstream names; + for (Factories::const_iterator i = factories.begin(); i != factories.end(); ++i) { + if (i != factories.begin()) names << ", "; + names << i->first; + } + return names.str(); + } + void collectNames(std::vector<std::string>& names) const + { + for (std::vector< std::string >::const_iterator i = versions.begin(); i != versions.end(); ++i) { + Factories::const_iterator j = factories.find(*i); + if (j == factories.end()) { + QPID_LOG(notice, "Unsupported protocol specified in defaults " << *i); + } else { + names.push_back(*i); + } + } + if (names.empty()) { + if (!versions.empty()) { + QPID_LOG(warning, "Protocol defaults specified are not valid (" << join(versions) << ") falling back to " << getNames()); + } + for (Factories::const_iterator i = factories.begin(); i != factories.end(); ++i) { + names.push_back(i->first); + } + } + } + private: + Factories factories; + std::vector<std::string> versions; +}; Registry& theRegistry() { @@ -57,17 +159,42 @@ ConnectionImpl* ProtocolRegistry::create(const std::string& url, const Variant:: qpid::client::theModuleLoader();//ensure modules are loaded Variant name; Variant::Map stripped; + std::vector<std::string> versions; if (extract("protocol", name, options, stripped)) { - Registry::const_iterator i = theRegistry().find(name.asString()); - if (i != theRegistry().end()) return (i->second)(url, stripped); - else if (name.asString() == "amqp0-10") return new qpid::client::amqp0_10::ConnectionImpl(url, stripped); - else throw MessagingException("Unsupported protocol: " + name.asString()); + split(versions, name.asString(), ", "); + } else { + theRegistry().collectNames(versions); } - return 0; + bool debugOn; + QPID_LOG_TEST(debug, debugOn); + if (debugOn) { + QPID_LOG(debug, "Trying versions " << join(versions)); + } + return createInternal(versions, url, stripped, join(versions, "No suitable protocol version supported by peer, tried ")); +} + +ConnectionImpl* ProtocolRegistry::createInternal(const std::vector<std::string>& requested, const std::string& url, const Variant::Map& options, const std::string& error) +{ + std::vector<std::string>::const_iterator i = requested.begin(); + if (i == requested.end()) + throw MessagingException(error); + std::string name = *i; + ConnectionImpl* result = theRegistry().find(name)(url, options); + result->next = boost::bind(&ProtocolRegistry::createInternal, std::vector<std::string>(++i, requested.end()), url, options, error); + return result; + } + +ConnectionImpl* ProtocolRegistry::next(ConnectionImpl* last) +{ + if (last->next) { + return last->next(); + } + throw MessagingException("No suitable protocol version supported by peer"); } + void ProtocolRegistry::add(const std::string& name, Factory* factory) { - theRegistry()[name] = factory; + theRegistry().add(name, factory); } }} // namespace qpid::messaging |