diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java | 34 |
1 files changed, 30 insertions, 4 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index c010e4c7ed..ccb2b00947 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -76,6 +76,7 @@ import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,6 +126,20 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B return getProtocolHandler().getProtocolVersion(); } + protected void acknowledgeImpl() + { + while (true) + { + Long tag = _unacknowledgedMessageTags.poll(); + if (tag == null) + { + break; + } + + acknowledgeMessage(tag, false); + } + } + public void acknowledgeMessage(long deliveryTag, boolean multiple) { BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple); @@ -170,8 +185,20 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B } } - public void sendCommit() throws AMQException, FailoverException + public void commitImpl() throws AMQException, FailoverException, TransportException { + // Acknowledge all delivered messages + while (true) + { + Long tag = _deliveredMessageTags.poll(); + if (tag == null) + { + break; + } + + acknowledgeMessage(tag, false); + } + final AMQProtocolHandler handler = getProtocolHandler(); handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class); @@ -401,12 +428,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent, long producerId) throws JMSException + final boolean immediate, long producerId) throws JMSException { try { return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId, - this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); + this, getProtocolHandler(), producerId, immediate, mandatory); } catch (AMQException e) { @@ -615,5 +642,4 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B return null; } } - } |