summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/BrokerMessageMessage.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/broker/BrokerMessageMessage.cpp')
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.cpp77
1 files changed, 69 insertions, 8 deletions
diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp
index 3449078d70..a50375cdd3 100644
--- a/cpp/lib/broker/BrokerMessageMessage.cpp
+++ b/cpp/lib/broker/BrokerMessageMessage.cpp
@@ -26,6 +26,7 @@
#include "MessageCloseBody.h"
#include "MessageAppendBody.h"
#include "Reference.h"
+#include "framing/AMQFrame.h"
#include "framing/FieldTable.h"
#include "framing/BasicHeaderProperties.h"
@@ -61,6 +62,11 @@ MessageMessage::MessageMessage(
reference(reference_)
{}
+/**
+ * Currently used by message store impls to recover messages
+ */
+MessageMessage::MessageMessage() : transfer(new MessageTransferBody(qpid::framing::highestProtocolVersion)) {}
+
// TODO: astitcher 1-Mar-2007: This code desperately needs better factoring
void MessageMessage::transferMessage(
framing::ChannelAdapter& channel,
@@ -213,27 +219,82 @@ bool MessageMessage::isPersistent()
uint32_t MessageMessage::encodedSize()
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
- return 0; // FIXME aconway 2007-02-05:
+ return encodedHeaderSize() + encodedContentSize();
}
uint32_t MessageMessage::encodedHeaderSize()
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
- return 0; // FIXME aconway 2007-02-05:
+ return transfer->size() - transfer->baseSize();
}
uint32_t MessageMessage::encodedContentSize()
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
- return 0; // FIXME aconway 2007-02-05:
+ return 0;
}
uint64_t MessageMessage::expectedContentSize()
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
- return 0; // FIXME aconway 2007-02-05:
+ return 0;
+}
+
+void MessageMessage::encode(Buffer& buffer)
+{
+ encodeHeader(buffer);
+}
+
+void MessageMessage::encodeHeader(Buffer& buffer)
+{
+ if (transfer->getBody().isInline()) {
+ transfer->encodeContent(buffer);
+ } else {
+ string data;
+ for(Reference::Appends::const_iterator a = reference->getAppends().begin(); a != reference->getAppends().end(); ++a) {
+ data += (*a)->getBytes();
+ }
+ framing::Content body(INLINE, data);
+ std::auto_ptr<MessageTransferBody> copy(copyTransfer(transfer->version, transfer->getDestination(), body));
+ copy->encodeContent(buffer);
+ }
+}
+
+void MessageMessage::decodeHeader(Buffer& buffer)
+{
+ transfer->decodeContent(buffer);
+}
+
+void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/)
+{
}
+MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version,
+ const string& destination,
+ const framing::Content& body)
+{
+ return new MessageTransferBody(version,
+ transfer->getTicket(),
+ destination,
+ getRedelivered(),
+ transfer->getImmediate(),
+ transfer->getTtl(),
+ transfer->getPriority(),
+ transfer->getTimestamp(),
+ transfer->getDeliveryMode(),
+ transfer->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer->getMessageId(),
+ transfer->getCorrelationId(),
+ transfer->getReplyTo(),
+ transfer->getContentType(),
+ transfer->getContentEncoding(),
+ transfer->getUserId(),
+ transfer->getAppId(),
+ transfer->getTransactionId(),
+ transfer->getSecurityToken(),
+ transfer->getApplicationHeaders(),
+ body,
+ transfer->getMandatory());
+
+}
}} // namespace qpid::broker