summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarnie McCormack <marnie@apache.org>2006-11-03 10:02:56 +0000
committerMarnie McCormack <marnie@apache.org>2006-11-03 10:02:56 +0000
commit1e26f96640746417fbfbd224cb4d2f36a4df64ff (patch)
tree745032b102d12e6ae3491f2e409d371e5c03c26d
parent408ffc032213d0175da1638c9976cc97cfd977c4 (diff)
downloadqpid-python-1e26f96640746417fbfbd224cb4d2f36a4df64ff.tar.gz
Changes to fix QueueReceiver ClassCastException being thrown by methods in AMQSession which should return this interface. As per QPID-58. Added QueueReceiverAdaptor class now used in AMQSession.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/java@470742 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--client/src/org/apache/qpid/client/AMQSession.java76
-rw-r--r--client/src/org/apache/qpid/client/BasicMessageConsumer.java2
-rw-r--r--client/src/org/apache/qpid/client/QueueReceiverAdaptor.java83
3 files changed, 153 insertions, 8 deletions
diff --git a/client/src/org/apache/qpid/client/AMQSession.java b/client/src/org/apache/qpid/client/AMQSession.java
index 8b2c2ec04d..dd868f9dba 100644
--- a/client/src/org/apache/qpid/client/AMQSession.java
+++ b/client/src/org/apache/qpid/client/AMQSession.java
@@ -169,7 +169,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String reason = message.bounceBody.replyText;
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
- //Todo should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+ //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
if (errorCode == AMQConstant.NO_CONSUMERS.getCode())
{
_connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
@@ -380,7 +380,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
catch (AMQException e)
{
- throw new JMSException("Unable to create message: " + e);
+ throw new JMSException("Unable to create text message: " + e);
}
}
}
@@ -398,7 +398,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
catch (AMQException e)
{
- throw new JMSException("Unable to create message: " + e);
+ throw new JMSException("Unable to create text message: " + e);
}
}
}
@@ -718,6 +718,34 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}.execute(_connection);
}
+ /**
+ * Creates a QueueReceiver
+ * @param destination
+ * @return QueueReceiver - a wrapper around our MessageConsumer
+ * @throws JMSException
+ */
+ public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
+ {
+ AMQQueue dest = (AMQQueue) destination;
+ BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
+ return new QueueReceiverAdaptor(dest, consumer);
+ }
+
+ /**
+ * Creates a QueueReceiver using a message selector
+ * @param destination
+ * @param messageSelector
+ * @return QueueReceiver - a wrapper around our MessageConsumer
+ * @throws JMSException
+ */
+ public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
+ {
+ AMQQueue dest = (AMQQueue) destination;
+ BasicMessageConsumer consumer = (BasicMessageConsumer)
+ createConsumer(destination, messageSelector);
+ return new QueueReceiverAdaptor(dest, consumer);
+ }
+
public MessageConsumer createConsumer(Destination destination) throws JMSException
{
return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null);
@@ -924,14 +952,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+ /**
+ * Creates a QueueReceiver wrapping a MessageConsumer
+ * @param queue
+ * @return QueueReceiver
+ * @throws JMSException
+ */
public QueueReceiver createReceiver(Queue queue) throws JMSException
{
- return (QueueReceiver) createConsumer(queue);
+ AMQQueue dest = (AMQQueue) queue;
+ BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
+ return new QueueReceiverAdaptor(dest, consumer);
}
+ /**
+ * Creates a QueueReceiver wrapping a MessageConsumer using a message selector
+ * @param queue
+ * @param messageSelector
+ * @return QueueReceiver
+ * @throws JMSException
+ */
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
{
- return (QueueReceiver) createConsumer(queue, messageSelector);
+ AMQQueue dest = (AMQQueue) queue;
+ BasicMessageConsumer consumer = (BasicMessageConsumer)
+ createConsumer(dest, messageSelector);
+ return new QueueReceiverAdaptor(dest, consumer);
}
public QueueSender createSender(Queue queue) throws JMSException
@@ -961,14 +1007,30 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+ /**
+ * Creates a non-durable subscriber
+ * @param topic
+ * @return TopicSubscriber - a wrapper round our MessageConsumer
+ * @throws JMSException
+ */
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
- return (TopicSubscriber) createConsumer(topic);
+ AMQTopic dest = new AMQTopic(topic.getTopicName());
+ return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
+ /**
+ * Creates a non-durable subscriber with a message selector
+ * @param topic
+ * @param messageSelector
+ * @param noLocal
+ * @return TopicSubscriber - a wrapper round our MessageConsumer
+ * @throws JMSException
+ */
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
- return (TopicSubscriber) createConsumer(topic, messageSelector, noLocal);
+ AMQTopic dest = new AMQTopic(topic.getTopicName());
+ return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
}
/**
diff --git a/client/src/org/apache/qpid/client/BasicMessageConsumer.java b/client/src/org/apache/qpid/client/BasicMessageConsumer.java
index a6f89fd221..51d7337457 100644
--- a/client/src/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/client/src/org/apache/qpid/client/BasicMessageConsumer.java
@@ -129,7 +129,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
private boolean _dups_ok_acknowledge_send;
- BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
+ protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode)
diff --git a/client/src/org/apache/qpid/client/QueueReceiverAdaptor.java b/client/src/org/apache/qpid/client/QueueReceiverAdaptor.java
new file mode 100644
index 0000000000..fc04181e19
--- /dev/null
+++ b/client/src/org/apache/qpid/client/QueueReceiverAdaptor.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.*;
+
+/**
+ * Class that wraps a MessageConsumer for backwards JMS compatibility
+ * Returned by methods in AMQSession etc
+ */
+public class QueueReceiverAdaptor implements QueueReceiver {
+
+ protected MessageConsumer _consumer;
+ protected Queue _queue;
+
+ protected QueueReceiverAdaptor(Queue queue, MessageConsumer consumer)
+ {
+ _consumer = consumer;
+ _queue = queue;
+ }
+
+ public String getMessageSelector() throws JMSException
+ {
+ return _consumer.getMessageSelector();
+ }
+
+ public MessageListener getMessageListener() throws JMSException
+ {
+ return _consumer.getMessageListener();
+ }
+
+ public void setMessageListener(MessageListener messageListener) throws JMSException
+ {
+ _consumer.setMessageListener(messageListener);
+ }
+
+ public Message receive() throws JMSException
+ {
+ return _consumer.receive();
+ }
+
+ public Message receive(long l) throws JMSException
+ {
+ return _consumer.receive(l);
+ }
+
+ public Message receiveNoWait() throws JMSException
+ {
+ return _consumer.receiveNoWait();
+ }
+
+ public void close() throws JMSException
+ {
+ _consumer.close();
+ }
+
+ /**
+ * Return the queue associated with this receiver
+ * @return
+ * @throws JMSException
+ */
+ public Queue getQueue() throws JMSException
+ {
+ return _queue;
+ }
+
+
+}