diff options
author | Andrew Stitcher <astitcher@apache.org> | 2007-03-05 18:01:22 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2007-03-05 18:01:22 +0000 |
commit | 5ae2fdce3361ceb8c6fee7b4455d3ed890844c8f (patch) | |
tree | 1c8df984434598795f1c22e11dc59e1c2b5c9898 | |
parent | df4921be47d527609342f1a95d595eb12b28160b (diff) | |
download | qpid-python-5ae2fdce3361ceb8c6fee7b4455d3ed890844c8f.tar.gz |
r1239@fuschia: andrew | 2007-02-26 10:58:52 +0000
Refactored message transfer to extract commmonality from deliver/get
r1242@fuschia: andrew | 2007-03-05 17:54:44 +0000
Turn oversize inline transfers into reference transfers
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@514751 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.cpp | 89 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.h | 4 | ||||
-rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 6 |
3 files changed, 63 insertions, 36 deletions
diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp index e1be57fad7..35e5039e12 100644 --- a/cpp/lib/broker/BrokerMessageMessage.cpp +++ b/cpp/lib/broker/BrokerMessageMessage.cpp @@ -61,14 +61,13 @@ MessageMessage::MessageMessage( reference(reference_) {} -void MessageMessage::deliver( +void MessageMessage::transferMessage( framing::ChannelAdapter& channel, const std::string& consumerTag, - u_int64_t /*deliveryTag*/, - u_int32_t /*framesize*/) -{ + u_int32_t framesize) +{ const framing::Content& body = transfer->getBody(); - + // Send any reference data if (!body.isInline()){ // Open @@ -81,8 +80,9 @@ void MessageMessage::deliver( } } - // The the transfer - channel.send( + // The transfer + if ( transfer->size()<=framesize ) { + channel.send( new MessageTransferBody(channel.getVersion(), transfer->getTicket(), consumerTag, @@ -107,6 +107,44 @@ void MessageMessage::deliver( transfer->getApplicationHeaders(), body, transfer->getMandatory())); + } else { + // Thing to do here is to construct a simple reference message then deliver that instead + // fragmentmentation will be taken care of in the delivery + // if necessary; problem is to invent a reference name to use + string content = body.getValue(); + string refname = "dummy"; + TransferPtr newTransfer( + new MessageTransferBody(channel.getVersion(), + transfer->getTicket(), + consumerTag, + getRedelivered(), + transfer->getImmediate(), + transfer->getTtl(), + transfer->getPriority(), + transfer->getTimestamp(), + transfer->getDeliveryMode(), + transfer->getExpiration(), + getExchange(), + getRoutingKey(), + transfer->getMessageId(), + transfer->getCorrelationId(), + transfer->getReplyTo(), + transfer->getContentType(), + transfer->getContentEncoding(), + transfer->getUserId(), + transfer->getAppId(), + transfer->getTransactionId(), + transfer->getSecurityToken(), + transfer->getApplicationHeaders(), + framing::Content(REFERENCE, refname), + transfer->getMandatory())); + ReferencePtr newRef(new Reference(refname)); + Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content)); + newRef->append(newAppend); + MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef); + newMsg.transferMessage(channel, consumerTag, framesize); + return; + } // Close any reference data if (!body.isInline()){ // Close @@ -114,39 +152,24 @@ void MessageMessage::deliver( } } +void MessageMessage::deliver( + framing::ChannelAdapter& channel, + const std::string& consumerTag, + u_int64_t /*deliveryTag*/, + u_int32_t framesize) +{ + transferMessage(channel, consumerTag, framesize); +} + void MessageMessage::sendGetOk( const framing::MethodContext& context, const std::string& destination, u_int32_t /*messageCount*/, u_int64_t /*deliveryTag*/, - u_int32_t /*framesize*/) + u_int32_t framesize) { framing::ChannelAdapter* channel = context.channel; - channel->send( - new MessageTransferBody(channel->getVersion(), - transfer->getTicket(), - destination, - getRedelivered(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - transfer->getBody(), - transfer->getMandatory())); + transferMessage(*channel, destination, framesize); } bool MessageMessage::isComplete() diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h index 3673fde9ed..1f2adb79f3 100644 --- a/cpp/lib/broker/BrokerMessageMessage.h +++ b/cpp/lib/broker/BrokerMessageMessage.h @@ -76,6 +76,10 @@ class MessageMessage: public Message{ u_int64_t expectedContentSize(); private: + void transferMessage(framing::ChannelAdapter& channel, + const std::string& consumerTag, + u_int32_t framesize); + framing::RequestId requestId; const TransferPtr transfer; const ReferencePtr reference; diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 784f180d5c..5d6d682e8b 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -187,12 +187,12 @@ MessageHandlerImpl::recover(const MethodContext& context, } void -MessageHandlerImpl::reject(const MethodContext&, +MessageHandlerImpl::reject(const MethodContext& /*context*/, u_int16_t /*code*/, const string& /*text*/ ) { - // FIXME astitcher 2007-01-11: 0-9 feature - THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); + channel.ack(); + // channel.requeue(); } void |