diff options
Diffstat (limited to 'cpp/src/qpid/client/ConnectionHandler.cpp')
-rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.cpp | 65 |
1 files changed, 32 insertions, 33 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index f47506d977..66db9384e2 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -22,6 +22,8 @@ #include "ConnectionHandler.h" #include "qpid/log/Statement.h" #include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQP_HighestVersion.h" +#include "qpid/framing/all_method_bodies.h" using namespace qpid::client; using namespace qpid::framing; @@ -53,16 +55,16 @@ void ConnectionHandler::incoming(AMQFrame& frame) throw Exception("Connection is closed."); } - AMQBody::shared_ptr body = frame.getBody(); + AMQBody* body = frame.getBody(); if (frame.getChannel() == 0) { - if (body->type() == METHOD_BODY) { - handle(shared_polymorphic_cast<AMQMethodBody>(body)); + if (body->getMethod()) { + handle(body->getMethod()); } else { error(503, "Cannot send content on channel zero."); } } else { switch(getState()) { - case OPEN: + case OPEN: try { in(frame); }catch(ConnectionException& e){ @@ -71,10 +73,10 @@ void ConnectionHandler::incoming(AMQFrame& frame) error(541/*internal error*/, e.what(), body); } break; - case CLOSING: + case CLOSING: QPID_LOG(warning, "Received frame on non-zero channel while closing connection; frame ignored."); break; - default: + default: //must be in connection initialisation: fail("Cannot receive frames on non-zero channel until connection is established."); } @@ -101,32 +103,29 @@ void ConnectionHandler::waitForOpen() void ConnectionHandler::close() { setState(CLOSING); - send(make_shared_ptr(new ConnectionCloseBody(version, 200, OK, 0, 0))); - + send(ConnectionCloseBody(version, 200, OK, 0, 0)); waitFor(CLOSED); } -void ConnectionHandler::send(framing::AMQBody::shared_ptr body) +void ConnectionHandler::send(const framing::AMQBody& body) { - AMQFrame f; - f.setBody(body); + AMQFrame f(ProtocolVersion(), 0, body); out(f); } void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId) { setState(CLOSING); - send(make_shared_ptr(new ConnectionCloseBody(version, code, message, classId, methodId))); + send(ConnectionCloseBody(version, code, message, classId, methodId)); } -void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody::shared_ptr body) +void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body) { - if (body->type() == METHOD_BODY) { - AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body)); + AMQMethodBody* method = body->getMethod(); + if (method) error(code, message, method->amqpClassId(), method->amqpMethodId()); - } else { + else error(code, message); - } } @@ -136,54 +135,54 @@ void ConnectionHandler::fail(const std::string& message) setState(FAILED); } -void ConnectionHandler::handle(AMQMethodBody::shared_ptr method) +void ConnectionHandler::handle(AMQMethodBody* method) { switch (getState()) { - case NOT_STARTED: + case NOT_STARTED: if (method->isA<ConnectionStartBody>()) { setState(NEGOTIATING); string response = ((char)0) + uid + ((char)0) + pwd; - send(make_shared_ptr(new ConnectionStartOkBody(version, properties, mechanism, response, locale))); + send(ConnectionStartOkBody(version, properties, mechanism, response, locale)); } else { fail("Bad method sequence, expected connection-start."); } break; - case NEGOTIATING: + case NEGOTIATING: if (method->isA<ConnectionTuneBody>()) { - ConnectionTuneBody::shared_ptr proposal(shared_polymorphic_cast<ConnectionTuneBody>(method)); + ConnectionTuneBody* proposal=polymorphic_downcast<ConnectionTuneBody*>(method); heartbeat = proposal->getHeartbeat(); maxChannels = proposal->getChannelMax(); - send(make_shared_ptr(new ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat))); + send(ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat)); setState(OPENING); - send(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, insist))); - //TODO: support for further security challenges - //} else if (method->isA<ConnectionSecureBody>()) { + send(ConnectionOpenBody(version, vhost, capabilities, insist)); + //TODO: support for further security challenges + //} else if (method->isA<ConnectionSecureBody>()) { } else { fail("Unexpected method sequence, expected connection-tune."); } break; - case OPENING: + case OPENING: if (method->isA<ConnectionOpenOkBody>()) { setState(OPEN); - //TODO: support for redirection - //} else if (method->isA<ConnectionRedirectBody>()) { + //TODO: support for redirection + //} else if (method->isA<ConnectionRedirectBody>()) { } else { fail("Unexpected method sequence, expected connection-open-ok."); } break; - case OPEN: + case OPEN: if (method->isA<ConnectionCloseBody>()) { - send(make_shared_ptr(new ConnectionCloseOkBody(version))); + send(ConnectionCloseOkBody(version)); setState(CLOSED); if (onError) { - ConnectionCloseBody::shared_ptr c(shared_polymorphic_cast<ConnectionCloseBody>(method)); + ConnectionCloseBody* c=polymorphic_downcast<ConnectionCloseBody*>(method); onError(c->getReplyCode(), c->getReplyText()); } } else { error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId()); } break; - case CLOSING: + case CLOSING: if (method->isA<ConnectionCloseOkBody>()) { if (onClose) { onClose(); |