diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2009-12-09 23:58:25 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2009-12-09 23:58:25 +0000 |
commit | db6241e3cfb4ef420025ba5c8b8ddae888c7171c (patch) | |
tree | e5aec74133fb1e4e13fdac632de81a863e270dd1 | |
parent | f1f7698a041f6534b4c3396f90c3352549ec95f9 (diff) | |
download | qpid-python-db6241e3cfb4ef420025ba5c8b8ddae888c7171c.tar.gz |
QPID-2258 : AMQP0-9-1 Compliance fixes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@889022 13f79535-47bb-0310-9956-ffa450edef68
26 files changed, 197 insertions, 51 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 4788f96d6c..3c3902c545 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -212,7 +212,7 @@ public class DirectExchange extends AbstractExchange final String routingKey = payload.getRoutingKey(); - final ArrayList<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey); + final ArrayList<AMQQueue> queues = (routingKey == null) ? _index.get("") : _index.get(routingKey); if (_logger.isDebugEnabled()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 859a3477e6..0343457a73 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -102,6 +102,11 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic body.getNoLocal(), body.getNowait(), queue)) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied"); + } + else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); } if (body.getConsumerTag() != null) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index a473184efb..2c4a9b310a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -97,6 +97,11 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB {
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
+ else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.");
+ }
if (!performGet(queue,session, channel, !body.getNoAck()))
{
@@ -188,6 +193,11 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
+ public boolean isTransient()
+ {
+ return true;
+ }
+
public boolean wouldSuspend(QueueEntry msg)
{
return !getCreditManager().useCreditForMessage(msg.getMessage());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java index bd4b610933..8dbd457cc9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java @@ -60,6 +60,10 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD try { + if(exchangeRegistry.getExchange(body.getExchange()) == null) + { + throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange()); + } exchangeRegistry.unregisterExchange(body.getExchange(), body.getIfUnused()); ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody(); @@ -68,6 +72,7 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD } catch (ExchangeInUseException e) { + throw body.getChannelException(AMQConstant.IN_USE, "Exchange in use"); // TODO: sort out consistent channel close mechanism that does all clean up etc. } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index 84491c1d2e..57ce7a7240 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -113,6 +113,11 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied"); } + else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); + } if (!exch.isBound(routingKey, body.getArguments(), queue)) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 4f69afe755..bb57fdbc36 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -114,25 +114,37 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar { store.createQueue(queue, body.getArguments()); } + if(body.getAutoDelete()) + { + queue.setDeleteOnNoConsumers(true); + } queueRegistry.registerQueue(queue); - if(queue.isExclusive() && !queue.isAutoDelete()) + if(body.getExclusive()) { - final AMQQueue q = queue; - queue.setExclusiveOwner(session); - final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task() + if(body.getDurable()) { - public void doTask(AMQProtocolSession session) throws AMQException - { - q.setExclusiveOwner(null); - } - }; - session.addSessionCloseTask(sessionCloseTask); - queue.addQueueDeleteTask(new AMQQueue.Task() { - public void doTask(AMQQueue queue) throws AMQException + queue.setExclusiveOwner(session.getPrincipal().getName()); + } + else + { + final AMQQueue q = queue; + queue.setExclusiveOwner(session); + final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task() { - session.removeSessionCloseTask(sessionCloseTask); - } - }); + public void doTask(AMQProtocolSession session) throws AMQException + { + q.setExclusiveOwner(null); + } + }; + session.addSessionCloseTask(sessionCloseTask); + queue.addQueueDeleteTask(new AMQQueue.Task() { + public void doTask(AMQQueue queue) throws AMQException + { + session.removeSessionCloseTask(sessionCloseTask); + } + }); + } + } if (autoRegister) { @@ -143,11 +155,19 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } } } - else if (queue.getPrincipalHolder() != null - && queue.getPrincipalHolder().getPrincipal() != null - && queue.getPrincipalHolder().getPrincipal().getName() != null - && (!queue.getPrincipalHolder().getPrincipal().getName().equals(session.getPrincipal().getName()) - || ((!body.getPassive() && queue.getExclusiveOwner() != null && queue.getExclusiveOwner() != session)))) + else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); + } + else if(!body.getPassive() && ((queue.isExclusive()) != body.getExclusive())) + { + + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: " + + queue.isExclusive() + " requested " + body.getExclusive() + ")"); + } + else if (!body.getPassive() && body.getExclusive() && !queue.getExclusiveOwner().equals(queue.isDurable() ? session.getPrincipal().getName() : session)) { throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " @@ -155,6 +175,20 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar + queue.getPrincipalHolder().getPrincipal().getName() + "')"); } + else if(!body.getPassive() && queue.isAutoDelete() != body.getAutoDelete()) + { + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + queue.getName() + "' with different auto-delete (was: " + + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")"); + } + else if(!body.getPassive() && queue.isDurable() != body.getDurable()) + { + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + queue.getName() + "' with different durability (was: " + + queue.isDurable() + " requested " + body.getDurable() + ")"); + } + + AMQChannel channel = session.getChannel(channelId); if (channel == null) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 8417492171..3d58ec2133 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -110,7 +110,11 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied"); } - + else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); + } int purged = queue.delete(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java index 3e0f2182b7..b94ebb6538 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java @@ -103,6 +103,11 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod if (!virtualHost.getAccessManager().authorisePurge(session, queue))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
+ }
+ else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.");
}
long purged = queue.clearQueue();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index a459c64946..028f7e15a4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -42,6 +42,9 @@ import java.util.Map; public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource { + boolean getDeleteOnNoConsumers(); + + void setDeleteOnNoConsumers(boolean b); public interface Context diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 6915850376..3d5d99f0b0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -150,6 +150,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private boolean _nolocal; private final AtomicBoolean _overfull = new AtomicBoolean(false); + private boolean _deleteOnNoConsumers; protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) { @@ -374,7 +375,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new ExistingExclusiveSubscription(); } - if (exclusive) + if (exclusive && !subscription.isTransient()) { if (getConsumerCount() != 0) { @@ -431,7 +432,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // auto-delete queues must be deleted if there are no remaining subscribers - if (_autoDelete && getConsumerCount() == 0 && !isExclusive()) + if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 ) { if (_logger.isInfoEnabled()) { @@ -448,6 +449,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + public boolean getDeleteOnNoConsumers() + { + return _deleteOnNoConsumers; + } + + public void setDeleteOnNoConsumers(boolean b) + { + _deleteOnNoConsumers = b; + } + + // ------ Enqueue / Dequeue public QueueEntry enqueue(ServerMessage message) throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 4db9c305b2..9e9d2da579 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -31,6 +31,8 @@ public interface Subscription { LogActor getLogActor(); + boolean isTransient(); + public static enum State { ACTIVE, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 5302a3c5d4..684d3c2e74 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -667,6 +667,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return !isBrowser(); } + public boolean isTransient() + { + return false; + } + public void set(String key, Object value) { _properties.put(key, value); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index fb0a5cf2c7..5b3668ab64 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -649,6 +649,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return _logActor; } + public boolean isTransient() + { + return false; + } + ServerSession getSession() { return _session; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index df2754c16b..36ed8e24ce 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -823,6 +823,11 @@ public class ServerSessionDelegate extends SessionDelegate queue.setPrincipalHolder((ServerSession)session); queue.setExclusiveOwner(session); } + else if(method.getAutoDelete()) + { + queue.setDeleteOnNoConsumers(true); + } + final String alternateExchangeName = method.getAlternateExchange(); if(alternateExchangeName != null && alternateExchangeName.length() != 0) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 910c7d42ed..a487b160e1 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -52,6 +52,15 @@ public class MockAMQQueue implements AMQQueue _name = new AMQShortString(name); } + public boolean getDeleteOnNoConsumers() + { + return false; + } + + public void setDeleteOnNoConsumers(boolean b) + { + } + public AMQShortString getName() { return _name; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 8c6574095b..408893870b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -259,6 +259,7 @@ public class SimpleAMQQueueTest extends TestCase { _queue.stop(); _queue = new SimpleAMQQueue(_qname, false, null, true, _virtualHost); + _queue.setDeleteOnNoConsumers(true); _queue.registerSubscription(_subscription, false); AMQMessage message = createMessage(new Long(25)); _queue.enqueue(message); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 97ba143bdf..e6fd2172f0 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -105,6 +105,11 @@ public class MockSubscription implements Subscription return null; //To change body of implemented methods use File | Settings | File Templates. } + public boolean isTransient() + { + return false; + } + public AMQQueue getQueue() { return queue; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index 8857f1115a..c9212a54c1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -68,6 +68,14 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co ProtocolVersion pv = new ProtocolVersion((byte) body.getVersionMajor(), (byte) body.getVersionMinor()); + // 0-9-1 is indistinguishable from 0-9 using only major and minor ... if we established the connection as 0-9-1 + // and now get back major = 0 , minor = 9 then we can assume it means 0-9-1 + + if(pv.equals(ProtocolVersion.v0_9) && session.getProtocolVersion().equals(ProtocolVersion.v0_91)) + { + pv = ProtocolVersion.v0_91; + } + // For the purposes of interop, we can make the client accept the broker's version string. // If it does, it then internally records the version as being the latest one that it understands. // It needs to do this since frame lookup is done by version. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 8910920017..2d59146b43 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -403,9 +403,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _protocolVersion = pv; _methodRegistry = MethodRegistry.getMethodRegistry(pv); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this); - - // _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor); - } + } public byte getProtocolMinorVersion() { @@ -422,11 +420,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return _protocolVersion; } -// public VersionSpecificRegistry getRegistry() -// { -// return _registry; -// } - public MethodRegistry getMethodRegistry() { return _methodRegistry; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java index b2fdf48267..4c2fefb312 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java @@ -36,6 +36,7 @@ import javax.naming.Context; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.UUID; /** * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery @@ -61,6 +62,7 @@ public class MessageListenerMultiConsumerTest extends QpidTestCase private Session _clientSession1; private Queue _queue; private final CountDownLatch _allMessagesSent = new CountDownLatch(2); // all messages Sent Lock + private static final String QUEUE_NAME = "queue" + UUID.randomUUID().toString(); protected void setUp() throws Exception { @@ -73,7 +75,7 @@ public class MessageListenerMultiConsumerTest extends QpidTestCase _clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - _queue =_clientSession1.createQueue("queue"); + _queue =_clientSession1.createQueue(QUEUE_NAME); _consumer1 = _clientSession1.createConsumer(_queue); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java index 287a3fe412..6a4292ec2e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java @@ -249,11 +249,15 @@ public class DurableQueueLoggingTest extends AbstractTestLogging final Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-priorities", PRIORITIES); // Need to create a queue that does not exist so use test name - ((AMQSession) _session).createQueue(new AMQShortString(getTestQueueName()), false, _durable, false, arguments); + final String queueName = getTestQueueName(); + ((AMQSession) _session).createQueue(new AMQShortString(queueName), false, _durable, false, arguments); + + Queue queue = (Queue) _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='"+_durable+"'&autodelete='false'"); + //Need to create a Consumer to ensure that the log has had time to write // as the above Create is Asynchronous - _session.createConsumer(_session.createQueue(getTestQueueName())); + _session.createConsumer(queue); // Validation List<String> results = _monitor.findMatches(QUEUE_PREFIX); @@ -310,11 +314,15 @@ public class DurableQueueLoggingTest extends AbstractTestLogging final Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-priorities", PRIORITIES); // Need to create a queue that does not exist so use test name - ((AMQSession) _session).createQueue(new AMQShortString(getTestQueueName()), true, _durable, false, arguments); + final String queueName = getTestQueueName() + "-autoDeletePriority"; + ((AMQSession) _session).createQueue(new AMQShortString(queueName), true, _durable, false, arguments); + + Queue queue = (Queue) _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='"+_durable+"'&autodelete='true'"); + //Need to create a Consumer to ensure that the log has had time to write // as the above Create is Asynchronous - _session.createConsumer(_session.createQueue(getTestQueueName())); + _session.createConsumer(queue); // Validation List<String> results = _monitor.findMatches(QUEUE_PREFIX); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java index 35b4d7c772..ca38807fb1 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java @@ -90,7 +90,8 @@ public class PriorityTest extends QpidTestCase final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-priorities",10); ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); - queue = new AMQQueue("amq.direct",QUEUE); + queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); @@ -130,7 +131,8 @@ public class PriorityTest extends QpidTestCase final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-priorities",3); ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); - queue = new AMQQueue("amq.direct",QUEUE); + queue = producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java index e6be7c8263..ecb2f7d559 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -107,7 +107,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = new AMQQueue("amq.direct",queueName); + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); @@ -149,7 +149,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = new AMQQueue("amq.direct",queueName); + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); @@ -194,7 +194,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = new AMQQueue("amq.direct",queueName); + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); ((AMQSession) session).declareAndBind((AMQDestination)queue); producer = session.createProducer(queue); @@ -224,7 +224,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",1000); ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = new AMQQueue("amq.direct",queueName); + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); @@ -266,7 +266,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging ((AMQSession) consumerSession).createQueue(new AMQShortString(queueName), false, false, false, arguments); - queue = new AMQQueue("amq.direct",queueName); + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='false'"); ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue); consumerConnection.start(); @@ -322,7 +322,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = new AMQQueue("amq.direct",queueName); + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); ((AMQSession) session).declareAndBind((AMQDestination)queue); producer = session.createProducer(queue); @@ -354,7 +354,9 @@ public class ProducerFlowControlTest extends AbstractTestLogging arguments.put("x-qpid-capacity",0); arguments.put("x-qpid-flow-resume-capacity",0); ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = new AMQQueue("amq.direct",queueName); + + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index fe25bf07f0..352f6ad119 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -188,6 +188,11 @@ public class SubscriptionTestHelper implements Subscription return null; //To change body of implemented methods use File | Settings | File Templates. } + public boolean isTransient() + { + return false; + } + public AMQQueue getQueue() { return null; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java index 1744b92d62..39861bb2d5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java @@ -21,7 +21,9 @@ package org.apache.qpid.test.client.message; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.framing.AMQShortString; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -57,12 +59,16 @@ public class MessageToStringTest extends QpidTestCase //Create Producer put some messages on the queue _connection = getConnection(); - //Create Queue - _queue = new AMQQueue("amq.direct", "queue"); - //Create Consumer _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = getTestQueueName(); + + //Create Queue + ((AMQSession) _session).createQueue(new AMQShortString(queueName), true, false, false); + _queue = _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); + + _consumer = _session.createConsumer(_queue); _connection.start(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java index 001a40988b..f0bbcc7003 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java @@ -21,7 +21,9 @@ package org.apache.qpid.test.client.message; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.framing.AMQShortString; import javax.jms.Connection; import javax.jms.JMSException; @@ -46,12 +48,15 @@ public class ObjectMessageTest extends QpidTestCase //Create Connection _connection = getConnection(); - //Create Queue - Queue queue = new AMQQueue("amq.direct", "queue"); //Create Session _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + //Create Queue + String queueName = getTestQueueName(); + ((AMQSession) _session).createQueue(new AMQShortString(queueName), true, false, false); + Queue queue = _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); + //Create Consumer _consumer = _session.createConsumer(queue); |