diff options
7 files changed, 233 insertions, 46 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 638aa555a2..820b8c3f83 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -139,6 +139,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ private AMQException _lastAMQException = null; + + /* + * The connection meta data + */ + private QpidConnectionMetaData _connectionMetaData; + public AMQConnection(String broker, String username, String password, String clientName, String virtualHost) throws AMQException, URLSyntaxException { @@ -281,6 +287,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw e; } + _connectionMetaData = new QpidConnectionMetaData(this); } protected boolean checkException(Throwable thrown) @@ -550,7 +557,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ConnectionMetaData getMetaData() throws JMSException { checkNotClosed(); - return QpidConnectionMetaData.instance(); + return _connectionMetaData; } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index d3d9db3806..f0d3cf5abc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -22,6 +22,8 @@ package org.apache.qpid.client; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; @@ -39,6 +41,7 @@ import javax.jms.MessageListener; import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; +import javax.jms.Destination; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -241,6 +244,17 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if(_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag()); + String url = jmsMsg.getStringProperty(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString()); + try + { + Destination dest = AMQDestination.createDestination(new AMQBindingURL(url)); + jmsMsg.setJMSDestination(dest); + } + catch (URLSyntaxException e) + { + _logger.warn("Unable to parse the supplied destination header: " + url); + } + } _session.setInRecovery(false); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index e11d70cf41..7a5fcbccf9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -507,8 +507,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException { checkTemporaryDestination(destination); + origMessage.setJMSDestination(destination); + AbstractJMSMessage message = convertToNativeMessage(origMessage); + message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL()); AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(), destination.getRoutingKey(), mandatory, immediate); diff --git a/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java b/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java new file mode 100644 index 0000000000..3a7b7a7b3d --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java @@ -0,0 +1,47 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.*;
+
+public enum CustomJMXProperty
+{
+ JMSX_QPID_JMSDESTINATIONURL,
+ JMSXGroupID,
+ JMSXGroupSeq;
+
+ private static Enumeration _names;
+
+ public static synchronized Enumeration asEnumeration()
+ {
+ if(_names == null)
+ {
+ CustomJMXProperty[] properties = values();
+ ArrayList<String> nameList = new ArrayList<String>(properties.length);
+ for(CustomJMXProperty property : properties)
+ {
+ nameList.add(property.toString());
+ }
+ _names = Collections.enumeration(nameList);
+ }
+ return _names;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java index 10a65c2ad8..6ab7808110 100644 --- a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java +++ b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.client; import java.util.Enumeration; @@ -5,46 +25,73 @@ import java.util.Enumeration; import javax.jms.ConnectionMetaData; import javax.jms.JMSException; -public class QpidConnectionMetaData implements ConnectionMetaData { - - private static QpidConnectionMetaData _instance = new QpidConnectionMetaData(); - - private QpidConnectionMetaData(){ - } - - public static QpidConnectionMetaData instance(){ - return _instance; - } - - public int getJMSMajorVersion() throws JMSException { - return 1; - } - - public int getJMSMinorVersion() throws JMSException { - return 1; - } - - public String getJMSProviderName() throws JMSException { - return "Apache Qpid"; - } - - public String getJMSVersion() throws JMSException { - return "1.1"; - } - - public Enumeration getJMSXPropertyNames() throws JMSException { - return null; - } - - public int getProviderMajorVersion() throws JMSException { - return 0; - } - - public int getProviderMinorVersion() throws JMSException { - return 9; - } - - public String getProviderVersion() throws JMSException { - return "Incubating-M1"; - } +public class QpidConnectionMetaData implements ConnectionMetaData +{ + + + + QpidConnectionMetaData(AMQConnection conn) + { + } + + public int getJMSMajorVersion() throws JMSException + { + return 1; + } + + public int getJMSMinorVersion() throws JMSException + { + return 1; + } + + public String getJMSProviderName() throws JMSException + { + return "Apache Qpid"; + } + + public String getJMSVersion() throws JMSException + { + return "1.1"; + } + + public Enumeration getJMSXPropertyNames() throws JMSException + { + return CustomJMXProperty.asEnumeration(); + } + + public int getProviderMajorVersion() throws JMSException + { + return 0; + } + + public int getProviderMinorVersion() throws JMSException + { + return 8; + } + + public String getProviderVersion() throws JMSException + { + return "QPID (Client: [" + getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ " + + getProtocolVersion() + "] )"; + } + + private String getProtocolVersion() + { + // TODO - Implement based on connection negotiated protocol + return "0.8"; + } + + public String getBrokerVersion() + { + // TODO - get broker version + return "<unkown>"; + } + + public String getClientVersion() + { + // TODO - get client build version from properties file or similar + return "<unknown>"; + } + + } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 23d6c0151e..fea7a29594 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -172,13 +172,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public Destination getJMSDestination() throws JMSException { - // TODO: implement this once we have sorted out how to figure out the exchange class - return _destination; + return _destination; } public void setJMSDestination(Destination destination) throws JMSException { - _destination = destination; + _destination = destination; } public int getJMSDeliveryMode() throws JMSException diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java new file mode 100644 index 0000000000..27736ac473 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java @@ -0,0 +1,70 @@ +package org.apache.qpid.test.unit.message;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQHeadersExchange;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.PropertyFieldTable;
+
+import javax.jms.*;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class JMSDestinationTest extends TestCase
+{
+
+ private static final Logger _logger = Logger.getLogger(JMSDestinationTest.class);
+
+ public String _connectionString = "vm://:1";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+ public void testJMSDestination() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ TextMessage sentMsg = producerSession.createTextMessage("hello");
+ assertNull(sentMsg.getJMSDestination());
+
+ producer.send(sentMsg);
+
+ assertEquals(sentMsg.getJMSDestination(), queue);
+
+ con2.close();
+
+ con.start();
+
+ TextMessage rm = (TextMessage) consumer.receive();
+ assertNotNull(rm);
+
+ assertEquals(rm.getJMSDestination(),queue);
+ con.close();
+ }
+
+}
|