summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java619
1 files changed, 619 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
new file mode 100644
index 0000000000..c010e4c7ed
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -0,0 +1,619 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.ReturnMessage;
+import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicAckBody;
+import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.BasicConsumeOkBody;
+import org.apache.qpid.framing.BasicQosBody;
+import org.apache.qpid.framing.BasicQosOkBody;
+import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.framing.BasicRecoverOkBody;
+import org.apache.qpid.framing.BasicRecoverSyncBody;
+import org.apache.qpid.framing.BasicRecoverSyncOkBody;
+import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.TxCommitOkBody;
+import org.apache.qpid.framing.TxRollbackBody;
+import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
+{
+
+ /** Used for debugging. */
+ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
+
+ /**
+ * Creates a new session on a connection.
+ *
+ * @param con The connection on which to create the session.
+ * @param channelId The unique identifier for the session.
+ * @param transacted Indicates whether or not the session is transactional.
+ * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param messageFactoryRegistry The message factory factory for the session.
+ * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
+ * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
+ */
+ AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+ MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+ {
+
+ super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark);
+ }
+
+ /**
+ * Creates a new session on a connection with the default message factory factory.
+ *
+ * @param con The connection on which to create the session.
+ * @param channelId The unique identifier for the session.
+ * @param transacted Indicates whether or not the session is transactional.
+ * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
+ * @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
+ */
+ AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
+ int defaultPrefetchLow)
+ {
+ this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
+ defaultPrefetchLow);
+ }
+
+ private ProtocolVersion getProtocolVersion()
+ {
+ return getProtocolHandler().getProtocolVersion();
+ }
+
+ public void acknowledgeMessage(long deliveryTag, boolean multiple)
+ {
+ BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
+
+ final AMQFrame ackFrame = body.generateFrame(_channelId);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
+ }
+
+ getProtocolHandler().writeFrame(ackFrame);
+ _unacknowledgedMessageTags.remove(deliveryTag);
+ }
+
+ public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
+ final AMQShortString exchangeName, final AMQDestination dest,
+ final boolean nowait) throws AMQException, FailoverException
+ {
+ getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
+ (getTicket(),queueName,exchangeName,routingKey,false,arguments).
+ generateFrame(_channelId), QueueBindOkBody.class);
+ }
+
+ public void sendClose(long timeout) throws AMQException, FailoverException
+ {
+ // we also need to check the state manager for 08/09 as the
+ // _connection variable may not be updated in time by the error receiving
+ // thread.
+ // We can't close the session if we are alreadying in the process of
+ // closing/closed the connection.
+
+ if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)
+ || getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSING)))
+ {
+
+ getProtocolHandler().closeSession(this);
+ getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
+ new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId),
+ ChannelCloseOkBody.class, timeout);
+ // When control resumes at this point, a reply will have been received that
+ // indicates the broker has closed the channel successfully.
+ }
+ }
+
+ public void sendCommit() throws AMQException, FailoverException
+ {
+ final AMQProtocolHandler handler = getProtocolHandler();
+
+ handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
+ }
+
+ public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException,
+ FailoverException
+ {
+ FieldTable table = null;
+ if(arguments != null && !arguments.isEmpty())
+ {
+ table = new FieldTable();
+ for(Map.Entry<String, Object> entry : arguments.entrySet())
+ {
+ table.setObject(entry.getKey(), entry.getValue());
+ }
+ }
+ QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table);
+ AMQFrame queueDeclare = body.generateFrame(_channelId);
+ getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ }
+
+ public void sendRecover() throws AMQException, FailoverException
+ {
+ _unacknowledgedMessageTags.clear();
+
+ if (isStrictAMQP())
+ {
+ // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
+
+ BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
+ _connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId));
+ _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
+ }
+ else
+ {
+ // in Qpid the 0-8 spec was hacked to have a recover-ok method... this is bad
+ // in 0-9 we used the cleaner addition of a new sync recover method with its own ok
+ if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
+ _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class);
+ }
+ else if(getProtocolVersion().equals(ProtocolVersion.v0_9))
+ {
+ BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
+ _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
+ }
+ else if(getProtocolVersion().equals(ProtocolVersion.v0_91))
+ {
+ BasicRecoverSyncBody body = ((MethodRegistry_0_91)getMethodRegistry()).createBasicRecoverSyncBody(false);
+ _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
+ }
+ else
+ {
+ throw new RuntimeException("Unsupported version of the AMQP Protocol: " + getProtocolVersion());
+ }
+ }
+ }
+
+ public void releaseForRollback()
+ {
+ // Reject all the messages that have been received in this session and
+ // have not yet been acknowledged. Should look to remove
+ // _deliveredMessageTags and use _txRangeSet as used by 0-10.
+ // Otherwise messages will be able to arrive out of order to a second
+ // consumer on the queue. Whilst this is within the JMS spec it is not
+ // user friendly and avoidable.
+ while (true)
+ {
+ Long tag = _deliveredMessageTags.poll();
+ if (tag == null)
+ {
+ break;
+ }
+
+ rejectMessage(tag, true);
+ }
+ }
+
+ public void rejectMessage(long deliveryTag, boolean requeue)
+ {
+ if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED))
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting delivery tag:" + deliveryTag + ":SessionHC:" + this.hashCode());
+ }
+
+ BasicRejectBody body = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue);
+ AMQFrame frame = body.generateFrame(_channelId);
+
+ _connection.getProtocolHandler().writeFrame(frame);
+ }
+ }
+
+ public boolean isQueueBound(final AMQDestination destination) throws JMSException
+ {
+ return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName());
+ }
+
+
+ public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+ throws JMSException
+ {
+ try
+ {
+ AMQMethodEvent response = new FailoverRetrySupport<AMQMethodEvent, AMQException>(
+ new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
+ {
+ public AMQMethodEvent execute() throws AMQException, FailoverException
+ {
+ AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody
+ (exchangeName, routingKey, queueName).generateFrame(_channelId);
+
+ return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+
+ }
+ }, _connection).execute();
+
+ // Extract and return the response code from the query.
+ ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
+
+ return (responseBody.getReplyCode() == 0);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
+ }
+ }
+
+ @Override public void sendConsume(BasicMessageConsumer_0_8 consumer,
+ AMQShortString queueName,
+ AMQProtocolHandler protocolHandler,
+ boolean nowait,
+ String messageSelector,
+ int tag) throws AMQException, FailoverException
+ {
+ FieldTable arguments = FieldTableFactory.newFieldTable();
+ if ((messageSelector != null) && !messageSelector.equals(""))
+ {
+ arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
+ }
+
+ if (consumer.isAutoClose())
+ {
+ arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
+ }
+
+ if (consumer.isNoConsume())
+ {
+ arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
+ }
+
+ BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
+ queueName,
+ new AMQShortString(String.valueOf(tag)),
+ consumer.isNoLocal(),
+ consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
+ consumer.isExclusive(),
+ nowait,
+ arguments);
+
+
+ AMQFrame jmsConsume = body.generateFrame(_channelId);
+
+ if (nowait)
+ {
+ protocolHandler.writeFrame(jmsConsume);
+ }
+ else
+ {
+ protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
+ }
+ }
+
+ public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
+ final boolean nowait) throws AMQException, FailoverException
+ {
+ ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,
+ name.toString().startsWith("amq."),
+ false,false,false,false,null);
+ AMQFrame exchangeDeclare = body.generateFrame(_channelId);
+
+ protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+ }
+
+ public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ final boolean nowait) throws AMQException, FailoverException
+ {
+ QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
+
+ AMQFrame queueDeclare = body.generateFrame(_channelId);
+
+ protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ }
+
+ public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException
+ {
+ QueueDeleteBody body = getMethodRegistry().createQueueDeleteBody(getTicket(),
+ queueName,
+ false,
+ false,
+ true);
+ AMQFrame queueDeleteFrame = body.generateFrame(_channelId);
+
+ getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
+ }
+
+ public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException
+ {
+ ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(!suspend);
+ AMQFrame channelFlowFrame = body.generateFrame(_channelId);
+ _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
+ }
+
+ public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+ final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments,
+ final boolean noConsume, final boolean autoClose) throws JMSException
+ {
+
+ final AMQProtocolHandler protocolHandler = getProtocolHandler();
+ return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal,
+ _messageFactoryRegistry,this, protocolHandler, arguments, prefetchHigh, prefetchLow,
+ exclusive, _acknowledgeMode, noConsume, autoClose);
+ }
+
+
+ public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
+ final boolean immediate, final boolean waitUntilSent, long producerId) throws JMSException
+ {
+ try
+ {
+ return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId,
+ this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+ }
+ catch (AMQException e)
+ {
+ JMSException ex = new JMSException("Error creating producer");
+ ex.initCause(e);
+ ex.setLinkedException(e);
+
+ throw ex;
+ }
+ }
+
+
+ @Override public void messageReceived(UnprocessedMessage message)
+ {
+
+ if (message instanceof ReturnMessage)
+ {
+ // Return of the bounced message.
+ returnBouncedMessage((ReturnMessage) message);
+ }
+ else
+ {
+ super.messageReceived(message);
+ }
+ }
+
+ private void returnBouncedMessage(final ReturnMessage msg)
+ {
+ _connection.performConnectionTask(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ // Bounced message is processed here, away from the mina thread
+ AbstractJMSMessage bouncedMessage =
+ _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
+ msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
+ AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
+ AMQShortString reason = msg.getReplyText();
+ _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+ // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
+ }
+ else
+ {
+ _connection.exceptionReceived(
+ new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
+ }
+
+ }
+ catch (Exception e)
+ {
+ _logger.error(
+ "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
+ e);
+ }
+ }
+ });
+ }
+
+
+
+
+ public void sendRollback() throws AMQException, FailoverException
+ {
+ TxRollbackBody body = getMethodRegistry().createTxRollbackBody();
+ AMQFrame frame = body.generateFrame(getChannelId());
+ getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class);
+ }
+
+ public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException
+ {
+ new FailoverRetrySupport<Object, AMQException>(
+ new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+
+ BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
+
+ // todo send low water mark when protocol allows.
+ // todo Be aware of possible changes to parameter order as versions change.
+ getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
+
+ return null;
+ }
+ }, _connection).execute();
+ }
+
+ class QueueDeclareOkHandler extends SpecificMethodFrameListener
+ {
+
+ private long _messageCount;
+ private long _consumerCount;
+
+ public QueueDeclareOkHandler()
+ {
+ super(getChannelId(), QueueDeclareOkBody.class);
+ }
+
+ public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException
+ {
+ boolean matches = super.processMethod(channelId, frame);
+ if (matches)
+ {
+ QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame;
+ _messageCount = declareOk.getMessageCount();
+ _consumerCount = declareOk.getConsumerCount();
+ }
+ return matches;
+ }
+
+ }
+
+ protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException
+ {
+ AMQFrame queueDeclare =
+ getMethodRegistry().createQueueDeclareBody(getTicket(),
+ amqd.getAMQQueueName(),
+ true,
+ amqd.isDurable(),
+ amqd.isExclusive(),
+ amqd.isAutoDelete(),
+ false,
+ null).generateFrame(_channelId);
+ QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
+ getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+ return okHandler._messageCount;
+ }
+
+ protected final boolean tagLE(long tag1, long tag2)
+ {
+ return tag1 <= tag2;
+ }
+
+ protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
+ {
+ return false;
+ }
+
+ public AMQMessageDelegateFactory getMessageDelegateFactory()
+ {
+ return AMQMessageDelegateFactory.FACTORY_0_8;
+ }
+
+ public void sync() throws AMQException
+ {
+ declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
+ }
+
+ public void handleAddressBasedDestination(AMQDestination dest,
+ boolean isConsumer,
+ boolean noWait) throws AMQException
+ {
+ throw new UnsupportedOperationException("The new addressing based sytanx is "
+ + "not supported for AMQP 0-8/0-9 versions");
+ }
+
+ protected void flushAcknowledgments()
+ {
+
+ }
+
+ public boolean isQueueBound(String exchangeName, String queueName,
+ String bindingKey, Map<String, Object> args) throws JMSException
+ {
+ return isQueueBound(exchangeName == null ? null : new AMQShortString(exchangeName),
+ queueName == null ? null : new AMQShortString(queueName),
+ bindingKey == null ? null : new AMQShortString(bindingKey));
+ }
+
+
+ public AMQException getLastException()
+ {
+ // if the Connection has closed then we should throw any exception that
+ // has occurred that we were not waiting for
+ AMQStateManager manager = _connection.getProtocolHandler()
+ .getStateManager();
+
+ Exception e = manager.getLastException();
+ if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED)
+ && e != null)
+ {
+ if (e instanceof AMQException)
+ {
+ return (AMQException) e;
+ }
+ else
+ {
+ AMQException amqe = new AMQException(AMQConstant
+ .getConstant(AMQConstant.INTERNAL_ERROR.getCode()),
+ e.getMessage(), e.getCause());
+ return amqe;
+ }
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+}