diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2006-12-06 15:56:03 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2006-12-06 15:56:03 +0000 |
commit | cf3c35c56bf467619fe84d06d1bf322b95e65f80 (patch) | |
tree | ace03e7c3200b66876b81919d8ecaf5e17fd7c80 /java | |
parent | 2bacb1f07e02e382a1e48f2e5cef475be69b8475 (diff) | |
download | qpid-python-cf3c35c56bf467619fe84d06d1bf322b95e65f80.tar.gz |
This is a fix for QPID-159. These implementations now throw the correct exceptions as required by the spec
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@483129 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
6 files changed, 167 insertions, 18 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index f97ea6bf1e..ded2152bf8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -159,11 +159,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public String getMessageSelector() throws JMSException { + checkPreConditions(); return _messageSelector; } public MessageListener getMessageListener() throws JMSException { + checkPreConditions(); return (MessageListener) _messageListener.get(); } @@ -179,7 +181,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void setMessageListener(MessageListener messageListener) throws JMSException { - checkNotClosed(); + checkPreConditions(); //if the current listener is non-null and the session is not stopped, then //it is an error to call this method. @@ -277,7 +279,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receive(long l) throws JMSException { - checkNotClosed(); + checkPreConditions(); acquireReceiving(); @@ -311,7 +313,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receiveNoWait() throws JMSException { - checkNotClosed(); + checkPreConditions(); acquireReceiving(); @@ -520,7 +522,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private void deregisterConsumer() { - _session.deregisterConsumer(_consumerTag); + _session.deregisterConsumer(_consumerTag); } public String getConsumerTag() @@ -529,7 +531,20 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } public void setConsumerTag(String consumerTag) - { + { _consumerTag = consumerTag; } + + public AMQSession getSession() { + return _session; + } + + private void checkPreConditions() throws JMSException{ + + this.checkNotClosed(); + + if(_session == null || _session.isClosed()){ + throw new UnsupportedOperationException("Invalid Session"); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 14cafc3558..8d6287eca3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -143,6 +143,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setDisableMessageID(boolean b) throws JMSException { + checkPreConditions(); checkNotClosed(); // IGNORED } @@ -156,7 +157,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setDisableMessageTimestamp(boolean b) throws JMSException { - checkNotClosed(); + checkPreConditions(); _disableTimestamps = b; } @@ -168,7 +169,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setDeliveryMode(int i) throws JMSException { - checkNotClosed(); + checkPreConditions(); if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT) { throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i + @@ -185,7 +186,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setPriority(int i) throws JMSException { - checkNotClosed(); + checkPreConditions(); if (i < 0 || i > 9) { throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9"); @@ -201,7 +202,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setTimeToLive(long l) throws JMSException { - checkNotClosed(); + checkPreConditions(); if (l < 0) { throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l); @@ -229,6 +230,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message) throws JMSException { + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive, @@ -238,6 +240,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message, int deliveryMode) throws JMSException { + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive, @@ -247,6 +250,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message, int deliveryMode, boolean immediate) throws JMSException { + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive, @@ -257,6 +261,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory, @@ -266,7 +271,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Destination destination, Message message) throws JMSException { - checkNotClosed(); + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -279,7 +284,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j int priority, long timeToLive) throws JMSException { - checkNotClosed(); + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -292,7 +297,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j int priority, long timeToLive, boolean mandatory) throws JMSException { - checkNotClosed(); + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -305,7 +310,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException { - checkNotClosed(); + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -319,7 +324,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j boolean immediate, boolean waitUntilSent) throws JMSException { - checkNotClosed(); + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -334,7 +339,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { throw new JMSException("Unsupported destination class: " + (destination != null ? destination.getClass() : null)); - } + } declareDestination((AMQDestination)destination); } @@ -481,4 +486,20 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j checkNotClosed(); _encoding = encoding; } + + private void checkPreConditions() throws IllegalStateException, JMSException { + checkNotClosed(); + + if(_destination == null){ + throw new UnsupportedOperationException("Destination is null"); + } + + if(_session == null || _session.isClosed()){ + throw new UnsupportedOperationException("Invalid Session"); + } + } + + public AMQSession getSession() { + return _session; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java index 57e458d833..21ec50c046 100644 --- a/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java +++ b/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java @@ -39,31 +39,37 @@ public class QueueReceiverAdaptor implements QueueReceiver { public String getMessageSelector() throws JMSException { + checkPreConditions(); return _consumer.getMessageSelector(); } public MessageListener getMessageListener() throws JMSException { + checkPreConditions(); return _consumer.getMessageListener(); } public void setMessageListener(MessageListener messageListener) throws JMSException { + checkPreConditions(); _consumer.setMessageListener(messageListener); } public Message receive() throws JMSException { + checkPreConditions(); return _consumer.receive(); } public Message receive(long l) throws JMSException { + checkPreConditions(); return _consumer.receive(l); } public Message receiveNoWait() throws JMSException { + checkPreConditions(); return _consumer.receiveNoWait(); } @@ -79,8 +85,26 @@ public class QueueReceiverAdaptor implements QueueReceiver { */ public Queue getQueue() throws JMSException { + checkPreConditions(); return _queue; } + private void checkPreConditions() throws javax.jms.IllegalStateException { + BasicMessageConsumer msgConsumer = (BasicMessageConsumer)_consumer; + + if (msgConsumer.isClosed() ){ + throw new javax.jms.IllegalStateException("Consumer is closed"); + } + + if(_queue == null){ + throw new UnsupportedOperationException("Queue is null"); + } + + AMQSession session = msgConsumer.getSession(); + + if(session == null || session.isClosed()){ + throw new UnsupportedOperationException("Invalid Session"); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java index cfdea2ad15..15bf4a125f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java @@ -1,6 +1,7 @@ package org.apache.qpid.client; import javax.jms.Destination; +import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; @@ -11,6 +12,7 @@ public class QueueSenderAdapter implements QueueSender { private MessageProducer delegate; private Queue queue; + private boolean closed = false; public QueueSenderAdapter(MessageProducer msgProducer, Queue queue){ delegate = msgProducer; @@ -18,31 +20,35 @@ public class QueueSenderAdapter implements QueueSender { } public Queue getQueue() throws JMSException { + checkPreConditions(); return queue; } public void send(Message msg) throws JMSException { + checkPreConditions(); delegate.send(msg); } public void send(Queue queue, Message msg) throws JMSException { + checkPreConditions(); delegate.send(queue, msg); } public void publish(Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { - + checkPreConditions(); delegate.send(msg, deliveryMode,priority,timeToLive); } public void send(Queue queue,Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { - + checkPreConditions(); delegate.send(queue,msg, deliveryMode,priority,timeToLive); } public void close() throws JMSException { delegate.close(); + closed = true; } public int getDeliveryMode() throws JMSException { @@ -70,35 +76,59 @@ public class QueueSenderAdapter implements QueueSender { } public void send(Destination dest, Message msg) throws JMSException { + checkPreConditions(); delegate.send(dest,msg); } public void send(Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { + checkPreConditions(); delegate.send(msg, deliveryMode,priority,timeToLive); } public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { + checkPreConditions(); delegate.send(dest,msg, deliveryMode,priority,timeToLive); } public void setDeliveryMode(int deliveryMode) throws JMSException { + checkPreConditions(); delegate.setDeliveryMode(deliveryMode); } public void setDisableMessageID(boolean disableMessageID) throws JMSException { + checkPreConditions(); delegate.setDisableMessageID(disableMessageID); } public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException { + checkPreConditions(); delegate.setDisableMessageTimestamp(disableMessageTimestamp); } public void setPriority(int priority) throws JMSException { + checkPreConditions(); delegate.setPriority(priority); } public void setTimeToLive(long timeToLive) throws JMSException { + checkPreConditions(); delegate.setTimeToLive(timeToLive); } + + private void checkPreConditions() throws IllegalStateException, IllegalStateException { + if (closed){ + throw new javax.jms.IllegalStateException("Publisher is closed"); + } + + if(queue == null){ + throw new UnsupportedOperationException("Queue is null"); + } + + AMQSession session = ((BasicMessageProducer)delegate).getSession(); + + if(session == null || session.isClosed()){ + throw new UnsupportedOperationException("Invalid Session"); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java index 53a42e5185..0702202c2a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java @@ -1,6 +1,7 @@ package org.apache.qpid.client; import javax.jms.Destination; +import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; @@ -11,6 +12,7 @@ public class TopicPublisherAdapter implements TopicPublisher { private MessageProducer delegate; private Topic topic; + private boolean closed = false; public TopicPublisherAdapter(MessageProducer msgProducer, Topic topic){ delegate = msgProducer; @@ -18,29 +20,35 @@ public class TopicPublisherAdapter implements TopicPublisher { } public Topic getTopic() throws JMSException { + checkPreConditions(); return topic; } public void publish(Message msg) throws JMSException { + checkPreConditions(); delegate.send(msg); } public void publish(Topic topic, Message msg) throws JMSException { + checkPreConditions(); delegate.send(topic,msg); } public void publish(Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { + checkPreConditions(); delegate.send(msg, deliveryMode,priority,timeToLive); } public void publish(Topic topic, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { + checkPreConditions(); delegate.send(topic,msg, deliveryMode,priority,timeToLive); } public void close() throws JMSException { delegate.close(); + closed = true; } public int getDeliveryMode() throws JMSException { @@ -68,40 +76,63 @@ public class TopicPublisherAdapter implements TopicPublisher { } public void send(Message msg) throws JMSException { + checkPreConditions(); delegate.send(msg); } public void send(Destination dest, Message msg) throws JMSException { + checkPreConditions(); delegate.send(dest,msg); } public void send(Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { + checkPreConditions(); delegate.send(msg, deliveryMode,priority,timeToLive); } public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { + checkPreConditions(); delegate.send(dest,msg, deliveryMode,priority,timeToLive); } - + public void setDeliveryMode(int deliveryMode) throws JMSException { + checkPreConditions(); delegate.setDeliveryMode(deliveryMode); } public void setDisableMessageID(boolean disableMessageID) throws JMSException { + checkPreConditions(); delegate.setDisableMessageID(disableMessageID); } public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException { + checkPreConditions(); delegate.setDisableMessageTimestamp(disableMessageTimestamp); } public void setPriority(int priority) throws JMSException { + checkPreConditions(); delegate.setPriority(priority); } public void setTimeToLive(long timeToLive) throws JMSException { + checkPreConditions(); delegate.setTimeToLive(timeToLive); } + private void checkPreConditions() throws IllegalStateException, IllegalStateException { + if (closed){ + throw new javax.jms.IllegalStateException("Publisher is closed"); + } + + if(topic == null){ + throw new UnsupportedOperationException("Topic is null"); + } + + AMQSession session = ((BasicMessageProducer)delegate).getSession(); + if(session == null || session.isClosed()){ + throw new UnsupportedOperationException("Invalid Session"); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java index c776a9943e..06e353e271 100644 --- a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java +++ b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.client; +import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -43,37 +44,45 @@ class TopicSubscriberAdaptor implements TopicSubscriber _consumer = consumer; _noLocal = noLocal; } + TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer) { this(topic, consumer, consumer.isNoLocal()); } + public Topic getTopic() throws JMSException { + checkPreConditions(); return _topic; } public boolean getNoLocal() throws JMSException { + checkPreConditions(); return _noLocal; } public String getMessageSelector() throws JMSException { + checkPreConditions(); return _consumer.getMessageSelector(); } public MessageListener getMessageListener() throws JMSException { + checkPreConditions(); return _consumer.getMessageListener(); } public void setMessageListener(MessageListener messageListener) throws JMSException { + checkPreConditions(); _consumer.setMessageListener(messageListener); } public Message receive() throws JMSException { + checkPreConditions(); return _consumer.receive(); } @@ -84,6 +93,7 @@ class TopicSubscriberAdaptor implements TopicSubscriber public Message receiveNoWait() throws JMSException { + checkPreConditions(); return _consumer.receiveNoWait(); } @@ -91,4 +101,22 @@ class TopicSubscriberAdaptor implements TopicSubscriber { _consumer.close(); } + + private void checkPreConditions() throws javax.jms.IllegalStateException{ + BasicMessageConsumer msgConsumer = (BasicMessageConsumer)_consumer; + + if (msgConsumer.isClosed() ){ + throw new javax.jms.IllegalStateException("Consumer is closed"); + } + + if(_topic == null){ + throw new UnsupportedOperationException("Topic is null"); + } + + AMQSession session = msgConsumer.getSession(); + + if(session == null || session.isClosed()){ + throw new UnsupportedOperationException("Invalid Session"); + } + } } |