diff options
Diffstat (limited to 'java/client')
41 files changed, 1717 insertions, 481 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 5c13e7861f..c6f3f9c492 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -79,13 +79,19 @@ public abstract class AMQDestination implements Destination, Referenceable protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive, boolean isAutoDelete, String queueName) { + this(exchangeName, exchangeClass, destinationName, isExclusive, isAutoDelete, queueName, false); + } + + protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive, + boolean isAutoDelete, String queueName, boolean isDurable) + { if (destinationName == null) { - throw new IllegalArgumentException("Destination name must not be null"); + throw new IllegalArgumentException("Destination exchange must not be null"); } if (exchangeName == null) { - throw new IllegalArgumentException("Exchange name must not be null"); + throw new IllegalArgumentException("Exchange exchange must not be null"); } if (exchangeClass == null) { @@ -97,6 +103,7 @@ public abstract class AMQDestination implements Destination, Referenceable _isExclusive = isExclusive; _isAutoDelete = isAutoDelete; _queueName = queueName; + _isDurable = isDurable; } public String getEncodedName() diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8e93b19eea..21fc5c89c5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -23,13 +23,15 @@ package org.apache.qpid.client; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; +import org.apache.qpid.server.handler.ExchangeBoundHandler; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.framing.*; import org.apache.qpid.jms.Session; @@ -37,6 +39,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; + import javax.jms.*; import javax.jms.IllegalStateException; import java.io.Serializable; @@ -46,6 +49,8 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { @@ -66,6 +71,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; /** + * Used to reference durable subscribers so they requests for unsubscribe can be handled + * correctly. Note this only keeps a record of subscriptions which have been created + * in the current instance. It does not remember subscriptions between executions of the + * client + */ + private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = + new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); + private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = + new ConcurrentHashMap<BasicMessageConsumer, String>(); + + /** * Used in the consume method. We generate the consume tag on the client so that we can use the nowait * feature. */ @@ -91,6 +107,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private Map _consumers = new ConcurrentHashMap(); /** + * Maps from destination to count of JMSMessageConsumers + */ + private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = + new ConcurrentHashMap<Destination, AtomicInteger>(); + + /** * Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not * need to be attached to a queue */ @@ -116,6 +138,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private volatile AtomicBoolean _stopped = new AtomicBoolean(true); + private final AtomicLong _lastDeliveryTag = new AtomicLong(); + + /** * Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ @@ -154,10 +179,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (consumer == null) { _logger.warn("Received a message from queue " + message.deliverBody.consumerTag + " without a handler - ignoring..."); + _logger.warn("Consumers that exist: " + _consumers); + _logger.warn("Session hashcode: " + System.identityHashCode(this)); } else { + consumer.notifyMessage(message, _channelId); + } } else @@ -673,6 +702,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false)); } + + public void acknowledge() throws JMSException + { + if (getAMQConnection().isClosed()) + { + throw new javax.jms.IllegalStateException("Connection is already closed"); + } + if (isClosed()) + { + throw new javax.jms.IllegalStateException("Session is already closed"); + } + acknowledgeMessage(_lastDeliveryTag.get(), true); + + } + + void setLastDeliveredMessage(AbstractJMSMessage message) + { + _lastDeliveryTag.set(message.getDeliveryTag()); + } + + public MessageListener getMessageListener() throws JMSException { checkNotClosed(); @@ -775,20 +825,38 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MessageConsumer createConsumer(Destination destination) throws JMSException { checkValidDestination(destination); - return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null); + return createConsumerImpl(destination, + _defaultPrefetchHighMark, + _defaultPrefetchLowMark, + false, + false, + null, + null); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { checkValidDestination(destination); - return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector); + return createConsumerImpl(destination, + _defaultPrefetchHighMark, + _defaultPrefetchLowMark, + false, + false, + messageSelector, + null); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { checkValidDestination(destination); - return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector); + return createConsumerImpl(destination, + _defaultPrefetchHighMark, + _defaultPrefetchLowMark, + noLocal, + false, + messageSelector, + null); } public MessageConsumer createConsumer(Destination destination, @@ -798,7 +866,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector) throws JMSException { checkValidDestination(destination); - return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null); + return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null); } @@ -810,7 +878,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector) throws JMSException { checkValidDestination(destination); - return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null); + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null); } public MessageConsumer createConsumer(Destination destination, @@ -846,6 +914,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi final String selector, final FieldTable rawSelector) throws JMSException { + checkTemporaryDestination(destination); + return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport() { public Object operation() throws JMSException @@ -870,7 +940,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { - registerConsumer(consumer); + registerConsumer(consumer, false); } catch (AMQException e) { @@ -879,11 +949,46 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi throw ex; } + synchronized(destination) + { + _destinationConsumerCount.putIfAbsent(destination,new AtomicInteger()); + _destinationConsumerCount.get(destination).incrementAndGet(); + } + return consumer; } }.execute(_connection); } + private void checkTemporaryDestination(Destination destination) + throws JMSException + { + if((destination instanceof TemporaryDestination)) + { + _logger.debug("destination is temporary"); + final TemporaryDestination tempDest = (TemporaryDestination) destination; + if(tempDest.getSession() != this) + { + _logger.debug("destination is on different session"); + throw new JMSException("Cannot consume from a temporary destination created onanother session"); + } + if(tempDest.isDeleted()) + { + _logger.debug("destination is deleted"); + throw new JMSException("Cannot consume from a deleted destination"); + } + } + } + + + public boolean hasConsumer(Destination destination) + { + AtomicInteger counter = _destinationConsumerCount.get(destination); + + return (counter != null) && (counter.get() != 0); + } + + public void declareExchange(String name, String type) { declareExchange(name, type, _connection.getProtocolHandler()); @@ -947,20 +1052,38 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param queueName * @return the consumer tag generated by the broker */ - private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow, - boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException + private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler, + boolean nowait) throws AMQException { //fixme prefetch values are not used here. Do we need to have them as parametsrs? //need to generate a consumer tag on the client so we can exploit the nowait flag String tag = Integer.toString(_nextTag++); - AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0, - queueName, tag, noLocal, - acknowledgeMode == Session.NO_ACKNOWLEDGE, - exclusive, true); + consumer.setConsumerTag(tag); + // we must register the consumer in the map before we actually start listening + _consumers.put(tag, consumer); - protocolHandler.writeFrame(jmsConsume); - return tag; + try + { + AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0, + queueName, tag, consumer.isNoLocal(), + consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, + consumer.isExclusive(), nowait, null); + if (nowait) + { + protocolHandler.writeFrame(jmsConsume); + } + else + { + protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); + } + } + catch (AMQException e) + { + // clean-up the map in the event of an error + _consumers.remove(tag); + throw e; + } } public Queue createQueue(String queueName) throws JMSException @@ -1081,19 +1204,55 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); } - /** - * Note, currently this does not handle reuse of the same name with different topics correctly. - * If a name is reused in creating a new subscriber with a different topic/selecto or no-local - * flag then the subcriber will receive messages matching the old subscription AND the new one. - * The spec states that the new one should replace the old one. - * TODO: fix it. - */ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { checkNotClosed(); checkValidTopic(topic); - AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); + AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic)topic, name, _connection); + TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + if (subscriber != null) + { + if (subscriber.getTopic().equals(topic)) + { + throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " + + name); + } + else + { + unsubscribe(name); + } + } + else + { + // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + // says we must trash the subscription. + if (isQueueBound(dest.getQueueName()) && + !isQueueBound(dest.getQueueName(), topic.getTopicName())) + { + deleteQueue(dest.getQueueName()); + } + } + + subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); + + _subscriptions.put(name,subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name); + + return subscriber; + } + + void deleteQueue(String queueName) throws JMSException + { + try + { + AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, 0, queueName, false, + false, true); + _connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } } /** @@ -1104,9 +1263,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); checkValidTopic(topic); - AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); + AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); - return new TopicSubscriberAdaptor(dest, consumer); + TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); + _subscriptions.put(name,subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name); + return subscriber; } public TopicPublisher createPublisher(Topic topic) throws JMSException @@ -1132,32 +1294,59 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TemporaryQueue createTemporaryQueue() throws JMSException { checkNotClosed(); - return new AMQTemporaryQueue(); + return new AMQTemporaryQueue(this); } public TemporaryTopic createTemporaryTopic() throws JMSException { checkNotClosed(); - return new AMQTemporaryTopic(); + return new AMQTemporaryTopic(this); } public void unsubscribe(String name) throws JMSException { checkNotClosed(); - - String queue = _connection.getClientID() + ":" + name; - - AMQFrame queueDeclareFrame = QueueDeclareBody.createAMQFrame(_channelId,0,queue,true,false, false, false, true, null); - - try { - AMQMethodEvent event = _connection.getProtocolHandler().syncWrite(queueDeclareFrame,QueueDeclareOkBody.class); - // if this method doen't throw an exception means we have received a queue declare ok. - } catch (AMQException e) { - throw new javax.jms.InvalidDestinationException("This destination doesn't exist"); - } - //send a queue.delete for the subscription - AMQFrame frame = QueueDeleteBody.createAMQFrame(_channelId, 0, queue, false, false, true); - _connection.getProtocolHandler().writeFrame(frame); + TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + if (subscriber != null) + { + // send a queue.delete for the subscription + deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); + _subscriptions.remove(name); + _reverseSubscriptionMap.remove(subscriber); + } + else + { + if (isQueueBound(AMQTopic.getDurableTopicQueueName(name, _connection))) + { + deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); + } + else + { + throw new InvalidDestinationException("Unknown subscription exchange:" + name); + } + } + } + + boolean isQueueBound(String queueName) throws JMSException + { + return isQueueBound(queueName, null); + } + + boolean isQueueBound(String queueName, String routingKey) throws JMSException + { + AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME, + routingKey, queueName); + AMQMethodEvent response = null; + try + { + response = _connection.getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); + return (responseBody.replyCode == ExchangeBoundHandler.OK); } private void checkTransacted() throws JMSException @@ -1263,7 +1452,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param consumer * @throws AMQException */ - void registerConsumer(BasicMessageConsumer consumer) throws AMQException + void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException { AMQDestination amqd = consumer.getDestination(); @@ -1275,22 +1464,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(), - consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode()); - - consumer.setConsumerTag(consumerTag); - _consumers.put(consumerTag, consumer); + consumeFromQueue(consumer, queueName, protocolHandler, nowait); } /** * Called by the MessageConsumer when closing, to deregister the consumer from the * map from consumerTag to consumer instance. * - * @param consumerTag the consumer tag, that was broker-generated + * @param consumer the consum */ - void deregisterConsumer(String consumerTag) + void deregisterConsumer(BasicMessageConsumer consumer) { - _consumers.remove(consumerTag); + _consumers.remove(consumer.getConsumerTag()); + String subscriptionName = _reverseSubscriptionMap.remove(consumer); + if(subscriptionName != null) + { + _subscriptions.remove(subscriptionName); + } + + Destination dest = consumer.getDestination(); + synchronized(dest) + { + if(_destinationConsumerCount.get(dest).decrementAndGet() == 0) + { + _destinationConsumerCount.remove(dest); + } + } } private void registerProducer(long producerId, MessageProducer producer) @@ -1338,7 +1537,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (Iterator it = consumers.iterator(); it.hasNext();) { BasicMessageConsumer consumer = (BasicMessageConsumer) it.next(); - registerConsumer(consumer); + registerConsumer(consumer, true); } } @@ -1359,12 +1558,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /* * I could have combined the last 3 methods, but this way it improves readability */ - private void checkValidTopic(Topic topic) throws InvalidDestinationException + private void checkValidTopic(Topic topic) throws JMSException { if (topic == null) { throw new javax.jms.InvalidDestinationException("Invalid Topic"); } + if((topic instanceof TemporaryDestination) && ((TemporaryDestination)topic).getSession() != this) + { + throw new JMSException("Cannot create a subscription on a temporary topic created in another session"); + } } private void checkValidQueue(Queue queue) throws InvalidDestinationException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java index 6b41ea0112..81fee69f90 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java @@ -26,25 +26,45 @@ import javax.jms.TemporaryQueue; /** * AMQ implementation of a TemporaryQueue. */ -final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue +final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination { + private final AMQSession _session; + private boolean _deleted; + /** * Create a new instance of an AMQTemporaryQueue */ - public AMQTemporaryQueue() + public AMQTemporaryQueue(AMQSession session) { super("TempQueue" + Long.toString(System.currentTimeMillis()), true); + _session = session; } /** * @see javax.jms.TemporaryQueue#delete() */ - public void delete() throws JMSException + public synchronized void delete() throws JMSException { - throw new UnsupportedOperationException("Delete not supported, " + - "will auto-delete when connection closed"); + if(_session.hasConsumer(this)) + { + throw new JMSException("Temporary Queue has consumers so cannot be deleted"); + } + _deleted = true; + + // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted + // by the server when there are no more subscriptions to that queue. This is probably not + // quite right for JMSCompliance. } + public AMQSession getSession() + { + return _session; + } + + public boolean isDeleted() + { + return _deleted; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java index 0ba5cb3c3a..241a9abc9b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java @@ -26,15 +26,18 @@ import javax.jms.TemporaryTopic; /** * AMQ implementation of TemporaryTopic. */ -class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic +class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic, TemporaryDestination { + private final AMQSession _session; + private boolean _deleted; /** * Create new temporary topic. */ - public AMQTemporaryTopic() + public AMQTemporaryTopic(AMQSession session) { super("TempQueue" + Long.toString(System.currentTimeMillis())); + _session = session; } /** @@ -42,8 +45,25 @@ class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic */ public void delete() throws JMSException { - throw new UnsupportedOperationException("Delete not supported, " + - "will auto-delete when connection closed"); + if(_session.hasConsumer(this)) + { + throw new JMSException("Temporary Topic has consumers so cannot be deleted"); + } + + _deleted = true; + // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted + // by the server when there are no more subscriptions to that queue. This is probably not + // quite right for JMSCompliance. + } + + public AMQSession getSession() + { + return _session; + } + + public boolean isDeleted() + { + return _deleted; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 4dd38eea18..39304f3f4c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -40,20 +40,25 @@ public class AMQTopic extends AMQDestination implements Topic public AMQTopic(String name) { - super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, true, null); - _isDurable = false; + this(name, true, null, false); } - /** - * Constructor for use in creating a topic to represent a durable subscription - * @param topic - * @param clientId - * @param subscriptionName - */ - public AMQTopic(AMQTopic topic, String clientId, String subscriptionName) + public AMQTopic(String name, boolean isAutoDelete, String queueName, boolean isDurable) + { + super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, + queueName, isDurable); + } + + public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection) + throws JMSException + { + return new AMQTopic(topic.getDestinationName(), false, getDurableTopicQueueName(subscriptionName, connection), + true); + } + + public static String getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException { - super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getDestinationName(), true, false, clientId + ":" + subscriptionName); - _isDurable = true; + return connection.getClientID() + ":" + subscriptionName; } public String getTopicName() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 4fb62b49fc..9f9038fddd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -214,10 +214,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { //handle case where connection has already been started, and the dispatcher is blocked //doing a put on the _synchronousQueue - Object msg = _synchronousQueue.poll(); - if (msg != null) + AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll(); + if (jmsMsg != null) { - AbstractJMSMessage jmsMsg = (AbstractJMSMessage) msg; + _session.setLastDeliveredMessage(jmsMsg); messageListener.onMessage(jmsMsg); postDeliver(jmsMsg); } @@ -280,7 +280,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receive(long l) throws JMSException { checkPreConditions(); - + acquireReceiving(); try @@ -297,12 +297,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { + _session.setLastDeliveredMessage(m); postDeliver(m); } + return m; } catch (InterruptedException e) { + _logger.warn("Interrupted: " + e, e); return null; } finally @@ -323,8 +326,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { + _session.setLastDeliveredMessage(m); postDeliver(m); } + return m; } finally @@ -423,6 +428,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { //we do not need a lock around the test above, and the dispatch below as it is invalid //for an application to alter an installed listener while the session is started + _session.setLastDeliveredMessage(jmsMessage); getMessageListener().onMessage(jmsMessage); postDeliver(jmsMessage); } @@ -459,8 +465,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - private void postDeliver(AbstractJMSMessage msg) + private void postDeliver(AbstractJMSMessage msg) throws JMSException { + msg.setJMSDestination(_destination); switch (_acknowledgeMode) { case Session.DUPS_OK_ACKNOWLEDGE: @@ -522,7 +529,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private void deregisterConsumer() { - _session.deregisterConsumer(_consumerTag); + _session.deregisterConsumer(this); } public String getConsumerTag() @@ -531,18 +538,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } public void setConsumerTag(String consumerTag) - { + { _consumerTag = consumerTag; } public AMQSession getSession() { return _session; } - + private void checkPreConditions() throws JMSException{ - + this.checkNotClosed(); - + if(_session == null || _session.isClosed()){ throw new javax.jms.IllegalStateException("Invalid Session"); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 8c53d93de6..e11d70cf41 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -24,11 +24,13 @@ import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.JMSBytesMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; import javax.jms.*; import java.io.UnsupportedEncodingException; +import java.util.Enumeration; public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { @@ -140,7 +142,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setDisableMessageID(boolean b) throws JMSException { - checkPreConditions(); + checkPreConditions(); checkNotClosed(); // IGNORED } @@ -154,7 +156,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setDisableMessageTimestamp(boolean b) throws JMSException { - checkPreConditions(); + checkPreConditions(); _disableTimestamps = b; } @@ -166,11 +168,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setDeliveryMode(int i) throws JMSException { - checkPreConditions(); + checkPreConditions(); if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT) { throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i + - " is illegal"); + " is illegal"); } _deliveryMode = i; } @@ -183,7 +185,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setPriority(int i) throws JMSException { - checkPreConditions(); + checkPreConditions(); if (i < 0 || i > 9) { throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9"); @@ -199,7 +201,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void setTimeToLive(long l) throws JMSException { - checkPreConditions(); + checkPreConditions(); if (l < 0) { throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l); @@ -227,33 +229,36 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message) throws JMSException { - checkPreConditions(); - checkInitialDestination(); + checkPreConditions(); + checkInitialDestination(); + + synchronized (_connection.getFailoverMutex()) { - sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive, + sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); } } public void send(Message message, int deliveryMode) throws JMSException { - checkPreConditions(); - checkInitialDestination(); + checkPreConditions(); + checkInitialDestination(); + synchronized (_connection.getFailoverMutex()) { - sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive, + sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); } } public void send(Message message, int deliveryMode, boolean immediate) throws JMSException { - checkPreConditions(); - checkInitialDestination(); + checkPreConditions(); + checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { - sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive, + sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate); } } @@ -261,23 +266,23 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { - checkPreConditions(); - checkInitialDestination(); + checkPreConditions(); + checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { - sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory, + sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate); } } public void send(Destination destination, Message message) throws JMSException { - checkPreConditions(); - checkDestination(destination); + checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive, + sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); } } @@ -286,12 +291,12 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j int priority, long timeToLive) throws JMSException { - checkPreConditions(); - checkDestination(destination); + checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive, + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate); } } @@ -305,7 +310,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive, + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, _immediate); } } @@ -314,12 +319,12 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException { - checkPreConditions(); - checkDestination(destination); + checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive, + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate); } } @@ -329,27 +334,158 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j boolean immediate, boolean waitUntilSent) throws JMSException { - checkPreConditions(); - checkDestination(destination); + checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive, + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, waitUntilSent); } } + + private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException + { + if (message instanceof AbstractJMSMessage) + { + return (AbstractJMSMessage) message; + } + else + { + AbstractJMSMessage newMessage; + + if (message instanceof BytesMessage) + { + BytesMessage bytesMessage = (BytesMessage) message; + bytesMessage.reset(); + + JMSBytesMessage nativeMsg = (JMSBytesMessage) _session.createBytesMessage(); + + + byte[] buf = new byte[1024]; + + int len; + + while ((len = bytesMessage.readBytes(buf)) != -1) + { + nativeMsg.writeBytes(buf, 0, len); + } + + newMessage = nativeMsg; + } + else if (message instanceof MapMessage) + { + MapMessage origMessage = (MapMessage) message; + MapMessage nativeMessage = _session.createMapMessage(); + + Enumeration mapNames = origMessage.getMapNames(); + while (mapNames.hasMoreElements()) + { + String name = (String) mapNames.nextElement(); + nativeMessage.setObject(name, origMessage.getObject(name)); + } + newMessage = (AbstractJMSMessage) nativeMessage; + } + else if (message instanceof ObjectMessage) + { + ObjectMessage origMessage = (ObjectMessage) message; + ObjectMessage nativeMessage = _session.createObjectMessage(); + + nativeMessage.setObject(origMessage.getObject()); + + newMessage = (AbstractJMSMessage) nativeMessage; + } + else if (message instanceof TextMessage) + { + TextMessage origMessage = (TextMessage) message; + TextMessage nativeMessage = _session.createTextMessage(); + + nativeMessage.setText(origMessage.getText()); + + newMessage = (AbstractJMSMessage) nativeMessage; + } + else if (message instanceof StreamMessage) + { + StreamMessage origMessage = (StreamMessage) message; + StreamMessage nativeMessage = _session.createStreamMessage(); + + + try + { + origMessage.reset(); + while (true) + { + nativeMessage.writeObject(origMessage.readObject()); + } + } + catch (MessageEOFException e) + { + ;// + } + newMessage = (AbstractJMSMessage) nativeMessage; + } + else + { + newMessage = (AbstractJMSMessage) _session.createMessage(); + + } + + Enumeration propertyNames = message.getPropertyNames(); + while (propertyNames.hasMoreElements()) + { + String propertyName = String.valueOf(propertyNames.nextElement()); + if (!propertyName.startsWith("JMSX_")) + { + Object value = message.getObjectProperty(propertyName); + newMessage.setObjectProperty(propertyName, value); + } + } + + newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode()); + + + int priority = message.getJMSPriority(); + if (priority < 0) + { + priority = 0; + } + else if (priority > 9) + { + priority = 9; + } + + newMessage.setJMSPriority(priority); + if (message.getJMSReplyTo() != null) + { + newMessage.setJMSReplyTo(message.getJMSReplyTo()); + } + newMessage.setJMSType(message.getJMSType()); + + + if (newMessage != null) + { + return newMessage; + } + else + { + throw new JMSException("Unable to send message, due to class conversion error: " + message.getClass().getName()); + } + } + } + + private void validateDestination(Destination destination) throws JMSException { if (!(destination instanceof AMQDestination)) { throw new JMSException("Unsupported destination class: " + - (destination != null ? destination.getClass() : null)); + (destination != null ? destination.getClass() : null)); } - declareDestination((AMQDestination)destination); + declareDestination((AMQDestination) destination); } - protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority, + protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException { sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent); @@ -357,8 +493,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j /** * The caller of this method must hold the failover mutex. + * * @param destination - * @param message + * @param origMessage * @param deliveryMode * @param priority * @param timeToLive @@ -366,9 +503,12 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j * @param immediate * @throws JMSException */ - protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority, + protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException { + checkTemporaryDestination(destination); + + AbstractJMSMessage message = convertToNativeMessage(origMessage); AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(), destination.getRoutingKey(), mandatory, immediate); @@ -424,11 +564,42 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j frames[1] = contentHeaderFrame; CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); _protocolHandler.writeFrame(compositeFrame, wait); + + + if (message != origMessage) + { + _logger.warn("Updating original message"); + origMessage.setJMSPriority(message.getJMSPriority()); + origMessage.setJMSTimestamp(message.getJMSTimestamp()); + _logger.warn("Setting JMSExpiration:" + message.getJMSExpiration()); + origMessage.setJMSExpiration(message.getJMSExpiration()); + origMessage.setJMSMessageID(message.getJMSMessageID()); + } + } + + private void checkTemporaryDestination(AMQDestination destination) throws JMSException + { + if(destination instanceof TemporaryDestination) + { + _logger.debug("destination is temporary destination"); + TemporaryDestination tempDest = (TemporaryDestination) destination; + if(tempDest.getSession().isClosed()) + { + _logger.debug("session is closed"); + throw new JMSException("Session for temporary destination has been closed"); + } + if(tempDest.isDeleted()) + { + _logger.debug("destination is deleted"); + throw new JMSException("Cannot send to a deleted temporary destination"); + } + } } /** * Create content bodies. This will split a large message into numerous bodies depending on the negotiated * maximum frame size. + * * @param payload * @return the array of content bodies */ @@ -458,8 +629,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j for (int i = 0; i < bodies.length; i++) { bodies[i] = new ContentBody(); - payload.position((int)framePayloadMax * i); - int length = (remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining; + payload.position((int) framePayloadMax * i); + int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; payload.limit(payload.position() + length); bodies[i].payload = payload.slice(); remaining -= length; @@ -480,32 +651,42 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j _encoding = encoding; } - private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException { - checkNotClosed(); + private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException + { + checkNotClosed(); + + if (_session == null || _session.isClosed()) + { + throw new javax.jms.IllegalStateException("Invalid Session"); + } + } + + private void checkInitialDestination() + { + if (_destination == null) + { + throw new UnsupportedOperationException("Destination is null"); + } + } - if(_session == null || _session.isClosed()){ - throw new javax.jms.IllegalStateException("Invalid Session"); - } - } + private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException + { + if (_destination != null && suppliedDestination != null) + { + throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); + } - private void checkInitialDestination(){ - if(_destination == null){ - throw new UnsupportedOperationException("Destination is null"); - } - } + if (suppliedDestination == null) + { + throw new InvalidDestinationException("Supplied Destination was invalid"); + } - private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{ - if (_destination != null && suppliedDestination != null){ - throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); - } - if (suppliedDestination == null){ - throw new InvalidDestinationException("Supplied Destination was invalid"); - } - } + } - public AMQSession getSession() { - return _session; - } + public AMQSession getSession() + { + return _session; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java new file mode 100644 index 0000000000..34ec49436e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java @@ -0,0 +1,34 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.AMQException; + +import javax.jms.JMSException; + +/** + * @author Apache Software Foundation + */ +public class JMSAMQException extends JMSException +{ + public JMSAMQException(AMQException s) + { + super(s.getMessage(), String.valueOf(s.getErrorCode())); + setLinkedException(s); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java index f90cc97a80..c8de298ba1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java @@ -10,119 +10,124 @@ import javax.jms.QueueSender; public class QueueSenderAdapter implements QueueSender { - private MessageProducer delegate; - private Queue queue; + private MessageProducer _delegate; + private Queue _queue; private boolean closed = false; public QueueSenderAdapter(MessageProducer msgProducer, Queue queue){ - delegate = msgProducer; - this.queue = queue; + _delegate = msgProducer; + _queue = queue; } public Queue getQueue() throws JMSException { checkPreConditions(); - return queue; + return _queue; } public void send(Message msg) throws JMSException { checkPreConditions(); - delegate.send(msg); + _delegate.send(msg); } public void send(Queue queue, Message msg) throws JMSException { - checkPreConditions(); - delegate.send(queue, msg); + checkPreConditions(queue); + _delegate.send(queue, msg); } public void publish(Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { checkPreConditions(); - delegate.send(msg, deliveryMode,priority,timeToLive); + _delegate.send(msg, deliveryMode,priority,timeToLive); } public void send(Queue queue,Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { - checkPreConditions(); - delegate.send(queue,msg, deliveryMode,priority,timeToLive); + checkPreConditions(queue); + _delegate.send(queue,msg, deliveryMode,priority,timeToLive); } public void close() throws JMSException { - delegate.close(); + _delegate.close(); closed = true; } public int getDeliveryMode() throws JMSException { checkPreConditions(); - return delegate.getDeliveryMode(); + return _delegate.getDeliveryMode(); } public Destination getDestination() throws JMSException { checkPreConditions(); - return delegate.getDestination(); + return _delegate.getDestination(); } public boolean getDisableMessageID() throws JMSException { checkPreConditions(); - return delegate.getDisableMessageID(); + return _delegate.getDisableMessageID(); } public boolean getDisableMessageTimestamp() throws JMSException { checkPreConditions(); - return delegate.getDisableMessageTimestamp(); + return _delegate.getDisableMessageTimestamp(); } public int getPriority() throws JMSException { checkPreConditions(); - return delegate.getPriority(); + return _delegate.getPriority(); } public long getTimeToLive() throws JMSException { checkPreConditions(); - return delegate.getTimeToLive(); + return _delegate.getTimeToLive(); } public void send(Destination dest, Message msg) throws JMSException { - checkPreConditions(); - delegate.send(dest,msg); + checkPreConditions((Queue)dest); + _delegate.send(dest,msg); } public void send(Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { checkPreConditions(); - delegate.send(msg, deliveryMode,priority,timeToLive); + _delegate.send(msg, deliveryMode,priority,timeToLive); } public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { - checkPreConditions(); - delegate.send(dest,msg, deliveryMode,priority,timeToLive); + checkPreConditions((Queue)dest); + _delegate.send(dest,msg, deliveryMode,priority,timeToLive); } public void setDeliveryMode(int deliveryMode) throws JMSException { checkPreConditions(); - delegate.setDeliveryMode(deliveryMode); + _delegate.setDeliveryMode(deliveryMode); } public void setDisableMessageID(boolean disableMessageID) throws JMSException { checkPreConditions(); - delegate.setDisableMessageID(disableMessageID); + _delegate.setDisableMessageID(disableMessageID); } public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException { checkPreConditions(); - delegate.setDisableMessageTimestamp(disableMessageTimestamp); + _delegate.setDisableMessageTimestamp(disableMessageTimestamp); } public void setPriority(int priority) throws JMSException { checkPreConditions(); - delegate.setPriority(priority); + _delegate.setPriority(priority); } public void setTimeToLive(long timeToLive) throws JMSException { checkPreConditions(); - delegate.setTimeToLive(timeToLive); + _delegate.setTimeToLive(timeToLive); } - - private void checkPreConditions() throws IllegalStateException, IllegalStateException { + + private void checkPreConditions() throws IllegalStateException, IllegalStateException + { + checkPreConditions(_queue); + } + + private void checkPreConditions(Queue queue) throws IllegalStateException, IllegalStateException { if (closed){ throw new javax.jms.IllegalStateException("Publisher is closed"); } @@ -131,7 +136,7 @@ public class QueueSenderAdapter implements QueueSender { throw new UnsupportedOperationException("Queue is null"); } - AMQSession session = ((BasicMessageProducer)delegate).getSession(); + AMQSession session = ((BasicMessageProducer) _delegate).getSession(); if(session == null || session.isClosed()){ throw new javax.jms.IllegalStateException("Invalid Session"); diff --git a/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java b/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java new file mode 100644 index 0000000000..8c11672a65 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java @@ -0,0 +1,17 @@ +package org.apache.qpid.client;
+
+import javax.jms.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Provides support for covenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
+ * so that operations related to their "temporary-ness" can be abstracted out.
+ */
+interface TemporaryDestination extends Destination
+{
+
+ public void delete() throws JMSException;
+ public AMQSession getSession();
+ public boolean isDeleted();
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java index 014c7c3311..dbc7b72813 100644 --- a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java +++ b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java @@ -35,10 +35,10 @@ import javax.jms.TopicSubscriber; class TopicSubscriberAdaptor implements TopicSubscriber { private final Topic _topic; - private final MessageConsumer _consumer; + private final BasicMessageConsumer _consumer; private final boolean _noLocal; - TopicSubscriberAdaptor(Topic topic, MessageConsumer consumer, boolean noLocal) + TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer, boolean noLocal) { _topic = topic; _consumer = consumer; @@ -119,4 +119,10 @@ class TopicSubscriberAdaptor implements TopicSubscriber throw new javax.jms.IllegalStateException("Invalid Session"); } } + + BasicMessageConsumer getMessageConsumer() + { + return _consumer; + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java new file mode 100644 index 0000000000..858726745e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java @@ -0,0 +1,54 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.ExchangeBoundOkBody; + +/** + * @author Apache Software Foundation + */ +public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ExchangeBoundOkMethodHandler.class); + private static final ExchangeBoundOkMethodHandler _instance = new ExchangeBoundOkMethodHandler(); + + public static ExchangeBoundOkMethodHandler getInstance() + { + return _instance; + } + + private ExchangeBoundOkMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + if (_logger.isDebugEnabled()) + { + ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod(); + _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: " + + body.replyText); + } + } +} + diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java new file mode 100644 index 0000000000..3271a715a2 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java @@ -0,0 +1,53 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client.handler; + +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.log4j.Logger; + +/** + * @author Apache Software Foundation + */ +public class QueueDeleteOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(QueueDeleteOkMethodHandler.class); + private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler(); + + public static QueueDeleteOkMethodHandler getInstance() + { + return _instance; + } + + private QueueDeleteOkMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + if (_logger.isDebugEnabled()) + { + QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod(); + _logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount); + } + } +} + diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index c1ed88b167..75e84fee96 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -32,16 +32,18 @@ import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.JmsNotImplementedException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.JMSPropertyFieldTable; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageNotReadableException; import javax.jms.MessageNotWriteableException; +import javax.jms.MessageFormatException; import java.util.Collections; import java.util.Enumeration; import java.util.Map; -public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms.Message +public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message { private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap()); @@ -50,7 +52,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms protected ByteBuffer _data; private boolean _readableProperties = false; private boolean _readableMessage = false; - + private Destination _destination; + protected AbstractJMSMessage(ByteBuffer data) { super(new BasicContentHeaderProperties()); @@ -174,12 +177,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms public Destination getJMSDestination() throws JMSException { // TODO: implement this once we have sorted out how to figure out the exchange class - throw new JmsNotImplementedException(); + return _destination; } public void setJMSDestination(Destination destination) throws JMSException { - throw new JmsNotImplementedException(); + _destination = destination; } public int getJMSDeliveryMode() throws JMSException @@ -234,7 +237,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms public void clearProperties() throws JMSException { - getJmsContentHeaderProperties().getHeaders().clear(); + getJmsContentHeaderProperties().getJMSHeaders().clear(); _readableProperties = false; } @@ -249,138 +252,139 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms public boolean propertyExists(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getHeaders().propertyExists(propertyName); + return getJmsContentHeaderProperties().getJMSHeaders().propertyExists(propertyName); } public boolean getBooleanProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getHeaders().getBoolean(propertyName); + + return getJmsContentHeaderProperties().getJMSHeaders().getBoolean(propertyName); } public byte getByteProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getHeaders().getByte(propertyName); + return getJmsContentHeaderProperties().getJMSHeaders().getByte(propertyName); } public short getShortProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getHeaders().getShort(propertyName); + return getJmsContentHeaderProperties().getJMSHeaders().getShort(propertyName); } public int getIntProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getHeaders().getInteger(propertyName); + return getJmsContentHeaderProperties().getJMSHeaders().getInteger(propertyName); } public long getLongProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getHeaders().getLong(propertyName); + return getJmsContentHeaderProperties().getJMSHeaders().getLong(propertyName); } public float getFloatProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getHeaders().getFloat(propertyName); + return getJmsContentHeaderProperties().getJMSHeaders().getFloat(propertyName); } public double getDoubleProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getHeaders().getDouble(propertyName); + return getJmsContentHeaderProperties().getJMSHeaders().getDouble(propertyName); } public String getStringProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getHeaders().getString(propertyName); + return getJmsContentHeaderProperties().getJMSHeaders().getString(propertyName); } public Object getObjectProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getHeaders().getObject(propertyName); + return getJmsContentHeaderProperties().getJMSHeaders().getObject(propertyName); } public Enumeration getPropertyNames() throws JMSException { - return getJmsContentHeaderProperties().getHeaders().getPropertyNames(); + return getJmsContentHeaderProperties().getJMSHeaders().getPropertyNames(); } public void setBooleanProperty(String propertyName, boolean b) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().setBoolean(propertyName, b); + getJmsContentHeaderProperties().getJMSHeaders().setBoolean(propertyName, b); } public void setByteProperty(String propertyName, byte b) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().setByte(propertyName, new Byte(b)); + getJmsContentHeaderProperties().getJMSHeaders().setByte(propertyName, new Byte(b)); } public void setShortProperty(String propertyName, short i) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().setShort(propertyName, new Short(i)); + getJmsContentHeaderProperties().getJMSHeaders().setShort(propertyName, new Short(i)); } public void setIntProperty(String propertyName, int i) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().setInteger(propertyName, new Integer(i)); + getJmsContentHeaderProperties().getJMSHeaders().setInteger(propertyName, new Integer(i)); } public void setLongProperty(String propertyName, long l) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().setLong(propertyName, new Long(l)); + getJmsContentHeaderProperties().getJMSHeaders().setLong(propertyName, new Long(l)); } public void setFloatProperty(String propertyName, float f) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().setFloat(propertyName, new Float(f)); + getJmsContentHeaderProperties().getJMSHeaders().setFloat(propertyName, new Float(f)); } public void setDoubleProperty(String propertyName, double v) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().setDouble(propertyName, new Double(v)); + getJmsContentHeaderProperties().getJMSHeaders().setDouble(propertyName, new Double(v)); } public void setStringProperty(String propertyName, String value) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().setString(propertyName, value); + getJmsContentHeaderProperties().getJMSHeaders().setString(propertyName, value); } public void setObjectProperty(String propertyName, Object object) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().setObject(propertyName, object); + getJmsContentHeaderProperties().getJMSHeaders().setObject(propertyName, object); } protected void removeProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - getJmsContentHeaderProperties().getHeaders().remove(propertyName); + getJmsContentHeaderProperties().getJMSHeaders().remove(propertyName); } - public void acknowledge() throws JMSException + public void acknowledgeThis() throws JMSException { // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge // is not specified. In our case, we only set the session field where client acknowledge mode is specified. @@ -397,6 +401,14 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms } } + public void acknowledge() throws JMSException + { + if(_session != null) + { + _session.acknowledge(); + } + } + /** * This forces concrete classes to implement clearBody() @@ -426,13 +438,13 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo())); buf.append("\nAMQ message number: ").append(_deliveryTag); buf.append("\nProperties:"); - if (getJmsContentHeaderProperties().getHeaders().isEmpty()) + if (getJmsContentHeaderProperties().getJMSHeaders().isEmpty()) { buf.append("<NONE>"); } else { - buf.append('\n').append(getJmsContentHeaderProperties().getHeaders()); + buf.append('\n').append(getJmsContentHeaderProperties().getJMSHeaders()); } return buf.toString(); } @@ -462,9 +474,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms { throw new IllegalArgumentException("Property name must not be the empty string"); } - - // Call to ensure that the it has been set. - getJmsContentHeaderProperties().getHeaders(); } public BasicContentHeaderProperties getJmsContentHeaderProperties() @@ -478,14 +487,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms // position beyond the start if (_data != null) { - if (!_readableMessage) - { - _data.flip(); - } - else - { - _data.rewind(); - } + reset(); } return _data; } @@ -524,7 +526,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms return !_readableMessage; } - public void reset() throws JMSException + public void reset() { if (_readableMessage) { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index 456d4d520c..debabfd559 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -30,6 +30,9 @@ import javax.jms.MessageFormatException; import javax.jms.MessageEOFException; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CharsetDecoder; +import java.nio.CharBuffer; public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage { @@ -149,10 +152,27 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag checkReadable(); // we check only for one byte since theoretically the string could be only a // single byte when using UTF-8 encoding - checkAvailable(1); + try { - return _data.getString(Charset.forName("UTF-8").newDecoder()); + short length = readShort(); + if(length == 0) + { + return ""; + } + else + { + CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); + ByteBuffer encodedString = _data.slice(); + encodedString.limit(length); + _data.position(_data.position()+length); + CharBuffer string = decoder.decode(encodedString.buf()); + + return string.toString(); + } + + + } catch (CharacterCodingException e) { @@ -257,9 +277,15 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag checkWritable(); try { - _data.putString(string, Charset.forName("UTF-8").newEncoder()); + CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder(); + java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string)); + + _data.putShort((short)encodedString.limit()); + _data.put(encodedString); + + //_data.putString(string, Charset.forName("UTF-8").newEncoder()); // we must add the null terminator manually - _data.put((byte)0); + //_data.put((byte)0); } catch (CharacterCodingException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index 85d434e4eb..f69bed0fc0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -25,6 +25,8 @@ import org.apache.qpid.framing.PropertyFieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.EncodingUtils; +import org.apache.qpid.framing.JMSPropertyFieldTable; +import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; @@ -39,7 +41,7 @@ public class JMSMapMessage extends JMSBytesMessage implements javax.jms.MapMessa public static final String MIME_TYPE = "jms/map-message"; - private PropertyFieldTable _map; + private JMSPropertyFieldTable _properties; JMSMapMessage() throws JMSException { @@ -49,10 +51,9 @@ public class JMSMapMessage extends JMSBytesMessage implements javax.jms.MapMessa JMSMapMessage(ByteBuffer data) throws JMSException { super(data); // this instantiates a content header - _map = new PropertyFieldTable(); + _properties = new JMSPropertyFieldTable(); } - JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) throws AMQException { @@ -62,19 +63,33 @@ public class JMSMapMessage extends JMSBytesMessage implements javax.jms.MapMessa { long tableSize = EncodingUtils.readInteger(_data); - _map = (PropertyFieldTable) FieldTableFactory.newFieldTable(_data, tableSize); - + try + { + _properties = new JMSPropertyFieldTable(_data, tableSize); + } + catch (JMSException e) + { + Exception error = e.getLinkedException(); + if (error instanceof AMQFrameDecodingException) + { + throw(AMQFrameDecodingException) error; + } + else + { + throw new AMQException(e.getMessage(), e); + } + } } else { - _map = (PropertyFieldTable) FieldTableFactory.newFieldTable(); + _properties = new JMSPropertyFieldTable(); } } public String toBodyString() throws JMSException { - return "MapSize:" + _map.getEncodedSize() + "\nMapData:\n" + _map.toString(); + return _properties.toString(); } public String getMimeType() @@ -82,85 +97,43 @@ public class JMSMapMessage extends JMSBytesMessage implements javax.jms.MapMessa return MIME_TYPE; } - // MapMessage Interface - public boolean getBoolean(String string) throws JMSException + public ByteBuffer getData() { - Boolean b = _map.getBoolean(string); - - if (b == null) - { - if (_map.containsKey(string)) - { - Object str = _map.getObject(string); + //What if _data is null? + _properties.writeToBuffer(_data); + return super.getData(); + } - if (str == null || !(str instanceof String)) - { - throw new MessageFormatException("getBoolean can't use " + string + " item."); - } - else - { - return Boolean.valueOf((String) str); - } - } - else - { - b = Boolean.valueOf(null); - } - } + @Override + public void clearBodyImpl() throws JMSException + { + super.clearBodyImpl(); + _properties.clear(); + } - return b; + public boolean getBoolean(String string) throws JMSException + { + return _properties.getBoolean(string); } public byte getByte(String string) throws JMSException { - Byte b = _map.getByte(string); - if (b == null) - { - if (_map.containsKey(string)) - { - Object str = _map.getObject(string); - - if (str == null || !(str instanceof String)) - { - throw new MessageFormatException("getByte can't use " + string + " item."); - } - else - { - return Byte.valueOf((String) str); - } - } - else - { - b = Byte.valueOf(null); - } - } - - return b; + return _properties.getByte(string); } public short getShort(String string) throws JMSException { - { - Short s = _map.getShort(string); - - if (s == null) - { - s = Short.valueOf(getByte(string)); - } - - return s; - } + return _properties.getShort(string); } public char getChar(String string) throws JMSException { - - Character result = _map.getCharacter(string); + Character result = _properties.getCharacter(string); if (result == null) { - throw new MessageFormatException("getChar couldn't find " + string + " item."); + throw new NullPointerException("getChar couldn't find " + string + " item."); } else { @@ -170,179 +143,97 @@ public class JMSMapMessage extends JMSBytesMessage implements javax.jms.MapMessa public int getInt(String string) throws JMSException { - Integer i = _map.getInteger(string); - - if (i == null) - { - i = Integer.valueOf(getShort(string)); - } - - return i; + return _properties.getInteger(string); } public long getLong(String string) throws JMSException { - - Long l = _map.getLong(string); - - if (l == null) - { - l = Long.valueOf(getInt(string)); - } - - return l; - + return _properties.getLong(string); } public float getFloat(String string) throws JMSException { - - Float f = _map.getFloat(string); - - if (f == null) - { - if (_map.containsKey(string)) - { - Object str = _map.getObject(string); - - if (str == null || !(str instanceof String)) - { - throw new MessageFormatException("getFloat can't use " + string + " item."); - } - else - { - return Float.valueOf((String) str); - } - } - else - { - f = Float.valueOf(null); - } - - } - - return f; - + return _properties.getFloat(string); } public double getDouble(String string) throws JMSException { - Double d = _map.getDouble(string); - - if (d == null) - { - d = Double.valueOf(getFloat(string)); - } - - return d; + return _properties.getDouble(string); } public String getString(String string) throws JMSException { - String s = _map.getString(string); - - if (s == null) - { - if (_map.containsKey(string)) - { - Object o = _map.getObject(string); - if (o instanceof byte[]) - { - throw new MessageFormatException("getObject couldn't find " + string + " item."); - } - else - { - if (o == null) - { - return null; - } - else - { - s = String.valueOf(o); - } - } - } - } - - return s; + return _properties.getString(string); } public byte[] getBytes(String string) throws JMSException { - - byte[] result = _map.getBytes(string); - - if (result == null) - { - throw new MessageFormatException("getBytes couldn't find " + string + " item."); - } - - return result; - + return _properties.getBytes(string); } public Object getObject(String string) throws JMSException { - return _map.getObject(string); + return _properties.getObject(string); } public Enumeration getMapNames() throws JMSException { - return _map.getPropertyNames(); + return _properties.getMapNames(); } + public void setBoolean(String string, boolean b) throws JMSException { checkWritable(); - _map.setBoolean(string, b); + _properties.setBoolean(string, b); } public void setByte(String string, byte b) throws JMSException { checkWritable(); - _map.setByte(string, b); + _properties.setByte(string, b); } public void setShort(String string, short i) throws JMSException { checkWritable(); - _map.setShort(string, i); + _properties.setShort(string, i); } public void setChar(String string, char c) throws JMSException { checkWritable(); - _map.setChar(string, c); + _properties.setChar(string, c); } public void setInt(String string, int i) throws JMSException { checkWritable(); - _map.setInteger(string, i); + _properties.setInteger(string, i); } public void setLong(String string, long l) throws JMSException { checkWritable(); - _map.setLong(string, l); + _properties.setLong(string, l); } public void setFloat(String string, float v) throws JMSException { checkWritable(); - _map.setFloat(string, v); + _properties.setFloat(string, v); } public void setDouble(String string, double v) throws JMSException { checkWritable(); - _map.setDouble(string, v); + _properties.setDouble(string, v); } public void setString(String string, String string1) throws JMSException { checkWritable(); - _map.setString(string, string1); + _properties.setString(string, string1); } public void setBytes(String string, byte[] bytes) throws JMSException @@ -353,25 +244,18 @@ public class JMSMapMessage extends JMSBytesMessage implements javax.jms.MapMessa public void setBytes(String string, byte[] bytes, int i, int i1) throws JMSException { checkWritable(); - _map.setBytes(string, bytes, i, i1); + _properties.setBytes(string, bytes, i, i1); } public void setObject(String string, Object object) throws JMSException { checkWritable(); - _map.setObject(string, object); + _properties.setObject(string, object); } public boolean itemExists(String string) throws JMSException { - return _map.itemExists(string); - } - - public ByteBuffer getData() - { - //What if _data is null? - _map.writeToBuffer(_data); - return super.getData(); + return _properties.itemExists(string); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index 7393cea714..4fb070d2ff 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -72,6 +72,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag _data.release(); } _data = null; + } public String toBodyString() throws JMSException @@ -97,6 +98,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag { _data.rewind(); } + try { ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream()); @@ -108,6 +110,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag { throw new MessageFormatException("Message not serializable: " + e); } + } public Serializable getObject() throws JMSException @@ -120,15 +123,18 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag try { + _data.rewind(); in = new ObjectInputStream(_data.asInputStream()); return (Serializable) in.readObject(); } catch (IOException e) - { - throw new MessageFormatException("Could not deserialize message: " + e); + { + e.printStackTrace(); + throw new MessageFormatException("Could not deserialize message: " + e); } catch (ClassNotFoundException e) { + e.printStackTrace(); throw new MessageFormatException("Could not deserialize message: " + e); } finally diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index 04f3c5ee17..c2dfdc1b65 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -226,6 +226,10 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess byte wireType = readWireType(); try { + if(wireType == NULL_STRING_TYPE){ + throw new NullPointerException(); + } + if (wireType != CHAR_TYPE) { _data.position(position); @@ -428,7 +432,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess break; case NULL_STRING_TYPE: result = null; - break; + throw new NullPointerException("data is null"); case BOOLEAN_TYPE: checkAvailable(1); result = String.valueOf(readBooleanImpl()); diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index ab707bb51d..887850c06e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -104,6 +104,8 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance()); frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance()); frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance()); + frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance()); + frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance()); _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap); } diff --git a/java/client/src/main/java/org/apache/qpid/jms/Message.java b/java/client/src/main/java/org/apache/qpid/jms/Message.java new file mode 100644 index 0000000000..d73e51d755 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/jms/Message.java @@ -0,0 +1,28 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.*;
+
+public interface Message extends javax.jms.Message
+{
+ public void acknowledgeThis() throws JMSException;
+}
diff --git a/java/client/src/test/java/org/apache/qpid/transacted/Config.java b/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java index bd104e5407..bd104e5407 100644 --- a/java/client/src/test/java/org/apache/qpid/transacted/Config.java +++ b/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java diff --git a/java/client/src/test/java/org/apache/qpid/transacted/Ping.java b/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java index e0af4422a6..e0af4422a6 100644 --- a/java/client/src/test/java/org/apache/qpid/transacted/Ping.java +++ b/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java diff --git a/java/client/src/test/java/org/apache/qpid/transacted/Pong.java b/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java index 13295c137a..13295c137a 100644 --- a/java/client/src/test/java/org/apache/qpid/transacted/Pong.java +++ b/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java diff --git a/java/client/src/test/java/org/apache/qpid/transacted/Relay.java b/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java index cede95e5f0..cede95e5f0 100644 --- a/java/client/src/test/java/org/apache/qpid/transacted/Relay.java +++ b/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java diff --git a/java/client/src/test/java/org/apache/qpid/transacted/Start.java b/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java index 5564ed93ab..5564ed93ab 100644 --- a/java/client/src/test/java/org/apache/qpid/transacted/Start.java +++ b/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java diff --git a/java/client/src/test/java/org/apache/qpid/weblogic/ServiceProvider.java b/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java index 71d806b338..71d806b338 100644 --- a/java/client/src/test/java/org/apache/qpid/weblogic/ServiceProvider.java +++ b/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java diff --git a/java/client/src/test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java b/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java index a1e15258c3..a1e15258c3 100644 --- a/java/client/src/test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java +++ b/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index bd68f32c23..dc0ade76c4 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -36,18 +36,21 @@ public class RecoverTest extends TestCase { private static final Logger _logger = Logger.getLogger(RecoverTest.class); - static + protected void setUp() throws Exception { - String workdir = System.getProperty("QPID_WORK"); - if (workdir == null || workdir.equals("")) - { - String tempdir = System.getProperty("java.io.tmpdir"); - System.out.println("QPID_WORK not set using tmp directory: " + tempdir); - System.setProperty("QPID_WORK", tempdir); - } - DOMConfigurator.configure("../broker/etc/log4j.xml"); + super.setUp(); + TransportConnection.createVMBroker(1); } + protected void tearDown() throws Exception + { + super.tearDown(); + TransportConnection.killAllVMBrokers(); + //Thread.sleep(2000); + } + + + public void testRecoverResendsMsgs() throws Exception { Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); @@ -104,8 +107,74 @@ public class RecoverTest extends TestCase con.close(); } + + public void testRecoverResendsMsgsAckOnEarlier() throws Exception + { + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + + Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = new AMQQueue("someQ", "someQ", false, true); + MessageConsumer consumer = consumerSession.createConsumer(queue); + //force synch to ensure the consumer has resulted in a bound queue + ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + + _logger.info("Sending four messages"); + producer.send(producerSession.createTextMessage("msg1")); + producer.send(producerSession.createTextMessage("msg2")); + producer.send(producerSession.createTextMessage("msg3")); + producer.send(producerSession.createTextMessage("msg4")); + + con2.close(); + + _logger.info("Starting connection"); + con.start(); + TextMessage tm = (TextMessage) consumer.receive(); + TextMessage tm2 = (TextMessage) consumer.receive(); + tm.acknowledge(); + _logger.info("Received 2 messages, acknowledge() first message, should acknowledge both"); + + consumer.receive(); + consumer.receive(); + _logger.info("Received all four messages. Calling recover with two outstanding messages"); + // no ack for last three messages so when I call recover I expect to get three messages back + consumerSession.recover(); + TextMessage tm3 = (TextMessage) consumer.receive(3000); + assertEquals("msg3", tm3.getText()); + + TextMessage tm4 = (TextMessage) consumer.receive(3000); + assertEquals("msg4", tm4.getText()); + + + _logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message"); + ((org.apache.qpid.jms.Message)tm3).acknowledgeThis(); + + _logger.info("Calling recover"); + // all acked so no messages to be delivered + consumerSession.recover(); + + tm4 = (TextMessage) consumer.receive(3000); + assertEquals("msg4", tm4.getText()); + ((org.apache.qpid.jms.Message)tm4).acknowledgeThis(); + + _logger.info("Calling recover"); + // all acked so no messages to be delivered + consumerSession.recover(); + + + tm = (TextMessage) consumer.receiveNoWait(); + assertNull(tm); + _logger.info("No messages redelivered as is expected"); + + con.close(); + } + + public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(RecoverTest.class)); + return new junit.framework.TestSuite(RecoverTest.class); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java index 3fb1dd3f7d..f4efd64dbb 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java @@ -26,7 +26,6 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.message.JMSBytesMessage; import org.apache.qpid.framing.AMQFrameDecodingException; -import org.apache.qpid.framing.FieldTableTest; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.testutil.VMBrokerSetup; @@ -134,7 +133,11 @@ public class FieldTableMessageTest extends TestCase implements MessageListener { ByteBuffer buffer = ((JMSBytesMessage) m).getData(); FieldTable actual = FieldTableFactory.newFieldTable(buffer, buffer.remaining()); - new FieldTableTest().assertEquivalent(_expected, actual); + for (Object o : _expected.keySet()) + { + String key = (String) o; + assertEquals("Values for " + key + " did not match", _expected.get(key), actual.get(key)); + } } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java index 1a469c1d12..02a98f67d9 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java @@ -20,20 +20,19 @@ */ package org.apache.qpid.test.unit.basic; +import junit.framework.Assert; +import junit.framework.TestCase; +import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.message.JMSMapMessage; -import org.apache.qpid.testutil.VMBrokerSetup; -import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import javax.jms.*; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import javax.jms.*; - -import junit.framework.TestCase; -import junit.framework.Assert; public class MapMessageTest extends TestCase implements MessageListener { @@ -56,7 +55,7 @@ public class MapMessageTest extends TestCase implements MessageListener super.setUp(); try { - //TransportConnection.createVMBroker(1); + TransportConnection.createVMBroker(1); init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); } catch (Exception e) @@ -69,7 +68,7 @@ public class MapMessageTest extends TestCase implements MessageListener { _logger.info("Tearing Down unit.basic.MapMessageTest"); super.tearDown(); - //TransportConnection.killAllVMBrokers(); + TransportConnection.killAllVMBrokers(); } private void init(AMQConnection connection) throws Exception @@ -166,12 +165,12 @@ public class MapMessageTest extends TestCase implements MessageListener testMapValues(m, count); + testCorrectExceptions(m); + testMessageWriteStatus(m); testPropertyWriteStatus(m); - testCorrectExceptions(m); - count++; } } @@ -230,7 +229,7 @@ public class MapMessageTest extends TestCase implements MessageListener m.getChar("message"); fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (MessageFormatException npe) { //normal execution } @@ -320,7 +319,7 @@ public class MapMessageTest extends TestCase implements MessageListener m.getChar("short"); fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (MessageFormatException npe) { //normal execution } @@ -403,7 +402,7 @@ public class MapMessageTest extends TestCase implements MessageListener m.getChar("long"); fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (MessageFormatException npe) { //normal execution } @@ -494,7 +493,7 @@ public class MapMessageTest extends TestCase implements MessageListener m.getChar("double"); fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (MessageFormatException npe) { //normal execution } @@ -587,7 +586,7 @@ public class MapMessageTest extends TestCase implements MessageListener m.getChar("float"); fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (MessageFormatException npe) { //normal execution } @@ -671,7 +670,7 @@ public class MapMessageTest extends TestCase implements MessageListener m.getChar("int"); fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (MessageFormatException npe) { //normal execution } @@ -843,7 +842,7 @@ public class MapMessageTest extends TestCase implements MessageListener m.getChar("bytes"); fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (MessageFormatException npe) { //normal execution } @@ -928,7 +927,7 @@ public class MapMessageTest extends TestCase implements MessageListener m.getChar("byte"); fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (MessageFormatException npe) { //normal execution } @@ -1005,7 +1004,7 @@ public class MapMessageTest extends TestCase implements MessageListener m.getChar("odd"); fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (MessageFormatException npe) { //normal execution } @@ -1249,6 +1248,6 @@ public class MapMessageTest extends TestCase implements MessageListener public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(MapMessageTest.class)); + return new junit.framework.TestSuite(MapMessageTest.class); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java index 80af81652e..c88024f39f 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,18 +19,15 @@ */ package org.apache.qpid.test.unit.basic; +import junit.framework.TestCase; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.testutil.VMBrokerSetup; import javax.jms.*; -import junit.framework.TestCase; - public class MultipleConnectionTest extends TestCase { public static final String _defaultBroker = "vm://:1"; @@ -138,6 +135,19 @@ public class MultipleConnectionTest extends TestCase } } + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + private static void waitForCompletion(int expected, long wait, Receiver[] receivers) throws InterruptedException { for (int i = 0; i < receivers.length; i++) @@ -209,6 +219,6 @@ public class MultipleConnectionTest extends TestCase public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(MultipleConnectionTest.class)); + return new junit.framework.TestSuite(MultipleConnectionTest.class); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java new file mode 100644 index 0000000000..f4814795c4 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.basic; + +import junit.framework.TestCase; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; + +import javax.jms.*; + +/** + * @author Apache Software Foundation + */ +public class PubSubTwoConnectionTest extends TestCase +{ + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + /** + * This tests that a consumer is set up synchronously + * @throws Exception + */ + public void testTwoConnections() throws Exception + { + Topic topic = new AMQTopic("MyTopic"); + Connection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "/test_path"); + Session session1 = con1.createSession(false, AMQSession.NO_ACKNOWLEDGE); + MessageProducer producer = session1.createProducer(topic); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "/test_path"); + Session session2 = con2.createSession(false, AMQSession.NO_ACKNOWLEDGE); + MessageConsumer consumer = session2.createConsumer(topic); + con2.start(); + producer.send(session1.createTextMessage("Hello")); + TextMessage tm1 = (TextMessage) consumer.receive(2000); + assertNotNull(tm1); + assertEquals("Hello", tm1.getText()); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java index 04ad15da7a..903f6a9da9 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java @@ -21,11 +21,8 @@ package org.apache.qpid.test.unit.basic; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; -import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.testutil.VMBrokerSetup; import org.apache.log4j.Logger; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java index 7ffb3ca469..a0e4aa9787 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java @@ -552,16 +552,6 @@ public class BytesMessageTest extends TestCase assertEquals((byte)0, result[2]); } - public void testToBodyString() throws Exception - { - JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage(); - final String testText = "This is a test"; - bm.writeUTF(testText); - bm.reset(); - String result = bm.toBodyString(); - assertEquals(testText, result); - } - public void testToBodyStringWithNull() throws Exception { JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java index e5458fd89e..bd4b3b3987 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java @@ -14,18 +14,16 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid.test.unit.client.message; -import junit.framework.TestCase; import junit.framework.Assert; -import org.apache.qpid.framing.PropertyFieldTable; +import junit.framework.TestCase; import org.apache.qpid.client.message.JMSMapMessage; import org.apache.qpid.client.message.TestMessageHelper; -import org.apache.log4j.Logger; import javax.jms.JMSException; import javax.jms.MessageFormatException; @@ -104,6 +102,27 @@ public class MapMessageTest extends TestCase { Assert.fail("JMSException received." + e); } + + try + { + JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); + + mm.setString("value", null); + char c = mm.getChar("value"); + fail("Expected NullPointerException"); + + } + catch (NullPointerException e) + { + ; // pass + } + catch (JMSException e) + { + Assert.fail("JMSException received." + e); + } + + + } public void testDoubleLookup() @@ -204,7 +223,7 @@ public class MapMessageTest extends TestCase { JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); mm.getByte("random"); - Assert.fail("MessageFormatException expected"); + Assert.fail("NumberFormatException expected"); } catch (NumberFormatException e) { @@ -331,7 +350,7 @@ public class MapMessageTest extends TestCase { JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); mm.getShort("random"); - Assert.fail("NumberFormatException should be received."); + Assert.fail("NumberFormatException should be received."); } catch (NumberFormatException e) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java index 07eedc8bb9..bbd1870168 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java @@ -167,6 +167,12 @@ public class ObjectMessageTest extends TestCase implements MessageListener } + public void testSetObjectForNull() throws Exception + { + ObjectMessage msg = session.createObjectMessage(); + msg.setObject(null); + assertNull(msg.getObject()); + } private void send() throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java index 337b0f3bbc..64d10fb13f 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,8 +22,13 @@ package org.apache.qpid.test.unit.client.message; import org.apache.qpid.client.message.TestMessageHelper; import org.apache.qpid.client.message.JMSTextMessage; +import org.apache.qpid.client.message.JMSMapMessage; import junit.framework.TestCase; +import junit.framework.Assert; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; public class TextMessageTest extends TestCase { @@ -47,6 +52,248 @@ public class TextMessageTest extends TestCase assertEquals(val, "Banana"); } + + public void testBooleanPropertyLookup() + { + try + { + JMSTextMessage tm = TestMessageHelper.newJMSTextMessage(); + + tm.setBooleanProperty("value", true); + Assert.assertEquals(true, tm.getBooleanProperty("value")); + Assert.assertEquals("true", tm.getStringProperty("value")); + } + catch (JMSException e) + { + Assert.fail("JMSException received." + e); + } + } + + public void testBytePropertyLookup() + { + try + { + JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); + mm.setByteProperty("value", Byte.MAX_VALUE); + + Assert.assertEquals(Byte.MAX_VALUE, mm.getByteProperty("value")); + Assert.assertEquals((short) Byte.MAX_VALUE, mm.getShortProperty("value")); + Assert.assertEquals(Byte.MAX_VALUE, mm.getIntProperty("value")); + Assert.assertEquals((long) Byte.MAX_VALUE, mm.getLongProperty("value")); + Assert.assertEquals("" + Byte.MAX_VALUE, mm.getStringProperty("value")); + + } + catch (JMSException e) + { + Assert.fail("JMSException received." + e); + } + } + + public void testShortPropertyLookup() + { + try + { + JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); + mm.setShortProperty("value", Short.MAX_VALUE); + Assert.assertEquals(Short.MAX_VALUE, mm.getShortProperty("value")); + Assert.assertEquals((int) Short.MAX_VALUE, mm.getIntProperty("value")); + Assert.assertEquals((long) Short.MAX_VALUE, mm.getLongProperty("value")); + Assert.assertEquals("" + Short.MAX_VALUE, mm.getStringProperty("value")); + } + catch (JMSException e) + { + Assert.fail("JMSException received." + e); + } + } + + public void testDoublePropertyLookup() + { + try + { + JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); + mm.setDoubleProperty("value", Double.MAX_VALUE); + Assert.assertEquals(Double.MAX_VALUE, mm.getDoubleProperty("value")); + Assert.assertEquals("" + Double.MAX_VALUE, mm.getStringProperty("value")); + } + catch (JMSException e) + { + Assert.fail("JMSException received." + e); + } + } + + public void testFloatPropertyLookup() + { + try + { + JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); + mm.setFloatProperty("value", Float.MAX_VALUE); + Assert.assertEquals(Float.MAX_VALUE, mm.getFloatProperty("value")); + Assert.assertEquals((double) Float.MAX_VALUE, mm.getDoubleProperty("value")); + Assert.assertEquals("" + Float.MAX_VALUE, mm.getStringProperty("value")); + } + catch (JMSException e) + { + Assert.fail("JMSException received." + e); + } + } + + public void testIntPropertyLookup() + { + try + { + JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); + mm.setIntProperty("value", Integer.MAX_VALUE); + Assert.assertEquals(Integer.MAX_VALUE, mm.getIntProperty("value")); + Assert.assertEquals((long) Integer.MAX_VALUE, mm.getLongProperty("value")); + Assert.assertEquals("" + Integer.MAX_VALUE, mm.getStringProperty("value")); + } + catch (JMSException e) + { + Assert.fail("JMSException received." + e); + } + } + + public void testLongPropertyLookup() + { + try + { + JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); + mm.setLongProperty("value", Long.MAX_VALUE); + Assert.assertEquals(Long.MAX_VALUE, mm.getLongProperty("value")); + Assert.assertEquals("" + Long.MAX_VALUE, mm.getStringProperty("value")); + } + catch (JMSException e) + { + Assert.fail("JMSException received." + e); + } + } + + + // Failed Lookups + + public void testFailedBooleanPropertyLookup() + { + try + { + JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); + Assert.assertEquals(false, mm.getBooleanProperty("int")); + } + catch (JMSException e) + { + Assert.fail("JMSException received." + e); + } + } + + public void testFailedBytePropertyLookup() + { + try + { + JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); + mm.getByteProperty("random"); + Assert.fail("NumberFormatException expected"); + } + catch (NumberFormatException e) + { + //normal execution + } + catch (JMSException e) + { + Assert.fail("JMSException received:" + e); + } + + } + + public void testFailedDoublePropertyLookup() + { + try + { + JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); + mm.getDoubleProperty("random"); + Assert.fail("NullPointerException should be received."); + } + catch (NullPointerException e) + { + //normal execution + } + catch (JMSException e) + { + Assert.fail("JMSException received:" + e); + } + } + + public void testFailedFloatPropertyLookup() + { + try + { + JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); + mm.getFloatProperty("random"); + Assert.fail("NullPointerException should be received."); + } + catch (NullPointerException e) + { + //normal execution + } + catch (JMSException e) + { + Assert.fail("JMSException received:" + e); + } + } + + public void testFailedIntPropertyLookup() + { + try + { + JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); + mm.getIntProperty("random"); + Assert.fail("NumberFormatException should be received."); + } + catch (NumberFormatException e) + { + //normal execution + } + catch (JMSException e) + { + Assert.fail("JMSException received:" + e); + } + } + + public void testFailedLongPropertyLookup() + { + try + { + JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); + mm.getLongProperty("random"); + Assert.fail("NumberFormatException should be received."); + } + catch (NumberFormatException e) + { + //normal execution + } + catch (JMSException e) + { + Assert.fail("JMSException received:" + e); + } + } + + public void testFailedShortPropertyLookup() + { + try + { + JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); + mm.getShortProperty("random"); + Assert.fail("NumberFormatException should be received."); + } + catch (NumberFormatException e) + { + //normal execution + } + catch (JMSException e) + { + Assert.fail("JMSException received:" + e); + } + } + + public static junit.framework.Test suite() { return new junit.framework.TestSuite(TextMessageTest.class); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java new file mode 100644 index 0000000000..1e13962f01 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java @@ -0,0 +1,80 @@ +package org.apache.qpid.test.unit.client.temporaryqueue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQAuthenticationException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.*;
+
+public class TemporaryQueueTest extends TestCase
+{
+
+ String _broker = "vm://:1";
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ TransportConnection.killAllVMBrokers();
+ }
+
+ protected Connection createConnection() throws AMQException, URLSyntaxException
+ {
+ return new AMQConnection(_broker, "guest", "guest",
+ "fred", "/test");
+ }
+
+ public void testTempoaryQueue() throws Exception
+ {
+ Connection conn = createConnection();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue queue = session.createTemporaryQueue();
+ assertNotNull(queue);
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+ conn.start();
+ producer.send(session.createTextMessage("hello"));
+ TextMessage tm = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm);
+ assertEquals("hello",tm.getText());
+
+ try
+ {
+ queue.delete();
+ fail("Expected JMSException : should not be able to delete while there are active consumers");
+ }
+ catch(JMSException je)
+ {
+ ; //pass
+ }
+
+ consumer.close();
+
+ try
+ {
+ queue.delete();
+ }
+ catch(JMSException je)
+ {
+ fail("Unexpected Exception: " + je.getMessage());
+ }
+
+ conn.close();
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TemporaryQueueTest.class);
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index 50944730c3..315ba6ae4c 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -73,7 +73,7 @@ public class StreamMessageTest extends TestCase MessageProducer mandatoryProducer = producerSession.createProducer(queue); // Third test - should be routed - _logger.info("Sending routable message"); + _logger.info("Sending isBound message"); StreamMessage msg = producerSession.createStreamMessage(); msg.setStringProperty("F1000","1"); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index fa46a4bcfb..14ceaa75f1 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -26,10 +26,8 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.transport.TransportConnection; -import javax.jms.TopicSession; -import javax.jms.TextMessage; -import javax.jms.TopicPublisher; -import javax.jms.MessageConsumer; +import javax.jms.*; + /** * @author Apache Software Foundation @@ -46,12 +44,126 @@ public class TopicSessionTest extends TestCase { super.tearDown(); TransportConnection.killAllVMBrokers(); + //Thread.sleep(2000); } - public void testTextMessageCreation() throws Exception + + public void testTopicSubscriptionUnsubscription() throws Exception { AMQTopic topic = new AMQTopic("MyTopic"); - AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test"); + AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSubscriber sub = session1.createDurableSubscriber(topic,"subscription0"); + TopicPublisher publisher = session1.createPublisher(topic); + + con.start(); + + TextMessage tm = session1.createTextMessage("Hello"); + publisher.publish(tm); + + tm = (TextMessage) sub.receive(2000); + assertNotNull(tm); + + session1.unsubscribe("subscription0"); + + try + { + session1.unsubscribe("not a subscription"); + fail("expected InvalidDestinationException when unsubscribing from unknown subscription"); + } + catch(InvalidDestinationException e) + { + ; // PASS + } + catch(Exception e) + { + fail("expected InvalidDestinationException when unsubscribing from unknown subscription, got: " + e); + } + + con.close(); + } + + public void testSubscriptionNameReuseForDifferentTopicSingleConnection() throws Exception + { + subscriptionNameReuseForDifferentTopic(false); + } + + public void testSubscriptionNameReuseForDifferentTopicTwoConnections() throws Exception + { + subscriptionNameReuseForDifferentTopic(true); + } + + private void subscriptionNameReuseForDifferentTopic(boolean shutdown) throws Exception + { + AMQTopic topic = new AMQTopic("MyTopic1" + String.valueOf(shutdown)); + AMQTopic topic2 = new AMQTopic("MyOtherTopic1" + String.valueOf(shutdown)); + AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0"); + TopicPublisher publisher = session1.createPublisher(null); + + con.start(); + + publisher.publish(topic, session1.createTextMessage("hello")); + TextMessage m = (TextMessage) sub.receive(2000); + assertNotNull(m); + + if (shutdown) + { + session1.close(); + con.close(); + con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + con.start(); + session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + publisher = session1.createPublisher(null); + } + TopicSubscriber sub2 = session1.createDurableSubscriber(topic2, "subscription0"); + publisher.publish(topic, session1.createTextMessage("hello")); + if (!shutdown) + { + m = (TextMessage) sub.receive(2000); + assertNull(m); + } + publisher.publish(topic2, session1.createTextMessage("goodbye")); + m = (TextMessage) sub2.receive(2000); + assertNotNull(m); + assertEquals("goodbye", m.getText()); + con.close(); + } + + public void testUnsubscriptionAfterConnectionClose() throws Exception + { + AMQTopic topic = new AMQTopic("MyTopic3"); + AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicPublisher publisher = session1.createPublisher(topic); + + AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test"); + TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0"); + + con2.start(); + + publisher.publish(session1.createTextMessage("Hello")); + TextMessage tm = (TextMessage) sub.receive(2000); + assertNotNull(tm); + con2.close(); + publisher.publish(session1.createTextMessage("Hello2")); + con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test"); + session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + sub = session2.createDurableSubscriber(topic, "subscription0"); + con2.start(); + tm = (TextMessage) sub.receive(2000); + assertNotNull(tm); + assertEquals("Hello2", tm.getText()); + con1.close(); + con2.close(); + } + + public void testTextMessageCreation() throws Exception + { + AMQTopic topic = new AMQTopic("MyTopic4"); + AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(topic); MessageConsumer consumer1 = session1.createConsumer(topic); @@ -85,8 +197,83 @@ public class TopicSessionTest extends TestCase tm = (TextMessage) consumer1.receive(2000); assertNotNull(tm); assertEquals("Empty string not returned", "", msgText); + con.close(); } + public void testSendingSameMessage() throws Exception + { + AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryTopic topic = session.createTemporaryTopic(); + assertNotNull(topic); + TopicPublisher producer = session.createPublisher(topic); + MessageConsumer consumer = session.createConsumer(topic); + conn.start(); + TextMessage sentMessage = session.createTextMessage("Test Message"); + producer.send(sentMessage); + TextMessage receivedMessage = (TextMessage) consumer.receive(2000); + assertNotNull(receivedMessage); + assertEquals(sentMessage.getText(),receivedMessage.getText()); + producer.send(sentMessage); + receivedMessage = (TextMessage) consumer.receive(2000); + assertNotNull(receivedMessage); + assertEquals(sentMessage.getText(),receivedMessage.getText()); + + + } + + public void testTemporaryTopic() throws Exception + { + AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryTopic topic = session.createTemporaryTopic(); + assertNotNull(topic); + TopicPublisher producer = session.createPublisher(topic); + MessageConsumer consumer = session.createConsumer(topic); + conn.start(); + producer.send(session.createTextMessage("hello")); + TextMessage tm = (TextMessage) consumer.receive(2000); + assertNotNull(tm); + assertEquals("hello",tm.getText()); + + try + { + topic.delete(); + fail("Expected JMSException : should not be able to delete while there are active consumers"); + } + catch(JMSException je) + { + ; //pass + } + + consumer.close(); + + try + { + topic.delete(); + } + catch(JMSException je) + { + fail("Unexpected Exception: " + je.getMessage()); + } + + TopicSession session2 = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + try + { + MessageConsumer consumer2 = session2.createConsumer(topic); + fail("Expected a JMSException when subscribing to a temporary topic created on adifferent session"); + } + catch (JMSException je) + { + ; // pass + } + + + + conn.close(); + } + + public static junit.framework.Test suite() { return new junit.framework.TestSuite(TopicSessionTest.class); diff --git a/java/client/src/old_test/java/org/apache/qpid/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index e858e1ad36..90a11307b8 100644 --- a/java/client/src/old_test/java/org/apache/qpid/transacted/TransactedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -23,8 +23,6 @@ package org.apache.qpid.test.unit.transacted; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.testutil.VMBrokerSetup; import javax.jms.*; |