summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java53
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java22
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java89
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java25
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java41
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java24
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java134
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java138
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java28
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java19
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java3
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPAdministrator.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPProvider.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java43
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java (renamed from java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactory.java)36
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java33
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java27
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java20
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java16
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java30
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java17
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/test/TestAMSPubSub.java101
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/test/TestMultSubscribers.java111
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/test/TestPublisher.java85
-rw-r--r--java/client/src/test/java/org/apache/qpid/framing/FieldTableTest.java39
-rw-r--r--java/client/src/test/java/org/apache/qpid/headers/MessageFactory.java5
-rw-r--r--java/client/src/test/java/org/apache/qpid/requestreply1/ServiceRequestingClient.java1
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java37
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java25
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java13
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTablePropertyTest.java65
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java110
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java52
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java264
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java61
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java93
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java38
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java34
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java79
44 files changed, 1479 insertions, 556 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
index 7cabc667c1..6da0da9f6f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
@@ -56,7 +56,7 @@ public class AMQBrokerDetails implements BrokerDetails
{
//todo this list of valid transports should be enumerated somewhere
if ((!(transport.equalsIgnoreCase("vm") ||
- transport.equalsIgnoreCase("tcp"))))
+ transport.equalsIgnoreCase("tcp"))))
{
if (transport.equalsIgnoreCase("localhost"))
{
@@ -65,7 +65,7 @@ public class AMQBrokerDetails implements BrokerDetails
}
else
{
- if (url.charAt(transport.length()) == ':' && url.charAt(transport.length()+1) != '/' )
+ if (url.charAt(transport.length()) == ':' && url.charAt(transport.length() + 1) != '/')
{
//Then most likely we have a host:port value
connection = new URI(DEFAULT_TRANSPORT + "://" + url);
@@ -88,7 +88,7 @@ public class AMQBrokerDetails implements BrokerDetails
if (transport == null)
{
URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" +
- " In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, "");
+ " In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, "");
}
setTransport(transport);
@@ -107,12 +107,45 @@ public class AMQBrokerDetails implements BrokerDetails
if (port == -1)
{
- // Another fix for Java 1.5 URI handling
+ // Fix for when there is port data but it is not automatically parseable by getPort().
String auth = connection.getAuthority();
- if (auth != null && auth.startsWith(":"))
+ if (auth != null && auth.contains(":"))
{
- setPort(Integer.parseInt(auth.substring(1)));
+ int start = auth.indexOf(":") + 1;
+ int end = start;
+ boolean looking = true;
+ boolean found = false;
+ //Walk the authority looking for a port value.
+ while (looking)
+ {
+ try
+ {
+ end++;
+ Integer.parseInt(auth.substring(start, end));
+
+ if (end >= auth.length())
+ {
+ looking = false;
+ found = true;
+ }
+ }
+ catch (NumberFormatException nfe)
+ {
+ looking = false;
+ }
+
+ }
+ if (found)
+ {
+ setPort(Integer.parseInt(auth.substring(start, end)));
+ }
+ else
+ {
+ URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1,
+ "Illegal character in port number", connection.toString());
+ }
+
}
else
{
@@ -134,7 +167,7 @@ public class AMQBrokerDetails implements BrokerDetails
{
if (uris instanceof URLSyntaxException)
{
- throw (URLSyntaxException) uris;
+ throw(URLSyntaxException) uris;
}
URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
@@ -245,9 +278,9 @@ public class AMQBrokerDetails implements BrokerDetails
BrokerDetails bd = (BrokerDetails) o;
return _host.equalsIgnoreCase(bd.getHost()) &&
- (_port == bd.getPort()) &&
- _transport.equalsIgnoreCase(bd.getTransport()) &&
- (useSSL() == bd.useSSL());
+ (_port == bd.getPort()) &&
+ _transport.equalsIgnoreCase(bd.getTransport()) &&
+ (useSSL() == bd.useSSL());
//todo do we need to compare all the options as well?
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 98db26d0c4..0bb8736227 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -44,6 +44,7 @@ import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.url.URLSyntaxException;
import javax.jms.*;
+import javax.jms.IllegalStateException;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
@@ -92,7 +93,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/**
* Maps from session id (Integer) to AMQSession instance
*/
- private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
+ private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
private String _clientName;
@@ -142,7 +143,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
String clientName, String virtualHost) throws AMQException, URLSyntaxException
{
this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" + clientName +
+ username + ":" + password + "@" +
+ (clientName==null?"":clientName) +
virtualHost + "?brokerlist='" + broker + "'"));
}
@@ -157,11 +159,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
this(new AMQConnectionURL(useSSL ?
ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" + clientName +
+ username + ":" + password + "@" +
+ (clientName==null?"":clientName) +
virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+ "," + ConnectionURL.OPTIONS_SSL + "='true'" :
ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" + clientName +
+ username + ":" + password + "@" +
+ (clientName==null?"":clientName) +
virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+ "," + ConnectionURL.OPTIONS_SSL + "='false'"
));
@@ -537,7 +541,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public void setClientID(String clientID) throws JMSException
{
checkNotClosed();
- _clientName = clientID;
+ // in AMQP it is not possible to change the client ID. If one is not specified
+ // upon connection construction, an id is generated automatically. Therefore
+ // we can always throw an exception.
+ throw new IllegalStateException("Client name cannot be changed after being set");
}
public ConnectionMetaData getMetaData() throws JMSException
@@ -583,7 +590,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public void stop() throws JMSException
{
checkNotClosed();
-
if (_started)
{
for (Iterator i = _sessions.values().iterator(); i.hasNext();)
@@ -920,8 +926,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
void deregisterSession(int channelId)
{
_sessions.remove(channelId);
- }
-
+ }
+
/**
* For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
* The caller must hold the failover mutex before calling this method.
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index a847658846..8f90913e5c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -27,6 +27,7 @@ import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.framing.*;
@@ -367,13 +368,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public StreamMessage createStreamMessage() throws JMSException
{
- checkNotClosed();
- throw new UnsupportedOperationException("Stream messages not supported");
+ synchronized (_connection.getFailoverMutex())
+ {
+ checkNotClosed();
+
+ try
+ {
+ return (StreamMessage) _messageFactoryRegistry.createMessage(JMSStreamMessage.MIME_TYPE);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSException("Unable to create text message: " + e);
+ }
+ }
}
public TextMessage createTextMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
@@ -462,28 +474,30 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// that can be called from a different thread of control from the one controlling the session
synchronized(_connection.getFailoverMutex())
{
- _closed.set(true);
-
- // we pass null since this is not an error case
- closeProducersAndConsumers(null);
-
- try
+ //Ensure we only try and close an open session.
+ if (!_closed.getAndSet(true))
{
- _connection.getProtocolHandler().closeSession(this);
- final AMQFrame frame = ChannelCloseBody.createAMQFrame(
- getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0);
- _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
- // When control resumes at this point, a reply will have been received that
- // indicates the broker has closed the channel successfully
+ // we pass null since this is not an error case
+ closeProducersAndConsumers(null);
- }
- catch (AMQException e)
- {
- throw new JMSException("Error closing session: " + e);
- }
- finally
- {
- _connection.deregisterSession(_channelId);
+ try
+ {
+ _connection.getProtocolHandler().closeSession(this);
+ final AMQFrame frame = ChannelCloseBody.createAMQFrame(
+ getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0);
+ _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
+ // When control resumes at this point, a reply will have been received that
+ // indicates the broker has closed the channel successfully
+
+ }
+ catch (AMQException e)
+ {
+ throw new JMSException("Error closing session: " + e);
+ }
+ finally
+ {
+ _connection.deregisterSession(_channelId);
+ }
}
}
}
@@ -723,6 +737,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Creates a QueueReceiver
+ *
* @param destination
* @return QueueReceiver - a wrapper around our MessageConsumer
* @throws JMSException
@@ -736,6 +751,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Creates a QueueReceiver using a message selector
+ *
* @param destination
* @param messageSelector
* @return QueueReceiver - a wrapper around our MessageConsumer
@@ -826,7 +842,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
// TODO: construct the rawSelector from the selector string if rawSelector == null
- final FieldTable ft = new FieldTable();
+ final FieldTable ft = FieldTableFactory.newFieldTable();
//if (rawSelector != null)
// ft.put("headers", rawSelector.getDataAsBytes());
if (rawSelector != null)
@@ -935,6 +951,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public Queue createQueue(String queueName) throws JMSException
{
+ checkNotClosed();
if (queueName.indexOf('/') == -1)
{
return new AMQQueue(queueName);
@@ -957,12 +974,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Creates a QueueReceiver wrapping a MessageConsumer
+ *
* @param queue
* @return QueueReceiver
* @throws JMSException
*/
public QueueReceiver createReceiver(Queue queue) throws JMSException
{
+ checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
return new QueueReceiverAdaptor(dest, consumer);
@@ -970,6 +989,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Creates a QueueReceiver wrapping a MessageConsumer using a message selector
+ *
* @param queue
* @param messageSelector
* @return QueueReceiver
@@ -977,6 +997,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
{
+ checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(dest, messageSelector);
@@ -985,11 +1006,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public QueueSender createSender(Queue queue) throws JMSException
{
- return (QueueSender) createProducer(queue);
+ checkNotClosed();
+ //return (QueueSender) createProducer(queue);
+ return new QueueSenderAdapter(createProducer(queue), queue);
}
public Topic createTopic(String topicName) throws JMSException
{
+ checkNotClosed();
+
if (topicName.indexOf('/') == -1)
{
return new AMQTopic(topicName);
@@ -1012,18 +1037,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Creates a non-durable subscriber
+ *
* @param topic
* @return TopicSubscriber - a wrapper round our MessageConsumer
* @throws JMSException
*/
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
+ checkNotClosed();
AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
/**
* Creates a non-durable subscriber with a message selector
+ *
* @param topic
* @param messageSelector
* @param noLocal
@@ -1032,6 +1060,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
+ checkNotClosed();
AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
}
@@ -1045,6 +1074,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
+ checkNotClosed();
AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
@@ -1055,6 +1085,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
throws JMSException
{
+ checkNotClosed();
AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
return new TopicSubscriberAdaptor(dest, consumer);
@@ -1062,26 +1093,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicPublisher createPublisher(Topic topic) throws JMSException
{
- return (TopicPublisher) createProducer(topic);
+ checkNotClosed();
+ //return (TopicPublisher) createProducer(topic);
+ return new TopicPublisherAdapter(createProducer(topic), topic);
}
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
+ checkNotClosed();
throw new UnsupportedOperationException("Queue browsing not supported");
}
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
{
+ checkNotClosed();
throw new UnsupportedOperationException("Queue browsing not supported");
}
public TemporaryQueue createTemporaryQueue() throws JMSException
{
+ checkNotClosed();
return new AMQTemporaryQueue();
}
public TemporaryTopic createTemporaryTopic() throws JMSException
{
+ checkNotClosed();
return new AMQTemporaryTopic();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index f97ea6bf1e..ded2152bf8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -159,11 +159,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public String getMessageSelector() throws JMSException
{
+ checkPreConditions();
return _messageSelector;
}
public MessageListener getMessageListener() throws JMSException
{
+ checkPreConditions();
return (MessageListener) _messageListener.get();
}
@@ -179,7 +181,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public void setMessageListener(MessageListener messageListener) throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
//if the current listener is non-null and the session is not stopped, then
//it is an error to call this method.
@@ -277,7 +279,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receive(long l) throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
acquireReceiving();
@@ -311,7 +313,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receiveNoWait() throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
acquireReceiving();
@@ -520,7 +522,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
private void deregisterConsumer()
{
- _session.deregisterConsumer(_consumerTag);
+ _session.deregisterConsumer(_consumerTag);
}
public String getConsumerTag()
@@ -529,7 +531,20 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
public void setConsumerTag(String consumerTag)
- {
+ {
_consumerTag = consumerTag;
}
+
+ public AMQSession getSession() {
+ return _session;
+ }
+
+ private void checkPreConditions() throws JMSException{
+
+ this.checkNotClosed();
+
+ if(_session == null || _session.isClosed()){
+ throw new UnsupportedOperationException("Invalid Session");
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 14cafc3558..8d6287eca3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -143,6 +143,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void setDisableMessageID(boolean b) throws JMSException
{
+ checkPreConditions();
checkNotClosed();
// IGNORED
}
@@ -156,7 +157,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void setDisableMessageTimestamp(boolean b) throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
_disableTimestamps = b;
}
@@ -168,7 +169,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void setDeliveryMode(int i) throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT)
{
throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i +
@@ -185,7 +186,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void setPriority(int i) throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
if (i < 0 || i > 9)
{
throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
@@ -201,7 +202,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void setTimeToLive(long l) throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
if (l < 0)
{
throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
@@ -229,6 +230,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Message message) throws JMSException
{
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
@@ -238,6 +240,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Message message, int deliveryMode) throws JMSException
{
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
@@ -247,6 +250,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
{
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
@@ -257,6 +261,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Message message, int deliveryMode, int priority,
long timeToLive) throws JMSException
{
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory,
@@ -266,7 +271,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Destination destination, Message message) throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -279,7 +284,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
int priority, long timeToLive)
throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -292,7 +297,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
int priority, long timeToLive, boolean mandatory)
throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -305,7 +310,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
int priority, long timeToLive, boolean mandatory, boolean immediate)
throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -319,7 +324,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
boolean immediate, boolean waitUntilSent)
throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -334,7 +339,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
{
throw new JMSException("Unsupported destination class: " +
(destination != null ? destination.getClass() : null));
- }
+ }
declareDestination((AMQDestination)destination);
}
@@ -481,4 +486,20 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
checkNotClosed();
_encoding = encoding;
}
+
+ private void checkPreConditions() throws IllegalStateException, JMSException {
+ checkNotClosed();
+
+ if(_destination == null){
+ throw new UnsupportedOperationException("Destination is null");
+ }
+
+ if(_session == null || _session.isClosed()){
+ throw new UnsupportedOperationException("Invalid Session");
+ }
+ }
+
+ public AMQSession getSession() {
+ return _session;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java
index 57e458d833..21ec50c046 100644
--- a/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java
+++ b/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java
@@ -39,31 +39,37 @@ public class QueueReceiverAdaptor implements QueueReceiver {
public String getMessageSelector() throws JMSException
{
+ checkPreConditions();
return _consumer.getMessageSelector();
}
public MessageListener getMessageListener() throws JMSException
{
+ checkPreConditions();
return _consumer.getMessageListener();
}
public void setMessageListener(MessageListener messageListener) throws JMSException
{
+ checkPreConditions();
_consumer.setMessageListener(messageListener);
}
public Message receive() throws JMSException
{
+ checkPreConditions();
return _consumer.receive();
}
public Message receive(long l) throws JMSException
{
+ checkPreConditions();
return _consumer.receive(l);
}
public Message receiveNoWait() throws JMSException
{
+ checkPreConditions();
return _consumer.receiveNoWait();
}
@@ -79,8 +85,26 @@ public class QueueReceiverAdaptor implements QueueReceiver {
*/
public Queue getQueue() throws JMSException
{
+ checkPreConditions();
return _queue;
}
+ private void checkPreConditions() throws javax.jms.IllegalStateException {
+ BasicMessageConsumer msgConsumer = (BasicMessageConsumer)_consumer;
+
+ if (msgConsumer.isClosed() ){
+ throw new javax.jms.IllegalStateException("Consumer is closed");
+ }
+
+ if(_queue == null){
+ throw new UnsupportedOperationException("Queue is null");
+ }
+
+ AMQSession session = msgConsumer.getSession();
+
+ if(session == null || session.isClosed()){
+ throw new UnsupportedOperationException("Invalid Session");
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
new file mode 100644
index 0000000000..15bf4a125f
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
@@ -0,0 +1,134 @@
+package org.apache.qpid.client;
+
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+
+public class QueueSenderAdapter implements QueueSender {
+
+ private MessageProducer delegate;
+ private Queue queue;
+ private boolean closed = false;
+
+ public QueueSenderAdapter(MessageProducer msgProducer, Queue queue){
+ delegate = msgProducer;
+ this.queue = queue;
+ }
+
+ public Queue getQueue() throws JMSException {
+ checkPreConditions();
+ return queue;
+ }
+
+ public void send(Message msg) throws JMSException {
+ checkPreConditions();
+ delegate.send(msg);
+ }
+
+ public void send(Queue queue, Message msg) throws JMSException {
+ checkPreConditions();
+ delegate.send(queue, msg);
+ }
+
+ public void publish(Message msg, int deliveryMode, int priority, long timeToLive)
+ throws JMSException {
+ checkPreConditions();
+ delegate.send(msg, deliveryMode,priority,timeToLive);
+ }
+
+ public void send(Queue queue,Message msg, int deliveryMode, int priority, long timeToLive)
+ throws JMSException {
+ checkPreConditions();
+ delegate.send(queue,msg, deliveryMode,priority,timeToLive);
+ }
+
+ public void close() throws JMSException {
+ delegate.close();
+ closed = true;
+ }
+
+ public int getDeliveryMode() throws JMSException {
+ return delegate.getDeliveryMode();
+ }
+
+ public Destination getDestination() throws JMSException {
+ return delegate.getDestination();
+ }
+
+ public boolean getDisableMessageID() throws JMSException {
+ return delegate.getDisableMessageID();
+ }
+
+ public boolean getDisableMessageTimestamp() throws JMSException {
+ return delegate.getDisableMessageTimestamp();
+ }
+
+ public int getPriority() throws JMSException {
+ return delegate.getPriority();
+ }
+
+ public long getTimeToLive() throws JMSException {
+ return delegate.getTimeToLive();
+ }
+
+ public void send(Destination dest, Message msg) throws JMSException {
+ checkPreConditions();
+ delegate.send(dest,msg);
+ }
+
+ public void send(Message msg, int deliveryMode, int priority, long timeToLive)
+ throws JMSException {
+ checkPreConditions();
+ delegate.send(msg, deliveryMode,priority,timeToLive);
+ }
+
+ public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException {
+ checkPreConditions();
+ delegate.send(dest,msg, deliveryMode,priority,timeToLive);
+ }
+
+ public void setDeliveryMode(int deliveryMode) throws JMSException {
+ checkPreConditions();
+ delegate.setDeliveryMode(deliveryMode);
+ }
+
+ public void setDisableMessageID(boolean disableMessageID) throws JMSException {
+ checkPreConditions();
+ delegate.setDisableMessageID(disableMessageID);
+ }
+
+ public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException {
+ checkPreConditions();
+ delegate.setDisableMessageTimestamp(disableMessageTimestamp);
+ }
+
+ public void setPriority(int priority) throws JMSException {
+ checkPreConditions();
+ delegate.setPriority(priority);
+ }
+
+ public void setTimeToLive(long timeToLive) throws JMSException {
+ checkPreConditions();
+ delegate.setTimeToLive(timeToLive);
+ }
+
+ private void checkPreConditions() throws IllegalStateException, IllegalStateException {
+ if (closed){
+ throw new javax.jms.IllegalStateException("Publisher is closed");
+ }
+
+ if(queue == null){
+ throw new UnsupportedOperationException("Queue is null");
+ }
+
+ AMQSession session = ((BasicMessageProducer)delegate).getSession();
+
+ if(session == null || session.isClosed()){
+ throw new UnsupportedOperationException("Invalid Session");
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
new file mode 100644
index 0000000000..0702202c2a
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
@@ -0,0 +1,138 @@
+package org.apache.qpid.client;
+
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+public class TopicPublisherAdapter implements TopicPublisher {
+
+ private MessageProducer delegate;
+ private Topic topic;
+ private boolean closed = false;
+
+ public TopicPublisherAdapter(MessageProducer msgProducer, Topic topic){
+ delegate = msgProducer;
+ this.topic = topic;
+ }
+
+ public Topic getTopic() throws JMSException {
+ checkPreConditions();
+ return topic;
+ }
+
+ public void publish(Message msg) throws JMSException {
+ checkPreConditions();
+ delegate.send(msg);
+ }
+
+ public void publish(Topic topic, Message msg) throws JMSException {
+ checkPreConditions();
+ delegate.send(topic,msg);
+ }
+
+ public void publish(Message msg, int deliveryMode, int priority, long timeToLive)
+ throws JMSException {
+ checkPreConditions();
+ delegate.send(msg, deliveryMode,priority,timeToLive);
+ }
+
+ public void publish(Topic topic, Message msg, int deliveryMode, int priority, long timeToLive)
+ throws JMSException {
+ checkPreConditions();
+ delegate.send(topic,msg, deliveryMode,priority,timeToLive);
+ }
+
+ public void close() throws JMSException {
+ delegate.close();
+ closed = true;
+ }
+
+ public int getDeliveryMode() throws JMSException {
+ return delegate.getDeliveryMode();
+ }
+
+ public Destination getDestination() throws JMSException {
+ return delegate.getDestination();
+ }
+
+ public boolean getDisableMessageID() throws JMSException {
+ return delegate.getDisableMessageID();
+ }
+
+ public boolean getDisableMessageTimestamp() throws JMSException {
+ return delegate.getDisableMessageTimestamp();
+ }
+
+ public int getPriority() throws JMSException {
+ return delegate.getPriority();
+ }
+
+ public long getTimeToLive() throws JMSException {
+ return delegate.getTimeToLive();
+ }
+
+ public void send(Message msg) throws JMSException {
+ checkPreConditions();
+ delegate.send(msg);
+ }
+
+ public void send(Destination dest, Message msg) throws JMSException {
+ checkPreConditions();
+ delegate.send(dest,msg);
+ }
+
+ public void send(Message msg, int deliveryMode, int priority, long timeToLive)
+ throws JMSException {
+ checkPreConditions();
+ delegate.send(msg, deliveryMode,priority,timeToLive);
+ }
+
+ public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException {
+ checkPreConditions();
+ delegate.send(dest,msg, deliveryMode,priority,timeToLive);
+ }
+
+ public void setDeliveryMode(int deliveryMode) throws JMSException {
+ checkPreConditions();
+ delegate.setDeliveryMode(deliveryMode);
+ }
+
+ public void setDisableMessageID(boolean disableMessageID) throws JMSException {
+ checkPreConditions();
+ delegate.setDisableMessageID(disableMessageID);
+ }
+
+ public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException {
+ checkPreConditions();
+ delegate.setDisableMessageTimestamp(disableMessageTimestamp);
+ }
+
+ public void setPriority(int priority) throws JMSException {
+ checkPreConditions();
+ delegate.setPriority(priority);
+ }
+
+ public void setTimeToLive(long timeToLive) throws JMSException {
+ checkPreConditions();
+ delegate.setTimeToLive(timeToLive);
+ }
+
+ private void checkPreConditions() throws IllegalStateException, IllegalStateException {
+ if (closed){
+ throw new javax.jms.IllegalStateException("Publisher is closed");
+ }
+
+ if(topic == null){
+ throw new UnsupportedOperationException("Topic is null");
+ }
+
+ AMQSession session = ((BasicMessageProducer)delegate).getSession();
+ if(session == null || session.isClosed()){
+ throw new UnsupportedOperationException("Invalid Session");
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
index c776a9943e..06e353e271 100644
--- a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
+++ b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client;
+import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -43,37 +44,45 @@ class TopicSubscriberAdaptor implements TopicSubscriber
_consumer = consumer;
_noLocal = noLocal;
}
+
TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer)
{
this(topic, consumer, consumer.isNoLocal());
}
+
public Topic getTopic() throws JMSException
{
+ checkPreConditions();
return _topic;
}
public boolean getNoLocal() throws JMSException
{
+ checkPreConditions();
return _noLocal;
}
public String getMessageSelector() throws JMSException
{
+ checkPreConditions();
return _consumer.getMessageSelector();
}
public MessageListener getMessageListener() throws JMSException
{
+ checkPreConditions();
return _consumer.getMessageListener();
}
public void setMessageListener(MessageListener messageListener) throws JMSException
{
+ checkPreConditions();
_consumer.setMessageListener(messageListener);
}
public Message receive() throws JMSException
{
+ checkPreConditions();
return _consumer.receive();
}
@@ -84,6 +93,7 @@ class TopicSubscriberAdaptor implements TopicSubscriber
public Message receiveNoWait() throws JMSException
{
+ checkPreConditions();
return _consumer.receiveNoWait();
}
@@ -91,4 +101,22 @@ class TopicSubscriberAdaptor implements TopicSubscriber
{
_consumer.close();
}
+
+ private void checkPreConditions() throws javax.jms.IllegalStateException{
+ BasicMessageConsumer msgConsumer = (BasicMessageConsumer)_consumer;
+
+ if (msgConsumer.isClosed() ){
+ throw new javax.jms.IllegalStateException("Consumer is closed");
+ }
+
+ if(_topic == null){
+ throw new UnsupportedOperationException("Topic is null");
+ }
+
+ AMQSession session = msgConsumer.getSession();
+
+ if(session == null || session.isClosed()){
+ throw new UnsupportedOperationException("Invalid Session");
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index caef9a3f44..9333df3fe4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -32,6 +32,7 @@ import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
@@ -117,7 +118,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
}
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
- FieldTable clientProperties = new FieldTable();
+ FieldTable clientProperties = FieldTableFactory.newFieldTable();
clientProperties.put("instance", ps.getClientID());
clientProperties.put("product", "Qpid");
clientProperties.put("version", "1.0");
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
index 6745052a5d..456d4d520c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
+import javax.jms.MessageEOFException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
index 2001573ef9..5282dce4c9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
@@ -23,10 +23,10 @@ package org.apache.qpid.client.message;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.PropertyFieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.AMQException;
import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
import java.util.Enumeration;
public class JMSMapMessage extends JMSTextMessage implements javax.jms.MapMessage
@@ -58,7 +58,7 @@ public class JMSMapMessage extends JMSTextMessage implements javax.jms.MapMessag
try
{
- _map = new PropertyFieldTable(getText());
+ _map = FieldTableFactory.newFieldTable(getText());
}
catch (JMSException e)
{
@@ -68,7 +68,7 @@ public class JMSMapMessage extends JMSTextMessage implements javax.jms.MapMessag
// AbstractJMSMessage Interface
- public void clearBody() throws JMSException
+ public void clearBodyImpl() throws JMSException
{
if (_data != null)
{
@@ -206,48 +206,55 @@ public class JMSMapMessage extends JMSTextMessage implements javax.jms.MapMessag
public void setBoolean(String string, boolean b) throws JMSException
{
+ checkWritable();
_map.setBoolean(string, b);
}
public void setByte(String string, byte b) throws JMSException
{
+ checkWritable();
_map.setByte(string, b);
}
public void setShort(String string, short i) throws JMSException
{
+ checkWritable();
_map.setShort(string, i);
}
public void setChar(String string, char c) throws JMSException
{
+ checkWritable();
_map.setChar(string, c);
}
public void setInt(String string, int i) throws JMSException
{
+ checkWritable();
_map.setInteger(string, i);
}
public void setLong(String string, long l) throws JMSException
{
+ checkWritable();
_map.setLong(string, l);
}
public void setFloat(String string, float v) throws JMSException
{
-
+ checkWritable();
_map.setFloat(string, v);
}
public void setDouble(String string, double v) throws JMSException
{
-
+ checkWritable();
_map.setDouble(string, v);
}
public void setString(String string, String string1) throws JMSException
{
+ checkWritable();
_map.setString(string, string1);
}
@@ -258,11 +265,13 @@ public class JMSMapMessage extends JMSTextMessage implements javax.jms.MapMessag
public void setBytes(String string, byte[] bytes, int i, int i1) throws JMSException
{
+ checkWritable();
_map.setBytes(string, bytes, i, i1);
}
public void setObject(String string, Object object) throws JMSException
{
+ checkWritable();
_map.setObject(string, object);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
index 34dd7e9ec1..61f326d52b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
@@ -36,7 +36,6 @@ import java.nio.charset.CharacterCodingException;
public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
{
static final String MIME_TYPE = "application/java-object-stream";
- private final boolean _readonly;
private static final int DEFAULT_BUFFER_SIZE = 1024;
@@ -56,7 +55,6 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
_data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
_data.setAutoExpand(true);
}
- _readonly = (data != null);
getJmsContentHeaderProperties().setContentType(MIME_TYPE);
}
@@ -66,10 +64,9 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) throws AMQException
{
super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data);
- _readonly = data != null;
}
- public void clearBody() throws JMSException
+ public void clearBodyImpl() throws JMSException
{
if (_data != null)
{
@@ -90,10 +87,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
public void setObject(Serializable serializable) throws JMSException
{
- if (_readonly)
- {
- throw new MessageNotWriteableException("Message is not writable.");
- }
+ checkWritable();
if (_data == null)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
index 2624c20105..3061d5a59c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
@@ -66,7 +66,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
setText(text);
}
- public void clearBody() throws JMSException
+ public void clearBodyImpl() throws JMSException
{
if (_data != null)
{
@@ -93,6 +93,8 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
public void setText(String string) throws JMSException
{
+ checkWritable();
+
clearBody();
try
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java b/java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java
index 81d3fb76d5..4291cb3259 100644
--- a/java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java
+++ b/java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client.security.amqplain;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
@@ -71,7 +72,7 @@ public class AmqPlainSaslClient implements SaslClient
{
throw new SaslException("Error handling SASL callbacks: " + e, e);
}
- FieldTable table = new FieldTable();
+ FieldTable table = FieldTableFactory.newFieldTable();
table.put("LOGIN", nameCallback.getName());
table.put("PASSWORD", pwdCallback.getPassword());
return table.getDataAsBytes();
diff --git a/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java b/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
index 02fe103c6a..c26f67bf10 100644
--- a/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
+++ b/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
@@ -39,7 +39,7 @@ public class TestMessageHelper
return new JMSMapMessage();
}
- public static JMSStreamMessage newJMSStreamMessage() throws JMSException
+ public static JMSStreamMessage newJMSStreamMessage()
{
return new JMSStreamMessage();
}
diff --git a/java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPAdministrator.java b/java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPAdministrator.java
index 21a6816af7..006bda7e2e 100644
--- a/java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPAdministrator.java
+++ b/java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPAdministrator.java
@@ -41,7 +41,7 @@
* Copyright 2001, 2003 (C) Exoffice Technologies Inc. All Rights Reserved.
*
*/
-package org.exolab.jmscts.amqp;
+package org.apache.qpid.cts.src.providers.amqp.org.exolab.jmscts.amqp;
import org.apache.qpid.client.*;
import org.exolab.jmscts.provider.Administrator;
diff --git a/java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPProvider.java b/java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPProvider.java
index 21610d39b2..aafa415d1e 100644
--- a/java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPProvider.java
+++ b/java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPProvider.java
@@ -41,7 +41,7 @@
* Copyright 2001, 2003 (C) Exoffice Technologies Inc. All Rights Reserved.
*
*/
-package org.exolab.jmscts.amqp;
+package org.apache.qpid.cts.src.providers.amqp.org.exolab.jmscts.amqp;
import javax.jms.JMSException;
diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
index ca3e5ce3f5..b199d41432 100644
--- a/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
+++ b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
@@ -19,7 +19,7 @@
package org.apache.qpid.example.publisher;
import org.apache.log4j.Logger;
-import java.util.Properties;
+
import java.io.File;
import org.apache.qpid.example.shared.FileUtils;
@@ -34,12 +34,17 @@ import javax.jms.JMSException;
*/
public class FileMessageDispatcher {
- private static final Logger _logger = Logger.getLogger(FileMessageDispatcher.class);
-
- private static Publisher _publisher = null;
+ protected static final Logger _logger = Logger.getLogger(FileMessageDispatcher.class);
- private static final String DEFAULT_PUB_NAME = "Publisher";
+ protected static Publisher _publisher = null;
+ /**
+ * To use this main method you need to specify a path or file to use for input
+ * This class then uses file contents from the dir/file specified to generate
+ * messages to publish
+ * Intended to be a very simple way to get going with publishing using the broker
+ * @param args - must specify one value, the path to file(s) for publisher
+ */
public static void main(String[] args)
{
@@ -52,7 +57,7 @@ public class FileMessageDispatcher {
{
try
{
- //publish message(s) from file(s) and send message to monitor queue
+ //publish message(s) from file(s) to configured queue
publish(args[0]);
//Move payload file(s) to archive location as no error
@@ -60,7 +65,8 @@ public class FileMessageDispatcher {
}
catch(Exception e)
{
- System.err.println("Error trying to dispatch message: " + e);
+ //log error and exit
+ _logger.error("Error trying to dispatch message: " + e);
System.exit(1);
}
finally
@@ -81,8 +87,12 @@ public class FileMessageDispatcher {
System.exit(0);
}
-
- //Publish files or file as message
+ /**
+ * Publish the content of a file or files from a directory as messages
+ * @param path - from main args
+ * @throws JMSException
+ * @throws MessageFactoryException - if cannot create message from file content
+ */
public static void publish(String path) throws JMSException, MessageFactoryException
{
File tempFile = new File(path);
@@ -100,7 +110,7 @@ public class FileMessageDispatcher {
for (File file : files)
{
//Create message factory passing in payload path
- MessageFactory factory = new MessageFactory(getPublisher().getSession(), file.toString());
+ FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), file.toString());
//Send the message generated from the payload using the _publisher
getPublisher().sendMessage(factory.createEventMessage());
@@ -110,16 +120,18 @@ public class FileMessageDispatcher {
}
else
{
- //handle as single file
+ //handle a single file
//Create message factory passing in payload path
- MessageFactory factory = new MessageFactory(getPublisher().getSession(),tempFile.toString());
+ FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(),tempFile.toString());
//Send the message generated from the payload using the _publisher
getPublisher().sendMessage(factory.createEventMessage());
}
}
- //cleanup publishers
+ /**
+ * Cleanup before exit
+ */
public static void cleanup()
{
if (getPublisher() != null)
@@ -128,8 +140,8 @@ public class FileMessageDispatcher {
}
}
- /*
- * Returns a _publisher for a queue
+ /**
+ * @return A Publisher instance
*/
private static Publisher getPublisher()
{
@@ -141,7 +153,6 @@ public class FileMessageDispatcher {
//Create a _publisher
_publisher = new Publisher();
- _publisher.setName(DEFAULT_PUB_NAME);
return _publisher;
}
diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactory.java b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java
index f9944284c8..88bcbbbccb 100644
--- a/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactory.java
+++ b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java
@@ -25,13 +25,19 @@ import org.apache.qpid.example.shared.Statics;
import java.io.*;
import javax.jms.*;
-public class MessageFactory
+public class FileMessageFactory
{
- private final Session _session;
- private final String _payload;
- private final String _filename;
+ protected final Session _session;
+ protected final String _payload;
+ protected final String _filename;
- public MessageFactory(Session session, String filename) throws MessageFactoryException
+ /**
+ * Contructs and instance using a filename from which content will be used to create message
+ * @param session
+ * @param filename
+ * @throws MessageFactoryException
+ */
+ public FileMessageFactory(Session session, String filename) throws MessageFactoryException
{
try
{
@@ -45,9 +51,13 @@ public class MessageFactory
}
}
- /*
- * Creates message and sets filename property on it
- */
+ /**
+ * Creates a text message and sets filename property on it
+ * The filename property is purely intended to provide visibility
+ * of file content passing trhough the broker using example classes
+ * @return Message - a TextMessage with content from file
+ * @throws JMSException
+ */
public Message createEventMessage() throws JMSException
{
TextMessage msg = _session.createTextMessage();
@@ -56,9 +66,13 @@ public class MessageFactory
return msg;
}
- /*
- * Creates message from a string for use by the monitor
- */
+ /**
+ * Creates message from a string for use by the monitor
+ * @param session
+ * @param textMsg - message content
+ * @return Message - TextMessage with content from String
+ * @throws JMSException
+ */
public static Message createSimpleEventMessage(Session session, String textMsg) throws JMSException
{
TextMessage msg = session.createTextMessage();
diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
index 16b32da22a..8784d340da 100644
--- a/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
+++ b/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
@@ -20,8 +20,9 @@ package org.apache.qpid.example.publisher;
import org.apache.log4j.Logger;
import org.apache.log4j.BasicConfigurator;
-import org.apache.qpid.example.shared.Statics;
+
import javax.jms.*;
+
import java.util.Properties;
/**
@@ -32,14 +33,18 @@ public class MonitorMessageDispatcher {
private static final Logger _logger = Logger.getLogger(MonitorMessageDispatcher.class);
- private static MonitorPublisher _monitorPublisher = null;
+ protected static MonitorPublisher _monitorPublisher = null;
- private static final String DEFAULT_MONITOR_PUB_NAME = "MonitorPublisher";
+ protected static final String DEFAULT_MONITOR_PUB_NAME = "MonitorPublisher";
+ /**
+ * Easy entry point for running a message dispatcher for monitoring consumption
+ * @param args
+ */
public static void main(String[] args)
{
- //@TODO switch on logging appropriately at your app level
+ //Switch on logging appropriately for your app
BasicConfigurator.configure();
try
@@ -61,7 +66,7 @@ public class MonitorMessageDispatcher {
}
catch(UndeliveredMessageException a)
{
- //@TODO trigger application specific failure handling here
+ //trigger application specific failure handling here
_logger.error("Problem delivering monitor message");
break;
}
@@ -69,8 +74,7 @@ public class MonitorMessageDispatcher {
}
catch(Exception e)
{
-
- System.err.println("Error trying to dispatch AMS monitor message: " + e);
+ _logger.error("Error trying to dispatch AMS monitor message: " + e);
System.exit(1);
}
finally
@@ -84,15 +88,21 @@ public class MonitorMessageDispatcher {
System.exit(1);
}
- //Publish heartbeat message
+ /**
+ * Publish heartbeat message
+ * @throws JMSException
+ * @throws UndeliveredMessageException
+ */
public static void publish() throws JMSException, UndeliveredMessageException
{
//Send the message generated from the payload using the _publisher
getMonitorPublisher().sendImmediateMessage
- (MessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis()));
+ (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis()));
}
- //cleanup publishers
+ /**
+ * Cleanup publishers
+ */
public static void cleanup()
{
if (getMonitorPublisher() != null)
@@ -114,9 +124,6 @@ public class MonitorMessageDispatcher {
return _monitorPublisher;
}
- //Create _publisher using system properties
- Properties props = System.getProperties();
-
//Create a _publisher using failover details and constant for monitor queue
_monitorPublisher = new MonitorPublisher();
diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java
index d64fd9b142..be42e0e413 100644
--- a/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java
+++ b/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java
@@ -22,14 +22,14 @@ import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.jms.Session;
-
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.DeliveryMode;
import javax.jms.Queue;
import javax.jms.MessageProducer;
import javax.jms.Connection;
+import javax.jms.Session;
+
import javax.naming.InitialContext;
import org.apache.qpid.example.shared.InitialContextHelper;
@@ -44,7 +44,7 @@ public class Publisher
protected Session _session;
- private MessageProducer _producer;
+ protected MessageProducer _producer;
protected String _destinationDir;
@@ -54,7 +54,10 @@ public class Publisher
protected static final String _defaultDestinationDir = "/tmp";
- //constructor for use with a single host
+ /**
+ * Creates a Publisher instance using properties from example.properties
+ * See InitialContextHelper for details of how context etc created
+ */
public Publisher()
{
try
@@ -68,7 +71,7 @@ public class Publisher
_connection = cf.createConnection();
//create a transactional session
- _session = (Session) _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//lookup the example queue and use it
//Queue is non-exclusive and not deleted when last consumer detaches
@@ -90,8 +93,9 @@ public class Publisher
}
/**
- * Publishes a non-persistent message using transacted session
- **/
+ * Publishes a non-persistent message using transacted session
+ * Note that persistent is the default mode for send - so need to specify for transient
+ */
public boolean sendMessage(Message message)
{
try
@@ -124,6 +128,9 @@ public class Publisher
return true;
}
+ /**
+ * Cleanup resources before exit
+ */
public void cleanup()
{
try
@@ -138,11 +145,15 @@ public class Publisher
}
catch(Exception e)
{
- System.err.println("Error trying to cleanup publisher " + e);
+ _log.error("Error trying to cleanup publisher " + e);
System.exit(1);
}
}
+ /**
+ * Exposes session
+ * @return Session
+ */
public Session getSession()
{
return _session;
diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java
index d6e020bf43..9c195aef40 100644
--- a/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java
+++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java
@@ -42,6 +42,9 @@ public class MonitoredSubscriber extends Subscriber
_monitorDestinationName = _destinationName + Statics.MONITOR_QUEUE_SUFFIX;
}
+ /**
+ * MessageListener implementation for this subscriber
+ */
public static class MonitorMessageListener implements MessageListener
{
private String _name;
@@ -52,9 +55,10 @@ public class MonitoredSubscriber extends Subscriber
}
- /*
- * Listens for heartbeat messages and acknowledges them
- */
+ /**
+ * Listens for heartbeat messages and acknowledges them
+ * @param message
+ */
public void onMessage(javax.jms.Message message)
{
_logger.info(_name + " monitor got message '" + message + "'");
@@ -79,9 +83,9 @@ public class MonitoredSubscriber extends Subscriber
}
}
- /*
- * Subscribes to Queue and attaches additional monitor listener
- */
+ /**
+ * Subscribes to Queue and attaches additional monitor listener
+ */
public void subscribeAndMonitor()
{
try
@@ -115,7 +119,9 @@ public class MonitoredSubscriber extends Subscriber
}
}
- //stop consuming
+ /**
+ * Stop consuming
+ */
public void stopMonitor()
{
try
diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
index d6ec8bd5de..d2f27da052 100644
--- a/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
+++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
@@ -19,9 +19,6 @@
package org.apache.qpid.example.subscriber;
import org.apache.log4j.BasicConfigurator;
-import org.apache.qpid.example.shared.Statics;
-
-import java.util.Properties;
/**
* Allows you to simply start a monitored subscriber
@@ -30,6 +27,10 @@ public class MonitoredSubscriptionWrapper {
private static MonitoredSubscriber _subscriber;
+ /**
+ * Create a monitored subscriber and start it
+ * @param args - no params required
+ */
public static void main(String args[])
{
//switch on logging
@@ -37,15 +38,12 @@ public class MonitoredSubscriptionWrapper {
_subscriber = new MonitoredSubscriber();
- //using system props but can replace with app appropriate config here
- Properties props = System.getProperties();
-
- //note that for failover should set -Dhost=host1:port1;host2:port2
- //Client will then failover in order i.e. connect to first host and failover to second and so on
_subscriber.subscribe();
}
- //Stop subscribing now ...
+ /**
+ * Stop subscribing now ...
+ */
public static void stop()
{
_subscriber.stop();
diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java
index 6b89567b83..34c7d6c7bb 100644
--- a/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java
+++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java
@@ -69,9 +69,9 @@ public class Subscriber
}
}
- /*
- * Listener class that handles messages
- */
+ /**
+ * Listener class that handles messages
+ */
public static class ExampleMessageListener implements MessageListener
{
private String _name;
@@ -82,10 +82,10 @@ public class Subscriber
}
- /*
- * Listens for message callbacks, handles and then acknowledges them
- * @param message - the message received
- */
+ /**
+ * Listens for message callbacks, handles and then acknowledges them
+ * @param message - the message received
+ */
public void onMessage(javax.jms.Message message)
{
_log.info(_name + " got message '" + message + "'");
@@ -113,9 +113,9 @@ public class Subscriber
}
}
- /*
- * Subscribes to example Queue and attaches listener
- */
+ /**
+ * Subscribes to example Queue and attaches listener
+ */
public void subscribe()
{
_log.info("Starting subscription ...");
@@ -160,14 +160,18 @@ public class Subscriber
}
}
+ /**
+ * Set destination (queue or topic) name
+ * @param name
+ */
public void setDestinationName(String name)
{
_destinationName = name;
}
- /*
- * stop consuming and close connection
- */
+ /**
+ * Stop consuming and close connection
+ */
public void stop()
{
try
diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
index 4e755e858f..32a0ef685c 100644
--- a/java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
+++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
@@ -18,10 +18,6 @@
*/
package org.apache.qpid.example.subscriber;
-import org.apache.qpid.example.shared.Statics;
-
-import java.util.Properties;
-
import org.apache.log4j.BasicConfigurator;
/**
@@ -31,6 +27,10 @@ public class SubscriptionWrapper {
private static Subscriber _subscriber;
+ /**
+ * Create a subscriber and start it
+ * @param args
+ */
public static void main(String args[])
{
//switch on logging
@@ -38,15 +38,12 @@ public class SubscriptionWrapper {
_subscriber = new Subscriber();
- //using system props but can replace with app appropriate config here
- Properties props = System.getProperties();
-
- //note that for failover should set -Dhost=host1:port1;host2:port2
- //Client will then failover in order i.e. connect to first host and failover to second and so on
_subscriber.subscribe();
}
- //Stop subscribing now ...
+ /**
+ * Stop subscribing now ...
+ */
public static void stop()
{
_subscriber.stop();
diff --git a/java/client/src/test/java/org/apache/qpid/example/test/TestAMSPubSub.java b/java/client/src/test/java/org/apache/qpid/example/test/TestAMSPubSub.java
deleted file mode 100644
index 3a81a0224b..0000000000
--- a/java/client/src/test/java/org/apache/qpid/example/test/TestAMSPubSub.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.example.test;
-
-import org.apache.qpid.example.subscriber.Subscriber;
-import org.apache.qpid.example.publisher.FileMessageDispatcher;
-import org.apache.qpid.example.shared.Statics;
-
-import java.net.InetAddress;
-import java.util.Properties;
-
-import org.apache.log4j.Logger;
-import org.apache.log4j.BasicConfigurator;
-
-
-public class TestAMSPubSub {
-
- private static final Logger _logger = Logger.getLogger(TestAMSPubSub.class);
- private static final String _defaultPayloadPath = "/tmp";
-
- private static Subscriber subscriber;
-
-
- /**
- * Test main for class using default of local file for message payload
- */
- public static void main(String[] args)
- {
-
- //switch on logging
- BasicConfigurator.configure();
-
- InetAddress _address;
- TestAMSPubSub testPubSub = new TestAMSPubSub();
-
- //create publisher and subscriber
- subscriber = new Subscriber();
-
- //subscribe
- testPubSub.subscribe();
-
- //publish a message
- if (args.length == 1)
- {
- testPubSub.publish(args[0]);
- }
- else
- {
- testPubSub.publish(null);
- }
-
- //Should be able to see message publication and receipt in logs now
-
- //Disconnect and end test run
- FileMessageDispatcher.cleanup();
-
- //and exit as we're all done
- System.exit(0);
-
- }
-
- private void subscribe()
- {
- subscriber.subscribe();
- }
-
- private void publish(String payloadPath)
- {
-
- try
- {
- if (payloadPath == null|| payloadPath.length() == 0)
- {
- payloadPath = _defaultPayloadPath;
- }
-
- FileMessageDispatcher.publish(payloadPath);
-
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-}
diff --git a/java/client/src/test/java/org/apache/qpid/example/test/TestMultSubscribers.java b/java/client/src/test/java/org/apache/qpid/example/test/TestMultSubscribers.java
deleted file mode 100644
index f1a921e106..0000000000
--- a/java/client/src/test/java/org/apache/qpid/example/test/TestMultSubscribers.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.example.test;
-
-import org.apache.qpid.example.subscriber.Subscriber;
-import org.apache.qpid.example.publisher.FileMessageDispatcher;
-import org.apache.qpid.example.shared.Statics;
-
-import java.net.InetAddress;
-import java.util.Properties;
-
-import org.apache.log4j.Logger;
-import org.apache.log4j.BasicConfigurator;
-
-
-public class TestMultSubscribers {
-
- private static final Logger _logger = Logger.getLogger(TestMultSubscribers.class);
- private static final String _defaultPayloadPath = "/tmp";
-
- private static Subscriber subscriber1;
- private static Subscriber subscriber2;
-
- private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
-
- /**
- * Test main for class using default of local file for message payload
- */
- public static void main(String[] args)
- {
-
- //switch on logging
- BasicConfigurator.configure();
-
- InetAddress _address;
- TestMultSubscribers testMultSub = new TestMultSubscribers();
-
- //create publisher and subscriber
- subscriber1 = new Subscriber();
- subscriber2 = new Subscriber();
-
- //subscribe to the topic
- testMultSub.subscribe(args);
-
- //publish a message
- if (args.length == 1)
- {
- testMultSub.publish(args[0]);
- }
- else
- {
- testMultSub.publish(null);
- }
-
- //Should be able to see message publication and receipt in logs now
-
- //Disconnect and end test run
- FileMessageDispatcher.cleanup();
-
- //and exit as we're all done
- System.exit(0);
-
- }
-
- /*
- * Point both of our subscribers at one queue
- */
- private void subscribe(String[] args)
- {
- Properties props = System.getProperties();
- subscriber1.subscribe();
- subscriber2.subscribe();
-
- }
-
- private void publish(String payloadPath)
- {
-
- try
- {
- if (payloadPath == null|| payloadPath.length() == 0)
- {
- payloadPath = _defaultPayloadPath;
- }
-
- FileMessageDispatcher.publish(payloadPath);
-
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-}
-
diff --git a/java/client/src/test/java/org/apache/qpid/example/test/TestPublisher.java b/java/client/src/test/java/org/apache/qpid/example/test/TestPublisher.java
deleted file mode 100644
index 6ff6028ccd..0000000000
--- a/java/client/src/test/java/org/apache/qpid/example/test/TestPublisher.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.example.test;
-
-import org.apache.qpid.example.publisher.FileMessageDispatcher;
-
-import java.net.InetAddress;
-
-import org.apache.log4j.Logger;
-import org.apache.log4j.BasicConfigurator;
-
-
-public class TestPublisher {
-
- private static final Logger _logger = Logger.getLogger(TestAMSPubSub.class);
- private static final String _defaultPayloadPath = "/tmp";
-
- /**
- * Test main for class using default of local file for message payload
- */
- public static void main(String[] args)
- {
-
- //switch on logging
- BasicConfigurator.configure();
-
- InetAddress _address;
- TestPublisher testPub = new TestPublisher();
-
- //publish a message
- if (args.length == 1)
- {
- testPub.publish(args[0]);
- }
- else
- {
- testPub.publish(null);
- }
-
- //Should be able to see message publication and receipt in logs now
-
- //Disconnect and end test run
- FileMessageDispatcher.cleanup();
-
- //and exit as we're all done
- System.exit(0);
-
- }
-
- private void publish(String payloadPath)
- {
-
- try
- {
- if (payloadPath == null|| payloadPath.length() == 0)
- {
- payloadPath = _defaultPayloadPath;
- }
-
- FileMessageDispatcher.publish(payloadPath);
-
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-}
-
diff --git a/java/client/src/test/java/org/apache/qpid/framing/FieldTableTest.java b/java/client/src/test/java/org/apache/qpid/framing/FieldTableTest.java
index 49e1630f15..2a7cb8be30 100644
--- a/java/client/src/test/java/org/apache/qpid/framing/FieldTableTest.java
+++ b/java/client/src/test/java/org/apache/qpid/framing/FieldTableTest.java
@@ -33,6 +33,33 @@ import junit.framework.TestCase;
public class FieldTableTest extends TestCase
{
+
+ public void testEncoding()
+ {
+ FieldTable table = FieldTableFactory.newFieldTable();
+
+ String key = "String";
+ String value = "Hello";
+ table.put(key, value);
+
+ //Add one for the type encoding
+ int size = EncodingUtils.encodedShortStringLength(key) + 1 +
+ EncodingUtils.encodedLongStringLength(value);
+
+ assertEquals(table.getEncodedSize(), size);
+
+ key = "Integer";
+ Integer number = new Integer(60);
+ table.put(key, number);
+
+ //Add one for the type encoding
+ size += EncodingUtils.encodedShortStringLength(key) + 1 + 4;
+
+
+ assertEquals(table.getEncodedSize(), size);
+ }
+
+
public void testDataDump() throws IOException, AMQFrameDecodingException
{
byte[] data = readBase64("content.txt");
@@ -46,7 +73,7 @@ public class FieldTableTest extends TestCase
ByteBuffer buffer = ByteBuffer.allocate(data.length);
buffer.put(data);
buffer.flip();
- FieldTable table = new FieldTable(buffer, size);
+ FieldTable table = FieldTableFactory.newFieldTable(buffer, size);
}
/*
@@ -107,7 +134,7 @@ public class FieldTableTest extends TestCase
FieldTable load(String name) throws IOException
{
- return populate(new FieldTable(), read(name));
+ return populate(FieldTableFactory.newFieldTable(), read(name));
}
Properties read(String name) throws IOException
@@ -123,11 +150,12 @@ public class FieldTableTest extends TestCase
{
String key = (String) i.nextElement();
String value = properties.getProperty(key);
- try{
+ try
+ {
int ival = Integer.parseInt(value);
table.put(key, (long) ival);
}
- catch(NumberFormatException e)
+ catch (NumberFormatException e)
{
table.put(key, value);
}
@@ -144,7 +172,8 @@ public class FieldTableTest extends TestCase
{
StringBuffer buffer = new StringBuffer();
String line = in.readLine();
- while (line != null){
+ while (line != null)
+ {
buffer.append(line).append(" ");
line = in.readLine();
}
diff --git a/java/client/src/test/java/org/apache/qpid/headers/MessageFactory.java b/java/client/src/test/java/org/apache/qpid/headers/MessageFactory.java
index f1f310c6e5..6f538d068c 100644
--- a/java/client/src/test/java/org/apache/qpid/headers/MessageFactory.java
+++ b/java/client/src/test/java/org/apache/qpid/headers/MessageFactory.java
@@ -22,6 +22,7 @@ package org.apache.qpid.headers;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
import javax.jms.BytesMessage;
import javax.jms.Destination;
@@ -127,14 +128,14 @@ class MessageFactory
FieldTable getConsumerBinding()
{
- FieldTable binding = new FieldTable();
+ FieldTable binding = FieldTableFactory.newFieldTable();
binding.put("SF0000", "value");
return binding;
}
FieldTable getControllerBinding()
{
- FieldTable binding = new FieldTable();
+ FieldTable binding = FieldTableFactory.newFieldTable();
binding.put("SCONTROL", "value");
return binding;
}
diff --git a/java/client/src/test/java/org/apache/qpid/requestreply1/ServiceRequestingClient.java b/java/client/src/test/java/org/apache/qpid/requestreply1/ServiceRequestingClient.java
index 5dc57364b3..74becfd9bb 100644
--- a/java/client/src/test/java/org/apache/qpid/requestreply1/ServiceRequestingClient.java
+++ b/java/client/src/test/java/org/apache/qpid/requestreply1/ServiceRequestingClient.java
@@ -107,6 +107,7 @@ public class ServiceRequestingClient implements ExceptionListener
}
try
{
+ m.getPropertyNames();
if (m.propertyExists("timeSent"))
{
long timeSent = Long.parseLong(m.getStringProperty("timeSent"));
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
index 4d37c5d2a6..2983a16e6d 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
@@ -135,6 +135,8 @@ public class BytesMessageTest extends TestCase implements MessageListener
buffer.get(data);
actual.add(data);
+
+ //Check Body Write Status
try
{
m.writeBoolean(true);
@@ -144,6 +146,41 @@ public class BytesMessageTest extends TestCase implements MessageListener
{
//normal execution
}
+
+ m.clearBody();
+
+ try
+ {
+ m.writeBoolean(true);
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ Assert.fail("Message should be writeable");
+ }
+
+
+ //Check property write status
+ try
+ {
+ m.setStringProperty("test", "test");
+ Assert.fail("Message should not be writeable");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ //normal execution
+ }
+
+ m.clearProperties();
+
+ try
+ {
+ m.setStringProperty("test", "test");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ Assert.fail("Message should be writeable");
+ }
+
}
assertEqual(messages.iterator(), actual.iterator());
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java
index 079def81d0..ad180e3a89 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java
@@ -21,10 +21,13 @@
package org.apache.qpid.test.unit.basic;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.TestMessageHelper;
import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
import javax.jms.JMSException;
@@ -34,20 +37,28 @@ public class FieldTableKeyEnumeratorTest extends TestCase
{
public void testKeyEnumeration()
{
- FieldTable result = new FieldTable();
+ FieldTable result = FieldTableFactory.newFieldTable();
result.put("one", 1L);
result.put("two", 2L);
result.put("three", 3L);
result.put("four", 4L);
result.put("five", 5L);
- Enumeration e = result.keys();
+ Iterator iterator = result.keySet().iterator();
+
+ try
+ {
+ assertTrue("one".equals(iterator.next()));
+ assertTrue("two".equals(iterator.next()));
+ assertTrue("three".equals(iterator.next()));
+ assertTrue("four".equals(iterator.next()));
+ assertTrue("five".equals(iterator.next()));
+ }
+ catch (NoSuchElementException e)
+ {
+ fail("All elements should be found.");
+ }
- assertTrue("one".equals(e.nextElement()));
- assertTrue("two".equals(e.nextElement()));
- assertTrue("three".equals(e.nextElement()));
- assertTrue("four".equals(e.nextElement()));
- assertTrue("five".equals(e.nextElement()));
}
public void testPropertEnu()
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
index 67b7f49565..c1ecef6b57 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
@@ -26,10 +26,12 @@ import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableTest;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.test.VMBrokerSetup;
import org.apache.mina.common.ByteBuffer;
+import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
@@ -39,6 +41,9 @@ import junit.framework.TestCase;
public class FieldTableMessageTest extends TestCase implements MessageListener
{
+
+ private static final Logger _logger = Logger.getLogger(FieldTableMessageTest.class);
+
private AMQConnection _connection;
private AMQDestination _destination;
private AMQSession _session;
@@ -50,7 +55,7 @@ public class FieldTableMessageTest extends TestCase implements MessageListener
protected void setUp() throws Exception
{
super.setUp();
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
}
protected void tearDown() throws Exception
@@ -80,7 +85,7 @@ public class FieldTableMessageTest extends TestCase implements MessageListener
private FieldTable load() throws IOException
{
- FieldTable result = new FieldTable();
+ FieldTable result = FieldTableFactory.newFieldTable();
result.put("one", 1L);
result.put("two", 2L);
result.put("three", 3L);
@@ -128,7 +133,7 @@ public class FieldTableMessageTest extends TestCase implements MessageListener
for (Object m : received)
{
ByteBuffer buffer = ((JMSBytesMessage) m).getData();
- FieldTable actual = new FieldTable(buffer, buffer.remaining());
+ FieldTable actual = FieldTableFactory.newFieldTable(buffer, buffer.remaining());
new FieldTableTest().assertEquivalent(_expected, actual);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTablePropertyTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTablePropertyTest.java
new file mode 100644
index 0000000000..92b4831d93
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTablePropertyTest.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import org.apache.qpid.framing.PropertyFieldTable;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.message.TestMessageHelper;
+
+import java.util.Enumeration;
+import java.util.NoSuchElementException;
+
+import javax.jms.JMSException;
+
+import junit.framework.TestCase;
+
+public class FieldTablePropertyTest extends TestCase
+{
+ public void testPropertyNames()
+ {
+ try
+ {
+ JMSTextMessage text = TestMessageHelper.newJMSTextMessage();
+
+ text.setBooleanProperty("Boolean1", true);
+ text.setBooleanProperty("Boolean2", true);
+ text.setIntProperty("Int", 2);
+ text.setLongProperty("Long", 2);
+
+ Enumeration e = text.getPropertyNames();
+
+ assertEquals("Boolean1", e.nextElement());
+ assertTrue("Boolean2".equals(e.nextElement()));
+ assertTrue("Int".equals(e.nextElement()));
+ assertTrue("Long".equals(e.nextElement()));
+ }
+ catch (JMSException e)
+ {
+
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(FieldTablePropertyTest.class);
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
index f25d2887ae..5353a19d13 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
@@ -47,6 +47,7 @@ public class MapMessageTest extends TestCase implements MessageListener
private final List<String> messages = new ArrayList<String>();
private int _count = 100;
public String _connectionString = "vm://:1";
+ private byte[] _bytes = {99, 98, 97, 96, 95};
protected void setUp() throws Exception
{
@@ -104,9 +105,31 @@ public class MapMessageTest extends TestCase implements MessageListener
MapMessage message = _session.createMapMessage();
message.setBoolean("odd", i / 2 == 0);
+ message.setByte("byte", (byte) Byte.MAX_VALUE);
+
+ message.setBytes("bytes", _bytes);
+ message.setChar("char", (char) 'c');
+ message.setDouble("double", (double) Double.MAX_VALUE);
+ message.setFloat("float", (float) Float.MAX_VALUE);
+
message.setInt("messageNumber", i);
+ message.setInt("int", (int) Integer.MAX_VALUE);
+
+ message.setLong("long", (long) Long.MAX_VALUE);
+ message.setShort("short", (short) Short.MAX_VALUE);
message.setString("message", text);
+
+ message.setObject("object-bool", true);
+ message.setObject("object-byte", Byte.MAX_VALUE);
+ message.setObject("object-bytes", _bytes);
+ message.setObject("object-char", 'c');
+ message.setObject("object-double", Double.MAX_VALUE);
+ message.setObject("object-float", Float.MAX_VALUE);
+ message.setObject("object-int", Integer.MAX_VALUE);
+ message.setObject("object-long", Long.MAX_VALUE);
+ message.setObject("object-short", Short.MAX_VALUE);
+
producer.send(message);
}
}
@@ -130,18 +153,74 @@ public class MapMessageTest extends TestCase implements MessageListener
{
actual.add(m.getString("message"));
assertEqual(m.getInt("messageNumber"), count);
- assertEqual(m.getBoolean("odd"), count / 2 == 0);
-// try
-// {
-// m.setInt("testint", 3);
-// fail("Message should not be writeable");
-// }
-// catch (MessageNotWriteableException mnwe)
-// {
-// //normal execution
-// }
+ assertEqual(count / 2 == 0, m.getBoolean("odd"));
+ assertEqual((byte) Byte.MAX_VALUE, m.getByte("byte"));
+
+ assertBytesEqual(_bytes, m.getBytes("bytes"));
+ assertEqual((char) 'c', m.getChar("char"));
+ assertEqual((double) Double.MAX_VALUE, m.getDouble("double"));
+ assertEqual((float) Float.MAX_VALUE, m.getFloat("float"));
+
+ assertEqual(count, m.getInt("messageNumber"));
+ assertEqual((int) Integer.MAX_VALUE, m.getInt("int"));
+ assertEqual((long) Long.MAX_VALUE, m.getLong("long"));
+ assertEqual((short) Short.MAX_VALUE, m.getShort("short"));
+
+ assertEqual(true, m.getObject("object-bool"));
+ assertEqual(Byte.MAX_VALUE, m.getObject("object-byte"));
+ assertBytesEqual(_bytes, (byte[]) m.getObject("object-bytes"));
+ assertEqual('c', m.getObject("object-char"));
+ assertEqual(Double.MAX_VALUE, m.getObject("object-double"));
+ assertEqual(Float.MAX_VALUE, m.getObject("object-float"));
+ assertEqual(Integer.MAX_VALUE, m.getObject("object-int"));
+ assertEqual(Long.MAX_VALUE, m.getObject("object-long"));
+ assertEqual(Short.MAX_VALUE, m.getObject("object-short"));
+
+
+ try
+ {
+ m.setInt("testint", 3);
+ fail("Message should not be writeable");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ //normal execution
+ }
+
+ m.clearBody();
+
+ try
+ {
+ m.setInt("testint", 3);
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ Assert.fail("Message should be writeable");
+ }
+
+ //Check property write status
+ try
+ {
+ m.setStringProperty("test", "test");
+ Assert.fail("Message should not be writeable");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ //normal execution
+ }
+
+ m.clearProperties();
+
+ try
+ {
+ m.setStringProperty("test", "test");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ Assert.fail("Message should be writeable");
+ }
count++;
}
@@ -149,6 +228,17 @@ public class MapMessageTest extends TestCase implements MessageListener
assertEqual(messages.iterator(), actual.iterator());
}
+ private void assertBytesEqual(byte[] expected, byte[] actual)
+ {
+ Assert.assertEquals(expected.length, actual.length);
+
+ for (int index = 0; index < expected.length; index++)
+ {
+ Assert.assertEquals(expected[index], actual[index]);
+ }
+ }
+
+
private static void assertEqual(Iterator expected, Iterator actual)
{
List<String> errors = new ArrayList<String>();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
index dfb1b26454..e7d7159bd8 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
@@ -125,19 +125,53 @@ public class ObjectMessageTest extends TestCase implements MessageListener
{
actual.add(m.getObject());
-// try
-// {
-// m.setObject("Test text");
-// Assert.fail("Message should not be writeable");
-// }
-// catch (MessageNotWriteableException mnwe)
-// {
-// //normal execution
-// }
+ try
+ {
+ m.setObject("Test text");
+ Assert.fail("Message should not be writeable");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ //normal execution
+ }
+
+ m.clearBody();
+
+ try
+ {
+ m.setObject("Test text");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ Assert.fail("Message should be writeable");
+ }
+
+ //Check property write status
+ try
+ {
+ m.setStringProperty("test", "test");
+ Assert.fail("Message should not be writeable");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ //normal execution
+ }
+
+ m.clearProperties();
+
+ try
+ {
+ m.setStringProperty("test", "test");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ Assert.fail("Message should be writeable");
+ }
}
assertEqual(messages.iterator(), actual.iterator());
+
}
private static void assertEqual(Iterator expected, Iterator actual)
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
new file mode 100644
index 0000000000..02f371e81b
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
@@ -0,0 +1,264 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.test.VMBrokerSetup;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class PropertyValueTest extends TestCase implements MessageListener
+{
+
+ private static final Logger _logger = Logger.getLogger(PropertyValueTest.class);
+
+ private int count = 0;
+ private AMQConnection _connection;
+ private Destination _destination;
+ private AMQSession _session;
+ private final List<JMSTextMessage> received = new ArrayList<JMSTextMessage>();
+ private final List<String> messages = new ArrayList<String>();
+ private int _count = 100;
+ public String _connectionString = "vm://:1";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ try
+ {
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ }
+ catch (Exception e)
+ {
+ fail("Unable to initialilse connection: " + e);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ private void init(AMQConnection connection) throws Exception
+ {
+ Destination destination = new AMQQueue(randomize("PropertyValueTest"), true);
+ init(connection, destination);
+ }
+
+ private void init(AMQConnection connection, Destination destination) throws Exception
+ {
+ _connection = connection;
+ _destination = destination;
+ _session = (AMQSession) connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+
+ //set up a slow consumer
+ _session.createConsumer(destination).setMessageListener(this);
+ connection.start();
+ }
+
+ public void test() throws Exception
+ {
+ int count = _count;
+ send(count);
+ waitFor(count);
+ check();
+ System.out.println("Completed without failure");
+ _connection.close();
+ }
+
+ void send(int count) throws JMSException
+ {
+ //create a publisher
+ MessageProducer producer = _session.createProducer(_destination);
+ for (int i = 0; i < count; i++)
+ {
+ String text = "Message " + i;
+ messages.add(text);
+ Message m = _session.createTextMessage(text);
+
+ m.setBooleanProperty("Bool", true);
+
+ m.setByteProperty("Byte", (byte) Byte.MAX_VALUE);
+ m.setDoubleProperty("Double", (double) Double.MAX_VALUE);
+ m.setFloatProperty("Float", (float) Float.MAX_VALUE);
+ m.setIntProperty("Int", (int) Integer.MAX_VALUE);
+
+ m.setJMSCorrelationID("Correlation");
+ m.setJMSPriority(100);
+
+ // Queue
+ Queue q = //_session.createTemporaryQueue();
+ q = new AMQQueue("TestReply");
+ m.setJMSReplyTo(q);
+ m.setStringProperty("TempQueue", q.toString());
+
+ _logger.info("Message:" + m);
+
+ Assert.assertEquals("Check temp queue has been set correctly",
+ m.getJMSReplyTo().toString(), m.getStringProperty("TempQueue"));
+
+ m.setJMSType("Test");
+ m.setLongProperty("UnsignedInt", (long) 4294967295L);
+ m.setLongProperty("Long", (long) Long.MAX_VALUE);
+
+ m.setShortProperty("Short", (short) Short.MAX_VALUE);
+ m.setStringProperty("String", "Test");
+
+ _logger.info("Sending Msg:" + m);
+ producer.send(m);
+ }
+ }
+
+ void waitFor(int count) throws InterruptedException
+ {
+ synchronized(received)
+ {
+ while (received.size() < count)
+ {
+ received.wait();
+ }
+ }
+ }
+
+ void check() throws JMSException
+ {
+ List<String> actual = new ArrayList<String>();
+ for (JMSTextMessage m : received)
+ {
+ actual.add(m.getText());
+
+ //Check Properties
+
+ Assert.assertEquals("Check Boolean properties are correctly transported",
+ true, m.getBooleanProperty("Bool"));
+ Assert.assertEquals("Check Byte properties are correctly transported",
+ (byte) Byte.MAX_VALUE, m.getByteProperty("Byte"));
+ Assert.assertEquals("Check Double properties are correctly transported",
+ (double) Double.MAX_VALUE, m.getDoubleProperty("Double"));
+ Assert.assertEquals("Check Float properties are correctly transported",
+ (float) Float.MAX_VALUE, m.getFloatProperty("Float"));
+ Assert.assertEquals("Check Int properties are correctly transported",
+ (int) Integer.MAX_VALUE, m.getIntProperty("Int"));
+ Assert.assertEquals("Check CorrelationID properties are correctly transported",
+ "Correlation", m.getJMSCorrelationID());
+// Assert.assertEquals("Check Priority properties are correctly transported",
+// 100, m.getJMSPriority());
+
+ // Queue
+ Assert.assertEquals("Check ReplyTo properties are correctly transported",
+ m.getStringProperty("TempQueue"), m.getJMSReplyTo().toString());
+
+// Assert.assertEquals("Check Type properties are correctly transported",
+// "Test", m.getJMSType());
+ Assert.assertEquals("Check Short properties are correctly transported",
+ (short) Short.MAX_VALUE, m.getShortProperty("Short"));
+ Assert.assertEquals("Check UnsignedInt properties are correctly transported",
+ (long) 4294967295L, m.getLongProperty("UnsignedInt"));
+ Assert.assertEquals("Check Long properties are correctly transported",
+ (long) Long.MAX_VALUE, m.getLongProperty("Long"));
+ Assert.assertEquals("Check String properties are correctly transported",
+ "Test", m.getStringProperty("String"));
+ }
+
+ assertEqual(messages.iterator(), actual.iterator());
+ }
+
+ private static void assertEqual(Iterator expected, Iterator actual)
+ {
+ List<String> errors = new ArrayList<String>();
+ while (expected.hasNext() && actual.hasNext())
+ {
+ try
+ {
+ assertEqual(expected.next(), actual.next());
+ }
+ catch (Exception e)
+ {
+ errors.add(e.getMessage());
+ }
+ }
+ while (expected.hasNext())
+ {
+ errors.add("Expected " + expected.next() + " but no more actual values.");
+ }
+ while (actual.hasNext())
+ {
+ errors.add("Found " + actual.next() + " but no more expected values.");
+ }
+ if (!errors.isEmpty())
+ {
+ throw new RuntimeException(errors.toString());
+ }
+ }
+
+ private static void assertEqual(Object expected, Object actual)
+ {
+ if (!expected.equals(actual))
+ {
+ throw new RuntimeException("Expected '" + expected + "' found '" + actual + "'");
+ }
+ }
+
+ public void onMessage(Message message)
+ {
+ synchronized(received)
+ {
+ received.add((JMSTextMessage) message);
+ received.notify();
+ }
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ PropertyValueTest test = new PropertyValueTest();
+ test._connectionString = argv.length == 0 ? "vm://:1" : argv[0];
+ test.setUp();
+ if (argv.length > 1)
+ {
+ test._count = Integer.parseInt(argv[1]);
+ }
+ test.test();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new VMBrokerSetup(new junit.framework.TestSuite(PropertyValueTest.class));
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
index 04a9185fa6..cd3954fbcb 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
@@ -28,6 +28,7 @@ import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.test.VMBrokerSetup;
+import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.Iterator;
@@ -39,6 +40,8 @@ import junit.framework.Assert;
public class TextMessageTest extends TestCase implements MessageListener
{
+ private static final Logger _logger = Logger.getLogger(TextMessageTest.class);
+
private AMQConnection _connection;
private Destination _destination;
private AMQSession _session;
@@ -100,7 +103,11 @@ public class TextMessageTest extends TestCase implements MessageListener
{
String text = "Message " + i;
messages.add(text);
- producer.send(_session.createTextMessage(text));
+ Message m = _session.createTextMessage(text);
+ m.setStringProperty("String", "hello");
+
+ _logger.info("Sending Msg:" + m);
+ producer.send(m);
}
}
@@ -122,15 +129,49 @@ public class TextMessageTest extends TestCase implements MessageListener
{
actual.add(m.getText());
-// try
-// {
-// m.setText("Test text");
-// Assert.fail("Message should not be writeable");
-// }
-// catch (MessageNotWriteableException mnwe)
-// {
-// //normal execution
-// }
+ //Check body write status
+ try
+ {
+ m.setText("Test text");
+ Assert.fail("Message should not be writeable");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ //normal execution
+ }
+
+ m.clearBody();
+
+ try
+ {
+ m.setText("Test text");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ Assert.fail("Message should be writeable");
+ }
+
+ //Check property write status
+ try
+ {
+ m.setStringProperty("test", "test");
+ Assert.fail("Message should not be writeable");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ //normal execution
+ }
+
+ m.clearProperties();
+
+ try
+ {
+ m.setStringProperty("test", "test");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ Assert.fail("Message should be writeable");
+ }
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
new file mode 100644
index 0000000000..84e9026a6a
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.client.BrokerDetails;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.AMQBrokerDetails;
+import org.apache.qpid.url.URLSyntaxException;
+
+public class BrokerDetailsTest extends TestCase
+{
+
+ public void testMultiParameters() throws URLSyntaxException
+ {
+ String url = "tcp://localhost:5672?timeout='200',immediatedelivery='true'";
+
+ AMQBrokerDetails broker = new AMQBrokerDetails(url);
+
+ assertTrue(broker.getOption("timeout").equals("200"));
+ assertTrue(broker.getOption("immediatedelivery").equals("true"));
+ }
+
+ public void testVMBroker() throws URLSyntaxException
+ {
+ String url = "vm://:2";
+
+ AMQBrokerDetails broker = new AMQBrokerDetails(url);
+ assertTrue(broker.getTransport().equals("vm"));
+ assertEquals(broker.getPort(), 2);
+ }
+
+ public void testTransportsDefaultToTCP() throws URLSyntaxException
+ {
+ String url = "localhost:5672";
+
+ AMQBrokerDetails broker = new AMQBrokerDetails(url);
+ assertTrue(broker.getTransport().equals("tcp"));
+ }
+
+ public void testCheckDefaultPort() throws URLSyntaxException
+ {
+ String url = "tcp://localhost";
+
+ AMQBrokerDetails broker = new AMQBrokerDetails(url);
+ assertTrue(broker.getPort() == AMQBrokerDetails.DEFAULT_PORT);
+ }
+
+ public void testBothDefaults() throws URLSyntaxException
+ {
+ String url = "localhost";
+
+ AMQBrokerDetails broker = new AMQBrokerDetails(url);
+
+ assertTrue(broker.getTransport().equals("tcp"));
+ assertTrue(broker.getPort() == AMQBrokerDetails.DEFAULT_PORT);
+ }
+
+ public void testWrongOptionSeparatorInBroker()
+ {
+ String url = "tcp://localhost:5672+option='value'";
+ try
+ {
+ new AMQBrokerDetails(url);
+ }
+ catch (URLSyntaxException urise)
+ {
+ assertTrue(urise.getReason().equals("Illegal character in port number"));
+ }
+
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(BrokerDetailsTest.class);
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
index d7862d047f..0da4147351 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -22,12 +22,10 @@ package org.apache.qpid.test.unit.client.connection;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQAuthenticationException;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQUnresolvedAddressException;
-import org.apache.qpid.test.VMBrokerSetup;
import javax.jms.Connection;
@@ -40,6 +38,18 @@ public class ConnectionTest extends TestCase
String _broker_NotRunning = "vm://:2";
String _broker_BadDNS = "tcp://hg3sgaaw4lgihjs";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ TransportConnection.killAllVMBrokers();
+ }
+
public void testSimpleConnection()
{
try
@@ -102,8 +112,30 @@ public class ConnectionTest extends TestCase
}
}
+ public void testClientIdCannotBeChanged() throws Exception
+ {
+ Connection connection = new AMQConnection(_broker, "guest", "guest",
+ "fred", "/test");
+ try
+ {
+ connection.setClientID("someClientId");
+ fail("No IllegalStateException thrown when resetting clientid");
+ }
+ catch (javax.jms.IllegalStateException e)
+ {
+ // PASS
+ }
+ }
+
+ public void testClientIdIsPopulatedAutomatically() throws Exception
+ {
+ Connection connection = new AMQConnection(_broker, "guest", "guest",
+ null, "/test");
+ assertNotNull(connection.getClientID());
+ }
+
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(ConnectionTest.class));
+ return new junit.framework.TestSuite(ConnectionTest.class);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
index 13a6d214ba..64adcb13e4 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
@@ -310,29 +310,7 @@ public class ConnectionURLTest extends TestCase
assertTrue(connectionurl.getBrokerCount() == 1);
}
- // FIXME Connection now parses but result is wrong QPID-71
- /*
- public void testWrongOptionSeparatorInBroker()
- {
- String url = "amqp://user:@/test?brokerlist='tcp://localhost:5672+option='value''";
- try
- {
- AMQConnectionURL connection = new AMQConnectionURL(url);
-
- Float version = Float.parseFloat(System.getProperty("java.specification.version"));
- if (version > 1.5)
- {
- fail("URL Should not parse on Java " + version + " Connection is:" + connection);
- }
- }
- catch (URLSyntaxException urise)
- {
- assertTrue(urise.getReason().equals("Illegal character in port number"));
- }
-
- }
- */
public void testWrongOptionSeparatorInOptions()
{
@@ -349,18 +327,6 @@ public class ConnectionURLTest extends TestCase
}
- public void testTransportsDefaultToTCP() throws URLSyntaxException
- {
- String url = "amqp://guest:guest@/test?brokerlist='localhost:5672;myhost:5673'&failover='roundrobin'";
-
- AMQConnectionURL connection = new AMQConnectionURL(url);
-
- BrokerDetails broker = connection.getBrokerDetails(0);
- assertTrue(broker.getTransport().equals("tcp"));
-
- broker = connection.getBrokerDetails(1);
- assertTrue(broker.getTransport().equals("tcp"));
- }
public void testNoUserDetailsProvidedWithClientID()
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
index 9425b7c304..9bb2fcc59b 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
@@ -24,8 +24,6 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
-import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.test.VMBrokerSetup;
import javax.jms.MessageListener;
@@ -36,6 +34,7 @@ import javax.jms.ObjectMessage;
import java.io.Serializable;
import java.util.HashMap;
import java.util.ArrayList;
+import java.util.Arrays;
import junit.framework.TestCase;
@@ -44,6 +43,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener
private AMQConnection connection;
private AMQDestination destination;
private AMQSession session;
+ private MessageProducer producer;
private Serializable[] data;
private volatile boolean waiting;
private int received;
@@ -57,6 +57,13 @@ public class ObjectMessageTest extends TestCase implements MessageListener
connection = new AMQConnection(_broker, "guest", "guest", randomize("Client"), "/test_path");
destination = new AMQQueue(randomize("LatencyTest"), true);
session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+ //set up a consumer
+ session.createConsumer(destination).setMessageListener(this);
+ connection.start();
+
+ //create a publisher
+ producer = session.createProducer(destination, false, false, true);
A a1 = new A(1, "A");
A a2 = new A(2, "a");
B b = new B(1, "B");
@@ -83,7 +90,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener
_broker = broker;
}
- public void test() throws Exception
+ public void testSendAndReceive() throws Exception
{
try
{
@@ -102,16 +109,68 @@ public class ObjectMessageTest extends TestCase implements MessageListener
}
}
- private void send() throws Exception
+ public void testSetObjectPropertyForString() throws Exception
{
- //set up a consumer
- session.createConsumer(destination).setMessageListener(this);
- connection.start();
+ String testStringProperty = "TestStringProperty";
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestStringProperty",testStringProperty);
+ assertEquals(testStringProperty, msg.getObjectProperty("TestStringProperty"));
+ }
- //create a publisher
- MessageProducer producer = session.createProducer(destination, false, false, true);
+ public void testSetObjectPropertyForBoolean() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestBooleanProperty",Boolean.TRUE);
+ assertEquals(Boolean.TRUE, msg.getObjectProperty("TestBooleanProperty"));
+ }
+ public void testSetObjectPropertyForByte() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestByteProperty",Byte.MAX_VALUE);
+ assertEquals(Byte.MAX_VALUE, msg.getObjectProperty("TestByteProperty"));
+ }
+ public void testSetObjectPropertyForShort() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestShortProperty",Short.MAX_VALUE);
+ assertEquals(Short.MAX_VALUE, msg.getObjectProperty("TestShortProperty"));
+ }
+ public void testSetObjectPropertyForInteger() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestIntegerProperty",Integer.MAX_VALUE);
+ assertEquals(Integer.MAX_VALUE, msg.getObjectProperty("TestIntegerProperty"));
+ }
+
+ public void testSetObjectPropertyForDouble() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestDoubleProperty",Double.MAX_VALUE);
+ assertEquals(Double.MAX_VALUE, msg.getObjectProperty("TestDoubleProperty"));
+ }
+
+ public void testSetObjectPropertyForFloat() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestFloatProperty",Float.MAX_VALUE);
+ assertEquals(Float.MAX_VALUE, msg.getObjectProperty("TestFloatProperty"));
+ }
+
+ public void testSetObjectPropertyForByteArray() throws Exception
+ {
+ byte[] array = {1,2,3,4,5};
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestByteArrayProperty",array);
+ assertTrue(Arrays.equals(array,(byte[])msg.getObjectProperty("TestByteArrayProperty")));
+ }
+
+
+
+
+ private void send() throws Exception
+ {
for (int i = 0; i < data.length; i++)
{
ObjectMessage msg;
@@ -207,7 +266,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener
{
System.out.println("Usage: <broker>");
}
- new ObjectMessageTest(broker).test();
+ new ObjectMessageTest(broker).testSendAndReceive();
}
private static class A implements Serializable