summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r--cpp/src/qpid/broker/Message.cpp95
1 files changed, 68 insertions, 27 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index f71324f3fa..b0b5a85031 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -41,23 +41,11 @@ Message::Message(const ConnectionToken* const _publisher,
persistenceId(0) {}
Message::Message(Buffer& buffer) : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){
- buffer.getShortString(exchange);
- buffer.getShortString(routingKey);
-
- AMQFrame headerFrame;
- headerFrame.decode(buffer);
- AMQHeaderBody::shared_ptr headerBody = dynamic_pointer_cast<AMQHeaderBody, AMQBody>(headerFrame.getBody());
- setHeader(headerBody);
-
- AMQContentBody::shared_ptr contentBody;
- while (buffer.available()) {
- AMQFrame contentFrame;
- contentFrame.decode(buffer);
- contentBody = dynamic_pointer_cast<AMQContentBody, AMQBody>(contentFrame.getBody());
- addContent(contentBody);
- }
+ decode(buffer);
}
+Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){}
+
Message::~Message(){}
void Message::setHeader(AMQHeaderBody::shared_ptr _header){
@@ -83,7 +71,6 @@ void Message::deliver(OutputHandler* out, int channel,
// AMQP version change - kpvdr 2006-11-17
// TODO: Make this class version-aware and link these hard-wired numbers to that version
out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey)));
-// out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey)));
sendContent(out, channel, framesize);
}
@@ -128,18 +115,54 @@ bool Message::isPersistent()
return props && props->getDeliveryMode() == PERSISTENT;
}
-void Message::encode(Buffer& buffer)
+void Message::decode(Buffer& buffer)
{
- buffer.putShortString(exchange);
- buffer.putShortString(routingKey);
+ decodeHeader(buffer);
+ decodeContent(buffer);
+}
+
+void Message::decodeHeader(Buffer& buffer)
+{
+ buffer.getShortString(exchange);
+ buffer.getShortString(routingKey);
- AMQBody::shared_ptr body;
+ u_int32_t headerSize = buffer.getLong();
+ AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody());
+ headerBody->decode(buffer, headerSize);
+ setHeader(headerBody);
+}
- body = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
+void Message::decodeContent(Buffer& buffer)
+{
+ AMQContentBody::shared_ptr contentBody;
+ while (buffer.available()) {
+ AMQFrame contentFrame;
+ contentFrame.decode(buffer);
+ contentBody = dynamic_pointer_cast<AMQContentBody, AMQBody>(contentFrame.getBody());
+ addContent(contentBody);
+ }
+}
- AMQFrame headerFrame(0, body);
- headerFrame.encode(buffer);
-
+void Message::encode(Buffer& buffer)
+{
+ encodeHeader(buffer);
+ encodeContent(buffer);
+}
+
+void Message::encodeHeader(Buffer& buffer)
+{
+ buffer.putShortString(exchange);
+ buffer.putShortString(routingKey);
+ buffer.putLong(header->size());
+ header->encode(buffer);
+}
+
+void Message::encodeContent(Buffer& buffer)
+{
+ //Use a frame around each content block. Not really required but
+ //gives some error checking at little expense. Could change in the
+ //future...
+ AMQBody::shared_ptr body;
for (content_iterator i = content.begin(); i != content.end(); i++) {
body = static_pointer_cast<AMQBody, AMQContentBody>(*i);
AMQFrame contentFrame(0, body);
@@ -149,13 +172,31 @@ void Message::encode(Buffer& buffer)
u_int32_t Message::encodedSize()
{
+ return encodedHeaderSize() + encodedContentSize();
+}
+
+u_int32_t Message::encodedContentSize()
+{
int encodedContentSize(0);
for (content_iterator i = content.begin(); i != content.end(); i++) {
- encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame (TODO, could replace frame by simple size)
+ encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame
}
+ return encodedContentSize;
+}
+u_int32_t Message::encodedHeaderSize()
+{
return exchange.size() + 1
+ routingKey.size() + 1
- + header->size() + 8 //8 extra bytes for frame (TODO, could actually remove the frame)
- + encodedContentSize;
+ + header->size() + 4;//4 extra bytes for size
+}
+
+u_int64_t Message::expectedContentSize()
+{
+ return header.get() ? header->getContentSize() : 0;
+}
+
+void Message::releaseContent()
+{
+ content.clear();
}