diff options
Diffstat (limited to 'java/client/src')
11 files changed, 81 insertions, 58 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 6401e3b23f..5c13e7861f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -99,7 +99,10 @@ public abstract class AMQDestination implements Destination, Referenceable _queueName = queueName; } - public abstract String getEncodedName(); + public String getEncodedName() + { + return toURL(); + } public boolean isDurable() { @@ -244,7 +247,7 @@ public abstract class AMQDestination implements Destination, Referenceable return false; } if ((_queueName == null && that._queueName != null) || - (_queueName != null && !_queueName.equals(that._queueName))) + (_queueName != null && !_queueName.equals(that._queueName))) { return false; } @@ -282,4 +285,26 @@ public abstract class AMQDestination implements Destination, Referenceable AMQConnectionFactory.class.getName(), null); // factory location } + + public static Destination createDestination(BindingURL binding) + { + String type = binding.getExchangeClass(); + + if (type.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) + { + return new AMQQueue(binding); + } + else if (type.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) + { + return new AMQTopic(binding); + } + else if (type.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS)) + { + return new AMQHeadersExchange(binding); + } + else + { + throw new IllegalArgumentException("Unknown Exchange Class:" + type + " in binding:" + binding); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java index c5bae6e1fa..c6d21c0ea7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java @@ -38,11 +38,6 @@ public class AMQHeadersExchange extends AMQDestination super(queueName, ExchangeDefaults.HEADERS_EXCHANGE_CLASS, queueName, true, true, null); } - public String getEncodedName() - { - return getDestinationName(); - } - public String getRoutingKey() { return getDestinationName(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java index 8304a29e4d..6c0da6112a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java @@ -75,11 +75,7 @@ public class AMQQueue extends AMQDestination implements Queue autoDelete, queueName); } - public String getEncodedName() - { - return 'Q' + getQueueName(); - } - + public String getRoutingKey() { return getQueueName(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java index 0b12bfb728..6b41ea0112 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java @@ -26,22 +26,25 @@ import javax.jms.TemporaryQueue; /** * AMQ implementation of a TemporaryQueue. */ -final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue { +final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue +{ + /** * Create a new instance of an AMQTemporaryQueue */ - public AMQTemporaryQueue() { - super("TempQueue" + Long.toString(System.currentTimeMillis()), - null, true, true); + public AMQTemporaryQueue() + { + super("TempQueue" + Long.toString(System.currentTimeMillis()), true); } /** * @see javax.jms.TemporaryQueue#delete() */ - public void delete() throws JMSException { + public void delete() throws JMSException + { throw new UnsupportedOperationException("Delete not supported, " + - "will auto-delete when connection closed"); + "will auto-delete when connection closed"); } - + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 89727f65b7..4dd38eea18 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -61,11 +61,6 @@ public class AMQTopic extends AMQDestination implements Topic return super.getDestinationName(); } - public String getEncodedName() - { - return 'T' + getDestinationName(); - } - public String getRoutingKey() { return getDestinationName(); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 279d861cc2..41eb21a415 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -23,6 +23,9 @@ package org.apache.qpid.client.message; import org.apache.commons.collections.map.ReferenceMap; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; @@ -136,19 +139,16 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms Destination dest = (Destination) _destinationCache.get(replyToEncoding); if (dest == null) { - char destType = replyToEncoding.charAt(0); - if (destType == 'Q') + try { - dest = new AMQQueue(replyToEncoding.substring(1)); + BindingURL binding = new AMQBindingURL(replyToEncoding); + dest = AMQDestination.createDestination(binding); } - else if (destType == 'T') - { - dest = new AMQTopic(replyToEncoding.substring(1)); - } - else + catch (URLSyntaxException e) { throw new JMSException("Illegal value in JMS_ReplyTo property: " + replyToEncoding); } + _destinationCache.put(replyToEncoding, dest); } return dest; @@ -163,7 +163,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms } if (!(destination instanceof AMQDestination)) { - throw new IllegalArgumentException("ReplyTo destination my be an AMQ destination - passed argument was type " + + throw new IllegalArgumentException("ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); } final AMQDestination amqd = (AMQDestination) destination; @@ -389,9 +389,10 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms // is not specified. In our case, we only set the session field where client acknowledge mode is specified. if (_session != null) { - if (_session.getAMQConnection().isClosed()){ - throw new javax.jms.IllegalStateException("Connection is already closed"); - } + if (_session.getAMQConnection().isClosed()) + { + throw new javax.jms.IllegalStateException("Connection is already closed"); + } // we set multiple to true here since acknowledgement implies acknowledge of all previous messages // received on the session diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java index 42ae5a7b17..18e1fdad82 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java @@ -46,7 +46,7 @@ public class StateWaiter implements StateListener public void waituntilStateHasChanged() throws AMQException { - synchronized (_monitor) + synchronized(_monitor) { // // The guard is required in case we are woken up by a spurious @@ -71,22 +71,22 @@ public class StateWaiter implements StateListener _logger.debug("Throwable reached state waiter: " + _throwable); if (_throwable instanceof AMQException) { - throw (AMQException) _throwable; + throw(AMQException) _throwable; } else { - throw new AMQException("Error: " + _throwable, _throwable); // FIXME: this will wrap FailoverException in throwable which will prevent it being caught. + throw new AMQException("Error: " + _throwable, _throwable); // FIXME: this will wrap FailoverException in throwable which will prevent it being caught. } } } public void stateChanged(AMQState oldState, AMQState newState) { - synchronized (_monitor) + synchronized(_monitor) { if (_logger.isDebugEnabled()) { - _logger.debug("stateChanged called"); + _logger.debug("stateChanged called changing from :" + oldState + " to :" + newState); } if (_state == newState) { @@ -103,7 +103,7 @@ public class StateWaiter implements StateListener public void error(Throwable t) { - synchronized (_monitor) + synchronized(_monitor) { if (_logger.isDebugEnabled()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index 78d937f453..2e47ed2666 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -60,6 +60,7 @@ public class SocketTransportConnection implements ITransportConnection // once more testing of the performance of the simple allocator has been done if (!Boolean.getBoolean("amqj.enablePooledAllocator")) { + _logger.warn("Using SimpleByteBufferAllocator"); ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 58507a75ca..e9e23aefdb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -268,7 +268,7 @@ public class TransportConnection Object[] params = {port}; provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); //Give the broker a second to create - _logger.info("Created Instance"); + _logger.info("Created VMBroker Instance:" + port); } catch (Exception e) { diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index b0641350ad..0759113667 100644 --- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -25,6 +25,7 @@ import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQHeadersExchange; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.AMQDestination; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; @@ -177,21 +178,15 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor return null; } - if (binding.getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) - { - return createTopic(binding); - } - else if (binding.getExchangeClass().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) + try { - return createQueue(binding); + return AMQDestination.createDestination(binding); } - else if (binding.getExchangeClass().equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS)) + catch (IllegalArgumentException iaw) { - return createHeaderExchange(binding); + _logger.warn("Binding: '" + binding + "' not supported"); + return null; } - - _logger.warn("Binding: '" + binding + "' not supported"); - return null; } /** diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index fdf50a7609..7f76baa157 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -119,8 +119,17 @@ public class PropertyValueTest extends TestCase implements MessageListener m.setJMSPriority(100); // Queue - Queue q = //_session.createTemporaryQueue(); - q = new AMQQueue("TestReply"); + Queue q; + + if (i / 2 == 0) + { + q = _session.createTemporaryQueue(); + } + else + { + q = new AMQQueue("TestReply"); + } + m.setJMSReplyTo(q); m.setStringProperty("TempQueue", q.toString()); @@ -173,6 +182,8 @@ public class PropertyValueTest extends TestCase implements MessageListener (int) Integer.MAX_VALUE, m.getIntProperty("Int")); Assert.assertEquals("Check CorrelationID properties are correctly transported", "Correlation", m.getJMSCorrelationID()); + + _logger.warn("getJMSPriority not being verified."); // Assert.assertEquals("Check Priority properties are correctly transported", // 100, m.getJMSPriority()); @@ -180,8 +191,9 @@ public class PropertyValueTest extends TestCase implements MessageListener Assert.assertEquals("Check ReplyTo properties are correctly transported", m.getStringProperty("TempQueue"), m.getJMSReplyTo().toString()); -// Assert.assertEquals("Check Type properties are correctly transported", -// "Test", m.getJMSType()); + Assert.assertEquals("Check Type properties are correctly transported", + "Test", m.getJMSType()); + Assert.assertEquals("Check Short properties are correctly transported", (short) Short.MAX_VALUE, m.getShortProperty("Short")); Assert.assertEquals("Check UnsignedInt properties are correctly transported", |