diff options
author | Martin Ritchie <ritchiem@apache.org> | 2006-09-25 10:59:23 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2006-09-25 10:59:23 +0000 |
commit | 224feaeb6610f4c7240cc571f147a942bc64706d (patch) | |
tree | e820223c6e5f146f6e22f5de602dd8aaa7398e61 | |
parent | 4773115f9b37cc7aea6278965e49088da72eb279 (diff) | |
download | qpid-python-224feaeb6610f4c7240cc571f147a942bc64706d.tar.gz |
AMQPFastProtocolHandler.java - Added (Integer appRegId) constructor to allow the AMQPFastProtocolHandler to request the ApplicationRegistry instance.
ApplicationRegistry.java - Modified to allow multiple ARs to exist each indexed by an integer id. Default AR of 0 always exists.
NullApplicationRegistry.java - Copied from Test cases but modified to have a simple MemoryMessageStore.
MemoryMessageStore.java - Added a configure() that uses the class defaults.
NullAuthenticationManager.java - Moved from Test
Renamed NullApplicationRegistry.java to TestApplicationRegistry.java for the Test cases.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@449643 13f79535-47bb-0310-9956-ffa450edef68
11 files changed, 229 insertions, 25 deletions
diff --git a/java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index a51dbd5d59..4478a0cf32 100644 --- a/java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -23,6 +23,7 @@ import org.apache.qpid.framing.*; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.transport.ConnectorConfiguration; import org.apache.qpid.ssl.BogusSSLContextFactory; import org.apache.log4j.Logger; @@ -45,7 +46,7 @@ import java.io.IOException; * the state for the connection. * */ -public class AMQPFastProtocolHandler extends IoHandlerAdapter implements ProtocolVersionList +public class AMQPFastProtocolHandler extends IoHandlerAdapter implements ProtocolVersionList { private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class); @@ -63,6 +64,15 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoc private boolean _useSSL; + public AMQPFastProtocolHandler(Integer applicationRegistryInstance) + { + IApplicationRegistry registry = ApplicationRegistry.getInstance(applicationRegistryInstance); + + _queueRegistry = registry.getQueueRegistry(); + _exchangeRegistry = registry.getExchangeRegistry(); + _logger.debug("AMQPFastProtocolHandler created"); + } + public AMQPFastProtocolHandler(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry) { diff --git a/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java index 4d1b0dbcfe..c70ecf3f91 100644 --- a/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -22,33 +22,58 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.Configurator; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; /** * An abstract application registry that provides access to configuration information and handles the * construction and caching of configurable objects. - * + * <p/> * Subclasses should handle the construction of the "registered objects" such as the exchange registry. - * */ public abstract class ApplicationRegistry implements IApplicationRegistry { private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class); - private static IApplicationRegistry _instance; + private static Map _instanceMap = new HashMap(); private final Map<Class<?>, Object> _configuredObjects = new HashMap<Class<?>, Object>(); protected final Configuration _configuration; + public static final int DEFAULT_INSTANCE = 0; + public static final String DEFAULT_APPLICATION_REGISTRY = "org.apache.qpid.server.util.NullApplicationRegistry"; + + static + { + Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService())); + } + private static class ShutdownService implements Runnable { public void run() { - _logger.info("Shutting down application registry..."); + _logger.info("Shutting down application registries..."); try { - _instance.getMessageStore().close(); + synchronized (ApplicationRegistry.class) + { + Iterator keyIterator = _instanceMap.keySet().iterator(); + + while (keyIterator.hasNext()) + { + int key = (Integer) keyIterator.next(); + IApplicationRegistry instance = (IApplicationRegistry) _instanceMap.get(key); + + if ((instance != null)) + { + if (instance.getMessageStore() != null) + { + instance.getMessageStore().close(); + } + } + } + } } catch (Exception e) { @@ -59,11 +84,49 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public static void initialise(IApplicationRegistry instance) throws Exception { - _instance = instance; - instance.initialise(); - Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService())); + initialise(instance, DEFAULT_INSTANCE); } + public static void initialise(IApplicationRegistry instance, int instanceID) throws Exception + { + if (instance != null) + { + _logger.info("Initialising Application Registry:" + instanceID); + _instanceMap.put(instanceID, instance); + + try + { + instance.initialise(); + } + catch (Exception e) + { + _instanceMap.remove(instanceID); + throw e; + } + } + else + { + remove(instanceID); + } + } + + public static void remove(int instanceID) + { + try + { + ((IApplicationRegistry) _instanceMap.get(instanceID)).getMessageStore().close(); + } + catch (Exception e) + { + + } + finally + { + _instanceMap.remove(instanceID); + } + } + + protected ApplicationRegistry(Configuration configuration) { _configuration = configuration; @@ -71,13 +134,33 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public static IApplicationRegistry getInstance() { - if (_instance == null) + return getInstance(DEFAULT_INSTANCE); + } + + public static IApplicationRegistry getInstance(int instanceID) + { + IApplicationRegistry instance = (IApplicationRegistry) _instanceMap.get(instanceID); + + if (instance == null) { - throw new RuntimeException("Application registry not initialised"); + try + { + _logger.info("Creating DEFAULT_APPLICATION_REGISTRY: " + DEFAULT_APPLICATION_REGISTRY + " : Instance:" + instanceID); + IApplicationRegistry registry = (IApplicationRegistry) Class.forName(DEFAULT_APPLICATION_REGISTRY).getConstructor((Class[]) null).newInstance((Object[]) null); + ApplicationRegistry.initialise(registry, instanceID); + _logger.info("Initialised Application Registry:" + instanceID); + return registry; + } + catch (Exception e) + { + _logger.error("Error configuring application: " + e, e); + //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID); + throw new RuntimeException("Unable to create Application Registry"); + } } else { - return _instance; + return instance; } } diff --git a/java/broker/test/src/org/apache/qpid/server/util/NullAuthenticationManager.java b/java/broker/src/org/apache/qpid/server/security/auth/NullAuthenticationManager.java index cdde505451..95a53951ad 100644 --- a/java/broker/test/src/org/apache/qpid/server/util/NullAuthenticationManager.java +++ b/java/broker/src/org/apache/qpid/server/security/auth/NullAuthenticationManager.java @@ -15,7 +15,7 @@ * limitations under the License. * */ -package org.apache.qpid.server.util; +package org.apache.qpid.server.security.auth; import org.apache.qpid.server.security.auth.AuthenticationManager; import org.apache.qpid.server.security.auth.AuthenticationResult; diff --git a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java index d15a035259..baa414ff19 100644 --- a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java @@ -45,6 +45,12 @@ public class MemoryMessageStore implements MessageStore private final AtomicLong _messageId = new AtomicLong(1); + public void configure() + { + _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash table"); + _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY); + } + public void configure(String base, Configuration config) { int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY); diff --git a/java/broker/src/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/src/org/apache/qpid/server/util/NullApplicationRegistry.java new file mode 100644 index 0000000000..0fab1f6895 --- /dev/null +++ b/java/broker/src/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -0,0 +1,106 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.util; + +import org.apache.qpid.server.exchange.DefaultExchangeFactory; +import org.apache.qpid.server.exchange.DefaultExchangeRegistry; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.management.ManagedObjectRegistry; +import org.apache.qpid.server.management.NoopManagedObjectRegistry; +import org.apache.qpid.server.queue.DefaultQueueRegistry; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.security.auth.AuthenticationManager; +import org.apache.qpid.server.security.auth.NullAuthenticationManager; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.MapConfiguration; + +import java.util.HashMap; + +public class NullApplicationRegistry extends ApplicationRegistry +{ + private QueueRegistry _queueRegistry; + + private ExchangeRegistry _exchangeRegistry; + + private ExchangeFactory _exchangeFactory; + + private ManagedObjectRegistry _managedObjectRegistry; + + private AuthenticationManager _authenticationManager; + + private MessageStore _messageStore; + + + public NullApplicationRegistry() + { + super(new MapConfiguration(new HashMap())); + } + + public void initialise() throws Exception + { + _managedObjectRegistry = new NoopManagedObjectRegistry(); + _queueRegistry = new DefaultQueueRegistry(); + _exchangeFactory = new DefaultExchangeFactory(); + _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory); + _authenticationManager = new NullAuthenticationManager(); + _messageStore = new MemoryMessageStore(); + ((MemoryMessageStore)_messageStore).configure(); + + _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes + } + + public Configuration getConfiguration() + { + return _configuration; + } + + public QueueRegistry getQueueRegistry() + { + return _queueRegistry; + } + + public ExchangeRegistry getExchangeRegistry() + { + return _exchangeRegistry; + } + + public ExchangeFactory getExchangeFactory() + { + return _exchangeFactory; + } + + public ManagedObjectRegistry getManagedObjectRegistry() + { + return _managedObjectRegistry; + } + + public AuthenticationManager getAuthenticationManager() + { + return _authenticationManager; + } + + public MessageStore getMessageStore() + { + return _messageStore; + } +} + diff --git a/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 86414ffae2..1c594d4374 100644 --- a/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -21,7 +21,7 @@ import org.junit.Test; import org.junit.Before; import org.apache.qpid.AMQException; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.NullApplicationRegistry; +import org.apache.qpid.server.util.TestApplicationRegistry; import junit.framework.JUnit4TestAdapter; public class HeadersExchangeTest extends AbstractHeadersExchangeTest @@ -29,7 +29,7 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTest @Before public void init() throws Exception { - ApplicationRegistry.initialise(new NullApplicationRegistry()); + ApplicationRegistry.initialise(new TestApplicationRegistry()); } @Test diff --git a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java index 904665949c..8ce006b3ae 100644 --- a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java +++ b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java @@ -28,7 +28,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.util.NullApplicationRegistry; +import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.log4j.Logger; import java.util.Iterator; @@ -55,7 +55,7 @@ public class AckTest public AckTest() throws Exception { - ApplicationRegistry.initialise(new NullApplicationRegistry()); + ApplicationRegistry.initialise(new TestApplicationRegistry()); } @Before diff --git a/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java index d00cd55fa1..f84d426877 100644 --- a/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java +++ b/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java @@ -21,8 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import org.junit.Test; import org.apache.qpid.server.handler.OnCurrentThreadExecutor; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.NullApplicationRegistry; import org.apache.qpid.AMQException; import junit.framework.JUnit4TestAdapter; diff --git a/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java b/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java index f9baa77b65..6e8e78f74b 100644 --- a/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java +++ b/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java @@ -22,7 +22,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.NullApplicationRegistry; +import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.AMQException; class MessageTestHelper @@ -31,7 +31,7 @@ class MessageTestHelper MessageTestHelper() throws Exception { - ApplicationRegistry.initialise(new NullApplicationRegistry()); + ApplicationRegistry.initialise(new TestApplicationRegistry()); } AMQMessage message() throws AMQException diff --git a/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java b/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java index eff65a9350..6d1b89176f 100644 --- a/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java +++ b/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java @@ -35,7 +35,7 @@ import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.server.util.AveragedRun; -import org.apache.qpid.server.util.NullApplicationRegistry; +import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.server.util.TimedRun; import java.util.ArrayList; @@ -77,7 +77,7 @@ public class SendPerfTest extends TimedRun public static void main(String[] argv) throws Exception { - ApplicationRegistry.initialise(new NullApplicationRegistry()); + ApplicationRegistry.initialise(new TestApplicationRegistry()); int clients = Integer.parseInt(argv[0]); int messages = Integer.parseInt(argv[1]); int iterations = Integer.parseInt(argv[2]); diff --git a/java/broker/test/src/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/test/src/org/apache/qpid/server/util/TestApplicationRegistry.java index 3daf143561..69939b88e4 100644 --- a/java/broker/test/src/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/java/broker/test/src/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.queue.DefaultQueueRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.auth.AuthenticationManager; +import org.apache.qpid.server.security.auth.NullAuthenticationManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.commons.configuration.Configuration; @@ -34,7 +35,7 @@ import org.apache.commons.configuration.MapConfiguration; import java.util.HashMap; -public class NullApplicationRegistry extends ApplicationRegistry +public class TestApplicationRegistry extends ApplicationRegistry { private QueueRegistry _queueRegistry; @@ -48,7 +49,7 @@ public class NullApplicationRegistry extends ApplicationRegistry private MessageStore _messageStore; - public NullApplicationRegistry() + public TestApplicationRegistry() { super(new MapConfiguration(new HashMap())); } |