summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r--cpp/src/qpid/broker/Message.cpp91
1 files changed, 50 insertions, 41 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index b0b5a85031..64e66c4a30 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -20,6 +20,10 @@
*/
#include <qpid/broker/Message.h>
#include <iostream>
+
+#include <qpid/broker/InMemoryContent.h>
+#include <qpid/broker/LazyLoadedContent.h>
+#include <qpid/broker/MessageStore.h>
// AMQP version change - kpvdr 2006-11-17
#include <qpid/framing/ProtocolVersion.h>
#include <qpid/framing/BasicDeliverBody.h>
@@ -40,8 +44,10 @@ Message::Message(const ConnectionToken* const _publisher,
size(0),
persistenceId(0) {}
-Message::Message(Buffer& buffer) : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){
- decode(buffer);
+Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
+ publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){
+
+ decode(buffer, headersOnly, contentChunkSize);
}
Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){}
@@ -53,7 +59,10 @@ void Message::setHeader(AMQHeaderBody::shared_ptr _header){
}
void Message::addContent(AMQContentBody::shared_ptr data){
- content.push_back(data);
+ if (!content.get()) {
+ content = std::auto_ptr<Content>(new InMemoryContent());
+ }
+ content->add(data);
size += data->size();
}
@@ -68,8 +77,9 @@ void Message::redeliver(){
void Message::deliver(OutputHandler* out, int channel,
const string& consumerTag, u_int64_t deliveryTag,
u_int32_t framesize){
- // AMQP version change - kpvdr 2006-11-17
- // TODO: Make this class version-aware and link these hard-wired numbers to that version
+
+ // AMQP version change - kpvdr 2006-11-17
+ // TODO: Make this class version-aware and link these hard-wired numbers to that version
out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey)));
sendContent(out, channel, framesize);
}
@@ -80,8 +90,8 @@ void Message::sendGetOk(OutputHandler* out,
u_int64_t deliveryTag,
u_int32_t framesize){
- // AMQP version change - kpvdr 2006-11-17
- // TODO: Make this class version-aware and link these hard-wired numbers to that version
+ // AMQP version change - kpvdr 2006-11-17
+ // TODO: Make this class version-aware and link these hard-wired numbers to that version
out->send(new AMQFrame(channel, new BasicGetOkBody(ProtocolVersion(8,0), deliveryTag, redelivered, exchange, routingKey, messageCount)));
sendContent(out, channel, framesize);
}
@@ -89,15 +99,8 @@ void Message::sendGetOk(OutputHandler* out,
void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){
AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
out->send(new AMQFrame(channel, headerBody));
- for(content_iterator i = content.begin(); i != content.end(); i++){
- if((*i)->size() > framesize){
- //TODO: need to split it
- std::cout << "WARNING: Dropped message. Re-fragmentation not yet implemented." << std::endl;
- }else{
- AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
- out->send(new AMQFrame(channel, contentBody));
- }
- }
+
+ if (content.get()) content->send(out, channel, framesize);
}
BasicHeaderProperties* Message::getHeaderProperties(){
@@ -115,10 +118,10 @@ bool Message::isPersistent()
return props && props->getDeliveryMode() == PERSISTENT;
}
-void Message::decode(Buffer& buffer)
+void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize)
{
decodeHeader(buffer);
- decodeContent(buffer);
+ if (!headersOnly) decodeContent(buffer, contentChunkSize);
}
void Message::decodeHeader(Buffer& buffer)
@@ -132,15 +135,25 @@ void Message::decodeHeader(Buffer& buffer)
setHeader(headerBody);
}
-void Message::decodeContent(Buffer& buffer)
+void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize)
{
- AMQContentBody::shared_ptr contentBody;
- while (buffer.available()) {
- AMQFrame contentFrame;
- contentFrame.decode(buffer);
- contentBody = dynamic_pointer_cast<AMQContentBody, AMQBody>(contentFrame.getBody());
+ u_int64_t expected = expectedContentSize();
+ if (expected != buffer.available()) {
+ std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl;
+ }
+
+ if (!chunkSize || chunkSize > expected) {
+ chunkSize = expected;
+ }
+
+ u_int64_t total = 0;
+ while (total < expectedContentSize()) {
+ u_int64_t remaining = expected - total;
+ AMQContentBody::shared_ptr contentBody(new AMQContentBody());
+ contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize);
addContent(contentBody);
- }
+ total += chunkSize;
+ }
}
void Message::encode(Buffer& buffer)
@@ -159,15 +172,7 @@ void Message::encodeHeader(Buffer& buffer)
void Message::encodeContent(Buffer& buffer)
{
- //Use a frame around each content block. Not really required but
- //gives some error checking at little expense. Could change in the
- //future...
- AMQBody::shared_ptr body;
- for (content_iterator i = content.begin(); i != content.end(); i++) {
- body = static_pointer_cast<AMQBody, AMQContentBody>(*i);
- AMQFrame contentFrame(0, body);
- contentFrame.encode(buffer);
- }
+ if (content.get()) content->encode(buffer);
}
u_int32_t Message::encodedSize()
@@ -177,11 +182,7 @@ u_int32_t Message::encodedSize()
u_int32_t Message::encodedContentSize()
{
- int encodedContentSize(0);
- for (content_iterator i = content.begin(); i != content.end(); i++) {
- encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame
- }
- return encodedContentSize;
+ return content.get() ? content->size() : 0;
}
u_int32_t Message::encodedHeaderSize()
@@ -196,7 +197,15 @@ u_int64_t Message::expectedContentSize()
return header.get() ? header->getContentSize() : 0;
}
-void Message::releaseContent()
+void Message::releaseContent(MessageStore* store)
+{
+ if (!content.get() || content->size() > 0) {
+ //set content to lazy loading mode (but only if there is stored content):
+ content = std::auto_ptr<Content>(new LazyLoadedContent(store, getPersistenceId(), expectedContentSize()));
+ }
+}
+
+void Message::setContent(std::auto_ptr<Content>& _content)
{
- content.clear();
+ content = _content;
}