diff options
author | Alan Conway <aconway@apache.org> | 2007-04-03 18:50:43 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-04-03 18:50:43 +0000 |
commit | 4336a5575fbf984a58dcbd0564463843e615a7c7 (patch) | |
tree | 1c9a26c2a018662a6f382bd303b3fa116f101c50 /cpp/src/broker/BrokerMessageMessage.cpp | |
parent | e50363a432201734aa8ee03ee62ad553e6b68bfe (diff) | |
download | qpid-python-4336a5575fbf984a58dcbd0564463843e615a7c7.tar.gz |
* cpp/src/broker/Broker.cpp: Join cleaner thread.
* cpp/src/broker/BrokerMessageMessage.h, .cpp, ReferenceTest:
- Broke reference cycle between broker::MessageMessage and Reference
by using a weak_ptr in MessageMessage
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@525244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/broker/BrokerMessageMessage.cpp')
-rw-r--r-- | cpp/src/broker/BrokerMessageMessage.cpp | 221 |
1 files changed, 118 insertions, 103 deletions
diff --git a/cpp/src/broker/BrokerMessageMessage.cpp b/cpp/src/broker/BrokerMessageMessage.cpp index f320a0915e..d8eb0fada0 100644 --- a/cpp/src/broker/BrokerMessageMessage.cpp +++ b/cpp/src/broker/BrokerMessageMessage.cpp @@ -48,7 +48,9 @@ MessageMessage::MessageMessage( transfer_), requestId(requestId_), transfer(transfer_) -{} +{ + assert(transfer->getBody().isInline()); +} MessageMessage::MessageMessage( ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_, @@ -61,7 +63,10 @@ MessageMessage::MessageMessage( requestId(requestId_), transfer(transfer_), reference(reference_) -{} +{ + assert(!transfer->getBody().isInline()); + assert(reference_); +} /** * Currently used by message store impls to recover messages @@ -74,101 +79,100 @@ void MessageMessage::transferMessage( const std::string& consumerTag, uint32_t framesize) { - const framing::Content& body = transfer->getBody(); + const framing::Content& body = transfer->getBody(); + // Send any reference data + ReferencePtr ref= getReference(); + if (ref){ - // Send any reference data - if (!body.isInline()){ - // Open - channel.send(new MessageOpenBody(channel.getVersion(), reference->getId())); - // Appends - for(Reference::Appends::const_iterator a = reference->getAppends().begin(); - a != reference->getAppends().end(); - ++a) { - uint32_t sizeleft = (*a)->size(); - const string& content = (*a)->getBytes(); - // Calculate overhead bytes - // Assume that the overhead is constant as the reference name doesn't change - uint32_t overhead = sizeleft - content.size(); - string::size_type contentStart = 0; - while (sizeleft) { - string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead; - channel.send(new MessageAppendBody(channel.getVersion(), reference->getId(), - string(content, contentStart, contentSize))); - sizeleft -= contentSize; - contentStart += contentSize; - } - } - } + // Open + channel.send(new MessageOpenBody(channel.getVersion(), ref->getId())); + // Appends + for(Reference::Appends::const_iterator a = ref->getAppends().begin(); + a != ref->getAppends().end(); + ++a) { + uint32_t sizeleft = (*a)->size(); + const string& content = (*a)->getBytes(); + // Calculate overhead bytes + // Assume that the overhead is constant as the reference name doesn't change + uint32_t overhead = sizeleft - content.size(); + string::size_type contentStart = 0; + while (sizeleft) { + string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead; + channel.send(new MessageAppendBody(channel.getVersion(), ref->getId(), + string(content, contentStart, contentSize))); + sizeleft -= contentSize; + contentStart += contentSize; + } + } + } - // The transfer - if ( transfer->size()<=framesize ) { + // The transfer + if ( transfer->size()<=framesize ) { channel.send( - new MessageTransferBody(channel.getVersion(), - transfer->getTicket(), - consumerTag, - getRedelivered(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - body, - transfer->getMandatory())); - } else { - // Thing to do here is to construct a simple reference message then deliver that instead - // fragmentation will be taken care of in the delivery if necessary; - string content = body.getValue(); - string refname = "dummy"; - TransferPtr newTransfer( - new MessageTransferBody(channel.getVersion(), - transfer->getTicket(), - consumerTag, - getRedelivered(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - framing::Content(REFERENCE, refname), - transfer->getMandatory())); - ReferencePtr newRef(new Reference(refname)); - Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content)); - newRef->append(newAppend); - MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef); - newMsg.transferMessage(channel, consumerTag, framesize); - return; - } - // Close any reference data - if (!body.isInline()){ - // Close - channel.send(new MessageCloseBody(channel.getVersion(), reference->getId())); - } + new MessageTransferBody(channel.getVersion(), + transfer->getTicket(), + consumerTag, + getRedelivered(), + transfer->getImmediate(), + transfer->getTtl(), + transfer->getPriority(), + transfer->getTimestamp(), + transfer->getDeliveryMode(), + transfer->getExpiration(), + getExchange(), + getRoutingKey(), + transfer->getMessageId(), + transfer->getCorrelationId(), + transfer->getReplyTo(), + transfer->getContentType(), + transfer->getContentEncoding(), + transfer->getUserId(), + transfer->getAppId(), + transfer->getTransactionId(), + transfer->getSecurityToken(), + transfer->getApplicationHeaders(), + body, + transfer->getMandatory())); + } else { + // Thing to do here is to construct a simple reference message then deliver that instead + // fragmentation will be taken care of in the delivery if necessary; + string content = body.getValue(); + string refname = "dummy"; + TransferPtr newTransfer( + new MessageTransferBody(channel.getVersion(), + transfer->getTicket(), + consumerTag, + getRedelivered(), + transfer->getImmediate(), + transfer->getTtl(), + transfer->getPriority(), + transfer->getTimestamp(), + transfer->getDeliveryMode(), + transfer->getExpiration(), + getExchange(), + getRoutingKey(), + transfer->getMessageId(), + transfer->getCorrelationId(), + transfer->getReplyTo(), + transfer->getContentType(), + transfer->getContentEncoding(), + transfer->getUserId(), + transfer->getAppId(), + transfer->getTransactionId(), + transfer->getSecurityToken(), + transfer->getApplicationHeaders(), + framing::Content(REFERENCE, refname), + transfer->getMandatory())); + ReferencePtr newRef(new Reference(refname)); + Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content)); + newRef->append(newAppend); + MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef); + newMsg.transferMessage(channel, consumerTag, framesize); + return; + } + // Close any reference data + if (ref) + channel.send(new MessageCloseBody(channel.getVersion(), ref->getId())); } void MessageMessage::deliver( @@ -177,18 +181,18 @@ void MessageMessage::deliver( uint64_t /*deliveryTag*/, uint32_t framesize) { - transferMessage(channel, consumerTag, framesize); + transferMessage(channel, consumerTag, framesize); } void MessageMessage::sendGetOk( const framing::MethodContext& context, - const std::string& destination, + const std::string& destination, uint32_t /*messageCount*/, uint64_t /*deliveryTag*/, uint32_t framesize) { - framing::ChannelAdapter* channel = context.channel; - transferMessage(*channel, destination, framesize); + framing::ChannelAdapter* channel = context.channel; + transferMessage(*channel, destination, framesize); } bool MessageMessage::isComplete() @@ -198,10 +202,12 @@ bool MessageMessage::isComplete() uint64_t MessageMessage::contentSize() const { - if (transfer->getBody().isInline()) - return transfer->getBody().getValue().size(); - else - return reference->getSize(); + if (transfer->getBody().isInline()) + return transfer->getBody().getValue().size(); + else { + assert(getReference()); + return getReference()->getSize(); + } } qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties() @@ -249,8 +255,10 @@ void MessageMessage::encodeHeader(Buffer& buffer) const if (transfer->getBody().isInline()) { transfer->encodeContent(buffer); } else { + assert(getReference()); string data; - for(Reference::Appends::const_iterator a = reference->getAppends().begin(); a != reference->getAppends().end(); ++a) { + const Reference::Appends& appends = getReference()->getAppends(); + for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) { data += (*a)->getBytes(); } framing::Content body(INLINE, data); @@ -302,4 +310,11 @@ MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version transfer->getMandatory()); } + +MessageMessage::ReferencePtr MessageMessage::getReference() const { + return reference.lock(); +} + + }} // namespace qpid::broker + |