diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2006-12-11 16:16:55 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2006-12-11 16:16:55 +0000 |
commit | 5f3d9cf0aa6da8f619ca9fcc7fa8294efcaade8e (patch) | |
tree | dbd41b2d16a5f2a2c85ef0ba18c39500bbe80bca /java | |
parent | 57a26dfe616fb18225e83cf7ca963786b3415a5c (diff) | |
download | qpid-python-5f3d9cf0aa6da8f619ca9fcc7fa8294efcaade8e.tar.gz |
This contains a fix for QPID-165 and QPID-166
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@485735 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
10 files changed, 137 insertions, 13 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 0bb8736227..9dcbfca6bc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -54,6 +54,7 @@ import java.net.ConnectException; import java.nio.channels.UnresolvedAddressException; import java.text.MessageFormat; import java.util.ArrayList; +import java.util.Enumeration; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; @@ -550,8 +551,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ConnectionMetaData getMetaData() throws JMSException { checkNotClosed(); - // TODO Auto-generated method stub - return null; + return QpidConnectionMetaData.instance(); + } public ExceptionListener getExceptionListener() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8f90913e5c..03c18903e4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -38,6 +38,7 @@ import org.apache.qpid.url.URLSyntaxException; import javax.jms.*; import javax.jms.IllegalStateException; + import java.io.Serializable; import java.text.MessageFormat; import java.util.ArrayList; @@ -279,7 +280,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow); } - AMQConnection getAMQConnection() + public AMQConnection getAMQConnection() { return _connection; } @@ -744,6 +745,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public QueueReceiver createQueueReceiver(Destination destination) throws JMSException { + checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); return new QueueReceiverAdaptor(dest, consumer); @@ -759,6 +761,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException { + checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector); @@ -767,17 +770,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MessageConsumer createConsumer(Destination destination) throws JMSException { + checkValidDestination(destination); return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { + checkValidDestination(destination); return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { + checkValidDestination(destination); return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector); } @@ -787,6 +793,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean exclusive, String selector) throws JMSException { + checkValidDestination(destination); return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null); } @@ -798,6 +805,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean exclusive, String selector) throws JMSException { + checkValidDestination(destination); return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null); } @@ -808,6 +816,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector, FieldTable rawSelector) throws JMSException { + checkValidDestination(destination); return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector); } @@ -820,6 +829,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector, FieldTable rawSelector) throws JMSException { + checkValidDestination(destination); return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector); } @@ -1045,6 +1055,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createSubscriber(Topic topic) throws JMSException { checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } @@ -1061,6 +1072,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); } @@ -1075,6 +1087,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } @@ -1086,6 +1099,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi throws JMSException { checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); return new TopicSubscriberAdaptor(dest, consumer); @@ -1094,6 +1108,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicPublisher createPublisher(Topic topic) throws JMSException { checkNotClosed(); + checkValidTopic(topic); //return (TopicPublisher) createProducer(topic); return new TopicPublisherAdapter(createProducer(topic), topic); } @@ -1101,12 +1116,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public QueueBrowser createBrowser(Queue queue) throws JMSException { checkNotClosed(); + checkValidQueue(queue); throw new UnsupportedOperationException("Queue browsing not supported"); } public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { checkNotClosed(); + checkValidQueue(queue); throw new UnsupportedOperationException("Queue browsing not supported"); } @@ -1124,6 +1141,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void unsubscribe(String name) throws JMSException { + checkNotClosed(); + //send a queue.delete for the subscription String queue = _connection.getClientID() + ":" + name; AMQFrame frame = QueueDeleteBody.createAMQFrame(_channelId, 0, queue, false, false, true); @@ -1325,4 +1344,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, true); _connection.getProtocolHandler().writeFrame(channelFlowFrame); } + + /* + * I could have combined the last 3 methods, but this way it improves readability + */ + private void checkValidTopic(Topic topic) throws InvalidDestinationException{ + if (topic == null){ + throw new javax.jms.InvalidDestinationException("Invalid Topic"); + } + } + + private void checkValidQueue(Queue queue) throws InvalidDestinationException{ + if (queue == null){ + throw new javax.jms.InvalidDestinationException("Invalid Queue"); + } + } + + private void checkValidDestination(Destination destination) throws InvalidDestinationException{ + if (destination == null){ + throw new javax.jms.InvalidDestinationException("Invalid Queue"); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index ded2152bf8..4fb62b49fc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -544,7 +544,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer this.checkNotClosed(); if(_session == null || _session.isClosed()){ - throw new UnsupportedOperationException("Invalid Session"); + throw new javax.jms.IllegalStateException("Invalid Session"); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 8d6287eca3..fd6070a045 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -30,6 +30,7 @@ import org.apache.qpid.framing.*; import javax.jms.DeliveryMode; import javax.jms.Destination; +import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.Message; import java.io.UnsupportedEncodingException; @@ -231,6 +232,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message) throws JMSException { checkPreConditions(); + checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive, @@ -241,6 +243,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message, int deliveryMode) throws JMSException { checkPreConditions(); + checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive, @@ -251,6 +254,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message, int deliveryMode, boolean immediate) throws JMSException { checkPreConditions(); + checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive, @@ -262,6 +266,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j long timeToLive) throws JMSException { checkPreConditions(); + checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory, @@ -272,6 +277,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Destination destination, Message message) throws JMSException { checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -285,6 +291,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j throws JMSException { checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -298,6 +305,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j throws JMSException { checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -311,6 +319,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j throws JMSException { checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -325,6 +334,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j throws JMSException { checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -487,17 +497,30 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j _encoding = encoding; } - private void checkPreConditions() throws IllegalStateException, JMSException { + private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException { checkNotClosed(); - + + if(_session == null || _session.isClosed()){ + throw new javax.jms.IllegalStateException("Invalid Session"); + } + } + + private void checkInitialDestination(){ if(_destination == null){ throw new UnsupportedOperationException("Destination is null"); } + } + + private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{ + if (_destination != null && suppliedDestination != null){ + throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); + } - if(_session == null || _session.isClosed()){ - throw new UnsupportedOperationException("Invalid Session"); + if (suppliedDestination == null){ + throw new InvalidDestinationException("Supplied Destination was invalid"); } } + public AMQSession getSession() { return _session; diff --git a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java new file mode 100644 index 0000000000..10a65c2ad8 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java @@ -0,0 +1,50 @@ +package org.apache.qpid.client; + +import java.util.Enumeration; + +import javax.jms.ConnectionMetaData; +import javax.jms.JMSException; + +public class QpidConnectionMetaData implements ConnectionMetaData { + + private static QpidConnectionMetaData _instance = new QpidConnectionMetaData(); + + private QpidConnectionMetaData(){ + } + + public static QpidConnectionMetaData instance(){ + return _instance; + } + + public int getJMSMajorVersion() throws JMSException { + return 1; + } + + public int getJMSMinorVersion() throws JMSException { + return 1; + } + + public String getJMSProviderName() throws JMSException { + return "Apache Qpid"; + } + + public String getJMSVersion() throws JMSException { + return "1.1"; + } + + public Enumeration getJMSXPropertyNames() throws JMSException { + return null; + } + + public int getProviderMajorVersion() throws JMSException { + return 0; + } + + public int getProviderMinorVersion() throws JMSException { + return 9; + } + + public String getProviderVersion() throws JMSException { + return "Incubating-M1"; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java index 21ec50c046..aeb2afa118 100644 --- a/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java +++ b/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java @@ -103,7 +103,7 @@ public class QueueReceiverAdaptor implements QueueReceiver { AMQSession session = msgConsumer.getSession(); if(session == null || session.isClosed()){ - throw new UnsupportedOperationException("Invalid Session"); + throw new javax.jms.IllegalStateException("Invalid Session"); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java index 15bf4a125f..f90cc97a80 100644 --- a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java @@ -52,26 +52,32 @@ public class QueueSenderAdapter implements QueueSender { } public int getDeliveryMode() throws JMSException { + checkPreConditions(); return delegate.getDeliveryMode(); } public Destination getDestination() throws JMSException { + checkPreConditions(); return delegate.getDestination(); } public boolean getDisableMessageID() throws JMSException { + checkPreConditions(); return delegate.getDisableMessageID(); } public boolean getDisableMessageTimestamp() throws JMSException { + checkPreConditions(); return delegate.getDisableMessageTimestamp(); } public int getPriority() throws JMSException { + checkPreConditions(); return delegate.getPriority(); } public long getTimeToLive() throws JMSException { + checkPreConditions(); return delegate.getTimeToLive(); } @@ -128,7 +134,7 @@ public class QueueSenderAdapter implements QueueSender { AMQSession session = ((BasicMessageProducer)delegate).getSession(); if(session == null || session.isClosed()){ - throw new UnsupportedOperationException("Invalid Session"); + throw new javax.jms.IllegalStateException("Invalid Session"); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java index 0702202c2a..02da284b83 100644 --- a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java @@ -132,7 +132,7 @@ public class TopicPublisherAdapter implements TopicPublisher { AMQSession session = ((BasicMessageProducer)delegate).getSession(); if(session == null || session.isClosed()){ - throw new UnsupportedOperationException("Invalid Session"); + throw new javax.jms.IllegalStateException("Invalid Session"); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java index 06e353e271..014c7c3311 100644 --- a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java +++ b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java @@ -116,7 +116,7 @@ class TopicSubscriberAdaptor implements TopicSubscriber AMQSession session = msgConsumer.getSession(); if(session == null || session.isClosed()){ - throw new UnsupportedOperationException("Invalid Session"); + throw new javax.jms.IllegalStateException("Invalid Session"); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 329153534b..514287aea7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -384,11 +384,15 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms } public void acknowledge() throws JMSException - { + { // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge // is not specified. In our case, we only set the session field where client acknowledge mode is specified. if (_session != null) { + if (_session.getAMQConnection().isClosed()){ + throw new javax.jms.IllegalStateException("Connection is already closed"); + } + // we set multiple to true here since acknowledgement implies acknowledge of all previous messages // received on the session _session.acknowledgeMessage(_deliveryTag, true); |