/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.disttest.jms; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.naming.Context; import javax.naming.NamingException; import org.apache.qpid.disttest.DistributedTestException; import org.apache.qpid.disttest.controller.CommandListener; import org.apache.qpid.disttest.controller.config.QueueConfig; import org.apache.qpid.disttest.message.Command; import org.apache.qpid.disttest.message.RegisterClientCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ControllerJmsDelegate { private static final Logger LOGGER = LoggerFactory.getLogger(ControllerJmsDelegate.class); private static final String QUEUE_CREATOR_CLASS_NAME_SYSTEM_PROPERTY = "qpid.disttest.queue.creator.class"; private final Map _clientNameToQueueMap = new ConcurrentHashMap(); private final Connection _connection; private final Destination _controllerQueue; private final Session _session; private QueueCreator _queueCreator; private List _commandListeners = new CopyOnWriteArrayList(); public ControllerJmsDelegate(final Context context) throws NamingException, JMSException { final ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("connectionfactory"); _connection = connectionFactory.createConnection(); _connection.start(); _controllerQueue = (Destination) context.lookup("controllerqueue"); _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); createVendorSpecificQueueCreator(); } private void createVendorSpecificQueueCreator() { String queueCreatorClassName = System.getProperty(QUEUE_CREATOR_CLASS_NAME_SYSTEM_PROPERTY); if(queueCreatorClassName == null) { queueCreatorClassName = QpidQueueCreator.class.getName(); } else { LOGGER.info("Using overridden queue creator class " + queueCreatorClassName); } try { Class queueCreatorClass = (Class) Class.forName(queueCreatorClassName); _queueCreator = queueCreatorClass.newInstance(); } catch (ClassNotFoundException e) { throw new DistributedTestException("Unable to instantiate queue creator using class name " + queueCreatorClassName, e); } catch (InstantiationException e) { throw new DistributedTestException("Unable to instantiate queue creator using class name " + queueCreatorClassName, e); } catch (IllegalAccessException e) { throw new DistributedTestException("Unable to instantiate queue creator using class name " + queueCreatorClassName, e); } } public void start() { try { final MessageConsumer consumer = _session.createConsumer(_controllerQueue); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(final Message message) { try { String jmsMessageID = message.getJMSMessageID(); LOGGER.debug("Received message " + jmsMessageID); final Command command = JmsMessageAdaptor.messageToCommand(message); LOGGER.debug("Converted message " + jmsMessageID + " into command: " + command); processCommandWithFirstSupportingListener(command); LOGGER.debug("Finished processing command for message " + jmsMessageID); } catch (Throwable t) { LOGGER.error("Can't handle JMS message", t); } } }); } catch (final JMSException e) { throw new DistributedTestException(e); } } /** ensures connections are closed, otherwise the JVM may be prevented from terminating */ public void closeConnections() { try { _session.close(); } catch (JMSException e) { LOGGER.error("Unable to close session", e); } try { _connection.stop(); } catch (JMSException e) { LOGGER.error("Unable to stop connection", e); } try { _connection.close(); } catch (JMSException e) { throw new DistributedTestException("Unable to close connection", e); } } public void registerClient(final RegisterClientCommand command) { final String clientName = command.getClientName(); final Destination clientIntructionQueue = createDestinationFromString(command.getClientQueueName()); _clientNameToQueueMap.put(clientName, clientIntructionQueue); } public void sendCommandToClient(final String clientName, final Command command) { final Destination clientQueue = _clientNameToQueueMap.get(clientName); if (clientQueue == null) { throw new DistributedTestException("Client name " + clientName + " not known. I know about: " + _clientNameToQueueMap.keySet()); } try { final MessageProducer producer = _session.createProducer(clientQueue); final Message message = JmsMessageAdaptor.commandToMessage(_session, command); producer.send(message); } catch (final JMSException e) { throw new DistributedTestException(e); } } private void processCommandWithFirstSupportingListener(Command command) { for (CommandListener listener : _commandListeners) { if (listener.supports(command)) { listener.processCommand(command); return; } } throw new IllegalStateException("There is no registered listener to process command " + command); } private Destination createDestinationFromString(final String clientQueueName) { Destination clientIntructionQueue; try { clientIntructionQueue = _session.createQueue(clientQueueName); } catch (JMSException e) { throw new DistributedTestException("Unable to create Destination from " + clientQueueName); } return clientIntructionQueue; } public void createQueues(List queues) { _queueCreator.createQueues(_connection, _session, queues); } public void deleteQueues(List queues) { _queueCreator.deleteQueues(_connection, _session, queues); } public void addCommandListener(CommandListener commandListener) { _commandListeners.add(commandListener); } public void removeCommandListener(CommandListener commandListener) { _commandListeners.remove(commandListener); } }