summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ConnectionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ConnectionHandler.cpp')
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp65
1 files changed, 32 insertions, 33 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index f47506d977..66db9384e2 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -22,6 +22,8 @@
#include "ConnectionHandler.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
+#include "qpid/framing/all_method_bodies.h"
using namespace qpid::client;
using namespace qpid::framing;
@@ -53,16 +55,16 @@ void ConnectionHandler::incoming(AMQFrame& frame)
throw Exception("Connection is closed.");
}
- AMQBody::shared_ptr body = frame.getBody();
+ AMQBody* body = frame.getBody();
if (frame.getChannel() == 0) {
- if (body->type() == METHOD_BODY) {
- handle(shared_polymorphic_cast<AMQMethodBody>(body));
+ if (body->getMethod()) {
+ handle(body->getMethod());
} else {
error(503, "Cannot send content on channel zero.");
}
} else {
switch(getState()) {
- case OPEN:
+ case OPEN:
try {
in(frame);
}catch(ConnectionException& e){
@@ -71,10 +73,10 @@ void ConnectionHandler::incoming(AMQFrame& frame)
error(541/*internal error*/, e.what(), body);
}
break;
- case CLOSING:
+ case CLOSING:
QPID_LOG(warning, "Received frame on non-zero channel while closing connection; frame ignored.");
break;
- default:
+ default:
//must be in connection initialisation:
fail("Cannot receive frames on non-zero channel until connection is established.");
}
@@ -101,32 +103,29 @@ void ConnectionHandler::waitForOpen()
void ConnectionHandler::close()
{
setState(CLOSING);
- send(make_shared_ptr(new ConnectionCloseBody(version, 200, OK, 0, 0)));
-
+ send(ConnectionCloseBody(version, 200, OK, 0, 0));
waitFor(CLOSED);
}
-void ConnectionHandler::send(framing::AMQBody::shared_ptr body)
+void ConnectionHandler::send(const framing::AMQBody& body)
{
- AMQFrame f;
- f.setBody(body);
+ AMQFrame f(ProtocolVersion(), 0, body);
out(f);
}
void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
{
setState(CLOSING);
- send(make_shared_ptr(new ConnectionCloseBody(version, code, message, classId, methodId)));
+ send(ConnectionCloseBody(version, code, message, classId, methodId));
}
-void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody::shared_ptr body)
+void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body)
{
- if (body->type() == METHOD_BODY) {
- AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body));
+ AMQMethodBody* method = body->getMethod();
+ if (method)
error(code, message, method->amqpClassId(), method->amqpMethodId());
- } else {
+ else
error(code, message);
- }
}
@@ -136,54 +135,54 @@ void ConnectionHandler::fail(const std::string& message)
setState(FAILED);
}
-void ConnectionHandler::handle(AMQMethodBody::shared_ptr method)
+void ConnectionHandler::handle(AMQMethodBody* method)
{
switch (getState()) {
- case NOT_STARTED:
+ case NOT_STARTED:
if (method->isA<ConnectionStartBody>()) {
setState(NEGOTIATING);
string response = ((char)0) + uid + ((char)0) + pwd;
- send(make_shared_ptr(new ConnectionStartOkBody(version, properties, mechanism, response, locale)));
+ send(ConnectionStartOkBody(version, properties, mechanism, response, locale));
} else {
fail("Bad method sequence, expected connection-start.");
}
break;
- case NEGOTIATING:
+ case NEGOTIATING:
if (method->isA<ConnectionTuneBody>()) {
- ConnectionTuneBody::shared_ptr proposal(shared_polymorphic_cast<ConnectionTuneBody>(method));
+ ConnectionTuneBody* proposal=polymorphic_downcast<ConnectionTuneBody*>(method);
heartbeat = proposal->getHeartbeat();
maxChannels = proposal->getChannelMax();
- send(make_shared_ptr(new ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat)));
+ send(ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat));
setState(OPENING);
- send(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, insist)));
- //TODO: support for further security challenges
- //} else if (method->isA<ConnectionSecureBody>()) {
+ send(ConnectionOpenBody(version, vhost, capabilities, insist));
+ //TODO: support for further security challenges
+ //} else if (method->isA<ConnectionSecureBody>()) {
} else {
fail("Unexpected method sequence, expected connection-tune.");
}
break;
- case OPENING:
+ case OPENING:
if (method->isA<ConnectionOpenOkBody>()) {
setState(OPEN);
- //TODO: support for redirection
- //} else if (method->isA<ConnectionRedirectBody>()) {
+ //TODO: support for redirection
+ //} else if (method->isA<ConnectionRedirectBody>()) {
} else {
fail("Unexpected method sequence, expected connection-open-ok.");
}
break;
- case OPEN:
+ case OPEN:
if (method->isA<ConnectionCloseBody>()) {
- send(make_shared_ptr(new ConnectionCloseOkBody(version)));
+ send(ConnectionCloseOkBody(version));
setState(CLOSED);
if (onError) {
- ConnectionCloseBody::shared_ptr c(shared_polymorphic_cast<ConnectionCloseBody>(method));
+ ConnectionCloseBody* c=polymorphic_downcast<ConnectionCloseBody*>(method);
onError(c->getReplyCode(), c->getReplyText());
}
} else {
error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId());
}
break;
- case CLOSING:
+ case CLOSING:
if (method->isA<ConnectionCloseOkBody>()) {
if (onClose) {
onClose();