summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerMessageMessage.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerMessageMessage.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.cpp232
1 files changed, 113 insertions, 119 deletions
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
index 0da5f3d8f5..1184885aeb 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
@@ -48,35 +48,34 @@ struct MessageDeliveryToken : public DeliveryToken
};
MessageMessage::MessageMessage(
- ConnectionToken* publisher, TransferPtr transfer_
+ ConnectionToken* publisher, const MessageTransferBody* transfer_
) : Message(publisher, transfer_->getDestination(),
transfer_->getRoutingKey(),
transfer_->getRejectUnroutable(),
- transfer_->getImmediate(),
- transfer_),
- transfer(transfer_)
+ transfer_->getImmediate()),
+ transfer(*transfer_)
{
- assert(transfer->getBody().isInline());
+ assert(transfer.getBody().isInline());
}
MessageMessage::MessageMessage(
- ConnectionToken* publisher, TransferPtr transfer_, ReferencePtr reference_
+ ConnectionToken* publisher, const MessageTransferBody* transfer_,
+ ReferencePtr reference_
) : Message(publisher, transfer_->getDestination(),
transfer_->getRoutingKey(),
transfer_->getRejectUnroutable(),
- transfer_->getImmediate(),
- transfer_),
- transfer(transfer_),
+ transfer_->getImmediate()),
+ transfer(*transfer_),
reference(reference_)
{
- assert(!transfer->getBody().isInline());
+ assert(!transfer.getBody().isInline());
assert(reference_);
}
/**
* Currently used by message store impls to recover messages
*/
-MessageMessage::MessageMessage() : transfer(new MessageTransferBody(qpid::framing::highestProtocolVersion)) {}
+MessageMessage::MessageMessage() {}
// TODO: astitcher 1-Mar-2007: This code desperately needs better factoring
void MessageMessage::transferMessage(
@@ -84,27 +83,28 @@ void MessageMessage::transferMessage(
const std::string& consumerTag,
uint32_t framesize)
{
- const framing::Content& body = transfer->getBody();
+ const framing::Content& body = transfer.getBody();
// Send any reference data
ReferencePtr ref= getReference();
if (ref){
// Open
- channel.send(make_shared_ptr(new MessageOpenBody(channel.getVersion(), ref->getId())));
+ channel.send(MessageOpenBody(channel.getVersion(), ref->getId()));
// Appends
for(Reference::Appends::const_iterator a = ref->getAppends().begin();
a != ref->getAppends().end();
++a) {
- uint32_t sizeleft = (*a)->size();
- const string& content = (*a)->getBytes();
+ uint32_t sizeleft = a->size();
+ const string& content = a->getBytes();
// Calculate overhead bytes
// Assume that the overhead is constant as the reference name doesn't change
uint32_t overhead = sizeleft - content.size();
string::size_type contentStart = 0;
while (sizeleft) {
string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead;
- channel.send(make_shared_ptr(new MessageAppendBody(channel.getVersion(), ref->getId(),
- string(content, contentStart, contentSize))));
+
+ channel.send(MessageAppendBody(channel.getVersion(), ref->getId(),
+ string(content, contentStart, contentSize)));
sizeleft -= contentSize;
contentStart += contentSize;
}
@@ -112,76 +112,73 @@ void MessageMessage::transferMessage(
}
// The transfer
- if ( transfer->size()<=framesize ) {
- channel.send(make_shared_ptr(
- new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getRejectUnroutable(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- body)));
+ if ( transfer.size()<=framesize ) {
+ channel.send(MessageTransferBody(ProtocolVersion(),
+ transfer.getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer.getRejectUnroutable(),
+ transfer.getImmediate(),
+ transfer.getTtl(),
+ transfer.getPriority(),
+ transfer.getTimestamp(),
+ transfer.getDeliveryMode(),
+ transfer.getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer.getMessageId(),
+ transfer.getCorrelationId(),
+ transfer.getReplyTo(),
+ transfer.getContentType(),
+ transfer.getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
+ transfer.getUserId(),
+ transfer.getAppId(),
+ transfer.getTransactionId(),
+ transfer.getSecurityToken(),
+ transfer.getApplicationHeaders(),
+ body));
} else {
// Thing to do here is to construct a simple reference message then deliver that instead
// fragmentation will be taken care of in the delivery if necessary;
string content = body.getValue();
string refname = "dummy";
- TransferPtr newTransfer(
- new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getRejectUnroutable(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- framing::Content(REFERENCE, refname)));
+ MessageTransferBody newTransfer(channel.getVersion(),
+ transfer.getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer.getRejectUnroutable(),
+ transfer.getImmediate(),
+ transfer.getTtl(),
+ transfer.getPriority(),
+ transfer.getTimestamp(),
+ transfer.getDeliveryMode(),
+ transfer.getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer.getMessageId(),
+ transfer.getCorrelationId(),
+ transfer.getReplyTo(),
+ transfer.getContentType(),
+ transfer.getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
+ transfer.getUserId(),
+ transfer.getAppId(),
+ transfer.getTransactionId(),
+ transfer.getSecurityToken(),
+ transfer.getApplicationHeaders(),
+ framing::Content(REFERENCE, refname));
ReferencePtr newRef(new Reference(refname));
- Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
- newRef->append(newAppend);
- MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), newTransfer, newRef);
+ newRef->append(MessageAppendBody(channel.getVersion(), refname, content));
+ MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), &newTransfer, newRef);
newMsg.transferMessage(channel, consumerTag, framesize);
return;
}
// Close any reference data
if (ref)
- channel.send(make_shared_ptr(new MessageCloseBody(channel.getVersion(), ref->getId())));
+ channel.send(MessageCloseBody(ProtocolVersion(), ref->getId()));
}
@@ -202,8 +199,8 @@ bool MessageMessage::isComplete()
uint64_t MessageMessage::contentSize() const
{
- if (transfer->getBody().isInline())
- return transfer->getBody().getValue().size();
+ if (transfer.getBody().isInline())
+ return transfer.getBody().getValue().size();
else {
assert(getReference());
return getReference()->getSize();
@@ -217,11 +214,11 @@ qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
const FieldTable& MessageMessage::getApplicationHeaders()
{
- return transfer->getApplicationHeaders();
+ return transfer.getApplicationHeaders();
}
bool MessageMessage::isPersistent()
{
- return transfer->getDeliveryMode() == PERSISTENT;
+ return transfer.getDeliveryMode() == PERSISTENT;
}
uint32_t MessageMessage::encodedSize() const
@@ -231,7 +228,7 @@ uint32_t MessageMessage::encodedSize() const
uint32_t MessageMessage::encodedHeaderSize() const
{
- return RecoveryManagerImpl::encodedMessageTypeSize() + transfer->size() - transfer->baseSize();
+ return RecoveryManagerImpl::encodedMessageTypeSize() + transfer.size();
}
uint32_t MessageMessage::encodedContentSize() const
@@ -252,18 +249,17 @@ void MessageMessage::encode(Buffer& buffer) const
void MessageMessage::encodeHeader(Buffer& buffer) const
{
RecoveryManagerImpl::encodeMessageType(*this, buffer);
- if (transfer->getBody().isInline()) {
- transfer->encodeContent(buffer);
+ if (transfer.getBody().isInline()) {
+ transfer.encode(buffer);
} else {
assert(getReference());
string data;
const Reference::Appends& appends = getReference()->getAppends();
for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) {
- data += (*a)->getBytes();
+ data += a->getBytes();
}
framing::Content body(INLINE, data);
- std::auto_ptr<MessageTransferBody> copy(copyTransfer(transfer->version, transfer->getDestination(), body));
- copy->encodeContent(buffer);
+ copyTransfer(ProtocolVersion(), transfer.getDestination(), body).encode(buffer);
}
}
@@ -271,8 +267,7 @@ void MessageMessage::decodeHeader(Buffer& buffer)
{
//don't care about the type here, but want encode/decode to be symmetric
RecoveryManagerImpl::decodeMessageType(buffer);
-
- transfer->decodeContent(buffer);
+ transfer.decode(buffer);
}
void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/)
@@ -280,37 +275,36 @@ void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/)
}
-MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version,
- const string& destination,
- const framing::Content& body) const
+MessageTransferBody MessageMessage::copyTransfer(const ProtocolVersion& version,
+ const string& destination,
+ const framing::Content& body) const
{
- return new MessageTransferBody(version,
- transfer->getTicket(),
- destination,
- getRedelivered(),
- transfer->getRejectUnroutable(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- body);
-
+ return MessageTransferBody(version,
+ transfer.getTicket(),
+ destination,
+ getRedelivered(),
+ transfer.getRejectUnroutable(),
+ transfer.getImmediate(),
+ transfer.getTtl(),
+ transfer.getPriority(),
+ transfer.getTimestamp(),
+ transfer.getDeliveryMode(),
+ transfer.getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer.getMessageId(),
+ transfer.getCorrelationId(),
+ transfer.getReplyTo(),
+ transfer.getContentType(),
+ transfer.getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
+ transfer.getUserId(),
+ transfer.getAppId(),
+ transfer.getTransactionId(),
+ transfer.getSecurityToken(),
+ transfer.getApplicationHeaders(),
+ body);
}
MessageMessage::ReferencePtr MessageMessage::getReference() const {
@@ -321,7 +315,7 @@ uint32_t MessageMessage::getRequiredCredit() const
{
//TODO: change when encoding changes. Should be the payload of any
//header & body frames.
- return transfer->size();
+ return transfer.size();
}