summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/lib/common/framing/AMQBody.cpp5
-rw-r--r--cpp/lib/common/framing/AMQBody.h12
-rw-r--r--cpp/lib/common/framing/AMQFrame.cpp39
-rw-r--r--cpp/lib/common/framing/AMQFrame.h13
-rw-r--r--cpp/lib/common/framing/AMQMethodBody.cpp27
-rw-r--r--cpp/lib/common/framing/AMQMethodBody.h34
-rw-r--r--cpp/lib/common/framing/AMQRequestBody.cpp58
-rw-r--r--cpp/lib/common/framing/AMQRequestBody.h67
-rw-r--r--cpp/lib/common/framing/AMQResponseBody.cpp59
-rw-r--r--cpp/lib/common/framing/AMQResponseBody.h70
-rw-r--r--cpp/lib/common/framing/BodyHandler.cpp2
-rw-r--r--cpp/lib/common/framing/BodyHandler.h2
-rw-r--r--cpp/lib/common/framing/Buffer.h3
13 files changed, 333 insertions, 58 deletions
diff --git a/cpp/lib/common/framing/AMQBody.cpp b/cpp/lib/common/framing/AMQBody.cpp
index b095312a16..c7c253beda 100644
--- a/cpp/lib/common/framing/AMQBody.cpp
+++ b/cpp/lib/common/framing/AMQBody.cpp
@@ -27,10 +27,7 @@ std::ostream& qpid::framing::operator<<(std::ostream& out, const qpid::framing::
body.print(out);
return out;
}
-
qpid::framing::AMQBody::~AMQBody() {}
-void qpid::framing::AMQBody::print(std::ostream& out) const {
- out << "unknown body";
-}
+
diff --git a/cpp/lib/common/framing/AMQBody.h b/cpp/lib/common/framing/AMQBody.h
index 5547d3c506..8b8e1a729d 100644
--- a/cpp/lib/common/framing/AMQBody.h
+++ b/cpp/lib/common/framing/AMQBody.h
@@ -38,12 +38,20 @@ namespace qpid {
virtual u_int8_t type() const = 0;
virtual void encode(Buffer& buffer) const = 0;
virtual void decode(Buffer& buffer, u_int32_t size) = 0;
- virtual void print(std::ostream& out) const;
+
+ virtual void print(std::ostream& out) const = 0;
};
std::ostream& operator<<(std::ostream& out, const AMQBody& body) ;
- enum body_types {METHOD_BODY = 1, HEADER_BODY = 2, CONTENT_BODY = 3, HEARTBEAT_BODY = 8};
+ enum BodyTypes {
+ METHOD_BODY = 1,
+ HEADER_BODY = 2,
+ CONTENT_BODY = 3,
+ HEARTBEAT_BODY = 8,
+ REQUEST_BODY = 9,
+ RESPONSE_BODY = 10
+ };
}
}
diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp
index 2f1efcbe09..9ebf4c27ad 100644
--- a/cpp/lib/common/framing/AMQFrame.cpp
+++ b/cpp/lib/common/framing/AMQFrame.cpp
@@ -21,6 +21,8 @@
*/
#include <AMQFrame.h>
#include <QpidError.h>
+#include "AMQRequestBody.h"
+#include "AMQResponseBody.h"
using namespace qpid::framing;
@@ -34,7 +36,7 @@ AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel,
version(_version), channel(_channel), body(_body)
{}
-AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel, AMQBody::shared_ptr& _body) :
+AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel, const AMQBody::shared_ptr& _body) :
version(_version), channel(_channel), body(_body)
{}
@@ -44,7 +46,7 @@ u_int16_t AMQFrame::getChannel(){
return channel;
}
-AMQBody::shared_ptr& AMQFrame::getBody(){
+AMQBody::shared_ptr AMQFrame::getBody(){
return body;
}
@@ -57,16 +59,10 @@ void AMQFrame::encode(Buffer& buffer)
buffer.putOctet(0xCE);
}
-AMQBody::shared_ptr AMQFrame::createMethodBody(Buffer& buffer){
- u_int16_t classId = buffer.getShort();
- u_int16_t methodId = buffer.getShort();
- AMQBody::shared_ptr body(versionMap.createMethodBody(classId, methodId, version.getMajor(), version.getMinor()));
- return body;
-}
-
u_int32_t AMQFrame::size() const{
- if(!body.get()) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to get size of frame with no body set!");
- return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + body->size() + 1/*0xCE*/;
+ assert(body.get());
+ return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + body->size()
+ + 1/*0xCE*/;
}
bool AMQFrame::decode(Buffer& buffer)
@@ -95,19 +91,26 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t bufSize)
{
switch(type)
{
- case METHOD_BODY:
- body = createMethodBody(buffer);
- break;
- case HEADER_BODY:
+ case METHOD_BODY:
+ body = AMQMethodBody::create(versionMap, version, buffer);
+ break;
+ case HEADER_BODY:
body = AMQBody::shared_ptr(new AMQHeaderBody());
break;
- case CONTENT_BODY:
+ case CONTENT_BODY:
body = AMQBody::shared_ptr(new AMQContentBody());
break;
- case HEARTBEAT_BODY:
+ case HEARTBEAT_BODY:
body = AMQBody::shared_ptr(new AMQHeartbeatBody());
break;
- default:
+ case REQUEST_BODY:
+ body = AMQBody::shared_ptr(new AMQRequestBody(versionMap, version));
+ break;
+ case RESPONSE_BODY:
+ body = AMQBody::shared_ptr(new AMQResponseBody(versionMap, version));
+ break;
+ default:
+ assert(0);
string msg("Unknown body type: ");
msg += type;
THROW_QPID_ERROR(FRAMING_ERROR, msg);
diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h
index ff27e1e68f..c08dfe805b 100644
--- a/cpp/lib/common/framing/AMQFrame.h
+++ b/cpp/lib/common/framing/AMQFrame.h
@@ -1,3 +1,6 @@
+#ifndef _AMQFrame_
+#define _AMQFrame_
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -30,9 +33,6 @@
#include <AMQP_HighestVersion.h>
#include <Buffer.h>
-#ifndef _AMQFrame_
-#define _AMQFrame_
-
namespace qpid {
namespace framing {
@@ -43,20 +43,19 @@ class AMQFrame : virtual public AMQDataBlock
qpid::framing::ProtocolVersion version;
u_int16_t channel;
- u_int8_t type;//used if the body is decoded separately from the 'head'
+ u_int8_t type;
AMQBody::shared_ptr body;
- AMQBody::shared_ptr createMethodBody(Buffer& buffer);
public:
AMQFrame(qpid::framing::ProtocolVersion& _version = highestProtocolVersion);
AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody* body);
- AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody::shared_ptr& body);
+ AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, const AMQBody::shared_ptr& body);
virtual ~AMQFrame();
virtual void encode(Buffer& buffer);
virtual bool decode(Buffer& buffer);
virtual u_int32_t size() const;
u_int16_t getChannel();
- AMQBody::shared_ptr& getBody();
+ AMQBody::shared_ptr getBody();
u_int32_t decodeHead(Buffer& buffer);
void decodeBody(Buffer& buffer, uint32_t size);
diff --git a/cpp/lib/common/framing/AMQMethodBody.cpp b/cpp/lib/common/framing/AMQMethodBody.cpp
index 525310f3d4..cd6db9b1a9 100644
--- a/cpp/lib/common/framing/AMQMethodBody.cpp
+++ b/cpp/lib/common/framing/AMQMethodBody.cpp
@@ -20,27 +20,40 @@
*/
#include <AMQMethodBody.h>
#include <QpidError.h>
+#include "AMQP_MethodVersionMap.h"
-void qpid::framing::AMQMethodBody::encode(Buffer& buffer) const{
+namespace qpid {
+namespace framing {
+
+void AMQMethodBody::encode(Buffer& buffer) const{
buffer.putShort(amqpClassId());
buffer.putShort(amqpMethodId());
encodeContent(buffer);
}
-void qpid::framing::AMQMethodBody::decode(Buffer& buffer, u_int32_t /*size*/){
+void AMQMethodBody::decode(Buffer& buffer, u_int32_t /*size*/){
decodeContent(buffer);
}
-bool qpid::framing::AMQMethodBody::match(AMQMethodBody* other) const{
+bool AMQMethodBody::match(AMQMethodBody* other) const{
return other != 0 && other->amqpClassId() == amqpClassId() && other->amqpMethodId() == amqpMethodId();
}
-void qpid::framing::AMQMethodBody::invoke(AMQP_ServerOperations& /*target*/, u_int16_t /*channel*/){
+void AMQMethodBody::invoke(AMQP_ServerOperations& /*target*/, u_int16_t /*channel*/){
THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server.");
}
-std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQMethodBody& m){
- m.print(out);
- return out;
+
+AMQMethodBody::shared_ptr AMQMethodBody::create(
+ AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
+ Buffer& buffer)
+{
+ u_int16_t classId = buffer.getShort();
+ u_int16_t methodId = buffer.getShort();
+ return AMQMethodBody::shared_ptr(
+ versionMap.createMethodBody(
+ classId, methodId, version.getMajor(), version.getMinor()));
}
+
+}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h
index da25c7c545..57dc84d4aa 100644
--- a/cpp/lib/common/framing/AMQMethodBody.h
+++ b/cpp/lib/common/framing/AMQMethodBody.h
@@ -1,3 +1,6 @@
+#ifndef _AMQMethodBody_
+#define _AMQMethodBody_
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -24,24 +27,26 @@
#include <Buffer.h>
#include <AMQP_ServerOperations.h>
-#ifndef _AMQMethodBody_
-#define _AMQMethodBody_
-
namespace qpid {
namespace framing {
-class AMQMethodBody : virtual public AMQBody
+class AMQP_MethodVersionMap;
+
+class AMQMethodBody : public AMQBody
{
-public:
+ public:
typedef boost::shared_ptr<AMQMethodBody> shared_ptr;
- ProtocolVersion version;
- inline u_int8_t type() const { return METHOD_BODY; }
- inline u_int32_t size() const { return 4 + bodySize(); }
- inline AMQMethodBody(u_int8_t major, u_int8_t minor) : version(major, minor) {}
- inline AMQMethodBody(ProtocolVersion version) : version(version) {}
- inline virtual ~AMQMethodBody() {}
- virtual void print(std::ostream& out) const = 0;
+ static shared_ptr create(
+ AMQP_MethodVersionMap& map, ProtocolVersion version, Buffer& buf);
+
+ ProtocolVersion version;
+ u_int8_t type() const { return METHOD_BODY; }
+ u_int32_t size() const { return 4 + bodySize(); }
+ AMQMethodBody(u_int8_t major, u_int8_t minor) : version(major, minor) {}
+ AMQMethodBody(ProtocolVersion version) : version(version) {}
+ virtual ~AMQMethodBody() {}
+
virtual u_int16_t amqpMethodId() const = 0;
virtual u_int16_t amqpClassId() const = 0;
virtual void invoke(AMQP_ServerOperations& target, u_int16_t channel);
@@ -53,10 +58,7 @@ public:
bool match(AMQMethodBody* other) const;
};
-std::ostream& operator<<(std::ostream& out, const AMQMethodBody& body);
-
-}
-}
+}} // namespace qpid::framing
#endif
diff --git a/cpp/lib/common/framing/AMQRequestBody.cpp b/cpp/lib/common/framing/AMQRequestBody.cpp
new file mode 100644
index 0000000000..61688cd97c
--- /dev/null
+++ b/cpp/lib/common/framing/AMQRequestBody.cpp
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "AMQRequestBody.h"
+#include "AMQP_MethodVersionMap.h"
+
+namespace qpid {
+namespace framing {
+
+AMQRequestBody::AMQRequestBody(AMQP_MethodVersionMap& vm, ProtocolVersion v)
+ : versionMap(vm), version(v) {}
+
+AMQRequestBody::AMQRequestBody(
+ AMQP_MethodVersionMap& vm, ProtocolVersion v,
+ u_int64_t reqId, u_int64_t respMark,
+ AMQMethodBody::shared_ptr m
+) : versionMap(vm), version(v),
+ requestId(reqId), responseMark(respMark), method(m)
+{}
+
+
+void
+AMQRequestBody::encode(Buffer& buffer) const {
+ assert(method.get());
+ buffer.putLongLong(requestId);
+ buffer.putLongLong(responseMark);
+ method->encode(buffer);
+}
+
+void
+AMQRequestBody::decode(Buffer& buffer, u_int32_t /*size*/) {
+ requestId = buffer.getLongLong();
+ responseMark = buffer.getLongLong();
+ method = AMQMethodBody::create(versionMap, version, buffer);
+}
+
+void
+AMQRequestBody::print(std::ostream& out) const
+{
+ out << "request(" << size() << " bytes) " << *method;
+}
+
+}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQRequestBody.h b/cpp/lib/common/framing/AMQRequestBody.h
new file mode 100644
index 0000000000..0908545fcd
--- /dev/null
+++ b/cpp/lib/common/framing/AMQRequestBody.h
@@ -0,0 +1,67 @@
+#ifndef _framing_AMQRequestBody_h
+#define _framing_AMQRequestBody_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "AMQMethodBody.h"
+
+namespace qpid {
+namespace framing {
+
+class AMQP_MethodVersionMap;
+
+/**
+ * Body of an AMQP Request frame.
+ */
+class AMQRequestBody : public AMQBody
+{
+ public:
+ typedef boost::shared_ptr<AMQRequestBody> shared_ptr;
+
+ AMQRequestBody(AMQP_MethodVersionMap&, ProtocolVersion);
+ AMQRequestBody(
+ AMQP_MethodVersionMap&, ProtocolVersion,
+ u_int64_t requestId, u_int64_t responseMark,
+ AMQMethodBody::shared_ptr method);
+
+ const AMQMethodBody& getMethodBody() const { return *method; }
+ AMQMethodBody& getMethodBody() { return *method; }
+ u_int64_t getRequestId() { return requestId; }
+ u_int64_t getResponseMark() { return responseMark; }
+
+ u_int32_t size() const { return 16 + method->size(); }
+ u_int8_t type() const { return REQUEST_BODY; }
+
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer, u_int32_t size);
+ void print(std::ostream& out) const;
+
+ private:
+ AMQP_MethodVersionMap& versionMap;
+ ProtocolVersion version;
+ u_int64_t requestId;
+ u_int64_t responseMark;
+ AMQMethodBody::shared_ptr method;
+};
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!_framing_AMQRequestBody_h*/
diff --git a/cpp/lib/common/framing/AMQResponseBody.cpp b/cpp/lib/common/framing/AMQResponseBody.cpp
new file mode 100644
index 0000000000..1e1fbdf4bd
--- /dev/null
+++ b/cpp/lib/common/framing/AMQResponseBody.cpp
@@ -0,0 +1,59 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "AMQResponseBody.h"
+#include "AMQP_MethodVersionMap.h"
+
+namespace qpid {
+namespace framing {
+
+AMQResponseBody::AMQResponseBody(AMQP_MethodVersionMap& vm, ProtocolVersion v)
+ : versionMap(vm), version(v) {}
+
+AMQResponseBody::AMQResponseBody(
+ AMQP_MethodVersionMap& vm, ProtocolVersion v,
+ u_int64_t respId, u_int64_t reqId, u_int32_t batch,
+ AMQMethodBody::shared_ptr m
+) : versionMap(vm), version(v),
+ responseId(respId), requestId(reqId), batchOffset(batch), method(m)
+{}
+
+void
+AMQResponseBody::encode(Buffer& buffer) const {
+ assert(method.get());
+ buffer.putLongLong(responseId);
+ buffer.putLongLong(requestId);
+ buffer.putLong(batchOffset);
+ method->encode(buffer);
+}
+
+void
+AMQResponseBody::decode(Buffer& buffer, u_int32_t /*size*/) {
+ responseId = buffer.getLongLong();
+ requestId = buffer.getLongLong();
+ batchOffset = buffer.getLong();
+ method = AMQMethodBody::create(versionMap, version, buffer);
+}
+
+void
+AMQResponseBody::print(std::ostream& out) const
+{
+ out << "response(" << size() << " bytes) " << *method;
+}
+
+}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h
new file mode 100644
index 0000000000..a232ef7d34
--- /dev/null
+++ b/cpp/lib/common/framing/AMQResponseBody.h
@@ -0,0 +1,70 @@
+#ifndef _framing_AMQResponseBody_h
+#define _framing_AMQResponseBody_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "AMQMethodBody.h"
+
+namespace qpid {
+namespace framing {
+
+class AMQP_MethodVersionMap;
+
+/**
+ * Body of an AMQP Response frame.
+ */
+class AMQResponseBody : public AMQBody
+{
+
+ public:
+ typedef boost::shared_ptr<AMQResponseBody> shared_ptr;
+
+ AMQResponseBody(AMQP_MethodVersionMap&, ProtocolVersion);
+ AMQResponseBody(
+ AMQP_MethodVersionMap&, ProtocolVersion,
+ u_int64_t responseId, u_int64_t requestId, u_int32_t batchOffset,
+ AMQMethodBody::shared_ptr method);
+
+ const AMQMethodBody& getMethodBody() const { return *method; }
+ AMQMethodBody& getMethodBody() { return *method; }
+ u_int64_t getResponseId() { return responseId; }
+ u_int64_t getRequestId() { return requestId; }
+ u_int32_t getBatchOffset() { return batchOffset; }
+
+ u_int32_t size() const { return 20 + method->size(); }
+ u_int8_t type() const { return RESPONSE_BODY; }
+
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer, u_int32_t size);
+ void print(std::ostream& out) const;
+
+ private:
+ AMQP_MethodVersionMap& versionMap;
+ ProtocolVersion version;
+ u_int64_t responseId;
+ u_int64_t requestId;
+ u_int32_t batchOffset;
+ AMQMethodBody::shared_ptr method;
+};
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!_framing_AMQResponseBody_h*/
diff --git a/cpp/lib/common/framing/BodyHandler.cpp b/cpp/lib/common/framing/BodyHandler.cpp
index 8ccfb222df..ae4ff25bbe 100644
--- a/cpp/lib/common/framing/BodyHandler.cpp
+++ b/cpp/lib/common/framing/BodyHandler.cpp
@@ -26,7 +26,7 @@ using namespace boost;
BodyHandler::~BodyHandler() {}
-void BodyHandler::handleBody(AMQBody::shared_ptr& body){
+void BodyHandler::handleBody(const AMQBody::shared_ptr& body){
switch(body->type())
{
diff --git a/cpp/lib/common/framing/BodyHandler.h b/cpp/lib/common/framing/BodyHandler.h
index 3923258d1c..0260a35e88 100644
--- a/cpp/lib/common/framing/BodyHandler.h
+++ b/cpp/lib/common/framing/BodyHandler.h
@@ -39,7 +39,7 @@ namespace framing {
virtual void handleContent(AMQContentBody::shared_ptr body) = 0;
virtual void handleHeartbeat(AMQHeartbeatBody::shared_ptr body) = 0;
- void handleBody(AMQBody::shared_ptr& body);
+ void handleBody(const AMQBody::shared_ptr& body);
};
class UnknownBodyType{
diff --git a/cpp/lib/common/framing/Buffer.h b/cpp/lib/common/framing/Buffer.h
index b07c2a2ced..f2dd5071a7 100644
--- a/cpp/lib/common/framing/Buffer.h
+++ b/cpp/lib/common/framing/Buffer.h
@@ -80,8 +80,7 @@ public:
};
-}
-}
+}} // namespace qpid::framing
#endif