summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-03-13 12:22:23 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-03-13 12:22:23 +0000
commit2e1a72985bc53a23219e4e9619fc572ff96c8a4a (patch)
treeece05b9bc6ffb3962c86000033fd083268aebc53 /java/broker
parentd32dc27b89a3c98d5d4eeef37aabee42051f4b7a (diff)
downloadqpid-python-2e1a72985bc53a23219e4e9619fc572ff96c8a4a.tar.gz
QPID-1730 : Update the order in which we initialise. We now load the config file from disk then recover from the persistent strore. This approach applies the vhost configuration and then applies the persistent state from the store to those objects rather than recreating them. The new inner classes on VirtualHost are to be removed once we have fully extracted the RoutingTable from the legacy MessageStores as this is the root of the problem. The Store needs to be open to create new durable objects but the current stores must recover their state before new entries are added. So now the persistent state is being loaded on to a broker in a consistent state after it has configured a) its default exchanges and b) loaded the queue config from the config file. Eventually we will only have one location for queue config and all will be right in the world.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@753219 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java203
1 files changed, 162 insertions, 41 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 5d2a31b80d..8a8cbd23cf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -20,17 +20,11 @@
*/
package org.apache.qpid.server.virtualhost;
-import java.util.Collections;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import javax.management.NotCompliantMBeanException;
-
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQBrokerManagerMBean;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
@@ -47,10 +41,9 @@ import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.QueueBackingStore;
import org.apache.qpid.server.queue.FileQueueBackingStoreFactory;
import org.apache.qpid.server.queue.QueueBackingStoreFactory;
+import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.routing.RoutingTable;
import org.apache.qpid.server.security.access.ACLManager;
@@ -59,11 +52,17 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.transactionlog.TransactionLog;
+import javax.management.NotCompliantMBeanException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
public class VirtualHost implements Accessable
{
private static final Logger _logger = Logger.getLogger(VirtualHost.class);
-
private final String _name;
private ConnectionRegistry _connectionRegistry;
@@ -87,7 +86,7 @@ public class VirtualHost implements Accessable
private ACLManager _accessManager;
private final Timer _houseKeepingTimer;
-
+
private VirtualHostConfiguration _configuration;
private QueueBackingStoreFactory _queueBackingStoreFactory;
@@ -114,7 +113,7 @@ public class VirtualHost implements Accessable
public VirtualHostConfiguration getConfiguration()
{
- return _configuration ;
+ return _configuration;
}
public QueueBackingStoreFactory getQueueBackingStoreFactory()
@@ -148,12 +147,13 @@ public class VirtualHost implements Accessable
return VirtualHost.this;
}
-
} // End of MBean class
/**
* Normal Constructor
+ *
* @param hostConfig
+ *
* @throws Exception
*/
public VirtualHost(VirtualHostConfiguration hostConfig) throws Exception
@@ -165,7 +165,7 @@ public class VirtualHost implements Accessable
{
_configuration = hostConfig;
_name = hostConfig.getName();
-
+
if (_name == null || _name.length() == 0)
{
throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost.");
@@ -175,12 +175,30 @@ public class VirtualHost implements Accessable
_connectionRegistry = new ConnectionRegistry(this);
- _houseKeepingTimer = new Timer("Queue-housekeeping-"+_name, true);
+ _houseKeepingTimer = new Timer("Queue-housekeeping-" + _name, true);
_queueRegistry = new DefaultQueueRegistry(this);
+
_exchangeFactory = new DefaultExchangeFactory(this);
+ _exchangeFactory.initialise(hostConfig);
+
_exchangeRegistry = new DefaultExchangeRegistry(this);
+ _queueBackingStoreFactory = new FileQueueBackingStoreFactory();
+ _queueBackingStoreFactory.configure(this, hostConfig);
+
+ //Create a temporary RT to store the durable entries from the config file
+ // so we can replay them in to the real _RT after it has been loaded.
+ /// This should be removed after the _RT has been fully split from the the TL
+
+ StartupRoutingTable configFileRT = new StartupRoutingTable();
+
+ _routingTable = configFileRT;
+
+ // This needs to be after the RT has been defined as it creates the default durable exchanges.
+ _exchangeRegistry.initialise();
+ initialiseModel(hostConfig);
+
if (transactionLog != null)
{
_transactionLog = transactionLog;
@@ -195,19 +213,28 @@ public class VirtualHost implements Accessable
initialiseRoutingTable(hostConfig);
}
- _queueBackingStoreFactory = new FileQueueBackingStoreFactory();
- _queueBackingStoreFactory.configure(this, hostConfig);
+ //Now that the RT has been initialised loop through the persistent queues/exchanges created from the config
+ // file and write them in to the new routing Table.
+ for (StartupRoutingTable.CreateQueueTuple cqt : configFileRT.queue)
+ {
+ _routingTable.createQueue(cqt.queue, cqt.arguments);
+ }
- _exchangeFactory.initialise(hostConfig);
- _exchangeRegistry.initialise();
+ for (Exchange exchange : configFileRT.exchange)
+ {
+ _routingTable.createExchange(exchange);
+ }
+
+ for (StartupRoutingTable.CreateBindingTuple cbt : configFileRT.bindings)
+ {
+ _routingTable.bindQueue(cbt.exchange, cbt.routingKey, cbt.queue, cbt.arguments);
+ }
- initialiseModel(hostConfig);
-
_authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, hostConfig);
_accessManager = ApplicationRegistry.getInstance().getAccessManager();
_accessManager.configureHostPlugins(hostConfig.getSecurityConfiguration());
-
+
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
@@ -216,13 +243,13 @@ public class VirtualHost implements Accessable
private void initialiseHouseKeeping(long period)
{
/* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
- if(period != 0L)
+ if (period != 0L)
{
class RemoveExpiredMessagesTask extends TimerTask
{
public void run()
{
- for(AMQQueue q : _queueRegistry.getQueues())
+ for (AMQQueue q : _queueRegistry.getQueues())
{
try
@@ -231,7 +258,7 @@ public class VirtualHost implements Accessable
}
catch (AMQException e)
{
- _logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e);
+ _logger.error("Exception in housekeeping for queue: " + q.getName().toString(), e);
throw new RuntimeException(e);
}
}
@@ -239,8 +266,8 @@ public class VirtualHost implements Accessable
}
_houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
- period/2,
- period);
+ period / 2,
+ period);
}
}
@@ -259,10 +286,10 @@ public class VirtualHost implements Accessable
}
_transactionLog = (TransactionLog) o;
- //Assign RoutingTable as old MessageStores converted to TransactionLog may require the _routingTable.
+ //Assign RoutingTable as old MessageStores converted to TransactionLog will require the _routingTable.
if (_transactionLog instanceof RoutingTable)
{
- _routingTable = (RoutingTable)_transactionLog;
+ _routingTable = (RoutingTable) _transactionLog;
}
_transactionLog.configure(this, "store", config);
@@ -294,14 +321,14 @@ public class VirtualHost implements Accessable
}
}
}
-
+
private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
{
- _logger.debug("Loading configuration for virtualhost: "+config.getName());
+ _logger.debug("Loading configuration for virtualhost: " + config.getName());
List exchangeNames = config.getExchanges();
- for(Object exchangeNameObj : exchangeNames)
+ for (Object exchangeNameObj : exchangeNames)
{
String exchangeName = String.valueOf(exchangeNameObj);
configureExchange(config.getExchangeConfiguration(exchangeName));
@@ -309,7 +336,7 @@ public class VirtualHost implements Accessable
String[] queueNames = config.getQueueNames();
- for(Object queueNameObj : queueNames)
+ for (Object queueNameObj : queueNames)
{
String queueName = String.valueOf(queueNameObj);
configureQueue(config.getQueueConfiguration(queueName));
@@ -322,14 +349,14 @@ public class VirtualHost implements Accessable
Exchange exchange;
exchange = _exchangeRegistry.getExchange(exchangeName);
- if(exchange == null)
+ if (exchange == null)
{
AMQShortString type = new AMQShortString(exchangeConfiguration.getType());
boolean durable = exchangeConfiguration.getDurable();
boolean autodelete = exchangeConfiguration.getAutoDelete();
- Exchange newExchange = _exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0);
+ Exchange newExchange = _exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0);
_exchangeRegistry.registerExchange(newExchange);
}
}
@@ -347,7 +374,7 @@ public class VirtualHost implements Accessable
Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName));
- if(exchange == null)
+ if (exchange == null)
{
exchange = _exchangeRegistry.getDefaultExchange();
}
@@ -358,19 +385,22 @@ public class VirtualHost implements Accessable
}
List routingKeys = queueConfiguration.getRoutingKeys();
- if(routingKeys == null || routingKeys.isEmpty())
+ if (routingKeys == null || routingKeys.isEmpty())
{
routingKeys = Collections.singletonList(queue.getName());
}
- for(Object routingKeyNameObj : routingKeys)
+ for (Object routingKeyNameObj : routingKeys)
{
AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + this);
+ }
queue.bind(exchange, routingKey, null);
- _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
}
- if(exchange != _exchangeRegistry.getDefaultExchange())
+ if (exchange != _exchangeRegistry.getDefaultExchange())
{
queue.bind(_exchangeRegistry.getDefaultExchange(), queue.getName(), null);
}
@@ -414,7 +444,7 @@ public class VirtualHost implements Accessable
public ACLManager getAccessManager()
{
return _accessManager;
- }
+ }
public void close() throws Exception
{
@@ -453,4 +483,95 @@ public class VirtualHost implements Accessable
{
return _virtualHostMBean;
}
+
+ /**
+ * Temporary Startup RT class to record the creation of persistent queues / exchanges.
+ *
+ *
+ * This is so we can replay the creation of queues/exchanges in to the real _RT after it has been loaded.
+ * This should be removed after the _RT has been fully split from the the TL
+ */
+ private class StartupRoutingTable implements RoutingTable
+ {
+ public List<Exchange> exchange = new LinkedList<Exchange>();
+ public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>();
+ public List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>();
+
+ public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ {
+ }
+
+ public void close() throws Exception
+ {
+ }
+
+ public void createExchange(Exchange exchange) throws AMQException
+ {
+ if (exchange.isDurable())
+ {
+ this.exchange.add(exchange);
+ }
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQException
+ {
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ if (exchange.isDurable() && queue.isDurable())
+ {
+ bindings.add(new CreateBindingTuple(exchange, routingKey, queue, args));
+ }
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ }
+
+ public void createQueue(AMQQueue queue) throws AMQException
+ {
+ createQueue(queue, null);
+ }
+
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
+ {
+ if (queue.isDurable())
+ {
+ this.queue.add(new CreateQueueTuple(queue, arguments));
+ }
+ }
+
+ public void removeQueue(AMQQueue queue) throws AMQException
+ {
+ }
+
+ private class CreateQueueTuple
+ {
+ public AMQQueue queue;
+ public FieldTable arguments;
+
+ public CreateQueueTuple(AMQQueue queue, FieldTable arguments)
+ {
+ this.queue = queue;
+ this.arguments = arguments;
+ }
+ }
+
+ private class CreateBindingTuple
+ {
+ public AMQQueue queue;
+ public FieldTable arguments;
+ public Exchange exchange;
+ public AMQShortString routingKey;
+
+ public CreateBindingTuple(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
+ {
+ this.exchange = exchange;
+ this.routingKey = routingKey;
+ this.queue = queue;
+ arguments = args;
+ }
+ }
+ }
}