diff options
Diffstat (limited to 'cpp/src/qpid/framing/AMQFrame.cpp')
-rw-r--r-- | cpp/src/qpid/framing/AMQFrame.cpp | 117 |
1 files changed, 63 insertions, 54 deletions
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index f79eae3524..780af71be4 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -1,4 +1,3 @@ - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,46 +18,70 @@ * under the License. * */ -#include <boost/format.hpp> - #include "AMQFrame.h" + #include "qpid/QpidError.h" -#include "AMQMethodBody.h" +#include "qpid/framing/variant.h" +#include "qpid/framing/AMQMethodBody.h" +#include <boost/format.hpp> + +#include <iostream> namespace qpid { namespace framing { +namespace { +struct GetBodyVisitor : public NoBlankVisitor<AMQBody*> { + QPID_USING_NOBLANK(AMQBody*); + AMQBody* operator()(MethodHolder& t) const { return t.get(); } + template <class T> AMQBody* operator()(T& t) const { return &t; } +}; + +struct EncodeVisitor : public NoBlankVisitor<void> { + Buffer& buffer; + EncodeVisitor(Buffer& b) : buffer(b) {} + + QPID_USING_NOBLANK(void); + template <class T> void operator()(const T& t) const { return t.encode(buffer); } +}; + +struct SizeVisitor : public NoBlankVisitor<uint32_t> { + QPID_USING_NOBLANK(uint32_t); + template <class T> uint32_t operator()(const T& t) const { return t.size(); } +}; + +struct DecodeVisitor : public NoBlankVisitor<void> { + Buffer& buffer; + uint32_t size; + DecodeVisitor(Buffer& b, uint32_t s) : buffer(b), size(s) {} + QPID_USING_NOBLANK(void); + void operator()(MethodHolder& t) const { return t.decode(buffer); } + template <class T> void operator()(T& t) const { return t.decode(buffer, size); } +}; -AMQP_MethodVersionMap AMQFrame::versionMap; - -AMQFrame::AMQFrame(ProtocolVersion _version) - : channel(0), type(0), version(_version) - { - assert(version != ProtocolVersion(0,0)); - } - -AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, AMQBody* _body) : channel(_channel), body(_body),version(_version) {} +} -AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, const AMQBody::shared_ptr& _body) : - channel(_channel), body(_body), version(_version) -{} +AMQBody* AMQFrame::getBody() { + return boost::apply_visitor(GetBodyVisitor(), body); +} -AMQFrame::~AMQFrame() {} +const AMQBody* AMQFrame::getBody() const { + return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body)); +} void AMQFrame::encode(Buffer& buffer) { - buffer.putOctet(body->type()); + buffer.putOctet(getBody()->type()); buffer.putShort(channel); - buffer.putLong(body->size()); - body->encode(buffer); + buffer.putLong(boost::apply_visitor(SizeVisitor(), body)); + boost::apply_visitor(EncodeVisitor(buffer), body); buffer.putOctet(0xCE); } uint32_t AMQFrame::size() const{ - assert(body.get()); - return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + body->size() - + 1/*0xCE*/; + return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + + boost::apply_visitor(SizeVisitor(), body) + 1/*0xCE*/; } bool AMQFrame::decode(Buffer& buffer) @@ -66,56 +89,42 @@ bool AMQFrame::decode(Buffer& buffer) if(buffer.available() < 7) return false; buffer.record(); - uint32_t frameSize = decodeHead(buffer); - if(buffer.available() < frameSize + 1){ + + uint8_t type = buffer.getOctet(); + channel = buffer.getShort(); + uint32_t size = buffer.getLong(); + + if(buffer.available() < size+1){ buffer.restore(); return false; } - decodeBody(buffer, frameSize); + decodeBody(buffer, size, type); uint8_t end = buffer.getOctet(); if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found"); return true; } -uint32_t AMQFrame::decodeHead(Buffer& buffer){ - type = buffer.getOctet(); - channel = buffer.getShort(); - return buffer.getLong(); -} - -void AMQFrame::decodeBody(Buffer& buffer, uint32_t size) +void AMQFrame::decodeBody(Buffer& buffer, uint32_t size, uint8_t type) { switch(type) { - case METHOD_BODY: - body = AMQMethodBody::create(versionMap, version, buffer); - break; - case HEADER_BODY: - body = AMQBody::shared_ptr(new AMQHeaderBody()); - break; - case CONTENT_BODY: - body = AMQBody::shared_ptr(new AMQContentBody()); - break; - case HEARTBEAT_BODY: - body = AMQBody::shared_ptr(new AMQHeartbeatBody()); - break; + case METHOD_BODY: body = MethodHolder(); break; + case HEADER_BODY: body = AMQHeaderBody(); break; + case CONTENT_BODY: body = AMQContentBody(); break; + case HEARTBEAT_BODY: body = AMQHeartbeatBody(); break; + default: THROW_QPID_ERROR( FRAMING_ERROR, boost::format("Unknown frame type %d") % type); } - body->decode(buffer, size); + boost::apply_visitor(DecodeVisitor(buffer,size), body); } -std::ostream& operator<<(std::ostream& out, const AMQFrame& t) +std::ostream& operator<<(std::ostream& out, const AMQFrame& f) { - out << "Frame[channel=" << t.channel << "; "; - if (t.body.get() == 0) - out << "empty"; - else - out << *t.body; - out << "]"; - return out; + return out << "Frame[channel=" << f.getChannel() << "; " << *f.getBody() + << "]"; } |