diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client')
-rw-r--r-- | qpid/cpp/src/qpid/client/Connection.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionImpl.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Connector.cpp | 22 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Connector.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/RdmaConnector.cpp | 21 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SslConnector.cpp | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/TCPConnector.cpp | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 2 |
8 files changed, 67 insertions, 44 deletions
diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp index 26e69233af..f7d6341128 100644 --- a/qpid/cpp/src/qpid/client/Connection.cpp +++ b/qpid/cpp/src/qpid/client/Connection.cpp @@ -45,7 +45,7 @@ using namespace qpid::sys; namespace qpid { namespace client { -Connection::Connection() : version(framing::highestProtocolVersion) +Connection::Connection() : version(framing::ProtocolVersion(0, 10)) { ConnectionImpl::init(); } diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index ad8f21e7cd..98d04d8d66 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -283,8 +283,13 @@ void ConnectionImpl::open() // If the connect fails then the connector is cleaned up either when we try to connect again // - in that case in connector.reset() above; // - or when we are deleted - handler.waitForOpen(); - QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port); + try { + handler.waitForOpen(); + QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port); + } catch (const Exception& e) { + connector->checkVersion(version); + throw; + } // If the SASL layer has provided an "operational" userId for the connection, // put it in the negotiated settings. diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp index c71dd9ecb6..af6483c979 100644 --- a/qpid/cpp/src/qpid/client/Connector.cpp +++ b/qpid/cpp/src/qpid/client/Connector.cpp @@ -24,6 +24,7 @@ #include "qpid/Exception.h" #include "qpid/log/Statement.h" #include "qpid/sys/SecurityLayer.h" +#include "qpid/framing/ProtocolInitiation.h" #include <map> @@ -68,5 +69,26 @@ void Connector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>) { } +bool Connector::checkProtocolHeader(framing::Buffer& in, const framing::ProtocolVersion& version) +{ + if (!header) { + boost::shared_ptr<framing::ProtocolInitiation> protocolInit(new framing::ProtocolInitiation); + if (protocolInit->decode(in)) { + header = protocolInit; + QPID_LOG(debug, "RECV [" << getIdentifier() << "]: INIT(" << *protocolInit << ")"); + checkVersion(version); + } + } + return header; +} + +void Connector::checkVersion(const framing::ProtocolVersion& version) +{ + if (header && !header->matches(version)){ + throw ProtocolVersionError(QPID_MSG("Incorrect version: " << *header + << "; expected " << version)); + } +} + }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h index 1b5e59e06d..49fb48bdf6 100644 --- a/qpid/cpp/src/qpid/client/Connector.h +++ b/qpid/cpp/src/qpid/client/Connector.h @@ -41,6 +41,8 @@ struct SecuritySettings; namespace framing { class InputHandler; class AMQFrame; +class Buffer; +class ProtocolInitiation; } namespace client { @@ -74,6 +76,11 @@ class Connector : public framing::FrameHandler virtual void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>); virtual const qpid::sys::SecuritySettings* getSecuritySettings() = 0; + void checkVersion(const framing::ProtocolVersion& version); + protected: + boost::shared_ptr<framing::ProtocolInitiation> header; + + bool checkProtocolHeader(framing::Buffer&, const framing::ProtocolVersion& version); }; }} diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp index 1689b7aee2..77762343e2 100644 --- a/qpid/cpp/src/qpid/client/RdmaConnector.cpp +++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp @@ -388,18 +388,17 @@ void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { size_t RdmaConnector::decode(const char* buffer, size_t size) { framing::Buffer in(const_cast<char*>(buffer), size); - if (!initiated) { - framing::ProtocolInitiation protocolInit; - if (protocolInit.decode(in)) { - //TODO: check the version is correct - QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")"); + try { + if (checkProtocolHeader(in, version)) { + AMQFrame frame; + while(frame.decode(in)){ + QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); + input->received(frame); + } } - initiated = true; - } - AMQFrame frame; - while(frame.decode(in)){ - QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); - input->received(frame); + } catch (const ProtocolVersionError& e) { + QPID_LOG(info, "Closing connection due to " << e.what()); + close(); } return size - in.available(); } diff --git a/qpid/cpp/src/qpid/client/SslConnector.cpp b/qpid/cpp/src/qpid/client/SslConnector.cpp index 7c67196242..d5d2433060 100644 --- a/qpid/cpp/src/qpid/client/SslConnector.cpp +++ b/qpid/cpp/src/qpid/client/SslConnector.cpp @@ -385,23 +385,17 @@ void SslConnector::readbuff(AsynchIO& aio, AsynchIOBufferBase* buff) size_t SslConnector::decode(const char* buffer, size_t size) { framing::Buffer in(const_cast<char*>(buffer), size); - if (!initiated) { - framing::ProtocolInitiation protocolInit; - if (protocolInit.decode(in)) { - QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")"); - if(!(protocolInit==version)){ - throw Exception(QPID_MSG("Unsupported version: " << protocolInit - << " supported version " << version)); + try { + if (checkProtocolHeader(in, version)) { + AMQFrame frame; + while(frame.decode(in)){ + QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); + input->received(frame); } - initiated = true; - } else { - return size - in.available(); } - } - AMQFrame frame; - while(frame.decode(in)){ - QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); - input->received(frame); + } catch (const ProtocolVersionError& e) { + QPID_LOG(info, "Closing connection due to " << e.what()); + close(); } return size - in.available(); } diff --git a/qpid/cpp/src/qpid/client/TCPConnector.cpp b/qpid/cpp/src/qpid/client/TCPConnector.cpp index 02f820fe03..0a570fb1d9 100644 --- a/qpid/cpp/src/qpid/client/TCPConnector.cpp +++ b/qpid/cpp/src/qpid/client/TCPConnector.cpp @@ -281,23 +281,17 @@ void TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) size_t TCPConnector::decode(const char* buffer, size_t size) { framing::Buffer in(const_cast<char*>(buffer), size); - if (!initiated) { - framing::ProtocolInitiation protocolInit; - if (protocolInit.decode(in)) { - QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")"); - if(!(protocolInit==version)){ - throw Exception(QPID_MSG("Unsupported version: " << protocolInit - << " supported version " << version)); + try { + if (checkProtocolHeader(in, version)) { + AMQFrame frame; + while(frame.decode(in)){ + QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); + input->received(frame); } - initiated = true; - } else { - return size - in.available(); } - } - AMQFrame frame; - while(frame.decode(in)){ - QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); - input->received(frame); + } catch (const ProtocolVersionError& e) { + QPID_LOG(info, "Closing connection due to " << e.what()); + close(); } return size - in.available(); } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 24145f0117..11ef06e517 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -305,6 +305,8 @@ bool ConnectionImpl::tryConnect() QPID_LOG(info, "Connected to " << *i); mergeUrls(connection.getInitialBrokers(), l); return resetSessions(l); + } catch (const qpid::ProtocolVersionError& e) { + throw qpid::messaging::ProtocolVersionError("AMQP 0-10 not supported"); } catch (const qpid::TransportFailure& e) { QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what()); } |