diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.cpp | 4 | ||||
-rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 6 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQFrame.cpp | 24 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQFrame.h | 16 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQMethodBody.cpp | 25 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQMethodBody.h | 18 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQRequestBody.cpp | 46 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQRequestBody.h | 52 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQResponseBody.cpp | 43 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQResponseBody.h | 50 | ||||
-rw-r--r-- | cpp/lib/common/framing/BodyHandler.cpp | 13 | ||||
-rw-r--r-- | cpp/lib/common/framing/amqp_types.h | 5 | ||||
-rw-r--r-- | cpp/tests/FramingTest.cpp | 31 |
13 files changed, 176 insertions, 157 deletions
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp index d0971c4fae..3110c7ca3d 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -95,7 +95,9 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ switch(body->type()) { - case METHOD_BODY: + case REQUEST_BODY: + case RESPONSE_BODY: + case METHOD_BODY: method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body); try{ method->invoke(*this, channel); diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index 3d0b547b07..37aedd5607 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -171,7 +171,7 @@ void Channel::cancelAll(){ for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){ Consumer* c = i->second; if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){ - out->send(new AMQFrame(version, id, new BasicAckBody(c->lastDeliveryTag, true))); + out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true))); } consumers.erase(i); delete c; @@ -377,8 +377,8 @@ void Channel::deliver(Consumer* consumer, Message& msg){ if(++(consumer->count) < prefetch) break; //else drop-through case AUTO_ACK: - out->send(new AMQFrame(version, id, new BasicAckBody(msg.getDeliveryTag(), multiple))); - consumer->lastDeliveryTag = 0; + out->send(new AMQFrame(version, id, new BasicAckBody(version, msg.getDeliveryTag(), multiple))); + consumer->lastDeliveryTag = 0; } } diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp index 9ebf4c27ad..8ac5199c45 100644 --- a/cpp/lib/common/framing/AMQFrame.cpp +++ b/cpp/lib/common/framing/AMQFrame.cpp @@ -69,13 +69,13 @@ bool AMQFrame::decode(Buffer& buffer) { if(buffer.available() < 7) return false; buffer.record(); - u_int32_t bufSize = decodeHead(buffer); - - if(buffer.available() < bufSize + 1){ + u_int32_t frameSize = decodeHead(buffer); + + if(buffer.available() < frameSize + 1){ buffer.restore(); return false; } - decodeBody(buffer, bufSize); + decodeBody(buffer, frameSize); u_int8_t end = buffer.getOctet(); if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found"); return true; @@ -87,13 +87,19 @@ u_int32_t AMQFrame::decodeHead(Buffer& buffer){ return buffer.getLong(); } -void AMQFrame::decodeBody(Buffer& buffer, uint32_t bufSize) +void AMQFrame::decodeBody(Buffer& buffer, uint32_t size) { switch(type) { case METHOD_BODY: body = AMQMethodBody::create(versionMap, version, buffer); break; + case REQUEST_BODY: + body = AMQRequestBody::create(versionMap, version, buffer); + break; + case RESPONSE_BODY: + body = AMQResponseBody::create(versionMap, version, buffer); + break; case HEADER_BODY: body = AMQBody::shared_ptr(new AMQHeaderBody()); break; @@ -103,19 +109,13 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t bufSize) case HEARTBEAT_BODY: body = AMQBody::shared_ptr(new AMQHeartbeatBody()); break; - case REQUEST_BODY: - body = AMQBody::shared_ptr(new AMQRequestBody(versionMap, version)); - break; - case RESPONSE_BODY: - body = AMQBody::shared_ptr(new AMQResponseBody(versionMap, version)); - break; default: assert(0); string msg("Unknown body type: "); msg += type; THROW_QPID_ERROR(FRAMING_ERROR, msg); } - body->decode(buffer, bufSize); + body->decode(buffer, size); } std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t) diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h index c08dfe805b..a927481e77 100644 --- a/cpp/lib/common/framing/AMQFrame.h +++ b/cpp/lib/common/framing/AMQFrame.h @@ -39,13 +39,6 @@ namespace framing { class AMQFrame : virtual public AMQDataBlock { - static AMQP_MethodVersionMap versionMap; - qpid::framing::ProtocolVersion version; - - u_int16_t channel; - u_int8_t type; - AMQBody::shared_ptr body; - public: AMQFrame(qpid::framing::ProtocolVersion& _version = highestProtocolVersion); AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody* body); @@ -60,6 +53,15 @@ class AMQFrame : virtual public AMQDataBlock u_int32_t decodeHead(Buffer& buffer); void decodeBody(Buffer& buffer, uint32_t size); + private: + static AMQP_MethodVersionMap versionMap; + qpid::framing::ProtocolVersion version; + + u_int16_t channel; + u_int8_t type; + AMQBody::shared_ptr body; + + friend std::ostream& operator<<(std::ostream& out, const AMQFrame& body); }; diff --git a/cpp/lib/common/framing/AMQMethodBody.cpp b/cpp/lib/common/framing/AMQMethodBody.cpp index cd6db9b1a9..de081243ee 100644 --- a/cpp/lib/common/framing/AMQMethodBody.cpp +++ b/cpp/lib/common/framing/AMQMethodBody.cpp @@ -25,14 +25,9 @@ namespace qpid { namespace framing { -void AMQMethodBody::encode(Buffer& buffer) const{ +void AMQMethodBody::encodeId(Buffer& buffer) const{ buffer.putShort(amqpClassId()); buffer.putShort(amqpMethodId()); - encodeContent(buffer); -} - -void AMQMethodBody::decode(Buffer& buffer, u_int32_t /*size*/){ - decodeContent(buffer); } bool AMQMethodBody::match(AMQMethodBody* other) const{ @@ -43,17 +38,25 @@ void AMQMethodBody::invoke(AMQP_ServerOperations& /*target*/, u_int16_t /*channe THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server."); } - - AMQMethodBody::shared_ptr AMQMethodBody::create( AMQP_MethodVersionMap& versionMap, ProtocolVersion version, Buffer& buffer) { - u_int16_t classId = buffer.getShort(); - u_int16_t methodId = buffer.getShort(); + MethodId id; + id.decode(buffer); return AMQMethodBody::shared_ptr( versionMap.createMethodBody( - classId, methodId, version.getMajor(), version.getMinor())); + id.classId, id.methodId, version.getMajor(), version.getMinor())); } +void AMQMethodBody::MethodId::decode(Buffer& buffer) { + classId = buffer.getShort(); + methodId = buffer.getShort(); +} + +void AMQMethodBody::decode(Buffer& buffer, u_int32_t /*size*/) { + decodeContent(buffer); +} + + }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h index 57dc84d4aa..a7725079fb 100644 --- a/cpp/lib/common/framing/AMQMethodBody.h +++ b/cpp/lib/common/framing/AMQMethodBody.h @@ -42,20 +42,28 @@ class AMQMethodBody : public AMQBody ProtocolVersion version; u_int8_t type() const { return METHOD_BODY; } - u_int32_t size() const { return 4 + bodySize(); } AMQMethodBody(u_int8_t major, u_int8_t minor) : version(major, minor) {} AMQMethodBody(ProtocolVersion version) : version(version) {} virtual ~AMQMethodBody() {} + void decode(Buffer&, u_int32_t); virtual u_int16_t amqpMethodId() const = 0; virtual u_int16_t amqpClassId() const = 0; virtual void invoke(AMQP_ServerOperations& target, u_int16_t channel); + bool match(AMQMethodBody* other) const; + + protected: + static u_int32_t baseSize() { return 4; } + + struct MethodId { + u_int16_t classId; + u_int16_t methodId; + void decode(Buffer& b); + }; + + void encodeId(Buffer& buffer) const; virtual void encodeContent(Buffer& buffer) const = 0; virtual void decodeContent(Buffer& buffer) = 0; - virtual u_int32_t bodySize() const = 0; - void encode(Buffer& buffer) const; - void decode(Buffer& buffer, u_int32_t size); - bool match(AMQMethodBody* other) const; }; }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQRequestBody.cpp b/cpp/lib/common/framing/AMQRequestBody.cpp index 61688cd97c..a5e6ce6974 100644 --- a/cpp/lib/common/framing/AMQRequestBody.cpp +++ b/cpp/lib/common/framing/AMQRequestBody.cpp @@ -22,37 +22,37 @@ namespace qpid { namespace framing { -AMQRequestBody::AMQRequestBody(AMQP_MethodVersionMap& vm, ProtocolVersion v) - : versionMap(vm), version(v) {} - -AMQRequestBody::AMQRequestBody( - AMQP_MethodVersionMap& vm, ProtocolVersion v, - u_int64_t reqId, u_int64_t respMark, - AMQMethodBody::shared_ptr m -) : versionMap(vm), version(v), - requestId(reqId), responseMark(respMark), method(m) -{} - - -void -AMQRequestBody::encode(Buffer& buffer) const { - assert(method.get()); +void AMQRequestBody::Data::encode(Buffer& buffer) const { buffer.putLongLong(requestId); buffer.putLongLong(responseMark); - method->encode(buffer); } - -void -AMQRequestBody::decode(Buffer& buffer, u_int32_t /*size*/) { + +void AMQRequestBody::Data::decode(Buffer& buffer) { requestId = buffer.getLongLong(); responseMark = buffer.getLongLong(); - method = AMQMethodBody::create(versionMap, version, buffer); } -void -AMQRequestBody::print(std::ostream& out) const +void AMQRequestBody::encode(Buffer& buffer) const { + data.encode(buffer); + encodeId(buffer); + encodeContent(buffer); +} + +AMQRequestBody::shared_ptr +AMQRequestBody::create( + AMQP_MethodVersionMap& versionMap, ProtocolVersion version, + Buffer& buffer) { - out << "request(" << size() << " bytes) " << *method; + MethodId id; + Data data; + data.decode(buffer); + id.decode(buffer); + AMQRequestBody* body = dynamic_cast<AMQRequestBody*>( + versionMap.createMethodBody( + id.classId, id.methodId, version.getMajor(), version.getMinor())); + assert(body); + body->data = data; + return AMQRequestBody::shared_ptr(body); } }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQRequestBody.h b/cpp/lib/common/framing/AMQRequestBody.h index 0908545fcd..1836a7d49d 100644 --- a/cpp/lib/common/framing/AMQRequestBody.h +++ b/cpp/lib/common/framing/AMQRequestBody.h @@ -24,40 +24,44 @@ namespace qpid { namespace framing { -class AMQP_MethodVersionMap; - /** - * Body of an AMQP Request frame. + * Body of a request method frame. */ -class AMQRequestBody : public AMQBody +class AMQRequestBody : public AMQMethodBody { - public: + public: typedef boost::shared_ptr<AMQRequestBody> shared_ptr; + + static shared_ptr create( + AMQP_MethodVersionMap& versionMap, ProtocolVersion version, + Buffer& buffer); - AMQRequestBody(AMQP_MethodVersionMap&, ProtocolVersion); - AMQRequestBody( - AMQP_MethodVersionMap&, ProtocolVersion, - u_int64_t requestId, u_int64_t responseMark, - AMQMethodBody::shared_ptr method); + AMQRequestBody(ProtocolVersion v, RequestId id=0, ResponseId mark=0) + : AMQMethodBody(v), data(id, mark) {} - const AMQMethodBody& getMethodBody() const { return *method; } - AMQMethodBody& getMethodBody() { return *method; } - u_int64_t getRequestId() { return requestId; } - u_int64_t getResponseMark() { return responseMark; } - - u_int32_t size() const { return 16 + method->size(); } u_int8_t type() const { return REQUEST_BODY; } - void encode(Buffer& buffer) const; - void decode(Buffer& buffer, u_int32_t size); - void print(std::ostream& out) const; + RequestId getRequestId() const { return data.requestId; } + void setRequestId(RequestId id) { data.requestId=id; } + ResponseId getResponseMark() const { return data.responseMark; } + void setResponseMark(ResponseId mark) { data.responseMark=mark; } + + protected: + static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+16; } + private: - AMQP_MethodVersionMap& versionMap; - ProtocolVersion version; - u_int64_t requestId; - u_int64_t responseMark; - AMQMethodBody::shared_ptr method; + struct Data { + Data(RequestId id=0, ResponseId mark=0) + : requestId(id), responseMark(mark) {} + void encode(Buffer&) const; + void decode(Buffer&); + + RequestId requestId; + ResponseId responseMark; + }; + + Data data; }; }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQResponseBody.cpp b/cpp/lib/common/framing/AMQResponseBody.cpp index 1e1fbdf4bd..49fdea9242 100644 --- a/cpp/lib/common/framing/AMQResponseBody.cpp +++ b/cpp/lib/common/framing/AMQResponseBody.cpp @@ -22,38 +22,39 @@ namespace qpid { namespace framing { -AMQResponseBody::AMQResponseBody(AMQP_MethodVersionMap& vm, ProtocolVersion v) - : versionMap(vm), version(v) {} - -AMQResponseBody::AMQResponseBody( - AMQP_MethodVersionMap& vm, ProtocolVersion v, - u_int64_t respId, u_int64_t reqId, u_int32_t batch, - AMQMethodBody::shared_ptr m -) : versionMap(vm), version(v), - responseId(respId), requestId(reqId), batchOffset(batch), method(m) -{} - -void -AMQResponseBody::encode(Buffer& buffer) const { - assert(method.get()); +void AMQResponseBody::Data::encode(Buffer& buffer) const { buffer.putLongLong(responseId); buffer.putLongLong(requestId); buffer.putLong(batchOffset); - method->encode(buffer); } -void -AMQResponseBody::decode(Buffer& buffer, u_int32_t /*size*/) { +void AMQResponseBody::Data::decode(Buffer& buffer) { responseId = buffer.getLongLong(); requestId = buffer.getLongLong(); batchOffset = buffer.getLong(); - method = AMQMethodBody::create(versionMap, version, buffer); } -void -AMQResponseBody::print(std::ostream& out) const +void AMQResponseBody::encode(Buffer& buffer) const { + data.encode(buffer); + encodeId(buffer); + encodeContent(buffer); +} + +AMQResponseBody::shared_ptr AMQResponseBody::create( + AMQP_MethodVersionMap& versionMap, ProtocolVersion version, + Buffer& buffer) { - out << "response(" << size() << " bytes) " << *method; + MethodId id; + Data data; + data.decode(buffer); + id.decode(buffer); + AMQResponseBody* body = dynamic_cast<AMQResponseBody*>( + versionMap.createMethodBody( + id.classId, id.methodId, version.getMajor(), version.getMinor())); + assert(body); + body->data = data; + return AMQResponseBody::shared_ptr(body); } + }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h index a232ef7d34..bfd6cb2b9a 100644 --- a/cpp/lib/common/framing/AMQResponseBody.h +++ b/cpp/lib/common/framing/AMQResponseBody.h @@ -27,40 +27,44 @@ namespace framing { class AMQP_MethodVersionMap; /** - * Body of an AMQP Response frame. + * Body of a response method frame. */ -class AMQResponseBody : public AMQBody +class AMQResponseBody : public AMQMethodBody { public: typedef boost::shared_ptr<AMQResponseBody> shared_ptr; + + static shared_ptr create( + AMQP_MethodVersionMap& versionMap, ProtocolVersion version, + Buffer& buffer); - AMQResponseBody(AMQP_MethodVersionMap&, ProtocolVersion); AMQResponseBody( - AMQP_MethodVersionMap&, ProtocolVersion, - u_int64_t responseId, u_int64_t requestId, u_int32_t batchOffset, - AMQMethodBody::shared_ptr method); - - const AMQMethodBody& getMethodBody() const { return *method; } - AMQMethodBody& getMethodBody() { return *method; } - u_int64_t getResponseId() { return responseId; } - u_int64_t getRequestId() { return requestId; } - u_int32_t getBatchOffset() { return batchOffset; } - - u_int32_t size() const { return 20 + method->size(); } + ProtocolVersion v, ResponseId id=0, RequestId req=0, BatchOffset off=0) + : AMQMethodBody(v), data(id, req, off) {} + u_int8_t type() const { return RESPONSE_BODY; } - void encode(Buffer& buffer) const; - void decode(Buffer& buffer, u_int32_t size); - void print(std::ostream& out) const; + ResponseId getResponseId() { return data.responseId; } + RequestId getRequestId() { return data.requestId; } + BatchOffset getBatchOffset() { return data.batchOffset; } + + protected: + static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; } private: - AMQP_MethodVersionMap& versionMap; - ProtocolVersion version; - u_int64_t responseId; - u_int64_t requestId; - u_int32_t batchOffset; - AMQMethodBody::shared_ptr method; + struct Data { + Data(ResponseId id=0, RequestId req=0, BatchOffset off=0) + : responseId(id), requestId(req), batchOffset(off) {} + void encode(Buffer&) const; + void decode(Buffer&); + + u_int64_t responseId; + u_int64_t requestId; + u_int32_t batchOffset; + }; + + Data data; }; }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/BodyHandler.cpp b/cpp/lib/common/framing/BodyHandler.cpp index ae4ff25bbe..1f761bf3f1 100644 --- a/cpp/lib/common/framing/BodyHandler.cpp +++ b/cpp/lib/common/framing/BodyHandler.cpp @@ -30,24 +30,25 @@ void BodyHandler::handleBody(const AMQBody::shared_ptr& body){ switch(body->type()) { - - case METHOD_BODY: + case METHOD_BODY: + case REQUEST_BODY: + case RESPONSE_BODY: handleMethod(dynamic_pointer_cast<AMQMethodBody, AMQBody>(body)); break; - case HEADER_BODY: + case HEADER_BODY: handleHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body)); break; - case CONTENT_BODY: + case CONTENT_BODY: handleContent(dynamic_pointer_cast<AMQContentBody, AMQBody>(body)); break; - case HEARTBEAT_BODY: + case HEARTBEAT_BODY: handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body)); break; - default: + default: throw UnknownBodyType(body->type()); } diff --git a/cpp/lib/common/framing/amqp_types.h b/cpp/lib/common/framing/amqp_types.h index 56d2225e31..e1e3821584 100644 --- a/cpp/lib/common/framing/amqp_types.h +++ b/cpp/lib/common/framing/amqp_types.h @@ -32,11 +32,14 @@ typedef unsigned __int64 u_int64_t; #include "stdint.h" #endif - namespace qpid { namespace framing { using std::string; +typedef u_int64_t RequestId; +typedef u_int64_t ResponseId; +typedef u_int32_t BatchOffset; + }} // namespace qpid::framing #endif diff --git a/cpp/tests/FramingTest.cpp b/cpp/tests/FramingTest.cpp index 9dc0b7d898..5b90d9c288 100644 --- a/cpp/tests/FramingTest.cpp +++ b/cpp/tests/FramingTest.cpp @@ -39,7 +39,7 @@ std::string tostring(const T& x) out << x; return out.str(); } - + class FramingTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(FramingTest); @@ -147,38 +147,29 @@ class FramingTest : public CppUnit::TestCase } void testRequestBodyFrame() { - AMQMethodBody::shared_ptr method(new ChannelOkBody(version)); - AMQRequestBody::shared_ptr request( - new AMQRequestBody(versionMap, version, 111, 222, method)); + std::string testing("testing"); + AMQBody::shared_ptr request(new ChannelOpenBody(version, testing)); AMQFrame in(version, 999, request); in.encode(buffer); buffer.flip(); AMQFrame out; out.decode(buffer); - request = boost::dynamic_pointer_cast<AMQRequestBody>(out.getBody()); - CPPUNIT_ASSERT(request); - CPPUNIT_ASSERT_EQUAL(111ULL, request->getRequestId()); - CPPUNIT_ASSERT_EQUAL(222ULL, request->getResponseMark()); - AMQMethodBody& body = request->getMethodBody(); - CPPUNIT_ASSERT(dynamic_cast<ChannelOkBody*>(&body)); + ChannelOpenBody* decoded = + dynamic_cast<ChannelOpenBody*>(out.getBody().get()); + CPPUNIT_ASSERT(decoded); + CPPUNIT_ASSERT_EQUAL(testing, decoded->getOutOfBand()); } void testResponseBodyFrame() { - AMQMethodBody::shared_ptr method(new ChannelOkBody(version)); - AMQResponseBody::shared_ptr response( - new AMQResponseBody(versionMap, version, 111, 222, 333, method)); + AMQBody::shared_ptr response(new ChannelOkBody(version)); AMQFrame in(version, 999, response); in.encode(buffer); buffer.flip(); AMQFrame out; out.decode(buffer); - response = boost::dynamic_pointer_cast<AMQResponseBody>(out.getBody()); - CPPUNIT_ASSERT(response); - CPPUNIT_ASSERT_EQUAL(111ULL, response->getResponseId()); - CPPUNIT_ASSERT_EQUAL(222ULL, response->getRequestId()); - CPPUNIT_ASSERT_EQUAL(333U, response->getBatchOffset()); - AMQMethodBody& body = response->getMethodBody(); - CPPUNIT_ASSERT(dynamic_cast<ChannelOkBody*>(&body)); + ChannelOkBody* decoded = + dynamic_cast<ChannelOkBody*>(out.getBody().get()); + CPPUNIT_ASSERT(decoded); } }; |