summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-15 14:12:24 +0000
committerRobert Greig <rgreig@apache.org>2006-12-15 14:12:24 +0000
commit75793016c7cc8fc156411041e4399243aadc563e (patch)
tree418a201fa99bf12b32f5b77722ce92ed34e66e2f /java
parent6ca88beeea89e37ec725e5e99bada7ae48d2870f (diff)
downloadqpid-python-75793016c7cc8fc156411041e4399243aadc563e.tar.gz
QPID-183 Patch supplied by Rob Godfrey. Major changes to durable topic subscription handling.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487562 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java34
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java45
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java31
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java163
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java125
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopic.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java34
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java54
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java53
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java198
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java24
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java10
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java24
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java20
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java10
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java125
23 files changed, 844 insertions, 219 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index 6dc97f9e48..ffd0e5d8ae 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -197,4 +197,34 @@ public class DestNameExchange extends AbstractExchange
}
}
}
+
+ public boolean isBound(String routingKey, AMQQueue queue) throws AMQException
+ {
+ final List<AMQQueue> queues = _index.get(routingKey);
+ return queues != null && queues.contains(queue);
+ }
+
+ public boolean isBound(String routingKey) throws AMQException
+ {
+ final List<AMQQueue> queues = _index.get(routingKey);
+ return queues != null && !queues.isEmpty();
+ }
+
+ public boolean isBound(AMQQueue queue) throws AMQException
+ {
+ Map<String, List<AMQQueue>> bindings = _index.getBindingsMap();
+ for (List<AMQQueue> queues : bindings.values())
+ {
+ if (queues.contains(queue))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasBindings() throws AMQException
+ {
+ return !_index.getBindingsMap().isEmpty();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index a692f9ebca..139307488e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -126,10 +126,11 @@ public class DestWildExchange extends AbstractExchange
} // End of MBean class
- public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ public synchronized void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
assert queue != null;
assert routingKey != null;
+ _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey);
// we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
// if we got null back, no previous value was associated with the specified routing key hence
@@ -159,6 +160,8 @@ public class DestWildExchange extends AbstractExchange
// TODO: add support for the immediate flag
if (queues == null)
{
+ _logger.warn("No queues found for routing key " + routingKey);
+ _logger.warn("Routing map contains: " + _routingKey2queues);
//todo Check for valid topic - mritchie
return;
}
@@ -172,7 +175,37 @@ public class DestWildExchange extends AbstractExchange
}
}
- public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
+ public boolean isBound(String routingKey, AMQQueue queue) throws AMQException
+ {
+ List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+ return queues != null && queues.contains(queue);
+ }
+
+
+ public boolean isBound(String routingKey) throws AMQException
+ {
+ List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+ return queues != null && !queues.isEmpty();
+ }
+
+ public boolean isBound(AMQQueue queue) throws AMQException
+ {
+ for (List<AMQQueue> queues : _routingKey2queues.values())
+ {
+ if (queues.contains(queue))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasBindings() throws AMQException
+ {
+ return !_routingKey2queues.isEmpty();
+ }
+
+ public synchronized void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
{
assert queue != null;
assert routingKey != null;
@@ -190,6 +223,10 @@ public class DestWildExchange extends AbstractExchange
throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
" with routing key " + routingKey);
}
+ if (queues.isEmpty())
+ {
+ _routingKey2queues.remove(queues);
+ }
}
protected ExchangeMBean createMBean() throws AMQException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 787d0eddfd..824e85dc5c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -47,4 +47,30 @@ public interface Exchange
void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException;
void route(AMQMessage message) throws AMQException;
+
+ /**
+ * Determines whether a message would be isBound to a particular queue using a specific routing key
+ * @param routingKey
+ * @param queue
+ * @return
+ * @throws AMQException
+ */
+ boolean isBound(String routingKey, AMQQueue queue) throws AMQException;
+
+ /**
+ * Determines whether a message is routing to any queue using a specific routing key
+ * @param routingKey
+ * @return
+ * @throws AMQException
+ */
+ boolean isBound(String routingKey) throws AMQException;
+
+ boolean isBound(AMQQueue queue) throws AMQException;
+
+ /**
+ * Returns true if this exchange has at least one binding associated with it.
+ * @return
+ * @throws AMQException
+ */
+ boolean hasBindings() throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 3fa73aa2e2..8c4df68dea 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -221,6 +221,33 @@ public class HeadersExchange extends AbstractExchange
}
}
+ public boolean isBound(String routingKey, AMQQueue queue) throws AMQException
+ {
+ return isBound(queue);
+ }
+
+ public boolean isBound(String routingKey) throws AMQException
+ {
+ return hasBindings();
+ }
+
+ public boolean isBound(AMQQueue queue) throws AMQException
+ {
+ for (Registration r : _bindings)
+ {
+ if (r.queue.equals(queue))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasBindings() throws AMQException
+ {
+ return !_bindings.isEmpty();
+ }
+
protected Map getHeaders(ContentHeaderBody contentHeaderFrame)
{
//what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
index fb48729c9e..485c4739bd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import java.util.List;
import java.util.Map;
+import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -37,7 +38,7 @@ class Index
private ConcurrentMap<String, List<AMQQueue>> _index
= new ConcurrentHashMap<String, List<AMQQueue>>();
- boolean add(String key, AMQQueue queue)
+ synchronized boolean add(String key, AMQQueue queue)
{
List<AMQQueue> queues = _index.get(key);
if(queues == null)
@@ -61,7 +62,7 @@ class Index
}
}
- boolean remove(String key, AMQQueue queue)
+ synchronized boolean remove(String key, AMQQueue queue)
{
List<AMQQueue> queues = _index.get(key);
if (queues != null)
@@ -83,6 +84,6 @@ class Index
Map<String, List<AMQQueue>> getBindingsMap()
{
- return _index;
+ return new HashMap<String, List<AMQQueue>>(_index);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
new file mode 100644
index 0000000000..5aaf78d6b7
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -0,0 +1,163 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ExchangeBoundBody;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody>
+{
+ private static final ExchangeBoundHandler _instance = new ExchangeBoundHandler();
+
+ public static final int OK = 0;
+
+ public static final int EXCHANGE_NOT_FOUND = 1;
+
+ public static final int QUEUE_NOT_FOUND = 2;
+
+ public static final int NO_BINDINGS = 3;
+
+ public static final int QUEUE_NOT_BOUND = 4;
+
+ public static final int NO_QUEUE_BOUND_WITH_RK = 5;
+
+ public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6;
+
+ public static ExchangeBoundHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ExchangeBoundHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
+ {
+ ExchangeBoundBody body = evt.getMethod();
+
+ String exchangeName = body.exchange;
+ String queueName = body.queue;
+ String routingKey = body.routingKey;
+ if (exchangeName == null)
+ {
+ throw new AMQException("Exchange exchange must not be null");
+ }
+ Exchange exchange = exchangeRegistry.getExchange(exchangeName);
+ AMQFrame response;
+ if (exchange == null)
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), EXCHANGE_NOT_FOUND,
+ "Exchange " + exchangeName + " not found");
+ }
+ else if (routingKey == null)
+ {
+ if (queueName == null)
+ {
+ if (exchange.hasBindings())
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+ }
+ else
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), NO_BINDINGS, null);
+ }
+ }
+ else
+ {
+ AMQQueue queue = queueRegistry.getQueue(queueName);
+ if (queue == null)
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
+ "Queue " + queueName + " not found");
+ }
+ else
+ {
+ if (exchange.isBound(queue))
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+ }
+ else
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_BOUND,
+ "Queue " + queueName + " not bound to exchange " +
+ exchangeName);
+ }
+ }
+ }
+ }
+ else if (queueName != null)
+ {
+ AMQQueue queue = queueRegistry.getQueue(queueName);
+ if (queue == null)
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
+ "Queue " + queueName + " not found");
+ }
+ else
+ {
+ if (exchange.isBound(body.routingKey, queue))
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
+ null);
+ }
+ else
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,
+ "Queue " + queueName +
+ " not bound with routing key " +
+ body.routingKey + " to exchange " +
+ exchangeName);
+ }
+ }
+ }
+ else
+ {
+ if (exchange.isBound(body.routingKey))
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
+ null);
+ }
+ else
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ NO_QUEUE_BOUND_WITH_RK,
+ "No queue bound with routing key " +
+ body.routingKey + " to exchange " +
+ exchangeName);
+ }
+ }
+ protocolSession.writeFrame(response);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index 5e88ff7f2d..4e9deeb8db 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -109,6 +109,7 @@ public class AMQStateManager implements AMQMethodListener
frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
frame2handlerMap.put(ExchangeDeclareBody.class, ExchangeDeclareHandler.getInstance());
frame2handlerMap.put(ExchangeDeleteBody.class, ExchangeDeleteHandler.getInstance());
+ frame2handlerMap.put(ExchangeBoundBody.class, ExchangeBoundHandler.getInstance());
frame2handlerMap.put(BasicAckBody.class, BasicAckMethodHandler.getInstance());
frame2handlerMap.put(BasicRecoverBody.class, BasicRecoverMethodHandler.getInstance());
frame2handlerMap.put(BasicConsumeBody.class, BasicConsumeMethodHandler.getInstance());
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 5c13e7861f..c6f3f9c492 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -79,13 +79,19 @@ public abstract class AMQDestination implements Destination, Referenceable
protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive,
boolean isAutoDelete, String queueName)
{
+ this(exchangeName, exchangeClass, destinationName, isExclusive, isAutoDelete, queueName, false);
+ }
+
+ protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive,
+ boolean isAutoDelete, String queueName, boolean isDurable)
+ {
if (destinationName == null)
{
- throw new IllegalArgumentException("Destination name must not be null");
+ throw new IllegalArgumentException("Destination exchange must not be null");
}
if (exchangeName == null)
{
- throw new IllegalArgumentException("Exchange name must not be null");
+ throw new IllegalArgumentException("Exchange exchange must not be null");
}
if (exchangeClass == null)
{
@@ -97,6 +103,7 @@ public abstract class AMQDestination implements Destination, Referenceable
_isExclusive = isExclusive;
_isAutoDelete = isAutoDelete;
_queueName = queueName;
+ _isDurable = isDurable;
}
public String getEncodedName()
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 0cbf5bac60..d149683646 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -23,13 +23,15 @@ package org.apache.qpid.client;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.server.handler.ExchangeBoundHandler;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.Session;
@@ -66,6 +68,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
/**
+ * Used to reference durable subscribers so they requests for unsubscribe can be handled
+ * correctly. Note this only keeps a record of subscriptions which have been created
+ * in the current instance. It does not remember subscriptions between executions of the
+ * client
+ */
+ private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
+ new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
+
+ /**
* Used in the consume method. We generate the consume tag on the client so that we can use the nowait
* feature.
*/
@@ -1087,19 +1098,53 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
}
- /**
- * Note, currently this does not handle reuse of the same name with different topics correctly.
- * If a name is reused in creating a new subscriber with a different topic/selecto or no-local
- * flag then the subcriber will receive messages matching the old subscription AND the new one.
- * The spec states that the new one should replace the old one.
- * TODO: fix it.
- */
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
checkNotClosed();
checkValidTopic(topic);
- AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+ AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic)topic, name, _connection);
+ TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ if (subscriber != null)
+ {
+ if (subscriber.getTopic().equals(topic))
+ {
+ throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " +
+ name);
+ }
+ else
+ {
+ unsubscribe(name);
+ }
+ }
+ else
+ {
+ // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getQueueName()) &&
+ !isQueueBound(dest.getQueueName(), topic.getTopicName()))
+ {
+ deleteSubscriptionQueue(dest.getQueueName());
+ }
+ }
+
+ subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+ _subscriptions.put(name,subscriber);
+
+ return subscriber;
+ }
+
+ private void deleteSubscriptionQueue(String queueName) throws JMSException
+ {
+ try
+ {
+ AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, 0, queueName, false,
+ false, true);
+ _connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
}
/**
@@ -1110,9 +1155,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
checkValidTopic(topic);
- AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
+ AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
- return new TopicSubscriberAdaptor(dest, consumer);
+ TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+ _subscriptions.put(name,subscriber);
+ return subscriber;
}
public TopicPublisher createPublisher(Topic topic) throws JMSException
@@ -1150,20 +1197,46 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void unsubscribe(String name) throws JMSException
{
checkNotClosed();
-
- String queue = _connection.getClientID() + ":" + name;
-
- AMQFrame queueDeclareFrame = QueueDeclareBody.createAMQFrame(_channelId,0,queue,true,false, false, false, true, null);
-
- try {
- AMQMethodEvent event = _connection.getProtocolHandler().syncWrite(queueDeclareFrame,QueueDeclareOkBody.class);
- // if this method doen't throw an exception means we have received a queue declare ok.
- } catch (AMQException e) {
- throw new javax.jms.InvalidDestinationException("This destination doesn't exist");
- }
- //send a queue.delete for the subscription
- AMQFrame frame = QueueDeleteBody.createAMQFrame(_channelId, 0, queue, false, false, true);
- _connection.getProtocolHandler().writeFrame(frame);
+ TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ if (subscriber != null)
+ {
+ // send a queue.delete for the subscription
+ deleteSubscriptionQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
+ _subscriptions.remove(name);
+ }
+ else
+ {
+ if (isQueueBound(AMQTopic.getDurableTopicQueueName(name, _connection)))
+ {
+ deleteSubscriptionQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
+ }
+ else
+ {
+ throw new InvalidDestinationException("Unknown subscription exchange:" + name);
+ }
+ }
+ }
+
+ private boolean isQueueBound(String queueName) throws JMSException
+ {
+ return isQueueBound(queueName, null);
+ }
+
+ private boolean isQueueBound(String queueName, String routingKey) throws JMSException
+ {
+ AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME,
+ routingKey, queueName);
+ AMQMethodEvent response = null;
+ try
+ {
+ response = _connection.getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
+ return (responseBody.replyCode == ExchangeBoundHandler.OK);
}
private void checkTransacted() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 4dd38eea18..39304f3f4c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -40,20 +40,25 @@ public class AMQTopic extends AMQDestination implements Topic
public AMQTopic(String name)
{
- super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, true, null);
- _isDurable = false;
+ this(name, true, null, false);
}
- /**
- * Constructor for use in creating a topic to represent a durable subscription
- * @param topic
- * @param clientId
- * @param subscriptionName
- */
- public AMQTopic(AMQTopic topic, String clientId, String subscriptionName)
+ public AMQTopic(String name, boolean isAutoDelete, String queueName, boolean isDurable)
+ {
+ super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete,
+ queueName, isDurable);
+ }
+
+ public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
+ throws JMSException
+ {
+ return new AMQTopic(topic.getDestinationName(), false, getDurableTopicQueueName(subscriptionName, connection),
+ true);
+ }
+
+ public static String getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
{
- super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getDestinationName(), true, false, clientId + ":" + subscriptionName);
- _isDurable = true;
+ return connection.getClientID() + ":" + subscriptionName;
}
public String getTopicName() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 4fb62b49fc..2448e14542 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -303,6 +303,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (InterruptedException e)
{
+ _logger.warn("Interrupted: " + e, e);
return null;
}
finally
@@ -531,18 +532,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
public void setConsumerTag(String consumerTag)
- {
+ {
_consumerTag = consumerTag;
}
public AMQSession getSession() {
return _session;
}
-
+
private void checkPreConditions() throws JMSException{
-
+
this.checkNotClosed();
-
+
if(_session == null || _session.isClosed()){
throw new javax.jms.IllegalStateException("Invalid Session");
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
new file mode 100644
index 0000000000..34ec49436e
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+
+import javax.jms.JMSException;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class JMSAMQException extends JMSException
+{
+ public JMSAMQException(AMQException s)
+ {
+ super(s.getMessage(), String.valueOf(s.getErrorCode()));
+ setLinkedException(s);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
new file mode 100644
index 0000000000..858726745e
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(ExchangeBoundOkMethodHandler.class);
+ private static final ExchangeBoundOkMethodHandler _instance = new ExchangeBoundOkMethodHandler();
+
+ public static ExchangeBoundOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ExchangeBoundOkMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod();
+ _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: " +
+ body.replyText);
+ }
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
new file mode 100644
index 0000000000..3271a715a2
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.log4j.Logger;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class QueueDeleteOkMethodHandler implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(QueueDeleteOkMethodHandler.class);
+ private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler();
+
+ public static QueueDeleteOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private QueueDeleteOkMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod();
+ _logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount);
+ }
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index ab707bb51d..887850c06e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -104,6 +104,8 @@ public class AMQStateManager implements AMQMethodListener
frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
+ frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
+ frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
_state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
index 823406daa4..02a98f67d9 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
@@ -20,20 +20,19 @@
*/
package org.apache.qpid.test.unit.basic;
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.JMSMapMessage;
-import org.apache.qpid.testutil.VMBrokerSetup;
-import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import javax.jms.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import javax.jms.*;
-
-import junit.framework.TestCase;
-import junit.framework.Assert;
public class MapMessageTest extends TestCase implements MessageListener
{
@@ -56,7 +55,7 @@ public class MapMessageTest extends TestCase implements MessageListener
super.setUp();
try
{
- //TransportConnection.createVMBroker(1);
+ TransportConnection.createVMBroker(1);
init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
}
catch (Exception e)
@@ -69,7 +68,7 @@ public class MapMessageTest extends TestCase implements MessageListener
{
_logger.info("Tearing Down unit.basic.MapMessageTest");
super.tearDown();
- //TransportConnection.killAllVMBrokers();
+ TransportConnection.killAllVMBrokers();
}
private void init(AMQConnection connection) throws Exception
@@ -166,12 +165,12 @@ public class MapMessageTest extends TestCase implements MessageListener
testMapValues(m, count);
- testPropertyWriteStatus(m);
-
testCorrectExceptions(m);
testMessageWriteStatus(m);
+ testPropertyWriteStatus(m);
+
count++;
}
}
@@ -207,9 +206,9 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getByte("message");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (NumberFormatException nfe)
{
//normal execution
}
@@ -217,9 +216,9 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getShort("message");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (NumberFormatException nfe)
{
//normal execution
}
@@ -228,9 +227,9 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getChar("message");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (MessageFormatException npe)
{
//normal execution
}
@@ -238,18 +237,18 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getInt("message");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (NumberFormatException nfe)
{
//normal execution
}
try
{
m.getLong("message");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (NumberFormatException nfe)
{
//normal execution
}
@@ -258,9 +257,9 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getFloat("message");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (NumberFormatException nfe)
{
//normal execution
}
@@ -268,9 +267,9 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getDouble("message");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (NumberFormatException nfe)
{
//normal execution
}
@@ -278,7 +277,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getBytes("message");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -295,7 +294,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getBoolean("short");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -305,7 +304,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getByte("short");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -318,9 +317,9 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getChar("short");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (MessageFormatException npe)
{
//normal execution
}
@@ -333,7 +332,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getFloat("short");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -343,7 +342,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getDouble("short");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -353,7 +352,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getBytes("short");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -370,7 +369,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getBoolean("long");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -380,7 +379,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getByte("long");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -390,7 +389,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getShort("long");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -401,9 +400,9 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getChar("long");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (MessageFormatException npe)
{
//normal execution
}
@@ -411,7 +410,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getInt("long");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -424,7 +423,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getFloat("long");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -434,7 +433,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getDouble("long");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -444,7 +443,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getBytes("long");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -461,7 +460,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getBoolean("double");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -471,7 +470,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getByte("double");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -481,7 +480,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getShort("double");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -492,9 +491,9 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getChar("double");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (MessageFormatException npe)
{
//normal execution
}
@@ -502,7 +501,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getInt("double");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -511,7 +510,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getLong("double");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -522,7 +521,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getFloat("double");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -536,7 +535,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getBytes("double");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -554,7 +553,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getBoolean("float");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -564,7 +563,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getByte("float");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -574,7 +573,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getShort("float");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -585,9 +584,9 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getChar("float");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (MessageFormatException npe)
{
//normal execution
}
@@ -595,7 +594,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getInt("float");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -604,7 +603,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getLong("float");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -620,7 +619,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getBytes("float");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -638,7 +637,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getBoolean("int");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -648,7 +647,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getByte("int");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -658,7 +657,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getShort("int");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -669,9 +668,9 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getChar("int");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (MessageFormatException npe)
{
//normal execution
}
@@ -684,7 +683,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getFloat("int");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -694,7 +693,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getDouble("int");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -704,7 +703,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getBytes("int");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -722,7 +721,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getBoolean("char");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -732,7 +731,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getByte("char");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -742,7 +741,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getShort("char");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -754,7 +753,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getInt("char");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -763,7 +762,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getLong("char");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -774,7 +773,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getFloat("char");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -784,7 +783,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getDouble("char");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -794,7 +793,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getBytes("char");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -810,7 +809,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getBoolean("bytes");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -820,7 +819,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getByte("bytes");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -830,7 +829,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getShort("bytes");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -841,9 +840,9 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getChar("bytes");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (MessageFormatException npe)
{
//normal execution
}
@@ -851,7 +850,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getInt("bytes");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -861,7 +860,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getLong("bytes");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -872,7 +871,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getFloat("bytes");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -882,7 +881,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getDouble("bytes");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -895,7 +894,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getString("bytes");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -911,7 +910,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getBoolean("byte");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -926,9 +925,9 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getChar("byte");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (MessageFormatException npe)
{
//normal execution
}
@@ -942,7 +941,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getFloat("byte");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -952,7 +951,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getDouble("byte");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -962,7 +961,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getBytes("byte");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -982,7 +981,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getByte("odd");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -993,7 +992,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getShort("odd");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -1003,7 +1002,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getChar("odd");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException npe)
{
@@ -1013,7 +1012,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getInt("odd");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -1023,7 +1022,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getLong("odd");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -1033,7 +1032,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getFloat("odd");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -1043,7 +1042,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getDouble("odd");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -1053,7 +1052,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
m.getBytes("odd");
- fail("MessageFormatException expected as value doesn't exist.");
+ fail("Exception Expected.");
}
catch (MessageFormatException nfe)
{
@@ -1224,6 +1223,7 @@ public class MapMessageTest extends TestCase implements MessageListener
{
synchronized(received)
{
+ _logger.info("****************** Recevied Messgage:" + (JMSMapMessage) message);
received.add((JMSMapMessage) message);
received.notify();
}
@@ -1248,6 +1248,6 @@ public class MapMessageTest extends TestCase implements MessageListener
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(MapMessageTest.class));
+ return new junit.framework.TestSuite(MapMessageTest.class);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
index 80af81652e..c88024f39f 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,18 +19,15 @@
*/
package org.apache.qpid.test.unit.basic;
+import junit.framework.TestCase;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.testutil.VMBrokerSetup;
import javax.jms.*;
-import junit.framework.TestCase;
-
public class MultipleConnectionTest extends TestCase
{
public static final String _defaultBroker = "vm://:1";
@@ -138,6 +135,19 @@ public class MultipleConnectionTest extends TestCase
}
}
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
private static void waitForCompletion(int expected, long wait, Receiver[] receivers) throws InterruptedException
{
for (int i = 0; i < receivers.length; i++)
@@ -209,6 +219,6 @@ public class MultipleConnectionTest extends TestCase
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(MultipleConnectionTest.class));
+ return new junit.framework.TestSuite(MultipleConnectionTest.class);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java
index 7ffb3ca469..a0e4aa9787 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java
@@ -552,16 +552,6 @@ public class BytesMessageTest extends TestCase
assertEquals((byte)0, result[2]);
}
- public void testToBodyString() throws Exception
- {
- JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage();
- final String testText = "This is a test";
- bm.writeUTF(testText);
- bm.reset();
- String result = bm.toBodyString();
- assertEquals(testText, result);
- }
-
public void testToBodyStringWithNull() throws Exception
{
JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java
index 58e62e8252..276067a28d 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java
@@ -14,18 +14,16 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.test.unit.client.message;
-import junit.framework.TestCase;
import junit.framework.Assert;
-import org.apache.qpid.framing.PropertyFieldTable;
+import junit.framework.TestCase;
import org.apache.qpid.client.message.JMSMapMessage;
import org.apache.qpid.client.message.TestMessageHelper;
-import org.apache.log4j.Logger;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
@@ -204,9 +202,9 @@ public class MapMessageTest extends TestCase
{
JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
mm.getByte("random");
- Assert.fail("MessageFormatException expected");
+ Assert.fail("NumberFormatException expected");
}
- catch (MessageFormatException e)
+ catch (NumberFormatException e)
{
//normal execution
}
@@ -295,9 +293,9 @@ public class MapMessageTest extends TestCase
{
JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
mm.getInt("random");
- Assert.fail("MessageFormatException should be received.");
+ Assert.fail("NumberFormatException should be received.");
}
- catch (MessageFormatException e)
+ catch (NumberFormatException e)
{
//normal execution
}
@@ -313,9 +311,9 @@ public class MapMessageTest extends TestCase
{
JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
mm.getLong("random");
- Assert.fail("MessageFormatException should be received.");
+ Assert.fail("NumberFormatException should be received.");
}
- catch (MessageFormatException e)
+ catch (NumberFormatException e)
{
//normal execution
}
@@ -331,9 +329,9 @@ public class MapMessageTest extends TestCase
{
JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
mm.getShort("random");
- Assert.fail("MessageFormatException should be received.");
+ Assert.fail("NumberFormatException should be received.");
}
- catch (MessageFormatException e)
+ catch (NumberFormatException e)
{
//normal execution
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java
index 1345c0defb..64d10fb13f 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -190,9 +190,9 @@ public class TextMessageTest extends TestCase
{
JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
mm.getByteProperty("random");
- Assert.fail("MessageFormatException expected");
+ Assert.fail("NumberFormatException expected");
}
- catch (MessageFormatException e)
+ catch (NumberFormatException e)
{
//normal execution
}
@@ -245,9 +245,9 @@ public class TextMessageTest extends TestCase
{
JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
mm.getIntProperty("random");
- Assert.fail("MessageFormatException should be received.");
+ Assert.fail("NumberFormatException should be received.");
}
- catch (MessageFormatException e)
+ catch (NumberFormatException e)
{
//normal execution
}
@@ -263,9 +263,9 @@ public class TextMessageTest extends TestCase
{
JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
mm.getLongProperty("random");
- Assert.fail("MessageFormatException should be received.");
+ Assert.fail("NumberFormatException should be received.");
}
- catch (MessageFormatException e)
+ catch (NumberFormatException e)
{
//normal execution
}
@@ -281,9 +281,9 @@ public class TextMessageTest extends TestCase
{
JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
mm.getShortProperty("random");
- Assert.fail("MessageFormatException should be received.");
+ Assert.fail("NumberFormatException should be received.");
}
- catch (MessageFormatException e)
+ catch (NumberFormatException e)
{
//normal execution
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
index 98e355b0da..eee9b2de9f 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -87,19 +87,19 @@ public class AMQProtocolSessionTest extends TestCase
_testSession.getMinaProtocolSession().setLocalPort(_port);
testAddress = _testSession.genQueueName();
- assertEquals("Failure when generating a queue name from an address with special chars",_generatedAddress,testAddress);
+ assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress);
//test empty address
_testSession.getMinaProtocolSession().setStringLocalAddress(_emptyAddress);
testAddress = _testSession.genQueueName();
- assertEquals("Failure when generating a queue name from an empty address",_generatedAddress_2,testAddress);
+ assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress);
//test address with no special chars
_testSession.getMinaProtocolSession().setStringLocalAddress(_validAddress);
testAddress = _testSession.genQueueName();
- assertEquals("Failure when generating a queue name from an address with no special chars",_generatedAddress_3,testAddress);
+ assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress);
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
index 50944730c3..315ba6ae4c 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
@@ -73,7 +73,7 @@ public class StreamMessageTest extends TestCase
MessageProducer mandatoryProducer = producerSession.createProducer(queue);
// Third test - should be routed
- _logger.info("Sending routable message");
+ _logger.info("Sending isBound message");
StreamMessage msg = producerSession.createStreamMessage();
msg.setStringProperty("F1000","1");
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index fa46a4bcfb..3b6e3517c2 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -26,10 +26,8 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.transport.TransportConnection;
-import javax.jms.TopicSession;
-import javax.jms.TextMessage;
-import javax.jms.TopicPublisher;
-import javax.jms.MessageConsumer;
+import javax.jms.*;
+
/**
* @author Apache Software Foundation
@@ -46,12 +44,126 @@ public class TopicSessionTest extends TestCase
{
super.tearDown();
TransportConnection.killAllVMBrokers();
+ //Thread.sleep(2000);
}
- public void testTextMessageCreation() throws Exception
+
+ public void testTopicSubscriptionUnsubscription() throws Exception
{
AMQTopic topic = new AMQTopic("MyTopic");
- AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSubscriber sub = session1.createDurableSubscriber(topic,"subscription0");
+ TopicPublisher publisher = session1.createPublisher(topic);
+
+ con.start();
+
+ TextMessage tm = session1.createTextMessage("Hello");
+ publisher.publish(tm);
+
+ tm = (TextMessage) sub.receive(2000);
+ assertNotNull(tm);
+
+ session1.unsubscribe("subscription0");
+
+ try
+ {
+ session1.unsubscribe("not a subscription");
+ fail("expected InvalidDestinationException when unsubscribing from unknown subscription");
+ }
+ catch(InvalidDestinationException e)
+ {
+ ; // PASS
+ }
+ catch(Exception e)
+ {
+ fail("expected InvalidDestinationException when unsubscribing from unknown subscription, got: " + e);
+ }
+
+ con.close();
+ }
+
+ public void testSubscriptionNameReuseForDifferentTopicSingleConnection() throws Exception
+ {
+ subscriptionNameReuseForDifferentTopic(false);
+ }
+
+ public void testSubscriptionNameReuseForDifferentTopicTwoConnections() throws Exception
+ {
+ subscriptionNameReuseForDifferentTopic(true);
+ }
+
+ private void subscriptionNameReuseForDifferentTopic(boolean shutdown) throws Exception
+ {
+ AMQTopic topic = new AMQTopic("MyTopic1" + String.valueOf(shutdown));
+ AMQTopic topic2 = new AMQTopic("MyOtherTopic1" + String.valueOf(shutdown));
+ AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
+ TopicPublisher publisher = session1.createPublisher(null);
+
+ con.start();
+
+ publisher.publish(topic, session1.createTextMessage("hello"));
+ TextMessage m = (TextMessage) sub.receive(2000);
+ assertNotNull(m);
+
+ if (shutdown)
+ {
+ session1.close();
+ con.close();
+ con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ con.start();
+ session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ publisher = session1.createPublisher(null);
+ }
+ TopicSubscriber sub2 = session1.createDurableSubscriber(topic2, "subscription0");
+ publisher.publish(topic, session1.createTextMessage("hello"));
+ if (!shutdown)
+ {
+ m = (TextMessage) sub.receive(2000);
+ assertNull(m);
+ }
+ publisher.publish(topic2, session1.createTextMessage("goodbye"));
+ m = (TextMessage) sub2.receive(2000);
+ assertNotNull(m);
+ assertEquals("goodbye", m.getText());
+ con.close();
+ }
+
+ public void testUnsubscriptionAfterConnectionClose() throws Exception
+ {
+ AMQTopic topic = new AMQTopic("MyTopic3");
+ AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicPublisher publisher = session1.createPublisher(topic);
+
+ AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test");
+ TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0");
+
+ con2.start();
+
+ publisher.publish(session1.createTextMessage("Hello"));
+ TextMessage tm = (TextMessage) sub.receive(2000);
+ assertNotNull(tm);
+ con2.close();
+ publisher.publish(session1.createTextMessage("Hello2"));
+ con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test");
+ session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ sub = session2.createDurableSubscriber(topic, "subscription0");
+ con2.start();
+ tm = (TextMessage) sub.receive(2000);
+ assertNotNull(tm);
+ assertEquals("Hello2", tm.getText());
+ con1.close();
+ con2.close();
+ }
+
+ public void testTextMessageCreation() throws Exception
+ {
+ AMQTopic topic = new AMQTopic("MyTopic4");
+ AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
MessageConsumer consumer1 = session1.createConsumer(topic);
@@ -85,6 +197,7 @@ public class TopicSessionTest extends TestCase
tm = (TextMessage) consumer1.receive(2000);
assertNotNull(tm);
assertEquals("Empty string not returned", "", msgText);
+ con.close();
}
public static junit.framework.Test suite()