summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java181
1 files changed, 83 insertions, 98 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 7daebbff04..8ab23a240e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -21,13 +21,8 @@
package org.apache.qpid.client;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.ArrayList;
-import java.util.Map;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
@@ -43,44 +38,20 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
-import org.apache.qpid.filter.MessageFilter;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicQosOkBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRecoverSyncBody;
-import org.apache.qpid.framing.BasicRecoverSyncOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.TxCommitOkBody;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.transport.TransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
@@ -131,7 +102,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
while (true)
{
- Long tag = _unacknowledgedMessageTags.poll();
+ Long tag = getUnacknowledgedMessageTags().poll();
if (tag == null)
{
break;
@@ -145,15 +116,15 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
- final AMQFrame ackFrame = body.generateFrame(_channelId);
+ final AMQFrame ackFrame = body.generateFrame(getChannelId());
if (_logger.isDebugEnabled())
{
- _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
+ _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + getChannelId());
}
getProtocolHandler().writeFrame(ackFrame, !isTransacted());
- _unacknowledgedMessageTags.remove(deliveryTag);
+ getUnacknowledgedMessageTags().remove(deliveryTag);
}
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
@@ -162,7 +133,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
(getTicket(),queueName,exchangeName,routingKey,false,arguments).
- generateFrame(_channelId), QueueBindOkBody.class);
+ generateFrame(getChannelId()), QueueBindOkBody.class);
}
public void sendClose(long timeout) throws AMQException, FailoverException
@@ -179,7 +150,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
getProtocolHandler().closeSession(this);
getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
- new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId),
+ new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(getChannelId()),
ChannelCloseOkBody.class, timeout);
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully.
@@ -191,7 +162,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
// Acknowledge all delivered messages
while (true)
{
- Long tag = _deliveredMessageTags.poll();
+ Long tag = getDeliveredMessageTags().poll();
if (tag == null)
{
break;
@@ -202,7 +173,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
final AMQProtocolHandler handler = getProtocolHandler();
- handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
+ handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(getChannelId()), TxCommitOkBody.class);
}
public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException,
@@ -218,22 +189,22 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
}
QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table);
- AMQFrame queueDeclare = body.generateFrame(_channelId);
+ AMQFrame queueDeclare = body.generateFrame(getChannelId());
getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
}
public void sendRecover() throws AMQException, FailoverException
{
enforceRejectBehaviourDuringRecover();
- _prefetchedMessageTags.clear();
- _unacknowledgedMessageTags.clear();
+ getPrefetchedMessageTags().clear();
+ getUnacknowledgedMessageTags().clear();
if (isStrictAMQP())
{
// We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
- _connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId));
+ getAMQConnection().getProtocolHandler().writeFrame(body.generateFrame(getChannelId()));
_logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
}
else
@@ -243,17 +214,17 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
{
BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
- _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class);
+ getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverOkBody.class);
}
else if(getProtocolVersion().equals(ProtocolVersion.v0_9))
{
BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
- _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
+ getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class);
}
else if(getProtocolVersion().equals(ProtocolVersion.v0_91))
{
BasicRecoverSyncBody body = ((MethodRegistry_0_91)getMethodRegistry()).createBasicRecoverSyncBody(false);
- _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
+ getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class);
}
else
{
@@ -266,9 +237,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + _unacknowledgedMessageTags);
+ _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + getUnacknowledgedMessageTags());
}
- ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(_consumers.values());
+ ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(getConsumers().values());
boolean messageListenerFound = false;
boolean serverRejectBehaviourFound = false;
for(BasicMessageConsumer_0_8 consumer : consumersToCheck)
@@ -287,7 +258,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
if (serverRejectBehaviourFound)
{
//reject(false) any messages we don't want returned again
- switch(_acknowledgeMode)
+ switch(getAcknowledgeMode())
{
case Session.DUPS_OK_ACKNOWLEDGE:
case Session.AUTO_ACKNOWLEDGE:
@@ -296,7 +267,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
break;
}
case Session.CLIENT_ACKNOWLEDGE:
- for(Long tag : _unacknowledgedMessageTags)
+ for(Long tag : getUnacknowledgedMessageTags())
{
rejectMessage(tag, false);
}
@@ -314,7 +285,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
// consumer on the queue. Whilst this is within the JMS spec it is not
// user friendly and avoidable.
boolean normalRejectBehaviour = true;
- for (BasicMessageConsumer_0_8 consumer : _consumers.values())
+ for (BasicMessageConsumer_0_8 consumer : getConsumers().values())
{
if(RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour()))
{
@@ -326,7 +297,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
while (true)
{
- Long tag = _deliveredMessageTags.poll();
+ Long tag = getDeliveredMessageTags().poll();
if (tag == null)
{
break;
@@ -338,8 +309,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
public void rejectMessage(long deliveryTag, boolean requeue)
{
- if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)||
- ((_acknowledgeMode == AUTO_ACKNOWLEDGE || _acknowledgeMode == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners()))
+ if ((getAcknowledgeMode() == CLIENT_ACKNOWLEDGE) || (getAcknowledgeMode() == SESSION_TRANSACTED)||
+ ((getAcknowledgeMode() == AUTO_ACKNOWLEDGE || getAcknowledgeMode() == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners()))
{
if (_logger.isDebugEnabled())
{
@@ -347,9 +318,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
BasicRejectBody body = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue);
- AMQFrame frame = body.generateFrame(_channelId);
+ AMQFrame frame = body.generateFrame(getChannelId());
- _connection.getProtocolHandler().writeFrame(frame);
+ getAMQConnection().getProtocolHandler().writeFrame(frame);
}
}
@@ -370,12 +341,12 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
public AMQMethodEvent execute() throws AMQException, FailoverException
{
AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody
- (exchangeName, routingKey, queueName).generateFrame(_channelId);
+ (exchangeName, routingKey, queueName).generateFrame(getChannelId());
return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
}
- }, _connection).execute();
+ }, getAMQConnection()).execute();
// Extract and return the response code from the query.
ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
@@ -392,7 +363,6 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
AMQShortString queueName,
AMQProtocolHandler protocolHandler,
boolean nowait,
- MessageFilter messageSelector,
int tag) throws AMQException, FailoverException
{
@@ -406,7 +376,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
consumer.getArguments());
- AMQFrame jmsConsume = body.generateFrame(_channelId);
+ AMQFrame jmsConsume = body.generateFrame(getChannelId());
if (nowait)
{
@@ -424,17 +394,25 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,
name.toString().startsWith("amq."),
false,false,false,false,null);
- AMQFrame exchangeDeclare = body.generateFrame(_channelId);
+ AMQFrame exchangeDeclare = body.generateFrame(getChannelId());
protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait) throws AMQException, FailoverException
+ final boolean nowait, boolean passive) throws AMQException, FailoverException
{
- QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
+ QueueDeclareBody body =
+ getMethodRegistry().createQueueDeclareBody(getTicket(),
+ amqd.getAMQQueueName(),
+ passive,
+ amqd.isDurable(),
+ amqd.isExclusive(),
+ amqd.isAutoDelete(),
+ false,
+ null);
- AMQFrame queueDeclare = body.generateFrame(_channelId);
+ AMQFrame queueDeclare = body.generateFrame(getChannelId());
protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
}
@@ -446,7 +424,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
false,
false,
true);
- AMQFrame queueDeleteFrame = body.generateFrame(_channelId);
+ AMQFrame queueDeleteFrame = body.generateFrame(getChannelId());
getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
}
@@ -454,8 +432,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException
{
ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(!suspend);
- AMQFrame channelFlowFrame = body.generateFrame(_channelId);
- _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
+ AMQFrame channelFlowFrame = body.generateFrame(getChannelId());
+ getAMQConnection().getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
}
public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
@@ -464,18 +442,18 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
- return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal,
- _messageFactoryRegistry,this, protocolHandler, arguments, prefetchHigh, prefetchLow,
- exclusive, _acknowledgeMode, noConsume, autoClose);
+ return new BasicMessageConsumer_0_8(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal,
+ getMessageFactoryRegistry(),this, protocolHandler, arguments, prefetchHigh, prefetchLow,
+ exclusive, getAcknowledgeMode(), noConsume, autoClose);
}
- public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, long producerId) throws JMSException
+ public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final Boolean mandatory,
+ final Boolean immediate, long producerId) throws JMSException
{
try
{
- return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId,
+ return new BasicMessageProducer_0_8(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(),
this, getProtocolHandler(), producerId, immediate, mandatory);
}
catch (AMQException e)
@@ -505,7 +483,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
private void returnBouncedMessage(final ReturnMessage msg)
{
- _connection.performConnectionTask(new Runnable()
+ getAMQConnection().performConnectionTask(new Runnable()
{
public void run()
{
@@ -513,8 +491,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
// Bounced message is processed here, away from the mina thread
AbstractJMSMessage bouncedMessage =
- _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
- msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(),_queueDestinationCache,_topicDestinationCache);
+ getMessageFactoryRegistry().createMessage(0, false, msg.getExchange(),
+ msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache, _topicDestinationCache);
AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
AMQShortString reason = msg.getReplyText();
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
@@ -522,20 +500,17 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
// @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
if (errorCode == AMQConstant.NO_CONSUMERS)
{
- _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
- }
- else if (errorCode == AMQConstant.NO_ROUTE)
+ getAMQConnection().exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
+ } else if (errorCode == AMQConstant.NO_ROUTE)
{
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
- }
- else
+ getAMQConnection().exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
+ } else
{
- _connection.exceptionReceived(
+ getAMQConnection().exceptionReceived(
new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
}
- }
- catch (Exception e)
+ } catch (Exception e)
{
_logger.error(
"Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
@@ -571,7 +546,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
return null;
}
- }, _connection).execute();
+ }, getAMQConnection()).execute();
}
public DestinationCache<AMQQueue> getQueueDestinationCache()
@@ -607,9 +582,18 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
return matches;
}
+ public long getMessageCount()
+ {
+ return _messageCount;
+ }
+
+ public long getConsumerCount()
+ {
+ return _consumerCount;
+ }
}
- protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException
+ protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException
{
AMQFrame queueDeclare =
getMethodRegistry().createQueueDeclareBody(getTicket(),
@@ -619,10 +603,10 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
amqd.isExclusive(),
amqd.isAutoDelete(),
false,
- null).generateFrame(_channelId);
+ null).generateFrame(getChannelId());
QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
- return okHandler._messageCount;
+ return okHandler.getMessageCount();
}
protected boolean tagLE(long tag1, long tag2)
@@ -647,6 +631,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
public void handleAddressBasedDestination(AMQDestination dest,
boolean isConsumer,
+ boolean noLocal,
boolean noWait) throws AMQException
{
throw new UnsupportedOperationException("The new addressing based sytanx is "
@@ -683,7 +668,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
// if the Connection has closed then we should throw any exception that
// has occurred that we were not waiting for
- AMQStateManager manager = _connection.getProtocolHandler()
+ AMQStateManager manager = getAMQConnection().getProtocolHandler()
.getStateManager();
Exception e = manager.getLastException();