summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-08 17:02:26 +0000
committerRobert Greig <rgreig@apache.org>2007-01-08 17:02:26 +0000
commitd6b4e65f3fd1ff4a2763f8068cd6b3f7fe0b84e0 (patch)
treef0c608bcb9e4e5af6cd7ca5245401d2d1716b4f3 /java/client/src
parent61350c8523e2edca63d8a9ab2c970ad8607d4c0a (diff)
downloadqpid-python-d6b4e65f3fd1ff4a2763f8068cd6b3f7fe0b84e0.tar.gz
QPID-255 : Patch Supplied by Rob Godfrey - Change to use bespoke AMQShortString rather than converting to String
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@494121 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java242
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQQueue.java44
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java151
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopic.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java37
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java96
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java (renamed from java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java)21
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java17
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java98
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java23
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java77
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java17
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java16
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java16
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java17
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java9
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java9
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java13
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java3
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java4
39 files changed, 648 insertions, 423 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
index 6da0da9f6f..e59b6fbe19 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
@@ -308,7 +308,7 @@ public class AMQBrokerDetails implements BrokerDetails
}
}
- //remove the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options
+ //removeKey the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options
optionsURL.deleteCharAt(optionsURL.length() - 1);
return optionsURL.toString();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 58ac49dd4e..a4d0065699 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -965,7 +965,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public void resubscribeSessions() throws JMSException, AMQException
{
ArrayList sessions = new ArrayList(_sessions.values());
- _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: remove?
+ _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
for (Iterator it = sessions.iterator(); it.hasNext();)
{
AMQSession s = (AMQSession) it.next();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index c6f3f9c492..b634f48c1e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -25,6 +25,7 @@ import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.url.URLHelper;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import javax.naming.Reference;
import javax.naming.NamingException;
@@ -35,19 +36,27 @@ import javax.jms.Destination;
public abstract class AMQDestination implements Destination, Referenceable
{
- protected final String _exchangeName;
+ protected final AMQShortString _exchangeName;
- protected final String _exchangeClass;
+ protected final AMQShortString _exchangeClass;
- protected final String _destinationName;
+ protected final AMQShortString _destinationName;
- protected boolean _isDurable;
+ protected final boolean _isDurable;
protected final boolean _isExclusive;
protected final boolean _isAutoDelete;
- protected String _queueName;
+ private AMQShortString _queueName;
+
+ private String _url;
+ private AMQShortString _urlAsShortString;
+
+ private byte[] _byteEncoding;
+ private static final int IS_DURABLE_MASK = 0x1;
+ private static final int IS_EXCLUSIVE_MASK = 0x2;
+ private static final int IS_AUTODELETE_MASK = 0x4;
protected AMQDestination(String url) throws URLSyntaxException
{
@@ -63,27 +72,27 @@ public abstract class AMQDestination implements Destination, Referenceable
_isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
_isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
_isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
- _queueName = binding.getQueueName();
+ _queueName = new AMQShortString(binding.getQueueName());
}
- protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, String queueName)
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, AMQShortString queueName)
{
this(exchangeName, exchangeClass, destinationName, false, false, queueName);
}
- protected AMQDestination(String exchangeName, String exchangeClass, String destinationName)
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName)
{
this(exchangeName, exchangeClass, destinationName, false, false, null);
}
- protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive,
- boolean isAutoDelete, String queueName)
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName)
{
this(exchangeName, exchangeClass, destinationName, isExclusive, isAutoDelete, queueName, false);
}
- protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive,
- boolean isAutoDelete, String queueName, boolean isDurable)
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
{
if (destinationName == null)
{
@@ -106,9 +115,13 @@ public abstract class AMQDestination implements Destination, Referenceable
_isDurable = isDurable;
}
- public String getEncodedName()
+ public AMQShortString getEncodedName()
{
- return toURL();
+ if(_urlAsShortString == null)
+ {
+ toURL();
+ }
+ return _urlAsShortString;
}
public boolean isDurable()
@@ -116,12 +129,12 @@ public abstract class AMQDestination implements Destination, Referenceable
return _isDurable;
}
- public String getExchangeName()
+ public AMQShortString getExchangeName()
{
return _exchangeName;
}
- public String getExchangeClass()
+ public AMQShortString getExchangeClass()
{
return _exchangeClass;
}
@@ -136,22 +149,34 @@ public abstract class AMQDestination implements Destination, Referenceable
return ExchangeDefaults.DIRECT_EXCHANGE_NAME.equals(_exchangeName);
}
- public String getDestinationName()
+ public AMQShortString getDestinationName()
{
return _destinationName;
}
public String getQueueName()
{
+ return _queueName == null ? null : _queueName.toString();
+ }
+
+ public AMQShortString getAMQQueueName()
+ {
return _queueName;
}
- public void setQueueName(String queueName)
+
+
+ public void setQueueName(AMQShortString queueName)
{
+
_queueName = queueName;
+ // calculated URL now out of date
+ _url = null;
+ _urlAsShortString = null;
+ _byteEncoding = null;
}
- public abstract String getRoutingKey();
+ public abstract AMQShortString getRoutingKey();
public boolean isExclusive()
{
@@ -179,53 +204,114 @@ public abstract class AMQDestination implements Destination, Referenceable
public String toURL()
{
- StringBuffer sb = new StringBuffer();
+ String url = _url;
+ if(url == null)
+ {
- sb.append(_exchangeClass);
- sb.append("://");
- sb.append(_exchangeName);
- sb.append("/");
+ StringBuffer sb = new StringBuffer();
- if (_destinationName != null)
- {
- sb.append(_destinationName);
- }
+ sb.append(_exchangeClass);
+ sb.append("://");
+ sb.append(_exchangeName);
- sb.append("/");
+ sb.append('/');
- if (_queueName != null)
- {
- sb.append(_queueName);
- }
+ if (_destinationName != null)
+ {
+ sb.append(_destinationName);
+ }
- sb.append("?");
+ sb.append('/');
- if (_isDurable)
- {
- sb.append(BindingURL.OPTION_DURABLE);
- sb.append("='true'");
- sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
- }
+ if (_queueName != null)
+ {
+ sb.append(_queueName);
+ }
- if (_isExclusive)
- {
- sb.append(BindingURL.OPTION_EXCLUSIVE);
- sb.append("='true'");
- sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
- }
+ sb.append('?');
- if (_isAutoDelete)
- {
- sb.append(BindingURL.OPTION_AUTODELETE);
- sb.append("='true'");
- sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ if (_isDurable)
+ {
+ sb.append(BindingURL.OPTION_DURABLE);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
+ if (_isExclusive)
+ {
+ sb.append(BindingURL.OPTION_EXCLUSIVE);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
+ if (_isAutoDelete)
+ {
+ sb.append(BindingURL.OPTION_AUTODELETE);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
+ //removeKey the last char '?' if there is no options , ',' if there are.
+ sb.deleteCharAt(sb.length() - 1);
+ url = sb.toString();
+ _url = url;
+ _urlAsShortString = new AMQShortString(url);
}
+ return url;
+ }
- //remove the last char '?' if there is no options , ',' if there are.
- sb.deleteCharAt(sb.length() - 1);
+ public byte[] toByteEncoding()
+ {
+ byte[] encoding = _byteEncoding;
+ if(encoding == null)
+ {
+ int size = _exchangeClass.length() + 1 +
+ _exchangeName.length() + 1 +
+ (_destinationName == null ? 0 : _destinationName.length()) + 1 +
+ (_queueName == null ? 0 : _queueName.length()) + 1 +
+ 1;
+ encoding = new byte[size];
+ int pos = 0;
+
+ pos = _exchangeClass.writeToByteArray(encoding, pos);
+ pos = _exchangeName.writeToByteArray(encoding, pos);
+ if(_destinationName == null)
+ {
+ encoding[pos++] = (byte)0;
+ }
+ else
+ {
+ pos = _destinationName.writeToByteArray(encoding,pos);
+ }
+ if(_queueName == null)
+ {
+ encoding[pos++] = (byte)0;
+ }
+ else
+ {
+ pos = _queueName.writeToByteArray(encoding,pos);
+ }
+ byte options = 0;
+ if(_isDurable)
+ {
+ options |= IS_DURABLE_MASK;
+ }
+ if(_isExclusive)
+ {
+ options |= IS_EXCLUSIVE_MASK;
+ }
+ if(_isAutoDelete)
+ {
+ options |= IS_AUTODELETE_MASK;
+ }
+ encoding[pos] = options;
+
+
+ _byteEncoding = encoding;
- return sb.toString();
+ }
+ return encoding;
}
public boolean equals(Object o)
@@ -293,9 +379,55 @@ public abstract class AMQDestination implements Destination, Referenceable
null); // factory location
}
+
+ public static Destination createDestination(byte[] byteEncodedDestination)
+ {
+ AMQShortString exchangeClass;
+ AMQShortString exchangeName;
+ AMQShortString destinationName;
+ AMQShortString queueName;
+ boolean isDurable;
+ boolean isExclusive;
+ boolean isAutoDelete;
+
+ int pos = 0;
+ exchangeClass = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
+ pos+= exchangeClass.length() + 1;
+ exchangeName = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
+ pos+= exchangeName.length() + 1;
+ destinationName = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
+ pos+= (destinationName == null ? 0 : destinationName.length()) + 1;
+ queueName = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
+ pos+= (queueName == null ? 0 : queueName.length()) + 1;
+ int options = byteEncodedDestination[pos];
+ isDurable = (options & IS_DURABLE_MASK) != 0;
+ isExclusive = (options & IS_EXCLUSIVE_MASK) != 0;
+ isAutoDelete = (options & IS_AUTODELETE_MASK) != 0;
+
+ if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
+ {
+ return new AMQQueue(destinationName,queueName,isExclusive,isAutoDelete,isDurable);
+ }
+ else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+ {
+ return new AMQTopic(destinationName,isAutoDelete,queueName,isDurable);
+ }
+ else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS))
+ {
+ return new AMQHeadersExchange(destinationName);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unknown Exchange Class:" + exchangeClass);
+ }
+
+
+
+ }
+
public static Destination createDestination(BindingURL binding)
{
- String type = binding.getExchangeClass();
+ AMQShortString type = binding.getExchangeClass();
if (type.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
index c6d21c0ea7..b3dea770fa 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.framing.AMQShortString;
/**
* A destination backed by a headers exchange
@@ -33,12 +34,17 @@ public class AMQHeadersExchange extends AMQDestination
this(binding.getExchangeName());
}
- public AMQHeadersExchange(String queueName)
+ public AMQHeadersExchange(String name)
+ {
+ this(new AMQShortString(name));
+ }
+
+ public AMQHeadersExchange(AMQShortString queueName)
{
super(queueName, ExchangeDefaults.HEADERS_EXCHANGE_CLASS, queueName, true, true, null);
}
- public String getRoutingKey()
+ public AMQShortString getRoutingKey()
{
return getDestinationName();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
index 6c0da6112a..39a5ffc0b8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.Queue;
@@ -42,12 +43,22 @@ public class AMQQueue extends AMQDestination implements Queue
* Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
* @param name the name of the queue
*/
- public AMQQueue(String name)
+ public AMQQueue(AMQShortString name)
{
this(name, false);
}
/**
+ * Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
+ * @param name the name of the queue
+ */
+ public AMQQueue(String name)
+ {
+ this(new AMQShortString(name), false);
+ }
+
+
+ /**
* Create a queue with a specified name.
*
* @param name the destination name (used in the routing key)
@@ -56,10 +67,23 @@ public class AMQQueue extends AMQDestination implements Queue
*/
public AMQQueue(String name, boolean temporary)
{
+ this(new AMQShortString(name),temporary);
+ }
+
+
+ /**
+ * Create a queue with a specified name.
+ *
+ * @param name the destination name (used in the routing key)
+ * @param temporary if true the broker will generate a queue name, also if true then the queue is autodeleted
+ * and exclusive
+ */
+ public AMQQueue(AMQShortString name, boolean temporary)
+ {
// queue name is set to null indicating that the broker assigns a name in the case of temporary queues
// temporary queues are typically used as response queues
- this(name, temporary?null:name, temporary, temporary);
- _isDurable = !temporary;
+ this(name, temporary?null:name, temporary, temporary, !temporary);
+
}
/**
@@ -69,16 +93,22 @@ public class AMQQueue extends AMQDestination implements Queue
* @param exclusive true if the queue should only permit a single consumer
* @param autoDelete true if the queue should be deleted automatically when the last consumers detaches
*/
- public AMQQueue(String destinationName, String queueName, boolean exclusive, boolean autoDelete)
+ public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete)
+ {
+ this(destinationName, queueName, exclusive, autoDelete, false);
+ }
+
+
+ public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable)
{
super(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive,
- autoDelete, queueName);
+ autoDelete, queueName, durable);
}
- public String getRoutingKey()
+ public AMQShortString getRoutingKey()
{
- return getQueueName();
+ return getAMQQueueName();
}
public boolean isNameRequired()
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 0dfd469d8d..be240cc39e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -26,10 +26,7 @@ import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.client.failover.FailoverSupport;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.JMSStreamMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
@@ -104,7 +101,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Maps from consumer tag (String) to JMSMessageConsumer instance
*/
- private Map<String, BasicMessageConsumer> _consumers = new ConcurrentHashMap<String, BasicMessageConsumer>();
+ private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
/**
* Maps from destination to count of JMSMessageConsumers
@@ -205,7 +202,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
message.bodies);
int errorCode = message.bounceBody.replyCode;
- String reason = message.bounceBody.replyText;
+ AMQShortString reason = message.bounceBody.replyText;
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
//@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
@@ -322,14 +319,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
- try
- {
- return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
+ return new JMSBytesMessage();
}
}
@@ -338,31 +328,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
- try
- {
- return (MapMessage) _messageFactoryRegistry.createMessage("jms/map-message");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
+ return new JMSMapMessage();
}
}
public javax.jms.Message createMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
- {
- checkNotClosed();
- try
- {
- return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
- }
+ return createBytesMessage();
}
public ObjectMessage createObjectMessage() throws JMSException
@@ -370,33 +342,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
- try
- {
- return (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
+ return (ObjectMessage) new JMSObjectMessage();
}
}
public ObjectMessage createObjectMessage(Serializable object) throws JMSException
{
- synchronized(_connection.getFailoverMutex())
- {
- checkNotClosed();
- try
- {
- ObjectMessage msg = (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream");
- msg.setObject(object);
- return msg;
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
- }
+ ObjectMessage msg = createObjectMessage();
+ msg.setObject(object);
+ return msg;
}
public StreamMessage createStreamMessage() throws JMSException
@@ -405,14 +359,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
- try
- {
- return (StreamMessage) _messageFactoryRegistry.createMessage(JMSStreamMessage.MIME_TYPE);
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create text message: " + e);
- }
+ return new JMSStreamMessage();
}
}
@@ -422,33 +369,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
- try
- {
- return (TextMessage) _messageFactoryRegistry.createMessage("text/plain");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create text message: " + e);
- }
+ return new JMSTextMessage();
}
}
public TextMessage createTextMessage(String text) throws JMSException
{
- synchronized(_connection.getFailoverMutex())
- {
- checkNotClosed();
- try
- {
- TextMessage msg = (TextMessage) _messageFactoryRegistry.createMessage("text/plain");
- msg.setText(text);
- return msg;
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create text message: " + e);
- }
- }
+
+ TextMessage msg = createTextMessage();
+ msg.setText(text);
+ return msg;
}
public boolean getTransacted() throws JMSException
@@ -530,7 +460,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
0, // classId
0, // methodId
AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- "JMS client closing channel"); // replyText
+ new AMQShortString("JMS client closing channel")); // replyText
_connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully
@@ -1050,12 +980,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
- public void declareExchange(String name, String type)
+ public void declareExchange(AMQShortString name, AMQShortString type)
{
declareExchange(name, type, _connection.getProtocolHandler());
}
- public void declareExchangeSynch(String name, String type) throws AMQException
+ public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -1079,7 +1009,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler);
}
- private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler)
+ private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -1106,7 +1036,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
* @throws AMQException
*/
- private String declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
+ private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
{
// For queues (but not topics) we generate the name in the client rather than the
// server. This allows the name to be reused on failover if required. In general,
@@ -1127,14 +1057,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
amqd.isExclusive(), // exclusive
true, // nowait
false, // passive
- amqd.getQueueName(), // queue
+ amqd.getAMQQueueName(), // queue
0); // ticket
protocolHandler.writeFrame(queueDeclare);
- return amqd.getQueueName();
+ return amqd.getAMQQueueName();
}
- private void bindQueue(AMQDestination amqd, String queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
+ private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -1157,12 +1087,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param queueName
* @return the consumer tag generated by the broker
*/
- private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler,
+ private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
boolean nowait, String messageSelector) throws AMQException
{
//fixme prefetch values are not used here. Do we need to have them as parametsrs?
//need to generate a consumer tag on the client so we can exploit the nowait flag
- String tag = Integer.toString(_nextTag++);
+ AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
FieldTable arguments = FieldTableFactory.newFieldTable();
if (messageSelector != null && !messageSelector.equals(""))
@@ -1282,7 +1212,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (topicName.indexOf('/') == -1)
{
- return new AMQTopic(topicName);
+ return new AMQTopic(new AMQShortString(topicName));
}
else
{
@@ -1352,12 +1282,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
+ AMQShortString topicName;
+ if(topic instanceof AMQTopic)
+ {
+ topicName = ((AMQTopic)topic).getDestinationName();
+ }
+ else
+ {
+ topicName = new AMQShortString(topic.getTopicName());
+ }
// if the queue is bound to the exchange but NOT for this topic, then the JMS spec
// says we must trash the subscription.
- if (isQueueBound(dest.getQueueName()) &&
- !isQueueBound(dest.getQueueName(), topic.getTopicName()))
+ if (isQueueBound(dest.getAMQQueueName()) &&
+ !isQueueBound(dest.getAMQQueueName(), topicName))
{
- deleteQueue(dest.getQueueName());
+ deleteQueue(dest.getAMQQueueName());
}
}
@@ -1369,7 +1308,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
return subscriber;
}
- void deleteQueue(String queueName) throws JMSException
+ void deleteQueue(AMQShortString queueName) throws JMSException
{
try
{
@@ -1461,12 +1400,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- boolean isQueueBound(String queueName) throws JMSException
+ boolean isQueueBound(AMQShortString queueName) throws JMSException
{
return isQueueBound(queueName, null);
}
- boolean isQueueBound(String queueName, String routingKey) throws JMSException
+ boolean isQueueBound(AMQShortString queueName, AMQShortString routingKey) throws JMSException
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -1606,7 +1545,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
declareExchange(amqd, protocolHandler);
- String queueName = declareQueue(amqd, protocolHandler);
+ AMQShortString queueName = declareQueue(amqd, protocolHandler);
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
@@ -1674,7 +1613,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void resubscribeProducers() throws AMQException
{
ArrayList producers = new ArrayList(_producers.values());
- _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: remove
+ _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
for (Iterator it = producers.iterator(); it.hasNext();)
{
BasicMessageProducer producer = (BasicMessageProducer) it.next();
@@ -1718,7 +1657,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
- public void confirmConsumerCancelled(String consumerTag)
+ public void confirmConsumerCancelled(AMQShortString consumerTag)
{
BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
if((consumer != null) && (consumer.isAutoClose()))
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
index 81fee69f90..18c655a829 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.framing.AMQShortString;
+
import javax.jms.JMSException;
import javax.jms.TemporaryQueue;
@@ -38,7 +40,7 @@ final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, Tempor
*/
public AMQTemporaryQueue(AMQSession session)
{
- super("TempQueue" + Long.toString(System.currentTimeMillis()), true);
+ super(new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())), true);
_session = session;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 39304f3f4c..9b8a6686d3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
import javax.jms.Topic;
@@ -40,10 +41,15 @@ public class AMQTopic extends AMQDestination implements Topic
public AMQTopic(String name)
{
+ this(new AMQShortString(name));
+ }
+
+ public AMQTopic(AMQShortString name)
+ {
this(name, true, null, false);
}
- public AMQTopic(String name, boolean isAutoDelete, String queueName, boolean isDurable)
+ public AMQTopic(AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
{
super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete,
queueName, isDurable);
@@ -56,17 +62,17 @@ public class AMQTopic extends AMQDestination implements Topic
true);
}
- public static String getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
+ public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
{
- return connection.getClientID() + ":" + subscriptionName;
+ return new AMQShortString(connection.getClientID() + ":" + subscriptionName);
}
public String getTopicName() throws JMSException
{
- return super.getDestinationName();
+ return super.getDestinationName().toString();
}
- public String getRoutingKey()
+ public AMQShortString getRoutingKey()
{
return getDestinationName();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 1033e827de..c5e97a27f6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -22,16 +22,11 @@ package org.apache.qpid.client;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
@@ -74,7 +69,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
* The consumer tag allows us to close the consumer by sending a jmsCancel method to the
* broker
*/
- private String _consumerTag;
+ private AMQShortString _consumerTag;
/**
* We need to know the channel id when constructing frames
@@ -255,17 +250,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if(_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
_unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
- String url = jmsMsg.getStringProperty(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString());
- try
- {
- Destination dest = AMQDestination.createDestination(new AMQBindingURL(url));
- jmsMsg.setJMSDestination(dest);
- }
- catch (URLSyntaxException e)
- {
- _logger.warn("Unable to parse the supplied destination header: " + url);
- }
-
+ byte[] url = jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName());
+ Destination dest = AMQDestination.createDestination(url);
+ jmsMsg.setJMSDestination(dest);
+
}
_session.setInRecovery(false);
}
@@ -498,7 +486,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
void notifyMessage(UnprocessedMessage messageFrame, int channelId)
{
- if (_logger.isDebugEnabled())
+ final boolean debug = _logger.isDebugEnabled();
+
+ if (debug)
{
_logger.debug("notifyMessage called with message number " + messageFrame.deliverBody.deliveryTag);
}
@@ -509,7 +499,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
messageFrame.contentHeader,
messageFrame.bodies);
- _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
+ if(debug)
+ {
+ _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
+ }
jmsMessage.setConsumer(this);
preDeliver(jmsMessage);
@@ -642,12 +635,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_session.deregisterConsumer(this);
}
- public String getConsumerTag()
+ public AMQShortString getConsumerTag()
{
return _consumerTag;
}
- public void setConsumerTag(String consumerTag)
+ public void setConsumerTag(AMQShortString consumerTag)
{
_consumerTag = consumerTag;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index d38e461400..56b8f44e56 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -522,7 +522,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
AbstractJMSMessage message = convertToNativeMessage(origMessage);
- message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL());
+ message.getJmsContentHeaderProperties().setBytes(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName(), destination.toByteEncoding());
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
@@ -534,26 +534,22 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
destination.getRoutingKey(), // routingKey
0); // ticket
- long currentTime = 0;
- if (!_disableTimestamps)
- {
- currentTime = System.currentTimeMillis();
- message.setJMSTimestamp(currentTime);
- }
+
+
message.prepareForSending();
ByteBuffer payload = message.getData();
BasicContentHeaderProperties contentHeaderProperties = message.getJmsContentHeaderProperties();
- if (timeToLive > 0)
+ if (!_disableTimestamps)
{
- if (!_disableTimestamps)
+ final long currentTime = System.currentTimeMillis();
+ contentHeaderProperties.setTimestamp(currentTime);
+
+ if (timeToLive > 0)
{
contentHeaderProperties.setExpiration(currentTime + timeToLive);
}
- }
- else
- {
- if (!_disableTimestamps)
+ else
{
contentHeaderProperties.setExpiration(0);
}
@@ -561,14 +557,16 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
contentHeaderProperties.setPriority((byte) priority);
- int size = (payload != null) ? payload.limit() : 0;
- ContentBody[] contentBodies = createContentBodies(payload);
- AMQFrame[] frames = new AMQFrame[2 + contentBodies.length];
- for (int i = 0; i < contentBodies.length; i++)
+ final int size = (payload != null) ? payload.limit() : 0;
+ final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
+ final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
+
+ if(payload != null)
{
- frames[2 + i] = ContentBody.createAMQFrame(_channelId, contentBodies[i]);
+ createContentBodies(payload, frames, 2, _channelId);
}
- if (contentBodies.length > 0 && _logger.isDebugEnabled())
+
+ if (contentBodyFrameCount != 0 && _logger.isDebugEnabled())
{
_logger.debug("Sending content body frames to " + destination);
}
@@ -592,10 +590,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
if (message != origMessage)
{
- _logger.warn("Updating original message");
+ _logger.debug("Updating original message");
origMessage.setJMSPriority(message.getJMSPriority());
origMessage.setJMSTimestamp(message.getJMSTimestamp());
- _logger.warn("Setting JMSExpiration:" + message.getJMSExpiration());
+ _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
origMessage.setJMSExpiration(message.getJMSExpiration());
origMessage.setJMSMessageID(message.getJMSMessageID());
}
@@ -625,42 +623,52 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
* maximum frame size.
*
* @param payload
- * @return the array of content bodies
+ * @param frames
+ * @param offset
+ * @param channelId @return the array of content bodies
*/
- private ContentBody[] createContentBodies(ByteBuffer payload)
+ private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
{
- if (payload == null || payload.remaining() == 0)
- {
- return NO_CONTENT_BODIES;
- }
- // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
- // (0xCE byte).
- int dataLength = payload.remaining();
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
- int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
- int frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
- final ContentBody[] bodies = new ContentBody[frameCount];
-
- if (frameCount == 1)
+ if (frames.length == offset + 1)
{
- bodies[0] = new ContentBody();
- bodies[0].payload = payload;
+ frames[offset] = ContentBody.createAMQFrame(channelId,new ContentBody(payload));
}
else
{
- long remaining = dataLength;
- for (int i = 0; i < bodies.length; i++)
+
+ final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ long remaining = payload.remaining();
+ for (int i = offset; i < frames.length; i++)
{
- bodies[i] = new ContentBody();
- payload.position((int) framePayloadMax * i);
+ payload.position((int) framePayloadMax * (i-offset));
int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
payload.limit(payload.position() + length);
- bodies[i].payload = payload.slice();
+ frames[i] = ContentBody.createAMQFrame(channelId,new ContentBody(payload.slice()));
+
remaining -= length;
}
}
- return bodies;
+
+ }
+
+ private int calculateContentBodyFrameCount(ByteBuffer payload)
+ {
+ // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
+ // (0xCE byte).
+ int frameCount;
+ if(payload == null || payload.remaining() == 0)
+ {
+ frameCount = 0;
+ }
+ else
+ {
+ int dataLength = payload.remaining();
+ final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
+ frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
+ }
+ return frameCount;
}
public void setMimeType(String mimeType) throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
index 3a7b7a7b3d..26e26781c0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java
+++ b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
@@ -20,23 +20,38 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.framing.AMQShortString;
+
import java.util.*;
-public enum CustomJMXProperty
+public enum CustomJMSXProperty
{
JMSX_QPID_JMSDESTINATIONURL,
JMSXGroupID,
JMSXGroupSeq;
+
+ private final AMQShortString _nameAsShortString;
+
+ CustomJMSXProperty()
+ {
+ _nameAsShortString = new AMQShortString(toString());
+ }
+
+ public AMQShortString getShortStringName()
+ {
+ return _nameAsShortString;
+ }
+
private static Enumeration _names;
public static synchronized Enumeration asEnumeration()
{
if(_names == null)
{
- CustomJMXProperty[] properties = values();
+ CustomJMSXProperty[] properties = values();
ArrayList<String> nameList = new ArrayList<String>(properties.length);
- for(CustomJMXProperty property : properties)
+ for(CustomJMSXProperty property : properties)
{
nameList.add(property.toString());
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
index 9ee802ff10..7749bded2c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
+++ b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
@@ -56,7 +56,7 @@ public class QpidConnectionMetaData implements ConnectionMetaData
public Enumeration getJMSXPropertyNames() throws JMSException
{
- return CustomJMXProperty.asEnumeration();
+ return CustomJMSXProperty.asEnumeration();
}
public int getProviderMajorVersion() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index 278f0906ea..dbc1512b2f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -33,6 +33,7 @@ import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.AMQShortString;
public class ChannelCloseMethodHandler implements StateAwareMethodListener
{
@@ -51,7 +52,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
int errorCode = method.replyCode;
- String reason = method.replyText;
+ AMQShortString reason = method.replyText;
if (_logger.isDebugEnabled())
{
_logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
@@ -77,7 +78,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
{
_logger.info("Broker responded with Invalid Selector.");
- throw new AMQInvalidSelectorException(reason);
+ throw new AMQInvalidSelectorException(String.valueOf(reason));
}
else
{
@@ -85,6 +86,6 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
}
}
- evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason);
+ evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
index bbfb100b25..bd1be5d629 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
@@ -31,6 +31,7 @@ import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.client.AMQAuthenticationException;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.AMQShortString;
public class ConnectionCloseMethodHandler implements StateAwareMethodListener
{
@@ -56,7 +57,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
//stateManager.changeState(AMQState.CONNECTION_CLOSING);
int errorCode = method.replyCode;
- String reason = method.replyText;
+ AMQShortString reason = method.replyText;
// TODO: check whether channel id of zero is appropriate
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -75,7 +76,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
//todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
- throw new AMQAuthenticationException(errorCode, reason);
+ throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString());
}
else
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
index a658e3e787..5580d02895 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
@@ -49,19 +49,20 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener
_logger.info("ConnectionRedirect frame received");
ConnectionRedirectBody method = (ConnectionRedirectBody) evt.getMethod();
+ String host = method.host.toString();
// the host is in the form hostname:port with the port being optional
- int portIndex = method.host.indexOf(':');
- String host;
+ int portIndex = host.indexOf(':');
+
int port;
if (portIndex == -1)
{
- host = method.host;
port = DEFAULT_REDIRECT_PORT;
}
else
{
- host = method.host.substring(0, portIndex);
- port = Integer.parseInt(method.host.substring(portIndex + 1));
+ port = Integer.parseInt(host.substring(portIndex + 1));
+ host = host.substring(0, portIndex);
+
}
evt.getProtocolSession().failover(host, port);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index 8640bbb999..6f206735fe 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -31,10 +31,7 @@ import org.apache.qpid.client.security.CallbackHandlerRegistry;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.ConnectionStartBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.*;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
@@ -122,18 +119,18 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
FieldTable clientProperties = FieldTableFactory.newFieldTable();
- clientProperties.put(ClientProperties.instance.toString(), ps.getClientID());
- clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName());
- clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion());
- clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo());
+ clientProperties.setString(ClientProperties.instance.toString(), ps.getClientID());
+ clientProperties.setString(ClientProperties.product.toString(), QpidProperties.getProductName());
+ clientProperties.setString(ClientProperties.version.toString(), QpidProperties.getReleaseVersion());
+ clientProperties.setString(ClientProperties.platform.toString(), getFullSystemInfo());
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
(byte)8, (byte)0, // AMQP version (major, minor)
clientProperties, // clientProperties
- selectedLocale, // locale
- mechanism, // mechanism
+ new AMQShortString(selectedLocale), // locale
+ new AMQShortString(mechanism), // mechanism
saslResponse)); // response
}
catch (UnsupportedEncodingException e)
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
index 3592ee4c53..604202a742 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
@@ -28,10 +28,7 @@ import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.*;
public class ConnectionTuneMethodHandler implements StateAwareMethodListener
{
@@ -67,10 +64,10 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
session.writeFrame(createTuneOkFrame(evt.getChannelId(), params));
- session.writeFrame(createConnectionOpenFrame(evt.getChannelId(), session.getAMQConnection().getVirtualHost(), null, true));
+ session.writeFrame(createConnectionOpenFrame(evt.getChannelId(), new AMQShortString(session.getAMQConnection().getVirtualHost()), null, true));
}
- protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist)
+ protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities, boolean insist)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
index 011f7c09ab..5fb8de3690 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
@@ -55,7 +55,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
AbstractBytesMessage(ByteBuffer data)
{
super(data); // this instanties a content header
- getJmsContentHeaderProperties().setContentType(getMimeType());
+ getJmsContentHeaderProperties().setContentType(getMimeTypeAsShortString());
if (_data == null)
{
@@ -74,7 +74,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
{
// TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data);
- getJmsContentHeaderProperties().setContentType(getMimeType());
+ getJmsContentHeaderProperties().setContentType(getMimeTypeAsShortString());
}
public void clearBodyImpl() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 0c29344c37..4a0d3283b0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -30,6 +30,7 @@ import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.BasicMessageConsumer;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -168,7 +169,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
final AMQDestination amqd = (AMQDestination) destination;
- final String encodedDestination = amqd.getEncodedName();
+ final AMQShortString encodedDestination = amqd.getEncodedName();
_destinationCache.put(encodedDestination, destination);
getJmsContentHeaderProperties().setReplyTo(encodedDestination);
}
@@ -235,7 +236,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void clearProperties() throws JMSException
{
- getJmsContentHeaderProperties().getJMSHeaders().clear();
+ getJmsContentHeaderProperties().clear();
_readableProperties = false;
}
@@ -247,139 +248,168 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
+ public boolean propertyExists(AMQShortString propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ return getJmsContentHeaderProperties().propertyExists(propertyName);
+ }
+
+
public boolean propertyExists(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getJMSHeaders().propertyExists(propertyName);
+ return getJmsContentHeaderProperties().propertyExists(propertyName);
}
+ public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+
+ return getJmsContentHeaderProperties().getBoolean(propertyName);
+ }
+
+
public boolean getBooleanProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getJMSHeaders().getBoolean(propertyName);
+ return getJmsContentHeaderProperties().getBoolean(propertyName);
}
public byte getByteProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getJMSHeaders().getByte(propertyName);
+ return getJmsContentHeaderProperties().getByte(propertyName);
}
public short getShortProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getJMSHeaders().getShort(propertyName);
+ return getJmsContentHeaderProperties().getShort(propertyName);
}
public int getIntProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getJMSHeaders().getInteger(propertyName);
+ return getJmsContentHeaderProperties().getInteger(propertyName);
}
public long getLongProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getJMSHeaders().getLong(propertyName);
+ return getJmsContentHeaderProperties().getLong(propertyName);
}
public float getFloatProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getJMSHeaders().getFloat(propertyName);
+ return getJmsContentHeaderProperties().getFloat(propertyName);
}
public double getDoubleProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getJMSHeaders().getDouble(propertyName);
+ return getJmsContentHeaderProperties().getDouble(propertyName);
}
public String getStringProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getJMSHeaders().getString(propertyName);
+ return getJmsContentHeaderProperties().getString(propertyName);
}
public Object getObjectProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getJMSHeaders().getObject(propertyName);
+ return getJmsContentHeaderProperties().getObject(propertyName);
}
public Enumeration getPropertyNames() throws JMSException
{
- return getJmsContentHeaderProperties().getJMSHeaders().getPropertyNames();
+ return getJmsContentHeaderProperties().getPropertyNames();
+ }
+
+ public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException
+ {
+ checkWritableProperties();
+ checkPropertyName(propertyName);
+ getJmsContentHeaderProperties().setBoolean(propertyName, b);
}
public void setBooleanProperty(String propertyName, boolean b) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getJMSHeaders().setBoolean(propertyName, b);
+ getJmsContentHeaderProperties().setBoolean(propertyName, b);
}
public void setByteProperty(String propertyName, byte b) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getJMSHeaders().setByte(propertyName, new Byte(b));
+ getJmsContentHeaderProperties().setByte(propertyName, new Byte(b));
}
public void setShortProperty(String propertyName, short i) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getJMSHeaders().setShort(propertyName, new Short(i));
+ getJmsContentHeaderProperties().setShort(propertyName, new Short(i));
}
public void setIntProperty(String propertyName, int i) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getJMSHeaders().setInteger(propertyName, new Integer(i));
+ getJmsContentHeaderProperties().setInteger(propertyName, new Integer(i));
}
public void setLongProperty(String propertyName, long l) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getJMSHeaders().setLong(propertyName, new Long(l));
+ getJmsContentHeaderProperties().setLong(propertyName, new Long(l));
}
public void setFloatProperty(String propertyName, float f) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getJMSHeaders().setFloat(propertyName, new Float(f));
+ getJmsContentHeaderProperties().setFloat(propertyName, new Float(f));
}
public void setDoubleProperty(String propertyName, double v) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getJMSHeaders().setDouble(propertyName, new Double(v));
+ getJmsContentHeaderProperties().setDouble(propertyName, new Double(v));
}
public void setStringProperty(String propertyName, String value) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getJMSHeaders().setString(propertyName, value);
+ getJmsContentHeaderProperties().setString(propertyName, value);
}
public void setObjectProperty(String propertyName, Object object) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getJMSHeaders().setObject(propertyName, object);
+ getJmsContentHeaderProperties().setObject(propertyName, object);
}
+ protected void removeProperty(AMQShortString propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ getJmsContentHeaderProperties().remove(propertyName);
+ }
+
+
protected void removeProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getJMSHeaders().remove(propertyName);
+ getJmsContentHeaderProperties().remove(propertyName);
}
public void acknowledgeThis() throws JMSException
@@ -421,7 +451,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
*/
public abstract String toBodyString() throws JMSException;
- public abstract String getMimeType();
+ public String getMimeType()
+ {
+ return getMimeTypeAsShortString().toString();
+ }
+
+ public abstract AMQShortString getMimeTypeAsShortString();
public String toString()
{
@@ -436,13 +471,13 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo()));
buf.append("\nAMQ message number: ").append(_deliveryTag);
buf.append("\nProperties:");
- if (getJmsContentHeaderProperties().getJMSHeaders().isEmpty())
+ if (getJmsContentHeaderProperties().isEmpty())
{
buf.append("<NONE>");
}
else
{
- buf.append('\n').append(getJmsContentHeaderProperties().getJMSHeaders());
+ buf.append('\n').append(getJmsContentHeaderProperties().getHeaders());
}
return buf.toString();
}
@@ -458,13 +493,13 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
getJmsContentHeaderProperties().setHeaders(messageProperties);
}
- private void checkPropertyName(String propertyName)
+ private void checkPropertyName(CharSequence propertyName)
{
if (propertyName == null)
{
throw new IllegalArgumentException("Property name must not be null");
}
- else if ("".equals(propertyName))
+ else if (propertyName.length()==0)
{
throw new IllegalArgumentException("Property name must not be the empty string");
}
@@ -537,4 +572,11 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
{
_consumer = basicMessageConsumer;
}
+
+ public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ return getJmsContentHeaderProperties().getBytes(propertyName);
+
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index dcff8c348b..4b28a43c64 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -43,16 +43,23 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
List bodies) throws AMQException
{
ByteBuffer data;
+ final boolean debug = _logger.isDebugEnabled();
// we optimise the non-fragmented case to avoid copying
if (bodies != null && bodies.size() == 1)
{
- _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize +")");
+ if(debug)
+ {
+ _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize +")");
+ }
data = ((ContentBody)bodies.get(0)).payload;
}
else
{
- _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize + ")");
+ if(debug)
+ {
+ _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize + ")");
+ }
data = ByteBuffer.allocate((int)contentHeader.bodySize); // XXX: Is cast a problem?
final Iterator it = bodies.iterator();
while (it.hasNext())
@@ -63,7 +70,10 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
}
data.flip();
}
- _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data.remaining());
+ if(debug)
+ {
+ _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data.remaining());
+ }
return createMessage(messageNbr, data, contentHeader);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
index d769300c69..ec7ef453eb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
@@ -23,6 +23,7 @@ package org.apache.qpid.client.message;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
@@ -35,9 +36,11 @@ import java.nio.CharBuffer;
public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
{
- private static final String MIME_TYPE = "application/octet-stream";
+ public static final String MIME_TYPE = "application/octet-stream";
+ private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
- JMSBytesMessage()
+
+ public JMSBytesMessage()
{
this(null);
}
@@ -65,9 +68,9 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
_readableMessage = true;
}
- public String getMimeType()
+ public AMQShortString getMimeTypeAsShortString()
{
- return MIME_TYPE;
+ return MIME_TYPE_SHORT_STRING;
}
public long getBodyLength() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
index 88e78a1dad..fcbb6500d4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client.message;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
@@ -37,10 +38,11 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
public static final String MIME_TYPE = "jms/map-message";
+ private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
private Map<String,Object> _map = new HashMap<String, Object>();
- JMSMapMessage() throws JMSException
+ public JMSMapMessage() throws JMSException
{
this(null);
}
@@ -74,9 +76,9 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
return _map.toString();
}
- public String getMimeType()
+ public AMQShortString getMimeTypeAsShortString()
{
- return MIME_TYPE;
+ return MIME_TYPE_SHORT_STRING;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
index 35c5377f14..ae29cef901 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
@@ -24,6 +24,7 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
@@ -34,14 +35,15 @@ import java.nio.charset.Charset;
public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
{
- static final String MIME_TYPE = "application/java-object-stream";
+ public static final String MIME_TYPE = "application/java-object-stream";
+ private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
private static final int DEFAULT_BUFFER_SIZE = 1024;
/**
* Creates empty, writable message for use by producers
*/
- JMSObjectMessage()
+ public JMSObjectMessage()
{
this(null);
}
@@ -54,7 +56,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
_data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
_data.setAutoExpand(true);
}
- getJmsContentHeaderProperties().setContentType(MIME_TYPE);
+ getJmsContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING);
}
/**
@@ -80,9 +82,9 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
return toString(_data);
}
- public String getMimeType()
+ public AMQShortString getMimeTypeAsShortString()
{
- return MIME_TYPE;
+ return MIME_TYPE_SHORT_STRING;
}
public void setObject(Serializable serializable) throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
index 972a5fc8bf..747b97b11c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
@@ -23,6 +23,7 @@ package org.apache.qpid.client.message;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.*;
import java.nio.charset.CharacterCodingException;
@@ -34,6 +35,7 @@ import java.nio.charset.Charset;
public class JMSStreamMessage extends AbstractBytesTypedMessage implements StreamMessage
{
public static final String MIME_TYPE="jms/stream-message";
+ private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
/**
@@ -42,7 +44,7 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea
*/
private int _byteArrayRemaining = -1;
- JMSStreamMessage()
+ public JMSStreamMessage()
{
this(null);
}
@@ -71,9 +73,9 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea
_readableMessage = true;
}
- public String getMimeType()
+ public AMQShortString getMimeTypeAsShortString()
{
- return MIME_TYPE;
+ return MIME_TYPE_SHORT_STRING;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
index d8394b0489..f386346dd1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client.message;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
import org.apache.mina.common.ByteBuffer;
@@ -32,15 +33,18 @@ import java.nio.charset.CharacterCodingException;
public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage
{
private static final String MIME_TYPE = "text/plain";
+ private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
private String _decodedValue;
/**
* This constant represents the name of a property that is set when the message payload is null.
*/
- private static final String PAYLOAD_NULL_PROPERTY = "JMS_QPID_NULL";
+ private static final AMQShortString PAYLOAD_NULL_PROPERTY = new AMQShortString("JMS_QPID_NULL");
+ private static final Charset DEFAULT_CHARSET = Charset.defaultCharset();
- JMSTextMessage() throws JMSException
+ public JMSTextMessage() throws JMSException
{
this(null, null);
}
@@ -48,7 +52,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
JMSTextMessage(ByteBuffer data, String encoding) throws JMSException
{
super(data); // this instantiates a content header
- getJmsContentHeaderProperties().setContentType(MIME_TYPE);
+ getJmsContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING);
getJmsContentHeaderProperties().setEncoding(encoding);
}
@@ -56,7 +60,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
throws AMQException
{
super(deliveryTag, contentHeader, data);
- contentHeader.setContentType(MIME_TYPE);
+ contentHeader.setContentType(MIME_TYPE_SHORT_STRING);
_data = data;
}
@@ -91,9 +95,9 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
_data = data;
}
- public String getMimeType()
+ public AMQShortString getMimeTypeAsShortString()
{
- return MIME_TYPE;
+ return MIME_TYPE_SHORT_STRING;
}
public void setText(String text) throws JMSException
@@ -109,13 +113,14 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
_data.limit(text.length()) ;
//_data.sweep();
_data.setAutoExpand(true);
- if (getJmsContentHeaderProperties().getEncoding() == null)
+ final String encoding = getJmsContentHeaderProperties().getEncoding();
+ if (encoding == null)
{
_data.put(text.getBytes());
}
else
{
- _data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding()));
+ _data.put(text.getBytes(encoding));
}
_changedData=true;
}
@@ -164,7 +169,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
{
try
{
- _decodedValue = _data.getString(Charset.defaultCharset().newDecoder());
+ _decodedValue = _data.getString(DEFAULT_CHARSET.newDecoder());
}
catch (CharacterCodingException e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index 348988f06d..df7537f1e8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -23,6 +23,7 @@ package org.apache.qpid.client.message;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
import java.util.HashMap;
@@ -31,7 +32,8 @@ import java.util.List;
public class MessageFactoryRegistry
{
- private final Map _mimeToFactoryMap = new HashMap();
+ private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>();
+ private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap = new HashMap<AMQShortString, MessageFactory>();
public void registerFactory(String mimeType, MessageFactory mf)
{
@@ -39,12 +41,14 @@ public class MessageFactoryRegistry
{
throw new IllegalArgumentException("Message factory must not be null");
}
- _mimeToFactoryMap.put(mimeType, mf);
+ _mimeStringToFactoryMap.put(mimeType, mf);
+ _mimeShortStringToFactoryMap.put(new AMQShortString(mimeType), mf);
}
public MessageFactory deregisterFactory(String mimeType)
{
- return (MessageFactory) _mimeToFactoryMap.remove(mimeType);
+ _mimeShortStringToFactoryMap.remove(new AMQShortString(mimeType));
+ return _mimeStringToFactoryMap.remove(mimeType);
}
/**
@@ -63,7 +67,7 @@ public class MessageFactoryRegistry
List bodies) throws AMQException, JMSException
{
BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
- MessageFactory mf = (MessageFactory) _mimeToFactoryMap.get(properties.getContentType());
+ MessageFactory mf = _mimeShortStringToFactoryMap.get(properties.getContentTypeShortString());
if (mf == null)
{
throw new AMQException("Unsupport MIME type of " + properties.getContentType());
@@ -80,7 +84,7 @@ public class MessageFactoryRegistry
{
throw new IllegalArgumentException("Mime type must not be null");
}
- MessageFactory mf = (MessageFactory) _mimeToFactoryMap.get(mimeType);
+ MessageFactory mf = _mimeStringToFactoryMap.get(mimeType);
if (mf == null)
{
throw new AMQException("Unsupport MIME type of " + mimeType);
@@ -101,7 +105,7 @@ public class MessageFactoryRegistry
mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
mf.registerFactory("text/plain", new JMSTextMessageFactory());
mf.registerFactory("text/xml", new JMSTextMessageFactory());
- mf.registerFactory("application/octet-stream", new JMSBytesMessageFactory());
+ mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
mf.registerFactory(null, new JMSBytesMessageFactory());
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index f37af835e1..bd60b2c250 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -37,14 +37,7 @@ import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.ssl.BogusSSLContextFactory;
@@ -99,7 +92,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
// We add a proxy for the state manager so that we can substitute the state manager easily in this class.
// We substitute the state manager when performing failover
- _frameListeners.add(new AMQMethodListener()
+/* _frameListeners.add(new AMQMethodListener()
{
public boolean methodReceived(AMQMethodEvent evt) throws AMQException
{
@@ -110,7 +103,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
_stateManager.error(e);
}
- });
+ });*/
}
public boolean isUseSSL()
@@ -284,11 +277,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void propagateExceptionToWaiters(Exception e)
{
_stateManager.error(e);
- final Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+ if(!_frameListeners.isEmpty())
{
- final AMQMethodListener ml = (AMQMethodListener) it.next();
- ml.error(e);
+ final Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener ml = (AMQMethodListener) it.next();
+ ml.error(e);
+ }
}
}
@@ -296,12 +292,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void messageReceived(IoSession session, Object message) throws Exception
{
+ final long msgNumber = ++_messageReceivedCount;
- if (_messageReceivedCount++ % 1000 == 0)
+ if (_logger.isDebugEnabled() && (msgNumber % 1000 == 0))
{
_logger.debug("Received " + _messageReceivedCount + " protocol messages");
}
- Iterator it = _frameListeners.iterator();
+
AMQFrame frame = (AMQFrame) message;
HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody);
@@ -314,13 +311,19 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
final AMQMethodEvent evt = new AMQMethodEvent(frame.channel, (AMQMethodBody) frame.bodyFrame, _protocolSession);
+
try
{
- boolean wasAnyoneInterested = false;
- while (it.hasNext())
+
+ boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
+ if(!_frameListeners.isEmpty())
{
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ }
}
if (!wasAnyoneInterested)
{
@@ -329,11 +332,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
catch (AMQException e)
{
- it = _frameListeners.iterator();
- while (it.hasNext())
+ _stateManager.error(e);
+ if(!_frameListeners.isEmpty())
{
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ listener.error(e);
+ }
}
exceptionCaught(session, e);
}
@@ -359,17 +366,21 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void messageSent(IoSession session, Object message) throws Exception
{
- if (_messagesOut++ % 1000 == 0)
+ final long sentMessages = _messagesOut++;
+
+ final boolean debug = _logger.isDebugEnabled();
+
+ if (debug && (sentMessages % 1000 == 0))
{
_logger.debug("Sent " + _messagesOut + " protocol messages");
}
_connection.bytesSent(session.getWrittenBytes());
- if (_logger.isDebugEnabled())
+ if (debug)
{
_logger.debug("Sent frame " + message);
}
}
-
+/*
public void addFrameListener(AMQMethodListener listener)
{
_frameListeners.add(listener);
@@ -379,7 +390,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
_frameListeners.remove(listener);
}
-
+ */
public void attainState(AMQState s) throws AMQException
{
_stateManager.attainState(s);
@@ -423,9 +434,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
// When control resumes before this line, a reply will have been received
// that matches the criteria defined in the blocking listener
}
+ catch (AMQException e)
+ {
+ throw e;
+ }
finally
{
- // If we don't remove the listener then no-one will
+ // If we don't removeKey the listener then no-one will
_frameListeners.remove(listener);
}
@@ -480,7 +495,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
0, // classId
0, // methodId
AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- "JMS client is closing the connection."); // replyText
+ new AMQShortString("JMS client is closing the connection.")); // replyText
syncWrite(frame, ConnectionCloseOkBody.class);
_protocolSession.closeProtocolSession();
@@ -518,7 +533,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
}
- public String generateQueueName()
+ public AMQShortString generateQueueName()
{
return _protocolSession.generateQueueName();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 6a40fd3133..ca622a98ba 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -31,11 +31,7 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.ConnectionTuneParameters;
import org.apache.qpid.client.message.UnexpectedBodyReceivedException;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.*;
import org.apache.commons.lang.StringUtils;
import javax.jms.JMSException;
@@ -381,7 +377,7 @@ public class AMQProtocolSession implements ProtocolVersionList
_protocolHandler.failover(host, port);
}
- protected String generateQueueName()
+ protected AMQShortString generateQueueName()
{
int id;
synchronized(_queueIdLock)
@@ -390,7 +386,7 @@ public class AMQProtocolSession implements ProtocolVersionList
}
//get rid of / and : and ; from address for spec conformance
String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(),"/;:","");
- return "tmp_" + localAddress + "_" + id;
+ return new AMQShortString("tmp_" + localAddress + "_" + id);
}
/**
@@ -407,7 +403,7 @@ public class AMQProtocolSession implements ProtocolVersionList
}
}
- public void confirmConsumerCancelled(int channelId, String consumerTag)
+ public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
{
final Integer chId = channelId;
final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
diff --git a/java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java b/java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java
index 4291cb3259..ddbea9a557 100644
--- a/java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java
+++ b/java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java
@@ -73,8 +73,8 @@ public class AmqPlainSaslClient implements SaslClient
throw new SaslException("Error handling SASL callbacks: " + e, e);
}
FieldTable table = FieldTableFactory.newFieldTable();
- table.put("LOGIN", nameCallback.getName());
- table.put("PASSWORD", pwdCallback.getPassword());
+ table.setString("LOGIN", nameCallback.getName());
+ table.setString("PASSWORD", new String(pwdCallback.getPassword()));
return table.getDataAsBytes();
}
diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
index 5497cafed4..b724bbfc05 100644
--- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
@@ -29,6 +29,7 @@ import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
@@ -232,10 +233,14 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
*/
protected Queue createQueue(Object value)
{
- if (value instanceof String)
+ if(value instanceof AMQShortString)
+ {
+ return new AMQQueue((AMQShortString) value);
+ }
+ else if (value instanceof String)
{
- return new AMQQueue((String) value);
+ return new AMQQueue(new AMQShortString((String) value));
}
else if (value instanceof BindingURL)
@@ -251,9 +256,13 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
*/
protected Topic createTopic(Object value)
{
- if (value instanceof String)
+ if(value instanceof AMQShortString)
+ {
+ return new AMQTopic((AMQShortString)value);
+ }
+ else if (value instanceof String)
{
- return new AMQTopic((String) value);
+ return new AMQTopic(new AMQShortString((String) value));
}
else if (value instanceof BindingURL)
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index d12ab01bdc..a1763ddc73 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -25,6 +25,8 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
import javax.jms.*;
@@ -50,10 +52,10 @@ public class RecoverTest extends TestCase
Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
+ ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_NAME);
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -107,10 +109,10 @@ public class RecoverTest extends TestCase
Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
+ ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_NAME);
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -171,8 +173,8 @@ public class RecoverTest extends TestCase
Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue("Q1", "Q1", false, true);
- Queue queue2 = new AMQQueue("Q2", "Q2", false, true);
+ Queue queue = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
+ Queue queue2 = new AMQQueue(new AMQShortString("Q2"), new AMQShortString("Q2"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
@@ -210,7 +212,7 @@ public class RecoverTest extends TestCase
Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = new AMQQueue("Q1", "Q1", false, true);
+ Queue queue = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
MessageProducer producer = consumerSession.createProducer(queue);
producer.send(consumerSession.createTextMessage("hello"));
MessageConsumer consumer = consumerSession.createConsumer(queue);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java
index ad180e3a89..fb347053c7 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java
@@ -35,16 +35,20 @@ import junit.framework.TestCase;
public class FieldTableKeyEnumeratorTest extends TestCase
{
+ public void testTrue()
+ {
+
+ }
public void testKeyEnumeration()
{
FieldTable result = FieldTableFactory.newFieldTable();
- result.put("one", 1L);
- result.put("two", 2L);
- result.put("three", 3L);
- result.put("four", 4L);
- result.put("five", 5L);
+ result.setObject("one", 1L);
+ result.setObject("two", 2L);
+ result.setObject("three", 3L);
+ result.setObject("four", 4L);
+ result.setObject("five", 5L);
- Iterator iterator = result.keySet().iterator();
+ Iterator iterator = result.keys().iterator();
try
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
index f4efd64dbb..5af55d6625 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
@@ -85,11 +85,11 @@ public class FieldTableMessageTest extends TestCase implements MessageListener
private FieldTable load() throws IOException
{
FieldTable result = FieldTableFactory.newFieldTable();
- result.put("one", 1L);
- result.put("two", 2L);
- result.put("three", 3L);
- result.put("four", 4L);
- result.put("five", 5L);
+ result.setLong("one", 1L);
+ result.setLong("two", 2L);
+ result.setLong("three", 3L);
+ result.setLong("four", 4L);
+ result.setLong("five", 5L);
return result;
}
@@ -133,10 +133,9 @@ public class FieldTableMessageTest extends TestCase implements MessageListener
{
ByteBuffer buffer = ((JMSBytesMessage) m).getData();
FieldTable actual = FieldTableFactory.newFieldTable(buffer, buffer.remaining());
- for (Object o : _expected.keySet())
- {
- String key = (String) o;
- assertEquals("Values for " + key + " did not match", _expected.get(key), actual.get(key));
+ for (String key : _expected.keys())
+ {
+ assertEquals("Values for " + key + " did not match", _expected.getObject(key), actual.getObject(key));
}
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
index 17679788bd..7423a3d8f0 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
@@ -29,12 +29,7 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.testutil.VMBrokerSetup;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
+import javax.jms.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -81,7 +76,7 @@ public class PropertyValueTest extends TestCase implements MessageListener
{
_connection = connection;
_destination = destination;
- _session = (AMQSession) connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//set up a slow consumer
_session.createConsumer(destination).setMessageListener(this);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
index 903f6a9da9..81481bc94d 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
@@ -75,7 +75,7 @@ public class TextMessageTest extends TestCase implements MessageListener
{
_connection = connection;
_destination = destination;
- _session = (AMQSession) connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//set up a slow consumer
_session.createConsumer(destination).setMessageListener(this);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java
index 22015dbc93..691acbb213 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java
@@ -21,6 +21,7 @@
package org.apache.qpid.test.unit.client.forwardall;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.framing.AMQShortString;
/**
* Queue that allows several private queues to be registered and bound
@@ -29,15 +30,19 @@ import org.apache.qpid.client.AMQQueue;
*/
class SpecialQueue extends AMQQueue
{
- private final String name;
+ private final AMQShortString name;
SpecialQueue(String name)
{
+ this(new AMQShortString(name));
+ }
+ SpecialQueue(AMQShortString name)
+ {
super(name, true);
this.name = name;
}
- public String getRoutingKey()
+ public AMQShortString getRoutingKey()
{
return name;
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
index eee9b2de9f..64898a1b9a 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.test.unit.client.protocol;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.mina.common.IoSession;
import junit.framework.TestCase;
@@ -45,7 +46,7 @@ public class AMQProtocolSessionTest extends TestCase
return (TestIoSession) _minaProtocolSession;
}
- public String genQueueName()
+ public AMQShortString genQueueName()
{
return generateQueueName();
}
@@ -80,26 +81,26 @@ public class AMQProtocolSessionTest extends TestCase
public void testGenerateQueueName()
{
- String testAddress;
+ AMQShortString testAddress;
- //test address with / and ; chars which generateQueueName should remove
+ //test address with / and ; chars which generateQueueName should removeKey
_testSession.getMinaProtocolSession().setStringLocalAddress(_brokenAddress);
_testSession.getMinaProtocolSession().setLocalPort(_port);
testAddress = _testSession.genQueueName();
- assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress);
+ assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress.toString());
//test empty address
_testSession.getMinaProtocolSession().setStringLocalAddress(_emptyAddress);
testAddress = _testSession.genQueueName();
- assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress);
+ assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress.toString());
//test address with no special chars
_testSession.getMinaProtocolSession().setStringLocalAddress(_validAddress);
testAddress = _testSession.genQueueName();
- assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress);
+ assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress.toString());
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
index c14b5317c7..23e3b9cc88 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
@@ -11,6 +11,7 @@ import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.*;
@@ -41,7 +42,7 @@ public class JMSDestinationTest extends TestCase
{
Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index 794316d2f5..8a6e279142 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -170,7 +170,7 @@ public class TopicSessionTest extends TestCase
con.start();
TextMessage tm = session1.createTextMessage("Hello");
publisher.publish(tm);
- tm = (TextMessage) consumer1.receive(2000);
+ tm = (TextMessage) consumer1.receive(200000L);
assertNotNull(tm);
String msgText = tm.getText();
assertEquals("Hello", msgText);
@@ -178,7 +178,7 @@ public class TopicSessionTest extends TestCase
msgText = tm.getText();
assertNull(msgText);
publisher.publish(tm);
- tm = (TextMessage) consumer1.receive(2000);
+ tm = (TextMessage) consumer1.receive(20000000L);
assertNotNull(tm);
msgText = tm.getText();
assertNull(msgText);