summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-10-03 13:04:04 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-10-03 13:04:04 +0000
commita90c9538cd57d22bd7c91835bc3835c4bc4aa64f (patch)
tree60c886cebc28f1d3c62edf8c91e921e978b10b45
parent0bc2b2790271dba7f6a0ce16de9c387ce1a59797 (diff)
downloadqpid-python-a90c9538cd57d22bd7c91835bc3835c4bc4aa64f.tar.gz
Changed to use message.recover instead of message.release
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@581589 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java21
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java2
3 files changed, 24 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 2b29ed3817..3456124185 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -248,13 +248,13 @@ public class AMQSession_0_10 extends AMQSession
public void sendRecover() throws AMQException, FailoverException
{
// release all unack messages
- RangeSet ranges = new RangeSet();
+ /*RangeSet ranges = new RangeSet();
for (long messageTag : _unacknowledgedMessageTags)
{
// release this message
ranges.add(messageTag);
- }
- getQpidSession().messageRelease(ranges);
+ }*/
+ getQpidSession().messageRecover(Option.REQUEUE);
// We need to sync so that we get notify of an error.
getQpidSession().sync();
getCurrentException();
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
index 52448d3c00..76735f8925 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
@@ -79,7 +79,9 @@ public interface Session
// Producer
//------------------------------------------------------
/**
- * Transfer the given message to a specified exchange.
+ * Transfer the given
+ *
+ * to a specified exchange.
* <p/>
* <p>This is a convinience method for providing a complete message
* using a single method which internaly is mapped to messageTransfer(), headers() followed
@@ -364,6 +366,23 @@ public interface Session
public void messageReject(RangeSet ranges, int code, String text);
/**
+ * This method asks the broker to redeliver all unacknowledged messages on a specified session.
+ * Zero or more messages may be redelivered. This method is only allowed on non-transacted
+ * sessions.
+ * <p> Following are valid options:
+ * <ul>
+ * <li>{@link Option#REQUEUE}: <p>IIf this field is not set, the message will be redelivered to the original recipient.
+ * If this option is ser, the server will attempt to requeue the message, potentially then delivering it
+ * to an alternative subscriber.
+ * <p/>
+ * </ul>
+ *
+ * @param _options see available options
+ */
+ public void messageRecover(Option... _options);
+
+
+ /**
* As it is possible that the broker does not manage to reject some messages, after completion of
* {@link Session#messageReject} this method will return the ranges of rejected messages.
* <p> Note that {@link Session#messageReject} and this methods are asynchronous therefore for accessing to the
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
index 9c8b525deb..ad2cbe8cde 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
@@ -105,7 +105,7 @@ public class Session extends Invoker
processed.add(range);
flush = syncPoint != null && processed.includes(syncPoint);
}
- if (flush)
+ if (! flush)
{
flushProcessed();
}