diff options
26 files changed, 129 insertions, 80 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index be4e8f8ec1..b2e43b4f0d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -278,8 +278,8 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr ownerShortString = new AMQShortString(owner); } - queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost(), - null); + queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, false, + getVirtualHost(), null); if (queue.isDurable() && !queue.isAutoDelete()) { _durableConfig.createQueue(queue); diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java index 8e64aee174..1812f657d5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -58,6 +58,7 @@ public class QueueConfiguration extends ConfigurationPlugin "minimumAlertRepeatGap", "durable", "exchange", + "exclusive", "queue", "autodelete", "priority", @@ -79,6 +80,11 @@ public class QueueConfiguration extends ConfigurationPlugin { return _config.getBoolean("durable" ,false); } + + public boolean getExclusive() + { + return _config.getBoolean("exclusive" ,false); + } public boolean getAutoDelete() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java b/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java index c97d71dc39..ae578eb196 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java +++ b/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java @@ -678,8 +678,8 @@ public class Bridge implements BridgeConfig isDurable(), _link.getFederationTag(), false, - getVirtualHost(), - options); + false, + getVirtualHost(), options); FlowCreditManager_0_10 creditManager = new WindowCreditManager(0xFFFFFFFF,getMessageWindowSize()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 444505f5bb..4f7d275e71 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -236,8 +236,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar final QueueRegistry registry = virtualHost.getQueueRegistry(); AMQShortString owner = body.getExclusive() ? session.getContextKey() : null; - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), virtualHost, - body.getArguments()); + final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), + body.getExclusive(),virtualHost, body.getArguments()); if (body.getExclusive() && !body.getDurable()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java index d8986ec303..b6e97e08fb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java @@ -33,19 +33,21 @@ public class AMQPriorityQueue extends SimpleAMQQueue final boolean durable, final AMQShortString owner, final boolean autoDelete, - final VirtualHost virtualHost, + boolean exclusive, + final VirtualHost virtualHost, int priorities, Map<String, Object> arguments) { - super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities),arguments); + super(name, durable, owner, autoDelete, exclusive, virtualHost,new PriorityQueueList.Factory(priorities), arguments); } public AMQPriorityQueue(String queueName, boolean durable, String owner, boolean autoDelete, - VirtualHost virtualHost, int priorities, Map<String,Object> arguments) + boolean exclusive, VirtualHost virtualHost, int priorities, Map<String,Object> arguments) { - this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner),autoDelete,virtualHost,priorities, arguments); + this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), + autoDelete, exclusive,virtualHost, priorities, arguments); } public int getPriorities() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index fd7dd4cc60..3340c1e20a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -133,13 +133,16 @@ public class AMQQueueFactory boolean durable, AMQShortString owner, boolean autoDelete, - VirtualHost virtualHost, final FieldTable arguments) + boolean exclusive, + VirtualHost virtualHost, + final FieldTable arguments) { return createAMQQueueImpl(name == null ? null : name.toString(), durable, owner == null ? null : owner.toString(), autoDelete, - virtualHost, + exclusive, + virtualHost, FieldTable.convertToMap(arguments)); } @@ -148,6 +151,7 @@ public class AMQQueueFactory boolean durable, String owner, boolean autoDelete, + boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) { int priorities = 1; @@ -175,15 +179,15 @@ public class AMQQueueFactory AMQQueue q; if(conflationKey != null) { - q = new ConflationQueue(queueName, durable, owner, autoDelete, virtualHost, arguments, conflationKey); + q = new ConflationQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey); } else if(priorities > 1) { - q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, virtualHost, priorities, arguments); + q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, priorities, arguments); } else { - q = new SimpleAMQQueue(queueName, durable, owner, autoDelete, virtualHost, arguments); + q = new SimpleAMQQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments); } //Register the new queue @@ -212,6 +216,7 @@ public class AMQQueueFactory boolean durable = config.getDurable(); boolean autodelete = config.getAutoDelete(); + boolean exclusive = config.getExclusive(); String owner = config.getOwner(); Map<String,Object> arguments = null; if(config.isLVQ() || config.getLVQKey() != null) @@ -241,7 +246,7 @@ public class AMQQueueFactory } } - AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, host, arguments); + AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments); q.configure(config); return q; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java index 26c0d7cf26..400d9867ae 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java @@ -31,11 +31,12 @@ public class ConflationQueue extends SimpleAMQQueue boolean durable, String owner, boolean autoDelete, + boolean exclusive, VirtualHost virtualHost, Map<String, Object> args, String conflationKey) { - super(name, durable, owner, autoDelete, virtualHost, new ConflationQueueList.Factory(conflationKey), args); + super(name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b819538544..afc7fb6480 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -85,6 +85,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private PrincipalHolder _prinicpalHolder; + private boolean _exclusive = false; private AMQSessionModel _exclusiveOwner; @@ -188,27 +189,28 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private long _createTime = System.currentTimeMillis(); private QueueConfiguration _queueConfiguration; - protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost, Map<String,Object> arguments) + + + protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments) { - this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory(),arguments); + this(name, durable, owner, autoDelete, exclusive, virtualHost,new SimpleQueueEntryList.Factory(), arguments); } - public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, VirtualHost virtualHost, Map<String, Object> arguments) + public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) { - this(queueName, durable, owner,autoDelete,virtualHost,new SimpleQueueEntryList.Factory(),arguments); + this(queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments); } - public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments) + public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments) { - this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner),autoDelete,virtualHost,entryListFactory, arguments); + this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), autoDelete, exclusive, virtualHost, entryListFactory, arguments); } - - protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, + boolean exclusive, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String,Object> arguments) @@ -229,6 +231,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _durable = durable; _owner = owner; _autoDelete = autoDelete; + _exclusive = exclusive; _virtualHost = virtualHost; _entries = entryListFactory.createQueueEntryList(this); _arguments = arguments; @@ -324,7 +327,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean isExclusive() { - return _exclusiveOwner != null; + return _exclusive; } public Exchange getAlternateExchange() @@ -2054,6 +2057,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner) { + _exclusive = true; _exclusiveOwner = exclusiveOwner; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java index c7606832d0..a883f656be 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java @@ -29,7 +29,7 @@ public interface ConfigurationRecoveryHandler public static interface QueueRecoveryHandler { - void queue(String queueName, String owner, FieldTable arguments); + void queue(String queueName, String owner, boolean exclusive, FieldTable arguments); ExchangeRecoveryHandler completeQueueRecovery(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index ccef66fd13..d38b318fdf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -73,7 +73,7 @@ public class DerbyMessageStore implements MessageStore private static final String META_DATA_TABLE_NAME = "QPID_META_DATA"; private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT"; - private static final int DB_VERSION = 1; + private static final int DB_VERSION = 3; @@ -90,9 +90,9 @@ public class DerbyMessageStore implements MessageStore private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )"; private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )"; - private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )"; + private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), exclusive SMALLINT not null, arguments blob, PRIMARY KEY ( name ))"; private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )"; - private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME; + private static final String SELECT_FROM_QUEUE = "SELECT name, owner, exclusive, arguments FROM " + QUEUE_TABLE_NAME; private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"; private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME; private static final String SELECT_FROM_BINDINGS = @@ -104,7 +104,7 @@ public class DerbyMessageStore implements MessageStore private static final String FIND_EXCHANGE = "SELECT name FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?"; private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )"; private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?"; - private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)"; + private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner, exclusive, arguments) VALUES (?, ?, ?, ?)"; private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"; private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )"; @@ -405,7 +405,23 @@ public class DerbyMessageStore implements MessageStore { String queueName = rs.getString(1); String owner = rs.getString(2); - qrh.queue(queueName, owner, null); + boolean exclusive = rs.getBoolean(3); + Blob argumentsAsBlob = rs.getBlob(4); + + byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length()); + FieldTable arguments; + if(dataAsBytes.length > 0) + { + org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.wrap(dataAsBytes); + + arguments = new FieldTable(buffer,buffer.limit()); + } + else + { + arguments = null; + } + + qrh.queue(queueName, owner, exclusive, arguments); queues.add(queueName); } @@ -817,7 +833,21 @@ public class DerbyMessageStore implements MessageStore stmt.setString(1, queue.getNameShortString().toString()); stmt.setString(2, owner); + stmt.setBoolean(3,queue.isExclusive()); + final byte[] underlying; + if(arguments != null) + { + underlying = arguments.getDataAsBytes(); + } + else + { + underlying = new byte[0]; + } + + ByteArrayInputStream bis = new ByteArrayInputStream(underlying); + stmt.setBinaryStream(4,bis,underlying.length); + stmt.execute(); stmt.close(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 7479d801be..541810d2fe 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -914,6 +914,7 @@ public class ServerSessionDelegate extends SessionDelegate public void doTask(ServerSession session) { q.setPrincipalHolder(null); + q.setExclusiveOwningSession(null); } }; final ServerSession s = (ServerSession) session; @@ -962,8 +963,8 @@ public class ServerSessionDelegate extends SessionDelegate String owner = body.getExclusive() ? session.getClientID() : null; - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), virtualHost, - body.getArguments()); + final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), + body.getExclusive(), virtualHost, body.getArguments()); if (body.getExclusive() && !body.getDurable()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 221ec0b639..ca999ceb0b 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -92,7 +92,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa return this; } - public void queue(String queueName, String owner, FieldTable arguments) + public void queue(String queueName, String owner, boolean exclusive, FieldTable arguments) { AMQShortString queueNameShortString = new AMQShortString(queueName); @@ -100,8 +100,8 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa if (q == null) { - q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, _virtualHost, - arguments); + q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, exclusive, + _virtualHost, arguments); _virtualHost.getQueueRegistry().registerQueue(q); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 2d8b157297..21caabfff9 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -297,7 +297,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase public TestQueue(AMQShortString name) throws AMQException { - super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"),Collections.EMPTY_MAP); + super(name, false, new AMQShortString("test"), true, false,ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), Collections.EMPTY_MAP); ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test").getQueueRegistry().registerQueue(this); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java index b26c71a524..7800a51755 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java @@ -134,8 +134,8 @@ public class ExchangeMBeanTest extends TestCase IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); _queueRegistry = _virtualHost.getQueueRegistry(); - _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("ExchangeMBeanTest"), false, _virtualHost, - null); + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("ExchangeMBeanTest"), false, false, + _virtualHost, null); _queueRegistry.registerQueue(_queue); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index 4fa47d039e..d60cf5fa2b 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -64,7 +64,7 @@ public class TopicExchangeTest extends TestCase public void testNoRoute() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, _vhost, null); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.*.#.b", queue,_exchange, null)); @@ -76,7 +76,7 @@ public class TopicExchangeTest extends TestCase public void testDirectMatch() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("ab"), false, null, false, _vhost, null); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("ab"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.b", queue,_exchange, null)); @@ -103,7 +103,7 @@ public class TopicExchangeTest extends TestCase public void testStarMatch() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*"), false, null, false, _vhost, null); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.*", queue,_exchange, null)); @@ -142,7 +142,7 @@ public class TopicExchangeTest extends TestCase public void testHashMatch() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.#", queue,_exchange, null)); @@ -205,7 +205,7 @@ public class TopicExchangeTest extends TestCase public void testMidHash() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.*.#.b", queue,_exchange, null)); @@ -235,7 +235,7 @@ public class TopicExchangeTest extends TestCase public void testMatchafterHash() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.*.#.b.c", queue,_exchange, null)); @@ -281,7 +281,7 @@ public class TopicExchangeTest extends TestCase public void testHashAfterHash() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.*.#.b.c.#.d", queue,_exchange, null)); @@ -308,7 +308,7 @@ public class TopicExchangeTest extends TestCase public void testHashHash() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.#.*.#.d", queue,_exchange, null)); @@ -334,7 +334,7 @@ public class TopicExchangeTest extends TestCase public void testSubMatchFails() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.b.c.d", queue,_exchange, null)); @@ -364,7 +364,7 @@ public class TopicExchangeTest extends TestCase public void testMoreRouting() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.b", queue,_exchange, null)); @@ -379,7 +379,7 @@ public class TopicExchangeTest extends TestCase public void testMoreQueue() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null,"a.b", queue,_exchange, null)); diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java index 9152e68ee0..e9d53c59f2 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java @@ -58,7 +58,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase false, new AMQShortString("test"), true, - _protocolSession.getVirtualHost(), null); + false, _protocolSession.getVirtualHost(), null); AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore); channel.setDefaultQueue(queue); _protocolSession.addChannel(channel); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 5f0d77afea..69a0bfbcf4 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -71,8 +71,8 @@ public class AMQQueueAlertTest extends TestCase _protocolSession.addChannel(channel); _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue1"), false, new AMQShortString("AMQueueAlertTest"), - false, _virtualHost, - null); + false, false, + _virtualHost, null); _queueMBean = (AMQQueueMBean) _queue.getManagedObject(); _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT); @@ -99,8 +99,8 @@ public class AMQQueueAlertTest extends TestCase _protocolSession.addChannel(channel); _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue2"), false, new AMQShortString("AMQueueAlertTest"), - false, _virtualHost, - null); + false, false, + _virtualHost, null); _queueMBean = (AMQQueueMBean) _queue.getManagedObject(); _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT); _queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE); @@ -129,8 +129,8 @@ public class AMQQueueAlertTest extends TestCase _protocolSession.addChannel(channel); _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue3"), false, new AMQShortString("AMQueueAlertTest"), - false, _virtualHost, - null); + false, false, + _virtualHost, null); _queueMBean = (AMQQueueMBean) _queue.getManagedObject(); _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT); _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH); @@ -162,8 +162,8 @@ public class AMQQueueAlertTest extends TestCase _protocolSession.addChannel(channel); _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue4"), false, new AMQShortString("AMQueueAlertTest"), - false, _virtualHost, - null); + false, false, + _virtualHost, null); _queueMBean = (AMQQueueMBean) _queue.getManagedObject(); _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT); _queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE); @@ -367,6 +367,6 @@ public class AMQQueueAlertTest extends TestCase false, new AMQShortString("AMQueueAlertTest"), false, - _virtualHost, null); + false, _virtualHost, null); } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index 97f061fdd1..3d3d8f93a8 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -56,7 +56,7 @@ public class AMQQueueFactoryTest extends TestCase AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testPriorityQueue"), false, new AMQShortString("owner"), false, - _virtualHost, fieldTable); + false, _virtualHost, fieldTable); assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass()); } @@ -65,7 +65,7 @@ public class AMQQueueFactoryTest extends TestCase public void testSimpleQueueRegistration() { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false, - _virtualHost, null); + false, _virtualHost, null); assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass()); } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index e7544661cd..9ca1925c12 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -430,8 +430,8 @@ public class AMQQueueMBeanTest extends TestCase _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); _messageStore = _virtualHost.getMessageStore(); - _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost, - null); + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, false, + _virtualHost, null); _queueMBean = new AMQQueueMBean(_queue); _protocolSession = new InternalTestProtocolSession(_virtualHost); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index d64e533f72..8884ff0a91 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -78,8 +78,8 @@ public class AckTest extends TestCase _protocolSession.addChannel(_channel); - _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, _virtualHost, - null); + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, false, + _virtualHost, null); } protected void tearDown() diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 9346b1eda0..fe45be3f0e 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -101,7 +101,7 @@ public class SimpleAMQQueueTest extends TestCase _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration(getClass().getName(), env), _store); applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost); - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments); + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, false, _virtualHost, _arguments); } @Override @@ -115,7 +115,7 @@ public class SimpleAMQQueueTest extends TestCase { _queue.stop(); try { - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, _virtualHost, _arguments ); + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, false, _virtualHost, _arguments ); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) @@ -125,7 +125,7 @@ public class SimpleAMQQueueTest extends TestCase } try { - _queue = new SimpleAMQQueue(_qname, false, _owner, false, null,Collections.EMPTY_MAP); + _queue = new SimpleAMQQueue(_qname, false, _owner, false, false,null, Collections.EMPTY_MAP); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) @@ -135,7 +135,7 @@ public class SimpleAMQQueueTest extends TestCase } _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, - _virtualHost, _arguments); + false, _virtualHost, _arguments); assertNotNull("Queue was not created", _queue); } @@ -251,7 +251,7 @@ public class SimpleAMQQueueTest extends TestCase public void testAutoDeleteQueue() throws Exception { _queue.stop(); - _queue = new SimpleAMQQueue(_qname, false, null, true, _virtualHost, Collections.EMPTY_MAP); + _queue = new SimpleAMQQueue(_qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP); _queue.setDeleteOnNoConsumers(true); _queue.registerSubscription(_subscription, false); AMQMessage message = createMessage(new Long(25)); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java index ba94af5936..824f34cb0e 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java @@ -40,7 +40,7 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase { SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, new AMQShortString("owner"), - false, test, null); + false, false, test, null); assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown()); diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java index df466380b9..8704f58e55 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java @@ -73,8 +73,8 @@ public class PrincipalPermissionsTest extends TestCase { _virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); _exchange = DirectExchange.TYPE.newInstance(_virtualHost, _exchangeName, _durable, _ticket, _autoDelete); - _queue = AMQQueueFactory.createAMQQueueImpl(_queueName, false, _owner , false, _virtualHost, _arguments); - _temporaryQueue = AMQQueueFactory.createAMQQueueImpl(_tempQueueName, false, _owner , true, _virtualHost, _arguments); + _queue = AMQQueueFactory.createAMQQueueImpl(_queueName, false, _owner , false, false, _virtualHost, _arguments); + _temporaryQueue = AMQQueueFactory.createAMQQueueImpl(_tempQueueName, false, _owner , true, false, _virtualHost, _arguments); } catch (Exception e) { diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 2427e28e70..ef2c3fa304 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -482,8 +482,8 @@ public class MessageStoreTest extends TestCase //Ideally we would be able to use the QueueDeclareHandler here. try { - queue = AMQQueueFactory.createAMQQueueImpl(queueName, durable, queueOwner, false, _virtualHost, - queueArguments); + queue = AMQQueueFactory.createAMQQueueImpl(queueName, durable, queueOwner, false, false, + _virtualHost, queueArguments); validateQueueProperties(queue, usePriority); diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index e3c43779f6..20bad0fb69 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -69,7 +69,7 @@ public class InternalBrokerBaseCase extends TestCase QUEUE_NAME = new AMQShortString("test"); _queue = AMQQueueFactory.createAMQQueueImpl(QUEUE_NAME, false, new AMQShortString("testowner"), - false, _virtualHost, null); + false, false, _virtualHost, null); _virtualHost.getQueueRegistry().registerQueue(_queue); diff --git a/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java b/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java index 5469bfad5f..8295502439 100644 --- a/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java +++ b/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java @@ -70,8 +70,8 @@ public class ManagementConsoleTest extends TestCase { // If this test fails due to changes in the broker code, // then the constants in the Constants.java shoule be updated accordingly - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueueForManagement"), false, null, false, _virtualHost, - null); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueueForManagement"), false, null, false, false, + _virtualHost, null); AMQManagedObject mbean = new AMQQueueMBean(queue); MBeanInfo mbeanInfo = mbean.getMBeanInfo(); |