summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java84
1 files changed, 75 insertions, 9 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 2c88d6f557..2442b157f1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -25,10 +25,11 @@ import javax.jms.*;
import javax.jms.IllegalStateException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
@@ -43,7 +44,7 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
-public final class AMQSession_0_8 extends AMQSession
+public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
/** Used for debugging. */
@@ -218,6 +219,7 @@ public final class AMQSession_0_8 extends AMQSession
return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName());
}
+
public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
throws JMSException
{
@@ -245,10 +247,14 @@ public final class AMQSession_0_8 extends AMQSession
{
throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
}
- }
-
- public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait,
- String messageSelector, AMQShortString tag) throws AMQException, FailoverException
+ }
+
+ @Override public void sendConsume(BasicMessageConsumer_0_8 consumer,
+ AMQShortString queueName,
+ AMQProtocolHandler protocolHandler,
+ boolean nowait,
+ String messageSelector,
+ int tag) throws AMQException, FailoverException
{
FieldTable arguments = FieldTableFactory.newFieldTable();
if ((messageSelector != null) && !messageSelector.equals(""))
@@ -268,7 +274,7 @@ public final class AMQSession_0_8 extends AMQSession
BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,
- tag,
+ new AMQShortString(String.valueOf(tag)),
consumer.isNoLocal(),
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
consumer.isExclusive(),
@@ -337,7 +343,7 @@ public final class AMQSession_0_8 extends AMQSession
}
- public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+ public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
final boolean immediate, final boolean waitUntilSent, long producerId)
{
@@ -345,6 +351,66 @@ public final class AMQSession_0_8 extends AMQSession
this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
}
+
+ @Override public void messageReceived(UnprocessedMessage message)
+ {
+
+ if (message instanceof ReturnMessage)
+ {
+ // Return of the bounced message.
+ returnBouncedMessage((ReturnMessage) message);
+ }
+ else
+ {
+ super.messageReceived(message);
+ }
+ }
+
+ private void returnBouncedMessage(final ReturnMessage msg)
+ {
+ _connection.performConnectionTask(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ // Bounced message is processed here, away from the mina thread
+ AbstractJMSMessage bouncedMessage =
+ _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
+ msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
+ AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
+ AMQShortString reason = msg.getReplyText();
+ _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+ // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
+ }
+ else
+ {
+ _connection.exceptionReceived(
+ new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
+ }
+
+ }
+ catch (Exception e)
+ {
+ _logger.error(
+ "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
+ e);
+ }
+ }
+ });
+ }
+
+
+
+
public void sendRollback() throws AMQException, FailoverException
{
TxRollbackBody body = getMethodRegistry().createTxRollbackBody();
@@ -365,7 +431,7 @@ public final class AMQSession_0_8 extends AMQSession
checkNotClosed();
AMQTopic origTopic = checkValidTopic(topic);
AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
- TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ TopicSubscriberAdaptor<BasicMessageConsumer_0_8> subscriber = _subscriptions.get(name);
if (subscriber != null)
{
if (subscriber.getTopic().equals(topic))