diff options
Diffstat (limited to 'qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java')
-rw-r--r-- | qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java | 557 |
1 files changed, 557 insertions, 0 deletions
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java new file mode 100644 index 0000000000..414f37a746 --- /dev/null +++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/wsdm/capabilities/QManAdapterCapability.java @@ -0,0 +1,557 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.wsdm.capabilities; + +import java.lang.management.ManagementFactory; +import java.lang.reflect.Method; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import javax.management.InstanceNotFoundException; +import javax.management.MBeanServer; +import javax.management.Notification; +import javax.management.NotificationFilter; +import javax.management.NotificationListener; +import javax.management.ObjectName; +import javax.xml.namespace.QName; + +import org.apache.muse.core.AbstractCapability; +import org.apache.muse.core.Resource; +import org.apache.muse.core.ResourceManager; +import org.apache.muse.core.routing.MessageHandler; +import org.apache.muse.core.serializer.SerializerRegistry; +import org.apache.muse.ws.addressing.EndpointReference; +import org.apache.muse.ws.addressing.soap.SoapFault; +import org.apache.muse.ws.notification.NotificationProducer; +import org.apache.muse.ws.notification.WsnConstants; +import org.apache.qpid.management.Messages; +import org.apache.qpid.management.Names; +import org.apache.qpid.management.configuration.Configuration; +import org.apache.qpid.management.jmx.EntityLifecycleNotification; +import org.apache.qpid.management.wsdm.common.ThreadSessionManager; +import org.apache.qpid.management.wsdm.common.UnableToConnectWithBrokerFault; +import org.apache.qpid.management.wsdm.muse.engine.WSDMAdapterEnvironment; +import org.apache.qpid.management.wsdm.muse.serializer.ByteArraySerializer; +import org.apache.qpid.management.wsdm.notifications.LifeCycleEvent; +import org.apache.qpid.transport.util.Logger; + +/** + * QMan Adapter capability. + * Basically it acts as a lifecycle manager of all ws resource that correspond to entities on JMX side. + * + * @author Andrea Gazzarini +*/ +@SuppressWarnings("serial") +public class QManAdapterCapability extends AbstractCapability +{ + private final static Logger LOGGER = Logger.get(QManAdapterCapability.class); + + private MBeanServer _mxServer; + private WsArtifactsFactory _artifactsFactory; + private URI _resourceURI; + private NotificationProducer _publisherCapability; + private ThreadPoolExecutor _workManager; + private Map<String, QName> _lifeCycleTopics = new HashMap<String, QName>(); + + /** + * Runnable wrapper used for sending asynchronous + * notifications. + * + * @author Andrea Gazzarini + */ + private final class AsynchNotificationTask implements Runnable + { + private final QName topicName; + private final LifeCycleEvent event; + + AsynchNotificationTask(QName tName, LifeCycleEvent evt) + { + topicName = tName; + event = evt; + } + + public void run() + { + try + { + _publisherCapability.publish(topicName,event); + } catch (SoapFault exception) + { + LOGGER.error( + exception, + Messages.QMAN_100038_UNABLE_TO_SEND_WS_NOTIFICATION); + } + } + }; + + /** + * NotificationFilter for "create" only events. + */ + private final NotificationFilter _filterForNewInstances = new NotificationFilter(){ + + /** + * Returns true when the notification is related to a creation of a new instance. + * + * @return true when the notification is related to a creation of a new instance. + */ + public boolean isNotificationEnabled(Notification notification) + { + return EntityLifecycleNotification.INSTANCE_ADDED_NOTIFICATION_TYPE.equals(notification.getType()); + } + + }; + + /** + * NotificationFilter for "remove" only events. + */ + private final NotificationFilter _filterForRemovedInstances = new NotificationFilter(){ + + /** + * Returns true when the notification is related to a deletion of an existing instance. + * + * @return true when the notification is related to a deletion of an existing instance. + */ + public boolean isNotificationEnabled(Notification notification) + { + return EntityLifecycleNotification.INSTANCE_REMOVED_NOTIFICATION_TYPE.equals(notification.getType()); + } + }; + + /** + * This listener handles "create" mbean events and therefore provides procedure to create and initialize + * corresponding ws resources. + */ + private final NotificationListener _listenerForNewInstances = new NotificationListener() + { + /** + * Handles JMX "create" notification type. + * + * @param notification the entity lifecycle notification. + * @param data user data associated with the incoming notifiication : it is not used at the moment. + */ + public void handleNotification(Notification notification, Object data) + { + ObjectName eventSourceName = null; + try + { + EntityLifecycleNotification lifecycleNotification = (EntityLifecycleNotification) notification; + eventSourceName = lifecycleNotification.getObjectName(); + + ThreadSessionManager.getInstance().getSession().setObjectName(eventSourceName); + + LOGGER.debug(Messages.QMAN_200039_DEBUG_JMX_NOTIFICATION, notification); + + ResourceManager resourceManager = getResource().getResourceManager(); + Resource resource = resourceManager.createResource(Names.QMAN_RESOURCE_NAME); + + WsArtifacts artifacts = _artifactsFactory.getArtifactsFor(resource,eventSourceName); + MBeanCapability capability = _artifactsFactory.createCapability( + artifacts.getCapabilityClass(), + eventSourceName); + + ThreadSessionManager.getInstance().getSession().setWsdlDocument(artifacts.getWsdl()); + ThreadSessionManager.getInstance().getSession().setResourceMetadataDescriptor(artifacts.getResourceMetadataDescriptor()); + + resource.setWsdlPortType(Names.QMAN_RESOURCE_PORT_TYPE_NAME); + capability.setCapabilityURI(Names.NAMESPACE_URI+"/"+capability.getClass().getSimpleName()); + capability.setMessageHandlers(createMessageHandlers(capability)); + + resource.addCapability(capability); + resource.initialize(); + resourceManager.addResource(resource.getEndpointReference(), resource); + + LOGGER.info( + Messages.QMAN_000030_RESOURCE_HAS_BEEN_CREATED, + eventSourceName); + + AsynchNotificationTask asynchNotificationTask = new AsynchNotificationTask( + getTopicName(lifecycleNotification.getClassKind()), + LifeCycleEvent.newCreateEvent( + eventSourceName.getKeyProperty(Names.OBJECT_ID), + lifecycleNotification.getPackageName(), + lifecycleNotification.getClassName())); + + _workManager.execute(asynchNotificationTask); + + } catch (ArtifactsNotAvailableException exception) + { + LOGGER.error( + exception, + Messages.QMAN_100023_BUILD_WS_ARTIFACTS_FAILURE); + } catch (IllegalAccessException exception) + { + LOGGER.error( + exception, + Messages.QMAN_100024_CAPABILITY_INSTANTIATION_FAILURE, + eventSourceName); + } catch (InstantiationException exception) + { + LOGGER.error( + exception, + Messages.QMAN_100024_CAPABILITY_INSTANTIATION_FAILURE, + eventSourceName); + } catch (SoapFault exception) + { + LOGGER.error( + exception,Messages.QMAN_100025_WSRF_FAILURE, + eventSourceName); + } catch (Exception exception) + { + LOGGER.error( + exception, + Messages.QMAN_100025_WSRF_FAILURE, + eventSourceName); + } + } + }; + + /** + * This listener handles "remove" mbean events and therefore provides procedure to shutdown and remove + * corresponding ws resources. + */ + private final NotificationListener _listenerForRemovedInstances = new NotificationListener() + { + /** + * Handles JMX "remove" notification type. + * + * @param notification the entity lifecycle notification. + * @param data user data associated with the incoming notifiication : it is not used at the moment. + */ + public void handleNotification(Notification notification, Object data) + { + EntityLifecycleNotification lifecycleNotification = (EntityLifecycleNotification) notification; + ObjectName eventSourceName = lifecycleNotification.getObjectName(); + + LOGGER.debug(Messages.QMAN_200042_REMOVING_RESOURCE, eventSourceName); + + EndpointReference endpointPointReference = new EndpointReference(_resourceURI); + endpointPointReference.addParameter( + Names.RESOURCE_ID_QNAME, + eventSourceName.getKeyProperty(Names.OBJECT_ID)); + + ResourceManager resourceManager = getResource().getResourceManager(); + try + { + Resource resource = resourceManager.getResource(endpointPointReference); + resource.shutdown(); + + LOGGER.info( + Messages.QMAN_000031_RESOURCE_HAS_BEEN_REMOVED, + eventSourceName); + + AsynchNotificationTask asynchNotificationTask = new AsynchNotificationTask( + getTopicName(lifecycleNotification.getClassKind()), + LifeCycleEvent.newRemoveEvent( + eventSourceName.getKeyProperty(Names.OBJECT_ID), + lifecycleNotification.getPackageName(), + lifecycleNotification.getClassName())); + + _workManager.execute(asynchNotificationTask); + + } + catch(Exception exception) + { + LOGGER.error( + exception, + Messages.QMAN_100027_RESOURCE_SHUTDOWN_FAILURE, + eventSourceName); + } + } + }; + + /** + * Initializes this capability. + * + * @throws SoapFault when the initialization fails.. + */ + @Override + public void initialize() throws SoapFault + { + super.initialize(); + + registerByteArraySerializer(); + + createLifeCycleTopics(); + + initializeWorkManager(); + + createQManResourceURI(); + + _mxServer = ManagementFactory.getPlatformMBeanServer(); + _artifactsFactory = new WsArtifactsFactory(getEnvironment(),_mxServer); + + registerQManLifecycleListeners(); + } + + /** + * Connects QMan with a broker with the given connection data. + * + * @param host the host where the broker is running. + * @param port the port number where the broker is running. + * @param username username for estabilshing connection. + * @param password password for estabilshing connection. + * @param virtualHost the virtualHost name. + * @param initialPoolCapacity the initial size of broker connection pool. + * @param maxPoolCapacity the max allowed size of broker connection pool. + * @param maxWaitTimeout the max wait timeout for retrieving connections. + * @throws SoapFault when the connection with broker cannot be estabilished. + */ + @SuppressWarnings("unchecked") + public void connect( + String host, + int port, + String username, + String password, + String virtualHost, + int initialPoolCapacity, + int maxPoolCapacity, + long maxWaitTimeout) throws SoapFault + { + try + { + _mxServer.invoke( + Names.QMAN_OBJECT_NAME, + "addBroker", + new Object[]{host,port,username,password,virtualHost,initialPoolCapacity,maxPoolCapacity,maxWaitTimeout}, + new String[]{ + String.class.getName(), + int.class.getName(), + String.class.getName(), + String.class.getName(), + String.class.getName(), + int.class.getName(), + int.class.getName(), + long.class.getName()}); + } catch(Exception exception) + { + LOGGER.error(Messages.QMAN_100017_UNABLE_TO_CONNECT,host,port); + throw new UnableToConnectWithBrokerFault( + getResource().getEndpointReference(), + host, + port, + username, + virtualHost, + exception.getMessage()); + } + } + + /** + * Creates the message handlers for the given capability. + * + * @param capability the QMan capability. + * @return a collection with message handlers for the given capability. + */ + protected Collection<MessageHandler> createMessageHandlers(MBeanCapability capability) + { + Collection<MessageHandler> handlers = new ArrayList<MessageHandler>(); + + for (Method method : capability.getClass().getDeclaredMethods()) + { + String name = method.getName(); + + QName requestName = new QName( + Names.NAMESPACE_URI, + name, + Names.PREFIX); + + QName returnValueName = new QName( + Names.NAMESPACE_URI, + name+"Response", + Names.PREFIX); + + String actionURI = Names.NAMESPACE_URI+"/"+name; + + MessageHandler handler = new QManMessageHandler( + actionURI, + requestName, + returnValueName); + + handler.setMethod(method); + handlers.add(handler); + } + return handlers; + } + + /** + * Returns the publisher capability associated with the owner resource. + * + * @return the publisher capability associated with the owner resource. + */ + NotificationProducer getPublisherCapability() + { + return (NotificationProducer) getResource().getCapability(WsnConstants.PRODUCER_URI); + } + + /** + * Creates events & objects lifecycle topic that will be used to publish lifecycle event + * messages.. + */ + void createLifeCycleTopics() + { + try + { + _publisherCapability = getPublisherCapability(); + + _publisherCapability.addTopic(Names.EVENTS_LIFECYLE_TOPIC_NAME); + _lifeCycleTopics.put(Names.EVENT,Names.EVENTS_LIFECYLE_TOPIC_NAME); + + LOGGER.info( + Messages.QMAN_000032_EVENTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED, + Names.OBJECTS_LIFECYLE_TOPIC_NAME); + + _publisherCapability.addTopic(Names.OBJECTS_LIFECYLE_TOPIC_NAME); + _lifeCycleTopics.put(Names.CLASS,Names.OBJECTS_LIFECYLE_TOPIC_NAME); + + LOGGER.info( + Messages.QMAN_000033_OBJECTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED, + Names.OBJECTS_LIFECYLE_TOPIC_NAME); + + _publisherCapability.addTopic(Names.UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME); + LOGGER.info( + Messages.QMAN_000034_UNCLASSIFIED_LIFECYCLE_TOPIC_HAS_BEEN_CREATED, + Names.OBJECTS_LIFECYLE_TOPIC_NAME); + } catch(Exception exception) + { + LOGGER.error(exception, Messages.QMAN_100036_TOPIC_DECLARATION_FAILURE); + } + } + + /** + * Starting from an object type (i.e. event or class) returns the name of the + * corresponding topic where the lifecycle message must be published. + * Note that if the given object type is unknown then the "Unclassified Object Types" topic + * will be returned (and therefore the message will be published there). + * + * @param objectType the type of the object. + * @return the name of the topic associated with the given object type. + */ + QName getTopicName(String objectType) + { + QName topicName = _lifeCycleTopics.get(objectType); + return (topicName != null) + ? topicName + : Names.UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME; + } + + /** + * Workaround : it seems that is not possibile to declare a serializer + * for a byte array using muse descriptor... + * What is the stringified name of the class? + * byte[].getClass().getName() is [B but is not working (ClassNotFound). + * So, at the end, this is hard-coded here! + */ + private void registerByteArraySerializer() + { + SerializerRegistry.getInstance().registerSerializer( + byte[].class, + new ByteArraySerializer()); + } + + /** + * Creates the URI that will be later used to identify a QMan WS-Resource. + * Note that the resources that will be created are identified also with their resource id. + * Briefly we could say that this is the soap:address of the WS-Resource definition. + * + * @throws SoapFault when the URI cannot be built (probably it is malformed). + */ + private void createQManResourceURI() throws SoapFault + { + WSDMAdapterEnvironment environment = (WSDMAdapterEnvironment) getEnvironment(); + String resourceURI = environment.getDefaultURIPrefix()+Names.QMAN_RESOURCE_NAME; + try + { + _resourceURI = URI.create(resourceURI); + + } catch(IllegalArgumentException exception) + { + LOGGER.info( + exception, + Messages.QMAN_100029_MALFORMED_RESOURCE_URI_FAILURE, + resourceURI); + throw new SoapFault(exception); + } + } + + /** + * Initializes the work manager used for asynchronous notifications. + */ + private void initializeWorkManager() + { + Configuration configuration = Configuration.getInstance(); + _workManager = new ThreadPoolExecutor( + configuration.getWorkerManagerPoolSize(), + configuration.getWorkerManagerMaxPoolSize(), + configuration.getWorkerManagerKeepAliveTime(), + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<Runnable>(30)); + } + + /** + * This adapter capability needs to be an event listener of QMan JMX core + * in order to detect relevant lifecycle events and therefore create WS artifacts & notification(s). + * + * @throws SoapFault when it's not possible to register event listener : is QMan running? + */ + @SuppressWarnings("serial") + private void registerQManLifecycleListeners() throws SoapFault + { + try + { + _mxServer.addNotificationListener( + Names.QMAN_OBJECT_NAME, + _listenerForNewInstances, + _filterForNewInstances, + null); + + _mxServer.addNotificationListener( + Names.QMAN_OBJECT_NAME, + _listenerForRemovedInstances, + _filterForRemovedInstances, + null); + + try + { + _mxServer.addNotificationListener( + Names.QPID_EMULATOR_OBJECT_NAME, + _listenerForNewInstances, + _filterForNewInstances, null); + + _mxServer.addNotificationListener( + Names.QPID_EMULATOR_OBJECT_NAME, + _listenerForRemovedInstances, + _filterForRemovedInstances, null); + + } catch (Exception exception) + { + LOGGER.info(Messages.QMAN_000028_TEST_MODULE_NOT_FOUND); + } + } catch(InstanceNotFoundException exception) + { + throw new SoapFault(exception); + } + } +}
\ No newline at end of file |