summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrea Gazzarini <agazzarini@apache.org>2009-02-14 10:49:38 +0000
committerAndrea Gazzarini <agazzarini@apache.org>2009-02-14 10:49:38 +0000
commit38c742e89e98588c8434e875c5d6c5d99fd26dc2 (patch)
tree7c8c955fd9087b17a3483c123d83309c7fe0d7be
parent7f949e5b55e35a772732dd5de81900f3510a37ee (diff)
downloadqpid-python-38c742e89e98588c8434e875c5d6c5d99fd26dc2.tar.gz
QPID-1664 : QMan Asynchronous notification capability
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@744478 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/management/client/etc/qman-config.xml31
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java13
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java6
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java5
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java77
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java11
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java5
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java108
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java251
-rw-r--r--qpid/java/management/client/src/test/java/log4j.xml7
-rw-r--r--qpid/java/management/client/src/test/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapabilityTest.java81
11 files changed, 484 insertions, 111 deletions
diff --git a/qpid/java/management/client/etc/qman-config.xml b/qpid/java/management/client/etc/qman-config.xml
index c1a5da9184..cdc840e967 100644
--- a/qpid/java/management/client/etc/qman-config.xml
+++ b/qpid/java/management/client/etc/qman-config.xml
@@ -18,13 +18,14 @@
- under the License.
-
-->
-<!--
-Default configuration for QMan is empty;
-that is, there's no broker configured at startup.
-If you want to connect with a running broker when QMan starts up,
-you can do that uncommenting and editing the template reported below.
--->
<configuration>
+
+<!--
+Default configuration for QMan has no broker settings;
+that is, there's no broker configured at startup.
+If you want to connect with a running broker when QMan starts up,
+you can do that uncommenting and editing the template reported below.
+-->
<!-- <brokers>
<broker>
<host>localhost</host>
@@ -47,5 +48,21 @@ you can do that uncommenting and editing the template reported below.
<max-wait-timeout>-1</max-wait-timeout>
</broker>
</brokers>
- -->
+ -->
+ <!-- Internal worked manager configuration-->
+ <work-manager>
+ <!-- The size of the worker thread pool -->
+ <pool-capacity>5</pool-capacity>
+
+ <!-- Maximum size of the worker thread pool -->
+ <max-pool-capacity>15</max-pool-capacity>
+
+ <!--
+ when the current number of threads is greater than
+ the pool-capacity, this is the maximum time that excess threads
+ can be in an idle state (without any task assigned) before terminating.
+ The value is expressed is milliseconds.
+ -->
+ <keep-alive-time>5000</keep-alive-time>
+ </work-manager>
</configuration>
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java
index e88434f4d9..3d208835f0 100644
--- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java
@@ -64,9 +64,12 @@ public interface Messages
String QMAN_000029_DEFAULT_URI = "<QMAN-000029> : Default URI will be set to %s";
String QMAN_000030_RESOURCE_HAS_BEEN_CREATED = "<QMAN-000030> : New resource instance has been created and registered. Resource id is %s";
String QMAN_000031_RESOURCE_HAS_BEEN_REMOVED = "<QMAN-000031> : WS-Resource %s has been removed";
- String QMAN_000032_EVENTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED = "<QMAN-000032> : Events lifecycle topic has been created. Its name is %s";
- String QMAN_000033_OBJECTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED = "<QMAN-000033> : Objects lifecycle topic has been created. Its name is %s";
-
+ String QMAN_000032_EVENTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED = "<QMAN-000032> : Events lifecycle topic has been created with name %s";
+ String QMAN_000033_OBJECTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED = "<QMAN-000033> : Objects lifecycle topic has been created with name %s";
+ String QMAN_000034_UNCLASSIFIED_LIFECYCLE_TOPIC_HAS_BEEN_CREATED = "<QMAN-000034> : Unclassified object types lifecycle topic has been created with name %s";
+ String QMAN_000035_WORK_MANAGER_POOL_SIZE = "<QMAN-000035> : Work Manager thread pool size : %s";
+ String QMAN_000036_WORK_MANAGER_MAX_POOL_SIZE = "<QMAN-000036> : Work Manager thread pool max size : %s";
+ String QMAN_000037_WORK_MANAGER_KEEP_ALIVE_TIME = "<QMAN-000035> : Work Manager keep alive time : %s";
// DEBUG
String QMAN_200001_INCOMING_MESSAGE_HAS_BEEN_RECEIVED = "<QMAN-200001> : New incoming message has been received. Message content is %s";
@@ -165,4 +168,8 @@ public interface Messages
String QMAN_100035_GET_ATTRIBUTE_FAILURE = "<QMAN-100035> : Get Attribute invocation failure for attribute %s, resource %s.";
String QMAN_100036_SET_ATTRIBUTE_FAILURE = "<QMAN-100036> : Set Attribute invocation failure for attribute %s, resource %s.";
String QMAN_100037_INVOKE_OPERATION_FAILURE = "<QMAN-100037> : Operation Invocation failure for operation.";
+ String QMAN_100038_UNABLE_TO_SEND_WS_NOTIFICATION = "<QMAN-100038> : Unable to send notification.";
+ String QMAN_100039_UNABLE_TO_CONFIGURE_PROPERLY_WORKER_MANAGER = "<QMAN-100039> : Unable to properly configure WorkManager. A malformed property (NaN) was given as input parameter.";
+
+
} \ No newline at end of file
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java
index 8bff553579..351195db0b 100644
--- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java
@@ -170,6 +170,12 @@ public abstract class Names
"EventsLifeCycleTopic",
Names.PREFIX);
+ public final static QName UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME= new QName(
+ Names.NAMESPACE_URI,
+ "UnclassifiedLifeCycleTopic",
+ Names.PREFIX);
+
+
public final static String NAME_ATTRIBUTE = "name";
public final static String MODIFIABILITY = "modifiability";
public final static String READ_WRITE = "read-write";
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java
index 368970af00..39981dc7cb 100644
--- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java
@@ -26,9 +26,8 @@ import org.apache.qpid.management.Messages;
import org.apache.qpid.transport.util.Logger;
/**
- * Parser used for building access mode mappings.
- * For each access-mode-mappings/mapping element found in the configuration file, a new access mode mapping
- * is built and injected into the bridge configuration.
+ * Parser used for building broker connection data settings.
+ * The corresponding section on the configuration file is :
*
<broker>
<host>192.168.148.131</host>
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java
index 0d9792aeb1..c9d45c5023 100644
--- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java
@@ -25,13 +25,10 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.SynchronousQueue;
import org.apache.qpid.management.Messages;
import org.apache.qpid.management.Names;
import org.apache.qpid.management.domain.handler.base.IMessageHandler;
-import org.apache.qpid.management.domain.handler.impl.InvocationResult;
import org.apache.qpid.management.domain.model.AccessMode;
import org.apache.qpid.management.domain.model.type.Type;
import org.apache.qpid.transport.DeliveryProperties;
@@ -49,6 +46,11 @@ public final class Configuration
private final static Logger LOGGER = Logger.get(Configuration.class);
private static Configuration INSTANCE = new Configuration();
+ // Work Manager default settings
+ private int _poolSize = 5;
+ private int _maxPoolSize = 15;
+ private long _keepAliveTime = 5000;
+
Map<Integer, Type> _typeMappings = new HashMap<Integer,Type>();
Map<Integer,AccessMode> _accessModes = new HashMap<Integer, AccessMode>();
Map<Type,String> _validators = new HashMap<Type, String>();
@@ -64,8 +66,7 @@ public final class Configuration
private Header _headerForCommandMessages;
private DeliveryProperties _deliveryProperties = new DeliveryProperties();
private MessageProperties _messageProperties = new MessageProperties();
- public BlockingQueue<InvocationResult> _resultExchangeChannel = new SynchronousQueue<InvocationResult>();
-
+
// Private constructor.
private Configuration()
{
@@ -355,4 +356,68 @@ public final class Configuration
LOGGER.debug(Messages.QMAN_200004_MANAGEMENT_QUEUE_NAME,_managementQueueName);
LOGGER.debug(Messages.QMAN_200005_METHOD_REPLY_QUEUE_NAME,_methodReplyQueueName);
}
-}
+
+ /**
+ * Returns the worker manager thread pool size.
+ *
+ * @return the worker manager thread pool size.
+ */
+ public int getWorkerManagerPoolSize()
+ {
+ return _poolSize;
+ }
+
+ /**
+ * Sets the size of the worker manager thread pool.
+ *
+ * @param poolSize the size of the worker manager thread pool.
+ */
+ void setWorkerManagerPoolSize(int poolSize)
+ {
+ this._poolSize = poolSize;
+ }
+
+ /**
+ * Returns the maximum size of the worker manager
+ * thread pool size.
+ *
+ * @return the max size of the worker manager thread pool.
+ */
+ public int getWorkerManagerMaxPoolSize()
+ {
+ return _maxPoolSize;
+ }
+
+ /**
+ * Sets the maximum size of the worker manager
+ * thread pool size.
+ *
+ * @param maxPoolSize the max size of the worker manager thread pool.
+ */
+ void setWorkerManagerMaxPoolSize(int maxPoolSize)
+ {
+ this._maxPoolSize = maxPoolSize;
+ }
+
+ /**
+ * Returns the max amount of time that an excess thread
+ * can be idle before purging from the pool.
+ *
+ * @return the max keep alive time.
+ */
+ public long getWorkerManagerKeepAliveTime()
+ {
+ return _keepAliveTime;
+ }
+
+ /**
+ * Sets the max amount of time that an excess thread
+ * can be idle before purging from the pool.
+ *
+ * @param keepAliveTime the max keep alive time.
+ */
+ void setWorkerManagerKeepAliveTime(long keepAliveTime)
+ {
+ this._keepAliveTime = keepAliveTime;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java
index a65d3d9625..1cde9d5f88 100644
--- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java
@@ -77,6 +77,7 @@ public class Configurator extends DefaultHandler
};
IParser _brokerConfigurationParser = new BrokerConnectionDataParser();
+ IParser _workerManagerConfigurationParser = new WorkerManagerConfigurationParser();
IParser _currentParser = DEFAULT_PARSER;
/**
@@ -97,12 +98,18 @@ public class Configurator extends DefaultHandler
@Override
public void startElement (String uri, String localName, String name, Attributes attributes) throws SAXException
{
- switch(Tag.get(name)) {
- case BROKERS:
+ switch(Tag.get(name))
+ {
+ case BROKERS :
{
_currentParser = _brokerConfigurationParser;
break;
}
+ case WORK_MANAGER :
+ {
+ _currentParser = _workerManagerConfigurationParser;
+ break;
+ }
}
}
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java
index 7ff23c9d13..c2b6e1e27d 100644
--- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java
@@ -37,7 +37,10 @@ public enum Tag {
VIRTUAL_HOST { @Override public String toString() { return "virtual-host"; }},
USER { @Override public String toString() { return "user"; }},
PASSWORD { @Override public String toString() { return "password"; }},
- BROKERS { @Override public String toString() { return "brokers"; }};
+ BROKERS { @Override public String toString() { return "brokers"; }},
+ WORK_MANAGER { @Override public String toString() { return "work-manager"; }},
+ POOL_CAPACITY { @Override public String toString() { return "pool-capacity"; }},
+ KEEP_ALIVE_TIME { @Override public String toString() { return "keep-alive-time"; }};
/**
* Returns the enum entry associated to the given tag name.
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java
new file mode 100644
index 0000000000..b99f272053
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/WorkerManagerConfigurationParser.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.configuration;
+
+import java.util.UUID;
+
+import org.apache.qpid.management.Messages;
+import org.apache.qpid.transport.util.Logger;
+
+/**
+ * Parser used for building worker manager settings.
+ * The corresponding section of the configuration file is :
+ *
+ <work-manager>
+ <pool-capacity>5</pool-capacity>
+ <max-pool-capacity>15</max-pool-capacity>
+ <keep-alive-time>5000</keep-alive-time>
+ </work-manager>
+
+ *
+ * @author Andrea Gazzarini
+ */
+class WorkerManagerConfigurationParser implements IParser
+{
+ private final static Logger LOGGER = Logger.get(Configuration.class);
+ private String _currentValue;
+
+ private String _poolSizeAsString;
+ private String _maxPoolSizeAsString;
+ private String _keepAliveTimeAsString;
+
+ /**
+ * Callback : the given value is the text content of the current node.
+ */
+ public void setCurrrentAttributeValue (String value)
+ {
+ this._currentValue = value;
+ }
+
+ /**
+ * Callback: each time the end of an element is reached
+ * this method is called.
+ */
+ public void setCurrentAttributeName (String name)
+ {
+ switch (Tag.get(name))
+ {
+ case POOL_CAPACITY:
+ {
+ _poolSizeAsString = _currentValue.trim();
+ break;
+ }
+ case MAX_POOL_CAPACITY :
+ {
+ _maxPoolSizeAsString = _currentValue;
+ }
+ case KEEP_ALIVE_TIME:
+ {
+ _keepAliveTimeAsString = _currentValue;
+ break;
+ }
+ case WORK_MANAGER:
+ {
+ Configuration configuration = Configuration.getInstance();
+ try
+ {
+ configuration.setWorkerManagerPoolSize(Integer.parseInt(_poolSizeAsString));
+ configuration.setWorkerManagerMaxPoolSize(Integer.parseInt(_maxPoolSizeAsString));
+ configuration.setWorkerManagerKeepAliveTime(Long.parseLong(_keepAliveTimeAsString));
+ } catch(Exception exception)
+ {
+ LOGGER.error(Messages.QMAN_100039_UNABLE_TO_CONFIGURE_PROPERLY_WORKER_MANAGER);
+ } finally {
+ LOGGER.info(Messages.QMAN_000035_WORK_MANAGER_POOL_SIZE,configuration.getWorkerManagerPoolSize());
+ LOGGER.info(Messages.QMAN_000036_WORK_MANAGER_MAX_POOL_SIZE,configuration.getWorkerManagerMaxPoolSize());
+ LOGGER.info(Messages.QMAN_000037_WORK_MANAGER_KEEP_ALIVE_TIME,configuration.getWorkerManagerKeepAliveTime());
+ }
+ break;
+ }
+ }
+ }
+
+ /**
+ * Gets an uuid in order to associate current connection data with a broker.
+ * @return
+ */
+ UUID getUUId(){
+ return UUID.randomUUID();
+ }
+}
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java
index 2773d9e01a..980eb9c1b5 100644
--- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java
@@ -25,7 +25,11 @@ import java.lang.reflect.Method;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
@@ -46,6 +50,7 @@ import org.apache.muse.ws.notification.NotificationProducer;
import org.apache.muse.ws.notification.WsnConstants;
import org.apache.qpid.management.Messages;
import org.apache.qpid.management.Names;
+import org.apache.qpid.management.configuration.Configuration;
import org.apache.qpid.management.jmx.EntityLifecycleNotification;
import org.apache.qpid.management.wsdm.common.ThreadSessionManager;
import org.apache.qpid.management.wsdm.muse.engine.WSDMAdapterEnvironment;
@@ -67,6 +72,40 @@ public class QManAdapterCapability extends AbstractCapability
private MBeanServer _mxServer;
private WsArtifactsFactory _artifactsFactory;
private URI _resourceURI;
+ private NotificationProducer _publisherCapability;
+ private ThreadPoolExecutor _workManager;
+ private Map<String, QName> _lifeCycleTopics = new HashMap<String, QName>();
+
+ /**
+ * Runnable wrapper used for sending asynchronous
+ * notifications.
+ *
+ * @author Andrea Gazzarini
+ */
+ private final class AsynchNotificationTask implements Runnable
+ {
+ private final QName topicName;
+ private final LifeCycleEvent event;
+
+ AsynchNotificationTask(QName tName, LifeCycleEvent evt)
+ {
+ topicName = tName;
+ event = evt;
+ }
+
+ public void run()
+ {
+ try
+ {
+ _publisherCapability.publish(topicName,event);
+ } catch (SoapFault exception)
+ {
+ LOGGER.error(
+ exception,
+ Messages.QMAN_100038_UNABLE_TO_SEND_WS_NOTIFICATION);
+ }
+ }
+ };
/**
* NotificationFilter for "create" only events.
@@ -99,7 +138,6 @@ public class QManAdapterCapability extends AbstractCapability
{
return EntityLifecycleNotification.INSTANCE_REMOVED_NOTIFICATION_TYPE.equals(notification.getType());
}
-
};
/**
@@ -148,6 +186,16 @@ public class QManAdapterCapability extends AbstractCapability
LOGGER.info(
Messages.QMAN_000030_RESOURCE_HAS_BEEN_CREATED,
eventSourceName);
+
+ AsynchNotificationTask asynchNotificationTask = new AsynchNotificationTask(
+ getTopicName(lifecycleNotification.getClassKind()),
+ LifeCycleEvent.newCreateEvent(
+ eventSourceName.getKeyProperty(Names.OBJECT_ID),
+ lifecycleNotification.getPackageName(),
+ lifecycleNotification.getClassName()));
+
+ _workManager.execute(asynchNotificationTask);
+
} catch (ArtifactsNotAvailableException exception)
{
LOGGER.error(
@@ -213,6 +261,16 @@ public class QManAdapterCapability extends AbstractCapability
LOGGER.info(
Messages.QMAN_000031_RESOURCE_HAS_BEEN_REMOVED,
eventSourceName);
+
+ AsynchNotificationTask asynchNotificationTask = new AsynchNotificationTask(
+ getTopicName(lifecycleNotification.getClassKind()),
+ LifeCycleEvent.newRemoveEvent(
+ eventSourceName.getKeyProperty(Names.OBJECT_ID),
+ lifecycleNotification.getPackageName(),
+ lifecycleNotification.getClassName()));
+
+ _workManager.execute(asynchNotificationTask);
+
}
catch(Exception exception)
{
@@ -238,91 +296,14 @@ public class QManAdapterCapability extends AbstractCapability
createLifeCycleTopics();
+ initializeWorkManager();
+
createQManResourceURI();
_mxServer = ManagementFactory.getPlatformMBeanServer();
_artifactsFactory = new WsArtifactsFactory(getEnvironment(),_mxServer);
registerQManLifecycleListeners();
-
- new Thread()
- {
- @Override
- public void run()
- {
- while (true)
- {
- try
- {
- final NotificationProducer publisher = (NotificationProducer) getResource().getCapability(WsnConstants.PRODUCER_URI);
-
- publisher.publish(
- Names.OBJECTS_LIFECYLE_TOPIC_NAME,
- LifeCycleEvent.newCreateEvent(
- UUID.randomUUID().toString(),
- "org.apache.qpid.broker",
- "connection"));
- } catch (SoapFault e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- try
- {
- Thread.sleep(10000);
- } catch (InterruptedException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
- }.start();
- }
-
- /**
- * This adapter capability needs to be an event listener of QMan JMX core
- * in order to detect relevant lifecycle events and therefore create WS artifacts & notification(s).
- *
- * @throws SoapFault when it's not possible to register event listener : is QMan running?
- */
- @SuppressWarnings("serial")
- private void registerQManLifecycleListeners() throws SoapFault
- {
- try
- {
- _mxServer.addNotificationListener(
- Names.QMAN_OBJECT_NAME,
- _listenerForNewInstances,
- _filterForNewInstances,
- null);
-
- _mxServer.addNotificationListener(
- Names.QMAN_OBJECT_NAME,
- _listenerForRemovedInstances,
- _filterForRemovedInstances,
- null);
-
- try
- {
- _mxServer.addNotificationListener(
- Names.QPID_EMULATOR_OBJECT_NAME,
- _listenerForNewInstances,
- _filterForNewInstances, null);
-
- _mxServer.addNotificationListener(
- Names.QPID_EMULATOR_OBJECT_NAME,
- _listenerForRemovedInstances,
- _filterForRemovedInstances, null);
-
- } catch (Exception exception)
- {
- LOGGER.info(Messages.QMAN_000028_TEST_MODULE_NOT_FOUND);
- }
- } catch(InstanceNotFoundException exception)
- {
- throw new SoapFault(exception);
- }
}
/**
@@ -369,7 +350,7 @@ public class QManAdapterCapability extends AbstractCapability
throw new SoapFault(exception);
}
}
-
+
/**
* Creates the message handlers for the given capability.
*
@@ -406,33 +387,68 @@ public class QManAdapterCapability extends AbstractCapability
}
return handlers;
}
+
+ /**
+ * Returns the publisher capability associated with the owner resource.
+ *
+ * @return the publisher capability associated with the owner resource.
+ */
+ NotificationProducer getPublisherCapability()
+ {
+ return (NotificationProducer) getResource().getCapability(WsnConstants.PRODUCER_URI);
+ }
/**
* Creates events & objects lifecycle topic that will be used to publish lifecycle event
* messages..
*/
- private void createLifeCycleTopics()
+ void createLifeCycleTopics()
{
try
{
- final NotificationProducer publisherCapability = (NotificationProducer) getResource()
- .getCapability(WsnConstants.PRODUCER_URI);
+ _publisherCapability = getPublisherCapability();
- publisherCapability.addTopic(Names.EVENTS_LIFECYLE_TOPIC_NAME);
+ _publisherCapability.addTopic(Names.EVENTS_LIFECYLE_TOPIC_NAME);
+ _lifeCycleTopics.put(Names.EVENT,Names.EVENTS_LIFECYLE_TOPIC_NAME);
+
LOGGER.info(
Messages.QMAN_000032_EVENTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED,
Names.OBJECTS_LIFECYLE_TOPIC_NAME);
- publisherCapability.addTopic(Names.OBJECTS_LIFECYLE_TOPIC_NAME);
+ _publisherCapability.addTopic(Names.OBJECTS_LIFECYLE_TOPIC_NAME);
+ _lifeCycleTopics.put(Names.CLASS,Names.OBJECTS_LIFECYLE_TOPIC_NAME);
+
LOGGER.info(
Messages.QMAN_000033_OBJECTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED,
Names.OBJECTS_LIFECYLE_TOPIC_NAME);
+
+ _publisherCapability.addTopic(Names.UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME);
+ LOGGER.info(
+ Messages.QMAN_000034_UNCLASSIFIED_LIFECYCLE_TOPIC_HAS_BEEN_CREATED,
+ Names.OBJECTS_LIFECYLE_TOPIC_NAME);
} catch(Exception exception)
{
LOGGER.error(exception, Messages.QMAN_100036_TOPIC_DECLARATION_FAILURE);
}
}
-
+
+ /**
+ * Starting from an object type (i.e. event or class) returns the name of the
+ * corresponding topic where the lifecycle message must be published.
+ * Note that if the given object type is unknown then the "Unclassified Object Types" topic
+ * will be returned (and therefore the message will be published there).
+ *
+ * @param objectType the type of the object.
+ * @return the name of the topic associated with the given object type.
+ */
+ QName getTopicName(String objectType)
+ {
+ QName topicName = _lifeCycleTopics.get(objectType);
+ return (topicName != null)
+ ? topicName
+ : Names.UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME;
+ }
+
/**
* Workaround : it seems that is not possibile to declare a serializer
* for a byte array using muse descriptor...
@@ -471,4 +487,63 @@ public class QManAdapterCapability extends AbstractCapability
throw new SoapFault(exception);
}
}
+
+ /**
+ * Initializes the work manager used for asynchronous notifications.
+ */
+ private void initializeWorkManager()
+ {
+ Configuration configuration = Configuration.getInstance();
+ _workManager = new ThreadPoolExecutor(
+ configuration.getWorkerManagerPoolSize(),
+ configuration.getWorkerManagerMaxPoolSize(),
+ configuration.getWorkerManagerKeepAliveTime(),
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<Runnable>(30));
+ }
+
+ /**
+ * This adapter capability needs to be an event listener of QMan JMX core
+ * in order to detect relevant lifecycle events and therefore create WS artifacts & notification(s).
+ *
+ * @throws SoapFault when it's not possible to register event listener : is QMan running?
+ */
+ @SuppressWarnings("serial")
+ private void registerQManLifecycleListeners() throws SoapFault
+ {
+ try
+ {
+ _mxServer.addNotificationListener(
+ Names.QMAN_OBJECT_NAME,
+ _listenerForNewInstances,
+ _filterForNewInstances,
+ null);
+
+ _mxServer.addNotificationListener(
+ Names.QMAN_OBJECT_NAME,
+ _listenerForRemovedInstances,
+ _filterForRemovedInstances,
+ null);
+
+ try
+ {
+ _mxServer.addNotificationListener(
+ Names.QPID_EMULATOR_OBJECT_NAME,
+ _listenerForNewInstances,
+ _filterForNewInstances, null);
+
+ _mxServer.addNotificationListener(
+ Names.QPID_EMULATOR_OBJECT_NAME,
+ _listenerForRemovedInstances,
+ _filterForRemovedInstances, null);
+
+ } catch (Exception exception)
+ {
+ LOGGER.info(Messages.QMAN_000028_TEST_MODULE_NOT_FOUND);
+ }
+ } catch(InstanceNotFoundException exception)
+ {
+ throw new SoapFault(exception);
+ }
+ }
} \ No newline at end of file
diff --git a/qpid/java/management/client/src/test/java/log4j.xml b/qpid/java/management/client/src/test/java/log4j.xml
index 6624f23aa1..34634b7738 100644
--- a/qpid/java/management/client/src/test/java/log4j.xml
+++ b/qpid/java/management/client/src/test/java/log4j.xml
@@ -12,7 +12,12 @@
</layout>
</appender>
<category name="org.apache.qpid.management">
- <priority value="ERROR" />
+ <priority value="INFO" />
+ <appender-ref ref="CONSOLE" />
+ </category>
+
+ <category name="org.mortbay">
+ <priority value="INFO" />
<appender-ref ref="CONSOLE" />
</category>
diff --git a/qpid/java/management/client/src/test/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapabilityTest.java b/qpid/java/management/client/src/test/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapabilityTest.java
new file mode 100644
index 0000000000..648c7b2f60
--- /dev/null
+++ b/qpid/java/management/client/src/test/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapabilityTest.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.wsdm.capabilities;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import org.apache.muse.ws.notification.NotificationProducer;
+import org.apache.qpid.management.Names;
+
+import junit.framework.TestCase;
+
+/**
+ * Test case for QMan adapter capability.
+ *
+ * @author Andrea Gazzarini
+ */
+public class QManAdapterCapabilityTest extends TestCase
+{
+ /**
+ * Tests the execution of the getTopicName() method.
+ *
+ * <br>precondition : an object type is given to the method (null is allowed).
+ * <br>postcondition : according to getTopicName() specs, the name of the
+ * topic associated with the given object type must be returned.
+ */
+ public void testGetTopicName()
+ {
+ final InvocationHandler invocationHandler = new InvocationHandler(){
+
+ public Object invoke(Object proxy, Method method, Object[] args)
+ {
+ return null;
+ }
+ };
+
+ QManAdapterCapability capability = new QManAdapterCapability(){
+ @Override
+ NotificationProducer getPublisherCapability()
+ {
+ return (NotificationProducer) Proxy.newProxyInstance(
+ getClass().getClassLoader(),
+ new Class[]{NotificationProducer.class},
+ invocationHandler);
+ }
+ };
+
+ capability.createLifeCycleTopics();
+
+ assertEquals(
+ Names.EVENTS_LIFECYLE_TOPIC_NAME,
+ capability.getTopicName(Names.EVENT));
+
+ assertEquals(
+ Names.OBJECTS_LIFECYLE_TOPIC_NAME,
+ capability.getTopicName(Names.CLASS));
+
+ assertEquals(
+ Names.UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME,
+ capability.getTopicName("This is an unknown object Type @#!--!!@#"));
+ }
+} \ No newline at end of file