summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-04-13 17:28:36 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-04-13 17:28:36 +0000
commited9320bc237cb6fae793efa9dbf042f384fd1c75 (patch)
treeec4131161f71c067224a0713b2f141b1b9ac7f72
parent1828f1ef33c60ca1d23c35e7706670ce43fbca84 (diff)
downloadqpid-python-ed9320bc237cb6fae793efa9dbf042f384fd1c75.tar.gz
753253 ritchiem QPID-1730 : Logging update highlighted that we were printing the queue before we had fully initialised it.
753220 ritchiem QPID-1730 : Improve the logging so we can see what is going one during persistent recovery as we do not have a working persistent module. 753219 ritchiem QPID-1730 : Update the order in which we initialise. We now load the config file from disk then recover from the persistent strore. This approach applies the vhost configuration and then applies the persistent state from the store to those objects rather than recreating them. The new inner classes on VirtualHost are to be removed once we have fully extracted the RoutingTable from the legacy MessageStores as this is the root of the problem. The Store needs to be open to create new durable objects but the current stores must recover their state before new entries are added. So now the persistent state is being loaded on to a broker in a consistent state after it has configured a) its default exchanges and b) loaded the queue config from the config file. Eventually we will only have one location for queue config and all will be right in the world. merged the above three changes from trunk that allow the broker to create it's model before the MessageStore is initialised. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5-fix@764539 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java344
4 files changed, 273 insertions, 80 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index b6c741bbec..30af09ce4b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -191,7 +191,7 @@ public abstract class AbstractExchange implements Exchange, Managable
public String toString()
{
- return getClass().getName() + "[" + getName() +"]";
+ return getClass().getSimpleName() + "[" + getName() +"]";
}
public ManagedObject getManagedObject()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index e39c005750..e9af92bad8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -168,16 +168,14 @@ public class DirectExchange extends AbstractExchange
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Queue (" + queue.getName() + ")" + queue + " is already registered with routing key " + routingKey);
+ _logger.debug("Queue (" + queue + ") is already registered with routing key " + routingKey);
}
}
else
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Binding queue(" + queue.getName() + ") " + queue + " with routing key " + routingKey
- + (args == null ? "" : " and arguments " + args.toString())
- + " to exchange " + this);
+ _logger.debug("Binding queue:" + queue + " with routing key '" + routingKey +"' to exchange:" + this);
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
index 34a70c6969..bb6ce65d42 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
@@ -67,5 +67,4 @@ public class AMQPriorityQueue extends SimpleAMQQueue
}
}
-
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 535872df44..0ae0594de8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -20,19 +20,14 @@
*/
package org.apache.qpid.server.virtualhost;
-import java.util.Collections;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import javax.management.NotCompliantMBeanException;
-
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQBrokerManagerMBean;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
@@ -50,6 +45,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.Accessable;
@@ -57,12 +53,19 @@ import org.apache.qpid.server.security.access.plugins.SimpleXML;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+import javax.management.NotCompliantMBeanException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
public class VirtualHost implements Accessable
{
private static final Logger _logger = Logger.getLogger(VirtualHost.class);
-
private final String _name;
private ConnectionRegistry _connectionRegistry;
@@ -84,7 +87,6 @@ public class VirtualHost implements Accessable
private ACLManager _accessManager;
private final Timer _houseKeepingTimer;
-
private VirtualHostConfiguration _configuration;
public void setAccessableName(String name)
@@ -134,15 +136,26 @@ public class VirtualHost implements Accessable
return VirtualHost.this;
}
-
} // End of MBean class
+ /**
+ * Normal Constructor
+ *
+ * @param hostConfig
+ *
+ * @throws Exception
+ */
+ public VirtualHost(VirtualHostConfiguration hostConfig) throws Exception
+ {
+ this(hostConfig, null);
+ }
public VirtualHost(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
{
_configuration = hostConfig;
- _name = hostConfig.getName();
- if (_name == null || _name.length() == 0)
+ _name = hostConfig.getName();
+
+ if (_name == null || _name.length() == 0)
{
throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost.");
}
@@ -151,8 +164,27 @@ public class VirtualHost implements Accessable
_connectionRegistry = new ConnectionRegistry(this);
- _houseKeepingTimer = new Timer("Queue-housekeeping-"+_name, true);
-
+ _houseKeepingTimer = new Timer("Queue-housekeeping-" + _name, true);
+
+ _queueRegistry = new DefaultQueueRegistry(this);
+
+ _exchangeFactory = new DefaultExchangeFactory(this);
+ _exchangeFactory.initialise(hostConfig);
+
+ _exchangeRegistry = new DefaultExchangeRegistry(this);
+
+ //Create a temporary RT to store the durable entries from the config file
+ // so we can replay them in to the real _RT after it has been loaded.
+ /// This should be removed after the _RT has been fully split from the the TL
+
+ StartupRoutingTable configFileRT = new StartupRoutingTable();
+
+ _messageStore = configFileRT;
+
+ // This needs to be after the RT has been defined as it creates the default durable exchanges.
+ _exchangeRegistry.initialise();
+ initialiseModel(hostConfig);
+
if (store != null)
{
_messageStore = store;
@@ -165,40 +197,44 @@ public class VirtualHost implements Accessable
}
initialiseMessageStore(hostConfig);
}
-
- _queueRegistry = new DefaultQueueRegistry(this);
-
- _exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeFactory.initialise(hostConfig);
- _exchangeRegistry = new DefaultExchangeRegistry(this);
- _exchangeRegistry.initialise();
-
- initialiseModel(hostConfig);
+
+ //Now that the RT has been initialised loop through the persistent queues/exchanges created from the config
+ // file and write them in to the new routing Table.
+ for (StartupRoutingTable.CreateQueueTuple cqt : configFileRT.queue)
+ {
+ _messageStore.createQueue(cqt.queue, cqt.arguments);
+ }
+
+ for (Exchange exchange : configFileRT.exchange)
+ {
+ _messageStore.createExchange(exchange);
+ }
+
+ for (StartupRoutingTable.CreateBindingTuple cbt : configFileRT.bindings)
+ {
+ _messageStore.bindQueue(cbt.exchange, cbt.routingKey, cbt.queue, cbt.arguments);
+ }
+
_authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, hostConfig);
_accessManager = ApplicationRegistry.getInstance().getAccessManager();
_accessManager.configureHostPlugins(hostConfig.getSecurityConfiguration());
-
+
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
}
- public VirtualHost(VirtualHostConfiguration virtualHostConfig) throws Exception
- {
- this(virtualHostConfig, null);
- }
-
private void initialiseHouseKeeping(long period)
{
/* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
- if(period != 0L)
+ if (period != 0L)
{
class RemoveExpiredMessagesTask extends TimerTask
{
public void run()
{
- for(AMQQueue q : _queueRegistry.getQueues())
+ for (AMQQueue q : _queueRegistry.getQueues())
{
try
@@ -207,7 +243,7 @@ public class VirtualHost implements Accessable
}
catch (AMQException e)
{
- _logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e);
+ _logger.error("Exception in housekeeping for queue: " + q.getName().toString(), e);
throw new RuntimeException(e);
}
}
@@ -215,8 +251,8 @@ public class VirtualHost implements Accessable
}
_houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
- period/2,
- period);
+ period / 2,
+ period);
}
}
@@ -232,47 +268,48 @@ public class VirtualHost implements Accessable
throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
" does not.");
}
- _messageStore = (MessageStore) o;
- _messageStore.configure(this, "store", hostConfig);
+ MessageStore messageStore = (MessageStore) o;
+ messageStore.configure(this, "store", hostConfig);
+ _messageStore = messageStore;
}
private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
{
- _logger.debug("Loading configuration for virtualhost: "+config.getName());
+ _logger.debug("Loading configuration for virtualhost: " + config.getName());
List exchangeNames = config.getExchanges();
- for(Object exchangeNameObj : exchangeNames)
- {
- String exchangeName = String.valueOf(exchangeNameObj);
- configureExchange(config.getExchangeConfiguration(exchangeName));
- }
+ for (Object exchangeNameObj : exchangeNames)
+ {
+ String exchangeName = String.valueOf(exchangeNameObj);
+ configureExchange(config.getExchangeConfiguration(exchangeName));
+ }
String[] queueNames = config.getQueueNames();
- for(Object queueNameObj : queueNames)
- {
- String queueName = String.valueOf(queueNameObj);
- configureQueue(config.getQueueConfiguration(queueName));
- }
+ for (Object queueNameObj : queueNames)
+ {
+ String queueName = String.valueOf(queueNameObj);
+ configureQueue(config.getQueueConfiguration(queueName));
+ }
}
private void configureExchange(ExchangeConfiguration exchangeConfiguration) throws AMQException
{
AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName());
- Exchange exchange;
- exchange = _exchangeRegistry.getExchange(exchangeName);
- if(exchange == null)
- {
+ Exchange exchange;
+ exchange = _exchangeRegistry.getExchange(exchangeName);
+ if (exchange == null)
+ {
AMQShortString type = new AMQShortString(exchangeConfiguration.getType());
boolean durable = exchangeConfiguration.getDurable();
boolean autodelete = exchangeConfiguration.getAutoDelete();
- Exchange newExchange = _exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0);
- _exchangeRegistry.registerExchange(newExchange);
- }
+ Exchange newExchange = _exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0);
+ _exchangeRegistry.registerExchange(newExchange);
+ }
}
private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException
@@ -288,33 +325,36 @@ public class VirtualHost implements Accessable
Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName));
- if(exchange == null)
- {
- exchange = _exchangeRegistry.getDefaultExchange();
- }
+ if (exchange == null)
+ {
+ exchange = _exchangeRegistry.getDefaultExchange();
+ }
if (exchange == null)
{
throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName);
}
- List routingKeys = queueConfiguration.getRoutingKeys();
- if(routingKeys == null || routingKeys.isEmpty())
- {
- routingKeys = Collections.singletonList(queue.getName());
- }
+ List routingKeys = queueConfiguration.getRoutingKeys();
+ if (routingKeys == null || routingKeys.isEmpty())
+ {
+ routingKeys = Collections.singletonList(queue.getName());
+ }
- for(Object routingKeyNameObj : routingKeys)
- {
- AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
- queue.bind(exchange, routingKey, null);
- _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
- }
+ for (Object routingKeyNameObj : routingKeys)
+ {
+ AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + this);
+ }
+ queue.bind(exchange, routingKey, null);
+ }
- if(exchange != _exchangeRegistry.getDefaultExchange())
- {
- queue.bind(_exchangeRegistry.getDefaultExchange(), queue.getName(), null);
- }
+ if (exchange != _exchangeRegistry.getDefaultExchange())
+ {
+ queue.bind(_exchangeRegistry.getDefaultExchange(), queue.getName(), null);
+ }
}
public String getName()
@@ -355,7 +395,7 @@ public class VirtualHost implements Accessable
public ACLManager getAccessManager()
{
return _accessManager;
- }
+ }
public void close() throws Exception
{
@@ -394,4 +434,160 @@ public class VirtualHost implements Accessable
{
return _virtualHostMBean;
}
+
+ /**
+ * Temporary Startup RT class to record the creation of persistent queues / exchanges.
+ *
+ *
+ * This is so we can replay the creation of queues/exchanges in to the real _RT after it has been loaded.
+ * This should be removed after the _RT has been fully split from the the TL
+ */
+ private class StartupRoutingTable implements MessageStore
+ {
+ public List<Exchange> exchange = new LinkedList<Exchange>();
+ public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>();
+ public List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>();
+
+ public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ {
+ }
+
+ public void close() throws Exception
+ {
+ }
+
+ public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void createExchange(Exchange exchange) throws AMQException
+ {
+ if (exchange.isDurable())
+ {
+ this.exchange.add(exchange);
+ }
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQException
+ {
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ if (exchange.isDurable() && queue.isDurable())
+ {
+ bindings.add(new CreateBindingTuple(exchange, routingKey, queue, args));
+ }
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ }
+
+ public void createQueue(AMQQueue queue) throws AMQException
+ {
+ createQueue(queue, null);
+ }
+
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
+ {
+ if (queue.isDurable())
+ {
+ this.queue.add(new CreateQueueTuple(queue, arguments));
+ }
+ }
+
+ public void removeQueue(AMQQueue queue) throws AMQException
+ {
+ }
+
+ public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void beginTran(StoreContext context) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void commitTran(StoreContext context) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void abortTran(StoreContext context) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean inTran(StoreContext context)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Long getNewMessageId()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isPersistent()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ private class CreateQueueTuple
+ {
+ public AMQQueue queue;
+ public FieldTable arguments;
+
+ public CreateQueueTuple(AMQQueue queue, FieldTable arguments)
+ {
+ this.queue = queue;
+ this.arguments = arguments;
+ }
+ }
+
+ private class CreateBindingTuple
+ {
+ public AMQQueue queue;
+ public FieldTable arguments;
+ public Exchange exchange;
+ public AMQShortString routingKey;
+
+ public CreateBindingTuple(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
+ {
+ this.exchange = exchange;
+ this.routingKey = routingKey;
+ this.queue = queue;
+ arguments = args;
+ }
+ }
+ }
}