diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-21 20:09:35 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-21 20:09:35 +0000 |
commit | c3343495a4b4da8d894d5ac89c2e8856b413ecdf (patch) | |
tree | dcfa7902c6e0daff91ded938d82d2a9372205bae | |
parent | 40c09bcfed642399d4b88d2994f46fad78deecb9 (diff) | |
download | qpid-python-c3343495a4b4da8d894d5ac89c2e8856b413ecdf.tar.gz |
Merge from trunk up to revision 487214
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@489452 13f79535-47bb-0310-9956-ffa450edef68
20 files changed, 559 insertions, 1027 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 9dcbfca6bc..8b2387b9a0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -879,7 +879,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else { - je = new JMSException("Exception thrown against " + toString() + ": " + cause); + if (cause instanceof AMQException) + { + je = new JMSException(Integer.toString(((AMQException)cause).getErrorCode()) ,"Exception thrown against " + toString() + ": " + cause); + } + else + { + je = new JMSException("Exception thrown against " + toString() + ": " + cause); + } if (cause instanceof Exception) { je.setLinkedException((Exception) cause); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 5a16a148cb..8e93b19eea 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -25,9 +25,10 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.message.JMSStreamMessage; +import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.framing.*; @@ -38,7 +39,6 @@ import org.apache.qpid.url.URLSyntaxException; import javax.jms.*; import javax.jms.IllegalStateException; - import java.io.Serializable; import java.text.MessageFormat; import java.util.ArrayList; @@ -287,7 +287,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public BytesMessage createBytesMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -303,7 +303,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MapMessage createMapMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -319,7 +319,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public javax.jms.Message createMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -335,7 +335,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -351,7 +351,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage(Serializable object) throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -403,7 +403,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TextMessage createTextMessage(String text) throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -473,7 +473,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { //Ensure we only try and close an open session. if (!_closed.getAndSet(true)) @@ -493,7 +493,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } catch (AMQException e) { - throw new JMSException("Error closing session: " + e); + JMSException jmse = new JMSException("Error closing session: " + e); + jmse.setLinkedException(e); + throw jmse; } finally { @@ -536,7 +538,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void closed(Throwable e) { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { // An AMQException has an error code and message already and will be passed in when closure occurs as a // result of a channel close request @@ -747,7 +749,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public QueueReceiver createQueueReceiver(Destination destination) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); return new QueueReceiverAdaptor(dest, consumer); @@ -763,7 +765,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector); @@ -772,20 +774,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MessageConsumer createConsumer(Destination destination) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector); } @@ -795,7 +797,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean exclusive, String selector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null); } @@ -807,7 +809,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean exclusive, String selector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null); } @@ -818,7 +820,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector, FieldTable rawSelector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector); } @@ -831,7 +833,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector, FieldTable rawSelector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector); } @@ -963,7 +965,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public Queue createQueue(String queueName) throws JMSException { - checkNotClosed(); + checkNotClosed(); if (queueName.indexOf('/') == -1) { return new AMQQueue(queueName); @@ -993,7 +995,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public QueueReceiver createReceiver(Queue queue) throws JMSException { - checkNotClosed(); + checkNotClosed(); AMQQueue dest = (AMQQueue) queue; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest); return new QueueReceiverAdaptor(dest, consumer); @@ -1009,7 +1011,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { - checkNotClosed(); + checkNotClosed(); AMQQueue dest = (AMQQueue) queue; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector); @@ -1018,14 +1020,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public QueueSender createSender(Queue queue) throws JMSException { - checkNotClosed(); + checkNotClosed(); //return (QueueSender) createProducer(queue); return new QueueSenderAdapter(createProducer(queue), queue); } public Topic createTopic(String topicName) throws JMSException { - checkNotClosed(); + checkNotClosed(); if (topicName.indexOf('/') == -1) { @@ -1056,8 +1058,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public TopicSubscriber createSubscriber(Topic topic) throws JMSException { - checkNotClosed(); - checkValidTopic(topic); + checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } @@ -1073,8 +1075,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { - checkNotClosed(); - checkValidTopic(topic); + checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); } @@ -1088,8 +1090,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { - checkNotClosed(); - checkValidTopic(topic); + checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } @@ -1100,8 +1102,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { - checkNotClosed(); - checkValidTopic(topic); + checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); return new TopicSubscriberAdaptor(dest, consumer); @@ -1109,44 +1111,51 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicPublisher createPublisher(Topic topic) throws JMSException { - checkNotClosed(); - checkValidTopic(topic); - //return (TopicPublisher) createProducer(topic); - return new TopicPublisherAdapter(createProducer(topic), topic); + checkNotClosed(); + return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic); } public QueueBrowser createBrowser(Queue queue) throws JMSException { - checkNotClosed(); - checkValidQueue(queue); + checkNotClosed(); + checkValidQueue(queue); throw new UnsupportedOperationException("Queue browsing not supported"); } public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { - checkNotClosed(); - checkValidQueue(queue); + checkNotClosed(); + checkValidQueue(queue); throw new UnsupportedOperationException("Queue browsing not supported"); } public TemporaryQueue createTemporaryQueue() throws JMSException { - checkNotClosed(); + checkNotClosed(); return new AMQTemporaryQueue(); } public TemporaryTopic createTemporaryTopic() throws JMSException { - checkNotClosed(); + checkNotClosed(); return new AMQTemporaryTopic(); } public void unsubscribe(String name) throws JMSException { - checkNotClosed(); - - //send a queue.delete for the subscription + checkNotClosed(); + String queue = _connection.getClientID() + ":" + name; + + AMQFrame queueDeclareFrame = QueueDeclareBody.createAMQFrame(_channelId,0,queue,true,false, false, false, true, null); + + try { + AMQMethodEvent event = _connection.getProtocolHandler().syncWrite(queueDeclareFrame,QueueDeclareOkBody.class); + // if this method doen't throw an exception means we have received a queue declare ok. + } catch (AMQException e) { + throw new javax.jms.InvalidDestinationException("This destination doesn't exist"); + } + //send a queue.delete for the subscription AMQFrame frame = QueueDeleteBody.createAMQFrame(_channelId, 0, queue, false, false, true); _connection.getProtocolHandler().writeFrame(frame); } @@ -1350,21 +1359,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /* * I could have combined the last 3 methods, but this way it improves readability */ - private void checkValidTopic(Topic topic) throws InvalidDestinationException{ - if (topic == null){ - throw new javax.jms.InvalidDestinationException("Invalid Topic"); - } + private void checkValidTopic(Topic topic) throws InvalidDestinationException + { + if (topic == null) + { + throw new javax.jms.InvalidDestinationException("Invalid Topic"); + } } - private void checkValidQueue(Queue queue) throws InvalidDestinationException{ - if (queue == null){ - throw new javax.jms.InvalidDestinationException("Invalid Queue"); - } + private void checkValidQueue(Queue queue) throws InvalidDestinationException + { + if (queue == null) + { + throw new javax.jms.InvalidDestinationException("Invalid Queue"); + } } - private void checkValidDestination(Destination destination) throws InvalidDestinationException{ - if (destination == null){ - throw new javax.jms.InvalidDestinationException("Invalid Queue"); - } + private void checkValidDestination(Destination destination) throws InvalidDestinationException + { + if (destination == null) + { + throw new javax.jms.InvalidDestinationException("Invalid Queue"); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java index 73613b6923..0f50c330fb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -33,136 +33,169 @@ public class AMQTopicSessionAdaptor implements TopicSession _session = (AMQSession) session; } - public Topic createTopic(String string) throws JMSException { + public Topic createTopic(String string) throws JMSException + { return _session.createTopic(string); } - public TopicSubscriber createSubscriber(Topic topic) throws JMSException { + public TopicSubscriber createSubscriber(Topic topic) throws JMSException + { return _session.createSubscriber(topic); } - public TopicSubscriber createSubscriber(Topic topic, String string, boolean b) throws JMSException { + public TopicSubscriber createSubscriber(Topic topic, String string, boolean b) throws JMSException + { return _session.createSubscriber(topic, string, b); } - public TopicSubscriber createDurableSubscriber(Topic topic, String string) throws JMSException { + public TopicSubscriber createDurableSubscriber(Topic topic, String string) throws JMSException + { return _session.createDurableSubscriber(topic, string); } - public TopicSubscriber createDurableSubscriber(Topic topic, String string, String string1, boolean b) throws JMSException { + public TopicSubscriber createDurableSubscriber(Topic topic, String string, String string1, boolean b) throws JMSException + { return _session.createDurableSubscriber(topic, string, string1, b); } - public TopicPublisher createPublisher(Topic topic) throws JMSException { + public TopicPublisher createPublisher(Topic topic) throws JMSException + { return _session.createPublisher(topic); } - public TemporaryTopic createTemporaryTopic() throws JMSException { + public TemporaryTopic createTemporaryTopic() throws JMSException + { return _session.createTemporaryTopic(); } - public void unsubscribe(String string) throws JMSException { + public void unsubscribe(String string) throws JMSException + { _session.unsubscribe(string); } - public BytesMessage createBytesMessage() throws JMSException { + public BytesMessage createBytesMessage() throws JMSException + { return _session.createBytesMessage(); } - public MapMessage createMapMessage() throws JMSException { + public MapMessage createMapMessage() throws JMSException + { return _session.createMapMessage(); } - public Message createMessage() throws JMSException { + public Message createMessage() throws JMSException + { return _session.createMessage(); } - public ObjectMessage createObjectMessage() throws JMSException { + public ObjectMessage createObjectMessage() throws JMSException + { return _session.createObjectMessage(); } - public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException { + public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException + { return _session.createObjectMessage(); } - public StreamMessage createStreamMessage() throws JMSException { + public StreamMessage createStreamMessage() throws JMSException + { return _session.createStreamMessage(); } - public TextMessage createTextMessage() throws JMSException { + public TextMessage createTextMessage() throws JMSException + { return _session.createTextMessage(); } - public TextMessage createTextMessage(String string) throws JMSException { - return _session.createTextMessage(); + public TextMessage createTextMessage(String string) throws JMSException + { + return _session.createTextMessage(string); } - public boolean getTransacted() throws JMSException { + public boolean getTransacted() throws JMSException + { return _session.getTransacted(); } - public int getAcknowledgeMode() throws JMSException { + public int getAcknowledgeMode() throws JMSException + { return _session.getAcknowledgeMode(); } - public void commit() throws JMSException { + public void commit() throws JMSException + { _session.commit(); } - public void rollback() throws JMSException { + public void rollback() throws JMSException + { _session.rollback(); } - public void close() throws JMSException { + public void close() throws JMSException + { _session.close(); } - public void recover() throws JMSException { + public void recover() throws JMSException + { _session.recover(); } - public MessageListener getMessageListener() throws JMSException { + public MessageListener getMessageListener() throws JMSException + { return _session.getMessageListener(); } - public void setMessageListener(MessageListener messageListener) throws JMSException { + public void setMessageListener(MessageListener messageListener) throws JMSException + { _session.setMessageListener(messageListener); } - public void run() { + public void run() + { _session.run(); } - public MessageProducer createProducer(Destination destination) throws JMSException { + public MessageProducer createProducer(Destination destination) throws JMSException + { return _session.createProducer(destination); } - public MessageConsumer createConsumer(Destination destination) throws JMSException { + public MessageConsumer createConsumer(Destination destination) throws JMSException + { return _session.createConsumer(destination); } - public MessageConsumer createConsumer(Destination destination, String string) throws JMSException { + public MessageConsumer createConsumer(Destination destination, String string) throws JMSException + { return _session.createConsumer(destination, string); } - public MessageConsumer createConsumer(Destination destination, String string, boolean b) throws JMSException { + public MessageConsumer createConsumer(Destination destination, String string, boolean b) throws JMSException + { return _session.createConsumer(destination, string, b); } //The following methods cannot be called from a TopicSession as per JMS spec - public Queue createQueue(String string) throws JMSException { + public Queue createQueue(String string) throws JMSException + { throw new IllegalStateException("Cannot call createQueue from TopicSession"); } - public QueueBrowser createBrowser(Queue queue) throws JMSException { + public QueueBrowser createBrowser(Queue queue) throws JMSException + { throw new IllegalStateException("Cannot call createBrowser from TopicSession"); } - public QueueBrowser createBrowser(Queue queue, String string) throws JMSException { + public QueueBrowser createBrowser(Queue queue, String string) throws JMSException + { throw new IllegalStateException("Cannot call createBrowser from TopicSession"); } - public TemporaryQueue createTemporaryQueue() throws JMSException { + public TemporaryQueue createTemporaryQueue() throws JMSException + { throw new IllegalStateException("Cannot call createTemporaryQueue from TopicSession"); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index fd6070a045..8c53d93de6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,15 +24,10 @@ import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.JMSBytesMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; -import javax.jms.Message; +import javax.jms.*; import java.io.UnsupportedEncodingException; public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer @@ -103,6 +98,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private final boolean _mandatory; private final boolean _waitUntilSent; + private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, @@ -349,7 +345,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { throw new JMSException("Unsupported destination class: " + (destination != null ? destination.getClass() : null)); - } + } declareDestination((AMQDestination)destination); } @@ -382,17 +378,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j currentTime = System.currentTimeMillis(); message.setJMSTimestamp(currentTime); } - // - // Very nasty temporary hack for GRM-206. Will be altered ASAP. - // - if (message instanceof JMSBytesMessage) - { - JMSBytesMessage msg = (JMSBytesMessage) message; - if (!msg.isReadable()) - { - msg.reset(); - } - } + message.prepareForSending(); ByteBuffer payload = message.getData(); BasicContentHeaderProperties contentHeaderProperties = message.getJmsContentHeaderProperties(); @@ -413,7 +399,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j contentHeaderProperties.setDeliveryMode((byte) deliveryMode); contentHeaderProperties.setPriority((byte) priority); - int size = payload.limit(); + int size = (payload != null) ? payload.limit() : 0; ContentBody[] contentBodies = createContentBodies(payload); AMQFrame[] frames = new AMQFrame[2 + contentBodies.length]; for (int i = 0; i < contentBodies.length; i++) @@ -448,14 +434,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j */ private ContentBody[] createContentBodies(ByteBuffer payload) { - if (payload == null) - { - return null; - } - else if (payload.remaining() == 0) + if (payload == null || payload.remaining() == 0) { - return new ContentBody[0]; + return NO_CONTENT_BODIES; } + // we substract one from the total frame maximum size to account for the end of frame marker in a body frame // (0xCE byte). int dataLength = payload.remaining(); @@ -496,31 +479,31 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j checkNotClosed(); _encoding = encoding; } - + private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException { checkNotClosed(); - + if(_session == null || _session.isClosed()){ throw new javax.jms.IllegalStateException("Invalid Session"); } } - + private void checkInitialDestination(){ if(_destination == null){ throw new UnsupportedOperationException("Destination is null"); } } - + private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{ if (_destination != null && suppliedDestination != null){ throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); } - + if (suppliedDestination == null){ - throw new InvalidDestinationException("Supplied Destination was invalid"); + throw new InvalidDestinationException("Supplied Destination was invalid"); } } - + public AMQSession getSession() { return _session; diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java index 02da284b83..803f2e03a4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java @@ -1,138 +1,174 @@ package org.apache.qpid.client; -import javax.jms.Destination; +import javax.jms.*; import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Topic; -import javax.jms.TopicPublisher; -public class TopicPublisherAdapter implements TopicPublisher { +public class TopicPublisherAdapter implements TopicPublisher +{ + + private BasicMessageProducer _delegate; + private Topic _topic; + + public TopicPublisherAdapter(BasicMessageProducer msgProducer, Topic topic) + { + _delegate = msgProducer; + _topic = topic; + } + + public Topic getTopic() throws JMSException + { + checkPreConditions(); + return _topic; + } + + public void publish(Message msg) throws JMSException + { + checkPreConditions(); + checkTopic(_topic); + _delegate.send(msg); + } + + public void publish(Topic topic, Message msg) throws JMSException + { + checkPreConditions(); + checkTopic(topic); + _delegate.send(topic, msg); + } + + public void publish(Message msg, int deliveryMode, int priority, long timeToLive) + throws JMSException + { + checkPreConditions(); + checkTopic(_topic); + _delegate.send(msg, deliveryMode, priority, timeToLive); + } - private MessageProducer delegate; - private Topic topic; - private boolean closed = false; - - public TopicPublisherAdapter(MessageProducer msgProducer, Topic topic){ - delegate = msgProducer; - this.topic = topic; - } - - public Topic getTopic() throws JMSException { - checkPreConditions(); - return topic; - } - - public void publish(Message msg) throws JMSException { - checkPreConditions(); - delegate.send(msg); - } - - public void publish(Topic topic, Message msg) throws JMSException { - checkPreConditions(); - delegate.send(topic,msg); - } - - public void publish(Message msg, int deliveryMode, int priority, long timeToLive) - throws JMSException { + public int getDeliveryMode() throws JMSException { checkPreConditions(); - delegate.send(msg, deliveryMode,priority,timeToLive); + return _delegate.getDeliveryMode(); } public void publish(Topic topic, Message msg, int deliveryMode, int priority, long timeToLive) - throws JMSException { - checkPreConditions(); - delegate.send(topic,msg, deliveryMode,priority,timeToLive); - } - - public void close() throws JMSException { - delegate.close(); - closed = true; - } - - public int getDeliveryMode() throws JMSException { - return delegate.getDeliveryMode(); - } - - public Destination getDestination() throws JMSException { - return delegate.getDestination(); - } + throws JMSException + { + checkPreConditions(); + checkTopic(topic); + _delegate.send(topic, msg, deliveryMode, priority, timeToLive); + } + + public void close() throws JMSException + { + _delegate.close(); + } public boolean getDisableMessageID() throws JMSException { - return delegate.getDisableMessageID(); - } - - public boolean getDisableMessageTimestamp() throws JMSException { - return delegate.getDisableMessageTimestamp(); - } - - public int getPriority() throws JMSException { - return delegate.getPriority(); - } - - public long getTimeToLive() throws JMSException { - return delegate.getTimeToLive(); - } - - public void send(Message msg) throws JMSException { - checkPreConditions(); - delegate.send(msg); - } - - public void send(Destination dest, Message msg) throws JMSException { - checkPreConditions(); - delegate.send(dest,msg); - } - - public void send(Message msg, int deliveryMode, int priority, long timeToLive) - throws JMSException { checkPreConditions(); - delegate.send(msg, deliveryMode,priority,timeToLive); + return _delegate.getDisableMessageID(); } - public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { + public boolean getDisableMessageTimestamp() throws JMSException { checkPreConditions(); - delegate.send(dest,msg, deliveryMode,priority,timeToLive); + return _delegate.getDisableMessageTimestamp(); } - public void setDeliveryMode(int deliveryMode) throws JMSException { - checkPreConditions(); - delegate.setDeliveryMode(deliveryMode); - } - - public void setDisableMessageID(boolean disableMessageID) throws JMSException { - checkPreConditions(); - delegate.setDisableMessageID(disableMessageID); - } - - public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException { + public Destination getDestination() throws JMSException + { checkPreConditions(); - delegate.setDisableMessageTimestamp(disableMessageTimestamp); - } + return _delegate.getDestination(); + } - public void setPriority(int priority) throws JMSException { + public int getPriority() throws JMSException { checkPreConditions(); - delegate.setPriority(priority); + return _delegate.getPriority(); } - public void setTimeToLive(long timeToLive) throws JMSException { + public long getTimeToLive() throws JMSException { checkPreConditions(); - delegate.setTimeToLive(timeToLive); - } - - private void checkPreConditions() throws IllegalStateException, IllegalStateException { - if (closed){ - throw new javax.jms.IllegalStateException("Publisher is closed"); - } - - if(topic == null){ - throw new UnsupportedOperationException("Topic is null"); - } - - AMQSession session = ((BasicMessageProducer)delegate).getSession(); - if(session == null || session.isClosed()){ - throw new javax.jms.IllegalStateException("Invalid Session"); - } - } + return _delegate.getTimeToLive(); + } + + public void send(Message msg) throws JMSException + { + checkPreConditions(); + checkTopic(_topic); + _delegate.send(msg); + } + + public void send(Destination dest, Message msg) throws JMSException + { + checkPreConditions(); + checkTopic(_topic); + _delegate.send(dest, msg); + } + + public void send(Message msg, int deliveryMode, int priority, long timeToLive) + throws JMSException + { + checkPreConditions(); + checkTopic(_topic); + _delegate.send(msg, deliveryMode, priority, timeToLive); + } + + public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException + { + checkPreConditions(); + checkTopic(dest); + _delegate.send(dest, msg, deliveryMode, priority, timeToLive); + } + + public void setDeliveryMode(int deliveryMode) throws JMSException + { + checkPreConditions(); + _delegate.setDeliveryMode(deliveryMode); + } + + public void setDisableMessageID(boolean disableMessageID) throws JMSException + { + checkPreConditions(); + _delegate.setDisableMessageID(disableMessageID); + } + + public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException + { + checkPreConditions(); + _delegate.setDisableMessageTimestamp(disableMessageTimestamp); + } + + public void setPriority(int priority) throws JMSException + { + checkPreConditions(); + _delegate.setPriority(priority); + } + + public void setTimeToLive(long timeToLive) throws JMSException + { + checkPreConditions(); + _delegate.setTimeToLive(timeToLive); + } + + private void checkPreConditions() throws IllegalStateException + { + if (_delegate.isClosed()) + { + throw new javax.jms.IllegalStateException("Publisher is _closed"); + } + + AMQSession session = _delegate.getSession(); + if (session == null || session.isClosed()) + { + throw new javax.jms.IllegalStateException("Invalid Session"); + } + } + + private void checkTopic(Destination topic) throws InvalidDestinationException + { + if (topic == null) + { + throw new UnsupportedOperationException("Topic is null"); + } + if (!(topic instanceof Topic)) + { + throw new InvalidDestinationException("Destination " + topic + " is not a topic"); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java index edabed90b3..dd82eb13c1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,6 +23,8 @@ package org.apache.qpid.client.message; import org.apache.qpid.framing.ContentHeaderProperties; import org.apache.qpid.client.AMQSession; +import javax.jms.JMSException; + public class AMQMessage { protected ContentHeaderProperties _contentHeaderProperties; @@ -67,5 +69,13 @@ public class AMQMessage public long getDeliveryTag() { return _deliveryTag; - } + } + + /** + * Invoked prior to sending the message. Allows the message to be modified if necessary before + * sending. + */ + public void prepareForSending() throws JMSException + { + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 41eb21a415..c1ed88b167 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -32,7 +32,6 @@ import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.JmsNotImplementedException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; import javax.jms.Destination; import javax.jms.JMSException; @@ -40,7 +39,6 @@ import javax.jms.MessageNotReadableException; import javax.jms.MessageNotWriteableException; import java.util.Collections; import java.util.Enumeration; -import java.util.Iterator; import java.util.Map; public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms.Message @@ -257,13 +255,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms public boolean getBooleanProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - - if (getJmsContentHeaderProperties() == null) - { - System.out.println("HEADERS ARE NULL"); - } - - return getJmsContentHeaderProperties().getHeaders().getBoolean(propertyName); } @@ -383,6 +374,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms getJmsContentHeaderProperties().getHeaders().setObject(propertyName, object); } + protected void removeProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + getJmsContentHeaderProperties().getHeaders().remove(propertyName); + } + public void acknowledge() throws JMSException { // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge @@ -470,31 +467,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms getJmsContentHeaderProperties().getHeaders(); } - public FieldTable populateHeadersFromMessageProperties() - { - // - // We need to convert every property into a String representation - // Note that type information is preserved in the property name - // - final FieldTable table = FieldTableFactory.newFieldTable(); - final Iterator entries = getJmsContentHeaderProperties().getHeaders().entrySet().iterator(); - while (entries.hasNext()) - { - final Map.Entry entry = (Map.Entry) entries.next(); - final String propertyName = (String) entry.getKey(); - if (propertyName == null) - { - continue; - } - else - { - table.put(propertyName, entry.getValue().toString()); - } - } - return table; - - } - public BasicContentHeaderProperties getJmsContentHeaderProperties() { return (BasicContentHeaderProperties) _contentHeaderProperties; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index 3061d5a59c..76f8a1c32f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -35,6 +35,11 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text private String _decodedValue; + /** + * This constant represents the name of a property that is set when the message payload is null. + */ + private static final String PAYLOAD_NULL_PROPERTY = "JMS_QPID_NULL"; + JMSTextMessage() throws JMSException { this(null, null); @@ -91,31 +96,34 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text return MIME_TYPE; } - public void setText(String string) throws JMSException + public void setText(String text) throws JMSException { checkWritable(); - + clearBody(); try { - _data = ByteBuffer.allocate(string.length()); - _data.limit(string.length()); - //_data.sweep(); - _data.setAutoExpand(true); - if (getJmsContentHeaderProperties().getEncoding() == null) - { - _data.put(string.getBytes()); - } - else - { - _data.put(string.getBytes(getJmsContentHeaderProperties().getEncoding())); + if (text != null) + { + _data = ByteBuffer.allocate(text.length()); + _data.limit(text.length()) ; + //_data.sweep(); + _data.setAutoExpand(true); + if (getJmsContentHeaderProperties().getEncoding() == null) + { + _data.put(text.getBytes()); + } + else + { + _data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding())); + } } - _decodedValue = string; + _decodedValue = text; } catch (UnsupportedEncodingException e) { // should never occur - JMSException jmse = new JMSException("Unable to decode string data"); + JMSException jmse = new JMSException("Unable to decode text data"); jmse.setLinkedException(e); } } @@ -133,6 +141,11 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text else { _data.rewind(); + + if (propertyExists(PAYLOAD_NULL_PROPERTY) && getBooleanProperty(PAYLOAD_NULL_PROPERTY)) + { + return null; + } if (getJmsContentHeaderProperties().getEncoding() != null) { try @@ -162,4 +175,18 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text return _decodedValue; } } + + @Override + public void prepareForSending() throws JMSException + { + super.prepareForSending(); + if (_data == null) + { + setBooleanProperty(PAYLOAD_NULL_PROPERTY, true); + } + else + { + removeProperty(PAYLOAD_NULL_PROPERTY); + } + } } diff --git a/java/client/src/old_test/java/org/apache/qpid/cts/bin/jmscts.sh b/java/client/src/old_test/java/org/apache/qpid/cts/bin/jmscts.sh deleted file mode 100755 index 37b8018aaf..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/cts/bin/jmscts.sh +++ /dev/null @@ -1,162 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# ----------------------------------------------------------------------------- -# Start/Stop Script for the JMS compliance test suite -# -# Required Environment Variables -# -# JAVA_HOME Points to the Java Development Kit installation. -# -# Optional Environment Variables -# -# JMSCTS_HOME Points to the JMS CTS installation directory. -# -# JAVA_OPTS Java runtime options used when the command is executed. -# -# -# $Id: jmscts.sh,v 1.6 2003/09/27 09:50:49 tanderson Exp $ -# --------------------------------------------------------------------------- - -# OS specific support. $var _must_ be set to either true or false. -cygwin=false -case "`uname`" in -CYGWIN*) cygwin=true;; -esac - -# For Cygwin, ensure paths are in UNIX format before anything is touched -if $cygwin; then - [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"` -fi - -if [ -z "$JAVA_HOME" ]; then - echo "The JAVA_HOME environment variable is not set." - echo "This is required to run jmscts" - exit 1 -fi -if [ ! -r "$JAVA_HOME"/bin/java ]; then - echo "The JAVA_HOME environment variable is not set correctly." - echo "This is required to run jmscts" - exit 1 -fi -_RUNJAVA="$JAVA_HOME"/bin/java - - -# Guess JMSCTS_HOME if it is not set -if [ -z "$JMSCTS_HOME" ]; then -# resolve links - $0 may be a softlink - PRG="$0" - while [ -h "$PRG" ]; do - ls=`ls -ld "$PRG"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '.*/.*' > /dev/null; then - PRG="$link" - else - PRG=`dirname "$PRG"`/"$link" - fi - done - - PRGDIR=`dirname "$PRG"` - JMSCTS_HOME=`cd "$PRGDIR/.." ; pwd` -elif [ ! -r "$JMSCTS_HOME"/bin/jmscts.sh ]; then - echo "The JMSCTS_HOME environment variable is not set correctly." - echo "This is required to run jmscts" - exit 1 -fi - -# Set CLASSPATH to empty by default. User jars can be added via the setenv.sh -# script -CLASSPATH= - -if [ -r "$JMSCTS_HOME"/bin/setenv.sh ]; then - . "$JMSCTS_HOME"/bin/setenv.sh -fi - -CLASSPATH="$CLASSPATH":"$JMSCTS_HOME"/lib/jmscts-0.5-b2.jar - -# For Cygwin, switch paths to Windows format before running java -if $cygwin; then - JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` - JMSCTS_HOME=`cygpath --path --windows "$JMSCTS_HOME"` - CLASSPATH=`cygpath --path --windows "$CLASSPATH"` -fi - -POLICY_FILE="$JMSCTS_HOME"/config/jmscts.policy - -# Configure TrAX -JAVAX_OPTS=-Djavax.xml.transform.TransformerFactory=org.apache.xalan.processor.TransformerFactoryImpl - - -# Execute the requested command - -echo "Using JMSCTS_HOME: $JMSCTS_HOME" -echo "Using JAVA_HOME: $JAVA_HOME" -echo "Using CLASSPATH: $CLASSPATH" - -if [ "$1" = "run" ]; then - - shift - exec "$_RUNJAVA" $JAVA_OPTS $JAVAX_OPTS -Djmscts.home="$JMSCTS_HOME" \ - -classpath "$CLASSPATH" \ - -Djava.security.manager -Djava.security.policy="$POLICY_FILE" \ - org.exolab.jmscts.test.ComplianceTestSuite "$@" - -elif [ "$1" = "stress" ]; then - - shift - exec "$_RUNJAVA" $JAVA_OPTS $JAVAX_OPTS -Djmscts.home="$JMSCTS_HOME" \ - -classpath "$CLASSPATH" \ - -Djava.security.manager -Djava.security.policy="$POLICY_FILE" \ - org.exolab.jmscts.stress.StressTestSuite "$@" - -elif [ "$1" = "stop" ] ; then - - shift - "$_RUNJAVA" $JAVA_OPTS $JAVAX_OPTS -Djmscts.home="$JMSCTS_HOME" \ - -classpath "$CLASSPATH" \ - -Djava.security.manager -Djava.security.policy="$POLICY_FILE" \ - org.exolab.jmscts.core.Admin -stop - -elif [ "$1" = "abort" ] ; then - - shift - exec "$_RUNJAVA" $JAVA_OPTS $JAVAX_OPTS -Djmscts.home="$JMSCTS_HOME" \ - -classpath "$CLASSPATH" \ - -Djava.security.manager -Djava.security.policy="$POLICY_FILE" \ - org.exolab.jmscts.core.Admin -abort - -elif [ "$1" = "snapshot" ] ; then - - shift - exec "$_RUNJAVA" $JAVA_OPTS $JAVAX_OPTS -Djmscts.home="$JMSCTS_HOME" \ - -classpath "$CLASSPATH" \ - -Djava.security.manager -Djava.security.policy="$POLICY_FILE" \ - org.exolab.jmscts.core.Admin -snapshot "$@" - -else - echo "usage: jmscts.sh (commands)" - echo "commands:" - echo " run Run compliance tests" - echo " stress Run stress tests" - echo " stop Stop the JMS CTS" - echo " abort Abort the JMS CTS" - echo " snapshot Take a snapshot" - exit 1 -fi diff --git a/java/client/src/old_test/java/org/apache/qpid/cts/bin/setenv.sh b/java/client/src/old_test/java/org/apache/qpid/cts/bin/setenv.sh deleted file mode 100755 index 9b9189d646..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/cts/bin/setenv.sh +++ /dev/null @@ -1,41 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# --------------------------------------------------------------------------- -# Sample environment script for JMS CTS -# -# This is invoked by jmscts.sh to configure: -# . the CLASSPATH, for JMS provider jars -# . JVM options -# -# The following configures the JMS CTS for OpenJMS 0.7.6 -# --------------------------------------------------------------------------- - -# Configure the CLASSPATH -# -DISTDIR="$IBASE/amqp/dist" -LIBDIR="$IBASE/amqp/lib" - -CLASSPATH="$LIBDIR/jakarta-commons/commons-collections-3.1.jar:$LIBDIR/util-concurrent/backport-util-concurrent.jar:$LIBDIR/mina/mina-0.7.3.jar:$LIBDIR/jms/jms.jar:$LIBDIR/logging-log4j/log4j-1.2.9.jar:$DISTDIR/amqp-common.jar:$DISTDIR/amqp-jms.jar" - -# Configure JVM options -# -JAVA_OPTS=-Xmx512m -Xms512m -JAVA_OPTS="$JAVA_OPTS \ - -Damqj.logging.level=WARN" diff --git a/java/client/src/old_test/java/org/apache/qpid/cts/config/jmscts.policy b/java/client/src/old_test/java/org/apache/qpid/cts/config/jmscts.policy deleted file mode 100644 index ff8b5db5ec..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/cts/config/jmscts.policy +++ /dev/null @@ -1,22 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// -// grant all users all permissions. This is only for test cases -// and should be modified for deployment -grant { - permission java.security.AllPermission; -}; diff --git a/java/client/src/old_test/java/org/apache/qpid/cts/config/jmscts.properties b/java/client/src/old_test/java/org/apache/qpid/cts/config/jmscts.properties deleted file mode 100644 index 7177fed49d..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/cts/config/jmscts.properties +++ /dev/null @@ -1,71 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# ============================================================================= -# General properties -# ----------------------------------------------------------------------------- - -# -# Username & password -# A user name and password for creating Connection instances via -# TopicConnectionFactory.createTopicConnection(...) etc -# -valid.username=guest -valid.password=guest - -# -# Invalid user name and password -# As above, but guaranteed to fail. -# -invalid.username=guest -invalid.password=guest - -# -# Message receipt timeout -# The default time to wait for messages, in milliseconds -# -org.exolab.jmscts.core.MessagingBehaviour.timeout=2000 - - -# ============================================================================= -# Compliance test properties -# ----------------------------------------------------------------------------- - -# -# Expiration interval -# Time in milliseconds to wait for the JMS provider to collect expired -# messages. -# This can be set for providers which collect expired messages periodically, -# rather than at the moment they expire. -# NOTE: for OpenJMS 0.7.6, this should be set to 5000 -org.exolab.jmscts.test.producer.ttl.ExpirationTest.expirationInterval=0 - - -# ============================================================================= -# Stress test properties -# ----------------------------------------------------------------------------- - -# -# Each of the following properties determines the no. of messages that -# will be sent by stress tests -# -org.exolab.jmscts.stress.Send0KTest.count=1000 -org.exolab.jmscts.stress.ReceiveSize0KTest.count=1000 -org.exolab.jmscts.stress.SendReceive0KTest.count=1000 -org.exolab.jmscts.stress.SendReceive2Size0KTest.count=1000 diff --git a/java/client/src/old_test/java/org/apache/qpid/cts/config/providers.xml b/java/client/src/old_test/java/org/apache/qpid/cts/config/providers.xml deleted file mode 100644 index 30c4a39c5b..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/cts/config/providers.xml +++ /dev/null @@ -1,41 +0,0 @@ -<?xml version="1.0"?> -<!-- - - - - Licensed to the Apache Software Foundation (ASF) under one - - or more contributor license agreements. See the NOTICE file - - distributed with this work for additional information - - regarding copyright ownership. The ASF licenses this file - - to you under the Apache License, Version 2.0 (the - - "License"); you may not use this file except in compliance - - with the License. You may obtain a copy of the License at - - - - http://www.apache.org/licenses/LICENSE-2.0 - - - - Unless required by applicable law or agreed to in writing, - - software distributed under the License is distributed on an - - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - - KIND, either express or implied. See the License for the - - specific language governing permissions and limitations - - under the License. - - - --> - -<!-- ====================================================================== --> -<!-- Sample provider configuration file --> -<!-- --> -<!-- This configures JMS CTS to test OpenJMS --> -<!-- ====================================================================== --> - -<configuration> - - <provider> - <name>AMQP</name> - <class>org.exolab.jmscts.amqp.AMQPProvider</class> - <paths> - <path>/home/guso/harness/jmscts-0.5-b2/lib/amqp-provider-0.0a1.jar</path> - </paths> - <config> - </config> - </provider> - -</configuration> diff --git a/java/client/src/old_test/java/org/apache/qpid/cts/readme.txt b/java/client/src/old_test/java/org/apache/qpid/cts/readme.txt deleted file mode 100644 index 117e7d4954..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/cts/readme.txt +++ /dev/null @@ -1,5 +0,0 @@ -The files present in the bin, config and src directories should be copied over a complete copy of jms-cts-0.5-b2. - -The path entries on the config/providers.xml and src/compile.sh files should be changed before attempting to run. - -The scripts expect a properly configured IBASE environment. Before attempting to run, the amqp provider classes must be packaged and installed. The src/compile.sh script will help to achieve that.
\ No newline at end of file diff --git a/java/client/src/old_test/java/org/apache/qpid/cts/src/compile.sh b/java/client/src/old_test/java/org/apache/qpid/cts/src/compile.sh deleted file mode 100755 index 7b8a9f03ec..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/cts/src/compile.sh +++ /dev/null @@ -1,34 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - - -JMSCTS_PATH=/home/guso/harness/jmscts-0.5-b2 - -distjms="$IBASE/amqp/dist" -lib="$IBASE/amqp/lib" -lib2="$JMSCTS_PATH/lib/" -libs="$lib/jakarta-commons/commons-collections-3.1.jar:$lib/util-concurrent/backport-util-concurrent.jar:$lib/mina/mina-0.7.3.jar:$lib/jms/jms.jar:$lib/logging-log4j/log4j-1.2.9.jar:$distjms/amqp-common.jar:$distjms/amqp-jms.jar" -libs2="$lib2/ant-1.5.3-1.jar:$lib2/junit-3.8.1.jar:$lib2/ant-optional-1.5.3-1.jar:$lib2/log4j-1.2.7.jar:$lib2/castor-0.9.5.jar:$lib2/openjms-provider-0.5-b2.jar:$lib2/commons-cli-1.0.jar:$lib2/oro-2.0.7.jar:$lib2/commons-collections-2.1.jar:$lib2/xalan-2.5.1.jar:$lib2/commons-logging-1.0.2.jar:$lib2/xdoclet-1.2b2.jar:$lib2/concurrent-1.3.2.jar:$lib2/xdoclet-xdoclet-module-1.2b2.jar:$lib2/exolabcore-0.3.7.jar:$lib2/xdoclet-xjavadoc-uc-1.2b2.jar:$lib2/jms-1.0.2a.jar:$lib2/xerces-2.3.0.jar:$lib2/jmscts-0.5-b2.jar:$lib2/xml-apis-1.0.b2.jar" - -javac -classpath $libs:$libs2 $JMSCTS_PATH/src/providers/amqp/org/exolab/jmscts/amqp/*.java -cd $JMSCTS_PATH/src/providers/amqp -jar cvf amqp-provider-0.0a1.jar org/exolab/jmscts/amqp/*.class -mv amqp-provider-0.0a1.jar $lib2 - diff --git a/java/client/src/old_test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPAdministrator.java b/java/client/src/old_test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPAdministrator.java deleted file mode 100644 index 006bda7e2e..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPAdministrator.java +++ /dev/null @@ -1,242 +0,0 @@ -/** - * Redistribution and use of this software and associated documentation - * ("Software"), with or without modification, are permitted provided - * that the following conditions are met: - * - * 1. Redistributions of source code must retain copyright - * statements and notices. Redistributions must also contain a - * copy of this document. - * - * 2. Redistributions in binary form must reproduce the - * above copyright notice, this list of conditions and the - * following disclaimer in the documentation and/or other - * materials provided with the distribution. - * - * 3. The name "Exolab" must not be used to endorse or promote - * products derived from this Software without prior written - * permission of Exoffice Technologies. For written permission, - * please contact jima@intalio.com. - * - * 4. Products derived from this Software may not be called "Exolab" - * nor may "Exolab" appear in their names without prior written - * permission of Exoffice Technologies. Exolab is a registered - * trademark of Exoffice Technologies. - * - * 5. Due credit should be given to the Exolab Project - * (http://www.exolab.org/). - * - * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS - * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT - * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND - * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL - * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, - * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR - * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) - * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, - * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED - * OF THE POSSIBILITY OF SUCH DAMAGE. - * - * Copyright 2001, 2003 (C) Exoffice Technologies Inc. All Rights Reserved. - * - */ -package org.apache.qpid.cts.src.providers.amqp.org.exolab.jmscts.amqp; - -import org.apache.qpid.client.*; -import org.exolab.jmscts.provider.Administrator; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.naming.NameNotFoundException; -import javax.naming.NamingException; -import java.net.InetAddress; -import java.util.HashMap; - -/** - * This class provides methods for obtaining and manipulating administered - * objects managed by the Sonicmq implementation of JMS - * - */ -class AMQPAdministrator implements Administrator { - // AMQ Connection configuration - private int port = 5672; - private String host = "localhost"; - private String user = "guest"; - private String pass = "guest"; - private String vhost = "/test"; - - // The cached broker connection & session - private AMQConnection _connection = null; - private Session _session = null; - - // Factory request names - private static final String QUEUE_CONNECTION_FACTORY = "QueueConnectionFactory"; - private static final String TOPIC_CONNECTION_FACTORY = "TopicConnectionFactory"; - - /** - * The cache of known administered objects - */ - private HashMap<String, Object> _directory = new HashMap<String, Object>(); - - /** - * Returns the name of the QueueConnectionFactory bound in JNDI - * - * @return the default QueueConnectionFactory name - */ - public String getQueueConnectionFactory() { - return QUEUE_CONNECTION_FACTORY; - } - - /** - * Returns the name of the TopicConnectionFactory bound in JNDI - * - * @return the default TopicConnectionFactory name - */ - public String getTopicConnectionFactory() { - return TOPIC_CONNECTION_FACTORY; - } - - /** - * Returns the name of the XAQueueConnectionFactory bound in JNDI - * - * @return the default XAQueueConnectionFactory name - */ - public String getXAQueueConnectionFactory() { - return null; - } - - /** - * Returns the name of the XATopicConnectionFactory bound in JNDI - * - * @return the default XATopicConnectionFactory name - */ - public String getXATopicConnectionFactory() { - return null; - } - - /** - * Look up the named administered object - * - * @param name the name that the administered object is bound to - * @return the administered object bound to name - * @throws NamingException if the object is not bound, or the lookup fails - */ - public Object lookup(String name) throws NamingException { - Object result = _directory.get(name); - if (result == null) { - if (name.equals(QUEUE_CONNECTION_FACTORY)) { - _directory.put(QUEUE_CONNECTION_FACTORY, new AMQConnectionFactory(host, port, user, pass, vhost)); - } else if (name.equals(TOPIC_CONNECTION_FACTORY)) { - _directory.put(TOPIC_CONNECTION_FACTORY, new AMQConnectionFactory(host, port, user, pass, vhost)); - } else { - throw new NameNotFoundException("Name not found: " + name); - } - } - return result; - } - - /** - * Create an administered destination - * - * @param name the destination name - * @param queue if true, create a queue, else create a topic - * @throws JMSException if the destination cannot be created - */ - public void createDestination(String name, boolean queue) - throws JMSException { - AMQDestination destination = null; - - try { - if (queue) { - destination = new AMQQueue(name); - createConsumer(destination); - } else { - destination = new AMQTopic(name); - createConsumer(destination); - } - - _directory.put(name, destination); - } catch (Exception exception) { - JMSException error = new JMSException(exception.getMessage()); - error.setLinkedException(exception); - throw error; - } - } - - /** - * Destroy an administered destination - * - * @param name the destination name - * @throws JMSException if the destination cannot be destroyed - */ - public void destroyDestination(String name) - throws JMSException { - - try { - Destination destination = (Destination) lookup(name); - _directory.remove(name); - } catch (NamingException exception) { - JMSException error = new JMSException(exception.getMessage()); - error.setLinkedException(exception); - throw error; - } catch (Exception exception) { - JMSException error = new JMSException(exception.getMessage()); - error.setLinkedException(exception); - throw error; - } - } - - /** - * Returns true if an administered destination exists - * - * @param name the destination name - * @throws JMSException for any internal JMS provider error - */ - public boolean destinationExists(String name) - throws JMSException { - - boolean exists = false; - try { - lookup(name); - exists = true; - } catch (NameNotFoundException ignore) { - } catch (Exception exception) { - JMSException error = new JMSException(exception.getMessage()); - error.setLinkedException(exception); - throw error; - } - return exists; - } - - public void initialise() throws JMSException { - try { - InetAddress address = InetAddress.getLocalHost(); - _connection = new AMQConnection(host, port, user, pass, - address.getHostName(), vhost); - _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } catch (Exception exception) { - JMSException error = new JMSException(exception.getMessage()); - error.setLinkedException(exception); - throw error; - } - } - - public synchronized void cleanup() { - try { - _connection.close(); - } catch (JMSException e) { - e.printStackTrace(); - } - _connection = null; - _session = null; - _directory.clear(); - } - - MessageConsumer createConsumer(AMQDestination destination) throws JMSException - { - return ((AMQSession)_session).createConsumer(destination, /*pre-fetch*/0, false, /*exclusive*/false, null); - } -} //-- AMQPAdministrator diff --git a/java/client/src/old_test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPProvider.java b/java/client/src/old_test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPProvider.java deleted file mode 100644 index aafa415d1e..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/cts/src/providers/amqp/org/exolab/jmscts/amqp/AMQPProvider.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Redistribution and use of this software and associated documentation - * ("Software"), with or without modification, are permitted provided - * that the following conditions are met: - * - * 1. Redistributions of source code must retain copyright - * statements and notices. Redistributions must also contain a - * copy of this document. - * - * 2. Redistributions in binary form must reproduce the - * above copyright notice, this list of conditions and the - * following disclaimer in the documentation and/or other - * materials provided with the distribution. - * - * 3. The name "Exolab" must not be used to endorse or promote - * products derived from this Software without prior written - * permission of Exoffice Technologies. For written permission, - * please contact jima@intalio.com. - * - * 4. Products derived from this Software may not be called "Exolab" - * nor may "Exolab" appear in their names without prior written - * permission of Exoffice Technologies. Exolab is a registered - * trademark of Exoffice Technologies. - * - * 5. Due credit should be given to the Exolab Project - * (http://www.exolab.org/). - * - * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS - * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT - * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND - * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL - * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, - * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR - * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) - * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, - * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED - * OF THE POSSIBILITY OF SUCH DAMAGE. - * - * Copyright 2001, 2003 (C) Exoffice Technologies Inc. All Rights Reserved. - * - */ -package org.apache.qpid.cts.src.providers.amqp.org.exolab.jmscts.amqp; - -import javax.jms.JMSException; - -import org.exolab.jmscts.provider.Administrator; -import org.exolab.jmscts.provider.Provider; - - -/** - * This class enables test cases to be run against the SonicMQ provider - * - * @see AMQPAdministrator - */ -public class AMQPProvider implements Provider { - - /** - * The administration interface - */ - private AMQPAdministrator _admin = new AMQPAdministrator(); - - /** - * Construct an instance of the interface to the AMQP provider - */ - public AMQPProvider() { - } - - /** - * Initialises the administation interface - * - * @throws JMSException if the administration interface can't be - * initialised - */ - public void initialise(boolean start) throws JMSException { - _admin.initialise(); - } - - /** - * Returns the administration interface - */ - public Administrator getAdministrator() { - return _admin; - } - - /** - * This method cleans up the administrator - */ - public void cleanup(boolean stop) { - _admin.cleanup(); - _admin = null; - } - -} //-- AMQPProvider diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java new file mode 100644 index 0000000000..4ffb3e8459 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.topic; + +import junit.framework.TestCase; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.transport.TransportConnection; + +import javax.jms.*; + +/** + * @author Apache Software Foundation + */ +public class TopicPublisherTest extends TestCase +{ + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + public void testUnidentifiedProducer() throws Exception + { + AMQTopic topic = new AMQTopic("MyTopic"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test"); + TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicPublisher publisher = session1.createPublisher(null); + MessageConsumer consumer1 = session1.createConsumer(topic); + con.start(); + publisher.publish(topic, session1.createTextMessage("Hello")); + TextMessage m = (TextMessage) consumer1.receive(2000); + assertNotNull(m); + try + { + publisher.publish(session1.createTextMessage("Goodbye")); + fail("Did not throw UnsupportedOperationException"); + } + catch (UnsupportedOperationException e) + { + // PASS + } + + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(TopicPublisherTest.class); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java new file mode 100644 index 0000000000..fa46a4bcfb --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -0,0 +1,94 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.topic; + +import junit.framework.TestCase; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.transport.TransportConnection; + +import javax.jms.TopicSession; +import javax.jms.TextMessage; +import javax.jms.TopicPublisher; +import javax.jms.MessageConsumer; + +/** + * @author Apache Software Foundation + */ +public class TopicSessionTest extends TestCase +{ + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + public void testTextMessageCreation() throws Exception + { + AMQTopic topic = new AMQTopic("MyTopic"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test"); + TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicPublisher publisher = session1.createPublisher(topic); + MessageConsumer consumer1 = session1.createConsumer(topic); + con.start(); + TextMessage tm = session1.createTextMessage("Hello"); + publisher.publish(tm); + tm = (TextMessage) consumer1.receive(2000); + assertNotNull(tm); + String msgText = tm.getText(); + assertEquals("Hello", msgText); + tm = session1.createTextMessage(); + msgText = tm.getText(); + assertNull(msgText); + publisher.publish(tm); + tm = (TextMessage) consumer1.receive(2000); + assertNotNull(tm); + msgText = tm.getText(); + assertNull(msgText); + tm.clearBody(); + tm.setText("Now we are not null"); + publisher.publish(tm); + tm = (TextMessage) consumer1.receive(2000); + assertNotNull(tm); + msgText = tm.getText(); + assertEquals("Now we are not null", msgText); + + tm = session1.createTextMessage(""); + msgText = tm.getText(); + assertEquals("Empty string not returned", "", msgText); + publisher.publish(tm); + tm = (TextMessage) consumer1.receive(2000); + assertNotNull(tm); + assertEquals("Empty string not returned", "", msgText); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(TopicSessionTest.class); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java index b6556b2468..8c9b5f3b4c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -33,7 +33,7 @@ import java.util.StringTokenizer; import java.util.Vector; //extends FieldTable -public class PropertyFieldTable implements FieldTable, Map +public class PropertyFieldTable implements FieldTable { private static final Logger _logger = Logger.getLogger(PropertyFieldTable.class); @@ -597,7 +597,7 @@ public class PropertyFieldTable implements FieldTable, Map // FIXME: Should be able to short-cut this process if the old and new values are // the same object and/or type and size... - _encodedSize -= getEncodingSize(currentValue + propertyName, previous); + _encodedSize -= getEncodingSize(currentValue + propertyName, previous) + 1; } _propertyNamesTypeMap.put(propertyName, "" + propertyPrefix); @@ -1028,8 +1028,8 @@ public class PropertyFieldTable implements FieldTable, Map // This is ambiguous in the JMS spec and needs thrashing out and potentially // testing against other implementations. - //Add the size of the content - _encodedSize += getEncodingSize(key, value); + //Add the size of the content plus one for the type identifier + _encodedSize += getEncodingSize(key, value) + 1; } if (_logger.isDebugEnabled()) @@ -1044,25 +1044,18 @@ public class PropertyFieldTable implements FieldTable, Map public Object remove(Object key) { - if (key instanceof String) - { - throw new IllegalArgumentException("Property key be a string"); - } - - char propertyPrefix = ((String) key).charAt(0); - if (_properties.containsKey(key)) { final Object value = _properties.remove(key); + String typePrefix = _propertyNamesTypeMap.remove(key); // plus one for the type - _encodedSize -= EncodingUtils.encodedShortStringLength(((String) key)); + _encodedSize -= EncodingUtils.encodedShortStringLength(((String) key)) + 1; // This check is, for now, unnecessary (we don't store null values). if (value != null) { - _encodedSize -= getEncodingSize(propertyPrefix + (String) key, value); + _encodedSize -= getEncodingSize(typePrefix, value); } - return value; } else @@ -1329,7 +1322,7 @@ public class PropertyFieldTable implements FieldTable, Map */ private static int getEncodingSize(String name, Object value) { - int encodingSize = 1; // Initialy 1 to cover the char prefix + int encodingSize = 0; char propertyPrefix = name.charAt(0); |