summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2015-06-17 17:08:55 +0000
committerGordon Sim <gsim@apache.org>2015-06-17 17:08:55 +0000
commit57b80ff868420d23f80d36a1a9eaf630bee48734 (patch)
tree13750bb1fe96d5915e3cc45f3a093ab84d3d6972
parentb997784b3fc015206cf51c0ca11e9af66893b156 (diff)
downloadqpid-python-57b80ff868420d23f80d36a1a9eaf630bee48734.tar.gz
QPID-6256: Improved handling of protocol version incompatibilities
* 0-10 path no longer hans on open when connecting to a broker not supporting that version * the 'protocol' connection option now supports specifying multiple protocols to try in order (as a coma separated list) * the protocol defaults, i.e. the value assumed if the 'protocol' connection option is not specified, can now be set via the client config file (e.g. protocol-defaults=amqp1.0,amqp0-10) or an environment variable (e.g QPID_PROTOCOL_DEFAULTS=amqp1.0,amqp0-10) * if neither the connection option nor the defaults are specified all valid versions will be tried (currently amqp0-10, then amqp1.0 but this may change in a future version) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1686078 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/etc/qpidc.conf3
-rw-r--r--qpid/cpp/include/qpid/messaging/exceptions.h5
-rw-r--r--qpid/cpp/src/qpid/Exception.h4
-rw-r--r--qpid/cpp/src/qpid/client/Connection.cpp2
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionImpl.cpp9
-rw-r--r--qpid/cpp/src/qpid/client/Connector.cpp22
-rw-r--r--qpid/cpp/src/qpid/client/Connector.h7
-rw-r--r--qpid/cpp/src/qpid/client/RdmaConnector.cpp21
-rw-r--r--qpid/cpp/src/qpid/client/SslConnector.cpp24
-rw-r--r--qpid/cpp/src/qpid/client/TCPConnector.cpp24
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp2
-rw-r--r--qpid/cpp/src/qpid/framing/ProtocolInitiation.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/Connection.cpp29
-rw-r--r--qpid/cpp/src/qpid/messaging/ConnectionImpl.h5
-rw-r--r--qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp141
-rw-r--r--qpid/cpp/src/qpid/messaging/ProtocolRegistry.h4
-rw-r--r--qpid/cpp/src/qpid/messaging/exceptions.cpp1
17 files changed, 238 insertions, 66 deletions
diff --git a/qpid/cpp/etc/qpidc.conf b/qpid/cpp/etc/qpidc.conf
index 588999c074..36dcbb3183 100644
--- a/qpid/cpp/etc/qpidc.conf
+++ b/qpid/cpp/etc/qpidc.conf
@@ -21,3 +21,6 @@
# name=value
#
# (Note: no spaces on either side of '=')
+
+# To make AMQP 1.0 the default, uncomment the following line
+#protocol-defaults=amqp1.0,amqp0-10
diff --git a/qpid/cpp/include/qpid/messaging/exceptions.h b/qpid/cpp/include/qpid/messaging/exceptions.h
index 391eb11db9..04d4d818f7 100644
--- a/qpid/cpp/include/qpid/messaging/exceptions.h
+++ b/qpid/cpp/include/qpid/messaging/exceptions.h
@@ -220,6 +220,11 @@ struct QPID_MESSAGING_CLASS_EXTERN ConnectionError : public MessagingException
QPID_MESSAGING_EXTERN ConnectionError(const std::string&);
};
+struct QPID_MESSAGING_CLASS_EXTERN ProtocolVersionError : public ConnectionError
+{
+ QPID_MESSAGING_EXTERN ProtocolVersionError(const std::string&);
+};
+
struct QPID_MESSAGING_CLASS_EXTERN AuthenticationFailure : public ConnectionError
{
QPID_MESSAGING_EXTERN AuthenticationFailure(const std::string&);
diff --git a/qpid/cpp/src/qpid/Exception.h b/qpid/cpp/src/qpid/Exception.h
index cbd175214d..6df012ba28 100644
--- a/qpid/cpp/src/qpid/Exception.h
+++ b/qpid/cpp/src/qpid/Exception.h
@@ -86,6 +86,10 @@ struct TransportFailure : public Exception {
TransportFailure(const std::string& msg=std::string()) : Exception(msg) {}
};
+struct ProtocolVersionError : public TransportFailure {
+ ProtocolVersionError(const std::string& msg=std::string()) : TransportFailure(msg) {}
+};
+
} // namespace qpid
#endif /*!_Exception_*/
diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp
index 26e69233af..f7d6341128 100644
--- a/qpid/cpp/src/qpid/client/Connection.cpp
+++ b/qpid/cpp/src/qpid/client/Connection.cpp
@@ -45,7 +45,7 @@ using namespace qpid::sys;
namespace qpid {
namespace client {
-Connection::Connection() : version(framing::highestProtocolVersion)
+Connection::Connection() : version(framing::ProtocolVersion(0, 10))
{
ConnectionImpl::init();
}
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
index ad8f21e7cd..98d04d8d66 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -283,8 +283,13 @@ void ConnectionImpl::open()
// If the connect fails then the connector is cleaned up either when we try to connect again
// - in that case in connector.reset() above;
// - or when we are deleted
- handler.waitForOpen();
- QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
+ try {
+ handler.waitForOpen();
+ QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
+ } catch (const Exception& e) {
+ connector->checkVersion(version);
+ throw;
+ }
// If the SASL layer has provided an "operational" userId for the connection,
// put it in the negotiated settings.
diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp
index c71dd9ecb6..af6483c979 100644
--- a/qpid/cpp/src/qpid/client/Connector.cpp
+++ b/qpid/cpp/src/qpid/client/Connector.cpp
@@ -24,6 +24,7 @@
#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/SecurityLayer.h"
+#include "qpid/framing/ProtocolInitiation.h"
#include <map>
@@ -68,5 +69,26 @@ void Connector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>)
{
}
+bool Connector::checkProtocolHeader(framing::Buffer& in, const framing::ProtocolVersion& version)
+{
+ if (!header) {
+ boost::shared_ptr<framing::ProtocolInitiation> protocolInit(new framing::ProtocolInitiation);
+ if (protocolInit->decode(in)) {
+ header = protocolInit;
+ QPID_LOG(debug, "RECV [" << getIdentifier() << "]: INIT(" << *protocolInit << ")");
+ checkVersion(version);
+ }
+ }
+ return header;
+}
+
+void Connector::checkVersion(const framing::ProtocolVersion& version)
+{
+ if (header && !header->matches(version)){
+ throw ProtocolVersionError(QPID_MSG("Incorrect version: " << *header
+ << "; expected " << version));
+ }
+}
+
}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h
index 1b5e59e06d..49fb48bdf6 100644
--- a/qpid/cpp/src/qpid/client/Connector.h
+++ b/qpid/cpp/src/qpid/client/Connector.h
@@ -41,6 +41,8 @@ struct SecuritySettings;
namespace framing {
class InputHandler;
class AMQFrame;
+class Buffer;
+class ProtocolInitiation;
}
namespace client {
@@ -74,6 +76,11 @@ class Connector : public framing::FrameHandler
virtual void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
virtual const qpid::sys::SecuritySettings* getSecuritySettings() = 0;
+ void checkVersion(const framing::ProtocolVersion& version);
+ protected:
+ boost::shared_ptr<framing::ProtocolInitiation> header;
+
+ bool checkProtocolHeader(framing::Buffer&, const framing::ProtocolVersion& version);
};
}}
diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
index 1689b7aee2..77762343e2 100644
--- a/qpid/cpp/src/qpid/client/RdmaConnector.cpp
+++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
@@ -388,18 +388,17 @@ void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
size_t RdmaConnector::decode(const char* buffer, size_t size)
{
framing::Buffer in(const_cast<char*>(buffer), size);
- if (!initiated) {
- framing::ProtocolInitiation protocolInit;
- if (protocolInit.decode(in)) {
- //TODO: check the version is correct
- QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
+ try {
+ if (checkProtocolHeader(in, version)) {
+ AMQFrame frame;
+ while(frame.decode(in)){
+ QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
+ input->received(frame);
+ }
}
- initiated = true;
- }
- AMQFrame frame;
- while(frame.decode(in)){
- QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- input->received(frame);
+ } catch (const ProtocolVersionError& e) {
+ QPID_LOG(info, "Closing connection due to " << e.what());
+ close();
}
return size - in.available();
}
diff --git a/qpid/cpp/src/qpid/client/SslConnector.cpp b/qpid/cpp/src/qpid/client/SslConnector.cpp
index 7c67196242..d5d2433060 100644
--- a/qpid/cpp/src/qpid/client/SslConnector.cpp
+++ b/qpid/cpp/src/qpid/client/SslConnector.cpp
@@ -385,23 +385,17 @@ void SslConnector::readbuff(AsynchIO& aio, AsynchIOBufferBase* buff)
size_t SslConnector::decode(const char* buffer, size_t size)
{
framing::Buffer in(const_cast<char*>(buffer), size);
- if (!initiated) {
- framing::ProtocolInitiation protocolInit;
- if (protocolInit.decode(in)) {
- QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
- if(!(protocolInit==version)){
- throw Exception(QPID_MSG("Unsupported version: " << protocolInit
- << " supported version " << version));
+ try {
+ if (checkProtocolHeader(in, version)) {
+ AMQFrame frame;
+ while(frame.decode(in)){
+ QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
+ input->received(frame);
}
- initiated = true;
- } else {
- return size - in.available();
}
- }
- AMQFrame frame;
- while(frame.decode(in)){
- QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- input->received(frame);
+ } catch (const ProtocolVersionError& e) {
+ QPID_LOG(info, "Closing connection due to " << e.what());
+ close();
}
return size - in.available();
}
diff --git a/qpid/cpp/src/qpid/client/TCPConnector.cpp b/qpid/cpp/src/qpid/client/TCPConnector.cpp
index 02f820fe03..0a570fb1d9 100644
--- a/qpid/cpp/src/qpid/client/TCPConnector.cpp
+++ b/qpid/cpp/src/qpid/client/TCPConnector.cpp
@@ -281,23 +281,17 @@ void TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff)
size_t TCPConnector::decode(const char* buffer, size_t size)
{
framing::Buffer in(const_cast<char*>(buffer), size);
- if (!initiated) {
- framing::ProtocolInitiation protocolInit;
- if (protocolInit.decode(in)) {
- QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
- if(!(protocolInit==version)){
- throw Exception(QPID_MSG("Unsupported version: " << protocolInit
- << " supported version " << version));
+ try {
+ if (checkProtocolHeader(in, version)) {
+ AMQFrame frame;
+ while(frame.decode(in)){
+ QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
+ input->received(frame);
}
- initiated = true;
- } else {
- return size - in.available();
}
- }
- AMQFrame frame;
- while(frame.decode(in)){
- QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- input->received(frame);
+ } catch (const ProtocolVersionError& e) {
+ QPID_LOG(info, "Closing connection due to " << e.what());
+ close();
}
return size - in.available();
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 24145f0117..11ef06e517 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -305,6 +305,8 @@ bool ConnectionImpl::tryConnect()
QPID_LOG(info, "Connected to " << *i);
mergeUrls(connection.getInitialBrokers(), l);
return resetSessions(l);
+ } catch (const qpid::ProtocolVersionError& e) {
+ throw qpid::messaging::ProtocolVersionError("AMQP 0-10 not supported");
} catch (const qpid::TransportFailure& e) {
QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
}
diff --git a/qpid/cpp/src/qpid/framing/ProtocolInitiation.h b/qpid/cpp/src/qpid/framing/ProtocolInitiation.h
index c519bc2442..fe6410af55 100644
--- a/qpid/cpp/src/qpid/framing/ProtocolInitiation.h
+++ b/qpid/cpp/src/qpid/framing/ProtocolInitiation.h
@@ -47,6 +47,7 @@ public:
inline uint8_t getMinor() const { return version.getMinor(); }
inline ProtocolVersion getVersion() const { return version; }
bool operator==(ProtocolVersion v) const { return v == getVersion(); }
+ bool matches(ProtocolVersion v) const { return v == getVersion(); }
};
QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream& o, const framing::ProtocolInitiation& pi);
diff --git a/qpid/cpp/src/qpid/messaging/Connection.cpp b/qpid/cpp/src/qpid/messaging/Connection.cpp
index c8a60fc56b..c40d32cbc1 100644
--- a/qpid/cpp/src/qpid/messaging/Connection.cpp
+++ b/qpid/cpp/src/qpid/messaging/Connection.cpp
@@ -48,34 +48,35 @@ Connection::Connection(const std::string& url, const std::string& o)
Variant::Map options;
AddressParser parser(o);
if (o.empty() || parser.parseMap(options)) {
- ConnectionImpl* impl = ProtocolRegistry::create(url, options);
- if (impl) {
- PI::ctor(*this, impl);
- } else {
- PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
- }
+ PI::ctor(*this, ProtocolRegistry::create(url, options));
} else {
throw InvalidOptionString("Invalid option string: " + o);
}
}
Connection::Connection(const std::string& url, const Variant::Map& options)
{
- ConnectionImpl* impl = ProtocolRegistry::create(url, options);
- if (impl) {
- PI::ctor(*this, impl);
- } else {
- PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
- }
+ PI::ctor(*this, ProtocolRegistry::create(url, options));
}
Connection::Connection()
{
Variant::Map options;
std::string url = "127.0.0.1:5672";
- PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+ PI::ctor(*this, ProtocolRegistry::create(url, options));
}
-void Connection::open() { impl->open(); }
+void Connection::open()
+{
+ while (true) {
+ try {
+ impl->open();
+ return;
+ } catch (const ProtocolVersionError& e) {
+ PI::set(*this, ProtocolRegistry::next(PI::get(impl).get()));
+ QPID_LOG(info, e.what() << ", trying alternative protocol version...");
+ }
+ }
+}
bool Connection::isOpen() { return impl->isOpen(); }
bool Connection::isOpen() const { return impl->isOpen(); }
void Connection::close() { impl->close(); }
diff --git a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h b/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
index 92c6d91b10..05d835b282 100644
--- a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
+++ b/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
@@ -22,6 +22,7 @@
*
*/
#include <string>
+#include <boost/function.hpp>
#include "qpid/RefCounted.h"
namespace qpid {
@@ -32,6 +33,7 @@ class Variant;
namespace messaging {
+class ProtocolRegistry;
class Session;
class ConnectionImpl : public virtual qpid::RefCounted
@@ -49,7 +51,10 @@ class ConnectionImpl : public virtual qpid::RefCounted
virtual void reconnect() = 0;
virtual std::string getUrl() const = 0;
private:
+ friend class ProtocolRegistry;
+ boost::function<ConnectionImpl*()> next;
};
+
}} // namespace qpid::messaging
#endif /*!QPID_MESSAGING_CONNECTIONIMPL_H*/
diff --git a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
index 9e1e5f23fe..dbb0d6dfc2 100644
--- a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
+++ b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
@@ -22,14 +22,116 @@
#include "qpid/messaging/exceptions.h"
#include "qpid/client/amqp0_10/ConnectionImpl.h"
#include "qpid/client/LoadPlugins.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Options.h"
+#include "qpid/StringUtils.h"
+#include "config.h"
#include <map>
+#include <sstream>
+#include <boost/bind.hpp>
using qpid::types::Variant;
namespace qpid {
namespace messaging {
namespace {
-typedef std::map<std::string, ProtocolRegistry::Factory*> Registry;
+struct ProtocolOptions : qpid::Options
+{
+ std::string protocolDefaults;
+
+ ProtocolOptions() : qpid::Options("Protocol Settings")
+ {
+ addOptions()
+ ("protocol-defaults", optValue(protocolDefaults, "PROTOCOLS"), "Protocols to use when none are specified");
+ }
+};
+const std::string SEPARATOR(", ");
+const std::string EMPTY;
+std::string join(const std::vector<std::string>& in, const std::string& base=EMPTY, const std::string& separator = SEPARATOR)
+{
+ std::stringstream out;
+ if (!base.empty()) out << base;
+ for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) {
+ if (i != in.begin()) out << separator;
+ out << *i;
+ }
+ return out.str();
+}
+
+typedef std::map<std::string, ProtocolRegistry::Factory*> Factories;
+
+ConnectionImpl* create_0_10(const std::string& url, const qpid::types::Variant::Map& options)
+{
+ return new qpid::client::amqp0_10::ConnectionImpl(url, options);
+}
+
+class Registry
+{
+ public:
+ Registry()
+ {
+ factories["amqp0-10"] = &create_0_10;
+ CommonOptions common("", "", QPIDC_CONF_FILE);
+ ProtocolOptions options;
+ try {
+ common.parse(0, 0, common.clientConfig, true);
+ options.parse (0, 0, common.clientConfig, true);
+ } catch (const std::exception& e) {
+ throw qpid::types::Exception(QPID_MSG("Failed to parse options while initialising Protocol Registry: " << e.what()));
+ }
+ QPID_LOG(debug, "Protocol defaults: " << options.protocolDefaults);
+ if (!options.protocolDefaults.empty()) {
+ split(versions, options.protocolDefaults, ", ");
+ }
+ }
+ ProtocolRegistry::Factory* find(const std::string& name) const
+ {
+ Factories::const_iterator i = factories.find(name);
+ if (i == factories.end()) {
+ std::stringstream error;
+ error << "Unsupported protocol: " << name;
+ error << " (valid values are " << getNames() << ")";
+ throw MessagingException(error.str());
+ } else {
+ return i->second;
+ }
+ }
+ void add(const std::string& name, ProtocolRegistry::Factory* factory)
+ {
+ factories[name] = factory;
+ }
+ std::string getNames() const
+ {
+ std::stringstream names;
+ for (Factories::const_iterator i = factories.begin(); i != factories.end(); ++i) {
+ if (i != factories.begin()) names << ", ";
+ names << i->first;
+ }
+ return names.str();
+ }
+ void collectNames(std::vector<std::string>& names) const
+ {
+ for (std::vector< std::string >::const_iterator i = versions.begin(); i != versions.end(); ++i) {
+ Factories::const_iterator j = factories.find(*i);
+ if (j == factories.end()) {
+ QPID_LOG(notice, "Unsupported protocol specified in defaults " << *i);
+ } else {
+ names.push_back(*i);
+ }
+ }
+ if (names.empty()) {
+ if (!versions.empty()) {
+ QPID_LOG(warning, "Protocol defaults specified are not valid (" << join(versions) << ") falling back to " << getNames());
+ }
+ for (Factories::const_iterator i = factories.begin(); i != factories.end(); ++i) {
+ names.push_back(i->first);
+ }
+ }
+ }
+ private:
+ Factories factories;
+ std::vector<std::string> versions;
+};
Registry& theRegistry()
{
@@ -57,17 +159,42 @@ ConnectionImpl* ProtocolRegistry::create(const std::string& url, const Variant::
qpid::client::theModuleLoader();//ensure modules are loaded
Variant name;
Variant::Map stripped;
+ std::vector<std::string> versions;
if (extract("protocol", name, options, stripped)) {
- Registry::const_iterator i = theRegistry().find(name.asString());
- if (i != theRegistry().end()) return (i->second)(url, stripped);
- else if (name.asString() == "amqp0-10") return new qpid::client::amqp0_10::ConnectionImpl(url, stripped);
- else throw MessagingException("Unsupported protocol: " + name.asString());
+ split(versions, name.asString(), ", ");
+ } else {
+ theRegistry().collectNames(versions);
}
- return 0;
+ bool debugOn;
+ QPID_LOG_TEST(debug, debugOn);
+ if (debugOn) {
+ QPID_LOG(debug, "Trying versions " << join(versions));
+ }
+ return createInternal(versions, url, stripped, join(versions, "No suitable protocol version supported by peer, tried "));
+}
+
+ConnectionImpl* ProtocolRegistry::createInternal(const std::vector<std::string>& requested, const std::string& url, const Variant::Map& options, const std::string& error)
+{
+ std::vector<std::string>::const_iterator i = requested.begin();
+ if (i == requested.end())
+ throw MessagingException(error);
+ std::string name = *i;
+ ConnectionImpl* result = theRegistry().find(name)(url, options);
+ result->next = boost::bind(&ProtocolRegistry::createInternal, std::vector<std::string>(++i, requested.end()), url, options, error);
+ return result;
+ }
+
+ConnectionImpl* ProtocolRegistry::next(ConnectionImpl* last)
+{
+ if (last->next) {
+ return last->next();
+ }
+ throw MessagingException("No suitable protocol version supported by peer");
}
+
void ProtocolRegistry::add(const std::string& name, Factory* factory)
{
- theRegistry()[name] = factory;
+ theRegistry().add(name, factory);
}
}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h
index 25e8bd4ec8..6a6f5962c3 100644
--- a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h
+++ b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h
@@ -23,8 +23,8 @@
*/
#include "qpid/messaging/ImportExport.h"
-
#include "qpid/types/Variant.h"
+#include <vector>
namespace qpid {
namespace messaging {
@@ -37,8 +37,10 @@ class ProtocolRegistry
public:
typedef ConnectionImpl* Factory(const std::string& url, const qpid::types::Variant::Map& options);
static ConnectionImpl* create(const std::string& url, const qpid::types::Variant::Map& options);
+ static ConnectionImpl* next(ConnectionImpl*);
QPID_MESSAGING_EXTERN static void add(const std::string& name, Factory* factory);
private:
+ static ConnectionImpl* createInternal(const std::vector<std::string>& versions, const std::string& url, const qpid::types::Variant::Map& options, const std::string& error);
};
}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/exceptions.cpp b/qpid/cpp/src/qpid/messaging/exceptions.cpp
index 419c508626..af8ab22251 100644
--- a/qpid/cpp/src/qpid/messaging/exceptions.cpp
+++ b/qpid/cpp/src/qpid/messaging/exceptions.cpp
@@ -57,6 +57,7 @@ TransactionUnknown::TransactionUnknown(const std::string& msg) : TransactionErro
UnauthorizedAccess::UnauthorizedAccess(const std::string& msg) : SessionError(msg) {}
ConnectionError::ConnectionError(const std::string& msg) : MessagingException(msg) {}
+ProtocolVersionError::ProtocolVersionError(const std::string& msg) : ConnectionError(msg) {}
AuthenticationFailure::AuthenticationFailure(const std::string& msg) : ConnectionError(msg) {}
TransportFailure::TransportFailure(const std::string& msg) : MessagingException(msg) {}