diff options
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 95 |
1 files changed, 68 insertions, 27 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index f71324f3fa..b0b5a85031 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -41,23 +41,11 @@ Message::Message(const ConnectionToken* const _publisher, persistenceId(0) {} Message::Message(Buffer& buffer) : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){ - buffer.getShortString(exchange); - buffer.getShortString(routingKey); - - AMQFrame headerFrame; - headerFrame.decode(buffer); - AMQHeaderBody::shared_ptr headerBody = dynamic_pointer_cast<AMQHeaderBody, AMQBody>(headerFrame.getBody()); - setHeader(headerBody); - - AMQContentBody::shared_ptr contentBody; - while (buffer.available()) { - AMQFrame contentFrame; - contentFrame.decode(buffer); - contentBody = dynamic_pointer_cast<AMQContentBody, AMQBody>(contentFrame.getBody()); - addContent(contentBody); - } + decode(buffer); } +Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){} + Message::~Message(){} void Message::setHeader(AMQHeaderBody::shared_ptr _header){ @@ -83,7 +71,6 @@ void Message::deliver(OutputHandler* out, int channel, // AMQP version change - kpvdr 2006-11-17 // TODO: Make this class version-aware and link these hard-wired numbers to that version out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey))); -// out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey))); sendContent(out, channel, framesize); } @@ -128,18 +115,54 @@ bool Message::isPersistent() return props && props->getDeliveryMode() == PERSISTENT; } -void Message::encode(Buffer& buffer) +void Message::decode(Buffer& buffer) { - buffer.putShortString(exchange); - buffer.putShortString(routingKey); + decodeHeader(buffer); + decodeContent(buffer); +} + +void Message::decodeHeader(Buffer& buffer) +{ + buffer.getShortString(exchange); + buffer.getShortString(routingKey); - AMQBody::shared_ptr body; + u_int32_t headerSize = buffer.getLong(); + AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody()); + headerBody->decode(buffer, headerSize); + setHeader(headerBody); +} - body = static_pointer_cast<AMQBody, AMQHeaderBody>(header); +void Message::decodeContent(Buffer& buffer) +{ + AMQContentBody::shared_ptr contentBody; + while (buffer.available()) { + AMQFrame contentFrame; + contentFrame.decode(buffer); + contentBody = dynamic_pointer_cast<AMQContentBody, AMQBody>(contentFrame.getBody()); + addContent(contentBody); + } +} - AMQFrame headerFrame(0, body); - headerFrame.encode(buffer); - +void Message::encode(Buffer& buffer) +{ + encodeHeader(buffer); + encodeContent(buffer); +} + +void Message::encodeHeader(Buffer& buffer) +{ + buffer.putShortString(exchange); + buffer.putShortString(routingKey); + buffer.putLong(header->size()); + header->encode(buffer); +} + +void Message::encodeContent(Buffer& buffer) +{ + //Use a frame around each content block. Not really required but + //gives some error checking at little expense. Could change in the + //future... + AMQBody::shared_ptr body; for (content_iterator i = content.begin(); i != content.end(); i++) { body = static_pointer_cast<AMQBody, AMQContentBody>(*i); AMQFrame contentFrame(0, body); @@ -149,13 +172,31 @@ void Message::encode(Buffer& buffer) u_int32_t Message::encodedSize() { + return encodedHeaderSize() + encodedContentSize(); +} + +u_int32_t Message::encodedContentSize() +{ int encodedContentSize(0); for (content_iterator i = content.begin(); i != content.end(); i++) { - encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame (TODO, could replace frame by simple size) + encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame } + return encodedContentSize; +} +u_int32_t Message::encodedHeaderSize() +{ return exchange.size() + 1 + routingKey.size() + 1 - + header->size() + 8 //8 extra bytes for frame (TODO, could actually remove the frame) - + encodedContentSize; + + header->size() + 4;//4 extra bytes for size +} + +u_int64_t Message::expectedContentSize() +{ + return header.get() ? header->getContentSize() : 0; +} + +void Message::releaseContent() +{ + content.clear(); } |