diff options
author | Gordon Sim <gsim@apache.org> | 2007-03-21 16:01:45 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-03-21 16:01:45 +0000 |
commit | df4faa062b3512312c78167bfbdf19ff969210ac (patch) | |
tree | 246a634754e2023df1692977fb9c347cba9795db /cpp/lib/broker/BrokerMessageMessage.cpp | |
parent | 9f994dad07799d26e9ecc7241863e7c48c952c99 (diff) | |
download | qpid-python-df4faa062b3512312c78167bfbdf19ff969210ac.tar.gz |
Modifications to allow messages produced by the message class to be persisted as well as those from the basic class.
Fix to broker initialisation (ensure queues use the correct store).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@520924 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/BrokerMessageMessage.cpp')
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.cpp | 77 |
1 files changed, 69 insertions, 8 deletions
diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp index 3449078d70..a50375cdd3 100644 --- a/cpp/lib/broker/BrokerMessageMessage.cpp +++ b/cpp/lib/broker/BrokerMessageMessage.cpp @@ -26,6 +26,7 @@ #include "MessageCloseBody.h" #include "MessageAppendBody.h" #include "Reference.h" +#include "framing/AMQFrame.h" #include "framing/FieldTable.h" #include "framing/BasicHeaderProperties.h" @@ -61,6 +62,11 @@ MessageMessage::MessageMessage( reference(reference_) {} +/** + * Currently used by message store impls to recover messages + */ +MessageMessage::MessageMessage() : transfer(new MessageTransferBody(qpid::framing::highestProtocolVersion)) {} + // TODO: astitcher 1-Mar-2007: This code desperately needs better factoring void MessageMessage::transferMessage( framing::ChannelAdapter& channel, @@ -213,27 +219,82 @@ bool MessageMessage::isPersistent() uint32_t MessageMessage::encodedSize() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: + return encodedHeaderSize() + encodedContentSize(); } uint32_t MessageMessage::encodedHeaderSize() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: + return transfer->size() - transfer->baseSize(); } uint32_t MessageMessage::encodedContentSize() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: + return 0; } uint64_t MessageMessage::expectedContentSize() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: + return 0; +} + +void MessageMessage::encode(Buffer& buffer) +{ + encodeHeader(buffer); +} + +void MessageMessage::encodeHeader(Buffer& buffer) +{ + if (transfer->getBody().isInline()) { + transfer->encodeContent(buffer); + } else { + string data; + for(Reference::Appends::const_iterator a = reference->getAppends().begin(); a != reference->getAppends().end(); ++a) { + data += (*a)->getBytes(); + } + framing::Content body(INLINE, data); + std::auto_ptr<MessageTransferBody> copy(copyTransfer(transfer->version, transfer->getDestination(), body)); + copy->encodeContent(buffer); + } +} + +void MessageMessage::decodeHeader(Buffer& buffer) +{ + transfer->decodeContent(buffer); +} + +void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/) +{ } +MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version, + const string& destination, + const framing::Content& body) +{ + return new MessageTransferBody(version, + transfer->getTicket(), + destination, + getRedelivered(), + transfer->getImmediate(), + transfer->getTtl(), + transfer->getPriority(), + transfer->getTimestamp(), + transfer->getDeliveryMode(), + transfer->getExpiration(), + getExchange(), + getRoutingKey(), + transfer->getMessageId(), + transfer->getCorrelationId(), + transfer->getReplyTo(), + transfer->getContentType(), + transfer->getContentEncoding(), + transfer->getUserId(), + transfer->getAppId(), + transfer->getTransactionId(), + transfer->getSecurityToken(), + transfer->getApplicationHeaders(), + body, + transfer->getMandatory()); + +} }} // namespace qpid::broker |