summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java83
1 files changed, 7 insertions, 76 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 369c8a6e9d..f41b1c94fa 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -38,7 +38,6 @@ import org.apache.qpid.client.message.ReturnMessage;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQFrame;
@@ -76,12 +75,12 @@ import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
+
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -91,7 +90,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
* @param con The connection on which to create the session.
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is transactional.
- * @param acknowledgeMode The acknowledgement mode for the session.
+ * @param acknowledgeMode The acknoledgement mode for the session.
* @param messageFactoryRegistry The message factory factory for the session.
* @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
@@ -109,7 +108,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
* @param con The connection on which to create the session.
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is transactional.
- * @param acknowledgeMode The acknowledgement mode for the session.
+ * @param acknowledgeMode The acknoledgement mode for the session.
* @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
*/
@@ -125,20 +124,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
return getProtocolHandler().getProtocolVersion();
}
- protected void acknowledgeImpl()
- {
- while (true)
- {
- Long tag = _unacknowledgedMessageTags.poll();
- if (tag == null)
- {
- break;
- }
-
- acknowledgeMessage(tag, false);
- }
- }
-
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
@@ -168,7 +153,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
// we also need to check the state manager for 08/09 as the
// _connection variable may not be updated in time by the error receiving
// thread.
- // We can't close the session if we are already in the process of
+ // We can't close the session if we are alreadying in the process of
// closing/closed the connection.
if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)
@@ -184,20 +169,8 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
}
}
- public void commitImpl() throws AMQException, FailoverException, TransportException
+ public void sendCommit() throws AMQException, FailoverException
{
- // Acknowledge all delivered messages
- while (true)
- {
- Long tag = _deliveredMessageTags.poll();
- if (tag == null)
- {
- break;
- }
-
- acknowledgeMessage(tag, false);
- }
-
final AMQProtocolHandler handler = getProtocolHandler();
handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
@@ -427,12 +400,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, long producerId) throws JMSException
+ final boolean immediate, final boolean waitUntilSent, long producerId) throws JMSException
{
try
{
return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId,
- this, getProtocolHandler(), producerId, immediate, mandatory);
+ this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
}
catch (AMQException e)
{
@@ -604,18 +577,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
}
- @Override
- protected void deleteTemporaryDestination(final TemporaryDestination amqQueue)
- throws JMSException
- {
- // Currently TemporaryDestination is set to be auto-delete which, for 0-8..0-9-1, means that the queue will be deleted
- // by the server when there are no more subscriptions to that queue/topic (rather than when the client disconnects).
- // This is not quite right for JMSCompliance as the queue/topic should remain until the connection closes, or the
- // client explicitly deletes it.
-
- /* intentional no-op */
- }
-
public boolean isQueueBound(String exchangeName, String queueName,
String bindingKey, Map<String, Object> args) throws JMSException
{
@@ -623,34 +584,4 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
queueName == null ? null : new AMQShortString(queueName),
bindingKey == null ? null : new AMQShortString(bindingKey));
}
-
-
- public AMQException getLastException()
- {
- // if the Connection has closed then we should throw any exception that
- // has occurred that we were not waiting for
- AMQStateManager manager = _connection.getProtocolHandler()
- .getStateManager();
-
- Exception e = manager.getLastException();
- if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED)
- && e != null)
- {
- if (e instanceof AMQException)
- {
- return (AMQException) e;
- }
- else
- {
- AMQException amqe = new AMQException(AMQConstant
- .getConstant(AMQConstant.INTERNAL_ERROR.getCode()),
- e.getMessage(), e.getCause());
- return amqe;
- }
- }
- else
- {
- return null;
- }
- }
}