diff options
Diffstat (limited to 'cpp/src/qpid/client/ConnectionHandler.cpp')
-rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.cpp | 181 |
1 files changed, 80 insertions, 101 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index e1c50c14fc..13de271e3b 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -24,6 +24,7 @@ #include "qpid/framing/amqp_framing.h" #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/all_method_bodies.h" +#include "qpid/framing/ClientInvoker.h" using namespace qpid::client; using namespace qpid::framing; @@ -31,14 +32,21 @@ using namespace boost; namespace { const std::string OK("OK"); +const std::string PLAIN("PLAIN"); +const std::string en_US("en_US"); + +const std::string INVALID_STATE_START("start received in invalid state"); +const std::string INVALID_STATE_TUNE("tune received in invalid state"); +const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state"); +const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state"); } ConnectionHandler::ConnectionHandler() - : StateManager(NOT_STARTED) + : StateManager(NOT_STARTED), outHandler(*this), proxy(outHandler) { - mechanism = "PLAIN"; - locale = "en_US"; + mechanism = PLAIN; + locale = en_US; heartbeat = 0; maxChannels = 32767; maxFrameSize = 65535; @@ -52,34 +60,29 @@ ConnectionHandler::ConnectionHandler() void ConnectionHandler::incoming(AMQFrame& frame) { if (getState() == CLOSED) { - throw Exception("Connection is closed."); + throw Exception("Received frame on closed connection"); } + AMQBody* body = frame.getBody(); - if (frame.getChannel() == 0) { - if (body->getMethod()) { - handle(body->getMethod()); - } else { - error(503, "Cannot send content on channel zero."); - } - } else { - switch(getState()) { - case OPEN: - try { + try { + if (frame.getChannel() != 0 || !invoke(static_cast<ConnectionOperations&>(*this), *body)) { + switch(getState()) { + case OPEN: in(frame); - }catch(ConnectionException& e){ - error(e.code, e.what(), body); - }catch(std::exception& e){ - error(541/*internal error*/, e.what(), body); + break; + case CLOSING: + QPID_LOG(warning, "Ignoring frame while closing connection: " << frame); + break; + default: + throw Exception("Cannot receive frames on non-zero channel until connection is established."); } - break; - case CLOSING: - QPID_LOG(warning, "Received frame on non-zero channel while closing connection; frame ignored."); - break; - default: - //must be in connection initialisation: - fail("Cannot receive frames on non-zero channel until connection is established."); } + }catch(std::exception& e){ + QPID_LOG(warning, "Closing connection due to " << e.what()); + setState(CLOSING); + proxy.close(501, e.what()); + if (onError) onError(501, e.what()); } } @@ -109,101 +112,77 @@ void ConnectionHandler::close() break; case OPEN: setState(CLOSING); - send(ConnectionCloseBody(version, 200, OK, 0, 0)); + proxy.close(200, OK); waitFor(CLOSED); break; // Nothing to do for CLOSING, CLOSED, FAILED or NOT_STARTED } } -void ConnectionHandler::send(const framing::AMQBody& body) +void ConnectionHandler::checkState(STATES s, const std::string& msg) { - AMQFrame f(body); - out(f); + if (getState() != s) { + throw CommandInvalidException(msg); + } } -void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId) +void ConnectionHandler::fail(const std::string& message) { - setState(CLOSING); - send(ConnectionCloseBody(version, code, message, classId, methodId)); + QPID_LOG(warning, message); + setState(FAILED); } -void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body) +void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& /*mechanisms*/, const Array& /*locales*/) { - if (onError) - onError(code, message); - AMQMethodBody* method = body->getMethod(); - if (method) - error(code, message, method->amqpClassId(), method->amqpMethodId()); - else - error(code, message); + checkState(NOT_STARTED, INVALID_STATE_START); + setState(NEGOTIATING); + //TODO: verify that desired mechanism and locale are supported + string response = ((char)0) + uid + ((char)0) + pwd; + proxy.startOk(properties, mechanism, response, locale); } +void ConnectionHandler::secure(const std::string& /*challenge*/) +{ + throw NotImplementedException("Challenge-response cycle not yet implemented in client"); +} -void ConnectionHandler::fail(const std::string& message) +void ConnectionHandler::tune(uint16_t channelMax, uint16_t /*frameMax*/, uint16_t /*heartbeatMin*/, uint16_t /*heartbeatMax*/) { - QPID_LOG(warning, message); - setState(FAILED); + checkState(NEGOTIATING, INVALID_STATE_TUNE); + //TODO: verify that desired heartbeat and max frame size are valid + maxChannels = channelMax; + proxy.tuneOk(maxChannels, maxFrameSize, heartbeat); + setState(OPENING); + proxy.open(vhost, capabilities, insist); } -void ConnectionHandler::handle(AMQMethodBody* method) +void ConnectionHandler::openOk(const framing::Array& /*knownHosts*/) { - switch (getState()) { - case NOT_STARTED: - if (method->isA<ConnectionStartBody>()) { - setState(NEGOTIATING); - string response = ((char)0) + uid + ((char)0) + pwd; - send(ConnectionStartOkBody(version, properties, mechanism, response, locale)); - } else { - fail("Bad method sequence, expected connection-start."); - } - break; - case NEGOTIATING: - if (method->isA<ConnectionTuneBody>()) { - ConnectionTuneBody* proposal=polymorphic_downcast<ConnectionTuneBody*>(method); - heartbeat = proposal->getHeartbeat(); - maxChannels = proposal->getChannelMax(); - send(ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat)); - setState(OPENING); - send(ConnectionOpenBody(version, vhost, capabilities, insist)); - //TODO: support for further security challenges - //} else if (method->isA<ConnectionSecureBody>()) { - } else { - fail("Unexpected method sequence, expected connection-tune."); - } - break; - case OPENING: - if (method->isA<ConnectionOpenOkBody>()) { - setState(OPEN); - //TODO: support for redirection - //} else if (method->isA<ConnectionRedirectBody>()) { - } else { - fail("Unexpected method sequence, expected connection-open-ok."); - } - break; - case OPEN: - if (method->isA<ConnectionCloseBody>()) { - send(ConnectionCloseOkBody(version)); - setState(CLOSED); - ConnectionCloseBody* c=polymorphic_downcast<ConnectionCloseBody*>(method); - QPID_LOG(warning, "Broker closed connection: " << c->getReplyCode() - << ", " << c->getReplyText()); - if (onError) { - onError(c->getReplyCode(), c->getReplyText()); - } - } else { - error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId()); - } - break; - case CLOSING: - if (method->isA<ConnectionCloseOkBody>()) { - if (onClose) { - onClose(); - } - setState(CLOSED); - } else { - QPID_LOG(warning, "Received frame on channel zero while closing connection; frame ignored."); - } - break; + checkState(OPENING, INVALID_STATE_OPEN_OK); + //TODO: store knownHosts for reconnection etc + setState(OPEN); +} + +void ConnectionHandler::redirect(const std::string& /*host*/, const Array& /*knownHosts*/) +{ + throw NotImplementedException("Redirection received from broker; not yet implemented in client"); +} + +void ConnectionHandler::close(uint16_t replyCode, const std::string& replyText) +{ + proxy.closeOk(); + setState(CLOSED); + QPID_LOG(warning, "Broker closed connection: " << replyCode << ", " << replyText); + if (onError) { + onError(replyCode, replyText); + } +} + +void ConnectionHandler::closeOk() +{ + checkState(CLOSING, INVALID_STATE_CLOSE_OK); + if (onClose) { + onClose(); } + setState(CLOSED); } |