diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java | 84 |
1 files changed, 75 insertions, 9 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 2c88d6f557..2442b157f1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -25,10 +25,11 @@ import javax.jms.*; import javax.jms.IllegalStateException; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; -import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.*; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; @@ -43,7 +44,7 @@ import org.slf4j.LoggerFactory; import java.util.Map; -public final class AMQSession_0_8 extends AMQSession +public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> { /** Used for debugging. */ @@ -218,6 +219,7 @@ public final class AMQSession_0_8 extends AMQSession return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName()); } + public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) throws JMSException { @@ -245,10 +247,14 @@ public final class AMQSession_0_8 extends AMQSession { throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e); } - } - - public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, - String messageSelector, AMQShortString tag) throws AMQException, FailoverException + } + + @Override public void sendConsume(BasicMessageConsumer_0_8 consumer, + AMQShortString queueName, + AMQProtocolHandler protocolHandler, + boolean nowait, + String messageSelector, + int tag) throws AMQException, FailoverException { FieldTable arguments = FieldTableFactory.newFieldTable(); if ((messageSelector != null) && !messageSelector.equals("")) @@ -268,7 +274,7 @@ public final class AMQSession_0_8 extends AMQSession BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(), queueName, - tag, + new AMQShortString(String.valueOf(tag)), consumer.isNoLocal(), consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, consumer.isExclusive(), @@ -337,7 +343,7 @@ public final class AMQSession_0_8 extends AMQSession } - public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory, + public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory, final boolean immediate, final boolean waitUntilSent, long producerId) { @@ -345,6 +351,66 @@ public final class AMQSession_0_8 extends AMQSession this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); } + + @Override public void messageReceived(UnprocessedMessage message) + { + + if (message instanceof ReturnMessage) + { + // Return of the bounced message. + returnBouncedMessage((ReturnMessage) message); + } + else + { + super.messageReceived(message); + } + } + + private void returnBouncedMessage(final ReturnMessage msg) + { + _connection.performConnectionTask(new Runnable() + { + public void run() + { + try + { + // Bounced message is processed here, away from the mina thread + AbstractJMSMessage bouncedMessage = + _messageFactoryRegistry.createMessage(0, false, msg.getExchange(), + msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies()); + AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); + AMQShortString reason = msg.getReplyText(); + _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); + + // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. + if (errorCode == AMQConstant.NO_CONSUMERS) + { + _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null)); + } + else if (errorCode == AMQConstant.NO_ROUTE) + { + _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); + } + else + { + _connection.exceptionReceived( + new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null)); + } + + } + catch (Exception e) + { + _logger.error( + "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", + e); + } + } + }); + } + + + + public void sendRollback() throws AMQException, FailoverException { TxRollbackBody body = getMethodRegistry().createTxRollbackBody(); @@ -365,7 +431,7 @@ public final class AMQSession_0_8 extends AMQSession checkNotClosed(); AMQTopic origTopic = checkValidTopic(topic); AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); - TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + TopicSubscriberAdaptor<BasicMessageConsumer_0_8> subscriber = _subscriptions.get(name); if (subscriber != null) { if (subscriber.getTopic().equals(topic)) |