summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-03-05 18:01:22 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-03-05 18:01:22 +0000
commit5ae2fdce3361ceb8c6fee7b4455d3ed890844c8f (patch)
tree1c8df984434598795f1c22e11dc59e1c2b5c9898
parentdf4921be47d527609342f1a95d595eb12b28160b (diff)
downloadqpid-python-5ae2fdce3361ceb8c6fee7b4455d3ed890844c8f.tar.gz
r1239@fuschia: andrew | 2007-02-26 10:58:52 +0000
Refactored message transfer to extract commmonality from deliver/get r1242@fuschia: andrew | 2007-03-05 17:54:44 +0000 Turn oversize inline transfers into reference transfers git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@514751 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.cpp89
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.h4
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp6
3 files changed, 63 insertions, 36 deletions
diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp
index e1be57fad7..35e5039e12 100644
--- a/cpp/lib/broker/BrokerMessageMessage.cpp
+++ b/cpp/lib/broker/BrokerMessageMessage.cpp
@@ -61,14 +61,13 @@ MessageMessage::MessageMessage(
reference(reference_)
{}
-void MessageMessage::deliver(
+void MessageMessage::transferMessage(
framing::ChannelAdapter& channel,
const std::string& consumerTag,
- u_int64_t /*deliveryTag*/,
- u_int32_t /*framesize*/)
-{
+ u_int32_t framesize)
+{
const framing::Content& body = transfer->getBody();
-
+
// Send any reference data
if (!body.isInline()){
// Open
@@ -81,8 +80,9 @@ void MessageMessage::deliver(
}
}
- // The the transfer
- channel.send(
+ // The transfer
+ if ( transfer->size()<=framesize ) {
+ channel.send(
new MessageTransferBody(channel.getVersion(),
transfer->getTicket(),
consumerTag,
@@ -107,6 +107,44 @@ void MessageMessage::deliver(
transfer->getApplicationHeaders(),
body,
transfer->getMandatory()));
+ } else {
+ // Thing to do here is to construct a simple reference message then deliver that instead
+ // fragmentmentation will be taken care of in the delivery
+ // if necessary; problem is to invent a reference name to use
+ string content = body.getValue();
+ string refname = "dummy";
+ TransferPtr newTransfer(
+ new MessageTransferBody(channel.getVersion(),
+ transfer->getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer->getImmediate(),
+ transfer->getTtl(),
+ transfer->getPriority(),
+ transfer->getTimestamp(),
+ transfer->getDeliveryMode(),
+ transfer->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer->getMessageId(),
+ transfer->getCorrelationId(),
+ transfer->getReplyTo(),
+ transfer->getContentType(),
+ transfer->getContentEncoding(),
+ transfer->getUserId(),
+ transfer->getAppId(),
+ transfer->getTransactionId(),
+ transfer->getSecurityToken(),
+ transfer->getApplicationHeaders(),
+ framing::Content(REFERENCE, refname),
+ transfer->getMandatory()));
+ ReferencePtr newRef(new Reference(refname));
+ Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
+ newRef->append(newAppend);
+ MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef);
+ newMsg.transferMessage(channel, consumerTag, framesize);
+ return;
+ }
// Close any reference data
if (!body.isInline()){
// Close
@@ -114,39 +152,24 @@ void MessageMessage::deliver(
}
}
+void MessageMessage::deliver(
+ framing::ChannelAdapter& channel,
+ const std::string& consumerTag,
+ u_int64_t /*deliveryTag*/,
+ u_int32_t framesize)
+{
+ transferMessage(channel, consumerTag, framesize);
+}
+
void MessageMessage::sendGetOk(
const framing::MethodContext& context,
const std::string& destination,
u_int32_t /*messageCount*/,
u_int64_t /*deliveryTag*/,
- u_int32_t /*framesize*/)
+ u_int32_t framesize)
{
framing::ChannelAdapter* channel = context.channel;
- channel->send(
- new MessageTransferBody(channel->getVersion(),
- transfer->getTicket(),
- destination,
- getRedelivered(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- transfer->getBody(),
- transfer->getMandatory()));
+ transferMessage(*channel, destination, framesize);
}
bool MessageMessage::isComplete()
diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h
index 3673fde9ed..1f2adb79f3 100644
--- a/cpp/lib/broker/BrokerMessageMessage.h
+++ b/cpp/lib/broker/BrokerMessageMessage.h
@@ -76,6 +76,10 @@ class MessageMessage: public Message{
u_int64_t expectedContentSize();
private:
+ void transferMessage(framing::ChannelAdapter& channel,
+ const std::string& consumerTag,
+ u_int32_t framesize);
+
framing::RequestId requestId;
const TransferPtr transfer;
const ReferencePtr reference;
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 784f180d5c..5d6d682e8b 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -187,12 +187,12 @@ MessageHandlerImpl::recover(const MethodContext& context,
}
void
-MessageHandlerImpl::reject(const MethodContext&,
+MessageHandlerImpl::reject(const MethodContext& /*context*/,
u_int16_t /*code*/,
const string& /*text*/ )
{
- // FIXME astitcher 2007-01-11: 0-9 feature
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
+ channel.ack();
+ // channel.requeue();
}
void