diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 94 |
1 files changed, 44 insertions, 50 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index b5868cd235..86e1fc08de 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -72,6 +72,7 @@ import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.SessionListener; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.Serial; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -269,7 +270,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic long prefetch = getAMQConnection().getMaxPrefetch(); - if (unackedCount >= prefetch/2 || maxAckDelay <= 0) + if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || _acknowledgeMode == javax.jms.Session.AUTO_ACKNOWLEDGE) { flushAcknowledgments(); } @@ -411,25 +412,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - - /** - * Commit the receipt and the delivery of all messages exchanged by this session resources. - */ - public void sendCommit() throws AMQException, FailoverException - { - getQpidSession().setAutoSync(true); - try - { - getQpidSession().txCommit(); - } - finally - { - getQpidSession().setAutoSync(false); - } - // We need to sync so that we get notify of an error. - sync(); - } - /** * Create a queue with a given name. * @@ -462,6 +444,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void sendRecover() throws AMQException, FailoverException { // release all unacked messages + RangeSet ranges = gatherUnackedRangeSet(); + getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); + // We need to sync so that we get notify of an error. + sync(); + } + + private RangeSet gatherUnackedRangeSet() + { RangeSet ranges = new RangeSet(); while (true) { @@ -470,11 +460,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { break; } - ranges.add((int) (long) tag); + + ranges.add(tag.intValue()); } - getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); - // We need to sync so that we get notify of an error. - sync(); + + return ranges; } @@ -548,7 +538,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args) - throws JMSException { boolean res; ExchangeBoundResult bindingQueryResult = @@ -676,13 +665,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * Create an 0_10 message producer */ public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent, - long producerId) throws JMSException + final boolean immediate, final long producerId) throws JMSException { try { return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this, - getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); + getProtocolHandler(), producerId, immediate, mandatory); } catch (AMQException e) { @@ -692,6 +680,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic throw ex; } + catch(TransportException e) + { + throw toJMSException("Exception while creating message producer:" + e.getMessage(), e); + } } @@ -994,34 +986,26 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - @Override public void commit() throws JMSException + public void commitImpl() throws AMQException, FailoverException, TransportException { - checkTransacted(); - try - { - if( _txSize > 0 ) - { - messageAcknowledge(_txRangeSet, true); - _txRangeSet.clear(); - _txSize = 0; - } - sendCommit(); - } - catch(SessionException e) + if( _txSize > 0 ) { - JMSException ex = new JMSException("Session exception occured while trying to commit"); - ex.initCause(e); - ex.setLinkedException(e); - throw ex; + messageAcknowledge(_txRangeSet, true); + _txRangeSet.clear(); + _txSize = 0; } - catch (AMQException e) + + getQpidSession().setAutoSync(true); + try { - throw new JMSAMQException("Failed to commit: " + e.getMessage(), e); + getQpidSession().txCommit(); } - catch (FailoverException e) + finally { - throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); + getQpidSession().setAutoSync(false); } + // We need to sync so that we get notify of an error. + sync(); } protected final boolean tagLE(long tag1, long tag2) @@ -1383,5 +1367,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sb.append(">"); return sb.toString(); } - + + protected void acknowledgeImpl() + { + RangeSet range = gatherUnackedRangeSet(); + + if(range.size() > 0 ) + { + messageAcknowledge(range, true); + getQpidSession().sync(); + } + } } |