summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java47
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java131
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java5
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java70
7 files changed, 233 insertions, 46 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 638aa555a2..820b8c3f83 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -139,6 +139,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
private AMQException _lastAMQException = null;
+
+ /*
+ * The connection meta data
+ */
+ private QpidConnectionMetaData _connectionMetaData;
+
public AMQConnection(String broker, String username, String password,
String clientName, String virtualHost) throws AMQException, URLSyntaxException
{
@@ -281,6 +287,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
throw e;
}
+ _connectionMetaData = new QpidConnectionMetaData(this);
}
protected boolean checkException(Throwable thrown)
@@ -550,7 +557,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public ConnectionMetaData getMetaData() throws JMSException
{
checkNotClosed();
- return QpidConnectionMetaData.instance();
+ return _connectionMetaData;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index d3d9db3806..f0d3cf5abc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -22,6 +22,8 @@ package org.apache.qpid.client;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
@@ -39,6 +41,7 @@ import javax.jms.MessageListener;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import javax.jms.Destination;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -241,6 +244,17 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if(_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
_unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
+ String url = jmsMsg.getStringProperty(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString());
+ try
+ {
+ Destination dest = AMQDestination.createDestination(new AMQBindingURL(url));
+ jmsMsg.setJMSDestination(dest);
+ }
+ catch (URLSyntaxException e)
+ {
+ _logger.warn("Unable to parse the supplied destination header: " + url);
+ }
+
}
_session.setInRecovery(false);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index e11d70cf41..7a5fcbccf9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -507,8 +507,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
{
checkTemporaryDestination(destination);
+ origMessage.setJMSDestination(destination);
+
AbstractJMSMessage message = convertToNativeMessage(origMessage);
+ message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL());
AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
destination.getRoutingKey(), mandatory, immediate);
diff --git a/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java b/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java
new file mode 100644
index 0000000000..3a7b7a7b3d
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.*;
+
+public enum CustomJMXProperty
+{
+ JMSX_QPID_JMSDESTINATIONURL,
+ JMSXGroupID,
+ JMSXGroupSeq;
+
+ private static Enumeration _names;
+
+ public static synchronized Enumeration asEnumeration()
+ {
+ if(_names == null)
+ {
+ CustomJMXProperty[] properties = values();
+ ArrayList<String> nameList = new ArrayList<String>(properties.length);
+ for(CustomJMXProperty property : properties)
+ {
+ nameList.add(property.toString());
+ }
+ _names = Collections.enumeration(nameList);
+ }
+ return _names;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
index 10a65c2ad8..6ab7808110 100644
--- a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
+++ b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.client;
import java.util.Enumeration;
@@ -5,46 +25,73 @@ import java.util.Enumeration;
import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;
-public class QpidConnectionMetaData implements ConnectionMetaData {
-
- private static QpidConnectionMetaData _instance = new QpidConnectionMetaData();
-
- private QpidConnectionMetaData(){
- }
-
- public static QpidConnectionMetaData instance(){
- return _instance;
- }
-
- public int getJMSMajorVersion() throws JMSException {
- return 1;
- }
-
- public int getJMSMinorVersion() throws JMSException {
- return 1;
- }
-
- public String getJMSProviderName() throws JMSException {
- return "Apache Qpid";
- }
-
- public String getJMSVersion() throws JMSException {
- return "1.1";
- }
-
- public Enumeration getJMSXPropertyNames() throws JMSException {
- return null;
- }
-
- public int getProviderMajorVersion() throws JMSException {
- return 0;
- }
-
- public int getProviderMinorVersion() throws JMSException {
- return 9;
- }
-
- public String getProviderVersion() throws JMSException {
- return "Incubating-M1";
- }
+public class QpidConnectionMetaData implements ConnectionMetaData
+{
+
+
+
+ QpidConnectionMetaData(AMQConnection conn)
+ {
+ }
+
+ public int getJMSMajorVersion() throws JMSException
+ {
+ return 1;
+ }
+
+ public int getJMSMinorVersion() throws JMSException
+ {
+ return 1;
+ }
+
+ public String getJMSProviderName() throws JMSException
+ {
+ return "Apache Qpid";
+ }
+
+ public String getJMSVersion() throws JMSException
+ {
+ return "1.1";
+ }
+
+ public Enumeration getJMSXPropertyNames() throws JMSException
+ {
+ return CustomJMXProperty.asEnumeration();
+ }
+
+ public int getProviderMajorVersion() throws JMSException
+ {
+ return 0;
+ }
+
+ public int getProviderMinorVersion() throws JMSException
+ {
+ return 8;
+ }
+
+ public String getProviderVersion() throws JMSException
+ {
+ return "QPID (Client: [" + getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ "
+ + getProtocolVersion() + "] )";
+ }
+
+ private String getProtocolVersion()
+ {
+ // TODO - Implement based on connection negotiated protocol
+ return "0.8";
+ }
+
+ public String getBrokerVersion()
+ {
+ // TODO - get broker version
+ return "<unkown>";
+ }
+
+ public String getClientVersion()
+ {
+ // TODO - get client build version from properties file or similar
+ return "<unknown>";
+ }
+
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 23d6c0151e..fea7a29594 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -172,13 +172,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public Destination getJMSDestination() throws JMSException
{
- // TODO: implement this once we have sorted out how to figure out the exchange class
- return _destination;
+ return _destination;
}
public void setJMSDestination(Destination destination) throws JMSException
{
- _destination = destination;
+ _destination = destination;
}
public int getJMSDeliveryMode() throws JMSException
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
new file mode 100644
index 0000000000..27736ac473
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
@@ -0,0 +1,70 @@
+package org.apache.qpid.test.unit.message;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQHeadersExchange;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.PropertyFieldTable;
+
+import javax.jms.*;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class JMSDestinationTest extends TestCase
+{
+
+ private static final Logger _logger = Logger.getLogger(JMSDestinationTest.class);
+
+ public String _connectionString = "vm://:1";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+ public void testJMSDestination() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ TextMessage sentMsg = producerSession.createTextMessage("hello");
+ assertNull(sentMsg.getJMSDestination());
+
+ producer.send(sentMsg);
+
+ assertEquals(sentMsg.getJMSDestination(), queue);
+
+ con2.close();
+
+ con.start();
+
+ TextMessage rm = (TextMessage) consumer.receive();
+ assertNotNull(rm);
+
+ assertEquals(rm.getJMSDestination(),queue);
+ con.close();
+ }
+
+}