summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.cpp4
-rw-r--r--cpp/lib/client/ClientChannel.cpp6
-rw-r--r--cpp/lib/common/framing/AMQFrame.cpp24
-rw-r--r--cpp/lib/common/framing/AMQFrame.h16
-rw-r--r--cpp/lib/common/framing/AMQMethodBody.cpp25
-rw-r--r--cpp/lib/common/framing/AMQMethodBody.h18
-rw-r--r--cpp/lib/common/framing/AMQRequestBody.cpp46
-rw-r--r--cpp/lib/common/framing/AMQRequestBody.h52
-rw-r--r--cpp/lib/common/framing/AMQResponseBody.cpp43
-rw-r--r--cpp/lib/common/framing/AMQResponseBody.h50
-rw-r--r--cpp/lib/common/framing/BodyHandler.cpp13
-rw-r--r--cpp/lib/common/framing/amqp_types.h5
-rw-r--r--cpp/tests/FramingTest.cpp31
13 files changed, 176 insertions, 157 deletions
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp
index d0971c4fae..3110c7ca3d 100644
--- a/cpp/lib/broker/SessionHandlerImpl.cpp
+++ b/cpp/lib/broker/SessionHandlerImpl.cpp
@@ -95,7 +95,9 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
switch(body->type())
{
- case METHOD_BODY:
+ case REQUEST_BODY:
+ case RESPONSE_BODY:
+ case METHOD_BODY:
method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
try{
method->invoke(*this, channel);
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp
index 3d0b547b07..37aedd5607 100644
--- a/cpp/lib/client/ClientChannel.cpp
+++ b/cpp/lib/client/ClientChannel.cpp
@@ -171,7 +171,7 @@ void Channel::cancelAll(){
for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){
Consumer* c = i->second;
if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(version, id, new BasicAckBody(c->lastDeliveryTag, true)));
+ out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true)));
}
consumers.erase(i);
delete c;
@@ -377,8 +377,8 @@ void Channel::deliver(Consumer* consumer, Message& msg){
if(++(consumer->count) < prefetch) break;
//else drop-through
case AUTO_ACK:
- out->send(new AMQFrame(version, id, new BasicAckBody(msg.getDeliveryTag(), multiple)));
- consumer->lastDeliveryTag = 0;
+ out->send(new AMQFrame(version, id, new BasicAckBody(version, msg.getDeliveryTag(), multiple)));
+ consumer->lastDeliveryTag = 0;
}
}
diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp
index 9ebf4c27ad..8ac5199c45 100644
--- a/cpp/lib/common/framing/AMQFrame.cpp
+++ b/cpp/lib/common/framing/AMQFrame.cpp
@@ -69,13 +69,13 @@ bool AMQFrame::decode(Buffer& buffer)
{
if(buffer.available() < 7) return false;
buffer.record();
- u_int32_t bufSize = decodeHead(buffer);
-
- if(buffer.available() < bufSize + 1){
+ u_int32_t frameSize = decodeHead(buffer);
+
+ if(buffer.available() < frameSize + 1){
buffer.restore();
return false;
}
- decodeBody(buffer, bufSize);
+ decodeBody(buffer, frameSize);
u_int8_t end = buffer.getOctet();
if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found");
return true;
@@ -87,13 +87,19 @@ u_int32_t AMQFrame::decodeHead(Buffer& buffer){
return buffer.getLong();
}
-void AMQFrame::decodeBody(Buffer& buffer, uint32_t bufSize)
+void AMQFrame::decodeBody(Buffer& buffer, uint32_t size)
{
switch(type)
{
case METHOD_BODY:
body = AMQMethodBody::create(versionMap, version, buffer);
break;
+ case REQUEST_BODY:
+ body = AMQRequestBody::create(versionMap, version, buffer);
+ break;
+ case RESPONSE_BODY:
+ body = AMQResponseBody::create(versionMap, version, buffer);
+ break;
case HEADER_BODY:
body = AMQBody::shared_ptr(new AMQHeaderBody());
break;
@@ -103,19 +109,13 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t bufSize)
case HEARTBEAT_BODY:
body = AMQBody::shared_ptr(new AMQHeartbeatBody());
break;
- case REQUEST_BODY:
- body = AMQBody::shared_ptr(new AMQRequestBody(versionMap, version));
- break;
- case RESPONSE_BODY:
- body = AMQBody::shared_ptr(new AMQResponseBody(versionMap, version));
- break;
default:
assert(0);
string msg("Unknown body type: ");
msg += type;
THROW_QPID_ERROR(FRAMING_ERROR, msg);
}
- body->decode(buffer, bufSize);
+ body->decode(buffer, size);
}
std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t)
diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h
index c08dfe805b..a927481e77 100644
--- a/cpp/lib/common/framing/AMQFrame.h
+++ b/cpp/lib/common/framing/AMQFrame.h
@@ -39,13 +39,6 @@ namespace framing {
class AMQFrame : virtual public AMQDataBlock
{
- static AMQP_MethodVersionMap versionMap;
- qpid::framing::ProtocolVersion version;
-
- u_int16_t channel;
- u_int8_t type;
- AMQBody::shared_ptr body;
-
public:
AMQFrame(qpid::framing::ProtocolVersion& _version = highestProtocolVersion);
AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody* body);
@@ -60,6 +53,15 @@ class AMQFrame : virtual public AMQDataBlock
u_int32_t decodeHead(Buffer& buffer);
void decodeBody(Buffer& buffer, uint32_t size);
+ private:
+ static AMQP_MethodVersionMap versionMap;
+ qpid::framing::ProtocolVersion version;
+
+ u_int16_t channel;
+ u_int8_t type;
+ AMQBody::shared_ptr body;
+
+
friend std::ostream& operator<<(std::ostream& out, const AMQFrame& body);
};
diff --git a/cpp/lib/common/framing/AMQMethodBody.cpp b/cpp/lib/common/framing/AMQMethodBody.cpp
index cd6db9b1a9..de081243ee 100644
--- a/cpp/lib/common/framing/AMQMethodBody.cpp
+++ b/cpp/lib/common/framing/AMQMethodBody.cpp
@@ -25,14 +25,9 @@
namespace qpid {
namespace framing {
-void AMQMethodBody::encode(Buffer& buffer) const{
+void AMQMethodBody::encodeId(Buffer& buffer) const{
buffer.putShort(amqpClassId());
buffer.putShort(amqpMethodId());
- encodeContent(buffer);
-}
-
-void AMQMethodBody::decode(Buffer& buffer, u_int32_t /*size*/){
- decodeContent(buffer);
}
bool AMQMethodBody::match(AMQMethodBody* other) const{
@@ -43,17 +38,25 @@ void AMQMethodBody::invoke(AMQP_ServerOperations& /*target*/, u_int16_t /*channe
THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server.");
}
-
-
AMQMethodBody::shared_ptr AMQMethodBody::create(
AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
Buffer& buffer)
{
- u_int16_t classId = buffer.getShort();
- u_int16_t methodId = buffer.getShort();
+ MethodId id;
+ id.decode(buffer);
return AMQMethodBody::shared_ptr(
versionMap.createMethodBody(
- classId, methodId, version.getMajor(), version.getMinor()));
+ id.classId, id.methodId, version.getMajor(), version.getMinor()));
}
+void AMQMethodBody::MethodId::decode(Buffer& buffer) {
+ classId = buffer.getShort();
+ methodId = buffer.getShort();
+}
+
+void AMQMethodBody::decode(Buffer& buffer, u_int32_t /*size*/) {
+ decodeContent(buffer);
+}
+
+
}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h
index 57dc84d4aa..a7725079fb 100644
--- a/cpp/lib/common/framing/AMQMethodBody.h
+++ b/cpp/lib/common/framing/AMQMethodBody.h
@@ -42,20 +42,28 @@ class AMQMethodBody : public AMQBody
ProtocolVersion version;
u_int8_t type() const { return METHOD_BODY; }
- u_int32_t size() const { return 4 + bodySize(); }
AMQMethodBody(u_int8_t major, u_int8_t minor) : version(major, minor) {}
AMQMethodBody(ProtocolVersion version) : version(version) {}
virtual ~AMQMethodBody() {}
+ void decode(Buffer&, u_int32_t);
virtual u_int16_t amqpMethodId() const = 0;
virtual u_int16_t amqpClassId() const = 0;
virtual void invoke(AMQP_ServerOperations& target, u_int16_t channel);
+ bool match(AMQMethodBody* other) const;
+
+ protected:
+ static u_int32_t baseSize() { return 4; }
+
+ struct MethodId {
+ u_int16_t classId;
+ u_int16_t methodId;
+ void decode(Buffer& b);
+ };
+
+ void encodeId(Buffer& buffer) const;
virtual void encodeContent(Buffer& buffer) const = 0;
virtual void decodeContent(Buffer& buffer) = 0;
- virtual u_int32_t bodySize() const = 0;
- void encode(Buffer& buffer) const;
- void decode(Buffer& buffer, u_int32_t size);
- bool match(AMQMethodBody* other) const;
};
}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQRequestBody.cpp b/cpp/lib/common/framing/AMQRequestBody.cpp
index 61688cd97c..a5e6ce6974 100644
--- a/cpp/lib/common/framing/AMQRequestBody.cpp
+++ b/cpp/lib/common/framing/AMQRequestBody.cpp
@@ -22,37 +22,37 @@
namespace qpid {
namespace framing {
-AMQRequestBody::AMQRequestBody(AMQP_MethodVersionMap& vm, ProtocolVersion v)
- : versionMap(vm), version(v) {}
-
-AMQRequestBody::AMQRequestBody(
- AMQP_MethodVersionMap& vm, ProtocolVersion v,
- u_int64_t reqId, u_int64_t respMark,
- AMQMethodBody::shared_ptr m
-) : versionMap(vm), version(v),
- requestId(reqId), responseMark(respMark), method(m)
-{}
-
-
-void
-AMQRequestBody::encode(Buffer& buffer) const {
- assert(method.get());
+void AMQRequestBody::Data::encode(Buffer& buffer) const {
buffer.putLongLong(requestId);
buffer.putLongLong(responseMark);
- method->encode(buffer);
}
-
-void
-AMQRequestBody::decode(Buffer& buffer, u_int32_t /*size*/) {
+
+void AMQRequestBody::Data::decode(Buffer& buffer) {
requestId = buffer.getLongLong();
responseMark = buffer.getLongLong();
- method = AMQMethodBody::create(versionMap, version, buffer);
}
-void
-AMQRequestBody::print(std::ostream& out) const
+void AMQRequestBody::encode(Buffer& buffer) const {
+ data.encode(buffer);
+ encodeId(buffer);
+ encodeContent(buffer);
+}
+
+AMQRequestBody::shared_ptr
+AMQRequestBody::create(
+ AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
+ Buffer& buffer)
{
- out << "request(" << size() << " bytes) " << *method;
+ MethodId id;
+ Data data;
+ data.decode(buffer);
+ id.decode(buffer);
+ AMQRequestBody* body = dynamic_cast<AMQRequestBody*>(
+ versionMap.createMethodBody(
+ id.classId, id.methodId, version.getMajor(), version.getMinor()));
+ assert(body);
+ body->data = data;
+ return AMQRequestBody::shared_ptr(body);
}
}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQRequestBody.h b/cpp/lib/common/framing/AMQRequestBody.h
index 0908545fcd..1836a7d49d 100644
--- a/cpp/lib/common/framing/AMQRequestBody.h
+++ b/cpp/lib/common/framing/AMQRequestBody.h
@@ -24,40 +24,44 @@
namespace qpid {
namespace framing {
-class AMQP_MethodVersionMap;
-
/**
- * Body of an AMQP Request frame.
+ * Body of a request method frame.
*/
-class AMQRequestBody : public AMQBody
+class AMQRequestBody : public AMQMethodBody
{
- public:
+ public:
typedef boost::shared_ptr<AMQRequestBody> shared_ptr;
+
+ static shared_ptr create(
+ AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
+ Buffer& buffer);
- AMQRequestBody(AMQP_MethodVersionMap&, ProtocolVersion);
- AMQRequestBody(
- AMQP_MethodVersionMap&, ProtocolVersion,
- u_int64_t requestId, u_int64_t responseMark,
- AMQMethodBody::shared_ptr method);
+ AMQRequestBody(ProtocolVersion v, RequestId id=0, ResponseId mark=0)
+ : AMQMethodBody(v), data(id, mark) {}
- const AMQMethodBody& getMethodBody() const { return *method; }
- AMQMethodBody& getMethodBody() { return *method; }
- u_int64_t getRequestId() { return requestId; }
- u_int64_t getResponseMark() { return responseMark; }
-
- u_int32_t size() const { return 16 + method->size(); }
u_int8_t type() const { return REQUEST_BODY; }
-
void encode(Buffer& buffer) const;
- void decode(Buffer& buffer, u_int32_t size);
- void print(std::ostream& out) const;
+ RequestId getRequestId() const { return data.requestId; }
+ void setRequestId(RequestId id) { data.requestId=id; }
+ ResponseId getResponseMark() const { return data.responseMark; }
+ void setResponseMark(ResponseId mark) { data.responseMark=mark; }
+
+ protected:
+ static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+16; }
+
private:
- AMQP_MethodVersionMap& versionMap;
- ProtocolVersion version;
- u_int64_t requestId;
- u_int64_t responseMark;
- AMQMethodBody::shared_ptr method;
+ struct Data {
+ Data(RequestId id=0, ResponseId mark=0)
+ : requestId(id), responseMark(mark) {}
+ void encode(Buffer&) const;
+ void decode(Buffer&);
+
+ RequestId requestId;
+ ResponseId responseMark;
+ };
+
+ Data data;
};
}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQResponseBody.cpp b/cpp/lib/common/framing/AMQResponseBody.cpp
index 1e1fbdf4bd..49fdea9242 100644
--- a/cpp/lib/common/framing/AMQResponseBody.cpp
+++ b/cpp/lib/common/framing/AMQResponseBody.cpp
@@ -22,38 +22,39 @@
namespace qpid {
namespace framing {
-AMQResponseBody::AMQResponseBody(AMQP_MethodVersionMap& vm, ProtocolVersion v)
- : versionMap(vm), version(v) {}
-
-AMQResponseBody::AMQResponseBody(
- AMQP_MethodVersionMap& vm, ProtocolVersion v,
- u_int64_t respId, u_int64_t reqId, u_int32_t batch,
- AMQMethodBody::shared_ptr m
-) : versionMap(vm), version(v),
- responseId(respId), requestId(reqId), batchOffset(batch), method(m)
-{}
-
-void
-AMQResponseBody::encode(Buffer& buffer) const {
- assert(method.get());
+void AMQResponseBody::Data::encode(Buffer& buffer) const {
buffer.putLongLong(responseId);
buffer.putLongLong(requestId);
buffer.putLong(batchOffset);
- method->encode(buffer);
}
-void
-AMQResponseBody::decode(Buffer& buffer, u_int32_t /*size*/) {
+void AMQResponseBody::Data::decode(Buffer& buffer) {
responseId = buffer.getLongLong();
requestId = buffer.getLongLong();
batchOffset = buffer.getLong();
- method = AMQMethodBody::create(versionMap, version, buffer);
}
-void
-AMQResponseBody::print(std::ostream& out) const
+void AMQResponseBody::encode(Buffer& buffer) const {
+ data.encode(buffer);
+ encodeId(buffer);
+ encodeContent(buffer);
+}
+
+AMQResponseBody::shared_ptr AMQResponseBody::create(
+ AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
+ Buffer& buffer)
{
- out << "response(" << size() << " bytes) " << *method;
+ MethodId id;
+ Data data;
+ data.decode(buffer);
+ id.decode(buffer);
+ AMQResponseBody* body = dynamic_cast<AMQResponseBody*>(
+ versionMap.createMethodBody(
+ id.classId, id.methodId, version.getMajor(), version.getMinor()));
+ assert(body);
+ body->data = data;
+ return AMQResponseBody::shared_ptr(body);
}
+
}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h
index a232ef7d34..bfd6cb2b9a 100644
--- a/cpp/lib/common/framing/AMQResponseBody.h
+++ b/cpp/lib/common/framing/AMQResponseBody.h
@@ -27,40 +27,44 @@ namespace framing {
class AMQP_MethodVersionMap;
/**
- * Body of an AMQP Response frame.
+ * Body of a response method frame.
*/
-class AMQResponseBody : public AMQBody
+class AMQResponseBody : public AMQMethodBody
{
public:
typedef boost::shared_ptr<AMQResponseBody> shared_ptr;
+
+ static shared_ptr create(
+ AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
+ Buffer& buffer);
- AMQResponseBody(AMQP_MethodVersionMap&, ProtocolVersion);
AMQResponseBody(
- AMQP_MethodVersionMap&, ProtocolVersion,
- u_int64_t responseId, u_int64_t requestId, u_int32_t batchOffset,
- AMQMethodBody::shared_ptr method);
-
- const AMQMethodBody& getMethodBody() const { return *method; }
- AMQMethodBody& getMethodBody() { return *method; }
- u_int64_t getResponseId() { return responseId; }
- u_int64_t getRequestId() { return requestId; }
- u_int32_t getBatchOffset() { return batchOffset; }
-
- u_int32_t size() const { return 20 + method->size(); }
+ ProtocolVersion v, ResponseId id=0, RequestId req=0, BatchOffset off=0)
+ : AMQMethodBody(v), data(id, req, off) {}
+
u_int8_t type() const { return RESPONSE_BODY; }
-
void encode(Buffer& buffer) const;
- void decode(Buffer& buffer, u_int32_t size);
- void print(std::ostream& out) const;
+ ResponseId getResponseId() { return data.responseId; }
+ RequestId getRequestId() { return data.requestId; }
+ BatchOffset getBatchOffset() { return data.batchOffset; }
+
+ protected:
+ static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; }
private:
- AMQP_MethodVersionMap& versionMap;
- ProtocolVersion version;
- u_int64_t responseId;
- u_int64_t requestId;
- u_int32_t batchOffset;
- AMQMethodBody::shared_ptr method;
+ struct Data {
+ Data(ResponseId id=0, RequestId req=0, BatchOffset off=0)
+ : responseId(id), requestId(req), batchOffset(off) {}
+ void encode(Buffer&) const;
+ void decode(Buffer&);
+
+ u_int64_t responseId;
+ u_int64_t requestId;
+ u_int32_t batchOffset;
+ };
+
+ Data data;
};
}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/BodyHandler.cpp b/cpp/lib/common/framing/BodyHandler.cpp
index ae4ff25bbe..1f761bf3f1 100644
--- a/cpp/lib/common/framing/BodyHandler.cpp
+++ b/cpp/lib/common/framing/BodyHandler.cpp
@@ -30,24 +30,25 @@ void BodyHandler::handleBody(const AMQBody::shared_ptr& body){
switch(body->type())
{
-
- case METHOD_BODY:
+ case METHOD_BODY:
+ case REQUEST_BODY:
+ case RESPONSE_BODY:
handleMethod(dynamic_pointer_cast<AMQMethodBody, AMQBody>(body));
break;
- case HEADER_BODY:
+ case HEADER_BODY:
handleHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body));
break;
- case CONTENT_BODY:
+ case CONTENT_BODY:
handleContent(dynamic_pointer_cast<AMQContentBody, AMQBody>(body));
break;
- case HEARTBEAT_BODY:
+ case HEARTBEAT_BODY:
handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body));
break;
- default:
+ default:
throw UnknownBodyType(body->type());
}
diff --git a/cpp/lib/common/framing/amqp_types.h b/cpp/lib/common/framing/amqp_types.h
index 56d2225e31..e1e3821584 100644
--- a/cpp/lib/common/framing/amqp_types.h
+++ b/cpp/lib/common/framing/amqp_types.h
@@ -32,11 +32,14 @@ typedef unsigned __int64 u_int64_t;
#include "stdint.h"
#endif
-
namespace qpid {
namespace framing {
using std::string;
+typedef u_int64_t RequestId;
+typedef u_int64_t ResponseId;
+typedef u_int32_t BatchOffset;
+
}} // namespace qpid::framing
#endif
diff --git a/cpp/tests/FramingTest.cpp b/cpp/tests/FramingTest.cpp
index 9dc0b7d898..5b90d9c288 100644
--- a/cpp/tests/FramingTest.cpp
+++ b/cpp/tests/FramingTest.cpp
@@ -39,7 +39,7 @@ std::string tostring(const T& x)
out << x;
return out.str();
}
-
+
class FramingTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(FramingTest);
@@ -147,38 +147,29 @@ class FramingTest : public CppUnit::TestCase
}
void testRequestBodyFrame() {
- AMQMethodBody::shared_ptr method(new ChannelOkBody(version));
- AMQRequestBody::shared_ptr request(
- new AMQRequestBody(versionMap, version, 111, 222, method));
+ std::string testing("testing");
+ AMQBody::shared_ptr request(new ChannelOpenBody(version, testing));
AMQFrame in(version, 999, request);
in.encode(buffer);
buffer.flip();
AMQFrame out;
out.decode(buffer);
- request = boost::dynamic_pointer_cast<AMQRequestBody>(out.getBody());
- CPPUNIT_ASSERT(request);
- CPPUNIT_ASSERT_EQUAL(111ULL, request->getRequestId());
- CPPUNIT_ASSERT_EQUAL(222ULL, request->getResponseMark());
- AMQMethodBody& body = request->getMethodBody();
- CPPUNIT_ASSERT(dynamic_cast<ChannelOkBody*>(&body));
+ ChannelOpenBody* decoded =
+ dynamic_cast<ChannelOpenBody*>(out.getBody().get());
+ CPPUNIT_ASSERT(decoded);
+ CPPUNIT_ASSERT_EQUAL(testing, decoded->getOutOfBand());
}
void testResponseBodyFrame() {
- AMQMethodBody::shared_ptr method(new ChannelOkBody(version));
- AMQResponseBody::shared_ptr response(
- new AMQResponseBody(versionMap, version, 111, 222, 333, method));
+ AMQBody::shared_ptr response(new ChannelOkBody(version));
AMQFrame in(version, 999, response);
in.encode(buffer);
buffer.flip();
AMQFrame out;
out.decode(buffer);
- response = boost::dynamic_pointer_cast<AMQResponseBody>(out.getBody());
- CPPUNIT_ASSERT(response);
- CPPUNIT_ASSERT_EQUAL(111ULL, response->getResponseId());
- CPPUNIT_ASSERT_EQUAL(222ULL, response->getRequestId());
- CPPUNIT_ASSERT_EQUAL(333U, response->getBatchOffset());
- AMQMethodBody& body = response->getMethodBody();
- CPPUNIT_ASSERT(dynamic_cast<ChannelOkBody*>(&body));
+ ChannelOkBody* decoded =
+ dynamic_cast<ChannelOkBody*>(out.getBody().get());
+ CPPUNIT_ASSERT(decoded);
}
};