summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/framing/AMQFrame.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/framing/AMQFrame.cpp')
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp117
1 files changed, 63 insertions, 54 deletions
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index f79eae3524..780af71be4 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -1,4 +1,3 @@
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,46 +18,70 @@
* under the License.
*
*/
-#include <boost/format.hpp>
-
#include "AMQFrame.h"
+
#include "qpid/QpidError.h"
-#include "AMQMethodBody.h"
+#include "qpid/framing/variant.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include <boost/format.hpp>
+
+#include <iostream>
namespace qpid {
namespace framing {
+namespace {
+struct GetBodyVisitor : public NoBlankVisitor<AMQBody*> {
+ QPID_USING_NOBLANK(AMQBody*);
+ AMQBody* operator()(MethodHolder& t) const { return t.get(); }
+ template <class T> AMQBody* operator()(T& t) const { return &t; }
+};
+
+struct EncodeVisitor : public NoBlankVisitor<void> {
+ Buffer& buffer;
+ EncodeVisitor(Buffer& b) : buffer(b) {}
+
+ QPID_USING_NOBLANK(void);
+ template <class T> void operator()(const T& t) const { return t.encode(buffer); }
+};
+
+struct SizeVisitor : public NoBlankVisitor<uint32_t> {
+ QPID_USING_NOBLANK(uint32_t);
+ template <class T> uint32_t operator()(const T& t) const { return t.size(); }
+};
+
+struct DecodeVisitor : public NoBlankVisitor<void> {
+ Buffer& buffer;
+ uint32_t size;
+ DecodeVisitor(Buffer& b, uint32_t s) : buffer(b), size(s) {}
+ QPID_USING_NOBLANK(void);
+ void operator()(MethodHolder& t) const { return t.decode(buffer); }
+ template <class T> void operator()(T& t) const { return t.decode(buffer, size); }
+};
-AMQP_MethodVersionMap AMQFrame::versionMap;
-
-AMQFrame::AMQFrame(ProtocolVersion _version)
- : channel(0), type(0), version(_version)
- {
- assert(version != ProtocolVersion(0,0));
- }
-
-AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, AMQBody* _body) : channel(_channel), body(_body),version(_version) {}
+}
-AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, const AMQBody::shared_ptr& _body) :
- channel(_channel), body(_body), version(_version)
-{}
+AMQBody* AMQFrame::getBody() {
+ return boost::apply_visitor(GetBodyVisitor(), body);
+}
-AMQFrame::~AMQFrame() {}
+const AMQBody* AMQFrame::getBody() const {
+ return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body));
+}
void AMQFrame::encode(Buffer& buffer)
{
- buffer.putOctet(body->type());
+ buffer.putOctet(getBody()->type());
buffer.putShort(channel);
- buffer.putLong(body->size());
- body->encode(buffer);
+ buffer.putLong(boost::apply_visitor(SizeVisitor(), body));
+ boost::apply_visitor(EncodeVisitor(buffer), body);
buffer.putOctet(0xCE);
}
uint32_t AMQFrame::size() const{
- assert(body.get());
- return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + body->size()
- + 1/*0xCE*/;
+ return 1/*type*/ + 2/*channel*/ + 4/*body size*/ +
+ boost::apply_visitor(SizeVisitor(), body) + 1/*0xCE*/;
}
bool AMQFrame::decode(Buffer& buffer)
@@ -66,56 +89,42 @@ bool AMQFrame::decode(Buffer& buffer)
if(buffer.available() < 7)
return false;
buffer.record();
- uint32_t frameSize = decodeHead(buffer);
- if(buffer.available() < frameSize + 1){
+
+ uint8_t type = buffer.getOctet();
+ channel = buffer.getShort();
+ uint32_t size = buffer.getLong();
+
+ if(buffer.available() < size+1){
buffer.restore();
return false;
}
- decodeBody(buffer, frameSize);
+ decodeBody(buffer, size, type);
uint8_t end = buffer.getOctet();
if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found");
return true;
}
-uint32_t AMQFrame::decodeHead(Buffer& buffer){
- type = buffer.getOctet();
- channel = buffer.getShort();
- return buffer.getLong();
-}
-
-void AMQFrame::decodeBody(Buffer& buffer, uint32_t size)
+void AMQFrame::decodeBody(Buffer& buffer, uint32_t size, uint8_t type)
{
switch(type)
{
- case METHOD_BODY:
- body = AMQMethodBody::create(versionMap, version, buffer);
- break;
- case HEADER_BODY:
- body = AMQBody::shared_ptr(new AMQHeaderBody());
- break;
- case CONTENT_BODY:
- body = AMQBody::shared_ptr(new AMQContentBody());
- break;
- case HEARTBEAT_BODY:
- body = AMQBody::shared_ptr(new AMQHeartbeatBody());
- break;
+ case METHOD_BODY: body = MethodHolder(); break;
+ case HEADER_BODY: body = AMQHeaderBody(); break;
+ case CONTENT_BODY: body = AMQContentBody(); break;
+ case HEARTBEAT_BODY: body = AMQHeartbeatBody(); break;
+
default:
THROW_QPID_ERROR(
FRAMING_ERROR,
boost::format("Unknown frame type %d") % type);
}
- body->decode(buffer, size);
+ boost::apply_visitor(DecodeVisitor(buffer,size), body);
}
-std::ostream& operator<<(std::ostream& out, const AMQFrame& t)
+std::ostream& operator<<(std::ostream& out, const AMQFrame& f)
{
- out << "Frame[channel=" << t.channel << "; ";
- if (t.body.get() == 0)
- out << "empty";
- else
- out << *t.body;
- out << "]";
- return out;
+ return out << "Frame[channel=" << f.getChannel() << "; " << *f.getBody()
+ << "]";
}