diff options
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 91 |
1 files changed, 50 insertions, 41 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index b0b5a85031..64e66c4a30 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -20,6 +20,10 @@ */ #include <qpid/broker/Message.h> #include <iostream> + +#include <qpid/broker/InMemoryContent.h> +#include <qpid/broker/LazyLoadedContent.h> +#include <qpid/broker/MessageStore.h> // AMQP version change - kpvdr 2006-11-17 #include <qpid/framing/ProtocolVersion.h> #include <qpid/framing/BasicDeliverBody.h> @@ -40,8 +44,10 @@ Message::Message(const ConnectionToken* const _publisher, size(0), persistenceId(0) {} -Message::Message(Buffer& buffer) : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){ - decode(buffer); +Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : + publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){ + + decode(buffer, headersOnly, contentChunkSize); } Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){} @@ -53,7 +59,10 @@ void Message::setHeader(AMQHeaderBody::shared_ptr _header){ } void Message::addContent(AMQContentBody::shared_ptr data){ - content.push_back(data); + if (!content.get()) { + content = std::auto_ptr<Content>(new InMemoryContent()); + } + content->add(data); size += data->size(); } @@ -68,8 +77,9 @@ void Message::redeliver(){ void Message::deliver(OutputHandler* out, int channel, const string& consumerTag, u_int64_t deliveryTag, u_int32_t framesize){ - // AMQP version change - kpvdr 2006-11-17 - // TODO: Make this class version-aware and link these hard-wired numbers to that version + + // AMQP version change - kpvdr 2006-11-17 + // TODO: Make this class version-aware and link these hard-wired numbers to that version out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey))); sendContent(out, channel, framesize); } @@ -80,8 +90,8 @@ void Message::sendGetOk(OutputHandler* out, u_int64_t deliveryTag, u_int32_t framesize){ - // AMQP version change - kpvdr 2006-11-17 - // TODO: Make this class version-aware and link these hard-wired numbers to that version + // AMQP version change - kpvdr 2006-11-17 + // TODO: Make this class version-aware and link these hard-wired numbers to that version out->send(new AMQFrame(channel, new BasicGetOkBody(ProtocolVersion(8,0), deliveryTag, redelivered, exchange, routingKey, messageCount))); sendContent(out, channel, framesize); } @@ -89,15 +99,8 @@ void Message::sendGetOk(OutputHandler* out, void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){ AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); out->send(new AMQFrame(channel, headerBody)); - for(content_iterator i = content.begin(); i != content.end(); i++){ - if((*i)->size() > framesize){ - //TODO: need to split it - std::cout << "WARNING: Dropped message. Re-fragmentation not yet implemented." << std::endl; - }else{ - AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i); - out->send(new AMQFrame(channel, contentBody)); - } - } + + if (content.get()) content->send(out, channel, framesize); } BasicHeaderProperties* Message::getHeaderProperties(){ @@ -115,10 +118,10 @@ bool Message::isPersistent() return props && props->getDeliveryMode() == PERSISTENT; } -void Message::decode(Buffer& buffer) +void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) { decodeHeader(buffer); - decodeContent(buffer); + if (!headersOnly) decodeContent(buffer, contentChunkSize); } void Message::decodeHeader(Buffer& buffer) @@ -132,15 +135,25 @@ void Message::decodeHeader(Buffer& buffer) setHeader(headerBody); } -void Message::decodeContent(Buffer& buffer) +void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize) { - AMQContentBody::shared_ptr contentBody; - while (buffer.available()) { - AMQFrame contentFrame; - contentFrame.decode(buffer); - contentBody = dynamic_pointer_cast<AMQContentBody, AMQBody>(contentFrame.getBody()); + u_int64_t expected = expectedContentSize(); + if (expected != buffer.available()) { + std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl; + } + + if (!chunkSize || chunkSize > expected) { + chunkSize = expected; + } + + u_int64_t total = 0; + while (total < expectedContentSize()) { + u_int64_t remaining = expected - total; + AMQContentBody::shared_ptr contentBody(new AMQContentBody()); + contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize); addContent(contentBody); - } + total += chunkSize; + } } void Message::encode(Buffer& buffer) @@ -159,15 +172,7 @@ void Message::encodeHeader(Buffer& buffer) void Message::encodeContent(Buffer& buffer) { - //Use a frame around each content block. Not really required but - //gives some error checking at little expense. Could change in the - //future... - AMQBody::shared_ptr body; - for (content_iterator i = content.begin(); i != content.end(); i++) { - body = static_pointer_cast<AMQBody, AMQContentBody>(*i); - AMQFrame contentFrame(0, body); - contentFrame.encode(buffer); - } + if (content.get()) content->encode(buffer); } u_int32_t Message::encodedSize() @@ -177,11 +182,7 @@ u_int32_t Message::encodedSize() u_int32_t Message::encodedContentSize() { - int encodedContentSize(0); - for (content_iterator i = content.begin(); i != content.end(); i++) { - encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame - } - return encodedContentSize; + return content.get() ? content->size() : 0; } u_int32_t Message::encodedHeaderSize() @@ -196,7 +197,15 @@ u_int64_t Message::expectedContentSize() return header.get() ? header->getContentSize() : 0; } -void Message::releaseContent() +void Message::releaseContent(MessageStore* store) +{ + if (!content.get() || content->size() > 0) { + //set content to lazy loading mode (but only if there is stored content): + content = std::auto_ptr<Content>(new LazyLoadedContent(store, getPersistenceId(), expectedContentSize())); + } +} + +void Message::setContent(std::auto_ptr<Content>& _content) { - content.clear(); + content = _content; } |