diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-08 17:02:26 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-08 17:02:26 +0000 |
commit | d6b4e65f3fd1ff4a2763f8068cd6b3f7fe0b84e0 (patch) | |
tree | f0c608bcb9e4e5af6cd7ca5245401d2d1716b4f3 /java/client/src | |
parent | 61350c8523e2edca63d8a9ab2c970ad8607d4c0a (diff) | |
download | qpid-python-d6b4e65f3fd1ff4a2763f8068cd6b3f7fe0b84e0.tar.gz |
QPID-255 : Patch Supplied by Rob Godfrey - Change to use bespoke AMQShortString rather than converting to String
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@494121 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
39 files changed, 648 insertions, 423 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index 6da0da9f6f..e59b6fbe19 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -308,7 +308,7 @@ public class AMQBrokerDetails implements BrokerDetails } } - //remove the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options + //removeKey the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options optionsURL.deleteCharAt(optionsURL.length() - 1); return optionsURL.toString(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 58ac49dd4e..a4d0065699 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -965,7 +965,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void resubscribeSessions() throws JMSException, AMQException { ArrayList sessions = new ArrayList(_sessions.values()); - _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: remove? + _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey? for (Iterator it = sessions.iterator(); it.hasNext();) { AMQSession s = (AMQSession) it.next(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index c6f3f9c492..b634f48c1e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -25,6 +25,7 @@ import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.url.URLHelper; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; import javax.naming.Reference; import javax.naming.NamingException; @@ -35,19 +36,27 @@ import javax.jms.Destination; public abstract class AMQDestination implements Destination, Referenceable { - protected final String _exchangeName; + protected final AMQShortString _exchangeName; - protected final String _exchangeClass; + protected final AMQShortString _exchangeClass; - protected final String _destinationName; + protected final AMQShortString _destinationName; - protected boolean _isDurable; + protected final boolean _isDurable; protected final boolean _isExclusive; protected final boolean _isAutoDelete; - protected String _queueName; + private AMQShortString _queueName; + + private String _url; + private AMQShortString _urlAsShortString; + + private byte[] _byteEncoding; + private static final int IS_DURABLE_MASK = 0x1; + private static final int IS_EXCLUSIVE_MASK = 0x2; + private static final int IS_AUTODELETE_MASK = 0x4; protected AMQDestination(String url) throws URLSyntaxException { @@ -63,27 +72,27 @@ public abstract class AMQDestination implements Destination, Referenceable _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE)); _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE)); _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE)); - _queueName = binding.getQueueName(); + _queueName = new AMQShortString(binding.getQueueName()); } - protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, String queueName) + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, AMQShortString queueName) { this(exchangeName, exchangeClass, destinationName, false, false, queueName); } - protected AMQDestination(String exchangeName, String exchangeClass, String destinationName) + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName) { this(exchangeName, exchangeClass, destinationName, false, false, null); } - protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive, - boolean isAutoDelete, String queueName) + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, boolean isExclusive, + boolean isAutoDelete, AMQShortString queueName) { this(exchangeName, exchangeClass, destinationName, isExclusive, isAutoDelete, queueName, false); } - protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive, - boolean isAutoDelete, String queueName, boolean isDurable) + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, boolean isExclusive, + boolean isAutoDelete, AMQShortString queueName, boolean isDurable) { if (destinationName == null) { @@ -106,9 +115,13 @@ public abstract class AMQDestination implements Destination, Referenceable _isDurable = isDurable; } - public String getEncodedName() + public AMQShortString getEncodedName() { - return toURL(); + if(_urlAsShortString == null) + { + toURL(); + } + return _urlAsShortString; } public boolean isDurable() @@ -116,12 +129,12 @@ public abstract class AMQDestination implements Destination, Referenceable return _isDurable; } - public String getExchangeName() + public AMQShortString getExchangeName() { return _exchangeName; } - public String getExchangeClass() + public AMQShortString getExchangeClass() { return _exchangeClass; } @@ -136,22 +149,34 @@ public abstract class AMQDestination implements Destination, Referenceable return ExchangeDefaults.DIRECT_EXCHANGE_NAME.equals(_exchangeName); } - public String getDestinationName() + public AMQShortString getDestinationName() { return _destinationName; } public String getQueueName() { + return _queueName == null ? null : _queueName.toString(); + } + + public AMQShortString getAMQQueueName() + { return _queueName; } - public void setQueueName(String queueName) + + + public void setQueueName(AMQShortString queueName) { + _queueName = queueName; + // calculated URL now out of date + _url = null; + _urlAsShortString = null; + _byteEncoding = null; } - public abstract String getRoutingKey(); + public abstract AMQShortString getRoutingKey(); public boolean isExclusive() { @@ -179,53 +204,114 @@ public abstract class AMQDestination implements Destination, Referenceable public String toURL() { - StringBuffer sb = new StringBuffer(); + String url = _url; + if(url == null) + { - sb.append(_exchangeClass); - sb.append("://"); - sb.append(_exchangeName); - sb.append("/"); + StringBuffer sb = new StringBuffer(); - if (_destinationName != null) - { - sb.append(_destinationName); - } + sb.append(_exchangeClass); + sb.append("://"); + sb.append(_exchangeName); - sb.append("/"); + sb.append('/'); - if (_queueName != null) - { - sb.append(_queueName); - } + if (_destinationName != null) + { + sb.append(_destinationName); + } - sb.append("?"); + sb.append('/'); - if (_isDurable) - { - sb.append(BindingURL.OPTION_DURABLE); - sb.append("='true'"); - sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); - } + if (_queueName != null) + { + sb.append(_queueName); + } - if (_isExclusive) - { - sb.append(BindingURL.OPTION_EXCLUSIVE); - sb.append("='true'"); - sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); - } + sb.append('?'); - if (_isAutoDelete) - { - sb.append(BindingURL.OPTION_AUTODELETE); - sb.append("='true'"); - sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + if (_isDurable) + { + sb.append(BindingURL.OPTION_DURABLE); + sb.append("='true'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + + if (_isExclusive) + { + sb.append(BindingURL.OPTION_EXCLUSIVE); + sb.append("='true'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + + if (_isAutoDelete) + { + sb.append(BindingURL.OPTION_AUTODELETE); + sb.append("='true'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + + //removeKey the last char '?' if there is no options , ',' if there are. + sb.deleteCharAt(sb.length() - 1); + url = sb.toString(); + _url = url; + _urlAsShortString = new AMQShortString(url); } + return url; + } - //remove the last char '?' if there is no options , ',' if there are. - sb.deleteCharAt(sb.length() - 1); + public byte[] toByteEncoding() + { + byte[] encoding = _byteEncoding; + if(encoding == null) + { + int size = _exchangeClass.length() + 1 + + _exchangeName.length() + 1 + + (_destinationName == null ? 0 : _destinationName.length()) + 1 + + (_queueName == null ? 0 : _queueName.length()) + 1 + + 1; + encoding = new byte[size]; + int pos = 0; + + pos = _exchangeClass.writeToByteArray(encoding, pos); + pos = _exchangeName.writeToByteArray(encoding, pos); + if(_destinationName == null) + { + encoding[pos++] = (byte)0; + } + else + { + pos = _destinationName.writeToByteArray(encoding,pos); + } + if(_queueName == null) + { + encoding[pos++] = (byte)0; + } + else + { + pos = _queueName.writeToByteArray(encoding,pos); + } + byte options = 0; + if(_isDurable) + { + options |= IS_DURABLE_MASK; + } + if(_isExclusive) + { + options |= IS_EXCLUSIVE_MASK; + } + if(_isAutoDelete) + { + options |= IS_AUTODELETE_MASK; + } + encoding[pos] = options; + + + _byteEncoding = encoding; - return sb.toString(); + } + return encoding; } public boolean equals(Object o) @@ -293,9 +379,55 @@ public abstract class AMQDestination implements Destination, Referenceable null); // factory location } + + public static Destination createDestination(byte[] byteEncodedDestination) + { + AMQShortString exchangeClass; + AMQShortString exchangeName; + AMQShortString destinationName; + AMQShortString queueName; + boolean isDurable; + boolean isExclusive; + boolean isAutoDelete; + + int pos = 0; + exchangeClass = AMQShortString.readFromByteArray(byteEncodedDestination, pos); + pos+= exchangeClass.length() + 1; + exchangeName = AMQShortString.readFromByteArray(byteEncodedDestination, pos); + pos+= exchangeName.length() + 1; + destinationName = AMQShortString.readFromByteArray(byteEncodedDestination, pos); + pos+= (destinationName == null ? 0 : destinationName.length()) + 1; + queueName = AMQShortString.readFromByteArray(byteEncodedDestination, pos); + pos+= (queueName == null ? 0 : queueName.length()) + 1; + int options = byteEncodedDestination[pos]; + isDurable = (options & IS_DURABLE_MASK) != 0; + isExclusive = (options & IS_EXCLUSIVE_MASK) != 0; + isAutoDelete = (options & IS_AUTODELETE_MASK) != 0; + + if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) + { + return new AMQQueue(destinationName,queueName,isExclusive,isAutoDelete,isDurable); + } + else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) + { + return new AMQTopic(destinationName,isAutoDelete,queueName,isDurable); + } + else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS)) + { + return new AMQHeadersExchange(destinationName); + } + else + { + throw new IllegalArgumentException("Unknown Exchange Class:" + exchangeClass); + } + + + + } + public static Destination createDestination(BindingURL binding) { - String type = binding.getExchangeClass(); + AMQShortString type = binding.getExchangeClass(); if (type.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java index c6d21c0ea7..b3dea770fa 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.url.BindingURL; +import org.apache.qpid.framing.AMQShortString; /** * A destination backed by a headers exchange @@ -33,12 +34,17 @@ public class AMQHeadersExchange extends AMQDestination this(binding.getExchangeName()); } - public AMQHeadersExchange(String queueName) + public AMQHeadersExchange(String name) + { + this(new AMQShortString(name)); + } + + public AMQHeadersExchange(AMQShortString queueName) { super(queueName, ExchangeDefaults.HEADERS_EXCHANGE_CLASS, queueName, true, true, null); } - public String getRoutingKey() + public AMQShortString getRoutingKey() { return getDestinationName(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java index 6c0da6112a..39a5ffc0b8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import org.apache.qpid.url.BindingURL; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; import javax.jms.Queue; @@ -42,12 +43,22 @@ public class AMQQueue extends AMQDestination implements Queue * Create a reference to a non temporary queue. Note this does not actually imply the queue exists. * @param name the name of the queue */ - public AMQQueue(String name) + public AMQQueue(AMQShortString name) { this(name, false); } /** + * Create a reference to a non temporary queue. Note this does not actually imply the queue exists. + * @param name the name of the queue + */ + public AMQQueue(String name) + { + this(new AMQShortString(name), false); + } + + + /** * Create a queue with a specified name. * * @param name the destination name (used in the routing key) @@ -56,10 +67,23 @@ public class AMQQueue extends AMQDestination implements Queue */ public AMQQueue(String name, boolean temporary) { + this(new AMQShortString(name),temporary); + } + + + /** + * Create a queue with a specified name. + * + * @param name the destination name (used in the routing key) + * @param temporary if true the broker will generate a queue name, also if true then the queue is autodeleted + * and exclusive + */ + public AMQQueue(AMQShortString name, boolean temporary) + { // queue name is set to null indicating that the broker assigns a name in the case of temporary queues // temporary queues are typically used as response queues - this(name, temporary?null:name, temporary, temporary); - _isDurable = !temporary; + this(name, temporary?null:name, temporary, temporary, !temporary); + } /** @@ -69,16 +93,22 @@ public class AMQQueue extends AMQDestination implements Queue * @param exclusive true if the queue should only permit a single consumer * @param autoDelete true if the queue should be deleted automatically when the last consumers detaches */ - public AMQQueue(String destinationName, String queueName, boolean exclusive, boolean autoDelete) + public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete) + { + this(destinationName, queueName, exclusive, autoDelete, false); + } + + + public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable) { super(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive, - autoDelete, queueName); + autoDelete, queueName, durable); } - public String getRoutingKey() + public AMQShortString getRoutingKey() { - return getQueueName(); + return getAMQQueueName(); } public boolean isNameRequired() diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 0dfd469d8d..be240cc39e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -26,10 +26,7 @@ import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.AMQInvalidSelectorException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.client.failover.FailoverSupport; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.JMSStreamMessage; -import org.apache.qpid.client.message.MessageFactoryRegistry; -import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.util.FlowControllingBlockingQueue; @@ -104,7 +101,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Maps from consumer tag (String) to JMSMessageConsumer instance */ - private Map<String, BasicMessageConsumer> _consumers = new ConcurrentHashMap<String, BasicMessageConsumer>(); + private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); /** * Maps from destination to count of JMSMessageConsumers @@ -205,7 +202,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi message.bodies); int errorCode = message.bounceBody.replyCode; - String reason = message.bounceBody.replyText; + AMQShortString reason = message.bounceBody.replyText; _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. @@ -322,14 +319,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized(_connection.getFailoverMutex()) { checkNotClosed(); - try - { - return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream"); - } - catch (AMQException e) - { - throw new JMSException("Unable to create message: " + e); - } + return new JMSBytesMessage(); } } @@ -338,31 +328,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized(_connection.getFailoverMutex()) { checkNotClosed(); - try - { - return (MapMessage) _messageFactoryRegistry.createMessage("jms/map-message"); - } - catch (AMQException e) - { - throw new JMSException("Unable to create message: " + e); - } + return new JMSMapMessage(); } } public javax.jms.Message createMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) - { - checkNotClosed(); - try - { - return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream"); - } - catch (AMQException e) - { - throw new JMSException("Unable to create message: " + e); - } - } + return createBytesMessage(); } public ObjectMessage createObjectMessage() throws JMSException @@ -370,33 +342,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized(_connection.getFailoverMutex()) { checkNotClosed(); - try - { - return (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream"); - } - catch (AMQException e) - { - throw new JMSException("Unable to create message: " + e); - } + return (ObjectMessage) new JMSObjectMessage(); } } public ObjectMessage createObjectMessage(Serializable object) throws JMSException { - synchronized(_connection.getFailoverMutex()) - { - checkNotClosed(); - try - { - ObjectMessage msg = (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream"); - msg.setObject(object); - return msg; - } - catch (AMQException e) - { - throw new JMSException("Unable to create message: " + e); - } - } + ObjectMessage msg = createObjectMessage(); + msg.setObject(object); + return msg; } public StreamMessage createStreamMessage() throws JMSException @@ -405,14 +359,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); - try - { - return (StreamMessage) _messageFactoryRegistry.createMessage(JMSStreamMessage.MIME_TYPE); - } - catch (AMQException e) - { - throw new JMSException("Unable to create text message: " + e); - } + return new JMSStreamMessage(); } } @@ -422,33 +369,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); - try - { - return (TextMessage) _messageFactoryRegistry.createMessage("text/plain"); - } - catch (AMQException e) - { - throw new JMSException("Unable to create text message: " + e); - } + return new JMSTextMessage(); } } public TextMessage createTextMessage(String text) throws JMSException { - synchronized(_connection.getFailoverMutex()) - { - checkNotClosed(); - try - { - TextMessage msg = (TextMessage) _messageFactoryRegistry.createMessage("text/plain"); - msg.setText(text); - return msg; - } - catch (AMQException e) - { - throw new JMSException("Unable to create text message: " + e); - } - } + + TextMessage msg = createTextMessage(); + msg.setText(text); + return msg; } public boolean getTransacted() throws JMSException @@ -530,7 +460,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi 0, // classId 0, // methodId AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - "JMS client closing channel"); // replyText + new AMQShortString("JMS client closing channel")); // replyText _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class); // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully @@ -1050,12 +980,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } - public void declareExchange(String name, String type) + public void declareExchange(AMQShortString name, AMQShortString type) { declareExchange(name, type, _connection.getProtocolHandler()); } - public void declareExchangeSynch(String name, String type) throws AMQException + public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. @@ -1079,7 +1009,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler); } - private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler) + private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. @@ -1106,7 +1036,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client. * @throws AMQException */ - private String declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException + private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException { // For queues (but not topics) we generate the name in the client rather than the // server. This allows the name to be reused on failover if required. In general, @@ -1127,14 +1057,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi amqd.isExclusive(), // exclusive true, // nowait false, // passive - amqd.getQueueName(), // queue + amqd.getAMQQueueName(), // queue 0); // ticket protocolHandler.writeFrame(queueDeclare); - return amqd.getQueueName(); + return amqd.getAMQQueueName(); } - private void bindQueue(AMQDestination amqd, String queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException + private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. @@ -1157,12 +1087,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param queueName * @return the consumer tag generated by the broker */ - private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler, + private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException { //fixme prefetch values are not used here. Do we need to have them as parametsrs? //need to generate a consumer tag on the client so we can exploit the nowait flag - String tag = Integer.toString(_nextTag++); + AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++)); FieldTable arguments = FieldTableFactory.newFieldTable(); if (messageSelector != null && !messageSelector.equals("")) @@ -1282,7 +1212,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (topicName.indexOf('/') == -1) { - return new AMQTopic(topicName); + return new AMQTopic(new AMQShortString(topicName)); } else { @@ -1352,12 +1282,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { + AMQShortString topicName; + if(topic instanceof AMQTopic) + { + topicName = ((AMQTopic)topic).getDestinationName(); + } + else + { + topicName = new AMQShortString(topic.getTopicName()); + } // if the queue is bound to the exchange but NOT for this topic, then the JMS spec // says we must trash the subscription. - if (isQueueBound(dest.getQueueName()) && - !isQueueBound(dest.getQueueName(), topic.getTopicName())) + if (isQueueBound(dest.getAMQQueueName()) && + !isQueueBound(dest.getAMQQueueName(), topicName)) { - deleteQueue(dest.getQueueName()); + deleteQueue(dest.getAMQQueueName()); } } @@ -1369,7 +1308,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return subscriber; } - void deleteQueue(String queueName) throws JMSException + void deleteQueue(AMQShortString queueName) throws JMSException { try { @@ -1461,12 +1400,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - boolean isQueueBound(String queueName) throws JMSException + boolean isQueueBound(AMQShortString queueName) throws JMSException { return isQueueBound(queueName, null); } - boolean isQueueBound(String queueName, String routingKey) throws JMSException + boolean isQueueBound(AMQShortString queueName, AMQShortString routingKey) throws JMSException { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. @@ -1606,7 +1545,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi declareExchange(amqd, protocolHandler); - String queueName = declareQueue(amqd, protocolHandler); + AMQShortString queueName = declareQueue(amqd, protocolHandler); bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); @@ -1674,7 +1613,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void resubscribeProducers() throws AMQException { ArrayList producers = new ArrayList(_producers.values()); - _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: remove + _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey for (Iterator it = producers.iterator(); it.hasNext();) { BasicMessageProducer producer = (BasicMessageProducer) it.next(); @@ -1718,7 +1657,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _connection.getProtocolHandler().writeFrame(channelFlowFrame); } - public void confirmConsumerCancelled(String consumerTag) + public void confirmConsumerCancelled(AMQShortString consumerTag) { BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); if((consumer != null) && (consumer.isAutoClose())) diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java index 81fee69f90..18c655a829 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import org.apache.qpid.framing.AMQShortString; + import javax.jms.JMSException; import javax.jms.TemporaryQueue; @@ -38,7 +40,7 @@ final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, Tempor */ public AMQTemporaryQueue(AMQSession session) { - super("TempQueue" + Long.toString(System.currentTimeMillis()), true); + super(new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())), true); _session = session; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 39304f3f4c..9b8a6686d3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import org.apache.qpid.url.BindingURL; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; import javax.jms.JMSException; import javax.jms.Topic; @@ -40,10 +41,15 @@ public class AMQTopic extends AMQDestination implements Topic public AMQTopic(String name) { + this(new AMQShortString(name)); + } + + public AMQTopic(AMQShortString name) + { this(name, true, null, false); } - public AMQTopic(String name, boolean isAutoDelete, String queueName, boolean isDurable) + public AMQTopic(AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable) { super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, queueName, isDurable); @@ -56,17 +62,17 @@ public class AMQTopic extends AMQDestination implements Topic true); } - public static String getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException + public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException { - return connection.getClientID() + ":" + subscriptionName; + return new AMQShortString(connection.getClientID() + ":" + subscriptionName); } public String getTopicName() throws JMSException { - return super.getDestinationName(); + return super.getDestinationName().toString(); } - public String getRoutingKey() + public AMQShortString getRoutingKey() { return getDestinationName(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 1033e827de..c5e97a27f6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -22,16 +22,11 @@ package org.apache.qpid.client; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.BasicCancelBody; -import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.*; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; @@ -74,7 +69,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * The consumer tag allows us to close the consumer by sending a jmsCancel method to the * broker */ - private String _consumerTag; + private AMQShortString _consumerTag; /** * We need to know the channel id when constructing frames @@ -255,17 +250,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if(_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag()); - String url = jmsMsg.getStringProperty(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString()); - try - { - Destination dest = AMQDestination.createDestination(new AMQBindingURL(url)); - jmsMsg.setJMSDestination(dest); - } - catch (URLSyntaxException e) - { - _logger.warn("Unable to parse the supplied destination header: " + url); - } - + byte[] url = jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName()); + Destination dest = AMQDestination.createDestination(url); + jmsMsg.setJMSDestination(dest); + } _session.setInRecovery(false); } @@ -498,7 +486,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ void notifyMessage(UnprocessedMessage messageFrame, int channelId) { - if (_logger.isDebugEnabled()) + final boolean debug = _logger.isDebugEnabled(); + + if (debug) { _logger.debug("notifyMessage called with message number " + messageFrame.deliverBody.deliveryTag); } @@ -509,7 +499,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer messageFrame.contentHeader, messageFrame.bodies); - _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); + if(debug) + { + _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); + } jmsMessage.setConsumer(this); preDeliver(jmsMessage); @@ -642,12 +635,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _session.deregisterConsumer(this); } - public String getConsumerTag() + public AMQShortString getConsumerTag() { return _consumerTag; } - public void setConsumerTag(String consumerTag) + public void setConsumerTag(AMQShortString consumerTag) { _consumerTag = consumerTag; } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index d38e461400..56b8f44e56 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -522,7 +522,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j AbstractJMSMessage message = convertToNativeMessage(origMessage); - message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL()); + message.getJmsContentHeaderProperties().setBytes(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName(), destination.toByteEncoding()); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. @@ -534,26 +534,22 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j destination.getRoutingKey(), // routingKey 0); // ticket - long currentTime = 0; - if (!_disableTimestamps) - { - currentTime = System.currentTimeMillis(); - message.setJMSTimestamp(currentTime); - } + + message.prepareForSending(); ByteBuffer payload = message.getData(); BasicContentHeaderProperties contentHeaderProperties = message.getJmsContentHeaderProperties(); - if (timeToLive > 0) + if (!_disableTimestamps) { - if (!_disableTimestamps) + final long currentTime = System.currentTimeMillis(); + contentHeaderProperties.setTimestamp(currentTime); + + if (timeToLive > 0) { contentHeaderProperties.setExpiration(currentTime + timeToLive); } - } - else - { - if (!_disableTimestamps) + else { contentHeaderProperties.setExpiration(0); } @@ -561,14 +557,16 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j contentHeaderProperties.setDeliveryMode((byte) deliveryMode); contentHeaderProperties.setPriority((byte) priority); - int size = (payload != null) ? payload.limit() : 0; - ContentBody[] contentBodies = createContentBodies(payload); - AMQFrame[] frames = new AMQFrame[2 + contentBodies.length]; - for (int i = 0; i < contentBodies.length; i++) + final int size = (payload != null) ? payload.limit() : 0; + final int contentBodyFrameCount = calculateContentBodyFrameCount(payload); + final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount]; + + if(payload != null) { - frames[2 + i] = ContentBody.createAMQFrame(_channelId, contentBodies[i]); + createContentBodies(payload, frames, 2, _channelId); } - if (contentBodies.length > 0 && _logger.isDebugEnabled()) + + if (contentBodyFrameCount != 0 && _logger.isDebugEnabled()) { _logger.debug("Sending content body frames to " + destination); } @@ -592,10 +590,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j if (message != origMessage) { - _logger.warn("Updating original message"); + _logger.debug("Updating original message"); origMessage.setJMSPriority(message.getJMSPriority()); origMessage.setJMSTimestamp(message.getJMSTimestamp()); - _logger.warn("Setting JMSExpiration:" + message.getJMSExpiration()); + _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration()); origMessage.setJMSExpiration(message.getJMSExpiration()); origMessage.setJMSMessageID(message.getJMSMessageID()); } @@ -625,42 +623,52 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j * maximum frame size. * * @param payload - * @return the array of content bodies + * @param frames + * @param offset + * @param channelId @return the array of content bodies */ - private ContentBody[] createContentBodies(ByteBuffer payload) + private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId) { - if (payload == null || payload.remaining() == 0) - { - return NO_CONTENT_BODIES; - } - // we substract one from the total frame maximum size to account for the end of frame marker in a body frame - // (0xCE byte). - int dataLength = payload.remaining(); - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; - int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0; - int frameCount = (int) (dataLength / framePayloadMax) + lastFrame; - final ContentBody[] bodies = new ContentBody[frameCount]; - - if (frameCount == 1) + if (frames.length == offset + 1) { - bodies[0] = new ContentBody(); - bodies[0].payload = payload; + frames[offset] = ContentBody.createAMQFrame(channelId,new ContentBody(payload)); } else { - long remaining = dataLength; - for (int i = 0; i < bodies.length; i++) + + final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; + long remaining = payload.remaining(); + for (int i = offset; i < frames.length; i++) { - bodies[i] = new ContentBody(); - payload.position((int) framePayloadMax * i); + payload.position((int) framePayloadMax * (i-offset)); int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; payload.limit(payload.position() + length); - bodies[i].payload = payload.slice(); + frames[i] = ContentBody.createAMQFrame(channelId,new ContentBody(payload.slice())); + remaining -= length; } } - return bodies; + + } + + private int calculateContentBodyFrameCount(ByteBuffer payload) + { + // we substract one from the total frame maximum size to account for the end of frame marker in a body frame + // (0xCE byte). + int frameCount; + if(payload == null || payload.remaining() == 0) + { + frameCount = 0; + } + else + { + int dataLength = payload.remaining(); + final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; + int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0; + frameCount = (int) (dataLength / framePayloadMax) + lastFrame; + } + return frameCount; } public void setMimeType(String mimeType) throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java index 3a7b7a7b3d..26e26781c0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java +++ b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java @@ -20,23 +20,38 @@ */
package org.apache.qpid.client;
+import org.apache.qpid.framing.AMQShortString;
+
import java.util.*;
-public enum CustomJMXProperty
+public enum CustomJMSXProperty
{
JMSX_QPID_JMSDESTINATIONURL,
JMSXGroupID,
JMSXGroupSeq;
+
+ private final AMQShortString _nameAsShortString;
+
+ CustomJMSXProperty()
+ {
+ _nameAsShortString = new AMQShortString(toString());
+ }
+
+ public AMQShortString getShortStringName()
+ {
+ return _nameAsShortString;
+ }
+
private static Enumeration _names;
public static synchronized Enumeration asEnumeration()
{
if(_names == null)
{
- CustomJMXProperty[] properties = values();
+ CustomJMSXProperty[] properties = values();
ArrayList<String> nameList = new ArrayList<String>(properties.length);
- for(CustomJMXProperty property : properties)
+ for(CustomJMSXProperty property : properties)
{
nameList.add(property.toString());
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java index 9ee802ff10..7749bded2c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java +++ b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java @@ -56,7 +56,7 @@ public class QpidConnectionMetaData implements ConnectionMetaData public Enumeration getJMSXPropertyNames() throws JMSException { - return CustomJMXProperty.asEnumeration(); + return CustomJMSXProperty.asEnumeration(); } public int getProviderMajorVersion() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index 278f0906ea..dbc1512b2f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -33,6 +33,7 @@ import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.AMQShortString; public class ChannelCloseMethodHandler implements StateAwareMethodListener { @@ -51,7 +52,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener ChannelCloseBody method = (ChannelCloseBody) evt.getMethod(); int errorCode = method.replyCode; - String reason = method.replyText; + AMQShortString reason = method.replyText; if (_logger.isDebugEnabled()) { _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason); @@ -77,7 +78,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener { _logger.info("Broker responded with Invalid Selector."); - throw new AMQInvalidSelectorException(reason); + throw new AMQInvalidSelectorException(String.valueOf(reason)); } else { @@ -85,6 +86,6 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener } } - evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason); + evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason)); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index bbfb100b25..bd1be5d629 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -31,6 +31,7 @@ import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.client.AMQAuthenticationException; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.framing.AMQShortString; public class ConnectionCloseMethodHandler implements StateAwareMethodListener { @@ -56,7 +57,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener //stateManager.changeState(AMQState.CONNECTION_CLOSING); int errorCode = method.replyCode; - String reason = method.replyText; + AMQShortString reason = method.replyText; // TODO: check whether channel id of zero is appropriate // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) @@ -75,7 +76,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state. stateManager.changeState(AMQState.CONNECTION_NOT_STARTED); - throw new AMQAuthenticationException(errorCode, reason); + throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString()); } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java index a658e3e787..5580d02895 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java @@ -49,19 +49,20 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener _logger.info("ConnectionRedirect frame received"); ConnectionRedirectBody method = (ConnectionRedirectBody) evt.getMethod(); + String host = method.host.toString(); // the host is in the form hostname:port with the port being optional - int portIndex = method.host.indexOf(':'); - String host; + int portIndex = host.indexOf(':'); + int port; if (portIndex == -1) { - host = method.host; port = DEFAULT_REDIRECT_PORT; } else { - host = method.host.substring(0, portIndex); - port = Integer.parseInt(method.host.substring(portIndex + 1)); + port = Integer.parseInt(host.substring(portIndex + 1)); + host = host.substring(0, portIndex); + } evt.getProtocolSession().failover(host, port); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index 8640bbb999..6f206735fe 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -31,10 +31,7 @@ import org.apache.qpid.client.security.CallbackHandlerRegistry; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.ConnectionStartBody; -import org.apache.qpid.framing.ConnectionStartOkBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.framing.*; import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; @@ -122,18 +119,18 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); FieldTable clientProperties = FieldTableFactory.newFieldTable(); - clientProperties.put(ClientProperties.instance.toString(), ps.getClientID()); - clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName()); - clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion()); - clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo()); + clientProperties.setString(ClientProperties.instance.toString(), ps.getClientID()); + clientProperties.setString(ClientProperties.product.toString(), QpidProperties.getProductName()); + clientProperties.setString(ClientProperties.version.toString(), QpidProperties.getReleaseVersion()); + clientProperties.setString(ClientProperties.platform.toString(), getFullSystemInfo()); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0, // AMQP version (major, minor) clientProperties, // clientProperties - selectedLocale, // locale - mechanism, // mechanism + new AMQShortString(selectedLocale), // locale + new AMQShortString(mechanism), // mechanism saslResponse)); // response } catch (UnsupportedEncodingException e) diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index 3592ee4c53..604202a742 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -28,10 +28,7 @@ import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.ConnectionOpenBody; -import org.apache.qpid.framing.ConnectionTuneBody; -import org.apache.qpid.framing.ConnectionTuneOkBody; -import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.*; public class ConnectionTuneMethodHandler implements StateAwareMethodListener { @@ -67,10 +64,10 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); session.writeFrame(createTuneOkFrame(evt.getChannelId(), params)); - session.writeFrame(createConnectionOpenFrame(evt.getChannelId(), session.getAMQConnection().getVirtualHost(), null, true)); + session.writeFrame(createConnectionOpenFrame(evt.getChannelId(), new AMQShortString(session.getAMQConnection().getVirtualHost()), null, true)); } - protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist) + protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities, boolean insist) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java index 011f7c09ab..5fb8de3690 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java @@ -55,7 +55,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage AbstractBytesMessage(ByteBuffer data) { super(data); // this instanties a content header - getJmsContentHeaderProperties().setContentType(getMimeType()); + getJmsContentHeaderProperties().setContentType(getMimeTypeAsShortString()); if (_data == null) { @@ -74,7 +74,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage { // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data); - getJmsContentHeaderProperties().setContentType(getMimeType()); + getJmsContentHeaderProperties().setContentType(getMimeTypeAsShortString()); } public void clearBodyImpl() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 0c29344c37..4a0d3283b0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -30,6 +30,7 @@ import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.BasicMessageConsumer; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQShortString; import javax.jms.Destination; import javax.jms.JMSException; @@ -168,7 +169,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } final AMQDestination amqd = (AMQDestination) destination; - final String encodedDestination = amqd.getEncodedName(); + final AMQShortString encodedDestination = amqd.getEncodedName(); _destinationCache.put(encodedDestination, destination); getJmsContentHeaderProperties().setReplyTo(encodedDestination); } @@ -235,7 +236,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void clearProperties() throws JMSException { - getJmsContentHeaderProperties().getJMSHeaders().clear(); + getJmsContentHeaderProperties().clear(); _readableProperties = false; } @@ -247,139 +248,168 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } + public boolean propertyExists(AMQShortString propertyName) throws JMSException + { + checkPropertyName(propertyName); + return getJmsContentHeaderProperties().propertyExists(propertyName); + } + + public boolean propertyExists(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().propertyExists(propertyName); + return getJmsContentHeaderProperties().propertyExists(propertyName); } + public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException + { + checkPropertyName(propertyName); + + return getJmsContentHeaderProperties().getBoolean(propertyName); + } + + public boolean getBooleanProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getBoolean(propertyName); + return getJmsContentHeaderProperties().getBoolean(propertyName); } public byte getByteProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getByte(propertyName); + return getJmsContentHeaderProperties().getByte(propertyName); } public short getShortProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getShort(propertyName); + return getJmsContentHeaderProperties().getShort(propertyName); } public int getIntProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getInteger(propertyName); + return getJmsContentHeaderProperties().getInteger(propertyName); } public long getLongProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getLong(propertyName); + return getJmsContentHeaderProperties().getLong(propertyName); } public float getFloatProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getFloat(propertyName); + return getJmsContentHeaderProperties().getFloat(propertyName); } public double getDoubleProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getDouble(propertyName); + return getJmsContentHeaderProperties().getDouble(propertyName); } public String getStringProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getString(propertyName); + return getJmsContentHeaderProperties().getString(propertyName); } public Object getObjectProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getObject(propertyName); + return getJmsContentHeaderProperties().getObject(propertyName); } public Enumeration getPropertyNames() throws JMSException { - return getJmsContentHeaderProperties().getJMSHeaders().getPropertyNames(); + return getJmsContentHeaderProperties().getPropertyNames(); + } + + public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException + { + checkWritableProperties(); + checkPropertyName(propertyName); + getJmsContentHeaderProperties().setBoolean(propertyName, b); } public void setBooleanProperty(String propertyName, boolean b) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setBoolean(propertyName, b); + getJmsContentHeaderProperties().setBoolean(propertyName, b); } public void setByteProperty(String propertyName, byte b) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setByte(propertyName, new Byte(b)); + getJmsContentHeaderProperties().setByte(propertyName, new Byte(b)); } public void setShortProperty(String propertyName, short i) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setShort(propertyName, new Short(i)); + getJmsContentHeaderProperties().setShort(propertyName, new Short(i)); } public void setIntProperty(String propertyName, int i) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setInteger(propertyName, new Integer(i)); + getJmsContentHeaderProperties().setInteger(propertyName, new Integer(i)); } public void setLongProperty(String propertyName, long l) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setLong(propertyName, new Long(l)); + getJmsContentHeaderProperties().setLong(propertyName, new Long(l)); } public void setFloatProperty(String propertyName, float f) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setFloat(propertyName, new Float(f)); + getJmsContentHeaderProperties().setFloat(propertyName, new Float(f)); } public void setDoubleProperty(String propertyName, double v) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setDouble(propertyName, new Double(v)); + getJmsContentHeaderProperties().setDouble(propertyName, new Double(v)); } public void setStringProperty(String propertyName, String value) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setString(propertyName, value); + getJmsContentHeaderProperties().setString(propertyName, value); } public void setObjectProperty(String propertyName, Object object) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setObject(propertyName, object); + getJmsContentHeaderProperties().setObject(propertyName, object); } + protected void removeProperty(AMQShortString propertyName) throws JMSException + { + checkPropertyName(propertyName); + getJmsContentHeaderProperties().remove(propertyName); + } + + protected void removeProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().remove(propertyName); + getJmsContentHeaderProperties().remove(propertyName); } public void acknowledgeThis() throws JMSException @@ -421,7 +451,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach */ public abstract String toBodyString() throws JMSException; - public abstract String getMimeType(); + public String getMimeType() + { + return getMimeTypeAsShortString().toString(); + } + + public abstract AMQShortString getMimeTypeAsShortString(); public String toString() { @@ -436,13 +471,13 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo())); buf.append("\nAMQ message number: ").append(_deliveryTag); buf.append("\nProperties:"); - if (getJmsContentHeaderProperties().getJMSHeaders().isEmpty()) + if (getJmsContentHeaderProperties().isEmpty()) { buf.append("<NONE>"); } else { - buf.append('\n').append(getJmsContentHeaderProperties().getJMSHeaders()); + buf.append('\n').append(getJmsContentHeaderProperties().getHeaders()); } return buf.toString(); } @@ -458,13 +493,13 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach getJmsContentHeaderProperties().setHeaders(messageProperties); } - private void checkPropertyName(String propertyName) + private void checkPropertyName(CharSequence propertyName) { if (propertyName == null) { throw new IllegalArgumentException("Property name must not be null"); } - else if ("".equals(propertyName)) + else if (propertyName.length()==0) { throw new IllegalArgumentException("Property name must not be the empty string"); } @@ -537,4 +572,11 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { _consumer = basicMessageConsumer; } + + public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException + { + checkPropertyName(propertyName); + return getJmsContentHeaderProperties().getBytes(propertyName); + + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index dcff8c348b..4b28a43c64 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -43,16 +43,23 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory List bodies) throws AMQException { ByteBuffer data; + final boolean debug = _logger.isDebugEnabled(); // we optimise the non-fragmented case to avoid copying if (bodies != null && bodies.size() == 1) { - _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize +")"); + if(debug) + { + _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize +")"); + } data = ((ContentBody)bodies.get(0)).payload; } else { - _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize + ")"); + if(debug) + { + _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize + ")"); + } data = ByteBuffer.allocate((int)contentHeader.bodySize); // XXX: Is cast a problem? final Iterator it = bodies.iterator(); while (it.hasNext()) @@ -63,7 +70,10 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory } data.flip(); } - _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data.remaining()); + if(debug) + { + _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data.remaining()); + } return createMessage(messageNbr, data, contentHeader); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index d769300c69..ec7ef453eb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -23,6 +23,7 @@ package org.apache.qpid.client.message; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import javax.jms.BytesMessage; import javax.jms.JMSException; @@ -35,9 +36,11 @@ import java.nio.CharBuffer; public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage { - private static final String MIME_TYPE = "application/octet-stream"; + public static final String MIME_TYPE = "application/octet-stream"; + private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); - JMSBytesMessage() + + public JMSBytesMessage() { this(null); } @@ -65,9 +68,9 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag _readableMessage = true; } - public String getMimeType() + public AMQShortString getMimeTypeAsShortString() { - return MIME_TYPE; + return MIME_TYPE_SHORT_STRING; } public long getBodyLength() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index 88e78a1dad..fcbb6500d4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -22,6 +22,7 @@ package org.apache.qpid.client.message; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; @@ -37,10 +38,11 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm public static final String MIME_TYPE = "jms/map-message"; + private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); private Map<String,Object> _map = new HashMap<String, Object>(); - JMSMapMessage() throws JMSException + public JMSMapMessage() throws JMSException { this(null); } @@ -74,9 +76,9 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm return _map.toString(); } - public String getMimeType() + public AMQShortString getMimeTypeAsShortString() { - return MIME_TYPE; + return MIME_TYPE_SHORT_STRING; } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index 35c5377f14..ae29cef901 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -24,6 +24,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import javax.jms.JMSException; import javax.jms.MessageFormatException; @@ -34,14 +35,15 @@ import java.nio.charset.Charset; public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage { - static final String MIME_TYPE = "application/java-object-stream"; + public static final String MIME_TYPE = "application/java-object-stream"; + private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); private static final int DEFAULT_BUFFER_SIZE = 1024; /** * Creates empty, writable message for use by producers */ - JMSObjectMessage() + public JMSObjectMessage() { this(null); } @@ -54,7 +56,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); _data.setAutoExpand(true); } - getJmsContentHeaderProperties().setContentType(MIME_TYPE); + getJmsContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING); } /** @@ -80,9 +82,9 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag return toString(_data); } - public String getMimeType() + public AMQShortString getMimeTypeAsShortString() { - return MIME_TYPE; + return MIME_TYPE_SHORT_STRING; } public void setObject(Serializable serializable) throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index 972a5fc8bf..747b97b11c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -23,6 +23,7 @@ package org.apache.qpid.client.message; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import javax.jms.*; import java.nio.charset.CharacterCodingException; @@ -34,6 +35,7 @@ import java.nio.charset.Charset; public class JMSStreamMessage extends AbstractBytesTypedMessage implements StreamMessage { public static final String MIME_TYPE="jms/stream-message"; + private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); /** @@ -42,7 +44,7 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea */ private int _byteArrayRemaining = -1; - JMSStreamMessage() + public JMSStreamMessage() { this(null); } @@ -71,9 +73,9 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea _readableMessage = true; } - public String getMimeType() + public AMQShortString getMimeTypeAsShortString() { - return MIME_TYPE; + return MIME_TYPE_SHORT_STRING; } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index d8394b0489..f386346dd1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -21,6 +21,7 @@ package org.apache.qpid.client.message; import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; import org.apache.mina.common.ByteBuffer; @@ -32,15 +33,18 @@ import java.nio.charset.CharacterCodingException; public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage { private static final String MIME_TYPE = "text/plain"; + private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); + private String _decodedValue; /** * This constant represents the name of a property that is set when the message payload is null. */ - private static final String PAYLOAD_NULL_PROPERTY = "JMS_QPID_NULL"; + private static final AMQShortString PAYLOAD_NULL_PROPERTY = new AMQShortString("JMS_QPID_NULL"); + private static final Charset DEFAULT_CHARSET = Charset.defaultCharset(); - JMSTextMessage() throws JMSException + public JMSTextMessage() throws JMSException { this(null, null); } @@ -48,7 +52,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text JMSTextMessage(ByteBuffer data, String encoding) throws JMSException { super(data); // this instantiates a content header - getJmsContentHeaderProperties().setContentType(MIME_TYPE); + getJmsContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING); getJmsContentHeaderProperties().setEncoding(encoding); } @@ -56,7 +60,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text throws AMQException { super(deliveryTag, contentHeader, data); - contentHeader.setContentType(MIME_TYPE); + contentHeader.setContentType(MIME_TYPE_SHORT_STRING); _data = data; } @@ -91,9 +95,9 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text _data = data; } - public String getMimeType() + public AMQShortString getMimeTypeAsShortString() { - return MIME_TYPE; + return MIME_TYPE_SHORT_STRING; } public void setText(String text) throws JMSException @@ -109,13 +113,14 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text _data.limit(text.length()) ; //_data.sweep(); _data.setAutoExpand(true); - if (getJmsContentHeaderProperties().getEncoding() == null) + final String encoding = getJmsContentHeaderProperties().getEncoding(); + if (encoding == null) { _data.put(text.getBytes()); } else { - _data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding())); + _data.put(text.getBytes(encoding)); } _changedData=true; } @@ -164,7 +169,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text { try { - _decodedValue = _data.getString(Charset.defaultCharset().newDecoder()); + _decodedValue = _data.getString(DEFAULT_CHARSET.newDecoder()); } catch (CharacterCodingException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index 348988f06d..df7537f1e8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -23,6 +23,7 @@ package org.apache.qpid.client.message; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import javax.jms.JMSException; import java.util.HashMap; @@ -31,7 +32,8 @@ import java.util.List; public class MessageFactoryRegistry { - private final Map _mimeToFactoryMap = new HashMap(); + private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>(); + private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap = new HashMap<AMQShortString, MessageFactory>(); public void registerFactory(String mimeType, MessageFactory mf) { @@ -39,12 +41,14 @@ public class MessageFactoryRegistry { throw new IllegalArgumentException("Message factory must not be null"); } - _mimeToFactoryMap.put(mimeType, mf); + _mimeStringToFactoryMap.put(mimeType, mf); + _mimeShortStringToFactoryMap.put(new AMQShortString(mimeType), mf); } public MessageFactory deregisterFactory(String mimeType) { - return (MessageFactory) _mimeToFactoryMap.remove(mimeType); + _mimeShortStringToFactoryMap.remove(new AMQShortString(mimeType)); + return _mimeStringToFactoryMap.remove(mimeType); } /** @@ -63,7 +67,7 @@ public class MessageFactoryRegistry List bodies) throws AMQException, JMSException { BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties; - MessageFactory mf = (MessageFactory) _mimeToFactoryMap.get(properties.getContentType()); + MessageFactory mf = _mimeShortStringToFactoryMap.get(properties.getContentTypeShortString()); if (mf == null) { throw new AMQException("Unsupport MIME type of " + properties.getContentType()); @@ -80,7 +84,7 @@ public class MessageFactoryRegistry { throw new IllegalArgumentException("Mime type must not be null"); } - MessageFactory mf = (MessageFactory) _mimeToFactoryMap.get(mimeType); + MessageFactory mf = _mimeStringToFactoryMap.get(mimeType); if (mf == null) { throw new AMQException("Unsupport MIME type of " + mimeType); @@ -101,7 +105,7 @@ public class MessageFactoryRegistry mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory()); mf.registerFactory("text/plain", new JMSTextMessageFactory()); mf.registerFactory("text/xml", new JMSTextMessageFactory()); - mf.registerFactory("application/octet-stream", new JMSBytesMessageFactory()); + mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory()); mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory()); mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory()); mf.registerFactory(null, new JMSBytesMessageFactory()); diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index f37af835e1..bd60b2c250 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -37,14 +37,7 @@ import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionCloseOkBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.ssl.BogusSSLContextFactory; @@ -99,7 +92,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter // We add a proxy for the state manager so that we can substitute the state manager easily in this class. // We substitute the state manager when performing failover - _frameListeners.add(new AMQMethodListener() +/* _frameListeners.add(new AMQMethodListener() { public boolean methodReceived(AMQMethodEvent evt) throws AMQException { @@ -110,7 +103,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _stateManager.error(e); } - }); + });*/ } public boolean isUseSSL() @@ -284,11 +277,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void propagateExceptionToWaiters(Exception e) { _stateManager.error(e); - final Iterator it = _frameListeners.iterator(); - while (it.hasNext()) + if(!_frameListeners.isEmpty()) { - final AMQMethodListener ml = (AMQMethodListener) it.next(); - ml.error(e); + final Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener ml = (AMQMethodListener) it.next(); + ml.error(e); + } } } @@ -296,12 +292,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void messageReceived(IoSession session, Object message) throws Exception { + final long msgNumber = ++_messageReceivedCount; - if (_messageReceivedCount++ % 1000 == 0) + if (_logger.isDebugEnabled() && (msgNumber % 1000 == 0)) { _logger.debug("Received " + _messageReceivedCount + " protocol messages"); } - Iterator it = _frameListeners.iterator(); + AMQFrame frame = (AMQFrame) message; HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody); @@ -314,13 +311,19 @@ public class AMQProtocolHandler extends IoHandlerAdapter } final AMQMethodEvent evt = new AMQMethodEvent(frame.channel, (AMQMethodBody) frame.bodyFrame, _protocolSession); + try { - boolean wasAnyoneInterested = false; - while (it.hasNext()) + + boolean wasAnyoneInterested = _stateManager.methodReceived(evt); + if(!_frameListeners.isEmpty()) { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + } } if (!wasAnyoneInterested) { @@ -329,11 +332,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter } catch (AMQException e) { - it = _frameListeners.iterator(); - while (it.hasNext()) + _stateManager.error(e); + if(!_frameListeners.isEmpty()) { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + listener.error(e); + } } exceptionCaught(session, e); } @@ -359,17 +366,21 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void messageSent(IoSession session, Object message) throws Exception { - if (_messagesOut++ % 1000 == 0) + final long sentMessages = _messagesOut++; + + final boolean debug = _logger.isDebugEnabled(); + + if (debug && (sentMessages % 1000 == 0)) { _logger.debug("Sent " + _messagesOut + " protocol messages"); } _connection.bytesSent(session.getWrittenBytes()); - if (_logger.isDebugEnabled()) + if (debug) { _logger.debug("Sent frame " + message); } } - +/* public void addFrameListener(AMQMethodListener listener) { _frameListeners.add(listener); @@ -379,7 +390,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _frameListeners.remove(listener); } - + */ public void attainState(AMQState s) throws AMQException { _stateManager.attainState(s); @@ -423,9 +434,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter // When control resumes before this line, a reply will have been received // that matches the criteria defined in the blocking listener } + catch (AMQException e) + { + throw e; + } finally { - // If we don't remove the listener then no-one will + // If we don't removeKey the listener then no-one will _frameListeners.remove(listener); } @@ -480,7 +495,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter 0, // classId 0, // methodId AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - "JMS client is closing the connection."); // replyText + new AMQShortString("JMS client is closing the connection.")); // replyText syncWrite(frame, ConnectionCloseOkBody.class); _protocolSession.closeProtocolSession(); @@ -518,7 +533,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter } } - public String generateQueueName() + public AMQShortString generateQueueName() { return _protocolSession.generateQueueName(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 6a40fd3133..ca622a98ba 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -31,11 +31,7 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.ConnectionTuneParameters; import org.apache.qpid.client.message.UnexpectedBodyReceivedException; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.framing.*; import org.apache.commons.lang.StringUtils; import javax.jms.JMSException; @@ -381,7 +377,7 @@ public class AMQProtocolSession implements ProtocolVersionList _protocolHandler.failover(host, port); } - protected String generateQueueName() + protected AMQShortString generateQueueName() { int id; synchronized(_queueIdLock) @@ -390,7 +386,7 @@ public class AMQProtocolSession implements ProtocolVersionList } //get rid of / and : and ; from address for spec conformance String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(),"/;:",""); - return "tmp_" + localAddress + "_" + id; + return new AMQShortString("tmp_" + localAddress + "_" + id); } /** @@ -407,7 +403,7 @@ public class AMQProtocolSession implements ProtocolVersionList } } - public void confirmConsumerCancelled(int channelId, String consumerTag) + public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag) { final Integer chId = channelId; final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId); diff --git a/java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java b/java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java index 4291cb3259..ddbea9a557 100644 --- a/java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java +++ b/java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java @@ -73,8 +73,8 @@ public class AmqPlainSaslClient implements SaslClient throw new SaslException("Error handling SASL callbacks: " + e, e); } FieldTable table = FieldTableFactory.newFieldTable(); - table.put("LOGIN", nameCallback.getName()); - table.put("PASSWORD", pwdCallback.getPassword()); + table.setString("LOGIN", nameCallback.getName()); + table.setString("PASSWORD", new String(pwdCallback.getPassword())); return table.getDataAsBytes(); } diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index 5497cafed4..b724bbfc05 100644 --- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -29,6 +29,7 @@ import org.apache.qpid.client.AMQDestination; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.framing.AMQShortString; import javax.jms.ConnectionFactory; import javax.jms.Destination; @@ -232,10 +233,14 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor */ protected Queue createQueue(Object value) { - if (value instanceof String) + if(value instanceof AMQShortString) + { + return new AMQQueue((AMQShortString) value); + } + else if (value instanceof String) { - return new AMQQueue((String) value); + return new AMQQueue(new AMQShortString((String) value)); } else if (value instanceof BindingURL) @@ -251,9 +256,13 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor */ protected Topic createTopic(Object value) { - if (value instanceof String) + if(value instanceof AMQShortString) + { + return new AMQTopic((AMQShortString)value); + } + else if (value instanceof String) { - return new AMQTopic((String) value); + return new AMQTopic(new AMQShortString((String) value)); } else if (value instanceof BindingURL) diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index d12ab01bdc..a1763ddc73 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -25,6 +25,8 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.exchange.ExchangeDefaults; import javax.jms.*; @@ -50,10 +52,10 @@ public class RecoverTest extends TestCase Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = new AMQQueue("someQ", "someQ", false, true); + Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); + ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_NAME); Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -107,10 +109,10 @@ public class RecoverTest extends TestCase Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = new AMQQueue("someQ", "someQ", false, true); + Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); + ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_NAME); Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -171,8 +173,8 @@ public class RecoverTest extends TestCase Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = new AMQQueue("Q1", "Q1", false, true); - Queue queue2 = new AMQQueue("Q2", "Q2", false, true); + Queue queue = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); + Queue queue2 = new AMQQueue(new AMQShortString("Q2"), new AMQShortString("Q2"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); MessageConsumer consumer2 = consumerSession.createConsumer(queue2); @@ -210,7 +212,7 @@ public class RecoverTest extends TestCase Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = new AMQQueue("Q1", "Q1", false, true); + Queue queue = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); MessageProducer producer = consumerSession.createProducer(queue); producer.send(consumerSession.createTextMessage("hello")); MessageConsumer consumer = consumerSession.createConsumer(queue); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java index ad180e3a89..fb347053c7 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java @@ -35,16 +35,20 @@ import junit.framework.TestCase; public class FieldTableKeyEnumeratorTest extends TestCase { + public void testTrue() + { + + } public void testKeyEnumeration() { FieldTable result = FieldTableFactory.newFieldTable(); - result.put("one", 1L); - result.put("two", 2L); - result.put("three", 3L); - result.put("four", 4L); - result.put("five", 5L); + result.setObject("one", 1L); + result.setObject("two", 2L); + result.setObject("three", 3L); + result.setObject("four", 4L); + result.setObject("five", 5L); - Iterator iterator = result.keySet().iterator(); + Iterator iterator = result.keys().iterator(); try { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java index f4efd64dbb..5af55d6625 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java @@ -85,11 +85,11 @@ public class FieldTableMessageTest extends TestCase implements MessageListener private FieldTable load() throws IOException { FieldTable result = FieldTableFactory.newFieldTable(); - result.put("one", 1L); - result.put("two", 2L); - result.put("three", 3L); - result.put("four", 4L); - result.put("five", 5L); + result.setLong("one", 1L); + result.setLong("two", 2L); + result.setLong("three", 3L); + result.setLong("four", 4L); + result.setLong("five", 5L); return result; } @@ -133,10 +133,9 @@ public class FieldTableMessageTest extends TestCase implements MessageListener { ByteBuffer buffer = ((JMSBytesMessage) m).getData(); FieldTable actual = FieldTableFactory.newFieldTable(buffer, buffer.remaining()); - for (Object o : _expected.keySet()) - { - String key = (String) o; - assertEquals("Values for " + key + " did not match", _expected.get(key), actual.get(key)); + for (String key : _expected.keys()) + { + assertEquals("Values for " + key + " did not match", _expected.getObject(key), actual.getObject(key)); } } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index 17679788bd..7423a3d8f0 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -29,12 +29,7 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.testutil.VMBrokerSetup; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; +import javax.jms.*; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -81,7 +76,7 @@ public class PropertyValueTest extends TestCase implements MessageListener { _connection = connection; _destination = destination; - _session = (AMQSession) connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //set up a slow consumer _session.createConsumer(destination).setMessageListener(this); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java index 903f6a9da9..81481bc94d 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java @@ -75,7 +75,7 @@ public class TextMessageTest extends TestCase implements MessageListener { _connection = connection; _destination = destination; - _session = (AMQSession) connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //set up a slow consumer _session.createConsumer(destination).setMessageListener(this); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java index 22015dbc93..691acbb213 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java @@ -21,6 +21,7 @@ package org.apache.qpid.test.unit.client.forwardall; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.framing.AMQShortString; /** * Queue that allows several private queues to be registered and bound @@ -29,15 +30,19 @@ import org.apache.qpid.client.AMQQueue; */ class SpecialQueue extends AMQQueue { - private final String name; + private final AMQShortString name; SpecialQueue(String name) { + this(new AMQShortString(name)); + } + SpecialQueue(AMQShortString name) + { super(name, true); this.name = name; } - public String getRoutingKey() + public AMQShortString getRoutingKey() { return name; } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java index eee9b2de9f..64898a1b9a 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.test.unit.client.protocol; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.framing.AMQShortString; import org.apache.mina.common.IoSession; import junit.framework.TestCase; @@ -45,7 +46,7 @@ public class AMQProtocolSessionTest extends TestCase return (TestIoSession) _minaProtocolSession; } - public String genQueueName() + public AMQShortString genQueueName() { return generateQueueName(); } @@ -80,26 +81,26 @@ public class AMQProtocolSessionTest extends TestCase public void testGenerateQueueName() { - String testAddress; + AMQShortString testAddress; - //test address with / and ; chars which generateQueueName should remove + //test address with / and ; chars which generateQueueName should removeKey _testSession.getMinaProtocolSession().setStringLocalAddress(_brokenAddress); _testSession.getMinaProtocolSession().setLocalPort(_port); testAddress = _testSession.genQueueName(); - assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress); + assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress.toString()); //test empty address _testSession.getMinaProtocolSession().setStringLocalAddress(_emptyAddress); testAddress = _testSession.genQueueName(); - assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress); + assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress.toString()); //test address with no special chars _testSession.getMinaProtocolSession().setStringLocalAddress(_validAddress); testAddress = _testSession.genQueueName(); - assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress); + assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress.toString()); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java index c14b5317c7..23e3b9cc88 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java @@ -11,6 +11,7 @@ import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.*;
@@ -41,7 +42,7 @@ public class JMSDestinationTest extends TestCase {
Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 794316d2f5..8a6e279142 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -170,7 +170,7 @@ public class TopicSessionTest extends TestCase con.start(); TextMessage tm = session1.createTextMessage("Hello"); publisher.publish(tm); - tm = (TextMessage) consumer1.receive(2000); + tm = (TextMessage) consumer1.receive(200000L); assertNotNull(tm); String msgText = tm.getText(); assertEquals("Hello", msgText); @@ -178,7 +178,7 @@ public class TopicSessionTest extends TestCase msgText = tm.getText(); assertNull(msgText); publisher.publish(tm); - tm = (TextMessage) consumer1.receive(2000); + tm = (TextMessage) consumer1.receive(20000000L); assertNotNull(tm); msgText = tm.getText(); assertNull(msgText); |