summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-02-15 16:35:43 +0000
committerKim van der Riet <kpvdr@apache.org>2007-02-15 16:35:43 +0000
commit366bacef0be607c82053b960255926e27a7fe0cf (patch)
tree7323f8971436fbdceb3f5e6a40565e26b373fd8c
parent9ffd924daedfc7d1d3c2e072befaf6645aef671e (diff)
downloadqpid-python-366bacef0be607c82053b960255926e27a7fe0cf.tar.gz
Fix for RecoverTest which was failing because the redelivered flag was disconnected
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507993 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java12
4 files changed, 15 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 00243f865b..752452ab38 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -395,6 +395,7 @@ public class AMQChannel
{
MessageTransferBody mtb = msg.getTransferBody().copy();
mtb.destination = destination;
+ mtb.redelivered = msg.isRedelivered();
ByteBuffer buf = ByteBuffer.allocate((int)msg.getBodySize());
for (ByteBuffer bb : msg.getContents())
{
@@ -441,6 +442,7 @@ public class AMQChannel
{
MessageTransferBody mtb = msg.getTransferBody().copy();
mtb.destination = destination;
+ mtb.redelivered = msg.isRedelivered();
mtb.body = new Content(Content.TypeEnum.REF_T, refId);
_session.writeRequest(_channelId, mtb, listener);
for (ByteBuffer bb : msg.getContents())
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index e9c755d09c..63d1746c8d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -550,7 +550,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliveryTag(),
- false,
+ messageFrame.getRedeliveredFlag(),
messageFrame.getMessageHeaders(),
messageFrame.getContents());
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
index d16f0c1fc4..49c1184119 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
@@ -68,7 +68,7 @@ public class MessageTransferMethodHandler implements StateAwareMethodListener
messageHeaders.setDeliveryMode(transferBody.getDeliveryMode());
messageHeaders.setJMSHeaders(transferBody.getApplicationHeaders());
- final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), messageHeaders);
+ final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), messageHeaders, transferBody.getRedelivered());
if(transferBody.getBody().getContentType() == Content.TypeEnum.INLINE_T)
{
@@ -78,7 +78,7 @@ public class MessageTransferMethodHandler implements StateAwareMethodListener
else
{
String referenceId = new String(transferBody.getBody().getContentAsByteArray());
- protocolSession.deliverMessageToAMQSession(evt.getChannelId(),referenceId);
+ protocolSession.deliverMessageToAMQSession(evt.getChannelId(), referenceId);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
index dd3140e8d8..b3ea03efe3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
@@ -37,20 +37,23 @@ public class UnprocessedMessage
private int channelId;
private List<byte[]> contents = new LinkedList();
private long deliveryTag;
+ private boolean redeliveredFlag;
private MessageHeaders messageHeaders;
- public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders)
+ public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, boolean redeliveredFlag)
{
this.channelId = channelId;
this.deliveryTag = deliveryTag;
this.messageHeaders = messageHeaders;
+ this.redeliveredFlag = redeliveredFlag;
}
- public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, byte[] content)
+ public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, byte[] content, boolean redeliveredFlag)
{
this.channelId = channelId;
this.deliveryTag = deliveryTag;
this.messageHeaders = messageHeaders;
+ this.redeliveredFlag = redeliveredFlag;
addContent(content);
}
@@ -80,6 +83,11 @@ public class UnprocessedMessage
return deliveryTag;
}
+ public boolean getRedeliveredFlag()
+ {
+ return redeliveredFlag;
+ }
+
public MessageHeaders getMessageHeaders()
{
return messageHeaders;