summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp')
-rw-r--r--qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp141
1 files changed, 134 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
index 9e1e5f23fe..dbb0d6dfc2 100644
--- a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
+++ b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
@@ -22,14 +22,116 @@
#include "qpid/messaging/exceptions.h"
#include "qpid/client/amqp0_10/ConnectionImpl.h"
#include "qpid/client/LoadPlugins.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Options.h"
+#include "qpid/StringUtils.h"
+#include "config.h"
#include <map>
+#include <sstream>
+#include <boost/bind.hpp>
using qpid::types::Variant;
namespace qpid {
namespace messaging {
namespace {
-typedef std::map<std::string, ProtocolRegistry::Factory*> Registry;
+struct ProtocolOptions : qpid::Options
+{
+ std::string protocolDefaults;
+
+ ProtocolOptions() : qpid::Options("Protocol Settings")
+ {
+ addOptions()
+ ("protocol-defaults", optValue(protocolDefaults, "PROTOCOLS"), "Protocols to use when none are specified");
+ }
+};
+const std::string SEPARATOR(", ");
+const std::string EMPTY;
+std::string join(const std::vector<std::string>& in, const std::string& base=EMPTY, const std::string& separator = SEPARATOR)
+{
+ std::stringstream out;
+ if (!base.empty()) out << base;
+ for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) {
+ if (i != in.begin()) out << separator;
+ out << *i;
+ }
+ return out.str();
+}
+
+typedef std::map<std::string, ProtocolRegistry::Factory*> Factories;
+
+ConnectionImpl* create_0_10(const std::string& url, const qpid::types::Variant::Map& options)
+{
+ return new qpid::client::amqp0_10::ConnectionImpl(url, options);
+}
+
+class Registry
+{
+ public:
+ Registry()
+ {
+ factories["amqp0-10"] = &create_0_10;
+ CommonOptions common("", "", QPIDC_CONF_FILE);
+ ProtocolOptions options;
+ try {
+ common.parse(0, 0, common.clientConfig, true);
+ options.parse (0, 0, common.clientConfig, true);
+ } catch (const std::exception& e) {
+ throw qpid::types::Exception(QPID_MSG("Failed to parse options while initialising Protocol Registry: " << e.what()));
+ }
+ QPID_LOG(debug, "Protocol defaults: " << options.protocolDefaults);
+ if (!options.protocolDefaults.empty()) {
+ split(versions, options.protocolDefaults, ", ");
+ }
+ }
+ ProtocolRegistry::Factory* find(const std::string& name) const
+ {
+ Factories::const_iterator i = factories.find(name);
+ if (i == factories.end()) {
+ std::stringstream error;
+ error << "Unsupported protocol: " << name;
+ error << " (valid values are " << getNames() << ")";
+ throw MessagingException(error.str());
+ } else {
+ return i->second;
+ }
+ }
+ void add(const std::string& name, ProtocolRegistry::Factory* factory)
+ {
+ factories[name] = factory;
+ }
+ std::string getNames() const
+ {
+ std::stringstream names;
+ for (Factories::const_iterator i = factories.begin(); i != factories.end(); ++i) {
+ if (i != factories.begin()) names << ", ";
+ names << i->first;
+ }
+ return names.str();
+ }
+ void collectNames(std::vector<std::string>& names) const
+ {
+ for (std::vector< std::string >::const_iterator i = versions.begin(); i != versions.end(); ++i) {
+ Factories::const_iterator j = factories.find(*i);
+ if (j == factories.end()) {
+ QPID_LOG(notice, "Unsupported protocol specified in defaults " << *i);
+ } else {
+ names.push_back(*i);
+ }
+ }
+ if (names.empty()) {
+ if (!versions.empty()) {
+ QPID_LOG(warning, "Protocol defaults specified are not valid (" << join(versions) << ") falling back to " << getNames());
+ }
+ for (Factories::const_iterator i = factories.begin(); i != factories.end(); ++i) {
+ names.push_back(i->first);
+ }
+ }
+ }
+ private:
+ Factories factories;
+ std::vector<std::string> versions;
+};
Registry& theRegistry()
{
@@ -57,17 +159,42 @@ ConnectionImpl* ProtocolRegistry::create(const std::string& url, const Variant::
qpid::client::theModuleLoader();//ensure modules are loaded
Variant name;
Variant::Map stripped;
+ std::vector<std::string> versions;
if (extract("protocol", name, options, stripped)) {
- Registry::const_iterator i = theRegistry().find(name.asString());
- if (i != theRegistry().end()) return (i->second)(url, stripped);
- else if (name.asString() == "amqp0-10") return new qpid::client::amqp0_10::ConnectionImpl(url, stripped);
- else throw MessagingException("Unsupported protocol: " + name.asString());
+ split(versions, name.asString(), ", ");
+ } else {
+ theRegistry().collectNames(versions);
}
- return 0;
+ bool debugOn;
+ QPID_LOG_TEST(debug, debugOn);
+ if (debugOn) {
+ QPID_LOG(debug, "Trying versions " << join(versions));
+ }
+ return createInternal(versions, url, stripped, join(versions, "No suitable protocol version supported by peer, tried "));
+}
+
+ConnectionImpl* ProtocolRegistry::createInternal(const std::vector<std::string>& requested, const std::string& url, const Variant::Map& options, const std::string& error)
+{
+ std::vector<std::string>::const_iterator i = requested.begin();
+ if (i == requested.end())
+ throw MessagingException(error);
+ std::string name = *i;
+ ConnectionImpl* result = theRegistry().find(name)(url, options);
+ result->next = boost::bind(&ProtocolRegistry::createInternal, std::vector<std::string>(++i, requested.end()), url, options, error);
+ return result;
+ }
+
+ConnectionImpl* ProtocolRegistry::next(ConnectionImpl* last)
+{
+ if (last->next) {
+ return last->next();
+ }
+ throw MessagingException("No suitable protocol version supported by peer");
}
+
void ProtocolRegistry::add(const std::string& name, Factory* factory)
{
- theRegistry()[name] = factory;
+ theRegistry().add(name, factory);
}
}} // namespace qpid::messaging