diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java | 83 |
1 files changed, 7 insertions, 76 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 369c8a6e9d..f41b1c94fa 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -38,7 +38,6 @@ import org.apache.qpid.client.message.ReturnMessage; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQFrame; @@ -76,12 +75,12 @@ import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> { + /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); @@ -91,7 +90,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B * @param con The connection on which to create the session. * @param channelId The unique identifier for the session. * @param transacted Indicates whether or not the session is transactional. - * @param acknowledgeMode The acknowledgement mode for the session. + * @param acknowledgeMode The acknoledgement mode for the session. * @param messageFactoryRegistry The message factory factory for the session. * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. @@ -109,7 +108,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B * @param con The connection on which to create the session. * @param channelId The unique identifier for the session. * @param transacted Indicates whether or not the session is transactional. - * @param acknowledgeMode The acknowledgement mode for the session. + * @param acknowledgeMode The acknoledgement mode for the session. * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. */ @@ -125,20 +124,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B return getProtocolHandler().getProtocolVersion(); } - protected void acknowledgeImpl() - { - while (true) - { - Long tag = _unacknowledgedMessageTags.poll(); - if (tag == null) - { - break; - } - - acknowledgeMessage(tag, false); - } - } - public void acknowledgeMessage(long deliveryTag, boolean multiple) { BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple); @@ -168,7 +153,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B // we also need to check the state manager for 08/09 as the // _connection variable may not be updated in time by the error receiving // thread. - // We can't close the session if we are already in the process of + // We can't close the session if we are alreadying in the process of // closing/closed the connection. if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED) @@ -184,20 +169,8 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B } } - public void commitImpl() throws AMQException, FailoverException, TransportException + public void sendCommit() throws AMQException, FailoverException { - // Acknowledge all delivered messages - while (true) - { - Long tag = _deliveredMessageTags.poll(); - if (tag == null) - { - break; - } - - acknowledgeMessage(tag, false); - } - final AMQProtocolHandler handler = getProtocolHandler(); handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class); @@ -427,12 +400,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, long producerId) throws JMSException + final boolean immediate, final boolean waitUntilSent, long producerId) throws JMSException { try { return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId, - this, getProtocolHandler(), producerId, immediate, mandatory); + this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); } catch (AMQException e) { @@ -604,18 +577,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B } - @Override - protected void deleteTemporaryDestination(final TemporaryDestination amqQueue) - throws JMSException - { - // Currently TemporaryDestination is set to be auto-delete which, for 0-8..0-9-1, means that the queue will be deleted - // by the server when there are no more subscriptions to that queue/topic (rather than when the client disconnects). - // This is not quite right for JMSCompliance as the queue/topic should remain until the connection closes, or the - // client explicitly deletes it. - - /* intentional no-op */ - } - public boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map<String, Object> args) throws JMSException { @@ -623,34 +584,4 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B queueName == null ? null : new AMQShortString(queueName), bindingKey == null ? null : new AMQShortString(bindingKey)); } - - - public AMQException getLastException() - { - // if the Connection has closed then we should throw any exception that - // has occurred that we were not waiting for - AMQStateManager manager = _connection.getProtocolHandler() - .getStateManager(); - - Exception e = manager.getLastException(); - if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) - && e != null) - { - if (e instanceof AMQException) - { - return (AMQException) e; - } - else - { - AMQException amqe = new AMQException(AMQConstant - .getConstant(AMQConstant.INTERNAL_ERROR.getCode()), - e.getMessage(), e.getCause()); - return amqe; - } - } - else - { - return null; - } - } } |