/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.client; import java.util.Map; import javax.jms.Destination; import javax.jms.JMSException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.ReturnMessage; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicAckBody; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.BasicConsumeOkBody; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.BasicRecoverBody; import org.apache.qpid.framing.BasicRecoverOkBody; import org.apache.qpid.framing.BasicRecoverSyncBody; import org.apache.qpid.framing.BasicRecoverSyncOkBody; import org.apache.qpid.framing.BasicRejectBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.framing.ChannelFlowBody; import org.apache.qpid.framing.ChannelFlowOkBody; import org.apache.qpid.framing.ExchangeBoundOkBody; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.ExchangeDeclareOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.QueueBindOkBody; import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.framing.QueueDeleteOkBody; import org.apache.qpid.framing.TxCommitOkBody; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxRollbackOkBody; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public final class AMQSession_0_8 extends AMQSession { /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); /** * Creates a new session on a connection. * * @param con The connection on which to create the session. * @param channelId The unique identifier for the session. * @param transacted Indicates whether or not the session is transactional. * @param acknowledgeMode The acknowledgement mode for the session. * @param messageFactoryRegistry The message factory factory for the session. * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. */ AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark); } /** * Creates a new session on a connection with the default message factory factory. * * @param con The connection on which to create the session. * @param channelId The unique identifier for the session. * @param transacted Indicates whether or not the session is transactional. * @param acknowledgeMode The acknowledgement mode for the session. * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. */ AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) { this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow); } private ProtocolVersion getProtocolVersion() { return getProtocolHandler().getProtocolVersion(); } protected void acknowledgeImpl() { while (true) { Long tag = _unacknowledgedMessageTags.poll(); if (tag == null) { break; } acknowledgeMessage(tag, false); } } public void acknowledgeMessage(long deliveryTag, boolean multiple) { BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple); final AMQFrame ackFrame = body.generateFrame(_channelId); if (_logger.isDebugEnabled()) { _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); } getProtocolHandler().writeFrame(ackFrame); _unacknowledgedMessageTags.remove(deliveryTag); } public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination dest, final boolean nowait) throws AMQException, FailoverException { getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody (getTicket(),queueName,exchangeName,routingKey,false,arguments). generateFrame(_channelId), QueueBindOkBody.class); } public void sendClose(long timeout) throws AMQException, FailoverException { // we also need to check the state manager for 08/09 as the // _connection variable may not be updated in time by the error receiving // thread. // We can't close the session if we are already in the process of // closing/closed the connection. if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED) || getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSING))) { getProtocolHandler().closeSession(this); getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId), ChannelCloseOkBody.class, timeout); // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully. } } public void commitImpl() throws AMQException, FailoverException, TransportException { // Acknowledge all delivered messages while (true) { Long tag = _deliveredMessageTags.poll(); if (tag == null) { break; } acknowledgeMessage(tag, false); } final AMQProtocolHandler handler = getProtocolHandler(); handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class); } public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map arguments) throws AMQException, FailoverException { FieldTable table = null; if(arguments != null && !arguments.isEmpty()) { table = new FieldTable(); for(Map.Entry entry : arguments.entrySet()) { table.setObject(entry.getKey(), entry.getValue()); } } QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table); AMQFrame queueDeclare = body.generateFrame(_channelId); getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); } public void sendRecover() throws AMQException, FailoverException { _unacknowledgedMessageTags.clear(); if (isStrictAMQP()) { // We can't use the BasicRecoverBody-OK method as it isn't part of the spec. BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false); _connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId)); _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); } else { // in Qpid the 0-8 spec was hacked to have a recover-ok method... this is bad // in 0-9 we used the cleaner addition of a new sync recover method with its own ok if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0)) { BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false); _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class); } else if(getProtocolVersion().equals(ProtocolVersion.v0_9)) { BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false); _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class); } else if(getProtocolVersion().equals(ProtocolVersion.v0_91)) { BasicRecoverSyncBody body = ((MethodRegistry_0_91)getMethodRegistry()).createBasicRecoverSyncBody(false); _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class); } else { throw new RuntimeException("Unsupported version of the AMQP Protocol: " + getProtocolVersion()); } } } public void releaseForRollback() { // Reject all the messages that have been received in this session and // have not yet been acknowledged. Should look to remove // _deliveredMessageTags and use _txRangeSet as used by 0-10. // Otherwise messages will be able to arrive out of order to a second // consumer on the queue. Whilst this is within the JMS spec it is not // user friendly and avoidable. while (true) { Long tag = _deliveredMessageTags.poll(); if (tag == null) { break; } rejectMessage(tag, true); } } public void rejectMessage(long deliveryTag, boolean requeue) { if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)) { if (_logger.isDebugEnabled()) { _logger.debug("Rejecting delivery tag:" + deliveryTag + ":SessionHC:" + this.hashCode()); } BasicRejectBody body = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue); AMQFrame frame = body.generateFrame(_channelId); _connection.getProtocolHandler().writeFrame(frame); } } public boolean isQueueBound(final AMQDestination destination) throws JMSException { return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName()); } public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) throws JMSException { try { AMQMethodEvent response = new FailoverRetrySupport( new FailoverProtectedOperation() { public AMQMethodEvent execute() throws AMQException, FailoverException { AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody (exchangeName, routingKey, queueName).generateFrame(_channelId); return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); } }, _connection).execute(); // Extract and return the response code from the query. ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); return (responseBody.getReplyCode() == 0); } catch (AMQException e) { throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e); } } @Override public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException { FieldTable arguments = FieldTableFactory.newFieldTable(); if ((messageSelector != null) && !messageSelector.equals("")) { arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); } if (consumer.isAutoClose()) { arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); } if (consumer.isNoConsume()) { arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); } BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(), queueName, new AMQShortString(String.valueOf(tag)), consumer.isNoLocal(), consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, consumer.isExclusive(), nowait, arguments); AMQFrame jmsConsume = body.generateFrame(_channelId); if (nowait) { protocolHandler.writeFrame(jmsConsume); } else { protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); } } public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException, FailoverException { ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type, name.toString().startsWith("amq."), false,false,false,false,null); AMQFrame exchangeDeclare = body.generateFrame(_channelId); protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException, FailoverException { QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null); AMQFrame queueDeclare = body.generateFrame(_channelId); protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); } public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException { QueueDeleteBody body = getMethodRegistry().createQueueDeleteBody(getTicket(), queueName, false, false, true); AMQFrame queueDeleteFrame = body.generateFrame(_channelId); getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); } public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException { ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(!suspend); AMQFrame channelFlowFrame = body.generateFrame(_channelId); _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); } public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments, final boolean noConsume, final boolean autoClose) throws JMSException { final AMQProtocolHandler protocolHandler = getProtocolHandler(); return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal, _messageFactoryRegistry,this, protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose); } public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory, final boolean immediate, long producerId) throws JMSException { try { return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId, this, getProtocolHandler(), producerId, immediate, mandatory); } catch (AMQException e) { JMSException ex = new JMSException("Error creating producer"); ex.initCause(e); ex.setLinkedException(e); throw ex; } } @Override public void messageReceived(UnprocessedMessage message) { if (message instanceof ReturnMessage) { // Return of the bounced message. returnBouncedMessage((ReturnMessage) message); } else { super.messageReceived(message); } } private void returnBouncedMessage(final ReturnMessage msg) { _connection.performConnectionTask(new Runnable() { public void run() { try { // Bounced message is processed here, away from the mina thread AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, false, msg.getExchange(), msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies()); AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); AMQShortString reason = msg.getReplyText(); _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. if (errorCode == AMQConstant.NO_CONSUMERS) { _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null)); } else if (errorCode == AMQConstant.NO_ROUTE) { _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); } else { _connection.exceptionReceived( new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null)); } } catch (Exception e) { _logger.error( "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); } } }); } public void sendRollback() throws AMQException, FailoverException { TxRollbackBody body = getMethodRegistry().createTxRollbackBody(); AMQFrame frame = body.generateFrame(getChannelId()); getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class); } public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException { new FailoverRetrySupport( new FailoverProtectedOperation() { public Object execute() throws AMQException, FailoverException { BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false); // todo send low water mark when protocol allows. // todo Be aware of possible changes to parameter order as versions change. getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class); return null; } }, _connection).execute(); } class QueueDeclareOkHandler extends SpecificMethodFrameListener { private long _messageCount; private long _consumerCount; public QueueDeclareOkHandler() { super(getChannelId(), QueueDeclareOkBody.class); } public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException { boolean matches = super.processMethod(channelId, frame); if (matches) { QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame; _messageCount = declareOk.getMessageCount(); _consumerCount = declareOk.getConsumerCount(); } return matches; } } protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException { AMQFrame queueDeclare = getMethodRegistry().createQueueDeclareBody(getTicket(), amqd.getAMQQueueName(), true, amqd.isDurable(), amqd.isExclusive(), amqd.isAutoDelete(), false, null).generateFrame(_channelId); QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); return okHandler._messageCount; } protected final boolean tagLE(long tag1, long tag2) { return tag1 <= tag2; } protected final boolean updateRollbackMark(long currentMark, long deliveryTag) { return false; } public AMQMessageDelegateFactory getMessageDelegateFactory() { return AMQMessageDelegateFactory.FACTORY_0_8; } public void sync() throws AMQException { declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false); } protected void flushAcknowledgments() { } @Override protected void deleteTemporaryDestination(final TemporaryDestination amqQueue) throws JMSException { // Currently TemporaryDestination is set to be auto-delete which, for 0-8..0-9-1, means that the queue will be deleted // by the server when there are no more subscriptions to that queue/topic (rather than when the client disconnects). // This is not quite right for JMSCompliance as the queue/topic should remain until the connection closes, or the // client explicitly deletes it. /* intentional no-op */ } public boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map args) throws JMSException { return isQueueBound(exchangeName == null ? null : new AMQShortString(exchangeName), queueName == null ? null : new AMQShortString(queueName), bindingKey == null ? null : new AMQShortString(bindingKey)); } public AMQException getLastException() { // if the Connection has closed then we should throw any exception that // has occurred that we were not waiting for AMQStateManager manager = _connection.getProtocolHandler() .getStateManager(); Exception e = manager.getLastException(); if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && e != null) { if (e instanceof AMQException) { return (AMQException) e; } else { AMQException amqe = new AMQException(AMQConstant .getConstant(AMQConstant.INTERNAL_ERROR.getCode()), e.getMessage(), e.getCause()); return amqe; } } else { return null; } } }