diff options
author | Marnie McCormack <marnie@apache.org> | 2006-11-03 10:02:56 +0000 |
---|---|---|
committer | Marnie McCormack <marnie@apache.org> | 2006-11-03 10:02:56 +0000 |
commit | 1e26f96640746417fbfbd224cb4d2f36a4df64ff (patch) | |
tree | 745032b102d12e6ae3491f2e409d371e5c03c26d | |
parent | 408ffc032213d0175da1638c9976cc97cfd977c4 (diff) | |
download | qpid-python-1e26f96640746417fbfbd224cb4d2f36a4df64ff.tar.gz |
Changes to fix QueueReceiver ClassCastException being thrown by methods in AMQSession which should return this interface. As per QPID-58. Added QueueReceiverAdaptor class now used in AMQSession.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/java@470742 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 153 insertions, 8 deletions
diff --git a/client/src/org/apache/qpid/client/AMQSession.java b/client/src/org/apache/qpid/client/AMQSession.java index 8b2c2ec04d..dd868f9dba 100644 --- a/client/src/org/apache/qpid/client/AMQSession.java +++ b/client/src/org/apache/qpid/client/AMQSession.java @@ -169,7 +169,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String reason = message.bounceBody.replyText; _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - //Todo should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. + //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. if (errorCode == AMQConstant.NO_CONSUMERS.getCode()) { _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); @@ -380,7 +380,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } catch (AMQException e) { - throw new JMSException("Unable to create message: " + e); + throw new JMSException("Unable to create text message: " + e); } } } @@ -398,7 +398,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } catch (AMQException e) { - throw new JMSException("Unable to create message: " + e); + throw new JMSException("Unable to create text message: " + e); } } } @@ -718,6 +718,34 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi }.execute(_connection); } + /** + * Creates a QueueReceiver + * @param destination + * @return QueueReceiver - a wrapper around our MessageConsumer + * @throws JMSException + */ + public QueueReceiver createQueueReceiver(Destination destination) throws JMSException + { + AMQQueue dest = (AMQQueue) destination; + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); + return new QueueReceiverAdaptor(dest, consumer); + } + + /** + * Creates a QueueReceiver using a message selector + * @param destination + * @param messageSelector + * @return QueueReceiver - a wrapper around our MessageConsumer + * @throws JMSException + */ + public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException + { + AMQQueue dest = (AMQQueue) destination; + BasicMessageConsumer consumer = (BasicMessageConsumer) + createConsumer(destination, messageSelector); + return new QueueReceiverAdaptor(dest, consumer); + } + public MessageConsumer createConsumer(Destination destination) throws JMSException { return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null); @@ -924,14 +952,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + /** + * Creates a QueueReceiver wrapping a MessageConsumer + * @param queue + * @return QueueReceiver + * @throws JMSException + */ public QueueReceiver createReceiver(Queue queue) throws JMSException { - return (QueueReceiver) createConsumer(queue); + AMQQueue dest = (AMQQueue) queue; + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest); + return new QueueReceiverAdaptor(dest, consumer); } + /** + * Creates a QueueReceiver wrapping a MessageConsumer using a message selector + * @param queue + * @param messageSelector + * @return QueueReceiver + * @throws JMSException + */ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { - return (QueueReceiver) createConsumer(queue, messageSelector); + AMQQueue dest = (AMQQueue) queue; + BasicMessageConsumer consumer = (BasicMessageConsumer) + createConsumer(dest, messageSelector); + return new QueueReceiverAdaptor(dest, consumer); } public QueueSender createSender(Queue queue) throws JMSException @@ -961,14 +1007,30 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + /** + * Creates a non-durable subscriber + * @param topic + * @return TopicSubscriber - a wrapper round our MessageConsumer + * @throws JMSException + */ public TopicSubscriber createSubscriber(Topic topic) throws JMSException { - return (TopicSubscriber) createConsumer(topic); + AMQTopic dest = new AMQTopic(topic.getTopicName()); + return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } + /** + * Creates a non-durable subscriber with a message selector + * @param topic + * @param messageSelector + * @param noLocal + * @return TopicSubscriber - a wrapper round our MessageConsumer + * @throws JMSException + */ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { - return (TopicSubscriber) createConsumer(topic, messageSelector, noLocal); + AMQTopic dest = new AMQTopic(topic.getTopicName()); + return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); } /** diff --git a/client/src/org/apache/qpid/client/BasicMessageConsumer.java b/client/src/org/apache/qpid/client/BasicMessageConsumer.java index a6f89fd221..51d7337457 100644 --- a/client/src/org/apache/qpid/client/BasicMessageConsumer.java +++ b/client/src/org/apache/qpid/client/BasicMessageConsumer.java @@ -129,7 +129,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private boolean _dups_ok_acknowledge_send; - BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, + protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode) diff --git a/client/src/org/apache/qpid/client/QueueReceiverAdaptor.java b/client/src/org/apache/qpid/client/QueueReceiverAdaptor.java new file mode 100644 index 0000000000..fc04181e19 --- /dev/null +++ b/client/src/org/apache/qpid/client/QueueReceiverAdaptor.java @@ -0,0 +1,83 @@ +/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.*;
+
+/**
+ * Class that wraps a MessageConsumer for backwards JMS compatibility
+ * Returned by methods in AMQSession etc
+ */
+public class QueueReceiverAdaptor implements QueueReceiver {
+
+ protected MessageConsumer _consumer;
+ protected Queue _queue;
+
+ protected QueueReceiverAdaptor(Queue queue, MessageConsumer consumer)
+ {
+ _consumer = consumer;
+ _queue = queue;
+ }
+
+ public String getMessageSelector() throws JMSException
+ {
+ return _consumer.getMessageSelector();
+ }
+
+ public MessageListener getMessageListener() throws JMSException
+ {
+ return _consumer.getMessageListener();
+ }
+
+ public void setMessageListener(MessageListener messageListener) throws JMSException
+ {
+ _consumer.setMessageListener(messageListener);
+ }
+
+ public Message receive() throws JMSException
+ {
+ return _consumer.receive();
+ }
+
+ public Message receive(long l) throws JMSException
+ {
+ return _consumer.receive(l);
+ }
+
+ public Message receiveNoWait() throws JMSException
+ {
+ return _consumer.receiveNoWait();
+ }
+
+ public void close() throws JMSException
+ {
+ _consumer.close();
+ }
+
+ /**
+ * Return the queue associated with this receiver
+ * @return
+ * @throws JMSException
+ */
+ public Queue getQueue() throws JMSException
+ {
+ return _queue;
+ }
+
+
+}
|