summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-09-04 21:08:40 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-09-04 21:08:40 +0000
commit93270855a90ee31578418499d9ea5ed30a730f04 (patch)
tree973220e21aeb4530721fa39a14cd4bc366984f88
parent656a3125a1ea5b2a4645d11b396d12ed11959dc9 (diff)
downloadqpid-python-93270855a90ee31578418499d9ea5ed30a730f04.tar.gz
NO-JIRA: JMS Fixes
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1165110 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java24
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java11
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java14
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java16
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java20
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java11
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java32
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java24
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java44
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java80
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java10
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java5
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java5
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java1
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java35
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java16
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java21
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java12
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java5
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java1
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java5
21 files changed, 324 insertions, 68 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
index 97cbc837c4..123d136866 100644
--- a/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
+++ b/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
@@ -110,6 +110,18 @@ public class Hello
System.out.println(o.getClass().getName() + ": " + o);
}
+ else if(message instanceof ObjectMessage)
+ {
+ System.out.println("Received Object Message:");
+ System.out.println("========================");
+ ObjectMessage objectMessage = (ObjectMessage)message;
+ Object o = objectMessage.getObject();
+ System.out.println(o.getClass().getName() + ": " + o);
+ }
+ else
+ {
+ System.out.println("Received Message " + message.getClass().getName());
+ }
}
catch (JMSException e)
{
@@ -124,8 +136,9 @@ public class Hello
MessageProducer messageProducer = producersession.createProducer(queue);
TextMessage message = producersession.createTextMessage("Hello world!");
+ message.setJMSType("Hello");
messageProducer.send(message);
-
+ /*
MapMessage mapmessage = producersession.createMapMessage();
mapmessage.setBoolean("mybool", true);
mapmessage.setString("mystring", "hello");
@@ -139,13 +152,18 @@ public class Hello
messageProducer.send(bytesMessage);
- StreamMessage streamMessage = producersession.createStreamMessage();
+ ObjectMessage objectMessage = producersession.createObjectMessage();
+ objectMessage.setObject(new Double("3.14159265358979323846264338327950288"));
+
+ messageProducer.send(objectMessage);
+
+/* StreamMessage streamMessage = producersession.createStreamMessage();
streamMessage.writeBoolean(true);
streamMessage.writeLong(18031974L);
streamMessage.writeString("this is a stream Message");
streamMessage.writeChar('£');
messageProducer.send(streamMessage);
-
+*/
Thread.sleep(50000L);
connection.close();
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java
index 2e13b66282..0ca629db7e 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java
@@ -23,6 +23,7 @@ import org.apache.qpid.amqp_1_0.type.Section;
import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
import org.apache.qpid.amqp_1_0.type.messaging.Footer;
import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
import org.apache.qpid.amqp_1_0.type.messaging.Properties;
import java.util.*;
@@ -31,16 +32,16 @@ public class AmqpMessageImpl extends MessageImpl implements AmqpMessage
{
private List<Section> _sections;
- protected AmqpMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, List<Section> sections,
+ protected AmqpMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, List<Section> sections,
Footer footer, SessionImpl session)
{
- super(header, properties, appProperties, footer, session);
+ super(header, messageAnnotations, properties, appProperties, footer, session);
_sections = sections;
}
protected AmqpMessageImpl(final SessionImpl session)
{
- super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ super(new Header(), new MessageAnnotations(new HashMap()), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
session);
_sections = new ArrayList<Section>();
}
@@ -64,6 +65,10 @@ public class AmqpMessageImpl extends MessageImpl implements AmqpMessage
{
List<Section> sections = new ArrayList<Section>();
sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
sections.add(getProperties());
sections.add(getApplicationProperties());
sections.addAll(_sections);
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
index 14a97b8c4f..e3c7d46038 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
@@ -39,10 +39,10 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage
private Data _dataIn;
// message created for reading
- protected BytesMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, Data data,
+ protected BytesMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, Data data,
Footer footer, SessionImpl session)
{
- super(header, properties, appProperties, footer, session);
+ super(header, messageAnnotations, properties, appProperties, footer, session);
_dataIn = data;
final Binary dataBuffer = data.getValue();
_dataAsInput = new DataInputStream(new ByteArrayInputStream(dataBuffer.getArray(),dataBuffer.getArrayOffset(),dataBuffer.getLength()));
@@ -52,7 +52,11 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage
// message created to be sent
protected BytesMessageImpl(final SessionImpl session)
{
- super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ super(new Header(),
+ new MessageAnnotations(new HashMap()),
+ new Properties(),
+ new ApplicationProperties(new HashMap()),
+ new Footer(Collections.EMPTY_MAP),
session);
_bytesOut = new ByteArrayOutputStream();
@@ -496,6 +500,10 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage
{
List<Section> sections = new ArrayList<Section>();
sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
sections.add(getProperties());
sections.add(getApplicationProperties());
sections.add(getDataSection());
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
index 318077fd75..460ff578d6 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
@@ -104,21 +104,29 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
public QueueConnection createQueueConnection() throws JMSException
{
- return createConnection();
+ final ConnectionImpl connection = createConnection();
+ connection.setQueueConnection(true);
+ return connection;
}
public QueueConnection createQueueConnection(final String username, final String password) throws JMSException
{
- return createConnection(username, password);
+ final ConnectionImpl connection = createConnection(username, password);
+ connection.setQueueConnection(true);
+ return connection;
}
public TopicConnection createTopicConnection() throws JMSException
{
- return createConnection();
+ final ConnectionImpl connection = createConnection();
+ connection.setTopicConnection(true);
+ return connection;
}
public TopicConnection createTopicConnection(final String username, final String password) throws JMSException
{
- return createConnection(username, password);
+ final ConnectionImpl connection = createConnection(username, password);
+ connection.setTopicConnection(true);
+ return connection;
}
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
index f9a983f908..a5ab6898e5 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
@@ -21,6 +21,7 @@ package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.jms.Connection;
import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
import org.apache.qpid.amqp_1_0.jms.Session;
+import org.apache.qpid.amqp_1_0.transport.Container;
import javax.jms.*;
import javax.jms.IllegalStateException;
@@ -38,6 +39,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
private final Object _lock = new Object();
private org.apache.qpid.amqp_1_0.client.Connection _conn;
+ private boolean _isQueueConnection;
+ private boolean _isTopicConnection;
private static enum State
@@ -51,10 +54,11 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
public ConnectionImpl(String host, int port, String username, String password, String clientId) throws JMSException
{
+ Container container = clientId == null ? new Container() : new Container(clientId);
// TODO - authentication, containerId, clientId, ssl?, etc
try
{
- _conn = new org.apache.qpid.amqp_1_0.client.Connection(host, port, username, password);
+ _conn = new org.apache.qpid.amqp_1_0.client.Connection(host, port, username, password, container);
// TODO - retrieve negotiated AMQP version
_connectionMetaData = new ConnectionMetaDataImpl(1,0,0);
}
@@ -97,7 +101,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
}
SessionImpl session = new SessionImpl(this, acknowledgeMode);
-
+ session.setQueueSession(_isQueueConnection);
+ session.setTopicSession(_isTopicConnection);
_sessions.add(session);
return session;
@@ -108,7 +113,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
public String getClientID() throws JMSException
{
checkClosed();
- return null; //TODO
+ return _conn.getEndpoint().getContainer().getId();
}
public void setClientID(final String s) throws JMSException
@@ -269,4 +274,13 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
}
}
+ void setQueueConnection(final boolean queueConnection)
+ {
+ _isQueueConnection = queueConnection;
+ }
+
+ void setTopicConnection(final boolean topicConnection)
+ {
+ _isTopicConnection = topicConnection;
+ }
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java
index 1edec4fac8..166a33c126 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java
@@ -32,17 +32,18 @@ public class MapMessageImpl extends MessageImpl implements MapMessage
{
private Map _map;
- public MapMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, Map map,
+ public MapMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, Map map,
Footer footer,
SessionImpl session)
{
- super(header, properties, appProperties, footer, session);
+ super(header, messageAnnotations, properties, appProperties, footer, session);
_map = map;
}
MapMessageImpl(final SessionImpl session)
{
- super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ super(new Header(), new MessageAnnotations(new HashMap()),
+ new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
session);
_map = new HashMap();
}
@@ -417,6 +418,10 @@ public class MapMessageImpl extends MessageImpl implements MapMessage
{
List<Section> sections = new ArrayList<Section>();
sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
sections.add(getProperties());
sections.add(getApplicationProperties());
sections.add(new AmqpValue(_map));
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
index 678a5b5d69..18ed3e5866 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
@@ -41,6 +41,9 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
private Binary _lastUnackedMessage;
MessageListener _messageListener;
+ private boolean _isQueueConsumer;
+ private boolean _isTopicSubscriber;
+
private boolean _closed = false;
MessageConsumerImpl(final Destination destination,
@@ -53,10 +56,18 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
if(destination instanceof DestinationImpl)
{
_destination = (DestinationImpl) destination;
+ if(destination instanceof javax.jms.Queue)
+ {
+ _isQueueConsumer = true;
+ }
+ else if(destination instanceof javax.jms.Topic)
+ {
+ _isTopicSubscriber = true;
+ }
}
- else if(destination != null)
+ else
{
- // TODO - throw appropriate exception
+ throw new InvalidDestinationException("Invalid destination class");
}
_session = session;
@@ -64,7 +75,7 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
}
- protected Receiver createClientReceiver()
+ protected Receiver createClientReceiver() throws IllegalStateException
{
return _session.getClientSession().createReceiver(_destination.getAddress());
}
@@ -142,7 +153,10 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
if(msg != null)
{
MessageFactory factory = _session.getMessageFactory();
- return factory.createMessage(_destination, msg);
+ final MessageImpl message = factory.createMessage(_destination, msg);
+ message.setFromQueue(_isQueueConsumer);
+ message.setFromTopic(_isTopicSubscriber);
+ return message;
}
else
{
@@ -230,4 +244,14 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
{
return (Topic) getDestination();
}
+
+ void setQueueConsumer(final boolean queueConsumer)
+ {
+ _isQueueConsumer = queueConsumer;
+ }
+
+ void setTopicSubscriber(final boolean topicSubscriber)
+ {
+ _isTopicSubscriber = topicSubscriber;
+ }
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
index 9b61f75a1f..b010c405eb 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
@@ -46,6 +46,8 @@ class MessageFactory
MessageImpl message;
List<Section> payload = msg.getPayload();
Header header = null;
+ MessageAnnotations messageAnnotations = null;
+
Properties properties = null;
ApplicationProperties appProperties = null;
Footer footer;
@@ -61,6 +63,12 @@ class MessageFactory
section = iter.hasNext() ? iter.next() : null;
}
+ if(section instanceof MessageAnnotations)
+ {
+ messageAnnotations = (MessageAnnotations) section;
+ section = iter.hasNext() ? iter.next() : null;
+ }
+
if(section instanceof Properties)
{
properties = (Properties) section;
@@ -86,16 +94,16 @@ class MessageFactory
Section bodySection = body.get(0);
if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof Map)
{
- message = new MapMessageImpl(header, properties, appProperties, (Map) ((AmqpValue)bodySection).getValue(), footer, _session);
+ message = new MapMessageImpl(header, messageAnnotations, properties, appProperties, (Map) ((AmqpValue)bodySection).getValue(), footer, _session);
}
else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof List)
{
- message = new StreamMessageImpl(header, properties, appProperties,
+ message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties,
(List) ((AmqpValue)bodySection).getValue(), footer, _session);
}
else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof String)
{
- message = new TextMessageImpl(header, properties, appProperties,
+ message = new TextMessageImpl(header, messageAnnotations, properties, appProperties,
(String) ((AmqpValue)bodySection).getValue(), footer, _session);
}
else if(bodySection instanceof Data)
@@ -120,16 +128,16 @@ class MessageFactory
e.printStackTrace(); //TODO
}
- message = new ObjectMessageImpl(header, properties, appProperties, serializable, footer, _session);
+ message = new ObjectMessageImpl(header, messageAnnotations, properties, appProperties, serializable, footer, _session);
}
else
{
- message = new BytesMessageImpl(header, properties, appProperties, (Data) bodySection, footer, _session);
+ message = new BytesMessageImpl(header, messageAnnotations, properties, appProperties, (Data) bodySection, footer, _session);
}
}
else if(bodySection instanceof AmqpSequence)
{
- message = new StreamMessageImpl(header, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session);
+ message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session);
}
/*else if(bodySection instanceof AmqpDataSection)
@@ -174,12 +182,12 @@ class MessageFactory
}*/
else
{
- message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+ message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session);
}
}
else
{
- message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+ message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session);
}
message.setReadOnly();
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
index cd4381b898..6eb150924c 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
@@ -33,6 +33,7 @@ import org.apache.qpid.amqp_1_0.type.UnsignedShort;
import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
import org.apache.qpid.amqp_1_0.type.messaging.Footer;
import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
import org.apache.qpid.amqp_1_0.type.messaging.Properties;
import javax.jms.DeliveryMode;
@@ -49,6 +50,7 @@ public abstract class MessageImpl implements Message
static final Set<Class> _supportedClasses =
new HashSet<Class>(Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
Float.class, Double.class, Character.class, String.class, byte[].class));
+ private static final Symbol JMS_TYPE = Symbol.valueOf("apache.qpid.amqp_1_0-jms-type");
private Header _header;
private Properties _properties;
@@ -57,8 +59,13 @@ public abstract class MessageImpl implements Message
public static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
private SessionImpl _sessionImpl;
private boolean _readOnly;
+ private MessageAnnotations _messageAnnotations;
+
+ private boolean _isFromQueue;
+ private boolean _isFromTopic;
protected MessageImpl(Header header,
+ MessageAnnotations messageAnnotations,
Properties properties,
ApplicationProperties appProperties,
Footer footer,
@@ -66,6 +73,7 @@ public abstract class MessageImpl implements Message
{
_header = header == null ? new Header() : header;
_properties = properties == null ? new Properties() : properties;
+ _messageAnnotations = messageAnnotations == null ? new MessageAnnotations(new HashMap()) : messageAnnotations;
_footer = footer == null ? new Footer(Collections.EMPTY_MAP) : footer;
_applicationProperties = appProperties == null ? new ApplicationProperties(new HashMap()) : appProperties;
_sessionImpl = session;
@@ -170,7 +178,9 @@ public abstract class MessageImpl implements Message
public DestinationImpl getJMSDestination() throws JMSException
{
- return DestinationImpl.valueOf(getTo());
+ return _isFromQueue ? QueueImpl.valueOf(getTo())
+ : _isFromTopic ? TopicImpl.valueOf(getTo())
+ : DestinationImpl.valueOf(getTo());
}
public void setJMSDestination(Destination destination) throws NonAMQPDestinationException
@@ -240,20 +250,22 @@ public abstract class MessageImpl implements Message
public String getJMSType() throws JMSException
{
- final MessageAttributes messageAttrs = getHeaderMessageAttrs();
- final Object attrValue = messageAttrs == null ? null : messageAttrs.get(Symbol.valueOf("apache.qpid.amqp_1_0-jms-type"));
+ Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
+ final Object attrValue = messageAttrs == null ? null : messageAttrs.get(JMS_TYPE);
return attrValue instanceof String ? attrValue.toString() : null;
}
public void setJMSType(String s) throws JMSException
{
- MessageAttributes messageAttrs = getHeaderMessageAttrs();
+ Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
if(messageAttrs == null)
{
- // TODO - Solve MessageAttrs problem
- messageAttrs = null;
+ messageAttrs = new HashMap();
+ _messageAnnotations = new MessageAnnotations(messageAttrs);
}
+
+ messageAttrs.put(JMS_TYPE, s);
}
public long getJMSExpiration() throws JMSException
@@ -1056,10 +1068,30 @@ public abstract class MessageImpl implements Message
return _footer;
}
+ MessageAnnotations getMessageAnnotations()
+ {
+ return _messageAnnotations;
+ }
+
public ApplicationProperties getApplicationProperties()
{
return _applicationProperties;
}
+ public void reset() throws JMSException
+ {
+ _readOnly = true;
+ }
+
+ void setFromQueue(final boolean fromQueue)
+ {
+ _isFromQueue = fromQueue;
+ }
+
+ void setFromTopic(final boolean fromTopic)
+ {
+ _isFromTopic = fromTopic;
+ }
+
abstract Collection<Section> getSections();
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
index bb91552b46..c6bdbfbb69 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
@@ -52,21 +52,24 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
}
else if(destination != null)
{
- // TODO - throw appropriate exception
+ throw new InvalidDestinationException("Invalid Destination Class" + destination.getClass().getName());
}
_session = session;
- try
- {
- _sender = _session.getClientSession().createSender(_destination.getAddress());
- }
- catch (Sender.SenderCreationException e)
+ if(_destination != null)
{
- // TODO - refine exception
- JMSException jmsEx = new JMSException(e.getMessage());
- jmsEx.initCause(e);
- jmsEx.setLinkedException(e);
- throw jmsEx;
+ try
+ {
+ _sender = _session.getClientSession().createSender(_destination.getAddress());
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ // TODO - refine exception
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.initCause(e);
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
}
}
@@ -151,7 +154,10 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
if(!_closed)
{
_closed = true;
- _sender.close();
+ if(_sender != null)
+ {
+ _sender.close();
+ }
}
}
@@ -255,7 +261,55 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
public void send(final Destination destination, final Message message, final int deliveryMode, final int priority, final long ttl)
throws JMSException
{
- //TODO
+
+ checkClosed();
+ if(destination == null)
+ {
+ send(message, deliveryMode, priority, ttl);
+ }
+ else
+ {
+ if(_destination != null)
+ {
+ throw new UnsupportedOperationException("Cannot use explicit destination pon non-anonymous producer");
+ }
+ else if(!(destination instanceof DestinationImpl))
+ {
+ throw new InvalidDestinationException("Invalid Destination Class" + destination.getClass().getName());
+ }
+ try
+ {
+ _destination = (DestinationImpl) destination;
+ _sender = _session.getClientSession().createSender(_destination.getAddress());
+
+ send(message, deliveryMode, priority, ttl);
+
+ _sender.close();
+
+
+
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ // TODO - refine exception
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.initCause(e);
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.initCause(e);
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+ finally
+ {
+ _sender = null;
+ _destination = null;
+ }
+ }
}
public Queue getQueue() throws JMSException
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
index 35e48bab82..e4916983fc 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
@@ -40,20 +40,22 @@ public class ObjectMessageImpl extends MessageImpl implements ObjectMessage
private Serializable _object;
protected ObjectMessageImpl(Header header,
+ MessageAnnotations messageAnnotations,
Properties properties,
ApplicationProperties appProperties,
Serializable object,
Footer footer,
SessionImpl session)
{
- super(header, properties, appProperties, footer, session);
+ super(header, messageAnnotations, properties, appProperties, footer, session);
getProperties().setContentType(CONTENT_TYPE);
_object = object;
}
protected ObjectMessageImpl(final SessionImpl session)
{
- super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ super(new Header(), new MessageAnnotations(new HashMap()),
+ new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
session);
getProperties().setContentType(CONTENT_TYPE);
}
@@ -74,6 +76,10 @@ public class ObjectMessageImpl extends MessageImpl implements ObjectMessage
{
List<Section> sections = new ArrayList<Section>();
sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
sections.add(getProperties());
sections.add(getApplicationProperties());
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java
index 7b81156da8..c88bd8268c 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java
@@ -48,4 +48,9 @@ public class QueueImpl extends DestinationImpl implements Queue
return queue;
}
+ public static QueueImpl valueOf(String address)
+ {
+ return address == null ? null : createQueue(address);
+ }
+
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
index 0106cf13fd..45eaf93945 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
@@ -22,7 +22,7 @@ import org.apache.qpid.amqp_1_0.client.Receiver;
import org.apache.qpid.amqp_1_0.jms.Queue;
import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
-import javax.jms.JMSException;
+import javax.jms.*;
public class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver
{
@@ -33,9 +33,10 @@ public class QueueReceiverImpl extends MessageConsumerImpl implements QueueRecei
throws JMSException
{
super(destination, session, selector, noLocal);
+ setQueueConsumer(true);
}
- protected Receiver createClientReceiver()
+ protected Receiver createClientReceiver() throws javax.jms.IllegalStateException
{
return getSession().getClientSession().createMovingReceiver(getDestination().getAddress());
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java
index 53b452b422..e5ed8b3b3d 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java
@@ -28,6 +28,7 @@ public class QueueSessionImpl extends SessionImpl implements QueueSession
protected QueueSessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
{
super(connection, acknowledgeMode);
+ setQueueSession(true);
}
public QueueReceiverImpl createReceiver(final Queue queue) throws JMSException
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
index f6378ceb47..eb805671b4 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
@@ -20,6 +20,7 @@ package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.client.Connection;
import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
import org.apache.qpid.amqp_1_0.jms.QueueSender;
import org.apache.qpid.amqp_1_0.jms.QueueSession;
@@ -27,6 +28,7 @@ import org.apache.qpid.amqp_1_0.jms.Session;
import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
import org.apache.qpid.amqp_1_0.jms.TopicSession;
import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
import javax.jms.*;
import javax.jms.IllegalStateException;
@@ -50,6 +52,9 @@ public class SessionImpl implements Session, QueueSession, TopicSession
private boolean _closed;
+ private boolean _isQueueSession;
+ private boolean _isTopicSession;
+
protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
{
_connection = connection;
@@ -102,8 +107,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession
public TextMessageImpl createTextMessage() throws JMSException
{
- checkClosed();
- return new TextMessageImpl(this);
+ return createTextMessage("");
}
public TextMessageImpl createTextMessage(final String s) throws JMSException
@@ -293,6 +297,10 @@ public class SessionImpl implements Session, QueueSession, TopicSession
throws JMSException
{
checkClosed();
+ if(!(topic instanceof TopicImpl))
+ {
+ throw new InvalidDestinationException("invalid destination " + topic);
+ }
return null; //TODO
}
@@ -317,7 +325,17 @@ public class SessionImpl implements Session, QueueSession, TopicSession
public TemporaryQueueImpl createTemporaryQueue() throws JMSException
{
checkClosed();
- return null; //TODO
+ try
+ {
+ Sender send = _session.createTemporaryQueueSender();
+
+ TemporaryQueueImpl tempQ = new TemporaryQueueImpl(((Target)send.getTarget()).getAddress(), send);
+ return tempQ;
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ throw new JMSException("Unable to create temporary queue");
+ }
}
public TemporaryTopicImpl createTemporaryTopic() throws JMSException
@@ -329,6 +347,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession
public void unsubscribe(final String s) throws JMSException
{
checkClosed();
+
//TODO
}
@@ -605,4 +624,14 @@ public class SessionImpl implements Session, QueueSession, TopicSession
}
}
}
+
+ void setQueueSession(final boolean queueSession)
+ {
+ _isQueueSession = queueSession;
+ }
+
+ void setTopicSession(final boolean topicSession)
+ {
+ _isTopicSession = topicSession;
+ }
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java
index b4f1e8c8e0..eb4930625f 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java
@@ -37,26 +37,28 @@ public class StreamMessageImpl extends MessageImpl implements StreamMessage
- protected StreamMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, List list,
+ protected StreamMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, List list,
Footer footer, SessionImpl session)
{
- super(header, properties, appProperties, footer, session);
+ super(header, messageAnnotations, properties, appProperties, footer, session);
_list = list;
}
StreamMessageImpl(final SessionImpl session)
{
- super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ super(new Header(), new MessageAnnotations(new HashMap()), new Properties(),
+ new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
session);
_list = new ArrayList();
}
public StreamMessageImpl(final Header header,
+ final MessageAnnotations messageAnnotations,
final Properties properties,
final ApplicationProperties appProperties,
final List amqpListSection, final Footer footer)
{
- super(header, properties, appProperties, footer, null);
+ super(header, messageAnnotations, properties, appProperties, footer, null);
_list = amqpListSection;
}
@@ -257,6 +259,7 @@ public class StreamMessageImpl extends MessageImpl implements StreamMessage
public Object readObject() throws JMSException
{
+ checkReadable();
if(_offset == -1)
{
return _list.get(++_position);
@@ -332,6 +335,7 @@ public class StreamMessageImpl extends MessageImpl implements StreamMessage
public void reset() throws JMSException
{
+ super.reset();
_position = -1;
_offset = -1;
}
@@ -340,6 +344,10 @@ public class StreamMessageImpl extends MessageImpl implements StreamMessage
{
List<Section> sections = new ArrayList<Section>();
sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
sections.add(getProperties());
sections.add(getApplicationProperties());
sections.add(new AmqpValue(_list));
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java
index 309aa8548c..64a4bd1f8c 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java
@@ -18,19 +18,36 @@
*/
package org.apache.qpid.amqp_1_0.jms.impl;
+import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.jms.TemporaryQueue;
import javax.jms.JMSException;
public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue
{
- protected TemporaryQueueImpl(String address)
+ private Sender _sender;
+
+ protected TemporaryQueueImpl(String address, Sender sender)
{
super(address);
+ _sender = sender;
}
public void delete() throws JMSException
{
- //TODO
+ try
+ {
+ if(_sender != null)
+ {
+ _sender.close();
+ _sender = null;
+ }
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ final JMSException jmsException = new JMSException(e.getMessage());
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
}
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java
index 5c9ea954a9..e13629a2d0 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java
@@ -33,19 +33,21 @@ public class TextMessageImpl extends MessageImpl implements TextMessage
private String _text;
protected TextMessageImpl(Header header,
+ MessageAnnotations messageAnnotations,
Properties properties,
ApplicationProperties appProperties,
String text,
Footer footer,
SessionImpl session)
{
- super(header, properties, appProperties, footer, session);
+ super(header, messageAnnotations, properties, appProperties, footer, session);
_text = text;
}
protected TextMessageImpl(final SessionImpl session)
{
- super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ super(new Header(), new MessageAnnotations(new HashMap()),
+ new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
session);
}
@@ -68,6 +70,10 @@ public class TextMessageImpl extends MessageImpl implements TextMessage
{
List<Section> sections = new ArrayList<Section>();
sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
sections.add(getProperties());
sections.add(getApplicationProperties());
AmqpValue section = new AmqpValue(_text);
@@ -75,4 +81,6 @@ public class TextMessageImpl extends MessageImpl implements TextMessage
sections.add(getFooter());
return sections;
}
+
+
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java
index a5e5df5bd2..e54a660963 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java
@@ -49,5 +49,8 @@ public class TopicImpl extends DestinationImpl implements Topic
return topic;
}
-
+ public static TopicImpl valueOf(String address)
+ {
+ return address == null ? null : createTopic(address);
+ }
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java
index 8189bec082..052a3f2a6b 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java
@@ -28,6 +28,7 @@ public class TopicSessionImpl extends SessionImpl implements TopicSession
protected TopicSessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
{
super(connection, acknowledgeMode);
+ setTopicSession(true);
}
public TopicSubscriberImpl createSubscriber(final Topic topic) throws JMSException
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
index dd76103598..2e713e4c8c 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
@@ -22,7 +22,7 @@ import org.apache.qpid.amqp_1_0.client.Receiver;
import org.apache.qpid.amqp_1_0.jms.Topic;
import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
-import javax.jms.JMSException;
+import javax.jms.*;
public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSubscriber
{
@@ -33,6 +33,7 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub
throws JMSException
{
super(destination, session, selector, noLocal);
+ setTopicSubscriber(true);
}
public TopicImpl getTopic() throws JMSException
@@ -41,7 +42,7 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub
}
- protected Receiver createClientReceiver()
+ protected Receiver createClientReceiver() throws javax.jms.IllegalStateException
{
return getSession().getClientSession().createCopyingReceiver(getDestination().getAddress());
}