summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java12
-rw-r--r--java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java105
-rw-r--r--java/broker/src/org/apache/qpid/server/security/auth/NullAuthenticationManager.java (renamed from java/broker/test/src/org/apache/qpid/server/util/NullAuthenticationManager.java)2
-rw-r--r--java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java6
-rw-r--r--java/broker/src/org/apache/qpid/server/util/NullApplicationRegistry.java106
-rw-r--r--java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java4
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/AckTest.java4
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java2
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java4
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java4
-rw-r--r--java/broker/test/src/org/apache/qpid/server/util/TestApplicationRegistry.java (renamed from java/broker/test/src/org/apache/qpid/server/util/NullApplicationRegistry.java)5
11 files changed, 229 insertions, 25 deletions
diff --git a/java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index a51dbd5d59..4478a0cf32 100644
--- a/java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -23,6 +23,7 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.transport.ConnectorConfiguration;
import org.apache.qpid.ssl.BogusSSLContextFactory;
import org.apache.log4j.Logger;
@@ -45,7 +46,7 @@ import java.io.IOException;
* the state for the connection.
*
*/
-public class AMQPFastProtocolHandler extends IoHandlerAdapter implements ProtocolVersionList
+public class AMQPFastProtocolHandler extends IoHandlerAdapter implements ProtocolVersionList
{
private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class);
@@ -63,6 +64,15 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoc
private boolean _useSSL;
+ public AMQPFastProtocolHandler(Integer applicationRegistryInstance)
+ {
+ IApplicationRegistry registry = ApplicationRegistry.getInstance(applicationRegistryInstance);
+
+ _queueRegistry = registry.getQueueRegistry();
+ _exchangeRegistry = registry.getExchangeRegistry();
+ _logger.debug("AMQPFastProtocolHandler created");
+ }
+
public AMQPFastProtocolHandler(QueueRegistry queueRegistry,
ExchangeRegistry exchangeRegistry)
{
diff --git a/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java
index 4d1b0dbcfe..c70ecf3f91 100644
--- a/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -22,33 +22,58 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.Configurator;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
/**
* An abstract application registry that provides access to configuration information and handles the
* construction and caching of configurable objects.
- *
+ * <p/>
* Subclasses should handle the construction of the "registered objects" such as the exchange registry.
- *
*/
public abstract class ApplicationRegistry implements IApplicationRegistry
{
private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
- private static IApplicationRegistry _instance;
+ private static Map _instanceMap = new HashMap();
private final Map<Class<?>, Object> _configuredObjects = new HashMap<Class<?>, Object>();
protected final Configuration _configuration;
+ public static final int DEFAULT_INSTANCE = 0;
+ public static final String DEFAULT_APPLICATION_REGISTRY = "org.apache.qpid.server.util.NullApplicationRegistry";
+
+ static
+ {
+ Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
+ }
+
private static class ShutdownService implements Runnable
{
public void run()
{
- _logger.info("Shutting down application registry...");
+ _logger.info("Shutting down application registries...");
try
{
- _instance.getMessageStore().close();
+ synchronized (ApplicationRegistry.class)
+ {
+ Iterator keyIterator = _instanceMap.keySet().iterator();
+
+ while (keyIterator.hasNext())
+ {
+ int key = (Integer) keyIterator.next();
+ IApplicationRegistry instance = (IApplicationRegistry) _instanceMap.get(key);
+
+ if ((instance != null))
+ {
+ if (instance.getMessageStore() != null)
+ {
+ instance.getMessageStore().close();
+ }
+ }
+ }
+ }
}
catch (Exception e)
{
@@ -59,11 +84,49 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
public static void initialise(IApplicationRegistry instance) throws Exception
{
- _instance = instance;
- instance.initialise();
- Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
+ initialise(instance, DEFAULT_INSTANCE);
}
+ public static void initialise(IApplicationRegistry instance, int instanceID) throws Exception
+ {
+ if (instance != null)
+ {
+ _logger.info("Initialising Application Registry:" + instanceID);
+ _instanceMap.put(instanceID, instance);
+
+ try
+ {
+ instance.initialise();
+ }
+ catch (Exception e)
+ {
+ _instanceMap.remove(instanceID);
+ throw e;
+ }
+ }
+ else
+ {
+ remove(instanceID);
+ }
+ }
+
+ public static void remove(int instanceID)
+ {
+ try
+ {
+ ((IApplicationRegistry) _instanceMap.get(instanceID)).getMessageStore().close();
+ }
+ catch (Exception e)
+ {
+
+ }
+ finally
+ {
+ _instanceMap.remove(instanceID);
+ }
+ }
+
+
protected ApplicationRegistry(Configuration configuration)
{
_configuration = configuration;
@@ -71,13 +134,33 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
public static IApplicationRegistry getInstance()
{
- if (_instance == null)
+ return getInstance(DEFAULT_INSTANCE);
+ }
+
+ public static IApplicationRegistry getInstance(int instanceID)
+ {
+ IApplicationRegistry instance = (IApplicationRegistry) _instanceMap.get(instanceID);
+
+ if (instance == null)
{
- throw new RuntimeException("Application registry not initialised");
+ try
+ {
+ _logger.info("Creating DEFAULT_APPLICATION_REGISTRY: " + DEFAULT_APPLICATION_REGISTRY + " : Instance:" + instanceID);
+ IApplicationRegistry registry = (IApplicationRegistry) Class.forName(DEFAULT_APPLICATION_REGISTRY).getConstructor((Class[]) null).newInstance((Object[]) null);
+ ApplicationRegistry.initialise(registry, instanceID);
+ _logger.info("Initialised Application Registry:" + instanceID);
+ return registry;
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error configuring application: " + e, e);
+ //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID);
+ throw new RuntimeException("Unable to create Application Registry");
+ }
}
else
{
- return _instance;
+ return instance;
}
}
diff --git a/java/broker/test/src/org/apache/qpid/server/util/NullAuthenticationManager.java b/java/broker/src/org/apache/qpid/server/security/auth/NullAuthenticationManager.java
index cdde505451..95a53951ad 100644
--- a/java/broker/test/src/org/apache/qpid/server/util/NullAuthenticationManager.java
+++ b/java/broker/src/org/apache/qpid/server/security/auth/NullAuthenticationManager.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.qpid.server.util;
+package org.apache.qpid.server.security.auth;
import org.apache.qpid.server.security.auth.AuthenticationManager;
import org.apache.qpid.server.security.auth.AuthenticationResult;
diff --git a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java
index d15a035259..baa414ff19 100644
--- a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -45,6 +45,12 @@ public class MemoryMessageStore implements MessageStore
private final AtomicLong _messageId = new AtomicLong(1);
+ public void configure()
+ {
+ _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash table");
+ _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY);
+ }
+
public void configure(String base, Configuration config)
{
int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
diff --git a/java/broker/src/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/src/org/apache/qpid/server/util/NullApplicationRegistry.java
new file mode 100644
index 0000000000..0fab1f6895
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.auth.AuthenticationManager;
+import org.apache.qpid.server.security.auth.NullAuthenticationManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
+
+import java.util.HashMap;
+
+public class NullApplicationRegistry extends ApplicationRegistry
+{
+ private QueueRegistry _queueRegistry;
+
+ private ExchangeRegistry _exchangeRegistry;
+
+ private ExchangeFactory _exchangeFactory;
+
+ private ManagedObjectRegistry _managedObjectRegistry;
+
+ private AuthenticationManager _authenticationManager;
+
+ private MessageStore _messageStore;
+
+
+ public NullApplicationRegistry()
+ {
+ super(new MapConfiguration(new HashMap()));
+ }
+
+ public void initialise() throws Exception
+ {
+ _managedObjectRegistry = new NoopManagedObjectRegistry();
+ _queueRegistry = new DefaultQueueRegistry();
+ _exchangeFactory = new DefaultExchangeFactory();
+ _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+ _authenticationManager = new NullAuthenticationManager();
+ _messageStore = new MemoryMessageStore();
+ ((MemoryMessageStore)_messageStore).configure();
+
+ _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
+ }
+
+ public Configuration getConfiguration()
+ {
+ return _configuration;
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
+ public ExchangeFactory getExchangeFactory()
+ {
+ return _exchangeFactory;
+ }
+
+ public ManagedObjectRegistry getManagedObjectRegistry()
+ {
+ return _managedObjectRegistry;
+ }
+
+ public AuthenticationManager getAuthenticationManager()
+ {
+ return _authenticationManager;
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+}
+
diff --git a/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index 86414ffae2..1c594d4374 100644
--- a/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -21,7 +21,7 @@ import org.junit.Test;
import org.junit.Before;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
import junit.framework.JUnit4TestAdapter;
public class HeadersExchangeTest extends AbstractHeadersExchangeTest
@@ -29,7 +29,7 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTest
@Before
public void init() throws Exception
{
- ApplicationRegistry.initialise(new NullApplicationRegistry());
+ ApplicationRegistry.initialise(new TestApplicationRegistry());
}
@Test
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
index 904665949c..8ce006b3ae 100644
--- a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
+++ b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
@@ -28,7 +28,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.log4j.Logger;
import java.util.Iterator;
@@ -55,7 +55,7 @@ public class AckTest
public AckTest() throws Exception
{
- ApplicationRegistry.initialise(new NullApplicationRegistry());
+ ApplicationRegistry.initialise(new TestApplicationRegistry());
}
@Before
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java
index d00cd55fa1..f84d426877 100644
--- a/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java
+++ b/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java
@@ -21,8 +21,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.AMQException;
import junit.framework.JUnit4TestAdapter;
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java b/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java
index f9baa77b65..6e8e78f74b 100644
--- a/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java
+++ b/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java
@@ -22,7 +22,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.AMQException;
class MessageTestHelper
@@ -31,7 +31,7 @@ class MessageTestHelper
MessageTestHelper() throws Exception
{
- ApplicationRegistry.initialise(new NullApplicationRegistry());
+ ApplicationRegistry.initialise(new TestApplicationRegistry());
}
AMQMessage message() throws AMQException
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java b/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java
index eff65a9350..6d1b89176f 100644
--- a/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java
+++ b/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java
@@ -35,7 +35,7 @@ import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.util.AveragedRun;
-import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.util.TimedRun;
import java.util.ArrayList;
@@ -77,7 +77,7 @@ public class SendPerfTest extends TimedRun
public static void main(String[] argv) throws Exception
{
- ApplicationRegistry.initialise(new NullApplicationRegistry());
+ ApplicationRegistry.initialise(new TestApplicationRegistry());
int clients = Integer.parseInt(argv[0]);
int messages = Integer.parseInt(argv[1]);
int iterations = Integer.parseInt(argv[2]);
diff --git a/java/broker/test/src/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/test/src/org/apache/qpid/server/util/TestApplicationRegistry.java
index 3daf143561..69939b88e4 100644
--- a/java/broker/test/src/org/apache/qpid/server/util/NullApplicationRegistry.java
+++ b/java/broker/test/src/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.AuthenticationManager;
+import org.apache.qpid.server.security.auth.NullAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.commons.configuration.Configuration;
@@ -34,7 +35,7 @@ import org.apache.commons.configuration.MapConfiguration;
import java.util.HashMap;
-public class NullApplicationRegistry extends ApplicationRegistry
+public class TestApplicationRegistry extends ApplicationRegistry
{
private QueueRegistry _queueRegistry;
@@ -48,7 +49,7 @@ public class NullApplicationRegistry extends ApplicationRegistry
private MessageStore _messageStore;
- public NullApplicationRegistry()
+ public TestApplicationRegistry()
{
super(new MapConfiguration(new HashMap()));
}