summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ConnectionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ConnectionHandler.cpp')
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp181
1 files changed, 80 insertions, 101 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index e1c50c14fc..13de271e3b 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -24,6 +24,7 @@
#include "qpid/framing/amqp_framing.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/all_method_bodies.h"
+#include "qpid/framing/ClientInvoker.h"
using namespace qpid::client;
using namespace qpid::framing;
@@ -31,14 +32,21 @@ using namespace boost;
namespace {
const std::string OK("OK");
+const std::string PLAIN("PLAIN");
+const std::string en_US("en_US");
+
+const std::string INVALID_STATE_START("start received in invalid state");
+const std::string INVALID_STATE_TUNE("tune received in invalid state");
+const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state");
+const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state");
}
ConnectionHandler::ConnectionHandler()
- : StateManager(NOT_STARTED)
+ : StateManager(NOT_STARTED), outHandler(*this), proxy(outHandler)
{
- mechanism = "PLAIN";
- locale = "en_US";
+ mechanism = PLAIN;
+ locale = en_US;
heartbeat = 0;
maxChannels = 32767;
maxFrameSize = 65535;
@@ -52,34 +60,29 @@ ConnectionHandler::ConnectionHandler()
void ConnectionHandler::incoming(AMQFrame& frame)
{
if (getState() == CLOSED) {
- throw Exception("Connection is closed.");
+ throw Exception("Received frame on closed connection");
}
+
AMQBody* body = frame.getBody();
- if (frame.getChannel() == 0) {
- if (body->getMethod()) {
- handle(body->getMethod());
- } else {
- error(503, "Cannot send content on channel zero.");
- }
- } else {
- switch(getState()) {
- case OPEN:
- try {
+ try {
+ if (frame.getChannel() != 0 || !invoke(static_cast<ConnectionOperations&>(*this), *body)) {
+ switch(getState()) {
+ case OPEN:
in(frame);
- }catch(ConnectionException& e){
- error(e.code, e.what(), body);
- }catch(std::exception& e){
- error(541/*internal error*/, e.what(), body);
+ break;
+ case CLOSING:
+ QPID_LOG(warning, "Ignoring frame while closing connection: " << frame);
+ break;
+ default:
+ throw Exception("Cannot receive frames on non-zero channel until connection is established.");
}
- break;
- case CLOSING:
- QPID_LOG(warning, "Received frame on non-zero channel while closing connection; frame ignored.");
- break;
- default:
- //must be in connection initialisation:
- fail("Cannot receive frames on non-zero channel until connection is established.");
}
+ }catch(std::exception& e){
+ QPID_LOG(warning, "Closing connection due to " << e.what());
+ setState(CLOSING);
+ proxy.close(501, e.what());
+ if (onError) onError(501, e.what());
}
}
@@ -109,101 +112,77 @@ void ConnectionHandler::close()
break;
case OPEN:
setState(CLOSING);
- send(ConnectionCloseBody(version, 200, OK, 0, 0));
+ proxy.close(200, OK);
waitFor(CLOSED);
break;
// Nothing to do for CLOSING, CLOSED, FAILED or NOT_STARTED
}
}
-void ConnectionHandler::send(const framing::AMQBody& body)
+void ConnectionHandler::checkState(STATES s, const std::string& msg)
{
- AMQFrame f(body);
- out(f);
+ if (getState() != s) {
+ throw CommandInvalidException(msg);
+ }
}
-void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
+void ConnectionHandler::fail(const std::string& message)
{
- setState(CLOSING);
- send(ConnectionCloseBody(version, code, message, classId, methodId));
+ QPID_LOG(warning, message);
+ setState(FAILED);
}
-void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body)
+void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& /*mechanisms*/, const Array& /*locales*/)
{
- if (onError)
- onError(code, message);
- AMQMethodBody* method = body->getMethod();
- if (method)
- error(code, message, method->amqpClassId(), method->amqpMethodId());
- else
- error(code, message);
+ checkState(NOT_STARTED, INVALID_STATE_START);
+ setState(NEGOTIATING);
+ //TODO: verify that desired mechanism and locale are supported
+ string response = ((char)0) + uid + ((char)0) + pwd;
+ proxy.startOk(properties, mechanism, response, locale);
}
+void ConnectionHandler::secure(const std::string& /*challenge*/)
+{
+ throw NotImplementedException("Challenge-response cycle not yet implemented in client");
+}
-void ConnectionHandler::fail(const std::string& message)
+void ConnectionHandler::tune(uint16_t channelMax, uint16_t /*frameMax*/, uint16_t /*heartbeatMin*/, uint16_t /*heartbeatMax*/)
{
- QPID_LOG(warning, message);
- setState(FAILED);
+ checkState(NEGOTIATING, INVALID_STATE_TUNE);
+ //TODO: verify that desired heartbeat and max frame size are valid
+ maxChannels = channelMax;
+ proxy.tuneOk(maxChannels, maxFrameSize, heartbeat);
+ setState(OPENING);
+ proxy.open(vhost, capabilities, insist);
}
-void ConnectionHandler::handle(AMQMethodBody* method)
+void ConnectionHandler::openOk(const framing::Array& /*knownHosts*/)
{
- switch (getState()) {
- case NOT_STARTED:
- if (method->isA<ConnectionStartBody>()) {
- setState(NEGOTIATING);
- string response = ((char)0) + uid + ((char)0) + pwd;
- send(ConnectionStartOkBody(version, properties, mechanism, response, locale));
- } else {
- fail("Bad method sequence, expected connection-start.");
- }
- break;
- case NEGOTIATING:
- if (method->isA<ConnectionTuneBody>()) {
- ConnectionTuneBody* proposal=polymorphic_downcast<ConnectionTuneBody*>(method);
- heartbeat = proposal->getHeartbeat();
- maxChannels = proposal->getChannelMax();
- send(ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat));
- setState(OPENING);
- send(ConnectionOpenBody(version, vhost, capabilities, insist));
- //TODO: support for further security challenges
- //} else if (method->isA<ConnectionSecureBody>()) {
- } else {
- fail("Unexpected method sequence, expected connection-tune.");
- }
- break;
- case OPENING:
- if (method->isA<ConnectionOpenOkBody>()) {
- setState(OPEN);
- //TODO: support for redirection
- //} else if (method->isA<ConnectionRedirectBody>()) {
- } else {
- fail("Unexpected method sequence, expected connection-open-ok.");
- }
- break;
- case OPEN:
- if (method->isA<ConnectionCloseBody>()) {
- send(ConnectionCloseOkBody(version));
- setState(CLOSED);
- ConnectionCloseBody* c=polymorphic_downcast<ConnectionCloseBody*>(method);
- QPID_LOG(warning, "Broker closed connection: " << c->getReplyCode()
- << ", " << c->getReplyText());
- if (onError) {
- onError(c->getReplyCode(), c->getReplyText());
- }
- } else {
- error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId());
- }
- break;
- case CLOSING:
- if (method->isA<ConnectionCloseOkBody>()) {
- if (onClose) {
- onClose();
- }
- setState(CLOSED);
- } else {
- QPID_LOG(warning, "Received frame on channel zero while closing connection; frame ignored.");
- }
- break;
+ checkState(OPENING, INVALID_STATE_OPEN_OK);
+ //TODO: store knownHosts for reconnection etc
+ setState(OPEN);
+}
+
+void ConnectionHandler::redirect(const std::string& /*host*/, const Array& /*knownHosts*/)
+{
+ throw NotImplementedException("Redirection received from broker; not yet implemented in client");
+}
+
+void ConnectionHandler::close(uint16_t replyCode, const std::string& replyText)
+{
+ proxy.closeOk();
+ setState(CLOSED);
+ QPID_LOG(warning, "Broker closed connection: " << replyCode << ", " << replyText);
+ if (onError) {
+ onError(replyCode, replyText);
+ }
+}
+
+void ConnectionHandler::closeOk()
+{
+ checkState(CLOSING, INVALID_STATE_CLOSE_OK);
+ if (onClose) {
+ onClose();
}
+ setState(CLOSED);
}