diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/lib/common/framing/AMQBody.cpp | 5 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQBody.h | 12 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQFrame.cpp | 39 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQFrame.h | 13 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQMethodBody.cpp | 27 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQMethodBody.h | 34 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQRequestBody.cpp | 58 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQRequestBody.h | 67 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQResponseBody.cpp | 59 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQResponseBody.h | 70 | ||||
-rw-r--r-- | cpp/lib/common/framing/BodyHandler.cpp | 2 | ||||
-rw-r--r-- | cpp/lib/common/framing/BodyHandler.h | 2 | ||||
-rw-r--r-- | cpp/lib/common/framing/Buffer.h | 3 |
13 files changed, 333 insertions, 58 deletions
diff --git a/cpp/lib/common/framing/AMQBody.cpp b/cpp/lib/common/framing/AMQBody.cpp index b095312a16..c7c253beda 100644 --- a/cpp/lib/common/framing/AMQBody.cpp +++ b/cpp/lib/common/framing/AMQBody.cpp @@ -27,10 +27,7 @@ std::ostream& qpid::framing::operator<<(std::ostream& out, const qpid::framing:: body.print(out); return out; } - qpid::framing::AMQBody::~AMQBody() {} -void qpid::framing::AMQBody::print(std::ostream& out) const { - out << "unknown body"; -} + diff --git a/cpp/lib/common/framing/AMQBody.h b/cpp/lib/common/framing/AMQBody.h index 5547d3c506..8b8e1a729d 100644 --- a/cpp/lib/common/framing/AMQBody.h +++ b/cpp/lib/common/framing/AMQBody.h @@ -38,12 +38,20 @@ namespace qpid { virtual u_int8_t type() const = 0; virtual void encode(Buffer& buffer) const = 0; virtual void decode(Buffer& buffer, u_int32_t size) = 0; - virtual void print(std::ostream& out) const; + + virtual void print(std::ostream& out) const = 0; }; std::ostream& operator<<(std::ostream& out, const AMQBody& body) ; - enum body_types {METHOD_BODY = 1, HEADER_BODY = 2, CONTENT_BODY = 3, HEARTBEAT_BODY = 8}; + enum BodyTypes { + METHOD_BODY = 1, + HEADER_BODY = 2, + CONTENT_BODY = 3, + HEARTBEAT_BODY = 8, + REQUEST_BODY = 9, + RESPONSE_BODY = 10 + }; } } diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp index 2f1efcbe09..9ebf4c27ad 100644 --- a/cpp/lib/common/framing/AMQFrame.cpp +++ b/cpp/lib/common/framing/AMQFrame.cpp @@ -21,6 +21,8 @@ */ #include <AMQFrame.h> #include <QpidError.h> +#include "AMQRequestBody.h" +#include "AMQResponseBody.h" using namespace qpid::framing; @@ -34,7 +36,7 @@ AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel, version(_version), channel(_channel), body(_body) {} -AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel, AMQBody::shared_ptr& _body) : +AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel, const AMQBody::shared_ptr& _body) : version(_version), channel(_channel), body(_body) {} @@ -44,7 +46,7 @@ u_int16_t AMQFrame::getChannel(){ return channel; } -AMQBody::shared_ptr& AMQFrame::getBody(){ +AMQBody::shared_ptr AMQFrame::getBody(){ return body; } @@ -57,16 +59,10 @@ void AMQFrame::encode(Buffer& buffer) buffer.putOctet(0xCE); } -AMQBody::shared_ptr AMQFrame::createMethodBody(Buffer& buffer){ - u_int16_t classId = buffer.getShort(); - u_int16_t methodId = buffer.getShort(); - AMQBody::shared_ptr body(versionMap.createMethodBody(classId, methodId, version.getMajor(), version.getMinor())); - return body; -} - u_int32_t AMQFrame::size() const{ - if(!body.get()) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to get size of frame with no body set!"); - return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + body->size() + 1/*0xCE*/; + assert(body.get()); + return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + body->size() + + 1/*0xCE*/; } bool AMQFrame::decode(Buffer& buffer) @@ -95,19 +91,26 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t bufSize) { switch(type) { - case METHOD_BODY: - body = createMethodBody(buffer); - break; - case HEADER_BODY: + case METHOD_BODY: + body = AMQMethodBody::create(versionMap, version, buffer); + break; + case HEADER_BODY: body = AMQBody::shared_ptr(new AMQHeaderBody()); break; - case CONTENT_BODY: + case CONTENT_BODY: body = AMQBody::shared_ptr(new AMQContentBody()); break; - case HEARTBEAT_BODY: + case HEARTBEAT_BODY: body = AMQBody::shared_ptr(new AMQHeartbeatBody()); break; - default: + case REQUEST_BODY: + body = AMQBody::shared_ptr(new AMQRequestBody(versionMap, version)); + break; + case RESPONSE_BODY: + body = AMQBody::shared_ptr(new AMQResponseBody(versionMap, version)); + break; + default: + assert(0); string msg("Unknown body type: "); msg += type; THROW_QPID_ERROR(FRAMING_ERROR, msg); diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h index ff27e1e68f..c08dfe805b 100644 --- a/cpp/lib/common/framing/AMQFrame.h +++ b/cpp/lib/common/framing/AMQFrame.h @@ -1,3 +1,6 @@ +#ifndef _AMQFrame_ +#define _AMQFrame_ + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -30,9 +33,6 @@ #include <AMQP_HighestVersion.h> #include <Buffer.h> -#ifndef _AMQFrame_ -#define _AMQFrame_ - namespace qpid { namespace framing { @@ -43,20 +43,19 @@ class AMQFrame : virtual public AMQDataBlock qpid::framing::ProtocolVersion version; u_int16_t channel; - u_int8_t type;//used if the body is decoded separately from the 'head' + u_int8_t type; AMQBody::shared_ptr body; - AMQBody::shared_ptr createMethodBody(Buffer& buffer); public: AMQFrame(qpid::framing::ProtocolVersion& _version = highestProtocolVersion); AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody* body); - AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody::shared_ptr& body); + AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, const AMQBody::shared_ptr& body); virtual ~AMQFrame(); virtual void encode(Buffer& buffer); virtual bool decode(Buffer& buffer); virtual u_int32_t size() const; u_int16_t getChannel(); - AMQBody::shared_ptr& getBody(); + AMQBody::shared_ptr getBody(); u_int32_t decodeHead(Buffer& buffer); void decodeBody(Buffer& buffer, uint32_t size); diff --git a/cpp/lib/common/framing/AMQMethodBody.cpp b/cpp/lib/common/framing/AMQMethodBody.cpp index 525310f3d4..cd6db9b1a9 100644 --- a/cpp/lib/common/framing/AMQMethodBody.cpp +++ b/cpp/lib/common/framing/AMQMethodBody.cpp @@ -20,27 +20,40 @@ */ #include <AMQMethodBody.h> #include <QpidError.h> +#include "AMQP_MethodVersionMap.h" -void qpid::framing::AMQMethodBody::encode(Buffer& buffer) const{ +namespace qpid { +namespace framing { + +void AMQMethodBody::encode(Buffer& buffer) const{ buffer.putShort(amqpClassId()); buffer.putShort(amqpMethodId()); encodeContent(buffer); } -void qpid::framing::AMQMethodBody::decode(Buffer& buffer, u_int32_t /*size*/){ +void AMQMethodBody::decode(Buffer& buffer, u_int32_t /*size*/){ decodeContent(buffer); } -bool qpid::framing::AMQMethodBody::match(AMQMethodBody* other) const{ +bool AMQMethodBody::match(AMQMethodBody* other) const{ return other != 0 && other->amqpClassId() == amqpClassId() && other->amqpMethodId() == amqpMethodId(); } -void qpid::framing::AMQMethodBody::invoke(AMQP_ServerOperations& /*target*/, u_int16_t /*channel*/){ +void AMQMethodBody::invoke(AMQP_ServerOperations& /*target*/, u_int16_t /*channel*/){ THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server."); } -std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQMethodBody& m){ - m.print(out); - return out; + +AMQMethodBody::shared_ptr AMQMethodBody::create( + AMQP_MethodVersionMap& versionMap, ProtocolVersion version, + Buffer& buffer) +{ + u_int16_t classId = buffer.getShort(); + u_int16_t methodId = buffer.getShort(); + return AMQMethodBody::shared_ptr( + versionMap.createMethodBody( + classId, methodId, version.getMajor(), version.getMinor())); } + +}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h index da25c7c545..57dc84d4aa 100644 --- a/cpp/lib/common/framing/AMQMethodBody.h +++ b/cpp/lib/common/framing/AMQMethodBody.h @@ -1,3 +1,6 @@ +#ifndef _AMQMethodBody_ +#define _AMQMethodBody_ + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -24,24 +27,26 @@ #include <Buffer.h> #include <AMQP_ServerOperations.h> -#ifndef _AMQMethodBody_ -#define _AMQMethodBody_ - namespace qpid { namespace framing { -class AMQMethodBody : virtual public AMQBody +class AMQP_MethodVersionMap; + +class AMQMethodBody : public AMQBody { -public: + public: typedef boost::shared_ptr<AMQMethodBody> shared_ptr; - ProtocolVersion version; - inline u_int8_t type() const { return METHOD_BODY; } - inline u_int32_t size() const { return 4 + bodySize(); } - inline AMQMethodBody(u_int8_t major, u_int8_t minor) : version(major, minor) {} - inline AMQMethodBody(ProtocolVersion version) : version(version) {} - inline virtual ~AMQMethodBody() {} - virtual void print(std::ostream& out) const = 0; + static shared_ptr create( + AMQP_MethodVersionMap& map, ProtocolVersion version, Buffer& buf); + + ProtocolVersion version; + u_int8_t type() const { return METHOD_BODY; } + u_int32_t size() const { return 4 + bodySize(); } + AMQMethodBody(u_int8_t major, u_int8_t minor) : version(major, minor) {} + AMQMethodBody(ProtocolVersion version) : version(version) {} + virtual ~AMQMethodBody() {} + virtual u_int16_t amqpMethodId() const = 0; virtual u_int16_t amqpClassId() const = 0; virtual void invoke(AMQP_ServerOperations& target, u_int16_t channel); @@ -53,10 +58,7 @@ public: bool match(AMQMethodBody* other) const; }; -std::ostream& operator<<(std::ostream& out, const AMQMethodBody& body); - -} -} +}} // namespace qpid::framing #endif diff --git a/cpp/lib/common/framing/AMQRequestBody.cpp b/cpp/lib/common/framing/AMQRequestBody.cpp new file mode 100644 index 0000000000..61688cd97c --- /dev/null +++ b/cpp/lib/common/framing/AMQRequestBody.cpp @@ -0,0 +1,58 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "AMQRequestBody.h" +#include "AMQP_MethodVersionMap.h" + +namespace qpid { +namespace framing { + +AMQRequestBody::AMQRequestBody(AMQP_MethodVersionMap& vm, ProtocolVersion v) + : versionMap(vm), version(v) {} + +AMQRequestBody::AMQRequestBody( + AMQP_MethodVersionMap& vm, ProtocolVersion v, + u_int64_t reqId, u_int64_t respMark, + AMQMethodBody::shared_ptr m +) : versionMap(vm), version(v), + requestId(reqId), responseMark(respMark), method(m) +{} + + +void +AMQRequestBody::encode(Buffer& buffer) const { + assert(method.get()); + buffer.putLongLong(requestId); + buffer.putLongLong(responseMark); + method->encode(buffer); +} + +void +AMQRequestBody::decode(Buffer& buffer, u_int32_t /*size*/) { + requestId = buffer.getLongLong(); + responseMark = buffer.getLongLong(); + method = AMQMethodBody::create(versionMap, version, buffer); +} + +void +AMQRequestBody::print(std::ostream& out) const +{ + out << "request(" << size() << " bytes) " << *method; +} + +}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQRequestBody.h b/cpp/lib/common/framing/AMQRequestBody.h new file mode 100644 index 0000000000..0908545fcd --- /dev/null +++ b/cpp/lib/common/framing/AMQRequestBody.h @@ -0,0 +1,67 @@ +#ifndef _framing_AMQRequestBody_h +#define _framing_AMQRequestBody_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "AMQMethodBody.h" + +namespace qpid { +namespace framing { + +class AMQP_MethodVersionMap; + +/** + * Body of an AMQP Request frame. + */ +class AMQRequestBody : public AMQBody +{ + public: + typedef boost::shared_ptr<AMQRequestBody> shared_ptr; + + AMQRequestBody(AMQP_MethodVersionMap&, ProtocolVersion); + AMQRequestBody( + AMQP_MethodVersionMap&, ProtocolVersion, + u_int64_t requestId, u_int64_t responseMark, + AMQMethodBody::shared_ptr method); + + const AMQMethodBody& getMethodBody() const { return *method; } + AMQMethodBody& getMethodBody() { return *method; } + u_int64_t getRequestId() { return requestId; } + u_int64_t getResponseMark() { return responseMark; } + + u_int32_t size() const { return 16 + method->size(); } + u_int8_t type() const { return REQUEST_BODY; } + + void encode(Buffer& buffer) const; + void decode(Buffer& buffer, u_int32_t size); + void print(std::ostream& out) const; + + private: + AMQP_MethodVersionMap& versionMap; + ProtocolVersion version; + u_int64_t requestId; + u_int64_t responseMark; + AMQMethodBody::shared_ptr method; +}; + +}} // namespace qpid::framing + + + +#endif /*!_framing_AMQRequestBody_h*/ diff --git a/cpp/lib/common/framing/AMQResponseBody.cpp b/cpp/lib/common/framing/AMQResponseBody.cpp new file mode 100644 index 0000000000..1e1fbdf4bd --- /dev/null +++ b/cpp/lib/common/framing/AMQResponseBody.cpp @@ -0,0 +1,59 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "AMQResponseBody.h" +#include "AMQP_MethodVersionMap.h" + +namespace qpid { +namespace framing { + +AMQResponseBody::AMQResponseBody(AMQP_MethodVersionMap& vm, ProtocolVersion v) + : versionMap(vm), version(v) {} + +AMQResponseBody::AMQResponseBody( + AMQP_MethodVersionMap& vm, ProtocolVersion v, + u_int64_t respId, u_int64_t reqId, u_int32_t batch, + AMQMethodBody::shared_ptr m +) : versionMap(vm), version(v), + responseId(respId), requestId(reqId), batchOffset(batch), method(m) +{} + +void +AMQResponseBody::encode(Buffer& buffer) const { + assert(method.get()); + buffer.putLongLong(responseId); + buffer.putLongLong(requestId); + buffer.putLong(batchOffset); + method->encode(buffer); +} + +void +AMQResponseBody::decode(Buffer& buffer, u_int32_t /*size*/) { + responseId = buffer.getLongLong(); + requestId = buffer.getLongLong(); + batchOffset = buffer.getLong(); + method = AMQMethodBody::create(versionMap, version, buffer); +} + +void +AMQResponseBody::print(std::ostream& out) const +{ + out << "response(" << size() << " bytes) " << *method; +} + +}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h new file mode 100644 index 0000000000..a232ef7d34 --- /dev/null +++ b/cpp/lib/common/framing/AMQResponseBody.h @@ -0,0 +1,70 @@ +#ifndef _framing_AMQResponseBody_h +#define _framing_AMQResponseBody_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "AMQMethodBody.h" + +namespace qpid { +namespace framing { + +class AMQP_MethodVersionMap; + +/** + * Body of an AMQP Response frame. + */ +class AMQResponseBody : public AMQBody +{ + + public: + typedef boost::shared_ptr<AMQResponseBody> shared_ptr; + + AMQResponseBody(AMQP_MethodVersionMap&, ProtocolVersion); + AMQResponseBody( + AMQP_MethodVersionMap&, ProtocolVersion, + u_int64_t responseId, u_int64_t requestId, u_int32_t batchOffset, + AMQMethodBody::shared_ptr method); + + const AMQMethodBody& getMethodBody() const { return *method; } + AMQMethodBody& getMethodBody() { return *method; } + u_int64_t getResponseId() { return responseId; } + u_int64_t getRequestId() { return requestId; } + u_int32_t getBatchOffset() { return batchOffset; } + + u_int32_t size() const { return 20 + method->size(); } + u_int8_t type() const { return RESPONSE_BODY; } + + void encode(Buffer& buffer) const; + void decode(Buffer& buffer, u_int32_t size); + void print(std::ostream& out) const; + + private: + AMQP_MethodVersionMap& versionMap; + ProtocolVersion version; + u_int64_t responseId; + u_int64_t requestId; + u_int32_t batchOffset; + AMQMethodBody::shared_ptr method; +}; + +}} // namespace qpid::framing + + + +#endif /*!_framing_AMQResponseBody_h*/ diff --git a/cpp/lib/common/framing/BodyHandler.cpp b/cpp/lib/common/framing/BodyHandler.cpp index 8ccfb222df..ae4ff25bbe 100644 --- a/cpp/lib/common/framing/BodyHandler.cpp +++ b/cpp/lib/common/framing/BodyHandler.cpp @@ -26,7 +26,7 @@ using namespace boost; BodyHandler::~BodyHandler() {} -void BodyHandler::handleBody(AMQBody::shared_ptr& body){ +void BodyHandler::handleBody(const AMQBody::shared_ptr& body){ switch(body->type()) { diff --git a/cpp/lib/common/framing/BodyHandler.h b/cpp/lib/common/framing/BodyHandler.h index 3923258d1c..0260a35e88 100644 --- a/cpp/lib/common/framing/BodyHandler.h +++ b/cpp/lib/common/framing/BodyHandler.h @@ -39,7 +39,7 @@ namespace framing { virtual void handleContent(AMQContentBody::shared_ptr body) = 0; virtual void handleHeartbeat(AMQHeartbeatBody::shared_ptr body) = 0; - void handleBody(AMQBody::shared_ptr& body); + void handleBody(const AMQBody::shared_ptr& body); }; class UnknownBodyType{ diff --git a/cpp/lib/common/framing/Buffer.h b/cpp/lib/common/framing/Buffer.h index b07c2a2ced..f2dd5071a7 100644 --- a/cpp/lib/common/framing/Buffer.h +++ b/cpp/lib/common/framing/Buffer.h @@ -80,8 +80,7 @@ public: }; -} -} +}} // namespace qpid::framing #endif |