diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-15 14:12:24 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-15 14:12:24 +0000 |
commit | 75793016c7cc8fc156411041e4399243aadc563e (patch) | |
tree | 418a201fa99bf12b32f5b77722ce92ed34e66e2f /java | |
parent | 6ca88beeea89e37ec725e5e99bada7ae48d2870f (diff) | |
download | qpid-python-75793016c7cc8fc156411041e4399243aadc563e.tar.gz |
QPID-183 Patch supplied by Rob Godfrey. Major changes to durable topic subscription handling.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487562 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
23 files changed, 844 insertions, 219 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index 6dc97f9e48..ffd0e5d8ae 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -197,4 +197,34 @@ public class DestNameExchange extends AbstractExchange } } } + + public boolean isBound(String routingKey, AMQQueue queue) throws AMQException + { + final List<AMQQueue> queues = _index.get(routingKey); + return queues != null && queues.contains(queue); + } + + public boolean isBound(String routingKey) throws AMQException + { + final List<AMQQueue> queues = _index.get(routingKey); + return queues != null && !queues.isEmpty(); + } + + public boolean isBound(AMQQueue queue) throws AMQException + { + Map<String, List<AMQQueue>> bindings = _index.getBindingsMap(); + for (List<AMQQueue> queues : bindings.values()) + { + if (queues.contains(queue)) + { + return true; + } + } + return false; + } + + public boolean hasBindings() throws AMQException + { + return !_index.getBindingsMap().isEmpty(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index a692f9ebca..139307488e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -126,10 +126,11 @@ public class DestWildExchange extends AbstractExchange } // End of MBean class - public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException + public synchronized void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException { assert queue != null; assert routingKey != null; + _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey); // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>()); // if we got null back, no previous value was associated with the specified routing key hence @@ -159,6 +160,8 @@ public class DestWildExchange extends AbstractExchange // TODO: add support for the immediate flag if (queues == null) { + _logger.warn("No queues found for routing key " + routingKey); + _logger.warn("Routing map contains: " + _routingKey2queues); //todo Check for valid topic - mritchie return; } @@ -172,7 +175,37 @@ public class DestWildExchange extends AbstractExchange } } - public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException + public boolean isBound(String routingKey, AMQQueue queue) throws AMQException + { + List<AMQQueue> queues = _routingKey2queues.get(routingKey); + return queues != null && queues.contains(queue); + } + + + public boolean isBound(String routingKey) throws AMQException + { + List<AMQQueue> queues = _routingKey2queues.get(routingKey); + return queues != null && !queues.isEmpty(); + } + + public boolean isBound(AMQQueue queue) throws AMQException + { + for (List<AMQQueue> queues : _routingKey2queues.values()) + { + if (queues.contains(queue)) + { + return true; + } + } + return false; + } + + public boolean hasBindings() throws AMQException + { + return !_routingKey2queues.isEmpty(); + } + + public synchronized void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException { assert queue != null; assert routingKey != null; @@ -190,6 +223,10 @@ public class DestWildExchange extends AbstractExchange throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + " with routing key " + routingKey); } + if (queues.isEmpty()) + { + _routingKey2queues.remove(queues); + } } protected ExchangeMBean createMBean() throws AMQException diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 787d0eddfd..824e85dc5c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -47,4 +47,30 @@ public interface Exchange void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException; void route(AMQMessage message) throws AMQException; + + /** + * Determines whether a message would be isBound to a particular queue using a specific routing key + * @param routingKey + * @param queue + * @return + * @throws AMQException + */ + boolean isBound(String routingKey, AMQQueue queue) throws AMQException; + + /** + * Determines whether a message is routing to any queue using a specific routing key + * @param routingKey + * @return + * @throws AMQException + */ + boolean isBound(String routingKey) throws AMQException; + + boolean isBound(AMQQueue queue) throws AMQException; + + /** + * Returns true if this exchange has at least one binding associated with it. + * @return + * @throws AMQException + */ + boolean hasBindings() throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 3fa73aa2e2..8c4df68dea 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -221,6 +221,33 @@ public class HeadersExchange extends AbstractExchange } } + public boolean isBound(String routingKey, AMQQueue queue) throws AMQException + { + return isBound(queue); + } + + public boolean isBound(String routingKey) throws AMQException + { + return hasBindings(); + } + + public boolean isBound(AMQQueue queue) throws AMQException + { + for (Registration r : _bindings) + { + if (r.queue.equals(queue)) + { + return true; + } + } + return false; + } + + public boolean hasBindings() throws AMQException + { + return !_bindings.isEmpty(); + } + protected Map getHeaders(ContentHeaderBody contentHeaderFrame) { //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers, diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java index fb48729c9e..485c4739bd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,6 +24,7 @@ import org.apache.qpid.server.queue.AMQQueue; import java.util.List; import java.util.Map; +import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -37,7 +38,7 @@ class Index private ConcurrentMap<String, List<AMQQueue>> _index = new ConcurrentHashMap<String, List<AMQQueue>>(); - boolean add(String key, AMQQueue queue) + synchronized boolean add(String key, AMQQueue queue) { List<AMQQueue> queues = _index.get(key); if(queues == null) @@ -61,7 +62,7 @@ class Index } } - boolean remove(String key, AMQQueue queue) + synchronized boolean remove(String key, AMQQueue queue) { List<AMQQueue> queues = _index.get(key); if (queues != null) @@ -83,6 +84,6 @@ class Index Map<String, List<AMQQueue>> getBindingsMap() { - return _index; + return new HashMap<String, List<AMQQueue>>(_index); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java new file mode 100644 index 0000000000..5aaf78d6b7 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -0,0 +1,163 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.ExchangeBoundBody; +import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +/** + * @author Apache Software Foundation + */ +public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody> +{ + private static final ExchangeBoundHandler _instance = new ExchangeBoundHandler(); + + public static final int OK = 0; + + public static final int EXCHANGE_NOT_FOUND = 1; + + public static final int QUEUE_NOT_FOUND = 2; + + public static final int NO_BINDINGS = 3; + + public static final int QUEUE_NOT_BOUND = 4; + + public static final int NO_QUEUE_BOUND_WITH_RK = 5; + + public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6; + + public static ExchangeBoundHandler getInstance() + { + return _instance; + } + + private ExchangeBoundHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, + AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException + { + ExchangeBoundBody body = evt.getMethod(); + + String exchangeName = body.exchange; + String queueName = body.queue; + String routingKey = body.routingKey; + if (exchangeName == null) + { + throw new AMQException("Exchange exchange must not be null"); + } + Exchange exchange = exchangeRegistry.getExchange(exchangeName); + AMQFrame response; + if (exchange == null) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), EXCHANGE_NOT_FOUND, + "Exchange " + exchangeName + " not found"); + } + else if (routingKey == null) + { + if (queueName == null) + { + if (exchange.hasBindings()) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null); + } + else + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), NO_BINDINGS, null); + } + } + else + { + AMQQueue queue = queueRegistry.getQueue(queueName); + if (queue == null) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND, + "Queue " + queueName + " not found"); + } + else + { + if (exchange.isBound(queue)) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null); + } + else + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_BOUND, + "Queue " + queueName + " not bound to exchange " + + exchangeName); + } + } + } + } + else if (queueName != null) + { + AMQQueue queue = queueRegistry.getQueue(queueName); + if (queue == null) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND, + "Queue " + queueName + " not found"); + } + else + { + if (exchange.isBound(body.routingKey, queue)) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, + null); + } + else + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, + "Queue " + queueName + + " not bound with routing key " + + body.routingKey + " to exchange " + + exchangeName); + } + } + } + else + { + if (exchange.isBound(body.routingKey)) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, + null); + } + else + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + NO_QUEUE_BOUND_WITH_RK, + "No queue bound with routing key " + + body.routingKey + " to exchange " + + exchangeName); + } + } + protocolSession.writeFrame(response); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 5e88ff7f2d..4e9deeb8db 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -109,6 +109,7 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); frame2handlerMap.put(ExchangeDeclareBody.class, ExchangeDeclareHandler.getInstance()); frame2handlerMap.put(ExchangeDeleteBody.class, ExchangeDeleteHandler.getInstance()); + frame2handlerMap.put(ExchangeBoundBody.class, ExchangeBoundHandler.getInstance()); frame2handlerMap.put(BasicAckBody.class, BasicAckMethodHandler.getInstance()); frame2handlerMap.put(BasicRecoverBody.class, BasicRecoverMethodHandler.getInstance()); frame2handlerMap.put(BasicConsumeBody.class, BasicConsumeMethodHandler.getInstance()); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 5c13e7861f..c6f3f9c492 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -79,13 +79,19 @@ public abstract class AMQDestination implements Destination, Referenceable protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive, boolean isAutoDelete, String queueName) { + this(exchangeName, exchangeClass, destinationName, isExclusive, isAutoDelete, queueName, false); + } + + protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive, + boolean isAutoDelete, String queueName, boolean isDurable) + { if (destinationName == null) { - throw new IllegalArgumentException("Destination name must not be null"); + throw new IllegalArgumentException("Destination exchange must not be null"); } if (exchangeName == null) { - throw new IllegalArgumentException("Exchange name must not be null"); + throw new IllegalArgumentException("Exchange exchange must not be null"); } if (exchangeClass == null) { @@ -97,6 +103,7 @@ public abstract class AMQDestination implements Destination, Referenceable _isExclusive = isExclusive; _isAutoDelete = isAutoDelete; _queueName = queueName; + _isDurable = isDurable; } public String getEncodedName() diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 0cbf5bac60..d149683646 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -23,13 +23,15 @@ package org.apache.qpid.client; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; +import org.apache.qpid.server.handler.ExchangeBoundHandler; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.framing.*; import org.apache.qpid.jms.Session; @@ -66,6 +68,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; /** + * Used to reference durable subscribers so they requests for unsubscribe can be handled + * correctly. Note this only keeps a record of subscriptions which have been created + * in the current instance. It does not remember subscriptions between executions of the + * client + */ + private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = + new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); + + /** * Used in the consume method. We generate the consume tag on the client so that we can use the nowait * feature. */ @@ -1087,19 +1098,53 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); } - /** - * Note, currently this does not handle reuse of the same name with different topics correctly. - * If a name is reused in creating a new subscriber with a different topic/selecto or no-local - * flag then the subcriber will receive messages matching the old subscription AND the new one. - * The spec states that the new one should replace the old one. - * TODO: fix it. - */ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { checkNotClosed(); checkValidTopic(topic); - AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); + AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic)topic, name, _connection); + TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + if (subscriber != null) + { + if (subscriber.getTopic().equals(topic)) + { + throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " + + name); + } + else + { + unsubscribe(name); + } + } + else + { + // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + // says we must trash the subscription. + if (isQueueBound(dest.getQueueName()) && + !isQueueBound(dest.getQueueName(), topic.getTopicName())) + { + deleteSubscriptionQueue(dest.getQueueName()); + } + } + + subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); + _subscriptions.put(name,subscriber); + + return subscriber; + } + + private void deleteSubscriptionQueue(String queueName) throws JMSException + { + try + { + AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, 0, queueName, false, + false, true); + _connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } } /** @@ -1110,9 +1155,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); checkValidTopic(topic); - AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); + AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); - return new TopicSubscriberAdaptor(dest, consumer); + TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); + _subscriptions.put(name,subscriber); + return subscriber; } public TopicPublisher createPublisher(Topic topic) throws JMSException @@ -1150,20 +1197,46 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void unsubscribe(String name) throws JMSException { checkNotClosed(); - - String queue = _connection.getClientID() + ":" + name; - - AMQFrame queueDeclareFrame = QueueDeclareBody.createAMQFrame(_channelId,0,queue,true,false, false, false, true, null); - - try { - AMQMethodEvent event = _connection.getProtocolHandler().syncWrite(queueDeclareFrame,QueueDeclareOkBody.class); - // if this method doen't throw an exception means we have received a queue declare ok. - } catch (AMQException e) { - throw new javax.jms.InvalidDestinationException("This destination doesn't exist"); - } - //send a queue.delete for the subscription - AMQFrame frame = QueueDeleteBody.createAMQFrame(_channelId, 0, queue, false, false, true); - _connection.getProtocolHandler().writeFrame(frame); + TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + if (subscriber != null) + { + // send a queue.delete for the subscription + deleteSubscriptionQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); + _subscriptions.remove(name); + } + else + { + if (isQueueBound(AMQTopic.getDurableTopicQueueName(name, _connection))) + { + deleteSubscriptionQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); + } + else + { + throw new InvalidDestinationException("Unknown subscription exchange:" + name); + } + } + } + + private boolean isQueueBound(String queueName) throws JMSException + { + return isQueueBound(queueName, null); + } + + private boolean isQueueBound(String queueName, String routingKey) throws JMSException + { + AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME, + routingKey, queueName); + AMQMethodEvent response = null; + try + { + response = _connection.getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); + return (responseBody.replyCode == ExchangeBoundHandler.OK); } private void checkTransacted() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 4dd38eea18..39304f3f4c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -40,20 +40,25 @@ public class AMQTopic extends AMQDestination implements Topic public AMQTopic(String name) { - super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, true, null); - _isDurable = false; + this(name, true, null, false); } - /** - * Constructor for use in creating a topic to represent a durable subscription - * @param topic - * @param clientId - * @param subscriptionName - */ - public AMQTopic(AMQTopic topic, String clientId, String subscriptionName) + public AMQTopic(String name, boolean isAutoDelete, String queueName, boolean isDurable) + { + super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, + queueName, isDurable); + } + + public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection) + throws JMSException + { + return new AMQTopic(topic.getDestinationName(), false, getDurableTopicQueueName(subscriptionName, connection), + true); + } + + public static String getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException { - super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getDestinationName(), true, false, clientId + ":" + subscriptionName); - _isDurable = true; + return connection.getClientID() + ":" + subscriptionName; } public String getTopicName() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 4fb62b49fc..2448e14542 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -303,6 +303,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (InterruptedException e) { + _logger.warn("Interrupted: " + e, e); return null; } finally @@ -531,18 +532,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } public void setConsumerTag(String consumerTag) - { + { _consumerTag = consumerTag; } public AMQSession getSession() { return _session; } - + private void checkPreConditions() throws JMSException{ - + this.checkNotClosed(); - + if(_session == null || _session.isClosed()){ throw new javax.jms.IllegalStateException("Invalid Session"); } diff --git a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java new file mode 100644 index 0000000000..34ec49436e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java @@ -0,0 +1,34 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.AMQException; + +import javax.jms.JMSException; + +/** + * @author Apache Software Foundation + */ +public class JMSAMQException extends JMSException +{ + public JMSAMQException(AMQException s) + { + super(s.getMessage(), String.valueOf(s.getErrorCode())); + setLinkedException(s); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java new file mode 100644 index 0000000000..858726745e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java @@ -0,0 +1,54 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.ExchangeBoundOkBody; + +/** + * @author Apache Software Foundation + */ +public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ExchangeBoundOkMethodHandler.class); + private static final ExchangeBoundOkMethodHandler _instance = new ExchangeBoundOkMethodHandler(); + + public static ExchangeBoundOkMethodHandler getInstance() + { + return _instance; + } + + private ExchangeBoundOkMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + if (_logger.isDebugEnabled()) + { + ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod(); + _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: " + + body.replyText); + } + } +} + diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java new file mode 100644 index 0000000000..3271a715a2 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java @@ -0,0 +1,53 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client.handler; + +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.log4j.Logger; + +/** + * @author Apache Software Foundation + */ +public class QueueDeleteOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(QueueDeleteOkMethodHandler.class); + private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler(); + + public static QueueDeleteOkMethodHandler getInstance() + { + return _instance; + } + + private QueueDeleteOkMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + if (_logger.isDebugEnabled()) + { + QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod(); + _logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount); + } + } +} + diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index ab707bb51d..887850c06e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -104,6 +104,8 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance()); frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance()); frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance()); + frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance()); + frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance()); _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java index 823406daa4..02a98f67d9 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java @@ -20,20 +20,19 @@ */ package org.apache.qpid.test.unit.basic; +import junit.framework.Assert; +import junit.framework.TestCase; +import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.message.JMSMapMessage; -import org.apache.qpid.testutil.VMBrokerSetup; -import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import javax.jms.*; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import javax.jms.*; - -import junit.framework.TestCase; -import junit.framework.Assert; public class MapMessageTest extends TestCase implements MessageListener { @@ -56,7 +55,7 @@ public class MapMessageTest extends TestCase implements MessageListener super.setUp(); try { - //TransportConnection.createVMBroker(1); + TransportConnection.createVMBroker(1); init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); } catch (Exception e) @@ -69,7 +68,7 @@ public class MapMessageTest extends TestCase implements MessageListener { _logger.info("Tearing Down unit.basic.MapMessageTest"); super.tearDown(); - //TransportConnection.killAllVMBrokers(); + TransportConnection.killAllVMBrokers(); } private void init(AMQConnection connection) throws Exception @@ -166,12 +165,12 @@ public class MapMessageTest extends TestCase implements MessageListener testMapValues(m, count); - testPropertyWriteStatus(m); - testCorrectExceptions(m); testMessageWriteStatus(m); + testPropertyWriteStatus(m); + count++; } } @@ -207,9 +206,9 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getByte("message"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (NumberFormatException nfe) { //normal execution } @@ -217,9 +216,9 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getShort("message"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (NumberFormatException nfe) { //normal execution } @@ -228,9 +227,9 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getChar("message"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (MessageFormatException npe) { //normal execution } @@ -238,18 +237,18 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getInt("message"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (NumberFormatException nfe) { //normal execution } try { m.getLong("message"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (NumberFormatException nfe) { //normal execution } @@ -258,9 +257,9 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getFloat("message"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (NumberFormatException nfe) { //normal execution } @@ -268,9 +267,9 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getDouble("message"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (NumberFormatException nfe) { //normal execution } @@ -278,7 +277,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getBytes("message"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -295,7 +294,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getBoolean("short"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -305,7 +304,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getByte("short"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -318,9 +317,9 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getChar("short"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (MessageFormatException npe) { //normal execution } @@ -333,7 +332,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getFloat("short"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -343,7 +342,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getDouble("short"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -353,7 +352,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getBytes("short"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -370,7 +369,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getBoolean("long"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -380,7 +379,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getByte("long"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -390,7 +389,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getShort("long"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -401,9 +400,9 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getChar("long"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (MessageFormatException npe) { //normal execution } @@ -411,7 +410,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getInt("long"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -424,7 +423,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getFloat("long"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -434,7 +433,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getDouble("long"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -444,7 +443,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getBytes("long"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -461,7 +460,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getBoolean("double"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -471,7 +470,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getByte("double"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -481,7 +480,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getShort("double"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -492,9 +491,9 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getChar("double"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (MessageFormatException npe) { //normal execution } @@ -502,7 +501,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getInt("double"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -511,7 +510,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getLong("double"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -522,7 +521,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getFloat("double"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -536,7 +535,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getBytes("double"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -554,7 +553,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getBoolean("float"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -564,7 +563,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getByte("float"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -574,7 +573,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getShort("float"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -585,9 +584,9 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getChar("float"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (MessageFormatException npe) { //normal execution } @@ -595,7 +594,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getInt("float"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -604,7 +603,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getLong("float"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -620,7 +619,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getBytes("float"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -638,7 +637,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getBoolean("int"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -648,7 +647,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getByte("int"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -658,7 +657,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getShort("int"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -669,9 +668,9 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getChar("int"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (MessageFormatException npe) { //normal execution } @@ -684,7 +683,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getFloat("int"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -694,7 +693,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getDouble("int"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -704,7 +703,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getBytes("int"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -722,7 +721,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getBoolean("char"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -732,7 +731,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getByte("char"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -742,7 +741,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getShort("char"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -754,7 +753,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getInt("char"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -763,7 +762,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getLong("char"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -774,7 +773,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getFloat("char"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -784,7 +783,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getDouble("char"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -794,7 +793,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getBytes("char"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -810,7 +809,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getBoolean("bytes"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -820,7 +819,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getByte("bytes"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -830,7 +829,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getShort("bytes"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -841,9 +840,9 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getChar("bytes"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (MessageFormatException npe) { //normal execution } @@ -851,7 +850,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getInt("bytes"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -861,7 +860,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getLong("bytes"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -872,7 +871,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getFloat("bytes"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -882,7 +881,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getDouble("bytes"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -895,7 +894,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getString("bytes"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -911,7 +910,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getBoolean("byte"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -926,9 +925,9 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getChar("byte"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } - catch (MessageFormatException nfe) + catch (MessageFormatException npe) { //normal execution } @@ -942,7 +941,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getFloat("byte"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -952,7 +951,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getDouble("byte"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -962,7 +961,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getBytes("byte"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -982,7 +981,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getByte("odd"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -993,7 +992,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getShort("odd"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -1003,7 +1002,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getChar("odd"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException npe) { @@ -1013,7 +1012,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getInt("odd"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -1023,7 +1022,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getLong("odd"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -1033,7 +1032,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getFloat("odd"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -1043,7 +1042,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getDouble("odd"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -1053,7 +1052,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { m.getBytes("odd"); - fail("MessageFormatException expected as value doesn't exist."); + fail("Exception Expected."); } catch (MessageFormatException nfe) { @@ -1224,6 +1223,7 @@ public class MapMessageTest extends TestCase implements MessageListener { synchronized(received) { + _logger.info("****************** Recevied Messgage:" + (JMSMapMessage) message); received.add((JMSMapMessage) message); received.notify(); } @@ -1248,6 +1248,6 @@ public class MapMessageTest extends TestCase implements MessageListener public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(MapMessageTest.class)); + return new junit.framework.TestSuite(MapMessageTest.class); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java index 80af81652e..c88024f39f 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,18 +19,15 @@ */ package org.apache.qpid.test.unit.basic; +import junit.framework.TestCase; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.testutil.VMBrokerSetup; import javax.jms.*; -import junit.framework.TestCase; - public class MultipleConnectionTest extends TestCase { public static final String _defaultBroker = "vm://:1"; @@ -138,6 +135,19 @@ public class MultipleConnectionTest extends TestCase } } + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + private static void waitForCompletion(int expected, long wait, Receiver[] receivers) throws InterruptedException { for (int i = 0; i < receivers.length; i++) @@ -209,6 +219,6 @@ public class MultipleConnectionTest extends TestCase public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(MultipleConnectionTest.class)); + return new junit.framework.TestSuite(MultipleConnectionTest.class); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java index 7ffb3ca469..a0e4aa9787 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java @@ -552,16 +552,6 @@ public class BytesMessageTest extends TestCase assertEquals((byte)0, result[2]); } - public void testToBodyString() throws Exception - { - JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage(); - final String testText = "This is a test"; - bm.writeUTF(testText); - bm.reset(); - String result = bm.toBodyString(); - assertEquals(testText, result); - } - public void testToBodyStringWithNull() throws Exception { JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java index 58e62e8252..276067a28d 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java @@ -14,18 +14,16 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid.test.unit.client.message; -import junit.framework.TestCase; import junit.framework.Assert; -import org.apache.qpid.framing.PropertyFieldTable; +import junit.framework.TestCase; import org.apache.qpid.client.message.JMSMapMessage; import org.apache.qpid.client.message.TestMessageHelper; -import org.apache.log4j.Logger; import javax.jms.JMSException; import javax.jms.MessageFormatException; @@ -204,9 +202,9 @@ public class MapMessageTest extends TestCase { JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); mm.getByte("random"); - Assert.fail("MessageFormatException expected"); + Assert.fail("NumberFormatException expected"); } - catch (MessageFormatException e) + catch (NumberFormatException e) { //normal execution } @@ -295,9 +293,9 @@ public class MapMessageTest extends TestCase { JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); mm.getInt("random"); - Assert.fail("MessageFormatException should be received."); + Assert.fail("NumberFormatException should be received."); } - catch (MessageFormatException e) + catch (NumberFormatException e) { //normal execution } @@ -313,9 +311,9 @@ public class MapMessageTest extends TestCase { JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); mm.getLong("random"); - Assert.fail("MessageFormatException should be received."); + Assert.fail("NumberFormatException should be received."); } - catch (MessageFormatException e) + catch (NumberFormatException e) { //normal execution } @@ -331,9 +329,9 @@ public class MapMessageTest extends TestCase { JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); mm.getShort("random"); - Assert.fail("MessageFormatException should be received."); + Assert.fail("NumberFormatException should be received."); } - catch (MessageFormatException e) + catch (NumberFormatException e) { //normal execution } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java index 1345c0defb..64d10fb13f 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -190,9 +190,9 @@ public class TextMessageTest extends TestCase { JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); mm.getByteProperty("random"); - Assert.fail("MessageFormatException expected"); + Assert.fail("NumberFormatException expected"); } - catch (MessageFormatException e) + catch (NumberFormatException e) { //normal execution } @@ -245,9 +245,9 @@ public class TextMessageTest extends TestCase { JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); mm.getIntProperty("random"); - Assert.fail("MessageFormatException should be received."); + Assert.fail("NumberFormatException should be received."); } - catch (MessageFormatException e) + catch (NumberFormatException e) { //normal execution } @@ -263,9 +263,9 @@ public class TextMessageTest extends TestCase { JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); mm.getLongProperty("random"); - Assert.fail("MessageFormatException should be received."); + Assert.fail("NumberFormatException should be received."); } - catch (MessageFormatException e) + catch (NumberFormatException e) { //normal execution } @@ -281,9 +281,9 @@ public class TextMessageTest extends TestCase { JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); mm.getShortProperty("random"); - Assert.fail("MessageFormatException should be received."); + Assert.fail("NumberFormatException should be received."); } - catch (MessageFormatException e) + catch (NumberFormatException e) { //normal execution } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java index 98e355b0da..eee9b2de9f 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -87,19 +87,19 @@ public class AMQProtocolSessionTest extends TestCase _testSession.getMinaProtocolSession().setLocalPort(_port); testAddress = _testSession.genQueueName(); - assertEquals("Failure when generating a queue name from an address with special chars",_generatedAddress,testAddress); + assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress); //test empty address _testSession.getMinaProtocolSession().setStringLocalAddress(_emptyAddress); testAddress = _testSession.genQueueName(); - assertEquals("Failure when generating a queue name from an empty address",_generatedAddress_2,testAddress); + assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress); //test address with no special chars _testSession.getMinaProtocolSession().setStringLocalAddress(_validAddress); testAddress = _testSession.genQueueName(); - assertEquals("Failure when generating a queue name from an address with no special chars",_generatedAddress_3,testAddress); + assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index 50944730c3..315ba6ae4c 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -73,7 +73,7 @@ public class StreamMessageTest extends TestCase MessageProducer mandatoryProducer = producerSession.createProducer(queue); // Third test - should be routed - _logger.info("Sending routable message"); + _logger.info("Sending isBound message"); StreamMessage msg = producerSession.createStreamMessage(); msg.setStringProperty("F1000","1"); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index fa46a4bcfb..3b6e3517c2 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -26,10 +26,8 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.transport.TransportConnection; -import javax.jms.TopicSession; -import javax.jms.TextMessage; -import javax.jms.TopicPublisher; -import javax.jms.MessageConsumer; +import javax.jms.*; + /** * @author Apache Software Foundation @@ -46,12 +44,126 @@ public class TopicSessionTest extends TestCase { super.tearDown(); TransportConnection.killAllVMBrokers(); + //Thread.sleep(2000); } - public void testTextMessageCreation() throws Exception + + public void testTopicSubscriptionUnsubscription() throws Exception { AMQTopic topic = new AMQTopic("MyTopic"); - AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test"); + AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSubscriber sub = session1.createDurableSubscriber(topic,"subscription0"); + TopicPublisher publisher = session1.createPublisher(topic); + + con.start(); + + TextMessage tm = session1.createTextMessage("Hello"); + publisher.publish(tm); + + tm = (TextMessage) sub.receive(2000); + assertNotNull(tm); + + session1.unsubscribe("subscription0"); + + try + { + session1.unsubscribe("not a subscription"); + fail("expected InvalidDestinationException when unsubscribing from unknown subscription"); + } + catch(InvalidDestinationException e) + { + ; // PASS + } + catch(Exception e) + { + fail("expected InvalidDestinationException when unsubscribing from unknown subscription, got: " + e); + } + + con.close(); + } + + public void testSubscriptionNameReuseForDifferentTopicSingleConnection() throws Exception + { + subscriptionNameReuseForDifferentTopic(false); + } + + public void testSubscriptionNameReuseForDifferentTopicTwoConnections() throws Exception + { + subscriptionNameReuseForDifferentTopic(true); + } + + private void subscriptionNameReuseForDifferentTopic(boolean shutdown) throws Exception + { + AMQTopic topic = new AMQTopic("MyTopic1" + String.valueOf(shutdown)); + AMQTopic topic2 = new AMQTopic("MyOtherTopic1" + String.valueOf(shutdown)); + AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0"); + TopicPublisher publisher = session1.createPublisher(null); + + con.start(); + + publisher.publish(topic, session1.createTextMessage("hello")); + TextMessage m = (TextMessage) sub.receive(2000); + assertNotNull(m); + + if (shutdown) + { + session1.close(); + con.close(); + con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + con.start(); + session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + publisher = session1.createPublisher(null); + } + TopicSubscriber sub2 = session1.createDurableSubscriber(topic2, "subscription0"); + publisher.publish(topic, session1.createTextMessage("hello")); + if (!shutdown) + { + m = (TextMessage) sub.receive(2000); + assertNull(m); + } + publisher.publish(topic2, session1.createTextMessage("goodbye")); + m = (TextMessage) sub2.receive(2000); + assertNotNull(m); + assertEquals("goodbye", m.getText()); + con.close(); + } + + public void testUnsubscriptionAfterConnectionClose() throws Exception + { + AMQTopic topic = new AMQTopic("MyTopic3"); + AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicPublisher publisher = session1.createPublisher(topic); + + AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test"); + TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0"); + + con2.start(); + + publisher.publish(session1.createTextMessage("Hello")); + TextMessage tm = (TextMessage) sub.receive(2000); + assertNotNull(tm); + con2.close(); + publisher.publish(session1.createTextMessage("Hello2")); + con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test"); + session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + sub = session2.createDurableSubscriber(topic, "subscription0"); + con2.start(); + tm = (TextMessage) sub.receive(2000); + assertNotNull(tm); + assertEquals("Hello2", tm.getText()); + con1.close(); + con2.close(); + } + + public void testTextMessageCreation() throws Exception + { + AMQTopic topic = new AMQTopic("MyTopic4"); + AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(topic); MessageConsumer consumer1 = session1.createConsumer(topic); @@ -85,6 +197,7 @@ public class TopicSessionTest extends TestCase tm = (TextMessage) consumer1.receive(2000); assertNotNull(tm); assertEquals("Empty string not returned", "", msgText); + con.close(); } public static junit.framework.Test suite() |