diff options
Diffstat (limited to 'cpp/src/qpid/broker/BrokerMessageMessage.cpp')
-rw-r--r-- | cpp/src/qpid/broker/BrokerMessageMessage.cpp | 232 |
1 files changed, 113 insertions, 119 deletions
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp index 0da5f3d8f5..1184885aeb 100644 --- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp @@ -48,35 +48,34 @@ struct MessageDeliveryToken : public DeliveryToken }; MessageMessage::MessageMessage( - ConnectionToken* publisher, TransferPtr transfer_ + ConnectionToken* publisher, const MessageTransferBody* transfer_ ) : Message(publisher, transfer_->getDestination(), transfer_->getRoutingKey(), transfer_->getRejectUnroutable(), - transfer_->getImmediate(), - transfer_), - transfer(transfer_) + transfer_->getImmediate()), + transfer(*transfer_) { - assert(transfer->getBody().isInline()); + assert(transfer.getBody().isInline()); } MessageMessage::MessageMessage( - ConnectionToken* publisher, TransferPtr transfer_, ReferencePtr reference_ + ConnectionToken* publisher, const MessageTransferBody* transfer_, + ReferencePtr reference_ ) : Message(publisher, transfer_->getDestination(), transfer_->getRoutingKey(), transfer_->getRejectUnroutable(), - transfer_->getImmediate(), - transfer_), - transfer(transfer_), + transfer_->getImmediate()), + transfer(*transfer_), reference(reference_) { - assert(!transfer->getBody().isInline()); + assert(!transfer.getBody().isInline()); assert(reference_); } /** * Currently used by message store impls to recover messages */ -MessageMessage::MessageMessage() : transfer(new MessageTransferBody(qpid::framing::highestProtocolVersion)) {} +MessageMessage::MessageMessage() {} // TODO: astitcher 1-Mar-2007: This code desperately needs better factoring void MessageMessage::transferMessage( @@ -84,27 +83,28 @@ void MessageMessage::transferMessage( const std::string& consumerTag, uint32_t framesize) { - const framing::Content& body = transfer->getBody(); + const framing::Content& body = transfer.getBody(); // Send any reference data ReferencePtr ref= getReference(); if (ref){ // Open - channel.send(make_shared_ptr(new MessageOpenBody(channel.getVersion(), ref->getId()))); + channel.send(MessageOpenBody(channel.getVersion(), ref->getId())); // Appends for(Reference::Appends::const_iterator a = ref->getAppends().begin(); a != ref->getAppends().end(); ++a) { - uint32_t sizeleft = (*a)->size(); - const string& content = (*a)->getBytes(); + uint32_t sizeleft = a->size(); + const string& content = a->getBytes(); // Calculate overhead bytes // Assume that the overhead is constant as the reference name doesn't change uint32_t overhead = sizeleft - content.size(); string::size_type contentStart = 0; while (sizeleft) { string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead; - channel.send(make_shared_ptr(new MessageAppendBody(channel.getVersion(), ref->getId(), - string(content, contentStart, contentSize)))); + + channel.send(MessageAppendBody(channel.getVersion(), ref->getId(), + string(content, contentStart, contentSize))); sizeleft -= contentSize; contentStart += contentSize; } @@ -112,76 +112,73 @@ void MessageMessage::transferMessage( } // The transfer - if ( transfer->size()<=framesize ) { - channel.send(make_shared_ptr( - new MessageTransferBody(channel.getVersion(), - transfer->getTicket(), - consumerTag, - getRedelivered(), - transfer->getRejectUnroutable(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - body))); + if ( transfer.size()<=framesize ) { + channel.send(MessageTransferBody(ProtocolVersion(), + transfer.getTicket(), + consumerTag, + getRedelivered(), + transfer.getRejectUnroutable(), + transfer.getImmediate(), + transfer.getTtl(), + transfer.getPriority(), + transfer.getTimestamp(), + transfer.getDeliveryMode(), + transfer.getExpiration(), + getExchange(), + getRoutingKey(), + transfer.getMessageId(), + transfer.getCorrelationId(), + transfer.getReplyTo(), + transfer.getContentType(), + transfer.getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ + transfer.getUserId(), + transfer.getAppId(), + transfer.getTransactionId(), + transfer.getSecurityToken(), + transfer.getApplicationHeaders(), + body)); } else { // Thing to do here is to construct a simple reference message then deliver that instead // fragmentation will be taken care of in the delivery if necessary; string content = body.getValue(); string refname = "dummy"; - TransferPtr newTransfer( - new MessageTransferBody(channel.getVersion(), - transfer->getTicket(), - consumerTag, - getRedelivered(), - transfer->getRejectUnroutable(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - framing::Content(REFERENCE, refname))); + MessageTransferBody newTransfer(channel.getVersion(), + transfer.getTicket(), + consumerTag, + getRedelivered(), + transfer.getRejectUnroutable(), + transfer.getImmediate(), + transfer.getTtl(), + transfer.getPriority(), + transfer.getTimestamp(), + transfer.getDeliveryMode(), + transfer.getExpiration(), + getExchange(), + getRoutingKey(), + transfer.getMessageId(), + transfer.getCorrelationId(), + transfer.getReplyTo(), + transfer.getContentType(), + transfer.getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ + transfer.getUserId(), + transfer.getAppId(), + transfer.getTransactionId(), + transfer.getSecurityToken(), + transfer.getApplicationHeaders(), + framing::Content(REFERENCE, refname)); ReferencePtr newRef(new Reference(refname)); - Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content)); - newRef->append(newAppend); - MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), newTransfer, newRef); + newRef->append(MessageAppendBody(channel.getVersion(), refname, content)); + MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), &newTransfer, newRef); newMsg.transferMessage(channel, consumerTag, framesize); return; } // Close any reference data if (ref) - channel.send(make_shared_ptr(new MessageCloseBody(channel.getVersion(), ref->getId()))); + channel.send(MessageCloseBody(ProtocolVersion(), ref->getId())); } @@ -202,8 +199,8 @@ bool MessageMessage::isComplete() uint64_t MessageMessage::contentSize() const { - if (transfer->getBody().isInline()) - return transfer->getBody().getValue().size(); + if (transfer.getBody().isInline()) + return transfer.getBody().getValue().size(); else { assert(getReference()); return getReference()->getSize(); @@ -217,11 +214,11 @@ qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties() const FieldTable& MessageMessage::getApplicationHeaders() { - return transfer->getApplicationHeaders(); + return transfer.getApplicationHeaders(); } bool MessageMessage::isPersistent() { - return transfer->getDeliveryMode() == PERSISTENT; + return transfer.getDeliveryMode() == PERSISTENT; } uint32_t MessageMessage::encodedSize() const @@ -231,7 +228,7 @@ uint32_t MessageMessage::encodedSize() const uint32_t MessageMessage::encodedHeaderSize() const { - return RecoveryManagerImpl::encodedMessageTypeSize() + transfer->size() - transfer->baseSize(); + return RecoveryManagerImpl::encodedMessageTypeSize() + transfer.size(); } uint32_t MessageMessage::encodedContentSize() const @@ -252,18 +249,17 @@ void MessageMessage::encode(Buffer& buffer) const void MessageMessage::encodeHeader(Buffer& buffer) const { RecoveryManagerImpl::encodeMessageType(*this, buffer); - if (transfer->getBody().isInline()) { - transfer->encodeContent(buffer); + if (transfer.getBody().isInline()) { + transfer.encode(buffer); } else { assert(getReference()); string data; const Reference::Appends& appends = getReference()->getAppends(); for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) { - data += (*a)->getBytes(); + data += a->getBytes(); } framing::Content body(INLINE, data); - std::auto_ptr<MessageTransferBody> copy(copyTransfer(transfer->version, transfer->getDestination(), body)); - copy->encodeContent(buffer); + copyTransfer(ProtocolVersion(), transfer.getDestination(), body).encode(buffer); } } @@ -271,8 +267,7 @@ void MessageMessage::decodeHeader(Buffer& buffer) { //don't care about the type here, but want encode/decode to be symmetric RecoveryManagerImpl::decodeMessageType(buffer); - - transfer->decodeContent(buffer); + transfer.decode(buffer); } void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/) @@ -280,37 +275,36 @@ void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/) } -MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version, - const string& destination, - const framing::Content& body) const +MessageTransferBody MessageMessage::copyTransfer(const ProtocolVersion& version, + const string& destination, + const framing::Content& body) const { - return new MessageTransferBody(version, - transfer->getTicket(), - destination, - getRedelivered(), - transfer->getRejectUnroutable(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - body); - + return MessageTransferBody(version, + transfer.getTicket(), + destination, + getRedelivered(), + transfer.getRejectUnroutable(), + transfer.getImmediate(), + transfer.getTtl(), + transfer.getPriority(), + transfer.getTimestamp(), + transfer.getDeliveryMode(), + transfer.getExpiration(), + getExchange(), + getRoutingKey(), + transfer.getMessageId(), + transfer.getCorrelationId(), + transfer.getReplyTo(), + transfer.getContentType(), + transfer.getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ + transfer.getUserId(), + transfer.getAppId(), + transfer.getTransactionId(), + transfer.getSecurityToken(), + transfer.getApplicationHeaders(), + body); } MessageMessage::ReferencePtr MessageMessage::getReference() const { @@ -321,7 +315,7 @@ uint32_t MessageMessage::getRequiredCredit() const { //TODO: change when encoding changes. Should be the payload of any //header & body frames. - return transfer->size(); + return transfer.size(); } |