diff options
Diffstat (limited to 'java/client/src')
44 files changed, 1479 insertions, 556 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index 7cabc667c1..6da0da9f6f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -56,7 +56,7 @@ public class AMQBrokerDetails implements BrokerDetails { //todo this list of valid transports should be enumerated somewhere if ((!(transport.equalsIgnoreCase("vm") || - transport.equalsIgnoreCase("tcp")))) + transport.equalsIgnoreCase("tcp")))) { if (transport.equalsIgnoreCase("localhost")) { @@ -65,7 +65,7 @@ public class AMQBrokerDetails implements BrokerDetails } else { - if (url.charAt(transport.length()) == ':' && url.charAt(transport.length()+1) != '/' ) + if (url.charAt(transport.length()) == ':' && url.charAt(transport.length() + 1) != '/') { //Then most likely we have a host:port value connection = new URI(DEFAULT_TRANSPORT + "://" + url); @@ -88,7 +88,7 @@ public class AMQBrokerDetails implements BrokerDetails if (transport == null) { URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" + - " In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, ""); + " In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, ""); } setTransport(transport); @@ -107,12 +107,45 @@ public class AMQBrokerDetails implements BrokerDetails if (port == -1) { - // Another fix for Java 1.5 URI handling + // Fix for when there is port data but it is not automatically parseable by getPort(). String auth = connection.getAuthority(); - if (auth != null && auth.startsWith(":")) + if (auth != null && auth.contains(":")) { - setPort(Integer.parseInt(auth.substring(1))); + int start = auth.indexOf(":") + 1; + int end = start; + boolean looking = true; + boolean found = false; + //Walk the authority looking for a port value. + while (looking) + { + try + { + end++; + Integer.parseInt(auth.substring(start, end)); + + if (end >= auth.length()) + { + looking = false; + found = true; + } + } + catch (NumberFormatException nfe) + { + looking = false; + } + + } + if (found) + { + setPort(Integer.parseInt(auth.substring(start, end))); + } + else + { + URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1, + "Illegal character in port number", connection.toString()); + } + } else { @@ -134,7 +167,7 @@ public class AMQBrokerDetails implements BrokerDetails { if (uris instanceof URLSyntaxException) { - throw (URLSyntaxException) uris; + throw(URLSyntaxException) uris; } URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); @@ -245,9 +278,9 @@ public class AMQBrokerDetails implements BrokerDetails BrokerDetails bd = (BrokerDetails) o; return _host.equalsIgnoreCase(bd.getHost()) && - (_port == bd.getPort()) && - _transport.equalsIgnoreCase(bd.getTransport()) && - (useSSL() == bd.useSSL()); + (_port == bd.getPort()) && + _transport.equalsIgnoreCase(bd.getTransport()) && + (useSSL() == bd.useSSL()); //todo do we need to compare all the options as well? } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 98db26d0c4..0bb8736227 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -44,6 +44,7 @@ import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.url.URLSyntaxException; import javax.jms.*; +import javax.jms.IllegalStateException; import javax.naming.NamingException; import javax.naming.Reference; import javax.naming.Referenceable; @@ -92,7 +93,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * Maps from session id (Integer) to AMQSession instance */ - private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap + private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap private String _clientName; @@ -142,7 +143,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect String clientName, String virtualHost) throws AMQException, URLSyntaxException { this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + clientName + + username + ":" + password + "@" + + (clientName==null?"":clientName) + virtualHost + "?brokerlist='" + broker + "'")); } @@ -157,11 +159,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { this(new AMQConnectionURL(useSSL ? ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + clientName + + username + ":" + password + "@" + + (clientName==null?"":clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'" : ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + clientName + + username + ":" + password + "@" + + (clientName==null?"":clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'" )); @@ -537,7 +541,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void setClientID(String clientID) throws JMSException { checkNotClosed(); - _clientName = clientID; + // in AMQP it is not possible to change the client ID. If one is not specified + // upon connection construction, an id is generated automatically. Therefore + // we can always throw an exception. + throw new IllegalStateException("Client name cannot be changed after being set"); } public ConnectionMetaData getMetaData() throws JMSException @@ -583,7 +590,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void stop() throws JMSException { checkNotClosed(); - if (_started) { for (Iterator i = _sessions.values().iterator(); i.hasNext();) @@ -920,8 +926,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect void deregisterSession(int channelId) { _sessions.remove(channelId); - } - + } + /** * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. * The caller must hold the failover mutex before calling this method. diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a847658846..8f90913e5c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -27,6 +27,7 @@ import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.framing.*; @@ -367,13 +368,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public StreamMessage createStreamMessage() throws JMSException { - checkNotClosed(); - throw new UnsupportedOperationException("Stream messages not supported"); + synchronized (_connection.getFailoverMutex()) + { + checkNotClosed(); + + try + { + return (StreamMessage) _messageFactoryRegistry.createMessage(JMSStreamMessage.MIME_TYPE); + } + catch (AMQException e) + { + throw new JMSException("Unable to create text message: " + e); + } + } } public TextMessage createTextMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); @@ -462,28 +474,30 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // that can be called from a different thread of control from the one controlling the session synchronized(_connection.getFailoverMutex()) { - _closed.set(true); - - // we pass null since this is not an error case - closeProducersAndConsumers(null); - - try + //Ensure we only try and close an open session. + if (!_closed.getAndSet(true)) { - _connection.getProtocolHandler().closeSession(this); - final AMQFrame frame = ChannelCloseBody.createAMQFrame( - getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0); - _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class); - // When control resumes at this point, a reply will have been received that - // indicates the broker has closed the channel successfully + // we pass null since this is not an error case + closeProducersAndConsumers(null); - } - catch (AMQException e) - { - throw new JMSException("Error closing session: " + e); - } - finally - { - _connection.deregisterSession(_channelId); + try + { + _connection.getProtocolHandler().closeSession(this); + final AMQFrame frame = ChannelCloseBody.createAMQFrame( + getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0); + _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class); + // When control resumes at this point, a reply will have been received that + // indicates the broker has closed the channel successfully + + } + catch (AMQException e) + { + throw new JMSException("Error closing session: " + e); + } + finally + { + _connection.deregisterSession(_channelId); + } } } } @@ -723,6 +737,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Creates a QueueReceiver + * * @param destination * @return QueueReceiver - a wrapper around our MessageConsumer * @throws JMSException @@ -736,6 +751,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Creates a QueueReceiver using a message selector + * * @param destination * @param messageSelector * @return QueueReceiver - a wrapper around our MessageConsumer @@ -826,7 +842,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler(); // TODO: construct the rawSelector from the selector string if rawSelector == null - final FieldTable ft = new FieldTable(); + final FieldTable ft = FieldTableFactory.newFieldTable(); //if (rawSelector != null) // ft.put("headers", rawSelector.getDataAsBytes()); if (rawSelector != null) @@ -935,6 +951,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public Queue createQueue(String queueName) throws JMSException { + checkNotClosed(); if (queueName.indexOf('/') == -1) { return new AMQQueue(queueName); @@ -957,12 +974,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Creates a QueueReceiver wrapping a MessageConsumer + * * @param queue * @return QueueReceiver * @throws JMSException */ public QueueReceiver createReceiver(Queue queue) throws JMSException { + checkNotClosed(); AMQQueue dest = (AMQQueue) queue; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest); return new QueueReceiverAdaptor(dest, consumer); @@ -970,6 +989,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Creates a QueueReceiver wrapping a MessageConsumer using a message selector + * * @param queue * @param messageSelector * @return QueueReceiver @@ -977,6 +997,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { + checkNotClosed(); AMQQueue dest = (AMQQueue) queue; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector); @@ -985,11 +1006,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public QueueSender createSender(Queue queue) throws JMSException { - return (QueueSender) createProducer(queue); + checkNotClosed(); + //return (QueueSender) createProducer(queue); + return new QueueSenderAdapter(createProducer(queue), queue); } public Topic createTopic(String topicName) throws JMSException { + checkNotClosed(); + if (topicName.indexOf('/') == -1) { return new AMQTopic(topicName); @@ -1012,18 +1037,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Creates a non-durable subscriber + * * @param topic * @return TopicSubscriber - a wrapper round our MessageConsumer * @throws JMSException */ public TopicSubscriber createSubscriber(Topic topic) throws JMSException { + checkNotClosed(); AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } /** * Creates a non-durable subscriber with a message selector + * * @param topic * @param messageSelector * @param noLocal @@ -1032,6 +1060,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { + checkNotClosed(); AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); } @@ -1045,6 +1074,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { + checkNotClosed(); AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } @@ -1055,6 +1085,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { + checkNotClosed(); AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); return new TopicSubscriberAdaptor(dest, consumer); @@ -1062,26 +1093,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicPublisher createPublisher(Topic topic) throws JMSException { - return (TopicPublisher) createProducer(topic); + checkNotClosed(); + //return (TopicPublisher) createProducer(topic); + return new TopicPublisherAdapter(createProducer(topic), topic); } public QueueBrowser createBrowser(Queue queue) throws JMSException { + checkNotClosed(); throw new UnsupportedOperationException("Queue browsing not supported"); } public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { + checkNotClosed(); throw new UnsupportedOperationException("Queue browsing not supported"); } public TemporaryQueue createTemporaryQueue() throws JMSException { + checkNotClosed(); return new AMQTemporaryQueue(); } public TemporaryTopic createTemporaryTopic() throws JMSException { + checkNotClosed(); return new AMQTemporaryTopic(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index f97ea6bf1e..ded2152bf8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -159,11 +159,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public String getMessageSelector() throws JMSException { + checkPreConditions(); return _messageSelector; } public MessageListener getMessageListener() throws JMSException { + checkPreConditions(); return (MessageListener) _messageListener.get(); } @@ -179,7 +181,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void setMessageListener(MessageListener messageListener) throws JMSException { - checkNotClosed(); + checkPreConditions(); //if the current listener is non-null and the session is not stopped, then //it is an error to call this method. @@ -277,7 +279,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receive(long l) throws JMSException { - checkNotClosed(); + checkPreConditions(); acquireReceiving(); @@ -311,7 +313,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receiveNoWait() throws JMSException { - checkNotClosed(); + checkPreConditions(); acquireReceiving(); @@ -520,7 +522,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private void deregisterConsumer() { - _session.deregisterConsumer(_consumerTag); + _session.deregisterConsumer(_consumerTag); } public String getConsumerTag() @@ -529,7 +531,20 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } public void setConsumerTag(String consumerTag) - { + { _consumerTag = consumerTag; } + + public AMQSession getSession() { + return _session; + } + + private void checkPreConditions() throws JMSException{ + + this.checkNotClosed(); + + if(_session == null || _session.isClosed()){ + throw new UnsupportedOperationException("Invalid Session"); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 14cafc3558..8d6287eca3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -143,6 +143,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setDisableMessageID(boolean b) throws JMSException { + checkPreConditions(); checkNotClosed(); // IGNORED } @@ -156,7 +157,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setDisableMessageTimestamp(boolean b) throws JMSException { - checkNotClosed(); + checkPreConditions(); _disableTimestamps = b; } @@ -168,7 +169,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setDeliveryMode(int i) throws JMSException { - checkNotClosed(); + checkPreConditions(); if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT) { throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i + @@ -185,7 +186,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setPriority(int i) throws JMSException { - checkNotClosed(); + checkPreConditions(); if (i < 0 || i > 9) { throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9"); @@ -201,7 +202,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setTimeToLive(long l) throws JMSException { - checkNotClosed(); + checkPreConditions(); if (l < 0) { throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l); @@ -229,6 +230,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message) throws JMSException { + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive, @@ -238,6 +240,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message, int deliveryMode) throws JMSException { + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive, @@ -247,6 +250,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message, int deliveryMode, boolean immediate) throws JMSException { + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive, @@ -257,6 +261,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory, @@ -266,7 +271,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Destination destination, Message message) throws JMSException { - checkNotClosed(); + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -279,7 +284,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j int priority, long timeToLive) throws JMSException { - checkNotClosed(); + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -292,7 +297,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j int priority, long timeToLive, boolean mandatory) throws JMSException { - checkNotClosed(); + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -305,7 +310,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException { - checkNotClosed(); + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -319,7 +324,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j boolean immediate, boolean waitUntilSent) throws JMSException { - checkNotClosed(); + checkPreConditions(); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -334,7 +339,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { throw new JMSException("Unsupported destination class: " + (destination != null ? destination.getClass() : null)); - } + } declareDestination((AMQDestination)destination); } @@ -481,4 +486,20 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j checkNotClosed(); _encoding = encoding; } + + private void checkPreConditions() throws IllegalStateException, JMSException { + checkNotClosed(); + + if(_destination == null){ + throw new UnsupportedOperationException("Destination is null"); + } + + if(_session == null || _session.isClosed()){ + throw new UnsupportedOperationException("Invalid Session"); + } + } + + public AMQSession getSession() { + return _session; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java index 57e458d833..21ec50c046 100644 --- a/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java +++ b/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java @@ -39,31 +39,37 @@ public class QueueReceiverAdaptor implements QueueReceiver { public String getMessageSelector() throws JMSException { + checkPreConditions(); return _consumer.getMessageSelector(); } public MessageListener getMessageListener() throws JMSException { + checkPreConditions(); return _consumer.getMessageListener(); } public void setMessageListener(MessageListener messageListener) throws JMSException { + checkPreConditions(); _consumer.setMessageListener(messageListener); } public Message receive() throws JMSException { + checkPreConditions(); return _consumer.receive(); } public Message receive(long l) throws JMSException { + checkPreConditions(); return _consumer.receive(l); } public Message receiveNoWait() throws JMSException { + checkPreConditions(); return _consumer.receiveNoWait(); } @@ -79,8 +85,26 @@ public class QueueReceiverAdaptor implements QueueReceiver { */ public Queue getQueue() throws JMSException { + checkPreConditions(); return _queue; } + private void checkPreConditions() throws javax.jms.IllegalStateException { + BasicMessageConsumer msgConsumer = (BasicMessageConsumer)_consumer; + + if (msgConsumer.isClosed() ){ + throw new javax.jms.IllegalStateException("Consumer is closed"); + } + + if(_queue == null){ + throw new UnsupportedOperationException("Queue is null"); + } + + AMQSession session = msgConsumer.getSession(); + + if(session == null || session.isClosed()){ + throw new UnsupportedOperationException("Invalid Session"); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java new file mode 100644 index 0000000000..15bf4a125f --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java @@ -0,0 +1,134 @@ +package org.apache.qpid.client; + +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueSender; + +public class QueueSenderAdapter implements QueueSender { + + private MessageProducer delegate; + private Queue queue; + private boolean closed = false; + + public QueueSenderAdapter(MessageProducer msgProducer, Queue queue){ + delegate = msgProducer; + this.queue = queue; + } + + public Queue getQueue() throws JMSException { + checkPreConditions(); + return queue; + } + + public void send(Message msg) throws JMSException { + checkPreConditions(); + delegate.send(msg); + } + + public void send(Queue queue, Message msg) throws JMSException { + checkPreConditions(); + delegate.send(queue, msg); + } + + public void publish(Message msg, int deliveryMode, int priority, long timeToLive) + throws JMSException { + checkPreConditions(); + delegate.send(msg, deliveryMode,priority,timeToLive); + } + + public void send(Queue queue,Message msg, int deliveryMode, int priority, long timeToLive) + throws JMSException { + checkPreConditions(); + delegate.send(queue,msg, deliveryMode,priority,timeToLive); + } + + public void close() throws JMSException { + delegate.close(); + closed = true; + } + + public int getDeliveryMode() throws JMSException { + return delegate.getDeliveryMode(); + } + + public Destination getDestination() throws JMSException { + return delegate.getDestination(); + } + + public boolean getDisableMessageID() throws JMSException { + return delegate.getDisableMessageID(); + } + + public boolean getDisableMessageTimestamp() throws JMSException { + return delegate.getDisableMessageTimestamp(); + } + + public int getPriority() throws JMSException { + return delegate.getPriority(); + } + + public long getTimeToLive() throws JMSException { + return delegate.getTimeToLive(); + } + + public void send(Destination dest, Message msg) throws JMSException { + checkPreConditions(); + delegate.send(dest,msg); + } + + public void send(Message msg, int deliveryMode, int priority, long timeToLive) + throws JMSException { + checkPreConditions(); + delegate.send(msg, deliveryMode,priority,timeToLive); + } + + public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { + checkPreConditions(); + delegate.send(dest,msg, deliveryMode,priority,timeToLive); + } + + public void setDeliveryMode(int deliveryMode) throws JMSException { + checkPreConditions(); + delegate.setDeliveryMode(deliveryMode); + } + + public void setDisableMessageID(boolean disableMessageID) throws JMSException { + checkPreConditions(); + delegate.setDisableMessageID(disableMessageID); + } + + public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException { + checkPreConditions(); + delegate.setDisableMessageTimestamp(disableMessageTimestamp); + } + + public void setPriority(int priority) throws JMSException { + checkPreConditions(); + delegate.setPriority(priority); + } + + public void setTimeToLive(long timeToLive) throws JMSException { + checkPreConditions(); + delegate.setTimeToLive(timeToLive); + } + + private void checkPreConditions() throws IllegalStateException, IllegalStateException { + if (closed){ + throw new javax.jms.IllegalStateException("Publisher is closed"); + } + + if(queue == null){ + throw new UnsupportedOperationException("Queue is null"); + } + + AMQSession session = ((BasicMessageProducer)delegate).getSession(); + + if(session == null || session.isClosed()){ + throw new UnsupportedOperationException("Invalid Session"); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java new file mode 100644 index 0000000000..0702202c2a --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java @@ -0,0 +1,138 @@ +package org.apache.qpid.client; + +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Topic; +import javax.jms.TopicPublisher; + +public class TopicPublisherAdapter implements TopicPublisher { + + private MessageProducer delegate; + private Topic topic; + private boolean closed = false; + + public TopicPublisherAdapter(MessageProducer msgProducer, Topic topic){ + delegate = msgProducer; + this.topic = topic; + } + + public Topic getTopic() throws JMSException { + checkPreConditions(); + return topic; + } + + public void publish(Message msg) throws JMSException { + checkPreConditions(); + delegate.send(msg); + } + + public void publish(Topic topic, Message msg) throws JMSException { + checkPreConditions(); + delegate.send(topic,msg); + } + + public void publish(Message msg, int deliveryMode, int priority, long timeToLive) + throws JMSException { + checkPreConditions(); + delegate.send(msg, deliveryMode,priority,timeToLive); + } + + public void publish(Topic topic, Message msg, int deliveryMode, int priority, long timeToLive) + throws JMSException { + checkPreConditions(); + delegate.send(topic,msg, deliveryMode,priority,timeToLive); + } + + public void close() throws JMSException { + delegate.close(); + closed = true; + } + + public int getDeliveryMode() throws JMSException { + return delegate.getDeliveryMode(); + } + + public Destination getDestination() throws JMSException { + return delegate.getDestination(); + } + + public boolean getDisableMessageID() throws JMSException { + return delegate.getDisableMessageID(); + } + + public boolean getDisableMessageTimestamp() throws JMSException { + return delegate.getDisableMessageTimestamp(); + } + + public int getPriority() throws JMSException { + return delegate.getPriority(); + } + + public long getTimeToLive() throws JMSException { + return delegate.getTimeToLive(); + } + + public void send(Message msg) throws JMSException { + checkPreConditions(); + delegate.send(msg); + } + + public void send(Destination dest, Message msg) throws JMSException { + checkPreConditions(); + delegate.send(dest,msg); + } + + public void send(Message msg, int deliveryMode, int priority, long timeToLive) + throws JMSException { + checkPreConditions(); + delegate.send(msg, deliveryMode,priority,timeToLive); + } + + public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { + checkPreConditions(); + delegate.send(dest,msg, deliveryMode,priority,timeToLive); + } + + public void setDeliveryMode(int deliveryMode) throws JMSException { + checkPreConditions(); + delegate.setDeliveryMode(deliveryMode); + } + + public void setDisableMessageID(boolean disableMessageID) throws JMSException { + checkPreConditions(); + delegate.setDisableMessageID(disableMessageID); + } + + public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException { + checkPreConditions(); + delegate.setDisableMessageTimestamp(disableMessageTimestamp); + } + + public void setPriority(int priority) throws JMSException { + checkPreConditions(); + delegate.setPriority(priority); + } + + public void setTimeToLive(long timeToLive) throws JMSException { + checkPreConditions(); + delegate.setTimeToLive(timeToLive); + } + + private void checkPreConditions() throws IllegalStateException, IllegalStateException { + if (closed){ + throw new javax.jms.IllegalStateException("Publisher is closed"); + } + + if(topic == null){ + throw new UnsupportedOperationException("Topic is null"); + } + + AMQSession session = ((BasicMessageProducer)delegate).getSession(); + if(session == null || session.isClosed()){ + throw new UnsupportedOperationException("Invalid Session"); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java index c776a9943e..06e353e271 100644 --- a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java +++ b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.client; +import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -43,37 +44,45 @@ class TopicSubscriberAdaptor implements TopicSubscriber _consumer = consumer; _noLocal = noLocal; } + TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer) { this(topic, consumer, consumer.isNoLocal()); } + public Topic getTopic() throws JMSException { + checkPreConditions(); return _topic; } public boolean getNoLocal() throws JMSException { + checkPreConditions(); return _noLocal; } public String getMessageSelector() throws JMSException { + checkPreConditions(); return _consumer.getMessageSelector(); } public MessageListener getMessageListener() throws JMSException { + checkPreConditions(); return _consumer.getMessageListener(); } public void setMessageListener(MessageListener messageListener) throws JMSException { + checkPreConditions(); _consumer.setMessageListener(messageListener); } public Message receive() throws JMSException { + checkPreConditions(); return _consumer.receive(); } @@ -84,6 +93,7 @@ class TopicSubscriberAdaptor implements TopicSubscriber public Message receiveNoWait() throws JMSException { + checkPreConditions(); return _consumer.receiveNoWait(); } @@ -91,4 +101,22 @@ class TopicSubscriberAdaptor implements TopicSubscriber { _consumer.close(); } + + private void checkPreConditions() throws javax.jms.IllegalStateException{ + BasicMessageConsumer msgConsumer = (BasicMessageConsumer)_consumer; + + if (msgConsumer.isClosed() ){ + throw new javax.jms.IllegalStateException("Consumer is closed"); + } + + if(_topic == null){ + throw new UnsupportedOperationException("Topic is null"); + } + + AMQSession session = msgConsumer.getSession(); + + if(session == null || session.isClosed()){ + throw new UnsupportedOperationException("Invalid Session"); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index caef9a3f44..9333df3fe4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -32,6 +32,7 @@ import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ConnectionStartBody; import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; @@ -117,7 +118,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener } stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); - FieldTable clientProperties = new FieldTable(); + FieldTable clientProperties = FieldTableFactory.newFieldTable(); clientProperties.put("instance", ps.getClientID()); clientProperties.put("product", "Qpid"); clientProperties.put("version", "1.0"); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index 6745052a5d..456d4d520c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.MessageFormatException; +import javax.jms.MessageEOFException; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index 2001573ef9..5282dce4c9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -23,10 +23,10 @@ package org.apache.qpid.client.message; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.PropertyFieldTable; +import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.AMQException; import javax.jms.JMSException; -import javax.jms.MessageFormatException; import java.util.Enumeration; public class JMSMapMessage extends JMSTextMessage implements javax.jms.MapMessage @@ -58,7 +58,7 @@ public class JMSMapMessage extends JMSTextMessage implements javax.jms.MapMessag try { - _map = new PropertyFieldTable(getText()); + _map = FieldTableFactory.newFieldTable(getText()); } catch (JMSException e) { @@ -68,7 +68,7 @@ public class JMSMapMessage extends JMSTextMessage implements javax.jms.MapMessag // AbstractJMSMessage Interface - public void clearBody() throws JMSException + public void clearBodyImpl() throws JMSException { if (_data != null) { @@ -206,48 +206,55 @@ public class JMSMapMessage extends JMSTextMessage implements javax.jms.MapMessag public void setBoolean(String string, boolean b) throws JMSException { + checkWritable(); _map.setBoolean(string, b); } public void setByte(String string, byte b) throws JMSException { + checkWritable(); _map.setByte(string, b); } public void setShort(String string, short i) throws JMSException { + checkWritable(); _map.setShort(string, i); } public void setChar(String string, char c) throws JMSException { + checkWritable(); _map.setChar(string, c); } public void setInt(String string, int i) throws JMSException { + checkWritable(); _map.setInteger(string, i); } public void setLong(String string, long l) throws JMSException { + checkWritable(); _map.setLong(string, l); } public void setFloat(String string, float v) throws JMSException { - + checkWritable(); _map.setFloat(string, v); } public void setDouble(String string, double v) throws JMSException { - + checkWritable(); _map.setDouble(string, v); } public void setString(String string, String string1) throws JMSException { + checkWritable(); _map.setString(string, string1); } @@ -258,11 +265,13 @@ public class JMSMapMessage extends JMSTextMessage implements javax.jms.MapMessag public void setBytes(String string, byte[] bytes, int i, int i1) throws JMSException { + checkWritable(); _map.setBytes(string, bytes, i, i1); } public void setObject(String string, Object object) throws JMSException { + checkWritable(); _map.setObject(string, object); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index 34dd7e9ec1..61f326d52b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -36,7 +36,6 @@ import java.nio.charset.CharacterCodingException; public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage { static final String MIME_TYPE = "application/java-object-stream"; - private final boolean _readonly; private static final int DEFAULT_BUFFER_SIZE = 1024; @@ -56,7 +55,6 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); _data.setAutoExpand(true); } - _readonly = (data != null); getJmsContentHeaderProperties().setContentType(MIME_TYPE); } @@ -66,10 +64,9 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) throws AMQException { super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data); - _readonly = data != null; } - public void clearBody() throws JMSException + public void clearBodyImpl() throws JMSException { if (_data != null) { @@ -90,10 +87,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag public void setObject(Serializable serializable) throws JMSException { - if (_readonly) - { - throw new MessageNotWriteableException("Message is not writable."); - } + checkWritable(); if (_data == null) { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index 2624c20105..3061d5a59c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -66,7 +66,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text setText(text); } - public void clearBody() throws JMSException + public void clearBodyImpl() throws JMSException { if (_data != null) { @@ -93,6 +93,8 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text public void setText(String string) throws JMSException { + checkWritable(); + clearBody(); try { diff --git a/java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java b/java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java index 81d3fb76d5..4291cb3259 100644 --- a/java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java +++ b/java/client/src/main/java/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java @@ -21,6 +21,7 @@ package org.apache.qpid.client.security.amqplain; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; @@ -71,7 +72,7 @@ public class AmqPlainSaslClient implements SaslClient { throw new SaslException("Error handling SASL callbacks: " + e, e); } - FieldTable table = new FieldTable(); + FieldTable table = FieldTableFactory.newFieldTable(); table.put("LOGIN", nameCallback.getName()); table.put("PASSWORD", pwdCallback.getPassword()); return table.getDataAsBytes(); diff --git a/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java b/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java index 02fe103c6a..c26f67bf10 100644 --- a/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java +++ b/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java @@ -39,7 +39,7 @@ public class TestMessageHelper return new JMSMapMessage(); } - public static JMSStreamMessage newJMSStreamMessage() throws JMSException + public static JMSStreamMessage newJMSStreamMessage() { return new JMSStreamMessage(); } diff --git a/java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPAdministrator.java b/java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPAdministrator.java index 21a6816af7..006bda7e2e 100644 --- a/java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPAdministrator.java +++ b/java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPAdministrator.java @@ -41,7 +41,7 @@ * Copyright 2001, 2003 (C) Exoffice Technologies Inc. All Rights Reserved. * */ -package org.exolab.jmscts.amqp; +package org.apache.qpid.cts.src.providers.amqp.org.exolab.jmscts.amqp; import org.apache.qpid.client.*; import org.exolab.jmscts.provider.Administrator; diff --git a/java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPProvider.java b/java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPProvider.java index 21610d39b2..aafa415d1e 100644 --- a/java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPProvider.java +++ b/java/client/src/test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPProvider.java @@ -41,7 +41,7 @@ * Copyright 2001, 2003 (C) Exoffice Technologies Inc. All Rights Reserved. * */ -package org.exolab.jmscts.amqp; +package org.apache.qpid.cts.src.providers.amqp.org.exolab.jmscts.amqp; import javax.jms.JMSException; diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java index ca3e5ce3f5..b199d41432 100644 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java @@ -19,7 +19,7 @@ package org.apache.qpid.example.publisher; import org.apache.log4j.Logger; -import java.util.Properties; + import java.io.File; import org.apache.qpid.example.shared.FileUtils; @@ -34,12 +34,17 @@ import javax.jms.JMSException; */ public class FileMessageDispatcher { - private static final Logger _logger = Logger.getLogger(FileMessageDispatcher.class); - - private static Publisher _publisher = null; + protected static final Logger _logger = Logger.getLogger(FileMessageDispatcher.class); - private static final String DEFAULT_PUB_NAME = "Publisher"; + protected static Publisher _publisher = null; + /** + * To use this main method you need to specify a path or file to use for input + * This class then uses file contents from the dir/file specified to generate + * messages to publish + * Intended to be a very simple way to get going with publishing using the broker + * @param args - must specify one value, the path to file(s) for publisher + */ public static void main(String[] args) { @@ -52,7 +57,7 @@ public class FileMessageDispatcher { { try { - //publish message(s) from file(s) and send message to monitor queue + //publish message(s) from file(s) to configured queue publish(args[0]); //Move payload file(s) to archive location as no error @@ -60,7 +65,8 @@ public class FileMessageDispatcher { } catch(Exception e) { - System.err.println("Error trying to dispatch message: " + e); + //log error and exit + _logger.error("Error trying to dispatch message: " + e); System.exit(1); } finally @@ -81,8 +87,12 @@ public class FileMessageDispatcher { System.exit(0); } - - //Publish files or file as message + /** + * Publish the content of a file or files from a directory as messages + * @param path - from main args + * @throws JMSException + * @throws MessageFactoryException - if cannot create message from file content + */ public static void publish(String path) throws JMSException, MessageFactoryException { File tempFile = new File(path); @@ -100,7 +110,7 @@ public class FileMessageDispatcher { for (File file : files) { //Create message factory passing in payload path - MessageFactory factory = new MessageFactory(getPublisher().getSession(), file.toString()); + FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), file.toString()); //Send the message generated from the payload using the _publisher getPublisher().sendMessage(factory.createEventMessage()); @@ -110,16 +120,18 @@ public class FileMessageDispatcher { } else { - //handle as single file + //handle a single file //Create message factory passing in payload path - MessageFactory factory = new MessageFactory(getPublisher().getSession(),tempFile.toString()); + FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(),tempFile.toString()); //Send the message generated from the payload using the _publisher getPublisher().sendMessage(factory.createEventMessage()); } } - //cleanup publishers + /** + * Cleanup before exit + */ public static void cleanup() { if (getPublisher() != null) @@ -128,8 +140,8 @@ public class FileMessageDispatcher { } } - /* - * Returns a _publisher for a queue + /** + * @return A Publisher instance */ private static Publisher getPublisher() { @@ -141,7 +153,6 @@ public class FileMessageDispatcher { //Create a _publisher _publisher = new Publisher(); - _publisher.setName(DEFAULT_PUB_NAME); return _publisher; } diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactory.java b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java index f9944284c8..88bcbbbccb 100644 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactory.java +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java @@ -25,13 +25,19 @@ import org.apache.qpid.example.shared.Statics; import java.io.*; import javax.jms.*; -public class MessageFactory +public class FileMessageFactory { - private final Session _session; - private final String _payload; - private final String _filename; + protected final Session _session; + protected final String _payload; + protected final String _filename; - public MessageFactory(Session session, String filename) throws MessageFactoryException + /** + * Contructs and instance using a filename from which content will be used to create message + * @param session + * @param filename + * @throws MessageFactoryException + */ + public FileMessageFactory(Session session, String filename) throws MessageFactoryException { try { @@ -45,9 +51,13 @@ public class MessageFactory } } - /* - * Creates message and sets filename property on it - */ + /** + * Creates a text message and sets filename property on it + * The filename property is purely intended to provide visibility + * of file content passing trhough the broker using example classes + * @return Message - a TextMessage with content from file + * @throws JMSException + */ public Message createEventMessage() throws JMSException { TextMessage msg = _session.createTextMessage(); @@ -56,9 +66,13 @@ public class MessageFactory return msg; } - /* - * Creates message from a string for use by the monitor - */ + /** + * Creates message from a string for use by the monitor + * @param session + * @param textMsg - message content + * @return Message - TextMessage with content from String + * @throws JMSException + */ public static Message createSimpleEventMessage(Session session, String textMsg) throws JMSException { TextMessage msg = session.createTextMessage(); diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java index 16b32da22a..8784d340da 100644 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java @@ -20,8 +20,9 @@ package org.apache.qpid.example.publisher; import org.apache.log4j.Logger; import org.apache.log4j.BasicConfigurator; -import org.apache.qpid.example.shared.Statics; + import javax.jms.*; + import java.util.Properties; /** @@ -32,14 +33,18 @@ public class MonitorMessageDispatcher { private static final Logger _logger = Logger.getLogger(MonitorMessageDispatcher.class); - private static MonitorPublisher _monitorPublisher = null; + protected static MonitorPublisher _monitorPublisher = null; - private static final String DEFAULT_MONITOR_PUB_NAME = "MonitorPublisher"; + protected static final String DEFAULT_MONITOR_PUB_NAME = "MonitorPublisher"; + /** + * Easy entry point for running a message dispatcher for monitoring consumption + * @param args + */ public static void main(String[] args) { - //@TODO switch on logging appropriately at your app level + //Switch on logging appropriately for your app BasicConfigurator.configure(); try @@ -61,7 +66,7 @@ public class MonitorMessageDispatcher { } catch(UndeliveredMessageException a) { - //@TODO trigger application specific failure handling here + //trigger application specific failure handling here _logger.error("Problem delivering monitor message"); break; } @@ -69,8 +74,7 @@ public class MonitorMessageDispatcher { } catch(Exception e) { - - System.err.println("Error trying to dispatch AMS monitor message: " + e); + _logger.error("Error trying to dispatch AMS monitor message: " + e); System.exit(1); } finally @@ -84,15 +88,21 @@ public class MonitorMessageDispatcher { System.exit(1); } - //Publish heartbeat message + /** + * Publish heartbeat message + * @throws JMSException + * @throws UndeliveredMessageException + */ public static void publish() throws JMSException, UndeliveredMessageException { //Send the message generated from the payload using the _publisher getMonitorPublisher().sendImmediateMessage - (MessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); + (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); } - //cleanup publishers + /** + * Cleanup publishers + */ public static void cleanup() { if (getMonitorPublisher() != null) @@ -114,9 +124,6 @@ public class MonitorMessageDispatcher { return _monitorPublisher; } - //Create _publisher using system properties - Properties props = System.getProperties(); - //Create a _publisher using failover details and constant for monitor queue _monitorPublisher = new MonitorPublisher(); diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java index d64fd9b142..be42e0e413 100644 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java @@ -22,14 +22,14 @@ import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.jms.Session; - import javax.jms.JMSException; import javax.jms.Message; import javax.jms.DeliveryMode; import javax.jms.Queue; import javax.jms.MessageProducer; import javax.jms.Connection; +import javax.jms.Session; + import javax.naming.InitialContext; import org.apache.qpid.example.shared.InitialContextHelper; @@ -44,7 +44,7 @@ public class Publisher protected Session _session; - private MessageProducer _producer; + protected MessageProducer _producer; protected String _destinationDir; @@ -54,7 +54,10 @@ public class Publisher protected static final String _defaultDestinationDir = "/tmp"; - //constructor for use with a single host + /** + * Creates a Publisher instance using properties from example.properties + * See InitialContextHelper for details of how context etc created + */ public Publisher() { try @@ -68,7 +71,7 @@ public class Publisher _connection = cf.createConnection(); //create a transactional session - _session = (Session) _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //lookup the example queue and use it //Queue is non-exclusive and not deleted when last consumer detaches @@ -90,8 +93,9 @@ public class Publisher } /** - * Publishes a non-persistent message using transacted session - **/ + * Publishes a non-persistent message using transacted session + * Note that persistent is the default mode for send - so need to specify for transient + */ public boolean sendMessage(Message message) { try @@ -124,6 +128,9 @@ public class Publisher return true; } + /** + * Cleanup resources before exit + */ public void cleanup() { try @@ -138,11 +145,15 @@ public class Publisher } catch(Exception e) { - System.err.println("Error trying to cleanup publisher " + e); + _log.error("Error trying to cleanup publisher " + e); System.exit(1); } } + /** + * Exposes session + * @return Session + */ public Session getSession() { return _session; diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java index d6e020bf43..9c195aef40 100644 --- a/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java +++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java @@ -42,6 +42,9 @@ public class MonitoredSubscriber extends Subscriber _monitorDestinationName = _destinationName + Statics.MONITOR_QUEUE_SUFFIX; } + /** + * MessageListener implementation for this subscriber + */ public static class MonitorMessageListener implements MessageListener { private String _name; @@ -52,9 +55,10 @@ public class MonitoredSubscriber extends Subscriber } - /* - * Listens for heartbeat messages and acknowledges them - */ + /** + * Listens for heartbeat messages and acknowledges them + * @param message + */ public void onMessage(javax.jms.Message message) { _logger.info(_name + " monitor got message '" + message + "'"); @@ -79,9 +83,9 @@ public class MonitoredSubscriber extends Subscriber } } - /* - * Subscribes to Queue and attaches additional monitor listener - */ + /** + * Subscribes to Queue and attaches additional monitor listener + */ public void subscribeAndMonitor() { try @@ -115,7 +119,9 @@ public class MonitoredSubscriber extends Subscriber } } - //stop consuming + /** + * Stop consuming + */ public void stopMonitor() { try diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java index d6ec8bd5de..d2f27da052 100644 --- a/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java +++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java @@ -19,9 +19,6 @@ package org.apache.qpid.example.subscriber; import org.apache.log4j.BasicConfigurator; -import org.apache.qpid.example.shared.Statics; - -import java.util.Properties; /** * Allows you to simply start a monitored subscriber @@ -30,6 +27,10 @@ public class MonitoredSubscriptionWrapper { private static MonitoredSubscriber _subscriber; + /** + * Create a monitored subscriber and start it + * @param args - no params required + */ public static void main(String args[]) { //switch on logging @@ -37,15 +38,12 @@ public class MonitoredSubscriptionWrapper { _subscriber = new MonitoredSubscriber(); - //using system props but can replace with app appropriate config here - Properties props = System.getProperties(); - - //note that for failover should set -Dhost=host1:port1;host2:port2 - //Client will then failover in order i.e. connect to first host and failover to second and so on _subscriber.subscribe(); } - //Stop subscribing now ... + /** + * Stop subscribing now ... + */ public static void stop() { _subscriber.stop(); diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java index 6b89567b83..34c7d6c7bb 100644 --- a/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java +++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java @@ -69,9 +69,9 @@ public class Subscriber } } - /* - * Listener class that handles messages - */ + /** + * Listener class that handles messages + */ public static class ExampleMessageListener implements MessageListener { private String _name; @@ -82,10 +82,10 @@ public class Subscriber } - /* - * Listens for message callbacks, handles and then acknowledges them - * @param message - the message received - */ + /** + * Listens for message callbacks, handles and then acknowledges them + * @param message - the message received + */ public void onMessage(javax.jms.Message message) { _log.info(_name + " got message '" + message + "'"); @@ -113,9 +113,9 @@ public class Subscriber } } - /* - * Subscribes to example Queue and attaches listener - */ + /** + * Subscribes to example Queue and attaches listener + */ public void subscribe() { _log.info("Starting subscription ..."); @@ -160,14 +160,18 @@ public class Subscriber } } + /** + * Set destination (queue or topic) name + * @param name + */ public void setDestinationName(String name) { _destinationName = name; } - /* - * stop consuming and close connection - */ + /** + * Stop consuming and close connection + */ public void stop() { try diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java index 4e755e858f..32a0ef685c 100644 --- a/java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java +++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java @@ -18,10 +18,6 @@ */ package org.apache.qpid.example.subscriber; -import org.apache.qpid.example.shared.Statics; - -import java.util.Properties; - import org.apache.log4j.BasicConfigurator; /** @@ -31,6 +27,10 @@ public class SubscriptionWrapper { private static Subscriber _subscriber; + /** + * Create a subscriber and start it + * @param args + */ public static void main(String args[]) { //switch on logging @@ -38,15 +38,12 @@ public class SubscriptionWrapper { _subscriber = new Subscriber(); - //using system props but can replace with app appropriate config here - Properties props = System.getProperties(); - - //note that for failover should set -Dhost=host1:port1;host2:port2 - //Client will then failover in order i.e. connect to first host and failover to second and so on _subscriber.subscribe(); } - //Stop subscribing now ... + /** + * Stop subscribing now ... + */ public static void stop() { _subscriber.stop(); diff --git a/java/client/src/test/java/org/apache/qpid/example/test/TestAMSPubSub.java b/java/client/src/test/java/org/apache/qpid/example/test/TestAMSPubSub.java deleted file mode 100644 index 3a81a0224b..0000000000 --- a/java/client/src/test/java/org/apache/qpid/example/test/TestAMSPubSub.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.test; - -import org.apache.qpid.example.subscriber.Subscriber; -import org.apache.qpid.example.publisher.FileMessageDispatcher; -import org.apache.qpid.example.shared.Statics; - -import java.net.InetAddress; -import java.util.Properties; - -import org.apache.log4j.Logger; -import org.apache.log4j.BasicConfigurator; - - -public class TestAMSPubSub { - - private static final Logger _logger = Logger.getLogger(TestAMSPubSub.class); - private static final String _defaultPayloadPath = "/tmp"; - - private static Subscriber subscriber; - - - /** - * Test main for class using default of local file for message payload - */ - public static void main(String[] args) - { - - //switch on logging - BasicConfigurator.configure(); - - InetAddress _address; - TestAMSPubSub testPubSub = new TestAMSPubSub(); - - //create publisher and subscriber - subscriber = new Subscriber(); - - //subscribe - testPubSub.subscribe(); - - //publish a message - if (args.length == 1) - { - testPubSub.publish(args[0]); - } - else - { - testPubSub.publish(null); - } - - //Should be able to see message publication and receipt in logs now - - //Disconnect and end test run - FileMessageDispatcher.cleanup(); - - //and exit as we're all done - System.exit(0); - - } - - private void subscribe() - { - subscriber.subscribe(); - } - - private void publish(String payloadPath) - { - - try - { - if (payloadPath == null|| payloadPath.length() == 0) - { - payloadPath = _defaultPayloadPath; - } - - FileMessageDispatcher.publish(payloadPath); - - } - catch (Exception e) - { - e.printStackTrace(); - } - } -} diff --git a/java/client/src/test/java/org/apache/qpid/example/test/TestMultSubscribers.java b/java/client/src/test/java/org/apache/qpid/example/test/TestMultSubscribers.java deleted file mode 100644 index f1a921e106..0000000000 --- a/java/client/src/test/java/org/apache/qpid/example/test/TestMultSubscribers.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.test; - -import org.apache.qpid.example.subscriber.Subscriber; -import org.apache.qpid.example.publisher.FileMessageDispatcher; -import org.apache.qpid.example.shared.Statics; - -import java.net.InetAddress; -import java.util.Properties; - -import org.apache.log4j.Logger; -import org.apache.log4j.BasicConfigurator; - - -public class TestMultSubscribers { - - private static final Logger _logger = Logger.getLogger(TestMultSubscribers.class); - private static final String _defaultPayloadPath = "/tmp"; - - private static Subscriber subscriber1; - private static Subscriber subscriber2; - - private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml"; - - /** - * Test main for class using default of local file for message payload - */ - public static void main(String[] args) - { - - //switch on logging - BasicConfigurator.configure(); - - InetAddress _address; - TestMultSubscribers testMultSub = new TestMultSubscribers(); - - //create publisher and subscriber - subscriber1 = new Subscriber(); - subscriber2 = new Subscriber(); - - //subscribe to the topic - testMultSub.subscribe(args); - - //publish a message - if (args.length == 1) - { - testMultSub.publish(args[0]); - } - else - { - testMultSub.publish(null); - } - - //Should be able to see message publication and receipt in logs now - - //Disconnect and end test run - FileMessageDispatcher.cleanup(); - - //and exit as we're all done - System.exit(0); - - } - - /* - * Point both of our subscribers at one queue - */ - private void subscribe(String[] args) - { - Properties props = System.getProperties(); - subscriber1.subscribe(); - subscriber2.subscribe(); - - } - - private void publish(String payloadPath) - { - - try - { - if (payloadPath == null|| payloadPath.length() == 0) - { - payloadPath = _defaultPayloadPath; - } - - FileMessageDispatcher.publish(payloadPath); - - } - catch (Exception e) - { - e.printStackTrace(); - } - } -} - diff --git a/java/client/src/test/java/org/apache/qpid/example/test/TestPublisher.java b/java/client/src/test/java/org/apache/qpid/example/test/TestPublisher.java deleted file mode 100644 index 6ff6028ccd..0000000000 --- a/java/client/src/test/java/org/apache/qpid/example/test/TestPublisher.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.test; - -import org.apache.qpid.example.publisher.FileMessageDispatcher; - -import java.net.InetAddress; - -import org.apache.log4j.Logger; -import org.apache.log4j.BasicConfigurator; - - -public class TestPublisher { - - private static final Logger _logger = Logger.getLogger(TestAMSPubSub.class); - private static final String _defaultPayloadPath = "/tmp"; - - /** - * Test main for class using default of local file for message payload - */ - public static void main(String[] args) - { - - //switch on logging - BasicConfigurator.configure(); - - InetAddress _address; - TestPublisher testPub = new TestPublisher(); - - //publish a message - if (args.length == 1) - { - testPub.publish(args[0]); - } - else - { - testPub.publish(null); - } - - //Should be able to see message publication and receipt in logs now - - //Disconnect and end test run - FileMessageDispatcher.cleanup(); - - //and exit as we're all done - System.exit(0); - - } - - private void publish(String payloadPath) - { - - try - { - if (payloadPath == null|| payloadPath.length() == 0) - { - payloadPath = _defaultPayloadPath; - } - - FileMessageDispatcher.publish(payloadPath); - - } - catch (Exception e) - { - e.printStackTrace(); - } - } -} - diff --git a/java/client/src/test/java/org/apache/qpid/framing/FieldTableTest.java b/java/client/src/test/java/org/apache/qpid/framing/FieldTableTest.java index 49e1630f15..2a7cb8be30 100644 --- a/java/client/src/test/java/org/apache/qpid/framing/FieldTableTest.java +++ b/java/client/src/test/java/org/apache/qpid/framing/FieldTableTest.java @@ -33,6 +33,33 @@ import junit.framework.TestCase; public class FieldTableTest extends TestCase { + + public void testEncoding() + { + FieldTable table = FieldTableFactory.newFieldTable(); + + String key = "String"; + String value = "Hello"; + table.put(key, value); + + //Add one for the type encoding + int size = EncodingUtils.encodedShortStringLength(key) + 1 + + EncodingUtils.encodedLongStringLength(value); + + assertEquals(table.getEncodedSize(), size); + + key = "Integer"; + Integer number = new Integer(60); + table.put(key, number); + + //Add one for the type encoding + size += EncodingUtils.encodedShortStringLength(key) + 1 + 4; + + + assertEquals(table.getEncodedSize(), size); + } + + public void testDataDump() throws IOException, AMQFrameDecodingException { byte[] data = readBase64("content.txt"); @@ -46,7 +73,7 @@ public class FieldTableTest extends TestCase ByteBuffer buffer = ByteBuffer.allocate(data.length); buffer.put(data); buffer.flip(); - FieldTable table = new FieldTable(buffer, size); + FieldTable table = FieldTableFactory.newFieldTable(buffer, size); } /* @@ -107,7 +134,7 @@ public class FieldTableTest extends TestCase FieldTable load(String name) throws IOException { - return populate(new FieldTable(), read(name)); + return populate(FieldTableFactory.newFieldTable(), read(name)); } Properties read(String name) throws IOException @@ -123,11 +150,12 @@ public class FieldTableTest extends TestCase { String key = (String) i.nextElement(); String value = properties.getProperty(key); - try{ + try + { int ival = Integer.parseInt(value); table.put(key, (long) ival); } - catch(NumberFormatException e) + catch (NumberFormatException e) { table.put(key, value); } @@ -144,7 +172,8 @@ public class FieldTableTest extends TestCase { StringBuffer buffer = new StringBuffer(); String line = in.readLine(); - while (line != null){ + while (line != null) + { buffer.append(line).append(" "); line = in.readLine(); } diff --git a/java/client/src/test/java/org/apache/qpid/headers/MessageFactory.java b/java/client/src/test/java/org/apache/qpid/headers/MessageFactory.java index f1f310c6e5..6f538d068c 100644 --- a/java/client/src/test/java/org/apache/qpid/headers/MessageFactory.java +++ b/java/client/src/test/java/org/apache/qpid/headers/MessageFactory.java @@ -22,6 +22,7 @@ package org.apache.qpid.headers; import org.apache.qpid.client.AMQSession; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; import javax.jms.BytesMessage; import javax.jms.Destination; @@ -127,14 +128,14 @@ class MessageFactory FieldTable getConsumerBinding() { - FieldTable binding = new FieldTable(); + FieldTable binding = FieldTableFactory.newFieldTable(); binding.put("SF0000", "value"); return binding; } FieldTable getControllerBinding() { - FieldTable binding = new FieldTable(); + FieldTable binding = FieldTableFactory.newFieldTable(); binding.put("SCONTROL", "value"); return binding; } diff --git a/java/client/src/test/java/org/apache/qpid/requestreply1/ServiceRequestingClient.java b/java/client/src/test/java/org/apache/qpid/requestreply1/ServiceRequestingClient.java index 5dc57364b3..74becfd9bb 100644 --- a/java/client/src/test/java/org/apache/qpid/requestreply1/ServiceRequestingClient.java +++ b/java/client/src/test/java/org/apache/qpid/requestreply1/ServiceRequestingClient.java @@ -107,6 +107,7 @@ public class ServiceRequestingClient implements ExceptionListener } try { + m.getPropertyNames(); if (m.propertyExists("timeSent")) { long timeSent = Long.parseLong(m.getStringProperty("timeSent")); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java index 4d37c5d2a6..2983a16e6d 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java @@ -135,6 +135,8 @@ public class BytesMessageTest extends TestCase implements MessageListener buffer.get(data); actual.add(data); + + //Check Body Write Status try { m.writeBoolean(true); @@ -144,6 +146,41 @@ public class BytesMessageTest extends TestCase implements MessageListener { //normal execution } + + m.clearBody(); + + try + { + m.writeBoolean(true); + } + catch (MessageNotWriteableException mnwe) + { + Assert.fail("Message should be writeable"); + } + + + //Check property write status + try + { + m.setStringProperty("test", "test"); + Assert.fail("Message should not be writeable"); + } + catch (MessageNotWriteableException mnwe) + { + //normal execution + } + + m.clearProperties(); + + try + { + m.setStringProperty("test", "test"); + } + catch (MessageNotWriteableException mnwe) + { + Assert.fail("Message should be writeable"); + } + } assertEqual(messages.iterator(), actual.iterator()); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java index 079def81d0..ad180e3a89 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java @@ -21,10 +21,13 @@ package org.apache.qpid.test.unit.basic; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.TestMessageHelper; import java.util.Enumeration; +import java.util.Iterator; +import java.util.NoSuchElementException; import javax.jms.JMSException; @@ -34,20 +37,28 @@ public class FieldTableKeyEnumeratorTest extends TestCase { public void testKeyEnumeration() { - FieldTable result = new FieldTable(); + FieldTable result = FieldTableFactory.newFieldTable(); result.put("one", 1L); result.put("two", 2L); result.put("three", 3L); result.put("four", 4L); result.put("five", 5L); - Enumeration e = result.keys(); + Iterator iterator = result.keySet().iterator(); + + try + { + assertTrue("one".equals(iterator.next())); + assertTrue("two".equals(iterator.next())); + assertTrue("three".equals(iterator.next())); + assertTrue("four".equals(iterator.next())); + assertTrue("five".equals(iterator.next())); + } + catch (NoSuchElementException e) + { + fail("All elements should be found."); + } - assertTrue("one".equals(e.nextElement())); - assertTrue("two".equals(e.nextElement())); - assertTrue("three".equals(e.nextElement())); - assertTrue("four".equals(e.nextElement())); - assertTrue("five".equals(e.nextElement())); } public void testPropertEnu() diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java index 67b7f49565..c1ecef6b57 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java @@ -26,10 +26,12 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.message.JMSBytesMessage; import org.apache.qpid.framing.AMQFrameDecodingException; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableTest; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.test.VMBrokerSetup; import org.apache.mina.common.ByteBuffer; +import org.apache.log4j.Logger; import java.io.IOException; import java.util.ArrayList; @@ -39,6 +41,9 @@ import junit.framework.TestCase; public class FieldTableMessageTest extends TestCase implements MessageListener { + + private static final Logger _logger = Logger.getLogger(FieldTableMessageTest.class); + private AMQConnection _connection; private AMQDestination _destination; private AMQSession _session; @@ -50,7 +55,7 @@ public class FieldTableMessageTest extends TestCase implements MessageListener protected void setUp() throws Exception { super.setUp(); - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); } protected void tearDown() throws Exception @@ -80,7 +85,7 @@ public class FieldTableMessageTest extends TestCase implements MessageListener private FieldTable load() throws IOException { - FieldTable result = new FieldTable(); + FieldTable result = FieldTableFactory.newFieldTable(); result.put("one", 1L); result.put("two", 2L); result.put("three", 3L); @@ -128,7 +133,7 @@ public class FieldTableMessageTest extends TestCase implements MessageListener for (Object m : received) { ByteBuffer buffer = ((JMSBytesMessage) m).getData(); - FieldTable actual = new FieldTable(buffer, buffer.remaining()); + FieldTable actual = FieldTableFactory.newFieldTable(buffer, buffer.remaining()); new FieldTableTest().assertEquivalent(_expected, actual); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTablePropertyTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTablePropertyTest.java new file mode 100644 index 0000000000..92b4831d93 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTablePropertyTest.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.basic; + +import org.apache.qpid.framing.PropertyFieldTable; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.client.message.JMSTextMessage; +import org.apache.qpid.client.message.TestMessageHelper; + +import java.util.Enumeration; +import java.util.NoSuchElementException; + +import javax.jms.JMSException; + +import junit.framework.TestCase; + +public class FieldTablePropertyTest extends TestCase +{ + public void testPropertyNames() + { + try + { + JMSTextMessage text = TestMessageHelper.newJMSTextMessage(); + + text.setBooleanProperty("Boolean1", true); + text.setBooleanProperty("Boolean2", true); + text.setIntProperty("Int", 2); + text.setLongProperty("Long", 2); + + Enumeration e = text.getPropertyNames(); + + assertEquals("Boolean1", e.nextElement()); + assertTrue("Boolean2".equals(e.nextElement())); + assertTrue("Int".equals(e.nextElement())); + assertTrue("Long".equals(e.nextElement())); + } + catch (JMSException e) + { + + } + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(FieldTablePropertyTest.class); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java index f25d2887ae..5353a19d13 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java @@ -47,6 +47,7 @@ public class MapMessageTest extends TestCase implements MessageListener private final List<String> messages = new ArrayList<String>(); private int _count = 100; public String _connectionString = "vm://:1"; + private byte[] _bytes = {99, 98, 97, 96, 95}; protected void setUp() throws Exception { @@ -104,9 +105,31 @@ public class MapMessageTest extends TestCase implements MessageListener MapMessage message = _session.createMapMessage(); message.setBoolean("odd", i / 2 == 0); + message.setByte("byte", (byte) Byte.MAX_VALUE); + + message.setBytes("bytes", _bytes); + message.setChar("char", (char) 'c'); + message.setDouble("double", (double) Double.MAX_VALUE); + message.setFloat("float", (float) Float.MAX_VALUE); + message.setInt("messageNumber", i); + message.setInt("int", (int) Integer.MAX_VALUE); + + message.setLong("long", (long) Long.MAX_VALUE); + message.setShort("short", (short) Short.MAX_VALUE); message.setString("message", text); + + message.setObject("object-bool", true); + message.setObject("object-byte", Byte.MAX_VALUE); + message.setObject("object-bytes", _bytes); + message.setObject("object-char", 'c'); + message.setObject("object-double", Double.MAX_VALUE); + message.setObject("object-float", Float.MAX_VALUE); + message.setObject("object-int", Integer.MAX_VALUE); + message.setObject("object-long", Long.MAX_VALUE); + message.setObject("object-short", Short.MAX_VALUE); + producer.send(message); } } @@ -130,18 +153,74 @@ public class MapMessageTest extends TestCase implements MessageListener { actual.add(m.getString("message")); assertEqual(m.getInt("messageNumber"), count); - assertEqual(m.getBoolean("odd"), count / 2 == 0); -// try -// { -// m.setInt("testint", 3); -// fail("Message should not be writeable"); -// } -// catch (MessageNotWriteableException mnwe) -// { -// //normal execution -// } + assertEqual(count / 2 == 0, m.getBoolean("odd")); + assertEqual((byte) Byte.MAX_VALUE, m.getByte("byte")); + + assertBytesEqual(_bytes, m.getBytes("bytes")); + assertEqual((char) 'c', m.getChar("char")); + assertEqual((double) Double.MAX_VALUE, m.getDouble("double")); + assertEqual((float) Float.MAX_VALUE, m.getFloat("float")); + + assertEqual(count, m.getInt("messageNumber")); + assertEqual((int) Integer.MAX_VALUE, m.getInt("int")); + assertEqual((long) Long.MAX_VALUE, m.getLong("long")); + assertEqual((short) Short.MAX_VALUE, m.getShort("short")); + + assertEqual(true, m.getObject("object-bool")); + assertEqual(Byte.MAX_VALUE, m.getObject("object-byte")); + assertBytesEqual(_bytes, (byte[]) m.getObject("object-bytes")); + assertEqual('c', m.getObject("object-char")); + assertEqual(Double.MAX_VALUE, m.getObject("object-double")); + assertEqual(Float.MAX_VALUE, m.getObject("object-float")); + assertEqual(Integer.MAX_VALUE, m.getObject("object-int")); + assertEqual(Long.MAX_VALUE, m.getObject("object-long")); + assertEqual(Short.MAX_VALUE, m.getObject("object-short")); + + + try + { + m.setInt("testint", 3); + fail("Message should not be writeable"); + } + catch (MessageNotWriteableException mnwe) + { + //normal execution + } + + m.clearBody(); + + try + { + m.setInt("testint", 3); + } + catch (MessageNotWriteableException mnwe) + { + Assert.fail("Message should be writeable"); + } + + //Check property write status + try + { + m.setStringProperty("test", "test"); + Assert.fail("Message should not be writeable"); + } + catch (MessageNotWriteableException mnwe) + { + //normal execution + } + + m.clearProperties(); + + try + { + m.setStringProperty("test", "test"); + } + catch (MessageNotWriteableException mnwe) + { + Assert.fail("Message should be writeable"); + } count++; } @@ -149,6 +228,17 @@ public class MapMessageTest extends TestCase implements MessageListener assertEqual(messages.iterator(), actual.iterator()); } + private void assertBytesEqual(byte[] expected, byte[] actual) + { + Assert.assertEquals(expected.length, actual.length); + + for (int index = 0; index < expected.length; index++) + { + Assert.assertEquals(expected[index], actual[index]); + } + } + + private static void assertEqual(Iterator expected, Iterator actual) { List<String> errors = new ArrayList<String>(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java index dfb1b26454..e7d7159bd8 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java @@ -125,19 +125,53 @@ public class ObjectMessageTest extends TestCase implements MessageListener { actual.add(m.getObject()); -// try -// { -// m.setObject("Test text"); -// Assert.fail("Message should not be writeable"); -// } -// catch (MessageNotWriteableException mnwe) -// { -// //normal execution -// } + try + { + m.setObject("Test text"); + Assert.fail("Message should not be writeable"); + } + catch (MessageNotWriteableException mnwe) + { + //normal execution + } + + m.clearBody(); + + try + { + m.setObject("Test text"); + } + catch (MessageNotWriteableException mnwe) + { + Assert.fail("Message should be writeable"); + } + + //Check property write status + try + { + m.setStringProperty("test", "test"); + Assert.fail("Message should not be writeable"); + } + catch (MessageNotWriteableException mnwe) + { + //normal execution + } + + m.clearProperties(); + + try + { + m.setStringProperty("test", "test"); + } + catch (MessageNotWriteableException mnwe) + { + Assert.fail("Message should be writeable"); + } } assertEqual(messages.iterator(), actual.iterator()); + } private static void assertEqual(Iterator expected, Iterator actual) diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java new file mode 100644 index 0000000000..02f371e81b --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -0,0 +1,264 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.basic; + +import junit.framework.Assert; +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.message.JMSTextMessage; +import org.apache.qpid.test.VMBrokerSetup; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class PropertyValueTest extends TestCase implements MessageListener +{ + + private static final Logger _logger = Logger.getLogger(PropertyValueTest.class); + + private int count = 0; + private AMQConnection _connection; + private Destination _destination; + private AMQSession _session; + private final List<JMSTextMessage> received = new ArrayList<JMSTextMessage>(); + private final List<String> messages = new ArrayList<String>(); + private int _count = 100; + public String _connectionString = "vm://:1"; + + protected void setUp() throws Exception + { + super.setUp(); + try + { + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + } + catch (Exception e) + { + fail("Unable to initialilse connection: " + e); + } + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + private void init(AMQConnection connection) throws Exception + { + Destination destination = new AMQQueue(randomize("PropertyValueTest"), true); + init(connection, destination); + } + + private void init(AMQConnection connection, Destination destination) throws Exception + { + _connection = connection; + _destination = destination; + _session = (AMQSession) connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + + //set up a slow consumer + _session.createConsumer(destination).setMessageListener(this); + connection.start(); + } + + public void test() throws Exception + { + int count = _count; + send(count); + waitFor(count); + check(); + System.out.println("Completed without failure"); + _connection.close(); + } + + void send(int count) throws JMSException + { + //create a publisher + MessageProducer producer = _session.createProducer(_destination); + for (int i = 0; i < count; i++) + { + String text = "Message " + i; + messages.add(text); + Message m = _session.createTextMessage(text); + + m.setBooleanProperty("Bool", true); + + m.setByteProperty("Byte", (byte) Byte.MAX_VALUE); + m.setDoubleProperty("Double", (double) Double.MAX_VALUE); + m.setFloatProperty("Float", (float) Float.MAX_VALUE); + m.setIntProperty("Int", (int) Integer.MAX_VALUE); + + m.setJMSCorrelationID("Correlation"); + m.setJMSPriority(100); + + // Queue + Queue q = //_session.createTemporaryQueue(); + q = new AMQQueue("TestReply"); + m.setJMSReplyTo(q); + m.setStringProperty("TempQueue", q.toString()); + + _logger.info("Message:" + m); + + Assert.assertEquals("Check temp queue has been set correctly", + m.getJMSReplyTo().toString(), m.getStringProperty("TempQueue")); + + m.setJMSType("Test"); + m.setLongProperty("UnsignedInt", (long) 4294967295L); + m.setLongProperty("Long", (long) Long.MAX_VALUE); + + m.setShortProperty("Short", (short) Short.MAX_VALUE); + m.setStringProperty("String", "Test"); + + _logger.info("Sending Msg:" + m); + producer.send(m); + } + } + + void waitFor(int count) throws InterruptedException + { + synchronized(received) + { + while (received.size() < count) + { + received.wait(); + } + } + } + + void check() throws JMSException + { + List<String> actual = new ArrayList<String>(); + for (JMSTextMessage m : received) + { + actual.add(m.getText()); + + //Check Properties + + Assert.assertEquals("Check Boolean properties are correctly transported", + true, m.getBooleanProperty("Bool")); + Assert.assertEquals("Check Byte properties are correctly transported", + (byte) Byte.MAX_VALUE, m.getByteProperty("Byte")); + Assert.assertEquals("Check Double properties are correctly transported", + (double) Double.MAX_VALUE, m.getDoubleProperty("Double")); + Assert.assertEquals("Check Float properties are correctly transported", + (float) Float.MAX_VALUE, m.getFloatProperty("Float")); + Assert.assertEquals("Check Int properties are correctly transported", + (int) Integer.MAX_VALUE, m.getIntProperty("Int")); + Assert.assertEquals("Check CorrelationID properties are correctly transported", + "Correlation", m.getJMSCorrelationID()); +// Assert.assertEquals("Check Priority properties are correctly transported", +// 100, m.getJMSPriority()); + + // Queue + Assert.assertEquals("Check ReplyTo properties are correctly transported", + m.getStringProperty("TempQueue"), m.getJMSReplyTo().toString()); + +// Assert.assertEquals("Check Type properties are correctly transported", +// "Test", m.getJMSType()); + Assert.assertEquals("Check Short properties are correctly transported", + (short) Short.MAX_VALUE, m.getShortProperty("Short")); + Assert.assertEquals("Check UnsignedInt properties are correctly transported", + (long) 4294967295L, m.getLongProperty("UnsignedInt")); + Assert.assertEquals("Check Long properties are correctly transported", + (long) Long.MAX_VALUE, m.getLongProperty("Long")); + Assert.assertEquals("Check String properties are correctly transported", + "Test", m.getStringProperty("String")); + } + + assertEqual(messages.iterator(), actual.iterator()); + } + + private static void assertEqual(Iterator expected, Iterator actual) + { + List<String> errors = new ArrayList<String>(); + while (expected.hasNext() && actual.hasNext()) + { + try + { + assertEqual(expected.next(), actual.next()); + } + catch (Exception e) + { + errors.add(e.getMessage()); + } + } + while (expected.hasNext()) + { + errors.add("Expected " + expected.next() + " but no more actual values."); + } + while (actual.hasNext()) + { + errors.add("Found " + actual.next() + " but no more expected values."); + } + if (!errors.isEmpty()) + { + throw new RuntimeException(errors.toString()); + } + } + + private static void assertEqual(Object expected, Object actual) + { + if (!expected.equals(actual)) + { + throw new RuntimeException("Expected '" + expected + "' found '" + actual + "'"); + } + } + + public void onMessage(Message message) + { + synchronized(received) + { + received.add((JMSTextMessage) message); + received.notify(); + } + } + + private static String randomize(String in) + { + return in + System.currentTimeMillis(); + } + + public static void main(String[] argv) throws Exception + { + PropertyValueTest test = new PropertyValueTest(); + test._connectionString = argv.length == 0 ? "vm://:1" : argv[0]; + test.setUp(); + if (argv.length > 1) + { + test._count = Integer.parseInt(argv[1]); + } + test.test(); + } + + public static junit.framework.Test suite() + { + return new VMBrokerSetup(new junit.framework.TestSuite(PropertyValueTest.class)); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java index 04a9185fa6..cd3954fbcb 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java @@ -28,6 +28,7 @@ import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.test.VMBrokerSetup; +import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.Iterator; @@ -39,6 +40,8 @@ import junit.framework.Assert; public class TextMessageTest extends TestCase implements MessageListener { + private static final Logger _logger = Logger.getLogger(TextMessageTest.class); + private AMQConnection _connection; private Destination _destination; private AMQSession _session; @@ -100,7 +103,11 @@ public class TextMessageTest extends TestCase implements MessageListener { String text = "Message " + i; messages.add(text); - producer.send(_session.createTextMessage(text)); + Message m = _session.createTextMessage(text); + m.setStringProperty("String", "hello"); + + _logger.info("Sending Msg:" + m); + producer.send(m); } } @@ -122,15 +129,49 @@ public class TextMessageTest extends TestCase implements MessageListener { actual.add(m.getText()); -// try -// { -// m.setText("Test text"); -// Assert.fail("Message should not be writeable"); -// } -// catch (MessageNotWriteableException mnwe) -// { -// //normal execution -// } + //Check body write status + try + { + m.setText("Test text"); + Assert.fail("Message should not be writeable"); + } + catch (MessageNotWriteableException mnwe) + { + //normal execution + } + + m.clearBody(); + + try + { + m.setText("Test text"); + } + catch (MessageNotWriteableException mnwe) + { + Assert.fail("Message should be writeable"); + } + + //Check property write status + try + { + m.setStringProperty("test", "test"); + Assert.fail("Message should not be writeable"); + } + catch (MessageNotWriteableException mnwe) + { + //normal execution + } + + m.clearProperties(); + + try + { + m.setStringProperty("test", "test"); + } + catch (MessageNotWriteableException mnwe) + { + Assert.fail("Message should be writeable"); + } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java new file mode 100644 index 0000000000..84e9026a6a --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.client.BrokerDetails; + +import junit.framework.TestCase; +import org.apache.qpid.client.AMQBrokerDetails; +import org.apache.qpid.url.URLSyntaxException; + +public class BrokerDetailsTest extends TestCase +{ + + public void testMultiParameters() throws URLSyntaxException + { + String url = "tcp://localhost:5672?timeout='200',immediatedelivery='true'"; + + AMQBrokerDetails broker = new AMQBrokerDetails(url); + + assertTrue(broker.getOption("timeout").equals("200")); + assertTrue(broker.getOption("immediatedelivery").equals("true")); + } + + public void testVMBroker() throws URLSyntaxException + { + String url = "vm://:2"; + + AMQBrokerDetails broker = new AMQBrokerDetails(url); + assertTrue(broker.getTransport().equals("vm")); + assertEquals(broker.getPort(), 2); + } + + public void testTransportsDefaultToTCP() throws URLSyntaxException + { + String url = "localhost:5672"; + + AMQBrokerDetails broker = new AMQBrokerDetails(url); + assertTrue(broker.getTransport().equals("tcp")); + } + + public void testCheckDefaultPort() throws URLSyntaxException + { + String url = "tcp://localhost"; + + AMQBrokerDetails broker = new AMQBrokerDetails(url); + assertTrue(broker.getPort() == AMQBrokerDetails.DEFAULT_PORT); + } + + public void testBothDefaults() throws URLSyntaxException + { + String url = "localhost"; + + AMQBrokerDetails broker = new AMQBrokerDetails(url); + + assertTrue(broker.getTransport().equals("tcp")); + assertTrue(broker.getPort() == AMQBrokerDetails.DEFAULT_PORT); + } + + public void testWrongOptionSeparatorInBroker() + { + String url = "tcp://localhost:5672+option='value'"; + try + { + new AMQBrokerDetails(url); + } + catch (URLSyntaxException urise) + { + assertTrue(urise.getReason().equals("Illegal character in port number")); + } + + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(BrokerDetailsTest.class); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index d7862d047f..0da4147351 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -22,12 +22,10 @@ package org.apache.qpid.test.unit.client.connection; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQAuthenticationException; -import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.AMQException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQUnresolvedAddressException; -import org.apache.qpid.test.VMBrokerSetup; import javax.jms.Connection; @@ -40,6 +38,18 @@ public class ConnectionTest extends TestCase String _broker_NotRunning = "vm://:2"; String _broker_BadDNS = "tcp://hg3sgaaw4lgihjs"; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + } + + protected void tearDown() throws Exception + { + TransportConnection.killAllVMBrokers(); + } + public void testSimpleConnection() { try @@ -102,8 +112,30 @@ public class ConnectionTest extends TestCase } } + public void testClientIdCannotBeChanged() throws Exception + { + Connection connection = new AMQConnection(_broker, "guest", "guest", + "fred", "/test"); + try + { + connection.setClientID("someClientId"); + fail("No IllegalStateException thrown when resetting clientid"); + } + catch (javax.jms.IllegalStateException e) + { + // PASS + } + } + + public void testClientIdIsPopulatedAutomatically() throws Exception + { + Connection connection = new AMQConnection(_broker, "guest", "guest", + null, "/test"); + assertNotNull(connection.getClientID()); + } + public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(ConnectionTest.class)); + return new junit.framework.TestSuite(ConnectionTest.class); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java index 13a6d214ba..64adcb13e4 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java @@ -310,29 +310,7 @@ public class ConnectionURLTest extends TestCase assertTrue(connectionurl.getBrokerCount() == 1); } - // FIXME Connection now parses but result is wrong QPID-71 - /* - public void testWrongOptionSeparatorInBroker() - { - String url = "amqp://user:@/test?brokerlist='tcp://localhost:5672+option='value''"; - try - { - AMQConnectionURL connection = new AMQConnectionURL(url); - - Float version = Float.parseFloat(System.getProperty("java.specification.version")); - if (version > 1.5) - { - fail("URL Should not parse on Java " + version + " Connection is:" + connection); - } - } - catch (URLSyntaxException urise) - { - assertTrue(urise.getReason().equals("Illegal character in port number")); - } - - } - */ public void testWrongOptionSeparatorInOptions() { @@ -349,18 +327,6 @@ public class ConnectionURLTest extends TestCase } - public void testTransportsDefaultToTCP() throws URLSyntaxException - { - String url = "amqp://guest:guest@/test?brokerlist='localhost:5672;myhost:5673'&failover='roundrobin'"; - - AMQConnectionURL connection = new AMQConnectionURL(url); - - BrokerDetails broker = connection.getBrokerDetails(0); - assertTrue(broker.getTransport().equals("tcp")); - - broker = connection.getBrokerDetails(1); - assertTrue(broker.getTransport().equals("tcp")); - } public void testNoUserDetailsProvidedWithClientID() diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java index 9425b7c304..9bb2fcc59b 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java @@ -24,8 +24,6 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; -import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.test.VMBrokerSetup; import javax.jms.MessageListener; @@ -36,6 +34,7 @@ import javax.jms.ObjectMessage; import java.io.Serializable; import java.util.HashMap; import java.util.ArrayList; +import java.util.Arrays; import junit.framework.TestCase; @@ -44,6 +43,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener private AMQConnection connection; private AMQDestination destination; private AMQSession session; + private MessageProducer producer; private Serializable[] data; private volatile boolean waiting; private int received; @@ -57,6 +57,13 @@ public class ObjectMessageTest extends TestCase implements MessageListener connection = new AMQConnection(_broker, "guest", "guest", randomize("Client"), "/test_path"); destination = new AMQQueue(randomize("LatencyTest"), true); session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + + //set up a consumer + session.createConsumer(destination).setMessageListener(this); + connection.start(); + + //create a publisher + producer = session.createProducer(destination, false, false, true); A a1 = new A(1, "A"); A a2 = new A(2, "a"); B b = new B(1, "B"); @@ -83,7 +90,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener _broker = broker; } - public void test() throws Exception + public void testSendAndReceive() throws Exception { try { @@ -102,16 +109,68 @@ public class ObjectMessageTest extends TestCase implements MessageListener } } - private void send() throws Exception + public void testSetObjectPropertyForString() throws Exception { - //set up a consumer - session.createConsumer(destination).setMessageListener(this); - connection.start(); + String testStringProperty = "TestStringProperty"; + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestStringProperty",testStringProperty); + assertEquals(testStringProperty, msg.getObjectProperty("TestStringProperty")); + } - //create a publisher - MessageProducer producer = session.createProducer(destination, false, false, true); + public void testSetObjectPropertyForBoolean() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestBooleanProperty",Boolean.TRUE); + assertEquals(Boolean.TRUE, msg.getObjectProperty("TestBooleanProperty")); + } + public void testSetObjectPropertyForByte() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestByteProperty",Byte.MAX_VALUE); + assertEquals(Byte.MAX_VALUE, msg.getObjectProperty("TestByteProperty")); + } + public void testSetObjectPropertyForShort() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestShortProperty",Short.MAX_VALUE); + assertEquals(Short.MAX_VALUE, msg.getObjectProperty("TestShortProperty")); + } + public void testSetObjectPropertyForInteger() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestIntegerProperty",Integer.MAX_VALUE); + assertEquals(Integer.MAX_VALUE, msg.getObjectProperty("TestIntegerProperty")); + } + + public void testSetObjectPropertyForDouble() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestDoubleProperty",Double.MAX_VALUE); + assertEquals(Double.MAX_VALUE, msg.getObjectProperty("TestDoubleProperty")); + } + + public void testSetObjectPropertyForFloat() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestFloatProperty",Float.MAX_VALUE); + assertEquals(Float.MAX_VALUE, msg.getObjectProperty("TestFloatProperty")); + } + + public void testSetObjectPropertyForByteArray() throws Exception + { + byte[] array = {1,2,3,4,5}; + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestByteArrayProperty",array); + assertTrue(Arrays.equals(array,(byte[])msg.getObjectProperty("TestByteArrayProperty"))); + } + + + + + private void send() throws Exception + { for (int i = 0; i < data.length; i++) { ObjectMessage msg; @@ -207,7 +266,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener { System.out.println("Usage: <broker>"); } - new ObjectMessageTest(broker).test(); + new ObjectMessageTest(broker).testSendAndReceive(); } private static class A implements Serializable |