diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-08-21 11:32:22 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-08-21 11:32:22 +0000 |
| commit | 41a17b91a43dfefa484ff0052c21101dd6dac3ec (patch) | |
| tree | e3780e35f71ecee79407dcc043e8475ffc89c8b0 /java | |
| parent | 6f3d4e569a5bb1b3a245d0dd8d68027b1ecc3840 (diff) | |
| download | qpid-python-41a17b91a43dfefa484ff0052c21101dd6dac3ec.tar.gz | |
QPID-5087 : [Java Broker] Allow use of separate message stores and configuration stores
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1516140 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
9 files changed, 114 insertions, 48 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 7263387416..612fa855a4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -136,4 +136,10 @@ public class DefaultExchangeFactory implements ExchangeFactory Exchange e = exchType.newInstance(id, _host, exchange, durable, autoDelete); return e; } + + @Override + public Exchange restoreExchange(UUID id, String exchange, String type, boolean autoDelete) throws AMQException + { + return createExchange(id, exchange, type, true, autoDelete); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java index 3ccfa51499..f364691666 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java @@ -38,5 +38,6 @@ public interface ExchangeFactory Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) throws AMQException; Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete) throws AMQException; + Exchange restoreExchange(UUID id, String exchange, String type, boolean autoDelete) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 86c436389a..029c7e4f86 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -28,7 +28,6 @@ import java.util.UUID; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.DefaultExchangeFactory; @@ -177,19 +176,45 @@ public class AMQQueueFactory implements QueueFactory } }; + @Override + public AMQQueue restoreQueue(UUID id, + String queueName, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments) throws AMQSecurityException, AMQException + { + return createOrRestoreQueue(id, queueName, true, owner, autoDelete, exclusive, deleteOnNoConsumer, arguments, false); + + } + /** * @param id the id to use. * @param deleteOnNoConsumer */ @Override - public AMQQueue createAMQQueueImpl(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map<String, Object> arguments) throws AMQSecurityException, AMQException + public AMQQueue createQueue(UUID id, + String queueName, + boolean durable, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments) throws AMQSecurityException, AMQException + { + return createOrRestoreQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer, arguments, true); + } + + private AMQQueue createOrRestoreQueue(UUID id, + String queueName, + boolean durable, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments, + boolean createInStore) throws AMQSecurityException, AMQException { if (id == null) { @@ -358,7 +383,7 @@ public class AMQQueueFactory implements QueueFactory q.setAlternateExchange(altExchange); } - if (q.isDurable() && !q.isAutoDelete()) + if (createInStore && q.isDurable() && !q.isAutoDelete()) { DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), q); } @@ -379,7 +404,7 @@ public class AMQQueueFactory implements QueueFactory // we need queues that are defined in config to have deterministic ids. UUID id = UUIDGenerator.generateQueueUUID(queueName, _virtualHost.getName()); - AMQQueue q = createAMQQueueImpl(id, queueName, durable, owner, autodelete, exclusive, false, arguments); + AMQQueue q = createQueue(id, queueName, durable, owner, autodelete, exclusive, false, arguments); q.configure(config); return q; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java index 5411a2bc9c..3e4e1df5a2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java @@ -27,12 +27,21 @@ import org.apache.qpid.AMQSecurityException; public interface QueueFactory { - AMQQueue createAMQQueueImpl(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map<String, Object> arguments) throws AMQSecurityException, AMQException; + AMQQueue createQueue(UUID id, + String queueName, + boolean durable, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments) throws AMQSecurityException, AMQException; + + AMQQueue restoreQueue(UUID id, + String queueName, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments) throws AMQSecurityException, AMQException; + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 02144c6ae1..0cd4f0b6b2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -518,8 +518,8 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg { throw new QueueExistsException("Queue with id " + id + " already exists", _queueRegistry.getQueue(queueName)); } - return _queueFactory.createAMQQueueImpl(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer, - arguments); + return _queueFactory.createQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer, + arguments); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java index 702f6e1bdf..6ad7014c47 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java @@ -77,7 +77,7 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer< } if (_exchange == null) { - _exchange = _exchangeFactory.createExchange(id, exchangeName, exchangeType, true, autoDelete); + _exchange = _exchangeFactory.restoreExchange(id, exchangeName, exchangeType, autoDelete); _exchangeRegistry.registerExchange(_exchange); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java index b4fbdf7544..02d628da68 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java @@ -31,7 +31,6 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueFactory; import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.UnresolvedDependency; @@ -122,8 +121,8 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ if (_queue == null) { - _queue = _queueFactory.createAMQQueueImpl(_id, queueName, true, owner, false, exclusive, - false, queueArgumentsMap); + _queue = _queueFactory.restoreQueue(_id, queueName, owner, false, exclusive, + false, queueArgumentsMap); } } catch (AMQException e) diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index ecc20aa48a..9a2c5bc166 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -116,7 +116,7 @@ public class AMQQueueFactoryTest extends QpidTestCase @Override public AMQQueue answer(InvocationOnMock invocation) throws Throwable { - return _queueFactory.createAMQQueueImpl(id.getValue(), + return _queueFactory.createQueue(id.getValue(), queueName.getValue(), durable.getValue(), owner.getValue(), @@ -222,8 +222,12 @@ public class AMQQueueFactoryTest extends QpidTestCase Map<String,Object> attributes = Collections.singletonMap(Queue.PRIORITIES, (Object) 5); - AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testPriorityQueue", false, "owner", false, - false, + AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), + "testPriorityQueue", + false, + "owner", + false, + false, false, attributes); @@ -238,8 +242,8 @@ public class AMQQueueFactoryTest extends QpidTestCase String queueName = getName(); String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, - false, + AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, + false, false, null); assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass()); @@ -269,7 +273,12 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, + AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), + queueName, + false, + "owner", + false, + false, false, attributes); @@ -309,7 +318,12 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, + AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), + queueName, + false, + "owner", + false, + false, false, null); @@ -348,7 +362,12 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, + AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), + queueName, + false, + "owner", + false, + false, false, attributes); @@ -379,7 +398,12 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); //create an autodelete queue - AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", true, false, + AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), + queueName, + false, + "owner", + true, + false, false, attributes); assertTrue("Queue should be autodelete", queue.isAutoDelete()); @@ -401,7 +425,12 @@ public class AMQQueueFactoryTest extends QpidTestCase { Map<String,Object> attributes = Collections.singletonMap(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5); - final AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false, + final AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), + "testMaximumDeliveryCount", + false, + "owner", + false, + false, false, attributes); @@ -417,7 +446,12 @@ public class AMQQueueFactoryTest extends QpidTestCase */ public void testMaximumDeliveryCountDefault() throws Exception { - final AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false, + final AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), + "testMaximumDeliveryCount", + false, + "owner", + false, + false, false, null); @@ -434,7 +468,7 @@ public class AMQQueueFactoryTest extends QpidTestCase { try { - _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false, + _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false, false, null); fail("queue with null name can not be created!"); @@ -459,7 +493,7 @@ public class AMQQueueFactoryTest extends QpidTestCase setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLE"); setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQUEUE"); Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); - _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", + _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, false, attributes); fail("queue with DLQ name having more than 255 characters can not be created!"); } @@ -484,7 +518,7 @@ public class AMQQueueFactoryTest extends QpidTestCase setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLEXCHANGE"); setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQ"); Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); - _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", + _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, false, attributes); fail("queue with DLE name having more than 255 characters can not be created!"); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 66a71c562f..d793895e7f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -28,10 +28,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; @@ -39,17 +36,12 @@ import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.exchange.HeadersExchange; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueFactory; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.security.*; -import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; @@ -131,7 +123,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase _queueFactory = mock(QueueFactory.class); - when(_queueFactory.createAMQQueueImpl(idArg.capture(), queueArg.capture(), + when(_queueFactory.createQueue(idArg.capture(), queueArg.capture(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyBoolean(), argsArg.capture())).then( new Answer() { |
