summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client')
-rw-r--r--qpid/cpp/src/qpid/client/Connection.cpp2
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionImpl.cpp9
-rw-r--r--qpid/cpp/src/qpid/client/Connector.cpp22
-rw-r--r--qpid/cpp/src/qpid/client/Connector.h7
-rw-r--r--qpid/cpp/src/qpid/client/RdmaConnector.cpp21
-rw-r--r--qpid/cpp/src/qpid/client/SslConnector.cpp24
-rw-r--r--qpid/cpp/src/qpid/client/TCPConnector.cpp24
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp2
8 files changed, 67 insertions, 44 deletions
diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp
index 26e69233af..f7d6341128 100644
--- a/qpid/cpp/src/qpid/client/Connection.cpp
+++ b/qpid/cpp/src/qpid/client/Connection.cpp
@@ -45,7 +45,7 @@ using namespace qpid::sys;
namespace qpid {
namespace client {
-Connection::Connection() : version(framing::highestProtocolVersion)
+Connection::Connection() : version(framing::ProtocolVersion(0, 10))
{
ConnectionImpl::init();
}
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
index ad8f21e7cd..98d04d8d66 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -283,8 +283,13 @@ void ConnectionImpl::open()
// If the connect fails then the connector is cleaned up either when we try to connect again
// - in that case in connector.reset() above;
// - or when we are deleted
- handler.waitForOpen();
- QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
+ try {
+ handler.waitForOpen();
+ QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
+ } catch (const Exception& e) {
+ connector->checkVersion(version);
+ throw;
+ }
// If the SASL layer has provided an "operational" userId for the connection,
// put it in the negotiated settings.
diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp
index c71dd9ecb6..af6483c979 100644
--- a/qpid/cpp/src/qpid/client/Connector.cpp
+++ b/qpid/cpp/src/qpid/client/Connector.cpp
@@ -24,6 +24,7 @@
#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/SecurityLayer.h"
+#include "qpid/framing/ProtocolInitiation.h"
#include <map>
@@ -68,5 +69,26 @@ void Connector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>)
{
}
+bool Connector::checkProtocolHeader(framing::Buffer& in, const framing::ProtocolVersion& version)
+{
+ if (!header) {
+ boost::shared_ptr<framing::ProtocolInitiation> protocolInit(new framing::ProtocolInitiation);
+ if (protocolInit->decode(in)) {
+ header = protocolInit;
+ QPID_LOG(debug, "RECV [" << getIdentifier() << "]: INIT(" << *protocolInit << ")");
+ checkVersion(version);
+ }
+ }
+ return header;
+}
+
+void Connector::checkVersion(const framing::ProtocolVersion& version)
+{
+ if (header && !header->matches(version)){
+ throw ProtocolVersionError(QPID_MSG("Incorrect version: " << *header
+ << "; expected " << version));
+ }
+}
+
}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h
index 1b5e59e06d..49fb48bdf6 100644
--- a/qpid/cpp/src/qpid/client/Connector.h
+++ b/qpid/cpp/src/qpid/client/Connector.h
@@ -41,6 +41,8 @@ struct SecuritySettings;
namespace framing {
class InputHandler;
class AMQFrame;
+class Buffer;
+class ProtocolInitiation;
}
namespace client {
@@ -74,6 +76,11 @@ class Connector : public framing::FrameHandler
virtual void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
virtual const qpid::sys::SecuritySettings* getSecuritySettings() = 0;
+ void checkVersion(const framing::ProtocolVersion& version);
+ protected:
+ boost::shared_ptr<framing::ProtocolInitiation> header;
+
+ bool checkProtocolHeader(framing::Buffer&, const framing::ProtocolVersion& version);
};
}}
diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
index 1689b7aee2..77762343e2 100644
--- a/qpid/cpp/src/qpid/client/RdmaConnector.cpp
+++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
@@ -388,18 +388,17 @@ void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
size_t RdmaConnector::decode(const char* buffer, size_t size)
{
framing::Buffer in(const_cast<char*>(buffer), size);
- if (!initiated) {
- framing::ProtocolInitiation protocolInit;
- if (protocolInit.decode(in)) {
- //TODO: check the version is correct
- QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
+ try {
+ if (checkProtocolHeader(in, version)) {
+ AMQFrame frame;
+ while(frame.decode(in)){
+ QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
+ input->received(frame);
+ }
}
- initiated = true;
- }
- AMQFrame frame;
- while(frame.decode(in)){
- QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- input->received(frame);
+ } catch (const ProtocolVersionError& e) {
+ QPID_LOG(info, "Closing connection due to " << e.what());
+ close();
}
return size - in.available();
}
diff --git a/qpid/cpp/src/qpid/client/SslConnector.cpp b/qpid/cpp/src/qpid/client/SslConnector.cpp
index 7c67196242..d5d2433060 100644
--- a/qpid/cpp/src/qpid/client/SslConnector.cpp
+++ b/qpid/cpp/src/qpid/client/SslConnector.cpp
@@ -385,23 +385,17 @@ void SslConnector::readbuff(AsynchIO& aio, AsynchIOBufferBase* buff)
size_t SslConnector::decode(const char* buffer, size_t size)
{
framing::Buffer in(const_cast<char*>(buffer), size);
- if (!initiated) {
- framing::ProtocolInitiation protocolInit;
- if (protocolInit.decode(in)) {
- QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
- if(!(protocolInit==version)){
- throw Exception(QPID_MSG("Unsupported version: " << protocolInit
- << " supported version " << version));
+ try {
+ if (checkProtocolHeader(in, version)) {
+ AMQFrame frame;
+ while(frame.decode(in)){
+ QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
+ input->received(frame);
}
- initiated = true;
- } else {
- return size - in.available();
}
- }
- AMQFrame frame;
- while(frame.decode(in)){
- QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- input->received(frame);
+ } catch (const ProtocolVersionError& e) {
+ QPID_LOG(info, "Closing connection due to " << e.what());
+ close();
}
return size - in.available();
}
diff --git a/qpid/cpp/src/qpid/client/TCPConnector.cpp b/qpid/cpp/src/qpid/client/TCPConnector.cpp
index 02f820fe03..0a570fb1d9 100644
--- a/qpid/cpp/src/qpid/client/TCPConnector.cpp
+++ b/qpid/cpp/src/qpid/client/TCPConnector.cpp
@@ -281,23 +281,17 @@ void TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff)
size_t TCPConnector::decode(const char* buffer, size_t size)
{
framing::Buffer in(const_cast<char*>(buffer), size);
- if (!initiated) {
- framing::ProtocolInitiation protocolInit;
- if (protocolInit.decode(in)) {
- QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
- if(!(protocolInit==version)){
- throw Exception(QPID_MSG("Unsupported version: " << protocolInit
- << " supported version " << version));
+ try {
+ if (checkProtocolHeader(in, version)) {
+ AMQFrame frame;
+ while(frame.decode(in)){
+ QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
+ input->received(frame);
}
- initiated = true;
- } else {
- return size - in.available();
}
- }
- AMQFrame frame;
- while(frame.decode(in)){
- QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- input->received(frame);
+ } catch (const ProtocolVersionError& e) {
+ QPID_LOG(info, "Closing connection due to " << e.what());
+ close();
}
return size - in.available();
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 24145f0117..11ef06e517 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -305,6 +305,8 @@ bool ConnectionImpl::tryConnect()
QPID_LOG(info, "Connected to " << *i);
mergeUrls(connection.getInitialBrokers(), l);
return resetSessions(l);
+ } catch (const qpid::ProtocolVersionError& e) {
+ throw qpid::messaging::ProtocolVersionError("AMQP 0-10 not supported");
} catch (const qpid::TransportFailure& e) {
QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
}