diff options
author | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-03 13:04:04 +0000 |
---|---|---|
committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-03 13:04:04 +0000 |
commit | a90c9538cd57d22bd7c91835bc3835c4bc4aa64f (patch) | |
tree | 60c886cebc28f1d3c62edf8c91e921e978b10b45 | |
parent | 0bc2b2790271dba7f6a0ce16de9c387ce1a59797 (diff) | |
download | qpid-python-a90c9538cd57d22bd7c91835bc3835c4bc4aa64f.tar.gz |
Changed to use message.recover instead of message.release
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@581589 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 24 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 2b29ed3817..3456124185 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -248,13 +248,13 @@ public class AMQSession_0_10 extends AMQSession public void sendRecover() throws AMQException, FailoverException { // release all unack messages - RangeSet ranges = new RangeSet(); + /*RangeSet ranges = new RangeSet(); for (long messageTag : _unacknowledgedMessageTags) { // release this message ranges.add(messageTag); - } - getQpidSession().messageRelease(ranges); + }*/ + getQpidSession().messageRecover(Option.REQUEUE); // We need to sync so that we get notify of an error. getQpidSession().sync(); getCurrentException(); diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java index 52448d3c00..76735f8925 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java @@ -79,7 +79,9 @@ public interface Session // Producer //------------------------------------------------------ /** - * Transfer the given message to a specified exchange. + * Transfer the given + * + * to a specified exchange. * <p/> * <p>This is a convinience method for providing a complete message * using a single method which internaly is mapped to messageTransfer(), headers() followed @@ -364,6 +366,23 @@ public interface Session public void messageReject(RangeSet ranges, int code, String text); /** + * This method asks the broker to redeliver all unacknowledged messages on a specified session. + * Zero or more messages may be redelivered. This method is only allowed on non-transacted + * sessions. + * <p> Following are valid options: + * <ul> + * <li>{@link Option#REQUEUE}: <p>IIf this field is not set, the message will be redelivered to the original recipient. + * If this option is ser, the server will attempt to requeue the message, potentially then delivering it + * to an alternative subscriber. + * <p/> + * </ul> + * + * @param _options see available options + */ + public void messageRecover(Option... _options); + + + /** * As it is possible that the broker does not manage to reject some messages, after completion of * {@link Session#messageReject} this method will return the ranges of rejected messages. * <p> Note that {@link Session#messageReject} and this methods are asynchronous therefore for accessing to the diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java index 9c8b525deb..ad2cbe8cde 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -105,7 +105,7 @@ public class Session extends Invoker processed.add(range); flush = syncPoint != null && processed.includes(syncPoint); } - if (flush) + if (! flush) { flushProcessed(); } |