summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-12-09 23:58:25 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-12-09 23:58:25 +0000
commitdb6241e3cfb4ef420025ba5c8b8ddae888c7171c (patch)
treee5aec74133fb1e4e13fdac632de81a863e270dd1
parentf1f7698a041f6534b4c3396f90c3352549ec95f9 (diff)
downloadqpid-python-db6241e3cfb4ef420025ba5c8b8ddae888c7171c.tar.gz
QPID-2258 : AMQP0-9-1 Compliance fixes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@889022 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java74
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java9
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java1
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java9
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java16
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java16
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java12
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java9
26 files changed, 197 insertions, 51 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index 4788f96d6c..3c3902c545 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -212,7 +212,7 @@ public class DirectExchange extends AbstractExchange
final String routingKey = payload.getRoutingKey();
- final ArrayList<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey);
+ final ArrayList<AMQQueue> queues = (routingKey == null) ? _index.get("") : _index.get(routingKey);
if (_logger.isDebugEnabled())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index 859a3477e6..0343457a73 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -102,6 +102,11 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
body.getNoLocal(), body.getNowait(), queue))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
+ }
+ else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
}
if (body.getConsumerTag() != null)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index a473184efb..2c4a9b310a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -97,6 +97,11 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
+ else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.");
+ }
if (!performGet(queue,session, channel, !body.getNoAck()))
{
@@ -188,6 +193,11 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
+ public boolean isTransient()
+ {
+ return true;
+ }
+
public boolean wouldSuspend(QueueEntry msg)
{
return !getCreditManager().useCreditForMessage(msg.getMessage());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
index bd4b610933..8dbd457cc9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
@@ -60,6 +60,10 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
try
{
+ if(exchangeRegistry.getExchange(body.getExchange()) == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange());
+ }
exchangeRegistry.unregisterExchange(body.getExchange(), body.getIfUnused());
ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody();
@@ -68,6 +72,7 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
}
catch (ExchangeInUseException e)
{
+ throw body.getChannelException(AMQConstant.IN_USE, "Exchange in use");
// TODO: sort out consistent channel close mechanism that does all clean up etc.
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index 84491c1d2e..57ce7a7240 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -113,6 +113,11 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
+ else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
+ }
if (!exch.isBound(routingKey, body.getArguments(), queue))
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 4f69afe755..bb57fdbc36 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -114,25 +114,37 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
{
store.createQueue(queue, body.getArguments());
}
+ if(body.getAutoDelete())
+ {
+ queue.setDeleteOnNoConsumers(true);
+ }
queueRegistry.registerQueue(queue);
- if(queue.isExclusive() && !queue.isAutoDelete())
+ if(body.getExclusive())
{
- final AMQQueue q = queue;
- queue.setExclusiveOwner(session);
- final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
+ if(body.getDurable())
{
- public void doTask(AMQProtocolSession session) throws AMQException
- {
- q.setExclusiveOwner(null);
- }
- };
- session.addSessionCloseTask(sessionCloseTask);
- queue.addQueueDeleteTask(new AMQQueue.Task() {
- public void doTask(AMQQueue queue) throws AMQException
+ queue.setExclusiveOwner(session.getPrincipal().getName());
+ }
+ else
+ {
+ final AMQQueue q = queue;
+ queue.setExclusiveOwner(session);
+ final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
{
- session.removeSessionCloseTask(sessionCloseTask);
- }
- });
+ public void doTask(AMQProtocolSession session) throws AMQException
+ {
+ q.setExclusiveOwner(null);
+ }
+ };
+ session.addSessionCloseTask(sessionCloseTask);
+ queue.addQueueDeleteTask(new AMQQueue.Task() {
+ public void doTask(AMQQueue queue) throws AMQException
+ {
+ session.removeSessionCloseTask(sessionCloseTask);
+ }
+ });
+ }
+
}
if (autoRegister)
{
@@ -143,11 +155,19 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
}
}
- else if (queue.getPrincipalHolder() != null
- && queue.getPrincipalHolder().getPrincipal() != null
- && queue.getPrincipalHolder().getPrincipal().getName() != null
- && (!queue.getPrincipalHolder().getPrincipal().getName().equals(session.getPrincipal().getName())
- || ((!body.getPassive() && queue.getExclusiveOwner() != null && queue.getExclusiveOwner() != session))))
+ else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
+ }
+ else if(!body.getPassive() && ((queue.isExclusive()) != body.getExclusive()))
+ {
+
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: "
+ + queue.isExclusive() + " requested " + body.getExclusive() + ")");
+ }
+ else if (!body.getPassive() && body.getExclusive() && !queue.getExclusiveOwner().equals(queue.isDurable() ? session.getPrincipal().getName() : session))
{
throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -155,6 +175,20 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
+ queue.getPrincipalHolder().getPrincipal().getName() + "')");
}
+ else if(!body.getPassive() && queue.isAutoDelete() != body.getAutoDelete())
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '" + queue.getName() + "' with different auto-delete (was: "
+ + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")");
+ }
+ else if(!body.getPassive() && queue.isDurable() != body.getDurable())
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '" + queue.getName() + "' with different durability (was: "
+ + queue.isDurable() + " requested " + body.getDurable() + ")");
+ }
+
+
AMQChannel channel = session.getChannel(channelId);
if (channel == null)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 8417492171..3d58ec2133 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -110,7 +110,11 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
-
+ else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
+ }
int purged = queue.delete();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
index 3e0f2182b7..b94ebb6538 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
@@ -103,6 +103,11 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
if (!virtualHost.getAccessManager().authorisePurge(session, queue))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
+ }
+ else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.");
}
long purged = queue.clearQueue();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index a459c64946..028f7e15a4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -42,6 +42,9 @@ import java.util.Map;
public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource
{
+ boolean getDeleteOnNoConsumers();
+
+ void setDeleteOnNoConsumers(boolean b);
public interface Context
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 6915850376..3d5d99f0b0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -150,6 +150,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private boolean _nolocal;
private final AtomicBoolean _overfull = new AtomicBoolean(false);
+ private boolean _deleteOnNoConsumers;
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
{
@@ -374,7 +375,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throw new ExistingExclusiveSubscription();
}
- if (exclusive)
+ if (exclusive && !subscription.isTransient())
{
if (getConsumerCount() != 0)
{
@@ -431,7 +432,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// auto-delete queues must be deleted if there are no remaining subscribers
- if (_autoDelete && getConsumerCount() == 0 && !isExclusive())
+ if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 )
{
if (_logger.isInfoEnabled())
{
@@ -448,6 +449,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ public boolean getDeleteOnNoConsumers()
+ {
+ return _deleteOnNoConsumers;
+ }
+
+ public void setDeleteOnNoConsumers(boolean b)
+ {
+ _deleteOnNoConsumers = b;
+ }
+
+
// ------ Enqueue / Dequeue
public QueueEntry enqueue(ServerMessage message) throws AMQException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index 4db9c305b2..9e9d2da579 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -31,6 +31,8 @@ public interface Subscription
{
LogActor getLogActor();
+ boolean isTransient();
+
public static enum State
{
ACTIVE,
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 5302a3c5d4..684d3c2e74 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -667,6 +667,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
return !isBrowser();
}
+ public boolean isTransient()
+ {
+ return false;
+ }
+
public void set(String key, Object value)
{
_properties.put(key, value);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index fb0a5cf2c7..5b3668ab64 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -649,6 +649,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
return _logActor;
}
+ public boolean isTransient()
+ {
+ return false;
+ }
+
ServerSession getSession()
{
return _session;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index df2754c16b..36ed8e24ce 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -823,6 +823,11 @@ public class ServerSessionDelegate extends SessionDelegate
queue.setPrincipalHolder((ServerSession)session);
queue.setExclusiveOwner(session);
}
+ else if(method.getAutoDelete())
+ {
+ queue.setDeleteOnNoConsumers(true);
+ }
+
final String alternateExchangeName = method.getAlternateExchange();
if(alternateExchangeName != null && alternateExchangeName.length() != 0)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 910c7d42ed..a487b160e1 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -52,6 +52,15 @@ public class MockAMQQueue implements AMQQueue
_name = new AMQShortString(name);
}
+ public boolean getDeleteOnNoConsumers()
+ {
+ return false;
+ }
+
+ public void setDeleteOnNoConsumers(boolean b)
+ {
+ }
+
public AMQShortString getName()
{
return _name;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 8c6574095b..408893870b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -259,6 +259,7 @@ public class SimpleAMQQueueTest extends TestCase
{
_queue.stop();
_queue = new SimpleAMQQueue(_qname, false, null, true, _virtualHost);
+ _queue.setDeleteOnNoConsumers(true);
_queue.registerSubscription(_subscription, false);
AMQMessage message = createMessage(new Long(25));
_queue.enqueue(message);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 97ba143bdf..e6fd2172f0 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -105,6 +105,11 @@ public class MockSubscription implements Subscription
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean isTransient()
+ {
+ return false;
+ }
+
public AMQQueue getQueue()
{
return queue;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index 8857f1115a..c9212a54c1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -68,6 +68,14 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
ProtocolVersion pv = new ProtocolVersion((byte) body.getVersionMajor(), (byte) body.getVersionMinor());
+ // 0-9-1 is indistinguishable from 0-9 using only major and minor ... if we established the connection as 0-9-1
+ // and now get back major = 0 , minor = 9 then we can assume it means 0-9-1
+
+ if(pv.equals(ProtocolVersion.v0_9) && session.getProtocolVersion().equals(ProtocolVersion.v0_91))
+ {
+ pv = ProtocolVersion.v0_91;
+ }
+
// For the purposes of interop, we can make the client accept the broker's version string.
// If it does, it then internally records the version as being the latest one that it understands.
// It needs to do this since frame lookup is done by version.
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 8910920017..2d59146b43 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -403,9 +403,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
_protocolVersion = pv;
_methodRegistry = MethodRegistry.getMethodRegistry(pv);
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this);
-
- // _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
- }
+ }
public byte getProtocolMinorVersion()
{
@@ -422,11 +420,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
return _protocolVersion;
}
-// public VersionSpecificRegistry getRegistry()
-// {
-// return _registry;
-// }
-
public MethodRegistry getMethodRegistry()
{
return _methodRegistry;
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
index b2fdf48267..4c2fefb312 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -36,6 +36,7 @@ import javax.naming.Context;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.UUID;
/**
* QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
@@ -61,6 +62,7 @@ public class MessageListenerMultiConsumerTest extends QpidTestCase
private Session _clientSession1;
private Queue _queue;
private final CountDownLatch _allMessagesSent = new CountDownLatch(2); // all messages Sent Lock
+ private static final String QUEUE_NAME = "queue" + UUID.randomUUID().toString();
protected void setUp() throws Exception
{
@@ -73,7 +75,7 @@ public class MessageListenerMultiConsumerTest extends QpidTestCase
_clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _queue =_clientSession1.createQueue("queue");
+ _queue =_clientSession1.createQueue(QUEUE_NAME);
_consumer1 = _clientSession1.createConsumer(_queue);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java
index 287a3fe412..6a4292ec2e 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java
@@ -249,11 +249,15 @@ public class DurableQueueLoggingTest extends AbstractTestLogging
final Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-priorities", PRIORITIES);
// Need to create a queue that does not exist so use test name
- ((AMQSession) _session).createQueue(new AMQShortString(getTestQueueName()), false, _durable, false, arguments);
+ final String queueName = getTestQueueName();
+ ((AMQSession) _session).createQueue(new AMQShortString(queueName), false, _durable, false, arguments);
+
+ Queue queue = (Queue) _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='"+_durable+"'&autodelete='false'");
+
//Need to create a Consumer to ensure that the log has had time to write
// as the above Create is Asynchronous
- _session.createConsumer(_session.createQueue(getTestQueueName()));
+ _session.createConsumer(queue);
// Validation
List<String> results = _monitor.findMatches(QUEUE_PREFIX);
@@ -310,11 +314,15 @@ public class DurableQueueLoggingTest extends AbstractTestLogging
final Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-priorities", PRIORITIES);
// Need to create a queue that does not exist so use test name
- ((AMQSession) _session).createQueue(new AMQShortString(getTestQueueName()), true, _durable, false, arguments);
+ final String queueName = getTestQueueName() + "-autoDeletePriority";
+ ((AMQSession) _session).createQueue(new AMQShortString(queueName), true, _durable, false, arguments);
+
+ Queue queue = (Queue) _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='"+_durable+"'&autodelete='true'");
+
//Need to create a Consumer to ensure that the log has had time to write
// as the above Create is Asynchronous
- _session.createConsumer(_session.createQueue(getTestQueueName()));
+ _session.createConsumer(queue);
// Validation
List<String> results = _monitor.findMatches(QUEUE_PREFIX);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
index 35b4d7c772..ca38807fb1 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
@@ -90,7 +90,8 @@ public class PriorityTest extends QpidTestCase
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-priorities",10);
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
@@ -130,7 +131,8 @@ public class PriorityTest extends QpidTestCase
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-priorities",3);
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ queue = producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
index e6be7c8263..ecb2f7d559 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
@@ -107,7 +107,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",queueName);
+ queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
@@ -149,7 +149,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",queueName);
+ queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
@@ -194,7 +194,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",queueName);
+ queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
((AMQSession) session).declareAndBind((AMQDestination)queue);
producer = session.createProducer(queue);
@@ -224,7 +224,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",1000);
((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",queueName);
+ queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
@@ -266,7 +266,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
((AMQSession) consumerSession).createQueue(new AMQShortString(queueName), false, false, false, arguments);
- queue = new AMQQueue("amq.direct",queueName);
+ queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='false'");
((AMQSession) consumerSession).declareAndBind((AMQDestination)queue);
consumerConnection.start();
@@ -322,7 +322,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",queueName);
+ queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
((AMQSession) session).declareAndBind((AMQDestination)queue);
producer = session.createProducer(queue);
@@ -354,7 +354,9 @@ public class ProducerFlowControlTest extends AbstractTestLogging
arguments.put("x-qpid-capacity",0);
arguments.put("x-qpid-flow-resume-capacity",0);
((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",queueName);
+
+ queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
+
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index fe25bf07f0..352f6ad119 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -188,6 +188,11 @@ public class SubscriptionTestHelper implements Subscription
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean isTransient()
+ {
+ return false;
+ }
+
public AMQQueue getQueue()
{
return null;
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java
index 1744b92d62..39861bb2d5 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java
@@ -21,7 +21,9 @@
package org.apache.qpid.test.client.message;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@@ -57,12 +59,16 @@ public class MessageToStringTest extends QpidTestCase
//Create Producer put some messages on the queue
_connection = getConnection();
- //Create Queue
- _queue = new AMQQueue("amq.direct", "queue");
-
//Create Consumer
_session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = getTestQueueName();
+
+ //Create Queue
+ ((AMQSession) _session).createQueue(new AMQShortString(queueName), true, false, false);
+ _queue = _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
+
+
_consumer = _session.createConsumer(_queue);
_connection.start();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java
index 001a40988b..f0bbcc7003 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java
@@ -21,7 +21,9 @@
package org.apache.qpid.test.client.message;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -46,12 +48,15 @@ public class ObjectMessageTest extends QpidTestCase
//Create Connection
_connection = getConnection();
- //Create Queue
- Queue queue = new AMQQueue("amq.direct", "queue");
//Create Session
_session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ //Create Queue
+ String queueName = getTestQueueName();
+ ((AMQSession) _session).createQueue(new AMQShortString(queueName), true, false, false);
+ Queue queue = _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
+
//Create Consumer
_consumer = _session.createConsumer(queue);