diff options
Diffstat (limited to 'cpp/lib/broker/BrokerMessageMessage.cpp')
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.cpp | 77 |
1 files changed, 69 insertions, 8 deletions
diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp index 3449078d70..a50375cdd3 100644 --- a/cpp/lib/broker/BrokerMessageMessage.cpp +++ b/cpp/lib/broker/BrokerMessageMessage.cpp @@ -26,6 +26,7 @@ #include "MessageCloseBody.h" #include "MessageAppendBody.h" #include "Reference.h" +#include "framing/AMQFrame.h" #include "framing/FieldTable.h" #include "framing/BasicHeaderProperties.h" @@ -61,6 +62,11 @@ MessageMessage::MessageMessage( reference(reference_) {} +/** + * Currently used by message store impls to recover messages + */ +MessageMessage::MessageMessage() : transfer(new MessageTransferBody(qpid::framing::highestProtocolVersion)) {} + // TODO: astitcher 1-Mar-2007: This code desperately needs better factoring void MessageMessage::transferMessage( framing::ChannelAdapter& channel, @@ -213,27 +219,82 @@ bool MessageMessage::isPersistent() uint32_t MessageMessage::encodedSize() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: + return encodedHeaderSize() + encodedContentSize(); } uint32_t MessageMessage::encodedHeaderSize() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: + return transfer->size() - transfer->baseSize(); } uint32_t MessageMessage::encodedContentSize() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: + return 0; } uint64_t MessageMessage::expectedContentSize() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: + return 0; +} + +void MessageMessage::encode(Buffer& buffer) +{ + encodeHeader(buffer); +} + +void MessageMessage::encodeHeader(Buffer& buffer) +{ + if (transfer->getBody().isInline()) { + transfer->encodeContent(buffer); + } else { + string data; + for(Reference::Appends::const_iterator a = reference->getAppends().begin(); a != reference->getAppends().end(); ++a) { + data += (*a)->getBytes(); + } + framing::Content body(INLINE, data); + std::auto_ptr<MessageTransferBody> copy(copyTransfer(transfer->version, transfer->getDestination(), body)); + copy->encodeContent(buffer); + } +} + +void MessageMessage::decodeHeader(Buffer& buffer) +{ + transfer->decodeContent(buffer); +} + +void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/) +{ } +MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version, + const string& destination, + const framing::Content& body) +{ + return new MessageTransferBody(version, + transfer->getTicket(), + destination, + getRedelivered(), + transfer->getImmediate(), + transfer->getTtl(), + transfer->getPriority(), + transfer->getTimestamp(), + transfer->getDeliveryMode(), + transfer->getExpiration(), + getExchange(), + getRoutingKey(), + transfer->getMessageId(), + transfer->getCorrelationId(), + transfer->getReplyTo(), + transfer->getContentType(), + transfer->getContentEncoding(), + transfer->getUserId(), + transfer->getAppId(), + transfer->getTransactionId(), + transfer->getSecurityToken(), + transfer->getApplicationHeaders(), + body, + transfer->getMandatory()); + +} }} // namespace qpid::broker |