diff options
| author | Robert Gemmell <robbie@apache.org> | 2009-12-29 17:05:54 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2009-12-29 17:05:54 +0000 |
| commit | d2802522c0797e17329c6677ac5fda909eb522a5 (patch) | |
| tree | b01bbcd5bca440b4bea776f92997d303d46315c0 /qpid/java | |
| parent | 9b65d9381a986b77626eb0c4a997e195d10428fe (diff) | |
| download | qpid-python-d2802522c0797e17329c6677ac5fda909eb522a5.tar.gz | |
QPID-2096: decouple the addition of durable exchanges to the store from exchange registration
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@894441 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
6 files changed, 36 insertions, 15 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index 5cfa8066e5..5f5fc4cbea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -202,6 +202,10 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), durable, false, 0); _exchangeRegistry.registerExchange(exchange); + if (durable) + { + _messageStore.createExchange(exchange); + } } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 0ab8208d88..6130deb23e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -54,7 +54,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public void initialise() throws AMQException { - new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this); + new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this, getMessageStore()); } public MessageStore getMessageStore() @@ -65,10 +65,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public void registerExchange(Exchange exchange) throws AMQException { _exchangeMap.put(exchange.getName(), exchange); - if (exchange.isDurable()) - { - getMessageStore().createExchange(exchange); - } } public void setDefaultExchange(Exchange exchange) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index ba60808492..338c761ff2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -92,11 +92,16 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange try { - exchange = exchangeFactory.createExchange(body.getExchange() == null ? null : body.getExchange().intern(), - body.getType() == null ? null : body.getType().intern(), - body.getDurable(), - body.getPassive(), body.getTicket()); - exchangeRegistry.registerExchange(exchange); + exchange = exchangeFactory.createExchange(body.getExchange() == null ? null : body.getExchange().intern(), + body.getType() == null ? null : body.getType().intern(), + body.getDurable(), + body.getPassive(), body.getTicket()); + exchangeRegistry.registerExchange(exchange); + + if (exchange.isDurable()) + { + virtualHost.getMessageStore().createExchange(exchange); + } } catch(AMQUnknownExchangeType e) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java index 2abcecb6de..4b8343e3e0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java @@ -27,25 +27,32 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.exchange.ExchangeType; +import org.apache.qpid.server.store.MessageStore; public class ExchangeInitialiser { - public void initialise(ExchangeFactory factory, ExchangeRegistry registry) throws AMQException{ + public void initialise(ExchangeFactory factory, ExchangeRegistry registry, MessageStore store) throws AMQException{ for (ExchangeType<? extends Exchange> type : factory.getRegisteredTypes()) { - define (registry, factory, type.getDefaultExchangeName(), type.getName()); + define (registry, factory, type.getDefaultExchangeName(), type.getName(), store); } - define(registry, factory, ExchangeDefaults.DEFAULT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + define(registry, factory, ExchangeDefaults.DEFAULT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, store); registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DEFAULT_EXCHANGE_NAME)); } private void define(ExchangeRegistry r, ExchangeFactory f, - AMQShortString name, AMQShortString type) throws AMQException + AMQShortString name, AMQShortString type, MessageStore store) throws AMQException { if(r.getExchange(name)== null) { - r.registerExchange(f.createExchange(name, type, true, false, 0)); + Exchange exchange = f.createExchange(name, type, true, false, 0); + r.registerExchange(exchange); + + if(exchange.isDurable()) + { + store.createExchange(exchange); + } } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 7b569d9393..925580a97e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -332,6 +332,11 @@ public class VirtualHost implements Accessable Exchange newExchange = _exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0); _exchangeRegistry.registerExchange(newExchange); + + if (newExchange.isDurable()) + { + _messageStore.createExchange(newExchange); + } } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 5802655cfc..7d337eed30 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -457,6 +457,10 @@ public class MessageStoreTest extends TestCase try { _virtualHost.getExchangeRegistry().registerExchange(exchange); + if (durable) + { + _virtualHost.getMessageStore().createExchange(exchange); + } } catch (AMQException e) { |
