diff options
author | Kim van der Riet <kpvdr@apache.org> | 2007-02-15 16:35:43 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2007-02-15 16:35:43 +0000 |
commit | 366bacef0be607c82053b960255926e27a7fe0cf (patch) | |
tree | 7323f8971436fbdceb3f5e6a40565e26b373fd8c | |
parent | 9ffd924daedfc7d1d3c2e072befaf6645aef671e (diff) | |
download | qpid-python-366bacef0be607c82053b960255926e27a7fe0cf.tar.gz |
Fix for RecoverTest which was failing because the redelivered flag was disconnected
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507993 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 15 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 00243f865b..752452ab38 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -395,6 +395,7 @@ public class AMQChannel { MessageTransferBody mtb = msg.getTransferBody().copy(); mtb.destination = destination; + mtb.redelivered = msg.isRedelivered(); ByteBuffer buf = ByteBuffer.allocate((int)msg.getBodySize()); for (ByteBuffer bb : msg.getContents()) { @@ -441,6 +442,7 @@ public class AMQChannel { MessageTransferBody mtb = msg.getTransferBody().copy(); mtb.destination = destination; + mtb.redelivered = msg.isRedelivered(); mtb.body = new Content(Content.TypeEnum.REF_T, refId); _session.writeRequest(_channelId, mtb, listener); for (ByteBuffer bb : msg.getContents()) diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index e9c755d09c..63d1746c8d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -550,7 +550,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliveryTag(), - false, + messageFrame.getRedeliveredFlag(), messageFrame.getMessageHeaders(), messageFrame.getContents()); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java index d16f0c1fc4..49c1184119 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java @@ -68,7 +68,7 @@ public class MessageTransferMethodHandler implements StateAwareMethodListener messageHeaders.setDeliveryMode(transferBody.getDeliveryMode()); messageHeaders.setJMSHeaders(transferBody.getApplicationHeaders()); - final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), messageHeaders); + final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), messageHeaders, transferBody.getRedelivered()); if(transferBody.getBody().getContentType() == Content.TypeEnum.INLINE_T) { @@ -78,7 +78,7 @@ public class MessageTransferMethodHandler implements StateAwareMethodListener else { String referenceId = new String(transferBody.getBody().getContentAsByteArray()); - protocolSession.deliverMessageToAMQSession(evt.getChannelId(),referenceId); + protocolSession.deliverMessageToAMQSession(evt.getChannelId(), referenceId); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index dd3140e8d8..b3ea03efe3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -37,20 +37,23 @@ public class UnprocessedMessage private int channelId; private List<byte[]> contents = new LinkedList(); private long deliveryTag; + private boolean redeliveredFlag; private MessageHeaders messageHeaders; - public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders) + public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, boolean redeliveredFlag) { this.channelId = channelId; this.deliveryTag = deliveryTag; this.messageHeaders = messageHeaders; + this.redeliveredFlag = redeliveredFlag; } - public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, byte[] content) + public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, byte[] content, boolean redeliveredFlag) { this.channelId = channelId; this.deliveryTag = deliveryTag; this.messageHeaders = messageHeaders; + this.redeliveredFlag = redeliveredFlag; addContent(content); } @@ -80,6 +83,11 @@ public class UnprocessedMessage return deliveryTag; } + public boolean getRedeliveredFlag() + { + return redeliveredFlag; + } + public MessageHeaders getMessageHeaders() { return messageHeaders; |