diff options
Diffstat (limited to 'cpp/src/qpid/client/ChannelHandler.cpp')
-rw-r--r-- | cpp/src/qpid/client/ChannelHandler.cpp | 44 |
1 files changed, 22 insertions, 22 deletions
diff --git a/cpp/src/qpid/client/ChannelHandler.cpp b/cpp/src/qpid/client/ChannelHandler.cpp index a6aea438f0..b3d720baf0 100644 --- a/cpp/src/qpid/client/ChannelHandler.cpp +++ b/cpp/src/qpid/client/ChannelHandler.cpp @@ -21,6 +21,7 @@ #include "ChannelHandler.h" #include "qpid/framing/amqp_framing.h" +#include "qpid/framing/all_method_bodies.h" using namespace qpid::client; using namespace qpid::framing; @@ -30,40 +31,39 @@ ChannelHandler::ChannelHandler() : StateManager(CLOSED), id(0) {} void ChannelHandler::incoming(AMQFrame& frame) { - AMQBody::shared_ptr body = frame.getBody(); + AMQBody* body = frame.getBody(); if (getState() == OPEN) { - if (isA<ChannelCloseBody>(body)) { - ChannelCloseBody::shared_ptr method(shared_polymorphic_cast<ChannelCloseBody>(body)); + ChannelCloseBody* closeBody= + dynamic_cast<ChannelCloseBody*>(body->getMethod()); + if (closeBody) { setState(CLOSED); if (onClose) { - onClose(method->getReplyCode(), method->getReplyText()); + onClose(closeBody->getReplyCode(), closeBody->getReplyText()); } } else { try { in(frame); }catch(ChannelException& e){ - if (body->type() == METHOD_BODY) { - AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body)); - close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); - } else { + AMQMethodBody* method=body->getMethod(); + if (method) + close(e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + else close(e.code, e.toString(), 0, 0); - } } } } else { - if (body->type() == METHOD_BODY) { - handleMethod(shared_polymorphic_cast<AMQMethodBody>(body)); - } else { + if (body->getMethod()) + handleMethod(body->getMethod()); + else throw new ConnectionException(504, "Channel not open."); - } - } } void ChannelHandler::outgoing(AMQFrame& frame) { if (getState() == OPEN) { - frame.channel = id; + frame.setChannel(id); out(frame); } else { throw Exception("Channel not open"); @@ -75,7 +75,7 @@ void ChannelHandler::open(uint16_t _id) id = _id; setState(OPENING); - AMQFrame f(version, id, make_shared_ptr(new ChannelOpenBody(version))); + AMQFrame f(version, id, ChannelOpenBody(version)); out(f); std::set<int> states; @@ -90,7 +90,7 @@ void ChannelHandler::open(uint16_t _id) void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId) { setState(CLOSING); - AMQFrame f(version, id, make_shared_ptr(new ChannelCloseBody(version, code, message, classId, methodId))); + AMQFrame f(version, id, ChannelCloseBody(version, code, message, classId, methodId)); out(f); } @@ -100,24 +100,24 @@ void ChannelHandler::close() waitFor(CLOSED); } -void ChannelHandler::handleMethod(AMQMethodBody::shared_ptr method) +void ChannelHandler::handleMethod(AMQMethodBody* method) { switch (getState()) { - case OPENING: + case OPENING: if (method->isA<ChannelOpenOkBody>()) { setState(OPEN); } else { throw ConnectionException(504, "Channel not opened."); } break; - case CLOSING: + case CLOSING: if (method->isA<ChannelCloseOkBody>()) { setState(CLOSED); } //else just ignore it break; - case CLOSED: + case CLOSED: throw ConnectionException(504, "Channel not opened."); - default: + default: throw Exception("Unexpected state encountered in ChannelHandler!"); } } |