diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java | 181 |
1 files changed, 83 insertions, 98 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 7daebbff04..8ab23a240e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -21,13 +21,8 @@ package org.apache.qpid.client; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.ArrayList; -import java.util.Map; - -import javax.jms.Destination; -import javax.jms.JMSException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; @@ -43,44 +38,20 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; -import org.apache.qpid.filter.MessageFilter; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicAckBody; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicConsumeOkBody; -import org.apache.qpid.framing.BasicQosBody; -import org.apache.qpid.framing.BasicQosOkBody; -import org.apache.qpid.framing.BasicRecoverBody; -import org.apache.qpid.framing.BasicRecoverOkBody; -import org.apache.qpid.framing.BasicRecoverSyncBody; -import org.apache.qpid.framing.BasicRecoverSyncOkBody; -import org.apache.qpid.framing.BasicRejectBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelFlowBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.ExchangeBoundOkBody; -import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.ExchangeDeclareOkBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.framing.QueueBindOkBody; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueDeclareOkBody; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.framing.TxCommitOkBody; -import org.apache.qpid.framing.TxRollbackBody; -import org.apache.qpid.framing.TxRollbackOkBody; +import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.transport.TransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.Destination; +import javax.jms.JMSException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> { @@ -131,7 +102,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { while (true) { - Long tag = _unacknowledgedMessageTags.poll(); + Long tag = getUnacknowledgedMessageTags().poll(); if (tag == null) { break; @@ -145,15 +116,15 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple); - final AMQFrame ackFrame = body.generateFrame(_channelId); + final AMQFrame ackFrame = body.generateFrame(getChannelId()); if (_logger.isDebugEnabled()) { - _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); + _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + getChannelId()); } getProtocolHandler().writeFrame(ackFrame, !isTransacted()); - _unacknowledgedMessageTags.remove(deliveryTag); + getUnacknowledgedMessageTags().remove(deliveryTag); } public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, @@ -162,7 +133,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody (getTicket(),queueName,exchangeName,routingKey,false,arguments). - generateFrame(_channelId), QueueBindOkBody.class); + generateFrame(getChannelId()), QueueBindOkBody.class); } public void sendClose(long timeout) throws AMQException, FailoverException @@ -179,7 +150,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe getProtocolHandler().closeSession(this); getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), - new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId), + new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(getChannelId()), ChannelCloseOkBody.class, timeout); // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully. @@ -191,7 +162,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe // Acknowledge all delivered messages while (true) { - Long tag = _deliveredMessageTags.poll(); + Long tag = getDeliveredMessageTags().poll(); if (tag == null) { break; @@ -202,7 +173,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe final AMQProtocolHandler handler = getProtocolHandler(); - handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class); + handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(getChannelId()), TxCommitOkBody.class); } public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException, @@ -218,22 +189,22 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } } QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table); - AMQFrame queueDeclare = body.generateFrame(_channelId); + AMQFrame queueDeclare = body.generateFrame(getChannelId()); getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); } public void sendRecover() throws AMQException, FailoverException { enforceRejectBehaviourDuringRecover(); - _prefetchedMessageTags.clear(); - _unacknowledgedMessageTags.clear(); + getPrefetchedMessageTags().clear(); + getUnacknowledgedMessageTags().clear(); if (isStrictAMQP()) { // We can't use the BasicRecoverBody-OK method as it isn't part of the spec. BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false); - _connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId)); + getAMQConnection().getProtocolHandler().writeFrame(body.generateFrame(getChannelId())); _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); } else @@ -243,17 +214,17 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0)) { BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false); - _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class); + getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverOkBody.class); } else if(getProtocolVersion().equals(ProtocolVersion.v0_9)) { BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false); - _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class); + getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class); } else if(getProtocolVersion().equals(ProtocolVersion.v0_91)) { BasicRecoverSyncBody body = ((MethodRegistry_0_91)getMethodRegistry()).createBasicRecoverSyncBody(false); - _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class); + getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class); } else { @@ -266,9 +237,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { if (_logger.isDebugEnabled()) { - _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + _unacknowledgedMessageTags); + _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + getUnacknowledgedMessageTags()); } - ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(_consumers.values()); + ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(getConsumers().values()); boolean messageListenerFound = false; boolean serverRejectBehaviourFound = false; for(BasicMessageConsumer_0_8 consumer : consumersToCheck) @@ -287,7 +258,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe if (serverRejectBehaviourFound) { //reject(false) any messages we don't want returned again - switch(_acknowledgeMode) + switch(getAcknowledgeMode()) { case Session.DUPS_OK_ACKNOWLEDGE: case Session.AUTO_ACKNOWLEDGE: @@ -296,7 +267,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe break; } case Session.CLIENT_ACKNOWLEDGE: - for(Long tag : _unacknowledgedMessageTags) + for(Long tag : getUnacknowledgedMessageTags()) { rejectMessage(tag, false); } @@ -314,7 +285,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe // consumer on the queue. Whilst this is within the JMS spec it is not // user friendly and avoidable. boolean normalRejectBehaviour = true; - for (BasicMessageConsumer_0_8 consumer : _consumers.values()) + for (BasicMessageConsumer_0_8 consumer : getConsumers().values()) { if(RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour())) { @@ -326,7 +297,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe while (true) { - Long tag = _deliveredMessageTags.poll(); + Long tag = getDeliveredMessageTags().poll(); if (tag == null) { break; @@ -338,8 +309,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe public void rejectMessage(long deliveryTag, boolean requeue) { - if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)|| - ((_acknowledgeMode == AUTO_ACKNOWLEDGE || _acknowledgeMode == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners())) + if ((getAcknowledgeMode() == CLIENT_ACKNOWLEDGE) || (getAcknowledgeMode() == SESSION_TRANSACTED)|| + ((getAcknowledgeMode() == AUTO_ACKNOWLEDGE || getAcknowledgeMode() == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners())) { if (_logger.isDebugEnabled()) { @@ -347,9 +318,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } BasicRejectBody body = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue); - AMQFrame frame = body.generateFrame(_channelId); + AMQFrame frame = body.generateFrame(getChannelId()); - _connection.getProtocolHandler().writeFrame(frame); + getAMQConnection().getProtocolHandler().writeFrame(frame); } } @@ -370,12 +341,12 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe public AMQMethodEvent execute() throws AMQException, FailoverException { AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody - (exchangeName, routingKey, queueName).generateFrame(_channelId); + (exchangeName, routingKey, queueName).generateFrame(getChannelId()); return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); } - }, _connection).execute(); + }, getAMQConnection()).execute(); // Extract and return the response code from the query. ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); @@ -392,7 +363,6 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, - MessageFilter messageSelector, int tag) throws AMQException, FailoverException { @@ -406,7 +376,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe consumer.getArguments()); - AMQFrame jmsConsume = body.generateFrame(_channelId); + AMQFrame jmsConsume = body.generateFrame(getChannelId()); if (nowait) { @@ -424,17 +394,25 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type, name.toString().startsWith("amq."), false,false,false,false,null); - AMQFrame exchangeDeclare = body.generateFrame(_channelId); + AMQFrame exchangeDeclare = body.generateFrame(getChannelId()); protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait) throws AMQException, FailoverException + final boolean nowait, boolean passive) throws AMQException, FailoverException { - QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null); + QueueDeclareBody body = + getMethodRegistry().createQueueDeclareBody(getTicket(), + amqd.getAMQQueueName(), + passive, + amqd.isDurable(), + amqd.isExclusive(), + amqd.isAutoDelete(), + false, + null); - AMQFrame queueDeclare = body.generateFrame(_channelId); + AMQFrame queueDeclare = body.generateFrame(getChannelId()); protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); } @@ -446,7 +424,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe false, false, true); - AMQFrame queueDeleteFrame = body.generateFrame(_channelId); + AMQFrame queueDeleteFrame = body.generateFrame(getChannelId()); getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); } @@ -454,8 +432,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException { ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(!suspend); - AMQFrame channelFlowFrame = body.generateFrame(_channelId); - _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); + AMQFrame channelFlowFrame = body.generateFrame(getChannelId()); + getAMQConnection().getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); } public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh, @@ -464,18 +442,18 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { final AMQProtocolHandler protocolHandler = getProtocolHandler(); - return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal, - _messageFactoryRegistry,this, protocolHandler, arguments, prefetchHigh, prefetchLow, - exclusive, _acknowledgeMode, noConsume, autoClose); + return new BasicMessageConsumer_0_8(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal, + getMessageFactoryRegistry(),this, protocolHandler, arguments, prefetchHigh, prefetchLow, + exclusive, getAcknowledgeMode(), noConsume, autoClose); } - public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, long producerId) throws JMSException + public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final Boolean mandatory, + final Boolean immediate, long producerId) throws JMSException { try { - return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId, + return new BasicMessageProducer_0_8(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(), this, getProtocolHandler(), producerId, immediate, mandatory); } catch (AMQException e) @@ -505,7 +483,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe private void returnBouncedMessage(final ReturnMessage msg) { - _connection.performConnectionTask(new Runnable() + getAMQConnection().performConnectionTask(new Runnable() { public void run() { @@ -513,8 +491,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { // Bounced message is processed here, away from the mina thread AbstractJMSMessage bouncedMessage = - _messageFactoryRegistry.createMessage(0, false, msg.getExchange(), - msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(),_queueDestinationCache,_topicDestinationCache); + getMessageFactoryRegistry().createMessage(0, false, msg.getExchange(), + msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache, _topicDestinationCache); AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); AMQShortString reason = msg.getReplyText(); _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); @@ -522,20 +500,17 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. if (errorCode == AMQConstant.NO_CONSUMERS) { - _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null)); - } - else if (errorCode == AMQConstant.NO_ROUTE) + getAMQConnection().exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null)); + } else if (errorCode == AMQConstant.NO_ROUTE) { - _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); - } - else + getAMQConnection().exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); + } else { - _connection.exceptionReceived( + getAMQConnection().exceptionReceived( new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null)); } - } - catch (Exception e) + } catch (Exception e) { _logger.error( "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", @@ -571,7 +546,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe return null; } - }, _connection).execute(); + }, getAMQConnection()).execute(); } public DestinationCache<AMQQueue> getQueueDestinationCache() @@ -607,9 +582,18 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe return matches; } + public long getMessageCount() + { + return _messageCount; + } + + public long getConsumerCount() + { + return _consumerCount; + } } - protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException + protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException { AMQFrame queueDeclare = getMethodRegistry().createQueueDeclareBody(getTicket(), @@ -619,10 +603,10 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe amqd.isExclusive(), amqd.isAutoDelete(), false, - null).generateFrame(_channelId); + null).generateFrame(getChannelId()); QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); - return okHandler._messageCount; + return okHandler.getMessageCount(); } protected boolean tagLE(long tag1, long tag2) @@ -647,6 +631,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe public void handleAddressBasedDestination(AMQDestination dest, boolean isConsumer, + boolean noLocal, boolean noWait) throws AMQException { throw new UnsupportedOperationException("The new addressing based sytanx is " @@ -683,7 +668,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { // if the Connection has closed then we should throw any exception that // has occurred that we were not waiting for - AMQStateManager manager = _connection.getProtocolHandler() + AMQStateManager manager = getAMQConnection().getProtocolHandler() .getStateManager(); Exception e = manager.getLastException(); |