summaryrefslogtreecommitdiff
path: root/qpid/cpp/lib/broker/BrokerMessageMessage.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-03-21 16:01:45 +0000
committerGordon Sim <gsim@apache.org>2007-03-21 16:01:45 +0000
commit6625d0c47f5252af8d64abce773583ec27f28116 (patch)
tree27e573f2adce0347469ddb3691e07dfbecc5082a /qpid/cpp/lib/broker/BrokerMessageMessage.cpp
parentb8915dcb1272dd09b177529774461a29bf4b01a2 (diff)
downloadqpid-python-6625d0c47f5252af8d64abce773583ec27f28116.tar.gz
Modifications to allow messages produced by the message class to be persisted as well as those from the basic class.
Fix to broker initialisation (ensure queues use the correct store). git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@520924 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/lib/broker/BrokerMessageMessage.cpp')
-rw-r--r--qpid/cpp/lib/broker/BrokerMessageMessage.cpp77
1 files changed, 69 insertions, 8 deletions
diff --git a/qpid/cpp/lib/broker/BrokerMessageMessage.cpp b/qpid/cpp/lib/broker/BrokerMessageMessage.cpp
index 3449078d70..a50375cdd3 100644
--- a/qpid/cpp/lib/broker/BrokerMessageMessage.cpp
+++ b/qpid/cpp/lib/broker/BrokerMessageMessage.cpp
@@ -26,6 +26,7 @@
#include "MessageCloseBody.h"
#include "MessageAppendBody.h"
#include "Reference.h"
+#include "framing/AMQFrame.h"
#include "framing/FieldTable.h"
#include "framing/BasicHeaderProperties.h"
@@ -61,6 +62,11 @@ MessageMessage::MessageMessage(
reference(reference_)
{}
+/**
+ * Currently used by message store impls to recover messages
+ */
+MessageMessage::MessageMessage() : transfer(new MessageTransferBody(qpid::framing::highestProtocolVersion)) {}
+
// TODO: astitcher 1-Mar-2007: This code desperately needs better factoring
void MessageMessage::transferMessage(
framing::ChannelAdapter& channel,
@@ -213,27 +219,82 @@ bool MessageMessage::isPersistent()
uint32_t MessageMessage::encodedSize()
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
- return 0; // FIXME aconway 2007-02-05:
+ return encodedHeaderSize() + encodedContentSize();
}
uint32_t MessageMessage::encodedHeaderSize()
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
- return 0; // FIXME aconway 2007-02-05:
+ return transfer->size() - transfer->baseSize();
}
uint32_t MessageMessage::encodedContentSize()
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
- return 0; // FIXME aconway 2007-02-05:
+ return 0;
}
uint64_t MessageMessage::expectedContentSize()
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
- return 0; // FIXME aconway 2007-02-05:
+ return 0;
+}
+
+void MessageMessage::encode(Buffer& buffer)
+{
+ encodeHeader(buffer);
+}
+
+void MessageMessage::encodeHeader(Buffer& buffer)
+{
+ if (transfer->getBody().isInline()) {
+ transfer->encodeContent(buffer);
+ } else {
+ string data;
+ for(Reference::Appends::const_iterator a = reference->getAppends().begin(); a != reference->getAppends().end(); ++a) {
+ data += (*a)->getBytes();
+ }
+ framing::Content body(INLINE, data);
+ std::auto_ptr<MessageTransferBody> copy(copyTransfer(transfer->version, transfer->getDestination(), body));
+ copy->encodeContent(buffer);
+ }
+}
+
+void MessageMessage::decodeHeader(Buffer& buffer)
+{
+ transfer->decodeContent(buffer);
+}
+
+void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/)
+{
}
+MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version,
+ const string& destination,
+ const framing::Content& body)
+{
+ return new MessageTransferBody(version,
+ transfer->getTicket(),
+ destination,
+ getRedelivered(),
+ transfer->getImmediate(),
+ transfer->getTtl(),
+ transfer->getPriority(),
+ transfer->getTimestamp(),
+ transfer->getDeliveryMode(),
+ transfer->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer->getMessageId(),
+ transfer->getCorrelationId(),
+ transfer->getReplyTo(),
+ transfer->getContentType(),
+ transfer->getContentEncoding(),
+ transfer->getUserId(),
+ transfer->getAppId(),
+ transfer->getTransactionId(),
+ transfer->getSecurityToken(),
+ transfer->getApplicationHeaders(),
+ body,
+ transfer->getMandatory());
+
+}
}} // namespace qpid::broker