summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java94
1 files changed, 44 insertions, 50 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index b5868cd235..86e1fc08de 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -72,6 +72,7 @@ import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.SessionListener;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Serial;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -269,7 +270,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
long prefetch = getAMQConnection().getMaxPrefetch();
- if (unackedCount >= prefetch/2 || maxAckDelay <= 0)
+ if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || _acknowledgeMode == javax.jms.Session.AUTO_ACKNOWLEDGE)
{
flushAcknowledgments();
}
@@ -411,25 +412,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
-
- /**
- * Commit the receipt and the delivery of all messages exchanged by this session resources.
- */
- public void sendCommit() throws AMQException, FailoverException
- {
- getQpidSession().setAutoSync(true);
- try
- {
- getQpidSession().txCommit();
- }
- finally
- {
- getQpidSession().setAutoSync(false);
- }
- // We need to sync so that we get notify of an error.
- sync();
- }
-
/**
* Create a queue with a given name.
*
@@ -462,6 +444,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void sendRecover() throws AMQException, FailoverException
{
// release all unacked messages
+ RangeSet ranges = gatherUnackedRangeSet();
+ getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ // We need to sync so that we get notify of an error.
+ sync();
+ }
+
+ private RangeSet gatherUnackedRangeSet()
+ {
RangeSet ranges = new RangeSet();
while (true)
{
@@ -470,11 +460,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
break;
}
- ranges.add((int) (long) tag);
+
+ ranges.add(tag.intValue());
}
- getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
- // We need to sync so that we get notify of an error.
- sync();
+
+ return ranges;
}
@@ -548,7 +538,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
- throws JMSException
{
boolean res;
ExchangeBoundResult bindingQueryResult =
@@ -676,13 +665,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* Create an 0_10 message producer
*/
public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent,
- long producerId) throws JMSException
+ final boolean immediate, final long producerId) throws JMSException
{
try
{
return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this,
- getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+ getProtocolHandler(), producerId, immediate, mandatory);
}
catch (AMQException e)
{
@@ -692,6 +680,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
throw ex;
}
+ catch(TransportException e)
+ {
+ throw toJMSException("Exception while creating message producer:" + e.getMessage(), e);
+ }
}
@@ -994,34 +986,26 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- @Override public void commit() throws JMSException
+ public void commitImpl() throws AMQException, FailoverException, TransportException
{
- checkTransacted();
- try
- {
- if( _txSize > 0 )
- {
- messageAcknowledge(_txRangeSet, true);
- _txRangeSet.clear();
- _txSize = 0;
- }
- sendCommit();
- }
- catch(SessionException e)
+ if( _txSize > 0 )
{
- JMSException ex = new JMSException("Session exception occured while trying to commit");
- ex.initCause(e);
- ex.setLinkedException(e);
- throw ex;
+ messageAcknowledge(_txRangeSet, true);
+ _txRangeSet.clear();
+ _txSize = 0;
}
- catch (AMQException e)
+
+ getQpidSession().setAutoSync(true);
+ try
{
- throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+ getQpidSession().txCommit();
}
- catch (FailoverException e)
+ finally
{
- throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
+ getQpidSession().setAutoSync(false);
}
+ // We need to sync so that we get notify of an error.
+ sync();
}
protected final boolean tagLE(long tag1, long tag2)
@@ -1383,5 +1367,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
sb.append(">");
return sb.toString();
}
-
+
+ protected void acknowledgeImpl()
+ {
+ RangeSet range = gatherUnackedRangeSet();
+
+ if(range.size() > 0 )
+ {
+ messageAcknowledge(range, true);
+ getQpidSession().sync();
+ }
+ }
}