summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2006-12-11 16:16:55 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2006-12-11 16:16:55 +0000
commit5f3d9cf0aa6da8f619ca9fcc7fa8294efcaade8e (patch)
treedbd41b2d16a5f2a2c85ef0ba18c39500bbe80bca /java
parent57a26dfe616fb18225e83cf7ca963786b3415a5c (diff)
downloadqpid-python-5f3d9cf0aa6da8f619ca9fcc7fa8294efcaade8e.tar.gz
This contains a fix for QPID-165 and QPID-166
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@485735 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java42
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java6
10 files changed, 137 insertions, 13 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 0bb8736227..9dcbfca6bc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -54,6 +54,7 @@ import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.Enumeration;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@@ -550,8 +551,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public ConnectionMetaData getMetaData() throws JMSException
{
checkNotClosed();
- // TODO Auto-generated method stub
- return null;
+ return QpidConnectionMetaData.instance();
+
}
public ExceptionListener getExceptionListener() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 8f90913e5c..03c18903e4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -38,6 +38,7 @@ import org.apache.qpid.url.URLSyntaxException;
import javax.jms.*;
import javax.jms.IllegalStateException;
+
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -279,7 +280,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
}
- AMQConnection getAMQConnection()
+ public AMQConnection getAMQConnection()
{
return _connection;
}
@@ -744,6 +745,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
{
+ checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
return new QueueReceiverAdaptor(dest, consumer);
@@ -759,6 +761,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
{
+ checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(destination, messageSelector);
@@ -767,17 +770,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MessageConsumer createConsumer(Destination destination) throws JMSException
{
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector);
}
@@ -787,6 +793,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean exclusive,
String selector) throws JMSException
{
+ checkValidDestination(destination);
return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
}
@@ -798,6 +805,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean exclusive,
String selector) throws JMSException
{
+ checkValidDestination(destination);
return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
}
@@ -808,6 +816,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector,
FieldTable rawSelector) throws JMSException
{
+ checkValidDestination(destination);
return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive,
selector, rawSelector);
}
@@ -820,6 +829,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector,
FieldTable rawSelector) throws JMSException
{
+ checkValidDestination(destination);
return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive,
selector, rawSelector);
}
@@ -1045,6 +1055,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
@@ -1061,6 +1072,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
}
@@ -1075,6 +1087,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
@@ -1086,6 +1099,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
throws JMSException
{
checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
return new TopicSubscriberAdaptor(dest, consumer);
@@ -1094,6 +1108,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicPublisher createPublisher(Topic topic) throws JMSException
{
checkNotClosed();
+ checkValidTopic(topic);
//return (TopicPublisher) createProducer(topic);
return new TopicPublisherAdapter(createProducer(topic), topic);
}
@@ -1101,12 +1116,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
checkNotClosed();
+ checkValidQueue(queue);
throw new UnsupportedOperationException("Queue browsing not supported");
}
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
{
checkNotClosed();
+ checkValidQueue(queue);
throw new UnsupportedOperationException("Queue browsing not supported");
}
@@ -1124,6 +1141,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void unsubscribe(String name) throws JMSException
{
+ checkNotClosed();
+
//send a queue.delete for the subscription
String queue = _connection.getClientID() + ":" + name;
AMQFrame frame = QueueDeleteBody.createAMQFrame(_channelId, 0, queue, false, false, true);
@@ -1325,4 +1344,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, true);
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
+
+ /*
+ * I could have combined the last 3 methods, but this way it improves readability
+ */
+ private void checkValidTopic(Topic topic) throws InvalidDestinationException{
+ if (topic == null){
+ throw new javax.jms.InvalidDestinationException("Invalid Topic");
+ }
+ }
+
+ private void checkValidQueue(Queue queue) throws InvalidDestinationException{
+ if (queue == null){
+ throw new javax.jms.InvalidDestinationException("Invalid Queue");
+ }
+ }
+
+ private void checkValidDestination(Destination destination) throws InvalidDestinationException{
+ if (destination == null){
+ throw new javax.jms.InvalidDestinationException("Invalid Queue");
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index ded2152bf8..4fb62b49fc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -544,7 +544,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
this.checkNotClosed();
if(_session == null || _session.isClosed()){
- throw new UnsupportedOperationException("Invalid Session");
+ throw new javax.jms.IllegalStateException("Invalid Session");
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 8d6287eca3..fd6070a045 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -30,6 +30,7 @@ import org.apache.qpid.framing.*;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
+import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import java.io.UnsupportedEncodingException;
@@ -231,6 +232,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Message message) throws JMSException
{
checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
@@ -241,6 +243,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Message message, int deliveryMode) throws JMSException
{
checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
@@ -251,6 +254,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
{
checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
@@ -262,6 +266,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
long timeToLive) throws JMSException
{
checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory,
@@ -272,6 +277,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Destination destination, Message message) throws JMSException
{
checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -285,6 +291,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
throws JMSException
{
checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -298,6 +305,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
throws JMSException
{
checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -311,6 +319,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
throws JMSException
{
checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -325,6 +334,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
throws JMSException
{
checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -487,17 +497,30 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
_encoding = encoding;
}
- private void checkPreConditions() throws IllegalStateException, JMSException {
+ private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException {
checkNotClosed();
-
+
+ if(_session == null || _session.isClosed()){
+ throw new javax.jms.IllegalStateException("Invalid Session");
+ }
+ }
+
+ private void checkInitialDestination(){
if(_destination == null){
throw new UnsupportedOperationException("Destination is null");
}
+ }
+
+ private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{
+ if (_destination != null && suppliedDestination != null){
+ throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
+ }
- if(_session == null || _session.isClosed()){
- throw new UnsupportedOperationException("Invalid Session");
+ if (suppliedDestination == null){
+ throw new InvalidDestinationException("Supplied Destination was invalid");
}
}
+
public AMQSession getSession() {
return _session;
diff --git a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
new file mode 100644
index 0000000000..10a65c2ad8
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
@@ -0,0 +1,50 @@
+package org.apache.qpid.client;
+
+import java.util.Enumeration;
+
+import javax.jms.ConnectionMetaData;
+import javax.jms.JMSException;
+
+public class QpidConnectionMetaData implements ConnectionMetaData {
+
+ private static QpidConnectionMetaData _instance = new QpidConnectionMetaData();
+
+ private QpidConnectionMetaData(){
+ }
+
+ public static QpidConnectionMetaData instance(){
+ return _instance;
+ }
+
+ public int getJMSMajorVersion() throws JMSException {
+ return 1;
+ }
+
+ public int getJMSMinorVersion() throws JMSException {
+ return 1;
+ }
+
+ public String getJMSProviderName() throws JMSException {
+ return "Apache Qpid";
+ }
+
+ public String getJMSVersion() throws JMSException {
+ return "1.1";
+ }
+
+ public Enumeration getJMSXPropertyNames() throws JMSException {
+ return null;
+ }
+
+ public int getProviderMajorVersion() throws JMSException {
+ return 0;
+ }
+
+ public int getProviderMinorVersion() throws JMSException {
+ return 9;
+ }
+
+ public String getProviderVersion() throws JMSException {
+ return "Incubating-M1";
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java
index 21ec50c046..aeb2afa118 100644
--- a/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java
+++ b/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java
@@ -103,7 +103,7 @@ public class QueueReceiverAdaptor implements QueueReceiver {
AMQSession session = msgConsumer.getSession();
if(session == null || session.isClosed()){
- throw new UnsupportedOperationException("Invalid Session");
+ throw new javax.jms.IllegalStateException("Invalid Session");
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
index 15bf4a125f..f90cc97a80 100644
--- a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
@@ -52,26 +52,32 @@ public class QueueSenderAdapter implements QueueSender {
}
public int getDeliveryMode() throws JMSException {
+ checkPreConditions();
return delegate.getDeliveryMode();
}
public Destination getDestination() throws JMSException {
+ checkPreConditions();
return delegate.getDestination();
}
public boolean getDisableMessageID() throws JMSException {
+ checkPreConditions();
return delegate.getDisableMessageID();
}
public boolean getDisableMessageTimestamp() throws JMSException {
+ checkPreConditions();
return delegate.getDisableMessageTimestamp();
}
public int getPriority() throws JMSException {
+ checkPreConditions();
return delegate.getPriority();
}
public long getTimeToLive() throws JMSException {
+ checkPreConditions();
return delegate.getTimeToLive();
}
@@ -128,7 +134,7 @@ public class QueueSenderAdapter implements QueueSender {
AMQSession session = ((BasicMessageProducer)delegate).getSession();
if(session == null || session.isClosed()){
- throw new UnsupportedOperationException("Invalid Session");
+ throw new javax.jms.IllegalStateException("Invalid Session");
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
index 0702202c2a..02da284b83 100644
--- a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
@@ -132,7 +132,7 @@ public class TopicPublisherAdapter implements TopicPublisher {
AMQSession session = ((BasicMessageProducer)delegate).getSession();
if(session == null || session.isClosed()){
- throw new UnsupportedOperationException("Invalid Session");
+ throw new javax.jms.IllegalStateException("Invalid Session");
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
index 06e353e271..014c7c3311 100644
--- a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
+++ b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
@@ -116,7 +116,7 @@ class TopicSubscriberAdaptor implements TopicSubscriber
AMQSession session = msgConsumer.getSession();
if(session == null || session.isClosed()){
- throw new UnsupportedOperationException("Invalid Session");
+ throw new javax.jms.IllegalStateException("Invalid Session");
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 329153534b..514287aea7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -384,11 +384,15 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
}
public void acknowledge() throws JMSException
- {
+ {
// the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
// is not specified. In our case, we only set the session field where client acknowledge mode is specified.
if (_session != null)
{
+ if (_session.getAMQConnection().isClosed()){
+ throw new javax.jms.IllegalStateException("Connection is already closed");
+ }
+
// we set multiple to true here since acknowledgement implies acknowledge of all previous messages
// received on the session
_session.acknowledgeMessage(_deliveryTag, true);