summaryrefslogtreecommitdiff
path: root/cpp/src/broker/BrokerMessageMessage.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-04-03 18:50:43 +0000
committerAlan Conway <aconway@apache.org>2007-04-03 18:50:43 +0000
commit4336a5575fbf984a58dcbd0564463843e615a7c7 (patch)
tree1c9a26c2a018662a6f382bd303b3fa116f101c50 /cpp/src/broker/BrokerMessageMessage.cpp
parente50363a432201734aa8ee03ee62ad553e6b68bfe (diff)
downloadqpid-python-4336a5575fbf984a58dcbd0564463843e615a7c7.tar.gz
* cpp/src/broker/Broker.cpp: Join cleaner thread.
* cpp/src/broker/BrokerMessageMessage.h, .cpp, ReferenceTest: - Broke reference cycle between broker::MessageMessage and Reference by using a weak_ptr in MessageMessage git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@525244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/broker/BrokerMessageMessage.cpp')
-rw-r--r--cpp/src/broker/BrokerMessageMessage.cpp221
1 files changed, 118 insertions, 103 deletions
diff --git a/cpp/src/broker/BrokerMessageMessage.cpp b/cpp/src/broker/BrokerMessageMessage.cpp
index f320a0915e..d8eb0fada0 100644
--- a/cpp/src/broker/BrokerMessageMessage.cpp
+++ b/cpp/src/broker/BrokerMessageMessage.cpp
@@ -48,7 +48,9 @@ MessageMessage::MessageMessage(
transfer_),
requestId(requestId_),
transfer(transfer_)
-{}
+{
+ assert(transfer->getBody().isInline());
+}
MessageMessage::MessageMessage(
ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_,
@@ -61,7 +63,10 @@ MessageMessage::MessageMessage(
requestId(requestId_),
transfer(transfer_),
reference(reference_)
-{}
+{
+ assert(!transfer->getBody().isInline());
+ assert(reference_);
+}
/**
* Currently used by message store impls to recover messages
@@ -74,101 +79,100 @@ void MessageMessage::transferMessage(
const std::string& consumerTag,
uint32_t framesize)
{
- const framing::Content& body = transfer->getBody();
+ const framing::Content& body = transfer->getBody();
+ // Send any reference data
+ ReferencePtr ref= getReference();
+ if (ref){
- // Send any reference data
- if (!body.isInline()){
- // Open
- channel.send(new MessageOpenBody(channel.getVersion(), reference->getId()));
- // Appends
- for(Reference::Appends::const_iterator a = reference->getAppends().begin();
- a != reference->getAppends().end();
- ++a) {
- uint32_t sizeleft = (*a)->size();
- const string& content = (*a)->getBytes();
- // Calculate overhead bytes
- // Assume that the overhead is constant as the reference name doesn't change
- uint32_t overhead = sizeleft - content.size();
- string::size_type contentStart = 0;
- while (sizeleft) {
- string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead;
- channel.send(new MessageAppendBody(channel.getVersion(), reference->getId(),
- string(content, contentStart, contentSize)));
- sizeleft -= contentSize;
- contentStart += contentSize;
- }
- }
- }
+ // Open
+ channel.send(new MessageOpenBody(channel.getVersion(), ref->getId()));
+ // Appends
+ for(Reference::Appends::const_iterator a = ref->getAppends().begin();
+ a != ref->getAppends().end();
+ ++a) {
+ uint32_t sizeleft = (*a)->size();
+ const string& content = (*a)->getBytes();
+ // Calculate overhead bytes
+ // Assume that the overhead is constant as the reference name doesn't change
+ uint32_t overhead = sizeleft - content.size();
+ string::size_type contentStart = 0;
+ while (sizeleft) {
+ string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead;
+ channel.send(new MessageAppendBody(channel.getVersion(), ref->getId(),
+ string(content, contentStart, contentSize)));
+ sizeleft -= contentSize;
+ contentStart += contentSize;
+ }
+ }
+ }
- // The transfer
- if ( transfer->size()<=framesize ) {
+ // The transfer
+ if ( transfer->size()<=framesize ) {
channel.send(
- new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- body,
- transfer->getMandatory()));
- } else {
- // Thing to do here is to construct a simple reference message then deliver that instead
- // fragmentation will be taken care of in the delivery if necessary;
- string content = body.getValue();
- string refname = "dummy";
- TransferPtr newTransfer(
- new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- framing::Content(REFERENCE, refname),
- transfer->getMandatory()));
- ReferencePtr newRef(new Reference(refname));
- Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
- newRef->append(newAppend);
- MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef);
- newMsg.transferMessage(channel, consumerTag, framesize);
- return;
- }
- // Close any reference data
- if (!body.isInline()){
- // Close
- channel.send(new MessageCloseBody(channel.getVersion(), reference->getId()));
- }
+ new MessageTransferBody(channel.getVersion(),
+ transfer->getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer->getImmediate(),
+ transfer->getTtl(),
+ transfer->getPriority(),
+ transfer->getTimestamp(),
+ transfer->getDeliveryMode(),
+ transfer->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer->getMessageId(),
+ transfer->getCorrelationId(),
+ transfer->getReplyTo(),
+ transfer->getContentType(),
+ transfer->getContentEncoding(),
+ transfer->getUserId(),
+ transfer->getAppId(),
+ transfer->getTransactionId(),
+ transfer->getSecurityToken(),
+ transfer->getApplicationHeaders(),
+ body,
+ transfer->getMandatory()));
+ } else {
+ // Thing to do here is to construct a simple reference message then deliver that instead
+ // fragmentation will be taken care of in the delivery if necessary;
+ string content = body.getValue();
+ string refname = "dummy";
+ TransferPtr newTransfer(
+ new MessageTransferBody(channel.getVersion(),
+ transfer->getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer->getImmediate(),
+ transfer->getTtl(),
+ transfer->getPriority(),
+ transfer->getTimestamp(),
+ transfer->getDeliveryMode(),
+ transfer->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer->getMessageId(),
+ transfer->getCorrelationId(),
+ transfer->getReplyTo(),
+ transfer->getContentType(),
+ transfer->getContentEncoding(),
+ transfer->getUserId(),
+ transfer->getAppId(),
+ transfer->getTransactionId(),
+ transfer->getSecurityToken(),
+ transfer->getApplicationHeaders(),
+ framing::Content(REFERENCE, refname),
+ transfer->getMandatory()));
+ ReferencePtr newRef(new Reference(refname));
+ Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
+ newRef->append(newAppend);
+ MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef);
+ newMsg.transferMessage(channel, consumerTag, framesize);
+ return;
+ }
+ // Close any reference data
+ if (ref)
+ channel.send(new MessageCloseBody(channel.getVersion(), ref->getId()));
}
void MessageMessage::deliver(
@@ -177,18 +181,18 @@ void MessageMessage::deliver(
uint64_t /*deliveryTag*/,
uint32_t framesize)
{
- transferMessage(channel, consumerTag, framesize);
+ transferMessage(channel, consumerTag, framesize);
}
void MessageMessage::sendGetOk(
const framing::MethodContext& context,
- const std::string& destination,
+ const std::string& destination,
uint32_t /*messageCount*/,
uint64_t /*deliveryTag*/,
uint32_t framesize)
{
- framing::ChannelAdapter* channel = context.channel;
- transferMessage(*channel, destination, framesize);
+ framing::ChannelAdapter* channel = context.channel;
+ transferMessage(*channel, destination, framesize);
}
bool MessageMessage::isComplete()
@@ -198,10 +202,12 @@ bool MessageMessage::isComplete()
uint64_t MessageMessage::contentSize() const
{
- if (transfer->getBody().isInline())
- return transfer->getBody().getValue().size();
- else
- return reference->getSize();
+ if (transfer->getBody().isInline())
+ return transfer->getBody().getValue().size();
+ else {
+ assert(getReference());
+ return getReference()->getSize();
+ }
}
qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
@@ -249,8 +255,10 @@ void MessageMessage::encodeHeader(Buffer& buffer) const
if (transfer->getBody().isInline()) {
transfer->encodeContent(buffer);
} else {
+ assert(getReference());
string data;
- for(Reference::Appends::const_iterator a = reference->getAppends().begin(); a != reference->getAppends().end(); ++a) {
+ const Reference::Appends& appends = getReference()->getAppends();
+ for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) {
data += (*a)->getBytes();
}
framing::Content body(INLINE, data);
@@ -302,4 +310,11 @@ MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version
transfer->getMandatory());
}
+
+MessageMessage::ReferencePtr MessageMessage::getReference() const {
+ return reference.lock();
+}
+
+
}} // namespace qpid::broker
+